RocketMQ-消息过滤

本文涉及的产品
云原生网关 MSE Higress,422元/月
MSE Nacos/ZooKeeper 企业版试用,1600元额度,限量50份
容器镜像服务 ACR,镜像仓库100个 不限时长
简介: 本文主要讲述了RocketMQ消息过滤相关内容。
目录
  • 业务背景
  • 什么是消息过滤
  • 如何使用消息过滤
  • 浅析消息过滤
  • 后续


业务背景


在电商系统中,运营和产品需要通过大数据系统来查询订单数据和商品数据。

很多公司,一开始,技术架构不是特别完善。于是乎,系统架构图可能是这样子的:


1.png


大数据系统,直接查询mysql数据库。当大数据系统发送一些大SQL查询语句去查询数据时,容易给mysql增加cpu和io压力。


这样子容易给电商系统的查询带来影响。


为了不对mysql数据库带来直接的影响,同时又允许大数据系统查询数据。一般架构比较完善的公司的技术架构如下:


2.png

image.gif

一般都是需要通过数据交换平台,把mysql的数据同步到hive数据库中。


同时,上面的架构也用了RocketMQ。


其实,上面去掉RocketMQ后,也能正常把数据从mysql同步到hive数据库。


那问题来了:为什么要用RocketMQ呢?


我们用反证法来思考一下:如果没RocketMQ,那会如何?首先,我们可以肯定的是:数据肯定能从mysql数据库同步到hive数据库。


但是缺点是什么?


如果不用RocketMQ的情况下,这时,又来一个别的数据团队,也需要电商数据库的数据。这时又需要进行定制开发了。


如果引入RocketMQ后,别的数据团队需要数据,你只需要给他一个Topic即可。


其实引入RocketMQ,主要就是为了方便后续扩展,共享数据。


但上述其实还有一个问题:大数据系统,可能只需要订单数据库的数据。其他表的数据,比如商品数据,它是不需要的。


这个时候,就需要消息的过滤。


消息过滤


  • 在RocketMQ中,一个消息都有且仅有一个标签,生产者发送消息时,消息都有一个标签。在消费者消息信息时,可以消费一个或多个标签的消息。


如何使用?


接下来,需要对RocketMQ如何使用?


public class FilterProducer {
    public static void main(String[] args) throws Exception{
        //生产者组
        DefaultMQProducer producer = new DefaultMQProducer("filter_producer_group");
        //设置nameserver
        producer.setNamesrvAddr("localhost:9876");
        //启动生产者
        producer.start();
        for(int i=0;i<10;i++){
            //构建消息
            Message message = new Message("filterTopic",("helloWorld"+i).getBytes(RemotingHelper.DEFAULT_CHARSET));
            message.putUserProperty("a",String.valueOf(i));
            // 发送消息
            SendResult sendResult = producer.send(message);
            // 打印发送结果
            System.out.println("发送结果:"+sendResult);
        }
        // 关闭生产者
        producer.shutdown();
    }
}


  • 这里给每个消息都增加了key-value。即a属性。


接下来看消费者:


public class FilterConsumer {
    public static void main(String[] args) throws Exception {
        // 消费者组
        DefaultMQPushConsumer consumer =  new DefaultMQPushConsumer("filter_consumer_group");
        //注册nameserver
        consumer.setNamesrvAddr("localhost:9876");
        MessageSelector ms = MessageSelector.bySql("a > 5");
        // 订阅主题
        consumer.subscribe("filterTopic",ms);
        // 开启消费offset
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        //监听器
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                for (int i = 0; i < list.size(); i++) {
                    MessageExt messageExt = list.get(i);
                    String msg = new String(messageExt.getBody());
                    System.out.println("消费消息:"+msg);
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
        System.out.println("启动消费者");
    }
}
//运行结果如下:
启动消费者
消费消息:helloWorld8
消费消息:helloWorld7
消费消息:helloWorld9
消费消息:helloWorld6


消息过滤的语法

image.gif

3.png


注意点


  • 在启动消费者时,如果你报错误:


CODE: 1  DESC: The broker does not support consumer to filter message by SQL92


那是因为你启动broker的时候,没开启消息过滤。


如何开启呢?


在broker.conf文件中加入:enablePropertyFilter = true


然后启动的时候,还要指定配置文件broker.conf


nohup sh bin/mqbroker -n localhost:9876 -c conf/broker.conf &


浅析消息过滤


4.png


首先,我们根据代码点进去看看image.gif三个重载的方法subscribe:


5.png


我们看第一种:


subscribe(String topic, String subExpression) 就是直接加标签的这种:


consumer.subscribe("TOPIC", "TAGA || TAGB || TAGC");


我们看第二种:


subscribe(String topic, String fullClassName, String filterClassSource)


这种,是自定义一个过滤器实现类的。


我大概写一下:实现MessageFilter接口的match方法


6.png


fullClassName:是指类的全路径 filterClassSource:是指类的java文件路径


