代码很少,却很优秀!RocketMQ的NameServer做到了!

简介: 本文深入剖析了RocketMQ的注册中心NameServer,基于RocketMQ release-5.2.0版本。NameServer作为Broker、Producer与Consumer之间的纽带,仅由少数几个类构成,却实现了高性能与轻量化。文章详细介绍了NameServer的AP设计思想、简洁的数据结构及心跳机制。AP设计避免了复杂的分布式协议,简化了网络开销;数据结构主要包括路由表、Broker信息等;心跳机制则通过定时扫描确保Broker的活跃状态。通过这些核心设计,NameServer实现了高效稳定的注册与发现功能。

你好,我是猿java。

今天我们来一起深入分析 RocketMQ的注册中心 NameServer。

本文基于 RocketMQ release-5.2.0

首先,我们回顾下 RocketMQ的内核原理鸟瞰图:

rocketmq-kernel.png

从上面的鸟瞰图,我们可以看出:Nameserver即和 Broker交互,也和 Producer和 Consumer交互,因此,在 RocketMQ中,Nameserver起到了一个纽带性的作用。

接着,我们再看看 NameServer的工程结构,如下图:

nameserver-project.png

整个工程只有 11个类(老版本好像只有不到 10个类),为什么 RocketMQ可以用如此少的 Class类,设计出如此高性能且轻量的注册中心?

我觉得最核心的有 3点是:

  1. AP设计思想
  2. 简单的数据结构
  3. 心跳机制

AP设计思想

像 ZooKeeper,采用了 Zab (Zookeeper Atomic Broadcast) 这种比较重的协议,必须大多数节点(过半数)可用,才能确保了数据的一致性和高可用,大大增加了网络开销和复杂度。

而 NameServer遵守了 CAP理论中 AP,在一个 NameServer集群中,NameServer节点之间是P2P(Peer to Peer)的对等关系,并且 NameServer之间并没有通信,减少很多不必要的网络开销,即便只剩一个 NameServer节点也能继续工作,足以保证高可用。

数据结构

NameServer维护了一套比较简单的数据结构,内部维护了一个路由表,该路由表包含以下几个核心元数据,对应的源码类RouteInfoManager如下:

public class RouteInfoManager {
   
    private final static long DEFAULT_BROKER_CHANNEL_EXPIRED_TIME = 1000 * 60 * 2; // broker失效时间 120s
    private final ReadWriteLock lock = new ReentrantReadWriteLock();
    private final Map<String/* topic */, Map<String, QueueData>> topicQueueTable;
    private final Map<String/* brokerName */, BrokerData> brokerAddrTable;
    private final Map<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;
    private final Map<BrokerAddrInfo/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
    private final Map<BrokerAddrInfo/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
}
  • topicQueueTable: Topic消息队列路由信息,消息发送时根据路由表进行负载均衡
  • brokerAddrTable: Broker基础信息,包括brokerName、所属集群名称、主备Broker地址
  • clusterAddrTable: Broker集群信息,存储集群中所有Broker名称
  • brokerLiveTable: Broker状态信息,NameServer每次收到心跳包是会替换该信息
  • filterServerTable: Broker上的FilterServer列表,用于过滤标签(Tag)或 SQL表达式,以减轻 Consumer的负担,提高消息消费的效率。

TopicRouteData

TopicRouteData是 NameServer中最重要的数据结构之一,它包括了 Topic对应的所有 Broker信息以及每个 Broker上的队列信息,filter服务器列表,其源码如下:

public class TopicRouteData {
   
    private List<QueueData> queueDatas;
    private List<BrokerData> brokerDatas;
    private HashMap<String, List<String>> filterServerTable;
    //It could be null or empty
    private Map<String/*brokerName*/, TopicQueueMappingInfo> topicQueueMappingByBroker;
}

BrokerData

BrokerData包含了 Broker的基本属性,状态,所在集群以及 Broker服务器的 IP地址,其源码如下:

public class BrokerData {
   
    private String cluster;//所在的集群
    private String brokerName;//所在的brokerName
    private HashMap<Long, String> brokerAddrs;//该broker对应的机器IP列表
    private String zoneName; // 区域名称
}

QueueData

QueueData包含了 BrokerName,readQueue的数量,writeQueue的数量等信息,对应的源码类是QueueData,其源码如下:

public class QueueData {
   
    private String brokerName;//所在的brokerName
    private int readQueueNums;// 读队列数量
    private int writeQueueNums;// 写队列数量
    private int perm; // 读写权限,参考PermName 类
    private int topicSysFlag; // topic同步标记,参考TopicSysFlag 类
}

元数据举例

为了更好地理解元数据,这里对每一种元数据都给出一个数据实例:

topicQueueTable:{
    "topicA":[
        {
            "brokeName":"broker-a",
            "readQueueNums":4,
            "writeQueueNums":4,
            "perm":6, 
            "topicSyncFlag":0 
        },
        {
            "brokeName":"broker-b",
            "readQueueNums":4,
            "writeQueueNums":4,
            "perm":6, 
            "topicSyncFlag":0
        }
    ],
    "topicB":[]
}
brokeAddrTable:{
    "broker-a":{
        "cluster":"cluster-1",
        "brokerName":"broker-a",
        "brokerAddrs":{
            0:"192.168.0.1:8000",
            1:"192.168.0.2:8000"
        }
    },
    "broker-b":{
        "cluster":"cluster-1",
        "brokerName":"broker-b",
        "brokerAddrs":{
            0:"192.168.0.3:8000",
            1:"192.168.0.4:8000"
        }
    }
}
brokerLiveTable:{
    "192.168.0.1:8000":{
        "lastUpdateTimestamp":1533434434344,//long 的时间戳
        "dataVersion":dataVersionObj, //参考DataVersion类
        "channel":channelObj,// 参考io.netty.channel.Channel
        "haServerAddr":"192.168.0.2:8000"
    },
    "192.168.0.2:8000":{
        "lastUpdateTimestamp":1533434434344,//long 的时间戳
        "dataVersion":dataVersionObj, //参考DataVersion类
        "channel":channelObj,// 参考io.netty.channel.Channel
        "haServerAddr":"192.168.0.1:8000"
    },
    "192.168.0.3:8000":{ },
    "192.168.0.4:8000":{ },
}
clusterAddrTable:{
    "cluster-1":[{"broker-a"},{"broker-b"}],
    "cluster-2":[],
}
filterServerTable:{
    "192.168.0.1:8000":[{"192.168.0.1:7000"}{"192.168.0.1:9000"}],
    "192.168.0.2:8000":[{"192.168.0.2:7000"}{"192.168.0.2:9000"}],
}

心跳机制

心跳机制是 NameServer维护 Broker的路由信息最重要的一个抓手,主要分为接收心跳、处理心跳、心跳超时 3部分:

接收心跳

Broker每 30s会向所有的 NameServer发送心跳包,告诉它们自己还存活着,从而更新自己在 NameServer的状态,整体交互如下图:
img.png

处理心跳

NameServer收到心跳包时会更新 brokerLiveTable缓存中 BrokerLiveInfo的 lastUpdateTimeStamp信息,整体交互如下图:

nameserver-heart-receive.png

处理逻辑可以参考源码:
org.apache.rocketmq.namesrv.processor.DefaultRequestProcessor#processRequest#brokerHeartbeat:

public RemotingCommand brokerHeartbeat(ChannelHandlerContext ctx,
    RemotingCommand request) throws RemotingCommandException {
   
    final RemotingCommand response = RemotingCommand.createResponseCommand(null);
    final BrokerHeartbeatRequestHeader requestHeader =
        (BrokerHeartbeatRequestHeader) request.decodeCommandCustomHeader(BrokerHeartbeatRequestHeader.class);

    this.namesrvController.getRouteInfoManager().updateBrokerInfoUpdateTimestamp(requestHeader.getClusterName(), requestHeader.getBrokerAddr());

    response.setCode(ResponseCode.SUCCESS);
    response.setRemark(null);
    return response;
}

心跳超时

NameServer每隔 10s(每隔5s + 5s延迟)扫描 brokerLiveTable检查 Broker的状态,如果在 120s内未收到 Broker心跳,则认为 Broker异常,会从路由表将该 Broker摘除并关闭 Socket连接,同时还会更新路由表的其他信息,整体交互如下图:

nameserver-heart-expired.png

private void startScheduleService() {
   
this.scanExecutorService.scheduleAtFixedRate(NamesrvController.this.routeInfoManager::scanNotActiveBroker,
        5, this.namesrvConfig.getScanNotActiveBrokerInterval(), TimeUnit.MILLISECONDS);
}

源码参考:org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager#unRegisterBroker(),核心流程:

  1. 遍历brokerAddrTable
  2. 遍历broker地址
  3. 根据 broker地址移除 brokerAddr
  4. 如果当前 Topic只包含待移除的 Broker,则移除该 Topic

其他核心源码解读

NameServer启动

NameServer的启动类为:org.apache.rocketmq.namesrv.NamesrvStartup,整个流程如下图:
nameserver-startup.png

NameServer启动最核心的 3个事情是:

  1. 加载配置:NameServerConfig、NettyServerConfig主要是映射配置文件,并创建 NamesrvController。
  2. 启动 Netty通信服务:NettyRemotingServer是 NameServer和Broker,Producer,Consumer通信的底层通道 Netty服务器。
  3. 启动定时器和钩子程序:NameServerController实例一方面处理 Netty接收到消息后,一方面内部有多个定时器和钩子程序,它是 NameServer的核心控制器。

总结

NameServer并没有采用复杂的分布式协议来保持数据的一致性,而是采用 CAP理论中的 AP,各个节点之间是Peer to Peer的对等关系,数据的一致性通过心跳机制,定时器,延时感知来完成。

