《RabbitMQ》 | 消息丢失也就这么回事

简介: 本文主要介绍 RabbitMQ的消息丢失问题

面试中常见的RabbitMQ面试题也是多了去了,常见的如下:


  • 消息可靠性问题:如何确保发送的消息至少被消费一次?


  • 延迟消息问题:如何实现消息的延迟投递?


  • 高可用问题:如何避免单点的MQ故障而导致的不可用问题?


  • 消息堆积问题:如何解决数百万级以上消息堆积,无法及时消费问题?


这几个问题又得让你脑壳疼一阵子,是不是也在网上看了挺多博文介绍这方面的解决方案,但是却看了又忘,实际便是因为缺少实操,这篇小菜便重点讲述下 RabbitMQ 如何解决消息丢失问题~


一、消息可靠性问题


消息可靠性问题我们又可能将其理解为如何防止消息丢失?那为什么消息会丢失呢?我们可以先看看消息投递的整个过程:


网络异常,图片无法展示
|


我们从图中可以从三个阶段分析可能造成消息丢失:


  • publisher 发送消息到 exchange


  • exchange 分发到 queue


  • queue 投递到 customer


既然我们知道了哪些阶段可能造成数据丢失,那我们就可以从源头防范于未然~!


工程结构


工程结构很简单,就是一个简单的 Spring Boot 项目,里面有个 消费者生产者 两个模块


网络异常,图片无法展示
|


1、生产者发送丢失


RabbitMQ 中提供了 publisher confirm 机制来避免消息发送到 MQ 的过程中丢失的问题。消息发送到 MQ 以后,会返回一个确认结果给生产者,用于表示消息是否确认成功。该确认结果存在两种请求:


  • publisher-confirm


该类型是 发送者确认 ,存在两种情况


  1. 消息成功投递到交换机,返回 ack


  1. 消息未投递到交换机,返回 nack


  • publisher-return


该类型是 发送者回执 ,存在两种情况


  1. 消息投递到交换机,且成功分发到队列,返回 ack


  1. 消息投递到交换机,但未成功分发到队列,返回 nack


网络异常,图片无法展示
|


注意:确认机制发送消息时,需要给每个消息设置一个全局唯一ID,以区分不同消息,避免ack冲突


接下来我们用代码来说明具体的操作方式


1)配置文件


我们首先看下 生产者 的配置文件


网络异常,图片无法展示
|


前面几个配置 RabbitMQ 的连接信息没啥好讲的,我们来看几个比较陌生的配置


  • publisher-confirm-type


开启发送确认,这里可以支持两种类型


  1. simple:同步等待 confirm 结果,直到超时


  1. correlated:异步回调,定义 ConfirmCallback,MQ返回结果时会回调这个 ConfirmCallback


  • publisher-returns


开启 public-return,同样是基于 CallBack 机制,不过是定义 ReturnCallback


  • template.mandatory


定义路由失败时的策略。


  • true:调用 ReturnCallback


  • false:直接丢弃消息


2)定义回调事件


每个 RabbitTemplate 只能配置一个 ReturnCallback


网络异常,图片无法展示
|


3)发送消息


网络异常,图片无法展示
|


执行发送代码之前,我们确保已经创建了(一个直连交换机direct-exchange,一个队列direct-queue,且绑定的 key 为direct


正常情况下,我们执行代码肯定是发送成功的,可以看到控制台绿色输出


网络异常,图片无法展示
|


且我们在消息队列中也成功接收到了消息:


网络异常,图片无法展示
|


到这步是没有任何问题的,那我们就需要手动给它制造点问题~ 我们可以修改 交换机名称,这个时候发送消息的时候找不到交换机,那么交换机肯定就会返回 nack ,再看是否可以进入到我们代码中的判断:


网络异常,图片无法展示
|


代码执行虽然是绿色的,但因为rabbitMQ找不到正确的交换机,而导致消息发送失败,也就是下图的这个过程:


网络异常,图片无法展示
|


这一个是 publish -> exchange 失败我们顺利的捕获到了,那么 exchange -> queue 这步的失败是我们是否能够正常捕获?我们可以通过修改 路由 key 使交换机路由不到对应的 queue


网络异常,图片无法展示
|


可以发现当交换机没有路由到相对应的 queue 时,也成功触发了我们自定义的回调函数,然后看 rabbitMQ 控制台是可以发现消息已经成功投递到交换机


网络异常,图片无法展示
|


到这里,我们通过两种简单的错误模拟,使程序都能顺利的进入到我们预先定义的回调中,如果遇到发送失败的情况,我们可以在失败的回调中自定义消息重发机制,最大程度上避免消息丢失的问题


4)总结


