Rocketmq学习一

简介: 首先从github中拉取Rocketmq的代码,进行运行。1.由于rocketmq需要依赖nameServer,类似于zookeeper。首先启动时,配置好NamesrvStartup的环境变量信息,也即rocketmq的ROCKEMQ_HOME与你的项目对应。接着就可以启动了。

首先从github中拉取Rocketmq的代码,进行运行。

1.由于rocketmq需要依赖nameServer,类似于zookeeper。首先启动时,配置好NamesrvStartup的环境变量信息,也即rocketmq的ROCKEMQ_HOME与你的项目对应。接着就可以启动了。

/*** nameServer启动类*/publicclassNamesrvStartup {
privatestaticInternalLoggerlog;
privatestaticPropertiesproperties=null;
privatestaticCommandLinecommandLine=null;
publicstaticvoidmain(String[] args) {
main0(args);
    }
//启动NamesrvController做了:创建namesrvController和启动controller,返回controllerpublicstaticNamesrvControllermain0(String[] args) {
try {
NamesrvControllercontroller=createNamesrvController(args);
start(controller);
Stringtip="The Name Server boot success. serializeType="+RemotingCommand.getSerializeTypeConfigInThisServer();
log.info(tip);
System.out.printf("%s%n", tip);
returncontroller;
        } catch (Throwablee) {
e.printStackTrace();
System.exit(-1);
        }
returnnull;
    }
//创建namesrvController:首先设置配置信息rocketmq的版本信息publicstaticNamesrvControllercreateNamesrvController(String[] args) throwsIOException, JoranException {
System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));
//PackageConflictDetect.detectFastjson();Optionsoptions=ServerUtil.buildCommandlineOptions(newOptions());
commandLine=ServerUtil.parseCmdLine("mqnamesrv", args, buildCommandlineOptions(options), newPosixParser());
if (null==commandLine) {
System.exit(-1);
returnnull;
        }
//创建namesrvCnfig、nettyServerConfig,并进行配置的填充//采用-c和-p两种方式进行填充finalNamesrvConfignamesrvConfig=newNamesrvConfig();
finalNettyServerConfignettyServerConfig=newNettyServerConfig();
nettyServerConfig.setListenPort(9876);
if (commandLine.hasOption('c')) {
Stringfile=commandLine.getOptionValue('c');
if (file!=null) {
InputStreamin=newBufferedInputStream(newFileInputStream(file));
properties=newProperties();
properties.load(in);
MixAll.properties2Object(properties, namesrvConfig);
MixAll.properties2Object(properties, nettyServerConfig);
namesrvConfig.setConfigStorePath(file);
System.out.printf("load config properties file OK, %s%n", file);
in.close();
            }
        }
if (commandLine.hasOption('p')) {
InternalLoggerconsole=InternalLoggerFactory.getLogger(LoggerName.NAMESRV_CONSOLE_NAME);
MixAll.printObjectProperties(console, namesrvConfig);
MixAll.printObjectProperties(console, nettyServerConfig);
System.exit(0);
        }
MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), namesrvConfig);
if (null==namesrvConfig.getRocketmqHome()) {
System.out.printf("Please set the %s variable in your environment to match the location of the RocketMQ installation%n", MixAll.ROCKETMQ_HOME_ENV);
System.exit(-2);
        }
LoggerContextlc= (LoggerContext) LoggerFactory.getILoggerFactory();
JoranConfiguratorconfigurator=newJoranConfigurator();
configurator.setContext(lc);
lc.reset();
configurator.doConfigure(namesrvConfig.getRocketmqHome() +"/conf/logback_namesrv.xml");
log=InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
MixAll.printObjectProperties(log, namesrvConfig);
MixAll.printObjectProperties(log, nettyServerConfig);
finalNamesrvControllercontroller=newNamesrvController(namesrvConfig, nettyServerConfig);
// remember all configs to prevent discardcontroller.getConfiguration().registerConfig(properties);
returncontroller;
    }
//启动namesrvControllerpublicstaticNamesrvControllerstart(finalNamesrvControllercontroller) throwsException {
if (null==controller) {
thrownewIllegalArgumentException("NamesrvController is null");
        }
booleaninitResult=controller.initialize();
if (!initResult) {
controller.shutdown();
System.exit(-3);
        }
