一次RocketMQ ons SDK Bug导致消息不断堆积到重试队列的案例分析

本文涉及的产品
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
应用实时监控服务-应用监控,每月50GB免费额度
可观测监控 Prometheus 版,每月50GB免费额度
简介: 一次RocketMQ ons SDK Bug导致消息不断堆积到重试队列的案例分析

背景介绍

系统运行在专有云,应用运行时环境是EDAS Container( EDAS Container是EDAS 平台 HSF 应用运行的基础容器,EDAS Container 包含 Ali-Tomcat 和 Pandora),消息处理使用的是【ons SDK】,消息消费者使用【PUSH】方式【批量】消费【普通消息】,MessageModel是【CLUSTERING】。

为了解决RocketMQ Producer某个性能问题,对Pandora进行了升级(主要是升级RocketMQ版本)。

下面从技术角度对升级中遇到的问题及分析过程进行总结,积累经验以避免类似问题的发生。

问题描述

Pandora升级完成后,我们在RocketMQ控制台看到【消费者状态】->【实时消息堆积量】有8亿条,而每个Consumer实例堆积量是几十条,如图1:

在【消费者状态】->【连接详情】有消息消费失败的情况,如图2:

在应用服务器ons.log也可以实时查看消息消费的指标信息,如图3:

这部分的统计指标的实现可以查看:org.apache.rocketmq.client.stat.ConsumerStatsManager

分析过程

根据我们前面几篇关于MQ消息堆积的文章,可以知道:

  1. 消息堆积总量与Consumer实例消息堆积量相符的情况下,通常是Consumer消费能力弱导致堆积
  2. 消息堆积总量大,而Consumer实例消息堆积量很小的情况下,通常是消息堆积在了重试队列中

从【问题描述】看,更像是发生了第二种情况导致的消息堆积。

总体思路

消息在重试队列中堆积,说明Consumer实例消费消息的时候出现了某些异常,导致Consumer实例将消息发送到了Broker重试队列中,所以我们分析【哪些地方】调用了【发送消息到Broker重试队列的接口】就基本抓住了这个问题的关键。

通过分析RocketMQ源码,发现主要有两个地方调用了【发送消息到Broker重试队列的接口】:

  1. 一个是org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService的内部类ConsumeRequest
  2. 另一个是org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService中清理过期Message的定时任务(最终是交给每个ProcessQueue来清理各自的Message)

ConsumeRequest

下面贴一下主要的代码,与该问题不相关的代码省略掉了;如果不想看代码,可以跳过此处,查看后面的流程图。

为了便于理解,我们使用流程图来表达下图4中代码主要逻辑,见图5:

分析上面流程及代码,发现ConsumeConcurrentlyContext类的ackIndex变量是分析消息成功与失败的核心变量。

是否业务处理异常?

RocketMQ框架在业务处理类出现下面情况的时候,认为消息消费失败:

  1. 业务处理类返回ConsumeConcurrentlyStatus.RECONSUME_LATER
  2. 业务处理类返回null
  3. 业务处理类抛出异常

通过业务处理类日志可以确定业务没有返回ConsumeConcurrentlyStatus.RECONSUME_LATER的情况;

从代码可以看出,当出现2、3情况的时候,框架会将warn日志打印到ons.log中,通过过滤ons.log中“consumeMessage exception”和“consumeMessage return null”关键词,没有相应的日志记录,所以也不是这两种情况造成的。

备注:

当出现2、3情况的时候,ons.log日志中并没有打印出线程栈信息,如果想具体定位异常产生的位置,可以通过arthas stack命令进行分析。

arthas watch processConsumeResult

既然发送失败消息到Broker重试队列是在processConsumeResult方法调用的,那么我们可以分析下该方法的入参及返回值情况。

watch com.alibaba.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService 
processConsumeResult "{params,returnObj}" "target.consumeGroup=='GID_CID_XXX'" -x 3 -n3

watch正常机器

watch异常机器

通过上面的watch,我们找到了问题最关键的地方,我们用下面的场景来分析下ackIndex不同值的影响。

场景一

  1. 业务处理类批量消费了【8】条数据,消费成功返回:CONSUME_SUCCESS
  2. ackIndex=Integer.MAX_VALUE
  3. RocketMQ框架分析消费成功了【8】条,失败【0】条
  4. 因为都消费成功了,不会将消息发送到Broker重试队列中

场景二

  1. 业务处理类批量消费了【8】条数据,消费成功返回:CONSUME_SUCCESS
  2. ackIndex=0
  3. RocketMQ框架分析消费成功了【1】条,失败【7】条
  4. 因为有【7】条消费失败,所以会将【7】条消费失败的消息发送到Broker重试队列中

arthas watch setAckIndex

既然有地方在修改ackIndex,先验证下我们的判断是否正确。

watch com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext setAckIndex "{params,returnObj}" "params[0]==0"

通过观察,确实有地方在不断将ackIndex的值修改为0。

arthas stack setAckIndex

我们继续定位是什么地方将ackIndex修改为0的。

