RocketMQ第四章:手把手教老婆实现-顺序消息生产者和顺序消息消费者

简介: RocketMQ第四章:手把手教老婆实现-顺序消息生产者和顺序消息消费者

image.pngRocketMQ使用教程相关系列 目录


目录


第一节:介绍


顺序消息含义介绍


原理解析


第二节:顺序消息-生产者和消息者步骤说明


顺序消息生产者代码实现步骤


顺序消息消费者代码实现步骤


第三节:顺序消息生产者


效果:


第四节:顺序消息消费者


效果:


第一节:介绍

顺序消息含义介绍

顺序消息指的是可以按照消息的发送顺序来消费(FIFO)。RocketMQ可以严格的保证消息有序,可以分为分区有序或者全局有序。


原理解析

在默认的情况下消息发送会采取Round Robin轮询方式把消息发送到不同的queue(分区队列);


而消费消息的时候从多个queue上拉取消息,这种情况发送和消费是不能保证顺序。

image.png当发送和消费参与的queue只有一个,则是全局有序;如果多个queue参与,则为分区有序,即相对每个queue,消息都是有序的。


第二节:顺序消息-生产者和消息者步骤说明

顺序消息生产者代码实现步骤

1.创建消息生产者producer并制定生产者组名


2.指定Nameserver地址


3.启动producer


4.创建消息对象,指定主题Topic、Tag和消息体


5.发送消息,选择的send方法有三个参数:


* 参数一:消息对象

* 参数二:消息队列的选择器

* 参数三:选择队列的业务标识 6.关闭生产者producer

顺序消息消费者代码实现步骤

1.创建消费者Consumer,制定消费者组名


2.指定Nameserver地址


3.订阅主题Topic和Tag


4.设置回调函数,处理消息:与普通消息的差别,这里用的是MessageListenerOrderly


5.启动消费者consumer


注意:消费者的 Topic 和 Tag 需要和生产者保持一致


第三节:顺序消息生产者

public class Producer {
   public static void main(String[] args) throws Exception {
      // 1.创建消息生产者producer,并制定生产者组名
      DefaultMQProducer producer = new DefaultMQProducer("demo_producer_order_group");
      // 2.指定Nameserver地址
      producer.setNamesrvAddr("192.168.88.131:9876");
      // 3.启动producer
      producer.start();
      System.out.println("生产者启动");
      for (int i = 0; i < 20; i++) {
         // 4.创建消息对象,指定主题Topic、Tag和消息体
         Message msg = new Message("Topic_order_demo", "Tag_order_demo",
               ("Hello 虚竹,这是顺序消息" + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
         // 5.发送消息
         /**
         * 参数一:消息对象
         * 参数二:消息队列的选择器
         * 参数三:选择队列的业务标识
         */
         SendResult result = producer.send(msg, new MessageQueueSelector() {
            /**
            *
            * @param mqs:队列集合
            * @param msg:消息对象
            * @param arg:业务标识的参数
            * @return
            */
            @Override
            public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
               Integer index = (Integer) arg;
               return mqs.get(index);
            }
         }, 1);
         System.out.println("发送结果:" + msg.toString());
      }
      // 6.关闭生产者producer
      producer.shutdown();
      System.out.println("生产者关闭");
   }
}

image.png

public class Consumer {
    public static void main(String[] args) throws Exception {
        //1.创建消费者Consumer,制定消费者组名
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("demo_consumer_order_group");
        //2.指定Nameserver地址
        consumer.setNamesrvAddr("192.168.88.131:9876");
        //消息拉取最大条数
        consumer.setConsumeMessageBatchMaxSize(2);
        //3.订阅主题Topic和Tag
        consumer.subscribe("Topic_order_demo", "*");
        //4.设置回调函数,处理消息
        consumer.registerMessageListener(new MessageListenerOrderly() {
            //接受消息内容
            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext consumeOrderlyContext) {
                for (MessageExt msg : msgs) {
                    try {
                        //获取主题
                        String topic = msg.getTopic();
                        //获取标签
                        String tags = msg.getTags();
                        //获取信息
                        byte[] body =  msg.getBody();
                        String result = new String(body, RemotingHelper.DEFAULT_CHARSET);
                        System.out.println("Consumer消费信息:topic:" + topic + ",tags:"+tags+
                                ",result"+ result);
                    } catch (UnsupportedEncodingException e) {
                        e.printStackTrace();
                        //重试
                        return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
                    }
                }
                return ConsumeOrderlyStatus.SUCCESS;
            }
        });
        //5.启动消费者consumer
        consumer.start();
    }
}

