代码很少,却很优秀!RocketMQ的NameServer做到了!

本文涉及的产品
可视分析地图(DataV-Atlas),3 个项目,100M 存储空间
简介: 本文深入剖析了RocketMQ的注册中心NameServer,基于RocketMQ release-5.2.0版本。NameServer作为Broker、Producer与Consumer之间的纽带,仅由少数几个类构成,却实现了高性能与轻量化。文章详细介绍了NameServer的AP设计思想、简洁的数据结构及心跳机制。AP设计避免了复杂的分布式协议,简化了网络开销;数据结构主要包括路由表、Broker信息等;心跳机制则通过定时扫描确保Broker的活跃状态。通过这些核心设计,NameServer实现了高效稳定的注册与发现功能。

你好,我是猿java。

今天我们来一起深入分析 RocketMQ的注册中心 NameServer。

本文基于 RocketMQ release-5.2.0

首先,我们回顾下 RocketMQ的内核原理鸟瞰图:

rocketmq-kernel.png

从上面的鸟瞰图,我们可以看出:Nameserver即和 Broker交互,也和 Producer和 Consumer交互,因此,在 RocketMQ中,Nameserver起到了一个纽带性的作用。

接着,我们再看看 NameServer的工程结构,如下图:

nameserver-project.png

整个工程只有 11个类(老版本好像只有不到 10个类),为什么 RocketMQ可以用如此少的 Class类,设计出如此高性能且轻量的注册中心?

我觉得最核心的有 3点是:

  1. AP设计思想
  2. 简单的数据结构
  3. 心跳机制

AP设计思想

像 ZooKeeper,采用了 Zab (Zookeeper Atomic Broadcast) 这种比较重的协议,必须大多数节点(过半数)可用,才能确保了数据的一致性和高可用,大大增加了网络开销和复杂度。

而 NameServer遵守了 CAP理论中 AP,在一个 NameServer集群中,NameServer节点之间是P2P(Peer to Peer)的对等关系,并且 NameServer之间并没有通信,减少很多不必要的网络开销,即便只剩一个 NameServer节点也能继续工作,足以保证高可用。

数据结构

NameServer维护了一套比较简单的数据结构,内部维护了一个路由表,该路由表包含以下几个核心元数据,对应的源码类RouteInfoManager如下:

public class RouteInfoManager {
   
    private final static long DEFAULT_BROKER_CHANNEL_EXPIRED_TIME = 1000 * 60 * 2; // broker失效时间 120s
    private final ReadWriteLock lock = new ReentrantReadWriteLock();
    private final Map<String/* topic */, Map<String, QueueData>> topicQueueTable;
    private final Map<String/* brokerName */, BrokerData> brokerAddrTable;
    private final Map<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;
    private final Map<BrokerAddrInfo/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
    private final Map<BrokerAddrInfo/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
}
  • topicQueueTable: Topic消息队列路由信息,消息发送时根据路由表进行负载均衡
  • brokerAddrTable: Broker基础信息,包括brokerName、所属集群名称、主备Broker地址
  • clusterAddrTable: Broker集群信息,存储集群中所有Broker名称
  • brokerLiveTable: Broker状态信息,NameServer每次收到心跳包是会替换该信息
  • filterServerTable: Broker上的FilterServer列表,用于过滤标签(Tag)或 SQL表达式,以减轻 Consumer的负担,提高消息消费的效率。

TopicRouteData

TopicRouteData是 NameServer中最重要的数据结构之一,它包括了 Topic对应的所有 Broker信息以及每个 Broker上的队列信息,filter服务器列表,其源码如下:

public class TopicRouteData {
   
    private List<QueueData> queueDatas;
    private List<BrokerData> brokerDatas;
    private HashMap<String, List<String>> filterServerTable;
    //It could be null or empty
    private Map<String/*brokerName*/, TopicQueueMappingInfo> topicQueueMappingByBroker;
}

BrokerData

BrokerData包含了 Broker的基本属性,状态,所在集群以及 Broker服务器的 IP地址,其源码如下:

public class BrokerData {
   
    private String cluster;//所在的集群
    private String brokerName;//所在的brokerName
    private HashMap<Long, String> brokerAddrs;//该broker对应的机器IP列表
    private String zoneName; // 区域名称
}

QueueData

QueueData包含了 BrokerName,readQueue的数量,writeQueue的数量等信息,对应的源码类是QueueData,其源码如下:

public class QueueData {
   
