引言
前面我们已经简单地介绍了 RocketMQ 的整体设计思路,本文着重其中HA机制部分的实现细节,更多关于 RocketMQ 的文章均收录于<RocketMQ系列文章>;
HA机制
为了提高消息消费的高可用性,避免 Broker 发生单点故障引起存储在 Broker 上的消息无法及时消费,RocketMQ 引入了 Broker 主备机制,即消息消费到达主服务器后需要将消息同步到消息从服务器,如果主服务器 Broker 宕机后,消息消费者可以从从服务器拉取消息。
工作机制
RocketMQ HA 的实现原理如下。
- 主服务器启动,并在特定端口上监听从服务器的连接
- 从服务器主动连接主服务器,主服务器接收客户端的连接,并建立相关 TCP 连接
- 从服务器主动向主服务器发送待拉取消息偏移量,主服务器解析请求并返回消息给从服务器
- 从服务器保存消息并继续发送新 的消息同步请求
如果是同步主从模式,消息发送者将消息刷写到磁盘后,需要继续等待新数据被传输到从服务器,而从服务器数据的复制是在另外一个线程中去拉取的,所以消息发送者在这里需要等待数据传输的结果,RocketMQ 有一个 GroupTransferService,它的职责是负责当主从同步复制结束后通知由于等待 HA 同步结果而阻塞的消息发送者线程。
判断主从同步是否完成的依据是 Slave 中已成功复制的最大偏移量是否大于等于消息生产者发送消息后消息服务端返回下一条消息的起始偏移量,如果是则表示主从同步复制已经完成,唤醒消息发送线程,否则等待 1s 再次判断,每一个任务在一批任务中循环判断 5 次。
RocketMQ HA 主要交互流程如下图所示。
读写分离
RocketMQ 根据 MessageQueue查找 Broker地址的唯一依据是 brokerName,从 RocketMQ 的 Broker 组织结构中得知同一组 Broker (M-S)服务器,它们的 brokerName 相同但 brokerId 不同,主服务器的 brokerId 为 0,从服务器的 brokerId 大于 0。
在前面介绍消息拉取的时候,提过 Broker 在返回拉取内容的同时还会返回下一次是否要从 Slave 拉取数据,消费者收到该建议后,会找到合适的 Broker 节点进行拉取。那么 Broker 是通过哪种策略来建议的呢?
long diff = maxOffsetPy - maxPhyOffsetPulling;
long memory = (long) (StoreUtil.TOTAL_PHYSICAL_MEMORY_SIZE
* (this.messageStoreConfig.getAccessMessageInMemoryMaxRatio() / 100.0));
getResult.setSuggestPullingFromSlave(diff > memory);
上述就是建议策略,下面进行解读:
- maxOffsetPy: 代表当前主服务器消息存储文件最大偏移量。
- maxPhyOffsetPulling: 此次拉取消息最大偏移量。
- diff:对于PullMessageService线程来说,当前未被拉取到消息消费端的消息长度。
- TOTAL_PHYSICAL_MEMORY_SIZE: RocketMQ 所在服务器总内存大小。
- AccessMessageInMemoryMaxRatio: 表示 RocketMQ 所能使用的最大内存比例,超过该内存,消息将被置换出内存
- memory: 表示 RocketMQ 消息常驻内存的大小,超过该大小, RocketMQ 会将旧的消息置换回磁盘
- 如果 diff 大于 memory,表示当前需要拉取的消息已经超出了常驻内存的大小,表示主服务器繁忙,此时才建议从 Slave 服务器拉取
如果主服务器繁忙则建议下一次从从服务器拉取消息,下次默认从标号为 1 的从节点拉取消息。如果一个 Master 拥有多台 Slave 服务器,参与消息拉取负载的从服务器只会是其中一个。
文章说明
更多有价值的文章均收录于贝贝猫的文章目录
版权声明: 本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!
创作声明: 本文基于下列所有参考内容进行创作,其中可能涉及复制、修改或者转换,图片均来自网络,如有侵权请联系我,我会第一时间进行删除。
参考内容
[1]《RocketMQ技术内幕》
[2]《RocketMQ实战与原理解析》
[3] 老生常谈——利用消息队列处理分布式事务
[4] RocketMQ架构解析
[5] MappedByteBuffer VS FileChannel 孰强孰弱?
[7] 海量数据处理之Bloom Filter详解
[8] rocketmq GitHub Wiki