RocketMQ-笔记 1:namesrv

简介: RocketMQ技术内幕-路由注册,删除,发现

1. namesrv的来源

  • namesrv整体来说和zookeeper的类似,为了保证消息的高可用,我们通常会布置多台broker,那么客户端在发送消息的时候怎么去选择,或者某一台宕机了,客户端怎么去感知到,namesrv就是为了解决这种问题,负责管理集群,broker,topic等元数据,动态的维护broker列表,剔除已经宕机的broker,根据负载均衡算法可以选择到一台broker。

2. 解析namesrv的启动流程

  • 这个我们需要参考一下源码看一下启动方法和重要的参数信息:.NamesrvStartup

    • 1,解析配置文件填充属性到NamesrvConfig和NettyServerConfig两个对象中,配置文件的来源是主要是在启动的时候通过-c 指定文件的。
    • 2,根据启动的属性创建namesrvController对象,并且进行初始化
    • 3,加载kv配置,并且创建netty网络传输对象,然后启动两个定时任务。

      • namesrv每隔10s中扫描一次broker,根据最后一次的心跳检测时间剔除已经挂掉的broker;
      • namesrv每隔10分钟打印一次kv配置信息
    • 4,注册JVM 钩子函数并启动服务器,以便监听Broker、消息生产者的网络请求。

3. 两主两从的部署结构图,下面的解析都是按照这个图进行解析的

QQ_20191024173006

4. 解析路由元数据的数据结构

  • 下面主要是namesrv主要记录了哪些元数据,其主要是在RouteInfoManager中的5个hashMap中

    • HashMap> topicQueueTable : topic 消息队列路由信息
{
    "topicTest": [
        {
            "brokerName": "brokerName-a",
            "readQueueNums": 4,
            "writeQueueNums": 4,
            "perm": 6,
            "topicSynFlag": 0
        }
    ]
}
  • HashMap brokerAddrTable:broker的集群信息,包含集群,name,地址
{
    "broker-a": {
        "cluster": "c1",
        "brokerName": "broker-a",
        "brokerAddrs": [
            "0:192.168.8.101:10000",
            "1:192.168.8.102:10000"
        ]
    },

    "broker-b": {
        "cluster": "c1",
        "brokerName": "broker-b",
        "brokerAddrs": [
            "0:192.168.8.103:10000",
            "1:192.168.8.104:10000"
        ]
    }
}
  • HashMap> clusterAddrTable : Broker 集群信息
{
    "c1": [
        "broker-a",
        "broker-b"
    ]
}
  • HashMap brokerLiveTable : broker状态信息
{
    "192.168.8.101:10000": {
        "lastUpdateTimestamp": 20190909099999,
        "dataVersion": {
            "timestamp": 20190909099999,
            "counter": 0
        },
        "Channel": "channel-a",
        "haServerAddr": "192.168.8.102:10000"
    },

    "192.168.8.102:10000": {
        "lastUpdateTimestamp": 20190909099999,
        "dataVersion": {
            "timestamp": 20190909099999,
            "counter": 0
        },
        "Channel": "channel-a",
        "haServerAddr": ""
    },
    "192.168.8.103:10000": {
        "lastUpdateTimestamp": 20190909099999,
        "dataVersion": {
            "timestamp": 20190909099999,
            "counter": 0
        },
        "Channel": "channel-a",
        "haServerAddr": "192.168.8.104:10000"
    },

    "192.168.8.104:10000": {
        "lastUpdateTimestamp": 20190909099999,
        "dataVersion": {
            "timestamp": 20190909099999,
            "counter": 0
        },
        "Channel": "channel-a",
        "haServerAddr": ""
    }
}
  • HashMap> filterServerTable : broker上的FilterServer列表。
{
    "192.168.8.101:10000": ["192.168.9.101:10000"],

    "192.168.8.102:10000": ["192.168.9.102:10000"]
}