    private String brokerName;//所在的brokerName
    private int readQueueNums;// 读队列数量
    private int writeQueueNums;// 写队列数量
    private int perm; // 读写权限,参考PermName 类
    private int topicSysFlag; // topic同步标记,参考TopicSysFlag 类
}

元数据举例

为了更好地理解元数据,这里对每一种元数据都给出一个数据实例:

topicQueueTable:{
    "topicA":[
        {
            "brokeName":"broker-a",
            "readQueueNums":4,
            "writeQueueNums":4,
            "perm":6, 
            "topicSyncFlag":0 
        },
        {
            "brokeName":"broker-b",
            "readQueueNums":4,
            "writeQueueNums":4,
            "perm":6, 
            "topicSyncFlag":0
        }
    ],
    "topicB":[]
}
brokeAddrTable:{
    "broker-a":{
        "cluster":"cluster-1",
        "brokerName":"broker-a",
        "brokerAddrs":{
            0:"192.168.0.1:8000",
            1:"192.168.0.2:8000"
        }
    },
    "broker-b":{
        "cluster":"cluster-1",
        "brokerName":"broker-b",
        "brokerAddrs":{
            0:"192.168.0.3:8000",
            1:"192.168.0.4:8000"
        }
    }
}
brokerLiveTable:{
    "192.168.0.1:8000":{
        "lastUpdateTimestamp":1533434434344,//long 的时间戳
        "dataVersion":dataVersionObj, //参考DataVersion类
        "channel":channelObj,// 参考io.netty.channel.Channel
        "haServerAddr":"192.168.0.2:8000"
    },
    "192.168.0.2:8000":{
        "lastUpdateTimestamp":1533434434344,//long 的时间戳
        "dataVersion":dataVersionObj, //参考DataVersion类
        "channel":channelObj,// 参考io.netty.channel.Channel
        "haServerAddr":"192.168.0.1:8000"
    },
    "192.168.0.3:8000":{ },
    "192.168.0.4:8000":{ },
}
clusterAddrTable:{
    "cluster-1":[{"broker-a"},{"broker-b"}],
    "cluster-2":[],
}
filterServerTable:{
    "192.168.0.1:8000":[{"192.168.0.1:7000"}{"192.168.0.1:9000"}],
    "192.168.0.2:8000":[{"192.168.0.2:7000"}{"192.168.0.2:9000"}],
}

心跳机制

心跳机制是 NameServer维护 Broker的路由信息最重要的一个抓手,主要分为接收心跳、处理心跳、心跳超时 3部分:

接收心跳

Broker每 30s会向所有的 NameServer发送心跳包,告诉它们自己还存活着,从而更新自己在 NameServer的状态,整体交互如下图:
img.png

处理心跳

NameServer收到心跳包时会更新 brokerLiveTable缓存中 BrokerLiveInfo的 lastUpdateTimeStamp信息,整体交互如下图:

nameserver-heart-receive.png

处理逻辑可以参考源码:
org.apache.rocketmq.namesrv.processor.DefaultRequestProcessor#processRequest#brokerHeartbeat:

public RemotingCommand brokerHeartbeat(ChannelHandlerContext ctx,
    RemotingCommand request) throws RemotingCommandException {
   
    final RemotingCommand response = RemotingCommand.createResponseCommand(null);
    final BrokerHeartbeatRequestHeader requestHeader =
        (BrokerHeartbeatRequestHeader) request.decodeCommandCustomHeader(BrokerHeartbeatRequestHeader.class);

    this.namesrvController.getRouteInfoManager().updateBrokerInfoUpdateTimestamp(requestHeader.getClusterName(), requestHeader.getBrokerAddr());

    response.setCode(ResponseCode.SUCCESS);
    response.setRemark(null);
    return response;
}

心跳超时

NameServer每隔 10s(每隔5s + 5s延迟)扫描 brokerLiveTable检查 Broker的状态,如果在 120s内未收到 Broker心跳,则认为 Broker异常,会从路由表将该 Broker摘除并关闭 Socket连接,同时还会更新路由表的其他信息,整体交互如下图:

nameserver-heart-expired.png

private void startScheduleService() {
   
this.scanExecutorService.scheduleAtFixedRate(NamesrvController.this.routeInfoManager::scanNotActiveBroker,
        5, this.namesrvConfig.getScanNotActiveBrokerInterval(), TimeUnit.MILLISECONDS);
}

源码参考:org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager#unRegisterBroker(),核心流程:

  1. 遍历brokerAddrTable
  2. 遍历broker地址
  3. 根据 broker地址移除 brokerAddr
  4. 如果当前 Topic只包含待移除的 Broker,则移除该 Topic

