RocketMQ源码本地搭建调试

简介: 导入IDEA,可在命令行执行mvn compile一下,保证源码能够正确编译。本次我使用的master分支的版本-4.8.0。下面我们开始准备启动Namesrv。

1 GitHub源码


git clone https://github.com/apache/rocketmq.git


导入IDEA,可在命令行执行mvn compile一下,保证源码能够正确编译。本次我使用的master分支的版本-4.8.0。下面我们开始准备启动Namesrv。


2 启动Namesrv


到namesrv模块找到NamesrvStartup,启动main方法,报错如下:


Please set the ROCKETMQ_HOME variable in your environment to match the location of the RocketMQ installation


需设置环境变量ROCKETMQ_HOME。怎么设置呢?看看报错位置:


// org.apache.rocketmq.namesrv.NamesrvStartup#createNamesrvController

if (null == namesrvConfig.getRocketmqHome()) {

   System.out.printf("Please set the %s variable in your environment to match the location of the RocketMQ installation%n", MixAll.ROCKETMQ_HOME_ENV);

   System.exit(-2);

}


从namesrvConfig.getRocketmqHome中获取的ROCKETMQ_HOME,进一步发现是NamesrvConfig里面的一个属性


// org.apache.rocketmq.common.namesrv.NamesrvConfig

// ROCKETMQ_HOME_PROPERTY = "rocketmq.home.dir";

// ROCKETMQ_HOME_ENV = "ROCKETMQ_HOME";

private String rocketmqHome = System.getProperty(MixAll.ROCKETMQ_HOME_PROPERTY, System.getenv(MixAll.ROCKETMQ_HOME_ENV));


为方便,可直接在源码NamesrvStartup的main方法第一行

也可设置环境变量ROCKETMQ_HOME

或启动时加入命令行参数 -Drocketmq.home.dir=/path

public static void main(String[] args) {

   System.setProperty(MixAll.ROCKETMQ_HOME_PROPERTY, "/Users/apple/doc/IDEAProjects/rocketmq");

   main0(args);

}


再次启动Namesrv,报错如下:

8.png



提示在我们配置的ROCKETMQ_HOME 目录下,没找到conf\logback_namesrv.xml 配置文件。


在项目中搜索logback_namesrv.xml 文件,找到在distribution\conf 目录下存在很多配置,可直接使用这些配置文件。直接把distribution\conf 目录copy到配置的ROCKETMQ_HOME 目录下即可。

然后在来启动一下,控制台打印:



7.png

说明NameSrv启动成功


3 启动Broker


找到BrokerStartup,直接启动:


Please set the ROCKETMQ_HOME variable in your environment to match the location of the RocketMQ installationDisconnected from the target VM, address: '127.0.0.1:51797', transport: 'socket'


知道咋办了吧。


broker需要连接namrsrv,所以在BrokerStartup 的main方法第一行添加如下两行代码:


public static void main(String[] args) {

  System.setProperty(MixAll.ROCKETMQ_HOME_PROPERTY, "/Users/apple/doc/IDEAProjects/rocketmq");

       System.setProperty(MixAll.NAMESRV_ADDR_PROPERTY, "127.0.0.1:9876");

   start(createBrokerController(args));

}


再启动:



虽然打印启动成功,再验证是否能成功发送和接收消息。


4 启动Producer


Producer的demo代码:


import org.apache.rocketmq.client.producer.DefaultMQProducer;

import org.apache.rocketmq.client.producer.SendResult;

import org.apache.rocketmq.common.message.Message;

public class ProducerTest {

   public static void main(String[] args) throws Exception {

       String namesrvAddr = "127.0.0.1:9876";

       String group = "test_group";

       String topic = "test_hello_rocketmq";

       // 构建Producer实例

       DefaultMQProducer producer = new DefaultMQProducer();

       producer.setNamesrvAddr(namesrvAddr);

       producer.setProducerGroup(group);

       // 启动producer

       producer.start();

       // 发送消息

       SendResult result = producer.send(new Message(topic, "hello rocketmq".getBytes()));

       System.out.println(result.getSendStatus());

       // 关闭producer

       producer.shutdown();

   }

}


启动ProducerTest,控制台打印SEND_OK,我们的producer发送消息也OK。


5 启动Consumer


上代码:


import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;

import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;

import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;

import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;

import org.apache.rocketmq.common.consumer.ConsumeFromWhere;

import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

import java.util.concurrent.TimeUnit;

public class ConsumerTest {

