RocketMQ的单机部署

简介: RocketMQ的单机部署

1 前言


承接着我的上一篇文章RocketMQ的概念与特性,相信大家对RocketMQ都有了一定地了解,为了进一步深入学习RocketMQ的设计,我们要把它的源码部署起来并启动运行,接下来就一步步让RocketMQ在我们的Mac上跑起来吧!


2 前置条件


部署前,我们的Mac需要具备如下软件:


  • JDK 1.8
  • Maven 3.2.X+
  • IDEA


因为本次的部署需要直接编译RocketMQ源码,构建出RocketMQ可执行包。


3 下载源码


打开RocketMQ在Github上的主页,获取仓库地址,然后在本地电脑上克隆本仓库。


git clone https://github.com/apache/rocketmq.git


4 启动RocketMQ服务器


4.1 启动NameServer


根据RocketMQ的概念,我们知道,NameServer 名称服务充当路由消息的提供者,生产者或消费者能够通过名字服务查找各主题相应的Broker IP列表。可以把NameServer理解为RocketMQ的路由中心,它提供轻量级服务发现和路由,主要的作用是存储路由信息,管理broker节点,包括路由的查找、注册和删除。 因此,我们第一步是需要将Name Server启动。


在RocketMQ工程的namesrv包中找到入口类

Please set the ROCKETMQ_HOME variable in your environment to match the location of the RocketMQ installation

这个报错是因为在为nameserver设置相关配置时没有设置成功。报错的代码段如下:

if (null == namesrvConfig.getRocketmqHome()) {
    System.out.printf("Please set the %s variable in your environment to match the location of the RocketMQ installation%n", MixAll.ROCKETMQ_HOME_ENV);
    System.exit(-2);
}

这是提示我们,需要配置一个ROCKETMQ_HOME 的环境变量


public static final String ROCKETMQ_HOME_ENV = "ROCKETMQ_HOME";


为此,我们需要将distribution所在的目录作为路由中心的路径/Users/bytedance/IdeaProjects/rocketmq-all-4.9.2/distribution,并在IDEA配置它为环境变量:


4de10d99927044a1923acebfcc730ca6.png

再次运行main函数,就会发现启动成功。

fb24aaa24beb46f78af4faa50747093f.png

4.2 启动Broker Server


根据RocketMQ的概念,我们知道,Broker是消息中转角色,负责存储消息、转发消息。代理服务器在RocketMQ系统中负责接收从生产者发送来的消息并存储、同时为消费者的拉取请求作准备。另外,Broker也存储消息相关的元数据,包括消费者组、消费进度偏移和主题和队列消息等。 因此,接下来,我们需要将Broker Server启动。


前面我们在本地启动了Name Server,我们需要知道Name Server的IP地址和端口号,并告诉Broker Server,Broker Server才能够到指定的Name Server去注册它的信息,从而让生产者或者消费者去Name Server找到Broker Server,并进行消息的生成与消费。


通过阅读代码可以知道,Name Server在启动时会启动一个Netty服务器,用于网络传输,Netty监听的端口号为9876,因此Name Server的监听地址就为:127.0.0.1:9876

    public static NamesrvController createNamesrvController(String[] args) throws IOException, JoranException {
       // ......
        final NamesrvConfig namesrvConfig = new NamesrvConfig();
        final NettyServerConfig nettyServerConfig = new NettyServerConfig();
        nettyServerConfig.setListenPort(9876);
    // ......
    }

所以,我们在启动Broker Server时,配置上对应的Name Server地址以及环境变量

  • 启动Broker时需要指定注册的NameServer地址,在启动命令中输入-n 127.0.0.1:9876
  • 环境变量与NameServer的一样,同样是ROCKETMQ_HOME

ab06a86a8cc743738aaa6dae19960375.png

最后运行main方法,Broker Server启动成功

0287504c475f4348b41cde485d3244b5.png


到这一步,RocketMQ的路由中心和接收发消息的服务器就启动成功了,也即完成了单机部署,现在,我们可以通过Name Server和Broker Server来进行消息传递了。


