RabbitMQ核心思想

简介: RabbitMQ核心思想

RabbitMQ核心思想

在这里插入图片描述

MQ是干什么用的?

应用解耦、异步、流量削锋、数据分发、错峰流控、日志收集等等...


  • 当前最主流的消息中间件。
  • 高可用性,支持发送确认,投递确认等特性
  • 高可用,支持镜像队列
  • 支持插件

优点:

  1. 基于 Erlang, 支持高并发
  2. 支持多种平台,多种客户端,文档齐全
  3. 可靠性高
  4. 在互联网公司有较大规模等使用,社区活跃度高

1. AMQP协议介绍

在这里插入图片描述

Broker :接受和分发消息等的应用,RabbitMQ就是Message

Virtual Host : 虚拟机Broker , 将多个单元隔离开

Connection : publisher / consumer 和 broker之间的TCP连接

Channel : connection内部建立的逻辑连接,通常没个线程创建单独的channel

Rounting Key : 路由键,用来只是消息的路由转发,相当于快递的地址

Exchange : 交换机 ,相当于快递的分拨中心

Queue : 消息队列,消息最终被送到这里等待consumer 取走

Binding : exchange 和 queue之间的虚拟连接,用于message的分发依据


AMQP协议的核心概念-Exchange

  • 在AMQP协议或者是RabbitMQ实现中,最核心的组件是Exchange
  • Exchange 承担 RabbitMQ 的核心功能 --- 路由转发
  • Exchange 有多个种类,配置多变,需要详细讲解

RabbitMQ核心 -- Exchange解析

  1. Exchange是 AMQP 协议和RabbitMQ的 核心组件
  2. Exchange的功能是根据 绑定关系 和 路由键为消息提供路由,将消息转发至相应的队列
  3. Exchange有4种类型 :Direct / Topic / Fabout /Headers

Direct Exchange (直接路由)

  • Message中的Routing Key 如果和 Binding Key 一致, Direct Exchange 则将 message 发到对应的 queue中

在这里插入图片描述

Fanout Exchange (广播路由)

  • 每个发到 Fanout Exchange 的 message 都会分发到所有绑定到queue上去

在这里插入图片描述

Topit Exchange (话题路由)

  • 根据 Routing Key 及通配规则,Topic Exchange 将消息分发目标 Queue中
  • 全匹配 :与Direct 类似
  • Binding Key 中的 #:匹配任意个数的word

在这里插入图片描述

2. Docker 安装 RabbitMQ


    docker pull rabbitmq
    

这里是直接安装最新的


    docker run -d --hostname my-rabbit --name rabbit -p 15672:15672 -p 5672:5672 rabbitmq

访问 : http://IP地址:15672

在这里插入图片描述
用户名和密码默认都是guest


3. RabbitMQ保证消息的可靠性

  • 需要使用RabbitMQ发送端确认机制,确认消息成功发送到RabbitMQ并被处理
  • 需要使用RabbitMQ消息返回机制,若没发现目标队列,中间件会通知发送方
  • 需要使RabbitMQ消息端确认消息,确认消息没有发生异常
  • 需要使用RabbitMQ消费端限流机制,限制消息推送速度 ,保障接受端服务稳定
  • 大量到堆积消息会给RabbitMQ产生很大到压力,需要使用RabbitMQ消息过期时间,防止消息大量积压
  • 过期后会直接丢弃, 不符合业务逻辑,需要使用RabbitMQ死信队列,收集过期消息,以供分析


4. 发送确认机制原理



消息真的发出去了吗?

  • 消息发送后,发送端不知道RabbitMQ是否真的收到了消息,若RabbitMQ异常,消息丢失,业务异常,这个时候我们就需要使用RabbitMQ发送端确认机制,确认消息发送

三种确认机制


1. 单条同步确认
  • 配置channel,开启确认模式:channel.confirmSelect()
  • 每发送一条消息,调用channel.waitForConfirms()方法等待确认
         //建立连接
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("127.0.0.1");
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();
        String message = objectMapper.writeValueAsString(orderMessageDTO);
        channel.confirmSelect();
        channel.basicPublish(
                "exhange.order.restaurant",
                "key.restaurant",
                null,
                message.getBytes());
        if(channel.waitForConfirms()){
            //表示发送确认处理逻辑
        }else{
            //发送失败
        }
