RocketMQ第七章:手把手教老婆实现-广播消息生产者和消费者

本文涉及的产品
网络型负载均衡 NLB,每月750个小时 15LCU
传统型负载均衡 CLB,每月750个小时 15LCU
应用型负载均衡 ALB,每月750个小时 15LCU
简介: RocketMQ第七章:手把手教老婆实现-广播消息生产者和消费者

image.png

RocketMQ使用教程相关系列 目录


目录


第一节:介绍


限制:


消费者消费模式


第二节:广播消息-生产者和消息者步骤说明


广播消息生产者代码实现步骤


广播消息消费者代码实现步骤


第三节:广播消息生产者


效果:


第四节:广播消息消费者A


效果:


第五节:广播消息消费者B


效果:


第六节:总结


第一节:介绍

消费者采用广播的方式消费消息,每个消费者消费的消息都是相同的


限制:

不支持顺序消息


消费者消费模式

消费者的消费模式分为两种


负载均衡模式:消费者采用负载均衡方式消费消息,多个消费者共同消费队列消息,每个消费者处理的消息不同

广播模式:消费者采用广播的方式消费消息,每个消费者消费的消息都是相同的

第二节:广播消息-生产者和消息者步骤说明

广播消息生产者代码实现步骤

1.创建消息生产者producer,并制定生产者组名


2.指定Nameserver地址


3.启动producer


4.创建消息对象集合,指定主题Topic、Tag和消息体


5.发送集合消息


6.关闭生产者producer


注:与批量消息的生产者代码一模一样


广播消息消费者代码实现步骤

1.创建消费者Consumer,制定消费者组名


2.指定Nameserver地址


3.默认均衡轮询消费模式 改为广播模式


4.订阅主题Topic和Tag


5.设置回调函数,处理消息


6.启动消费者consumer


注意:消费者的 Topic 和 Tag 需要和生产者保持一致


第三节:广播消息生产者

public class Producer {
   public static void main(String[] args) throws Exception {
      // 1.创建消息生产者producer,并制定生产者组名
      DefaultMQProducer producer = new DefaultMQProducer("demo_producer_broadcasting_group");
      // 2.指定Nameserver地址
      producer.setNamesrvAddr("192.168.88.131:9876");
      // 3.启动producer
      producer.start();
      System.out.println("生产者启动");
      List<Message> msgs = new ArrayList<Message>();
      // 4.创建消息对象,指定主题Topic、Tag和消息体
      /**
       * 参数一:消息主题Topic
       * 参数二:消息Tag
       * 参数三:消息内容
       */
      for (int i = 0; i < 20; i++) {
         Message msg = new Message("Topic_broadcasting_demo", "Tag_broadcasting_demo",
               ("Hello 虚竹,这是广播消息" + i).getBytes());
         msgs.add(msg);
      }
      // 5.发送消息
      SendResult result = producer.send(msgs);
      // 发送状态
      SendStatus status = result.getSendStatus();
      System.out.println("发送结果:" + result);
      // 线程睡1秒
      TimeUnit.SECONDS.sleep(1);
      // 6.关闭生产者producer
      producer.shutdown();
      System.out.println("生产者关闭");
   }
}

image.png

public class ConsumerA {
   public static void main(String[] args) throws Exception {
      // 1.创建消费者Consumer,制定消费者组名
      DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("demo_producer_broadcasting_group");
      // 2.指定Nameserver地址
      consumer.setNamesrvAddr("192.168.88.131:9876");
      // 默认均衡轮询消费模式 改为广播模式
      consumer.setMessageModel(MessageModel.BROADCASTING);
      // 3.订阅主题Topic和Tag
      consumer.subscribe("Topic_broadcasting_demo", "*");
      // 4.设置回调函数,处理消息
      consumer.registerMessageListener(new MessageListenerConcurrently() {
         // 接受消息内容
         @Override
         public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
            for (MessageExt msg : msgs) {
               System.out.println(
                     "A----consumeThread=" + Thread.currentThread().getName() + "," + new String(msg.getBody()));
            }
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
         }
      });
      // 5.启动消费者consumer
      consumer.start();
      System.out.println("消费者启动");
   }
}

