一文读懂RocketMQ的高可用机制——集群管理高可用

简介: 一文读懂RocketMQ的高可用机制——集群管理高可用

一、前言

在前三篇我们介绍了

一文读懂RocketMQ的高可用机制——消息存储高可用

一文读懂RocketMQ的高可用机制——消息发送高可用

一文读懂RocketMQ的高可用机制——消息消费高可用


这一篇我们来说一下集群管理是如何保证高可用的。

集群管理的高可用主要体现在 NameServer 的设计上,NameServer 承担着路由注册中心的作用。当部分 NameServer 节点宕机时不会有什么糟糕的影响,只剩一个 NameServer 节点 RocketMQ 集群也能正常运行,即使 NameServer 全部宕机,也不影响已经运行的 Broker、Producer 和 Consumer。

在说 NameServer 之前,我们是否有以下几点思考。既然作为路由注册中心,那有哪些路由信息注册到了 NameServer?生产者如何知道消息要发送到哪台消息服务器呢?当 Broker 不可用后,NameServer 并不会立即将变更后的注册信息推送至 Client(Producer/Consumer),此时 RocketMQ 如何保证 Client 正常发送/消费消息?

带着这几个疑问来开启我们的 RocketMQ 集群管理高可用之旅。

二、架构设计

1、NameServer 互相独立,彼此没有通信关系,单台 NameServer 挂掉,不影响其他 NameServer。

2、NameServer 不去连接别的机器,不主动推消息。

3、单个 Broker(Master、Slave) 与所有 NameServer 进行定时注册,以便告知 NameServer 自己还活着。

  • Broker 每隔 30 秒向所有 NameServer 发送心跳,心跳包含了自身的 topic 配置信息。
  • NameServer 每隔 10 秒,扫描所有还存活的 broker 连接,如果某个连接的最后更新时间与当前时间差值超过 2 分钟,则断开此连接,NameServer 也会断开此 broker 下所有与 slave 的连接。同时更新 topic 与队列的对应关系,但不通知生产者和消费者。
  • Broker slave 同步或者异步从 Broker master 上拷贝数据。


4、Consumer 随机与一个 NameServer 建立长连接,如果该 NameServer 断开,则从 NameServer 列表中查找下一个进行连接。

  • Consumer 主要从 NameServer 中根据 Topic 查询 Broker 的地址,查到就会缓存到客户端,并向提供 Topic 服务的 Master、Slave 建立长连接,且定时向 Master、Slave 发送心跳。
  • 如果 Broker 宕机,则 NameServer 会将其剔除,而 Consumer 端的定时任务 MQClientInstance.this.updateTopicRouteInfoFromNameServer 每 30 秒执行一次,将 Topic 对应的 Broker 地址拉取下来,此地址只有 Slave 地址了,此时 Consumer 从 Slave 上消费。
  • 消费者与 Master 和 Slave 都建有连接,在不同场景有不同的消费规则。


5、Producer 随机与一个 NameServer 建立长连接,每隔 30 秒(此处时间可配置)从 NameServer 获取 Topic 的最新队列情况,如果某个 Broker Master 宕机,Producer 最多 30 秒才能感知,在这个期间,发往该 broker master 的消息失败。Producer 向提供 Topic 服务的 Master 建立长连接,且定时向 Master 发送心跳。

  • 生产者与所有的 master 连接,但不能向 slave 写入。
  • 客户端是先从 NameServer 寻址的,得到可用 Broker 的 IP 和端口信息,然后据此信息连接 broker。

综上所述,NameServer 在 RocketMQ 中的作用:

  • NameServer 用来保存活跃的 broker 列表,包括 Master 和 Slave 。
  • NameServer 用来保存所有 topic 和该 topic 所有队列的列表。
  • NameServer 用来保存所有 broker 的 Filter 列表。
  • 命名服务器为客户端,包括生产者,消费者和命令行客户端提供最新的路由信息。

三、启动流程

NameServer 启动流程重点关注两部分:路由信息维护和网络通信(包括心跳),涉及到的核心类如下图所示。NameServerStartup 是 NameServer 的启动类,负责解析配置文件、加载运行时参数信息和初始化并启动 NameServerController。NameServerController 是 NameServer 的核心控制器,其通过 RouteInfoManager 管理路由信息,通过 RemotingServer 与 RocketMQ 其他组件(Broker、Producer 和 Consumer)通信。

NameServer 的配置参数包括 NamesrvConfig 和 NettyServerConfig:

