RocketMQ 第六章:手把手教老婆实现-批量消息生产者和消费者

简介: RocketMQ 第六章:手把手教老婆实现-批量消息生产者和消费者

image.png

RocketMQ使用教程相关系列 目录


目录


第一节:介绍


限制:


第二节:批量消息-生产者和消息者步骤说明


批量消息生产者代码实现步骤


批量消息消费者代码实现步骤


第三节:批量消息生产者-小于4MB


效果:


第四节:批量消息消费者


效果:


第五节:批量消息生产者-大于4MB


消息拆分工具类


生产者


效果:


第六节:批量消息消费者-大于4MB


效果:


第一节:介绍

批量发送消息能显著提高传递小消息的性能。


限制:

应该有相同的topic,相同的waitStoreMsgOK

不能是延时消息

这一批消息的总大小不应超过4MB(默认配置:DefaultMQProducer的maxMessageSize参数,可在broker*.properties配置文件中修改)。

第二节:批量消息-生产者和消息者步骤说明

批量消息生产者代码实现步骤

1.创建消息生产者producer,并制定生产者组名


2.指定Nameserver地址


3.启动producer


4.创建消息对象集合,指定主题Topic、Tag和消息体


5.发送集合消息


6.关闭生产者producer


批量消息消费者代码实现步骤

1.创建消费者Consumer,制定消费者组名


2.指定Nameserver地址


3.订阅主题Topic和Tag


4.设置回调函数,处理消息


5.启动消费者consumer


注意:消费者的 Topic 和 Tag 需要和生产者保持一致


第三节:批量消息生产者-小于4MB

public class Producer {
   public static void main(String[] args) throws Exception {
      // 1.创建消息生产者producer,并制定生产者组名
      DefaultMQProducer producer = new DefaultMQProducer("demo_producer_batch_group");
      // 2.指定Nameserver地址
      producer.setNamesrvAddr("192.168.88.131:9876");
      // 3.启动producer
      producer.start();
      System.out.println("生产者启动");
      List<Message> msgs = new ArrayList<Message>();
      // 4.创建消息对象,指定主题Topic、Tag和消息体
      /**
       * 参数一:消息主题Topic
       * 参数二:消息Tag
       * 参数三:消息内容
       */
      for (int i = 0; i < 20; i++) {
         Message msg = new Message("Topic_batch_demo", "Tag_batch_demo", ("Hello 虚竹,这是批量消息" + i).getBytes());
         msgs.add(msg);
      }
      // 5.发送消息
      SendResult result = producer.send(msgs);
      // 发送状态
      SendStatus status = result.getSendStatus();
      System.out.println("发送结果:" + result);
      // 线程睡1秒
      TimeUnit.SECONDS.sleep(1);
      // 6.关闭生产者producer
      producer.shutdown();
      System.out.println("生产者关闭");
   }
}

image.png

public class Consumer {
    public static void main(String[] args) throws Exception {
        //1.创建消费者Consumer,制定消费者组名
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("demo_consumer_order_group");
        //2.指定Nameserver地址
        consumer.setNamesrvAddr("192.168.88.131:9876");
        //消息拉取最大条数
        consumer.setConsumeMessageBatchMaxSize(2);
        //3.订阅主题Topic和Tag
        consumer.subscribe("Topic_batch_demo", "*");
        //4.设置回调函数,处理消息
        consumer.registerMessageListener(new MessageListenerOrderly() {
            //接受消息内容
            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext consumeOrderlyContext) {
                for (MessageExt msg : msgs) {
                    try {
                        //获取主题
                        String topic = msg.getTopic();
                        //获取标签
                        String tags = msg.getTags();
                        //获取信息
                        byte[] body =  msg.getBody();
                        String result = new String(body, RemotingHelper.DEFAULT_CHARSET);
                        System.out.println("Consumer消费信息:topic:" + topic + ",tags:"+tags+
                                ",result"+ result);
                    } catch (UnsupportedEncodingException e) {
                        e.printStackTrace();
                        //重试
                        return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
                    }
                }
                return ConsumeOrderlyStatus.SUCCESS;
            }
        });
        //5.启动消费者consumer
        consumer.start();
    }
}

image.png

