RocketMQ-笔记 1:namesrv

简介: RocketMQ技术内幕-路由注册,删除,发现

1. namesrv的来源

  • namesrv整体来说和zookeeper的类似,为了保证消息的高可用,我们通常会布置多台broker,那么客户端在发送消息的时候怎么去选择,或者某一台宕机了,客户端怎么去感知到,namesrv就是为了解决这种问题,负责管理集群,broker,topic等元数据,动态的维护broker列表,剔除已经宕机的broker,根据负载均衡算法可以选择到一台broker。

2. 解析namesrv的启动流程

  • 这个我们需要参考一下源码看一下启动方法和重要的参数信息:.NamesrvStartup

    • 1,解析配置文件填充属性到NamesrvConfig和NettyServerConfig两个对象中,配置文件的来源是主要是在启动的时候通过-c 指定文件的。
    • 2,根据启动的属性创建namesrvController对象,并且进行初始化
    • 3,加载kv配置,并且创建netty网络传输对象,然后启动两个定时任务。

      • namesrv每隔10s中扫描一次broker,根据最后一次的心跳检测时间剔除已经挂掉的broker;
      • namesrv每隔10分钟打印一次kv配置信息
    • 4,注册JVM 钩子函数并启动服务器,以便监听Broker、消息生产者的网络请求。

3. 两主两从的部署结构图,下面的解析都是按照这个图进行解析的

QQ_20191024173006

4. 解析路由元数据的数据结构

  • 下面主要是namesrv主要记录了哪些元数据,其主要是在RouteInfoManager中的5个hashMap中

    • HashMap> topicQueueTable : topic 消息队列路由信息
{
    "topicTest": [
        {
            "brokerName": "brokerName-a",
            "readQueueNums": 4,
            "writeQueueNums": 4,
            "perm": 6,
            "topicSynFlag": 0
        }
    ]
}
  • HashMap brokerAddrTable:broker的集群信息,包含集群,name,地址
{
    "broker-a": {
        "cluster": "c1",
        "brokerName": "broker-a",
        "brokerAddrs": [
            "0:192.168.8.101:10000",
            "1:192.168.8.102:10000"
        ]
    },

    "broker-b": {
        "cluster": "c1",
        "brokerName": "broker-b",
        "brokerAddrs": [
            "0:192.168.8.103:10000",
            "1:192.168.8.104:10000"
        ]
    }
}
  • HashMap> clusterAddrTable : Broker 集群信息
{
    "c1": [
        "broker-a",
        "broker-b"
    ]
}
  • HashMap brokerLiveTable : broker状态信息
{
    "192.168.8.101:10000": {
        "lastUpdateTimestamp": 20190909099999,
        "dataVersion": {
            "timestamp": 20190909099999,
            "counter": 0
        },
        "Channel": "channel-a",
        "haServerAddr": "192.168.8.102:10000"
    },

    "192.168.8.102:10000": {
        "lastUpdateTimestamp": 20190909099999,
        "dataVersion": {
            "timestamp": 20190909099999,
            "counter": 0
        },
        "Channel": "channel-a",
        "haServerAddr": ""
    },
    "192.168.8.103:10000": {
        "lastUpdateTimestamp": 20190909099999,
        "dataVersion": {
            "timestamp": 20190909099999,
            "counter": 0
        },
        "Channel": "channel-a",
        "haServerAddr": "192.168.8.104:10000"
    },

    "192.168.8.104:10000": {
        "lastUpdateTimestamp": 20190909099999,
        "dataVersion": {
            "timestamp": 20190909099999,
            "counter": 0
        },
        "Channel": "channel-a",
        "haServerAddr": ""
    }
}
  • HashMap> filterServerTable : broker上的FilterServer列表。
{
    "192.168.8.101:10000": ["192.168.9.101:10000"],

    "192.168.8.102:10000": ["192.168.9.102:10000"]
}

