引言
前面我们已经简单地介绍了 RocketMQ 的整体设计思路,本文着重其中消息过滤部分的实现细节,更多关于 RocketMQ 的文章均收录于<RocketMQ系列文章>;
FilterServer 过滤器
RocketMQ 提供了基于表达式与基于类模式两种过滤模式,前面已经详细介绍了整个消息拉取、基于表达式(TAG)的过滤模式。基于类模式过滤是指在 Broker 端运行 1 个或多个消息过滤服务器(FilterServer), RocketMQ 允许消息消费者自定义消息过滤实现类并将其代码上传到 FilterServer 上,消息消费者向 FilterServer 拉取消息,FilterServer将消息消费者的拉取命令转发到 Broker,然后对返回的消息执行消息过滤逻辑,最终将消息返回给消费端,其工作原理如下图所示。
- Broker 进程所在的服务器会启动多个 FilterServer 进程
- 消费者在订阅消息主题时会上传一个自定义的消息过滤实现类,FilterServer 加载并实例化
- 消息消费者(Consume)向 FilterServer 发送消息拉取请求,FilterServer 接收到消费者消息拉取请求后,FilterServer 将消息拉取请求转发给 Broker, Broker 返回消息后在 FilterServer 端执行消息过滤逻辑,然后返回符合条件的消息给消费者进行消费
FilterServer 注册
FilterServer 从配置文件中获取 Broker地址,然后将自己的IP与端口发送到 Broker 服务器。随后 Broker 会在其内存中维护一个 FilterServer 列表,此后 FilterServer 和 Broker 之间还会通过心跳来维持注册关系,如果超过 30s 未收到心跳,则会删除关于该 FilterServer 的信息。
为了防止 FilterServer 由于 Crash 而越来越少,Broker 也会定时检查当前 FilterServer 的数量,如果数量小于阈值,则自动创建一个 FilterServer。
还记得前面说过的 Broker 每隔 30s 会向 NameServer 发送心跳包么,在心跳包中就包含该 Broker 的所有 FilterServer 信息,消息的消费者就是从 NameServer 中获取到该 Broker 的所有 FilterServer 信息的。
总结一下:FilterServer 在启动时向 Broker 注册自己,在 Broker 端维护该 Broker 的 FilterServer 信息,并定时监控 FilterServer 的状态,然后 Broker 通过与所有 NameServer 的心跳包向 NameServer 注册 Broker 上存储的 FilterServer 列表,指引消息消费者正确从 FilterServer 上拉取消息。
类过滤机制
- 消费者查询需要订阅的主题所在的 Broker 和其对应的 FilterServer
- 遍历所有 FilterServer 并发送过滤代码
- FilterServer 先通过CRC验证源码的正确性,然后根据消费组名+topic 保存其过滤代码,最后进行编译
如果 FilterServer 设置为不允许直接编译消费者上传的类,则会开启一个定时任务,每隔一段时间从指定的远程服务器下载对应的过滤代码。而远端服务器的过滤代码上传,就需要进行适当的检查,防止图谋不轨的代码上传。
消息拉取模式
RocketMQ 消息的过滤发生在消息消费的时候,PullMessageService 线程默认从 Broker 上拉取消息,执行相关的过滤逻辑,在 FilterServer 过滤模式下,PullMessageService 线程将拉取地址由原来的 Broker 地址转换成随机一个 FilterServer 地址。
文章说明
更多有价值的文章均收录于贝贝猫的文章目录
版权声明: 本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!
创作声明: 本文基于下列所有参考内容进行创作,其中可能涉及复制、修改或者转换,图片均来自网络,如有侵权请联系我,我会第一时间进行删除。
参考内容
[1]《RocketMQ技术内幕》
[2]《RocketMQ实战与原理解析》
[3] 老生常谈——利用消息队列处理分布式事务
[4] RocketMQ架构解析
[5] MappedByteBuffer VS FileChannel 孰强孰弱?
[6] 海量数据处理之Bloom Filter详解
[7] rocketmq GitHub Wiki