5 生产与消费Example


5.1 生产者

package com.zhongger.learn;
import com.alibaba.fastjson.JSON;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingException;
import java.io.Serializable;
import java.io.UnsupportedEncodingException;
/**
 * @author zhongmingyi
 * @date 2022/5/14 5:18 下午
 */
public class RMQProducer {
    public static void main(String[] args) throws MQClientException, UnsupportedEncodingException, MQBrokerException, RemotingException, InterruptedException {
        DefaultMQProducer producer = new DefaultMQProducer("zhongger-producer");
        producer.setNamesrvAddr("127.0.0.1:9876");
        producer.start();
        ZhonggerMessage msgContent = new ZhonggerMessage();
        msgContent.setMsg("你好RMQ,我是生产者");
        String jsonString = JSON.toJSONString(msgContent);
        Message msg = new Message("zhongger-topic", jsonString.getBytes(RemotingHelper.DEFAULT_CHARSET));
        SendResult sendResult = producer.send(msg);
        System.out.printf("Send result %s", sendResult);
        producer.shutdown();
    }
}
class ZhonggerMessage implements Serializable {
    private String msg;
    public String getMsg() {
        return msg;
    }
    public void setMsg(String msg) {
        this.msg = msg;
    }
    @Override
    public String toString() {
        return "ZhonggerMessage{" +
                "msg='" + msg + '\'' +
                '}';
    }
}

使用DefaultMQProducer类来创建生产者实例,并指定消息组Group和路由中心地址

启动生产者实例

创建消息,指定Topic(用于区分消息的类别)

发送消息

关闭生产者实例


我上述的例子,是把ZhonggerMessage对象利用JSON序列化为byte数组发送给RocketMQ,发送成功后显示如下:


9d8e890424324d82a691cd7ff0cf02fc.png

5.2 消费者

package com.zhongger.learn;
import com.alibaba.fastjson.JSON;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
/**
 * @author zhongmingyi
 * @date 2022/5/14 5:24 下午
 */
public class RMQConsumer {
    public static void main(String[] args) throws MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("zhongger-consumer");
        consumer.setNamesrvAddr("127.0.0.1:9876");
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        consumer.subscribe("zhongger-topic", "*");
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                msgs.forEach(messageExt -> {
                    ZhonggerMessage zmg = JSON.parseObject(messageExt.getBody(), ZhonggerMessage.class);
                    System.out.println(zmg.getMsg());
                });
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
        System.out.printf("Consumer Started.%n");
    }
}

使用DefaultMQPushConsumer类来创建消费者实例,并指定消息组Group、路由中心地址、消费模式、消息类别。

注册消息监听器,监听消息,消费消息,返回消费成功标识。


我上述的例子,消费者成功消费了生产者的消息,并把byte数组反序列化为ZhonggerMessage对象,最终把结果打印到控制台中,结果符合预期,如下:

70acfa300e514acbaf4cdf16a4b89a43.png

相关实践学习
快速体验阿里云云消息队列RocketMQ版
本实验将带您快速体验使用云消息队列RocketMQ版Serverless系列实例进行获取接入点、创建Topic、创建订阅组、收发消息、查看消息轨迹和仪表盘。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
2月前
|
物联网 Linux 开发者
快速部署自己私有MQTT-Broker-下载安装到运行不到一分钟,快速简单且易于集成到自己项目中
本文给物联网开发的朋友推荐的是GMQT,让物联网开发者快速拥有合适自己的MQTT-Broker,本文从下载程序到安装部署手把手教大家安装用上私有化MQTT服务器。
768 5
|
3月前
|
消息中间件 监控 Docker
Docker环境下快速部署RabbitMQ教程。
就这样,你成功地用魔法召唤出了RabbitMQ,还把它和你的应用程序连接了起来。现在,消息会像小溪流水一样,在你的系统中自由流淌。别忘了,兔子们不喜欢孤独,他们需要你细心的关怀,不时地监控它们,确保他们的世界运转得井井有条。
221 18
|
3月前
|
消息中间件 存储 Kafka
一文带你从入门到实战全面掌握RocketMQ核心概念、架构部署、实践应用和高级特性
本文详细介绍了分布式消息中间件RocketMQ的核心概念、部署方式及使用方法。RocketMQ由阿里研发并开源,具有高性能、高可靠性和分布式特性,广泛应用于金融、互联网等领域。文章从环境搭建到消息类型的实战(普通消息、延迟消息、顺序消息和事务消息)进行了全面解析,并对比了三种消费者类型(PushConsumer、SimpleConsumer和PullConsumer)的特点与适用场景。最后总结了使用RocketMQ时的关键注意事项,如Topic和Tag的设计、监控告警的重要性以及性能与可靠性的平衡。通过学习本文,读者可掌握RocketMQ的使用精髓并灵活应用于实际项目中。
1861 8
 一文带你从入门到实战全面掌握RocketMQ核心概念、架构部署、实践应用和高级特性
