五分钟带你玩转rocketMQ(九)push与pull模式如何选择是个难题

简介: 对于任何一款消息中间件而言,消费者客户端一般有两种方式从消息中间件获取消息并消费。严格意义上来讲,RocketMQ并没有实现PUSH模式,而是对拉模式进行一层包装,名字虽然是 Push 开头,实际在实现时,使用 Pull 方式实现。通过 Pull 不断不断不断轮询 Broker 获取消息。当不存在新消息时,Broker 会挂起请求,直到有新消息产生,取消挂起,返回新消息。这样,基本和 Broker 主动 Push 做到接近的实时性(当然,还是有相应的实时性损失)。原理类似 长轮询( Long-Polling )


rocketmq分为push与pull

MQ中Pull和Push的两种消费方式

 对于任何一款消息中间件而言,消费者客户端一般有两种方式从消息中间件获取消息并消费。严格意义上来讲,RocketMQ并没有实现PUSH模式,而是对拉模式进行一层包装,名字虽然是  Push 开头,实际在实现时,使用 Pull 方式实现。通过 Pull 不断不断不断轮询 Broker  获取消息。当不存在新消息时,Broker 会挂起请求,直到有新消息产生,取消挂起,返回新消息。这样,基本和 Broker 主动 Push  做到接近的实时性(当然,还是有相应的实时性损失)。原理类似 长轮询( Long-Polling )

(1)Pull方式

 由消费者客户端主动向消息中间件(MQ消息服务器代理)拉取消息;采用Pull方式,如何设置Pull消息的频率需要重点去考虑,举个例子来说,可能1分钟内连续来了1000条消息,然后2小时内没有新消息产生(概括起来说就是“消息延迟与忙等待”)。如果每次Pull的时间间隔比较久,会增加消息的延迟,即消息到达消费者的时间加长,MQ中消息的堆积量变大;若每次Pull的时间间隔较短,但是在一段时间内MQ中并没有任何消息可以消费,那么会产生很多无效的Pull请求的RPC开销,影响MQ整体的网络性能;

(2)Push方式

 由消息中间件(MQ消息服务器代理)主动地将消息推送给消费者;采用Push方式,可以尽可能实时地将消息发送给消费者进行消费。但是,在消费者的处理消息的能力较弱的时候(比如,消费者端的业务系统处理一条消息的流程比较复杂,其中的调用链路比较多导致消费时间比较久。概括起来地说就是“慢消费问题”),而MQ不断地向消费者Push消息,消费者端的缓冲区可能会溢出,导致异常;

代码实现pull模式

调用方法

1. @RestController
2. @RequestMapping("/test")
3. public class TestControllor {
4. private static final Logger logger = LoggerFactory.getLogger(TestControllor.class);
5. 
6. /**
7.      * 使用RocketMq的生产者
8.      */
9. @Autowired
10. private DefaultMQProducer defaultMQProducer;
11. 
12. @RequestMapping("/send")
13. public void send() throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
14. Date day = new Date();
15. SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
16.         System.out.println(df.format(day));
17. //定义tags
18.         String[] tags = new String[]{"TagA", "TagB", "TagC", "TagD", "TagE"};
19. for (int i = 0; i < 100; i++) {
20. final int index = i;
21. String msg = "demo msg test" + ",这是第" + i + "条" + df.format(day);
22.             logger.info("开始发送消息:" + i);
23. //指定每条消息发送到某个tags
24. Message sendMsg = new Message("NewMessage", tags[i % tags.length], "KEY" + i + "时间为" + df.format(day),
25.                     ("现在排号到:" + i).getBytes());
26. //默认3秒超时
27. SendResult sendResult = defaultMQProducer.send(sendMsg);
28.             logger.info("消息发送响应信息:" + sendResult.toString() + "当前为第" + i + "次");
29.         }
30.     }

生产者

