五分钟带你玩转rocketMQ(八)提升消息稳定性——重试-阿里云开发者社区

开发者社区> 小鲍侃java> 正文

五分钟带你玩转rocketMQ(八)提升消息稳定性——重试

简介: 五分钟带你玩转rocketMQ(八)提升消息稳定性——重试
+关注继续查看


消费端消息重试实现

 

生产端消息重试

重试两次 

消费端消息重试

重试16次 然后加入死信

消费端模拟重试代码

1. @Component
2. public class MQConsumeMsgListenerProcessor implements MessageListenerConcurrently {
3. private static final Logger logger = LoggerFactory.getLogger(MQConsumeMsgListenerProcessor.class);
4. /**
5.      *  默认msgs里只有一条消息,可以通过设置consumeMessageBatchMaxSize参数来批量接收消息<br/>
6.      *  不要抛异常,如果没有return CONSUME_SUCCESS ,consumer会重新消费该消息,直到return CONSUME_SUCCESS
7.      */
8.     @Override
9. public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
10. if(CollectionUtils.isEmpty(msgs)){
11.             logger.info("接受到的消息为空,不处理,直接返回成功");
12. return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
13.         }
14.         MessageExt messageExt = msgs.get(0);
15.         logger.info("接受到的消息为:"+messageExt.toString());
16. if(messageExt.getTopic().equals("NewMessage")){
17. if(messageExt.getTags().equals("TagA")){
18. //TODO 判断该消息是否重复消费(RocketMQ不保证消息不重复,如果你的业务需要保证严格的不重复消息,需要你自己在业务端去重)
19. 
20.                 SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
21.                 System.out.printf(df.format(new Date()) + ", %s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
22. int reconsume = messageExt.getReconsumeTimes();
23.                 System.out.println("重试的次数为"+reconsume);
24. return ConsumeConcurrentlyStatus.RECONSUME_LATER;
25. 
26. //TODO 获取该消息重试次数
27. //int reconsume = messageExt.getReconsumeTimes();
28. //if(reconsume ==3){//消息已经重试了3次,如果不需要再次消费,则返回成功
29. //    //记录日志
30. //    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
31. //}
32. //TODO 处理对应的业务逻辑
33. 
34. 
35.             }
36.         }
37. // 如果没有return success ,consumer会重新消费该消息,直到return success
38. return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
39.     }
40. 
41. 
42. }

用以上代码模拟消费端的重试 ,每一次重试的时间与延时队列时间等级一样(16次时间还是比较长的)

重试16次之后投入死信

image.png

然后通过业务进行判断 如流水号等


版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。

相关文章
阿里云服务器怎么设置密码?怎么停机?怎么重启服务器?
如果在创建实例时没有设置密码,或者密码丢失,您可以在控制台上重新设置实例的登录密码。本文仅描述如何在 ECS 管理控制台上修改实例登录密码。
10074 0
阿里云服务器如何登录?阿里云服务器的三种登录方法
购买阿里云ECS云服务器后如何登录?场景不同,阿里云优惠总结大概有三种登录方式: 登录到ECS云服务器控制台 在ECS云服务器控制台用户可以更改密码、更换系.
13882 0
阿里云ECS云服务器初始化设置教程方法
阿里云ECS云服务器初始化是指将云服务器系统恢复到最初状态的过程,阿里云的服务器初始化是通过更换系统盘来实现的,是免费的,阿里云百科网分享服务器初始化教程: 服务器初始化教程方法 本文的服务器初始化是指将ECS云服务器系统恢复到最初状态,服务器中的数据也会被清空,所以初始化之前一定要先备份好。
11888 0
阿里云ECS云服务器初始化设置教程方法
阿里云ECS云服务器初始化是指将云服务器系统恢复到最初状态的过程,阿里云的服务器初始化是通过更换系统盘来实现的,是免费的,阿里云百科网分享服务器初始化教程: 服务器初始化教程方法 本文的服务器初始化是指将ECS云服务器系统恢复到最初状态,服务器中的数据也会被清空,所以初始化之前一定要先备份好。
7365 0
3分钟实现SpringBoot集成RabbitMQ,实现消息队列服务!
消息中间件在互联网公司使用得越来越多,主要用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。消息队列实现系统之间的双向解耦,生产者往消息队列中发送消息,消费者从队列中拿取消息并处理,生产者不用关心是谁来消费,消费者不用关心谁在生产消息,从而达到系统解耦的目的,也大大提高了系统的高可用性和高并发能力。Spring Boot提供了spring-bootstarter-amqp组件对消息队列进行支持,使用非常简单,仅需要非常少的配置即可实现完整的消息队列服务。
304 0
+关注
小鲍侃java
小作坊架构师。
365
文章
0
问答
文章排行榜
最热
最新
相关电子书
更多
《2021云上架构与运维峰会演讲合集》
立即下载
《零基础CSS入门教程》
立即下载
《零基础HTML入门教程》
立即下载