RocketMQ消息重试机制解析!

简介: RocketMQ消息重试机制解析!


由于网络抖动、服务宕机等一些不确定的因素,RocketMQ在发送消息的时候很有可能出现消息发送或者消费失败的问题。

所以RocketMQ消息重试分为2种:

Producer端重试和Consumer端重试。

Producer端重试

生产者端的消息失败,也就是ProducerMQ上发消息没有发送成功。

  • 比如网络抖动致使生产者发送消息到MQ失败。

这种消息失败重试可以手动设置发送失败重试的次数。

producer.setRetryTimesWhenSendFailed(3);

官方说明

Producersend方法本身支持内部重试。

重试逻辑:

  • 默认至多重试2次。
  • 这个方法的总耗时时间不超过sendMsgTimeout设置的值,默认10s。

如果本身向Broker发送消息产生超时异常,就不会再重试。

  • 以上策略也是在一定程度上保证了消息可以发送成功。

如果业务对消息可靠性要求比较高,建议增加相应的重试逻辑:

  • 比如调用send同步方法发送失败时,则尝试将消息存储到DB
  • 然后由后台线程定时重试,确保消息一定到达Broker

重试策略

消息发送重试有三种策略:

同步发送失败策略、异步发送失败策略和消息刷盘失败策略。

同步发送失败策略:

普通消息,消息发送默认采用round-robin策略(轮转)来选择所发送到的队列。

  • 如果发送失败,默认重试2次。

但在重试时是不会选择上次发送失败的Broker,而是选择其它Broker

DefaultMQProducer producer = new DefaultMQProducer("pg");
// 设置同步发送失败时重试发送的次数,默认为2次
producer.setRetryTimesWhenSendFailed(3);
// 设置发送超时时限为5s,默认10s
producer.setSendMsgTimeout(5000);

异步发送失败策略:

异步发送失败重试时,异步重试不会选择其他Broker,仅在当前Broker上做重试。

  • 所以该策略无法保证消息不丢失。
DefaultMQProducer producer = new DefaultMQProducer("pg");
producer.setNamesrvAddr("rocketmqOS:9876");
// 指定异步发送失败后不进行重试发送
producer.setRetryTimesWhenSendAsyncFailed(0);

消息刷盘失败策略:

消息刷盘超时,默认是不会将消息尝试发送到其他Broker

对于重要消息可以通过在Broker的配置文件设置retryAnotherBrokerWhenNotStoreOK属性为true来开启。

几种情况

异步发送在发送过程中出现异常进行重试:

在解析请求结果时,发现响应状态码有其它异常(消息可能未正确被Broker处理)会继续进行重试。

  • 重试依然选择当前Broker

但是如果响应结果不为空的话,即使处理响应时发生异常也不会进行重试。

同步发送时:

如果发送过程中没有异常,但是发送结果不OK,也会选择另一个Broker继续进行重试。

顺序消息发送失败不进行重试:

顺序消息:指的是同步+指定消息队列的方式发送。

Consumer端重试

消息正常的到了消费者,结果消费者发生异常,处理失败了。

例如反序列化失败,消息数据本身无法处理等。

顺序消息

顺序消息的消费重试

顺序消息,当Consumer消费消息失败后,为了保证消息的顺序性,其会自动不断地进行消息重试,直到消费成功。

  • 消费重试默认间隔时间为1000ms

重试期间应用会出现消息消费被阻塞的情况。

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("cg");
// 顺序消息消费失败的消费重试时间间隔,单位毫秒,默认为1000,其取值范围为[10, 30000]
consumer.setSuspendCurrentQueueTimeMillis(100);

由于对顺序消息的重试是无休止的,不间断的,直至消费成功。

  • 所以,对于顺序消息的消费,务必要保证应用能够及时监控并处理消费失败的情况,避免消费被永久性阻塞。

注意: 顺序消息没有发送失败重试机制,但具有消费失败重试机制。

消费状态

顺序消费目前两个状态:SUCCESSSUSPEND_CURRENT_QUEUE_A_MOMENT

SUSPEND_CURRENT_QUEUE_A_MOMENT意思是先暂停消费一下:

  • SuspendCurrentQueueTimeMillis时间间隔后再重试一下,而不是放到重试队列里。
public enum ConsumeOrderlyStatus {
    SUCCESS,
    
    @Deprecated
    ROLLBACK,
    
