1 前言
承接着我的上一篇文章RocketMQ的概念与特性,相信大家对RocketMQ都有了一定地了解,为了进一步深入学习RocketMQ的设计,我们要把它的源码部署起来并启动运行,接下来就一步步让RocketMQ在我们的Mac上跑起来吧!
2 前置条件
部署前,我们的Mac需要具备如下软件:
- JDK 1.8
- Maven 3.2.X+
- IDEA
因为本次的部署需要直接编译RocketMQ源码,构建出RocketMQ可执行包。
3 下载源码
打开RocketMQ在Github上的主页,获取仓库地址,然后在本地电脑上克隆本仓库。
git clone https://github.com/apache/rocketmq.git
4 启动RocketMQ服务器
4.1 启动NameServer
根据RocketMQ的概念,我们知道,NameServer 名称服务充当路由消息的提供者,生产者或消费者能够通过名字服务查找各主题相应的Broker IP列表。可以把NameServer理解为RocketMQ的路由中心,它提供轻量级服务发现和路由,主要的作用是存储路由信息,管理broker节点,包括路由的查找、注册和删除。 因此,我们第一步是需要将Name Server启动。
在RocketMQ工程的namesrv包中找到入口类
Please set the ROCKETMQ_HOME variable in your environment to match the location of the RocketMQ installation
这个报错是因为在为nameserver设置相关配置时没有设置成功。报错的代码段如下:
if (null == namesrvConfig.getRocketmqHome()) { System.out.printf("Please set the %s variable in your environment to match the location of the RocketMQ installation%n", MixAll.ROCKETMQ_HOME_ENV); System.exit(-2); }
这是提示我们,需要配置一个ROCKETMQ_HOME
的环境变量
public static final String ROCKETMQ_HOME_ENV = "ROCKETMQ_HOME";
为此,我们需要将distribution所在的目录作为路由中心的路径/Users/bytedance/IdeaProjects/rocketmq-all-4.9.2/distribution,并在IDEA配置它为环境变量:
再次运行main函数,就会发现启动成功。
4.2 启动Broker Server
根据RocketMQ的概念,我们知道,Broker是消息中转角色,负责存储消息、转发消息。代理服务器在RocketMQ系统中负责接收从生产者发送来的消息并存储、同时为消费者的拉取请求作准备。另外,Broker也存储消息相关的元数据,包括消费者组、消费进度偏移和主题和队列消息等。 因此,接下来,我们需要将Broker Server启动。
前面我们在本地启动了Name Server,我们需要知道Name Server的IP地址和端口号,并告诉Broker Server,Broker Server才能够到指定的Name Server去注册它的信息,从而让生产者或者消费者去Name Server找到Broker Server,并进行消息的生成与消费。
通过阅读代码可以知道,Name Server在启动时会启动一个Netty服务器,用于网络传输,Netty监听的端口号为9876,因此Name Server的监听地址就为:127.0.0.1:9876
public static NamesrvController createNamesrvController(String[] args) throws IOException, JoranException { // ...... final NamesrvConfig namesrvConfig = new NamesrvConfig(); final NettyServerConfig nettyServerConfig = new NettyServerConfig(); nettyServerConfig.setListenPort(9876); // ...... }
所以,我们在启动Broker Server时,配置上对应的Name Server地址以及环境变量
- 启动Broker时需要指定注册的NameServer地址,在启动命令中输入
-n 127.0.0.1:9876
- 环境变量与NameServer的一样,同样是
ROCKETMQ_HOME
最后运行main方法,Broker Server启动成功
到这一步,RocketMQ的路由中心和接收发消息的服务器就启动成功了,也即完成了单机部署,现在,我们可以通过Name Server和Broker Server来进行消息传递了。
5 生产与消费Example
5.1 生产者
package com.zhongger.learn; import com.alibaba.fastjson.JSON; import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.remoting.common.RemotingHelper; import org.apache.rocketmq.remoting.exception.RemotingException; import java.io.Serializable; import java.io.UnsupportedEncodingException; /** * @author zhongmingyi * @date 2022/5/14 5:18 下午 */ public class RMQProducer { public static void main(String[] args) throws MQClientException, UnsupportedEncodingException, MQBrokerException, RemotingException, InterruptedException { DefaultMQProducer producer = new DefaultMQProducer("zhongger-producer"); producer.setNamesrvAddr("127.0.0.1:9876"); producer.start(); ZhonggerMessage msgContent = new ZhonggerMessage(); msgContent.setMsg("你好RMQ,我是生产者"); String jsonString = JSON.toJSONString(msgContent); Message msg = new Message("zhongger-topic", jsonString.getBytes(RemotingHelper.DEFAULT_CHARSET)); SendResult sendResult = producer.send(msg); System.out.printf("Send result %s", sendResult); producer.shutdown(); } } class ZhonggerMessage implements Serializable { private String msg; public String getMsg() { return msg; } public void setMsg(String msg) { this.msg = msg; } @Override public String toString() { return "ZhonggerMessage{" + "msg='" + msg + '\'' + '}'; } }
使用DefaultMQProducer类来创建生产者实例,并指定消息组Group和路由中心地址
启动生产者实例
创建消息,指定Topic(用于区分消息的类别)
发送消息
关闭生产者实例
我上述的例子,是把ZhonggerMessage对象利用JSON序列化为byte数组发送给RocketMQ,发送成功后显示如下:
5.2 消费者
package com.zhongger.learn; import com.alibaba.fastjson.JSON; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.consumer.ConsumeFromWhere; import org.apache.rocketmq.common.message.MessageExt; import java.util.List; /** * @author zhongmingyi * @date 2022/5/14 5:24 下午 */ public class RMQConsumer { public static void main(String[] args) throws MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("zhongger-consumer"); consumer.setNamesrvAddr("127.0.0.1:9876"); consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); consumer.subscribe("zhongger-topic", "*"); consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { msgs.forEach(messageExt -> { ZhonggerMessage zmg = JSON.parseObject(messageExt.getBody(), ZhonggerMessage.class); System.out.println(zmg.getMsg()); }); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); System.out.printf("Consumer Started.%n"); } }
使用DefaultMQPushConsumer类来创建消费者实例,并指定消息组Group、路由中心地址、消费模式、消息类别。
注册消息监听器,监听消息,消费消息,返回消费成功标识。
我上述的例子,消费者成功消费了生产者的消息,并把byte数组反序列化为ZhonggerMessage对象,最终把结果打印到控制台中,结果符合预期,如下: