ActiveMQ系列: ActiveMQ 的死信队列与消费重试机制

简介: maximumRedeliveryDelay:最大传送延迟,只在 useExponentialBackOff 为 true 时有效(V5.5),假设首次重连间隔为 10ms,倍数为 2,那么第二次重连时间间隔为 20ms,第三次重连时间间隔为 40ms,当重连时间间隔大的最大重连时间间隔时,以后每次重连时间间隔都为最大重连时间间隔。默认为 -1。

ActiveMQ系列: ActiveMQ 的死信队列与消费重试机制


前面有介绍与基础、应用实践部分,有兴趣的可以移步:


初步认识了ActiveMQ:https://blog.csdn.net/qq_26975307/article/details/98875098


结合JavaSE进行初尝试:https://blog.csdn.net/qq_26975307/article/details/98968854


详细讲讲JMS:https://blog.csdn.net/qq_26975307/article/details/99408962


JMS的可靠性:https://phubing.blog.csdn.net/article/details/99412285


结合 Spring,基于配置文件的使用 ActiveMQ:https://phubing.blog.csdn.net/article/details/99413883


结合 SpringBoot,基于 application.xml 使用ActiveMQ:https://blog.csdn.net/qq_26975307/article/details/99415899


ActiveMQ传输协议:https://blog.csdn.net/qq_26975307/article/details/100147542


ActiveMQ 的持久化机制https://blog.csdn.net/qq_26975307/article/details/100161024


基于LevelDB和 Zookeeper 的数据复制集群:https://blog.csdn.net/qq_26975307/article/details/100560821


高级特性之异步投递、延时投递与定时投递:https://blog.csdn.net/qq_26975307/article/details/100624740


此篇也是最后一篇了,来搞搞 ActiveMQ 的死信队列与消费重试机制


1、先抛几个问题


1.具体哪些情况会引起消息重发?

2.消息重发时间的间隔 和 重发次数各是多少?

3.有毒消息 Poison ACK


2、消费重试机制


2.1、官网拜读


不以官网为标准的 ActiveMQ 学习都是耍流氓http://activemq.apache.org/redelivery-policy


2.2、部分属性说明


1、collisionAvoidanceFactor:设置防止冲突范围的正负百分比,只有启用 useCollisionAvoidance 参数时才生效。也就是在延迟时间上再加一个时间波动范围。默认值为0.15。


2、maximumRedeliveries:最大重传次数,达到最大重连次数后抛出异常。为 -1 时不限制次数,为 0 时表示不进行重传。默认值为6。


3、maximumRedeliveryDelay:最大传送延迟,只在 useExponentialBackOff 为 true 时有效(V5.5),假设首

次重连间隔为 10ms,倍数为 2,那么第二次重连时间间隔为 20ms,第三次重连时间间隔为 40ms,当重连

时间间隔大的最大重连时间间隔时,以后每次重连时间间隔都为最大重连时间间隔。默认为 -1。


4、initialRedeliveryDelay:初始重发延迟时间,默认1000L


5、redeliveryDelay:重发延迟时间,当initialRedeliveryDelay=0时生效,默认1000L


6、useCollisionAvoidance:启用防止冲突功能,默认false


7、useExponentialBackOff:启用指数倍数递增的方式增加延迟时间,默认false


8、backOffMultiplier:重连时间间隔递增倍数,只有值大于1和启用useExponentialBackOff参数时才生效。默认是5


20190910172828586.png


2.3、案例验证与讲解


ActiveMQ 控制台效果


20190911132007106.jpg


那我超过6次还是不行呢?


1. 处理失败 指的是MessageListener的onMessage方法里抛出RuntimeException。


2. Message头里有两个相关字段:Redelivered默认为false,redeliveryCounter默认为0。


3. 消息先由broker发送给consumer,consumer调用listener,如果处理失败,本地redeliveryCounter++,给broker一个特定应答,broker端的message里redeliveryCounter++,延迟一点时间继续调用,默认1s。超过6次,则给broker另一个特定应答,broker就直接发送消息到DLQ。


4. 如果失败2次,consumer重启,则broker再推过来的消息里,redeliveryCounter=2,本地只能再重试4次即会进入DLQ。


5. 重试的特定应答发送到broker,broker即会在内存将消息的redelivered设置为true,redeliveryCounter++,但是这两个字段都没有持久化,即没有修改存储中的消息记录。所以broker重启时这两个字段会被重置为默认值。


JmsProducer_Redelivery 修改


20190915094244261.png


2019091509440490.png


JmsConsumer_Redelivery 修改


20190915094630606.png

20190915094644864.png


2.4、整合spring后如何使用


2.4.1、applicationContext.xml


