解析 RocketMQ 多样消费功能-消息过滤

本文涉及的产品
应用实时监控服务-用户体验监控,每月100OCU免费额度
可观测可视化 Grafana 版,10个用户账号 1个月
函数计算FC,每月15万CU 3个月
简介: 在消息中间件的使用过程中,一个主题对应的消费者想要通过规则只消费这个主题下具备某些特征的消息,过滤掉自己不关心的消息,这个功能就叫消息过滤。

作者:徒钟


什么是消息过滤


在消息中间件的使用过程中,一个主题对应的消费者想要通过规则只消费这个主题下具备某些特征的消息,过滤掉自己不关心的消息,这个功能就叫消息过滤。


1.png


就如上图所描述的,生产者会向主题中写入形形色色的消息,有橙色的、黄色的、还有灰色的,而这个主题有两个消费者,第一个消费者只想要消费橙色的消息,第二个消费者只想要消费黄色的和灰色的消息,那么这个效果就需要通过消息过滤来实现。


消息过滤的应用场景


我们以常见的电商场景为例,来看看消息过滤在实际应用过程中起到的作用。


2.png


电商平台在设计时,往往存在系统拆分细、功能模块多、调用链路长、系统依赖复杂等特点,消息中间件在其中就起到了异步解耦、异步通信的作用,特别是在双十一这样的流量高峰期,消息中间件还起到了削峰填谷的作用。


而在消息中间件使用方面,电商平台因为覆盖的领域众多会产生很多的消息主题,消息收发量也随着交易量和订阅系统的增加而增大。随着业务系统的水平拆解和垂直增加,相关的消息呈现出高订阅比和低投递比的状态,比如一个主题订阅比是 300:1,即 1 个主题的订阅者有 300 个,但是投递比却只有 15:300,即一条消息只有 15 个订阅者需要投递,其他 285 个订阅者全部过滤了这条消息。那解决这些场景,就需要使用到消息过滤。


举例来说,在交易链路中,一个订单的处理流程分为下单、扣减库存、支付等流程,这个流程会涉及订单操作和状态机的变化。下游的系统,如积分、物流、通知、实时计算等,他们会通过消息中间件监听订单的变更消息。但是它们对订单不同操作和状态的消息有着不同的需求,如积分系统只关心下单消息,只要下单就扣减积分。物流系统只关系支付和收货消息,支付就发起物流订单,收货就完成物流订单。实时计算系统会统计订单不同状态的数据,所有消息都要接收。


试想一下如果没有消息过滤这个功能,我们会怎么支持以上消息过滤的功能呢?能想到的一般有以下两个方案:


1. 通过将主题进行拆分,将不同的消息发送到不同主题上。


对于生产者来说,这意味着消费者有多少消费场景,就需要新建多少个 Topic,这无疑会给生产者带来巨大的维护成本。对消费者来说,消费者有可能需要同时订阅多个 Topic,这同样带来了很大的维护成本。另外,消息被主题拆分后,他们之间的消费顺序就无法保证了,比如对于一个订单,它的下单、支付等操作显然是要被顺序处理的。


2. 消费者收到消息后,根据消息体对消息按照规则硬编码自行过滤。


这意味着所有的消息都会推送到消费者端进行计算,这无疑增加了网络带宽,也增加了消费者在内存和 CPU 上的消耗。


有了消息过滤这个功能,生产者只需向一个主题进行投递消息,服务端根据订阅规则进行计算,并按需投递给每个消费者。这样对生产者和消费者的代码维护就非常友好,同时也能很大程度上降低网络带宽,同时减少消费者的内存占用和 CPU 的消耗。


RocketMQ 消息过滤的模式


RocketMQ 是众多消息中间件中为数不多支持消息过滤的系统。这也是其作为业务集成消息首选方案的重要基础之一。


在功能层面,RocketMQ 支持两种过滤方式,Tag 标签过滤和 SQL 属性过滤,下面我来这两个过滤方式使用方式和技术原理进行介绍


Tag 标签过滤


  • 功能介绍

Tag 标签过滤方式是 RocketMQ 提供的基础消息过滤能力,基于生产者为消息设置的 Tag 标签进行匹配。生产者在发送消息时,设置消息的 Tag 标签,消费者按需指定已有的 Tag 标签来进行匹配订阅。


  • 过滤语法

1. 单 Tag 匹配:过滤表达式为目标 Tag,表示只有消息标签为指定目标 Tag 的消息符合匹配条件,会被发送给消费者;


