2.3 账户管理
guest账户默认只允许本地使用,我们可以创建新账户远程访问RabbitMQ(如2.2中),但是不推荐远程使用MQ.
1、创建账户
# 创建账户 rabbitmqctl add_user MQzhang(用户名) MQzhang(密码)
2、给用户授予管理员角色
rabbitmqctl set_user_tags 用户名 administrator
3、给用户授权
# "/"表示虚拟机 # zj表示用户名 # ".*" ".*" ".*" 表示完整权限 rabbitmqctl set_permissions -p "/" MQzhang".*" ".*" ".*"
4、通过管控台访问rabbitmq即可。
2.4 管控台
2.5 Docker安装
1、关闭RabbitMQ服务
rabbitmqctl stop
2、在Centos7中安装docker
# 安装Docker curl -fsSL https://get.docker.com | bash -s docker --mirror Aliyun # 启动docker systemctl start docker
3、拉取镜像
docker pull rabbitmq
4、启动MQ
docker run -d --hostname zj--name rabbit -p 15672:15672 -p 5672:5672 rabbitmq
三、RabbitMQ工作模式
RabbitMQ共有六种工作模式:简单模式(Simple)、工作队列模式(Work Queue)、发布订阅模式(Publish/Subscribe)、路由模式(Routing)、通配符模式(Topics)、远程调用模式(RPC,不常用)
3.1 简单模式
简介
特点
- 一个生产者对应一个消费者,通过队列进行消息传递。
- 该模式使用direct交换机,direct交换机是RabbitMQ默认交换机。
项目搭建
接下来我们使用JAVA代码操作RabbitMQ,让其按照简单模式进行工作。
JMS
由于MQ产品很多,操作方式各有不同,于是JAVA提供了一套规则——JMS,用于操作消息中间件。JMS即Java消息服务(JavaMessage Service)应用程序接口,是一个Java平台中关于面向消息中间件的API。JMS是JavaEE规范中的一种,类比JDBC。很多MQ产品都实现了JMS规范,例如ActiveMQ。RabbitMQ官方并没有实现JMS规范,但是开源社区有JMS的实现包。
1、启动RabbitMQ
# 开启管控台插件 rabbitmq-plugins enable rabbitmq_management # 启动rabbitmq rabbitmq-server -detached
2、创建普通maven项目,添加RabbitMQ依赖:
<dependencies> <dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.14.0</version> </dependency> </dependencies>
3、编写生产者
package com.zj.mq.Simple; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.io.IOException; import java.util.concurrent.TimeoutException; /*生产者*/ public class Producer { public static void main(String[] args) throws IOException, TimeoutException { //1.创建连接工厂 ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("192.168.66.100"); connectionFactory.setPort(5672); connectionFactory.setUsername("MQzhang"); connectionFactory.setPassword("MQzhang"); connectionFactory.setVirtualHost("/"); //2.创建连接 Connection connection = connectionFactory.newConnection(); //3.建立信道 Channel channel = connection.createChannel(); //4.创建队列(如果队列已经存在的话则使用该队列,也就是说队列只会创建一次)和交换机(简单模式下使用的是默认交换机direct) /* 参数1:队列名 * 参数2:是否持久化,true表示MQ重启后队列还在。 * 参数3:是否私有化,false表示所有消费者都可以访问,true表示只有第一次拥有它的消费者才能访问 * 参数4:是否自动删除,true表示不再使用队列时自动删除队列 * 参数5:其他额外参数*/ channel.queueDeclare("simpleQueue",false,false,false,null); //5.发送消息 String msg ="hello rabbitMQ"; /* * 参数1:交换机名,""表示默认交换机 * 参数2:路由键,简单模式就是队列名 * 参数3:其他额外参数 * 参数4:要传递的消息字节数组 */ channel.basicPublish("","simpleQueue",null,msg.getBytes()); //6.关闭资源(信道和连接) channel.close(); connection.close(); System.out.println("OK"); } }
4.编写消费者
package com.zj.mq.Simple; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; /*消费者*/ public class Consumer { public static void main(String[] args) throws IOException, TimeoutException { //1.创建连接工厂 ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("192.168.66.100"); connectionFactory.setPort(5672); connectionFactory.setUsername("MQzhang"); connectionFactory.setPassword("MQzhang"); connectionFactory.setVirtualHost("/"); //2.创建连接 Connection connection = connectionFactory.newConnection(); //3.创建信道 Channel channel = connection.createChannel(); //4.监听队列(一直在连接不会关闭连接) /* * 参数一:监听的队列名 * 参数二:是否自动签收(消费完消息后自动告诉MQ消息消费完了),如果设置为false需要手动确认消息,否则MQ会一直发送消息。 * 参数三:Consumer的实现类,重写该类方法表示接受到消息后如何消费,body就是消息的字节数组。 * */ channel.basicConsume("simpleQueue",true,new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println("接受消息为:"+message); } }); } }
消费者随时在监听队列只要队列有消息就会被消费。
3.2 工作队列模式
与简单模式相比,工作队列模式(Work Queue)多了一些消费者,该模式也使用direct交换机,应用于处理消息较多的情况。特点如下:
- 一个队列对应多个消费者。
- 一条消息只会被一个消费者消费。
- 消息队列默认采用轮询的方式将消息平均发送给消费者。
1、编写生产者,并产生大量消息
package com.zj.mq.work; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.MessageProperties; import java.io.IOException; import java.util.concurrent.TimeoutException; /*生产者*/ public class Producer { public static void main(String[] args) throws IOException, TimeoutException { //1.创建连接工厂 ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("192.168.66.100"); connectionFactory.setPort(5672); connectionFactory.setUsername("MQzhang"); connectionFactory.setPassword("MQzhang"); connectionFactory.setVirtualHost("/"); //2.创建连接 Connection connection = connectionFactory.newConnection(); //3.建立信道 Channel channel = connection.createChannel(); //4.创建队列(如果队列已经存在的话则使用该队列,也就是说队列只会创建一次)和交换机(简单模式下使用的是默认交换机direct) /* 参数1:队列名 * 参数2:是否持久化,true表示MQ重启后队列还在。 * 参数3:是否私有化,false表示所有消费者都可以访问,true表示只有第一次拥有它的消费者才能访问 * 参数4:是否自动删除,true表示不再使用队列时自动删除队列 * 参数5:其他额外参数*/ channel.queueDeclare("WorkQueue",false,false,false,null); //5.发送大量消息 for (int i = 0; i < 100; i++) { /* * 参数1:交换机名,""表示默认交换机 * 参数2:路由键,简单模式就是队列名 * 参数3:表示该消息是持久化消息,即保存到内存也会保存到磁盘 * 参数4:要传递的消息字节数组 */ channel.basicPublish("","WorkQueue", MessageProperties.PERSISTENT_TEXT_PLAIN,("这是第"+i+"个消息").getBytes()); } //6.关闭资源(信道和连接) channel.close(); connection.close(); } }
2.编写消费者
编写三个消费者,他们都监听的是一个队列。
package com.zj.mq.work; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; /*消费者*/ public class Consumer1 { public static void main(String[] args) throws IOException, TimeoutException { //1.创建连接工厂 ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("192.168.66.100"); connectionFactory.setPort(5672); connectionFactory.setUsername("MQzhang"); connectionFactory.setPassword("MQzhang"); connectionFactory.setVirtualHost("/"); //2.创建连接 Connection connection = connectionFactory.newConnection(); //3.创建信道 Channel channel = connection.createChannel(); //4.监听队列(一直在连接不会关闭连接) /* * 参数一:监听的队列名 * 参数二:是否自动签收(消费完消息后自动告诉MQ消息消费完了),如果设置为false需要手动确认消息,否则MQ会一直发送消息。 * 参数三:Consumer的实现类,重写该类方法表示接受到消息后如何消费,body就是消息的字节数组。 * */ channel.basicConsume("WorkQueue",true,new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println("消费者1接受消息为:"+message); } }); } }
package com.zj.mq.work; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; /*消费者*/ public class Consumer2 { public static void main(String[] args) throws IOException, TimeoutException { //1.创建连接工厂 ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("192.168.66.100"); connectionFactory.setPort(5672); connectionFactory.setUsername("MQzhang"); connectionFactory.setPassword("MQzhang"); connectionFactory.setVirtualHost("/"); //2.创建连接 Connection connection = connectionFactory.newConnection(); //3.创建信道 Channel channel = connection.createChannel(); //4.监听队列(一直在连接不会关闭连接) /* * 参数一:监听的队列名 * 参数二:是否自动签收(消费完消息后自动告诉MQ消息消费完了),如果设置为false需要手动确认消息,否则MQ会一直发送消息。 * 参数三:Consumer的实现类,重写该类方法表示接受到消息后如何消费,body就是消息的字节数组。 * */ channel.basicConsume("WorkQueue",true,new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println("消费者2接受消息为:"+message); } }); } }
package com.zj.mq.work; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; /*消费者*/ public class Consumer3 { public static void main(String[] args) throws IOException, TimeoutException { //1.创建连接工厂 ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("192.168.66.100"); connectionFactory.setPort(5672); connectionFactory.setUsername("MQzhang"); connectionFactory.setPassword("MQzhang"); connectionFactory.setVirtualHost("/"); //2.创建连接 Connection connection = connectionFactory.newConnection(); //3.创建信道 Channel channel = connection.createChannel(); //4.监听队列(一直在连接不会关闭连接) /* * 参数一:监听的队列名 * 参数二:是否自动签收(消费完消息后自动告诉MQ消息消费完了),如果设置为false需要手动确认消息,否则MQ会一直发送消息。 * 参数三:Consumer的实现类,重写该类方法表示接受到消息后如何消费,body就是消息的字节数组。 * */ channel.basicConsume("WorkQueue",true,new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println("消费者3接受消息为:"+message); } }); } }
消费者1接受消息为:这是第0个消息 消费者1接受消息为:这是第3个消息 消费者1接受消息为:这是第6个消息 消费者1接受消息为:这是第9个消息 消费者1接受消息为:这是第12个消息 消费者1接受消息为:这是第15个消息 消费者1接受消息为:这是第18个消息 消费者1接受消息为:这是第21个消息 …… 消费者2接受消息为:这是第1个消息 消费者2接受消息为:这是第4个消息 消费者2接受消息为:这是第7个消息 消费者2接受消息为:这是第10个消息 消费者2接受消息为:这是第13个消息 消费者2接受消息为:这是第16个消息 消费者2接受消息为:这是第19个消息 消费者2接受消息为:这是第22个消息 消费者2接受消息为:这是第25个消息 …… 消费者3接受消息为:这是第2个消息 消费者3接受消息为:这是第5个消息 消费者3接受消息为:这是第8个消息 消费者3接受消息为:这是第11个消息 消费者3接受消息为:这是第14个消息 消费者3接受消息为:这是第17个消息 消费者3接受消息为:这是第20个消息 消费者3接受消息为:这是第23个消息 消费者3接受消息为:这是第26个消息 消费者3接受消息为:这是第29个消息 消费者3接受消息为:这是第32个消息