【消息中间件】面试官:说一说NameServer的路由注册和剔除吧?

本文涉及的产品
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
简介: 接下来主要从五个方面学习NameServer,分别是架构设计、启动流程、路由注册、故障删除、路由发现。

前言

大家好,我是小郭,在前面一篇的文章,我们已经完成了RocketMQ在docker环境下的搭建,接下来主要从五个方面学习NameServer,分别是架构设计、启动流程、路由注册、故障删除、路由发现。

NameServer是什么?

网络异常,图片无法展示
|

  • 从RocketMQ的架构图中,多个NameServer节点组成集群提高RocketMQ的可用性
  • NameServer节点是无状态的,节点间不通信,所以需要Broker轮询去进行注册
    主要做两件事:
  1. 对Topic进行保存和管理,为消息客户端根据Topic提供路由信息服务
  2. 对Broker进行保存和管理,保存Broker注册信息,同时保持长连接
  • NameServer是一个几乎无状态节点,可集群部署,节点之间无任何信息同步。

NameServer的架构设计

网络异常,图片无法展示
|

从源码入手,看启动流程

思考问题:

  1. 如果是你设计启动流程,怎么设计最简单的启动方式?

入口:org.apache.rocketmq.namesrv.NamesrvStartup#main0

STEP1:解析配置文件 ,填充NamesrvConfig、NettyServerConfig,返回NamesrvController实例

我们在启动NameServer的时候通常会使用./mqbaneserver -c configFile -p,那-c和-p主要有什么作用呢?

  1. 通过-c 带上指定 文件,我们可以通过文件读取配置
// 通过-c指令指定配置文件地址
        if (commandLine.hasOption('c')) {
            String file = commandLine.getOptionValue('c');
            if (file != null) {
                InputStream in = new BufferedInputStream(new FileInputStream(file));
                properties = new Properties();
                properties.load(in);
                MixAll.properties2Object(properties, namesrvConfig);
                MixAll.properties2Object(properties, nettyServerConfig);
                namesrvConfig.setConfigStorePath(file);
                System.out.printf("load config properties file OK, %s%n", file);
                in.close();
            }
        }
  1. 利用-p 可以输出当前加载的配置信息
if (commandLine.hasOption('p')) {
            InternalLogger console = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_CONSOLE_NAME);
            MixAll.printObjectProperties(console, namesrvConfig);
            MixAll.printObjectProperties(console, nettyServerConfig);
            System.exit(0);
        }
复制代码
  1. NamesrvConfig主要配置信息
// 获取RocketMq主目录
private String rocketmqHome = System.getProperty(MixAll.ROCKETMQ_HOME_PROPERTY, System.getenv(MixAll.ROCKETMQ_HOME_ENV));
// 存储KV配置属性的持久化路径
private String kvConfigPath = System.getProperty("user.home") + File.separator + "namesrv" + File.separator + "kvConfig.json";
// 配置文件路径,启动时候使用 -c 选项
private String configStorePath = System.getProperty("user.home") + File.separator + "namesrv" + File.separator + "namesrv.properties";
private String productEnvName = "center";
private boolean clusterTest = false;
//是否支持顺序消息
private boolean orderMessageEnable = false;

STEP2:根据属性创建NamesrvController 实例,并初始化该实例

入口:NamesrvStartup#start

  1. 加载Kv配置
public void load() {
            String content = null;
            try {
                // 通过存储KV配置属性的持久化路径获取内容
                content = MixAll.file2String(this.namesrvController.getNamesrvConfig().getKvConfigPath());
            } catch (IOException e) {
                log.warn("Load KV config table exception", e);
            }
            if (content != null) {
            // 序列化转换
                KVConfigSerializeWrapper kvConfigSerializeWrapper =
                    KVConfigSerializeWrapper.fromJson(content, KVConfigSerializeWrapper.class);
                if (null != kvConfigSerializeWrapper) {
                    this.configTable.putAll(kvConfigSerializeWrapper.getConfigTable());
                    log.info("load KV config table OK");
                }
            }
        }
  1. 定时任务 每10s扫描一次Broker,移除失活Broker
// 定时任务 每10s扫描一次Broker,移除失活Broker
            this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
                @Override
                public void run() {
                    NamesrvController.this.routeInfoManager.scanNotActiveBroker();
                }
            }, 5, 10, TimeUnit.SECONDS);

超过120s,则从NameServer移除,直接移除是为了降低设计的复杂度

//超过120s失联,移除
    if ((last + BROKER_CHANNEL_EXPIRED_TIME) < System.currentTimeMillis()) {
        RemotingUtil.closeChannel(next.getValue().getChannel());
        it.remove();
        log.warn("The broker channel expired, {} {}ms", next.getKey(), BROKER_CHANNEL_EXPIRED_TIME);
        this.onChannelDestroy(next.getKey(), next.getValue().getChannel());
    }
  1. 定时任务 每隔10分钟,打印一次Kv配置信息
