RocketMQ的TAG过滤和SQL过滤机制

本文涉及的产品
公共DNS(含HTTPDNS解析),每月1000万次HTTP解析
全局流量管理 GTM,标准版 1个月
云解析 DNS,旗舰版 1个月
简介: 写作目的项目中各个中台都使用同一个DB。而DB下会使用中间件监听binlog转换成MQ消息,而下游的各个中台去MQ去拿自己感兴趣的消息。

TAG

如果使用TAG去获取自己感兴趣的消息,那么对于一条学生表变更binlog,最少要插入三条消息,比如TAG=学生表,比如TAG=UPDATE修改操作,比如TAG=学生状态为1,等等。想到的就三种。。。

所以上面这种方式缺陷还是挺明显的。


SQL过滤

如果使用SQL过滤的方式,我们可以对某些属性进行过滤,自己拼接SQL,灵活性就上来了。


但是我好奇的一点是SQL怎么加到TAG里呢?并且TAG只能支持一个属性值呀。所以接下来从源码和原理的角度进行分析和探讨。


总体来说Tag过滤和SQL过滤如下图所示


7.png


代码展示


本着简单的原则出发


TAG过滤


当producer构建消息时消息时会构造方法里会有TAG的属性,如代码所示,Tag = Creative。


Message msg =
              new Message(
                  "CBeann", // topic
                  "Creative", // tag
                  "OrderID188", // key
                  "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET)); // msg body


当consumer订阅topic时要想监听Tag = creative的就可以如下图所示


DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1");
    //主题,Tag
        consumer.subscribe("CBeann", "Creative");
        consumer.setNamesrvAddr("114.115.208.175:9876");
        consumer.setConsumerGroup("group1");
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
    // wrong time format 2017_0422_221800
    // consumer.setConsumeTimestamp("20181109221800");
    consumer.registerMessageListener(...)


SQL过滤


与Tag消息不同的是,produccer生产的msg需要放入一些属性,如下代码所示,放入age属性的值为18。


 Message msg =
              new Message(
                  "CBeann", // topic
                  "creative", // tag
                  "OrderID188", // key
                  "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET)); // msg body
          msg.putUserProperty("age", String.valueOf(18));
          SendResult sendResult = producer.send(msg);


consumer中则不能根据tag过滤了。需要使用MessageSelector


DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1");
        MessageSelector messageSelector = MessageSelector.bySql("age >= 5");
        consumer.subscribe("TopicTest", messageSelector);
        //consumer.setNamesrvAddr("114.115.208.175:9876");
        consumer.setNamesrvAddr("127.0.0.1:9876");
        consumer.setConsumerGroup("group1Sql");
        consumer.setMessageModel(MessageModel.CLUSTERING);
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
    consumer.registerMessageListener()


TAG过滤机制


过滤图


此处以Tag过滤机制为例,消息过滤存在两个地方,一个是服务器端,另一个是消息者端。


8.png


假设消费者订阅的topic=CBeann,tag=creative,creative的hashCode =9527(假设一下)


而topic=CBeann的消息队列里有3条消息

msg1[tag=feed,tagHashCode= 9000]

msg2[tag=creative,tagHashCode= 9527]

msg3[tag=material,tagHashCode= 9527]


当consumer消费者给broker服务器发送获取topic=CBeann,tag=creative请求时,请求会转化为topic=CBeann,tagHashCode=9527。

因此对于上述的3条消息,经过tagHashCode匹配后会把msg2和msg3发送给consumer消息者。

而Consumer消费者会根据tag匹配后留下msg2


源码思路讲解


构建SubscriptionData


首先要了解一点,我们在consumer中设置订阅的topic和tag是什么样的一个数据结构呢?


DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1");
consumer.subscribe("CBeann", "Creative");


其实一直往下跟subcribe方法,最后我们可以定位到FilterAPI#buildSubscriptionData方法。我们传入的topic=CBeann;tag=creative 被封装到SubscriptionData对象里,包括topic、tag、tagHashCode。


9.png


brokder过滤逻辑


那么consumer消费端是存储着topic、tag、tagHashCode。而consumer会把topic和tagHashCode发送给Broker服务器。


当consumer消费者向broker服务端请求获取消息时,broker会从ConsumeQueue获取offset之后的所有如下所示的三元组。ConsumeQueue里的数据三元组如下图所示。


10.png


其实三元组是解析出来的,解析的三个属性就是上图中的offsetPy、sizePy和tagCode。下面我们重点关注一下tagCode


11.png


解析出来的tagsCode如果匹配成功,则保留,如果匹配失败,则continue。


12.png


接下来看一下是怎么匹配呢?如果是*,则全匹配,否则就根据tagsCode匹配。此处不是根据tag匹配,所以会有hash冲突的数据也会匹配到


13.png


结论:此时我们可以看到,broker服务器端是通过hashcode匹配的,哈希冲突的msg会被认为有效消息发送给consumer端。


consumer过滤逻辑


一般这种RPC的都是通过回调实现的,所以看完源码后定位到了一个CallBack方法。该CallBack方法如下所示,拿到Broker发送的消息后在经过processPullResult预处理后才会真正去判断消息是否获取到。


14.png


拿到消息后再经过Tag过滤,如下图所示,则到达我们自定义的处理消息逻辑


15.png


结论:此时我们可以看到,consumer消费者端是通过tag匹配的,二次过滤因为哈希导致消息Tag不准确的问题。


SQL过滤机制


SQL过滤和Tag过滤的消息有什么区别


结论:没区别,就是多了几个属性。比如下面的代码中的age属性


Message msg =
              new Message(
                  "CBeann", // topic
                  "creative", // tag
                  "OrderID188", // key
                  "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET)); // msg body
          msg.putUserProperty("age", String.valueOf(18));


如上面代码所示,msg的tag=creative, 属性age=18。

其实根据Message的构造方法和putUserProperty方法可以发现,最后都是放到Properties里


16.png


构建SubscriptionData


SQL过滤和Tag过滤的consumer端有什么区别?


如下面代码所示,我们构造了一个MessageSelector


 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1");
        MessageSelector messageSelector = MessageSelector.bySql("age >= 5");
        consumer.subscribe("TopicTest", messageSelector);


那么subscribe方法同样是把MessageSelector也是构建成SubscriptionData。不过和Tag那种不同的是,SubscriptionData里面放的是SQL即subString属性和expressionType属性SQL92


17.png


源码跟踪


broker过滤逻辑


SQL过滤和tag过滤都是经历下面的三个阶段,下面我们重点跟一下SQL过滤的代码块messageFilter#isMatchedByCommitLog。


18.png


debug了一下,如下图所示,从buffer里解析出properties来然后和SQL进行校验,返回校验结果


19.png


consumer过滤逻辑


一般这种RPC的都是通过回调实现的,所以看完源码后定位到了一个CallBack方法。该CallBack方法如下所示,拿到Broker发送的消息后在经过processPullResult预处理后才会真正去判断消息是否获取到。

下面的这个图其实在上面也出现过,这个处理方法里并没有SQL过滤的逻辑,因此在consumer不过滤。


20.png


总结


特殊的分表方式

tag作为msg的properties,这个其实映射到数据库分库分表中。比如db的一条记录需要新增一个字段,我们完全可以新增一个setting表,存储这个properties属性。阿里这边的很多项目DB设计都是这么做的。


SQL过滤比Tag过滤慢的原因:比较慢,解析慢

Tag过滤是直接等于,而SQL过滤还要通过表达式计算,SQL复杂的计算必然不如直接等于快。

SQL过滤的时候需要解析properties,本身就是一种资源消耗。


相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
2月前
|
消息中间件 存储 监控
|
10月前
|
消息中间件 存储 负载均衡
一文读懂RocketMQ的高可用机制——消息发送高可用
一文读懂RocketMQ的高可用机制——消息发送高可用
239 1
|
2月前
|
消息中间件 存储 运维
|
2月前
|
消息中间件 负载均衡 Java
【深入浅出RocketMQ原理及实战】「底层原理挖掘系列」透彻剖析贯穿RocketMQ的消息消费长轮训机制体系的原理分析
【深入浅出RocketMQ原理及实战】「底层原理挖掘系列」透彻剖析贯穿RocketMQ的消息消费长轮训机制体系的原理分析
45 0
|
8天前
|
SQL 运维 监控
MSSQL性能调优深度剖析:索引优化策略、SQL语句微调与并发管理机制
在Microsoft SQL Server(MSSQL)的运维与优化实践中,索引优化策略、SQL语句的精细微调以及高效的并发管理机制是提升数据库性能的三大支柱
|
2月前
|
消息中间件 存储 安全
【深入浅出RocketMQ原理及实战】「底层原理挖掘系列」透彻剖析贯穿RocketMQ的消息顺序消费和并发消费机制体系的原理分析
【深入浅出RocketMQ原理及实战】「底层原理挖掘系列」透彻剖析贯穿RocketMQ的消息顺序消费和并发消费机制体系的原理分析
50 0
|
14天前
|
消息中间件 Java 开发工具
消息队列 MQ产品使用合集之topic相同,但是tag不同,这个类不能放入map中,该如何处理
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
16天前
|
SQL 缓存 Java
Java框架之MyBatis 07-动态SQL-缓存机制-逆向工程-分页插件
Java框架之MyBatis 07-动态SQL-缓存机制-逆向工程-分页插件
|
1月前
|
消息中间件 Apache RocketMQ
消息队列 MQ产品使用合集之是否提供机制检测消费的状态
阿里云消息队列MQ(Message Queue)是一种高可用、高性能的消息中间件服务,它允许您在分布式应用的不同组件之间异步传递消息,从而实现系统解耦、流量削峰填谷以及提高系统的可扩展性和灵活性。以下是使用阿里云消息队列MQ产品的关键点和最佳实践合集。
|
2月前
|
SQL 关系型数据库 MySQL
实时计算 Flink版产品使用合集之flink sql ROW_NUMBER()回退更新的机制,有相关文档介绍吗
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
46 1