MQ系列4:NameServer 原理解析

本文涉及的产品
公共DNS(含HTTPDNS解析),每月1000万次HTTP解析
全局流量管理 GTM,标准版 1个月
云解析 DNS,旗舰版 1个月
简介: MQ系列4:NameServer 原理解析

1 关于NameServer

上一节的 MQ系列3:RocketMQ 架构分析,我们大致介绍了 RocketMQ的基本组件构成,包括 NameServer、Broker、Producer以及Consumer四部分。

NameServer,指的是服务可以根据给定的名字来进行资源或对象的地址定位,并获取有关的属性信息。在Rocket中也一样,NameServer是 RocketMQ 的服务注册中心(类似于 Kafka 集群 后面的 Zookeeper 集群一样, 对集群元数据进行管理),根据元数据(ip、port和router信息)来唯一定位服务。RocketMQ 需要先启动 NameServer ,再启动 Rocket 中的 Broker。

2 NameServer运行流程

2.1 注册

注册发生在Broker启动之后,启动后快速与NameServer建立长连接,并每30s对NameService发送一次心跳包,Broker会将自己的IP Address、Port、Router 等信息随着心跳一并注册到 NameServer中。

这里的RouterInfo 主要指Broker下包含哪些Topic信息,这种映射关系方便后面消息的生产和消费的时候进行寻址。

image.png

注册使用到的核心数据结构如下:

HashMap brokerAddrTable

  • HashMap 的 Key 是 Broker 的名称,存储了一个Broker服务所对应的属性信息。
  • Value 是个对象,数据结构如下:

字段

类型

说明

cluster

String

所属的集群名称

broker

String

broker的名称

brokerAddress

HashMap

Broker的IP地址列表,包含一个Master IP地址列表 和 多个Slave IP地址列表

