一、消息队列概述
1、简介:
消息队列是用来发送消息的消息中间件,本质上是队列,有先进先出的特点。
2、功能:
服务削峰
程序解耦
异步消息
3、MQ分类:
RocketMQ
Kafka
RabbitMQ
4、RabbitMQ安装
(一)安装并运行
(1)、在docker hub 中查找rabbitmq镜像
docker search rabbitmq:3-management
(2)、从docker hub 中拉取rabbitmq镜像
docker pull rabbitmq:3-management
(3)、查看拉取的rabbitmq镜像
docker images
(4)、运行 rabbitmq服务端
docker run -d \ -v /opt/rabbitmq/data:/var/lib/rabbitmq \ -p 5672:5672 -p 15672:15672 --name rabbitmq --restart=always \ --hostname myRabbit rabbitmq:3-management =
参数解释:
docker run :启动命令 --name :给容器起名字 -p :★映射端口号,主机端口:容器端口 -v : 将主机中指定目录的挂载到容器的目录 -i : 以交互模式运行。 -t : 进入终端。 -d : 以守护模式后台运行。 -e XXX_XXX="xxxxxxxxxxx" : 指定环境变量
(5)、查看正在运行的容器
docker ps
(6)、容器运行成功之后,在浏览器访问:
账号 guest , 密码 guest
(二)其他操作:
(1)、重新启动 rabbitmq 容器
docker restart <容器id>
(2)、结束正在运行的容器
docker stop <容器id> 容器优雅退出 docker kill <容器id> 容器直接退出
(3)、删除 docker 容器 (容器在删除前要先结束)
docker rm <容器id> [ <容器id> ...]
(4)、删除 docker 镜像
docker rmi <镜像id> [ <镜像id> ...]
(5)、查看正在运行的 rabbitmq 进程
ps -ef | grep rabbitmq
(6)、进入容器内部
docker exec -it <容器id> /bin/bash
(7)、查看容器内网ip地址
docker inspect <容器id>
(8)、查看docker 镜像的版本
docker image inspect <镜像名称>:latest|grep -i version
5、RabbitMQ的工作原理
(一)下图是RabbitMQ的基本结构:
组成部分说明:
Broker:消息队列服务进程,此进程包括两个部分:Exchange和Queue
Exchange:消息队列交换机,按一定的规则将消息路由转发到某个队列,对消息进行过虑。
Queue:消息队列,存储消息的队列,消息到达队列并转发给指定的消费者
Producer:消息生产者,即生产方客户端,生产方客户端将消息发送
Consumer:消息消费者,即消费方客户端,接收MQ转发的消息。
(二)生产者发送消息流程:
(1)、生产者和Broker建立TCP连接。
(2)、生产者和Broker建立通道。
(3)、生产者通过通道消息发送给Broker,由Exchange将消息进行转发。
(4)、Exchange将消息转发到指定的Queue(队列)
(三)消费者接收消息流程:
(1)、消费者和Broker建立TCP连接
(2)、消费者和Broker建立通道
(3)、消费者监听指定的Queue(队列)
(4)、当有消息到达Queue时Broker默认将消息推送给消费者。
(5)、消费者接收到消息。
(6)、ack回复
二、简易队列
1、简介:
channel.basicPublish()方法第一个参数为空字符串,消息生产者绑定默认交换机,第二个参数填入队列名称作为routingKey,就是简易队列模式。
2、代码:
(1)生产者:
public class Producer { private static final String QUEUE_NAME = "hello"; public static void main(String[] args) throws Exception { ConnectionFactory fact = new ConnectionFactory(); fact.setHost("192.168.1.100"); fact.setUsername("guest"); fact.setPassword("guest"); Channel channel = fact.newConnection().createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); String mess = "hello world"; channel.basicPublish("", QUEUE_NAME, null, mess.getBytes()); System.out.println("消息发送完毕"); } }
// 第一个参数是交消息换机名称。如果为空字符串则队列模式为简易队列模式,绑定默认交换机,第二个参数填入队列名称作为routingKey。 // 第二个参数是路由键routingKey。 // 第三个参数是消息的属性. // 第四个参数是消息的内容. void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)
(2)消费者:
public class Consumer { private static final String QUEUE_NAME = "hello"; public static void main(String[] args) throws Exception { ConnectionFactory fact = new ConnectionFactory(); fact.setHost("192.168.1.100"); fact.setUsername("guest"); fact.setPassword("guest"); Connection connection = fact.newConnection(); Channel channel = connection.createChannel(); DeliverCallback d = (consumerTag, message) -> { System.out.println(new String(message.getBody())); }; CancelCallback c = consumerTag -> { System.out.println("消息被中断"); }; channel.basicConsume(QUEUE_NAME, true, d, c); } }
(3)编写工具类:
public class ConnectUtils { public static Channel connect() throws Exception { ConnectionFactory fact = new ConnectionFactory(); fact.setHost("192.168.1.100"); fact.setUsername("guest"); fact.setPassword("guest"); Connection connection = fact.newConnection(); Channel channel = connection.createChannel(); return channel; } }
public class SleepUtils { public static void sleep(int second) { try { Thread.sleep(second * 1000); } catch (InterruptedException e) { e.printStackTrace(); } } }
三、相关设置
3、消息应答:
分为自动应答和手动应答。
不建议自动应答。
手动应答:
channel.basicAsk() (用于确认应答)
channel.basicNask() (用于否认应答)
channel.basicReject() (用于否认应答)
multiple 批量应答 true false 建议使用false
4、消息重新入队:
设置为手动应答后,当一个消费者 断连,mq会将消息重新入队,交给其他的消费者。
public class Producer { private final static String ACK_QUEUE_NAME = "ack_queue"; public static void main(String[] args) throws Exception { Channel channel = ConnectUtils.connect(); channel.queueDeclare(ACK_QUEUE_NAME, false, false, false, null); Scanner scanner = new Scanner(System.in); while (scanner.hasNext()) { String message = scanner.next(); channel.basicPublish("", ACK_QUEUE_NAME, null, message.getBytes("UTF-8")); } } }
public class Consumer1 { private static final String QUEUE_NAME = "ack_queue"; public static void main(String[] args) throws Exception { Channel channel = ConnectUtils.connect(); System.out.println("Consumer1等待接收消息的时间较短"); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody()); SleepUtils.sleep(1); System.out.println("C1接收到的消息:" + message); channel.basicAck(delivery.getEnvelope().getDeliveryTag(), true); }; CancelCallback cancelCallback = consumerTag -> { System.out.println("消息被中断"); }; channel.basicConsume(QUEUE_NAME, false, deliverCallback, cancelCallback); } }
public class Consumer2 { private static final String QUEUE_NAME = "ack_queue"; public static void main(String[] args) throws Exception { Channel channel = ConnectUtils.connect(); System.out.println("Consumer2等待接收消息的时间较长"); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody()); SleepUtils.sleep(10); System.out.println("C2接收到的消息:" + message); channel.basicAck(delivery.getEnvelope().getDeliveryTag(), true); }; CancelCallback cancelCallback = consumerTag -> { System.out.println("消息被中断"); }; channel.basicConsume(QUEUE_NAME, false, deliverCallback, cancelCallback); } }
在Producer 控制台依次输入
aa 消息被C1接收到, 等待1秒,打印出“C1接收到的消息:aa”
bb 消息被C2接收到, 等待10秒,打印出“C2接收到的消息:bb”
cc 消息被C1接收到, 等待1秒,打印出“C1接收到的消息:cc”
dd 消息被C2接收到, 等待10秒的过程中关闭C2服务,消息重新进入队列,分配到C1服务。
消息被C1接收到, 等待1秒,打印出“C1接收到的消息:dd”
5、持久化
(1)队列持久化:
在生产者中:
channel.queueDeclare(ACK_QUEUE_NAME, true, false, false, null) 第二个参数为 boolean durable,值为true则进行持久化,在图形化界面队列后面显示大写字母‘D’
(2)消息持久化:
channel.basicPublish(“”, ACK_QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes(“UTF-8”));
6、不公平分发:处理速度快的消费者多接受消息(能者多劳)。
与其相对的是轮询分发,完全按照一个消费者轮一次的方式接收信息。
在消息消费者设置
channel.basicQos(1);
Qos: Quality of Service
预取值Prefetch :信道的消息容量
在消息消费者C1设置
channel.basicQos(2)
在消息消费者C2设置
channel.basicQos(5);
四、发布确认
三种方式:
单个发布确认、批量发布确认、异步发布确认
public class Producer { public static void main(String[] args) throws Exception { // confirm01(); //单个发布确认耗时920ms // confirm02(); //批量发布确认耗时92ms confirm03(); // 批量发布确认耗时66ms } //单个发布确认 public static void confirm01() throws Exception { Channel channel = ConnectUtils.connect(); channel.confirmSelect(); String queueName = UUID.randomUUID().toString(); channel.queueDeclare(queueName, true, false, false, null); long begin = System.currentTimeMillis(); for (int i = 0; i < 1000; i++) { String message = "消息" + i; System.out.println("生产者发送消息:" + message); channel.basicPublish("", queueName, null, message.getBytes("UTF-8")); channel.waitForConfirms(); } long end = System.currentTimeMillis(); System.out.println("单个发布确认耗时" + (end - begin) + "ms"); } //批量发布确认 public static void confirm02() throws Exception { Channel channel = ConnectUtils.connect(); channel.confirmSelect(); String queueName = UUID.randomUUID().toString(); channel.queueDeclare(queueName, true, false, false, null); long begin = System.currentTimeMillis(); for (int i = 0; i < 1000; i++) { String message = "消息" + i; System.out.println("生产者发送消息:" + message); channel.basicPublish("", queueName, null, message.getBytes("UTF-8")); if ((i + 1) % 100 == 0) { channel.waitForConfirms(); } } long end = System.currentTimeMillis(); System.out.println("批量发布确认耗时" + (end - begin) + "ms"); } //异步发布确认 public static void confirm03() throws Exception { Channel channel = ConnectUtils.connect(); channel.confirmSelect(); String queueName = UUID.randomUUID().toString(); channel.queueDeclare(queueName, true, false, false, null); ConfirmCallback c1 = (deliveryTag, multiple) -> { System.out.println("确认的消息:" + deliveryTag); }; ConfirmCallback c2 = (deliveryTag, multiple) -> { System.out.println("未确认的消息:" + deliveryTag); }; channel.addConfirmListener(c1, c2); // 异步通知 long begin = System.currentTimeMillis(); for (int i = 0; i < 1000; i++) { String message = "消息" + i; System.out.println("生产者发送消息:" + message); channel.basicPublish("", queueName, null, message.getBytes("UTF-8")); } long end = System.currentTimeMillis(); System.out.println("批量发布确认耗时" + (end - begin) + "ms"); } }
五、交换机 Exchange:
1、作用:通过交换机,能够让消息被多个消费者接收
消息不能由生产者直接发送到队列,而是由生产者发送到交换机,再由交换机发送到队列。
2、交换机类型:
direct(直接)、topic(主题)、headers(标题)、fanout(扇出)
3、无名交换机:
channel.basicPublish("", queueName, null, message.getBytes("UTF-8"));
不指定交换机名称则使用默认交换机(AMQP default)
4、临时队列:
不进行持久化的队列
5、fanout类型:发布订阅模式
将消息发送到同一个交换机下不同队列的消费者
public class Producer { private static final String EXCHANGE_NAME = "fanout_exchange"; public static void main(String[] args) throws Exception { Channel channel = ConnectUtils.connect(); channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); Scanner scanner = new Scanner(System.in); while (scanner.hasNext()) { String message = scanner.next(); System.out.println("生产者发送消息:" + message); channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8")); } } }
public class Consumer1 { private static final String EXCHANGE_NAME = "fanout_exchange"; public static void main(String[] args) throws Exception { Channel channel = ConnectUtils.connect(); System.out.println("Consumer1等待接收消息的时间较短"); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody()); System.out.println("C1接收到的消息:" + message); }; CancelCallback cancelCallback = consumerTag -> { System.out.println("消息被中断"); }; String queueName = channel.queueDeclare().getQueue(); channel.queueBind(queueName, EXCHANGE_NAME, "key001"); channel.basicConsume(queueName, true, deliverCallback, cancelCallback); } }
public class Consumer2 { private static final String EXCHANGE_NAME = "fanout_exchange"; public static void main(String[] args) throws Exception { Channel channel = ConnectUtils.connect(); System.out.println("Consumer2等待接收消息的时间较短"); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody()); System.out.println("C2接收到的消息:" + message); }; CancelCallback cancelCallback = consumerTag -> { System.out.println("消息被中断"); }; String queueName = channel.queueDeclare().getQueue(); channel.queueBind(queueName, EXCHANGE_NAME, "key001"); channel.basicConsume(queueName, true, deliverCallback, cancelCallback); } }
生产者控制台:
aa
生产者发送消息:aa
bb
生产者发送消息:bb
cc
生产者发送消息:cc
dd
生产者发送消息:dd
ff
生产者发送消息:ff
消费者C1控制台:
Consumer1等待接收消息的时间较短
C1接收到的消息:aa
C1接收到的消息:bb
C1接收到的消息:cc
C1接收到的消息:dd
C1接收到的消息:ff
消费者C2控制台:
Consumer1等待接收消息的时间较短
C1接收到的消息:aa
C1接收到的消息:bb
C1接收到的消息:cc
C1接收到的消息:dd
C1接收到的消息:ff
6、direct类型:路由模式
与交换机绑定的队列的routingKey不相同,在发送消息时指定交换机和routingKey就能找到唯一的队列。
消息生产者:
public class Producer { private static final String EXCHANGE_NAME = "direct_exchange"; public static void main(String[] args) throws Exception { Channel channel = ConnectUtils.connect(); channel.exchangeDeclare(EXCHANGE_NAME, "direct"); Scanner scanner = new Scanner(System.in); while (scanner.hasNext()) { String message = scanner.next(); System.out.println("生产者发送消息:" + message); channel.basicPublish(EXCHANGE_NAME, "key201", null, message.getBytes("UTF-8")); } } }
消费者C1:
public class Consumer1 { private static final String EXCHANGE_NAME = "direct_exchange"; public static void main(String[] args) throws Exception { Channel channel = ConnectUtils.connect(); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody()); System.out.println("C1接收到的消息:" + message); }; CancelCallback cancelCallback = consumerTag -> { System.out.println("消息被中断"); }; String queueName = "direct1-1"; channel.queueDeclare(queueName, false, false, false, null); channel.queueBind(queueName, EXCHANGE_NAME, "key101"); channel.basicConsume(queueName, true, deliverCallback, cancelCallback); } }
消费者C2:
public class Consumer2 { private static final String EXCHANGE_NAME = "direct_exchange"; public static void main(String[] args) throws Exception { Channel channel = ConnectUtils.connect(); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody()); System.out.println("C2接收到的消息:" + message); }; CancelCallback cancelCallback = consumerTag -> { System.out.println("消息被中断"); }; String queueName1 = "direct2-1"; channel.queueDeclare(queueName1, false, false, false, null); channel.queueBind(queueName1, EXCHANGE_NAME, "key201"); channel.basicConsume(queueName1, true, deliverCallback, cancelCallback); String queueName2 = "direct2-2"; channel.queueDeclare(queueName2, false, false, false, null); channel.queueBind(queueName2, EXCHANGE_NAME, "key202"); channel.basicConsume(queueName2, true, deliverCallback, cancelCallback); } }
在生产者控制台输入:
aaa
生产者发送消息:aaa
bbb
生产者发送消息:bbb
ccc
生产者发送消息:ccc
ddd
生产者发送消息:ddd
消费者控制台显示:
C2接收到的消息:aaa
C2接收到的消息:bbb
C2接收到的消息:ccc
C2接收到的消息:ddd