// 每隔10分钟,打印一次Kv配置信息
            this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
                @Override
                public void run() {
                    NamesrvController.this.kvConfigManager.printAllPeriodically();
                }
            }, 1, 10, TimeUnit.MINUTES);

STEP3:NameServer停止流程,利用JVM钩子函数,监听Broker

钩子函数的作用也大多数用来收尾的,在JVM进程关闭之前,进行一些清理工作比如关闭资源或者同步等。

Runtime.getRuntime().addShutdownHook(new ShutdownHookThread(log, new Callable<Void>() {
    @Override
    public Void call() throws Exception {
        controller.shutdown();
        return null;
    }
}));

最后,基于Netty实现的RPC服务端,NettyRemotingServer通过nettyServerConfig配置进行启动

通过上面对源码的分析,现在可以来回答思考的问题,我们只需要配置NamesrvConfig和nettyServerConfig,就能利用netty启动NameServer。

路由注册

上面提过NameServer对路由信息管理,主要是进行路由的注册、故障剔除和路由信息存储。

问题:

  1. 路由信息如何注册到NameServer?
  2. Broker出现故障是如何发现和解决的?

STEP 1: 利用线程池,遍历NameServer依次发送心跳包

入口:BrokerController#start#registerBrokerAll#doRegisterBrokerAll

  1. 开启一个线程池定时任务,每隔30s向集群中所有NameServer发送心跳包
    只有当调度任务来的时候,线程池才会去启动一个线程
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
                @Override
                public void run() {
                    try {
                        BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());
                    } catch (Throwable e) {
                        log.error("registerBrokerAll Exception", e);
                    }
                }
            }, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS);
  1. 遍历所有NameServer列表,封装请求头 RegisterBrokerRequestHeader进行发送

获取全部的Address地址

List<String> nameServerAddressList = this.remotingClient.getNameServerAddressList();

封装请求头

final RegisterBrokerRequestHeader requestHeader = new RegisterBrokerRequestHeader();
                requestHeader.setBrokerAddr(brokerAddr);
                requestHeader.setBrokerId(brokerId);
                requestHeader.setBrokerName(brokerName);
                requestHeader.setClusterName(clusterName);
                requestHeader.setHaServerAddr(haServerAddr);
                requestHeader.setCompressed(compressed);

遍历所有NameServer

for (final String namesrvAddr : nameServerAddressList) {
                    brokerOuterExecutor.execute(new Runnable() {
                        @Override
                        public void run() {
                            try {
                                RegisterBrokerResult result = registerBroker(namesrvAddr, oneway, timeoutMills, requestHeader, body);
                                if (result != null) {
                                    registerBrokerResultList.add(result);
                                }
                                log.info("register broker[{}]to name server {} OK", brokerId, namesrvAddr);
                            } catch (Exception e) {
                                log.warn("registerBroker Exception, {}", namesrvAddr, e);
                            } finally {
                                countDownLatch.countDown();
                            }
                        }
                    });
                }
                try {
                    countDownLatch.await(timeoutMills, TimeUnit.MILLISECONDS);
                } catch (InterruptedException e) {
                }

STEP2:心跳包处理

入口:RouteInfoManager#registerBroker

  1. 判断 Broker所属集群是否存在,无则创建,然后将brokerName加入到集群Broker集群中
Set<String> brokerNames = this.clusterAddrTable.get(clusterName);
                        if (null == brokerNames) {
                            brokerNames = new HashSet<String>();
                            this.clusterAddrTable.put(clusterName, brokerNames);
                        }
                        brokerNames.add(brokerName);
  1. 维护BrokerData信息,如果没有则初始化BrokerData信息,主要做维护集群名称,broker名称
BrokerData brokerData = this.brokerAddrTable.get(brokerName);
                        if (null == brokerData) {
                            registerFirst = true;
                            brokerData = new BrokerData(clusterName, brokerName, new HashMap<Long, String>());
                            this.brokerAddrTable.put(brokerName, brokerData);
                        }
  1. 如果 brokerId为Master,并且Topic配置信息发生变化或者第一次注册,需要更新元数据
// brokerId为Master
if (null != topicConfigWrapper && MixAll.MASTER_ID == brokerId) {
   // Topic配置信息发生变化或者第一次注册
   if (this.isBrokerTopicConfigChanged(brokerAddr, topicConfigWrapper.getDataVersion())|| registerFirst) {
                ConcurrentMap<String, TopicConfig> tcTable = topicConfigWrapper.getTopicConfigTable();
   if (tcTable != null) {
       // 更新元数据
       for (Map.Entry<String, TopicConfig> entry : tcTable.entrySet()) {
                                        this.createAndUpdateQueueData(brokerName, entry.getValue());
            }
        }
    }
}        
  1. 更新BrokerLiveInfo
