rocketmq学习2

简介: 前面我们已经通过quickstrat可以看到nameServer的启动:从启动类中,我们看到:首先创建NamesrvConfig、nettyServerConfig,设置监听端口,将8888改成9876。填充NamesrvConfig、NettyServerConfig、BrokerConfig,获取namesrvAddr,创建Controller,注册钩子函数,启动start。NamesrvController的属性信息、构造函数:

前面我们已经通过quickstrat可以看到nameServer的启动:

/*** nameServer启动类*/publicclassNamesrvStartup {
privatestaticInternalLoggerlog;
privatestaticPropertiesproperties=null;
privatestaticCommandLinecommandLine=null;
publicstaticvoidmain(String[] args) {
main0(args);
    }
//启动NamesrvController做了:创建namesrvController和启动controller,返回controllerpublicstaticNamesrvControllermain0(String[] args) {
try {
NamesrvControllercontroller=createNamesrvController(args);
start(controller);
Stringtip="The Name Server boot success. serializeType="+RemotingCommand.getSerializeTypeConfigInThisServer();
log.info(tip);
System.out.printf("%s%n", tip);
returncontroller;
        } catch (Throwablee) {
e.printStackTrace();
System.exit(-1);
        }
returnnull;
    }
//创建namesrvController:首先设置配置信息rocketmq的版本信息publicstaticNamesrvControllercreateNamesrvController(String[] args) throwsIOException, JoranException {
System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));
//PackageConflictDetect.detectFastjson();Optionsoptions=ServerUtil.buildCommandlineOptions(newOptions());
commandLine=ServerUtil.parseCmdLine("mqnamesrv", args, buildCommandlineOptions(options), newPosixParser());
if (null==commandLine) {
System.exit(-1);
returnnull;
        }
//创建namesrvCnfig、nettyServerConfig,并进行配置的填充//采用-c和-p两种方式进行填充finalNamesrvConfignamesrvConfig=newNamesrvConfig();
finalNettyServerConfignettyServerConfig=newNettyServerConfig();
nettyServerConfig.setListenPort(9876);
if (commandLine.hasOption('c')) {
Stringfile=commandLine.getOptionValue('c');
if (file!=null) {
InputStreamin=newBufferedInputStream(newFileInputStream(file));
properties=newProperties();
properties.load(in);
MixAll.properties2Object(properties, namesrvConfig);
MixAll.properties2Object(properties, nettyServerConfig);
namesrvConfig.setConfigStorePath(file);
System.out.printf("load config properties file OK, %s%n", file);
in.close();
            }
        }
if (commandLine.hasOption('p')) {
InternalLoggerconsole=InternalLoggerFactory.getLogger(LoggerName.NAMESRV_CONSOLE_NAME);
MixAll.printObjectProperties(console, namesrvConfig);
MixAll.printObjectProperties(console, nettyServerConfig);
System.exit(0);
        }
MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), namesrvConfig);
if (null==namesrvConfig.getRocketmqHome()) {
System.out.printf("Please set the %s variable in your environment to match the location of the RocketMQ installation%n", MixAll.ROCKETMQ_HOME_ENV);
System.exit(-2);
        }
LoggerContextlc= (LoggerContext) LoggerFactory.getILoggerFactory();
JoranConfiguratorconfigurator=newJoranConfigurator();
configurator.setContext(lc);
lc.reset();
configurator.doConfigure(namesrvConfig.getRocketmqHome() +"/conf/logback_namesrv.xml");
log=InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
MixAll.printObjectProperties(log, namesrvConfig);
MixAll.printObjectProperties(log, nettyServerConfig);
finalNamesrvControllercontroller=newNamesrvController(namesrvConfig, nettyServerConfig);
// remember all configs to prevent discardcontroller.getConfiguration().registerConfig(properties);
returncontroller;
    }
//启动namesrvControllerpublicstaticNamesrvControllerstart(finalNamesrvControllercontroller) throwsException {
if (null==controller) {
thrownewIllegalArgumentException("NamesrvController is null");
        }
