开发者学堂课程【RocketMQ 知识精讲与项目实战(第三阶段):消息重试】学习笔记,与课程紧密联系,让用户快速学习知识。
课程地址:https://developer.aliyun.com/learning/course/704/detail/12455
消息重试
内容介绍:
一、顺序消息重试
二、无序消息重试
一、顺序消息重试
顺序消息的重试:会被自动的触发,在消费顺序消息的时候,如果对MQ没有做出正常的反馈,允许重新消费这个消息。由于消息的消费是顺序的,所以当前面的消费者没有消费完消息的时候,后面的消费者是不能够消费的,需要保持严格的顺序。这种情况下会可能出现消息的消费者被阻塞。所以在使用顺序消费的时候,要及时的监控并处理消费失败的情况,检测出问题,避免阻塞现象的发生。以上是顺序消息重试的一种方式。
二、无序消息重试
无序消息的重试:哪些消息属于无序的消息?普通消息默认都是无序的,包括定时、延时、事务消息都是以无序的方式的消费的。无序的方式在进行消费的时候,如果消费失败需要设置不同的状态达到重试的结果。设置哪些状态能够进行消息的重试?无序消息的重试对于集群消费方式是有效的,对于广播的方式是失效的。也就是说如果当前的广播的方式,不管是成功消费还是消费失败,只能消费一次。所以无序消息的重试的机制都是针对集群消费方式。
首先看重试的次数。如果设置了返回消息失败了,最终MQ允许消费多少次呢?默认情况下最多允许消费16次。每一次的间隔也是不同的。例如第一次间隔了10秒,第二次间隔30秒,第三次间隔1分钟等依次类推。到第16次的重试的时候,就会间隔两个小时。
如果消息重试真正进行了16次,总的时间算下来应该是4小时46分钟。如果在这个过程中有某一次消费成功了,给MQ响应成功了,就不会再接着重试了。如果消费了16次花费了4小时46分钟依然没有消费成功,这个消息就会不再被投递了和消费了。这时这个消息就会进入死信队列。后面会讲解死信对列。以上就是重试次数的内容,默认次数是16次。
什么情况下会重置方式?当处理完业务之后,如果返回的状态是action.reconsumelater.这个的时候,就允许消费相同的消息。如果返回的是null,也就是什么都没有返回,那么MQ认为当前的这个消息没有消费成功,就会允许再次去消费。如果抛出的是异常runtimeexception,MQ也允许重新去消费消息。所以对于无序消息来说,在进行重试的时候以上三种情况都会触发重试的机制。上面已经说过重试16次。这16次能否进行修改?也是可以的。见下图:
在进行消费的时候,如果消费失败了,但是不想重试,也可以做。
在try{
Doconsumemessage(message);
}catch(throw able e){
有可能会发生异常。在catch当中,包括整个的外部都做action.commitmessage;
这样MQ会认为当前的消息成功的消费了。
就不会继续重试了。以上就是消息重试的配置方式,主要是在Java代码中,如果出现上述的三种方式,都是可以重试的。如果不想重试就设置action.commitmessage就可以了。
设置的最大重试次数16次是可以修改的。如何修改?
如果修改的次数是小于等于16次,默认的就是走上面的配置。例如重试次数设置的是10,那么后面的11-16就不会再尝试了。时间间隔就是按照默认的方式进行处理。如果设置的重试次数大于16次,超过16次每一次间隔的时间都是每次2小时。
具体的设置:
Properties Properties=new Properties();
//配置对应group ID的最大消息重试次数为20次
Properties.put(propertkeyconst.maxconsumetimes,“20”);
Consumer consumer=ONfactory.createconsumer(Properties);
通过额外的属性去设置重试的次数。在创建消费者的时候传入Properties就可以了。
这里还有一个细节需要注意:当前给同一个消费者组当中设置了重试的次数,那么对所有组内其他的消费者都是起作用的。对一个设置了,组内其他的消费者都是起作用的。
如果只对相同Group ID下两个consumer实例中的其中一个设置了maxreconsumertimes,那么该配置对两个consumer实例均有效。
就是设置了一个maxreconsumertimes,组内配置的两个都会使用对应的设置。
设置会通过覆盖的方式生效,比如最后启动的实例会把之前启动的配置的实例覆盖。以上就是如何更改默认重试的次数,是可以超过16次的。
在消费时候,有时需要获取一下当前消息重试的次数。在message中有这样一个方法:getreconsumertimes,通过
Public class Message listener Impl implements Message Listener{
@override
public Action consume (Message message, Consume context context) {
//获取消息的重试次数
system.out.println(message.get Reconsume Times());
return Action. Commit Message;
会获得当前某一个消息已经重试的次数。
以上就是关于消息重试所讲的知识点。接下来做一个梳理。
首先分为两大块,第一是顺序消息的重试;顺序消息对于消费者消息
是由严格的顺序保障的。如果前面的消费者没有正常的消费的话,后面的消费者就会一直被阻塞。所有顺序消费需要额外的关注,随时的监控消息消费的情况,避免阻塞的发生。
无序消息的重试可以通过人为的在Java代码中触发。三种方式。
第一种是返回action reconsumerlater;第二种是返回一个null;第三种是如果出现异常都是允许重试的,总共允许重试16次。16次总共间隔4小时46分钟。这16次是可以改动的,可以在创建消费者的时候指定maxreconsumertimes的次数。入股次数小于等于16,每一次的间隔时间都是根据默认的方式而来的。如果大于16次,超过16次的间隔时间都是每次2小时。最后需要知道的是对于一个消费者进行重试次数的设置之后,组内的其他消费者都是适用的。这个消息消费重试它的配置是覆盖式的,后面会覆盖前面的。
在消费消息的同时也可以获得消息重试的次数。