我们可以通过 publisher-confirmpublisher-return 两种错误捕获机制,来避免 生产者 -> exchange -> queue 这条链路的消息丢失


  • publisher-confirm
  1. 消息成功发送到 exchange,返回 ack
  2. 消息未能成功发送到 exchange,返回 nack
  3. 消息发送过程中出现异常,没有收到回执,则进入 failureCallback 回调


  • publisher-return
  1. 消息成功发送到 exchange,但没有路由到 queue,调用自定义回调函数 returnCallback


2、消息存储丢失


消息存储丢失是啥意思?其实就是持久化 的概念,当消息已经成功发送到 queue 时,这个时候如果消费者没有及时进行消费,rabbitMQ 又刚好宕机重启了,那么这个时候就会发现消息丢失了。


这是因为 MQ 默认是内存存储消息,我们可以通过开启持久化的功能来确保在 MQ 中的消息不丢失


其实我们通过 RabbitMQ 提供的 GUI 创建交换机或队列的时候就可以发现有持久化的这个选项


网络异常,图片无法展示
|


如果将 durability 设为 durable 后,我们可以发现无论如何重启 MQ,重启后交换机和队列依然存在。


但是很多时候我们交换机队列 的创建并非在 GUI 上创建,而是通过应用代码的方式创建


  • 交换机持久化


网络异常,图片无法展示
|


  • 队列持久化


网络异常,图片无法展示
|


  • 消息持久化


默认情况下,AMQP 发出的消息都是持久化的,不用特意指定


网络异常,图片无法展示
|


3、消费者消费丢失


RabbitMQ 采取的机制是当确认消息被消费者消费后就会立即删除


那么如何确认消息已被消费者消费?那就还得依靠回执来确认,消费者获取消息后,需要向 RabbitMQ 发送 ack 回执,表明自己已经处理消息。其中 ack 在 AMQP 中有三种确认模式:


  • manual:手动 ack,需要在业务代码结束后,调用 api 发送 ack


  • auto:自动 ack,由 spring 监测 listener 代码是否出现异常,没有异常则返回 ack,反之返回 nack


  • none:关闭 ack,MQ 在消息投递后会立即删除消息


上述三种方式都是通过修改配置文件:


网络异常,图片无法展示
|


1)manual


该方式需要用户自己手动确认,灵活性较好


网络异常,图片无法展示
|


这个时候如果执行逻辑是正常的,那么在 RabbitMQ 上就会将该消息删除,但是如果执行的逻辑抛出了异常,没有进入到手动确认的环节,RabbitMQ 将会把该消息保留:


网络异常,图片无法展示
|


网络异常,图片无法展示
|


2)auto


该方式在没有异常发生时会自动进行消息确认


我们在配置文件中将确认方式改为 auto 进行测试:


网络异常,图片无法展示
|


正常情况下接收消息是没有任何问题的,那我们同样制造些非正常情况:


网络异常,图片无法展示
|


我们手动制造了点异常,发现消息没有被 RabbitMQ 删除的同时,而且控制台一直在报错,无止境的在尝试重新消费,这如果放在线上环境难免有些令人崩溃。


当消费者出现异常后,消息会不断 requeue(重新入队)到队列,再重新发送给消费者,然后再次异常,再次 requeue,无限循环,就会导致 MQ 的消息处理飙升


而发生这种情况的原因所在便是因为 RabbitMQ的消息失败重试机制,但很多时候我们可能不想一直重试,只需要经过几次尝试,如果失败就放弃处理,这个时候我们就需要在配置文件中配置失败重试机制:


网络异常,图片无法展示
|


开启该配置后,我们重启项目进行观察


网络异常,图片无法展示
|


通过控制台可以看到在重试 3 次后,SpringAMQP会抛出异常

AmqpRejectAndDontRequeueException,说明本地重试机制生效了。而且我们回到 RabbitMQ 控制台可以看到对应消息被删除了,说明最后 SpringAMQP 返回的是 ack,导致消息被 MQ 删除