2. 多 Tag 匹配:多个 Tag 之间为或的关系,不同 Tag 间使用两个竖线(||)隔开。例如,Tag1||Tag2||Tag3,表示标签为 Tag1 或 Tag2 或 Tag3 的消息都满足匹配条件,都会被发送给消费者进行消费;


3. 全 Tag 匹配:使用星号(*)作为全匹配表达式。表示主题下的所有消息都将被发送给消费者进行消费。


  • 使用方式

1. 发送消息,设置 Tag 标签


Message message = provider.newMessageBuilder()
    .setTopic("TopicA")
    .setKeys("messageKey")
    //设置消息Tag,用于消费端根据指定Tag过滤消息
    .setTag("TagA")
    .setBody("messageBody".getBytes())
    .build();


2. 订阅消息,匹配单个 Tag 标签


//只订阅消息标签为“TagA”的消息
FilterExpression filterExpression = new FilterExpression("TagA", FilterExpressionType.TAG);
pushConsumer.subscribe("TopicA", filterExpression);


3. 订阅消息,匹配多个 Tag 标签


//只订阅消息标签为“TagA”、“TagB”或“TagC”的消息
FilterExpression filterExpression = new FilterExpression("TagA||TagB||TagC", FilterExpressionType.TAG);
pushConsumer.subscribe("TopicA", filterExpression);

4. 订阅消息,匹配所有 Tag 标签,即不过滤


//使用Tag标签过滤消息,订阅所有消息
FilterExpression filterExpression = new FilterExpression("*", FilterExpressionType.TAG);
pushConsumer.subscribe("TopicA", filterExpression);

  • 技术原理


3.png


RocketMQ 在存储消息的时候,是通过 Append-Only 的方式将所有主题的消息都写在同一个 CommitLog 文件中,这可以有效的提升了消息的写入速率。为了消费时能够快速检索消息,它会在后台启动异步方式将消息所在位点、消息的大小,以及消息的标签哈希值存储到 ConsumeQueue 索引文件中。将标签存储到这个索引文件中,就是为了在通过标签进行消息过滤的时候,可以在索引层面就可以获取到消息的标签,不需要从 CommitLog 文件中读取,这样就减少消息读取产生的系统 IO 和内存开销。标签存储哈希值,主要是为了保证 ConsumeQueue 索引文件能够定长处理,这样可以有效较少存储空间,提升这个索引文件的读取效率。


整个 Tag 标签过滤的流程如下:


  1. 生产者对消息打上自己的业务标签,发送给我们的服务端 Broker;
  2. Broker 将消息写入 CommitLog 中,然后通过异步线程将消息分发到 ConsumeQueue 索引文件中;
  3. 消费者启动后,定时向 Broker 发送心跳请求,将订阅关系上传到 Broker 端,Broker 将订阅关系及标签的哈希值保存在内存中;
  4. 消费者向 Broker 拉取消息,Broker 会通过订阅关系和队列去 ConsumeQueue 中检索消息,将订阅关系中的标签哈希值和消息中的标签哈希值做比较,如果匹配就返回给消费者;
  5. 消费者收到消息后,会将消息中的标签值和本地订阅关系中标签值做精确匹配,匹配成功才会交给消费线程进行消费。


SQL 属性过滤


  • 功能介绍

SQL 属性过滤是 RocketMQ 提供的高级消息过滤方式,通过生产者为消息设置的属性(Key)及属性值(Value)进行匹配。生产者在发送消息时可设置多个属性,消费者订阅时可设置S QL 语法的过滤表达式过滤多个属性。


  • 过滤语法

1. 数值比较:>, >=, <, <=, BETWEEN, =


2. 字符比较:=, <>, IN


3. 判空运算:IS NULL or IS NOT NULL


4. 逻辑运算:AND, OR, NOT


  • 使用方式

1. 发送消息,设置属性


Message message = provider.newMessageBuilder()
    .setTopic("TopicA")
    .setKeys("messageKey")
    //设置消息属性,用于消费端根据指定属性过滤消息。
    .addProperty("Channel", "TaoBao")
    .addProperty("Price", "5999")
    .setBody("messageBody".getBytes())
    .build();


2. 订阅消息,匹配单个属性


FilterExpression filterExpression = new FilterExpression("Channel='TaoBao'", FilterExpressionType.SQL92);
pushConsumer.subscribe("TopicA", filterExpression);


3. 订阅消息,匹配多个属性