其他核心源码解读

NameServer启动

NameServer的启动类为:org.apache.rocketmq.namesrv.NamesrvStartup,整个流程如下图:
nameserver-startup.png

NameServer启动最核心的 3个事情是:

  1. 加载配置:NameServerConfig、NettyServerConfig主要是映射配置文件,并创建 NamesrvController。
  2. 启动 Netty通信服务:NettyRemotingServer是 NameServer和Broker,Producer,Consumer通信的底层通道 Netty服务器。
  3. 启动定时器和钩子程序:NameServerController实例一方面处理 Netty接收到消息后,一方面内部有多个定时器和钩子程序,它是 NameServer的核心控制器。

总结

NameServer并没有采用复杂的分布式协议来保持数据的一致性,而是采用 CAP理论中的 AP,各个节点之间是Peer to Peer的对等关系,数据的一致性通过心跳机制,定时器,延时感知来完成。

NameServer最核心的 3点设计是:

  1. AP的设计思想
  2. 简单的数据结构
  3. 心跳机制

学习交流

如果你觉得文章有帮助,请帮忙转发给更多的好友,或关注:猿java,持续输出硬核文章。

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
5月前
|
消息中间件 安全 fastjson
消息队列 MQ使用问题之NameServer集群是什么结构
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
5月前
|
消息中间件 数据安全/隐私保护 RocketMQ
消息队列 MQ使用问题之如何设置nameserver监听的IP
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
7月前
|
消息中间件 存储 Java
RocketMQ实战教程之NameServer与BrokerServer
这是一个关于RocketMQ实战教程的概要,主要讨论NameServer和BrokerServer的角色。NameServer负责管理所有BrokerServer,而BrokerServer存储和传输消息。生产者和消费者通过NameServer找到合适的Broker进行交互,不需要直接知道Broker的具体信息。工作流程包括生产者向NameServer查询后发送消息到Broker,以及消费者同样通过NameServer获取消息进行消费。这种设计类似于服务注册中心的概念,便于系统扩展和集群管理。
|
消息中间件 存储 负载均衡
RocketMQ 源码分析——NameServer
- 编写优雅、高效的代码。RocketMQ作为阿里双十一交易核心链路产品,支撑千万级并发、万亿级数据洪峰。读源码可以积累编写高效、优雅代码的经验。 - 提升微观的架构设计能力,重点在思维和理念。Apache RocketMQ作为Apache顶级项目,它的架构设计是值得大家借鉴的。 - 解决工作中、学习中的各种疑难杂症。在使用RocketMQ过程中遇到消费卡死、卡顿等问题可以通过阅读源码的方式找到问题并给予解决。 - 在BATJ一线互联网公司面试中展现优秀的自己。大厂面试中,尤其是阿里系的公司,你有RocketMQ源码体系化知识,必定是一个很大的加分项。
220 0
|
消息中间件 存储 Java
RocketMQ的NameServer执行流程学习梳理
首先NamesrvStartUp启动,首先经过main()方法,也是我们常见的main方法进入到main0()执行创建controller操作与启动controller操作这两个操作。而创建controller的操作则首先需要拿到namesrvConfig的配置信息和NettyServerConfig的配置信息,此时会 创建这两个对象,并填充配置信息然后放入到创建的controller对象中的构造函数中,并进行controller的启动操作,而启动操作首先会初始化一些信息和添加jvm钩子,也即会进行如下操作:加载键值对配置管理器、创建远程服务器remotingServer,创建远程线程池rem
123 2
RocketMQ的NameServer执行流程学习梳理
|
消息中间件 RocketMQ
不科学,RocketMQ生产者在一个应用服务竟然不能向多个NameServer发送消息
不科学,RocketMQ生产者在一个应用服务竟然不能向多个NameServer发送消息
372 0
不科学,RocketMQ生产者在一个应用服务竟然不能向多个NameServer发送消息
|
消息中间件 数据可视化 Java
RocketMq之nameserver源码阅读
RocketMq之nameserver源码阅读
117 0
RocketMq之nameserver源码阅读
|
消息中间件 存储 负载均衡
RocketMQ的路由中心:NameServer源码解析
RocketMQ的路由中心:NameServer源码解析
310 0
RocketMQ的路由中心:NameServer源码解析
|
消息中间件 存储 负载均衡
MQ系列4:NameServer 原理解析
MQ系列4:NameServer 原理解析
304 0
MQ系列4:NameServer 原理解析
|
消息中间件 Java Shell
RocketMQ概念详细之NameServer
NameServer 相关信息
4957 0