NamesrvConfig 为 NameServer 业务参数,如 RocketMQ 主目录路径、KV 配置属性持久化路径、是否支持顺序消息等;

NettyServerConfig 为 NameServer 网络参数,如监听端口、IO 线程池线程个数、业务线程池线程个数、是否开启 epoll 等。

在分析源码之前我们先来看张时序图:

启动类:

org.apache.rocketmq.namesrv.NamesrvStartup

1、解析配置文件,填充 NameServerConfig、NettyServerConfig 属性值,并创建 NamesrvController。

代码:

NamesrvStartup#createNamesrvController

public static NamesrvController createNamesrvController(String[] args) throws IOException, JoranException {
    System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));
    Options options = ServerUtil.buildCommandlineOptions(new Options());
    commandLine = ServerUtil.parseCmdLine("mqnamesrv", args, buildCommandlineOptions(options), new PosixParser());
    if (null == commandLine) {
        System.exit(-1);
        return null;
    }
    // 创建NamesrvConfig
    final NamesrvConfig namesrvConfig = new NamesrvConfig();
    // 创建NettyServerConfig
    final NettyServerConfig nettyServerConfig = new NettyServerConfig();
    // 设置启动端口号
    nettyServerConfig.setListenPort(9876);
    // 解析启动-c参数
    if (commandLine.hasOption('c')) {
        String file = commandLine.getOptionValue('c');
        if (file != null) {
            InputStream in = new BufferedInputStream(new FileInputStream(file));
            properties = new Properties();
            properties.load(in);
            MixAll.properties2Object(properties, namesrvConfig);
            MixAll.properties2Object(properties, nettyServerConfig);
            namesrvConfig.setConfigStorePath(file);
            System.out.printf("load config properties file OK, %s%n", file);
            in.close();
        }
    }
    // 解析启动-p参数
    if (commandLine.hasOption('p')) {
        InternalLogger console = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_CONSOLE_NAME);
        MixAll.printObjectProperties(console, namesrvConfig);
        MixAll.printObjectProperties(console, nettyServerConfig);
        System.exit(0);
    }
    // 将启动参数填充到namesrvConfig,nettyServerConfig
    MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), namesrvConfig);
    if (null == namesrvConfig.getRocketmqHome()) {
        System.out.printf("Please set the %s variable in your environment to match the location of the RocketMQ installation%n", MixAll.ROCKETMQ_HOME_ENV);
        System.exit(-2);
    }
    LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory();
    JoranConfigurator configurator = new JoranConfigurator();
    configurator.setContext(lc);
    lc.reset();
    configurator.doConfigure(namesrvConfig.getRocketmqHome() + "/conf/logback_namesrv.xml");
    log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
    MixAll.printObjectProperties(log, namesrvConfig);
    MixAll.printObjectProperties(log, nettyServerConfig);
    // 创建NameServerController
    final NamesrvController controller = new NamesrvController(namesrvConfig, nettyServerConfig);
    // remember all configs to prevent discard
    controller.getConfiguration().registerConfig(properties);
    return controller;
}

2、NamesrvConfig 属性

  • rocketmqHome:rocketmq 主目录
  • kvConfig:NameServer 存储 KV 配置属性的持久化路径
  • configStorePath:nameServer 默认配置文件路径
  • orderMessageEnable:是否支持顺序消息


3、NettyServerConfig 属性

  • listenPort:NameServer 监听端口,该值默认会被初始化为 9876;
  • serverWorkerThreads:Netty 业务线程池线程个数
  • serverCallbackExecutorThreads:Netty public 任务线程池线程个数, Netty 网络设计,根据业务类型会创建不同的线程池,比如处理消息发送、消息消费、心跳检测等。如果该业务类型未注册线程池,则由 public 线程池执行
  • serverSelectorThreads:IO 线程池个数,主要是 NameServer、Broker 端解析请求、返回相应的线程个数,这类线程主要是处理网路请求的,解析请求包,然后转发到各个业务线程池完成具体的操作,然后将结果返回给调用方;
  • serverOnewaySemaphoreValue:send oneway 消息请求并发读(Broker端参数);
  • serverAsyncSemaphoreValue:异步消息发送最大并发度;
  • serverChannelMaxIdleTimeSeconds:网络连接最大的空闲时间,默认 120s
  • serverSocketSndBufSize:网络socket发送缓冲区大小
  • serverSocketRcvBufSize:网络接收端缓存区大小
  • serverPooledByteBufAllocatorEnable:ByteBuffer 是否开启缓存;
  • useEpollNativeSelector:是否启用 Epoll IO 模型

