5 张图告诉你 RocketMQ 为什么不使用 Zookeeper 做注册中心

本文涉及的产品
可视分析地图(DataV-Atlas),3 个项目,100M 存储空间
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
数据可视化DataV,5个大屏 1个月
简介: 5 张图告诉你 RocketMQ 为什么不使用 Zookeeper 做注册中心

大家好,我是君哥。

RocketMQ 选择了自己写 NameServer 做注册中心而没有选择 Zookeeper,这是为什么呢?

首先看一下 RocketMQ 的架构,如下图:

微信图片_20221213120125.png

RocketMQ 的 Broker 注册到 NameServer 集群,而生产者和消费者则需要从 NameServer 拉取消息。

1 NameServer

1.1 Broker 注册

Broker 启动时,会向 NameServer 发送注册消息,相关的 UML 类图如下:

微信图片_20221213120151.png

我们看一下 BrokerOuterAPI 的 registerBrokerAll 方法,代码如下:

//BrokerOuterAPI.java
public List<RegisterBrokerResult> registerBrokerAll(
 //省略参数
 final boolean compressed) {
 final List<RegisterBrokerResult> registerBrokerResultList = new CopyOnWriteArrayList<>();
 List<String> nameServerAddressList = this.remotingClient.getNameServerAddressList();
 if (nameServerAddressList != null && nameServerAddressList.size() > 0) {
        //省略 requestHeader 封装
  final CountDownLatch countDownLatch = new CountDownLatch(nameServerAddressList.size());
  for (final String namesrvAddr : nameServerAddressList) {
   brokerOuterExecutor.execute(new Runnable() {
    @Override
    public void run() {
     try {
      RegisterBrokerResult result = registerBroker(namesrvAddr, oneway, timeoutMills, requestHeader, body);
      if (result != null) {
       registerBrokerResultList.add(result);
      }
     } catch (Exception e) {
     } finally {
      countDownLatch.countDown();
     }
    }
   });
  }
  try {
   countDownLatch.await(timeoutMills, TimeUnit.MILLISECONDS);
  } catch (InterruptedException e) {
  }
 }
 return registerBrokerResultList;
}

可以看到,当 Broker 启动时,会向所有的 NameServer 发送注册消息,NameServer 端的注册内容如下:

微信图片_20221213120213.png

从上图中看出,需要在 NameServer 上保存的数据其实是很少的。

注意:

1.Broker 向 NameServer 注册时,会注册到所有的 NameServer 服务器, NameServer 并不是分布式存储,NameServer 集群是去中心化的。

2.NameServer 会有定时任务(每 10s 一次)检查 Broker 是否下线了,判断依据是 120s 内有没有收到心跳,如果没有收到,则关闭 channel,把 Broker 信息从本地缓存移除。代码见 RouteInfoManager 类 scanNotActiveBroker 方法。

3.Broker 启动时,同时会启动定时任务,每 30s 向 NameServer 发送注册消息,NameServer 收到注册消息后更新心跳时间(BrokerLiveInfo.lastUpdateTimestamp)。

下面是 Broker 对 NameServer 的两个请求码:

  • 注册:RequestCode.REGISTER_BROKER
  • 心跳:RequestCode.QUERY_DATA_VERSION

1.2 新建 Topic

创建 Topic 时,Broker 会向 NameServer 发送注册消息。代码如下:

//BrokerController 类
public synchronized void registerIncrementBrokerData(TopicConfig topicConfig, DataVersion dataVersion) {
 TopicConfig registerTopicConfig = topicConfig;
 //省略
 ConcurrentMap<String, TopicConfig> topicConfigTable = new ConcurrentHashMap<String, TopicConfig>();
 topicConfigTable.put(topicConfig.getTopicName(), registerTopicConfig);
 TopicConfigSerializeWrapper topicConfigSerializeWrapper = new TopicConfigSerializeWrapper();
 topicConfigSerializeWrapper.setDataVersion(dataVersion);
 topicConfigSerializeWrapper.setTopicConfigTable(topicConfigTable);
 doRegisterBrokerAll(true, false, topicConfigSerializeWrapper);
}

最终调用了上一节的 registerBrokerAll 的方法。NameServer 收到注册消息后更新本地保存的数据,所保存的数据并没有增加新数据。

1.3 客户端

对于生产者和消费者,在发送和拉取消息时,首先会从本地缓存获取 Topic 路由信息,如果获取失败,则需要从 NameServer 进行获取。下面是获取 Topic 路由信息的 UML 类图:

微信图片_20221213120237.png

看一下更新 Topic 路由信息的核心代码:

