解析 RocketMQ 多样消费功能-消息过滤

本文涉及的产品
Serverless 应用引擎 SAE,800核*时 1600GiB*时
注册配置 MSE Nacos/ZooKeeper,118元/月
可观测链路 OpenTelemetry 版,每月50GB免费额度
简介: 在消息中间件的使用过程中,一个主题对应的消费者想要通过规则只消费这个主题下具备某些特征的消息,过滤掉自己不关心的消息,这个功能就叫消息过滤。

作者:徒钟


什么是消息过滤


在消息中间件的使用过程中,一个主题对应的消费者想要通过规则只消费这个主题下具备某些特征的消息,过滤掉自己不关心的消息,这个功能就叫消息过滤。


1.png


就如上图所描述的,生产者会向主题中写入形形色色的消息,有橙色的、黄色的、还有灰色的,而这个主题有两个消费者,第一个消费者只想要消费橙色的消息,第二个消费者只想要消费黄色的和灰色的消息,那么这个效果就需要通过消息过滤来实现。


消息过滤的应用场景


我们以常见的电商场景为例,来看看消息过滤在实际应用过程中起到的作用。


2.png


电商平台在设计时,往往存在系统拆分细、功能模块多、调用链路长、系统依赖复杂等特点,消息中间件在其中就起到了异步解耦、异步通信的作用,特别是在双十一这样的流量高峰期,消息中间件还起到了削峰填谷的作用。


而在消息中间件使用方面,电商平台因为覆盖的领域众多会产生很多的消息主题,消息收发量也随着交易量和订阅系统的增加而增大。随着业务系统的水平拆解和垂直增加,相关的消息呈现出高订阅比和低投递比的状态,比如一个主题订阅比是 300:1,即 1 个主题的订阅者有 300 个,但是投递比却只有 15:300,即一条消息只有 15 个订阅者需要投递,其他 285 个订阅者全部过滤了这条消息。那解决这些场景,就需要使用到消息过滤。


举例来说,在交易链路中,一个订单的处理流程分为下单、扣减库存、支付等流程,这个流程会涉及订单操作和状态机的变化。下游的系统,如积分、物流、通知、实时计算等,他们会通过消息中间件监听订单的变更消息。但是它们对订单不同操作和状态的消息有着不同的需求,如积分系统只关心下单消息,只要下单就扣减积分。物流系统只关系支付和收货消息,支付就发起物流订单,收货就完成物流订单。实时计算系统会统计订单不同状态的数据,所有消息都要接收。


试想一下如果没有消息过滤这个功能,我们会怎么支持以上消息过滤的功能呢?能想到的一般有以下两个方案:


1. 通过将主题进行拆分,将不同的消息发送到不同主题上。


对于生产者来说,这意味着消费者有多少消费场景,就需要新建多少个 Topic,这无疑会给生产者带来巨大的维护成本。对消费者来说,消费者有可能需要同时订阅多个 Topic,这同样带来了很大的维护成本。另外,消息被主题拆分后,他们之间的消费顺序就无法保证了,比如对于一个订单,它的下单、支付等操作显然是要被顺序处理的。


2. 消费者收到消息后,根据消息体对消息按照规则硬编码自行过滤。


这意味着所有的消息都会推送到消费者端进行计算,这无疑增加了网络带宽,也增加了消费者在内存和 CPU 上的消耗。


有了消息过滤这个功能,生产者只需向一个主题进行投递消息,服务端根据订阅规则进行计算,并按需投递给每个消费者。这样对生产者和消费者的代码维护就非常友好,同时也能很大程度上降低网络带宽,同时减少消费者的内存占用和 CPU 的消耗。


RocketMQ 消息过滤的模式


RocketMQ 是众多消息中间件中为数不多支持消息过滤的系统。这也是其作为业务集成消息首选方案的重要基础之一。


