【RocketMQ系列五】消息示例-顺序消息&延迟消息&广播消息的实现

简介: 【RocketMQ系列五】消息示例-顺序消息&延迟消息&广播消息的实现

1. 前言

上一篇文章我们介绍了简单消息的实现,本文将主要来介绍顺序消息的实现,顺序消息分为局部顺序消息和全局顺序消息。

顺序消息指的是消费者在消费消息时,按照生产者发送消息的顺序进行消费。即先发送的先消费【FIFO】。

顺序消息分为 全局顺序消息和局部顺序消息。

全局顺序消息就是全局使用一个queue。

局部顺序消息就是 有顺序依赖的消息放在同一个queue中,多个queue并行消费。

2. 局部顺序消息

默认情况下RocketMQ会根据轮询的方式将消息发送到某个broker中的某个队列中,这样的话就不能保证消息是有序的。

比如在购物网站下单场景下:有  1. 创建订单---->2. 订单支付---->3. 订单发货---->4. 订单完成  四条消息。这四条消息逻辑上肯定是有序的。但是如果采用RocketMQ默认的消息投递方式,那么同一个订单,有可能创建订单被投递到了  MessageQueue1,订单支付的话被投递到了MessageQueue2。  由于消息在不同的MessageQueue中,消费者在消费的时候就可能会出现订单支付的消息先于创建订单的消息。

局部顺序消息就是要保证同一笔订单4条消息都放在同一个queue中,这样的话就不会出现订单支付的消息先于创建订单的消息被消费。就像下图所示:

局部顺序消息消费者在消费某个topic的某个队列中的消息的时候是顺序的。消费者使用MessageListenerOrderly类来进行消息监听。

2.1. 定义生产者

  1. 这里定义了名为part_order_topic_test的topic。运行程序之后该topic可以路由到broker-a 以及broker-b 两个broker。

public class OrderProducer {
  // 局部顺序消费,核心就是自己选择Queue,保证需要顺序保障的消息落到同一个队列中
  public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {
    DefaultMQProducer defaultMQProducer = new DefaultMQProducer("order_producer_group");
    defaultMQProducer.setNamesrvAddr("172.31.184.89:9876");
    defaultMQProducer.start();
    for (int i = 0; i < 10; i++) {
      int orderId = i;
      for (int j = 0; j < 5; j++) {
        // 构建消息体,tags和key 只是做一个简单区分
        Message partOrderMsg = new Message("part_order_topic_test", "order_" + orderId, "KEY_" + orderId, ("局部顺序消息处理_" + orderId + ";step_" + j).getBytes());
        SendResult send = defaultMQProducer.send(partOrderMsg, new MessageQueueSelector() {
          @Override
          //这里的arg参数就是外面的orderId
          public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
            Integer orderId = (Integer) arg;
            int index = orderId % mqs.size();
            return mqs.get(index);
          }
        }, orderId);
        System.out.printf("%s%n", send);
      }
    }
    defaultMQProducer.shutdown();
  }
}
  1. 在发送消息的时候实现MessageQueueSelector接口用于在发送消息的时候指定队列。其中, public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) 方法有三个参数:其中,mqs表示当前topic所路由的全部队列数,这里就是8个队列,broker-a有4个队列,broker-b有4个队列。msg就是传入的消息体,arg 就是传入的orderId。
  2. 这里根据orderId与队列数求模取余来获取消息应该发送到哪个队列中,这样就保证了相同的orderId的消息会落到同一个队列中
Integer orderId = (Integer) arg;
int index = orderId % mqs.size();
return mqs.get(index);
生产者运行结果(部分截图)

从运行结果可以看出相同orderId的消息被投递到了同一个MessageQueue中,而相同MessageQueue队列天然是有顺序的。

2.2.定义消费者

说完了生产者,接着来说说消费者。消费者的逻辑主要是在消费的时候需要实现 MessageListenerOrderly 类来进行消息监听。核心代码是:

// 2.订阅消费消息
    defaultMQPushConsumer.registerMessageListener(new MessageListenerOrderly() {
      @Override
      public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
        for (MessageExt msg : msgs) {
          System.out.println("消费得到的消息是={}" + msg);
          System.out.println("消息体内容是={}" + new String(msg.getBody()));
        }
        return ConsumeOrderlyStatus.SUCCESS;
      }
    });

这里启动了三个消费者,不管消费者消费的顺序如何,相同的orderId下的5条消息都是被顺序消费的。

3. 碰到的问题

在首次调试的时候出现了一个 broker is full 的错误。这是由于磁盘空间不足导致的,可以通过 df -h 命令查看当前磁盘空间的占用情况,当磁盘空间使用率超过90%的话则会报此错。

4. 全局顺序消息

