RocketMQ 消息过滤

简介: 前面我们已经简单地介绍了 RocketMQ 的整体设计思路,本文着重其中消息过滤部分的实现细节。

引言

前面我们已经简单地介绍了 RocketMQ 的整体设计思路,本文着重其中消息过滤部分的实现细节,更多关于 RocketMQ 的文章均收录于<RocketMQ系列文章>;

FilterServer 过滤器

RocketMQ 提供了基于表达式与基于类模式两种过滤模式,前面已经详细介绍了整个消息拉取、基于表达式(TAG)的过滤模式。基于类模式过滤是指在 Broker 端运行 1 个或多个消息过滤服务器(FilterServer), RocketMQ 允许消息消费者自定义消息过滤实现类并将其代码上传到 FilterServer 上,消息消费者向 FilterServer 拉取消息,FilterServer将消息消费者的拉取命令转发到 Broker,然后对返回的消息执行消息过滤逻辑,最终将消息返回给消费端,其工作原理如下图所示。
filter-server

  1. Broker 进程所在的服务器会启动多个 FilterServer 进程
  2. 消费者在订阅消息主题时会上传一个自定义的消息过滤实现类,FilterServer 加载并实例化
  3. 消息消费者(Consume)向 FilterServer 发送消息拉取请求,FilterServer 接收到消费者消息拉取请求后,FilterServer 将消息拉取请求转发给 Broker, Broker 返回消息后在 FilterServer 端执行消息过滤逻辑,然后返回符合条件的消息给消费者进行消费

FilterServer 注册

FilterServer 从配置文件中获取 Broker地址,然后将自己的IP与端口发送到 Broker 服务器。随后 Broker 会在其内存中维护一个 FilterServer 列表,此后 FilterServer 和 Broker 之间还会通过心跳来维持注册关系,如果超过 30s 未收到心跳,则会删除关于该 FilterServer 的信息。

为了防止 FilterServer 由于 Crash 而越来越少,Broker 也会定时检查当前 FilterServer 的数量,如果数量小于阈值,则自动创建一个 FilterServer。

还记得前面说过的 Broker 每隔 30s 会向 NameServer 发送心跳包么,在心跳包中就包含该 Broker 的所有 FilterServer 信息,消息的消费者就是从 NameServer 中获取到该 Broker 的所有 FilterServer 信息的。

总结一下:FilterServer 在启动时向 Broker 注册自己,在 Broker 端维护该 Broker 的 FilterServer 信息,并定时监控 FilterServer 的状态,然后 Broker 通过与所有 NameServer 的心跳包向 NameServer 注册 Broker 上存储的 FilterServer 列表,指引消息消费者正确从 FilterServer 上拉取消息。

类过滤机制

  1. 消费者查询需要订阅的主题所在的 Broker 和其对应的 FilterServer
  2. 遍历所有 FilterServer 并发送过滤代码
  3. FilterServer 先通过CRC验证源码的正确性,然后根据消费组名+topic 保存其过滤代码,最后进行编译

如果 FilterServer 设置为不允许直接编译消费者上传的类,则会开启一个定时任务,每隔一段时间从指定的远程服务器下载对应的过滤代码。而远端服务器的过滤代码上传,就需要进行适当的检查,防止图谋不轨的代码上传。

消息拉取模式

RocketMQ 消息的过滤发生在消息消费的时候,PullMessageService 线程默认从 Broker 上拉取消息,执行相关的过滤逻辑,在 FilterServer 过滤模式下,PullMessageService 线程将拉取地址由原来的 Broker 地址转换成随机一个 FilterServer 地址。

文章说明

更多有价值的文章均收录于贝贝猫的文章目录

stun

版权声明: 本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!

创作声明: 本文基于下列所有参考内容进行创作,其中可能涉及复制、修改或者转换,图片均来自网络,如有侵权请联系我,我会第一时间进行删除。

参考内容

[1]《RocketMQ技术内幕》
[2]《RocketMQ实战与原理解析》
[3] 老生常谈——利用消息队列处理分布式事务
[4] RocketMQ架构解析
[5] MappedByteBuffer VS FileChannel 孰强孰弱?
[6] 海量数据处理之Bloom Filter详解
[7] rocketmq GitHub Wiki

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
消息中间件 算法 Java
弥补延时消息的不足,RocketMQ 基于时间轮算法实现了定时消息!
弥补延时消息的不足,RocketMQ 基于时间轮算法实现了定时消息!
780 1
弥补延时消息的不足,RocketMQ 基于时间轮算法实现了定时消息!
|
消息中间件 存储 数据可视化
【RocketMq-生产者】消息发送者参数详解
首先注意本次讨论的RokcetMq源码版本为 4.9.4,距离5.0发布 的没有多久。 这一节针对RocketMq的生产者请求发送的部分细节进行阐述,主要包含了下面的内容:DefaultMQProducer 为生产者默认对象,这个对象继承自 ClientConfig,里面包含了请求者的通用配置,所以可以拆分为两个部分进行理解,第一部分为ClientConfig,第二部分为DefaultMQProducer。
702 0
|
消息中间件 存储 负载均衡
【消息中间件】默认的RocketMQ消息消费者是如何启动的?(下)
在当下的分布式服务中,消息队列中间件是一个解决服务之间耦合的利器,今天我们来瞧一瞧开源的RocketMQ消息中间件,他的消费端是如何启动的,以及在使用他的过程中有哪些配置。
|
消息中间件 负载均衡 算法
【消息中间件】默认的RocketMQ消息消费者是如何启动的?(上)
在当下的分布式服务中,消息队列中间件是一个解决服务之间耦合的利器,今天我们来瞧一瞧开源的RocketMQ消息中间件,他的消费端是如何启动的,以及在使用他的过程中有哪些配置。
|
消息中间件 uml RocketMQ
3 张图带你彻底理解 RocketMQ 事务消息
3 张图带你彻底理解 RocketMQ 事务消息
67790 2
3 张图带你彻底理解 RocketMQ 事务消息
|
消息中间件 Java RocketMQ
【消息中间件】默认RocketMQ消息发送者是如何启动的?
上一篇文章,主要介绍了RocketMQ消息发送-请求与响应,了解了消息发送的请求参数和响应结果,今天我们主要来学习默认消息发送者的源码,看看消息发送主要是做了那哪些事情。
|
消息中间件 NoSQL 关系型数据库
实战:如何防止mq消费方消息重复消费、rocketmq理论概述、rocketmq组成、普通消息的发送
实战:如何防止mq消费方消息重复消费 如果因为网络延迟等原因,mq无法及时接收到消费方的应答,导致mq重试。(计算机网络)。在重试过程中造成重复消费的问题
2764 1
实战:如何防止mq消费方消息重复消费、rocketmq理论概述、rocketmq组成、普通消息的发送
|
消息中间件 Java uml
5张图带你理解 RocketMQ 顺序消息实现机制
5张图带你理解 RocketMQ 顺序消息实现机制
706 1
5张图带你理解 RocketMQ 顺序消息实现机制
|
消息中间件 缓存 算法
阿里二面:RocketMQ 消息积压了,增加消费者有用吗?
阿里二面:RocketMQ 消息积压了,增加消费者有用吗?
264 0
阿里二面:RocketMQ 消息积压了,增加消费者有用吗?
|
消息中间件 缓存 数据库
4 张图,9 个维度告诉你怎么做能确保 RocketMQ 不丢失消息
4 张图,9 个维度告诉你怎么做能确保 RocketMQ 不丢失消息
428 0
4 张图,9 个维度告诉你怎么做能确保 RocketMQ 不丢失消息