RocketMQ的单机部署

简介: RocketMQ的单机部署

1 前言


承接着我的上一篇文章RocketMQ的概念与特性,相信大家对RocketMQ都有了一定地了解,为了进一步深入学习RocketMQ的设计,我们要把它的源码部署起来并启动运行,接下来就一步步让RocketMQ在我们的Mac上跑起来吧!


2 前置条件


部署前,我们的Mac需要具备如下软件:


  • JDK 1.8
  • Maven 3.2.X+
  • IDEA


因为本次的部署需要直接编译RocketMQ源码,构建出RocketMQ可执行包。


3 下载源码


打开RocketMQ在Github上的主页,获取仓库地址,然后在本地电脑上克隆本仓库。


git clone https://github.com/apache/rocketmq.git


4 启动RocketMQ服务器


4.1 启动NameServer


根据RocketMQ的概念,我们知道,NameServer 名称服务充当路由消息的提供者,生产者或消费者能够通过名字服务查找各主题相应的Broker IP列表。可以把NameServer理解为RocketMQ的路由中心,它提供轻量级服务发现和路由,主要的作用是存储路由信息,管理broker节点,包括路由的查找、注册和删除。 因此,我们第一步是需要将Name Server启动。


在RocketMQ工程的namesrv包中找到入口类

Please set the ROCKETMQ_HOME variable in your environment to match the location of the RocketMQ installation

这个报错是因为在为nameserver设置相关配置时没有设置成功。报错的代码段如下:

if (null == namesrvConfig.getRocketmqHome()) {
    System.out.printf("Please set the %s variable in your environment to match the location of the RocketMQ installation%n", MixAll.ROCKETMQ_HOME_ENV);
    System.exit(-2);
}

这是提示我们,需要配置一个ROCKETMQ_HOME 的环境变量


public static final String ROCKETMQ_HOME_ENV = "ROCKETMQ_HOME";


为此,我们需要将distribution所在的目录作为路由中心的路径/Users/bytedance/IdeaProjects/rocketmq-all-4.9.2/distribution,并在IDEA配置它为环境变量:


4de10d99927044a1923acebfcc730ca6.png

再次运行main函数,就会发现启动成功。

fb24aaa24beb46f78af4faa50747093f.png

4.2 启动Broker Server


根据RocketMQ的概念,我们知道,Broker是消息中转角色,负责存储消息、转发消息。代理服务器在RocketMQ系统中负责接收从生产者发送来的消息并存储、同时为消费者的拉取请求作准备。另外,Broker也存储消息相关的元数据,包括消费者组、消费进度偏移和主题和队列消息等。 因此,接下来,我们需要将Broker Server启动。


前面我们在本地启动了Name Server,我们需要知道Name Server的IP地址和端口号,并告诉Broker Server,Broker Server才能够到指定的Name Server去注册它的信息,从而让生产者或者消费者去Name Server找到Broker Server,并进行消息的生成与消费。


通过阅读代码可以知道,Name Server在启动时会启动一个Netty服务器,用于网络传输,Netty监听的端口号为9876,因此Name Server的监听地址就为:127.0.0.1:9876

    public static NamesrvController createNamesrvController(String[] args) throws IOException, JoranException {
       // ......
        final NamesrvConfig namesrvConfig = new NamesrvConfig();
        final NettyServerConfig nettyServerConfig = new NettyServerConfig();
        nettyServerConfig.setListenPort(9876);
    // ......
    }

所以,我们在启动Broker Server时,配置上对应的Name Server地址以及环境变量

  • 启动Broker时需要指定注册的NameServer地址,在启动命令中输入-n 127.0.0.1:9876
  • 环境变量与NameServer的一样,同样是ROCKETMQ_HOME

ab06a86a8cc743738aaa6dae19960375.png

最后运行main方法,Broker Server启动成功

0287504c475f4348b41cde485d3244b5.png


到这一步,RocketMQ的路由中心和接收发消息的服务器就启动成功了,也即完成了单机部署,现在,我们可以通过Name Server和Broker Server来进行消息传递了。


5 生产与消费Example


5.1 生产者

package com.zhongger.learn;
import com.alibaba.fastjson.JSON;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingException;
import java.io.Serializable;
import java.io.UnsupportedEncodingException;
/**
 * @author zhongmingyi
 * @date 2022/5/14 5:18 下午
 */
public class RMQProducer {
    public static void main(String[] args) throws MQClientException, UnsupportedEncodingException, MQBrokerException, RemotingException, InterruptedException {
        DefaultMQProducer producer = new DefaultMQProducer("zhongger-producer");
        producer.setNamesrvAddr("127.0.0.1:9876");
        producer.start();
        ZhonggerMessage msgContent = new ZhonggerMessage();
        msgContent.setMsg("你好RMQ,我是生产者");
        String jsonString = JSON.toJSONString(msgContent);
        Message msg = new Message("zhongger-topic", jsonString.getBytes(RemotingHelper.DEFAULT_CHARSET));
        SendResult sendResult = producer.send(msg);
        System.out.printf("Send result %s", sendResult);
        producer.shutdown();
    }
}
class ZhonggerMessage implements Serializable {
    private String msg;
    public String getMsg() {
        return msg;
    }
    public void setMsg(String msg) {
        this.msg = msg;
    }
    @Override
    public String toString() {
        return "ZhonggerMessage{" +
                "msg='" + msg + '\'' +
                '}';
    }
}