image.png

相关实践学习
RocketMQ一站式入门使用
从源码编译、部署broker、部署namesrv,使用java客户端首发消息等一站式入门RocketMQ。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
5月前
|
消息中间件 存储 负载均衡
Rabbitmq direct模式保证一个队列只对应一个消费者
Rabbitmq direct模式保证一个队列只对应一个消费者
109 0
|
3月前
|
消息中间件 SQL Java
RabbitMQ之消费者可靠性
RabbitMQ之消费者可靠性
|
3月前
|
消息中间件 Java
Java操作RabbitMQ单一生产-消费者模式
Java操作RabbitMQ单一生产-消费者模式
29 0
|
4天前
|
消息中间件 供应链 Java
RabbitMQ入门指南(九):消费者可靠性
RabbitMQ是一个高效、可靠的开源消息队列系统,广泛用于软件开发、数据传输、微服务等领域。本文主要介绍了消费者确认机制、失败重试机制、失败处理策略、业务幂等性等内容。
RabbitMQ入门指南(九):消费者可靠性
|
21天前
|
消息中间件 Java RocketMQ
【深度挖掘 RocketMQ底层源码】「底层源码挖掘系列」抽丝剥茧贯穿RocketMQ的消费者端的运行核心的流程(Pull模式-下)
【深度挖掘 RocketMQ底层源码】「底层源码挖掘系列」抽丝剥茧贯穿RocketMQ的消费者端的运行核心的流程(Pull模式-下)
12 1
|
21天前
|
消息中间件 存储 NoSQL
【深度挖掘 RocketMQ底层源码】「底层源码挖掘系列」透彻剖析贯穿RocketMQ的消费者端的运行核心的流程(Pull模式-上)
【深度挖掘 RocketMQ底层源码】「底层源码挖掘系列」透彻剖析贯穿RocketMQ的消费者端的运行核心的流程(Pull模式-上)
24 1
|
3月前
|
消息中间件 API RocketMQ
你的RocketMQ消费者组(Consumer Group)在查看时显示为离线,这可能是由于消费者组的状态没有被正确更新
你的RocketMQ消费者组(Consumer Group)在查看时显示为离线,这可能是由于消费者组的状态没有被正确更新【1月更文挑战第10天】【1月更文挑战第49篇】
198 5
|
4月前
|
消息中间件 存储 安全
mq 消费者监听经常断会出现丢消息的问题吗
在消息队列(MQ)系统中,消费者监听经常断开可能会导致消息丢失的问题,具体取决于消息队列系统的设计和配置,以及你的应用程序的处理方式。以下是一些可能导致消息丢失问题的情况: 1. **消费者断开连接:** 如果消费者监听过程中发生意外断开,例如网络故障、消费者应用程序崩溃等,那么在断开连接的瞬间,可能存在未被消费的消息。 2. **消息确认机制:** 消息队列通常提供消息确认机制,确保消息在被成功处理后才被从队列中移除。如果你的消费者应用程序在处理消息时没有发送确认,或者确认机制配置不正确,可能导致消息在被处理前被从队列中移除,从而丢失。 3. **持久化设置:** 消息队列通常提供持久
|
5月前
|
Java 测试技术 Spring
10 Spring集成MQTT(生产者与消费者)
10 Spring集成MQTT(生产者与消费者)
72 0
|
5月前
|
消息中间件 存储 缓存
RocketMQ 生产者那些事
这篇文章,我们从源码的角度探寻 RocketMQ Producer 的实现机制。
RocketMQ 生产者那些事