五分钟带你玩转rocketMQ(五)实战广播与集群

简介: 1.集群消费方式 一个ConsumerGroup中的Consumer实例平均分摊消费生产者发送的消息。例如某个Topic有九条消息,其中一个Consumer Group有三个实例(可能是3个进程,或者3台机器),那么每个实例只消费其中的3条消息,Consumer不指定消费方式的话默认是集群消费的,适用于大部分消息的业务


一.集群和广播区别

1.集群消费方式

 一个ConsumerGroup中的Consumer实例平均分摊消费生产者发送的消息。例如某个Topic有九条消息,其中一个Consumer  Group有三个实例(可能是3个进程,或者3台机器),那么每个实例只消费其中的3条消息,Consumer不指定消费方式的话默认是集群消费的,适用于大部分消息的业务

2.广播消费方式

 一条消息被多个Consumer消费,几十这些Consumer属于同一个ConsumerGroup,消息也会被ConsumerGroup中的每个Consumer消费一次,广播消费中的ConsumerGroup概念可以认为在消息划分层面没有意义,适用于一些分发消息的场景,比如我订单下单成功了,需要通知财务系统,客服系统等等这种分发的场景,可以通过修改Consumer中的MessageModel来设置消费方式为广播消费

二.代码实现广播

1. package cn.baocl.rocketmq.consumer;
2. 
3. import cn.baocl.rocketmq.processor.MQConsumeMsgListenerProcessor;
4. import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
5. import com.alibaba.rocketmq.client.exception.MQClientException;
6. import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
7. import com.alibaba.rocketmq.common.protocol.heartbeat.MessageModel;
8. import org.slf4j.Logger;
9. import org.slf4j.LoggerFactory;
10. import org.springframework.beans.factory.annotation.Autowired;
11. import org.springframework.beans.factory.annotation.Value;
12. import org.springframework.boot.SpringBootConfiguration;
13. import org.springframework.context.annotation.Bean;
14. import org.springframework.util.StringUtils;
15. 
16. 
17. @SpringBootConfiguration
18. public class MQConsumerConfiguration {
19. 
20. public static final Logger LOGGER = LoggerFactory.getLogger(MQConsumerConfiguration.class);
21. @Value("${rocketmq.consumer.namesrvAddr}")
22. private String namesrvAddr;
23. @Value("${rocketmq.consumer.groupName}")
24. private String groupName;
25. @Value("${rocketmq.consumer.consumeThreadMin}")
26. private int consumeThreadMin;
27. @Value("${rocketmq.consumer.consumeThreadMax}")
28. private int consumeThreadMax;
29. @Value("${rocketmq.consumer.topics}")
30. private String topics;
31. @Value("${rocketmq.consumer.consumeMessageBatchMaxSize}")
32. private int consumeMessageBatchMaxSize;
33. @Autowired
34. private MQConsumeMsgListenerProcessor mqMessageListenerProcessor;
35. 
36. @Bean
37. public DefaultMQPushConsumer testRocketMQConsumer() throws Exception {
38. if (StringUtils.isEmpty(groupName)){
39. throw new Exception("groupName is null !!!");
40.         }
41. if (StringUtils.isEmpty(namesrvAddr)){
42. throw new Exception("namesrvAddr is null !!!");
43.         }
44. if(StringUtils.isEmpty(topics)){
45. throw new Exception("topics is null !!!");
46.         }
47.         DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(groupName);
48.         consumer.setNamesrvAddr(namesrvAddr);
49.         consumer.setConsumeThreadMin(consumeThreadMin);
50.         consumer.setConsumeThreadMax(consumeThreadMax);
51.         consumer.registerMessageListener(mqMessageListenerProcessor);
52. 
53. /**
54.          * 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费
55.          * 如果非第一次启动,那么按照上次消费的位置继续消费
56.          */
57.         consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
58. /**
59.          * 设置消费模型,集群还是广播,默认为集群
60.          */
61. //广播 
62.         consumer.setMessageModel(MessageModel.BROADCASTING);
63. //集群
64. //consumer.setMessageModel(MessageModel.CLUSTERING);
65. 
66. /**
67.          * 设置一次消费消息的条数,默认为1条
68.          */
69.         consumer.setConsumeMessageBatchMaxSize(consumeMessageBatchMaxSize);
70. 
71. try {
72. /**
73.              * 设置该消费者订阅的主题和tag,如果是订阅该主题下的所有tag,则tag使用*;如果需要指定订阅该主题下的某些tag,则使用||分割,例如tag1||tag2||tag3
74.              */
75.             String[] topicTagsArr = topics.split(";");
76. for (String topicTags : topicTagsArr) {
77.                 String[] topicTag = topicTags.split("~");
78.                 consumer.subscribe(topicTag[0],topicTag[1]);
79.             }
80.             consumer.start();
81.             LOGGER.info("consumer is start !!! groupName:{},topics:{},namesrvAddr:{}",groupName,topics,namesrvAddr);
82.         }catch (MQClientException e){
83.             LOGGER.error("consumer is start !!! groupName:{},topics:{},namesrvAddr:{}",groupName,topics,namesrvAddr,e);
84. throw new Exception(e);
85.         }
86. return consumer;
87.     }
88. }