4. 路由信息的注册

  • RocketMQ的路由注册主要是broker与nameSrv的心跳功能检测,当broker启动的时候会向集群中所有的namesrv发送心跳语句,每隔30s向namesrv发送心跳包,namesrv收到心跳包之后会更新brokerLiveTable缓存中的lastUpdateTimestamp时间,然后nameSrv的定时任务会每隔10s扫描brokerLiveTable,如果超过120s没有收到心跳包则将其剔除并且关闭socket连接。
  • 源码层次解析步骤

    • 1,brokerController的start方法中发送心跳包注册registerBrokerAll,
    • 2,调用BrokerOuterAPI中的registerBrokerAll中主要遍历所有的nameSrv列表,BrokerSrv依次向nameSrv列表发送心跳包
    • 3,调用BrokerOuterAPI的registerBroker发送request信息,RocketMQ是基于netty传输的,如果我们需要网络跟踪,rocketMQ会为每个消息生成一个requestCode,然后服务端会有对应的网络处理器(processs包中),只需整库搜索 questCode 即可找到相应的处理逻辑

      • 心跳包request的信息

        • brokerAddr:地址信息
        • brokerId:0代表master,大于0代表slave
        • brokerName:名称
        • clusterName:集群名称
        • haServerAddr:master地址,初次请求时为空,slave向nameSrv注册之后返回
        • requestBody:

          • filterServerList:消息过滤列表
          • topicConfigWrapper:主体配置
        • body:消息体
    • nameSrv接收到网络请求之后开始进行处理,DefaultRequestProcessor默认的处理器,如果请求类型为 RequestCode REGISTER_BROKER,那么最终会被转发到RoutelnfoManager#registerBroker。

      • 1,首先在方法开始加入一个写锁,防止并发的修改RouteInfoManager中的路由表,然后开始维护clusterAddrTable信息,首先判断broker所属的集群是不是已经存在,如果不存在则创建并且将broker名称放入到集合中。
      • 2,维护BrokerData,首先从brokerAddrTable中根据brokerName获取broker信息,如果获取不到则将新建的brokerData信息放入并且将registerFirst设置为true,表示第一次注册,如果已经存在则将其替换并且将registerFirst设置为false,
      • 3,如果broker为master,并且Broker Topic配置信息发生变化或者是初次注册,那么则需要创建或者更新topicQueueTable的信息,如果在发送消息的时候topic是不存在的,那么如果设置了brokerConfig中的autoCreateTopicEnable为true,就会返回一个rocketMQ的默认的路由信息。
      • 4,更新brokerLiveInfo的最后更新时间,这个是路由删除的最重要的列表
      • 5,注册Broker上的filter过滤列表
      • 6,完成路由的注册

5. 路由信息的删除

  • 路由删除的两种方式

    • broker主动方式:当broker正常关机的时候,会发送一个unregisterBroker指令
    • nameSrv主动方式:nameSrv在启动的时候,会开启两个定时任务,其中有一个是每隔10s扫描一次brokerLiveInfo列表,然后获取列表中每一个元素的lastupdateTime,如果超过120s没有收到broker的心跳包则会将其剔除列表关闭channel,

      • 判断时间超过120s
      • 申请写锁
      • 根据brokerAddress删除brokerLiveTable和FilterServerTable
      • 维护brokerAddrTable,遍历brokerAddrTable获取BrokeData,然后从brokeData中的属性brokerAddr(map集合)中剔除,如果剔除之后集合为空则从brokerAddrTable一处brokerName
      • 根据 BrokerName,从 clusterAddrTable 中找到 Broker并从集群中移除,如果移
        除后,集群中不包含任何 Broker,则将改集群从clusterAddrTable 中一处
      • 根据 brokerName遍历所有主题的队列,如果队列中包含了当前 Broker的队列则移除,如果 topic 只包含待移除 Broker 的队列的话,从路由表中删除该 topic
      • 释放写锁
        然后申请一个写锁(防止并发更新)删除brokerLiveTable,FilterServerTable信息,然后维护brokerAddrTable,clusterAddrTable。

6. 路由信息的发现

  • RocketMQ路由并非是实时的,当topic路由发生变化的时候,不会主动推送给客户端,而是由客户端定时的拉取最新的路由信息。
  • 返回给客户端的对象是TopicRouteData,其结构是
