五分钟带你玩转rocketMQ(五)实战广播与集群

简介: 1.集群消费方式 一个ConsumerGroup中的Consumer实例平均分摊消费生产者发送的消息。例如某个Topic有九条消息,其中一个Consumer Group有三个实例(可能是3个进程,或者3台机器),那么每个实例只消费其中的3条消息,Consumer不指定消费方式的话默认是集群消费的,适用于大部分消息的业务


一.集群和广播区别

1.集群消费方式

 一个ConsumerGroup中的Consumer实例平均分摊消费生产者发送的消息。例如某个Topic有九条消息,其中一个Consumer  Group有三个实例(可能是3个进程,或者3台机器),那么每个实例只消费其中的3条消息,Consumer不指定消费方式的话默认是集群消费的,适用于大部分消息的业务

2.广播消费方式

 一条消息被多个Consumer消费,几十这些Consumer属于同一个ConsumerGroup,消息也会被ConsumerGroup中的每个Consumer消费一次,广播消费中的ConsumerGroup概念可以认为在消息划分层面没有意义,适用于一些分发消息的场景,比如我订单下单成功了,需要通知财务系统,客服系统等等这种分发的场景,可以通过修改Consumer中的MessageModel来设置消费方式为广播消费

二.代码实现广播

1. package cn.baocl.rocketmq.consumer;
2. 
3. import cn.baocl.rocketmq.processor.MQConsumeMsgListenerProcessor;
4. import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
5. import com.alibaba.rocketmq.client.exception.MQClientException;
6. import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
7. import com.alibaba.rocketmq.common.protocol.heartbeat.MessageModel;
8. import org.slf4j.Logger;
9. import org.slf4j.LoggerFactory;
10. import org.springframework.beans.factory.annotation.Autowired;
11. import org.springframework.beans.factory.annotation.Value;
12. import org.springframework.boot.SpringBootConfiguration;
13. import org.springframework.context.annotation.Bean;
14. import org.springframework.util.StringUtils;
15. 
16. 
17. @SpringBootConfiguration
18. public class MQConsumerConfiguration {
19. 
20. public static final Logger LOGGER = LoggerFactory.getLogger(MQConsumerConfiguration.class);
21. @Value("${rocketmq.consumer.namesrvAddr}")
22. private String namesrvAddr;
23. @Value("${rocketmq.consumer.groupName}")
24. private String groupName;
25. @Value("${rocketmq.consumer.consumeThreadMin}")
26. private int consumeThreadMin;
27. @Value("${rocketmq.consumer.consumeThreadMax}")
28. private int consumeThreadMax;
29. @Value("${rocketmq.consumer.topics}")
30. private String topics;
31. @Value("${rocketmq.consumer.consumeMessageBatchMaxSize}")
32. private int consumeMessageBatchMaxSize;
33. @Autowired
34. private MQConsumeMsgListenerProcessor mqMessageListenerProcessor;
35. 
36. @Bean
37. public DefaultMQPushConsumer testRocketMQConsumer() throws Exception {
38. if (StringUtils.isEmpty(groupName)){
39. throw new Exception("groupName is null !!!");
40.         }
41. if (StringUtils.isEmpty(namesrvAddr)){
42. throw new Exception("namesrvAddr is null !!!");
43.         }
44. if(StringUtils.isEmpty(topics)){
45. throw new Exception("topics is null !!!");
46.         }
47.         DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(groupName);
48.         consumer.setNamesrvAddr(namesrvAddr);
49.         consumer.setConsumeThreadMin(consumeThreadMin);
50.         consumer.setConsumeThreadMax(consumeThreadMax);
51.         consumer.registerMessageListener(mqMessageListenerProcessor);
52. 
53. /**
54.          * 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费
55.          * 如果非第一次启动,那么按照上次消费的位置继续消费
56.          */
57.         consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
58. /**
59.          * 设置消费模型,集群还是广播,默认为集群
60.          */
61. //广播 
62.         consumer.setMessageModel(MessageModel.BROADCASTING);
63. //集群
64. //consumer.setMessageModel(MessageModel.CLUSTERING);
65. 
66. /**
67.          * 设置一次消费消息的条数,默认为1条
68.          */
69.         consumer.setConsumeMessageBatchMaxSize(consumeMessageBatchMaxSize);
70. 
71. try {
72. /**
73.              * 设置该消费者订阅的主题和tag,如果是订阅该主题下的所有tag,则tag使用*;如果需要指定订阅该主题下的某些tag,则使用||分割,例如tag1||tag2||tag3
74.              */
75.             String[] topicTagsArr = topics.split(";");
76. for (String topicTags : topicTagsArr) {
77.                 String[] topicTag = topicTags.split("~");
78.                 consumer.subscribe(topicTag[0],topicTag[1]);
79.             }
80.             consumer.start();
81.             LOGGER.info("consumer is start !!! groupName:{},topics:{},namesrvAddr:{}",groupName,topics,namesrvAddr);
82.         }catch (MQClientException e){
83.             LOGGER.error("consumer is start !!! groupName:{},topics:{},namesrvAddr:{}",groupName,topics,namesrvAddr,e);
84. throw new Exception(e);
85.         }
86. return consumer;
87.     }
88. }

