Linux环境下安装RocketMQ(MetaQ)

简介: 一:RocketMQ简介 RocketMQ是一款分布式、队列模型的消息中间件,具有以下特点: 1.能够保证严格的消息顺序 2.提供丰富的消息拉取模式 3.高效的订阅者水平扩展能力 4.实时的消息订阅机制 5.亿级消息堆积能力 二:安装RocketMQ 下载源码 首先我们从githup上获取RocketMQ的源码,目前最新的版本为3.5.8,下载地址为:https://

一:RocketMQ简介

RocketMQ是一款分布式、队列模型的消息中间件,具有以下特点:

1.能够保证严格的消息顺序

2.提供丰富的消息拉取模式

3.高效的订阅者水平扩展能力

4.实时的消息订阅机制

5.亿级消息堆积能力

二:安装RocketMQ

下载源码

首先我们从githup上获取RocketMQ的源码,目前最新的版本为3.5.8,下载地址为:https://github.com/alibaba/RocketMQ/releases 或者 wget  https://github.com/alibaba/RocketMQ/releases/alibaba/RocketMQ/archive/v3.5.8.tar.gz。 请注意:此时我们下载的是源码,直接解压时不能用的,所以我们需要编译之后才能使用。

编译源码

在进行编译源码之前我们需要安装JDK。如果你已经安装过了,请跳过这里。如果你还没有安装过JDK,请参考这篇文章( Linux环境下安装JDK)。然后我们还需要安装一下Maven。Maven的安装还是比较简单,只需要去官方上下载的安装吧,然后直接解压,再配置一下环境变量就OK。接下来我们把刚才下载来的RockeMQ的源码解压到/usr/local/rockemq-source文件夹中。在源码中有一个Install.sh。如图所示:
运行sh install.sh。在编译完成之后,我们只要target目录下的alibaba-rocketmq这个文件夹中内容,把alibaba-rocketmq文件夹中的内容移动到/usr/local/rocketmq中。如果你不想编译的话,可以从这里下载编译之后的rocketmq。( rocketmq3.5.8)。

配置环境变量

接下来我们需要配置一下环境变量。在终端中输入以下命令:vi /etc/profile ,在文件的末尾中添加如下两句话:export rocketmq=/usr/local/rocketmq  export PATH=$PATH:$rocketmq/bin。接下来我们使配置的换将变量生效:source /etc/profile.

三:启动RocketMQ

接下来我们启动一下刚才编译的RocketMQ.在启动之前我们需要修改一下RocketMQ启动的内存大小(如果你的系统内存比较大的话,请忽略)。我们进入到/usr/local/rocketmq/bin中,在终端中输入以下命令修改mqnamesrv的内存大小:vi runserver.sh.修改为如图的内容:
,接下来修改broker的内存大小:vi runbroker.sh:

启动mqnameserver

进入到/usr/local/rocketmq/bin中输入以下命令:nohup sh mqnamesrv > ~/logs/rocketmqlogs/namesrv.log 2>&1 &。注意最后的这个 & 不要少。

启动mqbroker

进入到/usr/local/rocketmq/bin中输入以下命令: nohup sh mqbroker -n localhost:9876 autoCreateTopicEnable=true > ~/logs/rocketmqlogs/broker.log 2>&1 &。注意:localhost可以换成你刚才启动mqnamesrv的IP。autoCreateTopicEnable=true 这句话不要少了。最后的 & 也不要少了。
我们可以通过 ps aux | grep java命令来查看启动的情况。

到此,rocketmq的安装完毕。
四:RocketMQ的小例子
producer:
package com.zkn.newlearn.rocketmq;

import com.alibaba.rocketmq.client.exception.MQBrokerException;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
import com.alibaba.rocketmq.client.producer.SendResult;
import com.alibaba.rocketmq.common.message.Message;
import com.alibaba.rocketmq.remoting.exception.RemotingException;

import java.util.concurrent.TimeUnit;

/**
 * Created by zkn on 2016/10/27.
 */
public class ProducerTest01 {