4、根据启动属性创建 NamesrvController 实例,并初始化该实例。NameServerController 实例为 NameServer 核心控制器。

代码:

NamesrvController#initialize

public boolean initialize() {
    // 加载KV配置
    this.kvConfigManager.load();
    // 创建NettyServer网络处理对象
    this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);
    // 开启定时任务:每隔10s扫描一次Broker,移除不活跃的Broker
    this.remotingExecutor =
        Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_"));
    this.registerProcessor();
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            NamesrvController.this.routeInfoManager.scanNotActiveBroker();
        }
    }, 5, 10, TimeUnit.SECONDS);
    // 开启定时任务:每隔10min打印一次KV配置
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            NamesrvController.this.kvConfigManager.printAllPeriodically();
        }
    }, 1, 10, TimeUnit.MINUTES);
    if (TlsSystemConfig.tlsMode != TlsMode.DISABLED) {
        // Register a listener to reload SslContext
        try {
            fileWatchService = new FileWatchService(
                new String[] {
                    TlsSystemConfig.tlsServerCertPath,
                    TlsSystemConfig.tlsServerKeyPath,
                    TlsSystemConfig.tlsServerTrustCertPath
                },
                new FileWatchService.Listener() {
                    boolean certChanged, keyChanged = false;
                    @Override
                    public void onChanged(String path) {
                        if (path.equals(TlsSystemConfig.tlsServerTrustCertPath)) {
                            log.info("The trust certificate changed, reload the ssl context");
                            reloadServerSslContext();
                        }
                        if (path.equals(TlsSystemConfig.tlsServerCertPath)) {
                            certChanged = true;
                        }
                        if (path.equals(TlsSystemConfig.tlsServerKeyPath)) {
                            keyChanged = true;
                        }
                        if (certChanged && keyChanged) {
                            log.info("The certificate and private key changed, reload the ssl context");
                            certChanged = keyChanged = false;
                            reloadServerSslContext();
                        }
                    }
                    private void reloadServerSslContext() {
                        ((NettyRemotingServer) remotingServer).loadSslContext();
                    }
                });
        } catch (Exception e) {
            log.warn("FileWatchService created error, can't load the certificate dynamically");
        }
    }
    return true;
}

5、在 JVM 进程关闭之前,先将线程池关闭,及时释放资源。

代码:

NamesrvStartup#start

public static NamesrvController start(final NamesrvController controller) throws Exception {
    if (null == controller) {
        throw new IllegalArgumentException("NamesrvController is null");
    }
    boolean initResult = controller.initialize();
    if (!initResult) {
        controller.shutdown();
        System.exit(-3);
    }
    // 注册JVM钩子函数代码
    Runtime.getRuntime().addShutdownHook(new ShutdownHookThread(log, new Callable<Void>() {
        @Override
        public Void call() throws Exception {
            // 释放资源
            controller.shutdown();
            return null;
        }
    }));
    controller.start();
    return controller;
}

四、路由管理

NameServer 作为路由注册中心,其核心作用是为 Client 提供消息发送/消费的路由信息。Master Broker 节点和所有 NameServer 建立长连接,每个 NameServer 节点拥有所有 Topic 对应 Queue 以及 Broker 的映射关系,包括路由注册、路由删除等,具体由 RouteInfoManager 负责管理。

1、路由元信息

代码:

org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager

  • topicQueueTable
    维护了 Topic 和其对应消息队列的映射关系,QueueData 记录了一条队列的元信息:所在 Broker、读队列数量、写队列数量等。
  • brokerAddrTable
    维护了 Broker Name 和 Broker 元信息的映射关系,Broker 通常以 Master-Slave 架构部署,BrokerData 记录了同一个 Broker Name 下所有节点的地址信息。
  • clusterAddrTable
    维护了 Broker 的集群信息。
  • brokerLiveTable
    维护了 Broker 的存活信息。NameServer 在收到来自 Broker 的心跳消息后,更新 BrokerLiveInfo 中的 lastUpdateTimestamp,如果 NameServer 长时间未收到 Broker 的心跳信息,NameServer 就会将其移除。
  • filterServerTable
    Broker 上的 FilterServer 列表,用于类模式消息过滤。

还是有点抽象是不是,没关系,看下面这张图与上面的存储关系对比下你就很清晰了。

2、NameServer 请求处理