  • 1,Broker机器启动多个FilterServer过滤进程
  • 2,Consumer启动后,会想Broker传递一个Java类
  • 3,Consumer从FilterServer拉取消息,FilterServer从Broker拉取消息,按照上传的java类进行过滤,过滤后返回给Consumer


我们看第三种:


subscribe(String topic, MessageSelector messageSelector)


就是我们刚刚写的这种


这一节,就写到这了。


有问题,欢迎留言沟通


6、后续文章


  • RocketMQ-入门(已更新)
  • RocketMQ-架构和角色(已更新)
  • RocketMQ-消息发送(已更新)
  • RocketMQ-消费信息
  • RocketMQ-消费者的广播模式和集群模式(已更新)
  • RocketMQ-顺序消息(已更新)
  • RocketMQ-延迟消息(已更新)
  • RocketMQ-批量消息
  • RocketMQ-过滤消息(已更新)
  • RocketMQ-事务消息(已更新)
  • RocketMQ-消息存储
  • RocketMQ-高可用
  • RocketMQ-高性能
  • RocketMQ-主从复制
  • RocketMQ-刷盘机制
  • RocketMQ-幂等性
  • RocketMQ-消息重试
  • RocketMQ-死信队列

...


欢迎各位入(guan)股(zhu),后续文章干货多多。


—本文转载自「大头菜技术」公众号

相关实践学习
快速体验阿里云云消息队列RocketMQ版
本实验将带您快速体验使用云消息队列RocketMQ版Serverless系列实例进行获取接入点、创建Topic、创建订阅组、收发消息、查看消息轨迹和仪表盘。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
消息中间件 算法 Java
弥补延时消息的不足,RocketMQ 基于时间轮算法实现了定时消息!
弥补延时消息的不足,RocketMQ 基于时间轮算法实现了定时消息!
1003 1
弥补延时消息的不足,RocketMQ 基于时间轮算法实现了定时消息!
|
消息中间件 存储 数据可视化
【RocketMq-生产者】消息发送者参数详解
首先注意本次讨论的RokcetMq源码版本为 4.9.4,距离5.0发布 的没有多久。 这一节针对RocketMq的生产者请求发送的部分细节进行阐述,主要包含了下面的内容:DefaultMQProducer 为生产者默认对象,这个对象继承自 ClientConfig,里面包含了请求者的通用配置,所以可以拆分为两个部分进行理解,第一部分为ClientConfig,第二部分为DefaultMQProducer。
827 0
|
消息中间件 存储 负载均衡
【消息中间件】默认的RocketMQ消息消费者是如何启动的?(下)
在当下的分布式服务中,消息队列中间件是一个解决服务之间耦合的利器,今天我们来瞧一瞧开源的RocketMQ消息中间件,他的消费端是如何启动的,以及在使用他的过程中有哪些配置。
|
消息中间件 Java RocketMQ
【消息中间件】默认RocketMQ消息发送者是如何启动的?
上一篇文章,主要介绍了RocketMQ消息发送-请求与响应,了解了消息发送的请求参数和响应结果,今天我们主要来学习默认消息发送者的源码,看看消息发送主要是做了那哪些事情。
|
消息中间件 负载均衡 算法
【消息中间件】默认的RocketMQ消息消费者是如何启动的?(上)
在当下的分布式服务中,消息队列中间件是一个解决服务之间耦合的利器,今天我们来瞧一瞧开源的RocketMQ消息中间件,他的消费端是如何启动的,以及在使用他的过程中有哪些配置。
|
消息中间件 uml RocketMQ
3 张图带你彻底理解 RocketMQ 事务消息
3 张图带你彻底理解 RocketMQ 事务消息
67989 2
3 张图带你彻底理解 RocketMQ 事务消息
|
消息中间件 NoSQL 关系型数据库
实战:如何防止mq消费方消息重复消费、rocketmq理论概述、rocketmq组成、普通消息的发送
实战:如何防止mq消费方消息重复消费 如果因为网络延迟等原因,mq无法及时接收到消费方的应答,导致mq重试。(计算机网络)。在重试过程中造成重复消费的问题
2992 1
实战:如何防止mq消费方消息重复消费、rocketmq理论概述、rocketmq组成、普通消息的发送
|
消息中间件 Java uml
5张图带你理解 RocketMQ 顺序消息实现机制
5张图带你理解 RocketMQ 顺序消息实现机制
909 1
5张图带你理解 RocketMQ 顺序消息实现机制
|
消息中间件 缓存 算法
阿里二面:RocketMQ 消息积压了,增加消费者有用吗?
阿里二面:RocketMQ 消息积压了,增加消费者有用吗?
425 0
阿里二面:RocketMQ 消息积压了,增加消费者有用吗?
|
消息中间件 缓存 数据库
4 张图,9 个维度告诉你怎么做能确保 RocketMQ 不丢失消息
4 张图,9 个维度告诉你怎么做能确保 RocketMQ 不丢失消息
539 0
4 张图,9 个维度告诉你怎么做能确保 RocketMQ 不丢失消息