五分钟带你玩转rocketMQ(六)队列难题——如何顺序消费

简介: 五分钟带你玩转rocketMQ(六)队列难题——如何顺序消费


现在我们想按照顺序消费消息

顺序消费场景:因为rocketmq有多个队列,所以当多个业务 如 登陆 下单 发货  使用rocketmq时,需要按照这个顺序进行消费,所以解决途径就是 使用MessageQueueSelector  将同一个批次的业务同步放入一个队列 然后顺序消费就可以了。

先原有基础上

1.修改消费者

1. package cn.baocl.rocketmq.processor;
2. 
3. 
4. import com.alibaba.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
5. import com.alibaba.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
6. import com.alibaba.rocketmq.client.consumer.listener.MessageListenerOrderly;
7. import com.alibaba.rocketmq.common.message.MessageExt;
8. import org.slf4j.Logger;
9. import org.slf4j.LoggerFactory;
10. import org.springframework.stereotype.Component;
11. 
12. import java.util.List;
13. import java.util.concurrent.atomic.AtomicLong;
14. 
15. @Component
16. public class MQConsumeMsgListenerProcessor implements MessageListenerOrderly {
17. private static final Logger logger = LoggerFactory.getLogger(MQConsumeMsgListenerProcessor.class);
18. 
19. @Override
20. public ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext context) {
21. //原子类 可以使消费者同步进行
22.         AtomicLong consumeTimes = new AtomicLong(0);
23.         context.setAutoCommit(false);
24.         consumeTimes.incrementAndGet();
25. if ((consumeTimes.get() % 2) == 0) {
26. return ConsumeOrderlyStatus.SUCCESS;
27.         } else if ((consumeTimes.get() % 3) == 0) {
28. return ConsumeOrderlyStatus.ROLLBACK;
29.         } else if ((consumeTimes.get() % 4) == 0) {
30. return ConsumeOrderlyStatus.COMMIT;
31.         } else if ((consumeTimes.get() % 5) == 0) {
32.             context.setSuspendCurrentQueueTimeMillis(3000);
33. return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
34.         }
35. return ConsumeOrderlyStatus.SUCCESS;
36.     }
37. 
38. 
39. }

2.修改调用方法

1. package cn.baocl.rocketmq.controllor;
2. 
3. import cn.baocl.rocketmq.entity.TestVo;
4. import com.alibaba.rocketmq.client.exception.MQBrokerException;
5. import com.alibaba.rocketmq.client.exception.MQClientException;
6. import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
7. import com.alibaba.rocketmq.client.producer.MessageQueueSelector;
8. import com.alibaba.rocketmq.client.producer.SendCallback;
9. import com.alibaba.rocketmq.client.producer.SendResult;
10. import com.alibaba.rocketmq.common.message.Message;
11. import com.alibaba.rocketmq.common.message.MessageQueue;
12. import com.alibaba.rocketmq.remoting.common.RemotingHelper;
13. import com.alibaba.rocketmq.remoting.exception.RemotingException;
14. import org.slf4j.Logger;
15. import org.slf4j.LoggerFactory;
16. import org.springframework.beans.factory.annotation.Autowired;
17. import org.springframework.web.bind.annotation.RequestMapping;
18. import org.springframework.web.bind.annotation.RestController;
19. 
20. import javax.annotation.Resource;
21. import java.util.List;
22. 
23. 
24. @RestController
25. @RequestMapping("/test")
26. public class TestControllor {
27. private static final Logger logger = LoggerFactory.getLogger(TestControllor.class);
28. 
29. /**
30.      * 使用RocketMq的生产者
31.      */
32. @Resource(name = "customRocketMQProducer")
33. private DefaultMQProducer producer;
34. 
35. @RequestMapping("/send")
36. public void send() throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
37. //定义tags
38.         String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
39. for (int i = 0; i < 100; i++) {
40. int orderId = i % 10;
41. //指定每条消息发送到某个tags
42. Message msg = new Message("DemoTopic", tags[i % tags.length], "KEY" + i,
43.                     ("现在排号到:" + i).getBytes());
44. //顺序发送方法
45. SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
46. @Override
47. public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
48. Integer id = (Integer) arg;
49. int index = id % mqs.size();
50. return mqs.get(index);
51.                 }
52.             }, orderId);
53. //orderId 就是队列编号 ,默认有4个队列
54.             System.out.print("此条消息id为:"+i);
55.             System.out.printf("%s%n", sendResult);
56.         }
57.     }
58. }

得到的结果是A,B,C,D,E tags每个都是按照顺序消费

image.png