NettyRequestProcessor 定义了 RocketMQ 处理网络请求的接口,DefaultRequestProcessor 实现了该接口并负责处理 NameServer 收到的网络请求。NettyRequestProcessor 定义如下。

// org.apache.rocketmq.remoting.netty.NettyRequestProcessor
public interface NettyRequestProcessor {
    RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request)
        throws Exception;
    boolean rejectRequest();
}

RemotingCommand 包含请求编码(code),针对不同的请求编码执行对应的请求处理逻辑。请求编码定义在 RequestCode 常量类中,本节以 REGISTER_BROKER(Broker 注册 & 心跳)为例,结合 Broker 的启动流程阐述请求从被 Broker 发出到被 NameServer 处理的流程。

Broker 启动类 BrokerStartup 在初始化核心控制器 BrokerController 阶段注册定时任务,定时发送 HTTP 请求获取 NameServer 地址列表并存储于 remotingClient。此处考虑一个问题:在弃用 ZooKeeper 后,RocketMQ 不存在注册中心供 NameServer 注册,那么 NameServer 地址列表是如何维护的?在获取过时的地址列表后,RocketMQ 如何持续保证可用性?BrokerController 定时获取地址列表核心逻辑精简如下。

org.apache.rocketmq.broker.BrokerController#initialize

org.apache.rocketmq.broker.out.BrokerOuterAPI#fetchNameServerAddr

org.apache.rocketmq.common.namesrv.TopAddressing#fetchNSAddr(boolean, long)

Broker 在获取到 NameServer 地址列表后,针对每个地址开启一个线程,将自身信息同步(默认)注册至 NameServer。Broker 利用 CountDownLatch 等待所有线程注册工作完成后,继续执行后续的工作。下面我们就来看下 Broker 是如何路由注册到 NameServer 上的。

3、路由注册

3.1 发送心跳包

RocketMQ 路由注册是通过 Broker 与 NameServer 的心跳功能实现的。Broker 启动时向集群中所有的 NameServer 发送心跳信息,每隔 30s 向集群中所有 NameServer 发送心跳包,NameServer 收到心跳包时会更新 brokerLiveTable 缓存中 BrokerLiveInfo 的 lastUpdataTimeStamp 信息,然后 NameServer 每隔 10s 扫描 brokerLiveTable,如果连续 120s 没有收到心跳包,NameServer 将移除 Broker 的路由信息同时关闭 Socket 连接。

org.apache.rocketmq.broker.BrokerController#start

org.apache.rocketmq.broker.out.BrokerOuterAPI#registerBrokerAll

public List<RegisterBrokerResult> registerBrokerAll(
    final String clusterName,
    final String brokerAddr,
    final String brokerName,
    final long brokerId,
    final String haServerAddr,
    final TopicConfigSerializeWrapper topicConfigWrapper,
    final List<String> filterServerList,
    final boolean oneway,
    final int timeoutMills,
    final boolean compressed) {
    final List<RegisterBrokerResult> registerBrokerResultList = Lists.newArrayList();
    // 获得nameServer地址信息
    List<String> nameServerAddressList = this.remotingClient.getNameServerAddressList();
    // 遍历所有nameserver列表
    if (nameServerAddressList != null && nameServerAddressList.size() > 0) {
        // 封装请求头
        final RegisterBrokerRequestHeader requestHeader = new RegisterBrokerRequestHeader();
        requestHeader.setBrokerAddr(brokerAddr);
        requestHeader.setBrokerId(brokerId);
        requestHeader.setBrokerName(brokerName);
        requestHeader.setClusterName(clusterName);
        requestHeader.setHaServerAddr(haServerAddr);
        requestHeader.setCompressed(compressed);
        // 封装请求体
        RegisterBrokerBody requestBody = new RegisterBrokerBody();
        requestBody.setTopicConfigSerializeWrapper(topicConfigWrapper);
        requestBody.setFilterServerList(filterServerList);
        final byte[] body = requestBody.encode(compressed);
        final int bodyCrc32 = UtilAll.crc32(body);
        requestHeader.setBodyCrc32(bodyCrc32);
        final CountDownLatch countDownLatch = new CountDownLatch(nameServerAddressList.size());
        for (final String namesrvAddr : nameServerAddressList) {
            brokerOuterExecutor.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        // 分别向NameServer注册
                        RegisterBrokerResult result = registerBroker(namesrvAddr,oneway, timeoutMills,requestHeader,body);
                        if (result != null) {
                            registerBrokerResultList.add(result);
                        }
                        log.info("register broker[{}]to name server {} OK", brokerId, namesrvAddr);
                    } catch (Exception e) {
                        log.warn("registerBroker Exception, {}", namesrvAddr, e);
                    } finally {
                        countDownLatch.countDown();
                    }
                }
            });
        }
        try {
            countDownLatch.await(timeoutMills, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
        }
    }
    return registerBrokerResultList;
}