//初始化:加载kv配置、创建网络对象,开启两个定时任务(心跳检查),注册jvm钩子booleaninitResult=controller.initialize();
if (!initResult) {
controller.shutdown();
System.exit(-3);
        }
//添加关机钩子函数Runtime.getRuntime().addShutdownHook(newShutdownHookThread(log, newCallable<Void>() {
@OverridepublicVoidcall() throwsException {
controller.shutdown();
returnnull;
            }
        }));
//启动controllercontroller.start();
returncontroller;
    }
publicstaticvoidshutdown(finalNamesrvControllercontroller) {
controller.shutdown();
    }
//使用-c和-p的方式进行环境变量参数的添加publicstaticOptionsbuildCommandlineOptions(finalOptionsoptions) {
Optionopt=newOption("c", "configFile", true, "Name server config properties file");
opt.setRequired(false);
options.addOption(opt);
opt=newOption("p", "printConfigItem", false, "Print all config item");
opt.setRequired(false);
options.addOption(opt);
returnoptions;
    }
publicstaticPropertiesgetProperties() {
returnproperties;
    }
}

从启动类中,我们看到:首先创建NamesrvConfig、nettyServerConfig,设置监听端口,将8888改成9876。填充NamesrvConfig、NettyServerConfig、BrokerConfig,获取namesrvAddr,创建Controller,注册钩子函数,启动start。

NamesrvController的属性信息、构造函数:

publicclassNamesrvController {
privatestaticfinalInternalLoggerlog=InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
//namesrvConfig配置信息privatefinalNamesrvConfignamesrvConfig;
//NettyServer配置信息privatefinalNettyServerConfignettyServerConfig;
//定时任务privatefinalScheduledExecutorServicescheduledExecutorService=Executors.newSingleThreadScheduledExecutor(newThreadFactoryImpl(
"NSScheduledThread"));
//kv配置管理privatefinalKVConfigManagerkvConfigManager;
//路由信息管理privatefinalRouteInfoManagerrouteInfoManager;
//远程服务privateRemotingServerremotingServer;
//brokerHousekeepingServiceprivateBrokerHousekeepingServicebrokerHousekeepingService;
privateExecutorServiceremotingExecutor;
//配置privateConfigurationconfiguration;
privateFileWatchServicefileWatchService;
//namesrvController:从入参可以看到里面包含两个重要的配置namesrvConfig、nettyServerConfig//除了这两个重要的配置之外,还添加了kvConfigManager、routeInfoManager、brokerHousekeepingService、//ConfigurationpublicNamesrvController(NamesrvConfignamesrvConfig, NettyServerConfignettyServerConfig) {
this.namesrvConfig=namesrvConfig;
this.nettyServerConfig=nettyServerConfig;
this.kvConfigManager=newKVConfigManager(this);
this.routeInfoManager=newRouteInfoManager();
this.brokerHousekeepingService=newBrokerHousekeepingService(this);
this.configuration=newConfiguration(
log,
this.namesrvConfig, this.nettyServerConfig        );
this.configuration.setStorePathFromConfig(this.namesrvConfig, "configStorePath");
    }
}

NamesrvConfig配置信息:

publicclassNamesrvConfig {
//rocketmqHome信息privateStringrocketmqHome=System.getProperty(MixAll.ROCKETMQ_HOME_PROPERTY, System.getenv(MixAll.ROCKETMQ_HOME_ENV));
//K-V配置路径privateStringkvConfigPath=System.getProperty("user.home") +File.separator+"namesrv"+File.separator+"kvConfig.json";
//配置存储路径privateStringconfigStorePath=System.getProperty("user.home") +File.separator+"namesrv"+File.separator+"namesrv.properties";
//productEnvNameprivateStringproductEnvName="center";
//clusterTesprivatebooleanclusterTest=false;
//消息有序默认falseprivatebooleanorderMessageEnable=false;
}

NettysrvComfig配置信息