只需要以下代码

1. //广播 
2.         consumer.setMessageModel(MessageModel.BROADCASTING);
3.         //集群
4.         //consumer.setMessageModel(MessageModel.CLUSTERING);


相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
13天前
|
消息中间件 存储 运维
2024最全RabbitMQ集群方案汇总
本文梳理了RabbitMQ集群的几种方案,主要包括普通集群、镜像集群(高可用)、Quorum队列(仲裁队列)、Streams集群模式(高可用+负载均衡)和插件方式。重点介绍了每种方案的特点、优缺点及适用场景。搭建步骤包括安装Erlang和RabbitMQ、配置集群节点、修改hosts文件、配置Erlang Cookie、启动独立节点并创建集群,以及配置镜像队列以提高可用性和容错性。推荐使用Quorum队列与Streams模式,其中Quorum队列适合高可用集群,Streams模式则同时支持高可用和负载均衡。此外,还有Shovel和Federation插件可用于特定场景下的集群搭建。
111 2
|
13天前
|
消息中间件 RocketMQ
2024最全RocketMQ集群方案汇总
在研究RocketMQ集群方案时,发现网上存在诸多不一致之处,如组件包含NameServer、Broker、Proxy等。通过查阅官方文档,了解到v4.x和v5.x版本的差异。v4.x部署模式包括单主、多主、多主多从(异步复制、同步双写),而v5.x新增Local与Cluster模式,主要区别在于Broker和Proxy是否同进程部署。Local模式适合平滑升级,Cluster模式适合高可用需求。不同模式下,集群部署方案大致相同,涵盖单主、多主、多主多从等模式,以满足不同的高可用性和性能需求。
68 0
|
3月前
|
消息中间件 数据采集 中间件
RabbitMQ的使用—实战
RabbitMQ的使用—实战
112 0
|
5月前
|
消息中间件 存储 负载均衡
|
4月前
|
消息中间件 缓存 Java
RocketMQ的JAVA落地实战
RocketMQ作为一款高性能、高可靠、高实时、分布式特点的消息中间件,其核心作用主要体现在异步处理、削峰填谷以及系统解耦三个方面。
200 0
|
6月前
|
消息中间件 新零售 弹性计算
云消息队列 RabbitMQ 版入门训练营,解锁对比开源优势与零基础实战
欢迎加入「云消息队列 RabbitMQ 版入门训练营」。
178 17
|
5月前
|
消息中间件 存储 负载均衡
"RabbitMQ集群大揭秘!让你的消息传递系统秒变超级英雄,轻松应对亿级并发挑战!"
【8月更文挑战第24天】RabbitMQ是一款基于AMQP的开源消息中间件,以其高可靠性、扩展性和易用性闻名。面对高并发和大数据挑战时,可通过构建集群提升性能。本文深入探讨RabbitMQ集群配置、工作原理,并提供示例代码。集群由多个通过网络连接的节点组成,共享消息队列,确保高可用性和负载均衡。搭建集群需准备多台服务器,安装Erlang和RabbitMQ,并确保节点间通信顺畅。核心步骤包括配置.erlang.cookie文件、使用rabbitmqctl命令加入集群。消息发布至任一节点时,通过集群机制同步至其他节点;消费者可从任一节点获取消息。
57 2
|
5月前
|
存储 C# 关系型数据库
“云端融合:WPF应用无缝对接Azure与AWS——从Blob存储到RDS数据库,全面解析跨平台云服务集成的最佳实践”
【8月更文挑战第31天】本文探讨了如何将Windows Presentation Foundation(WPF)应用与Microsoft Azure和Amazon Web Services(AWS)两大主流云平台无缝集成。通过具体示例代码展示了如何利用Azure Blob Storage存储非结构化数据、Azure Cosmos DB进行分布式数据库操作;同时介绍了如何借助Amazon S3实现大规模数据存储及通过Amazon RDS简化数据库管理。这不仅提升了WPF应用的可扩展性和可用性,还降低了基础设施成本。
99 0
|
6月前
|
消息中间件 Prometheus 监控
消息队列 MQ使用问题之如何将旧集群的store目录迁移到新集群
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
6月前
|
消息中间件 RocketMQ
MetaQ/RocketMQ 原理问题之当消费集群规模较大时,处理分配不到队列的Consumer的问题如何解决
MetaQ/RocketMQ 原理问题之当消费集群规模较大时,处理分配不到队列的Consumer的问题如何解决