public boolean updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault,
 DefaultMQProducer defaultMQProducer) {
 try {
  if (this.lockNamesrv.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
   try {
    TopicRouteData topicRouteData;
    if (isDefault && defaultMQProducer != null) {
     //根据默认 Topic 来取
     topicRouteData = this.mQClientAPIImpl.getDefaultTopicRouteInfoFromNameServer(defaultMQProducer.getCreateTopicKey(),
      clientConfig.getMqClientApiTimeout());
     //省略部分逻辑
    } else {
     topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, clientConfig.getMqClientApiTimeout());
    }
    if (topicRouteData != null) {
        //判断路由信息是否发送变化
     TopicRouteData old = this.topicRouteTable.get(topic);
     boolean changed = topicRouteDataIsChange(old, topicRouteData);
     if (!changed) {
      changed = this.isNeedUpdateTopicRouteInfo(topic);
     } else {}
     if (changed) {
      TopicRouteData cloneTopicRouteData = topicRouteData.cloneTopicRouteData();
      for (BrokerData bd : topicRouteData.getBrokerDatas()) {
       this.brokerAddrTable.put(bd.getBrokerName(), bd.getBrokerAddrs());
      }
      // Update Pub info
      if (!producerTable.isEmpty()) {
       //更新生产者缓存
      }
      // Update sub info
      if (!consumerTable.isEmpty()) {
       //更新消费者缓存
      }
      log.info("topicRouteTable.put. Topic = {}, TopicRouteData[{}]", topic, cloneTopicRouteData);
      this.topicRouteTable.put(topic, cloneTopicRouteData);
      return true;
     }
    } else {
     log.warn("updateTopicRouteInfoFromNameServer, getTopicRouteInfoFromNameServer return null, Topic: {}. [{}]", topic, this.clientId);
    }
   } catch (MQClientException e) {
   } catch (RemotingException e) {
   } finally {
    this.lockNamesrv.unlock();
   }
  } else {}
 } catch (InterruptedException e) {
 }
 return false;
}

注意:客户端会有定时任务,默认每隔 30s 向 NameServer 拉取 Topic 路由信息来刷新本地缓存。

2 放弃 Zookeeper

RocketMQ 设计之初也是使用 Zookeeper 做注册中心的,这参考了 Kafka 的设计。Zookeeper 是一个非常成熟的注册中心,还有支持主节点选举、强一致等特性。使用 Zookeeper 的架构如下:

微信图片_20221213120300.png

2.1 轻量级

从上面的分析中可以看到,RocketMQ 需要保存的数据非常少,完全不必引入 Zookeeper 这种重量级的注册中心。

2.2 一致性

NameServer 集群各节点是对等的,相互之间并不会进行通信,这样确实会有短暂不一致。Broker 启动时会跟所有的 NameServer 建立长链接,发送注册信息。注册成功后,每 30s 会向 NameServer 发送心跳,NameServer 收到心跳后更新 Broker 的 lastUpdateTimestamp。

Zookeeper 使用 ZAB 协议来保证节点之间数据的强一致性,这要求在每一个写请求都需要在节点上写事务日志,同时需要将内存数据持久化到磁盘以保证一致性和持久性。对于 RocketMQ 这种元数据非常少的简单场景,有点小题大做了。

放弃强一致而选择可用性也是 RocketMQ 放弃 Zookeeper 的选择,这也让 NameServer 的设计更加简单。

2.3 并发注册

NameServer 处理 Broker 注册的时候,考虑到多个 Broker 并发注册的问题,保存路由信息时采用了 ReadWriteLock 中的写锁,代码如下:

public RegisterBrokerResult registerBroker(
  //省略参数
  final Channel channel) {
 RegisterBrokerResult result = new RegisterBrokerResult();
 try {
  try {
   this.lock.writeLock().lockInterruptibly();
   Set<String> brokerNames = this.clusterAddrTable.get(clusterName);
   if (null == brokerNames) {
    brokerNames = new HashSet<String>();
    this.clusterAddrTable.put(clusterName, brokerNames);
   }
   BrokerData brokerData = this.brokerAddrTable.get(brokerName);
   if (null == brokerData) {
    registerFirst = true;
    brokerData = new BrokerData(clusterName, brokerName, new HashMap<Long, String>());
    this.brokerAddrTable.put(brokerName, brokerData);
   }
   BrokerLiveInfo prevBrokerLiveInfo = this.brokerLiveTable.put(brokerAddr,
     new BrokerLiveInfo(
       System.currentTimeMillis(),
       topicConfigWrapper.getDataVersion(),
       channel,
       haServerAddr));
  } finally {
   this.lock.writeLock().unlock();
  }
 } catch (Exception e) {
 }
 return result;
}

2.4 Broker 上下线

如果有新的 Broker 加入时,NameServer 并不会主动向客户端推送新的 Broker 信息,而是需要客户端的定时任务(30s 一次)去主动拉取,这样客户端保存的路由信息跟 NameServer 会有短暂的不一致。

同样,Broker 掉线后,NameServer 会用定时任务(10s 一次)检测 Broker 最后更新时间是否超过 120s,如果超过就把 Broker 路由信息删除。在客户端,同样需要定时任务(30s 一次)去主动拉取,客户端保存的路由信息跟 NameServer 也会有短暂的不一致。

2.5 扩展性

从上面分析看到,NameServer 集群各节点是对等的,当集群有压力时,横向扩展非常容易。而 Zookeeper 在写扩展方面非常不灵活。