//添加钩子函数Runtime.getRuntime().addShutdownHook(newShutdownHookThread(log, newCallable<Void>() {
@OverridepublicVoidcall() throwsException {
controller.shutdown();
returnnull;
            }
        }));
//启动controllercontroller.start();
returncontroller;
    }
publicstaticvoidshutdown(finalNamesrvControllercontroller) {
controller.shutdown();
    }
//使用-c和-p的方式进行环境变量参数的添加publicstaticOptionsbuildCommandlineOptions(finalOptionsoptions) {
Optionopt=newOption("c", "configFile", true, "Name server config properties file");
opt.setRequired(false);
options.addOption(opt);
opt=newOption("p", "printConfigItem", false, "Print all config item");
opt.setRequired(false);
options.addOption(opt);
returnoptions;
    }
publicstaticPropertiesgetProperties() {
returnproperties;
    }
}

微信图片_20221214024210.png


启动NamesrvStartup的时候,控制台看到启动成功,则说明启动成功了。

2.启动brokerStartup

启动broker的时候,需要先配置ROCKETMQ_HOME。接着配置conf,里面有三个配置类信息

微信图片_20221214024207.png

配置好了,就可以启动broker了

/*** broker启动类*/publicclassBrokerStartup {
publicstaticPropertiesproperties=null;
publicstaticCommandLinecommandLine=null;
publicstaticStringconfigFile=null;
publicstaticInternalLoggerlog;
publicstaticvoidmain(String[] args) {
start(createBrokerController(args));
    }
//启动brokerControllerpublicstaticBrokerControllerstart(BrokerControllercontroller) {
try {
controller.start();
Stringtip="The broker["+controller.getBrokerConfig().getBrokerName() +", "+controller.getBrokerAddr() +"] boot success. serializeType="+RemotingCommand.getSerializeTypeConfigInThisServer();
if (null!=controller.getBrokerConfig().getNamesrvAddr()) {
tip+=" and name server is "+controller.getBrokerConfig().getNamesrvAddr();
            }
log.info(tip);
System.out.printf("%s%n", tip);
returncontroller;
        } catch (Throwablee) {
e.printStackTrace();
System.exit(-1);
        }
returnnull;
    }
//关闭brokerpublicstaticvoidshutdown(finalBrokerControllercontroller) {
if (null!=controller) {
controller.shutdown();
        }
    }
//创建BrokerController,可以看到配置信息:// brokerConfig,nettyServerConfig,nettyClientConfig,messageStoreConfig//在jvm中添加钩子函数publicstaticBrokerControllercreateBrokerController(String[] args) {
System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));
if (null==System.getProperty(NettySystemConfig.COM_ROCKETMQ_REMOTING_SOCKET_SNDBUF_SIZE)) {
NettySystemConfig.socketSndbufSize=131072;
        }
if (null==System.getProperty(NettySystemConfig.COM_ROCKETMQ_REMOTING_SOCKET_RCVBUF_SIZE)) {
NettySystemConfig.socketRcvbufSize=131072;
        }
try {
//PackageConflictDetect.detectFastjson();Optionsoptions=ServerUtil.buildCommandlineOptions(newOptions());
commandLine=ServerUtil.parseCmdLine("mqbroker", args, buildCommandlineOptions(options),
newPosixParser());
if (null==commandLine) {
System.exit(-1);
            }
finalBrokerConfigbrokerConfig=newBrokerConfig();
finalNettyServerConfignettyServerConfig=newNettyServerConfig();
finalNettyClientConfignettyClientConfig=newNettyClientConfig();
nettyClientConfig.setUseTLS(Boolean.parseBoolean(System.getProperty(TLS_ENABLE,
String.valueOf(TlsSystemConfig.tlsMode==TlsMode.ENFORCING))));
nettyServerConfig.setListenPort(10911);
finalMessageStoreConfigmessageStoreConfig=newMessageStoreConfig();
if (BrokerRole.SLAVE==messageStoreConfig.getBrokerRole()) {
intratio=messageStoreConfig.getAccessMessageInMemoryMaxRatio() -10;
messageStoreConfig.setAccessMessageInMemoryMaxRatio(ratio);
            }
if (commandLine.hasOption('c')) {
Stringfile=commandLine.getOptionValue('c');
if (file!=null) {
configFile=file;
InputStreamin=newBufferedInputStream(newFileInputStream(file));
properties=newProperties();
properties.load(in);
properties2SystemEnv(properties);
MixAll.properties2Object(properties, brokerConfig);
MixAll.properties2Object(properties, nettyServerConfig);
MixAll.properties2Object(properties, nettyClientConfig);
MixAll.properties2Object(properties, messageStoreConfig);
BrokerPathConfigHelper.setBrokerConfigPath(file);
in.close();
                }
            }
MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), brokerConfig);
if (null==brokerConfig.getRocketmqHome()) {
System.out.printf("Please set the %s variable in your environment to match the location of the RocketMQ installation", MixAll.ROCKETMQ_HOME_ENV);
System.exit(-2);
            }