4. 路由信息的注册

  • RocketMQ的路由注册主要是broker与nameSrv的心跳功能检测,当broker启动的时候会向集群中所有的namesrv发送心跳语句,每隔30s向namesrv发送心跳包,namesrv收到心跳包之后会更新brokerLiveTable缓存中的lastUpdateTimestamp时间,然后nameSrv的定时任务会每隔10s扫描brokerLiveTable,如果超过120s没有收到心跳包则将其剔除并且关闭socket连接。
  • 源码层次解析步骤

    • 1,brokerController的start方法中发送心跳包注册registerBrokerAll,
    • 2,调用BrokerOuterAPI中的registerBrokerAll中主要遍历所有的nameSrv列表,BrokerSrv依次向nameSrv列表发送心跳包
    • 3,调用BrokerOuterAPI的registerBroker发送request信息,RocketMQ是基于netty传输的,如果我们需要网络跟踪,rocketMQ会为每个消息生成一个requestCode,然后服务端会有对应的网络处理器(processs包中),只需整库搜索 questCode 即可找到相应的处理逻辑

      • 心跳包request的信息

        • brokerAddr:地址信息
        • brokerId:0代表master,大于0代表slave
        • brokerName:名称
        • clusterName:集群名称
        • haServerAddr:master地址,初次请求时为空,slave向nameSrv注册之后返回
        • requestBody:

          • filterServerList:消息过滤列表
          • topicConfigWrapper:主体配置
        • body:消息体
    • nameSrv接收到网络请求之后开始进行处理,DefaultRequestProcessor默认的处理器,如果请求类型为 RequestCode REGISTER_BROKER,那么最终会被转发到RoutelnfoManager#registerBroker。

      • 1,首先在方法开始加入一个写锁,防止并发的修改RouteInfoManager中的路由表,然后开始维护clusterAddrTable信息,首先判断broker所属的集群是不是已经存在,如果不存在则创建并且将broker名称放入到集合中。
      • 2,维护BrokerData,首先从brokerAddrTable中根据brokerName获取broker信息,如果获取不到则将新建的brokerData信息放入并且将registerFirst设置为true,表示第一次注册,如果已经存在则将其替换并且将registerFirst设置为false,
      • 3,如果broker为master,并且Broker Topic配置信息发生变化或者是初次注册,那么则需要创建或者更新topicQueueTable的信息,如果在发送消息的时候topic是不存在的,那么如果设置了brokerConfig中的autoCreateTopicEnable为true,就会返回一个rocketMQ的默认的路由信息。
      • 4,更新brokerLiveInfo的最后更新时间,这个是路由删除的最重要的列表
      • 5,注册Broker上的filter过滤列表
      • 6,完成路由的注册

5. 路由信息的删除

  • 路由删除的两种方式

    • broker主动方式:当broker正常关机的时候,会发送一个unregisterBroker指令
    • nameSrv主动方式:nameSrv在启动的时候,会开启两个定时任务,其中有一个是每隔10s扫描一次brokerLiveInfo列表,然后获取列表中每一个元素的lastupdateTime,如果超过120s没有收到broker的心跳包则会将其剔除列表关闭channel,

      • 判断时间超过120s
      • 申请写锁
      • 根据brokerAddress删除brokerLiveTable和FilterServerTable
      • 维护brokerAddrTable,遍历brokerAddrTable获取BrokeData,然后从brokeData中的属性brokerAddr(map集合)中剔除,如果剔除之后集合为空则从brokerAddrTable一处brokerName
      • 根据 BrokerName,从 clusterAddrTable 中找到 Broker并从集群中移除,如果移
        除后,集群中不包含任何 Broker,则将改集群从clusterAddrTable 中一处
      • 根据 brokerName遍历所有主题的队列,如果队列中包含了当前 Broker的队列则移除,如果 topic 只包含待移除 Broker 的队列的话,从路由表中删除该 topic
      • 释放写锁
        然后申请一个写锁(防止并发更新)删除brokerLiveTable,FilterServerTable信息,然后维护brokerAddrTable,clusterAddrTable。

