RocketMQ源码本地搭建调试

简介: 导入IDEA,可在命令行执行mvn compile一下,保证源码能够正确编译。本次我使用的master分支的版本-4.8.0。下面我们开始准备启动Namesrv。

1 GitHub源码


git clone https://github.com/apache/rocketmq.git


导入IDEA,可在命令行执行mvn compile一下,保证源码能够正确编译。本次我使用的master分支的版本-4.8.0。下面我们开始准备启动Namesrv。


2 启动Namesrv


到namesrv模块找到NamesrvStartup,启动main方法,报错如下:


Please set the ROCKETMQ_HOME variable in your environment to match the location of the RocketMQ installation


需设置环境变量ROCKETMQ_HOME。怎么设置呢?看看报错位置:


// org.apache.rocketmq.namesrv.NamesrvStartup#createNamesrvController

if (null == namesrvConfig.getRocketmqHome()) {

   System.out.printf("Please set the %s variable in your environment to match the location of the RocketMQ installation%n", MixAll.ROCKETMQ_HOME_ENV);

   System.exit(-2);

}


从namesrvConfig.getRocketmqHome中获取的ROCKETMQ_HOME,进一步发现是NamesrvConfig里面的一个属性


// org.apache.rocketmq.common.namesrv.NamesrvConfig

// ROCKETMQ_HOME_PROPERTY = "rocketmq.home.dir";

// ROCKETMQ_HOME_ENV = "ROCKETMQ_HOME";

private String rocketmqHome = System.getProperty(MixAll.ROCKETMQ_HOME_PROPERTY, System.getenv(MixAll.ROCKETMQ_HOME_ENV));


为方便,可直接在源码NamesrvStartup的main方法第一行

也可设置环境变量ROCKETMQ_HOME

或启动时加入命令行参数 -Drocketmq.home.dir=/path

public static void main(String[] args) {

   System.setProperty(MixAll.ROCKETMQ_HOME_PROPERTY, "/Users/apple/doc/IDEAProjects/rocketmq");

   main0(args);

}


再次启动Namesrv,报错如下:

8.png



提示在我们配置的ROCKETMQ_HOME 目录下,没找到conf\logback_namesrv.xml 配置文件。


在项目中搜索logback_namesrv.xml 文件,找到在distribution\conf 目录下存在很多配置,可直接使用这些配置文件。直接把distribution\conf 目录copy到配置的ROCKETMQ_HOME 目录下即可。

然后在来启动一下,控制台打印:



7.png

说明NameSrv启动成功


3 启动Broker


找到BrokerStartup,直接启动:


Please set the ROCKETMQ_HOME variable in your environment to match the location of the RocketMQ installationDisconnected from the target VM, address: '127.0.0.1:51797', transport: 'socket'


知道咋办了吧。


broker需要连接namrsrv,所以在BrokerStartup 的main方法第一行添加如下两行代码:


public static void main(String[] args) {

  System.setProperty(MixAll.ROCKETMQ_HOME_PROPERTY, "/Users/apple/doc/IDEAProjects/rocketmq");

       System.setProperty(MixAll.NAMESRV_ADDR_PROPERTY, "127.0.0.1:9876");

   start(createBrokerController(args));

}


再启动:



虽然打印启动成功,再验证是否能成功发送和接收消息。


4 启动Producer


Producer的demo代码:


import org.apache.rocketmq.client.producer.DefaultMQProducer;

import org.apache.rocketmq.client.producer.SendResult;

import org.apache.rocketmq.common.message.Message;

public class ProducerTest {

   public static void main(String[] args) throws Exception {

       String namesrvAddr = "127.0.0.1:9876";

       String group = "test_group";

       String topic = "test_hello_rocketmq";

       // 构建Producer实例

       DefaultMQProducer producer = new DefaultMQProducer();

       producer.setNamesrvAddr(namesrvAddr);

       producer.setProducerGroup(group);

       // 启动producer

       producer.start();

       // 发送消息

       SendResult result = producer.send(new Message(topic, "hello rocketmq".getBytes()));

       System.out.println(result.getSendStatus());

       // 关闭producer

       producer.shutdown();

   }

}


启动ProducerTest,控制台打印SEND_OK,我们的producer发送消息也OK。


5 启动Consumer


上代码:


import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;

import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;

import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;

import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;

import org.apache.rocketmq.common.consumer.ConsumeFromWhere;

import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

import java.util.concurrent.TimeUnit;

public class ConsumerTest {