<!--定义ReDelivery(重发机)机制-->
    <bean id="activeMQRedeliveryPolicy" class="org.apache.activemq.RedeliveryPolicy">
        <!--是否每次尝试重新发送失败后,增长这个等待时间-->
        <property name="useExponentialBackOff" value="true"></property>
        <!--重发魔觉,默认为6片这望设置为3故-->
        <property name="maximumRedeliveries" value="3"></property>
        <!--重发时间间隔,默认1秒-->
        <property name="initialRedeliveryDelay" value="1000"></property>
        <!--第一次失败后重新发送之前等待500毫秒,第二次失败再等待 500*2 毫秒,这里的2是value-->
        <property name="backOffMultiplier" value="2"></property>
        <!--
            最大传送延迟,只在useExponentialBackOff为true时有效(V5.5),假设首次重连间隔为10ms 倍数为2,
            那么第二次重连时间间隔为20ms,第三次重连时间间隔为40ms,
            当重连时间间隔大的最大重连时间间隔时,以后每次重连时间间隔散为最大重连时间间隔。
        -->
        <property name="maximumRedeliveryDelay" value="1000"></property>
    </bean>
    <!--创建链接工厂-->
    <bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
        <property name="brokerURL" value="tcp://localhost:61616"></property>
        <!--引用重发机制-->
        <property name="redeliveryPolicy" ref="activeMQRedeliveryPolicy"/>
    </bean>


2.4.2、Messages are redelivered to a client when any of the following occurs:


官网拜读:http://activemq.apache.org/message-redelivery-and-dlq-handling


1.使用并rollback()调用事务会话  。

2.交易会话在commit()被调用之前关闭  。

3.会话正在使用  CLIENT_ACKNOWLEDGE并被  Session.recover()调用。

4.客户端连接超时(可能正在执行的代码花费的时间超过配置的超时时间)。


3、死信队列


3.1、官网拜读


依旧万年不变的:http://activemq.apache.org/message-redelivery-and-dlq-handling.html


3.2、什么是死信队列


ActiveMQ 中引入了“死信队列”(Dead Letter Queue)的概念。即一条消息再被重发了多次后(默认为重发6次redeliveryCounter==6),将会被ActiveMQ移入“死信队列”。


开发人员可以在这个  Queue中查看处理出错的消息,进行人工干预。


By default, ActiveMQ will not place undeliverable non-persistent messages on the dead-letter queue. The rationale for this behavior is that if the application doesn’t care enough to make the message persistent, then there is little or no value in recording that the message was undeliverable. If you do want to place non-persistent messages on the dead-letter queue, then you should set processNonPersistent="true" on the dead-letter strategy.


(默认情况下,ActiveMQ不会在死信队列上放置无法传递的非持久性消息。这种行为的基本原理是,如果应用程序不够谨慎使消息持久化,那么记录消息无法传递的价值很小或没有价值。如果您确实希望在死信队列上放置非持久性消息,那么您应该设置processNonPersistent="true"死信策略。)


3.3、死信队列的使用:处理失败的消息


举个栗子:一个订单在队列中的应用


20190911135004404.jpg


1、实际中在使用MQ的时候设计两个队列:一个是核心业务队列,一个是死信队列。


2、核心业务队列,就是比如上图专门用来让订单系统发送订单消息的,然后另外一个死信队列就是用来处理异常情况的。


3、假如第三方物流系统故障了此时无法请求,那么仓储系统每次消费到一条订单消息,尝试通知发货和配送都会遇到对方的接口报错。此时仓储系统就可以把这条消息拒绝访问或者标志位处理失败。


4、一旦标志这条消息处理失败了之后,MQ就会把这条消息转入提前设置好的一个死信队列中。


5、然后看到的就是,在第三方物流系统故障期间,所有订单消息全部处理失败,全部会转入死信队列。然后你的仓储系统得专门有一个后台线程,监控第三方物流系统是否正常,能否请求的,不停的监视。


6、一旦发现对方恢复正常,这个后台线程就从死信队列消费出来处理失败的订单,重新执行发货和配送的通知逻辑。


4、死信队列的配置介绍


1、SharedDeadLetterStrategy


2、IndividualDeadLetterStrategy


3、配置案例


1.自动删除过期消息

2.存放非持久消息到死信队列中


4.1、SharedDeadLetterStrategy


将所有的DeadLetter保存在一个共享的队列中,这是 ActiveMQ broker 端默认的策略。


共享队列默认为 “ActiveMQ.DLQ”,可以通过 “deadLetterQueue” 属性来设定。


<deadLetterStrategy>
     <sharedDeadLetterStrategy deadLetterQueue="DLQ-QUEUE"/>
</deadLetterStrategy>


4.2、IndividualDeadLetterStrategy


把DeadLetter放入各自的死信通道中,


对于Queue而言,死信通道的前缀默认为“ActiveMQ.DLQ.Queue.”;


对于Topic而言,死信通道的前缀默认为“ActiveMQ.DLQ.Topic.”;


比如队列Order,那么它对应的死信通道为“ActiveMQ.DLQ.Queue.Order”,这里使用“queuePrefix”“topicPrefix”来指定上述前缀。


默认情况下,无论是Topic还是Queue,broker将使用Queue来保存DeadLeader,即死信通道通常为Queue;


不过你也可以指定为Topic。


