RabbitMQ学习笔记1

本文涉及的产品
容器服务 Serverless 版 ACK Serverless,317元额度 多规格
容器服务 Serverless 版 ACK Serverless,952元额度 多规格
简介: RabbitMQ学习笔记

一、消息队列概述

1、简介:

消息队列是用来发送消息的消息中间件,本质上是队列,有先进先出的特点。


2、功能:

服务削峰

程序解耦

异步消息


3、MQ分类:

ActiveMQ

RocketMQ

Kafka

RabbitMQ


4、RabbitMQ安装

(一)安装并运行

(1)、在docker hub 中查找rabbitmq镜像

docker search rabbitmq:3-management


(2)、从docker hub 中拉取rabbitmq镜像

docker pull rabbitmq:3-management


(3)、查看拉取的rabbitmq镜像

docker  images


(4)、运行 rabbitmq服务端

docker run -d \
-v /opt/rabbitmq/data:/var/lib/rabbitmq \
-p 5672:5672 -p 15672:15672 --name rabbitmq --restart=always \
--hostname myRabbit rabbitmq:3-management
=

参数解释:

docker run :启动命令
--name :给容器起名字
 -p :★映射端口号,主机端口:容器端口
-v : 将主机中指定目录的挂载到容器的目录
-i : 以交互模式运行。
-t : 进入终端。
-d : 以守护模式后台运行。
-e XXX_XXX="xxxxxxxxxxx" : 指定环境变量

(5)、查看正在运行的容器

docker ps


(6)、容器运行成功之后,在浏览器访问:

http://192.168.1.100:15672

账号 guest , 密码 guest

(二)其他操作:

(1)、重新启动 rabbitmq 容器

docker   restart   <容器id>

(2)、结束正在运行的容器

docker  stop  <容器id>    容器优雅退出
docker  kill  <容器id>    容器直接退出

(3)、删除 docker 容器 (容器在删除前要先结束)

docker  rm   <容器id>  [ <容器id> ...]


(4)、删除 docker 镜像

docker  rmi  <镜像id>  [ <镜像id> ...]

(5)、查看正在运行的 rabbitmq 进程

ps -ef | grep   rabbitmq

(6)、进入容器内部

docker exec -it  <容器id>  /bin/bash

(7)、查看容器内网ip地址

docker  inspect <容器id>


(8)、查看docker 镜像的版本

docker image inspect <镜像名称>:latest|grep -i version



5、RabbitMQ的工作原理

(一)下图是RabbitMQ的基本结构:

5.png


组成部分说明:

Broker:消息队列服务进程,此进程包括两个部分:Exchange和Queue

Exchange:消息队列交换机,按一定的规则将消息路由转发到某个队列,对消息进行过虑。

Queue:消息队列,存储消息的队列,消息到达队列并转发给指定的消费者

Producer:消息生产者,即生产方客户端,生产方客户端将消息发送

Consumer:消息消费者,即消费方客户端,接收MQ转发的消息。


(二)生产者发送消息流程:

(1)、生产者和Broker建立TCP连接。

(2)、生产者和Broker建立通道。

(3)、生产者通过通道消息发送给Broker,由Exchange将消息进行转发。

(4)、Exchange将消息转发到指定的Queue(队列)


(三)消费者接收消息流程:

(1)、消费者和Broker建立TCP连接

(2)、消费者和Broker建立通道

(3)、消费者监听指定的Queue(队列)

(4)、当有消息到达Queue时Broker默认将消息推送给消费者。

(5)、消费者接收到消息。

(6)、ack回复


二、简易队列

1、简介:

channel.basicPublish()方法第一个参数为空字符串,消息生产者绑定默认交换机,第二个参数填入队列名称作为routingKey,就是简易队列模式。

2、代码:

(1)生产者:


public class Producer {
    private static final String QUEUE_NAME = "hello";
    public static void main(String[] args) throws Exception {
        ConnectionFactory fact = new ConnectionFactory();
        fact.setHost("192.168.1.100");
        fact.setUsername("guest");
        fact.setPassword("guest");
        Channel channel = fact.newConnection().createChannel();
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        String mess = "hello world";
        channel.basicPublish("", QUEUE_NAME, null, mess.getBytes());
        System.out.println("消息发送完毕");
    }
}
// 第一个参数是交消息换机名称。如果为空字符串则队列模式为简易队列模式,绑定默认交换机,第二个参数填入队列名称作为routingKey。
// 第二个参数是路由键routingKey。
// 第三个参数是消息的属性.
// 第四个参数是消息的内容.
void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)

(2)消费者:

public class Consumer {
    private static final String QUEUE_NAME = "hello";
    public static void main(String[] args) throws Exception {
        ConnectionFactory fact = new ConnectionFactory();
        fact.setHost("192.168.1.100");
        fact.setUsername("guest");
        fact.setPassword("guest");
        Connection connection = fact.newConnection();
        Channel channel = connection.createChannel();
        DeliverCallback d = (consumerTag, message) -> {
            System.out.println(new String(message.getBody()));
        };
        CancelCallback c = consumerTag -> {
            System.out.println("消息被中断");
        };
        channel.basicConsume(QUEUE_NAME, true, d, c);
    }
}

(3)编写工具类:

public class ConnectUtils {
    public static Channel connect() throws Exception {
        ConnectionFactory fact = new ConnectionFactory();
        fact.setHost("192.168.1.100");
        fact.setUsername("guest");
        fact.setPassword("guest");
        Connection connection = fact.newConnection();
        Channel channel = connection.createChannel();
        return channel;
    }
}
public class SleepUtils {
    public static void sleep(int second) {
        try {
            Thread.sleep(second * 1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

三、相关设置

3、消息应答:

分为自动应答和手动应答。

不建议自动应答。

手动应答:

channel.basicAsk() (用于确认应答)

channel.basicNask() (用于否认应答)

channel.basicReject() (用于否认应答)


multiple 批量应答 true false 建议使用false


4、消息重新入队:

设置为手动应答后,当一个消费者 断连,mq会将消息重新入队,交给其他的消费者。


public class Producer {
    private final static String ACK_QUEUE_NAME = "ack_queue";
    public static void main(String[] args) throws Exception {
        Channel channel = ConnectUtils.connect();
        channel.queueDeclare(ACK_QUEUE_NAME, false, false, false, null);
        Scanner scanner = new Scanner(System.in);
        while (scanner.hasNext()) {
            String message = scanner.next();
            channel.basicPublish("", ACK_QUEUE_NAME, null, message.getBytes("UTF-8"));
        }
    }
}
public class Consumer1 {
    private static final String QUEUE_NAME = "ack_queue";
    public static void main(String[] args) throws Exception {
        Channel channel = ConnectUtils.connect();
        System.out.println("Consumer1等待接收消息的时间较短");
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody());
            SleepUtils.sleep(1);
            System.out.println("C1接收到的消息:" + message);
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), true);
        };
        CancelCallback cancelCallback = consumerTag -> {
            System.out.println("消息被中断");
        };
        channel.basicConsume(QUEUE_NAME, false, deliverCallback, cancelCallback);
    }
}
public class Consumer2 {
    private static final String QUEUE_NAME = "ack_queue";
    public static void main(String[] args) throws Exception {
        Channel channel = ConnectUtils.connect();
        System.out.println("Consumer2等待接收消息的时间较长");
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody());
            SleepUtils.sleep(10);
            System.out.println("C2接收到的消息:" + message);
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), true);
        };
        CancelCallback cancelCallback = consumerTag -> {
            System.out.println("消息被中断");
        };
        channel.basicConsume(QUEUE_NAME, false, deliverCallback, cancelCallback);
    }
}

在Producer 控制台依次输入