1. @SpringBootConfiguration
2. public class MQProducerConfiguration {
3. 
4. public static final Logger LOGGER = LoggerFactory.getLogger(MQProducerConfiguration.class);
5. /**
6.      * 发送同一类消息的设置为同一个group,保证唯一,默认不需要设置,rocketmq会使用ip@pid(pid代表jvm名字)作为唯一标示
7.      */
8. @Value("${rocketmq.producer.groupName}")
9. private String groupName;
10. @Value("${rocketmq.producer.namesrvAddr}")
11. private String namesrvAddr;
12. /**
13.      * 消息最大大小,默认4M
14.      */
15. @Value("${rocketmq.producer.maxMessageSize}")
16. private Integer maxMessageSize ;
17. /**
18.      * 消息发送超时时间,默认3秒
19.      */
20. @Value("${rocketmq.producer.sendMsgTimeout}")
21. private Integer sendMsgTimeout;
22. /**
23.      * 消息发送失败重试次数,默认2次
24.      */
25. @Value("${rocketmq.producer.retryTimesWhenSendFailed}")
26. private Integer retryTimesWhenSendFailed;
27. 
28. @Bean
29. public DefaultMQProducer getRocketMQProducer() throws Exception {
30. if (StringUtils.isEmpty(this.groupName)) {
31. throw new Exception("groupName is blank");
32.         }
33. if (StringUtils.isEmpty(this.namesrvAddr)) {
34. throw new Exception("nameServerAddr is blank");
35.         }
36.         DefaultMQProducer producer;
37.         producer = new DefaultMQProducer(this.groupName);
38.         producer.setNamesrvAddr(this.namesrvAddr);
39. //如果需要同一个jvm中不同的producer往不同的mq集群发送消息,需要设置不同的instanceName
40. //producer.setInstanceName(instanceName);
41.         producer.setVipChannelEnabled(false);
42. if(this.maxMessageSize!=null){
43.             producer.setMaxMessageSize(this.maxMessageSize);
44.         }
45. if(this.sendMsgTimeout!=null){
46.             producer.setSendMsgTimeout(this.sendMsgTimeout);
47.         }
48. //如果发送消息失败,设置重试次数,默认为2次
49. if(this.retryTimesWhenSendFailed!=null){
50.             producer.setRetryTimesWhenSendFailed(this.retryTimesWhenSendFailed);
51.         }
52. 
53. try {
54.             producer.start();
55.             LOGGER.info(String.format("producer is start ! groupName:[%s],namesrvAddr:[%s]"
56.                     , this.groupName, this.namesrvAddr));
57.         } catch (MQClientException e) {
58.             LOGGER.error(String.format("producer is error {}"
59.                     , e.getMessage(),e));
60. throw new Exception(e);
61.         }
62. return producer;
63.     }
64. }

消费者

以负载均衡的方式拉取消息 不能指定具体队列 --MQPullConsumerScheduleService 可以指定轮询时间

1. @SpringBootConfiguration
2. public class MQConsumerConfiguration {
3. 
4. public static final Logger LOGGER = LoggerFactory.getLogger(MQConsumerConfiguration.class);
5. @Value("${rocketmq.consumer.namesrvAddr}")
6. private String namesrvAddr;
7. @Value("${rocketmq.consumer.groupName}")
8. private String groupName;
9. @Value("${rocketmq.consumer.consumeThreadMin}")
10. private int consumeThreadMin;
11. @Value("${rocketmq.consumer.consumeThreadMax}")
12. private int consumeThreadMax;
13. @Value("${rocketmq.consumer.topics}")
14. private String topics;
15. @Value("${rocketmq.consumer.consumeMessageBatchMaxSize}")
16. private int consumeMessageBatchMaxSize;
17. @Autowired
18. private MQConsumeMsgListenerProcessor mqMessageListenerProcessor;
19. 
20. @Bean
21. public MQPullConsumerScheduleService testRocketMQConsumer() throws Exception {
22. // 1. 实例化对象
23. final MQPullConsumerScheduleService scheduleService = new MQPullConsumerScheduleService("rocketmq");
24. 
25. // 2. 设置NameServer
26.         scheduleService.getDefaultMQPullConsumer().setNamesrvAddr("127.0.0.1:9876");
27. // 3. 设置消费组为集群模式
28.         scheduleService.setMessageModel(MessageModel.CLUSTERING);
29. 
30. // 4. 注册拉取回调函数
31.         scheduleService.registerPullTaskCallback("NewMessage", new PullTaskCallback() {
32. @Override
33. public void doPullTask(MessageQueue mq, PullTaskContext context) {
34. // 5.从上下文中获取MQPullConsumer对象,此处其实就是DefaultMQPullConsumer。
35. MQPullConsumer consumer = context.getPullConsumer();
36. try {
37. // 6.获取该消费组的该队列的消费进度
38. long offset = consumer.fetchConsumeOffset(mq, false);
39. if (offset < 0) {
40.                         offset = 0;
41.                     }
42. // 7.拉取消息,pull()方法在DefaultMQPullConsumer有具体介绍
43. PullResult pullResult = consumer.pull(mq, "*", offset, 2);
44.                     System.out.printf("%s%n", offset + "\t" + mq + "\t" + pullResult);
45. switch (pullResult.getPullStatus()) {
46. case FOUND:
47. //打印消息
48.                             List<MessageExt> messageExtList = pullResult
49.                                     .getMsgFoundList();
50. for (MessageExt m : messageExtList) {
51.                                 System.out.println(m.toString());
52.                             }
53. break;
54. case NO_MATCHED_MSG:
55. break;
56. case NO_NEW_MSG:
57. case OFFSET_ILLEGAL:
58. break;
59. default:
60. break;
61.                     }
62. // 8.更新消费组该队列消费进度
63.                     consumer.updateConsumeOffset(mq, pullResult.getNextBeginOffset());
64. // 9.设置下次拉取消息时间间隔,单位毫秒
65.                     context.setPullNextDelayTimeMillis(10000);
66.                 } catch (Exception e) {
67.                     e.printStackTrace();
68.                 }
69.             }
70.         });
71. 
72.         scheduleService.start();
73. return scheduleService;
74.     }
75. }

