引言
前面我们已经简单地介绍了 RocketMQ 的整体设计思路,本文着重其中 NameServer 部分的实现细节,更多关于 RocketMQ 的文章均收录于<RocketMQ系列文章>;
NameServer
本节主要介绍RocketMQ路由管理、服务注册及服务发现的机制,NameServer是整个RocketMQ的“大脑”。相信大家对“服务发现”这个词语并不陌生,分布式服务SOA架构体系中会有服务注册中心,分布式服务SOA的注册中心主要提供服务调用的解析服务,指引服务调用方(消费者)找到“远方”的服务提供者,完成网络通信,那么RocketMQ的路由中心存储的是什么数据呢?作为一款高性能的消息中间件,如何避免NameServer的单点故障,提供高可用性呢?
Broker消息服务器在启动时向所有NameServer注册,消息生产者(Producer)在发送消息之前先从NameServer获取Broker服务器地址列表,然后根据负载算法从列表中选择一台消息服务器进行消息发送。NameServer与每台Broker服务器保持长连接,并间隔30s检测Broker是否存活,如果检测到Broker宕机,则从路由注册表中将其移除。但是路由变化不会马上通知消息生产者,为什么要这样设计呢?这是为了降低NameServer实现的复杂性,在消息发送端提供容错机制来保证消息发送的高可用性。
NameServer本身的高可用可通过部署多台NameServer服务器来实现,但彼此之间互不通信,也就是NameServer服务器之间在某一时刻的数据并不会完全相同,但这对消息发送不会造成任何影响,这也是RocketMQ NameServer设计的一个亮点,RocketMQ NameServer设计追求简单高效。
存储内容
private final HashMap<String/* topic */, List<QueueData>> topicQueueTable;
private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable;
private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;
private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
- topicQueueTable:Topic消息队列路由信息,消息发送时根据路由表进行负载均衡。
- brokerAddrTable:Broker基础信息,包含brokerName、所属集群名称、主备Broker地址。
- clusterAddrTable:Broker集群信息,存储集群中所有Broker名称。
- brokerLiveTable:Broker状态信息,NameServer每次收到心跳包时会替换该信息。
- filterServerTable:Broker上的FilterServer列表,用于类模式消息过滤。
QueueData、BrokerData、BrokerLiveInfo类图如下图所示。
RocketMQ 2主2从部署图如下所示。
对应运行时数据结构如下图所示。
路由注册
RocketMQ路由注册是通过Broker与NameServer的心跳功能实现的。Broker启动时向集群中所有的NameServer发送心跳语句,每隔30s向集群中所有NameServer发送心跳包,NameServer收到Broker心跳包时会更新brokerLiveTable缓存中BrokerLiveInfo的lastUpdateTimestamp,然后NameServer每隔10s扫描brokerLiveTable,如果连续120s没有收到心跳包,NameServer将移除该Broker的路由信息同时关闭Socket连接。
心跳包
- brokerAddr:broker地址。
- brokerId:brokerId,O:Master;大于0:Slave。
- brokerName:broker名称。
- clusterName:集群名称。
- haServerAddr:master地址,初次请求时该值为空,slave向NameServer注册后返回其MasterAddr。
requestBody:
- filterServerList:消息过滤服务器列表。
- topicConfigWrapper:主题配置。
从心跳包内容我们会发现,每次心跳包中都会包含所有的topic信息,如果一个broker上topic非常多的话,心跳包就会比较大,如果正好赶上网络不好的时候,可能就会导致broker下线。
NameServer与Broker保持长连接,Broker状态存储在brokerLiveTable中,NameServer每收到一个心跳包,将更新brokerLiveTable中关于Broker的状态信息以及路由表(topicQueueTable、 brokerAddrTable、 brokerLiveTable、 filterServerTable)。
路由删除
Broker每隔30s向NameServer发送一个心跳包,心跳包中包含BrokerId、Broker地址、Broker名称、Broker所属集群名称、Broker关联的FilterServer列表。但是如果Broker宕机,NameServer无法收到心跳包,此时NameServer如何来剔除这些失效的Broker呢? NameServer会每隔1Os扫描brokerLiveTable状态表,如果BrokerLive的lastUpdateTimestamp的时间戳距当前时间超过120s,则认为Broker失效,移除该Broker, 关闭与Broker连接,并同时更新topicQueueTable、brokerAddrTable、brokerLiveTable、filterServerTable。
RocketMQ有两个触发点来触发路由删除:
- NameServer定时扫描brokerLiveTable检测上次心跳包与当前系统时间的时间差,如果时间戳大于120s,则需要移除该Broker信息。
- Broker在正常被关闭的情况下,会执行unRegisterBroker指令,主动删除NameServer中关于自己的信息。
路由发现
RocketMQ路由发现是非实时的,当Topic路由出现变化后,NameServer不主动推送给客户端,而是由客户端定时拉取主题最新的路由。
工作示意图
文章说明
更多有价值的文章均收录于贝贝猫的文章目录
版权声明: 本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!
创作声明: 本文基于下列所有参考内容进行创作,其中可能涉及复制、修改或者转换,图片均来自网络,如有侵权请联系我,我会第一时间进行删除。
参考内容
[1]《RocketMQ技术内幕》
[2]《RocketMQ实战与原理解析》
[3] 老生常谈——利用消息队列处理分布式事务
[4] RocketMQ架构解析
[5] MappedByteBuffer VS FileChannel 孰强孰弱?
[6] 海量数据处理之Bloom Filter详解
[7] rocketmq GitHub Wiki