RocketMQ第四章:手把手教老婆实现-顺序消息生产者和顺序消息消费者

简介: RocketMQ第四章:手把手教老婆实现-顺序消息生产者和顺序消息消费者

image.pngRocketMQ使用教程相关系列 目录


目录


第一节:介绍


顺序消息含义介绍


原理解析


第二节:顺序消息-生产者和消息者步骤说明


顺序消息生产者代码实现步骤


顺序消息消费者代码实现步骤


第三节:顺序消息生产者


效果:


第四节:顺序消息消费者


效果:


第一节:介绍

顺序消息含义介绍

顺序消息指的是可以按照消息的发送顺序来消费(FIFO)。RocketMQ可以严格的保证消息有序,可以分为分区有序或者全局有序。


原理解析

在默认的情况下消息发送会采取Round Robin轮询方式把消息发送到不同的queue(分区队列);


而消费消息的时候从多个queue上拉取消息,这种情况发送和消费是不能保证顺序。

image.png当发送和消费参与的queue只有一个,则是全局有序;如果多个queue参与,则为分区有序,即相对每个queue,消息都是有序的。


第二节:顺序消息-生产者和消息者步骤说明

顺序消息生产者代码实现步骤

1.创建消息生产者producer并制定生产者组名


2.指定Nameserver地址


3.启动producer


4.创建消息对象,指定主题Topic、Tag和消息体


5.发送消息,选择的send方法有三个参数:


* 参数一:消息对象

* 参数二:消息队列的选择器

* 参数三:选择队列的业务标识 6.关闭生产者producer

顺序消息消费者代码实现步骤

1.创建消费者Consumer,制定消费者组名


2.指定Nameserver地址


3.订阅主题Topic和Tag


4.设置回调函数,处理消息:与普通消息的差别,这里用的是MessageListenerOrderly


5.启动消费者consumer


注意:消费者的 Topic 和 Tag 需要和生产者保持一致


第三节:顺序消息生产者

public class Producer {
   public static void main(String[] args) throws Exception {
      // 1.创建消息生产者producer,并制定生产者组名
      DefaultMQProducer producer = new DefaultMQProducer("demo_producer_order_group");
      // 2.指定Nameserver地址
      producer.setNamesrvAddr("192.168.88.131:9876");
      // 3.启动producer
      producer.start();
      System.out.println("生产者启动");
      for (int i = 0; i < 20; i++) {
         // 4.创建消息对象,指定主题Topic、Tag和消息体
         Message msg = new Message("Topic_order_demo", "Tag_order_demo",
               ("Hello 虚竹,这是顺序消息" + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
         // 5.发送消息
         /**
         * 参数一:消息对象
         * 参数二:消息队列的选择器
         * 参数三:选择队列的业务标识
         */
         SendResult result = producer.send(msg, new MessageQueueSelector() {
            /**
            *
            * @param mqs:队列集合
            * @param msg:消息对象
            * @param arg:业务标识的参数
            * @return
            */
            @Override
            public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
               Integer index = (Integer) arg;
               return mqs.get(index);
            }
         }, 1);
         System.out.println("发送结果:" + msg.toString());
      }
      // 6.关闭生产者producer
      producer.shutdown();
      System.out.println("生产者关闭");
   }
}

image.png

public class Consumer {
    public static void main(String[] args) throws Exception {
        //1.创建消费者Consumer,制定消费者组名
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("demo_consumer_order_group");
        //2.指定Nameserver地址
        consumer.setNamesrvAddr("192.168.88.131:9876");
        //消息拉取最大条数
        consumer.setConsumeMessageBatchMaxSize(2);
        //3.订阅主题Topic和Tag
        consumer.subscribe("Topic_order_demo", "*");
        //4.设置回调函数,处理消息
        consumer.registerMessageListener(new MessageListenerOrderly() {
            //接受消息内容
            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext consumeOrderlyContext) {
                for (MessageExt msg : msgs) {
                    try {
                        //获取主题
                        String topic = msg.getTopic();
                        //获取标签
                        String tags = msg.getTags();
                        //获取信息
                        byte[] body =  msg.getBody();
                        String result = new String(body, RemotingHelper.DEFAULT_CHARSET);
                        System.out.println("Consumer消费信息:topic:" + topic + ",tags:"+tags+
                                ",result"+ result);
                    } catch (UnsupportedEncodingException e) {
                        e.printStackTrace();
                        //重试
                        return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
                    }
                }
                return ConsumeOrderlyStatus.SUCCESS;
            }
        });
        //5.启动消费者consumer
        consumer.start();
    }
}

image.png

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
3月前
|
消息中间件 存储 负载均衡
我服了,RocketMQ消费者负载均衡内核是这样设计的
文章为理解RocketMQ的负载均衡机制提供了深入的技术洞察,并对如何在实际应用中扩展和定制负载均衡策略提供了有价值的见解。
我服了,RocketMQ消费者负载均衡内核是这样设计的
|
3月前
|
消息中间件 负载均衡 API
RocketMQ生产者负载均衡(轮询机制)核心原理
文章深入分析了RocketMQ生产者的负载均衡机制,特别是轮询机制的实现原理,揭示了如何通过`ThreadLocal`技术和消息队列的选播策略来确保消息在多个队列之间均衡发送,以及如何通过灵活的API支持自定义负载均衡策略。
|
3月前
|
消息中间件 存储 负载均衡
RocketMQ消费者消费消息核心原理(含长轮询机制)
这篇文章深入探讨了Apache RocketMQ消息队列中消费者消费消息的核心原理,特别是长轮询机制。文章从消费者和Broker的交互流程出发,详细分析了Push和Pull两种消费模式的内部实现,以及它们是如何通过长轮询机制来优化消息消费的效率。文章还对RocketMQ的消费者启动流程、消息拉取请求的发起、Broker端处理消息拉取请求的流程进行了深入的源码分析,并总结了RocketMQ在设计上的优点,如单一职责化和线程池的使用等。
RocketMQ消费者消费消息核心原理(含长轮询机制)
|
3月前
|
消息中间件 缓存 Java
RocketMQ - 消费者消费方式
RocketMQ - 消费者消费方式
64 0
|
3月前
|
消息中间件 RocketMQ
RocketMQ - 消费者进度保存机制
RocketMQ - 消费者进度保存机制
61 0
|
3月前
|
消息中间件 RocketMQ
RocketMQ - 消费者Rebalance机制
RocketMQ - 消费者Rebalance机制
56 0
|
3月前
|
消息中间件 存储 缓存
RocketMQ - 消费者启动机制
RocketMQ - 消费者启动机制
47 0
|
消息中间件 算法 Java
弥补延时消息的不足,RocketMQ 基于时间轮算法实现了定时消息!
弥补延时消息的不足,RocketMQ 基于时间轮算法实现了定时消息!
758 1
弥补延时消息的不足,RocketMQ 基于时间轮算法实现了定时消息!
|
消息中间件 uml RocketMQ
3 张图带你彻底理解 RocketMQ 事务消息
3 张图带你彻底理解 RocketMQ 事务消息
67758 2
3 张图带你彻底理解 RocketMQ 事务消息
|
消息中间件 Java uml
5张图带你理解 RocketMQ 顺序消息实现机制
5张图带你理解 RocketMQ 顺序消息实现机制
667 1
5张图带你理解 RocketMQ 顺序消息实现机制