其中返回的结果为

PullStatus.FOUND:成功拉取消息

PullStatus.NO_NEW_MSG:没有新的消息可被拉取

PullStatus.NO_MATCHED_MSG:过滤结果不匹配

PullStatus.OFFSET_ILLEGAL:offset非法

同时在我们的控制有pull的体现

image.png

参考:https://www.cnblogs.com/zhyg/p/10451518.html

         https://blog.csdn.net/zhaohongfei_358/article/details/101457563


相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
3月前
|
消息中间件
RabbitMQ的 RPC 消息模式你会了吗?
【9月更文挑战第11天】RabbitMQ 的 RPC(远程过程调用)消息模式允许客户端向服务器发送请求并接收响应。其基本原理包括:1) 客户端发送请求,创建回调队列并设置关联标识符;2) 服务器接收请求并发送响应至回调队列;3) 客户端根据关联标识符接收并匹配响应。实现步骤涵盖客户端和服务器的连接、信道创建及请求处理。注意事项包括关联标识符唯一性、回调队列管理、错误处理及性能考虑。RPC 模式适用于构建可靠的分布式应用程序,但需根据需求调整优化。
|
2天前
|
消息中间件 网络协议 RocketMQ
RocketMQ Controller 模式 始终更新成本机ip
ontrollerAddr=192.168.24.241:8878 但是日志输出Update controller leader address to 127.0.0.1:8878。导致访问失败
28 3
|
4月前
|
消息中间件 开发者
【RabbitMQ深度解析】Topic交换器与模式匹配:掌握消息路由的艺术!
【8月更文挑战第24天】在消息队列(MQ)体系中,交换器作为核心组件之一负责消息路由。特别是`topic`类型的交换器,它通过模式匹配实现消息的精准分发,适用于发布-订阅模式。不同于直接交换器和扇形交换器,`topic`交换器支持更复杂的路由策略,通过带有通配符(如 * 和 #)的模式字符串来定义队列与交换器间的绑定关系。
84 2
|
4月前
|
消息中间件
RabbitMQ广播模式
RabbitMQ广播模式
83 1
|
4月前
|
消息中间件 应用服务中间件 网络安全
rabbitMQ镜像模式搭建
rabbitMQ镜像模式搭建
|
5月前
|
消息中间件 传感器 负载均衡
消息队列 MQ使用问题之如何配置一主一从的同步复制模式
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
消息队列 MQ使用问题之如何配置一主一从的同步复制模式
|
5月前
|
消息中间件 存储 Kafka
MetaQ/RocketMQ 原理问题之RocketMQ DLedger融合模式的问题如何解决
MetaQ/RocketMQ 原理问题之RocketMQ DLedger融合模式的问题如何解决
|
4月前
|
消息中间件 Java Maven
RabbitMQ通配符模式
RabbitMQ通配符模式
74 0
|
5月前
|
消息中间件 Java Apache
消息队列 MQ使用问题之如何在内外网环境下使用单组节点单副本模式
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
5月前
|
消息中间件 负载均衡 RocketMQ
MetaQ/RocketMQ 原理问题之在广播模式下,RebalanceService工作的问题如何解决
MetaQ/RocketMQ 原理问题之在广播模式下,RebalanceService工作的问题如何解决