public class ListSplitter implements Iterator<List<Message>> {
   private int SIZE_LIMIT = 1024 * 1024 * 4;
   private final List<Message> messages;
   private int currIndex;
   public ListSplitter(List<Message> messages) {
      this.messages = messages;
   }
   public ListSplitter(List<Message> messages, DefaultMQProducer mqProducer) {
      this.messages = messages;
      this.SIZE_LIMIT = mqProducer.getMaxMessageSize();
   }
   @Override
   public boolean hasNext() {
      return currIndex < messages.size();
   }
   @Override
   public List<Message> next() {
      int nextIndex = currIndex;
      int totalSize = 0;
      for (; nextIndex < messages.size(); nextIndex++) {
         Message message = messages.get(nextIndex);
         int tmpSize = message.getTopic().length() + message.getBody().length;
         Map<String, String> properties = message.getProperties();
         for (Map.Entry<String, String> entry : properties.entrySet()) {
            tmpSize += entry.getKey().length() + entry.getValue().length();
         }
         // 增加日志的开销20字节
         tmpSize = tmpSize + 20;
         if (tmpSize > SIZE_LIMIT) {
            // 单个消息超过了最大的限制
            // 忽略,否则会阻塞分裂的进程
            if (nextIndex - currIndex == 0) {
               // 假如下一个子列表没有元素,则添加这个子列表然后退出循环,否则只是退出循环
               nextIndex++;
            }
            break;
         }
         if (tmpSize + totalSize > SIZE_LIMIT) {
            break;
         } else {
            totalSize += tmpSize;
         }
      }
      List<Message> subList = messages.subList(currIndex, nextIndex);
      currIndex = nextIndex;
      return subList;
   }
   @Override
   public void remove() {
   }
}

生产者

public class ProducerMoreThan4M {
   public static void main(String[] args) throws Exception {
      // 1.创建消息生产者producer,并制定生产者组名
      DefaultMQProducer producer = new DefaultMQProducer("demo_producer_batch_group");
      // 2.指定Nameserver地址
      producer.setNamesrvAddr("192.168.88.131:9876");
      // 3.启动producer
      producer.start();
      System.out.println("生产者启动");
      List<Message> msgs = new ArrayList<Message>();
      // 4.创建消息对象,指定主题Topic、Tag和消息体
      /**
       * 参数一:消息主题Topic
       * 参数二:消息Tag
       * 参数三:消息内容
       */
      for (int i = 0; i < 20; i++) {
         Message msg = new Message("Topic_batch_demo", "Tag_batch_demo", ("Hello 虚竹,这是批量消息" + i).getBytes());
         msgs.add(msg);
      }
      // 5.发送消息
      // 发送批量消息:把大的消息分裂成若干个小的消息
      ListSplitter splitter = new ListSplitter(msgs, producer);
      while (splitter.hasNext()) {
         try {
            List<Message> listItem = splitter.next();
            SendResult result = producer.send(listItem);
            System.out.println("发送结果:" + result);
         } catch (Exception e) {
            e.printStackTrace();
            // 处理error
         }
      }
      // 线程睡1秒
      TimeUnit.SECONDS.sleep(1);
      // 6.关闭生产者producer
      producer.shutdown();
      System.out.println("生产者关闭");
   }
}

image.png

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
4月前
|
消息中间件 存储 负载均衡
我服了,RocketMQ消费者负载均衡内核是这样设计的
文章为理解RocketMQ的负载均衡机制提供了深入的技术洞察,并对如何在实际应用中扩展和定制负载均衡策略提供了有价值的见解。
我服了,RocketMQ消费者负载均衡内核是这样设计的
|
4月前
|
消息中间件 负载均衡 API
RocketMQ生产者负载均衡(轮询机制)核心原理
文章深入分析了RocketMQ生产者的负载均衡机制,特别是轮询机制的实现原理,揭示了如何通过`ThreadLocal`技术和消息队列的选播策略来确保消息在多个队列之间均衡发送,以及如何通过灵活的API支持自定义负载均衡策略。
|
4月前
|
消息中间件 存储 负载均衡
RocketMQ消费者消费消息核心原理(含长轮询机制)
这篇文章深入探讨了Apache RocketMQ消息队列中消费者消费消息的核心原理,特别是长轮询机制。文章从消费者和Broker的交互流程出发,详细分析了Push和Pull两种消费模式的内部实现,以及它们是如何通过长轮询机制来优化消息消费的效率。文章还对RocketMQ的消费者启动流程、消息拉取请求的发起、Broker端处理消息拉取请求的流程进行了深入的源码分析,并总结了RocketMQ在设计上的优点,如单一职责化和线程池的使用等。
RocketMQ消费者消费消息核心原理(含长轮询机制)
|
4月前
|
消息中间件 缓存 Java
RocketMQ - 消费者消费方式
RocketMQ - 消费者消费方式
127 0
|
4月前
|
消息中间件 RocketMQ
RocketMQ - 消费者进度保存机制
RocketMQ - 消费者进度保存机制
85 0
|
4月前
|
消息中间件 RocketMQ
RocketMQ - 消费者Rebalance机制
RocketMQ - 消费者Rebalance机制
74 0
|
4月前
|
消息中间件 存储 缓存
RocketMQ - 消费者启动机制
RocketMQ - 消费者启动机制
65 0
|
4月前
|
消息中间件 存储 缓存
RocketMQ - 消费者概述
RocketMQ - 消费者概述
83 0
|
4月前
|
消息中间件 RocketMQ
RocketMQ - 生产者最佳实践总结
RocketMQ - 生产者最佳实践总结
57 0
|
4月前
|
消息中间件 缓存 API
RocketMQ - 生产者消息发送流程
RocketMQ - 生产者消息发送流程
89 0