/*** NettyServer配置*/publicclassNettyServerConfigimplementsCloneable {
//监听端口号privateintlistenPort=8888;
//worker线程数privateintserverWorkerThreads=8;
//callbackExecutor线程数privateintserverCallbackExecutorThreads=0;
//选择器线程数privateintserverSelectorThreads=3;
//OnewaySemaphoreValue值privateintserverOnewaySemaphoreValue=256;
//异步SemaphoreValue值privateintserverAsyncSemaphoreValue=64;
//通道最大闲置时间privateintserverChannelMaxIdleTimeSeconds=120;
//socket的发送和接收的bufSizeprivateintserverSocketSndBufSize=NettySystemConfig.socketSndbufSize;
privateintserverSocketRcvBufSize=NettySystemConfig.socketRcvbufSize;
//开启堆外内存privatebooleanserverPooledByteBufAllocatorEnable=true;
}

RouteInfoManager路由信息:

***路由信息*/publicclassRouteInfoManager {
privatestaticfinalInternalLoggerlog=InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
//broker通道过期时间privatefinalstaticlongBROKER_CHANNEL_EXPIRED_TIME=1000*60*2;
//读写锁privatefinalReadWriteLocklock=newReentrantReadWriteLock();
//topic消息队列路由表privatefinalHashMap<String/* topic */, List<QueueData>>topicQueueTable;
//broker基础表privatefinalHashMap<String/* brokerName */, BrokerData>brokerAddrTable;
//cluster集群表privatefinalHashMap<String/* clusterName */, Set<String/* brokerName */>>clusterAddrTable;
//broker状态表privatefinalHashMap<String/* brokerAddr */, BrokerLiveInfo>brokerLiveTable;
//FilterServer表privatefinalHashMap<String/* brokerAddr */, List<String>/* Filter Server */>filterServerTable;
//路由信息构造函数publicRouteInfoManager() {
this.topicQueueTable=newHashMap<String, List<QueueData>>(1024);
this.brokerAddrTable=newHashMap<String, BrokerData>(128);
this.clusterAddrTable=newHashMap<String, Set<String>>(32);
this.brokerLiveTable=newHashMap<String, BrokerLiveInfo>(256);
this.filterServerTable=newHashMap<String, List<String>>(256);
    }

start(final NamesrvController controller)中进行初始化,创建nettyServer对象,开启两个定时任务,添加关闭钩子,然后进行启动操作

//进行初始化publicbooleaninitialize() {
//加载kv配置信息this.kvConfigManager.load();
//netty远程服务:nettyServer配置、brokerHousekeepingServicethis.remotingServer=newNettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);
//远程线程this.remotingExecutor=Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), newThreadFactoryImpl("RemotingExecutorThread_"));
//注册处理this.registerProcessor();
//下面两个定时任务是心跳检查//定时任务:路由信息管理扫描没有激活的brokerthis.scheduledExecutorService.scheduleAtFixedRate(newRunnable() {
@Overridepublicvoidrun() {
NamesrvController.this.routeInfoManager.scanNotActiveBroker();
        }
    }, 5, 10, TimeUnit.SECONDS);
//定时任务:kv配置管理调用定时打印所有this.scheduledExecutorService.scheduleAtFixedRate(newRunnable() {
@Overridepublicvoidrun() {
NamesrvController.this.kvConfigManager.printAllPeriodically();
        }
    }, 1, 10, TimeUnit.MINUTES);
//注册一个监听去重新加载Ssl上下文if (TlsSystemConfig.tlsMode!=TlsMode.DISABLED) {
// Register a listener to reload SslContexttry {
fileWatchService=newFileWatchService(
newString[] {
TlsSystemConfig.tlsServerCertPath,
TlsSystemConfig.tlsServerKeyPath,
TlsSystemConfig.tlsServerTrustCertPath                },
newFileWatchService.Listener() {
booleancertChanged, keyChanged=false;
@OverridepublicvoidonChanged(Stringpath) {
if (path.equals(TlsSystemConfig.tlsServerTrustCertPath)) {
log.info("The trust certificate changed, reload the ssl context");
reloadServerSslContext();
                        }
if (path.equals(TlsSystemConfig.tlsServerCertPath)) {
certChanged=true;
                        }
if (path.equals(TlsSystemConfig.tlsServerKeyPath)) {
keyChanged=true;
                        }
if (certChanged&&keyChanged) {
log.info("The certificate and private key changed, reload the ssl context");
certChanged=keyChanged=false;
reloadServerSslContext();
                        }
                    }
privatevoidreloadServerSslContext() {
                        ((NettyRemotingServer) remotingServer).loadSslContext();
                    }
                });
        } catch (Exceptione) {
log.warn("FileWatchService created error, can't load the certificate dynamically");
        }
    }
