高性能消息中间件 RabbitMQ(二)

简介: 高性能消息中间件 RabbitMQ

2.3 账户管理

guest账户默认只允许本地使用,我们可以创建新账户远程访问RabbitMQ(如2.2中),但是不推荐远程使用MQ.

1、创建账户

# 创建账户
rabbitmqctl add_user MQzhang(用户名) MQzhang(密码)

2、给用户授予管理员角色

rabbitmqctl set_user_tags 用户名 administrator

3、给用户授权

# "/"表示虚拟机
# zj表示用户名
# ".*" ".*" ".*" 表示完整权限
rabbitmqctl set_permissions -p "/" MQzhang".*" ".*" ".*"

4、通过管控台访问rabbitmq即可。

2.4 管控台

2.5 Docker安装

1、关闭RabbitMQ服务

rabbitmqctl stop

2、在Centos7中安装docker

# 安装Docker
curl -fsSL https://get.docker.com | bash -s docker --mirror Aliyun
# 启动docker
systemctl start docker

3、拉取镜像

docker pull rabbitmq

4、启动MQ

docker run -d --hostname zj--name rabbit -p 15672:15672 -p 5672:5672 rabbitmq

三、RabbitMQ工作模式

RabbitMQ共有六种工作模式:简单模式(Simple)、工作队列模式(Work Queue)、发布订阅模式(Publish/Subscribe)、路由模式(Routing)、通配符模式(Topics)、远程调用模式(RPC,不常用)

3.1 简单模式

简介

特点

  1. 一个生产者对应一个消费者,通过队列进行消息传递。
  2. 该模式使用direct交换机,direct交换机是RabbitMQ默认交换机。

项目搭建

接下来我们使用JAVA代码操作RabbitMQ,让其按照简单模式进行工作。

JMS

由于MQ产品很多,操作方式各有不同,于是JAVA提供了一套规则——JMS,用于操作消息中间件。JMS即Java消息服务(JavaMessage Service)应用程序接口,是一个Java平台中关于面向消息中间件的API。JMS是JavaEE规范中的一种,类比JDBC。很多MQ产品都实现了JMS规范,例如ActiveMQ。RabbitMQ官方并没有实现JMS规范,但是开源社区有JMS的实现包。

1、启动RabbitMQ

# 开启管控台插件
rabbitmq-plugins enable rabbitmq_management
# 启动rabbitmq
rabbitmq-server -detached

2、创建普通maven项目,添加RabbitMQ依赖:

<dependencies>
  <dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.14.0</version>
  </dependency>
</dependencies>

3、编写生产者

package com.zj.mq.Simple;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/*生产者*/
public class Producer {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1.创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.66.100");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("MQzhang");
        connectionFactory.setPassword("MQzhang");
        connectionFactory.setVirtualHost("/");
        //2.创建连接
        Connection connection = connectionFactory.newConnection();
        //3.建立信道
        Channel channel = connection.createChannel();
        //4.创建队列(如果队列已经存在的话则使用该队列,也就是说队列只会创建一次)和交换机(简单模式下使用的是默认交换机direct)
        /* 参数1:队列名
         * 参数2:是否持久化,true表示MQ重启后队列还在。
         * 参数3:是否私有化,false表示所有消费者都可以访问,true表示只有第一次拥有它的消费者才能访问
         * 参数4:是否自动删除,true表示不再使用队列时自动删除队列
         * 参数5:其他额外参数*/
        channel.queueDeclare("simpleQueue",false,false,false,null);
        //5.发送消息
        String msg ="hello rabbitMQ";
        /*
         * 参数1:交换机名,""表示默认交换机
         * 参数2:路由键,简单模式就是队列名
         * 参数3:其他额外参数
         * 参数4:要传递的消息字节数组
         */
        channel.basicPublish("","simpleQueue",null,msg.getBytes());
        //6.关闭资源(信道和连接)
        channel.close();
        connection.close();
        System.out.println("OK");
    }
}

4.编写消费者

package com.zj.mq.Simple;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/*消费者*/
public class Consumer {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1.创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.66.100");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("MQzhang");
        connectionFactory.setPassword("MQzhang");
        connectionFactory.setVirtualHost("/");
        //2.创建连接
        Connection connection = connectionFactory.newConnection();
        //3.创建信道
        Channel channel = connection.createChannel();
        //4.监听队列(一直在连接不会关闭连接)
        /*
        * 参数一:监听的队列名
        * 参数二:是否自动签收(消费完消息后自动告诉MQ消息消费完了),如果设置为false需要手动确认消息,否则MQ会一直发送消息。
        * 参数三:Consumer的实现类,重写该类方法表示接受到消息后如何消费,body就是消息的字节数组。
        * */
        channel.basicConsume("simpleQueue",true,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println("接受消息为:"+message);
            }
        });
    }
}

消费者随时在监听队列只要队列有消息就会被消费。

3.2 工作队列模式

与简单模式相比,工作队列模式(Work Queue)多了一些消费者,该模式也使用direct交换机,应用于处理消息较多的情况。特点如下:

  1. 一个队列对应多个消费者。
  2. 一条消息只会被一个消费者消费。
  3. 消息队列默认采用轮询的方式将消息平均发送给消费者。

1、编写生产者,并产生大量消息

package com.zj.mq.work;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/*生产者*/
public class Producer {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1.创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.66.100");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("MQzhang");
        connectionFactory.setPassword("MQzhang");
        connectionFactory.setVirtualHost("/");
        //2.创建连接
        Connection connection = connectionFactory.newConnection();
        //3.建立信道
        Channel channel = connection.createChannel();
        //4.创建队列(如果队列已经存在的话则使用该队列,也就是说队列只会创建一次)和交换机(简单模式下使用的是默认交换机direct)
        /* 参数1:队列名
         * 参数2:是否持久化,true表示MQ重启后队列还在。
         * 参数3:是否私有化,false表示所有消费者都可以访问,true表示只有第一次拥有它的消费者才能访问
         * 参数4:是否自动删除,true表示不再使用队列时自动删除队列
         * 参数5:其他额外参数*/
        channel.queueDeclare("WorkQueue",false,false,false,null);
        //5.发送大量消息
        for (int i = 0; i < 100; i++) {
            /*
             * 参数1:交换机名,""表示默认交换机
             * 参数2:路由键,简单模式就是队列名
             * 参数3:表示该消息是持久化消息,即保存到内存也会保存到磁盘
             * 参数4:要传递的消息字节数组
             */
            channel.basicPublish("","WorkQueue", MessageProperties.PERSISTENT_TEXT_PLAIN,("这是第"+i+"个消息").getBytes());
        }
        //6.关闭资源(信道和连接)
        channel.close();
        connection.close();
    }
}

2.编写消费者

编写三个消费者,他们都监听的是一个队列。

package com.zj.mq.work;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/*消费者*/
public class Consumer1 {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1.创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.66.100");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("MQzhang");
        connectionFactory.setPassword("MQzhang");
        connectionFactory.setVirtualHost("/");
        //2.创建连接
        Connection connection = connectionFactory.newConnection();
        //3.创建信道
        Channel channel = connection.createChannel();
        //4.监听队列(一直在连接不会关闭连接)
        /*
        * 参数一:监听的队列名
        * 参数二:是否自动签收(消费完消息后自动告诉MQ消息消费完了),如果设置为false需要手动确认消息,否则MQ会一直发送消息。
        * 参数三:Consumer的实现类,重写该类方法表示接受到消息后如何消费,body就是消息的字节数组。
        * */
        channel.basicConsume("WorkQueue",true,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println("消费者1接受消息为:"+message);
            }
        });
    }
}
package com.zj.mq.work;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/*消费者*/
public class Consumer2 {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1.创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.66.100");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("MQzhang");
        connectionFactory.setPassword("MQzhang");
        connectionFactory.setVirtualHost("/");
        //2.创建连接
        Connection connection = connectionFactory.newConnection();
        //3.创建信道
        Channel channel = connection.createChannel();
        //4.监听队列(一直在连接不会关闭连接)
        /*
        * 参数一:监听的队列名
        * 参数二:是否自动签收(消费完消息后自动告诉MQ消息消费完了),如果设置为false需要手动确认消息,否则MQ会一直发送消息。
        * 参数三:Consumer的实现类,重写该类方法表示接受到消息后如何消费,body就是消息的字节数组。
        * */
        channel.basicConsume("WorkQueue",true,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println("消费者2接受消息为:"+message);
            }
        });
    }
}
package com.zj.mq.work;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/*消费者*/
public class Consumer3 {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1.创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.66.100");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("MQzhang");
        connectionFactory.setPassword("MQzhang");
        connectionFactory.setVirtualHost("/");
        //2.创建连接
        Connection connection = connectionFactory.newConnection();
        //3.创建信道
        Channel channel = connection.createChannel();
        //4.监听队列(一直在连接不会关闭连接)
        /*
        * 参数一:监听的队列名
        * 参数二:是否自动签收(消费完消息后自动告诉MQ消息消费完了),如果设置为false需要手动确认消息,否则MQ会一直发送消息。
        * 参数三:Consumer的实现类,重写该类方法表示接受到消息后如何消费,body就是消息的字节数组。
        * */
        channel.basicConsume("WorkQueue",true,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println("消费者3接受消息为:"+message);
            }
        });
    }
}
消费者1接受消息为:这是第0个消息
消费者1接受消息为:这是第3个消息
消费者1接受消息为:这是第6个消息
消费者1接受消息为:这是第9个消息
消费者1接受消息为:这是第12个消息
消费者1接受消息为:这是第15个消息
消费者1接受消息为:这是第18个消息
消费者1接受消息为:这是第21个消息
……
消费者2接受消息为:这是第1个消息
消费者2接受消息为:这是第4个消息
消费者2接受消息为:这是第7个消息
消费者2接受消息为:这是第10个消息
消费者2接受消息为:这是第13个消息
消费者2接受消息为:这是第16个消息
消费者2接受消息为:这是第19个消息
消费者2接受消息为:这是第22个消息
消费者2接受消息为:这是第25个消息
……
消费者3接受消息为:这是第2个消息
消费者3接受消息为:这是第5个消息
消费者3接受消息为:这是第8个消息
消费者3接受消息为:这是第11个消息
消费者3接受消息为:这是第14个消息
消费者3接受消息为:这是第17个消息
消费者3接受消息为:这是第20个消息
消费者3接受消息为:这是第23个消息
消费者3接受消息为:这是第26个消息
消费者3接受消息为:这是第29个消息
消费者3接受消息为:这是第32个消息


