大家好,我是君哥。
RocketMQ 选择了自己写 NameServer 做注册中心而没有选择 Zookeeper,这是为什么呢?
首先看一下 RocketMQ 的架构,如下图:
RocketMQ 的 Broker 注册到 NameServer 集群,而生产者和消费者则需要从 NameServer 拉取消息。
1 NameServer
1.1 Broker 注册
Broker 启动时,会向 NameServer 发送注册消息,相关的 UML 类图如下:
我们看一下 BrokerOuterAPI 的 registerBrokerAll 方法,代码如下:
//BrokerOuterAPI.java public List<RegisterBrokerResult> registerBrokerAll( //省略参数 final boolean compressed) { final List<RegisterBrokerResult> registerBrokerResultList = new CopyOnWriteArrayList<>(); List<String> nameServerAddressList = this.remotingClient.getNameServerAddressList(); if (nameServerAddressList != null && nameServerAddressList.size() > 0) { //省略 requestHeader 封装 final CountDownLatch countDownLatch = new CountDownLatch(nameServerAddressList.size()); for (final String namesrvAddr : nameServerAddressList) { brokerOuterExecutor.execute(new Runnable() { @Override public void run() { try { RegisterBrokerResult result = registerBroker(namesrvAddr, oneway, timeoutMills, requestHeader, body); if (result != null) { registerBrokerResultList.add(result); } } catch (Exception e) { } finally { countDownLatch.countDown(); } } }); } try { countDownLatch.await(timeoutMills, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { } } return registerBrokerResultList; }
可以看到,当 Broker 启动时,会向所有的 NameServer 发送注册消息,NameServer 端的注册内容如下:
从上图中看出,需要在 NameServer 上保存的数据其实是很少的。
注意:
1.Broker 向 NameServer 注册时,会注册到所有的 NameServer 服务器, NameServer 并不是分布式存储,NameServer 集群是去中心化的。
2.NameServer 会有定时任务(每 10s 一次)检查 Broker 是否下线了,判断依据是 120s 内有没有收到心跳,如果没有收到,则关闭 channel,把 Broker 信息从本地缓存移除。代码见 RouteInfoManager 类 scanNotActiveBroker 方法。
3.Broker 启动时,同时会启动定时任务,每 30s 向 NameServer 发送注册消息,NameServer 收到注册消息后更新心跳时间(BrokerLiveInfo.lastUpdateTimestamp)。
下面是 Broker 对 NameServer 的两个请求码:
- 注册:RequestCode.REGISTER_BROKER
- 心跳:RequestCode.QUERY_DATA_VERSION
1.2 新建 Topic
创建 Topic 时,Broker 会向 NameServer 发送注册消息。代码如下:
//BrokerController 类 public synchronized void registerIncrementBrokerData(TopicConfig topicConfig, DataVersion dataVersion) { TopicConfig registerTopicConfig = topicConfig; //省略 ConcurrentMap<String, TopicConfig> topicConfigTable = new ConcurrentHashMap<String, TopicConfig>(); topicConfigTable.put(topicConfig.getTopicName(), registerTopicConfig); TopicConfigSerializeWrapper topicConfigSerializeWrapper = new TopicConfigSerializeWrapper(); topicConfigSerializeWrapper.setDataVersion(dataVersion); topicConfigSerializeWrapper.setTopicConfigTable(topicConfigTable); doRegisterBrokerAll(true, false, topicConfigSerializeWrapper); }
最终调用了上一节的 registerBrokerAll 的方法。NameServer 收到注册消息后更新本地保存的数据,所保存的数据并没有增加新数据。
1.3 客户端
对于生产者和消费者,在发送和拉取消息时,首先会从本地缓存获取 Topic 路由信息,如果获取失败,则需要从 NameServer 进行获取。下面是获取 Topic 路由信息的 UML 类图:
看一下更新 Topic 路由信息的核心代码:
public boolean updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault, DefaultMQProducer defaultMQProducer) { try { if (this.lockNamesrv.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) { try { TopicRouteData topicRouteData; if (isDefault && defaultMQProducer != null) { //根据默认 Topic 来取 topicRouteData = this.mQClientAPIImpl.getDefaultTopicRouteInfoFromNameServer(defaultMQProducer.getCreateTopicKey(), clientConfig.getMqClientApiTimeout()); //省略部分逻辑 } else { topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, clientConfig.getMqClientApiTimeout()); } if (topicRouteData != null) { //判断路由信息是否发送变化 TopicRouteData old = this.topicRouteTable.get(topic); boolean changed = topicRouteDataIsChange(old, topicRouteData); if (!changed) { changed = this.isNeedUpdateTopicRouteInfo(topic); } else {} if (changed) { TopicRouteData cloneTopicRouteData = topicRouteData.cloneTopicRouteData(); for (BrokerData bd : topicRouteData.getBrokerDatas()) { this.brokerAddrTable.put(bd.getBrokerName(), bd.getBrokerAddrs()); } // Update Pub info if (!producerTable.isEmpty()) { //更新生产者缓存 } // Update sub info if (!consumerTable.isEmpty()) { //更新消费者缓存 } log.info("topicRouteTable.put. Topic = {}, TopicRouteData[{}]", topic, cloneTopicRouteData); this.topicRouteTable.put(topic, cloneTopicRouteData); return true; } } else { log.warn("updateTopicRouteInfoFromNameServer, getTopicRouteInfoFromNameServer return null, Topic: {}. [{}]", topic, this.clientId); } } catch (MQClientException e) { } catch (RemotingException e) { } finally { this.lockNamesrv.unlock(); } } else {} } catch (InterruptedException e) { } return false; }
注意:客户端会有定时任务,默认每隔 30s 向 NameServer 拉取 Topic 路由信息来刷新本地缓存。
2 放弃 Zookeeper
RocketMQ 设计之初也是使用 Zookeeper 做注册中心的,这参考了 Kafka 的设计。Zookeeper 是一个非常成熟的注册中心,还有支持主节点选举、强一致等特性。使用 Zookeeper 的架构如下:
2.1 轻量级
从上面的分析中可以看到,RocketMQ 需要保存的数据非常少,完全不必引入 Zookeeper 这种重量级的注册中心。
2.2 一致性
NameServer 集群各节点是对等的,相互之间并不会进行通信,这样确实会有短暂不一致。Broker 启动时会跟所有的 NameServer 建立长链接,发送注册信息。注册成功后,每 30s 会向 NameServer 发送心跳,NameServer 收到心跳后更新 Broker 的 lastUpdateTimestamp。
Zookeeper 使用 ZAB 协议来保证节点之间数据的强一致性,这要求在每一个写请求都需要在节点上写事务日志,同时需要将内存数据持久化到磁盘以保证一致性和持久性。对于 RocketMQ 这种元数据非常少的简单场景,有点小题大做了。
放弃强一致而选择可用性也是 RocketMQ 放弃 Zookeeper 的选择,这也让 NameServer 的设计更加简单。
2.3 并发注册
NameServer 处理 Broker 注册的时候,考虑到多个 Broker 并发注册的问题,保存路由信息时采用了 ReadWriteLock 中的写锁,代码如下:
public RegisterBrokerResult registerBroker( //省略参数 final Channel channel) { RegisterBrokerResult result = new RegisterBrokerResult(); try { try { this.lock.writeLock().lockInterruptibly(); Set<String> brokerNames = this.clusterAddrTable.get(clusterName); if (null == brokerNames) { brokerNames = new HashSet<String>(); this.clusterAddrTable.put(clusterName, brokerNames); } BrokerData brokerData = this.brokerAddrTable.get(brokerName); if (null == brokerData) { registerFirst = true; brokerData = new BrokerData(clusterName, brokerName, new HashMap<Long, String>()); this.brokerAddrTable.put(brokerName, brokerData); } BrokerLiveInfo prevBrokerLiveInfo = this.brokerLiveTable.put(brokerAddr, new BrokerLiveInfo( System.currentTimeMillis(), topicConfigWrapper.getDataVersion(), channel, haServerAddr)); } finally { this.lock.writeLock().unlock(); } } catch (Exception e) { } return result; }
2.4 Broker 上下线
如果有新的 Broker 加入时,NameServer 并不会主动向客户端推送新的 Broker 信息,而是需要客户端的定时任务(30s 一次)去主动拉取,这样客户端保存的路由信息跟 NameServer 会有短暂的不一致。
同样,Broker 掉线后,NameServer 会用定时任务(10s 一次)检测 Broker 最后更新时间是否超过 120s,如果超过就把 Broker 路由信息删除。在客户端,同样需要定时任务(30s 一次)去主动拉取,客户端保存的路由信息跟 NameServer 也会有短暂的不一致。
2.5 扩展性
从上面分析看到,NameServer 集群各节点是对等的,当集群有压力时,横向扩展非常容易。而 Zookeeper 在写扩展方面非常不灵活。
2.6 Broker 主从集群
在 Broker 主从集群中,RocketMQ 实现了基于 raft 协议的 DLedger 算法,可以基于 DLedger 进行日志复制。如果 Master 节点发生故障,可以基于 DLedger 自动进行主从切换。这可以完全不依赖于 Zookeeper 的实现。
2.7 运维
如果引入 Zookeeper,运维人员必须要具备运维 Zookeeper 的能力,这又增加了运维的复杂性。
3 总结
- 对于注册中心,RocketMQ 集群需要保存的元数据非常少,完全没有必要引入 Zookeeper 这种重量级的注册中心。
- RocketMQ 实现了基于 raft 协议的 DLedger 算法,可以保证 Broker 集群高可用,不用依赖 Zookeeper。
- NameServer 是 RocketMQ 内部组件,实现简单,易于扩展,不用考虑运维复杂性。