org.apache.rocketmq.broker.out.BrokerOuterAPI#registerBroker

3.2 处理心跳包

DefaultRequestProcessor 网路处理类解析请求 类型,如果请求类型是为 REGISTER_BROKER,则将请求转发到 RouteInfoManager#regiesterBroker。

org.apache.rocketmq.namesrv.processor.DefaultRequestProcessor#processRequest

org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager#registerBroker

维护路由信息

public RegisterBrokerResult registerBroker(
    final String clusterName,
    final String brokerAddr,
    final String brokerName,
    final long brokerId,
    final String haServerAddr,
    final TopicConfigSerializeWrapper topicConfigWrapper,
    final List<String> filterServerList,
    final Channel channel) {
    RegisterBrokerResult result = new RegisterBrokerResult();
    try {
        try {
            // 加锁
            this.lock.writeLock().lockInterruptibly();
            // 维护clusterAddrTable
            Set<String> brokerNames = this.clusterAddrTable.get(clusterName);
            if (null == brokerNames) {
                brokerNames = new HashSet<String>();
                this.clusterAddrTable.put(clusterName, brokerNames);
            }
            brokerNames.add(brokerName);
            boolean registerFirst = false;
            // 维护brokerAddrTable
            BrokerData brokerData = this.brokerAddrTable.get(brokerName);
            // 第一次注册,则创建brokerData
            if (null == brokerData) {
                registerFirst = true;
                brokerData = new BrokerData(clusterName, brokerName, new HashMap<Long, String>());
                this.brokerAddrTable.put(brokerName, brokerData);
            }
            // 非第一次注册,更新Broker
            Map<Long, String> brokerAddrsMap = brokerData.getBrokerAddrs();
            //Switch slave to master: first remove <1, IP:PORT> in namesrv, then add <0, IP:PORT>
            //The same IP:PORT must only have one record in brokerAddrTable
            Iterator<Entry<Long, String>> it = brokerAddrsMap.entrySet().iterator();
            while (it.hasNext()) {
                Entry<Long, String> item = it.next();
                if (null != brokerAddr && brokerAddr.equals(item.getValue()) && brokerId != item.getKey()) {
                    it.remove();
                }
            }
            String oldAddr = brokerData.getBrokerAddrs().put(brokerId, brokerAddr);
            registerFirst = registerFirst || (null == oldAddr);
            // 维护topicQueueTable
            if (null != topicConfigWrapper
                && MixAll.MASTER_ID == brokerId) {
                if (this.isBrokerTopicConfigChanged(brokerAddr, topicConfigWrapper.getDataVersion())
                    || registerFirst) {
                    ConcurrentMap<String, TopicConfig> tcTable =
                        topicConfigWrapper.getTopicConfigTable();
                    if (tcTable != null) {
                        for (Map.Entry<String, TopicConfig> entry : tcTable.entrySet()) {
                            this.createAndUpdateQueueData(brokerName, entry.getValue());
                        }
                    }
                }
            }
            // 维护brokerLiveTable
            BrokerLiveInfo prevBrokerLiveInfo = this.brokerLiveTable.put(brokerAddr,
                new BrokerLiveInfo(
                    System.currentTimeMillis(),
                    topicConfigWrapper.getDataVersion(),
                    channel,
                    haServerAddr));
            if (null == prevBrokerLiveInfo) {
                log.info("new broker registered, {} HAServer: {}", brokerAddr, haServerAddr);
            }
            // 维护filterServerList
            if (filterServerList != null) {
                if (filterServerList.isEmpty()) {
                    this.filterServerTable.remove(brokerAddr);
                } else {
                    this.filterServerTable.put(brokerAddr, filterServerList);
                }
            }
            if (MixAll.MASTER_ID != brokerId) {
                String masterAddr = brokerData.getBrokerAddrs().get(MixAll.MASTER_ID);
                if (masterAddr != null) {
                    BrokerLiveInfo brokerLiveInfo = this.brokerLiveTable.get(masterAddr);
                    if (brokerLiveInfo != null) {
                        result.setHaServerAddr(brokerLiveInfo.getHaServerAddr());
                        result.setMasterAddr(masterAddr);
                    }
                }
            }
        } finally {
            this.lock.writeLock().unlock();
        }
    } catch (Exception e) {
        log.error("registerBroker Exception", e);
    }
    return result;
}

