自顶向下学习 RocketMQ(十):消息重投和消息重试

简介: 生产者在发送消息时,同步消息失败会重投,异步消息有重试,oneway 没有任何保证。消息重投保证消息尽可能发送成功、不丢失,但可能会造成消息重复,消息重复在 RocketMQ 中是无法避免的问题。消息重复在一般情况下不会发生,当出现消息量大、网络抖动,消息重复就会是大概率事件。另外,生产者主动重发、consumer 负载变化也会导致重复消息。

消息重投


“生产者在发送消息时,同步消息失败会重投,异步消息有重试,oneway 没有任何保证。消息重投保证消息尽可能发送成功、不丢失,但可能会造成消息重复,消息重复在 RocketMQ 中是无法避免的问题。消息重复在一般情况下不会发生,当出现消息量大、网络抖动,消息重复就会是大概率事件。另外,生产者主动重发、consumer 负载变化也会导致重复消息。如下方法可以设置消息重试策略:


  • retryTimesWhenSendFailed: 同步发送失败重投次数,默认为 2,因此生产者会最多尝试发送 retryTimesWhenSendFailed + 1 次。不会选择上次失败的>broker,尝试向其他 broker 发送,最大程度保证消息不丢。超过重投次数,抛出异常,由客户端保证消息不丢。当出现 RemotingException、>MQClientException 和部分 MQBrokerException 时会重投。
  • retryTimesWhenSendAsyncFailed: 异步发送失败重试次数,异步重试不会选择其他 broker,仅在同一个 broker 上做重试,不保证消息不丢。
  • retryAnotherBrokerWhenNotStoreOK: 消息刷盘(主或备)超时或 slave 不可用(返回状态非 SEND_OK),是否尝试发送到其他 broker,默认 false。十分重要消息可以开启。


此外,只有 普通消息 具有发送重试机制,顺序消息是没有的。


retryTimesWhenSendFailed


同步发送失败策略


DefaultMQProducer producer = new DefaultMQProducer("pg"); 
producer.setNamesrvAddr("rocketmqOS:9876"); 
// 设置同步发送失败时重试发送的次数,默认为 2 次 
producer.setRetryTimesWhenSendFailed(3); 
// 设置发送超时时限为 5s,默认 3s 
producer.setSendMsgTimeout(5000);


在我们 Spring Cloud Stream + Spring Cloud Alibaba RocketMQ 的 例子的配置里,我们可以这样配置:


25.jpg


通过源码可以看到,它的默认值是 2:



26.jpg


retryTimesWhenSendAsyncFailed


异步发送失败策略


DefaultMQProducer producer = new DefaultMQProducer("pg"); 
producer.setNamesrvAddr("rocketmqOS:9876"); 
// 指定异步发送失败后不进行重试发送 
producer.setRetryTimesWhenSendAsyncFailed(0);


在我们 Spring Cloud Stream + Spring Cloud Alibaba RocketMQ 的 例子的配置里,我们可以这样配置:


27.jpg


通过源码可以看到,它的默认值也是 2:


28.jpg


retryAnotherBrokerWhenNotStoreOK


消息刷盘失败策略


消息刷盘超时( Master 、 Slave ),默认是不会将消息尝试发送到其他 Broker。对于重要消息可以通过在 Broker 的配置文件设置 retryAnotherBrokerWhenNotStoreOK 属性为 true 来开启。


在我们 Spring Cloud Stream + Spring Cloud Alibaba RocketMQ 的 例子的配置里,我们可以这样配置:


29.jpg


消息重试


“Consumer 消费消息失败后,要提供一种重试机制,令消息再消费一次。Consumer 消费消息失败通常可以认为有以下几种情况:


  • 由于消息本身的原因,例如反序列化失败,消息数据本身无法处理(例如话费充值,当前消息的手机号被注销,无法充值)等。这种错误通常需要跳过这条消息,再消费其它消息,而这条失败的消息即使立刻重试消费,99%也不成功,所以最好提供一种定时重试机制,即过 10 秒后再重试。
  • 由于依赖的下游应用服务不可用,例如 db 连接不可用,外系统网络不可达等。遇到这种错误,即使跳过当前失败的消息,消费其他消息同样也会报错。这种情况建议应用 sleep 30s,再消费下一条消息,这样可以减轻 Broker 重试消息的压力。

RocketMQ 会为每个消费组都设置一个 Topic 名称为“%RETRY%+consumerGroup”的重试队列(这里需要注意的是,这个 Topic 的重试队列是针对消费组,而不是针对每个 Topic 设置的),用于暂时保存因为各种异常而导致 Consumer 端无法消费的消息。考虑到异常恢复起来需要一些时间,会为重试队列设置多个重试级别,每个重试级别都有与之对应的重新投递延时,重试次数越多投递延时就越大。RocketMQ 对于重试消息的处理是先保存至 Topic 名称为“SCHEDULE_TOPIC_XXXX”的延迟队列中,后台定时任务按照对应的时间进行 Delay 后重新保存至“%RETRY%+consumerGroup”的重试队列中。


消费者消费某条消息失败后,会根据消息重试机制将该消息重新投递,若达到重试次数后消息还没有成功被消费,则消息将被投入死信队列。一条消息无论重试多少次,这些重试消息的 Message ID 不会改变。


suspendCurrentQueueTimeMillis


同步消费(顺序消息)消息模式下消费失败后再次消费的时间间隔。默认值:1000 ms

在我们 Spring Cloud Stream + Spring Cloud Alibaba RocketMQ 的 例子的配置里,我们可以这样配置:


30.jpg


顺序消息的重试是无休止的,不间断的,直至消费成功,所以,对于顺序消息的消费, 务必要保证应用能够及时监控并处理消费失败的情况,避免消费被永久性阻塞。


