【RabbitMQ】RabbitMQ如何做到保证消息100%不丢失?

本文涉及的产品
日志服务 SLS,月写入数据量 50GB 1个月
简介: 【RabbitMQ】RabbitMQ如何做到保证消息100%不丢失?

项目中使用RabbitMQ来作为消息队列,遇见过消息丢失的情况,特此记录一下。

写在前面

先来说下MQTT协议中的3种语义,这个非常重要

在MQTT协议中,给出了三种传递消息时能够提供的服务质量标准,这三种服务质量从低到高依次是:

  • At most once:至多一次。消息在传递时,最多会被送达一次。也就是说,没什么消息可靠性保证,允许丢消息。
  • At least once:至少一次。消息在传递时,至少会被送达一次。也就是说,不允许丢消息,但是允许有少量重复消息出现。

Exactly once:恰好一次。消息在传递时,只会被送达一次,不允许丢失也不允许重复,这个是最高的等级 这个服务质量标准不仅适用于MQTT,对所有的消息队列都是适用的。现在常用的绝大部分消息队列提供的服务质量都是 At least once,包括RocketMQ、RabbitMQ和Kafka都是这样。也就是说,消息队列很难保证消息不重复。

At least once+幂等消费=Exactly once

幂等性下回再讲,这篇先说下消息丢失的问题。

如何保证消息100%不丢失

消息从生产端到消费端消费要经过3个步骤:

  1. 生产端发送消息到RabbitMQ;
  2. RabbitMQ发送消息到消费端;
  3. 消费端消费这条消息;

也就是说只要保证生产端和消费段的消息可靠性,理论上就能够保证消息100%不丢失。

(当然,这里的可靠并不是一定就100%不丢失了,磁盘损坏,机房爆炸等等都能导致数据丢失,当然这种都是极小概率发生,能做到99.999999%消息不丢失,就是可靠的了)


生产端可靠性投递

事务消息机制

// 设置channel开启事务
rabbitTemplate.setChannelTransacted(true);
@Bean
public RabbitTransactionManager rabbitTransactionManager(ConnectionFactory connectionFactory)
 {
  return new RabbitTransactionManager(connectionFactory);
 }
@Transactional(rollbackFor = Exception.class,transactionManager = "rabbitTransactionManager")
public void publishMessage(String message) throws Exception {
   rabbitTemplate.setMandatory(true);
   rabbitTemplate.convertAndSend("javatrip",message);
    }

事务消息机制由于会严重降低性能,所以一般不采用这种方法,因为这是同步操作,一条消息发送之后会使发送端阻塞,以等待RabbitMQ-Server的回应,之后才能继续发送下一条消息,生产者生产消息的吞吐量和性能都会大大降低。

confirm消息确认机制

顾名思义,就是生产端投递的消息一旦投递到RabbitMQ后,RabbitMQ就会发送一个确认消息给生产端,让生产端知道我已经收到消息了,否则这条消息就可能已经丢失了,需要生产端重新发送消息了。

# 开启发送确认
spring.rabbitmq.publisher-confirm-type=correlated
# 开启发送失败回退
spring.rabbitmq.publisher-returns=true       
@Configuration
@Slf4j
public class RabbitMQConfig {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    @PostConstruct
    public void enableConfirmCallback() {
        //confirm 监听,当消息成功发到交换机 ack = true,没有发送到交换机 ack = false
        //correlationData 可在发送时指定消息唯一 id
        rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
            if(!ack){
                //记录日志、发送邮件通知、落库定时任务扫描重发
            }
        });
        //当消息成功发送到交换机没有路由到队列触发此监听
        rabbitTemplate.setReturnsCallback(returned -> {
            //记录日志、发送邮件通知、落库定时任务扫描重发
        });
    }
}

实际在这两个监听里面去做重发并不是很多,因为成本太高了,首先 RabbitMQ 本身丢失的可能性就非常低,其次如果这里需要落库再用定时任务扫描重发还要开发一堆代码,分布式定时任务…再其次定时任务扫描肯定会增加消息延迟,不是很有必要。真实业务场景是记录一下日志就行了,方便问题回溯,顺便发个邮件给相关人员,如果真的极其罕见的是生产者弄丢消息,那么开发往数据库补数据就行了。


消息持久化


RabbitMQ收到消息后将这个消息暂时存在了内存中,如果RabbitMQ挂了,那重启后数据就丢失了,所以相关的数据应该持久化到硬盘中,需要给exchange、queue和message都进行持久化

    @Bean
    public Queue TestQueue() {
        // durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效
        // exclusive:该队列是否只供一个消费者进行消费是否进行消息共享,true可以多个消费者消费,false:只能-一个消费者消费
        // autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。
        return new Queue("test",true,true,false);
    }

在Spring Boot中消息默认就是持久化的。

消息入库

消息入库,顾名思义就是将要发送的消息保存到数据库中。

发送消息前先将消息保存到数据库中,有一个状态字段status=0,表示消息发送了但还没收到确认;收到确认后将status设为1,表示RabbitMQ已收到消息。