orderTopicConf:顺序消息配置内容,来自于kvConfig
List<QueueData> queueDatas:topic队列元数据
List<BrokerData> brokerDatas:topic 分布的 broker 元数据
HashMap< String/ * brokerAdress*/,List<String> /* filt rServer* /> : broker 上过滤服务器地址列表
  • 路由发现实现类DefaultRequestProcessor的getRoutelnfoByTopic

    • 调用 RouterlnfoManager 的方法,从路由 topicQueueTable brokerAddrTable
  1. terServerTable 中分别填充 TopicRouteData 中的 List

filterServer 地址表

  • 如果找到主题对应的路由信息并且该主题为顺序消息,则从 NameServer
    KVconfig 中获取关于顺序消息相 的配置填充路由信息
  • 如果找不到路由信息 CODE 则使用 TOPIC NOT_EXISTS ,表示没有找到对应的路由

7.总结

QQ_20191028170531

相关实践学习
快速体验阿里云云消息队列RocketMQ版
本实验将带您快速体验使用云消息队列RocketMQ版Serverless系列实例进行获取接入点、创建Topic、创建订阅组、收发消息、查看消息轨迹和仪表盘。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
消息中间件 存储 Apache
MQ产品使用合集之RocketMQ如果配置所有的ip,有些namesrv挂了的话,消息就发送失败了,消费也是失败的如何解决
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
226 2
|
消息中间件 Kafka RocketMQ
消息中间件(RocketMQ)笔记
分布式消息中间件,主要是实现分布式系统中解耦、异步消息、流量销锋、日志处理等场景。生产中用的最多的消息队列有Activemq,rabbitmq,kafka,rocketmq等。 以 Jms 规范和 rocketmq 为主来分享。版本基于 3.2.6 。 主要分享:JMS规范、Rocketmq的介绍、部署方式、特性的一些使用。
396 81
消息中间件(RocketMQ)笔记
|
消息中间件 存储 缓存
RocketMQ最新版源码剖析注释笔记 关注我的技术博客:
RocketMQ最新版源码剖析注释笔记 关注我的技术博客:
510 0
|
消息中间件 存储 RocketMQ
RocketMQ-笔记,简介整体架构
rocketMQ技术内幕笔记
1482 0
|
4月前
|
消息中间件 数据管理 Serverless
阿里云消息队列 Apache RocketMQ 创新论文入选顶会 ACM FSE 2025
阿里云消息团队基于 Apache RocketMQ 构建 Serverless 消息系统,适配多种主流消息协议(如 RabbitMQ、MQTT 和 Kafka),成功解决了传统中间件在可伸缩性、成本及元数据管理等方面的难题,并据此实现 ApsaraMQ 全系列产品 Serverless 化,助力企业提效降本。
|
2月前
|
消息中间件 Java Kafka
消息队列比较:Spring 微服务中的 Kafka 与 RabbitMQ
本文深入解析了 Kafka 和 RabbitMQ 两大主流消息队列在 Spring 微服务中的应用与对比。内容涵盖消息队列的基本原理、Kafka 与 RabbitMQ 的核心概念、各自优势及典型用例,并结合 Spring 生态的集成方式,帮助开发者根据实际需求选择合适的消息中间件,提升系统解耦、可扩展性与可靠性。
209 1
消息队列比较:Spring 微服务中的 Kafka 与 RabbitMQ
|
消息中间件 JSON Java
开发者如何使用轻量消息队列MNS
【10月更文挑战第19天】开发者如何使用轻量消息队列MNS
837 99
|
消息中间件 安全 Java
云消息队列RabbitMQ实践解决方案评测
一文带你详细了解云消息队列RabbitMQ实践的解决方案优与劣
354 110
|
消息中间件 存储 Kafka
MQ 消息队列核心原理,12 条最全面总结!
本文总结了消息队列的12个核心原理,涵盖消息顺序性、ACK机制、持久化及高可用性等内容。关注【mikechen的互联网架构】,10年+BAT架构经验倾囊相授。
|
消息中间件
解决方案 | 云消息队列RabbitMQ实践获奖名单公布!
云消息队列RabbitMQ实践获奖名单公布!
201 1