顺序消息没有发送失败重试机制,但具有消费失败重试机制


MaxReconsumeTimes


无序消息(包括普通消息、延时消息、定时消息和事务消息)的最大重试次数可通过自定义参数 MaxReconsumeTimes 取值进行配置。默认值为 16 次,该参数取值无最大限制,建议使用默认值。


间隔时间根据重试次数阶梯变化,取值范围:1 秒~2 小时。不支持自定义配置。


31.jpg


若最大重试次数小于等于 16 次,则间隔时间按照无序消息重试间隔时间阶梯变化。若最大重试次数大于 16 次,则超过 16 次的间隔时间均为 2 小时。


delayLevelWhenNextConsume


异步消费消息模式下消费失败重试策略:


  • -1, 不重复,直接放入死信队列
  • 0,broker 控制重试策略
  • 0,client 控制重试策略


默认值:0.


在我们 Spring Cloud Stream + Spring Cloud Alibaba RocketMQ 的 例子的配置里,我们可以这样配置:


32.jpg


死信队列


当一条消息初次消费失败,消息队列会自动进行消费重试;达到最大重试次数后,若消费依然失败,则表明消费者在正常情况下无法正确地消费该消息,此时,消息队列不会立刻将消息丢弃,而是将其发送到该消费者对应的特殊队列中。


正常情况下无法被消费的消息称为 死信消息(Dead-Letter Message),存储死信消息的特殊队列称为 死信队列(Dead-Letter Queue)。


对于 无序消息集群消费 下的重试消费,默认允许每条消息最多重试 16 次,如果消息重试 16 次后仍然失败,消息将被投递至 死信队列


特征


  • 不会再被消费者正常消费
  • 有效期与正常消息相同,均为 3 天,3 天后会被自动删除


特性


  • 一个死信队列对应一个 Group ID, 而不是对应单个消费者实例。名称为 %DLQ%consumerGroup@consumerGroup
  • 如果一个 Group ID 未产生死信消息,则不会为其创建相应的死信队列
  • 一个死信队列包含了对应 Group ID 产生的所有死信消息,不论该消息属于哪个 Topic


相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
6月前
|
消息中间件 Java API
RocketMQ事务消息, 图文、源码学习探究~
介绍 RocketMQ是阿里巴巴开源的分布式消息中间件,它是一个高性能、低延迟、可靠的消息队列系统,用于在分布式系统中进行异步通信。 从4.3.0版本开始正式支持分布式事务消息~ RocketMq事务消息支持最终一致性:在普通消息基础上,支持二阶段的提交能力。将二阶段提交和本地事务绑定,实现全局提交结果的一致性。 原理、流程 本质上RocketMq的事务能力是基于二阶段提交来实现的 在消息发送上,将二阶段提交与本地事务绑定 本地事务执行成功,则事务消息成功,可以交由Consumer消费 本地事务执行失败,则事务消息失败,Consumer无法消费 但是,RocketMq只能保证本地事务
|
3月前
|
消息中间件 存储 数据库
深入学习RocketMQ的底层存储设计原理
文章深入探讨了RocketMQ的底层存储设计原理,分析了其如何通过将数据和索引映射到内存、异步刷新磁盘以及消息内容的混合存储来实现高性能的读写操作,从而保证了RocketMQ作为一款低延迟消息队列的读写性能。
|
3月前
|
消息中间件
快来体验 消息队列RabbitMQ版入门训练营 打卡学习领好礼
快来体验 消息队列RabbitMQ版入门训练营 打卡学习领好礼
62 0
|
4月前
|
消息中间件 NoSQL 关系型数据库
【RocketMQ系列十三】RocketMQ的集群核心概念之消费重试&死信队列&幂等消息的出现以及处理
【RocketMQ系列十三】RocketMQ的集群核心概念之消费重试&死信队列&幂等消息的出现以及处理
130 1
|
5月前
|
消息中间件 Arthas 监控
消息队列 MQ产品使用合集之每次重置reconsumeTimes就无法达到死信阈值,重试次数是否就要应用方控制
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
消息队列 MQ产品使用合集之每次重置reconsumeTimes就无法达到死信阈值,重试次数是否就要应用方控制
|
5月前
|
消息中间件 IDE 数据库
RocketMQ事务消息学习及刨坑过程
RocketMQ事务消息学习及刨坑过程
|
6月前
|
消息中间件 存储 负载均衡
消息队列学习之RabbitMQ
【4月更文挑战第3天】消息队列学习之RabbitMQ,一种基于erlang语言开发的流行的开源消息中间件。
58 0
|
6月前
|
消息中间件 存储 监控
写了10000字:全面学习RocketMQ中间件
以上是 V 哥在授课时整理的全部 RocketMQ 的内容,在学习时重点要理解其中的含义,正所谓知其然知其所以然,希望这篇文章可以帮助兄弟们搞清楚RocketMQ的来龙去脉,必竟这是一个非常常用的分布式应用的中间件,好了,今天的内容就分享到这,我靠!已经 00:36分,建议收藏起来,慢慢消化,创作不易,喜欢请点赞转发。
773 0
|
6月前
|
消息中间件 存储 缓存
消息队列学习之rocketmq
【4月更文挑战第1天】消息队列学习之rocketmq
46 0
|
6月前
|
消息中间件 RocketMQ Docker
分布式事物【RocketMQ事务消息、Docker安装 RocketMQ、实现订单微服务、订单微服务业务层实现】(八)-全面详解(学习总结---从入门到深化)
分布式事物【RocketMQ事务消息、Docker安装 RocketMQ、实现订单微服务、订单微服务业务层实现】(八)-全面详解(学习总结---从入门到深化)
92 0