解析 RocketMQ 多样消费功能-消息过滤

本文涉及的产品
可观测可视化 Grafana 版,10个用户账号 1个月
可观测监控 Prometheus 版,每月50GB免费额度
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
简介: 在消息中间件的使用过程中,一个主题对应的消费者想要通过规则只消费这个主题下具备某些特征的消息,过滤掉自己不关心的消息,这个功能就叫消息过滤。

作者:徒钟


什么是消息过滤


在消息中间件的使用过程中,一个主题对应的消费者想要通过规则只消费这个主题下具备某些特征的消息,过滤掉自己不关心的消息,这个功能就叫消息过滤。


1.png


就如上图所描述的,生产者会向主题中写入形形色色的消息,有橙色的、黄色的、还有灰色的,而这个主题有两个消费者,第一个消费者只想要消费橙色的消息,第二个消费者只想要消费黄色的和灰色的消息,那么这个效果就需要通过消息过滤来实现。


消息过滤的应用场景


我们以常见的电商场景为例,来看看消息过滤在实际应用过程中起到的作用。


2.png


电商平台在设计时,往往存在系统拆分细、功能模块多、调用链路长、系统依赖复杂等特点,消息中间件在其中就起到了异步解耦、异步通信的作用,特别是在双十一这样的流量高峰期,消息中间件还起到了削峰填谷的作用。


而在消息中间件使用方面,电商平台因为覆盖的领域众多会产生很多的消息主题,消息收发量也随着交易量和订阅系统的增加而增大。随着业务系统的水平拆解和垂直增加,相关的消息呈现出高订阅比和低投递比的状态,比如一个主题订阅比是 300:1,即 1 个主题的订阅者有 300 个,但是投递比却只有 15:300,即一条消息只有 15 个订阅者需要投递,其他 285 个订阅者全部过滤了这条消息。那解决这些场景,就需要使用到消息过滤。


举例来说,在交易链路中,一个订单的处理流程分为下单、扣减库存、支付等流程,这个流程会涉及订单操作和状态机的变化。下游的系统,如积分、物流、通知、实时计算等,他们会通过消息中间件监听订单的变更消息。但是它们对订单不同操作和状态的消息有着不同的需求,如积分系统只关心下单消息,只要下单就扣减积分。物流系统只关系支付和收货消息,支付就发起物流订单,收货就完成物流订单。实时计算系统会统计订单不同状态的数据,所有消息都要接收。


试想一下如果没有消息过滤这个功能,我们会怎么支持以上消息过滤的功能呢?能想到的一般有以下两个方案:


1. 通过将主题进行拆分,将不同的消息发送到不同主题上。


对于生产者来说,这意味着消费者有多少消费场景,就需要新建多少个 Topic,这无疑会给生产者带来巨大的维护成本。对消费者来说,消费者有可能需要同时订阅多个 Topic,这同样带来了很大的维护成本。另外,消息被主题拆分后,他们之间的消费顺序就无法保证了,比如对于一个订单,它的下单、支付等操作显然是要被顺序处理的。


2. 消费者收到消息后,根据消息体对消息按照规则硬编码自行过滤。


这意味着所有的消息都会推送到消费者端进行计算,这无疑增加了网络带宽,也增加了消费者在内存和 CPU 上的消耗。


有了消息过滤这个功能,生产者只需向一个主题进行投递消息,服务端根据订阅规则进行计算,并按需投递给每个消费者。这样对生产者和消费者的代码维护就非常友好,同时也能很大程度上降低网络带宽,同时减少消费者的内存占用和 CPU 的消耗。


RocketMQ 消息过滤的模式


RocketMQ 是众多消息中间件中为数不多支持消息过滤的系统。这也是其作为业务集成消息首选方案的重要基础之一。


在功能层面,RocketMQ 支持两种过滤方式,Tag 标签过滤和 SQL 属性过滤,下面我来这两个过滤方式使用方式和技术原理进行介绍


Tag 标签过滤


  • 功能介绍

Tag 标签过滤方式是 RocketMQ 提供的基础消息过滤能力,基于生产者为消息设置的 Tag 标签进行匹配。生产者在发送消息时,设置消息的 Tag 标签,消费者按需指定已有的 Tag 标签来进行匹配订阅。


  • 过滤语法