相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
23天前
|
消息中间件 存储 Java
RocketMQ(一):消息中间件缘起,一览整体架构及核心组件
【10月更文挑战第15天】本文介绍了消息中间件的基本概念和特点,重点解析了RocketMQ的整体架构和核心组件。消息中间件如RocketMQ、RabbitMQ、Kafka等,具备异步通信、持久化、削峰填谷、系统解耦等特点,适用于分布式系统。RocketMQ的架构包括NameServer、Broker、Producer、Consumer等组件,通过这些组件实现消息的生产、存储和消费。文章还提供了Spring Boot快速上手RocketMQ的示例代码,帮助读者快速入门。
|
2月前
|
消息中间件 存储 RocketMQ
消息中间件-RocketMQ技术(二)
消息中间件-RocketMQ技术(二)
|
2月前
|
消息中间件 存储 中间件
消息中间件-RocketMQ技术(一)
消息中间件-RocketMQ技术(一)
|
1月前
|
消息中间件 编解码 Docker
【Docker项目实战】Docker部署RabbitMQ消息中间件
【10月更文挑战第8天】Docker部署RabbitMQ消息中间件
81 1
【Docker项目实战】Docker部署RabbitMQ消息中间件
|
16天前
|
消息中间件 存储 Java
吃透 RocketMQ 消息中间件,看这篇就够了!
本文详细介绍 RocketMQ 的五大要点、核心特性及应用场景,涵盖高并发业务场景下的消息中间件关键知识点。关注【mikechen的互联网架构】,10年+BAT架构经验倾囊相授。
吃透 RocketMQ 消息中间件,看这篇就够了!
|
6月前
|
消息中间件 存储 负载均衡
消息中间件的选择:RabbitMQ是一个明智的选择
消息中间件的选择:RabbitMQ是一个明智的选择
109 0
|
5月前
|
消息中间件 存储 中间件
【消息中间件】详解三大MQ:RabbitMQ、RocketMQ、Kafka
【消息中间件】详解三大MQ:RabbitMQ、RocketMQ、Kafka
1347 0
|
4月前
|
消息中间件 编解码 Docker
Docker部署RabbitMQ消息中间件
【7月更文挑战第4天】Docker部署RabbitMQ消息中间件
275 3
|
3月前
|
消息中间件 Java 测试技术
消息中间件RabbitMQ---SpringBoot整合RabbitMQ【三】
这篇文章是关于如何在SpringBoot应用中整合RabbitMQ的消息中间件。内容包括了在SpringBoot项目中添加RabbitMQ的依赖、配置文件设置、启动类注解,以及如何通过单元测试来创建交换器、队列、绑定,并发送和接收消息。文章还介绍了如何配置消息转换器以支持对象的序列化和反序列化,以及如何使用注解`@RabbitListener`来接收消息。
消息中间件RabbitMQ---SpringBoot整合RabbitMQ【三】
|
3月前
|
消息中间件 Docker 容器
消息中间件RabbitMQ---Docker安装RabbitMQ、以及RabbitMQ的基本使用【二】
这篇文章提供了RabbitMQ的安装和基本使用教程,包括如何使用Docker拉取RabbitMQ镜像、创建容器、通过浏览器访问管理界面,以及如何创建交换机、队列、绑定和使用direct、fanout和topic三种类型的交换器进行消息发布和接收的测试。
消息中间件RabbitMQ---Docker安装RabbitMQ、以及RabbitMQ的基本使用【二】