全局顺序消息是指消费者消费全部消息都是顺序的,只能让所有的消息都发送到同一个MessageQueue中来实现,在高并发场景下会非常影响效率。

5. 广播消息

广播消息是向主题(topic)的所有订阅者发送消息,订阅同一个topic的多个消费者,都能全量收到生产者发送的所有消息。

广播消息的生产者与普通同步消息的生产者实现是一致的,不同的是消费者的消息模式不同。这里给出消费者实现的不同之处。

DefaultMQPushConsumer defaultMQPushConsumer = new DefaultMQPushConsumer("broadCastGroup");
    defaultMQPushConsumer.setNamesrvAddr("172.31.184.89:9876");
    // 设置消费者的模式是广播模式
    defaultMQPushConsumer.setMessageModel(MessageModel.BROADCASTING);
    //从第一位开始消费
    defaultMQPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

6. 延迟消息

延迟消息与普通消息的不同之处在于,它们要在指定的时间之后才会被传递。生产者并不会延迟发送消息,而是发送到topic里面,消费者延迟指定的时间进行消费。

6.1. 延迟消息生产者

DefaultMQProducer defaultMQProducer = new DefaultMQProducer("scheduled_group");
    defaultMQProducer.setNamesrvAddr("172.31.186.180:9876");
    defaultMQProducer.start();
    for (int i = 0; i < 100; i++) {
      Message message = new Message("Schedule_topic", ("延迟消息测试" + i).getBytes());
      //设置延迟级别,默认有18个延迟级别,这个消息将延迟10秒消费
      message.setDelayTimeLevel(3);
      defaultMQProducer.send(message);
    }
    System.out.println("所有延迟消息发送完成");
    defaultMQProducer.shutdown();

延迟消息生产者与普通消息生产者主要的区别是延迟消息需要调用 setDelayTimeLevel 方法设置延迟级别,这里设置级别是3,则是延迟10秒。RocketMQ提供了18种延迟级别。可以在 RocketMQ的仪表板中的集群中的broker配置中找到。

延迟消息的消费者与普通消息的消费者相同的。RocketMQ内部通过名为SCHEDULE_TOPIC_XXXX 的topic来存放延迟消息。

7.批量消息

批量发送消息提高了传递消息的性能。官方建议批量消息的总大小不应超过1M,实际不应超过4M。如果超过4M的批量消息需要进行分批处理。同时设置broker的配置参数为4M(在broker的配置文件中修改:maxMessageSize=4194304)。核心代码如下:

//4.创建消息
    List<Message> messageList = new ArrayList<>();
    for (int i = 0; i < 100*100; i++) {
      // 创建消息,指定topic,以及消息体
      messageList.add(new Message("batch_topic", ("飞哥测试批量消息" + i).getBytes()));
    }
    //批量消息消息小于4M的处理
    SendResult send = defaultMQProducer.send(messageList);
    System.out.println(send);

8.过滤消息

使用tag过滤

在大多数情况下,标签是一种简单而有用的设计,可以用来选择你想要的消息。

首先是根据tag来过滤消息,生产者在发送消息的时候指定该消息的tag标签,消费者则可以根据tag来过滤消息。

8.1. 过滤消息生产者

这里定义了三个tag,分别是tagA,tagB以及tagC,生产者在生产消息的时候给每个消息指定不同的tag。

DefaultMQProducer defaultMQProducer = new DefaultMQProducer("TagProducer_group");
    defaultMQProducer.setNamesrvAddr("172.31.184.89:9876");
    defaultMQProducer.start();
    String[] tags = new String[]{"tagA", "tagB", "tagC"};
    for (int i = 0; i < 15; i++) {
      Message message = new Message("TagFilterTest", tags[i % tags.length], ("飞哥tag消息过滤" + tags[i % tags.length]).getBytes());
      SendResult send = defaultMQProducer.send(message);
      System.out.printf("%s%n", send);
    }
    defaultMQProducer.shutdown();

8.2. 过滤消息的消费者

消费者过滤出了标签带有tagA以及tagC的消息进行消费。这里其实是broker将consumer需要的消息推给消费者。

DefaultMQPushConsumer defaultMQPushConsumer = new DefaultMQPushConsumer("tagConsumer");
    defaultMQPushConsumer.setNamesrvAddr("172.31.184.89:9876");
    defaultMQPushConsumer.subscribe("TagFilterTest", "tagA||tagC");
    defaultMQPushConsumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
      for (MessageExt msg : msgs) {
        System.out.println("接收到的消息=" + msg);
        System.out.println("接收到的消息体=" + new String(msg.getBody()));
      }
      return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    });
    defaultMQPushConsumer.start();
    System.out.println("消费者已经启动");

