RocketMQ第三章:手把手教老婆实现:普通消息(同步,异步和单向)的生产者和消费者

简介: RocketMQ第三章:手把手教老婆实现:普通消息(同步,异步和单向)的生产者和消费者

image.pngRocketMQ使用教程相关系列 目录


目录


第一节:项目准备


第二节:普通消息-生产者和消息者步骤说明


普通消息生产者代码实现步骤


普通消息消费者代码实现步骤


第三节:普通消息-同步消息


生产者


效果:


消费者


效果:


第四节:普通消息-异步消息


生产者


效果:


消费者:


效果:


分析:


第五节:普通消息-单向消息


生产者


效果:


消费者


效果:


分析:


第六节:总结


第一节:项目准备

rocketmq-client依赖

image.png第二节:普通消息-生产者和消息者步骤说明

普通消息生产者代码实现步骤

1.创建消息生产者producer,并制定生产者组名

2.指定Nameserver地址

3.启动producer

4.创建消息对象 Message,指定主题Topic、Tag和消息体

5.发送消息

6.关闭生产者producer

普通消息消费者代码实现步骤

1.创建消费者Consumer,制定消费者组名

2.指定Nameserver地址

3.订阅主题Topic和Tag

4.设置回调函数,处理消息

5.启动消费者consumer

注意:消费者的 Topic 和 Tag 需要和生产者保持一致


第三节:普通消息-同步消息

同步消息这种可靠性同步地发送方式使用的比较广泛,比如:重要的消息通知,短信通知。


生产者

image.png

消费者

public class ConsumerSync {
   public static void main(String[] args) throws Exception {
      // 1.创建消费者Consumer,制定消费者组名
      DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("demo_consumer_group");
      // 2.指定Nameserver地址
      consumer.setNamesrvAddr("192.168.88.131:9876");
      // 消息拉取最大条数
      consumer.setConsumeMessageBatchMaxSize(2);
      // 3.订阅主题Topic和Tag
      consumer.subscribe("Topic_demo_sync", "*");
      // 4.设置回调函数,处理消息
      consumer.registerMessageListener(new MessageListenerConcurrently() {
         // 接受消息内容
         @Override
         public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
            for (MessageExt msg : msgs) {
               try {
                  // 获取主题
                  String topic = msg.getTopic();
                  // 获取标签
                  String tags = msg.getTags();
                  // 获取信息
                  byte[] body = msg.getBody();
                  String result = new String(body, RemotingHelper.DEFAULT_CHARSET);
                  System.out.println("Consumer消费信息:topic:" + topic + ",tags:" + tags + ",result:" + result);
               } catch (UnsupportedEncodingException e) {
                  e.printStackTrace();
               }
            }
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
         }
      });
      // 5.启动消费者consumer
      consumer.start();
   }
}1.public class ConsumerSync {
   public static void main(String[] args) throws Exception {
      // 1.创建消费者Consumer,制定消费者组名
      DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("demo_consumer_group");
      // 2.指定Nameserver地址
      consumer.setNamesrvAddr("192.168.88.131:9876");
      // 消息拉取最大条数
      consumer.setConsumeMessageBatchMaxSize(2);
      // 3.订阅主题Topic和Tag
      consumer.subscribe("Topic_demo_sync", "*");
      // 4.设置回调函数,处理消息
      consumer.registerMessageListener(new MessageListenerConcurrently() {
         // 接受消息内容
         @Override
         public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
            for (MessageExt msg : msgs) {
               try {
                  // 获取主题
                  String topic = msg.getTopic();
                  // 获取标签
                  String tags = msg.getTags();
                  // 获取信息
                  byte[] body = msg.getBody();
                  String result = new String(body, RemotingHelper.DEFAULT_CHARSET);
                  System.out.println("Consumer消费信息:topic:" + topic + ",tags:" + tags + ",result:" + result);
               } catch (UnsupportedEncodingException e) {
                  e.printStackTrace();
               }
            }
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
         }
      });
      // 5.启动消费者consumer
      consumer.start();
   }
}