image.png

第五节:广播消息消费者B

public class ConsumerB {
   public static void main(String[] args) throws Exception {
      // 1.创建消费者Consumer,制定消费者组名
      DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("demo_producer_broadcasting_group");
      // 2.指定Nameserver地址
      consumer.setNamesrvAddr("192.168.88.131:9876");
      // 默认均衡轮询消费模式 改为广播模式
      consumer.setMessageModel(MessageModel.BROADCASTING);
      // 3.订阅主题Topic和Tag
      consumer.subscribe("Topic_broadcasting_demo", "*");
      // 4.设置回调函数,处理消息
      consumer.registerMessageListener(new MessageListenerConcurrently() {
         // 接受消息内容
         @Override
         public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
            for (MessageExt msg : msgs) {
               System.out.println(
                     "B----consumeThread=" + Thread.currentThread().getName() + "," + new String(msg.getBody()));
            }
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
         }
      });
      // 5.启动消费者consumer
      consumer.start();
      System.out.println("消费者启动");
   }
}

image.png第六节:总结

消费模式默认为负载均衡模式

修改消费模式的方法是: consumer.setMessageModel();

设置为广播模式:consumer.setMessageModel(MessageModel.BROADCASTING);

设置负载均衡模式:consumer.setMessageModel(MessageModel.CLUSTERING);


参考:


顺序消息是否支持集群消费和广播消费?


https://help.aliyun.com/knowledge_detail/54357.html


相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
4月前
|
消息中间件 存储 负载均衡
我服了,RocketMQ消费者负载均衡内核是这样设计的
文章为理解RocketMQ的负载均衡机制提供了深入的技术洞察,并对如何在实际应用中扩展和定制负载均衡策略提供了有价值的见解。
我服了,RocketMQ消费者负载均衡内核是这样设计的
|
4月前
|
消息中间件 负载均衡 API
RocketMQ生产者负载均衡(轮询机制)核心原理
文章深入分析了RocketMQ生产者的负载均衡机制,特别是轮询机制的实现原理,揭示了如何通过`ThreadLocal`技术和消息队列的选播策略来确保消息在多个队列之间均衡发送,以及如何通过灵活的API支持自定义负载均衡策略。
|
4月前
|
消息中间件 存储 负载均衡
RocketMQ消费者消费消息核心原理(含长轮询机制)
这篇文章深入探讨了Apache RocketMQ消息队列中消费者消费消息的核心原理,特别是长轮询机制。文章从消费者和Broker的交互流程出发,详细分析了Push和Pull两种消费模式的内部实现,以及它们是如何通过长轮询机制来优化消息消费的效率。文章还对RocketMQ的消费者启动流程、消息拉取请求的发起、Broker端处理消息拉取请求的流程进行了深入的源码分析,并总结了RocketMQ在设计上的优点,如单一职责化和线程池的使用等。
RocketMQ消费者消费消息核心原理(含长轮询机制)
|
4月前
|
消息中间件 缓存 Java
RocketMQ - 消费者消费方式
RocketMQ - 消费者消费方式
126 0
|
4月前
|
消息中间件 RocketMQ
RocketMQ - 消费者进度保存机制
RocketMQ - 消费者进度保存机制
85 0
|
4月前
|
消息中间件 RocketMQ
RocketMQ - 消费者Rebalance机制
RocketMQ - 消费者Rebalance机制
72 0
|
4月前
|
消息中间件 存储 缓存
RocketMQ - 消费者启动机制
RocketMQ - 消费者启动机制
64 0
|
4月前
|
消息中间件 存储 缓存
RocketMQ - 消费者概述
RocketMQ - 消费者概述
83 0
|
4月前
|
消息中间件 RocketMQ
RocketMQ - 生产者最佳实践总结
RocketMQ - 生产者最佳实践总结
57 0
|
4月前
|
消息中间件 缓存 API
RocketMQ - 生产者消息发送流程
RocketMQ - 生产者消息发送流程
89 0