RocketMQ系列(六)批量发送与过滤

简介: 今天我们再来看看RocketMQ的另外两个小功能,消息的批量发送和过滤。这两个小功能提升了我们使用RocketMQ的效率。

今天我们再来看看RocketMQ的另外两个小功能,消息的批量发送和过滤。这两个小功能提升了我们使用RocketMQ的效率。

批量发送

以前我们发送消息的时候,都是一个一个的发送,这样效率比较低下。能不能一次发送多个消息呢?当然是可以的,RocketMQ为我们提供了这样的功能。但是它也有一些使用的条件:

  • 同一批发送的消息的Topic必须相同;
  • 同一批消息的waitStoreMsgOK 必须相同;
  • 批量发送的消息不支持延迟,就是上一节说的延迟消息;
  • 同一批次的消息,大小不能超过1MiB;

好了,只要我们满足上面的这些限制,就可以使用批量发送了,我们来看看发送端的代码吧,

@Test
public void producerBatch() throws Exception {

    List<Message> messages = new ArrayList<>();
    for (int i = 0;i<3;i++) {
        MessageExt message = new MessageExt();
        message.setTopic("cluster-topic");
        message.setKeys("key-"+i);
        message.setBody(("this is batchMQ,my NO is "+i+"---"+new Date()).getBytes());
        messages.add(message);
    }
    SendResult sendResult = defaultMQProducer.send(messages);
    System.out.println("sendResult:" + sendResult.getSendStatus().toString());
}
  • 其实批量发送很简单,我们只是把消息放到一个List当中,然后统一的调用send方法发送就可以了。

再来看看消费端的代码,

