前提
要会Netty吧,如果不会的话,感觉应该看不懂吧。
跟源码思路
其实很多源码的讲解都是把一个类都标上注释,其实我感觉这样的人很厉害,因为他确实对这个代码很精通。我的风格比较偷懒,我们想看哪一部分就跟哪一部分和哪个分支,其他的没必要看,这样你就能偷懒了,所以这篇文章想跟的是Broker注册到NameServer源码以及Broker与NameServer的心跳。你调试的时候不也是这样吗,哪报错了你就进哪个分支,不关心的分支我们不去分析。
注意:本文只关心Broker注册到NameServer和心跳逻辑,其他都不关心。
启动源码分析
源码版本
RocketMQ4.9.1
NameServer启动流程
启动入口我们从NamesrvStartup#main0开始
public static NamesrvController main0(String[] args) { try { //创建NameServer,其实就初始化一些变量,跳过 NamesrvController controller = createNamesrvController(args); //启动controller start(controller); String tip = "The Name Server boot success. serializeType=" + RemotingCommand.getSerializeTypeConfigInThisServer(); log.info(tip); System.out.printf("%s%n", tip); return controller; } catch (Throwable e) { e.printStackTrace(); System.exit(-1); } return null; }
接下来就是NamesrvStartup#start方法
public static NamesrvController start(final NamesrvController controller) throws Exception { //初始化,就是给一些属性初始化,其中remotingServer得到初始化 boolean initResult = controller.initialize(); //省略 //启动controller controller.start(); return controller; }
下面跟NamesrvController#start方法
public void start() throws Exception { //启动NettyServer服务器 this.remotingServer.start(); if (this.fileWatchService != null) { this.fileWatchService.start(); } }
此时在NettyRemotingServer#start就启动了一个ServerBootstrap服务端
public void start() { //省略 ServerBootstrap childHandler = this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector) .channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 1024) .option(ChannelOption.SO_REUSEADDR, true) .option(ChannelOption.SO_KEEPALIVE, false) .childOption(ChannelOption.TCP_NODELAY, true) .childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize()) .childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize()) .localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort())) .childHandler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ch.pipeline() .addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, handshakeHandler) .addLast(defaultEventExecutorGroup, encoder, new NettyDecoder(), new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()), connectionManageHandler, serverHandler ); } }); //省略 try { ChannelFuture sync = this.serverBootstrap.bind().sync(); InetSocketAddress addr = (InetSocketAddress) sync.channel().localAddress(); this.port = addr.getPort(); } catch (InterruptedException e1) { throw new RuntimeException("this.serverBootstrap.bind().sync() InterruptedException", e1); } //省略 }
综上:NameServer的启动流程的核心就是在NettyRemotingServer#start就启动了一个ServerBootstrap并监听9876端口
Broker启动流程
还是直接看BrokerController#start方法吧,反正前面也是debug
public void start() throws Exception { //省略 //向NameServer注册自己的信息 if (!messageStoreConfig.isEnableDLegerCommitLog()) { startProcessorByHa(messageStoreConfig.getBrokerRole()); handleSlaveSynchronize(messageStoreConfig.getBrokerRole()); 向NameServer注册自己的信息 this.registerBrokerAll(true, false, true); } this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { try { //心跳,重复注册自己 BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister()); } catch (Throwable e) { log.error("registerBrokerAll Exception", e); } } }, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS); if (this.brokerStatsManager != null) { this.brokerStatsManager.start(); } if (this.brokerFastFailure != null) { this.brokerFastFailure.start(); } }
Broker给NameServer发心跳
Broker启动一个定时任务,每次都会向NameServer注册自己,不断覆盖到NameServer存的Broker的信息,从而达到心跳的效果,我只能说一个字,秀。
//BrokerController#start public void start() throws Exception { //省略 this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { try { //心跳,重复注册自己 BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister()); } catch (Throwable e) { log.error("registerBrokerAll Exception", e); } } }, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS); //省略 }
broker和NameServer之间维护连接
NameServer维护和Broker之间的连接
////NamesrvController#initialize public boolean initialize() { //省略 //定时任务,根据broker注册到nameServer的时间与此时此刻时间的阈值去判断该broker是否还存活 this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { NamesrvController.this.routeInfoManager.scanNotActiveBroker(); } }, 5, 10, TimeUnit.SECONDS); //省略 return true; }
总结
1 broker和NameServer之间的心跳我以为是发送心跳包去实现的,结果是通过不断的向nameserver注册自己实现的
2 NameServer通过定时任务不断的扫描brokerLiveTable去根据时间阈值(broker注册的时间和此时此刻的时间差距)实现维护连接