高性能消息中间件 RabbitMQ(二)

简介: 高性能消息中间件 RabbitMQ

2.3 账户管理

guest账户默认只允许本地使用,我们可以创建新账户远程访问RabbitMQ(如2.2中),但是不推荐远程使用MQ.

1、创建账户

# 创建账户
rabbitmqctl add_user MQzhang(用户名) MQzhang(密码)

2、给用户授予管理员角色

rabbitmqctl set_user_tags 用户名 administrator

3、给用户授权

# "/"表示虚拟机
# zj表示用户名
# ".*" ".*" ".*" 表示完整权限
rabbitmqctl set_permissions -p "/" MQzhang".*" ".*" ".*"

4、通过管控台访问rabbitmq即可。

2.4 管控台

2.5 Docker安装

1、关闭RabbitMQ服务

rabbitmqctl stop

2、在Centos7中安装docker

# 安装Docker
curl -fsSL https://get.docker.com | bash -s docker --mirror Aliyun
# 启动docker
systemctl start docker

3、拉取镜像

docker pull rabbitmq

4、启动MQ

docker run -d --hostname zj--name rabbit -p 15672:15672 -p 5672:5672 rabbitmq

三、RabbitMQ工作模式

RabbitMQ共有六种工作模式:简单模式(Simple)、工作队列模式(Work Queue)、发布订阅模式(Publish/Subscribe)、路由模式(Routing)、通配符模式(Topics)、远程调用模式(RPC,不常用)

3.1 简单模式

简介

特点

  1. 一个生产者对应一个消费者,通过队列进行消息传递。
  2. 该模式使用direct交换机,direct交换机是RabbitMQ默认交换机。

项目搭建

接下来我们使用JAVA代码操作RabbitMQ,让其按照简单模式进行工作。

JMS

由于MQ产品很多,操作方式各有不同,于是JAVA提供了一套规则——JMS,用于操作消息中间件。JMS即Java消息服务(JavaMessage Service)应用程序接口,是一个Java平台中关于面向消息中间件的API。JMS是JavaEE规范中的一种,类比JDBC。很多MQ产品都实现了JMS规范,例如ActiveMQ。RabbitMQ官方并没有实现JMS规范,但是开源社区有JMS的实现包。

1、启动RabbitMQ

# 开启管控台插件
rabbitmq-plugins enable rabbitmq_management
# 启动rabbitmq
rabbitmq-server -detached

2、创建普通maven项目,添加RabbitMQ依赖:

<dependencies>
  <dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.14.0</version>
  </dependency>
</dependencies>

3、编写生产者

package com.zj.mq.Simple;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/*生产者*/
public class Producer {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1.创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.66.100");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("MQzhang");
        connectionFactory.setPassword("MQzhang");
        connectionFactory.setVirtualHost("/");
        //2.创建连接
        Connection connection = connectionFactory.newConnection();
        //3.建立信道
        Channel channel = connection.createChannel();
        //4.创建队列(如果队列已经存在的话则使用该队列,也就是说队列只会创建一次)和交换机(简单模式下使用的是默认交换机direct)
        /* 参数1:队列名
         * 参数2:是否持久化,true表示MQ重启后队列还在。
         * 参数3:是否私有化,false表示所有消费者都可以访问,true表示只有第一次拥有它的消费者才能访问
         * 参数4:是否自动删除,true表示不再使用队列时自动删除队列
         * 参数5:其他额外参数*/
        channel.queueDeclare("simpleQueue",false,false,false,null);
        //5.发送消息
        String msg ="hello rabbitMQ";
        /*
         * 参数1:交换机名,""表示默认交换机
         * 参数2:路由键,简单模式就是队列名
         * 参数3:其他额外参数
         * 参数4:要传递的消息字节数组
         */
        channel.basicPublish("","simpleQueue",null,msg.getBytes());
        //6.关闭资源(信道和连接)
        channel.close();
        connection.close();
        System.out.println("OK");
    }
}

4.编写消费者

package com.zj.mq.Simple;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/*消费者*/
public class Consumer {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1.创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.66.100");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("MQzhang");
        connectionFactory.setPassword("MQzhang");
        connectionFactory.setVirtualHost("/");
        //2.创建连接
        Connection connection = connectionFactory.newConnection();
        //3.创建信道
        Channel channel = connection.createChannel();
        //4.监听队列(一直在连接不会关闭连接)
        /*
        * 参数一:监听的队列名
        * 参数二:是否自动签收(消费完消息后自动告诉MQ消息消费完了),如果设置为false需要手动确认消息,否则MQ会一直发送消息。
        * 参数三:Consumer的实现类,重写该类方法表示接受到消息后如何消费,body就是消息的字节数组。
        * */
        channel.basicConsume("simpleQueue",true,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println("接受消息为:"+message);
            }
        });
    }
}

