RocketMQ-没有消费者的消息堆积场景分析

本文涉及的产品
应用实时监控服务-应用监控,每月50GB免费额度
任务调度 XXL-JOB 版免费试用,400 元额度,开发版规格
Serverless 应用引擎免费试用套餐包,4320000 CU,有效期3个月
简介: RocketMQ-没有消费者的消息堆积场景分析

背景介绍

前面几篇文章分析了几个引起消息堆积的典型场景,分别是:

这次的消息堆积场景之前没有遇到过,记录下来以备忘。

问题描述

分析过程

初步判断

为了便于表达和理解,我们只关注与该问题有关的部分逻辑。

因为消息堆积量不断在增加,所以判断该Group ID已经在Broker上有了订阅关系,很可能是使用该Group ID的Consumer实例下线后没有取消订阅关系导致的,如图:

正常运行

在正常情况下,控制台上可以看到Group ID的【订阅关系】及【消费者状态】,如图:

异常之后

异常之后就变成了【问题描述】中的样子,此时我们不清楚:

  • 该GID订阅了哪个topic
  • 该GID被哪个应用消费者使用后出现的异常
  • 该GID对应的消息生产者是哪个

在以上事情没有弄清楚之前,也不敢对该GID做取消订阅、删除之类的操作。

确定topic

消息堆积是通过消费者的offset信息统计的,该信息存储在Broker上的store/config/consumerOffset.json中,consumerOffset.json格式如图:

我们在consumerOffset.json文件中找到了GID对应的topic,此处有个细节(后面代码处有解释):

  • 该GID在groupTopicMap中没有重试队列Topic
  • 该GID在offsetTable中没有重试队列Topic上的offset

确定Producer

通过Topic查询Message

通过MessageID确定ECS IP

通过上面的查询无法直接定位到ECS,我们可以通过Message ID计算出ECS IP,方法如下:

String ip = MessageClientIDSetter.getIPStrFromID(Message ID)

如果懒得写代码,也可以使用arthas来查询:

此时整个链路逐渐清晰起来了,还缺少最关键的Consumer信息。

确定Consumer

代码Review

查询了近期发版的所有代码,没有找到与该GID相关的信息。

Broker端找线索

我们试图通过Broker端的日志来确认两件事情:

  • 该GID的Consumer在什么时候从哪些IP建立了与Broker的交互
  • 该GID的Consumer在什么时候从哪些IP断开了与Broker的交互

Broker heartBeat

通过以上代码打印的日志,我们可以过滤出该GID与Broker建立交互时候的相关信息。

Broker unregisterClient

在Consumer实例shutdown的时候,会向Broker发送unregisterClient请求,会调用ConsumerManager中相应的unregisterConsumer方法:

通过以上代码打印的日志,我们可以过滤出该GID与Broker断开交互时候的相关信息。

理想是美好的,现实是残酷的

Broker端最多保留了不到2天的日志,所以这条路也走不通了。

柳暗花明

同时我们也在想:除了程序,还有其他途径变更这种订阅关系吗?答案是有的。

命令行

控制台

到这里估计您已经知道引起这次消息堆积的原因了。

经验总结

  • 完善监控告警、提高应急响应能力
  • 最小权限原则
  • RocketMQ控制台是否应该增加操作记录的功能?
相关实践学习
快速体验阿里云云消息队列RocketMQ版
本实验将带您快速体验使用云消息队列RocketMQ版Serverless系列实例进行获取接入点、创建Topic、创建订阅组、收发消息、查看消息轨迹和仪表盘。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
7月前
|
消息中间件 算法 数据库
如果解决MQ消息堆积问题
如果解决MQ消息堆积问题
|
5月前
|
消息中间件 架构师 Java
美团面试:对比分析 RocketMQ、Kafka、RabbitMQ 三大MQ常见问题?
美团面试:对比分析 RocketMQ、Kafka、RabbitMQ 三大MQ常见问题?
美团面试:对比分析 RocketMQ、Kafka、RabbitMQ 三大MQ常见问题?
|
11月前
|
消息中间件 存储 监控
MQ线上大规模消息堆积问题处理及使用场景详解
【11月更文挑战第21天】在如今的高并发互联网应用中,消息队列(Message Queue,简称MQ)扮演着至关重要的角色
651 1
|
6月前
|
存储 消息中间件 缓存
RocketMQ原理—3.源码设计简单分析下
本文介绍了Producer作为生产者是如何创建出来的、启动时是如何准备好相关资源的、如何从拉取Topic元数据的、如何选择MessageQueue的、与Broker是如何进行网络通信的,Broker收到一条消息后是如何存储的、如何实时更新索引文件的、如何实现同步刷盘以及异步刷盘的、如何清理存储较久的磁盘数据的,Consumer作为消费者是如何创建和启动的、消费者组的多个Consumer会如何分配消息、Consumer会如何从Broker拉取一批消息。
279 11
RocketMQ原理—3.源码设计简单分析下
|
6月前
|
消息中间件 Java 数据管理
RocketMQ原理—2.源码设计简单分析上
本文介绍了NameServer的启动脚本、启动时会解析哪些配置、如何初始化Netty网络服务器、如何启动Netty网络服务器,介绍了Broker启动时是如何初始化配置的、BrokerController的创建以及包含的组件、BrokerController的初始化、启动、Broker如何把自己注册到NameServer上、BrokerOuterAPI是如何发送注册请求的,介绍了NameServer如何处理Broker的注册请求、Broker如何发送定时心跳
|
消息中间件 监控 数据挖掘
基于RabbitMQ与Apache Flink构建实时分析系统
【8月更文第28天】本文将介绍如何利用RabbitMQ作为数据源,结合Apache Flink进行实时数据分析。我们将构建一个简单的实时分析系统,该系统能够接收来自不同来源的数据,对数据进行实时处理,并将结果输出到另一个队列或存储系统中。
956 2
|
9月前
|
消息中间件 存储 Java
招行面试:10Wqps场景,RocketMQ 顺序消费 的性能 如何提升 ?
45岁资深架构师尼恩在其读者群中分享了关于如何提升RocketMQ顺序消费性能的高并发面试题解析。面对10W QPS的高并发场景,尼恩详细讲解了RocketMQ的调优策略,包括专用方案如增加ConsumeQueue数量、优化Topic设计等,以及通用方案如硬件配置(CPU、内存、磁盘、网络)、操作系统调优、Broker配置调整、客户端配置优化、JVM调优和监控与日志分析等方面。通过系统化的梳理,帮助读者在面试中充分展示技术实力,获得面试官的认可。相关真题及答案将收录于《尼恩Java面试宝典PDF》V175版本中,助力求职者提高架构、设计和开发水平。
招行面试:10Wqps场景,RocketMQ 顺序消费 的性能 如何提升 ?
|
7月前
|
消息中间件 存储 前端开发
MQ有什么应用场景
MQ有什么应用场景
|
11月前
|
消息中间件 存储 Java
MQ线上消息乱序问题处理及场景详解
【11月更文挑战第22天】在现代分布式系统中,消息队列(MQ)作为核心组件,承担着异步处理、削峰填谷和系统解耦的重任。
496 1
|
消息中间件 前端开发 Java
java高并发场景RabbitMQ的使用
java高并发场景RabbitMQ的使用
335 0

相关产品

  • 云消息队列 MQ