    @Deprecated
    COMMIT,
    
    SUSPEND_CURRENT_QUEUE_A_MOMENT;
}

并发消息

并发消息的消费重试

在并发消费中,可能会有多个线程同时消费一个队列的消息。

因此即使发送端通过发送顺序消息保证消息在同一个队列中按照FIFO的顺序,也无法保证消息实际被顺序消费。

  • 所有并发消费也可以称之为无序消费

对于无序消息(普通消息、延时消息、事务消息):

  • Consumer消费消息失败时,可以通过设置返回状态达到消息重试的效果。

注意:

无序消息的重试只针对集群消费模式生效。

广播消费模式不提供失败重试特性:即消费失败后,失败消息不再重试,继续消费新的消息。

消费状态

Consumer端消息消费有两种状态:

一个是成功(CONSUME_SUCCESS),一个是失败&稍后重试(RECONSUME_LATER)。

Consumer为了保证消息消费成功,只有使用方明确表示消费成功。

  • 返回CONSUME_SUCCESSRocketMQ才会认为消息消费成功。

若是消息消费失败,只要返回:ConsumeConcurrentlyStatus.RECONSUME_LATER

  • RocketMQ就会认为消息消费失败了,要重新投递。
public enum ConsumeConcurrentlyStatus {
    CONSUME_SUCCESS,
    RECONSUME_LATER;   
}

重试机制

为了保证消息是确定被至少消费成功一次,RocketMQ会把这批消息重发回Broker

  • Topic不是原Topic而是一个RETRY Topic

在延迟的某个时间点(默认是10秒,业务可设置)后,再次投递。

而若是一直这样重复消费都持续失败到必定次数(默认16次),就会投递到死信队列

在启动Broker的过程当中,能够观察到以下输出:

2024-09-19 16:29:58 INFO main - messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h

RECONSUME_LATER策略:

若是消费失败,那么1S后再次消费,若是失败,那么5S后,再次消费,…… 直至2H后若是消费还失败。

  • 那么该条消息就会终止发送给消费者了。

消息重试间隔时间如下:

重试次数 与上次重试的间隔时间 重试次数 与上次重试的间隔时间
1 10秒 9 7分钟
2 30秒 10 8分钟
3 1分钟 11 9分钟
4 2分钟 12 10分钟
5 3分钟 13 20分钟
6 4分钟 14 30分钟
7 5分钟 15 1小时
8 6分钟 16 2小时

某条消息在一直消费失败的前提下,将会在接下来的4小时46分钟之内进行16次重试。

  • 超过这个时间范围消息将不再重试投递,而被投递至死信队列

修改消费重试次数:

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("cg");
// 修改消费重试次数
consumer.setMaxReconsumeTimes(10);

基本原理

重试的 MessageRocketMQ 的做法并不是将其投递回原 Topic,而是重试队列

每个 ConsumerGroup 都有自己的重试队列:

  • 其名称是由特定的前缀拼接上 ConsumerGroup 所组成,默认 %RETRY%+消费者组名称
  • 所以在 Consumer 启动时,就会同时消费其 ConsumerGroup 对应的重试队列普通队列

消费失败的 MessageConsumer 会将其投回 Broker

  • 相当于这条 Message 已经被消费掉了,之后重试的只是内容相同、但实际不是同一条的 Message
  • 然后会校验重试的次数,如果达到16次则会进入死信队列 组成为 %DLQ%+消费者组名称
  • 未达到最大重试次数,则会根据重试间隔时间等级将其投递到延迟队列SCHEDULE_TOPIC_XXXX中。
  • 然后等到了延迟等级对应的时间之后,再投递到 ConsumerGroup 所对应的重试队列当中,供后续消费。

消息重复

如果消费端收到两条一样的消息,应该怎样处理?

《RocketMQ 原理简介》中讲到:

RocketMQ 无法避免消息重复。

所以如果业务对消费重复非常敏感,务必要在业务侧去重,有以下几种去重方式:

消费端处理消息的业务逻辑保持幂等性

  • 如何保证幂等性,可以看我之前的文章!

保证每条消息都有唯一编号且保证消息处理成功与去重表的日志同时出现。

  • 利用一张日志表来记录已经处理成功的消息的ID。
  • 如果新到的消息ID已经在日志表中,那么就不再处理这条消息。

最后

觉得有收获,希望帮忙点赞,转发下哈,谢谢,谢谢


