RocketMQ 消息过滤

简介: 前面我们已经简单地介绍了 RocketMQ 的整体设计思路,本文着重其中消息过滤部分的实现细节。

引言

前面我们已经简单地介绍了 RocketMQ 的整体设计思路,本文着重其中消息过滤部分的实现细节,更多关于 RocketMQ 的文章均收录于<RocketMQ系列文章>;

FilterServer 过滤器

RocketMQ 提供了基于表达式与基于类模式两种过滤模式,前面已经详细介绍了整个消息拉取、基于表达式(TAG)的过滤模式。基于类模式过滤是指在 Broker 端运行 1 个或多个消息过滤服务器(FilterServer), RocketMQ 允许消息消费者自定义消息过滤实现类并将其代码上传到 FilterServer 上,消息消费者向 FilterServer 拉取消息,FilterServer将消息消费者的拉取命令转发到 Broker,然后对返回的消息执行消息过滤逻辑,最终将消息返回给消费端,其工作原理如下图所示。
filter-server

  1. Broker 进程所在的服务器会启动多个 FilterServer 进程
  2. 消费者在订阅消息主题时会上传一个自定义的消息过滤实现类,FilterServer 加载并实例化
  3. 消息消费者(Consume)向 FilterServer 发送消息拉取请求,FilterServer 接收到消费者消息拉取请求后,FilterServer 将消息拉取请求转发给 Broker, Broker 返回消息后在 FilterServer 端执行消息过滤逻辑,然后返回符合条件的消息给消费者进行消费

FilterServer 注册

FilterServer 从配置文件中获取 Broker地址,然后将自己的IP与端口发送到 Broker 服务器。随后 Broker 会在其内存中维护一个 FilterServer 列表,此后 FilterServer 和 Broker 之间还会通过心跳来维持注册关系,如果超过 30s 未收到心跳,则会删除关于该 FilterServer 的信息。

为了防止 FilterServer 由于 Crash 而越来越少,Broker 也会定时检查当前 FilterServer 的数量,如果数量小于阈值,则自动创建一个 FilterServer。

还记得前面说过的 Broker 每隔 30s 会向 NameServer 发送心跳包么,在心跳包中就包含该 Broker 的所有 FilterServer 信息,消息的消费者就是从 NameServer 中获取到该 Broker 的所有 FilterServer 信息的。

总结一下:FilterServer 在启动时向 Broker 注册自己,在 Broker 端维护该 Broker 的 FilterServer 信息,并定时监控 FilterServer 的状态,然后 Broker 通过与所有 NameServer 的心跳包向 NameServer 注册 Broker 上存储的 FilterServer 列表,指引消息消费者正确从 FilterServer 上拉取消息。

类过滤机制

  1. 消费者查询需要订阅的主题所在的 Broker 和其对应的 FilterServer
  2. 遍历所有 FilterServer 并发送过滤代码
  3. FilterServer 先通过CRC验证源码的正确性,然后根据消费组名+topic 保存其过滤代码,最后进行编译

如果 FilterServer 设置为不允许直接编译消费者上传的类,则会开启一个定时任务,每隔一段时间从指定的远程服务器下载对应的过滤代码。而远端服务器的过滤代码上传,就需要进行适当的检查,防止图谋不轨的代码上传。

消息拉取模式

RocketMQ 消息的过滤发生在消息消费的时候,PullMessageService 线程默认从 Broker 上拉取消息,执行相关的过滤逻辑,在 FilterServer 过滤模式下,PullMessageService 线程将拉取地址由原来的 Broker 地址转换成随机一个 FilterServer 地址。

文章说明

更多有价值的文章均收录于贝贝猫的文章目录

stun

版权声明: 本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!

创作声明: 本文基于下列所有参考内容进行创作,其中可能涉及复制、修改或者转换,图片均来自网络,如有侵权请联系我,我会第一时间进行删除。

参考内容

[1]《RocketMQ技术内幕》
[2]《RocketMQ实战与原理解析》
[3] 老生常谈——利用消息队列处理分布式事务
[4] RocketMQ架构解析
[5] MappedByteBuffer VS FileChannel 孰强孰弱?
[6] 文件 IO 操作的一些最佳实践
[7] 海量数据处理之Bloom Filter详解
[8] rocketmq GitHub Wiki

相关实践学习
RocketMQ一站式入门使用
从源码编译、部署broker、部署namesrv,使用java客户端首发消息等一站式入门RocketMQ。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
消息中间件 算法 Java
弥补延时消息的不足,RocketMQ 基于时间轮算法实现了定时消息!
弥补延时消息的不足,RocketMQ 基于时间轮算法实现了定时消息!
647 1
弥补延时消息的不足,RocketMQ 基于时间轮算法实现了定时消息!
|
消息中间件 存储 数据可视化
【RocketMq-生产者】消息发送者参数详解
首先注意本次讨论的RokcetMq源码版本为 4.9.4,距离5.0发布 的没有多久。 这一节针对RocketMq的生产者请求发送的部分细节进行阐述,主要包含了下面的内容:DefaultMQProducer 为生产者默认对象,这个对象继承自 ClientConfig,里面包含了请求者的通用配置,所以可以拆分为两个部分进行理解,第一部分为ClientConfig,第二部分为DefaultMQProducer。
630 0
|
消息中间件 Apache RocketMQ
《万亿级数据洪峰下的消息引擎——Apache RocketMQ》电子版地址
万亿级数据洪峰下的消息引擎——Apache RocketMQ
318 0
《万亿级数据洪峰下的消息引擎——Apache RocketMQ》电子版地址
|
消息中间件 存储 缓存
RocketMQ Schema——让消息成为流动的结构化数据
RocketMQ Schema 提供了对消息的数据结构托管服务,同时为原生客户端提供了较为丰富的序列化/反序列化 SDK ,补齐了 RocketMQ 在数据治理和业务上下游解耦方面的短板,让数据成为流动的结构化数据,那么快来了解下实现原理吧~
482 0
RocketMQ Schema——让消息成为流动的结构化数据
|
消息中间件 存储 缓存
简述RocketMQ消息拉取过程【二】
简述RocketMQ消息拉取过程【二】
956 1
|
消息中间件 RocketMQ
简述RocketMQ消息拉取过程【一】
简述RocketMQ消息拉取过程【一】
665 0
|
消息中间件 缓存 负载均衡
RocketMQ消息生产者是如何选择Broker的
RocketMQ消息生产者是如何选择Broker的
492 1
|
消息中间件 存储 前端开发
同步异步调用,并谈谈消息队列mq;RocketMQ发送消息和消费消息测试类
同步调用优点: 时效性强,打电话、直播,很快可以得到结果 同步调用的问题:
515 1
|
消息中间件 存储 Java
10 张图告诉你 RocketMQ 是怎样保存消息的
10 张图告诉你 RocketMQ 是怎样保存消息的
201 0
10 张图告诉你 RocketMQ 是怎样保存消息的
|
消息中间件 存储 uml
5 张图带你彻底理解 RocketMQ 轨迹消息
5 张图带你彻底理解 RocketMQ 轨迹消息
410 0
5 张图带你彻底理解 RocketMQ 轨迹消息