NameServer最核心的 3点设计是:

  1. AP的设计思想
  2. 简单的数据结构
  3. 心跳机制

学习交流

如果你觉得文章有帮助,请帮忙转发给更多的好友,或关注:猿java,持续输出硬核文章。

目录
相关文章
|
23天前
|
弹性计算 人工智能 架构师
阿里云携手Altair共拓云上工业仿真新机遇
2024年9月12日,「2024 Altair 技术大会杭州站」成功召开,阿里云弹性计算产品运营与生态负责人何川,与Altair中国技术总监赵阳在会上联合发布了最新的“云上CAE一体机”。
阿里云携手Altair共拓云上工业仿真新机遇
|
16天前
|
存储 关系型数据库 分布式数据库
GraphRAG:基于PolarDB+通义千问+LangChain的知识图谱+大模型最佳实践
本文介绍了如何使用PolarDB、通义千问和LangChain搭建GraphRAG系统,结合知识图谱和向量检索提升问答质量。通过实例展示了单独使用向量检索和图检索的局限性,并通过图+向量联合搜索增强了问答准确性。PolarDB支持AGE图引擎和pgvector插件,实现图数据和向量数据的统一存储与检索,提升了RAG系统的性能和效果。
|
20天前
|
机器学习/深度学习 算法 大数据
【BetterBench博士】2024 “华为杯”第二十一届中国研究生数学建模竞赛 选题分析
2024“华为杯”数学建模竞赛,对ABCDEF每个题进行详细的分析,涵盖风电场功率优化、WLAN网络吞吐量、磁性元件损耗建模、地理环境问题、高速公路应急车道启用和X射线脉冲星建模等多领域问题,解析了问题类型、专业和技能的需要。
2574 22
【BetterBench博士】2024 “华为杯”第二十一届中国研究生数学建模竞赛 选题分析
|
18天前
|
人工智能 IDE 程序员
期盼已久!通义灵码 AI 程序员开启邀测,全流程开发仅用几分钟
在云栖大会上,阿里云云原生应用平台负责人丁宇宣布,「通义灵码」完成全面升级,并正式发布 AI 程序员。
|
3天前
|
JSON 自然语言处理 数据管理
阿里云百炼产品月刊【2024年9月】
阿里云百炼产品月刊【2024年9月】,涵盖本月产品和功能发布、活动,应用实践等内容,帮助您快速了解阿里云百炼产品的最新动态。
阿里云百炼产品月刊【2024年9月】
|
2天前
|
存储 人工智能 搜索推荐
数据治理,是时候打破刻板印象了
瓴羊智能数据建设与治理产品Datapin全面升级,可演进扩展的数据架构体系为企业数据治理预留发展空间,推出敏捷版用以解决企业数据量不大但需构建数据的场景问题,基于大模型打造的DataAgent更是为企业用好数据资产提供了便利。
159 2
|
20天前
|
机器学习/深度学习 算法 数据可视化
【BetterBench博士】2024年中国研究生数学建模竞赛 C题:数据驱动下磁性元件的磁芯损耗建模 问题分析、数学模型、python 代码
2024年中国研究生数学建模竞赛C题聚焦磁性元件磁芯损耗建模。题目背景介绍了电能变换技术的发展与应用,强调磁性元件在功率变换器中的重要性。磁芯损耗受多种因素影响,现有模型难以精确预测。题目要求通过数据分析建立高精度磁芯损耗模型。具体任务包括励磁波形分类、修正斯坦麦茨方程、分析影响因素、构建预测模型及优化设计条件。涉及数据预处理、特征提取、机器学习及优化算法等技术。适合电气、材料、计算机等多个专业学生参与。
1575 16
【BetterBench博士】2024年中国研究生数学建模竞赛 C题:数据驱动下磁性元件的磁芯损耗建模 问题分析、数学模型、python 代码
|
22天前
|
编解码 JSON 自然语言处理
通义千问重磅开源Qwen2.5,性能超越Llama
击败Meta,阿里Qwen2.5再登全球开源大模型王座
962 14
|
3天前
|
Linux 虚拟化 开发者
一键将CentOs的yum源更换为国内阿里yum源
一键将CentOs的yum源更换为国内阿里yum源
204 2
|
17天前
|
人工智能 开发框架 Java
重磅发布!AI 驱动的 Java 开发框架:Spring AI Alibaba
随着生成式 AI 的快速发展,基于 AI 开发框架构建 AI 应用的诉求迅速增长,涌现出了包括 LangChain、LlamaIndex 等开发框架,但大部分框架只提供了 Python 语言的实现。但这些开发框架对于国内习惯了 Spring 开发范式的 Java 开发者而言,并非十分友好和丝滑。因此,我们基于 Spring AI 发布并快速演进 Spring AI Alibaba,通过提供一种方便的 API 抽象,帮助 Java 开发者简化 AI 应用的开发。同时,提供了完整的开源配套,包括可观测、网关、消息队列、配置中心等。
727 10