前言
上一篇文章中提到了动态修改RocketMQ线程池,那我们如何才能够通过管理页面对不同消费者组的线程池进行管理自由的随着业务波动进行平滑修改,降低线程池参数修改的成本。
另外,从功能性以及健壮性而言还有许多值得我们思考的地方,科宇通过查看线程池运行时指标、负载报警、配置日志管理来提高我们对线程池的管理。
SpringBoot RocketMQ 实践
版本信息:
rocketmq-spring-boot-starter : 2.2.2
spring-boot-starter :2.3.2.RELEASE
为了满足业务需求,我们初始化三个 Consumer。
并且设置成同一个 Topic 和不同的消费者组 ConsumerGroup,线程池数量都设置为 20。
topic="test-topic", consumerGroup="my-consumer_test-topic", consumeThreadNumber=20) (publicclassMessageConsumeimplementsRocketMQListener { publicvoidonMessage(Stringmessage) { log.info(Thread.currentThread().getName()); log.info("Message: {}", message); } } topic="test-topic", consumerGroup="my-consumer_test-topic_1", consumeThreadNumber=20) (publicclassMessageConsumeimplementsRocketMQListener { publicvoidonMessage(Stringmessage) { log.info(Thread.currentThread().getName()); log.info("Message: {}", message); } } topic="test-topic", consumerGroup="my-consumer_test-topic_2", consumeThreadNumber=20) (publicclassMessageConsumeimplementsRocketMQListener { publicvoidonMessage(Stringmessage) { log.info(Thread.currentThread().getName()); log.info("Message: {}", message); } }
生产者:
"/message/send") (publicStringsendMessage() { IntStream.range(0, 1000).forEach(i-> { rocketMQTemplate.convertAndSend("test-topic", newDate().toString()); }); return"success"; }
执行结果:
我们可以看到三个消费者都监听到了Topic的消息,并进行了消费。但是这个时候,由于生产消息数量暴增,需要提高消费者的并行消费速度。
思考片刻,只有最常规的使用方式就是修改线程池后,重新启动消费者。但是目前生产正在跑着业务数据,重启消费者必定会造成大量数据堆积甚至是丢失。
这时候如果能有一个页面动态修改线程池数量,那一定是非常赞的!
如何进行动态修改 RocketMQ 线程数?
思路:
- 收集消费者端信息。
- 将消费者端的线程池实例注册到服务中。
- 定时心跳检查实例状态。
- 从容器中拿到对应线程池,调用原生方法进行参数修改。
流程图
实现逻辑:
第一步,收集消费者端信息。
第二步,通过DefaultRocketMQListenerContainer类获取到Bean对象的集合,将消费者执行器保存到容器中。
publicvoidonApplicationEvent(ApplicationStartedEventevent) { MapcontainerMap=ApplicationContextHolder.getBeansOfType(DefaultRocketMQListenerContainer.class); try { for (DefaultRocketMQListenerContainercontainer : containerMap.values()) { DefaultMQPushConsumerdefaultMQPushConsumer=container.getConsumer(); if (defaultMQPushConsumer!=null) { ConsumeMessageServiceconsumeMessageService=defaultMQPushConsumer.getDefaultMQPushConsumerImpl().getConsumeMessageService(); ThreadPoolExecutorconsumeExecutor= (ThreadPoolExecutor) ReflectUtil.getFieldValue(consumeMessageService, "consumeExecutor"); rocketmqConsumeExecutor.put(container.getConsumerGroup(), consumeExecutor); } } } catch (Exceptionex) { log.error("Failed to get RocketMQ thread pool.", ex); } }
第三步,将消费者端实例注册到服务中。
第四步,调用接口将服务注册。
publicDiscoveryClient(HttpAgenthttpAgent, InstanceInfoinstanceInfo) { ... register(); } booleanregister() { log.info("{}{} - registering service...", PREFIX, appPathIdentifier); StringurlPath=BASE_PATH+"/apps/register/"; ResultregisterResult; try { registerResult=httpAgent.httpPostByDiscovery(urlPath, instanceInfo); } catch (Exceptionex) { registerResult=Results.failure(ErrorCodeEnum.SERVICE_ERROR); log.error("{}{} - registration failed: {}", PREFIX, appPathIdentifier, ex.getMessage()); } if (log.isInfoEnabled()) { log.info("{}{} - registration status: {}", PREFIX, appPathIdentifier, registerResult.isSuccess() ?"success" : "fail"); } returnregisterResult.isSuccess(); }
第五步,定时心跳检查消费者实例状态。
第六步,初始化定时任务线程池,实现心跳线程,检查实例信息逻辑。
publicDiscoveryClient(HttpAgenthttpAgent, InstanceInfoinstanceInfo) { // ...initScheduledTasks(); } // 定时任务线程池privatevoidinitScheduledTasks() { scheduler.scheduleWithFixedDelay(newHeartbeatThread(), 30, 30, TimeUnit.SECONDS); } // 心跳线程publicclassHeartbeatThreadimplementsRunnable { publicvoidrun() { if (renew()) { lastSuccessfulHeartbeatTimestamp=System.currentTimeMillis(); } } } privatebooleanrenew() { ResultrenewResult; try { InstanceInfo.InstanceRenewinstanceRenew=newInstanceInfo.InstanceRenew() .setAppName(instanceInfo.getAppName()) .setInstanceId(instanceInfo.getInstanceId()) .setLastDirtyTimestamp(instanceInfo.getLastDirtyTimestamp().toString()) .setStatus(instanceInfo.getStatus().toString()); // 检查实例信息是否正常renewResult=httpAgent.httpPostByDiscovery(BASE_PATH+"/apps/renew", instanceRenew); if (Objects.equals(ErrorCodeEnum.NOT_FOUND.getCode(), renewResult.getCode())) { longtimestamp=instanceInfo.setIsDirtyWithTime(); booleansuccess=register(); ThreadPoolAdapterRegisteradapterRegister=ApplicationContextHolder.getBean(ThreadPoolAdapterRegister.class); adapterRegister.register(); if (success) { instanceInfo.unsetIsDirty(timestamp); } returnsuccess; } returnrenewResult.isSuccess(); } catch (Exceptionex) { log.error(PREFIX+"{} - was unable to send heartbeat!", appPathIdentifier, ex); returnfalse; } } // 注册实例信息publicvoidregister() { MapthreadPoolAdapterMap=ApplicationContextHolder.getBeansOfType(ThreadPoolAdapter.class); ListthreadPoolAdapterCacheConfigs=getThreadPoolAdapterCacheConfigs(threadPoolAdapterMap); doRegister(threadPoolAdapterCacheConfigs); }
第七步,从容器中拿到对应线程池,调用原生方法进行参数修改。
最后,通过 threadPoolKey 拿到执行器,更新核心线程数和最大线程数。
publicbooleanupdateThreadPool(ThreadPoolAdapterParameterthreadPoolAdapterParameter) { StringthreadPoolKey=threadPoolAdapterParameter.getThreadPoolKey(); // 通过threadPoolKey拿到执行器ThreadPoolExecutorrocketMQConsumeExecutor=rocketmqConsumeExecutor.get(threadPoolKey); if (rocketMQConsumeExecutor!=null) { intoriginalCoreSize=rocketMQConsumeExecutor.getCorePoolSize(); intoriginalMaximumPoolSize=rocketMQConsumeExecutor.getMaximumPoolSize(); // 更新核心线程数和最大线程数rocketMQConsumeExecutor.setCorePoolSize(threadPoolAdapterParameter.getCorePoolSize()); rocketMQConsumeExecutor.setMaximumPoolSize(threadPoolAdapterParameter.getMaximumPoolSize()); log.info("[{}] RocketMQ consumption thread pool parameter change. coreSize: {}, maximumSize: {}", threadPoolKey, String.format(CHANGE_DELIMITER, originalCoreSize, rocketMQConsumeExecutor.getCorePoolSize()), String.format(CHANGE_DELIMITER, originalMaximumPoolSize, rocketMQConsumeExecutor.getMaximumPoolSize())); returntrue; } log.warn("[{}] RocketMQ consuming thread pool not found.", threadPoolKey); returnfalse; }
我们在消费者端建立了三个消费者 Consumer,通过前端页面可以看到对应 RocketMQ 消费者线程池。
我们现在对第一个消费者进行参数的修改。
通过后台的日志,我们可以看到已经修改成功了。
2022-12-2615:35:35.472INFO9726--- [nio-8099-exec-2] c.h.s.s.c.ThreadPoolAdapterController : [RocketMQ] Changethird-partythreadpooldata. key: my-consumer_test-topic, coreSize: 100, maximumSize: 1002022-12-2615:35:35.473INFO9726--- [nio-8099-exec-2] c.h.a.r.RocketMQThreadPoolAdapter : [my-consumer_test-topic] RocketMQconsumptionthreadpoolparameterchange. coreSize: 20=>100, maximumSize: 20=>100
我们重新进行消息的生产,看看具体的效果,我们看到线程数量已经发生了变化/
利用这种方式,我们就可以解决常规重启系统来修改参数的问题。
上面我们主要利用了Hippo4j对RocketMQ线程池数量进行了修改,为了提高应用的健壮性,我们还有可以利用Hippo4j做到实时查看线程池运行时指标、负载报警、配置日志管理等事情。
如果大家对具体的代码实现感兴趣,欢迎大家到以下两个平台进行指导~
GitHub:https://github.com/opengoofy/hippo4j
Gitee:https://gitee.com/magegoofy/hippo4j
什么是 Hippo4j
为了避免大家重复的造轮子,大家只要在项目中引入 Hippo4j ,就能够进行 Dubbo、Hystrix、RabbitMQ、RocketMQ 等消费线程池运行时数据查看和线程数变更。
Hippo4j 通过对 JDK 线程池增强,以及扩展三方框架底层线程池等功能,为业务系统提高线上运行保障能力。
提供以下功能支持:
- 全局管控 - 管理应用线程池实例。
- 动态变更 - 应用运行时动态变更线程池参数,包括不限于:核心、最大线程数、阻塞队列容量、拒绝策略等。
- 通知报警 - 内置四种报警通知策略,线程池活跃度、容量水位、拒绝策略以及任务执行时间超长。
- 运行监控 - 实时查看线程池运行时数据,最近半小时线程池运行数据图表展示。
- 功能扩展 - 支持线程池任务传递上下文;项目关闭时,支持等待线程池在指定时间内完成任务。
- 多种模式 - 内置两种使用模式:依赖配置中心和无中间件依赖。
- 容器管理 - Tomcat、Jetty、Undertow 容器线程池运行时查看和线程数变更。
- 框架适配 - Dubbo、Hystrix、RabbitMQ、RocketMQ 等消费线程池运行时数据查看和线程数变更。
快速开始
对于本地演示目的,请参阅Quick start
演示环境: http://console.hippo4j.cn/index.html