相关文章
|
2天前
|
存储 缓存 关系型数据库
MySQL事务日志-Redo Log工作原理分析
事务的隔离性和原子性分别通过锁和事务日志实现,而持久性则依赖于事务日志中的`Redo Log`。在MySQL中,`Redo Log`确保已提交事务的数据能持久保存,即使系统崩溃也能通过重做日志恢复数据。其工作原理是记录数据在内存中的更改,待事务提交时写入磁盘。此外,`Redo Log`采用简单的物理日志格式和高效的顺序IO,确保快速提交。通过不同的落盘策略,可在性能和安全性之间做出权衡。
1517 4
|
29天前
|
弹性计算 人工智能 架构师
阿里云携手Altair共拓云上工业仿真新机遇
2024年9月12日,「2024 Altair 技术大会杭州站」成功召开,阿里云弹性计算产品运营与生态负责人何川,与Altair中国技术总监赵阳在会上联合发布了最新的“云上CAE一体机”。
阿里云携手Altair共拓云上工业仿真新机遇
|
5天前
|
人工智能 Rust Java
10月更文挑战赛火热启动,坚持热爱坚持创作!
开发者社区10月更文挑战,寻找热爱技术内容创作的你,欢迎来创作!
501 19
|
2天前
|
存储 SQL 关系型数据库
彻底搞懂InnoDB的MVCC多版本并发控制
本文详细介绍了InnoDB存储引擎中的两种并发控制方法:MVCC(多版本并发控制)和LBCC(基于锁的并发控制)。MVCC通过记录版本信息和使用快照读取机制,实现了高并发下的读写操作,而LBCC则通过加锁机制控制并发访问。文章深入探讨了MVCC的工作原理,包括插入、删除、修改流程及查询过程中的快照读取机制。通过多个案例演示了不同隔离级别下MVCC的具体表现,并解释了事务ID的分配和管理方式。最后,对比了四种隔离级别的性能特点,帮助读者理解如何根据具体需求选择合适的隔离级别以优化数据库性能。
179 1
|
8天前
|
JSON 自然语言处理 数据管理
阿里云百炼产品月刊【2024年9月】
阿里云百炼产品月刊【2024年9月】,涵盖本月产品和功能发布、活动,应用实践等内容,帮助您快速了解阿里云百炼产品的最新动态。
阿里云百炼产品月刊【2024年9月】
|
21天前
|
存储 关系型数据库 分布式数据库
GraphRAG:基于PolarDB+通义千问+LangChain的知识图谱+大模型最佳实践
本文介绍了如何使用PolarDB、通义千问和LangChain搭建GraphRAG系统,结合知识图谱和向量检索提升问答质量。通过实例展示了单独使用向量检索和图检索的局限性,并通过图+向量联合搜索增强了问答准确性。PolarDB支持AGE图引擎和pgvector插件,实现图数据和向量数据的统一存储与检索,提升了RAG系统的性能和效果。
|
9天前
|
Linux 虚拟化 开发者
一键将CentOs的yum源更换为国内阿里yum源
一键将CentOs的yum源更换为国内阿里yum源
451 5
|
7天前
|
存储 人工智能 搜索推荐
数据治理,是时候打破刻板印象了
瓴羊智能数据建设与治理产品Datapin全面升级,可演进扩展的数据架构体系为企业数据治理预留发展空间,推出敏捷版用以解决企业数据量不大但需构建数据的场景问题,基于大模型打造的DataAgent更是为企业用好数据资产提供了便利。
314 2
|
23天前
|
人工智能 IDE 程序员
期盼已久!通义灵码 AI 程序员开启邀测,全流程开发仅用几分钟
在云栖大会上,阿里云云原生应用平台负责人丁宇宣布,「通义灵码」完成全面升级,并正式发布 AI 程序员。
|
25天前
|
机器学习/深度学习 算法 大数据
【BetterBench博士】2024 “华为杯”第二十一届中国研究生数学建模竞赛 选题分析
2024“华为杯”数学建模竞赛,对ABCDEF每个题进行详细的分析,涵盖风电场功率优化、WLAN网络吞吐量、磁性元件损耗建模、地理环境问题、高速公路应急车道启用和X射线脉冲星建模等多领域问题,解析了问题类型、专业和技能的需要。
2608 22
【BetterBench博士】2024 “华为杯”第二十一届中国研究生数学建模竞赛 选题分析