stack com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext setAckIndex "params[0]==0" -n 2

通过线程栈可知BatchConsumerImpl类调用了ConsumeConcurrentlyContext.setAckIndex方法。

arthas jad BatchConsumerImpl

没有源码的情况下,我们可以使用arthas jad对类进行反编译。

jad com.aliyun.openservices.ons.api.impl.rocketmq.BatchConsumerImpl

ConsumeContext类实例字段acknowledgeIndex默认值是多少呢?如果是0,问题的原因就找到了。

athas jad ConsumeContext

没有源码的情况下,我们可以使用arthas jad对类进行反编译。

jad com.aliyun.openservices.ons.api.ConsumeContext

通过上面代码可以看出,ConsumeContext类实例字段acknowledgeIndex的默认值是0。

ProcessQueue

通过上面的分析,我们已经定位到了问题,ProcessQueue做下简单描述,不做具体分析了。

解决办法

由上面的分析,这个问题属于RocketMQ ons SDK的一个Bug,修复就交给相应的产研团队来fix吧。

经验总结

1-5-10,1分钟发现,5分钟定位,10分钟恢复。

当故障发生的时候,需要【1】最短时间内发现(监控报警是否做好),需要【10】最快的速度恢复(变更管理和预案是否做好),【5】似乎不是最主要的。

参考资料

EDAS基本概念

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
ly~
|
2月前
|
消息中间件 存储 监控
如何查看 RocketMQ 消息的重试次数和时间间隔?
RocketMQ消息重试次数和时间间隔可通过查看消费者和Broker日志、使用管理控制台的监控页面和消息查询功能,或通过分析消费者代码和RocketMQ客户端库代码等方式获取。日志中常有消费失败重试的明确记录,控制台可监控消费情况推断重试状态,代码分析则适合技术用户深入了解。
ly~
253 3
|
4月前
|
消息中间件 监控 数据挖掘
基于RabbitMQ与Apache Flink构建实时分析系统
【8月更文第28天】本文将介绍如何利用RabbitMQ作为数据源,结合Apache Flink进行实时数据分析。我们将构建一个简单的实时分析系统,该系统能够接收来自不同来源的数据,对数据进行实时处理,并将结果输出到另一个队列或存储系统中。
286 2
|
6月前
|
消息中间件 数据可视化 Go
Rabbitmq 搭建使用案例 [附源码]
Rabbitmq 搭建使用案例 [附源码]
51 0
|
4月前
|
消息中间件 存储 数据中心
RocketMQ的长轮询(Long Polling)实现分析
文章深入分析了RocketMQ的长轮询实现机制,长轮询结合了推送(push)和拉取(pull)两种消息消费模式的优点,通过客户端和服务端的配合,确保了消息的实时性同时将主动权保留在客户端。文中首先解释了长轮询的基本概念和实现步骤,然后通过一个简单的实例模拟了长轮询的过程,最后详细介绍了RocketMQ中DefaultMQPushConsumer的长轮询实现方式,包括PullMessage服务、PullMessageProcessor服务和PullCallback回调的工作原理。
125 1
|
4月前
|
消息中间件 Arthas Java
RocketMQ—一次连接namesvr失败的案例分析
项目组在使用RocketMQ时遇到Consumer连接Name Server失败的问题,异常显示连接特定地址失败。通过Arthas工具逐步分析代码执行路径,定位到创建Channel返回空值导致异常。进一步跟踪发现,问题源于Netty组件在初始化`ByteBufAllocator`时出现错误。分析依赖后确认存在Netty版本冲突。解决方法为排除冲突的Netty包,仅保留兼容版本。
295 0
RocketMQ—一次连接namesvr失败的案例分析
|
5月前
|
消息中间件 NoSQL 关系型数据库
【RocketMQ系列十三】RocketMQ的集群核心概念之消费重试&死信队列&幂等消息的出现以及处理
【RocketMQ系列十三】RocketMQ的集群核心概念之消费重试&死信队列&幂等消息的出现以及处理
159 1
|
6月前
|
数据采集 监控 物联网
MQTT协议在智能制造中的应用案例与效益分析
【6月更文挑战第8天】MQTT协议在智能制造中的应用案例与效益分析
180 1
|
6月前
|
消息中间件 Arthas 监控
消息队列 MQ产品使用合集之每次重置reconsumeTimes就无法达到死信阈值,重试次数是否就要应用方控制
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
消息队列 MQ产品使用合集之每次重置reconsumeTimes就无法达到死信阈值,重试次数是否就要应用方控制
|
7月前
|
消息中间件 网络协议 开发工具
MQ产品使用合集之rocketmq5.x只有tcp接入点吗,python sdk需要http接入点,请问怎么使用
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
224 2
|
2月前
|
消息中间件 JSON Java
开发者如何使用轻量消息队列MNS
【10月更文挑战第19天】开发者如何使用轻量消息队列MNS
95 5

相关产品

  • 云消息队列 MQ
  • 下一篇
    DataWorks