网络异常,图片无法展示
|


但是这种处理方式并不优雅,重试后直接删除消息过于 暴力,那么有没有更好的处理方式?答案是有的!


我们可以利用 AMQP 提供的 MessageRecovery 接口来实现,该接口有三种不同的实现方式:


  • RejectAndDontRequeueRecoverer:重试耗尽后,直接 reject,丢失消息。默认方式,以上就是采用这种方式


  • ImmediateRequeueMessageRecoverer:重试耗尽后,返回 nack,消息重新入队


  • RepublishMessageRecoverer:重试耗尽后,将失败消息投递到指定的交换机


三种方式可以根据不同场景进行采用,分析一下,不难发现第三种

RepublishMessageRecoverer 是比较优雅的~ 当重试失败后会将消息投递到一个指定专门存放异常消息的队列,后续由人工集中进行处理!具体使用方式如下:


网络异常,图片无法展示
|


通过自定义异常处理后,我们重启项目查看控制台:


网络异常,图片无法展示
|


可以发现重试3次后,我们的异常消息进入到了我们自定义的异常队列中


网络异常,图片无法展示
|


3)none


该方式没啥好讲的~ 无论消息异常与否 MQ 都会进行删除!


4、总结


假如这个时候面试再问你,如何确保 RabbitMQ消息的可靠性?那你可得好好唠嗑唠嗑


如何保证消息不丢失?


1)首先分析丢失的场景有哪些?


消息丢失可能发生在 发送时丢失(未送达 exchange / 未路由到 queue)消息未持久化而MQ宕机消费者接收消息未能正确消费


2)然后如何预防


  • 开启生产者确认机制,确保生产者的消息能到达队列


确认机制包括 publisher-confirm  和 publisher-return

当未送达到 交换机 我们可以通过 publisher-confirm 返回的 acknack 来确认

交换机 未成功路由到 队列,我们可以通过 publisher-return 自定义的回调函数来确认,每个 RabbitTemplate 只能配置一个 ReturnCallback


  • 开启持久化功能,确保消息未消费前在队列中不会丢失


持久化功能分为 交换机持久化队列持久化消息持久化,我们都需要将 durable 设置为 true


  • 开启消费者确认机制最低为 auto 级别


消费者确认机制有三种类型:manual (手动确认)auto (自动确认)none (关闭 ack)


  • 失败重试机制


我们手动设置 MessageResoverer  为 RepublishMessageRecoverer 方式,将投递失败的消息转到异常队列中,交由人工处理


相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
消息中间件 存储 安全
|
消息中间件 JavaScript 前端开发
JavaScript 连接消息(RabbitMQ)
JavaScript 连接消息(RabbitMQ)
JavaScript 连接消息(RabbitMQ)
|
消息中间件 NoSQL 关系型数据库
RabbitMQ消息丢失、积压、重复等解决方案
RabbitMQ消息丢失、积压、重复等解决方案
RabbitMQ消息丢失、积压、重复等解决方案
|
消息中间件 存储 NoSQL
springcloud:springboot整合RabbitMQ|RabbitMQ保证消息可靠性(三)
上一章我们讲解了rabbitmq的四种交换机类型、七种通讯方式。本章我们将整合springboot来向大家完整演示rabbitmq的使用,并说明如何保证消息的可靠性。
631 0
springcloud:springboot整合RabbitMQ|RabbitMQ保证消息可靠性(三)
|
消息中间件 Java 网络架构
SpringBoot整合RabbitMQ 实现五种消息模型
SpringBoot整合RabbitMQ 实现五种消息模型
270 2
|
消息中间件
13、RabbitMQ教程-消息的顺序性
13、RabbitMQ教程-消息的顺序性
236 0
|
消息中间件
12、RabbitMQ教程-使用消息确认机制confirm带来的问题
12、RabbitMQ教程-使用消息确认机制confirm带来的问题
190 0
|
消息中间件
11、RabbitMQ教程-消息确认机制confirm
11、RabbitMQ教程-消息确认机制confirm
251 0
|
消息中间件 API
10、RabbitMQ教程-消息的两种消费模式
10、RabbitMQ教程-消息的两种消费模式
564 0
|
消息中间件 存储
RabbitMQ-消息丢失
RabbitMQ-消息丢失