@Bean(initMethod = "start",destroyMethod = "shutdown")
public DefaultMQPushConsumer pushConsumer()  {
    try {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("DefaultMQPushConsumer");
        consumer.setNamesrvAddr("192.168.73.130:9876;192.168.73.131:9876;192.168.73.132:9876;");
        consumer.subscribe("cluster-topic", "*");
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                System.out.println("msgs.size():"+msgs.size());
                if (msgs != null && msgs.size() > 0) {
                    for (MessageExt msg : msgs) {
                        System.out.println(new String(msg.getBody()));
                    }
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        return consumer;
    }catch (Exception e) {
        e.printStackTrace();
    }
    return null;
}
  • 消费端的代码没有任何的变化,正常的接收消息就可以了,我们只是打印出了msgs.size(),看看一次接收一个消息,还是一次可以批量的接收多个消息。

我们启动项目,批量发送一下,看看效果吧,

发送端的日志如下:

sendResult:SEND_OK

发送成功,看来我们批量发送的3个消息都进入到了队列中,再看看消费端,是一次消费一个,还是一次消费3个,如下:

msgs.size():1
this is batchMQ,my NO is 0---Mon Jun 15 09:31:04 CST 2020
msgs.size():1
this is batchMQ,my NO is 1---Mon Jun 15 09:31:04 CST 2020
msgs.size():1
this is batchMQ,my NO is 2---Mon Jun 15 09:31:04 CST 2020

看样子是一次只消费了一个消息,那么能不能一次消费3个消息呢?当然是可以的,不过要进行特殊的设置,

consumer.setConsumeMessageBatchMaxSize(5);

在消费端,我们设置批量消费消息的数量是5,这个值默认是1。我们再看看消费端的日志,

msgs.size():3
this is batchMQ,my NO is 0---Mon Jun 15 09:35:47 CST 2020
this is batchMQ,my NO is 1---Mon Jun 15 09:35:47 CST 2020
this is batchMQ,my NO is 2---Mon Jun 15 09:35:47 CST 2020

这次一次消费了3个消息,如果消息比较多的话,最大一次能消费5个。这就是RocketMQ的批量发送和批量消费。

消息过滤

其实我们在大多数情况下,使用tag标签就能够很好的实现消息过滤。虽然tag标签咱们并没有过多的介绍,其实也很好理解,就是一个子Topic的概念,咱们在构建消息message的时候,message.setTags("xxx")。然后在消费的时候,订阅Topic的时候,也可以指定订阅的tag,

consumer.subscribe("cluster-topic", "*");

看到那个"*"了吗?它就是订阅的tag,"*"代表全部的tag,如果您想订阅其中的一个或几个,可以使用这种方式"tagA || tagB || tagC",这是订阅了cluster-topic下的3个tag,其他的tag是不会被消费的。

这里我们所说的消息过滤比tag要高级很多,是可以支持sql的,怎么样?高级吧。比如:我们订阅"a > 5 and b = 'abc'"的消息,如下图:

image-20200615101201605.png

但是,RocketMQ毕竟不是数据库,它只能支持一些基础的SQL语句,并不是所有的SQL都支持,

  • 数字型的支持,>, >=, <, <=, BETWEEN, =
  • 字符串支持,=, <>, IN
  • IS NULL或者IS NOT NULL
  • 逻辑判断,ANDORNOT

字段的类型也只是简单的几种,

  • 数字型,支持123,543.123,整型、浮点都可以;
  • 字符串,必须使用单引号''括起来;
  • 空值,NULL;
  • 布尔型,TRUE或者FALSE;

并且对消费者的类型也有一定的限制,只能使用push consumer才可以进行消息过滤。好了,说了这么多了,我们看看怎么使用吧,消费端和生产端都要进行相应的改造,先看看生产端吧,

@Test
public void producerBatch() throws Exception {

    List<Message> messages = new ArrayList<>();
    for (int i = 0;i<3;i++) {
        MessageExt message = new MessageExt();
        message.setTopic("cluster-topic");
        message.setKeys("key-"+i);
        message.setBody(("this is batchMQ,my NO is "+i+"---"+new Date()).getBytes());

        int a = i+4;
        message.putUserProperty("a",String.valueOf(a));

        messages.add(message);
    }
    SendResult sendResult = defaultMQProducer.send(messages);
    System.out.println("sendResult:" + sendResult.getSendStatus().toString());
}

我们在之前批量发送的基础上进行了修改,定义了a的值,等于i+4,这样循环3次,a的值就是4,5,6。然后调用message.putUserProperty("a",String.valueOf(a))注意,在使用消息过滤的时候,这些附加的条件属性都是通过putUserProperty方法进行设置。这里,我们设置了a的值。再看看消费端,

consumer.subscribe("cluster-topic", MessageSelector.bySql("a > 5"));

消费端,整体上没有变化,只是在订阅的方法中,使用MessageSelector.bySql("a > 5"),进行了条件的过滤。有的小伙伴可能会有疑问,我既想用sql过滤又想用tag过滤怎么办?当然也是可以,我们可以使用MessageSelector.bySql("a > 5").byTag("xx),byTag和bySql不分前后,怎么样,很强大吧。我们运行一下程序,看看效果吧。

我们启动一下服务,报错了,怎么回事?错误信息如下:

The broker does not support consumer to filter message by SQL92

队列不支持过滤消息,我们查询了RocketMQ源码中的BrokerConfig类,这个类就是对broker的一些设置,其中发现了这两个属性,

// whether do filter when retry.
private boolean filterSupportRetry = false;
private boolean enablePropertyFilter = false;
  • filterSupportRetry是在重试的时候,是否支持filter;
  • enablePropertyFilter,这个就是是否支持过滤消息的属性;

我们把这两个属性在broker的配置文件改为true吧,如下:

filterSupportRetry=true
enablePropertyFilter=true

然后,再重新部署一下我们两主两从的集群环境。环境部署完以后,我们再重启应用,没有报错。在生产端发送一下消息看看吧,

sendResult:SEND_OK

生产端发送消息没有问题,说明3个消息都发送成功了。再看看消费端的日志,

msgs.size():1
this is batchMQ,my NO is 2---Mon Jun 15 10:59:37 CST 2020

只消费了一个消息,并且这个消息中i的值是2,那么a的值就是2+4=6,它是>5的,满足SQL的条件,所以被消费掉了。这完全符合我们的预期。

总结

今天的两个小功能还是比较有意思的,但里边也有需要注意的地方,

  • 消息的批量发送,只要我们满足它的条件,然后使用List发送就可以了;批量消费,默认的消费个数是1,我们可以调整它的值,这样就可以一次消费多个消息了;
  • 过滤消息中,最大的坑就是队列的配置里,需要设置enablePropertyFilter=true,否则消费端在启动的时候报不支持SQL的错误;

我们在使用的时候,多加留意就可以了,有问题,评论区留言吧~

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
消息中间件 Java RocketMQ
SpringBoot整合RocketMQ发送批量消息
SpringBoot整合RocketMQ发送批量消息
|
存储 消息中间件 Java
SpringBoot整合RocketMQ发送延时消息
当消息写入到Broker后,在指定的时长后才可被消费处理的消息,称为延时消息
1100 0
|
消息中间件 缓存 负载均衡
【消息中间件】RocketMQ的默认发送流程,你简单的回答下?
今天我们就开始学习下默认消息发送流程,学习他的实现思路,也帮助我们工作中,遇到了问题不会手足无措。
|
消息中间件 NoSQL 关系型数据库
实战:如何防止mq消费方消息重复消费、rocketmq理论概述、rocketmq组成、普通消息的发送
实战:如何防止mq消费方消息重复消费 如果因为网络延迟等原因,mq无法及时接收到消费方的应答,导致mq重试。(计算机网络)。在重试过程中造成重复消费的问题
2748 1
实战:如何防止mq消费方消息重复消费、rocketmq理论概述、rocketmq组成、普通消息的发送
|
存储 消息中间件 固态存储
RocketMQ 消息存储和发送性能保证|学习笔记
快速学习 RocketMQ 消息存储和发送性能保证
RocketMQ 消息存储和发送性能保证|学习笔记
|
消息中间件 存储 Java
MQ系列5:RocketMQ消息的发送模式
MQ系列5:RocketMQ消息的发送模式
416 0
MQ系列5:RocketMQ消息的发送模式
|
消息中间件 算法 数据可视化
SpringBoot整合RocketMQ发送顺序消息
严格按照消息的发送顺序进行消费的消息。默认情况下生产者会把消息以Round Robin轮询方式发送到不同的Queue分区队列,而消费消息时会从多个Queue上拉取消息,这种情况下的发送和消费是不能保证顺序的,如果将消息仅发送到同一个Queue中,消费时也只从这个Queue上拉取消息,就严格保证了消息的顺序性
|
消息中间件 Unix Java
SpringBoot整合RocketMQ发送事务消息
RocketMQ提供了类似X/Open XA的分布式事务功能,通过事务消息能达到分布式事务的最终一致。XA是一种分布式事务解决方案,一种分布式事务处理模式
1134 1
|
消息中间件 Java 数据处理
SpringBoot整合RocketMQ发送普通消息
RocketMQ是一个统一消息引擎、轻量级数据处理平台
398 0
|
消息中间件 Java RocketMQ
使用 rocketmq-spring-boot-starter 来配置、发送和消费 RocketMQ 消息
本文将 rocktmq-spring-boot 的设计实现做一个简单的介绍,读者可以通过本文了解将 RocketMQ Client 端集成为 spring-boot-starter 框架的开发细节,然后通过一个简单的示例来一步一步的讲解如何使用这个 spring-boot-starter 工具包来配置,发送和消费 RocketMQ 消息。
使用 rocketmq-spring-boot-starter 来配置、发送和消费 RocketMQ 消息