FilterExpression filterExpression = new FilterExpression("Channel='TaoBao' AND Price>5000", FilterExpressionType.SQL92);
pushConsumer.subscribe("TopicA", filterExpression);


4. 订阅消息,匹配所有属性


FilterExpression filterExpression = new FilterExpression("True", FilterExpressionType.SQL92);
pushConsumer.subscribe("TopicA", filterExpression);


  • 技术原理
     

4.png


由于 SQL 过滤需要将消息的属性和 SQL 表达式进行匹配,这会对服务端的内存和 CPU 增加很大的开销。为了降低这个开销,RocketMQ 采用了布隆过滤器进行优化。当 Broker 在收到消息后,会预先对所有的订阅者进行 SQL 匹配,并将匹配结果生成布隆过滤器的位图存储在 ConsumeQueueExt 索引扩展文件中。在消费时,Broker 就会使用使用这个过滤位图,通过布隆过滤器对消费者的 SQL 进行过滤,这可以避免消息在一定不匹配的时候,不需要去 CommitLog 中将消息的属性拉取到内存进行计算,可以有效地降低属性和 SQL 进行匹配的消息量,减少服务端的内存和 CPU 开销。


整个 SQL 过滤的处理流程如下:


  1. 消费者通过心跳上传订阅关系,Broker 判断如果是 SQL 过滤,就会通过布隆过滤器的算法,生成这个 SQL 对应的布隆过滤匹配参数; 
  2. 生产者对消息设置上自己的业务属性,发送给我们的服务端 Broker; 
  3. Broker 收到后将消息写入 CommitLog 中,然后通过异步线程将消息分发到 ConsumeQueue 索引文件中。在写入之前,会将这条消息的属性和当前所有订阅关系中 SQL 进行匹配,如果通过,则将 SQL 对应的布隆过滤匹配参数合并成一个完整的布隆过滤位图; 
  4. 消费者消费消息的时候,Broker 会先获取预先生成的布隆过滤匹配参数,然后通过布隆过滤器对 ConsumeQueueExt 的布隆过滤位图和消费者的布隆过滤匹配参数进行匹配; 
  5. 布隆过滤器返回匹配成功只能说明消息属性和 SQL 可能匹配,Broker 还需要从 CommitLog 中将消息属性取出来,再做一次和 SQL 的精确匹配,这个时候匹配成功才会将消息投递给消费者 


差异及对比

5.png


最佳实践


主题划分及消息定义


主题和消息背后的本质其实就是业务实体的属性、行为或状态发生了变化。只有发生了变化,生产者才会往主题里面发送消息,消费者才需要监听这些的消息,去完成自身的业务逻辑。


那么如何做好主题划分和消息定义呢,我们以订单实体为例,来看看主题划分和消息定义的原则。


6.png


  • 主题划分的原则
     

1. 业务领域是否一致


不同的业务领域背后有不同的业务实体,其属性、行为及状态的定义天差地别。比如商品和订单,他们属于两个完全独立且不同的领域,就不能定义成同一个主题。


2. 业务场景是否一致


同一个业务领域不同的业务场景或者技术场景,不能定义一个主题。如订单流程和订单缓存刷新都和订单有关系,但是订单缓存刷新可能需要被不同的流程触发,放在一起就会导致部分场景订单缓存不刷新的情况。


3. 消息类型是否一致


同一个业务领域和业务场景,对消息类型有不同需求,比如订单处理过程中,我们需要发送一个事务消息,同时也需要发送一个定时消息,那么这两个消息就不能共用一个主题。


  • 消息定义的原则
     

1. 无标签无属性


对于业务实体极其简单的消息,是可以不需要定义标签和属性,比如 MySQLBinlog 的同步。所有的消费者都没有消息过滤需求的,也无需定义标签和属性。


2. 如何定义标签


标签过滤是 RocketMQ 中使用最简单,且过滤性能最好的一种过滤方式。为了发挥其巨大的优势,可以考虑优先使用。在使用时,我们需要确认这个字段在业务实体和业务流程中是否是唯一定义的,并且它是被绝大多数消费者作为过滤条件的,那么可以将它作为标签来定义。比如订单中有下单渠道和订单操作这两个字段,并且在单次消息发送过程中都是唯一定义,但是订单操作被绝大多数消费者应用为过滤条件,那么它最合适作为标签。


3. 如何定义属性


属性过滤的开销相对比较大,所以只有在标签过滤无法满足时,才推荐使用。比如标签已经被其他字段占用,或者过滤条件不可枚举,需要支持多属性复杂逻辑的过滤,就只能使用属性过滤了。