<policyEntry queue="order">
    <deadLetterStrategy>
       <individualDeadLetterStrategy queuePrefix="DLQ."useQueueForQueueMessages="false"/>
    </deadLetterStrategy>
</policyEntry>


将队列Order中出现的DeadLetter保存在DLQ.Order中,不过此时DLQ.Order为Topic。


属性“useQueueForTopicMessages”,此值表示是否将Topic的DeadLetter保存在Queue中,默认为true。


4.3、配置案例


4.3.1、自动删除过期消息


有时需要直接删除过期的消息而不需要发送到死队列中,“processExpired”表示是否将过期消息放入死信队列,默认为true;


<policyEntry queue=">">
    <deadLetterStrategy>
        <sharedDeadLetterStrategy processExpired="false"/>
    </deadLetterStrategy>
</policyEntry>


4.3.2、存放非持久消息到死信队列中


默认情况下,Activemq 不会把非持久的死消息发送到死信队列中


processNonPersistent”表示是否将“非持久化”消息放入死信队列,默认为false。


非持久性如果你想把非持久的消息发送到死信队列中,需要设置属性processNonPersistent=“true”


<policyEntry queue=">">
    <deadLetterStrategy>
        <sharedDeadLetterStrategy processExpired="true"/>
    </deadLetterStrategy>
</policyEntry>


华丽的分割线=====================================================================配置案例=========


    ............
    <pendingMes sageLimitStrategy>
         <constantPendingMessageLimitstrategy limit="1000"/>
         </pendingMes sageLimitStrategy>
    </policyEntry>
    <policyEntry queue=">">
         <deadLetterStrategy>
         <individualDeadLetterStrategy queuePrefix="DLQ."useQueueForQueueMessages="true"/>
         </deadLetterstrategy>
    </policyEntry>
    </policyEntries>
  </policyMap>
</destinationPolicy>


5、如何保证消息不被重复消费呢?也即消息幂等性问题


网络延迟传输中,会造成进行MQ重试中,在重试过程中,可能会造成重复消费。


如果消息是做数据库的插入操作,给这个消息做一个唯一主键,那么就算出现重复消费的情况,就会导致主键冲突,避免数据库出现脏数据。


如果上面两种情况还不行,准备一个第三服务方来做消费记录。以redis为例,给消息分配一个全局id,只要消费过该消息,将以K-V形式写入redis。那消费者开始消费前,先去redis中查询有没消费记录即可。



写在最后


ActiveMQ 系列文章全篇完结了。(好像写小说那样),有兴趣的留言共同探讨,大佬轻拍。

目录
相关文章
|
消息中间件 存储 开发者
死信是什么,如何运用RabbitMQ的死信机制?
死信是什么,如何运用RabbitMQ的死信机制?
383 0
ly~
|
3月前
|
消息中间件 存储 数据库连接
RocketMQ 消息的重试机制是怎样的?
RocketMQ的消息重试机制确保消息消费失败时能自动重试,直至成功。默认重试16次,时间间隔逐次翻倍,从10秒至数分钟不等。重试在同组内不同消费者间进行,由异常抛出或特定状态返回触发。支持自定义重试次数与时间间隔,建议合理配置避免无限重试,保障系统稳定性和性能。
ly~
1320 2
|
5月前
|
消息中间件 RocketMQ
RocketMQ - 消费者Rebalance机制
RocketMQ - 消费者Rebalance机制
74 0
|
6月前
|
消息中间件 NoSQL 关系型数据库
【RocketMQ系列十三】RocketMQ的集群核心概念之消费重试&死信队列&幂等消息的出现以及处理
【RocketMQ系列十三】RocketMQ的集群核心概念之消费重试&死信队列&幂等消息的出现以及处理
176 1
|
6月前
|
消息中间件
RabbitMQ 死信消息队列 重复消费 basicAck basicNack
RabbitMQ 死信消息队列 重复消费 basicAck basicNack
|
消息中间件
我们一起来学RabbitMQ 三:RabbiMQ 死信队列,延迟队列,持久化等知识点
我们一起来学RabbitMQ 三:RabbiMQ 死信队列,延迟队列,持久化等知识点
|
消息中间件
RabbitMQ 的死信队列、延迟队列
RabbitMQ 的死信队列、延迟队列
99 0
|
消息中间件 监控 NoSQL
RocketMq普通消息,死信队列,消息幂等性(redis)
RocketMQ作为一款纯java、分布式、队列模型的开源消息中间件,支持事务消息、顺序消息、批量消息、定时消息、消息回溯等。
256 0
|
消息中间件 运维 Java
rabbitMQ消息中间件的延时队列以及死信队列的使用和应用场景
rabbitMQ消息中间件的延时队列以及死信队列的使用和应用场景
|
消息中间件 存储 负载均衡
【消息中间件】默认的RocketMQ消息消费者是如何启动的?(下)
在当下的分布式服务中,消息队列中间件是一个解决服务之间耦合的利器,今天我们来瞧一瞧开源的RocketMQ消息中间件,他的消费端是如何启动的,以及在使用他的过程中有哪些配置。

热门文章

最新文章