五分钟带你玩转rocketMQ(六)队列难题——如何顺序消费

简介: 五分钟带你玩转rocketMQ(六)队列难题——如何顺序消费


现在我们想按照顺序消费消息

顺序消费场景:因为rocketmq有多个队列,所以当多个业务 如 登陆 下单 发货  使用rocketmq时,需要按照这个顺序进行消费,所以解决途径就是 使用MessageQueueSelector  将同一个批次的业务同步放入一个队列 然后顺序消费就可以了。

先原有基础上

1.修改消费者

1. package cn.baocl.rocketmq.processor;
2. 
3. 
4. import com.alibaba.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
5. import com.alibaba.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
6. import com.alibaba.rocketmq.client.consumer.listener.MessageListenerOrderly;
7. import com.alibaba.rocketmq.common.message.MessageExt;
8. import org.slf4j.Logger;
9. import org.slf4j.LoggerFactory;
10. import org.springframework.stereotype.Component;
11. 
12. import java.util.List;
13. import java.util.concurrent.atomic.AtomicLong;
14. 
15. @Component
16. public class MQConsumeMsgListenerProcessor implements MessageListenerOrderly {
17. private static final Logger logger = LoggerFactory.getLogger(MQConsumeMsgListenerProcessor.class);
18. 
19. @Override
20. public ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext context) {
21. //原子类 可以使消费者同步进行
22.         AtomicLong consumeTimes = new AtomicLong(0);
23.         context.setAutoCommit(false);
24.         consumeTimes.incrementAndGet();
25. if ((consumeTimes.get() % 2) == 0) {
26. return ConsumeOrderlyStatus.SUCCESS;
27.         } else if ((consumeTimes.get() % 3) == 0) {
28. return ConsumeOrderlyStatus.ROLLBACK;
29.         } else if ((consumeTimes.get() % 4) == 0) {
30. return ConsumeOrderlyStatus.COMMIT;
31.         } else if ((consumeTimes.get() % 5) == 0) {
32.             context.setSuspendCurrentQueueTimeMillis(3000);
33. return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
34.         }
35. return ConsumeOrderlyStatus.SUCCESS;
36.     }
37. 
38. 
39. }

2.修改调用方法

1. package cn.baocl.rocketmq.controllor;
2. 
3. import cn.baocl.rocketmq.entity.TestVo;
4. import com.alibaba.rocketmq.client.exception.MQBrokerException;
5. import com.alibaba.rocketmq.client.exception.MQClientException;
6. import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
7. import com.alibaba.rocketmq.client.producer.MessageQueueSelector;
8. import com.alibaba.rocketmq.client.producer.SendCallback;
9. import com.alibaba.rocketmq.client.producer.SendResult;
10. import com.alibaba.rocketmq.common.message.Message;
11. import com.alibaba.rocketmq.common.message.MessageQueue;
12. import com.alibaba.rocketmq.remoting.common.RemotingHelper;
13. import com.alibaba.rocketmq.remoting.exception.RemotingException;
14. import org.slf4j.Logger;
15. import org.slf4j.LoggerFactory;
16. import org.springframework.beans.factory.annotation.Autowired;
17. import org.springframework.web.bind.annotation.RequestMapping;
18. import org.springframework.web.bind.annotation.RestController;
19. 
20. import javax.annotation.Resource;
21. import java.util.List;
22. 
23. 
24. @RestController
25. @RequestMapping("/test")
26. public class TestControllor {
27. private static final Logger logger = LoggerFactory.getLogger(TestControllor.class);
28. 
29. /**
30.      * 使用RocketMq的生产者
31.      */
32. @Resource(name = "customRocketMQProducer")
33. private DefaultMQProducer producer;
34. 
35. @RequestMapping("/send")
36. public void send() throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
37. //定义tags
38.         String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
39. for (int i = 0; i < 100; i++) {
40. int orderId = i % 10;
41. //指定每条消息发送到某个tags
42. Message msg = new Message("DemoTopic", tags[i % tags.length], "KEY" + i,
43.                     ("现在排号到:" + i).getBytes());
44. //顺序发送方法
45. SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
46. @Override
47. public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
48. Integer id = (Integer) arg;
49. int index = id % mqs.size();
50. return mqs.get(index);
51.                 }
52.             }, orderId);
53. //orderId 就是队列编号 ,默认有4个队列
54.             System.out.print("此条消息id为:"+i);
55.             System.out.printf("%s%n", sendResult);
56.         }
57.     }
58. }

得到的结果是A,B,C,D,E tags每个都是按照顺序消费

image.png


相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
6月前
|
消息中间件 Java Spring
SpringBoot实现RabbitMQ的简单队列(SpringAMQP 实现简单队列)
SpringBoot实现RabbitMQ的简单队列(SpringAMQP 实现简单队列)
54 1
|
1月前
|
消息中间件 存储 监控
RabbitMQ 队列之战:Classic 和 Quorum 的性能洞察
RabbitMQ 是一个功能强大的消息代理,用于分布式应用程序间的通信。它通过队列临时存储消息,支持异步通信和解耦。经典队列适合高吞吐量和低延迟场景,而仲裁队列则提供高可用性和容错能力,适用于关键任务系统。选择哪种队列取决于性能、持久性和容错性的需求。
127 6
|
2月前
|
消息中间件 JSON Java
|
2月前
|
消息中间件
rabbitmq,&队列
rabbitmq,&队列
|
2月前
|
消息中间件 JSON Java
玩转RabbitMQ声明队列交换机、消息转换器
玩转RabbitMQ声明队列交换机、消息转换器
85 0
|
3月前
|
消息中间件 存储 NoSQL
MQ的顺序性保证:顺序队列、消息编号、分布式锁,一文全掌握!
【8月更文挑战第24天】消息队列(MQ)是分布式系统的关键组件,用于实现系统解耦、提升可扩展性和可用性。保证消息顺序性是其重要挑战之一。本文介绍三种常用策略:顺序队列、消息编号与分布式锁,通过示例展示如何确保消息按需排序。这些方法各有优势,可根据实际场景灵活选用。提供的Java示例有助于加深理解与实践应用。
91 2
|
4月前
|
消息中间件 RocketMQ
MetaQ/RocketMQ 原理问题之当消费集群规模较大时,处理分配不到队列的Consumer的问题如何解决
MetaQ/RocketMQ 原理问题之当消费集群规模较大时,处理分配不到队列的Consumer的问题如何解决
|
4月前
|
消息中间件 Java Kafka
说说RabbitMQ延迟队列实现原理?
说说RabbitMQ延迟队列实现原理?
69 0
说说RabbitMQ延迟队列实现原理?
|
4月前
|
消息中间件 NoSQL 关系型数据库
【RocketMQ系列十三】RocketMQ的集群核心概念之消费重试&死信队列&幂等消息的出现以及处理
【RocketMQ系列十三】RocketMQ的集群核心概念之消费重试&死信队列&幂等消息的出现以及处理
130 1
|
5月前
|
消息中间件 存储 监控
RabbitMQ 死信队列
RabbitMQ的死信队列(DLQ)是存储无法正常消费消息的特殊队列,常见于消息被拒绝、过期或队列满时。DLQ用于异常处理、任务调度和监控,通过绑定到普通队列自动路由死信消息。通过监听死信队列,可以对异常消息进行补偿和进一步处理,提升系统稳定性和可维护性。
87 1