   public static void main(String[] args) throws Exception {

       String namesrvAddr = "127.0.0.1:9876";

       String group = "test_consumer_group";

       String topic = "test_hello_rocketmq";

       // 初始化consumer

       DefaultMQPushConsumer consumer = new DefaultMQPushConsumer();

       consumer.setNamesrvAddr(namesrvAddr);

       consumer.setConsumerGroup(group);

       // 订阅topic

       consumer.subscribe(topic, (String) null);

       // 设置消费的位置,由于producer已经发送了消息,所以我们设置从第一个开始消费

       consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

       // 添加消息监听器

       consumer.registerMessageListener(new MessageListenerOrderly() {

           @Override

           public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {

               msgs.forEach(msg -> {

                   System.out.println(new String(msg.getBody()));

               });

               return ConsumeOrderlyStatus.SUCCESS;

           }

       });

       // 启动consumer

       consumer.start();

       // 由于是异步消费,所以不能立即关闭,防止消息还未消费到

       TimeUnit.SECONDS.sleep(2);

       consumer.shutdown();

   }

}



启动消费者,能够成功消费到消息,控制台打印hello rocketmq。


6 总结

中间出一些问题,都能根据提示信息在源码中找到相应解决方案,多看源码,是成长捷径。

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
2月前
|
消息中间件 存储 监控
深度写作:深入源码理解MQ长轮询优化机制
【11月更文挑战第22天】在分布式系统中,消息队列(Message Queue, MQ)扮演着至关重要的角色。MQ不仅实现了应用间的解耦,还提供了异步消息处理、流量削峰等功能。而在MQ的众多特性中,长轮询(Long Polling)机制因其能有效提升消息处理的实时性和效率,备受关注。
75 12
|
2月前
|
消息中间件 存储 Java
深入源码理解MQ长轮询优化机制
【11月更文挑战第22天】在分布式系统中,消息队列(MQ)作为一种重要的中间件,广泛应用于解耦、异步处理、流量削峰等场景。其中,延时消息和定时消息作为MQ的高级功能,能够进一步满足复杂的业务需求。为了实现这些功能,MQ系统需要进行一系列优化,长轮询机制便是其中的关键一环。本文将深入探讨MQ如何设计延时消息和定时消息的优化机制,特别是长轮询机制的实现原理及其在Java中的模拟实现。
40 2
|
7月前
|
消息中间件 数据可视化 Go
Rabbitmq 搭建使用案例 [附源码]
Rabbitmq 搭建使用案例 [附源码]
53 0
|
3月前
|
传感器 数据可视化 网络协议
DIY可视化整合MQTT生成UniApp源码
DIY可视化整合MQTT生成UniApp源码
54 0
|
8月前
|
消息中间件 小程序 网络性能优化
蓝易云 - 直播小程序源码有用的协议知识:MQTT协
在直播小程序源码中,MQTT协议可以用于实现实时消息推送,如弹幕、聊天消息、礼物信息等。通过使用MQTT协议,可以确保消息的实时性和可靠性,从而提高用户体验。
198 0
|
8月前
|
消息中间件 Java 调度
【深度挖掘RocketMQ底层源码】「底层源码挖掘系列」透彻剖析贯穿RocketMQ的消费者端的运行调度的流程(Pull模式)
【深度挖掘RocketMQ底层源码】「底层源码挖掘系列」透彻剖析贯穿RocketMQ的消费者端的运行调度的流程(Pull模式)
69 1
|
8月前
|
消息中间件 Java RocketMQ
【深度挖掘 RocketMQ底层源码】「底层源码挖掘系列」抽丝剥茧贯穿RocketMQ的消费者端的运行核心的流程(Pull模式-下)
【深度挖掘 RocketMQ底层源码】「底层源码挖掘系列」抽丝剥茧贯穿RocketMQ的消费者端的运行核心的流程(Pull模式-下)
55 1
|
8月前
|
消息中间件 存储 NoSQL
【深度挖掘 RocketMQ底层源码】「底层源码挖掘系列」透彻剖析贯穿RocketMQ的消费者端的运行核心的流程(Pull模式-上)
【深度挖掘 RocketMQ底层源码】「底层源码挖掘系列」透彻剖析贯穿RocketMQ的消费者端的运行核心的流程(Pull模式-上)
65 1
|
8月前
|
消息中间件 存储 Kafka
【深入浅出 RocketMQ原理及实战】「底层源码挖掘系列」透彻剖析贯穿一下RocketMQ和Kafka索引设计原理和方案
【深入浅出 RocketMQ原理及实战】「底层源码挖掘系列」透彻剖析贯穿一下RocketMQ和Kafka索引设计原理和方案
141 1
|
8月前
|
消息中间件 存储 负载均衡
精华推荐 | 【深入浅出 RocketMQ原理及实战】「底层源码挖掘系列」透彻剖析贯穿RocketMQ的消费者端的运行核心的流程(上篇)
精华推荐 | 【深入浅出 RocketMQ原理及实战】「底层源码挖掘系列」透彻剖析贯穿RocketMQ的消费者端的运行核心的流程(上篇)
83 1