|
7月前
|
消息中间件 监控 RocketMQ
Docker部署RocketMQ5.2.0集群
本文详细介绍了如何使用Docker和Docker Compose部署RocketMQ 5.2.0集群。通过创建配置文件、启动集群和验证容器状态,您可以快速搭建起一个RocketMQ集群环境。希望本文能够帮助您更好地理解和应用RocketMQ,提高消息中间件的部署和管理效率。
942 91
|
5月前
|
消息中间件 Java 中间件
RocketMQ实战—2.RocketMQ集群生产部署
本文主要介绍了大纲什么是消息中间件、消息中间件的技术选型、RocketMQ的架构原理和使用方式、消息中间件路由中心的架构原理、Broker的主从架构原理、高可用的消息中间件生产部署架构、部署一个小规模的RocketMQ集群进行压测、如何对RocketMQ集群进行可视化的监控和管理、进行OS内核参数和JVM参数的调整、如何对小规模RocketMQ集群进行压测、消息中间件集群生产部署规划梳理。
RocketMQ实战—2.RocketMQ集群生产部署
|
消息中间件 编解码 Docker
Docker部署RabbitMQ消息中间件
【7月更文挑战第4天】Docker部署RabbitMQ消息中间件
395 3
|
9月前
|
消息中间件 监控 Java
如何将Spring Boot + RabbitMQ应用程序部署到Pivotal Cloud Foundry (PCF)
如何将Spring Boot + RabbitMQ应用程序部署到Pivotal Cloud Foundry (PCF)
136 6
|
11月前
|
消息中间件 编解码 Docker
【Docker项目实战】Docker部署RabbitMQ消息中间件
【10月更文挑战第8天】Docker部署RabbitMQ消息中间件
429 2
【Docker项目实战】Docker部署RabbitMQ消息中间件
|
NoSQL 关系型数据库 Redis
mall在linux环境下的部署(基于Docker容器),Docker安装mysql、redis、nginx、rabbitmq、elasticsearch、logstash、kibana、mongo
mall在linux环境下的部署(基于Docker容器),docker安装mysql、redis、nginx、rabbitmq、elasticsearch、logstash、kibana、mongodb、minio详细教程,拉取镜像、运行容器
mall在linux环境下的部署(基于Docker容器),Docker安装mysql、redis、nginx、rabbitmq、elasticsearch、logstash、kibana、mongo
|
11月前
|
消息中间件
实践部署《云消息队列RabbitMQ实践》测评
《云消息队列RabbitMQ实践》解决方案原理清晰,尤其在异步通信和解耦方面解释详尽。对初学者而言,部分术语如消息持久化、确认机制及集群性能优化可更细致。部署过程文档详实,涵盖主要环节,但插件配置等细节存在环境问题,需查阅社区资料解决。该方案展示了RabbitMQ的高吞吐量、灵活路由和可靠消息传递能力,但在高可用性和消息丢失处理上可提供更深入配置建议。适用于高并发和解耦场景,如订单处理、日志收集,有助于提升系统可扩展性。总体部署体验良好,实用性较强。
125 0