RocketMQ消费失败消息深入分析(consumer,broker的具体处理逻辑)

简介: 消息队列是分布式系统中重要的组件,主要解决应用耦合,异步消息,流量削锋等问题

前言

消息队列是分布式系统中重要的组件,主要解决应用耦合,异步消息,流量削锋等问题。目前在生产环境,使用较多的消息队列有ActiveMQ,RabbitMQ,ZeroMQ,Kafka,MetaMQ,RocketMQ等。由于每个消息队列都有它的优势和劣势,我们公司对于不同的场景使用了不同类型的消息队列。对于RocketMQ消费端存在消息消费失败的情况,通常有两种方式,一种是consumer端知道怎么处理,另一种是consumer不能处理(broker处理),本文对后一种情况进行介绍,consumer获取到消息但不能正常处理(ack),接下来这个消费失败的消息在Broker里面如何存储和重新让consumer消费,针对这个流程做了深入的分析。本文中的P代表producer,C代表consumer,本文的consumeQueue对应前面的topic下面的队列。

目录

  • RocketMQ的消费与存储结构
  • RocketMQ的消费失败消息处理逻辑
  • Broker端处理失败消息任务的启动
  • Consumer发回消费失败消息流程
  • Broker写发回失败消息的流程

RocketMQ的消费与存储结构

正常情况下,P发送消息到broker,消息内容写到commitlog,消息内容在commitlog的位置信息(索引)写到consumerQueue,C读取consumerQueue的内容消费消息。
image.png

RocketMq的存储结构:
image.png

本文的内容涉及上面的消费队列服务(consumerQueue,%RETRY%groupName属于consumerQueue),定时消息服务(SCHEDULE TOPIC XXXX)两个模块,C与broker的的消息消费只涉及到consumerQueue,定时消息服务只在broker内部起作用。

RocketMQ的消费失败消息处理逻辑

consumer消费失败消息处理流程图如下:
image.png

在下面的代码和流程分析中请结合这个图进行分析。其中SCHEDULE TOPIC XXXX和%RETRY%groupName的queue都存储在目录 ~/store/consumequeue 里面:ll ~/store/consumequeue 如下:
image.png

ll ~/store/consumequeue/SCHEDULE TOPIC XXXX 如下:
image.png

从上图可以看出SCHEDULE TOPIC XXXX的队列名称是从2开始到17,对应的delayLevel为3到18,3对应10s,18对应2h,在类MessageStoreConfig中这样定义延时时间:String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h"。SCHEDULE TOPIC XXXX这个topic只对内部使用,对于consumer只能消费到retry队列的数据。consumer消费失败的消息发回broker后总是先写到SCHEDULE TOPIC XXXX里面,然后schedule service读取SCHEDULE TOPICXXXX里面的数据写到retry队列,consumer消费retry队列的数据,这样就完成了一个循环,从这个过程也能看到,一个消费失败的消息体每次发回broker需要在commitLog里面存储两份(topic为SCHEDULE TOPIC XXXX的一份这个主要是为schedule service控制延时用的,topic为%RETRY%groupName的一份)。
当我们想查看现在的延时消息数量,我们可以查看SCHEDULE TOPIC XXXX的offset来得知,使用CLI Admin Tool工具输入命令“sh mqadmin brokerStatus”查看处理进度。如下图:
image.png
其中每行为一个队列,图中第一列为队列的名称,图中第二列参数为当前队列处理的offset,图中第三列为当前队列最大存储的offset。通过第三列和第二列的值相减能得出当前的队列的消息数量。

Broker端处理失败消息任务的启动

image.png
ScheduleMessageService根据messageDelayLevel维护了每个延迟level对应的队列编号,以及每个队列编号对应的offset。在start方法里面会启动18个timerTask(DeliverDelayedMessageTimerTask),每个对应一个level,初始offset为0。然后就是定时任务读取SCHEDULE TOPIC XXXX队列里面的消息进行判断,如果消息的delayLevel对应的时间满足重新消费,那么就会忘consumeQueue里面写这个消息,等待C重新来消费。

Consumer发回消费失败消息流程

image.png
在ConsumeRequest的run方法里面也就是业务端处理消息的线程里面,对于status是非success的交给ConsumeMessageConcurrentlyService(本文只讨论并行消费的模式,串行模式类似)的sendMessageBack方法处理,这个方法主要设置delayLevel(context.getDelayLevelWhenNextConsume()),然后传递给DefaultMQPushConsumerImpl.sendMessageBack找到对应的消息来源queue,把这个消息发送到这个queue里面,也就是说消费失败的消息发回broker还是会在之前的那个queue里面。发回broker后本地再过5秒重试消费一次,如果这次成功,下次就不再消费。上面流程的类图:
image.png
ConsumeRequest的run方法里面会调用ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this)来处理消费结果状态,在cluster(集群模式)下设置新的消息delayLevle值然后把失败的消息发回Broker,广播模式不发回。注意ConsumeConcurrentlyContext的delayLevelWhenNextConsume属性说明-1直接放到死信队列,0又broker每次对重试消费次数加1来控制重试策略,大于0由consumer控制重试消费策略(在listener的consumeMessage方法里面有个context:context.setDelayLevelWhenNextConsume(4)设置为1分钟延时消费),默认值为0。

Broker写发回失败消息的流程