StringnamesrvAddr=brokerConfig.getNamesrvAddr();
if (null!=namesrvAddr) {
try {
String[] addrArray=namesrvAddr.split(";");
for (Stringaddr : addrArray) {
RemotingUtil.string2SocketAddress(addr);
                    }
                } catch (Exceptione) {
System.out.printf(
"The Name Server Address[%s] illegal, please set it as follows, \"127.0.0.1:9876;192.168.0.1:9876\"%n",
namesrvAddr);
System.exit(-3);
                }
            }
switch (messageStoreConfig.getBrokerRole()) {
caseASYNC_MASTER:
caseSYNC_MASTER:
brokerConfig.setBrokerId(MixAll.MASTER_ID);
break;
caseSLAVE:
if (brokerConfig.getBrokerId() <=0) {
System.out.printf("Slave's brokerId must be > 0");
System.exit(-3);
                    }
break;
default:
break;
            }
if (messageStoreConfig.isEnableDLegerCommitLog()) {
brokerConfig.setBrokerId(-1);
            }
messageStoreConfig.setHaListenPort(nettyServerConfig.getListenPort() +1);
LoggerContextlc= (LoggerContext) LoggerFactory.getILoggerFactory();
JoranConfiguratorconfigurator=newJoranConfigurator();
configurator.setContext(lc);
lc.reset();
configurator.doConfigure(brokerConfig.getRocketmqHome() +"/conf/logback_broker.xml");
//使用-p或者m添加参数if (commandLine.hasOption('p')) {
InternalLoggerconsole=InternalLoggerFactory.getLogger(LoggerName.BROKER_CONSOLE_NAME);
MixAll.printObjectProperties(console, brokerConfig);
MixAll.printObjectProperties(console, nettyServerConfig);
MixAll.printObjectProperties(console, nettyClientConfig);
MixAll.printObjectProperties(console, messageStoreConfig);
System.exit(0);
            } elseif (commandLine.hasOption('m')) {
InternalLoggerconsole=InternalLoggerFactory.getLogger(LoggerName.BROKER_CONSOLE_NAME);
MixAll.printObjectProperties(console, brokerConfig, true);
MixAll.printObjectProperties(console, nettyServerConfig, true);
MixAll.printObjectProperties(console, nettyClientConfig, true);
MixAll.printObjectProperties(console, messageStoreConfig, true);
System.exit(0);
            }
log=InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
MixAll.printObjectProperties(log, brokerConfig);
MixAll.printObjectProperties(log, nettyServerConfig);
MixAll.printObjectProperties(log, nettyClientConfig);
MixAll.printObjectProperties(log, messageStoreConfig);
finalBrokerControllercontroller=newBrokerController(
brokerConfig,
nettyServerConfig,
nettyClientConfig,
messageStoreConfig);
// remember all configs to prevent discardcontroller.getConfiguration().registerConfig(properties);
booleaninitResult=controller.initialize();
if (!initResult) {
controller.shutdown();
System.exit(-3);
            }
//在jvm里面添加钩子函数Runtime.getRuntime().addShutdownHook(newThread(newRunnable() {
privatevolatilebooleanhasShutdown=false;
privateAtomicIntegershutdownTimes=newAtomicInteger(0);
@Overridepublicvoidrun() {
synchronized (this) {
log.info("Shutdown hook was invoked, {}", this.shutdownTimes.incrementAndGet());
if (!this.hasShutdown) {
this.hasShutdown=true;
longbeginTime=System.currentTimeMillis();
controller.shutdown();
longconsumingTimeTotal=System.currentTimeMillis() -beginTime;
log.info("Shutdown hook over, consuming total time(ms): {}", consumingTimeTotal);
                        }
                    }
                }
            }, "ShutdownHook"));