消费者随时在监听队列只要队列有消息就会被消费。

3.2 工作队列模式

与简单模式相比,工作队列模式(Work Queue)多了一些消费者,该模式也使用direct交换机,应用于处理消息较多的情况。特点如下:

  1. 一个队列对应多个消费者。
  2. 一条消息只会被一个消费者消费。
  3. 消息队列默认采用轮询的方式将消息平均发送给消费者。

1、编写生产者,并产生大量消息

package com.zj.mq.work;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/*生产者*/
public class Producer {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1.创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.66.100");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("MQzhang");
        connectionFactory.setPassword("MQzhang");
        connectionFactory.setVirtualHost("/");
        //2.创建连接
        Connection connection = connectionFactory.newConnection();
        //3.建立信道
        Channel channel = connection.createChannel();
        //4.创建队列(如果队列已经存在的话则使用该队列,也就是说队列只会创建一次)和交换机(简单模式下使用的是默认交换机direct)
        /* 参数1:队列名
         * 参数2:是否持久化,true表示MQ重启后队列还在。
         * 参数3:是否私有化,false表示所有消费者都可以访问,true表示只有第一次拥有它的消费者才能访问
         * 参数4:是否自动删除,true表示不再使用队列时自动删除队列
         * 参数5:其他额外参数*/
        channel.queueDeclare("WorkQueue",false,false,false,null);
        //5.发送大量消息
        for (int i = 0; i < 100; i++) {
            /*
             * 参数1:交换机名,""表示默认交换机
             * 参数2:路由键,简单模式就是队列名
             * 参数3:表示该消息是持久化消息,即保存到内存也会保存到磁盘
             * 参数4:要传递的消息字节数组
             */
            channel.basicPublish("","WorkQueue", MessageProperties.PERSISTENT_TEXT_PLAIN,("这是第"+i+"个消息").getBytes());
        }
        //6.关闭资源(信道和连接)
        channel.close();
        connection.close();
    }
}

2.编写消费者

编写三个消费者,他们都监听的是一个队列。

package com.zj.mq.work;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/*消费者*/
public class Consumer1 {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1.创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.66.100");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("MQzhang");
        connectionFactory.setPassword("MQzhang");
        connectionFactory.setVirtualHost("/");
        //2.创建连接
        Connection connection = connectionFactory.newConnection();
        //3.创建信道
        Channel channel = connection.createChannel();
        //4.监听队列(一直在连接不会关闭连接)
        /*
        * 参数一:监听的队列名
        * 参数二:是否自动签收(消费完消息后自动告诉MQ消息消费完了),如果设置为false需要手动确认消息,否则MQ会一直发送消息。
        * 参数三:Consumer的实现类,重写该类方法表示接受到消息后如何消费,body就是消息的字节数组。
        * */
        channel.basicConsume("WorkQueue",true,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println("消费者1接受消息为:"+message);
            }
        });
    }
}
package com.zj.mq.work;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/*消费者*/
public class Consumer2 {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1.创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.66.100");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("MQzhang");
        connectionFactory.setPassword("MQzhang");
        connectionFactory.setVirtualHost("/");
        //2.创建连接
        Connection connection = connectionFactory.newConnection();
        //3.创建信道
        Channel channel = connection.createChannel();
        //4.监听队列(一直在连接不会关闭连接)
        /*
        * 参数一:监听的队列名
        * 参数二:是否自动签收(消费完消息后自动告诉MQ消息消费完了),如果设置为false需要手动确认消息,否则MQ会一直发送消息。
        * 参数三:Consumer的实现类,重写该类方法表示接受到消息后如何消费,body就是消息的字节数组。
        * */
        channel.basicConsume("WorkQueue",true,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println("消费者2接受消息为:"+message);
            }
        });
    }
}
package com.zj.mq.work;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/*消费者*/
public class Consumer3 {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1.创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.66.100");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("MQzhang");
        connectionFactory.setPassword("MQzhang");
        connectionFactory.setVirtualHost("/");
        //2.创建连接
        Connection connection = connectionFactory.newConnection();
        //3.创建信道
        Channel channel = connection.createChannel();
        //4.监听队列(一直在连接不会关闭连接)
        /*
        * 参数一:监听的队列名
        * 参数二:是否自动签收(消费完消息后自动告诉MQ消息消费完了),如果设置为false需要手动确认消息,否则MQ会一直发送消息。
        * 参数三:Consumer的实现类,重写该类方法表示接受到消息后如何消费,body就是消息的字节数组。
        * */
        channel.basicConsume("WorkQueue",true,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println("消费者3接受消息为:"+message);
            }
        });
    }
}
消费者1接受消息为:这是第0个消息
消费者1接受消息为:这是第3个消息
消费者1接受消息为:这是第6个消息
消费者1接受消息为:这是第9个消息
消费者1接受消息为:这是第12个消息
消费者1接受消息为:这是第15个消息
消费者1接受消息为:这是第18个消息
消费者1接受消息为:这是第21个消息
……
消费者2接受消息为:这是第1个消息
消费者2接受消息为:这是第4个消息
消费者2接受消息为:这是第7个消息
消费者2接受消息为:这是第10个消息
消费者2接受消息为:这是第13个消息
消费者2接受消息为:这是第16个消息
消费者2接受消息为:这是第19个消息
消费者2接受消息为:这是第22个消息
消费者2接受消息为:这是第25个消息
……
消费者3接受消息为:这是第2个消息
消费者3接受消息为:这是第5个消息
消费者3接受消息为:这是第8个消息
消费者3接受消息为:这是第11个消息
消费者3接受消息为:这是第14个消息
消费者3接受消息为:这是第17个消息
消费者3接受消息为:这是第20个消息
消费者3接受消息为:这是第23个消息
消费者3接受消息为:这是第26个消息
消费者3接受消息为:这是第29个消息
消费者3接受消息为:这是第32个消息