image.png
broker端收到消费失败消息后通过consumerSendMsgBack(P发送的消息不由这个处理,区分通过消息头的type)方法设置当前消息的delayTimeLevel,这里计算delayTimeLevel,第一次重试默认consumer发回为0,延迟为延迟等级为0+3=3;如果第一次不为0表明是consumer控制的情况,直接取出delayTimeLevel,也就是和ConsumeConcurrentlyContext(consumer端控制)的delayLevelWhenNextConsume配置一致。设置好delayLevelTime后就交给DefaultMessageStore的putMessage方法,DefaultMessageStore的putMessage方法通过Commitlog的putMessage来写入文件,这里需要重点关注的是在这个方法里面通过msg.getDelayTimeLevel() > 0这个条件,修改当前消息topic为SCHEDULETOPIC XXXX,原来的topic保留在property里面,在ScheduleMessageService里面判断消息满足条件后会把消息的topic改为真实的topic,通常是retry,接着写到consumeQueue里面,C对于%RETRY%consumerGroup这个topic在程序里面默认是订阅的不需用户指定,然后队列Id的计算方式为queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel()),即msg.getDelayTimeLevel()-1,和前面的截图2到17编号一致,然后消息体写到commitlog文件和索引写到SCHEDULE TOPIC XXXX队列。类图如下:
image.png

SendMessageProcessor处理远程发来的消息,包括P和C的,方法里面通过RequestCode.CONSUMER SEND MSG_BACK来判断是不是重试发回的消息。然后会判断这个消息对应的topic为%RETRY%consumerGroup的是否创建过,没有则创建;接下来的处理就和上面的流程图一样了。

总结

本文围绕consumer端消费失败后RocketMQ各个模块的处理逻辑进行了源码的深入分析。相信有了以上的知识学习和实践之后,当业务应用遇到了类似的问题就可以胸有成竹的应对了。

来源:微信公众号 Java杂记


image.png

欢迎扫码加入阿里云开发者社区的【11大垂直技术领域开发者社群】

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
3月前
|
消息中间件 监控 数据挖掘
基于RabbitMQ与Apache Flink构建实时分析系统
【8月更文第28天】本文将介绍如何利用RabbitMQ作为数据源,结合Apache Flink进行实时数据分析。我们将构建一个简单的实时分析系统,该系统能够接收来自不同来源的数据,对数据进行实时处理,并将结果输出到另一个队列或存储系统中。
223 2
|
4月前
|
消息中间件 存储 监控
消息队列 MQ使用问题之客户端重启后仍然出现broker接收消息不均匀,该怎么办
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
5月前
|
消息中间件 网络协议 RocketMQ
消息队列 MQ产品使用合集之broker开启proxy,启动之后producer生产消息始终都只到一个broker,该怎么办
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
1月前
|
消息中间件 负载均衡 算法
聊聊 RocketMQ中 Topic,Queue,Consumer,Consumer Group的关系
本文详细解析了RocketMQ中Topic、Queue、Consumer及Consumer Group之间的关系。文中通过图表展示了Topic可包含多个Queue,Queue分布在不同Broker上;Consumer组内多个消费者共享消息;并深入探讨了集群消费与广播消费模式下Queue与Consumer的关系,以及Rebalancing机制在实例增减时如何确保负载均衡。理解这些关系有助于更好地掌握RocketMQ的工作原理,提升系统运维效率。
93 2
|
3月前
|
消息中间件 存储 数据中心
RocketMQ的长轮询(Long Polling)实现分析
文章深入分析了RocketMQ的长轮询实现机制,长轮询结合了推送(push)和拉取(pull)两种消息消费模式的优点,通过客户端和服务端的配合,确保了消息的实时性同时将主动权保留在客户端。文中首先解释了长轮询的基本概念和实现步骤,然后通过一个简单的实例模拟了长轮询的过程,最后详细介绍了RocketMQ中DefaultMQPushConsumer的长轮询实现方式,包括PullMessage服务、PullMessageProcessor服务和PullCallback回调的工作原理。
115 1
|
3月前
|
消息中间件 Arthas Java
RocketMQ—一次连接namesvr失败的案例分析
项目组在使用RocketMQ时遇到Consumer连接Name Server失败的问题,异常显示连接特定地址失败。通过Arthas工具逐步分析代码执行路径,定位到创建Channel返回空值导致异常。进一步跟踪发现,问题源于Netty组件在初始化`ByteBufAllocator`时出现错误。分析依赖后确认存在Netty版本冲突。解决方法为排除冲突的Netty包,仅保留兼容版本。
229 0
RocketMQ—一次连接namesvr失败的案例分析
|
3月前
|
消息中间件 SQL 监控
RocketMQ 5.3.0 版本中 Broker IP 配置为 IPv6 的情况
【8月更文第28天】RocketMQ 是一款分布式消息中间件,支持多种消息发布和订阅模式。在 RocketMQ 5.3.0 版本中,Broker 的配置文件 `broker.conf` 允许配置 IPv6 地址。当 Broker 的 `brokerIP1` 配置为 IPv6 地址时,会对 Broker 的启动、消息推送和状态监控等方面产生影响。本文将探讨如何在 RocketMQ 中配置 IPv6 地址,并检查 Broker 的状态。
193 0
|
4月前
|
消息中间件 RocketMQ
MetaQ/RocketMQ 原理问题之当消费集群规模较大时,处理分配不到队列的Consumer的问题如何解决
MetaQ/RocketMQ 原理问题之当消费集群规模较大时,处理分配不到队列的Consumer的问题如何解决
|
4月前
|
消息中间件 存储 索引
MetaQ/RocketMQ 原理问题之Consumer在MetaQ中工作的问题如何解决
MetaQ/RocketMQ 原理问题之Consumer在MetaQ中工作的问题如何解决
|
4月前
|
消息中间件 运维 RocketMQ
MetaQ/RocketMQ 原理问题之slave broker是从master同步信息的问题如何解决
MetaQ/RocketMQ 原理问题之slave broker是从master同步信息的问题如何解决