在功能层面,RocketMQ 支持两种过滤方式,Tag 标签过滤和 SQL 属性过滤,下面我来这两个过滤方式使用方式和技术原理进行介绍


Tag 标签过滤


  • 功能介绍

Tag 标签过滤方式是 RocketMQ 提供的基础消息过滤能力,基于生产者为消息设置的 Tag 标签进行匹配。生产者在发送消息时,设置消息的 Tag 标签,消费者按需指定已有的 Tag 标签来进行匹配订阅。


  • 过滤语法

1. 单 Tag 匹配:过滤表达式为目标 Tag,表示只有消息标签为指定目标 Tag 的消息符合匹配条件,会被发送给消费者;


2. 多 Tag 匹配:多个 Tag 之间为或的关系,不同 Tag 间使用两个竖线(||)隔开。例如,Tag1||Tag2||Tag3,表示标签为 Tag1 或 Tag2 或 Tag3 的消息都满足匹配条件,都会被发送给消费者进行消费;


3. 全 Tag 匹配:使用星号(*)作为全匹配表达式。表示主题下的所有消息都将被发送给消费者进行消费。


  • 使用方式

1. 发送消息,设置 Tag 标签


Message message = provider.newMessageBuilder()
    .setTopic("TopicA")
    .setKeys("messageKey")
    //设置消息Tag,用于消费端根据指定Tag过滤消息
    .setTag("TagA")
    .setBody("messageBody".getBytes())
    .build();


2. 订阅消息,匹配单个 Tag 标签


//只订阅消息标签为“TagA”的消息
FilterExpression filterExpression = new FilterExpression("TagA", FilterExpressionType.TAG);
pushConsumer.subscribe("TopicA", filterExpression);


3. 订阅消息,匹配多个 Tag 标签


//只订阅消息标签为“TagA”、“TagB”或“TagC”的消息
FilterExpression filterExpression = new FilterExpression("TagA||TagB||TagC", FilterExpressionType.TAG);
pushConsumer.subscribe("TopicA", filterExpression);

4. 订阅消息,匹配所有 Tag 标签,即不过滤


//使用Tag标签过滤消息,订阅所有消息
FilterExpression filterExpression = new FilterExpression("*", FilterExpressionType.TAG);
pushConsumer.subscribe("TopicA", filterExpression);

  • 技术原理


3.png


RocketMQ 在存储消息的时候,是通过 Append-Only 的方式将所有主题的消息都写在同一个 CommitLog 文件中,这可以有效的提升了消息的写入速率。为了消费时能够快速检索消息,它会在后台启动异步方式将消息所在位点、消息的大小,以及消息的标签哈希值存储到 ConsumeQueue 索引文件中。将标签存储到这个索引文件中,就是为了在通过标签进行消息过滤的时候,可以在索引层面就可以获取到消息的标签,不需要从 CommitLog 文件中读取,这样就减少消息读取产生的系统 IO 和内存开销。标签存储哈希值,主要是为了保证 ConsumeQueue 索引文件能够定长处理,这样可以有效较少存储空间,提升这个索引文件的读取效率。


整个 Tag 标签过滤的流程如下:


  1. 生产者对消息打上自己的业务标签,发送给我们的服务端 Broker;
  2. Broker 将消息写入 CommitLog 中,然后通过异步线程将消息分发到 ConsumeQueue 索引文件中;
  3. 消费者启动后,定时向 Broker 发送心跳请求,将订阅关系上传到 Broker 端,Broker 将订阅关系及标签的哈希值保存在内存中;
  4. 消费者向 Broker 拉取消息,Broker 会通过订阅关系和队列去 ConsumeQueue 中检索消息,将订阅关系中的标签哈希值和消息中的标签哈希值做比较,如果匹配就返回给消费者;
  5. 消费者收到消息后,会将消息中的标签值和本地订阅关系中标签值做精确匹配,匹配成功才会交给消费线程进行消费。


SQL 属性过滤


  • 功能介绍