4、路由删除

Broker 每隔 30s 向 NameServer 发送一个心跳包,心跳包包含 BrokerId、Broker 地址、Broker 名称, Broker 所属集群名称、 Broker 关联的 FilterServer 列表。但是如果 Broker 宕机,NameServer 无法收到心跳包,此时 NameServer 如何来剔除这些失效的 Broker 呢? NameServer 会每隔 10s 扫描 brokerLiveTable 状态表,如果 BrokerLive 的 lastUpdateTimestamp 的时间戳距当前时间超过 120s,则认为 Broker 失效,移除该 Broker ,关闭与 Broker 连接,同时更新 topicQueueTable、brokerAddrTable 、brokerLiveTable、filterServerTable 。

RocketMQ 有两个触发点来删除路由信息:

  • NameServer 定期扫描 brokerLiveTable 检测上次心跳包与当前系统的时间差,如果时间超过 120s,则需要移除 broker。
  • Broker 在正常关闭的情况下,会执行 unregisterBroker 指令。

这两种方式路由删除的方法都是一样的,就是从相关路由表中删除与该 broker 相关的信息。

org.apache.rocketmq.namesrv.NamesrvController#initialize

public void scanNotActiveBroker() {
  // 获得brokerLiveTable
  Iterator<Entry<String, BrokerLiveInfo>> it = this.brokerLiveTable.entrySet().iterator();
  // 遍历brokerLiveTable
  while (it.hasNext()) {
      Entry<String, BrokerLiveInfo> next = it.next();
      long last = next.getValue().getLastUpdateTimestamp();
      // 如果收到心跳包的时间距当时时间是否超过120s
      if ((last + BROKER_CHANNEL_EXPIRED_TIME) < System.currentTimeMillis()) {
          // 关闭连接
          RemotingUtil.closeChannel(next.getValue().getChannel());
          // 移除broker
          it.remove();
          log.warn("The broker channel expired, {} {}ms", next.getKey(), BROKER_CHANNEL_EXPIRED_TIME);
          // 维护路由表
          this.onChannelDestroy(next.getKey(), next.getValue().getChannel());
      }
  }
}
public void onChannelDestroy(String remoteAddr, Channel channel) {
    String brokerAddrFound = null;
    if (channel != null) {
        try {
            try {
                this.lock.readLock().lockInterruptibly();
                Iterator<Entry<String, BrokerLiveInfo>> itBrokerLiveTable =
                    this.brokerLiveTable.entrySet().iterator();
                while (itBrokerLiveTable.hasNext()) {
                    Entry<String, BrokerLiveInfo> entry = itBrokerLiveTable.next();
                    if (entry.getValue().getChannel() == channel) {
                        brokerAddrFound = entry.getKey();
                        break;
                    }
                }
            } finally {
                this.lock.readLock().unlock();
            }
        } catch (Exception e) {
            log.error("onChannelDestroy Exception", e);
        }
    }
    if (null == brokerAddrFound) {
        brokerAddrFound = remoteAddr;
    } else {
        log.info("the broker's channel destroyed, {}, clean it's data structure at once", brokerAddrFound);
    }
    if (brokerAddrFound != null && brokerAddrFound.length() > 0) {
        try {
            try {
                // 申请写锁,根据brokerAddress从brokerLiveTable和filterServerTable移除
                this.lock.writeLock().lockInterruptibly();
                this.brokerLiveTable.remove(brokerAddrFound);
                this.filterServerTable.remove(brokerAddrFound);
                // 维护brokerAddrTable
                String brokerNameFound = null;
                boolean removeBrokerName = false;
                Iterator<Entry<String, BrokerData>> itBrokerAddrTable =
                    this.brokerAddrTable.entrySet().iterator();
                // 遍历brokerAddrTable
                while (itBrokerAddrTable.hasNext() && (null == brokerNameFound)) {
                    BrokerData brokerData = itBrokerAddrTable.next().getValue();
                    // 遍历broker地址
                    Iterator<Entry<Long, String>> it = brokerData.getBrokerAddrs().entrySet().iterator();
                    while (it.hasNext()) {
                        Entry<Long, String> entry = it.next();
                        Long brokerId = entry.getKey();
                        String brokerAddr = entry.getValue();
                        // 根据broker地址移除brokerAddr
                        if (brokerAddr.equals(brokerAddrFound)) {
                            brokerNameFound = brokerData.getBrokerName();
                            it.remove();
                            log.info("remove brokerAddr[{}, {}] from brokerAddrTable, because channel destroyed",
                                brokerId, brokerAddr);
                            break;
                        }
                    }
                    // 如果当前主题只包含待移除的broker,则移除该topic
                    if (brokerData.getBrokerAddrs().isEmpty()) {
                        removeBrokerName = true;
                        itBrokerAddrTable.remove();
                        log.info("remove brokerName[{}] from brokerAddrTable, because channel destroyed",
                            brokerData.getBrokerName());
                    }
                }
                // 维护clusterAddrTable
                if (brokerNameFound != null && removeBrokerName) {
                    Iterator<Entry<String, Set<String>>> it = this.clusterAddrTable.entrySet().iterator();
                    // 遍历clusterAddrTable
                    while (it.hasNext()) {
                        Entry<String, Set<String>> entry = it.next();
                        // 获得集群名称
                        String clusterName = entry.getKey();
                        // 获得集群中brokerName集合
                        Set<String> brokerNames = entry.getValue();
                        // 从brokerNames中移除brokerNameFound
                        boolean removed = brokerNames.remove(brokerNameFound);
                        if (removed) {
                            log.info("remove brokerName[{}], clusterName[{}] from clusterAddrTable, because channel destroyed",
                                brokerNameFound, clusterName);
                            if (brokerNames.isEmpty()) {
                                log.info("remove the clusterName[{}] from clusterAddrTable, because channel destroyed and no broker in this cluster",
                                    clusterName);
                                // 如果集群中不包含任何broker,则移除该集群
                                it.remove();
                            }
                            break;
                        }
                    }
                }
                // 维护topicQueueTable队列
                if (removeBrokerName) {
                    // 遍历topicQueueTable
                    Iterator<Entry<String, List<QueueData>>> itTopicQueueTable =
                        this.topicQueueTable.entrySet().iterator();
                    while (itTopicQueueTable.hasNext()) {
                        Entry<String, List<QueueData>> entry = itTopicQueueTable.next();
                        // 主题名称
                        String topic = entry.getKey();
                        // 队列集合
                        List<QueueData> queueDataList = entry.getValue();
                        // 遍历该主题队列
                        Iterator<QueueData> itQueueData = queueDataList.iterator();
                        while (itQueueData.hasNext()) {
                            // 从队列中移除为活跃broker信息
                            QueueData queueData = itQueueData.next();
                            if (queueData.getBrokerName().equals(brokerNameFound)) {
                                itQueueData.remove();
                                log.info("remove topic[{} {}], from topicQueueTable, because channel destroyed",
                                    topic, queueData);
                            }
                        }
                        // 如果该topic的队列为空,则移除该topic
                        if (queueDataList.isEmpty()) {
                            itTopicQueueTable.remove();
                            log.info("remove topic[{}] all queue, from topicQueueTable, because channel destroyed",
                                topic);
                        }
                    }
                }
            } finally {
                // 释放写锁
                this.lock.writeLock().unlock();
            }
        } catch (Exception e) {
            log.error("onChannelDestroy Exception", e);
        }
    }
}