aa 消息被C1接收到, 等待1秒,打印出“C1接收到的消息:aa”

bb 消息被C2接收到, 等待10秒,打印出“C2接收到的消息:bb”

cc 消息被C1接收到, 等待1秒,打印出“C1接收到的消息:cc”

dd 消息被C2接收到, 等待10秒的过程中关闭C2服务,消息重新进入队列,分配到C1服务。

消息被C1接收到, 等待1秒,打印出“C1接收到的消息:dd”


5、持久化

(1)队列持久化:

在生产者中:

channel.queueDeclare(ACK_QUEUE_NAME, true, false, false, null) 第二个参数为 boolean durable,值为true则进行持久化,在图形化界面队列后面显示大写字母‘D’


(2)消息持久化:

channel.basicPublish(“”, ACK_QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes(“UTF-8”));


6、不公平分发:处理速度快的消费者多接受消息(能者多劳)。

与其相对的是轮询分发,完全按照一个消费者轮一次的方式接收信息。

在消息消费者设置

channel.basicQos(1);


Qos: Quality of Service


预取值Prefetch :信道的消息容量


在消息消费者C1设置

channel.basicQos(2)

在消息消费者C2设置

channel.basicQos(5);


6.png

四、发布确认

三种方式:

单个发布确认、批量发布确认、异步发布确认

public class Producer {
    public static void main(String[] args) throws Exception {
//        confirm01();  //单个发布确认耗时920ms
//        confirm02();    //批量发布确认耗时92ms
        confirm03();  // 批量发布确认耗时66ms
    }
    //单个发布确认
    public static void confirm01() throws Exception {
        Channel channel = ConnectUtils.connect();
        channel.confirmSelect();
        String queueName = UUID.randomUUID().toString();
        channel.queueDeclare(queueName, true, false, false, null);
        long begin = System.currentTimeMillis();
        for (int i = 0; i < 1000; i++) {
            String message = "消息" + i;
            System.out.println("生产者发送消息:" + message);
            channel.basicPublish("", queueName, null, message.getBytes("UTF-8"));
            channel.waitForConfirms();
        }
        long end = System.currentTimeMillis();
        System.out.println("单个发布确认耗时" + (end - begin) + "ms");
    }
    //批量发布确认
    public static void confirm02() throws Exception {
        Channel channel = ConnectUtils.connect();
        channel.confirmSelect();
        String queueName = UUID.randomUUID().toString();
        channel.queueDeclare(queueName, true, false, false, null);
        long begin = System.currentTimeMillis();
        for (int i = 0; i < 1000; i++) {
            String message = "消息" + i;
            System.out.println("生产者发送消息:" + message);
            channel.basicPublish("", queueName, null, message.getBytes("UTF-8"));
            if ((i + 1) % 100 == 0) {
                channel.waitForConfirms();
            }
        }
        long end = System.currentTimeMillis();
        System.out.println("批量发布确认耗时" + (end - begin) + "ms");
    }
    //异步发布确认
    public static void confirm03() throws Exception {
        Channel channel = ConnectUtils.connect();
        channel.confirmSelect();
        String queueName = UUID.randomUUID().toString();
        channel.queueDeclare(queueName, true, false, false, null);
        ConfirmCallback c1 = (deliveryTag, multiple) -> {
            System.out.println("确认的消息:" + deliveryTag);
        };
        ConfirmCallback c2 = (deliveryTag, multiple) -> {
            System.out.println("未确认的消息:" + deliveryTag);
        };
        channel.addConfirmListener(c1, c2);         // 异步通知
        long begin = System.currentTimeMillis();
        for (int i = 0; i < 1000; i++) {
            String message = "消息" + i;
            System.out.println("生产者发送消息:" + message);
            channel.basicPublish("", queueName, null, message.getBytes("UTF-8"));
        }
        long end = System.currentTimeMillis();
        System.out.println("批量发布确认耗时" + (end - begin) + "ms");
    }
}