SQL 属性过滤是 RocketMQ 提供的高级消息过滤方式,通过生产者为消息设置的属性(Key)及属性值(Value)进行匹配。生产者在发送消息时可设置多个属性,消费者订阅时可设置S QL 语法的过滤表达式过滤多个属性。


  • 过滤语法

1. 数值比较:>, >=, <, <=, BETWEEN, =


2. 字符比较:=, <>, IN


3. 判空运算:IS NULL or IS NOT NULL


4. 逻辑运算:AND, OR, NOT


  • 使用方式

1. 发送消息,设置属性


Message message = provider.newMessageBuilder()
    .setTopic("TopicA")
    .setKeys("messageKey")
    //设置消息属性,用于消费端根据指定属性过滤消息。
    .addProperty("Channel", "TaoBao")
    .addProperty("Price", "5999")
    .setBody("messageBody".getBytes())
    .build();


2. 订阅消息,匹配单个属性


FilterExpression filterExpression = new FilterExpression("Channel='TaoBao'", FilterExpressionType.SQL92);
pushConsumer.subscribe("TopicA", filterExpression);


3. 订阅消息,匹配多个属性


FilterExpression filterExpression = new FilterExpression("Channel='TaoBao' AND Price>5000", FilterExpressionType.SQL92);
pushConsumer.subscribe("TopicA", filterExpression);


4. 订阅消息,匹配所有属性


FilterExpression filterExpression = new FilterExpression("True", FilterExpressionType.SQL92);
pushConsumer.subscribe("TopicA", filterExpression);


  • 技术原理
     

4.png


由于 SQL 过滤需要将消息的属性和 SQL 表达式进行匹配,这会对服务端的内存和 CPU 增加很大的开销。为了降低这个开销,RocketMQ 采用了布隆过滤器进行优化。当 Broker 在收到消息后,会预先对所有的订阅者进行 SQL 匹配,并将匹配结果生成布隆过滤器的位图存储在 ConsumeQueueExt 索引扩展文件中。在消费时,Broker 就会使用使用这个过滤位图,通过布隆过滤器对消费者的 SQL 进行过滤,这可以避免消息在一定不匹配的时候,不需要去 CommitLog 中将消息的属性拉取到内存进行计算,可以有效地降低属性和 SQL 进行匹配的消息量,减少服务端的内存和 CPU 开销。


整个 SQL 过滤的处理流程如下:


  1. 消费者通过心跳上传订阅关系,Broker 判断如果是 SQL 过滤,就会通过布隆过滤器的算法,生成这个 SQL 对应的布隆过滤匹配参数; 
  2. 生产者对消息设置上自己的业务属性,发送给我们的服务端 Broker; 
  3. Broker 收到后将消息写入 CommitLog 中,然后通过异步线程将消息分发到 ConsumeQueue 索引文件中。在写入之前,会将这条消息的属性和当前所有订阅关系中 SQL 进行匹配,如果通过,则将 SQL 对应的布隆过滤匹配参数合并成一个完整的布隆过滤位图; 
  4. 消费者消费消息的时候,Broker 会先获取预先生成的布隆过滤匹配参数,然后通过布隆过滤器对 ConsumeQueueExt 的布隆过滤位图和消费者的布隆过滤匹配参数进行匹配; 
  5. 布隆过滤器返回匹配成功只能说明消息属性和 SQL 可能匹配,Broker 还需要从 CommitLog 中将消息属性取出来,再做一次和 SQL 的精确匹配,这个时候匹配成功才会将消息投递给消费者 


差异及对比

5.png


最佳实践


主题划分及消息定义


主题和消息背后的本质其实就是业务实体的属性、行为或状态发生了变化。只有发生了变化,生产者才会往主题里面发送消息,消费者才需要监听这些的消息,去完成自身的业务逻辑。


那么如何做好主题划分和消息定义呢,我们以订单实体为例,来看看主题划分和消息定义的原则。


6.png


  • 主题划分的原则
     