    public static void main(String[] args) {

        /**
         * 一个应用创建一个Producer,由应用来维护此对象,可以设置为全局对象或者单例<br>
         * 注意:ProducerGroupName需要由应用来保证唯一<br>
         * ProducerGroup这个概念发送普通的消息时,作用不大,但是发送分布式事务消息时,比较关键,
         * 因为服务器会回查这个Group下的任意一个Producer
         */
        DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
        //producer.setNamesrvAddr("192.168.180.1:9876");
        producer.setNamesrvAddr("192.168.180.133:9876");
        producer.setInstanceName("Producer");
        /**
         * Producer对象在使用之前必须要调用start初始化,初始化一次即可<br>
         * 注意:切记不可以在每次发送消息时,都调用start方法
         */
        try {
            producer.start();
        } catch (MQClientException e) {
            e.printStackTrace();
        }
        for (int i = 0; i < 100; i++) {
            try {
                /**
                 * 下面这段代码表明一个Producer对象可以发送多个topic,多个tag的消息。
                 * 注意:send方法是同步调用,只要不抛异常就标识成功。但是发送成功也可会有多种状态,<br>
                 * 例如消息写入Master成功,但是Slave不成功,这种情况消息属于成功,但是对于个别应用如果对消息可靠性要求极高,<br>
                 * 需要对这种情况做处理。另外,消息可能会存在发送失败的情况,失败重试由应用来处理。
                 */
                {
                    Message msg = new Message("TopicTest1",// topic
                            "TagA",// tag
                            "OrderID001",// key
                            ("Hello MetaQ").getBytes());// body
                    SendResult sendResult = producer.send(msg);
                    System.out.println(sendResult);
                }

                {
                    Message msg = new Message("TopicTest2",
                            "TagB",
                            "OrderID001",
                            ("Hello MetaQ TagB".getBytes()));

                    SendResult sendResult = producer.send(msg);
                    System.out.println(sendResult);
                }

                {
                    Message msg = new Message("TopicTest3",
                            "TagC",
                            "OrderID001",
                            ("Hello MetaQ TagC").getBytes());

                    SendResult sendResult = producer.send(msg);

                    System.out.println(sendResult);
                }

                TimeUnit.MILLISECONDS.sleep(1000);

            } catch (MQClientException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (RemotingException e) {
                e.printStackTrace();
            } catch (MQBrokerException e) {
                e.printStackTrace();
            }
        }
        /**
         * 应用退出时,要调用shutdown来清理资源,关闭网络连接,从MetaQ服务器上注销自己
         * 注意:我们建议应用在JBOSS、Tomcat等容器的退出销毁方法里调用shutdown方法
         */
        producer.shutdown();
    }
}
consumer:
package com.zkn.newlearn.rocketmq;

import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.common.message.MessageExt;

import java.util.List;

/**
 * Created by zkn on 2016/10/27.
 */
public class ConsumerTest01 {

    /**
     * 当前例子是PushConsumer用法,使用方式给用户感觉是消息从RocketMQ服务器推到了应用客户端。<br>
     * 但是实际PushConsumer内部是使用长轮询Pull方式从MetaQ服务器拉消息,然后再回调用户Listener方法<br>
     */
    public static void main(String[] args) {

        /**
         * 注意:ConsumerGroupName需要由应用来保证唯一
         */
        DefaultMQPushConsumer pushConsumer = new DefaultMQPushConsumer("ProducerGroupName");
        //pushConsumer.setNamesrvAddr("192.168.180.1:9876");
        pushConsumer.setNamesrvAddr("192.168.180.133:9876");
        pushConsumer.setInstanceName("Consumer");
        try {
            /**
             * 订阅指定topic下tags分别等于TagA或TagC或TagD
             * 两个参数:第一个参数是topic第二个参数是tags
             */
            pushConsumer.subscribe("TopicTest1", "TagA || TagC || TagD");
            /**
            * 订阅指定topic下所有消息<br>
            * 注意:一个consumer对象可以订阅多个topic
            */
            //pushConsumer.subscribe("TopicTest2", "*");
            pushConsumer.registerMessageListener(new MessageListenerConcurrently() {
                @Override
                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                                ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                    System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs.size());
                    MessageExt messageExt = msgs.get(0);
                    if("TopicTest1".equals(messageExt.getTopic())){
                        // 执行TopicTest1的消费逻辑
                        if (messageExt.getTags() != null && messageExt.getTags().equals("TagA")) {
                            // 执行TagA的消费
                            System.out.println(new String(messageExt.getBody()));
                        }else if(messageExt.getTags() != null && messageExt.getTags().equals("TagB")){
                            System.out.println(new String(messageExt.getBody()));
                        }else if(messageExt.getTags() != null && messageExt.getTags().equals("TagC")) {
                            System.out.println(new String(messageExt.getBody()));
                        }
                    }else if("TopicTest2".equals(messageExt.getTopic())){
                        System.out.println(new String(messageExt.getBody()));
                    }
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
            });
        } catch (MQClientException e) {
            e.printStackTrace();
        }
        /**
         * Consumer对象在使用之前必须要调用start初始化,初始化一次即可<br>
         */
        try {
            pushConsumer.start();
        } catch (MQClientException e) {
            e.printStackTrace();
        }
        System.out.println("Consumer Started.");
    }
}
package com.zkn.newlearn.rocketmq;

import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.common.message.MessageExt;

import java.util.List;

/**
 * Created by zkn on 2016/10/30.
 */
public class ConsumerTest02 extends ConsumerTest01 {

    public static void main(String[] args) {

        DefaultMQPushConsumer pushConsumer = new DefaultMQPushConsumer("ProducerGroupName");
        //pushConsumer.setNamesrvAddr("192.168.180.1:9876");
        pushConsumer.setNamesrvAddr("192.168.180.133:9876");
        pushConsumer.setInstanceName("Consumer");
        /**
         * 订阅指定topic下所有消息<br>
         * 注意:一个consumer对象可以订阅多个topic
         */
        try {
            pushConsumer.subscribe("TopicTest2", "*");
            pushConsumer.registerMessageListener(new MessageListenerConcurrently() {
                 @Override
                  public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                        MessageExt messageExt = msgs.get(0);
                        if("TopicTest2".equals(messageExt.getTopic())){
                            System.out.println(new String(messageExt.getBody()));
                        }
                        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                 }
            }
            );
        } catch (MQClientException e) {
            e.printStackTrace();
        }
        try {
            pushConsumer.start();
        } catch (MQClientException e) {
            e.printStackTrace();
        }
    }
}