1. 单 Tag 匹配:过滤表达式为目标 Tag,表示只有消息标签为指定目标 Tag 的消息符合匹配条件,会被发送给消费者;


2. 多 Tag 匹配:多个 Tag 之间为或的关系,不同 Tag 间使用两个竖线(||)隔开。例如,Tag1||Tag2||Tag3,表示标签为 Tag1 或 Tag2 或 Tag3 的消息都满足匹配条件,都会被发送给消费者进行消费;


3. 全 Tag 匹配:使用星号(*)作为全匹配表达式。表示主题下的所有消息都将被发送给消费者进行消费。


  • 使用方式

1. 发送消息,设置 Tag 标签


Message message = provider.newMessageBuilder()
    .setTopic("TopicA")
    .setKeys("messageKey")
    //设置消息Tag,用于消费端根据指定Tag过滤消息
    .setTag("TagA")
    .setBody("messageBody".getBytes())
    .build();


2. 订阅消息,匹配单个 Tag 标签


//只订阅消息标签为“TagA”的消息
FilterExpression filterExpression = new FilterExpression("TagA", FilterExpressionType.TAG);
pushConsumer.subscribe("TopicA", filterExpression);


3. 订阅消息,匹配多个 Tag 标签


//只订阅消息标签为“TagA”、“TagB”或“TagC”的消息
FilterExpression filterExpression = new FilterExpression("TagA||TagB||TagC", FilterExpressionType.TAG);
pushConsumer.subscribe("TopicA", filterExpression);

4. 订阅消息,匹配所有 Tag 标签,即不过滤


//使用Tag标签过滤消息,订阅所有消息
FilterExpression filterExpression = new FilterExpression("*", FilterExpressionType.TAG);
pushConsumer.subscribe("TopicA", filterExpression);

  • 技术原理


3.png


RocketMQ 在存储消息的时候,是通过 Append-Only 的方式将所有主题的消息都写在同一个 CommitLog 文件中,这可以有效的提升了消息的写入速率。为了消费时能够快速检索消息,它会在后台启动异步方式将消息所在位点、消息的大小,以及消息的标签哈希值存储到 ConsumeQueue 索引文件中。将标签存储到这个索引文件中,就是为了在通过标签进行消息过滤的时候,可以在索引层面就可以获取到消息的标签,不需要从 CommitLog 文件中读取,这样就减少消息读取产生的系统 IO 和内存开销。标签存储哈希值,主要是为了保证 ConsumeQueue 索引文件能够定长处理,这样可以有效较少存储空间,提升这个索引文件的读取效率。


整个 Tag 标签过滤的流程如下:


  1. 生产者对消息打上自己的业务标签,发送给我们的服务端 Broker;
  2. Broker 将消息写入 CommitLog 中,然后通过异步线程将消息分发到 ConsumeQueue 索引文件中;
  3. 消费者启动后,定时向 Broker 发送心跳请求,将订阅关系上传到 Broker 端,Broker 将订阅关系及标签的哈希值保存在内存中;
  4. 消费者向 Broker 拉取消息,Broker 会通过订阅关系和队列去 ConsumeQueue 中检索消息,将订阅关系中的标签哈希值和消息中的标签哈希值做比较,如果匹配就返回给消费者;
  5. 消费者收到消息后,会将消息中的标签值和本地订阅关系中标签值做精确匹配,匹配成功才会交给消费线程进行消费。


SQL 属性过滤


  • 功能介绍

SQL 属性过滤是 RocketMQ 提供的高级消息过滤方式,通过生产者为消息设置的属性(Key)及属性值(Value)进行匹配。生产者在发送消息时可设置多个属性,消费者订阅时可设置S QL 语法的过滤表达式过滤多个属性。


  • 过滤语法

1. 数值比较:>, >=, <, <=, BETWEEN, =


2. 字符比较:=, <>, IN


3. 判空运算:IS NULL or IS NOT NULL


4. 逻辑运算:AND, OR, NOT


  • 使用方式

1. 发送消息,设置属性


Message message = provider.newMessageBuilder()
    .setTopic("TopicA")
    .setKeys("messageKey")
    //设置消息属性,用于消费端根据指定属性过滤消息。
    .addProperty("Channel", "TaoBao")
    .addProperty("Price", "5999")
    .setBody("messageBody".getBytes())
    .build();