1. 业务领域是否一致


不同的业务领域背后有不同的业务实体,其属性、行为及状态的定义天差地别。比如商品和订单,他们属于两个完全独立且不同的领域,就不能定义成同一个主题。


2. 业务场景是否一致


同一个业务领域不同的业务场景或者技术场景,不能定义一个主题。如订单流程和订单缓存刷新都和订单有关系,但是订单缓存刷新可能需要被不同的流程触发,放在一起就会导致部分场景订单缓存不刷新的情况。


3. 消息类型是否一致


同一个业务领域和业务场景,对消息类型有不同需求,比如订单处理过程中,我们需要发送一个事务消息,同时也需要发送一个定时消息,那么这两个消息就不能共用一个主题。


  • 消息定义的原则
     

1. 无标签无属性


对于业务实体极其简单的消息,是可以不需要定义标签和属性,比如 MySQLBinlog 的同步。所有的消费者都没有消息过滤需求的,也无需定义标签和属性。


2. 如何定义标签


标签过滤是 RocketMQ 中使用最简单,且过滤性能最好的一种过滤方式。为了发挥其巨大的优势,可以考虑优先使用。在使用时,我们需要确认这个字段在业务实体和业务流程中是否是唯一定义的,并且它是被绝大多数消费者作为过滤条件的,那么可以将它作为标签来定义。比如订单中有下单渠道和订单操作这两个字段,并且在单次消息发送过程中都是唯一定义,但是订单操作被绝大多数消费者应用为过滤条件,那么它最合适作为标签。


3. 如何定义属性


属性过滤的开销相对比较大,所以只有在标签过滤无法满足时,才推荐使用。比如标签已经被其他字段占用,或者过滤条件不可枚举,需要支持多属性复杂逻辑的过滤,就只能使用属性过滤了。


保持订阅关系一致


订阅关系一致是指同一个消费者组下面的所有的消费者所订阅的 Topic 和过滤表达式都必须完全一致。


7.png


正如上图所示,一个消费者组包含两个消费者,他们同时订阅了 Topic-A 这个主题,但是消费者一订阅的是 Tag-A 这个标签的消息,消费者二订阅的是 Tag-B 这个标签的消息,那么他们两者的订阅关系就存在不一致。


  • 导致的问题:


那么订阅关系不一致会导致什么问题呢?


1. 频繁复杂均衡


在 RocketMQ 实现中,消费者客户端默认每 30 秒向 Broker 发送一次心跳,这个过程会上传订阅关系,Broker 发现变化了就进行订阅关系覆盖,同时会触发客户端进行负载均衡。那么订阅关系不一致的两个客户端会交叉上传自己的订阅关系,从而导致客户端频繁进行负载均衡。


2. 消费速率下降


客户端触发了负载均衡,会导致消费者所持有的消费队列发生变化,出现间断性暂停消息拉取,导致整体消费速率下降,甚至出现消息积压。


3. 消息重复消费


客户端触发了负载均衡,会导致已经消费成功的消息因为消费队列发生变化而放弃向 Broker 提交消费位点。Broker 会认为这条消息没有消费成功而重新向消费者发起投递,从而导致消息重复消费。


4. 消息未消费


订阅关系的不一致,会有两种场景会导致消息未消费。第一种是消费者的订阅关系和 Broker 当前订阅关系不一致,导致消息在 Broker 服务端就被过滤了。第二种是消费者的订阅关系和 Broker 当前的虽然一致,但是 Broker 投递给了其他的消费者,被其他消费者本地过滤了。


  • 使用的建议

在消息过滤使用中,有以下建议:


1. 不要共用消费者组


不同业务系统千万不要使用同一个消费者组订阅同一个主题的消息。一般不同业务系统由不同团队维护,很容易发生一个团队修改了订阅关系而没有通知到其他团队,从而导致订阅关系不一致的情况。


2. 不频繁变更订阅关系