returncontroller;
        } catch (Throwablee) {
e.printStackTrace();
System.exit(-1);
        }
returnnull;
    }
privatestaticvoidproperties2SystemEnv(Propertiesproperties) {
if (properties==null) {
return;
        }
StringrmqAddressServerDomain=properties.getProperty("rmqAddressServerDomain", MixAll.WS_DOMAIN_NAME);
StringrmqAddressServerSubGroup=properties.getProperty("rmqAddressServerSubGroup", MixAll.WS_DOMAIN_SUBGROUP);
System.setProperty("rocketmq.namesrv.domain", rmqAddressServerDomain);
System.setProperty("rocketmq.namesrv.domain.subgroup", rmqAddressServerSubGroup);
    }
//构建c、p、m方式添加参数信息privatestaticOptionsbuildCommandlineOptions(finalOptionsoptions) {
Optionopt=newOption("c", "configFile", true, "Broker config properties file");
opt.setRequired(false);
options.addOption(opt);
opt=newOption("p", "printConfigItem", false, "Print all config item");
opt.setRequired(false);
options.addOption(opt);
opt=newOption("m", "printImportantConfig", false, "Print important config item");
opt.setRequired(false);
options.addOption(opt);
returnoptions;
    }
}


微信图片_20221214024201.png

控制台打印启动的信息可以看到启动成功

3.启动example中的producer