2. 多条同步确认
  • 配置channel,开启确认模式:channel.confirmSelect()
  • 发送多条消息后,调用channel.waitForConfirms()方法等待确认
         //建立连接
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("127.0.0.1");
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();
        String message = objectMapper.writeValueAsString(orderMessageDTO);
        channel.confirmSelect();
        channel.basicPublish(
                "exhange.order.restaurant",
                "key.restaurant",
                null,
                message.getBytes());
        if(channel.waitForConfirms()){
            //表示发送确认处理逻辑
        }else{
            //发送失败
        }
3. 异步确认
  • 配置channel,开启确认模式:channel.confirmSelect()
  • 在channel上添加监听: addConfirmListener , 发送消息后,会回调此方法,通知是否发送成功
  • 异步确认有可能是单条,也有可能是多条,取决于MQ
        //建立连接
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("127.0.0.1");
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();
        String message = objectMapper.writeValueAsString(orderMessageDTO);
        channel.confirmSelect();
        channel.basicPublish(
                "exhange.order.restaurant",
                "key.restaurant",
                null,
                message.getBytes());
        ConfirmListener confirmListener = new ConfirmListener(){

            @Override
            public void handleAck(long deliveryTag, boolean multiple) throws IOException {
                System.out.println("Ack  " + deliveryTag + multiple);
            }

            @Override
            public void handleNack(long deliveryTag, boolean multiple) throws IOException {
                System.out.println("Nack  " + deliveryTag + multiple);
            }
        };
        channel.addConfirmListener(confirmListener);

5. 消息返回机制


消息真被路由了吗?

  • 消息发送后,发送端不知道消息是否被正确路由,若路由异常,消息会被丢弃,业务异常,需要使用RabbitMQ消息返回机制,确认消息被正确路由

消息的开启方法:

  1. 在RabbitMQ基础配置中又一个关键配置项:Mandatory
  2. Mandatory若为false,RabbitMQ将直接丢弃无法路由的消息
  3. Mandatory若为true,RabbitMQ才会处理无法路由的消息

在这里插入图片描述

DeliverCallback deliverCallback = ((consumerTag, message) -> {
        //拿到消息
        String messageBody = new String(message.getBody());
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("127.0.0.1");
        try {
            Connection connection = connectionFactory.newConnection();
            Channel channel = connection.createChannel();
            channel.addReturnListener(new ReturnListener() {
                @Override
                public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    log.info("Message Return:");
                    //处理失败的业务逻辑
                }
            });
            channel.basicPublish(
                    "exhange.order.restaurant",
                    "key.restaurant",
                    true,
                    null,
                    messageBody.getBytes());

        }catch (Exception e){
            log.error(e.getMessage());
        }

    });

6. 消费端确认机制


消费端处理异常怎么办?

  • 默认情况下,消费端接收消息时,消息会被自动确认(ACK),发生异常时,发送端与消息中间件无法得知消息处理情况,需要使用RabbitMQ 消息端确认机制,确认消息被正确处理

消费端ACK类型

手动ACK:消费端收到消息后,不会自动签收消息,需要我们在业务代码中显式签收消息


- 单条手动ACK : multiple = false

- 多条手动ACK : multiple = true

- 推荐使用单条ACK

    channel.basicAck(message.getEnvelope().getDeliveryTag(),false);


    channel.basicNack(message.getEnvelope().getDeliveryTag(),false,false);

自动ACK:消费端收到消息后,会自动签收消息


7. 消费端限流机制

业务高峰期,可能出现发送端与接收端性能不一致,大量消息被同时推给接受端,造成接受端服务奔溃

在高并发端场景下,有个微服务奔溃了,本科期间队列挤压了大量消息,微服务上线后,收到大量并发消息。将同样多端消息推给能力不同端副本,会导致部分副本异常

针对以上问题,RabbitMQ 开发了 Qos (服务质量保证) 功能,Qos功能保证了在一定树木消息违背确认前,不消费新的消息

//这样RabbitMQ就会使得每个Consumer在同一个时间点最多处理一个Message。换句话说,在接收到该Consumer的ack前,他它不会将新的Message分发给它。
channel.basicQos(1);

8. RabbitMQ的过期时间(TTL)

  • RabbitMQ的过期时间称为 TTL (time to live), 生存时间
  • RabbitMQ的过期时间分为消息TTL 和 队列 TTL
  • 消息TTL设置了单条消息的过期时间
  • 队列TTL设置了队列中所有消息的过期时间
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
            .deliveryMode(2) //deliveryMode=1代表不持久化,deliveryMode=2代表持久化
            .contentEncoding("UTF-8") // 编码方式
            .expiration("10000") // 过期时间
            .headers(headers) //自定义属性
            .build();


String messageBody = "发送的消息"

//发送通道
channel.basicPublish(
                    "exhange.order.restaurant",
                    "key.restaurant",
                    true,
                    properties,
                    messageBody.getBytes());

9. 死信队列

如何转移过期的消息?

  • 消息被设置了过期时间,过期后会直接被丢弃,直接被丢弃的消息无法对系统运行异常发出警报,需要使用RabbitMQ死信队列,收集过期消息,以供分析

什么是死信队列

  • 队列被配置了DLX属性 (Dead-Letter-Exchange) 当一个消息变成死信(dead message)后,能重新被发布到另一个 Exchange , 这个Exchange也是一个普通交换机,死信被死信交换机路由后,一般进入一个固定队列

在这里插入图片描述

怎么变成死信
  • 消息被拒绝 (reject / nack) 并且 requeue = false
  • 消息过期(TTL到期)
  • 队列达到最大长度

个人博客地址:http://blog.yanxiaolong.cn/

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
8月前
|
消息中间件 中间件 数据安全/隐私保护
RabbitMQ 的核心概念
RabbitMQ 的核心概念
50 2
|
消息中间件 存储 Java
最经典的消息中间件:RabbitMQ
最经典的消息中间件:RabbitMQ
164 1
|
消息中间件 数据库
消息中间件系列教程(18) -RabbitMQ-基于RabbitMQ解决分布式事务(思想)
消息中间件系列教程(18) -RabbitMQ-基于RabbitMQ解决分布式事务(思想)
90 0
|
消息中间件 存储 安全
RabbitMQ原理
描述了RabbitMQ的原理和使用方法
130 0
|
8月前
|
消息中间件 存储 Java
RabbitMQ是如何实现消息传递的?
RabbitMQ是如何实现消息传递的?
137 0
|
消息中间件 存储 缓存
5.RabbitMQ 的工作原理(重要)
5.RabbitMQ 的工作原理(重要)
175 0
|
消息中间件 存储 负载均衡
为什么会选择使用RabbitMQ ? 有什么好处 ?
选择使用RabbitMQ的原因有很多,以下是一些常见的好处和详细介绍
523 0
|
消息中间件 负载均衡 安全
RabbitMQ设计原理解析
RabbitMQ现在用的也比较多,但是没有过去那么多啦。现在很多的流行或者常用技术或者思路都是从过去的思路中演变而来的。了解一些过去的技术,对有些人来说可能会产生众里寻他千百度的顿悟,加深对技术的理解,更好的应用于工作中去。
148 0
|
消息中间件 存储 缓存
Rabbitmq基本原理
MQ全称为Message Queue, 是一种分布式应用程序的的通信方法,它是消费-生产者模型的一个典型的代表,producer往消息队列中不断写入消息,而另一端consumer则可以读取或者订阅队列中的消息。RabbitMQ是MQ产品的典型代表,是一款基于AMQP协议可复用的企业消息系统。业务上,可以实现服务提供者和消费者之间的数据解耦,提供高可用性的消息传输机制,在实际生产中应用相当广泛。本文意在介绍Rabbitmq的基本原理,包括rabbitmq基本框架,概念,通信过程等。
101 0
|
消息中间件 存储 网络协议
RabbitMQ详解(一):RabbitMQ相关概念
RabbitMQ详解(一):RabbitMQ相关概念
489 0