保持订阅关系一致


订阅关系一致是指同一个消费者组下面的所有的消费者所订阅的 Topic 和过滤表达式都必须完全一致。


7.png


正如上图所示,一个消费者组包含两个消费者,他们同时订阅了 Topic-A 这个主题,但是消费者一订阅的是 Tag-A 这个标签的消息,消费者二订阅的是 Tag-B 这个标签的消息,那么他们两者的订阅关系就存在不一致。


  • 导致的问题:


那么订阅关系不一致会导致什么问题呢?


1. 频繁复杂均衡


在 RocketMQ 实现中,消费者客户端默认每 30 秒向 Broker 发送一次心跳,这个过程会上传订阅关系,Broker 发现变化了就进行订阅关系覆盖,同时会触发客户端进行负载均衡。那么订阅关系不一致的两个客户端会交叉上传自己的订阅关系,从而导致客户端频繁进行负载均衡。


2. 消费速率下降


客户端触发了负载均衡,会导致消费者所持有的消费队列发生变化,出现间断性暂停消息拉取,导致整体消费速率下降,甚至出现消息积压。


3. 消息重复消费


客户端触发了负载均衡,会导致已经消费成功的消息因为消费队列发生变化而放弃向 Broker 提交消费位点。Broker 会认为这条消息没有消费成功而重新向消费者发起投递,从而导致消息重复消费。


4. 消息未消费


订阅关系的不一致,会有两种场景会导致消息未消费。第一种是消费者的订阅关系和 Broker 当前订阅关系不一致,导致消息在 Broker 服务端就被过滤了。第二种是消费者的订阅关系和 Broker 当前的虽然一致,但是 Broker 投递给了其他的消费者,被其他消费者本地过滤了。


  • 使用的建议

在消息过滤使用中,有以下建议:


1. 不要共用消费者组


不同业务系统千万不要使用同一个消费者组订阅同一个主题的消息。一般不同业务系统由不同团队维护,很容易发生一个团队修改了订阅关系而没有通知到其他团队,从而导致订阅关系不一致的情况。


2. 不频繁变更订阅关系


频繁变更订阅关系这种情况比较少,但也存在部分用户实现在线规则或者动态参数来设置订阅关系。这有可能导致订阅关系发生变化,触发客户端负载均衡的情况。


3. 变更做好风险评估


由于业务的发展,需求的变更,订阅关系不可能一直不变,但是变更订阅关系过程中,需要考虑整体发布完成需要的总体时间,以及发布过程中订阅关系不一致而对业务可能带来的风险。


4. 消费做好幂等处理


不管是订阅关系不一致,还是客户端上下线,都会导致消息的重复投递,所以消息幂等处理永远是消息消费的黄金法则。在业务逻辑中,消费者需要保证对已经处理过的消息直接返回成功,避免二次消费对业务造成的损害,如果返回失败就会导致消息一直重复投递直到进死信。


到此,本文关于消息过滤的分享就到此结束了,非常感谢大家能够花费宝贵的时间阅读,有不对的地方麻烦指正,感谢大家对 RocketMQ 的关注,希望大家能够多多参与社区的讨论和贡献。


如果您对 RocketMQ 感兴趣,也欢迎您扫描下方二维码加入钉钉群一起沟通交流~


8.png


