开箱即用!动态修改RocketMQ线程池

简介: 动态修改RocketMQ线程池,那我们如何才能够通过管理页面对不同消费者组的线程池进行管理自由的随着业务波动进行平滑修改,降低线程池参数修改的成本。另外,从功能性以及健壮性而言还有许多值得我们思考的地方,科宇通过查看线程池运行时指标、负载报警、配置日志管理来提高我们对线程池的管理。

前言

上一篇文章中提到了动态修改RocketMQ线程池,那我们如何才能够通过管理页面对不同消费者组的线程池进行管理自由的随着业务波动进行平滑修改,降低线程池参数修改的成本。

另外,从功能性以及健壮性而言还有许多值得我们思考的地方,科宇通过查看线程池运行时指标、负载报警、配置日志管理来提高我们对线程池的管理。

SpringBoot RocketMQ 实践

版本信息:

rocketmq-spring-boot-starter : 2.2.2

spring-boot-starter :2.3.2.RELEASE

为了满足业务需求,我们初始化三个 Consumer。

并且设置成同一个 Topic 和不同的消费者组 ConsumerGroup,线程池数量都设置为 20。

@Service@RocketMQMessageListener(topic="test-topic", consumerGroup="my-consumer_test-topic", consumeThreadNumber=20)
publicclassMessageConsumeimplementsRocketMQListener {
@OverridepublicvoidonMessage(Stringmessage) {
log.info(Thread.currentThread().getName());
log.info("Message: {}", message);
    }
}
@Service@RocketMQMessageListener(topic="test-topic", consumerGroup="my-consumer_test-topic_1", consumeThreadNumber=20)
publicclassMessageConsumeimplementsRocketMQListener {
@OverridepublicvoidonMessage(Stringmessage) {
log.info(Thread.currentThread().getName());
log.info("Message: {}", message);
    }
}
@Service@RocketMQMessageListener(topic="test-topic", consumerGroup="my-consumer_test-topic_2", consumeThreadNumber=20)
publicclassMessageConsumeimplementsRocketMQListener {
@OverridepublicvoidonMessage(Stringmessage) {
log.info(Thread.currentThread().getName());
log.info("Message: {}", message);
    }
}

生产者:

@GetMapping("/message/send")
publicStringsendMessage() {
IntStream.range(0, 1000).forEach(i-> {
rocketMQTemplate.convertAndSend("test-topic", newDate().toString());
    });
return"success";
}

执行结果:

我们可以看到三个消费者都监听到了Topic的消息,并进行了消费。但是这个时候,由于生产消息数量暴增,需要提高消费者的并行消费速度。

思考片刻,只有最常规的使用方式就是修改线程池后,重新启动消费者。但是目前生产正在跑着业务数据,重启消费者必定会造成大量数据堆积甚至是丢失。

这时候如果能有一个页面动态修改线程池数量,那一定是非常赞的!

如何进行动态修改 RocketMQ 线程数?

思路:

  1. 收集消费者端信息。
  2. 将消费者端的线程池实例注册到服务中。
  3. 定时心跳检查实例状态。
  4. 从容器中拿到对应线程池,调用原生方法进行参数修改。

流程图

实现逻辑:

第一步,收集消费者端信息。


第二步,通过DefaultRocketMQListenerContainer类获取到Bean对象的集合,将消费者执行器保存到容器中。

@OverridepublicvoidonApplicationEvent(ApplicationStartedEventevent) {
MapcontainerMap=ApplicationContextHolder.getBeansOfType(DefaultRocketMQListenerContainer.class);
try {
for (DefaultRocketMQListenerContainercontainer : containerMap.values()) {
DefaultMQPushConsumerdefaultMQPushConsumer=container.getConsumer();
if (defaultMQPushConsumer!=null) {
ConsumeMessageServiceconsumeMessageService=defaultMQPushConsumer.getDefaultMQPushConsumerImpl().getConsumeMessageService();
ThreadPoolExecutorconsumeExecutor= (ThreadPoolExecutor) ReflectUtil.getFieldValue(consumeMessageService, "consumeExecutor");
rocketmqConsumeExecutor.put(container.getConsumerGroup(), consumeExecutor);
            }
        }
    } catch (Exceptionex) {
log.error("Failed to get RocketMQ thread pool.", ex);
    }
}


第三步,将消费者端实例注册到服务中。

第四步,调用接口将服务注册。

publicDiscoveryClient(HttpAgenthttpAgent, InstanceInfoinstanceInfo) {
    ...
register();
}
booleanregister() {
log.info("{}{} - registering service...", PREFIX, appPathIdentifier);
StringurlPath=BASE_PATH+"/apps/register/";
ResultregisterResult;
try {
registerResult=httpAgent.httpPostByDiscovery(urlPath, instanceInfo);
    } catch (Exceptionex) {
registerResult=Results.failure(ErrorCodeEnum.SERVICE_ERROR);
log.error("{}{} - registration failed: {}", PREFIX, appPathIdentifier, ex.getMessage());
    }
if (log.isInfoEnabled()) {
log.info("{}{} - registration status: {}", PREFIX, appPathIdentifier, registerResult.isSuccess() ?"success" : "fail");
    }
returnregisterResult.isSuccess();
}