2. 订阅消息,匹配单个属性


FilterExpression filterExpression = new FilterExpression("Channel='TaoBao'", FilterExpressionType.SQL92);
pushConsumer.subscribe("TopicA", filterExpression);


3. 订阅消息,匹配多个属性


FilterExpression filterExpression = new FilterExpression("Channel='TaoBao' AND Price>5000", FilterExpressionType.SQL92);
pushConsumer.subscribe("TopicA", filterExpression);


4. 订阅消息,匹配所有属性


FilterExpression filterExpression = new FilterExpression("True", FilterExpressionType.SQL92);
pushConsumer.subscribe("TopicA", filterExpression);


  • 技术原理
     

4.png


由于 SQL 过滤需要将消息的属性和 SQL 表达式进行匹配,这会对服务端的内存和 CPU 增加很大的开销。为了降低这个开销,RocketMQ 采用了布隆过滤器进行优化。当 Broker 在收到消息后,会预先对所有的订阅者进行 SQL 匹配,并将匹配结果生成布隆过滤器的位图存储在 ConsumeQueueExt 索引扩展文件中。在消费时,Broker 就会使用使用这个过滤位图,通过布隆过滤器对消费者的 SQL 进行过滤,这可以避免消息在一定不匹配的时候,不需要去 CommitLog 中将消息的属性拉取到内存进行计算,可以有效地降低属性和 SQL 进行匹配的消息量,减少服务端的内存和 CPU 开销。


整个 SQL 过滤的处理流程如下:


  1. 消费者通过心跳上传订阅关系,Broker 判断如果是 SQL 过滤,就会通过布隆过滤器的算法,生成这个 SQL 对应的布隆过滤匹配参数; 
  2. 生产者对消息设置上自己的业务属性,发送给我们的服务端 Broker; 
  3. Broker 收到后将消息写入 CommitLog 中,然后通过异步线程将消息分发到 ConsumeQueue 索引文件中。在写入之前,会将这条消息的属性和当前所有订阅关系中 SQL 进行匹配,如果通过,则将 SQL 对应的布隆过滤匹配参数合并成一个完整的布隆过滤位图; 
  4. 消费者消费消息的时候,Broker 会先获取预先生成的布隆过滤匹配参数,然后通过布隆过滤器对 ConsumeQueueExt 的布隆过滤位图和消费者的布隆过滤匹配参数进行匹配; 
  5. 布隆过滤器返回匹配成功只能说明消息属性和 SQL 可能匹配,Broker 还需要从 CommitLog 中将消息属性取出来,再做一次和 SQL 的精确匹配,这个时候匹配成功才会将消息投递给消费者 


差异及对比

5.png


最佳实践


主题划分及消息定义


主题和消息背后的本质其实就是业务实体的属性、行为或状态发生了变化。只有发生了变化,生产者才会往主题里面发送消息,消费者才需要监听这些的消息,去完成自身的业务逻辑。


那么如何做好主题划分和消息定义呢,我们以订单实体为例,来看看主题划分和消息定义的原则。


6.png


  • 主题划分的原则
     

1. 业务领域是否一致


不同的业务领域背后有不同的业务实体,其属性、行为及状态的定义天差地别。比如商品和订单,他们属于两个完全独立且不同的领域,就不能定义成同一个主题。


2. 业务场景是否一致


同一个业务领域不同的业务场景或者技术场景,不能定义一个主题。如订单流程和订单缓存刷新都和订单有关系,但是订单缓存刷新可能需要被不同的流程触发,放在一起就会导致部分场景订单缓存不刷新的情况。


3. 消息类型是否一致


同一个业务领域和业务场景,对消息类型有不同需求,比如订单处理过程中,我们需要发送一个事务消息,同时也需要发送一个定时消息,那么这两个消息就不能共用一个主题。


  • 消息定义的原则
     

1. 无标签无属性


对于业务实体极其简单的消息,是可以不需要定义标签和属性,比如 MySQLBinlog 的同步。所有的消费者都没有消息过滤需求的,也无需定义标签和属性。


2. 如何定义标签


标签过滤是 RocketMQ 中使用最简单,且过滤性能最好的一种过滤方式。为了发挥其巨大的优势,可以考虑优先使用。在使用时,我们需要确认这个字段在业务实体和业务流程中是否是唯一定义的,并且它是被绝大多数消费者作为过滤条件的,那么可以将它作为标签来定义。比如订单中有下单渠道和订单操作这两个字段,并且在单次消息发送过程中都是唯一定义,但是订单操作被绝大多数消费者应用为过滤条件,那么它最合适作为标签。


3. 如何定义属性


属性过滤的开销相对比较大,所以只有在标签过滤无法满足时,才推荐使用。比如标签已经被其他字段占用,或者过滤条件不可枚举,需要支持多属性复杂逻辑的过滤,就只能使用属性过滤了。


保持订阅关系一致


订阅关系一致是指同一个消费者组下面的所有的消费者所订阅的 Topic 和过滤表达式都必须完全一致。


7.png


正如上图所示,一个消费者组包含两个消费者,他们同时订阅了 Topic-A 这个主题,但是消费者一订阅的是 Tag-A 这个标签的消息,消费者二订阅的是 Tag-B 这个标签的消息,那么他们两者的订阅关系就存在不一致。


  • 导致的问题:


那么订阅关系不一致会导致什么问题呢?


1. 频繁复杂均衡


在 RocketMQ 实现中,消费者客户端默认每 30 秒向 Broker 发送一次心跳,这个过程会上传订阅关系,Broker 发现变化了就进行订阅关系覆盖,同时会触发客户端进行负载均衡。那么订阅关系不一致的两个客户端会交叉上传自己的订阅关系,从而导致客户端频繁进行负载均衡。


2. 消费速率下降


客户端触发了负载均衡,会导致消费者所持有的消费队列发生变化,出现间断性暂停消息拉取,导致整体消费速率下降,甚至出现消息积压。


3. 消息重复消费


客户端触发了负载均衡,会导致已经消费成功的消息因为消费队列发生变化而放弃向 Broker 提交消费位点。Broker 会认为这条消息没有消费成功而重新向消费者发起投递,从而导致消息重复消费。


4. 消息未消费


订阅关系的不一致,会有两种场景会导致消息未消费。第一种是消费者的订阅关系和 Broker 当前订阅关系不一致,导致消息在 Broker 服务端就被过滤了。第二种是消费者的订阅关系和 Broker 当前的虽然一致,但是 Broker 投递给了其他的消费者,被其他消费者本地过滤了。


  • 使用的建议

在消息过滤使用中,有以下建议:


1. 不要共用消费者组


不同业务系统千万不要使用同一个消费者组订阅同一个主题的消息。一般不同业务系统由不同团队维护,很容易发生一个团队修改了订阅关系而没有通知到其他团队,从而导致订阅关系不一致的情况。


2. 不频繁变更订阅关系


频繁变更订阅关系这种情况比较少,但也存在部分用户实现在线规则或者动态参数来设置订阅关系。这有可能导致订阅关系发生变化,触发客户端负载均衡的情况。


3. 变更做好风险评估


由于业务的发展,需求的变更,订阅关系不可能一直不变,但是变更订阅关系过程中,需要考虑整体发布完成需要的总体时间,以及发布过程中订阅关系不一致而对业务可能带来的风险。


4. 消费做好幂等处理


不管是订阅关系不一致,还是客户端上下线,都会导致消息的重复投递,所以消息幂等处理永远是消息消费的黄金法则。在业务逻辑中,消费者需要保证对已经处理过的消息直接返回成功,避免二次消费对业务造成的损害,如果返回失败就会导致消息一直重复投递直到进死信。


到此,本文关于消息过滤的分享就到此结束了,非常感谢大家能够花费宝贵的时间阅读,有不对的地方麻烦指正,感谢大家对 RocketMQ 的关注,希望大家能够多多参与社区的讨论和贡献。


如果您对 RocketMQ 感兴趣,也欢迎您扫描下方二维码加入钉钉群一起沟通交流~


8.png


点击此处,进入官网了解更多详情~

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
2月前
|
消息中间件 Java Apache
RocketMQ消息回溯实践与解析
在分布式系统和高并发应用的开发中,消息队列扮演着至关重要的角色,而RocketMQ作为阿里巴巴开源的一款高性能消息中间件,以其高吞吐量、高可用性和灵活的配置能力,在业界得到了广泛应用。本文将围绕RocketMQ的消息回溯功能进行实践与解析,分享工作学习中的技术干货。
77 3
|
2月前
|
机器学习/深度学习 人工智能 自然语言处理
Hugging Face 论文平台 Daily Papers 功能全解析
【9月更文挑战第23天】Hugging Face 是一个专注于自然语言处理领域的开源机器学习平台。其推出的 Daily Papers 页面旨在帮助开发者和研究人员跟踪 AI 领域的最新进展,展示经精心挑选的高质量研究论文,并提供个性化推荐、互动交流、搜索、分类浏览及邮件提醒等功能,促进学术合作与知识共享。
|
23天前
|
安全 Java 测试技术
🎉Java零基础:全面解析枚举的强大功能
【10月更文挑战第19天】本文收录于「滚雪球学Java」专栏,专业攻坚指数级提升,希望能够助你一臂之力,帮你早日登顶实现财富自由🚀;同时,欢迎大家关注&&收藏&&订阅!持续更新中,up!up!up!!
103 60
|
1月前
|
Web App开发 前端开发 测试技术
Selenium 4新特性解析:关联定位器及其他创新功能
【10月更文挑战第6天】Selenium 是一个强大的自动化测试工具,广泛用于Web应用程序的测试。随着Selenium 4的发布,它引入了许多新特性和改进,使得编写和维护自动化脚本变得更加容易。本文将深入探讨Selenium 4的一些关键新特性,特别是关联定位器(Relative Locators),以及其他一些重要的创新功能。
164 2
|
1月前
|
消息中间件 存储 监控
RocketMQ消息重试机制解析!
RocketMQ消息重试机制解析!
RocketMQ消息重试机制解析!
|
20天前
|
供应链 安全 BI
CRM系统功能深度解析:为何这些平台排名靠前
本文深入解析了市场上排名靠前的CRM系统,如纷享销客、用友CRM、金蝶CRM、红圈CRM和销帮帮CRM,探讨了它们在功能性、用户体验、集成能力、数据安全和客户支持等方面的优势,以及如何满足企业的关键需求,助力企业实现数字化转型和业务增长。
|
24天前
|
数据管理 Nacos 开发者
"Nacos架构深度解析:一篇文章带你掌握业务层四大核心功能,服务注册、配置管理、元数据与健康检查一网打尽!"
【10月更文挑战第23天】Nacos 是一个用于服务注册发现和配置管理的平台,支持动态服务发现、配置管理、元数据管理和健康检查。其业务层包括服务注册与发现、配置管理、元数据管理和健康检查四大核心功能。通过示例代码展示了如何在业务层中使用Nacos,帮助开发者构建高可用、动态扩展的微服务生态系统。
68 0
|
2月前
|
移动开发 Android开发 数据安全/隐私保护
移动应用与系统的技术演进:从开发到操作系统的全景解析随着智能手机和平板电脑的普及,移动应用(App)已成为人们日常生活中不可或缺的一部分。无论是社交、娱乐、购物还是办公,移动应用都扮演着重要的角色。而支撑这些应用运行的,正是功能强大且复杂的移动操作系统。本文将深入探讨移动应用的开发过程及其背后的操作系统机制,揭示这一领域的技术演进。
本文旨在提供关于移动应用与系统技术的全面概述,涵盖移动应用的开发生命周期、主要移动操作系统的特点以及它们之间的竞争关系。我们将探讨如何高效地开发移动应用,并分析iOS和Android两大主流操作系统的技术优势与局限。同时,本文还将讨论跨平台解决方案的兴起及其对移动开发领域的影响。通过这篇技术性文章,读者将获得对移动应用开发及操作系统深层理解的钥匙。
|
1月前
|
Web App开发 存储 前端开发
前端开发必备:requestAnimationFrame、setInterval、setTimeout——功能解析与优劣对比
前端开发必备:requestAnimationFrame、setInterval、setTimeout——功能解析与优劣对比
142 0

相关产品

  • 云消息队列 MQ
  • 推荐镜像

    更多
    下一篇
    无影云桌面