RocketMQ消息重试机制解析!

本文涉及的产品
全局流量管理 GTM,标准版 1个月
云解析 DNS,旗舰版 1个月
公共DNS(含HTTPDNS解析),每月1000万次HTTP解析
简介: RocketMQ消息重试机制解析!


由于网络抖动、服务宕机等一些不确定的因素,RocketMQ在发送消息的时候很有可能出现消息发送或者消费失败的问题。

所以RocketMQ消息重试分为2种:

Producer端重试和Consumer端重试。

Producer端重试

生产者端的消息失败,也就是ProducerMQ上发消息没有发送成功。

  • 比如网络抖动致使生产者发送消息到MQ失败。

这种消息失败重试可以手动设置发送失败重试的次数。

producer.setRetryTimesWhenSendFailed(3);

官方说明

Producersend方法本身支持内部重试。

重试逻辑:

  • 默认至多重试2次。
  • 这个方法的总耗时时间不超过sendMsgTimeout设置的值,默认10s。

如果本身向Broker发送消息产生超时异常,就不会再重试。

  • 以上策略也是在一定程度上保证了消息可以发送成功。

如果业务对消息可靠性要求比较高,建议增加相应的重试逻辑:

  • 比如调用send同步方法发送失败时,则尝试将消息存储到DB
  • 然后由后台线程定时重试,确保消息一定到达Broker

重试策略

消息发送重试有三种策略:

同步发送失败策略、异步发送失败策略和消息刷盘失败策略。

同步发送失败策略:

普通消息,消息发送默认采用round-robin策略(轮转)来选择所发送到的队列。

  • 如果发送失败,默认重试2次。

但在重试时是不会选择上次发送失败的Broker,而是选择其它Broker

DefaultMQProducer producer = new DefaultMQProducer("pg");
// 设置同步发送失败时重试发送的次数,默认为2次
producer.setRetryTimesWhenSendFailed(3);
// 设置发送超时时限为5s,默认10s
producer.setSendMsgTimeout(5000);

异步发送失败策略:

异步发送失败重试时,异步重试不会选择其他Broker,仅在当前Broker上做重试。

  • 所以该策略无法保证消息不丢失。
DefaultMQProducer producer = new DefaultMQProducer("pg");
producer.setNamesrvAddr("rocketmqOS:9876");
// 指定异步发送失败后不进行重试发送
producer.setRetryTimesWhenSendAsyncFailed(0);

消息刷盘失败策略:

消息刷盘超时,默认是不会将消息尝试发送到其他Broker

对于重要消息可以通过在Broker的配置文件设置retryAnotherBrokerWhenNotStoreOK属性为true来开启。

几种情况

异步发送在发送过程中出现异常进行重试:

在解析请求结果时,发现响应状态码有其它异常(消息可能未正确被Broker处理)会继续进行重试。

  • 重试依然选择当前Broker

但是如果响应结果不为空的话,即使处理响应时发生异常也不会进行重试。

同步发送时:

如果发送过程中没有异常,但是发送结果不OK,也会选择另一个Broker继续进行重试。

顺序消息发送失败不进行重试:

顺序消息:指的是同步+指定消息队列的方式发送。

Consumer端重试

消息正常的到了消费者,结果消费者发生异常,处理失败了。

例如反序列化失败,消息数据本身无法处理等。

顺序消息

顺序消息的消费重试

顺序消息,当Consumer消费消息失败后,为了保证消息的顺序性,其会自动不断地进行消息重试,直到消费成功。

  • 消费重试默认间隔时间为1000ms

重试期间应用会出现消息消费被阻塞的情况。

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("cg");
// 顺序消息消费失败的消费重试时间间隔,单位毫秒,默认为1000,其取值范围为[10, 30000]
consumer.setSuspendCurrentQueueTimeMillis(100);

由于对顺序消息的重试是无休止的,不间断的,直至消费成功。

  • 所以,对于顺序消息的消费,务必要保证应用能够及时监控并处理消费失败的情况,避免消费被永久性阻塞。

注意: 顺序消息没有发送失败重试机制,但具有消费失败重试机制。

消费状态

顺序消费目前两个状态:SUCCESSSUSPEND_CURRENT_QUEUE_A_MOMENT

SUSPEND_CURRENT_QUEUE_A_MOMENT意思是先暂停消费一下:

  • SuspendCurrentQueueTimeMillis时间间隔后再重试一下,而不是放到重试队列里。
public enum ConsumeOrderlyStatus {
    SUCCESS,
    
    @Deprecated
    ROLLBACK,
    
    @Deprecated
    COMMIT,
    
    SUSPEND_CURRENT_QUEUE_A_MOMENT;
}