频繁变更订阅关系这种情况比较少,但也存在部分用户实现在线规则或者动态参数来设置订阅关系。这有可能导致订阅关系发生变化,触发客户端负载均衡的情况。


3. 变更做好风险评估


由于业务的发展,需求的变更,订阅关系不可能一直不变,但是变更订阅关系过程中,需要考虑整体发布完成需要的总体时间,以及发布过程中订阅关系不一致而对业务可能带来的风险。


4. 消费做好幂等处理


不管是订阅关系不一致,还是客户端上下线,都会导致消息的重复投递,所以消息幂等处理永远是消息消费的黄金法则。在业务逻辑中,消费者需要保证对已经处理过的消息直接返回成功,避免二次消费对业务造成的损害,如果返回失败就会导致消息一直重复投递直到进死信。


到此,本文关于消息过滤的分享就到此结束了,非常感谢大家能够花费宝贵的时间阅读,有不对的地方麻烦指正,感谢大家对 RocketMQ 的关注,希望大家能够多多参与社区的讨论和贡献。


如果您对 RocketMQ 感兴趣,也欢迎您扫描下方二维码加入钉钉群一起沟通交流~


8.png


点击此处,进入官网了解更多详情~

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
2月前
|
存储 弹性计算 大数据
阿里云服务器怎么样?云服务器ECS功能、租用费用全解析
阿里云ECS是弹性计算服务,提供安全可靠的云服务器,包括多种实例规格如经济型、通用型、计算型等,适合不同场景。ECS支持VPC专有网络、快照与镜像、多种付费模式。用户可按需选择计算架构、存储类型,享受灵活的网络控制、自动化数据备份和低成本计算资源。适用于Web应用、在线游戏、大数据分析和深度学习等场景。阿里云提供免费试用和优惠价格,服务众多知名企业,如新浪微博。
125 5
|
1天前
|
消息中间件 Java Apache
RocketMQ消息回溯实践与解析
在分布式系统和高并发应用的开发中,消息队列扮演着至关重要的角色,而RocketMQ作为阿里巴巴开源的一款高性能消息中间件,以其高吞吐量、高可用性和灵活的配置能力,在业界得到了广泛应用。本文将围绕RocketMQ的消息回溯功能进行实践与解析,分享工作学习中的技术干货。
16 3
|
27天前
|
消息中间件 开发者
【RabbitMQ深度解析】Topic交换器与模式匹配:掌握消息路由的艺术!
【8月更文挑战第24天】在消息队列(MQ)体系中,交换器作为核心组件之一负责消息路由。特别是`topic`类型的交换器,它通过模式匹配实现消息的精准分发,适用于发布-订阅模式。不同于直接交换器和扇形交换器,`topic`交换器支持更复杂的路由策略,通过带有通配符(如 * 和 #)的模式字符串来定义队列与交换器间的绑定关系。
39 2
|
28天前
|
XML Web App开发 数据挖掘
Postman接口测试工具全解析:功能、脚本编写及优缺点探讨
文章详细分析了Postman接口测试工具的功能、脚本编写、使用场景以及优缺点,强调了其在接口自动化测试中的强大能力,同时指出了其在性能分析方面的不足,并建议根据项目需求和个人偏好选择合适的接口测试工具。
37 1
|
19天前
|
存储 开发者 C#
WPF与邮件发送:教你如何在Windows Presentation Foundation应用中无缝集成电子邮件功能——从界面设计到代码实现,全面解析邮件发送的每一个细节密武器!
【8月更文挑战第31天】本文探讨了如何在Windows Presentation Foundation(WPF)应用中集成电子邮件发送功能,详细介绍了从创建WPF项目到设计用户界面的全过程,并通过具体示例代码展示了如何使用`System.Net.Mail`命名空间中的`SmtpClient`和`MailMessage`类来实现邮件发送逻辑。文章还强调了安全性和错误处理的重要性,提供了实用的异常捕获代码片段,旨在帮助WPF开发者更好地掌握邮件发送技术,提升应用程序的功能性与用户体验。
22 0
|
19天前
|
API C# Shell
WPF与Windows Shell完美融合:深入解析文件系统操作技巧——从基本文件管理到高级Shell功能调用,全面掌握WPF中的文件处理艺术
【8月更文挑战第31天】Windows Presentation Foundation (WPF) 是 .NET Framework 的关键组件,用于构建 Windows 桌面应用程序。WPF 提供了丰富的功能来创建美观且功能强大的用户界面。本文通过问题解答的形式,探讨了如何在 WPF 应用中集成 Windows Shell 功能,并通过具体示例代码展示了文件系统的操作方法,包括列出目录下的所有文件、创建和删除文件、移动和复制文件以及打开文件夹或文件等。
35 0
|
19天前
|
开发者 API 开发框架
Xamarin 在教育应用开发中的应用:从课程笔记到互动测验,全面解析使用Xamarin.Forms构建多功能教育平台的技术细节与实战示例
【8月更文挑战第31天】Xamarin 作为一款强大的跨平台移动开发框架,在教育应用开发中展现了巨大潜力。它允许开发者使用单一的 C# 代码库构建 iOS、Android 和 Windows 应用,确保不同设备上的一致体验。Xamarin 提供广泛的 API 支持,便于访问摄像头、GPS 等原生功能。本文通过一个简单的教育应用示例——课程笔记和测验功能,展示了 Xamarin 在实际开发中的应用过程。从定义用户界面到实现保存笔记和检查答案的逻辑,Xamarin 展现了其在教育应用开发中的高效性和灵活性。
26 0
|
19天前
|
UED 开发者
哇塞!Uno Platform 数据绑定超全技巧大揭秘!从基础绑定到高级转换,优化性能让你的开发如虎添翼
【8月更文挑战第31天】在开发过程中,数据绑定是连接数据模型与用户界面的关键环节,可实现数据自动更新。Uno Platform 提供了简洁高效的数据绑定方式,使属性变化时 UI 自动同步更新。通过示例展示了基本绑定方法及使用 `Converter` 转换数据的高级技巧,如将年龄转换为格式化字符串。此外,还可利用 `BindingMode.OneTime` 提升性能。掌握这些技巧能显著提高开发效率并优化用户体验。
38 0
|
21天前
|
消息中间件 Java RocketMQ
微服务架构师的福音:深度解析Spring Cloud RocketMQ,打造高可靠消息驱动系统的不二之选!
【8月更文挑战第29天】Spring Cloud RocketMQ结合了Spring Cloud生态与RocketMQ消息中间件的优势,简化了RocketMQ在微服务中的集成,使开发者能更专注业务逻辑。通过配置依赖和连接信息,可轻松搭建消息生产和消费流程,支持消息过滤、转换及分布式事务等功能,确保微服务间解耦的同时,提升了系统的稳定性和效率。掌握其应用,有助于构建复杂分布式系统。
34 0
|
2月前
|
数据可视化 数据挖掘 数据库
低代码开发全解析核心功能及其优势
低代码开发平台采用图形界面与预构建组件加速软件开发,降低技术门槛与成本,并支持敏捷迭代与快速部署。其核心功能包括可视化建模、预构建组件库、业务流程自动化、集成与连接性、多平台应用开发、数据分析报告、版本控制与协作、测试调试工具、安全性与合规性及快速部署更新。优点体现在提升开发速度与效率、降低成本、加强团队合作及提高灵活性与可扩展性。选择平台时需明确需求、评估功能与灵活性、考虑易用性、集成能力、安全性与合规性及成本与定价模型。例如,Zoho Creator作为成熟平台,拥有丰富的经验和广泛的应用案例。低代码开发已成为企业数字化转型的关键工具。
71 13

相关产品

  • 云消息队列 MQ
  • 推荐镜像

    更多