   public static void main(String[] args) throws Exception {

       String namesrvAddr = "127.0.0.1:9876";

       String group = "test_consumer_group";

       String topic = "test_hello_rocketmq";

       // 初始化consumer

       DefaultMQPushConsumer consumer = new DefaultMQPushConsumer();

       consumer.setNamesrvAddr(namesrvAddr);

       consumer.setConsumerGroup(group);

       // 订阅topic

       consumer.subscribe(topic, (String) null);

       // 设置消费的位置,由于producer已经发送了消息,所以我们设置从第一个开始消费

       consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

       // 添加消息监听器

       consumer.registerMessageListener(new MessageListenerOrderly() {

           @Override

           public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {

               msgs.forEach(msg -> {

                   System.out.println(new String(msg.getBody()));

               });

               return ConsumeOrderlyStatus.SUCCESS;

           }

       });

       // 启动consumer

       consumer.start();

       // 由于是异步消费,所以不能立即关闭,防止消息还未消费到

       TimeUnit.SECONDS.sleep(2);

       consumer.shutdown();

   }

}



启动消费者,能够成功消费到消息,控制台打印hello rocketmq。


6 总结

中间出一些问题,都能根据提示信息在源码中找到相应解决方案,多看源码,是成长捷径。

相关实践学习
RocketMQ一站式入门使用
从源码编译、部署broker、部署namesrv,使用java客户端首发消息等一站式入门RocketMQ。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
2月前
|
消息中间件 Java API
RocketMQ事务消息, 图文、源码学习探究~
介绍 RocketMQ是阿里巴巴开源的分布式消息中间件,它是一个高性能、低延迟、可靠的消息队列系统,用于在分布式系统中进行异步通信。 从4.3.0版本开始正式支持分布式事务消息~ RocketMq事务消息支持最终一致性:在普通消息基础上,支持二阶段的提交能力。将二阶段提交和本地事务绑定,实现全局提交结果的一致性。 原理、流程 本质上RocketMq的事务能力是基于二阶段提交来实现的 在消息发送上,将二阶段提交与本地事务绑定 本地事务执行成功,则事务消息成功,可以交由Consumer消费 本地事务执行失败,则事务消息失败,Consumer无法消费 但是,RocketMq只能保证本地事务
|
4月前
|
Java API 网络架构
关于 Spring Integration 你知道多少,包含集成MQTT案例讲述及源码3
关于 Spring Integration 你知道多少,包含集成MQTT案例讲述及源码
171 0
关于 Spring Integration 你知道多少,包含集成MQTT案例讲述及源码3
|
3月前
|
消息中间件 Apache 开发工具
RocketMQ-初体验RocketMQ(08)-IDEA拉取调测RocketMQ源码
RocketMQ-初体验RocketMQ(08)-IDEA拉取调测RocketMQ源码
38 0
|
4月前
|
消息中间件 Apache RocketMQ
电子好书发您分享《Apache RocketMQ 源码解析》
电子好书发您分享《Apache RocketMQ 源码解析》
34 1
|
29天前
|
消息中间件 Java RocketMQ
【深度挖掘 RocketMQ底层源码】「底层源码挖掘系列」抽丝剥茧贯穿RocketMQ的消费者端的运行核心的流程(Pull模式-下)
【深度挖掘 RocketMQ底层源码】「底层源码挖掘系列」抽丝剥茧贯穿RocketMQ的消费者端的运行核心的流程(Pull模式-下)
12 1
|
29天前
|
消息中间件 存储 NoSQL
【深度挖掘 RocketMQ底层源码】「底层源码挖掘系列」透彻剖析贯穿RocketMQ的消费者端的运行核心的流程(Pull模式-上)
【深度挖掘 RocketMQ底层源码】「底层源码挖掘系列」透彻剖析贯穿RocketMQ的消费者端的运行核心的流程(Pull模式-上)
27 1
|
29天前
|
消息中间件 存储 Kafka
【深入浅出 RocketMQ原理及实战】「底层源码挖掘系列」透彻剖析贯穿一下RocketMQ和Kafka索引设计原理和方案
【深入浅出 RocketMQ原理及实战】「底层源码挖掘系列」透彻剖析贯穿一下RocketMQ和Kafka索引设计原理和方案
48 1
|
4月前
|
消息中间件 中间件 Kafka
RocketMQ源码(二)消息消费的模式到底是Push还是Pull?
RocketMQ源码(二)消息消费的模式到底是Push还是Pull?
82 1
|
4月前
|
消息中间件 负载均衡 算法
RocketMQ源码(三)简单探索Producer和Consumer与Queue之间的负载均衡策略
- Producer如何将消息负载均衡发送给queue? - Consumer如何通过负载均衡并发消费queue的消息?
461 0
|
4月前
|
消息中间件 Apache RocketMQ
电子好书发您分享《Apache RocketMQ 源码解析》
电子好书发您分享《Apache RocketMQ 源码解析》
27 1