第五步,定时心跳检查消费者实例状态。

第六步,初始化定时任务线程池,实现心跳线程,检查实例信息逻辑。

publicDiscoveryClient(HttpAgenthttpAgent, InstanceInfoinstanceInfo) {
// ...initScheduledTasks();
}
// 定时任务线程池privatevoidinitScheduledTasks() {
scheduler.scheduleWithFixedDelay(newHeartbeatThread(), 30, 30, TimeUnit.SECONDS);
}
// 心跳线程publicclassHeartbeatThreadimplementsRunnable {
@Overridepublicvoidrun() {
if (renew()) {
lastSuccessfulHeartbeatTimestamp=System.currentTimeMillis();
        }
    }
}
privatebooleanrenew() {
ResultrenewResult;
try {
InstanceInfo.InstanceRenewinstanceRenew=newInstanceInfo.InstanceRenew()
                .setAppName(instanceInfo.getAppName())
                .setInstanceId(instanceInfo.getInstanceId())
                .setLastDirtyTimestamp(instanceInfo.getLastDirtyTimestamp().toString())
                .setStatus(instanceInfo.getStatus().toString());
// 检查实例信息是否正常renewResult=httpAgent.httpPostByDiscovery(BASE_PATH+"/apps/renew", instanceRenew);
if (Objects.equals(ErrorCodeEnum.NOT_FOUND.getCode(), renewResult.getCode())) {
longtimestamp=instanceInfo.setIsDirtyWithTime();
booleansuccess=register();
ThreadPoolAdapterRegisteradapterRegister=ApplicationContextHolder.getBean(ThreadPoolAdapterRegister.class);
adapterRegister.register();
if (success) {
instanceInfo.unsetIsDirty(timestamp);
            }
returnsuccess;
        }
returnrenewResult.isSuccess();
    } catch (Exceptionex) {
log.error(PREFIX+"{} - was unable to send heartbeat!", appPathIdentifier, ex);
returnfalse;
    }
}
// 注册实例信息publicvoidregister() {
MapthreadPoolAdapterMap=ApplicationContextHolder.getBeansOfType(ThreadPoolAdapter.class);
ListthreadPoolAdapterCacheConfigs=getThreadPoolAdapterCacheConfigs(threadPoolAdapterMap);
doRegister(threadPoolAdapterCacheConfigs);
}

第七步,从容器中拿到对应线程池,调用原生方法进行参数修改。

最后,通过 threadPoolKey 拿到执行器,更新核心线程数和最大线程数。

@OverridepublicbooleanupdateThreadPool(ThreadPoolAdapterParameterthreadPoolAdapterParameter) {
StringthreadPoolKey=threadPoolAdapterParameter.getThreadPoolKey();
// 通过threadPoolKey拿到执行器ThreadPoolExecutorrocketMQConsumeExecutor=rocketmqConsumeExecutor.get(threadPoolKey);
if (rocketMQConsumeExecutor!=null) {
intoriginalCoreSize=rocketMQConsumeExecutor.getCorePoolSize();
intoriginalMaximumPoolSize=rocketMQConsumeExecutor.getMaximumPoolSize();
// 更新核心线程数和最大线程数rocketMQConsumeExecutor.setCorePoolSize(threadPoolAdapterParameter.getCorePoolSize());
rocketMQConsumeExecutor.setMaximumPoolSize(threadPoolAdapterParameter.getMaximumPoolSize());
log.info("[{}] RocketMQ consumption thread pool parameter change. coreSize: {}, maximumSize: {}",
threadPoolKey,
String.format(CHANGE_DELIMITER, originalCoreSize, rocketMQConsumeExecutor.getCorePoolSize()),
String.format(CHANGE_DELIMITER, originalMaximumPoolSize, rocketMQConsumeExecutor.getMaximumPoolSize()));
returntrue;
    }
log.warn("[{}] RocketMQ consuming thread pool not found.", threadPoolKey);
returnfalse;
}


我们在消费者端建立了三个消费者 Consumer,通过前端页面可以看到对应 RocketMQ 消费者线程池。

我们现在对第一个消费者进行参数的修改。

通过后台的日志,我们可以看到已经修改成功了。

2022-12-2615:35:35.472INFO9726--- [nio-8099-exec-2] c.h.s.s.c.ThreadPoolAdapterController    : [RocketMQ] Changethird-partythreadpooldata. key: my-consumer_test-topic, coreSize: 100, maximumSize: 1002022-12-2615:35:35.473INFO9726--- [nio-8099-exec-2] c.h.a.r.RocketMQThreadPoolAdapter        : [my-consumer_test-topic] RocketMQconsumptionthreadpoolparameterchange. coreSize: 20=>100, maximumSize: 20=>100

我们重新进行消息的生产,看看具体的效果,我们看到线程数量已经发生了变化/

利用这种方式,我们就可以解决常规重启系统来修改参数的问题。

上面我们主要利用了Hippo4j对RocketMQ线程池数量进行了修改,为了提高应用的健壮性,我们还有可以利用Hippo4j做到实时查看线程池运行时指标、负载报警、配置日志管理等事情。


如果大家对具体的代码实现感兴趣,欢迎大家到以下两个平台进行指导~

GitHub:https://github.com/opengoofy/hippo4j

Gitee:https://gitee.com/magegoofy/hippo4j

什么是 Hippo4j

为了避免大家重复的造轮子,大家只要在项目中引入 Hippo4j ,就能够进行 Dubbo、Hystrix、RabbitMQ、RocketMQ 等消费线程池运行时数据查看和线程数变更


Hippo4j 通过对 JDK 线程池增强,以及扩展三方框架底层线程池等功能,为业务系统提高线上运行保障能力。

提供以下功能支持:

  • 全局管控 - 管理应用线程池实例。
  • 动态变更 - 应用运行时动态变更线程池参数,包括不限于:核心、最大线程数、阻塞队列容量、拒绝策略等。
  • 通知报警 - 内置四种报警通知策略,线程池活跃度、容量水位、拒绝策略以及任务执行时间超长。
  • 运行监控 - 实时查看线程池运行时数据,最近半小时线程池运行数据图表展示。
  • 功能扩展 - 支持线程池任务传递上下文;项目关闭时,支持等待线程池在指定时间内完成任务。
  • 多种模式 - 内置两种使用模式:依赖配置中心无中间件依赖
  • 容器管理 - Tomcat、Jetty、Undertow 容器线程池运行时查看和线程数变更。
  • 框架适配 - Dubbo、Hystrix、RabbitMQ、RocketMQ 等消费线程池运行时数据查看和线程数变更。


快速开始

对于本地演示目的,请参阅Quick start

演示环境: http://console.hippo4j.cn/index.html


相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
消息中间件 cobar JavaScript
读完 RocketMQ 源码,我学会了如何优雅的创建线程
读完 RocketMQ 源码,我学会了如何优雅的创建线程
读完 RocketMQ 源码,我学会了如何优雅的创建线程
|
消息中间件 Java 中间件
【Alibaba中间件技术系列】「RocketMQ技术专题」带你一起去探索RocketMQ服务架构的线程模型分析
【Alibaba中间件技术系列】「RocketMQ技术专题」带你一起去探索RocketMQ服务架构的线程模型分析
284 2
【Alibaba中间件技术系列】「RocketMQ技术专题」带你一起去探索RocketMQ服务架构的线程模型分析
|
消息中间件 cobar Java
读完 RocketMQ 源码,我学会了如何优雅的创建线程
RocketMQ 是一款开源的分布式消息系统,基于高可用分布式集群技术,提供低延时、高可靠的消息发布与订阅服务。 这篇文章,笔者整理了 RocketMQ 源码中创建线程的几点技巧,希望大家读完之后,能够有所收获。
读完 RocketMQ 源码,我学会了如何优雅的创建线程
|
消息中间件 安全 Java
Kafka/RocketMQ 多线程消费时如何保证消费顺序?
kafka 的消费类 KafkaConsumer 是非线程安全的,因此用户无法在多线程中共享一个 KafkaConsumer 实例,且 KafkaConsumer 本身并没有实现多线程消费逻辑,如需多线程消费,还需要用户自行实现,在这里我会讲到 Kafka 两种多线程消费模型。
1179 0
Kafka/RocketMQ 多线程消费时如何保证消费顺序?
|
1天前
|
消息中间件 存储 Kafka
MQ 消息队列核心原理,12 条最全面总结!
本文总结了消息队列的12个核心原理,涵盖消息顺序性、ACK机制、持久化及高可用性等内容。关注【mikechen的互联网架构】,10年+BAT架构经验倾囊相授。
|
5天前
|
消息中间件
解决方案 | 云消息队列RabbitMQ实践获奖名单公布!
云消息队列RabbitMQ实践获奖名单公布!
|
7天前
|
消息中间件 JSON Java
开发者如何使用轻量消息队列MNS
【10月更文挑战第19天】开发者如何使用轻量消息队列MNS
25 3
|
12天前
|
消息中间件 存储 弹性计算
云消息队列RabbitMQ实践
云消息队列RabbitMQ实践
|
15天前
|
消息中间件 安全 Java
云消息队列RabbitMQ实践解决方案评测
一文带你详细了解云消息队列RabbitMQ实践的解决方案优与劣
52 5
|
19天前
|
消息中间件 存储 监控
解决方案 | 云消息队列RabbitMQ实践
在实际业务中,网站因消息堆积和高流量脉冲导致系统故障。为解决这些问题,云消息队列 RabbitMQ 版提供高性能的消息处理和海量消息堆积能力,确保系统在流量高峰时仍能稳定运行。迁移前需进行技术能力和成本效益评估,包括功能、性能、限制值及费用等方面。迁移步骤包括元数据迁移、创建用户、网络打通和数据迁移。
59 4

热门文章

最新文章