BrokerLiveInfo prevBrokerLiveInfo = this.brokerLiveTable.put(brokerAddr,
                            new BrokerLiveInfo(
                                System.currentTimeMillis(),
                                topicConfigWrapper.getDataVersion(),
                                channel,
                                haServerAddr));
                        if (null == prevBrokerLiveInfo) {
                            log.info("new broker registered, {} HAServer: {}", brokerAddr, haServerAddr);
}
  1. 注册Broker的过滤器Server的地址列表
if (filterServerList != null) {
   if (filterServerList.isEmpty()) {
                                this.filterServerTable.remove(brokerAddr);
   } else {
       this.filterServerTable.put(brokerAddr, filterServerList);
   }
}
  1. 如果不是master,则查询出master信息更新到对应的masterAddr
if (MixAll.MASTER_ID != brokerId) {
   // 从BrokerAddrs中查询master信息
   String masterAddr = brokerData.getBrokerAddrs().get(MixAll.MASTER_ID);
   if (masterAddr != null) {
      BrokerLiveInfo brokerLiveInfo = this.brokerLiveTable.get(masterAddr);
      if (brokerLiveInfo != null) {
         result.setHaServerAddr(brokerLiveInfo.getHaServerAddr());
         result.setMasterAddr(masterAddr);
        }
    }
}

在注册 Broker中 ,为了防止并发修改路由信息,整体加上了一个写锁。


STEP3:同一时刻NameServer只处理一个Broker心跳包,多个心跳包串行执行

故障剔除

NameServer每隔10s扫描一下Broker,当BrokerLive的最后更新时间差距超过120s,则认为失效

遍历Broker相关的集合,移除失效的Broker信息

入口:RouteInfoManager#scanNotActiveBroker

移除失活Broker

// 定时任务 每10s扫描一次Broker,移除失活Broker
                this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
                    @Override
                    public void run() {
                        NamesrvController.this.routeInfoManager.scanNotActiveBroker();
                    }
                }, 5, 10, TimeUnit.SECONDS);        
//扫描失活Broker
public void scanNotActiveBroker() {
    Iterator<Entry<String, BrokerLiveInfo>> it = this.brokerLiveTable.entrySet().iterator();
    while (it.hasNext()) {
        Entry<String, BrokerLiveInfo> next = it.next();
        long last = next.getValue().getLastUpdateTimestamp();
        //超过120s失联,移除
        if ((last + BROKER_CHANNEL_EXPIRED_TIME) < System.currentTimeMillis()) {
            RemotingUtil.closeChannel(next.getValue().getChannel());
            it.remove();
            log.warn("The broker channel expired, {} {}ms", next.getKey(), BROKER_CHANNEL_EXPIRED_TIME);
            this.onChannelDestroy(next.getKey(), next.getValue().getChannel());
        }
    }
}

移除brokerAddr

Iterator<Entry<Long, String>> it = brokerData.getBrokerAddrs().entrySet().iterator();
while (it.hasNext()) {
    Entry<Long, String> entry = it.next();
    Long brokerId = entry.getKey();
    String brokerAddr = entry.getValue();
    if (brokerAddr.equals(brokerAddrFound)) {
        brokerNameFound = brokerData.getBrokerName();
        it.remove();
        log.info("remove brokerAddr[{}, {}] from brokerAddrTable, because channel destroyed",
            brokerId, brokerAddr);
        break;
    }
}

移除BrokerName

Iterator<Entry<String, Set<String>>> it = this.clusterAddrTable.entrySet().iterator();
while (it.hasNext()) {
    Entry<String, Set<String>> entry = it.next();
    String clusterName = entry.getKey();
    Set<String> brokerNames = entry.getValue();
    boolean removed = brokerNames.remove(brokerNameFound);
    if (removed) {
        log.info("remove brokerName[{}], clusterName[{}] from clusterAddrTable, because channel destroyed",
            brokerNameFound, clusterName);
        if (brokerNames.isEmpty()) {
            log.info("remove the clusterName[{}] from clusterAddrTable, because channel destroyed and no broker in this cluster",
                clusterName);
            it.remove();
        }
        break;
    }
}

移除Topic

Iterator<Entry<String, List<QueueData>>> itTopicQueueTable =
    this.topicQueueTable.entrySet().iterator();
