背景介绍
前面几篇文章分析了几个引起消息堆积的典型场景,分别是:
- 业务逻辑处理慢导致消息堆积的场景,MQ-消息堆积-业务线程阻塞案例分析
- JDK BUG导致消息堆积的场景,MQ-消息堆积-JDK Bug导致线程阻塞案例分析
- RocketMQ SDK BUG导致消息堆积的场景,一次RocketMQ ons SDK Bug导致消息不断堆积到重试队列的案例分析,同时在这篇文章也简单介绍了由于业务处理类返回RECONSUME_LATER,返回null,抛出未捕获异常导致消息堆积到重试队列的场景。
这次的消息堆积场景之前没有遇到过,记录下来以备忘。
问题描述
分析过程
初步判断
为了便于表达和理解,我们只关注与该问题有关的部分逻辑。
因为消息堆积量不断在增加,所以判断该Group ID已经在Broker上有了订阅关系,很可能是使用该Group ID的Consumer实例下线后没有取消订阅关系导致的,如图:
正常运行
在正常情况下,控制台上可以看到Group ID的【订阅关系】及【消费者状态】,如图:
异常之后
异常之后就变成了【问题描述】中的样子,此时我们不清楚:
- 该GID订阅了哪个topic
- 该GID被哪个应用消费者使用后出现的异常
- 该GID对应的消息生产者是哪个
在以上事情没有弄清楚之前,也不敢对该GID做取消订阅、删除之类的操作。
确定topic
消息堆积是通过消费者的offset信息统计的,该信息存储在Broker上的store/config/consumerOffset.json中,consumerOffset.json格式如图:
我们在consumerOffset.json文件中找到了GID对应的topic,此处有个细节(后面代码处有解释):
- 该GID在groupTopicMap中没有重试队列Topic
- 该GID在offsetTable中没有重试队列Topic上的offset
确定Producer
通过Topic查询Message
通过MessageID确定ECS IP
通过上面的查询无法直接定位到ECS,我们可以通过Message ID计算出ECS IP,方法如下:
String ip = MessageClientIDSetter.getIPStrFromID(Message ID)
如果懒得写代码,也可以使用arthas来查询:
此时整个链路逐渐清晰起来了,还缺少最关键的Consumer信息。
确定Consumer
代码Review
查询了近期发版的所有代码,没有找到与该GID相关的信息。
Broker端找线索
我们试图通过Broker端的日志来确认两件事情:
- 该GID的Consumer在什么时候从哪些IP建立了与Broker的交互
- 该GID的Consumer在什么时候从哪些IP断开了与Broker的交互
Broker heartBeat
通过以上代码打印的日志,我们可以过滤出该GID与Broker建立交互时候的相关信息。
Broker unregisterClient
在Consumer实例shutdown的时候,会向Broker发送unregisterClient请求,会调用ConsumerManager中相应的unregisterConsumer方法:
通过以上代码打印的日志,我们可以过滤出该GID与Broker断开交互时候的相关信息。
理想是美好的,现实是残酷的
Broker端最多保留了不到2天的日志,所以这条路也走不通了。
柳暗花明
同时我们也在想:除了程序,还有其他途径变更这种订阅关系吗?答案是有的。
命令行
控制台
到这里估计您已经知道引起这次消息堆积的原因了。
经验总结
- 完善监控告警、提高应急响应能力
- 最小权限原则
- RocketMQ控制台是否应该增加操作记录的功能?