1.1.2 Eclipse调试RocketMQ源码
本节将展示在Eclipse中启动NameServer、Broker,并运行消息发送与消息消费示例程序。
1.启动NameServer
Step1:展开namesrv模块,右键NamesrvStartup.java,移动到Debug As,选中Debug Configurations,弹出Debug Configurations对话框,如图1-14所示。
Step2:选中Java Application条目并单击右键,选择New弹出Debug Configurations对话框,如图1-15所示。
Step3:设置RocketMQ运行主目录。选择Environment选项卡,添加环境变量ROCKET_HOME。
Step4:在RocketMQ运行主目录中创建conf、logs、store三个文件夹,如图1-16所示。
Step5:从RocketMQ distribution部署目录中将broker.conf、logback_broker.xml文件复制到conf目录中,logback_namesrv.xml文件则只需修改日志文件的目录,broker.conf文件内容如下所示。
代码清单1-3 broker.conf文件
brokerClusterName=DefaultCluster
brokerName=broker-a
brokerId=0
nameServer地址,分号分割
namesrvAddr=127.0.0.1:9876
deleteWhen=04
fileReservedTime=48
brokerRole=ASYNC_MASTER
flushDiskType=ASYNC_FLUSH
存储路径
storePathRootDir=D:\rocketmq\store
commitLog 存储路径
storePathCommitLog=D:\rocketmq\store\commitlog
消费队列存储路径
storePathConsumeQueue=D:\rocketmq\store\consumequeue
消息索引存储路径
storePathIndex=D:\rocketmq\store\index
checkpoint 文件存储路径
storeCheckpoint=D:\rocketmq\store\checkpoint
abort 文件存储路径
abortFile=D:\rocketmq\store\abort
Step6:在Eclipse Debug中运行NamesrvStartup,并输出“The Name Server boot success. Serializetype=JSON”。
2.启动Broker
Step1:展开broker模块,右键BrokerStartup.java,移动到Debug As,选中Debug Configurations,弹出如图1-17所示的对话框,选择arguments选项卡,配置-c属性指定broker配置文件路径。
Step2:切换选项卡Environment,配置RocketMQ主目录,如图1-18所示。
Step3:以Debug模式运行BrokerStartup.java,查看${ROCKET_HOME}/logs/broker.log文件,未报错则表示启动成功。
代码清单1-4 broker启动日志截图
2018-03-22 20:47:29 INFO main - register broker to name server 127.0.0.1:9876 OK
2018-03-22 20:47:29 INFO main - The broker[broker-a, 192.168.1.3:10911] boot success. serializeType=JSON and name server is 127.0.0.1:9876
2018-03-22 20:47:38 INFO BrokerControllerScheduledThread1 - dispatch behind commit log 0 bytes
2018-03-22 20:47:38 INFO BrokerControllerScheduledThread1 - Slave fall behind master: 0 bytes
2018-03-22 20:47:39 INFO BrokerControllerScheduledThread1 - register broker to name server 127.0.0.1:9876 OK
2018-03-22 20:48:09 INFO BrokerControllerScheduledThread1 - register broker to name server 127.0.0.1:9876 OK
2018-03-22 20:48:37 INFO BrokerControllerScheduledThread1 - dispatch behind commit log 0 bytes
2018-03-22 20:48:37 INFO BrokerControllerScheduledThread1 - Slave fall behind master: 0 bytes
2018-03-22 20:48:39 INFO BrokerControllerScheduledThread1 - register broker to name server 127.0.0.1:9876 OK
2018-03-22 20:49:09 INFO BrokerControllerScheduledThread1 - register broker to name server 127.0.0.1:9876 OK
3.使用RocketMQ提供的实例验证消息发送与消息消费
Step1:修改org.apache.rocketmq.example.quickstart.Producer示例程序,设置消息生产者NameServer地址。
代码清单1-5 消息发送示例程序
public class Producer {
public static void main(String[] args) throws MQClientException,
InterruptedException {
DefaultMQProducer producer = new
DefaultMQProducer("please_rename_unique_group_name");
producer.setNamesrvAddr("127.0.0.1:9876");
producer.start();
for (int i = 0; i < 1; i++) {
try {
Message msg = new Message("TopicTest"/* Topic */,"TagA"/* Tag */,
("Hello RocketMQ " + i).getBytes
(RemotingHelper.DEFAULT_CHARSET)/* Message body */
);
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
} catch (Exception e) {
e.printStackTrace();
Thread.sleep(1000);
}
}
producer.shutdown();
}
}
Step2:运行该示例程序,查看运行结果,如果输出代码清单1-6所示结果则表示消息发送成功。
代码清单1-6 消息发送结果
SendResult [sendStatus=SEND_OK, msgId=C0A8010325B46D06D69C70A211400000,
offsetMsgId=C0A8010300002A9F0000000000000000, messageQueue=MessageQueue
[topic=TopicTest, brokerName=broker-a, queueId=0], queueOffset=0]
Step3:修改org.apache.rocketmq.example.quickstart.Consumer示例程序,设置消息消费者NameServer地址。
代码清单1-7 消息消费示例程序
public class Consumer {
public static void main(String[] args) throws InterruptedException,
MQClientException {
DefaultMQPushConsumer consumer = new
DefaultMQPushConsumer("please_rename_unique_group_name_4");
consumer.setNamesrvAddr("127.0.0.1:9876");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.subscribe("TopicTest", "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
System.out.printf("%s Receive New Messages: %s %n",
Thread.currentThread().getName(), msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.printf("Consumer Started.%n");
}
}
Step4:运行消息消费者程序,如果输出如下所示则表示消息消费成功。
代码清单1-8 消息消费结果
Consumer Started.
ConsumeMessageThread_1 Receive New Messages: [MessageExt [queueId=0,
storeSize=178, queueOffset=0, sysFlag=0, bornTimestamp=1521723269443,
bornHost=/192.168.1.3:57034, storeTimestamp=1521723269510,
storeHost=/192.168.1.3:10911, msgId=C0A8010300002A9F0000000000000000,
commitLogOffset=0, bodyCRC=613185359, reconsumeTimes=0,
preparedTransactionOffset=0, toString()=Message [topic=TopicTest, flag=0,
properties={MIN_OFFSET=0, MAX_OFFSET=1, CONSUME_START_TIME=1521723841419,
UNIQ_KEY=C0A8010325B46D06D69C70A211400000, WAIT=true, TAGS=TagA}, body=16]]]
消息发送与消息消费都成功,则说明RocketMQ调试环境已经成功搭建了,可以直接Debug源码,探知RocketMQ的实现奥秘了。