returntrue;
}

启动服务:

publicvoidstart() throwsException {
this.remotingServer.start();
if (this.fileWatchService!=null) {
this.fileWatchService.start();
    }
}

启动服务

//reomotingCelint,使用Netty,可以看到DefaultEventExecutorGroup继承MultithreadEventExecutorGroup//而在Netty中,我们知道MultithreadEventExecutorGroup的构造方法是NioEventLoopGroup的构造方法//构造方法:DefaultEventExecutorGroup(int nThreads, ThreadFactory threadFactory)@Overridepublicvoidstart() {
//创建NioEventLoopGroupthis.defaultEventExecutorGroup=newDefaultEventExecutorGroup(
nettyClientConfig.getClientWorkerThreads(),
newThreadFactory() {
privateAtomicIntegerthreadIndex=newAtomicInteger(0);
//重写线程方法@OverridepublicThreadnewThread(Runnabler) {
returnnewThread(r, "NettyClientWorkerThread_"+this.threadIndex.incrementAndGet());
            }
        });
//创建引导 客户端 填充信息Bootstraphandler=this.bootstrap.group(this.eventLoopGroupWorker).channel(NioSocketChannel.class)
        .option(ChannelOption.TCP_NODELAY, true)
        .option(ChannelOption.SO_KEEPALIVE, false)
        .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, nettyClientConfig.getConnectTimeoutMillis())
        .option(ChannelOption.SO_SNDBUF, nettyClientConfig.getClientSocketSndBufSize())
        .option(ChannelOption.SO_RCVBUF, nettyClientConfig.getClientSocketRcvBufSize())
        .handler(newChannelInitializer<SocketChannel>() {
//重写initChannel方法@OverridepublicvoidinitChannel(SocketChannelch) throwsException {
ChannelPipelinepipeline=ch.pipeline();
if (nettyClientConfig.isUseTLS()) {
if (null!=sslContext) {
pipeline.addFirst(defaultEventExecutorGroup, "sslHandler", sslContext.newHandler(ch.alloc()));
log.info("Prepend SSL handler");
                    } else {
log.warn("Connections are insecure as SSLContext is null!");
                    }
                }
//pipeline添加信息,以及handlerpipeline.addLast(
defaultEventExecutorGroup,
newNettyEncoder(),
newNettyDecoder(),
newIdleStateHandler(0, 0, nettyClientConfig.getClientChannelMaxIdleTimeSeconds()),
newNettyConnectManageHandler(),
newNettyClientHandler());
            }
        });
//扫描响应表启动 定时任务this.timer.scheduleAtFixedRate(newTimerTask() {
@Overridepublicvoidrun() {
try {
NettyRemotingClient.this.scanResponseTable();
            } catch (Throwablee) {
log.error("scanResponseTable exception", e);
            }
        }
    }, 1000*3, 1000);
//如果通道事件监听不为空,则启动if (this.channelEventListener!=null) {
this.nettyEventExecutor.start();
    }
}

定时任务:

/*** <p>* This method is periodically invoked to scan and expire deprecated request.* 定期调用此方法以扫描和终止已弃用的请求* </p>*/publicvoidscanResponseTable() {
finalList<ResponseFuture>rfList=newLinkedList<ResponseFuture>();
//迭代器Iterator<Entry<Integer, ResponseFuture>>it=this.responseTable.entrySet().iterator();
while (it.hasNext()) {
Entry<Integer, ResponseFuture>next=it.next();
ResponseFuturerep=next.getValue();
if ((rep.getBeginTimestamp() +rep.getTimeoutMillis() +1000) <=System.currentTimeMillis()) {
rep.release();
it.remove();
rfList.add(rep);
log.warn("remove timeout request, "+rep);
        }
    }
for (ResponseFuturerf : rfList) {
try {
executeInvokeCallback(rf);
        } catch (Throwablee) {
log.warn("scanResponseTable, operationComplete Exception", e);
        }
    }
}