" Broker-A":{ "cluster":"Broker-Cluster", "brokerName":"Broker-A", "cluster":{  // 1主2从    "0":"192.168.0.1:1234",    "1":"192.168.0.2:1234",    "2":"192.168.0.3:1234" } }

2.2 注册信息更新

当你对你的Broker中的Topic信息进行更新了(增、删、改)怎么办,你才需要重新将信息注册到NameServer中。

  • 如果你创建了新的 Topic,Broker会向 NameServer 发送注册信息,接收到信息后会对每个Master 角色的Broker ,创建一个新的QueueData对象。
  • 如果你修改了Topic,则NameServer 会先把旧的 QueueData 删除,在加一个新的 QueueData。
  • 如果你删除了Topic,则NameServer 会将对应的 QueueData 删除。

image.png

使用到的核心数据结构如下:

HashMap> topicQueueTable

  • HashMap 的 Key 是 Topic 的名称,里面存储了Topic的所有属性信息。
  • Value 是个列表,列表的数据类型是 QueueData,列表的length就是Topic中的 Master角色的 Broker 个数。
  • QueueData的结构如下

字段

类型

说明

brokerName

String

broker名称

readQueueNums

Long

读Queue的数量

writeQueueNums

Long

写Queue的数量

perm

Integer

权限 PRIORITY = 3, READ = 2, WRITE = 1 , INHERIT = 0

topicSyncFlag

Long

同步的位置标识

{   "topic-test":[ // topic名称,注意下面会用到    {      "brokerName":"Broker-A", "readQueueNums":37, "writeQueueNums":37, "perm":6,  // 读写权限 "topicSynFlag":12    },    {      "brokerName":"Broker-B", "readQueueNums":37, "writeQueueNums":37, "perm":6,  // 读写权限 "topicSynFlag":12    }   ] }

参考RocketMQ源码如下,这边加了注释,方便理解:

   /**      * 创建或者更新 MessageQueue 的数据      * @param brokerName      * @param topicConfig      */     private void createAndUpdateQueueData(final String brokerName, final TopicConfig topicConfig) {         QueueData queueData = new QueueData();         queueData.setBrokerName(brokerName); // broker 名称         queueData.setWriteQueueNums(topicConfig.getWriteQueueNums());   // 读Queue的数量         queueData.setReadQueueNums(topicConfig.getReadQueueNums());  // 写Queue的数量         queueData.setPerm(topicConfig.getPerm());  // 权限: PRIORITY = 3, READ = 2, WRITE = 1 , INHERIT = 0         queueData.setTopicSynFlag(topicConfig.getTopicSysFlag());         List<QueueData> queueDataList = this.topicQueueTable.get(topicConfig.getTopicName());         if (null == queueDataList) {  // 新增             queueDataList = new LinkedList<QueueData>();             queueDataList.add(queueData);             this.topicQueueTable.put(topicConfig.getTopicName(), queueDataList);             log.info("new topic registerd, {} {}", topicConfig.getTopicName(), queueData);         } else {   // 更新             boolean addNewOne = true;                Iterator<QueueData> it = queueDataList.iterator();             while (it.hasNext()) {                 QueueData qd = it.next();                 if (qd.getBrokerName().equals(brokerName)) {                     if (qd.equals(queueData)) {                         addNewOne = false;                     } else {                         log.info("topic changed, {} OLD: {} NEW: {}", topicConfig.getTopicName(), qd,                                 queueData);                         it.remove();   // 先删除                     }                 }             }             if (addNewOne) {                 queueDataList.add(queueData);   // 再添加             }         }     }

2.3 异常清理

如果Broker挂掉,那么再被消息的生产者和消费者使用就会有问题了。这时候需要对已经宕掉的Broker进行清理,确保NamServer中注册的Broker服务信息都是Alive的。它的做法是这样的:

  • 前面我们说了,Broker每30s对NameService发送一次心跳包给NameServer
  • NameServer接收到心跳包的时候,会将当前时间戳更新到

brokerLiveTable 表的

lastUpdateTimestamp 字段中。

  • NameServer中会启动一个定时任务
  • 每10s(记住这边扫描是10s间隔,与上面心跳包区分开)扫描 一下

brokerLiveTable 表

  • 检查

lastUpdateTimestamp字段,如果时间戳与当前时间相隔超过 120s(即两分钟),则认为 Broker 已经宕了,并会将broker清除出NameServer的注册表。

使用到的核心数据结构如下:

HashMap brokerLiveTable

  • HashMap 的 Key 是 Broker服务器的地址信息(IP+Port),里面存储了该Broker服务器的基本信息。
  • Value 是个对象,结构如下:

字段

类型

说明

lastUpdateTimestamp

Long

最后一次收到心跳包的时间戳

dataVersion

DataVersion

数据版本号对象

channel

Channel

netty的Channel,IO数据交互媒介

haServerAddr

String

master地址,初次请求的时候值为空,slave向NameServer注册之后返回

2.4 消息生产和消费

上面的步骤都完成之后,NameServer这个 "中央大脑" 正式开始投入使用。这时候 ,消息的生产和消费具体是怎么做的呢?

  • Producer 或者 Consumer 启动之后会和 NameServer 建立长连接
  • 定时(默认为每30s)从 NameServer 获取Routers信息,并将路由信息保存至Producer或者Consumer的本地。
  • Producer发送一条消息

hello-brand 到 topic (

topic-test) 中

  • 因为名称为

topic-test 的 topic 存在于多个 broker中,所以需要如下几个步骤,才能找到具体的地址:

  • 先 根据 topic 名称

topic-test 查询

topicQueueTable , 选择一个并获取它的broker信息(包含brokerName)

  • 再根据已经获取到的brokerName 查询

brokerAddressTable 获取具体的Broker IP地址(一般包含1个Master和n个Slave的IP地址)

  • 拿到IP地址之后,生产者与broker建立连接,并发送消息
  • 消费者同理

3 总结

上述的流程图比较清晰的描述如下运转流程:

image.png

  • NameServer 作为整个 RocketMQ 的“中央大脑” ,负责对集群元数据进行管理,所以 RocketMQ 需要先启动 NameServer 再启动 Rocket 中的 Broker。
  • Broker 启动后,与 NameServer 保持长连接,每 30s 发送一次发送心跳包,来确保Broker是否存活。并将 Broker 信息 ( IP+、端口等信息)以及Broker中存储的Topic信息上报。注册成功后,NameServer 集群中就有 Topic 跟 Broker 的映射关系。
  • NameServer有个定时任务,每10s扫描下

brokerLiveTable表 , 如果检测到某个Broker 宕机(因为使用心跳机制, 如果检测超120s(两分钟)无上报心跳),则从路由注册表中将其移除。

  • 生产者在发送某个主题的消息之前先从 NamerServer 获取 Broker 服务器地址列表(通过topic名称查询

topicQueueTable获得broker名称,通过broker名称查询

brokerAddressTable获取具体的Broker IP地址),然后根据负载均衡算法从列表中选择1台Broker ,建立连接通道,进行消息发送。

  • 消费者在订阅某个topic的消息之前从 NamerServer 获取 Broker 服务器地址列表(同上),包括关联的全部Topic队列信息。进而获取当前订阅 Topic 存在哪些 Broker 上,然后直接跟 Broker 建立连接通道,开始消费数据。
  • 生产者和消费者默认每30s 从 NamerServer 获取 Broker 服务器地址列表,以及关联的所有Topic队列信息,更新到Client本地。

参考:

https://zhuanlan.zhihu.com/p/388807516

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
9天前
|
存储 缓存 算法
HashMap深度解析:从原理到实战
HashMap,作为Java集合框架中的一个核心组件,以其高效的键值对存储和检索机制,在软件开发中扮演着举足轻重的角色。作为一名资深的AI工程师,深入理解HashMap的原理、历史、业务场景以及实战应用,对于提升数据处理和算法实现的效率至关重要。本文将通过手绘结构图、流程图,结合Java代码示例,全方位解析HashMap,帮助读者从理论到实践全面掌握这一关键技术。
48 13
|
27天前
|
运维 持续交付 云计算
深入解析云计算中的微服务架构:原理、优势与实践
深入解析云计算中的微服务架构:原理、优势与实践
61 1
|
2月前
|
存储 算法 Java
解析HashSet的工作原理,揭示Set如何利用哈希算法和equals()方法确保元素唯一性,并通过示例代码展示了其“无重复”特性的具体应用
在Java中,Set接口以其独特的“无重复”特性脱颖而出。本文通过解析HashSet的工作原理,揭示Set如何利用哈希算法和equals()方法确保元素唯一性,并通过示例代码展示了其“无重复”特性的具体应用。
56 3
|
3天前
|
网络协议 安全 网络安全
探索网络模型与协议:从OSI到HTTPs的原理解析
OSI七层网络模型和TCP/IP四层模型是理解和设计计算机网络的框架。OSI模型包括物理层、数据链路层、网络层、传输层、会话层、表示层和应用层,而TCP/IP模型则简化为链路层、网络层、传输层和 HTTPS协议基于HTTP并通过TLS/SSL加密数据,确保安全传输。其连接过程涉及TCP三次握手、SSL证书验证、对称密钥交换等步骤,以保障通信的安全性和完整性。数字信封技术使用非对称加密和数字证书确保数据的机密性和身份认证。 浏览器通过Https访问网站的过程包括输入网址、DNS解析、建立TCP连接、发送HTTPS请求、接收响应、验证证书和解析网页内容等步骤,确保用户与服务器之间的安全通信。
26 1
|
1月前
|
消息中间件 存储 Kafka
RocketMQ 工作原理图解,看这篇就够了!
本文详细解析了 RocketMQ 的核心架构、消息领域模型、关键特性和应用场景,帮助深入理解消息中间件的工作原理。关注【mikechen的互联网架构】,10年+BAT架构经验倾囊相授。
RocketMQ 工作原理图解,看这篇就够了!
|
1月前
|
运维 持续交付 虚拟化
深入解析Docker容器化技术的核心原理
深入解析Docker容器化技术的核心原理
47 1
|
28天前
|
存储 供应链 算法
深入解析区块链技术的核心原理与应用前景
深入解析区块链技术的核心原理与应用前景
54 0
|
1月前
|
算法 Java 数据库连接
Java连接池技术,从基础概念出发,解析了连接池的工作原理及其重要性
本文详细介绍了Java连接池技术,从基础概念出发,解析了连接池的工作原理及其重要性。连接池通过复用数据库连接,显著提升了应用的性能和稳定性。文章还展示了使用HikariCP连接池的示例代码,帮助读者更好地理解和应用这一技术。
60 1
|
1月前
|
消息中间件 存储 Kafka
MQ 消息队列核心原理,12 条最全面总结!
本文总结了消息队列的12个核心原理,涵盖消息顺序性、ACK机制、持久化及高可用性等内容。关注【mikechen的互联网架构】,10年+BAT架构经验倾囊相授。
|
1月前
|
JavaScript 前端开发 API
Vue.js响应式原理深度解析:从Vue 2到Vue 3的演进
Vue.js响应式原理深度解析:从Vue 2到Vue 3的演进
57 0

推荐镜像

更多