相关实践学习
RocketMQ一站式入门使用
从源码编译、部署broker、部署namesrv,使用java客户端首发消息等一站式入门RocketMQ。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
1月前
|
消息中间件 Java Spring
SpringBoot实现RabbitMQ的简单队列(SpringAMQP 实现简单队列)
SpringBoot实现RabbitMQ的简单队列(SpringAMQP 实现简单队列)
27 1
|
1月前
|
消息中间件 存储 NoSQL
RabbitMQ的幂等性、优先级队列和惰性队列
【1月更文挑战第12天】用户对于同一操作发起的一次请求或者多次请求的结果是一致的,不会因为多次点击而产生了副作用。举个最简单的例子,那就是支付,用户购买商品后支付,支付扣款成功,但是返回结果的时候网络异常,此时钱已经扣了,用户再次点击按钮,此时会进行第二次扣款,返回结果成功,用户查询余额发现多扣钱了,流水记录也变成了两条。在以前的单应用系统中,我们只需要把数据操作放入事务中即可,发生错误立即回滚,但是再响应客户端的时候也有可能出现网络中断或者异常等等
241 1
|
1月前
|
消息中间件 监控 Java
Spring Boot中的RabbitMQ死信队列魔法:从异常到延迟,一网打尽【RabbitMQ实战 一】
Spring Boot中的RabbitMQ死信队列魔法:从异常到延迟,一网打尽【RabbitMQ实战 一】
134 0
|
1月前
|
消息中间件 存储 NoSQL
RabbitMQ的幂等性、优先级队列和惰性队列
**摘要:** 本文讨论了RabbitMQ中的幂等性、优先级队列和惰性队列。幂等性确保了重复请求不会导致副作用,关键在于消费端的幂等性保障,如使用唯一ID和Redis的原子性操作。优先级队列适用于处理不同重要性消息,如大客户订单优先处理,通过设置`x-max-priority`属性实现。惰性队列自3.6.0版起提供,用于延迟将消息加载到内存,适合大量消息存储和消费者延迟消费的场景。
43 4
|
12天前
|
消息中间件 存储 监控
RabbitMQ 死信队列
RabbitMQ的死信队列(DLQ)是存储无法正常消费消息的特殊队列,常见于消息被拒绝、过期或队列满时。DLQ用于异常处理、任务调度和监控,通过绑定到普通队列自动路由死信消息。通过监听死信队列,可以对异常消息进行补偿和进一步处理,提升系统稳定性和可维护性。
12 1
|
12天前
|
消息中间件 存储 负载均衡
消息队列 MQ产品使用合集之如何排查是哪个队列导致的异常TPS增加
阿里云消息队列MQ(Message Queue)是一种高可用、高性能的消息中间件服务,它允许您在分布式应用的不同组件之间异步传递消息,从而实现系统解耦、流量削峰填谷以及提高系统的可扩展性和灵活性。以下是使用阿里云消息队列MQ产品的关键点和最佳实践合集。
|
1月前
|
消息中间件 Java
SpringBoot基于RabbitMQ实现死信队列 (SpringBoot整合RabbitMQ实战篇)
SpringBoot基于RabbitMQ实现死信队列 (SpringBoot整合RabbitMQ实战篇)
53 1
|
1月前
|
消息中间件
RabbitMQ 死信队列
RabbitMQ 死信队列
31 0
RabbitMQ 死信队列
|
1月前
|
消息中间件 存储 监控
解析RocketMQ:高性能分布式消息队列的原理与应用
RocketMQ是阿里开源的高性能分布式消息队列,具备低延迟、高吞吐和高可靠性,广泛应用于电商、金融等领域。其核心概念包括Topic、Producer、Consumer、Message和Name Server/Broker。RocketMQ支持异步通信、系统解耦、异步处理和流量削峰。关键特性有分布式架构、顺序消息、高可用性设计和消息事务。提供发布/订阅和点对点模型,以及消息过滤功能。通过集群模式、存储方式、发送和消费方式的选择进行性能优化。RocketMQ易于部署,可与Spring集成,并与Kafka等系统对比各有优势,拥有丰富的生态系统。
318 4
|
1月前
|
消息中间件 存储 NoSQL
rocketmq实现延迟队列思路探讨
本文介绍了两种实现RocketMQ延迟消息的方法。非任意时间延迟可通过在服务器端配置`messageDelayLevel`实现,但需重启服务。任意时间延迟则分为两种策略:一是结合原生逻辑和时间轮,利用RocketMQ的默认延迟等级组合支持任意延迟,但可能丢失1分钟内的数据;二是使用存储介质(如Redis)加时间轮,消息存储和定时发送结合,能处理数据不一致和丢失问题,但涉及更多组件。推荐项目[civism-rocket](https://github.com/civism/civism-rocket)作为参考。
114 1