使用DefaultMQProducer类来创建生产者实例,并指定消息组Group和路由中心地址

启动生产者实例

创建消息,指定Topic(用于区分消息的类别)

发送消息

关闭生产者实例


我上述的例子,是把ZhonggerMessage对象利用JSON序列化为byte数组发送给RocketMQ,发送成功后显示如下:


9d8e890424324d82a691cd7ff0cf02fc.png

5.2 消费者

package com.zhongger.learn;
import com.alibaba.fastjson.JSON;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
/**
 * @author zhongmingyi
 * @date 2022/5/14 5:24 下午
 */
public class RMQConsumer {
    public static void main(String[] args) throws MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("zhongger-consumer");
        consumer.setNamesrvAddr("127.0.0.1:9876");
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        consumer.subscribe("zhongger-topic", "*");
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                msgs.forEach(messageExt -> {
                    ZhonggerMessage zmg = JSON.parseObject(messageExt.getBody(), ZhonggerMessage.class);
                    System.out.println(zmg.getMsg());
                });
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
        System.out.printf("Consumer Started.%n");
    }
}

使用DefaultMQPushConsumer类来创建消费者实例,并指定消息组Group、路由中心地址、消费模式、消息类别。

注册消息监听器,监听消息,消费消息,返回消费成功标识。


我上述的例子,消费者成功消费了生产者的消息,并把byte数组反序列化为ZhonggerMessage对象,最终把结果打印到控制台中,结果符合预期,如下:

70acfa300e514acbaf4cdf16a4b89a43.png

相关实践学习
RocketMQ一站式入门使用
从源码编译、部署broker、部署namesrv,使用java客户端首发消息等一站式入门RocketMQ。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
6月前
|
消息中间件 存储 Shell
Docker部署RabbitMQ配置日志映射本地文件
Docker部署RabbitMQ配置日志映射本地文件
152 0
|
7月前
|
消息中间件 Shell RocketMQ
百度搜索:蓝易云 ,常用环境部署—Docker安装RocketMQ教程!
通过按照上述步骤,您可以在Docker中成功安装和部署RocketMQ。请注意,上述命令仅提供了一个基本的安装和配置过程,具体配置和使用可以根据您的需求进行进一步调整。确保在执行命令之前,您已经安装并配置好Docker环境。
125 0
|
8月前
|
消息中间件 数据安全/隐私保护 Docker
Docker部署RabbitMQ
Docker部署RabbitMQ
|
2天前
|
消息中间件 Linux RocketMQ
【RocketMq】RocketMq 4.9.4 Windows-docker 部署
【RocketMq】RocketMq 4.9.4 Windows-docker 部署
185 0
【RocketMq】RocketMq 4.9.4 Windows-docker 部署
|
7月前
|
消息中间件 存储 Kubernetes
k8s1.20版本部署RabbitMQ集群(持久化)——2023.05
k8s1.20版本部署RabbitMQ集群(持久化)——2023.05
306 1
|
7月前
|
消息中间件 Java Apache
RocketMQ5.0 搭建 Name Server And Broker+Proxy 同进程部署、搭建RocketMQ控制台图形化界面
RocketMQ5.0 搭建 Name Server And Broker+Proxy 同进程部署、搭建RocketMQ控制台图形化界面
768 0
|
2天前
|
消息中间件 存储 Java
RocketMQ部署文档
RocketMQ部署文档
|
2天前
|
消息中间件 Kubernetes Docker
KubeSphere 核心实战之三【在kubesphere平台上部署ElasticSearch、应用商店部署RabbitMQ和应用市场部署Zookeeper】(实操篇 3/4)
KubeSphere 核心实战之三【在kubesphere平台上部署ElasticSearch、应用商店部署RabbitMQ和应用市场部署Zookeeper】(实操篇 3/4)
50 0
|
6月前
|
消息中间件 Java 开发工具
使用 Docker Compose 部署 RabbitMQ 的一些经验与踩坑记录
使用 Docker Compose 部署 RabbitMQ 的一些经验与踩坑记录
|
7月前
|
消息中间件 NoSQL 关系型数据库
【Kubernetes部署Shardingsphere、Mycat、Mysql、Redis、中间件Rocketmq、Rabbitmq、Nacos】
【Kubernetes部署Shardingsphere、Mycat、Mysql、Redis、中间件Rocketmq、Rabbitmq、Nacos】
143 0