生产端这边还需要开一个定时器,定时检索消息表,将status=0并且超过固定时间后

(可能消息刚发出去还没来得及确认,这边定时器刚好检索到这条status=0的消息,所以要设置时间)

还没收到确认的消息取出重发 (消息重发可能会造成幂等性问题,这里消费端要做幂等性处理),可能重发还会失败,所以还要添加一个最大重发次数字段retry_count,超过就做另外的处理。

生产端的定时器可以使用:xxl-job - 分布式任务调度平台来干这个事情

这样消息就可以保证生产端的可靠性了

消费端可靠性投递

ACK机制改为手动

RabbitMQ的自动ack机制默认在消息发出后就立即将这条消息删除,而不管消费端是否接收到,是否处理完。

我们需要进行手动消费

#开启手动ACK,消费消息的时候,就必须发送ack确认,不然消息永远还在队列中
spring.rabbitmq.listener.simple.acknowledge-mode=manual

8a1402e8e6c74e33a656dfa91c4a1a92.png621dc95b0d7e4e86a9d113dd6b79da94.png这里要小心! basicNack 方法的第三个参数代表是否重回队列,通常代码的报错并不会因为重试就能解决,所以可能这种情况:继续被消费,继续报错,重回队列,继续被消费…死循环。

一定要有重发消息次数的限制,或者干脆不入队,发送到Redis进行下记录也行。

SpringBoot 提供的消息重试

SpringBoot 给我们提供了一种重试机制,当消费者执行的业务方法报错时会重试执行消费者业务方法。

启用 SpringBoot 提供的重试机制

spring.rabbitmq.listener.simple.retry.enabled=true
# 重试次数
spring.rabbitmq.listener.simple.max-attempts=3    
# 重试时间间隔
spring.rabbitmq.listener.simpleinitial-interval: 3000

消费者代码

   @RabbitListener(queues = "queue")
    public void listen(String object, Message message, Channel channel) throws IOException {
        try {
            /**
             * 执行业务代码...
             * */
            int i = 1 / 0; //故意报错测试
        } catch (Exception e) {
            log.error("签收失败", e);
            /**
             * 记录日志、发送邮件、保存消息到数据库,落库之前判断如果消息已经落库就不保存
             * */
            throw new RuntimeException("消息消费失败");
        }
    }

注意一定要手动 throw 一个异常,因为 SpringBoot 触发重试是根据方法中发生未捕捉的异常来决定的。值得注意的是这个重试是 SpringBoot 提供的,重新执行消费者方法,而不是让 RabbitMQ 重新推送消息。

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
1月前
|
消息中间件
使用RabbitMQ如何保证消息不丢失 ?
RabbitMQ通过发布者确认、回执机制、消息持久化及消费者确认等方案,确保消息从发送到接收的每个环节都能有效防止丢失。即便如此,特殊情况下仍可能丢失,如系统故障等。为此,可设计消息状态表,记录消息ID、内容、交换机、路由键、发送时间和签收状态等,结合定时任务检查并重发未签收消息,以进一步提升消息传输的可靠性。
54 1
|
4月前
|
消息中间件 Java 开发者
如何避免RabbitMQ消息丢失?
本文探讨了RabbitMQ中如何避免消息丢失的问题。在默认情况下,RabbitMQ并不保证消息的持久性,但提供了多种机制来确保消息的可靠传输与处理。文章分析了消息可能丢失的关键环节,并介绍了相应的保证机制:发布者确认交换机已接收消息、确认队列接收消息、队列及消息的持久化,以及消费者成功处理消息后的确认。通过Java代码示例展示了如何在实际应用中实现这些机制。最终,确保了消息在从生产到消费的整个流程中的可靠性。
|
消息中间件 Java Maven
消息中间件系列教程(12) -RabbitMQ-消息确认机制
消息中间件系列教程(12) -RabbitMQ-消息确认机制
86 0
|
消息中间件 存储 Java
RabbitMQ如何保证消息的可靠性
RabbitMQ如何保证消息的可靠性
100 0
|
7月前
|
消息中间件 存储 Java
RabbitMQ中的消息持久化是如何实现的?
RabbitMQ中的消息持久化是如何实现的?
132 0
|
7月前
|
消息中间件 存储 程序员
四、RabbitMQ如何保证消息丢失
四、RabbitMQ如何保证消息丢失
73 0
|
消息中间件
rabbitmq重复确认导致消息丢失
rabbitmq重复确认导致消息丢失
|
消息中间件 存储 运维
优雅地处理RabbitMQ中的消息丢失
优雅地处理RabbitMQ中的消息丢失
103 0
|
消息中间件 存储
RabbitMQ如何保证消息发送成功
RabbitMQ如何保证消息发送成功
157 0
RabbitMQ如何保证消息发送成功
|
Java Spring
RabbitMQ-如何保证消息不丢失
RabbitMQ-如何保证消息不丢失
100 0