一、前言
作为一款消息中间件产品,高可用起着至关重要的作用。那 RocketMQ 是怎么保证高可用机制的呢?老周从四个维度来分析:消息存储高可用、消息发送高可用、消息消费高可用以及消息管理高可用。本文主要侧重讲消息存储高可用,其它三个维度老周也会说一下。话不多说,我们现在就开始 RocketMQ 的高可用机制之旅吧。
二、消息存储高可用
很多需要持久化的产品,它们的高可用机制一般都是主从复制和读写分离。RocketMQ 消息存储的高可用也不例外。既然这样,那我们就从这两个方向来分析一下。
1、主从复制
我们先来从全局的概念先来了解下主从同步的交互流程
看完这张图我们可以总结出主从复制的实现原理如下:
- 首先启动 Master 并在特定端口上监听 Slave 的连接;
- Slave 主动连接 Master,主服务器接收客户端的连接,并建立相关 TCP 连接;
- Slave 以每隔 5s 的间隔时间向 Master 拉取消息,如果是第一次拉取的话,先获取本地 commitlog 文件中最大的偏移量,以该偏移量向服务端拉取消息;
- Master 解析请求,并返回一批数据给 Slave;
- Slave 收到一批消息后,将消息写入本地 commitlog 文件中,然后向 Master 汇报拉取进度,并更新下一次待拉取偏移量;
- 然后重复第 3 步;
实现原理老周这里不会从源码层面来讲哈,因为一展开的话东西太多了,一个专栏都说不完。这里只说下核心原理。看完上面主从复制的实现原理后,相信你有个大致的脉络了,感觉实现原理和其它存储的产品都差不了太多。
如果是主从同步模式的话,消息发送者将消息刷写到磁盘后,需要继续等待新数据被传输到从服务器,而从服务器数据的复制是在另外一个线程中去拉取的,所以消息发送者在这里需要等待数据传输的结果,RocketMQ 有一个 GroupTransferService,它的职责是负责当主从同步复制结束后通知由于等待 HA 同步结果而阻塞的消息发送者线程。
判断主从同步是否完成的依据是 Slave 中已成功复制的最大偏移量是否大于等于消息生产者发送消息后消息服务端返回下一条消息的起始偏移量,如果是则表示主从同步复制已经完成,唤醒消息发送线程,否则等待 1s 再次判断,每一个任务在一批任务中循环判断 5 次。
主从复制确实能保证消息存储的高可用,那极端情况下呢还能保证吗?比如主服务器挂掉了,会不会重新选举一台主出来,从服务器会不会接管消息消费,此时消息消费进度如何保持?那如果主服务器又恢复了,消息消费者是从主服务器消费还是从服务器消费?主服务器与从服务器的消费进度又是如何同步的呢?
带着这些问题,我们来看下 RocketMQ 作者是如何处理的。读写分离机制很好的解决了上述问题,我们继续往下一小节看。
2、读写分离
RocketMQ 根据 MessageQueue 查找 Broker 地址的唯一依据是 brokerName,从 RocketMQ 的 Broker 组织结构中得知同一组 Broker (M-S) 服务器,它们的 brokerName 相同但 brokerId 不同,主服务器的 brokerId 为 0,从服务器的 brokerId 大于 0。
在默认情况下 RocketMQ 会优先选择从主服务器进行拉取消息,如果主服务器繁忙则建议下一次从从服务器拉取消息,下次默认从标号为 1 的从节点拉取消息。如果一个 Master 拥有多台 Slave 服务器,参与消息拉取负载的从服务器只会是其中一个。你可能会问,老周,我怎么知道主服务器啥时候繁忙啊?还有我怎么知道要从从服务器拉取消息的建议啊?别急,要解决你的疑问,先从 DefaultMessageStore#getMessage 这个方法看起,核心代码老周贴在下面了。
2.1 DefaultMessageStore#getMessage
long diff = maxOffsetPy - maxPhyOffsetPulling; // ① long memory = (long) (StoreUtil.TOTAL_PHYSICAL_MEMORY_SIZE * (this.messageStoreConfig.getAccessMessageInMemoryMaxRatio() / 100.0)); // ② getResult.setSuggestPullingFromSlave(diff > memory); // ③
上面这几行代码就是就是建议策略,下面进行核心参数解读:
2.1.1 代码 ① 详解
- maxOffsetPy:当前主服务器消息存储文件最大物理偏移量,返回的偏移量为已存入到操作系统的 PageCache 中的内容。
final long maxOffsetPy = this.commitLog.getMaxOffset();
- maxPhyOffsetPulling:本次消息拉取最大物理偏移量,按照消息顺序拉取的基本原则,可以基本预测下次开始拉取的物理偏移量将大于该值,并且就在其附近。
long maxPhyOffsetPulling = 0; long offsetPy = bufferConsumeQueue.getByteBuffer().getLong(); maxPhyOffsetPulling = offsetPy;
- diff:对于 PullMessageService 线程来说,当前未被拉取到消息消费端的消息长度。
2.1.2 代码 ② 详解
获取 RocketMQ 消息存储在 PageCache 中的总大小,如果当 RocketMQ 容量超过该阔值,将会将被置换出内存,如果要访问不在 PageCache 中的消息,则需要从磁盘读取。
- StoreUtil.TOTAL_PHYSICAL_MEMORY_SIZE:RocketMQ 所在服务器总物理内存大小
- accessMessageInMemoryMaxRatio:设置消息存储在内存中的阀值,默认为 40。
this.messageStoreConfig.getAccessMessageInMemoryMaxRatio() public int getAccessMessageInMemoryMaxRatio() { return accessMessageInMemoryMaxRatio; } private int accessMessageInMemoryMaxRatio = 40;
这里就可以算出 RocketMQ 消息能映射到内存中最大值为 40% * (机器总物理内存)。
2.1.3 代码 ③ 详解
设置下次拉取是否从从服务器拉取标记,触发下次从从服务器拉取的条件为:当前所有可用消息数据(所有 commitlog )文件的大小已经超过了其阔值,默认为物理内存的 40%。
public void setSuggestPullingFromSlave(boolean suggestPullingFromSlave) { this.suggestPullingFromSlave = suggestPullingFromSlave; } private boolean suggestPullingFromSlave = false; // 默认为false
那 GetMessageResult 的 suggestPullingFromSlave 属性在哪里使用呢?请看下面。
2.2 PullMessageProcessor#processRequest
if (getMessageResult.isSuggestPullingFromSlave()) { // ① responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getWhichBrokerWhenConsumeSlowly()); } else { responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID); } switch (this.brokerController.getMessageStoreConfig().getBrokerRole()) { // ② case ASYNC_MASTER: case SYNC_MASTER: break; case SLAVE: if (!this.brokerController.getBrokerConfig().isSlaveReadEnable()) { response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY); responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID); } break; } if (this.brokerController.getBrokerConfig().isSlaveReadEnable()) { // ③ // consume too slow ,redirect to another machine if (getMessageResult.isSuggestPullingFromSlave()) { responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getWhichBrokerWhenConsumeSlowly()); } // consume ok else { responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getBrokerId()); } } else { responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID); }
2.2.1 代码 ① 详解
当前所有可用消息数据(所有 commitlog )文件的大小已经超过了默认物理内存的 40%,会建议从从服务器读取。
2.2.2 代码 ② 详解
如果当前服务器的角色为从服务器并且 slaveReadEnable=false,则忽略代码 ① 设置的值,下次拉取切换为从主拉取。
2.2.3 代码 ③ 详解
如果 slaveReadEnable=true (从允许读),并且建议从从服务器读取,则从消息消费组建议当消息消费缓慢时建议的拉取 brokerId,由订阅组配置属性 whichBrokerWhenConsumeSlowly 决定。
如果消息消费速度正常,则使用订阅组建议的 brokerId 拉取消息进行消费,默认为主服务器。如果不允许从可读,则固定使用从主拉取。
2.3 消息消费进度同步机制
从上面内容可知,主从同步引入的主要目的就是消息堆积的内容默认超过物理内存的 40%,则消息读取则由从服务器来接管,实现消息的读写分离,避免主服务 IO 抖动严重。那问题来了,主服务器宕机后,从服务器接管消息消费后,那消息消费进度存储在哪里?当 主服务器恢复正常后,消息是从主服务器拉取还是从从服务器拉取?主服务器如何得知最新 的消息消费进度呢?
2.3.1 消息消费进度管理(集群模式)
集群模式下消息消费进度存储文件位于服务端 ${ROCKETMQ_HOME}/store/config/consumerOffset.json。消息消费者从服务器拉取一批消息后提交到消费组特定的线程池中处理消息,当消息消费成功后会向 Broker 发送 ACK 消息,告知消费端已成功消费到哪条消息,Broker 收到消息消费进度反馈后,首先存储在内存中,然后定时持久化到 consumeOffset.json 文件中。
我们先看一下客户端向服务端反馈消息消费进度时如何选择 Broker ?
因为主服务的 brokerId 为 0,默认情况下当主服务器存活的时候,优先会选择主服务器,只有当主服务器宕机的情况下,才会选择从服务器。
既然集群模式下消息消费进度存储在 Broker 端,当主服务器正常时,消息消费进度文件存储在主服务器,那提出如下两个问题:
- 消息消费端在主服务器存活的情况下,会优先向主服务器反馈消息消费进度,那从服务器是如何同步消息消费进度的。
- 当主服务器宕机后则消息消费端会向从服务器反馈消息消费进度,此时消息消费进度如何存储,当主服务器恢复正常后,主服务器如何得知最新的消息消费进度。
客户端定时向 Broker 端发送更新消息消费进度的请求,其入口为:RemoteBrokerOffsetStore#updateConsumeOffsetToBroker,该方法中一个非常关键的点是:选择 broker 的逻辑,如下所示:
@Override public void updateConsumeOffsetToBroker(MessageQueue mq, long offset, boolean isOneway) throws RemotingException, MQBrokerException, InterruptedException, MQClientException { FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName()); if (null == findBrokerResult) { this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic()); findBrokerResult = this.mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName()); } if (findBrokerResult != null) { UpdateConsumerOffsetRequestHeader requestHeader = new UpdateConsumerOffsetRequestHeader(); requestHeader.setTopic(mq.getTopic()); requestHeader.setConsumerGroup(this.groupName); requestHeader.setQueueId(mq.getQueueId()); requestHeader.setCommitOffset(offset); if (isOneway) { this.mQClientFactory.getMQClientAPIImpl().updateConsumerOffsetOneway( findBrokerResult.getBrokerAddr(), requestHeader, 1000 * 5); } else { this.mQClientFactory.getMQClientAPIImpl().updateConsumerOffset( findBrokerResult.getBrokerAddr(), requestHeader, 1000 * 5); } } else { throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null); } }
public FindBrokerResult findBrokerAddressInAdmin(final String brokerName) { String brokerAddr = null; boolean slave = false; boolean found = false; HashMap<Long/* brokerId */, String/* address */> map = this.brokerAddrTable.get(brokerName); if (map != null && !map.isEmpty()) { for (Map.Entry<Long, String> entry : map.entrySet()) { Long id = entry.getKey(); brokerAddr = entry.getValue(); if (brokerAddr != null) { found = true; if (MixAll.MASTER_ID == id) { slave = false; } else { slave = true; } break; } } // end of for } if (found) { return new FindBrokerResult(brokerAddr, slave, findBrokerVersion(brokerName, brokerAddr)); } return null; }
如果主服务器存活,则选择主服务器,如果主服务器宕机,则选择从服务器。也就是说, 不管消息是从主服务器拉取的还是从从服务器拉取的,提交消息消费进度请求,优先选择主服务器。服务端就是接收其偏移量,更新到服务端的内存中,然后定时持久化到${ROCKETMQ_HOME}/store/config/consumerOffset.json。
经过上面的分析,你应该还是会有疑惑吧,那请看下面的这两点。
消息消费者首先从主服务器拉取消息,并向其提交消息消费进度,如果当主服务器宕机后,从服务器会接管消息拉取服务,此时消息消费进度存储在从服务器,主从服务器的消息消费进度会出现不一致?那当主服务器恢复正常后,两者之间的消息消费进度如何同步?
看到这里了,当你自己脑海里会有这两个疑问的时候,那说明,消息存储高可用机制你了解的还算不错。我们带着这两个问题继续往下分析!
2.3.2 从服务定时同步主服务器进度
public class SlaveSynchronize { public void syncAll() { this.syncTopicConfig(); this.syncConsumerOffset(); this.syncDelayOffset(); this.syncSubscriptionGroupConfig(); } }
如果 Broker 角色为从服务器,会通过定时任务调用 syncAll,从主服务器定时同步 topic 路由信息、消息消费进度、延迟队列处理进度、消费组订阅信息。
那问题来了,如果主服务器启动后,从服务器马上从主服务器同步消息消息进度,那岂不是又要重新消费?
其实在绝大部分情况下,就算从服务器从主服务器同步了很久之前的消费进度,只要消费者没有重新启动,就不需要重新消费,在这种情况下,RocketMQ 提供了两种机制来确保不丢失消息消费进度。
- 消息消费者在内存中存在最新的消息消费进度,继续以该进度去服务器拉取消息后,消息处理完后,会定时向 Broker 服务器反馈消息消费进度,在上面也提到过,在反馈消息消费进度时,会优先选择主服务器,此时主服务器的消息消费进度就立马更新了,从服务器此时只需定时同步主服务器的消息消费进度即可。
- 消息消费者在向主服务器拉取消息时,如果是是主服务器,在处理消息拉取时,也会更新消息消费进度。
2.3.3 主服务器消息拉取时更新消息消费进度
主服务器在处理消息拉取命令时,会触发消息消费进度的更新,其代码入口为 PullMessageProcessor#processRequest
boolean storeOffsetEnable = brokerAllowSuspend; // ① storeOffsetEnable = storeOffsetEnable && hasCommitOffsetFlag; storeOffsetEnable = storeOffsetEnable && this.brokerController.getMessageStoreConfig().getBrokerRole() != BrokerRole.SLAVE; // ② if (storeOffsetEnable) { this.brokerController.getConsumerOffsetManager().commitOffset(RemotingHelper.parseChannelRemoteAddr(channel), requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueId(), requestHeader.getCommitOffset()); }
2.3.3.1 代码 ① 详解
- brokerAllowSuspend:broker 是否允许挂起,在消息拉取时,该值默认为 true。
- hasCommitOffsetFlag:消息消费者在内存中是否缓存了消息消费进度,如果缓存了,该标记设置为 true。
2.3.3.2 代码 ② 详解
如果 Broker 的角色为主服务器,并且上面两个变量都为 true,则首先使用 commitOffset 更新消息消费进度。
看到这里,消息存储高可用核心机制相信你掌握的差不多了。消息发送高可用、消息消费高可用以及消息管理高可用这三个方面的高可用我们在后面再分三篇分析。
欢迎大家关注我的公众号【老周聊架构】,Java后端主流技术栈的原理、源码分析、架构以及各种互联网高并发、高性能、高可用的解决方案。
喜欢的话,点赞、再看、分享三连。