首先从github中拉取Rocketmq的代码,进行运行。
1.由于rocketmq需要依赖nameServer,类似于zookeeper。首先启动时,配置好NamesrvStartup的环境变量信息,也即rocketmq的ROCKEMQ_HOME与你的项目对应。接着就可以启动了。
/*** nameServer启动类*/publicclassNamesrvStartup { privatestaticInternalLoggerlog; privatestaticPropertiesproperties=null; privatestaticCommandLinecommandLine=null; publicstaticvoidmain(String[] args) { main0(args); } //启动NamesrvController做了:创建namesrvController和启动controller,返回controllerpublicstaticNamesrvControllermain0(String[] args) { try { NamesrvControllercontroller=createNamesrvController(args); start(controller); Stringtip="The Name Server boot success. serializeType="+RemotingCommand.getSerializeTypeConfigInThisServer(); log.info(tip); System.out.printf("%s%n", tip); returncontroller; } catch (Throwablee) { e.printStackTrace(); System.exit(-1); } returnnull; } //创建namesrvController:首先设置配置信息rocketmq的版本信息publicstaticNamesrvControllercreateNamesrvController(String[] args) throwsIOException, JoranException { System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION)); //PackageConflictDetect.detectFastjson();Optionsoptions=ServerUtil.buildCommandlineOptions(newOptions()); commandLine=ServerUtil.parseCmdLine("mqnamesrv", args, buildCommandlineOptions(options), newPosixParser()); if (null==commandLine) { System.exit(-1); returnnull; } //创建namesrvCnfig、nettyServerConfig,并进行配置的填充//采用-c和-p两种方式进行填充finalNamesrvConfignamesrvConfig=newNamesrvConfig(); finalNettyServerConfignettyServerConfig=newNettyServerConfig(); nettyServerConfig.setListenPort(9876); if (commandLine.hasOption('c')) { Stringfile=commandLine.getOptionValue('c'); if (file!=null) { InputStreamin=newBufferedInputStream(newFileInputStream(file)); properties=newProperties(); properties.load(in); MixAll.properties2Object(properties, namesrvConfig); MixAll.properties2Object(properties, nettyServerConfig); namesrvConfig.setConfigStorePath(file); System.out.printf("load config properties file OK, %s%n", file); in.close(); } } if (commandLine.hasOption('p')) { InternalLoggerconsole=InternalLoggerFactory.getLogger(LoggerName.NAMESRV_CONSOLE_NAME); MixAll.printObjectProperties(console, namesrvConfig); MixAll.printObjectProperties(console, nettyServerConfig); System.exit(0); } MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), namesrvConfig); if (null==namesrvConfig.getRocketmqHome()) { System.out.printf("Please set the %s variable in your environment to match the location of the RocketMQ installation%n", MixAll.ROCKETMQ_HOME_ENV); System.exit(-2); } LoggerContextlc= (LoggerContext) LoggerFactory.getILoggerFactory(); JoranConfiguratorconfigurator=newJoranConfigurator(); configurator.setContext(lc); lc.reset(); configurator.doConfigure(namesrvConfig.getRocketmqHome() +"/conf/logback_namesrv.xml"); log=InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME); MixAll.printObjectProperties(log, namesrvConfig); MixAll.printObjectProperties(log, nettyServerConfig); finalNamesrvControllercontroller=newNamesrvController(namesrvConfig, nettyServerConfig); // remember all configs to prevent discardcontroller.getConfiguration().registerConfig(properties); returncontroller; } //启动namesrvControllerpublicstaticNamesrvControllerstart(finalNamesrvControllercontroller) throwsException { if (null==controller) { thrownewIllegalArgumentException("NamesrvController is null"); } booleaninitResult=controller.initialize(); if (!initResult) { controller.shutdown(); System.exit(-3); } //添加钩子函数Runtime.getRuntime().addShutdownHook(newShutdownHookThread(log, newCallable<Void>() { publicVoidcall() throwsException { controller.shutdown(); returnnull; } })); //启动controllercontroller.start(); returncontroller; } publicstaticvoidshutdown(finalNamesrvControllercontroller) { controller.shutdown(); } //使用-c和-p的方式进行环境变量参数的添加publicstaticOptionsbuildCommandlineOptions(finalOptionsoptions) { Optionopt=newOption("c", "configFile", true, "Name server config properties file"); opt.setRequired(false); options.addOption(opt); opt=newOption("p", "printConfigItem", false, "Print all config item"); opt.setRequired(false); options.addOption(opt); returnoptions; } publicstaticPropertiesgetProperties() { returnproperties; } }
启动NamesrvStartup的时候,控制台看到启动成功,则说明启动成功了。
2.启动brokerStartup
启动broker的时候,需要先配置ROCKETMQ_HOME。接着配置conf,里面有三个配置类信息
配置好了,就可以启动broker了
/*** broker启动类*/publicclassBrokerStartup { publicstaticPropertiesproperties=null; publicstaticCommandLinecommandLine=null; publicstaticStringconfigFile=null; publicstaticInternalLoggerlog; publicstaticvoidmain(String[] args) { start(createBrokerController(args)); } //启动brokerControllerpublicstaticBrokerControllerstart(BrokerControllercontroller) { try { controller.start(); Stringtip="The broker["+controller.getBrokerConfig().getBrokerName() +", "+controller.getBrokerAddr() +"] boot success. serializeType="+RemotingCommand.getSerializeTypeConfigInThisServer(); if (null!=controller.getBrokerConfig().getNamesrvAddr()) { tip+=" and name server is "+controller.getBrokerConfig().getNamesrvAddr(); } log.info(tip); System.out.printf("%s%n", tip); returncontroller; } catch (Throwablee) { e.printStackTrace(); System.exit(-1); } returnnull; } //关闭brokerpublicstaticvoidshutdown(finalBrokerControllercontroller) { if (null!=controller) { controller.shutdown(); } } //创建BrokerController,可以看到配置信息:// brokerConfig,nettyServerConfig,nettyClientConfig,messageStoreConfig//在jvm中添加钩子函数publicstaticBrokerControllercreateBrokerController(String[] args) { System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION)); if (null==System.getProperty(NettySystemConfig.COM_ROCKETMQ_REMOTING_SOCKET_SNDBUF_SIZE)) { NettySystemConfig.socketSndbufSize=131072; } if (null==System.getProperty(NettySystemConfig.COM_ROCKETMQ_REMOTING_SOCKET_RCVBUF_SIZE)) { NettySystemConfig.socketRcvbufSize=131072; } try { //PackageConflictDetect.detectFastjson();Optionsoptions=ServerUtil.buildCommandlineOptions(newOptions()); commandLine=ServerUtil.parseCmdLine("mqbroker", args, buildCommandlineOptions(options), newPosixParser()); if (null==commandLine) { System.exit(-1); } finalBrokerConfigbrokerConfig=newBrokerConfig(); finalNettyServerConfignettyServerConfig=newNettyServerConfig(); finalNettyClientConfignettyClientConfig=newNettyClientConfig(); nettyClientConfig.setUseTLS(Boolean.parseBoolean(System.getProperty(TLS_ENABLE, String.valueOf(TlsSystemConfig.tlsMode==TlsMode.ENFORCING)))); nettyServerConfig.setListenPort(10911); finalMessageStoreConfigmessageStoreConfig=newMessageStoreConfig(); if (BrokerRole.SLAVE==messageStoreConfig.getBrokerRole()) { intratio=messageStoreConfig.getAccessMessageInMemoryMaxRatio() -10; messageStoreConfig.setAccessMessageInMemoryMaxRatio(ratio); } if (commandLine.hasOption('c')) { Stringfile=commandLine.getOptionValue('c'); if (file!=null) { configFile=file; InputStreamin=newBufferedInputStream(newFileInputStream(file)); properties=newProperties(); properties.load(in); properties2SystemEnv(properties); MixAll.properties2Object(properties, brokerConfig); MixAll.properties2Object(properties, nettyServerConfig); MixAll.properties2Object(properties, nettyClientConfig); MixAll.properties2Object(properties, messageStoreConfig); BrokerPathConfigHelper.setBrokerConfigPath(file); in.close(); } } MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), brokerConfig); if (null==brokerConfig.getRocketmqHome()) { System.out.printf("Please set the %s variable in your environment to match the location of the RocketMQ installation", MixAll.ROCKETMQ_HOME_ENV); System.exit(-2); } StringnamesrvAddr=brokerConfig.getNamesrvAddr(); if (null!=namesrvAddr) { try { String[] addrArray=namesrvAddr.split(";"); for (Stringaddr : addrArray) { RemotingUtil.string2SocketAddress(addr); } } catch (Exceptione) { System.out.printf( "The Name Server Address[%s] illegal, please set it as follows, \"127.0.0.1:9876;192.168.0.1:9876\"%n", namesrvAddr); System.exit(-3); } } switch (messageStoreConfig.getBrokerRole()) { caseASYNC_MASTER: caseSYNC_MASTER: brokerConfig.setBrokerId(MixAll.MASTER_ID); break; caseSLAVE: if (brokerConfig.getBrokerId() <=0) { System.out.printf("Slave's brokerId must be > 0"); System.exit(-3); } break; default: break; } if (messageStoreConfig.isEnableDLegerCommitLog()) { brokerConfig.setBrokerId(-1); } messageStoreConfig.setHaListenPort(nettyServerConfig.getListenPort() +1); LoggerContextlc= (LoggerContext) LoggerFactory.getILoggerFactory(); JoranConfiguratorconfigurator=newJoranConfigurator(); configurator.setContext(lc); lc.reset(); configurator.doConfigure(brokerConfig.getRocketmqHome() +"/conf/logback_broker.xml"); //使用-p或者m添加参数if (commandLine.hasOption('p')) { InternalLoggerconsole=InternalLoggerFactory.getLogger(LoggerName.BROKER_CONSOLE_NAME); MixAll.printObjectProperties(console, brokerConfig); MixAll.printObjectProperties(console, nettyServerConfig); MixAll.printObjectProperties(console, nettyClientConfig); MixAll.printObjectProperties(console, messageStoreConfig); System.exit(0); } elseif (commandLine.hasOption('m')) { InternalLoggerconsole=InternalLoggerFactory.getLogger(LoggerName.BROKER_CONSOLE_NAME); MixAll.printObjectProperties(console, brokerConfig, true); MixAll.printObjectProperties(console, nettyServerConfig, true); MixAll.printObjectProperties(console, nettyClientConfig, true); MixAll.printObjectProperties(console, messageStoreConfig, true); System.exit(0); } log=InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); MixAll.printObjectProperties(log, brokerConfig); MixAll.printObjectProperties(log, nettyServerConfig); MixAll.printObjectProperties(log, nettyClientConfig); MixAll.printObjectProperties(log, messageStoreConfig); finalBrokerControllercontroller=newBrokerController( brokerConfig, nettyServerConfig, nettyClientConfig, messageStoreConfig); // remember all configs to prevent discardcontroller.getConfiguration().registerConfig(properties); booleaninitResult=controller.initialize(); if (!initResult) { controller.shutdown(); System.exit(-3); } //在jvm里面添加钩子函数Runtime.getRuntime().addShutdownHook(newThread(newRunnable() { privatevolatilebooleanhasShutdown=false; privateAtomicIntegershutdownTimes=newAtomicInteger(0); publicvoidrun() { synchronized (this) { log.info("Shutdown hook was invoked, {}", this.shutdownTimes.incrementAndGet()); if (!this.hasShutdown) { this.hasShutdown=true; longbeginTime=System.currentTimeMillis(); controller.shutdown(); longconsumingTimeTotal=System.currentTimeMillis() -beginTime; log.info("Shutdown hook over, consuming total time(ms): {}", consumingTimeTotal); } } } }, "ShutdownHook")); returncontroller; } catch (Throwablee) { e.printStackTrace(); System.exit(-1); } returnnull; } privatestaticvoidproperties2SystemEnv(Propertiesproperties) { if (properties==null) { return; } StringrmqAddressServerDomain=properties.getProperty("rmqAddressServerDomain", MixAll.WS_DOMAIN_NAME); StringrmqAddressServerSubGroup=properties.getProperty("rmqAddressServerSubGroup", MixAll.WS_DOMAIN_SUBGROUP); System.setProperty("rocketmq.namesrv.domain", rmqAddressServerDomain); System.setProperty("rocketmq.namesrv.domain.subgroup", rmqAddressServerSubGroup); } //构建c、p、m方式添加参数信息privatestaticOptionsbuildCommandlineOptions(finalOptionsoptions) { Optionopt=newOption("c", "configFile", true, "Broker config properties file"); opt.setRequired(false); options.addOption(opt); opt=newOption("p", "printConfigItem", false, "Print all config item"); opt.setRequired(false); options.addOption(opt); opt=newOption("m", "printImportantConfig", false, "Print important config item"); opt.setRequired(false); options.addOption(opt); returnoptions; } }
控制台打印启动的信息可以看到启动成功
3.启动example中的producer
/*** This class demonstrates how to send messages to brokers using provided {@link DefaultMQProducer}.* 生产者启动类*/publicclassProducer { publicstaticvoidmain(String[] args) throwsMQClientException, InterruptedException { /** Instantiate with a producer group name.*/DefaultMQProducerproducer=newDefaultMQProducer("please_rename_unique_group_name"); /** Specify name server addresses.* <p/>** Alternatively, you may specify name server addresses via exporting environmental variable: NAMESRV_ADDR* <pre>* {@code* producer.setNamesrvAddr("name-server1-ip:9876;name-server2-ip:9876");* }* </pre>*//** Launch the instance.*/producer.setNamesrvAddr("127.0.0.1:9876"); producer.start(); for (inti=0; i<1000; i++) { try { /** Create a message instance, specifying topic, tag and message body.*/Messagemsg=newMessage("TopicTest"/* Topic */, "TagA"/* Tag */, ("Hello RocketMQ "+i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */ ); /** Call send message to deliver message to one of brokers.*/SendResultsendResult=producer.send(msg); System.out.printf("%s%n", sendResult); } catch (Exceptione) { e.printStackTrace(); Thread.sleep(1000); } } /** Shut down once the producer instance is not longer in use.*/producer.shutdown(); } }
4.启动example中的consumer
/*** This example shows how to subscribe and consume messages using providing {@link DefaultMQPushConsumer}.* 消费者启动类*/publicclassConsumer { publicstaticvoidmain(String[] args) throwsInterruptedException, MQClientException { /** Instantiate with specified consumer group name.*/DefaultMQPushConsumerconsumer=newDefaultMQPushConsumer("please_rename_unique_group_name_4"); /** Specify name server addresses.* <p/>** Alternatively, you may specify name server addresses via exporting environmental variable: NAMESRV_ADDR* <pre>* {@code* consumer.setNamesrvAddr("name-server1-ip:9876;name-server2-ip:9876");* }* </pre>*//** Specify where to start in case the specified consumer group is a brand new one.*/consumer.setNamesrvAddr("127.0.0.1:9876"); consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); /** Subscribe one more more topics to consume.*/consumer.subscribe("TopicTest", "*"); /** Register callback to execute on arrival of messages fetched from brokers.*/consumer.registerMessageListener(newMessageListenerConcurrently() { publicConsumeConcurrentlyStatusconsumeMessage(List<MessageExt>msgs, ConsumeConcurrentlyContextcontext) { System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs); returnConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); /** Launch the consumer instance.*/consumer.start(); System.out.printf("Consumer Started.%n"); } }
此时你还可以看到你创建好的data里面产生的文件信息:
可以看到里面包含了提交的日志信息、配置信息、消费者队列、索引、abort、checkpoint、lock。同时这个文件夹比较大,有2.41G。
启动完成,看到生产者和消费者的信息时,说明源码环境构建完成。同时从启动来看,也提供了分析源码和学习源码的方向,首先nameserver,然后broker,接着producer发送消息,producer消费消息。