总结:从启动类中,我们看到:首先创建NamesrvConfig、nettyServerConfig,设置监听端口,将8888改成9876。填充NamesrvConfig、NettyServerConfig、BrokerConfig,获取namesrvAddr,创建Controller,注册钩子函数,启动start。


相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
6月前
|
消息中间件 Java API
RocketMQ事务消息, 图文、源码学习探究~
介绍 RocketMQ是阿里巴巴开源的分布式消息中间件,它是一个高性能、低延迟、可靠的消息队列系统,用于在分布式系统中进行异步通信。 从4.3.0版本开始正式支持分布式事务消息~ RocketMq事务消息支持最终一致性:在普通消息基础上,支持二阶段的提交能力。将二阶段提交和本地事务绑定,实现全局提交结果的一致性。 原理、流程 本质上RocketMq的事务能力是基于二阶段提交来实现的 在消息发送上,将二阶段提交与本地事务绑定 本地事务执行成功,则事务消息成功,可以交由Consumer消费 本地事务执行失败,则事务消息失败,Consumer无法消费 但是,RocketMq只能保证本地事务
|
6月前
|
消息中间件 JSON 缓存
RabbitMQ快速学习之WorkQueues模型、三种交换机、消息转换器(SpringBoot整合)
RabbitMQ快速学习之WorkQueues模型、三种交换机、消息转换器(SpringBoot整合)
161 0
|
3月前
|
消息中间件 存储 数据库
深入学习RocketMQ的底层存储设计原理
文章深入探讨了RocketMQ的底层存储设计原理,分析了其如何通过将数据和索引映射到内存、异步刷新磁盘以及消息内容的混合存储来实现高性能的读写操作,从而保证了RocketMQ作为一款低延迟消息队列的读写性能。
|
3月前
|
消息中间件
快来体验 消息队列RabbitMQ版入门训练营 打卡学习领好礼
快来体验 消息队列RabbitMQ版入门训练营 打卡学习领好礼
62 0
|
6月前
|
消息中间件 存储 数据安全/隐私保护
深入学习RabbitMQ五种模式(一)
深入学习RabbitMQ五种模式(一)
73 0
|
5月前
|
消息中间件 IDE 数据库
RocketMQ事务消息学习及刨坑过程
RocketMQ事务消息学习及刨坑过程
|
6月前
|
消息中间件 存储 负载均衡
消息队列学习之RabbitMQ
【4月更文挑战第3天】消息队列学习之RabbitMQ,一种基于erlang语言开发的流行的开源消息中间件。
58 0
|
6月前
|
消息中间件 存储 监控
写了10000字:全面学习RocketMQ中间件
以上是 V 哥在授课时整理的全部 RocketMQ 的内容,在学习时重点要理解其中的含义,正所谓知其然知其所以然,希望这篇文章可以帮助兄弟们搞清楚RocketMQ的来龙去脉,必竟这是一个非常常用的分布式应用的中间件,好了,今天的内容就分享到这,我靠!已经 00:36分,建议收藏起来,慢慢消化,创作不易,喜欢请点赞转发。
745 0
|
6月前
|
消息中间件 存储 Java
RabbitMQ之延迟队列(手把手教你学习延迟队列)
【1月更文挑战第12天】延时队列,队列内部是有序的,最重要的特性就体现在它的延时属性上,延时队列中的元素是希望在指定时间到了以后或之前取出和处理,简单来说,延时队列就是用来存放需要在指定时间被处理的元素的队列的。
1354 13
|
6月前
|
消息中间件 存储 缓存
消息队列学习之rocketmq
【4月更文挑战第1天】消息队列学习之rocketmq
45 0