6. 路由信息的发现

  • RocketMQ路由并非是实时的,当topic路由发生变化的时候,不会主动推送给客户端,而是由客户端定时的拉取最新的路由信息。
  • 返回给客户端的对象是TopicRouteData,其结构是
orderTopicConf:顺序消息配置内容,来自于kvConfig
List<QueueData> queueDatas:topic队列元数据
List<BrokerData> brokerDatas:topic 分布的 broker 元数据
HashMap< String/ * brokerAdress*/,List<String> /* filt rServer* /> : broker 上过滤服务器地址列表
  • 路由发现实现类DefaultRequestProcessor的getRoutelnfoByTopic

    • 调用 RouterlnfoManager 的方法,从路由 topicQueueTable brokerAddrTable
  1. terServerTable 中分别填充 TopicRouteData 中的 List

filterServer 地址表

  • 如果找到主题对应的路由信息并且该主题为顺序消息,则从 NameServer
    KVconfig 中获取关于顺序消息相 的配置填充路由信息
  • 如果找不到路由信息 CODE 则使用 TOPIC NOT_EXISTS ,表示没有找到对应的路由

7.总结

QQ_20191028170531

相关实践学习
RocketMQ一站式入门使用
从源码编译、部署broker、部署namesrv,使用java客户端首发消息等一站式入门RocketMQ。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
消息中间件 存储 缓存
RocketMQ最新版源码剖析注释笔记 关注我的技术博客:
RocketMQ最新版源码剖析注释笔记 关注我的技术博客:
208 0
|
消息中间件 Kafka RocketMQ
消息中间件(RocketMQ)笔记
分布式消息中间件,主要是实现分布式系统中解耦、异步消息、流量销锋、日志处理等场景。生产中用的最多的消息队列有Activemq,rabbitmq,kafka,rocketmq等。 以 Jms 规范和 rocketmq 为主来分享。版本基于 3.2.6 。 主要分享:JMS规范、Rocketmq的介绍、部署方式、特性的一些使用。
消息中间件(RocketMQ)笔记
|
消息中间件 存储 RocketMQ
RocketMQ-笔记,简介整体架构
rocketMQ技术内幕笔记
1354 0
|
消息中间件 算法 Java
弥补延时消息的不足,RocketMQ 基于时间轮算法实现了定时消息!
弥补延时消息的不足,RocketMQ 基于时间轮算法实现了定时消息!
610 1
弥补延时消息的不足,RocketMQ 基于时间轮算法实现了定时消息!
|
消息中间件 Apache RocketMQ
《万亿级数据洪峰下的消息引擎——Apache RocketMQ》电子版地址
万亿级数据洪峰下的消息引擎——Apache RocketMQ
312 0
《万亿级数据洪峰下的消息引擎——Apache RocketMQ》电子版地址
|
消息中间件 存储 缓存
RocketMQ Schema——让消息成为流动的结构化数据
RocketMQ Schema 提供了对消息的数据结构托管服务,同时为原生客户端提供了较为丰富的序列化/反序列化 SDK ,补齐了 RocketMQ 在数据治理和业务上下游解耦方面的短板,让数据成为流动的结构化数据,那么快来了解下实现原理吧~
470 0
RocketMQ Schema——让消息成为流动的结构化数据
|
消息中间件 存储 缓存
简述RocketMQ消息拉取过程【二】
简述RocketMQ消息拉取过程【二】
915 1
|
消息中间件 RocketMQ
简述RocketMQ消息拉取过程【一】
简述RocketMQ消息拉取过程【一】
639 0
|
消息中间件 缓存 负载均衡
RocketMQ消息生产者是如何选择Broker的
RocketMQ消息生产者是如何选择Broker的
474 1
|
消息中间件 存储 前端开发
同步异步调用,并谈谈消息队列mq;RocketMQ发送消息和消费消息测试类
同步调用优点: 时效性强,打电话、直播,很快可以得到结果 同步调用的问题:
489 1

热门文章

最新文章