五分钟带你玩转rocketMQ(八)提升消息稳定性——重试

简介: 五分钟带你玩转rocketMQ(八)提升消息稳定性——重试


消费端消息重试实现

 

生产端消息重试

重试两次

消费端消息重试

重试16次 然后加入死信

消费端模拟重试代码

1. @Component
2. public class MQConsumeMsgListenerProcessor implements MessageListenerConcurrently {
3. private static final Logger logger = LoggerFactory.getLogger(MQConsumeMsgListenerProcessor.class);
4. /**
5.      *  默认msgs里只有一条消息,可以通过设置consumeMessageBatchMaxSize参数来批量接收消息<br/>
6.      *  不要抛异常,如果没有return CONSUME_SUCCESS ,consumer会重新消费该消息,直到return CONSUME_SUCCESS
7.      */
8.     @Override
9. public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
10. if(CollectionUtils.isEmpty(msgs)){
11.             logger.info("接受到的消息为空,不处理,直接返回成功");
12. return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
13.         }
14.         MessageExt messageExt = msgs.get(0);
15.         logger.info("接受到的消息为:"+messageExt.toString());
16. if(messageExt.getTopic().equals("NewMessage")){
17. if(messageExt.getTags().equals("TagA")){
18. //TODO 判断该消息是否重复消费(RocketMQ不保证消息不重复,如果你的业务需要保证严格的不重复消息,需要你自己在业务端去重)
19. 
20.                 SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
21.                 System.out.printf(df.format(new Date()) + ", %s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
22. int reconsume = messageExt.getReconsumeTimes();
23.                 System.out.println("重试的次数为"+reconsume);
24. return ConsumeConcurrentlyStatus.RECONSUME_LATER;
25. 
26. //TODO 获取该消息重试次数
27. //int reconsume = messageExt.getReconsumeTimes();
28. //if(reconsume ==3){//消息已经重试了3次,如果不需要再次消费,则返回成功
29. //    //记录日志
30. //    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
31. //}
32. //TODO 处理对应的业务逻辑
33. 
34. 
35.             }
36.         }
37. // 如果没有return success ,consumer会重新消费该消息,直到return success
38. return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
39.     }
40. 
41. 
42. }

用以上代码模拟消费端的重试 ,每一次重试的时间与延时队列时间等级一样(16次时间还是比较长的)

重试16次之后投入死信

image.png

然后通过业务进行判断 如流水号等


相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
消息中间件 存储 算法
RocketMQ 重试机制详解及最佳实践
本文主要介绍在使用 RocketMQ 时为什么需要重试与兜底机制,生产者与消费者触发重试的条件和具体行为,如何在 RocketMQ 中合理使用重试机制,帮助构建弹性,高可用系统的最佳实践。
1466 0
RocketMQ 重试机制详解及最佳实践
|
消息中间件 算法 Java
弥补延时消息的不足,RocketMQ 基于时间轮算法实现了定时消息!
弥补延时消息的不足,RocketMQ 基于时间轮算法实现了定时消息!
791 1
弥补延时消息的不足,RocketMQ 基于时间轮算法实现了定时消息!
ly~
|
3月前
|
消息中间件 存储 监控
如何查看 RocketMQ 消息的重试次数和时间间隔?
RocketMQ消息重试次数和时间间隔可通过查看消费者和Broker日志、使用管理控制台的监控页面和消息查询功能,或通过分析消费者代码和RocketMQ客户端库代码等方式获取。日志中常有消费失败重试的明确记录,控制台可监控消费情况推断重试状态,代码分析则适合技术用户深入了解。
ly~
294 3
|
6月前
|
消息中间件 NoSQL 关系型数据库
【RocketMQ系列十三】RocketMQ的集群核心概念之消费重试&死信队列&幂等消息的出现以及处理
【RocketMQ系列十三】RocketMQ的集群核心概念之消费重试&死信队列&幂等消息的出现以及处理
178 1
|
7月前
|
消息中间件 Arthas 监控
消息队列 MQ产品使用合集之每次重置reconsumeTimes就无法达到死信阈值,重试次数是否就要应用方控制
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
消息队列 MQ产品使用合集之每次重置reconsumeTimes就无法达到死信阈值,重试次数是否就要应用方控制
|
消息中间件 缓存 监控
Rocketmq并发和顺序消费的失败重试机制
Rocketmq并发和顺序消费的失败重试机制
|
消息中间件 Arthas 监控
一次RocketMQ ons SDK Bug导致消息不断堆积到重试队列的案例分析
一次RocketMQ ons SDK Bug导致消息不断堆积到重试队列的案例分析
483 1
|
消息中间件 中间件 RocketMQ
【Alibaba中间件技术系列】「RocketMQ技术专题」分析消息队列中的消费失败重试机制的原理和实践
【Alibaba中间件技术系列】「RocketMQ技术专题」分析消息队列中的消费失败重试机制的原理和实践
433 6
|
消息中间件 Cloud Native 物联网
阿里云消息队列 RocketMQ、Kafka 荣获金融级产品稳定性测评 “先进级” 认证
在混沌工程技术沙龙--金融行业精品专场的分布式系统稳定性评估体系获奖名单中,阿里云分布式消息队列服务成为通过首批消息队列服务稳定性认证,荣获最高级别 “先进级” 认证的消息队列服务。
521 0
阿里云消息队列 RocketMQ、Kafka 荣获金融级产品稳定性测评 “先进级” 认证
|
消息中间件 缓存 Cloud Native
RocketMQ 重试机制的概念与最佳实践|学习笔记
快速学习 RocketMQ 重试机制的概念与最佳实践
566 0
RocketMQ 重试机制的概念与最佳实践|学习笔记