image.pngimage.pngimage.png

消费者:

1.public class ConsumerASync {
   public static void main(String[] args) throws Exception {
      // 1.创建消费者Consumer,制定消费者组名
      DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("demo_consumer_group");
      // 2.指定Nameserver地址
      consumer.setNamesrvAddr("192.168.88.131:9876");
      // 消息拉取最大条数
      consumer.setConsumeMessageBatchMaxSize(2);
      // 3.订阅主题Topic和Tag
      consumer.subscribe("Topic_demo_async", "*");
      // 4.设置回调函数,处理消息
      consumer.registerMessageListener(new MessageListenerConcurrently() {
         // 接受消息内容
         @Override
         public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
            for (MessageExt msg : msgs) {
               try {
                  // 获取主题
                  String topic = msg.getTopic();
                  // 获取标签
                  String tags = msg.getTags();
                  // 获取信息
                  byte[] body = msg.getBody();
                  String result = new String(body, RemotingHelper.DEFAULT_CHARSET);
                  System.out.println("Consumer消费信息:topic:" + topic + ",tags:" + tags + ",result:" + result);
               } catch (UnsupportedEncodingException e) {
                  e.printStackTrace();
               }
            }
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
         }
      });
      // 5.启动消费者consumer
      consumer.start();
   }
}

image.png

分析:

从消费者代码中可以看出:异步消费者的实现方式和同步消费者实现方式并无区别。

从消费者的结果来看:可以看出异步消费者接收的结果是无序的。

第五节:普通消息-单向消息

这种方式主要用在不特别关心发送结果的场景,例如日志发送。

生产者