并发消息

并发消息的消费重试

在并发消费中,可能会有多个线程同时消费一个队列的消息。

因此即使发送端通过发送顺序消息保证消息在同一个队列中按照FIFO的顺序,也无法保证消息实际被顺序消费。

  • 所有并发消费也可以称之为无序消费

对于无序消息(普通消息、延时消息、事务消息):

  • Consumer消费消息失败时,可以通过设置返回状态达到消息重试的效果。

注意:

无序消息的重试只针对集群消费模式生效。

广播消费模式不提供失败重试特性:即消费失败后,失败消息不再重试,继续消费新的消息。

消费状态

Consumer端消息消费有两种状态:

一个是成功(CONSUME_SUCCESS),一个是失败&稍后重试(RECONSUME_LATER)。

Consumer为了保证消息消费成功,只有使用方明确表示消费成功。

  • 返回CONSUME_SUCCESSRocketMQ才会认为消息消费成功。

若是消息消费失败,只要返回:ConsumeConcurrentlyStatus.RECONSUME_LATER

  • RocketMQ就会认为消息消费失败了,要重新投递。
public enum ConsumeConcurrentlyStatus {
    CONSUME_SUCCESS,
    RECONSUME_LATER;   
}

重试机制

为了保证消息是确定被至少消费成功一次,RocketMQ会把这批消息重发回Broker

  • Topic不是原Topic而是一个RETRY Topic

在延迟的某个时间点(默认是10秒,业务可设置)后,再次投递。

而若是一直这样重复消费都持续失败到必定次数(默认16次),就会投递到死信队列

在启动Broker的过程当中,能够观察到以下输出:

2024-09-19 16:29:58 INFO main - messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h

RECONSUME_LATER策略:

若是消费失败,那么1S后再次消费,若是失败,那么5S后,再次消费,…… 直至2H后若是消费还失败。

  • 那么该条消息就会终止发送给消费者了。

消息重试间隔时间如下:

重试次数 与上次重试的间隔时间 重试次数 与上次重试的间隔时间
1 10秒 9 7分钟
2 30秒 10 8分钟
3 1分钟 11 9分钟
4 2分钟 12 10分钟
5 3分钟 13 20分钟
6 4分钟 14 30分钟
7 5分钟 15 1小时
8 6分钟 16 2小时

某条消息在一直消费失败的前提下,将会在接下来的4小时46分钟之内进行16次重试。

  • 超过这个时间范围消息将不再重试投递,而被投递至死信队列

修改消费重试次数:

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("cg");
// 修改消费重试次数
consumer.setMaxReconsumeTimes(10);

基本原理

重试的 MessageRocketMQ 的做法并不是将其投递回原 Topic,而是重试队列

每个 ConsumerGroup 都有自己的重试队列:

  • 其名称是由特定的前缀拼接上 ConsumerGroup 所组成,默认 %RETRY%+消费者组名称
  • 所以在 Consumer 启动时,就会同时消费其 ConsumerGroup 对应的重试队列普通队列

消费失败的 MessageConsumer 会将其投回 Broker

  • 相当于这条 Message 已经被消费掉了,之后重试的只是内容相同、但实际不是同一条的 Message
  • 然后会校验重试的次数,如果达到16次则会进入死信队列 组成为 %DLQ%+消费者组名称
  • 未达到最大重试次数,则会根据重试间隔时间等级将其投递到延迟队列SCHEDULE_TOPIC_XXXX中。
  • 然后等到了延迟等级对应的时间之后,再投递到 ConsumerGroup 所对应的重试队列当中,供后续消费。

消息重复

如果消费端收到两条一样的消息,应该怎样处理?

《RocketMQ 原理简介》中讲到:

RocketMQ 无法避免消息重复。

所以如果业务对消费重复非常敏感,务必要在业务侧去重,有以下几种去重方式:

消费端处理消息的业务逻辑保持幂等性

  • 如何保证幂等性,可以看我之前的文章!

保证每条消息都有唯一编号且保证消息处理成功与去重表的日志同时出现。

  • 利用一张日志表来记录已经处理成功的消息的ID。
  • 如果新到的消息ID已经在日志表中,那么就不再处理这条消息。

最后

觉得有收获,希望帮忙点赞,转发下哈,谢谢,谢谢