while (itTopicQueueTable.hasNext()) {
    Entry<String, List<QueueData>> entry = itTopicQueueTable.next();
    String topic = entry.getKey();
    List<QueueData> queueDataList = entry.getValue();
    Iterator<QueueData> itQueueData = queueDataList.iterator();
    while (itQueueData.hasNext()) {
        QueueData queueData = itQueueData.next();
        if (queueData.getBrokerName().equals(brokerNameFound)) {
            itQueueData.remove();
            log.info("remove topic[{} {}], from topicQueueTable, because channel destroyed",
                topic, queueData);
        }
    }
    if (queueDataList.isEmpty()) {
        itTopicQueueTable.remove();
        log.info("remove topic[{}] all queue, from topicQueueTable, because channel destroyed",
            topic);
    }
}

路由发现

路由发现并不是实时的,当Topic发生变动时,NameServer不主动推送给客户端。

入口:DefaultRequestProcessor#getRouteInfoByTopic

  1. 从路由表topicQueueTable、brokerAddrTable、filterServerTable获取填充TopicRouteData的数据
TopicRouteData topicRouteData = this.namesrvController.getRouteInfoManager().pickupTopicRouteData(requestHeader.getTopic());
  1. 判断是否开启顺序消息,有则则从kv配置中获取关于顺序消息相关的配置填充路由信息
//是否为顺序消息
if (this.namesrvController.getNamesrvConfig().isOrderMessageEnable()) {
// 则从kv配置中获取关于顺序消息相关的配置填充路由信息
    String orderTopicConf  = this.namesrvController.getKvConfigManager().getKVConfig(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG,
                requestHeader.getTopic());
                topicRouteData.setOrderTopicConf(orderTopicConf);
 }
  1. TopicRouteData为空则返回未找到对应路由,有则返回

总结

通过对NameServer源码的阅读,进行一些总结

  1. NameServer通过构建NamesrvConfig和nettyServerConfig配置,调用NettyRemotingServer进行启动。
  2. Broker向NameServer注册,NameServer对Broker信息进行维护,然后每隔30s向NameServer发送心跳包。
  3. NameServer每隔10s扫描一次Broker,检测活跃Broker,当LastUpdateTime失联超过120s,则判定为不可用,并进行移除broker相关信息。
  4. 在一些关键的地方,进行配置信息、错误信息的日志输出这一点是我们可以学习的。
  5. 学习JVM钩子函数Hook的实战使用,在关闭资源或者其他的清理工作上,可以成为一种选择。

网络异常,图片无法展示
|

相关实践学习
RocketMQ一站式入门使用
从源码编译、部署broker、部署namesrv,使用java客户端首发消息等一站式入门RocketMQ。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
3月前
|
前端开发 JavaScript 安全
【面试题】路由的两种模式:hash模式和 history模式
【面试题】路由的两种模式:hash模式和 history模式
|
7月前
|
消息中间件 Java Kafka
计算机应届生一定要会的JAVA面试题:RabbitMQ是如何实现消息路由的?
一个应届生去面试,可能没有什么实战经验,今天被问到一个这样的面试题,说“RabbitMQ是如何实现消息路由的?“一下子竟然不知道如何组织语言了。今天我给大家分享一下我的理解。
71 1
|
3月前
|
消息中间件
【面试问题】MQ 消息怎么路由?
【1月更文挑战第27天】【面试问题】MQ 消息怎么路由?
|
3月前
|
移动开发 前端开发 JavaScript
【面试题】 面试官为啥总是喜欢问前端路由实现方式?
【面试题】 面试官为啥总是喜欢问前端路由实现方式?
|
3月前
|
移动开发 前端开发 JavaScript
面试官为啥总是喜欢问前端路由实现方式?
面试官为啥总是喜欢问前端路由实现方式?
|
4月前
|
存储 NoSQL 中间件
[中间件~大厂面试题] 腾讯三面,40亿的QQ号如何去重
[中间件~大厂面试题] 腾讯三面,40亿的QQ号如何去重
|
8月前
|
监控 JavaScript 前端开发
前端学习笔记202307学习笔记第五十七天-模拟面试笔记react-redux中间件机制是什么
前端学习笔记202307学习笔记第五十七天-模拟面试笔记react-redux中间件机制是什么
55 0
|
9月前
|
存储 JavaScript 前端开发
前端面试100道手写题(5)—— Router路由
前端路由,大家都使用过,那么有没有想过它是怎么实现的吗?如:Vue-Router 或者 React-Router。或许有个大概印象,但是真正要自己去实现还是没有什么思路,那么这篇文章将完整的实现思路去实现一次。
90 0
|
10月前
|
移动开发 JavaScript 算法
web前端面试高频考点——Vue原理(diff算法、模板编译、组件渲染和更新、JS实现路由)
web前端面试高频考点——Vue原理(diff算法、模板编译、组件渲染和更新、JS实现路由)
163 0
|
JavaScript 前端开发
前端面试题 Vue路由跳转的四种方式
前端面试题 Vue路由跳转的四种方式
183 0
前端面试题 Vue路由跳转的四种方式