一、前言
在前三篇我们介绍了
这一篇我们来说一下集群管理是如何保证高可用的。
集群管理的高可用主要体现在 NameServer 的设计上,NameServer 承担着路由注册中心的作用。当部分 NameServer 节点宕机时不会有什么糟糕的影响,只剩一个 NameServer 节点 RocketMQ 集群也能正常运行,即使 NameServer 全部宕机,也不影响已经运行的 Broker、Producer 和 Consumer。
在说 NameServer 之前,我们是否有以下几点思考。既然作为路由注册中心,那有哪些路由信息注册到了 NameServer?生产者如何知道消息要发送到哪台消息服务器呢?当 Broker 不可用后,NameServer 并不会立即将变更后的注册信息推送至 Client(Producer/Consumer),此时 RocketMQ 如何保证 Client 正常发送/消费消息?
带着这几个疑问来开启我们的 RocketMQ 集群管理高可用之旅。
二、架构设计
1、NameServer 互相独立,彼此没有通信关系,单台 NameServer 挂掉,不影响其他 NameServer。
2、NameServer 不去连接别的机器,不主动推消息。
3、单个 Broker(Master、Slave) 与所有 NameServer 进行定时注册,以便告知 NameServer 自己还活着。
- Broker 每隔 30 秒向所有 NameServer 发送心跳,心跳包含了自身的 topic 配置信息。
- NameServer 每隔 10 秒,扫描所有还存活的 broker 连接,如果某个连接的最后更新时间与当前时间差值超过 2 分钟,则断开此连接,NameServer 也会断开此 broker 下所有与 slave 的连接。同时更新 topic 与队列的对应关系,但不通知生产者和消费者。
- Broker slave 同步或者异步从 Broker master 上拷贝数据。
4、Consumer 随机与一个 NameServer 建立长连接,如果该 NameServer 断开,则从 NameServer 列表中查找下一个进行连接。
- Consumer 主要从 NameServer 中根据 Topic 查询 Broker 的地址,查到就会缓存到客户端,并向提供 Topic 服务的 Master、Slave 建立长连接,且定时向 Master、Slave 发送心跳。
- 如果 Broker 宕机,则 NameServer 会将其剔除,而 Consumer 端的定时任务 MQClientInstance.this.updateTopicRouteInfoFromNameServer 每 30 秒执行一次,将 Topic 对应的 Broker 地址拉取下来,此地址只有 Slave 地址了,此时 Consumer 从 Slave 上消费。
- 消费者与 Master 和 Slave 都建有连接,在不同场景有不同的消费规则。
5、Producer 随机与一个 NameServer 建立长连接,每隔 30 秒(此处时间可配置)从 NameServer 获取 Topic 的最新队列情况,如果某个 Broker Master 宕机,Producer 最多 30 秒才能感知,在这个期间,发往该 broker master 的消息失败。Producer 向提供 Topic 服务的 Master 建立长连接,且定时向 Master 发送心跳。
- 生产者与所有的 master 连接,但不能向 slave 写入。
- 客户端是先从 NameServer 寻址的,得到可用 Broker 的 IP 和端口信息,然后据此信息连接 broker。
综上所述,NameServer 在 RocketMQ 中的作用:
- NameServer 用来保存活跃的 broker 列表,包括 Master 和 Slave 。
- NameServer 用来保存所有 topic 和该 topic 所有队列的列表。
- NameServer 用来保存所有 broker 的 Filter 列表。
- 命名服务器为客户端,包括生产者,消费者和命令行客户端提供最新的路由信息。
三、启动流程
NameServer 启动流程重点关注两部分:路由信息维护和网络通信(包括心跳),涉及到的核心类如下图所示。NameServerStartup 是 NameServer 的启动类,负责解析配置文件、加载运行时参数信息和初始化并启动 NameServerController。NameServerController 是 NameServer 的核心控制器,其通过 RouteInfoManager 管理路由信息,通过 RemotingServer 与 RocketMQ 其他组件(Broker、Producer 和 Consumer)通信。
NameServer 的配置参数包括 NamesrvConfig 和 NettyServerConfig:
NamesrvConfig 为 NameServer 业务参数,如 RocketMQ 主目录路径、KV 配置属性持久化路径、是否支持顺序消息等;
NettyServerConfig 为 NameServer 网络参数,如监听端口、IO 线程池线程个数、业务线程池线程个数、是否开启 epoll 等。
在分析源码之前我们先来看张时序图:
启动类:
org.apache.rocketmq.namesrv.NamesrvStartup
1、解析配置文件,填充 NameServerConfig、NettyServerConfig 属性值,并创建 NamesrvController。
代码:
NamesrvStartup#createNamesrvController
public static NamesrvController createNamesrvController(String[] args) throws IOException, JoranException { System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION)); Options options = ServerUtil.buildCommandlineOptions(new Options()); commandLine = ServerUtil.parseCmdLine("mqnamesrv", args, buildCommandlineOptions(options), new PosixParser()); if (null == commandLine) { System.exit(-1); return null; } // 创建NamesrvConfig final NamesrvConfig namesrvConfig = new NamesrvConfig(); // 创建NettyServerConfig final NettyServerConfig nettyServerConfig = new NettyServerConfig(); // 设置启动端口号 nettyServerConfig.setListenPort(9876); // 解析启动-c参数 if (commandLine.hasOption('c')) { String file = commandLine.getOptionValue('c'); if (file != null) { InputStream in = new BufferedInputStream(new FileInputStream(file)); properties = new Properties(); properties.load(in); MixAll.properties2Object(properties, namesrvConfig); MixAll.properties2Object(properties, nettyServerConfig); namesrvConfig.setConfigStorePath(file); System.out.printf("load config properties file OK, %s%n", file); in.close(); } } // 解析启动-p参数 if (commandLine.hasOption('p')) { InternalLogger console = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_CONSOLE_NAME); MixAll.printObjectProperties(console, namesrvConfig); MixAll.printObjectProperties(console, nettyServerConfig); System.exit(0); } // 将启动参数填充到namesrvConfig,nettyServerConfig MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), namesrvConfig); if (null == namesrvConfig.getRocketmqHome()) { System.out.printf("Please set the %s variable in your environment to match the location of the RocketMQ installation%n", MixAll.ROCKETMQ_HOME_ENV); System.exit(-2); } LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory(); JoranConfigurator configurator = new JoranConfigurator(); configurator.setContext(lc); lc.reset(); configurator.doConfigure(namesrvConfig.getRocketmqHome() + "/conf/logback_namesrv.xml"); log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME); MixAll.printObjectProperties(log, namesrvConfig); MixAll.printObjectProperties(log, nettyServerConfig); // 创建NameServerController final NamesrvController controller = new NamesrvController(namesrvConfig, nettyServerConfig); // remember all configs to prevent discard controller.getConfiguration().registerConfig(properties); return controller; }
2、NamesrvConfig 属性
- rocketmqHome:rocketmq 主目录
- kvConfig:NameServer 存储 KV 配置属性的持久化路径
- configStorePath:nameServer 默认配置文件路径
- orderMessageEnable:是否支持顺序消息
3、NettyServerConfig 属性
- listenPort:NameServer 监听端口,该值默认会被初始化为 9876;
- serverWorkerThreads:Netty 业务线程池线程个数;
- serverCallbackExecutorThreads:Netty public 任务线程池线程个数, Netty 网络设计,根据业务类型会创建不同的线程池,比如处理消息发送、消息消费、心跳检测等。如果该业务类型未注册线程池,则由 public 线程池执行;
- serverSelectorThreads:IO 线程池个数,主要是 NameServer、Broker 端解析请求、返回相应的线程个数,这类线程主要是处理网路请求的,解析请求包,然后转发到各个业务线程池完成具体的操作,然后将结果返回给调用方;
- serverOnewaySemaphoreValue:send oneway 消息请求并发读(Broker端参数);
- serverAsyncSemaphoreValue:异步消息发送最大并发度;
- serverChannelMaxIdleTimeSeconds:网络连接最大的空闲时间,默认 120s;
- serverSocketSndBufSize:网络socket发送缓冲区大小;
- serverSocketRcvBufSize:网络接收端缓存区大小;
- serverPooledByteBufAllocatorEnable:ByteBuffer 是否开启缓存;
- useEpollNativeSelector:是否启用 Epoll IO 模型;
4、根据启动属性创建 NamesrvController 实例,并初始化该实例。NameServerController 实例为 NameServer 核心控制器。
代码:
NamesrvController#initialize
public boolean initialize() { // 加载KV配置 this.kvConfigManager.load(); // 创建NettyServer网络处理对象 this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService); // 开启定时任务:每隔10s扫描一次Broker,移除不活跃的Broker this.remotingExecutor = Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_")); this.registerProcessor(); this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { NamesrvController.this.routeInfoManager.scanNotActiveBroker(); } }, 5, 10, TimeUnit.SECONDS); // 开启定时任务:每隔10min打印一次KV配置 this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { NamesrvController.this.kvConfigManager.printAllPeriodically(); } }, 1, 10, TimeUnit.MINUTES); if (TlsSystemConfig.tlsMode != TlsMode.DISABLED) { // Register a listener to reload SslContext try { fileWatchService = new FileWatchService( new String[] { TlsSystemConfig.tlsServerCertPath, TlsSystemConfig.tlsServerKeyPath, TlsSystemConfig.tlsServerTrustCertPath }, new FileWatchService.Listener() { boolean certChanged, keyChanged = false; @Override public void onChanged(String path) { if (path.equals(TlsSystemConfig.tlsServerTrustCertPath)) { log.info("The trust certificate changed, reload the ssl context"); reloadServerSslContext(); } if (path.equals(TlsSystemConfig.tlsServerCertPath)) { certChanged = true; } if (path.equals(TlsSystemConfig.tlsServerKeyPath)) { keyChanged = true; } if (certChanged && keyChanged) { log.info("The certificate and private key changed, reload the ssl context"); certChanged = keyChanged = false; reloadServerSslContext(); } } private void reloadServerSslContext() { ((NettyRemotingServer) remotingServer).loadSslContext(); } }); } catch (Exception e) { log.warn("FileWatchService created error, can't load the certificate dynamically"); } } return true; }
5、在 JVM 进程关闭之前,先将线程池关闭,及时释放资源。
代码:
NamesrvStartup#start
public static NamesrvController start(final NamesrvController controller) throws Exception { if (null == controller) { throw new IllegalArgumentException("NamesrvController is null"); } boolean initResult = controller.initialize(); if (!initResult) { controller.shutdown(); System.exit(-3); } // 注册JVM钩子函数代码 Runtime.getRuntime().addShutdownHook(new ShutdownHookThread(log, new Callable<Void>() { @Override public Void call() throws Exception { // 释放资源 controller.shutdown(); return null; } })); controller.start(); return controller; }
四、路由管理
NameServer 作为路由注册中心,其核心作用是为 Client 提供消息发送/消费的路由信息。Master Broker 节点和所有 NameServer 建立长连接,每个 NameServer 节点拥有所有 Topic 对应 Queue 以及 Broker 的映射关系,包括路由注册、路由删除等,具体由 RouteInfoManager 负责管理。
1、路由元信息
代码:
org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager
- topicQueueTable
维护了 Topic 和其对应消息队列的映射关系,QueueData 记录了一条队列的元信息:所在 Broker、读队列数量、写队列数量等。 - brokerAddrTable
维护了 Broker Name 和 Broker 元信息的映射关系,Broker 通常以 Master-Slave 架构部署,BrokerData 记录了同一个 Broker Name 下所有节点的地址信息。 - clusterAddrTable
维护了 Broker 的集群信息。 - brokerLiveTable
维护了 Broker 的存活信息。NameServer 在收到来自 Broker 的心跳消息后,更新 BrokerLiveInfo 中的 lastUpdateTimestamp,如果 NameServer 长时间未收到 Broker 的心跳信息,NameServer 就会将其移除。 - filterServerTable
Broker 上的 FilterServer 列表,用于类模式消息过滤。
还是有点抽象是不是,没关系,看下面这张图与上面的存储关系对比下你就很清晰了。
2、NameServer 请求处理
NettyRequestProcessor 定义了 RocketMQ 处理网络请求的接口,DefaultRequestProcessor 实现了该接口并负责处理 NameServer 收到的网络请求。NettyRequestProcessor 定义如下。
// org.apache.rocketmq.remoting.netty.NettyRequestProcessor public interface NettyRequestProcessor { RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws Exception; boolean rejectRequest(); }
RemotingCommand 包含请求编码(code),针对不同的请求编码执行对应的请求处理逻辑。请求编码定义在 RequestCode 常量类中,本节以 REGISTER_BROKER(Broker 注册 & 心跳)为例,结合 Broker 的启动流程阐述请求从被 Broker 发出到被 NameServer 处理的流程。
Broker 启动类 BrokerStartup 在初始化核心控制器 BrokerController 阶段注册定时任务,定时发送 HTTP 请求获取 NameServer 地址列表并存储于 remotingClient。此处考虑一个问题:在弃用 ZooKeeper 后,RocketMQ 不存在注册中心供 NameServer 注册,那么 NameServer 地址列表是如何维护的?在获取过时的地址列表后,RocketMQ 如何持续保证可用性?BrokerController 定时获取地址列表核心逻辑精简如下。
org.apache.rocketmq.broker.BrokerController#initialize
org.apache.rocketmq.broker.out.BrokerOuterAPI#fetchNameServerAddr
org.apache.rocketmq.common.namesrv.TopAddressing#fetchNSAddr(boolean, long)
Broker 在获取到 NameServer 地址列表后,针对每个地址开启一个线程,将自身信息同步(默认)注册至 NameServer。Broker 利用 CountDownLatch 等待所有线程注册工作完成后,继续执行后续的工作。下面我们就来看下 Broker 是如何路由注册到 NameServer 上的。
3、路由注册
3.1 发送心跳包
RocketMQ 路由注册是通过 Broker 与 NameServer 的心跳功能实现的。Broker 启动时向集群中所有的 NameServer 发送心跳信息,每隔 30s 向集群中所有 NameServer 发送心跳包,NameServer 收到心跳包时会更新 brokerLiveTable 缓存中 BrokerLiveInfo 的 lastUpdataTimeStamp 信息,然后 NameServer 每隔 10s 扫描 brokerLiveTable,如果连续 120s 没有收到心跳包,NameServer 将移除 Broker 的路由信息同时关闭 Socket 连接。
org.apache.rocketmq.broker.BrokerController#start
org.apache.rocketmq.broker.out.BrokerOuterAPI#registerBrokerAll
public List<RegisterBrokerResult> registerBrokerAll( final String clusterName, final String brokerAddr, final String brokerName, final long brokerId, final String haServerAddr, final TopicConfigSerializeWrapper topicConfigWrapper, final List<String> filterServerList, final boolean oneway, final int timeoutMills, final boolean compressed) { final List<RegisterBrokerResult> registerBrokerResultList = Lists.newArrayList(); // 获得nameServer地址信息 List<String> nameServerAddressList = this.remotingClient.getNameServerAddressList(); // 遍历所有nameserver列表 if (nameServerAddressList != null && nameServerAddressList.size() > 0) { // 封装请求头 final RegisterBrokerRequestHeader requestHeader = new RegisterBrokerRequestHeader(); requestHeader.setBrokerAddr(brokerAddr); requestHeader.setBrokerId(brokerId); requestHeader.setBrokerName(brokerName); requestHeader.setClusterName(clusterName); requestHeader.setHaServerAddr(haServerAddr); requestHeader.setCompressed(compressed); // 封装请求体 RegisterBrokerBody requestBody = new RegisterBrokerBody(); requestBody.setTopicConfigSerializeWrapper(topicConfigWrapper); requestBody.setFilterServerList(filterServerList); final byte[] body = requestBody.encode(compressed); final int bodyCrc32 = UtilAll.crc32(body); requestHeader.setBodyCrc32(bodyCrc32); final CountDownLatch countDownLatch = new CountDownLatch(nameServerAddressList.size()); for (final String namesrvAddr : nameServerAddressList) { brokerOuterExecutor.execute(new Runnable() { @Override public void run() { try { // 分别向NameServer注册 RegisterBrokerResult result = registerBroker(namesrvAddr,oneway, timeoutMills,requestHeader,body); if (result != null) { registerBrokerResultList.add(result); } log.info("register broker[{}]to name server {} OK", brokerId, namesrvAddr); } catch (Exception e) { log.warn("registerBroker Exception, {}", namesrvAddr, e); } finally { countDownLatch.countDown(); } } }); } try { countDownLatch.await(timeoutMills, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { } } return registerBrokerResultList; }
org.apache.rocketmq.broker.out.BrokerOuterAPI#registerBroker
3.2 处理心跳包
DefaultRequestProcessor 网路处理类解析请求 类型,如果请求类型是为 REGISTER_BROKER,则将请求转发到 RouteInfoManager#regiesterBroker。
org.apache.rocketmq.namesrv.processor.DefaultRequestProcessor#processRequest
org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager#registerBroker
维护路由信息
public RegisterBrokerResult registerBroker( final String clusterName, final String brokerAddr, final String brokerName, final long brokerId, final String haServerAddr, final TopicConfigSerializeWrapper topicConfigWrapper, final List<String> filterServerList, final Channel channel) { RegisterBrokerResult result = new RegisterBrokerResult(); try { try { // 加锁 this.lock.writeLock().lockInterruptibly(); // 维护clusterAddrTable Set<String> brokerNames = this.clusterAddrTable.get(clusterName); if (null == brokerNames) { brokerNames = new HashSet<String>(); this.clusterAddrTable.put(clusterName, brokerNames); } brokerNames.add(brokerName); boolean registerFirst = false; // 维护brokerAddrTable BrokerData brokerData = this.brokerAddrTable.get(brokerName); // 第一次注册,则创建brokerData if (null == brokerData) { registerFirst = true; brokerData = new BrokerData(clusterName, brokerName, new HashMap<Long, String>()); this.brokerAddrTable.put(brokerName, brokerData); } // 非第一次注册,更新Broker Map<Long, String> brokerAddrsMap = brokerData.getBrokerAddrs(); //Switch slave to master: first remove <1, IP:PORT> in namesrv, then add <0, IP:PORT> //The same IP:PORT must only have one record in brokerAddrTable Iterator<Entry<Long, String>> it = brokerAddrsMap.entrySet().iterator(); while (it.hasNext()) { Entry<Long, String> item = it.next(); if (null != brokerAddr && brokerAddr.equals(item.getValue()) && brokerId != item.getKey()) { it.remove(); } } String oldAddr = brokerData.getBrokerAddrs().put(brokerId, brokerAddr); registerFirst = registerFirst || (null == oldAddr); // 维护topicQueueTable if (null != topicConfigWrapper && MixAll.MASTER_ID == brokerId) { if (this.isBrokerTopicConfigChanged(brokerAddr, topicConfigWrapper.getDataVersion()) || registerFirst) { ConcurrentMap<String, TopicConfig> tcTable = topicConfigWrapper.getTopicConfigTable(); if (tcTable != null) { for (Map.Entry<String, TopicConfig> entry : tcTable.entrySet()) { this.createAndUpdateQueueData(brokerName, entry.getValue()); } } } } // 维护brokerLiveTable BrokerLiveInfo prevBrokerLiveInfo = this.brokerLiveTable.put(brokerAddr, new BrokerLiveInfo( System.currentTimeMillis(), topicConfigWrapper.getDataVersion(), channel, haServerAddr)); if (null == prevBrokerLiveInfo) { log.info("new broker registered, {} HAServer: {}", brokerAddr, haServerAddr); } // 维护filterServerList if (filterServerList != null) { if (filterServerList.isEmpty()) { this.filterServerTable.remove(brokerAddr); } else { this.filterServerTable.put(brokerAddr, filterServerList); } } if (MixAll.MASTER_ID != brokerId) { String masterAddr = brokerData.getBrokerAddrs().get(MixAll.MASTER_ID); if (masterAddr != null) { BrokerLiveInfo brokerLiveInfo = this.brokerLiveTable.get(masterAddr); if (brokerLiveInfo != null) { result.setHaServerAddr(brokerLiveInfo.getHaServerAddr()); result.setMasterAddr(masterAddr); } } } } finally { this.lock.writeLock().unlock(); } } catch (Exception e) { log.error("registerBroker Exception", e); } return result; }
4、路由删除
Broker 每隔 30s 向 NameServer 发送一个心跳包,心跳包包含 BrokerId、Broker 地址、Broker 名称, Broker 所属集群名称、 Broker 关联的 FilterServer 列表。但是如果 Broker 宕机,NameServer 无法收到心跳包,此时 NameServer 如何来剔除这些失效的 Broker 呢? NameServer 会每隔 10s 扫描 brokerLiveTable 状态表,如果 BrokerLive 的 lastUpdateTimestamp 的时间戳距当前时间超过 120s,则认为 Broker 失效,移除该 Broker ,关闭与 Broker 连接,同时更新 topicQueueTable、brokerAddrTable 、brokerLiveTable、filterServerTable 。
RocketMQ 有两个触发点来删除路由信息:
- NameServer 定期扫描 brokerLiveTable 检测上次心跳包与当前系统的时间差,如果时间超过 120s,则需要移除 broker。
- Broker 在正常关闭的情况下,会执行 unregisterBroker 指令。
这两种方式路由删除的方法都是一样的,就是从相关路由表中删除与该 broker 相关的信息。
org.apache.rocketmq.namesrv.NamesrvController#initialize
public void scanNotActiveBroker() { // 获得brokerLiveTable Iterator<Entry<String, BrokerLiveInfo>> it = this.brokerLiveTable.entrySet().iterator(); // 遍历brokerLiveTable while (it.hasNext()) { Entry<String, BrokerLiveInfo> next = it.next(); long last = next.getValue().getLastUpdateTimestamp(); // 如果收到心跳包的时间距当时时间是否超过120s if ((last + BROKER_CHANNEL_EXPIRED_TIME) < System.currentTimeMillis()) { // 关闭连接 RemotingUtil.closeChannel(next.getValue().getChannel()); // 移除broker it.remove(); log.warn("The broker channel expired, {} {}ms", next.getKey(), BROKER_CHANNEL_EXPIRED_TIME); // 维护路由表 this.onChannelDestroy(next.getKey(), next.getValue().getChannel()); } } }
public void onChannelDestroy(String remoteAddr, Channel channel) { String brokerAddrFound = null; if (channel != null) { try { try { this.lock.readLock().lockInterruptibly(); Iterator<Entry<String, BrokerLiveInfo>> itBrokerLiveTable = this.brokerLiveTable.entrySet().iterator(); while (itBrokerLiveTable.hasNext()) { Entry<String, BrokerLiveInfo> entry = itBrokerLiveTable.next(); if (entry.getValue().getChannel() == channel) { brokerAddrFound = entry.getKey(); break; } } } finally { this.lock.readLock().unlock(); } } catch (Exception e) { log.error("onChannelDestroy Exception", e); } } if (null == brokerAddrFound) { brokerAddrFound = remoteAddr; } else { log.info("the broker's channel destroyed, {}, clean it's data structure at once", brokerAddrFound); } if (brokerAddrFound != null && brokerAddrFound.length() > 0) { try { try { // 申请写锁,根据brokerAddress从brokerLiveTable和filterServerTable移除 this.lock.writeLock().lockInterruptibly(); this.brokerLiveTable.remove(brokerAddrFound); this.filterServerTable.remove(brokerAddrFound); // 维护brokerAddrTable String brokerNameFound = null; boolean removeBrokerName = false; Iterator<Entry<String, BrokerData>> itBrokerAddrTable = this.brokerAddrTable.entrySet().iterator(); // 遍历brokerAddrTable while (itBrokerAddrTable.hasNext() && (null == brokerNameFound)) { BrokerData brokerData = itBrokerAddrTable.next().getValue(); // 遍历broker地址 Iterator<Entry<Long, String>> it = brokerData.getBrokerAddrs().entrySet().iterator(); while (it.hasNext()) { Entry<Long, String> entry = it.next(); Long brokerId = entry.getKey(); String brokerAddr = entry.getValue(); // 根据broker地址移除brokerAddr if (brokerAddr.equals(brokerAddrFound)) { brokerNameFound = brokerData.getBrokerName(); it.remove(); log.info("remove brokerAddr[{}, {}] from brokerAddrTable, because channel destroyed", brokerId, brokerAddr); break; } } // 如果当前主题只包含待移除的broker,则移除该topic if (brokerData.getBrokerAddrs().isEmpty()) { removeBrokerName = true; itBrokerAddrTable.remove(); log.info("remove brokerName[{}] from brokerAddrTable, because channel destroyed", brokerData.getBrokerName()); } } // 维护clusterAddrTable if (brokerNameFound != null && removeBrokerName) { Iterator<Entry<String, Set<String>>> it = this.clusterAddrTable.entrySet().iterator(); // 遍历clusterAddrTable while (it.hasNext()) { Entry<String, Set<String>> entry = it.next(); // 获得集群名称 String clusterName = entry.getKey(); // 获得集群中brokerName集合 Set<String> brokerNames = entry.getValue(); // 从brokerNames中移除brokerNameFound boolean removed = brokerNames.remove(brokerNameFound); if (removed) { log.info("remove brokerName[{}], clusterName[{}] from clusterAddrTable, because channel destroyed", brokerNameFound, clusterName); if (brokerNames.isEmpty()) { log.info("remove the clusterName[{}] from clusterAddrTable, because channel destroyed and no broker in this cluster", clusterName); // 如果集群中不包含任何broker,则移除该集群 it.remove(); } break; } } } // 维护topicQueueTable队列 if (removeBrokerName) { // 遍历topicQueueTable Iterator<Entry<String, List<QueueData>>> itTopicQueueTable = this.topicQueueTable.entrySet().iterator(); while (itTopicQueueTable.hasNext()) { Entry<String, List<QueueData>> entry = itTopicQueueTable.next(); // 主题名称 String topic = entry.getKey(); // 队列集合 List<QueueData> queueDataList = entry.getValue(); // 遍历该主题队列 Iterator<QueueData> itQueueData = queueDataList.iterator(); while (itQueueData.hasNext()) { // 从队列中移除为活跃broker信息 QueueData queueData = itQueueData.next(); if (queueData.getBrokerName().equals(brokerNameFound)) { itQueueData.remove(); log.info("remove topic[{} {}], from topicQueueTable, because channel destroyed", topic, queueData); } } // 如果该topic的队列为空,则移除该topic if (queueDataList.isEmpty()) { itTopicQueueTable.remove(); log.info("remove topic[{}] all queue, from topicQueueTable, because channel destroyed", topic); } } } } finally { // 释放写锁 this.lock.writeLock().unlock(); } } catch (Exception e) { log.error("onChannelDestroy Exception", e); } } }
5、路由发现
RocketMQ 路由发现是非实时的,当 Topic 路由出现变化后,NameServer 不会主动推送给客户端,而是由客户端定时拉取主题最新的路由。
org.apache.rocketmq.namesrv.processor.DefaultRequestProcessor#getRouteInfoByTopic
public RemotingCommand getRouteInfoByTopic(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { final RemotingCommand response = RemotingCommand.createResponseCommand(null); final GetRouteInfoRequestHeader requestHeader = (GetRouteInfoRequestHeader) request.decodeCommandCustomHeader(GetRouteInfoRequestHeader.class); // 调用RouteInfoManager的方法,从路由表topicQueueTable、brokerAddrTable、 filterServerTable中 // 分别填充TopicRouteData的List<QueueData>、List<BrokerData>、 filterServer TopicRouteData topicRouteData = this.namesrvController.getRouteInfoManager().pickupTopicRouteData(requestHeader.getTopic()); // 如果找到主题对应你的路由信息并且该主题为顺序消息,则从NameServer KVConfig中获取 关于顺序消息相关的配置填充路由信息 if (topicRouteData != null) { if (this.namesrvController.getNamesrvConfig().isOrderMessageEnable()) { String orderTopicConf = this.namesrvController.getKvConfigManager().getKVConfig(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG, requestHeader.getTopic()); topicRouteData.setOrderTopicConf(orderTopicConf); } byte[] content = topicRouteData.encode(); response.setBody(content); response.setCode(ResponseCode.SUCCESS); response.setRemark(null); return response; } response.setCode(ResponseCode.TOPIC_NOT_EXIST); response.setRemark("No topic route info in name server for the topic: " + requestHeader.getTopic() + FAQUrl.suggestTodo(FAQUrl.APPLY_TOPIC_URL)); return response; }
五、总结
最后我们再来看下我们前言提的那三个问题。既然作为路由注册中心,那有哪些路由信息注册到了 NameServer?生产者如何知道消息要发送到哪台消息服务器呢?当 Broker 不可用后,NameServer 并不会立即将变更后的注册信息推送至 Client(Producer/Consumer),此时 RocketMQ 如何保证 Client 正常发送/消费消息?
第一个问题在给定 Topic 的情况下,Client 根据负载均衡策略选择合适的消息队列,进一步获取到对应的 Broker 地址信息,具体有哪些路由信息注册到了 NameServer,可以看上面的路由元信息 RouteInfoManager 类。
第二个问题可以归纳为由于路由注册、路由删除以及路由发现,使生产者如何知道消息要发送到哪台消息服务器。
第三个问题对于 Broker 无效的场景,RocketMQ 牺牲了 C,选择了 AP,即通过 Client 重试保证可用性,由此产生的重复消息问题,通过 Client 幂等处理逻辑来规避。
最后老周再上一张图,涵盖了上述整个核心流程。到此为止,RocketMQ 的高可用机制——消息存储高可用、消息发送高可用、消息消费高可用以及集群管理高可用都分析完毕,希望对你有帮助。
欢迎大家关注我的公众号【老周聊架构】,Java后端主流技术栈的原理、源码分析、架构以及各种互联网高并发、高性能、高可用的解决方案。
喜欢的话,点赞、再看、分享三连。