开发者学堂课程【消息队列 RocketMQ 5.0 云原生架构升级课程:RocketMQ 5.0 多样消费功能详解消息过滤】学习笔记,与课程紧密联系,让用户快速学习知识。
课程地址:https://developer.aliyun.com/learning/course/1234/detail/18399
RocketMQ5.0 多样消费功能详解-消息过滤
内容介绍
一、消费过滤概念介绍
二、电商场景应用
三、Rocket MQ 的过滤模式
四、最佳实践介绍
五、上手 EDMO 演示
一、消费过滤概念介绍
首先在消息中间件使用的过程中,一个主题所对应的消费者,他想要通过规则只消费这个主题下面具有某些特征的消息,然后过滤掉自己不关心的消息,这个功能的话叫消息过滤.
这组图中所描述的:生产者会向一个主题发送很多不同类型的一个消息,然后通过颜色来标识它们,其中有橙色的、黄色的,还有灰色的,这个主题的话它有对应的两个消费者,一个消费者他想要消费承受的消息,第二个,消费者的话,他想要消费黄色的和灰色的一个消息,那么想要达到这样的一个效果的话,就需要通过消息过滤这个功能来实现。
二、电商场景应用
那以常见的一个电商的一个场景为例,来看看那个消息过滤在实际应用过程中起到的作用。那电商平台在设计的时候往往会存在系统拆分系功能模块多,调用链路长,以及系统依赖复杂的一个特点,消息中间件在其中就起到了一个异不解偶异不通信的一个作用。特别是在双11这样一个流量高峰期,消息中间件还起到一个削峰填谷的一个作用,那么在消息中间的使用方面的话,电商平台会因为覆盖的一个业务众多,还会定义很多的一个消息主题,然后消息的收发量的话,也会随着的一个交易量以及订阅系统的一个数量的增加而增大,那随着业务系统的一个水平拆解以及垂直业务的一个增加,相应的一个消息的主题,也会呈现出一个高订阅笔和低投机比的一个状态,高利率比主要是指一个消息的一个主题,他被很多的一个消费者进行了订阅,比如说这个比例是1:300,那低投低比的话主要是指在如此高的一个订阅的一个比的一个情况下,那一条消息的话却只需要投递给少量的一个消费者,比如说只有15个,然后其他的285个那个消费者的话,都对这个消息进行一个过滤。
举例来说,在一个交易链路中,一个订单的一个处理流程,它会分为下单支付,还有物流发货的一个流程,那么这个流程就会涉及到一个订单的操作以及订单的一个状态机的一个变化,那么下游的系统的话,包括积分物流通知,还有实时计算这个系统,他们是通过消息中间件来去监听这个订单的一个变更消息,但是他们对这个订单的一个不同状态的一个消息有着一个不同的一个需求。
比如说积分系统它只关心的一个下单的一个消息,然后只要下单的话,就扣减那个积分,然后物流系统的话,它只要关心的话,是支付消息以及收货的消息,比如说支付的话,就会去通知商家发货,那用户确认收货的话就会去完成整个物流订单。
那么实时计算这种系统的话,它可能需要统计订单不同状态下的一个数据,所以他所有的消息他都要消费。那试想一下,就是如果没有消息过滤这个功能的话,该去如何支持这样的一个功能,这边的话想到了有两个方案,第一个方案的话就是基于主题的一个深度的拆分,然后将不同的消息发送到不同的主题上面,然后这对于生产者来说的话,也就意味着消费者他有多少个消费场景,那就需要新建多少个topic,这就无疑对生产者带来了一个巨大的一个维护成本。
对于消费者来说的话,消费者可能会需要同时订阅多个topic,那这同时也会带来一些维护成本,另外的话就是消息被那个主题拆分之后,他们的资金的一个消费顺序就没有办法保证了。
比如说一个订单,它涉及到一个下单和支付的一个操作,那这个这两个消息的话显然是需要被顺序进行处理了。那第二个方面的话主要是消费者收到消息之后,自行进行本地的一个过滤,那这就意味着所有的消息都会推送到消费者端,做一个计算,那这无疑就会增加了一个网络带宽的一个成本,同时也增加了一个增加了消费者在那个内存和CPU上面的一个消耗。
那么有了消息过滤这个功能之后的话,生产者就只需要向一个主题去投递消息,然后服务端的话根据订阅规则进行计算,然后并按需的投递到每一个消费者,然后这样生产者和消费者他们的代码维护也就非常的友好,同时也能很大程度的去降低整个网络的一个带宽,减少了这个消费者的一个内存占用和CPU的一个消耗,Rocket MQ它是众多消息中间件中为数不多的支持消息过滤的一个系统,这也是他能够作为业务集成消息首选方案的一个重要的一个基础之一。
三、Rocket MQ 的过滤模式
那在功能层面的话,Rocket MQ它支持两种过滤方式,分别是tag标签过滤和SQL属性过滤。
那下面就对这两种过滤模式进行功能上的一些介绍以及技术原理的一个分析。
(1)tag标签过滤及原理
首先来介绍一下tag标签过滤,它是Rocket MQ提供了一个最基础的一个消息过滤能力,它主要是基于生产者为消息设置了一个tag标签来进行匹配的,那tag过滤的话在语法方面的话,它支持单tag的一个匹配,然后多tag的匹配,还有全坦克的匹配,三种语法在使用方式上面,消息在发送的时候,只需要在这个消息上面设置一个标签,然后在订阅的时候,就可以通过设置单个标签,多个标签以及所有标签来进行消息的匹配就可以了。
那么Rocket MQ实现它的一个原理是怎么样的,首先就是在消息存储的时候,Rocket MQ是通过iPad only的这种方式,来将所有的主题的消息写在一个logo文件中,然后这可以有效的提升了那个消息的写入速率,为了消费时候能够快速的检索到消息,那它会在后台以异步任务的方式,将消息的一个位点以及消息的大小,还有消息标签值存储在一个consume cue的一个索引文件中,那加上这个 tag标签存在这个索引文件中,其实就是为了支持通过标签来进行消息的过滤的时候,可以在索引这个层面,就把这个小企的标签能够获取到,然后就不需要去文件中去获取这个标签了,减少了读取这个文件的一个系统l和内存的一个开销。
Rocket MQ里面去存储标签的哈希值的话,主要是为了能够保证整个Rocket MQ的一个文件能够正常的一个处理,然后这样可以有效的减少的存储空间,还要提升整个索引一个查询的一个读取的一个效率,那整个tag标签过滤的一个流程。是这样的,首先就是生产者对消息打上自己的一个业务标签之后,发送到给的一个 broken broker,收到这个消息之后会写入到Rocket MQ中里面去,然后通过异步现成的这种方式,将消息分发到的一个Rocket MQ里面,然后Rocket MQ里面就会去存储这个消息的一个标签的哈希值,消费者在启动的时候的话,他会先去像block注册它,然后将它的订阅关系去上传到高端,会将这个订阅关系以及标签的这个哈希值保存在那个内存中。
那当消费者去拉取消息的时候,block会去根据它的定义关系以及它对应的队列去consumqueue里面去检索消息,那就会将订阅关系里面的一个标签希值跟cosplay里面消息的哈希值做一个比较,如果匹配的话,就会发送给的一个消费者,但是希匹配的话并不代表它一定是相等的,所以消费者在收到这个消息之后,也会将这个消息中的一个真实的一个标签值和订阅关系里面的一个标签值做一个精确的匹配,然后匹配成功之后才会去进行消费。
(2)SQL属性过滤及原理
Rocket MQ的第二种那个过滤,模式的话就是SQL过滤,然后它是Rocket MQ提供了一种比较高级的一个过滤模式,它是通过生产者在发送消息的时候,在消息上面设置一个多个业务属性,然后订阅者在匹配的时候,可以通过 SQL这种表达式对这个属性进行过滤。
然后在过滤语法方面的话,它主要支持一些常用的一些授课语法,包括一些数字的一些比较,字符串的一个比较,还有一些看空的一些运算,可以支持比较复杂的一个逻辑的运算。
那在使用上面的话,发送消息的时候,只要在消息上面设置的一个属性,然后在订阅的时候,就可以订阅单个属性,也可以订阅多个属性,当然也可以定义所有的属性。
Rocket MQ中那个 SQL过滤的一个实验原理是这样的,首先搜狗过滤是是需要将消息的一个属性和circle表达式来进行一个匹配的,那这对服务端的一个内存以及CPU的消耗会有很大的开销。
为了降低这样的一个开销,罗特梅q采用了一个不容过滤器这种方式来进行优化,那当布洛克收到消息之后,会预先对那个通过一计算的方式,将所有的订阅进行消息的一个匹配,然后如果匹配的话,然后会将这个结果写到一个不容过滤器的一个位图里面去,然后存储在的一个消费队列Rocket MQ里面的一个扩展文件里面。
然后在消费的时候,block会使用这个 不能过滤器的一个位图,然后通过和消费者的一个SQL进行一个不能过滤器的一个过滤,然后这样的话就可以避免在消息一定不匹配的这种情况下,就不需要去 communicate logo里面去将这个消息的属性给拉出来,然后再进行一个 circle的匹配了,这就可以有效的降低生效匹配的一个计算的一个消息量,同时也减少了那个服务端的内存和 CPU的一个开销。那么整个circle过滤的一个处理流程是这样的,首先消费者通过心跳去上传订阅关系,如果block判断它是有circle过滤的话,它就会通过不容过滤器这种算法,然后生成这个 circle所对应的sql过滤器的这个匹配参数,然后存储在这样的一个 consumer philter dater中,然后生产者在发送消息的时候,在消息上面设置自己的一个业务属性,然后发送到给的博客端。
block 收到这个消息之后就会写入到Rocket MQ那个的logo里面去,然后通过一部现成的这种方式去分发到的casi cue里面,在写入之前,会将这个消息的一个属性跟他所有的订阅关系的Rocket MQ进行一个匹配。
那如果通过了就会将这个不能过去的匹配参数合并成一个完整的一个部分过滤器的一个位图,存储在这个 cosmic的一个弹性的一个文件里面去。那消费者在消费这个消息的时候,那block就会去获取到这个Rocket MQ里面每个消息的一个消费过滤位图,然后跟跟这个消费者的 circle的一个过滤匹配参数,做一个不能过去的一个过滤。然后如果是匹配的话,那就说明可能是需要被投递给消费者,如果是不批的话就需要被过滤掉。然后不能过于去返回匹配成功。 就需要将消息从里面去读取出来,然后将它的属性跟这个表达式再做一次,这个表达是一个精确的一个匹配。然后如果这次再匹配的话,就会去投递给的消费者。