只需要以下代码

1. //广播 
2.         consumer.setMessageModel(MessageModel.BROADCASTING);
3.         //集群
4.         //consumer.setMessageModel(MessageModel.CLUSTERING);


相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
6月前
|
消息中间件 负载均衡 算法
【RocketMQ系列十二】RocketMQ集群核心概念之主从复制&生产者负载均衡策略&消费者负载均衡策略
【RocketMQ系列十二】RocketMQ集群核心概念之主从复制&生产者负载均衡策略&消费者负载均衡策略
190 2
|
6月前
|
消息中间件 存储 Java
消息队列 MQ使用问题之如何将RocketMQ中某个集群的topic迁移到另一个集群
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
消息中间件 存储 监控
消息中间件第八讲:消息队列 RocketMQ 版实战、集群及原理
消息中间件第八讲:消息队列 RocketMQ 版实战、集群及原理
395 0
|
8月前
|
消息中间件 Java API
MQ产品使用合集之RocketMQ dledger集群模式的dledgerpeers端口是集群之间通讯吗
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
消息中间件 中间件 Kafka
RocketMQ源码(一)RocketMQ消息生产及消费通信链路源码分析
**RocketMQ**的核心架构主要分为Broker、Producer、Consumer,通过阅读源码看到他们之间是通过Netty来通信的 ,具体来说Broker端是**Netty服务器**用来负责与客户端的连接请求处理,而Producer/Consumer端是**Netty客户端**用来负责与Netty服务器的通信及请求响应处理。
200 1
|
消息中间件 存储 API
RocketMQ极简入门-RocketMQ延迟消息
我们通常使用定时任务比如Quartz来解决超时业务,比如:订单支付超时关单,VIP会员超时提醒。但是使用定时任务来处理这些业务场景在数据量大的时候并不是一个很好的选择,会造成大量的空扫描浪费性能。我们可以考虑使用延迟消息来解决。
350 1
|
消息中间件 存储 API
五.RocketMQ极简入门-RocketMQ延迟消息
RocketMQ极简入门-RocketMQ延迟消息
|
消息中间件 Java API
RocketMq-消息模式讲解
RocketMq-消息模式讲解
RocketMq-消息模式讲解
|
消息中间件 RocketMQ
不科学,RocketMQ生产者在一个应用服务竟然不能向多个NameServer发送消息
不科学,RocketMQ生产者在一个应用服务竟然不能向多个NameServer发送消息
386 0
不科学,RocketMQ生产者在一个应用服务竟然不能向多个NameServer发送消息
|
消息中间件 负载均衡 RocketMQ
一文带你理解 RocketMQ 广播模式实现机制
一文带你理解 RocketMQ 广播模式实现机制
749 0
一文带你理解 RocketMQ 广播模式实现机制