使用SQL过滤

SQL 功能可以通过发送消息时输入的属性进行一些计算,在RocketMQ定义的语法下,可以实现一些有趣的逻辑。

语法

RocketMQ只定义了一些基本的语法类支持这个特性。

1. 数值比较:如 `>`,`>=`,`<=`,`BETWEEN`,`=`;
2. 字符比较:如 `=`,'<>',`IN`;
3. `IS NULL` 或 `IS NOT NULL` ;
4. 逻辑`AND`,`OR`,`NOT`;

常量类型有:

1. 数字,如 123,
2. 字符,如 'abc',必须用单引号;
3. `NULL`,特殊常数;
4. 布尔值,`TRUE` 或 `FALSE`;

SQL过滤生产者

生产者主要设置属性过滤 message.putUserProperty("a", String.valueOf(i)); 表示第一条消息键值对是 a=0,第二条消息键值对是a=1。

DefaultMQProducer defaultMQProducer = new DefaultMQProducer("TagProducer_group");
    defaultMQProducer.setNamesrvAddr("172.31.184.89:9876");
    defaultMQProducer.start();
    String[] tags = new String[]{"tagA", "tagB", "tagC"};
    for (int i = 0; i < 15; i++) {
      Message message = new Message("SQLFilterTest", tags[i % tags.length], ("飞哥sql消息过滤" + tags[i % tags.length]).getBytes());
      message.putUserProperty("a", String.valueOf(i));
      SendResult send = defaultMQProducer.send(message);
      System.out.printf("%s%n", send);
    }
    defaultMQProducer.shutdown();

SQL过滤消费者:

DefaultMQPushConsumer defaultMQPushConsumer = new DefaultMQPushConsumer("tagConsumer");
    defaultMQPushConsumer.setNamesrvAddr("172.31.184.89:9876");
    defaultMQPushConsumer.subscribe("SQLFilterTest", MessageSelector.bySql("(TAGS is not null and TAGS in ('tagA','tagC'))"+" and (a is null and a between 0 and 3)"));
    defaultMQPushConsumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
      for (MessageExt msg : msgs) {
        System.out.println("接收到的消息=" + msg);
        System.out.println("接收到的消息体=" + new String(msg.getBody()));
      }
      return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    });
    defaultMQPushConsumer.start();
    System.out.println("消费者已经启动");

如果运行报 The broker does not support consumer to filter message by SQL92

则需要修改 broker.conf 文件,增加如下配置:

# 开启对 propertyfilter的支持
enablePropertyFilter = true 
filterSupportRetry = true

然后重启broker。

总结

本文介绍了局部顺序消息,全局顺序消息,广播消息,延迟消息,以及如何批量发送消息和过滤消息。

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
3月前
|
消息中间件 JSON Java
|
4月前
|
消息中间件 存储 RocketMQ
2分钟看懂RocketMQ延迟消息核心原理
本文从源码层面解析了RocketMQ延迟消息的实现原理,包括延迟消息的使用、Broker端处理机制以及定时任务对延迟消息的处理流程。
2分钟看懂RocketMQ延迟消息核心原理
|
4月前
|
消息中间件 Kafka Apache
kafka vs rocketmq: 不要只顾着吞吐量而忘了延迟这个指标
这篇文章讨论了Apache RocketMQ和Kafka的对比,强调RocketMQ在低延迟、消息重试与追踪、海量Topic、多租户等方面进行了优化,特别是在小包非批量和大量分区场景下的吞吐量超越Kafka,适合电商和金融领域等高并发、高可靠和高可用场景。
132 0
|
5月前
|
消息中间件 Java Kafka
说说RabbitMQ延迟队列实现原理?
说说RabbitMQ延迟队列实现原理?
81 0
说说RabbitMQ延迟队列实现原理?
|
5月前
|
消息中间件 存储 RocketMQ
消息队列 MQ使用问题之进行超过3天的延迟消息投递,采用多次投递的策略是否有风险
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
6月前
|
消息中间件 JavaScript RocketMQ
消息队列 MQ产品使用合集之是否支持任意时间延迟的消息
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
5月前
|
消息中间件 RocketMQ
【RocketMQ系列四】消息示例-简单消息的实现
【RocketMQ系列四】消息示例-简单消息的实现
43 0
|
6月前
|
消息中间件 数据库
03.RabbitMQ延迟队列
03.RabbitMQ延迟队列
50 0
|
7月前
|
消息中间件
第十五章 RabbitMQ 延迟队列
第十五章 RabbitMQ 延迟队列
42 0
|
2月前
|
消息中间件 JSON Java
开发者如何使用轻量消息队列MNS
【10月更文挑战第19天】开发者如何使用轻量消息队列MNS
89 9