相关实践学习
RocketMQ一站式入门使用
从源码编译、部署broker、部署namesrv,使用java客户端首发消息等一站式入门RocketMQ。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
5月前
|
消息中间件 存储 负载均衡
消息中间件的选择:RabbitMQ是一个明智的选择
消息中间件的选择:RabbitMQ是一个明智的选择
52 0
|
6月前
|
消息中间件 数据库
消息中间件系列教程(18) -RabbitMQ-基于RabbitMQ解决分布式事务(思想)
消息中间件系列教程(18) -RabbitMQ-基于RabbitMQ解决分布式事务(思想)
51 0
|
6月前
|
消息中间件
消息中间件系列教程(17) -RabbitMQ-死信队列
消息中间件系列教程(17) -RabbitMQ-死信队列
130 0
|
6月前
|
消息中间件
消息中间件系列教程(16) -RabbitMQ-应答模式
消息中间件系列教程(16) -RabbitMQ-应答模式
40 0
|
6月前
|
消息中间件
消息中间件系列教程(15) -RabbitMQ-基于全局消息ID解决幂等性问题
消息中间件系列教程(15) -RabbitMQ-基于全局消息ID解决幂等性问题
50 0
|
6月前
|
消息中间件 缓存 API
消息中间件系列教程(14) -RabbitMQ-自动补偿机制
消息中间件系列教程(14) -RabbitMQ-自动补偿机制
101 0
|
6月前
|
消息中间件 Java Maven
消息中间件系列教程(13) -RabbitMQ-SpringBoot集成RabbitMQ
消息中间件系列教程(13) -RabbitMQ-SpringBoot集成RabbitMQ
44 0
|
2月前
|
消息中间件 网络协议 JavaScript
MQTT常见问题之微消息队列mqtt支持ipv6失败如何解决
MQTT(Message Queuing Telemetry Transport)是一个轻量级的、基于发布/订阅模式的消息协议,广泛用于物联网(IoT)中设备间的通信。以下是MQTT使用过程中可能遇到的一些常见问题及其答案的汇总:
|
2月前
|
消息中间件 物联网 Java
MQTT常见问题之微消息队列配置失败如何解决
MQTT(Message Queuing Telemetry Transport)是一个轻量级的、基于发布/订阅模式的消息协议,广泛用于物联网(IoT)中设备间的通信。以下是MQTT使用过程中可能遇到的一些常见问题及其答案的汇总:
|
2月前
|
消息中间件 存储 监控
RabbitMQ:分布式系统中的高效消息队列
RabbitMQ:分布式系统中的高效消息队列

热门文章

最新文章