五、交换机 Exchange:

1、作用:通过交换机,能够让消息被多个消费者接收

消息不能由生产者直接发送到队列,而是由生产者发送到交换机,再由交换机发送到队列。

2、交换机类型:

direct(直接)、topic(主题)、headers(标题)、fanout(扇出)

3、无名交换机:

channel.basicPublish("", queueName, null, message.getBytes("UTF-8"));


不指定交换机名称则使用默认交换机(AMQP default)


4、临时队列:

不进行持久化的队列


5、fanout类型:发布订阅模式

将消息发送到同一个交换机下不同队列的消费者


public class Producer {
    private static final String EXCHANGE_NAME = "fanout_exchange";
    public static void main(String[] args) throws Exception {
        Channel channel = ConnectUtils.connect();
        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
        Scanner scanner = new Scanner(System.in);
        while (scanner.hasNext()) {
            String message = scanner.next();
            System.out.println("生产者发送消息:" + message);
            channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8"));
        }
    }
}
public class Consumer1 {
    private static final String EXCHANGE_NAME = "fanout_exchange";
    public static void main(String[] args) throws Exception {
        Channel channel = ConnectUtils.connect();
        System.out.println("Consumer1等待接收消息的时间较短");
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody());
            System.out.println("C1接收到的消息:" + message);
        };
        CancelCallback cancelCallback = consumerTag -> {
            System.out.println("消息被中断");
        };
        String queueName = channel.queueDeclare().getQueue();
        channel.queueBind(queueName, EXCHANGE_NAME, "key001");
        channel.basicConsume(queueName, true, deliverCallback, cancelCallback);
    }
}
public class Consumer2 {
    private static final String EXCHANGE_NAME = "fanout_exchange";
    public static void main(String[] args) throws Exception {
        Channel channel = ConnectUtils.connect();
        System.out.println("Consumer2等待接收消息的时间较短");
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody());
            System.out.println("C2接收到的消息:" + message);
        };
        CancelCallback cancelCallback = consumerTag -> {
            System.out.println("消息被中断");
        };
        String queueName = channel.queueDeclare().getQueue();
        channel.queueBind(queueName, EXCHANGE_NAME, "key001");
        channel.basicConsume(queueName, true, deliverCallback, cancelCallback);
    }
}

生产者控制台:

aa

生产者发送消息:aa

bb

生产者发送消息:bb

cc

生产者发送消息:cc

dd

生产者发送消息:dd

ff

生产者发送消息:ff


消费者C1控制台:

Consumer1等待接收消息的时间较短

C1接收到的消息:aa

C1接收到的消息:bb

C1接收到的消息:cc

C1接收到的消息:dd

C1接收到的消息:ff


消费者C2控制台:

Consumer1等待接收消息的时间较短

C1接收到的消息:aa

C1接收到的消息:bb

C1接收到的消息:cc

C1接收到的消息:dd

C1接收到的消息:ff


6、direct类型:路由模式

与交换机绑定的队列的routingKey不相同,在发送消息时指定交换机和routingKey就能找到唯一的队列。

消息生产者:


public class Producer {
    private static final String EXCHANGE_NAME = "direct_exchange";
    public static void main(String[] args) throws Exception {
        Channel channel = ConnectUtils.connect();
        channel.exchangeDeclare(EXCHANGE_NAME, "direct");
        Scanner scanner = new Scanner(System.in);
        while (scanner.hasNext()) {
            String message = scanner.next();
            System.out.println("生产者发送消息:" + message);
            channel.basicPublish(EXCHANGE_NAME, "key201", null, message.getBytes("UTF-8"));
        }
    }
}

消费者C1:

public class Consumer1 {
    private static final String EXCHANGE_NAME = "direct_exchange";
    public static void main(String[] args) throws Exception {
        Channel channel = ConnectUtils.connect();
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody());
            System.out.println("C1接收到的消息:" + message);
        };
        CancelCallback cancelCallback = consumerTag -> {
            System.out.println("消息被中断");
        };
        String queueName = "direct1-1";
        channel.queueDeclare(queueName, false, false, false, null);
        channel.queueBind(queueName, EXCHANGE_NAME, "key101");
        channel.basicConsume(queueName, true, deliverCallback, cancelCallback);
    }
}

消费者C2:

public class Consumer2 {
    private static final String EXCHANGE_NAME = "direct_exchange";
    public static void main(String[] args) throws Exception {
        Channel channel = ConnectUtils.connect();
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody());
            System.out.println("C2接收到的消息:" + message);
        };
        CancelCallback cancelCallback = consumerTag -> {
            System.out.println("消息被中断");
        };
        String queueName1 = "direct2-1";
        channel.queueDeclare(queueName1, false, false, false, null);
        channel.queueBind(queueName1, EXCHANGE_NAME, "key201");
        channel.basicConsume(queueName1, true, deliverCallback, cancelCallback);
        String queueName2 = "direct2-2";
        channel.queueDeclare(queueName2, false, false, false, null);
        channel.queueBind(queueName2, EXCHANGE_NAME, "key202");
        channel.basicConsume(queueName2, true, deliverCallback, cancelCallback);
    }
}

在生产者控制台输入:

aaa

生产者发送消息:aaa

bbb

生产者发送消息:bbb

ccc

生产者发送消息:ccc

ddd

生产者发送消息:ddd

消费者控制台显示:

C2接收到的消息:aaa

C2接收到的消息:bbb

C2接收到的消息:ccc

C2接收到的消息:ddd

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
消息中间件 存储 NoSQL
RibbitMQ学习笔记之 RabbitMQ 其他知识点扩展
RibbitMQ学习笔记之 RabbitMQ 其他知识点扩展
64 1
|
消息中间件 网络协议 数据中心
RabbmitMQ学习笔记-RabbitMQ集群架构模式
RabbmitMQ学习笔记-RabbitMQ集群架构模式
76 0
|
消息中间件 Java
RabbmitMQ学习笔记-RabbitMQ与SpringBoot2.0整合实战
在使用 RabbitMQ 的时候,作为消息发送方希望杜绝任何消息丢失或者投递失败场景。RabbitMQ 为我们提供了两种方式用来控制消息的投递可靠性模式。
112 0
|
消息中间件 容灾
RibbitMQ学习笔记之RabbitMQ 集群(二)
RibbitMQ学习笔记之RabbitMQ 集群
79 0
|
消息中间件 缓存 负载均衡
RibbitMQ学习笔记之RabbitMQ 集群(一)
RibbitMQ学习笔记之RabbitMQ 集群
106 0
|
消息中间件 Web App开发 NoSQL
RabbitMQ学习笔记2
RabbitMQ学习笔记
81 0
|
消息中间件 存储 安全
RabbitMQ学习笔记(一)----RabbitMQ的基本概念以及5种队列模式
今天开始学习消息中间件,根据项目需求,目前选择的消息中间件是RabbitMQ。让我们一起来认识下RabbitMQ吧。
343 0
RabbitMQ学习笔记(一)----RabbitMQ的基本概念以及5种队列模式
|
消息中间件 存储 新零售
初步了解消息队列 RabbitMQ 版|学习笔记
快速学习初步了解消息队列 RabbitMQ 版
168 0
|
消息中间件 Java 数据安全/隐私保护
SpringBoot 整合 RabbitMQ|学习笔记
快速学习 SpringBoot 整合 RabbitMQ
140 0
SpringBoot 整合 RabbitMQ|学习笔记
|
消息中间件 Java 开发者
RabbitMQ 运行机制|学习笔记
快速学习 RabbitMQ 运行机制
RabbitMQ 运行机制|学习笔记
下一篇
无影云桌面