相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
ly~
|
1月前
|
消息中间件 存储 供应链
RocketMQ 消息的重试机制有什么优缺点?
RocketMQ 消息重试机制提高了消息处理的可靠性和系统的适应性,简化了错误处理,但也会增加系统延迟、可能导致消息重复处理并占用系统资源。适用于需要高可靠性的场景,如金融交易和电商订单处理。
ly~
56 5
|
2月前
|
消息中间件 Java Apache
RocketMQ消息回溯实践与解析
在分布式系统和高并发应用的开发中,消息队列扮演着至关重要的角色,而RocketMQ作为阿里巴巴开源的一款高性能消息中间件,以其高吞吐量、高可用性和灵活的配置能力,在业界得到了广泛应用。本文将围绕RocketMQ的消息回溯功能进行实践与解析,分享工作学习中的技术干货。
76 3
EMQ
|
6月前
|
Linux 网络性能优化
MQTT 5.0 报文解析 03:SUBSCRIBE 与 UNSUBSCRIBE
在 MQTT 中,SUBSCRIBE 报文用于发起订阅请求,SUBACK 报文用于返回订阅结果。而 UNSUBSCRIBE 和 UNSUBACK 报文则在取消订阅时使用。相比于取消订阅,订阅操作更加常用。不过在本文中,我们仍然会一并介绍订阅与取消订阅报文的结构与组成。
EMQ
362 5
MQTT 5.0 报文解析 03:SUBSCRIBE 与 UNSUBSCRIBE
ly~
|
1月前
|
消息中间件 存储 数据库连接
RocketMQ 消息的重试机制是怎样的?
RocketMQ的消息重试机制确保消息消费失败时能自动重试,直至成功。默认重试16次,时间间隔逐次翻倍,从10秒至数分钟不等。重试在同组内不同消费者间进行,由异常抛出或特定状态返回触发。支持自定义重试次数与时间间隔,建议合理配置避免无限重试,保障系统稳定性和性能。
ly~
737 2
EMQ
|
6月前
|
运维 Linux 网络性能优化
MQTT 5.0 报文解析 05:DISCONNECT
在 MQTT 中,客户端和服务端可以在断开网络连接前向对端发送一个 DISCONNECT 报文,来指示连接关闭的原因。客户端发送的 DISCONNECT 报文还可以影响服务端在连接断开后的行为,例如是否发送遗嘱消息,是否更新会话过期间隔。
EMQ
141 0
MQTT 5.0 报文解析 05:DISCONNECT
|
3月前
|
消息中间件 开发者
【RabbitMQ深度解析】Topic交换器与模式匹配:掌握消息路由的艺术!
【8月更文挑战第24天】在消息队列(MQ)体系中,交换器作为核心组件之一负责消息路由。特别是`topic`类型的交换器,它通过模式匹配实现消息的精准分发,适用于发布-订阅模式。不同于直接交换器和扇形交换器,`topic`交换器支持更复杂的路由策略,通过带有通配符(如 * 和 #)的模式字符串来定义队列与交换器间的绑定关系。
71 2
|
3月前
|
消息中间件 Java Spring
RabbitMQ重试机制
RabbitMQ重试机制
95 4
|
3月前
|
消息中间件 Java RocketMQ
微服务架构师的福音:深度解析Spring Cloud RocketMQ,打造高可靠消息驱动系统的不二之选!
【8月更文挑战第29天】Spring Cloud RocketMQ结合了Spring Cloud生态与RocketMQ消息中间件的优势,简化了RocketMQ在微服务中的集成,使开发者能更专注业务逻辑。通过配置依赖和连接信息,可轻松搭建消息生产和消费流程,支持消息过滤、转换及分布式事务等功能,确保微服务间解耦的同时,提升了系统的稳定性和效率。掌握其应用,有助于构建复杂分布式系统。
67 0
EMQ
|
5月前
|
安全 开发工具 数据安全/隐私保护
MQTT 5.0 报文解析 06:AUTH
MQTT 5.0 引入了增强认证特性,它使 MQTT 除了简单密码认证和 Token 认证以外,还能够支持质询/响应风格的认证。为了实现这一点,它在原先 CONNECT 和 CONNACK 报文的基础上,又引入了 AUTH 报文来实现任意多次的认证数据交换,以支持各种不同类型的认证机制,例如 SCRAM、Kerberos 认证等等。
EMQ
290 4
MQTT 5.0 报文解析 06:AUTH
EMQ
|
6月前
|
JSON Linux 网络性能优化
MQTT 5.0 报文解析 02:PUBLISH 与 PUBACK
本文将介绍在 MQTT 中用于传递应用消息的 PUBLISH 报文以及它的响应报文。不管是客户端向服务端发布消息,还是服务端向订阅端转发消息,都需要使用 PUBLISH 报文。决定消息流向的主题、消息的实际内容和 QoS 等级,都包含在 PUBLISH 报文中。
EMQ
378 10
MQTT 5.0 报文解析 02:PUBLISH 与 PUBACK

推荐镜像

更多