5、路由发现

RocketMQ 路由发现是非实时的,当 Topic 路由出现变化后,NameServer 不会主动推送给客户端,而是由客户端定时拉取主题最新的路由。

org.apache.rocketmq.namesrv.processor.DefaultRequestProcessor#getRouteInfoByTopic

public RemotingCommand getRouteInfoByTopic(ChannelHandlerContext ctx,
    RemotingCommand request) throws RemotingCommandException {
    final RemotingCommand response = RemotingCommand.createResponseCommand(null);
    final GetRouteInfoRequestHeader requestHeader =
        (GetRouteInfoRequestHeader) request.decodeCommandCustomHeader(GetRouteInfoRequestHeader.class);
    // 调用RouteInfoManager的方法,从路由表topicQueueTable、brokerAddrTable、 filterServerTable中
    // 分别填充TopicRouteData的List<QueueData>、List<BrokerData>、 filterServer
    TopicRouteData topicRouteData = this.namesrvController.getRouteInfoManager().pickupTopicRouteData(requestHeader.getTopic());
    // 如果找到主题对应你的路由信息并且该主题为顺序消息,则从NameServer KVConfig中获取 关于顺序消息相关的配置填充路由信息
    if (topicRouteData != null) {
        if (this.namesrvController.getNamesrvConfig().isOrderMessageEnable()) {
            String orderTopicConf =
                this.namesrvController.getKvConfigManager().getKVConfig(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG,
                    requestHeader.getTopic());
            topicRouteData.setOrderTopicConf(orderTopicConf);
        }
        byte[] content = topicRouteData.encode();
        response.setBody(content);
        response.setCode(ResponseCode.SUCCESS);
        response.setRemark(null);
        return response;
    }
    response.setCode(ResponseCode.TOPIC_NOT_EXIST);
    response.setRemark("No topic route info in name server for the topic: " + requestHeader.getTopic()
        + FAQUrl.suggestTodo(FAQUrl.APPLY_TOPIC_URL));
    return response;
}