/*** This class demonstrates how to send messages to brokers using provided {@link DefaultMQProducer}.* 生产者启动类*/publicclassProducer {
publicstaticvoidmain(String[] args) throwsMQClientException, InterruptedException {
/** Instantiate with a producer group name.*/DefaultMQProducerproducer=newDefaultMQProducer("please_rename_unique_group_name");
/** Specify name server addresses.* <p/>** Alternatively, you may specify name server addresses via exporting environmental variable: NAMESRV_ADDR* <pre>* {@code* producer.setNamesrvAddr("name-server1-ip:9876;name-server2-ip:9876");* }* </pre>*//** Launch the instance.*/producer.setNamesrvAddr("127.0.0.1:9876");
producer.start();
for (inti=0; i<1000; i++) {
try {
/** Create a message instance, specifying topic, tag and message body.*/Messagemsg=newMessage("TopicTest"/* Topic */,
"TagA"/* Tag */,
                    ("Hello RocketMQ "+i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */                );
/** Call send message to deliver message to one of brokers.*/SendResultsendResult=producer.send(msg);
System.out.printf("%s%n", sendResult);
            } catch (Exceptione) {
e.printStackTrace();
Thread.sleep(1000);
            }
        }
/** Shut down once the producer instance is not longer in use.*/producer.shutdown();
    }
}


微信图片_20221214024156.png

4.启动example中的consumer

/*** This example shows how to subscribe and consume messages using providing {@link DefaultMQPushConsumer}.* 消费者启动类*/publicclassConsumer {
publicstaticvoidmain(String[] args) throwsInterruptedException, MQClientException {
/** Instantiate with specified consumer group name.*/DefaultMQPushConsumerconsumer=newDefaultMQPushConsumer("please_rename_unique_group_name_4");
/** Specify name server addresses.* <p/>** Alternatively, you may specify name server addresses via exporting environmental variable: NAMESRV_ADDR* <pre>* {@code* consumer.setNamesrvAddr("name-server1-ip:9876;name-server2-ip:9876");* }* </pre>*//** Specify where to start in case the specified consumer group is a brand new one.*/consumer.setNamesrvAddr("127.0.0.1:9876");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
/** Subscribe one more more topics to consume.*/consumer.subscribe("TopicTest", "*");
/**  Register callback to execute on arrival of messages fetched from brokers.*/consumer.registerMessageListener(newMessageListenerConcurrently() {
@OverridepublicConsumeConcurrentlyStatusconsumeMessage(List<MessageExt>msgs,
ConsumeConcurrentlyContextcontext) {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
returnConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
/**  Launch the consumer instance.*/consumer.start();
System.out.printf("Consumer Started.%n");
    }
}


微信图片_20221214024045.png

此时你还可以看到你创建好的data里面产生的文件信息:

微信图片_20221214024041.png

可以看到里面包含了提交的日志信息、配置信息、消费者队列、索引、abort、checkpoint、lock。同时这个文件夹比较大,有2.41G。

启动完成,看到生产者和消费者的信息时,说明源码环境构建完成。同时从启动来看,也提供了分析源码和学习源码的方向,首先nameserver,然后broker,接着producer发送消息,producer消费消息。

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
8月前
|
消息中间件 Java API
RocketMQ事务消息, 图文、源码学习探究~
介绍 RocketMQ是阿里巴巴开源的分布式消息中间件,它是一个高性能、低延迟、可靠的消息队列系统,用于在分布式系统中进行异步通信。 从4.3.0版本开始正式支持分布式事务消息~ RocketMq事务消息支持最终一致性:在普通消息基础上,支持二阶段的提交能力。将二阶段提交和本地事务绑定,实现全局提交结果的一致性。 原理、流程 本质上RocketMq的事务能力是基于二阶段提交来实现的 在消息发送上,将二阶段提交与本地事务绑定 本地事务执行成功,则事务消息成功,可以交由Consumer消费 本地事务执行失败,则事务消息失败,Consumer无法消费 但是,RocketMq只能保证本地事务
|
8月前
|
消息中间件 JSON 缓存
RabbitMQ快速学习之WorkQueues模型、三种交换机、消息转换器(SpringBoot整合)
RabbitMQ快速学习之WorkQueues模型、三种交换机、消息转换器(SpringBoot整合)
191 0
|
5月前
|
消息中间件 存储 数据库
深入学习RocketMQ的底层存储设计原理
文章深入探讨了RocketMQ的底层存储设计原理,分析了其如何通过将数据和索引映射到内存、异步刷新磁盘以及消息内容的混合存储来实现高性能的读写操作,从而保证了RocketMQ作为一款低延迟消息队列的读写性能。
|
8月前
|
消息中间件 存储 数据安全/隐私保护
深入学习RabbitMQ五种模式(一)
深入学习RabbitMQ五种模式(一)
84 0
|
7月前
|
消息中间件 IDE 数据库
RocketMQ事务消息学习及刨坑过程
RocketMQ事务消息学习及刨坑过程
|
8月前
|
消息中间件 存储 负载均衡
消息队列学习之RabbitMQ
【4月更文挑战第3天】消息队列学习之RabbitMQ,一种基于erlang语言开发的流行的开源消息中间件。
68 0
|
8月前
|
消息中间件 存储 监控
写了10000字:全面学习RocketMQ中间件
以上是 V 哥在授课时整理的全部 RocketMQ 的内容,在学习时重点要理解其中的含义,正所谓知其然知其所以然,希望这篇文章可以帮助兄弟们搞清楚RocketMQ的来龙去脉,必竟这是一个非常常用的分布式应用的中间件,好了,今天的内容就分享到这,我靠!已经 00:36分,建议收藏起来,慢慢消化,创作不易,喜欢请点赞转发。
1127 0
|
8月前
|
消息中间件 存储 Java
RabbitMQ之延迟队列(手把手教你学习延迟队列)
【1月更文挑战第12天】延时队列,队列内部是有序的,最重要的特性就体现在它的延时属性上,延时队列中的元素是希望在指定时间到了以后或之前取出和处理,简单来说,延时队列就是用来存放需要在指定时间被处理的元素的队列的。
1903 14
|
8月前
|
消息中间件 存储 缓存
消息队列学习之rocketmq
【4月更文挑战第1天】消息队列学习之rocketmq
56 0
|
8月前
|
消息中间件 RocketMQ Docker
分布式事物【RocketMQ事务消息、Docker安装 RocketMQ、实现订单微服务、订单微服务业务层实现】(八)-全面详解(学习总结---从入门到深化)
分布式事物【RocketMQ事务消息、Docker安装 RocketMQ、实现订单微服务、订单微服务业务层实现】(八)-全面详解(学习总结---从入门到深化)
107 0