相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
2月前
|
Ubuntu Linux Shell
(已解决)Linux环境—bash: wget: command not found; Docker pull报错Error response from daemon: Get https://registry-1.docker.io/v2/: net/http: request canceled
(已成功解决)Linux环境报错—bash: wget: command not found;常见Linux发行版本,Linux中yum、rpm、apt-get、wget的区别;Docker pull报错Error response from daemon: Get https://registry-1.docker.io/v2/: net/http: request canceled
463 68
(已解决)Linux环境—bash: wget: command not found; Docker pull报错Error response from daemon: Get https://registry-1.docker.io/v2/: net/http: request canceled
|
10天前
|
Ubuntu Java Linux
Linux 安装 Qualcomm ® SnapdragonTM Profiler
通过本文的详细介绍,您应该已经成功在 Linux 系统上安装并配置了 Qualcomm® Snapdragon™ Profiler,并能够连接 Android 设备进行性能分析。Snapdragon Profiler 提供了丰富的工具和功能,可以帮助开发者深入了解应用程序的性能瓶颈,从而进行优化。希望本文能对您有所帮助,让您在开发过程中更高效地使用 Snapdragon Profiler 进行性能分析和优化。
41 10
|
11天前
|
Linux
Linux安装svn并启动
Linux安装svn并启动
48 10
|
1月前
|
Oracle 关系型数据库 Linux
linux8安装oracle 11g遇到的问题记录
Oracle 11g在Linux 8上安装时会遇到link编译环节的问题。官方建议忽略安装中的链接错误,安装完成后应用DBPSU 11.2.0.4.240716补丁及一次性补丁33991024,再重新编译二进制文件,并配置监听器和数据库。但因11g已退出服务期,这些补丁需付费获取。网上信息显示22年1月的PSU补丁也可解决问题,找到该补丁后按常规方式打补丁即可。如有需求或疑问可咨询我。
70 20
|
30天前
|
弹性计算 运维 Ubuntu
os-copilot在Alibaba Cloud Linux镜像下的安装与功能测试
我顺利使用了OS Copilot的 -t -f 功能,我的疑惑是在换行的时候就直接进行提问了,每次只能写一个问题,没法连续换行更有逻辑的输入问题。 我认为 -t 管道 功能有用 ,能解决环境问题的连续性操作。 我认为 -f 管道 功能有用 ,可以单独创建可连续性提问的task问题。 我认为 | 对文件直接理解在新的服务器理解有很大的帮助。 此外,我还有建议 可以在非 co 的环境下也能进行连续性的提问。
71 7
|
2月前
|
消息中间件 Java Kafka
【手把手教你Linux环境下快速搭建Kafka集群】内含脚本分发教程,实现一键部署多个Kafka节点
本文介绍了Kafka集群的搭建过程,涵盖从虚拟机安装到集群测试的详细步骤。首先规划了集群架构,包括三台Kafka Broker节点,并说明了分布式环境下的服务进程配置。接着,通过VMware导入模板机并克隆出三台虚拟机(kafka-broker1、kafka-broker2、kafka-broker3),分别设置IP地址和主机名。随后,依次安装JDK、ZooKeeper和Kafka,并配置相应的环境变量与启动脚本,确保各组件能正常运行。最后,通过编写启停脚本简化集群的操作流程,并对集群进行测试,验证其功能完整性。整个过程强调了自动化脚本的应用,提高了部署效率。
【手把手教你Linux环境下快速搭建Kafka集群】内含脚本分发教程,实现一键部署多个Kafka节点
|
2月前
|
Linux Python
Linux 安装python3.7.6
本教程介绍在Linux系统上安装Python 3.7.6的步骤。首先使用`yum`安装依赖环境,包括zlib、openssl等开发库。接着通过`wget`下载Python 3.7.6源码包并解压。创建目标文件夹`/usr/local/python3`后,进入解压目录执行配置、编译和安装命令。最后设置软链接,使`python3`和`pip3`命令生效。
|
2月前
|
Ubuntu Linux
Linux 各发行版安装 ping 命令指南
如何在不同 Linux 发行版(Ubuntu/Debian、CentOS/RHEL/Fedora、Arch Linux、openSUSE、Alpine Linux)上安装 `ping` 命令,详细列出各发行版的安装步骤和验证方法,帮助系统管理员和网络工程师快速排查网络问题。
203 20
|
2月前
|
NoSQL 关系型数据库 MySQL
Linux安装jdk、mysql、redis
Linux安装jdk、mysql、redis
219 7
|
2月前
|
Unix Linux 编译器
UNIX/Linux 上的安装
UNIX/Linux 上的安装。
63 2