五、总结

最后我们再来看下我们前言提的那三个问题。既然作为路由注册中心,那有哪些路由信息注册到了 NameServer?生产者如何知道消息要发送到哪台消息服务器呢?当 Broker 不可用后,NameServer 并不会立即将变更后的注册信息推送至 Client(Producer/Consumer),此时 RocketMQ 如何保证 Client 正常发送/消费消息?

第一个问题在给定 Topic 的情况下,Client 根据负载均衡策略选择合适的消息队列,进一步获取到对应的 Broker 地址信息,具体有哪些路由信息注册到了 NameServer,可以看上面的路由元信息 RouteInfoManager 类。

第二个问题可以归纳为由于路由注册、路由删除以及路由发现,使生产者如何知道消息要发送到哪台消息服务器。

第三个问题对于 Broker 无效的场景,RocketMQ 牺牲了 C,选择了 AP,即通过 Client 重试保证可用性,由此产生的重复消息问题,通过 Client 幂等处理逻辑来规避。

最后老周再上一张图,涵盖了上述整个核心流程。到此为止,RocketMQ 的高可用机制——消息存储高可用、消息发送高可用、消息消费高可用以及集群管理高可用都分析完毕,希望对你有帮助。



欢迎大家关注我的公众号【老周聊架构】,Java后端主流技术栈的原理、源码分析、架构以及各种互联网高并发、高性能、高可用的解决方案。


喜欢的话,点赞、再看、分享三连。


相关实践学习
RocketMQ一站式入门使用
从源码编译、部署broker、部署namesrv,使用java客户端首发消息等一站式入门RocketMQ。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
2月前
|
消息中间件 存储 监控
|
7月前
|
消息中间件 存储 Kafka
如何保证MQ消息队列的高可用?
如何保证MQ消息队列的高可用?
213 0
|
7月前
|
消息中间件 存储 负载均衡
一文读懂RocketMQ的高可用机制——消息发送高可用
一文读懂RocketMQ的高可用机制——消息发送高可用
181 1
|
1月前
|
消息中间件 存储 运维
|
29天前
|
消息中间件 存储 安全
【深入浅出RocketMQ原理及实战】「底层原理挖掘系列」透彻剖析贯穿RocketMQ的消息顺序消费和并发消费机制体系的原理分析
【深入浅出RocketMQ原理及实战】「底层原理挖掘系列」透彻剖析贯穿RocketMQ的消息顺序消费和并发消费机制体系的原理分析
26 0
|
1月前
|
消息中间件 存储 Cloud Native
【Spring云原生系列】Spring RabbitMQ:异步处理机制的基础--消息队列 原理讲解+使用教程
【Spring云原生系列】Spring RabbitMQ:异步处理机制的基础--消息队列 原理讲解+使用教程
|
5月前
|
消息中间件 Java Maven
消息中间件系列教程(12) -RabbitMQ-消息确认机制
消息中间件系列教程(12) -RabbitMQ-消息确认机制
44 0
|
2月前
|
消息中间件 运维 应用服务中间件
容器化运维:构建高可用RabbitMQ集群的Docker Compose指南
容器化运维:构建高可用RabbitMQ集群的Docker Compose指南
182 0
|
3月前
|
消息中间件 Java
RabbitMQ中的消息确认机制是什么?为什么需要消息确认?
RabbitMQ中的消息确认机制是什么?为什么需要消息确认?
32 0
|
5月前
|
消息中间件 Java Spring
搭建高可用rabbitmq集群及spring boot实现集群配置
搭建高可用rabbitmq集群及spring boot实现集群配置

热门文章

最新文章