2.6 Broker 主从集群

在 Broker 主从集群中,RocketMQ 实现了基于 raft 协议的 DLedger 算法,可以基于 DLedger 进行日志复制。如果 Master 节点发生故障,可以基于 DLedger 自动进行主从切换。这可以完全不依赖于 Zookeeper 的实现。

2.7 运维

如果引入 Zookeeper,运维人员必须要具备运维 Zookeeper 的能力,这又增加了运维的复杂性。

3 总结

  1. 对于注册中心,RocketMQ 集群需要保存的元数据非常少,完全没有必要引入 Zookeeper 这种重量级的注册中心。
  2. RocketMQ 实现了基于 raft 协议的 DLedger 算法,可以保证 Broker 集群高可用,不用依赖 Zookeeper。
  3. NameServer 是 RocketMQ 内部组件,实现简单,易于扩展,不用考虑运维复杂性。
相关实践学习
Github实时数据分析与可视化
基于Github Archive公开数据集,将项目、行为等20+种事件类型数据实时采集至Hologres进行分析,并搭建可视化大屏。
阿里云实时数仓实战 - 项目介绍及架构设计
课程简介 1)学习搭建一个数据仓库的过程,理解数据在整个数仓架构的从采集、存储、计算、输出、展示的整个业务流程。 2)整个数仓体系完全搭建在阿里云架构上,理解并学会运用各个服务组件,了解各个组件之间如何配合联动。 3&nbsp;)前置知识要求 &nbsp; 课程大纲 第一章&nbsp;了解数据仓库概念 初步了解数据仓库是干什么的 第二章&nbsp;按照企业开发的标准去搭建一个数据仓库 数据仓库的需求是什么 架构 怎么选型怎么购买服务器 第三章&nbsp;数据生成模块 用户形成数据的一个准备 按照企业的标准,准备了十一张用户行为表 方便使用 第四章&nbsp;采集模块的搭建 购买阿里云服务器 安装 JDK 安装 Flume 第五章&nbsp;用户行为数据仓库 严格按照企业的标准开发 第六章&nbsp;搭建业务数仓理论基础和对表的分类同步 第七章&nbsp;业务数仓的搭建&nbsp; 业务行为数仓效果图&nbsp;&nbsp;
相关文章
|
5月前
|
消息中间件 网络协议 物联网
MQTT常见问题之物联网设备端申请动态注册时MQTT服务不可用如何解决
MQTT(Message Queuing Telemetry Transport)是一个轻量级的、基于发布/订阅模式的消息协议,广泛用于物联网(IoT)中设备间的通信。以下是MQTT使用过程中可能遇到的一些常见问题及其答案的汇总:
|
5月前
|
存储 缓存 分布式计算
ZooKeeper、Eureka注册中心对比
ZooKeeper、Eureka注册中心对比
374 0
|
5月前
|
网络协议 Nacos 数据安全/隐私保护
MSE微服务引擎注册问题之nacos注册失败如何解决
MSE(MicroService Engine)微服务引擎是阿里云提供的一种微服务治理平台,它通过提供服务注册、发现、配置管理等功能来支撑微服务架构的稳定运行;本合集旨在梳理MSE微服务引擎的核心特性、部署流程,以及实践中可能遇到的问题和相应的解决方案,以助力用户优化微服务架构的实施和管理。
|
12月前
|
分布式计算 Java Hadoop
05分布式电商项目 - 注册中心 Zookeeper
05分布式电商项目 - 注册中心 Zookeeper
65 0
|
2月前
|
XML Java 数据格式
Spring Cloud全解析:注册中心之zookeeper注册中心
使用ZooKeeper作为Spring Cloud的注册中心无需单独部署服务器,直接利用ZooKeeper服务端功能。项目通过`spring-cloud-starter-zookeeper-discovery`依赖实现服务注册与发现。配置文件指定连接地址,如`localhost:2181`。启动应用后,服务自动注册到ZooKeeper的`/services`路径下,形成临时节点,包含服务实例信息。
180 3
|
3月前
|
Nacos 微服务
Zookeeper 的 ZAB 协议 以及 zookeeper 与 nacos 注册中心比对
Zookeeper 的 ZAB 协议 以及 zookeeper 与 nacos 注册中心比对
53 4
|
3月前
|
Java Spring
spring cloud gateway在使用 zookeeper 注册中心时,配置https 进行服务转发
spring cloud gateway在使用 zookeeper 注册中心时,配置https 进行服务转发
69 3
|
5月前
|
存储 缓存 分布式计算
对于注册中心,ZooKeeper、Eureka哪个更合适
对于注册中心,ZooKeeper、Eureka哪个更合适
322 1
|
5月前
|
微服务
三个微服务注册中心eureka、consul、zookeeper之间的异同点以及CAP理论图
三个微服务注册中心eureka、consul、zookeeper之间的异同点以及CAP理论图
334 0
|
5月前
|
Java Spring
spring cloud使用zookeeper作为注册中心—consumer
spring cloud使用zookeeper作为注册中心—consumer
47 0