前面我们已经通过quickstrat可以看到nameServer的启动:
/*** nameServer启动类*/publicclassNamesrvStartup { privatestaticInternalLoggerlog; privatestaticPropertiesproperties=null; privatestaticCommandLinecommandLine=null; publicstaticvoidmain(String[] args) { main0(args); } //启动NamesrvController做了:创建namesrvController和启动controller,返回controllerpublicstaticNamesrvControllermain0(String[] args) { try { NamesrvControllercontroller=createNamesrvController(args); start(controller); Stringtip="The Name Server boot success. serializeType="+RemotingCommand.getSerializeTypeConfigInThisServer(); log.info(tip); System.out.printf("%s%n", tip); returncontroller; } catch (Throwablee) { e.printStackTrace(); System.exit(-1); } returnnull; } //创建namesrvController:首先设置配置信息rocketmq的版本信息publicstaticNamesrvControllercreateNamesrvController(String[] args) throwsIOException, JoranException { System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION)); //PackageConflictDetect.detectFastjson();Optionsoptions=ServerUtil.buildCommandlineOptions(newOptions()); commandLine=ServerUtil.parseCmdLine("mqnamesrv", args, buildCommandlineOptions(options), newPosixParser()); if (null==commandLine) { System.exit(-1); returnnull; } //创建namesrvCnfig、nettyServerConfig,并进行配置的填充//采用-c和-p两种方式进行填充finalNamesrvConfignamesrvConfig=newNamesrvConfig(); finalNettyServerConfignettyServerConfig=newNettyServerConfig(); nettyServerConfig.setListenPort(9876); if (commandLine.hasOption('c')) { Stringfile=commandLine.getOptionValue('c'); if (file!=null) { InputStreamin=newBufferedInputStream(newFileInputStream(file)); properties=newProperties(); properties.load(in); MixAll.properties2Object(properties, namesrvConfig); MixAll.properties2Object(properties, nettyServerConfig); namesrvConfig.setConfigStorePath(file); System.out.printf("load config properties file OK, %s%n", file); in.close(); } } if (commandLine.hasOption('p')) { InternalLoggerconsole=InternalLoggerFactory.getLogger(LoggerName.NAMESRV_CONSOLE_NAME); MixAll.printObjectProperties(console, namesrvConfig); MixAll.printObjectProperties(console, nettyServerConfig); System.exit(0); } MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), namesrvConfig); if (null==namesrvConfig.getRocketmqHome()) { System.out.printf("Please set the %s variable in your environment to match the location of the RocketMQ installation%n", MixAll.ROCKETMQ_HOME_ENV); System.exit(-2); } LoggerContextlc= (LoggerContext) LoggerFactory.getILoggerFactory(); JoranConfiguratorconfigurator=newJoranConfigurator(); configurator.setContext(lc); lc.reset(); configurator.doConfigure(namesrvConfig.getRocketmqHome() +"/conf/logback_namesrv.xml"); log=InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME); MixAll.printObjectProperties(log, namesrvConfig); MixAll.printObjectProperties(log, nettyServerConfig); finalNamesrvControllercontroller=newNamesrvController(namesrvConfig, nettyServerConfig); // remember all configs to prevent discardcontroller.getConfiguration().registerConfig(properties); returncontroller; } //启动namesrvControllerpublicstaticNamesrvControllerstart(finalNamesrvControllercontroller) throwsException { if (null==controller) { thrownewIllegalArgumentException("NamesrvController is null"); } //初始化:加载kv配置、创建网络对象,开启两个定时任务(心跳检查),注册jvm钩子booleaninitResult=controller.initialize(); if (!initResult) { controller.shutdown(); System.exit(-3); } //添加关机钩子函数Runtime.getRuntime().addShutdownHook(newShutdownHookThread(log, newCallable<Void>() { publicVoidcall() throwsException { controller.shutdown(); returnnull; } })); //启动controllercontroller.start(); returncontroller; } publicstaticvoidshutdown(finalNamesrvControllercontroller) { controller.shutdown(); } //使用-c和-p的方式进行环境变量参数的添加publicstaticOptionsbuildCommandlineOptions(finalOptionsoptions) { Optionopt=newOption("c", "configFile", true, "Name server config properties file"); opt.setRequired(false); options.addOption(opt); opt=newOption("p", "printConfigItem", false, "Print all config item"); opt.setRequired(false); options.addOption(opt); returnoptions; } publicstaticPropertiesgetProperties() { returnproperties; } }
从启动类中,我们看到:首先创建NamesrvConfig、nettyServerConfig,设置监听端口,将8888改成9876。填充NamesrvConfig、NettyServerConfig、BrokerConfig,获取namesrvAddr,创建Controller,注册钩子函数,启动start。
NamesrvController的属性信息、构造函数:
publicclassNamesrvController { privatestaticfinalInternalLoggerlog=InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME); //namesrvConfig配置信息privatefinalNamesrvConfignamesrvConfig; //NettyServer配置信息privatefinalNettyServerConfignettyServerConfig; //定时任务privatefinalScheduledExecutorServicescheduledExecutorService=Executors.newSingleThreadScheduledExecutor(newThreadFactoryImpl( "NSScheduledThread")); //kv配置管理privatefinalKVConfigManagerkvConfigManager; //路由信息管理privatefinalRouteInfoManagerrouteInfoManager; //远程服务privateRemotingServerremotingServer; //brokerHousekeepingServiceprivateBrokerHousekeepingServicebrokerHousekeepingService; privateExecutorServiceremotingExecutor; //配置privateConfigurationconfiguration; privateFileWatchServicefileWatchService; //namesrvController:从入参可以看到里面包含两个重要的配置namesrvConfig、nettyServerConfig//除了这两个重要的配置之外,还添加了kvConfigManager、routeInfoManager、brokerHousekeepingService、//ConfigurationpublicNamesrvController(NamesrvConfignamesrvConfig, NettyServerConfignettyServerConfig) { this.namesrvConfig=namesrvConfig; this.nettyServerConfig=nettyServerConfig; this.kvConfigManager=newKVConfigManager(this); this.routeInfoManager=newRouteInfoManager(); this.brokerHousekeepingService=newBrokerHousekeepingService(this); this.configuration=newConfiguration( log, this.namesrvConfig, this.nettyServerConfig ); this.configuration.setStorePathFromConfig(this.namesrvConfig, "configStorePath"); } }
NamesrvConfig配置信息:
publicclassNamesrvConfig { //rocketmqHome信息privateStringrocketmqHome=System.getProperty(MixAll.ROCKETMQ_HOME_PROPERTY, System.getenv(MixAll.ROCKETMQ_HOME_ENV)); //K-V配置路径privateStringkvConfigPath=System.getProperty("user.home") +File.separator+"namesrv"+File.separator+"kvConfig.json"; //配置存储路径privateStringconfigStorePath=System.getProperty("user.home") +File.separator+"namesrv"+File.separator+"namesrv.properties"; //productEnvNameprivateStringproductEnvName="center"; //clusterTesprivatebooleanclusterTest=false; //消息有序默认falseprivatebooleanorderMessageEnable=false; }
NettysrvComfig配置信息
/*** NettyServer配置*/publicclassNettyServerConfigimplementsCloneable { //监听端口号privateintlistenPort=8888; //worker线程数privateintserverWorkerThreads=8; //callbackExecutor线程数privateintserverCallbackExecutorThreads=0; //选择器线程数privateintserverSelectorThreads=3; //OnewaySemaphoreValue值privateintserverOnewaySemaphoreValue=256; //异步SemaphoreValue值privateintserverAsyncSemaphoreValue=64; //通道最大闲置时间privateintserverChannelMaxIdleTimeSeconds=120; //socket的发送和接收的bufSizeprivateintserverSocketSndBufSize=NettySystemConfig.socketSndbufSize; privateintserverSocketRcvBufSize=NettySystemConfig.socketRcvbufSize; //开启堆外内存privatebooleanserverPooledByteBufAllocatorEnable=true; }
RouteInfoManager路由信息:
***路由信息*/publicclassRouteInfoManager { privatestaticfinalInternalLoggerlog=InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME); //broker通道过期时间privatefinalstaticlongBROKER_CHANNEL_EXPIRED_TIME=1000*60*2; //读写锁privatefinalReadWriteLocklock=newReentrantReadWriteLock(); //topic消息队列路由表privatefinalHashMap<String/* topic */, List<QueueData>>topicQueueTable; //broker基础表privatefinalHashMap<String/* brokerName */, BrokerData>brokerAddrTable; //cluster集群表privatefinalHashMap<String/* clusterName */, Set<String/* brokerName */>>clusterAddrTable; //broker状态表privatefinalHashMap<String/* brokerAddr */, BrokerLiveInfo>brokerLiveTable; //FilterServer表privatefinalHashMap<String/* brokerAddr */, List<String>/* Filter Server */>filterServerTable; //路由信息构造函数publicRouteInfoManager() { this.topicQueueTable=newHashMap<String, List<QueueData>>(1024); this.brokerAddrTable=newHashMap<String, BrokerData>(128); this.clusterAddrTable=newHashMap<String, Set<String>>(32); this.brokerLiveTable=newHashMap<String, BrokerLiveInfo>(256); this.filterServerTable=newHashMap<String, List<String>>(256); }
start(final NamesrvController controller)中进行初始化,创建nettyServer对象,开启两个定时任务,添加关闭钩子,然后进行启动操作
//进行初始化publicbooleaninitialize() { //加载kv配置信息this.kvConfigManager.load(); //netty远程服务:nettyServer配置、brokerHousekeepingServicethis.remotingServer=newNettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService); //远程线程this.remotingExecutor=Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), newThreadFactoryImpl("RemotingExecutorThread_")); //注册处理this.registerProcessor(); //下面两个定时任务是心跳检查//定时任务:路由信息管理扫描没有激活的brokerthis.scheduledExecutorService.scheduleAtFixedRate(newRunnable() { publicvoidrun() { NamesrvController.this.routeInfoManager.scanNotActiveBroker(); } }, 5, 10, TimeUnit.SECONDS); //定时任务:kv配置管理调用定时打印所有this.scheduledExecutorService.scheduleAtFixedRate(newRunnable() { publicvoidrun() { NamesrvController.this.kvConfigManager.printAllPeriodically(); } }, 1, 10, TimeUnit.MINUTES); //注册一个监听去重新加载Ssl上下文if (TlsSystemConfig.tlsMode!=TlsMode.DISABLED) { // Register a listener to reload SslContexttry { fileWatchService=newFileWatchService( newString[] { TlsSystemConfig.tlsServerCertPath, TlsSystemConfig.tlsServerKeyPath, TlsSystemConfig.tlsServerTrustCertPath }, newFileWatchService.Listener() { booleancertChanged, keyChanged=false; publicvoidonChanged(Stringpath) { if (path.equals(TlsSystemConfig.tlsServerTrustCertPath)) { log.info("The trust certificate changed, reload the ssl context"); reloadServerSslContext(); } if (path.equals(TlsSystemConfig.tlsServerCertPath)) { certChanged=true; } if (path.equals(TlsSystemConfig.tlsServerKeyPath)) { keyChanged=true; } if (certChanged&&keyChanged) { log.info("The certificate and private key changed, reload the ssl context"); certChanged=keyChanged=false; reloadServerSslContext(); } } privatevoidreloadServerSslContext() { ((NettyRemotingServer) remotingServer).loadSslContext(); } }); } catch (Exceptione) { log.warn("FileWatchService created error, can't load the certificate dynamically"); } } returntrue; }
启动服务:
publicvoidstart() throwsException { this.remotingServer.start(); if (this.fileWatchService!=null) { this.fileWatchService.start(); } }
启动服务
//reomotingCelint,使用Netty,可以看到DefaultEventExecutorGroup继承MultithreadEventExecutorGroup//而在Netty中,我们知道MultithreadEventExecutorGroup的构造方法是NioEventLoopGroup的构造方法//构造方法:DefaultEventExecutorGroup(int nThreads, ThreadFactory threadFactory)publicvoidstart() { //创建NioEventLoopGroupthis.defaultEventExecutorGroup=newDefaultEventExecutorGroup( nettyClientConfig.getClientWorkerThreads(), newThreadFactory() { privateAtomicIntegerthreadIndex=newAtomicInteger(0); //重写线程方法publicThreadnewThread(Runnabler) { returnnewThread(r, "NettyClientWorkerThread_"+this.threadIndex.incrementAndGet()); } }); //创建引导 客户端 填充信息Bootstraphandler=this.bootstrap.group(this.eventLoopGroupWorker).channel(NioSocketChannel.class) .option(ChannelOption.TCP_NODELAY, true) .option(ChannelOption.SO_KEEPALIVE, false) .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, nettyClientConfig.getConnectTimeoutMillis()) .option(ChannelOption.SO_SNDBUF, nettyClientConfig.getClientSocketSndBufSize()) .option(ChannelOption.SO_RCVBUF, nettyClientConfig.getClientSocketRcvBufSize()) .handler(newChannelInitializer<SocketChannel>() { //重写initChannel方法publicvoidinitChannel(SocketChannelch) throwsException { ChannelPipelinepipeline=ch.pipeline(); if (nettyClientConfig.isUseTLS()) { if (null!=sslContext) { pipeline.addFirst(defaultEventExecutorGroup, "sslHandler", sslContext.newHandler(ch.alloc())); log.info("Prepend SSL handler"); } else { log.warn("Connections are insecure as SSLContext is null!"); } } //pipeline添加信息,以及handlerpipeline.addLast( defaultEventExecutorGroup, newNettyEncoder(), newNettyDecoder(), newIdleStateHandler(0, 0, nettyClientConfig.getClientChannelMaxIdleTimeSeconds()), newNettyConnectManageHandler(), newNettyClientHandler()); } }); //扫描响应表启动 定时任务this.timer.scheduleAtFixedRate(newTimerTask() { publicvoidrun() { try { NettyRemotingClient.this.scanResponseTable(); } catch (Throwablee) { log.error("scanResponseTable exception", e); } } }, 1000*3, 1000); //如果通道事件监听不为空,则启动if (this.channelEventListener!=null) { this.nettyEventExecutor.start(); } }
定时任务:
/*** <p>* This method is periodically invoked to scan and expire deprecated request.* 定期调用此方法以扫描和终止已弃用的请求* </p>*/publicvoidscanResponseTable() { finalList<ResponseFuture>rfList=newLinkedList<ResponseFuture>(); //迭代器Iterator<Entry<Integer, ResponseFuture>>it=this.responseTable.entrySet().iterator(); while (it.hasNext()) { Entry<Integer, ResponseFuture>next=it.next(); ResponseFuturerep=next.getValue(); if ((rep.getBeginTimestamp() +rep.getTimeoutMillis() +1000) <=System.currentTimeMillis()) { rep.release(); it.remove(); rfList.add(rep); log.warn("remove timeout request, "+rep); } } for (ResponseFuturerf : rfList) { try { executeInvokeCallback(rf); } catch (Throwablee) { log.warn("scanResponseTable, operationComplete Exception", e); } } }
总结:从启动类中,我们看到:首先创建NamesrvConfig、nettyServerConfig,设置监听端口,将8888改成9876。填充NamesrvConfig、NettyServerConfig、BrokerConfig,获取namesrvAddr,创建Controller,注册钩子函数,启动start。