public class Producer {
   public static void main(String[] args) throws Exception {
      // 1.创建消息生产者producer,并制定生产者组名
      DefaultMQProducer producer = new DefaultMQProducer("demo_producer_group");
      // 2.指定Nameserver地址
      producer.setNamesrvAddr("192.168.88.131:9876");
      // 3.启动producer
      producer.start();
      System.out.println("生产者启动");
      for (int i = 0; i < 20; i++) {
         // 4.创建消息对象,指定主题Topic、Tag和消息体
         /**
         * 参数一:消息主题Topic
         * 参数二:消息Tag
         * 参数三:消息内容
         */
         Message msg = new Message("Topic_demo", "Tag_demo",
               ("Hello 虚竹," + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
         // 5.发送单向消息
         producer.sendOneway(msg);
         System.out.println("发送结果:" + msg);
         // 线程睡1秒
      }
      // 6.关闭生产者producer
      producer.shutdown();
      System.out.println("生产者关闭");
   }
}

image.png

消费者

1.public class Consumer {
   public static void main(String[] args) throws Exception {
      // 1.创建消费者Consumer,制定消费者组名
      DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("demo_consumer_group");
      // 2.指定Nameserver地址
      consumer.setNamesrvAddr("192.168.88.131:9876");
      // 消息拉取最大条数
      consumer.setConsumeMessageBatchMaxSize(2);
      // 3.订阅主题Topic和Tag
      consumer.subscribe("Topic_demo", "*");
      // 4.设置回调函数,处理消息
      consumer.registerMessageListener(new MessageListenerConcurrently() {
         // 接受消息内容
         @Override
         public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
            for (MessageExt msg : msgs) {
               try {
                  // 获取主题
                  String topic = msg.getTopic();
                  // 获取标签
                  String tags = msg.getTags();
                  // 获取信息
                  byte[] body = msg.getBody();
                  String result = new String(body, RemotingHelper.DEFAULT_CHARSET);
                  System.out.println("Consumer消费信息:topic:" + topic + ",tags:" + tags + ",result:" + result);
               } catch (UnsupportedEncodingException e) {
                  e.printStackTrace();
               }
            }
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
         }
      });
      // 5.启动消费者consumer
      consumer.start();
   }
}

image.png分析:

从消费结果来看,消费的顺序是无序的


第六节:总结

同步消息、异步消息和单向消息的消费者实现方式是一样的。


同步消息、异步消息和单向消息的区别在于消息的发送方。


异步消息生产者没有返回值,需要使用 SendCallback 接收异步返回结果的回调。


异步消息生产者,在关闭实例之前,建议进行休眠。


单向消息也是没有返回值的,并且它的消费者也是无序消费。


单向消息和异步消息的区别是单向消息不需要 SendCallback 来接收异步返回结果的回调。



相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
20天前
|
消息中间件 监控 Java
RocketMQ 同步发送、异步发送和单向发送,如何选择?
本文详细分析了 RocketMQ 中同步发送、异步发送和单向发送三种消息发送方式的原理、优缺点及适用场景。同步发送可靠性高但延迟较大,适合订单系统等场景;异步发送非阻塞且延迟低,适用于实时数据处理等场景;单向发送高效但可靠性低,适用于日志收集等场景。文章还提供了示例代码和核心源码分析,帮助读者更好地理解每种发送方式的特点。
100 4
|
9天前
|
存储 消息中间件 安全
JUC组件实战:实现RRPC(Java与硬件通过MQTT的同步通信)
【10月更文挑战第9天】本文介绍了如何利用JUC组件实现Java服务与硬件通过MQTT的同步通信(RRPC)。通过模拟MQTT通信流程,使用`LinkedBlockingQueue`作为消息队列,详细讲解了消息发送、接收及响应的同步处理机制,包括任务超时处理和内存泄漏的预防措施。文中还提供了具体的类设计和方法实现,帮助理解同步通信的内部工作原理。
JUC组件实战:实现RRPC(Java与硬件通过MQTT的同步通信)
|
2月前
|
消息中间件 Kafka 数据安全/隐私保护
RabbitMQ异步通信详解
RabbitMQ异步通信详解
80 16
|
3月前
|
消息中间件 存储 负载均衡
我服了,RocketMQ消费者负载均衡内核是这样设计的
文章为理解RocketMQ的负载均衡机制提供了深入的技术洞察,并对如何在实际应用中扩展和定制负载均衡策略提供了有价值的见解。
我服了,RocketMQ消费者负载均衡内核是这样设计的
|
3月前
|
消息中间件 负载均衡 API
RocketMQ生产者负载均衡(轮询机制)核心原理
文章深入分析了RocketMQ生产者的负载均衡机制,特别是轮询机制的实现原理,揭示了如何通过`ThreadLocal`技术和消息队列的选播策略来确保消息在多个队列之间均衡发送,以及如何通过灵活的API支持自定义负载均衡策略。
|
3月前
|
消息中间件 存储 负载均衡
RocketMQ消费者消费消息核心原理(含长轮询机制)
这篇文章深入探讨了Apache RocketMQ消息队列中消费者消费消息的核心原理,特别是长轮询机制。文章从消费者和Broker的交互流程出发,详细分析了Push和Pull两种消费模式的内部实现,以及它们是如何通过长轮询机制来优化消息消费的效率。文章还对RocketMQ的消费者启动流程、消息拉取请求的发起、Broker端处理消息拉取请求的流程进行了深入的源码分析,并总结了RocketMQ在设计上的优点,如单一职责化和线程池的使用等。
RocketMQ消费者消费消息核心原理(含长轮询机制)
|
3月前
|
消息中间件 缓存 Java
RocketMQ - 消费者消费方式
RocketMQ - 消费者消费方式
61 0
|
3月前
|
消息中间件 RocketMQ
RocketMQ - 消费者进度保存机制
RocketMQ - 消费者进度保存机制
61 0
|
3月前
|
消息中间件 RocketMQ
RocketMQ - 消费者Rebalance机制
RocketMQ - 消费者Rebalance机制
53 0
|
3月前
|
消息中间件 存储 缓存
RocketMQ - 消费者启动机制
RocketMQ - 消费者启动机制
46 0