点击此处,进入官网了解更多详情~

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
20天前
|
数据可视化 数据挖掘 BI
团队管理者必读:高效看板类协同软件的功能解析
在现代职场中,团队协作的效率直接影响项目成败。看板类协同软件通过可视化界面,帮助团队清晰规划任务、追踪进度,提高协作效率。本文介绍看板类软件的优势,并推荐五款优质工具:板栗看板、Trello、Monday.com、ClickUp 和 Asana,助力团队实现高效管理。
45 2
|
12天前
|
设计模式 XML Java
【23种设计模式·全精解析 | 自定义Spring框架篇】Spring核心源码分析+自定义Spring的IOC功能,依赖注入功能
本文详细介绍了Spring框架的核心功能,并通过手写自定义Spring框架的方式,深入理解了Spring的IOC(控制反转)和DI(依赖注入)功能,并且学会实际运用设计模式到真实开发中。
【23种设计模式·全精解析 | 自定义Spring框架篇】Spring核心源码分析+自定义Spring的IOC功能,依赖注入功能
|
5天前
|
存储 数据库 对象存储
新版本发布:查询更快,兼容更强,TDengine 3.3.4.3 功能解析
经过 TDengine 研发团队的精心打磨,TDengine 3.3.4.3 版本正式发布。作为时序数据库领域的领先产品,TDengine 一直致力于为用户提供高效、稳定、易用的解决方案。本次版本更新延续了一贯的高标准,为用户带来了多项实用的新特性,并对系统性能进行了深度优化。
16 3
|
5天前
|
供应链 数据可视化 数据挖掘
企业服务品牌深度解析:销售易、用友、白码功能与特色对比
在企业服务领域,销售易、用友、白码等品牌凭借独特的产品和解决方案占据重要地位。销售易专注于CRM,提供客户管理、销售自动化、市场营销等功能,提升销售效率与客户满意度。用友作为领先的企业服务提供商,涵盖ERP、财务管理、人力资源管理等,助力企业资源优化配置。白码则以低代码开发平台为核心,支持快速构建业务应用,具备高度可定制化和易于维护的特点。三者各具特色,共同推动企业数字化转型。
|
2月前
|
安全 Java 测试技术
🎉Java零基础:全面解析枚举的强大功能
【10月更文挑战第19天】本文收录于「滚雪球学Java」专栏,专业攻坚指数级提升,希望能够助你一臂之力,帮你早日登顶实现财富自由🚀;同时,欢迎大家关注&&收藏&&订阅!持续更新中,up!up!up!!
124 60
|
16天前
|
消息中间件 Kafka 应用服务中间件
仙讯畅通无阻:探索MQ阵法的强大功能
MQ(消息队列)起源于1993年IBM推出的MQSeries,后更名为WebSphere MQ和IBM MQ。常见的MQ系统包括:IBM MQ、Apache ActiveMQ、RabbitMQ、Apache Kafka、RocketMQ和Amazon SQS。这些系统广泛应用于异步通信、系统解耦和削峰填谷等场景,确保消息的可靠传递。在修真界,MQ阵法如同神秘的传信工具,能在仙人修炼时安全传递重要信息,保障仙讯畅通无阻。
38 4
|
16天前
|
小程序 安全 搜索推荐
陪玩小程序的搭建解析与功能需求
陪玩小程序是为玩家提供专业陪玩服务的应用,嵌入社交或游戏平台,具备智能匹配、实时聊天、预约服务等功能,支持便捷高效的游戏体验。源码交付时需提供详细文档、技术支持及定制开发服务,确保客户能顺利维护和升级。选择陪玩小程序时应关注功能需求、用户体验、安全性和成本效益,以确保最佳使用效果。
37 0
|
1月前
|
存储 安全 数据安全/隐私保护
深入解析iOS 14隐私保护功能:用户数据安全的新里程碑
随着数字时代的到来,个人隐私保护成为全球关注的焦点。苹果公司在最新的iOS 14系统中引入了一系列创新的隐私保护功能,旨在为用户提供更透明的数据使用信息和更强的控制权。本文将深入探讨iOS 14中的几项关键隐私功能,包括App跟踪透明性、简化的隐私设置以及增强的系统安全性,分析它们如何共同作用以提升用户的隐私保护水平。
94 3
|
2月前
|
消息中间件 存储 Java
RocketMQ文件刷盘机制深度解析与Java模拟实现
【11月更文挑战第22天】在现代分布式系统中,消息队列(Message Queue, MQ)作为一种重要的中间件,扮演着连接不同服务、实现异步通信和消息解耦的关键角色。Apache RocketMQ作为一款高性能的分布式消息中间件,广泛应用于实时数据流处理、日志流处理等场景。为了保证消息的可靠性,RocketMQ引入了一种称为“刷盘”的机制,将消息从内存写入到磁盘中,确保消息持久化。本文将从底层原理、业务场景、概念、功能点等方面深入解析RocketMQ的文件刷盘机制,并使用Java模拟实现类似的功能。
45 3
|
2月前
|
机器学习/深度学习 存储 人工智能
政务部门人工智能OCR智能化升级:3大技术架构与4项核心功能解析
本项目针对政务服务数字化需求,建设智能文档处理平台,利用OCR、信息抽取和深度学习技术,实现文件自动解析、分类、比对与审核,提升效率与准确性。平台强调本地部署,确保数据安全,解决低质量扫描件、复杂表格等痛点,降低人工成本与错误率,助力智慧政务发展。

相关产品

  • 云消息队列 MQ
  • 推荐镜像

    更多