RabbitMQ如何保证消息的可靠性

本文涉及的产品
容器服务 Serverless 版 ACK Serverless,317元额度 多规格
容器服务 Serverless 版 ACK Serverless,952元额度 多规格
简介: RabbitMQ如何保证消息的可靠性

可靠性分析

RabbitMQ如何保证消息的可靠?如RabbitMQ基础概念中的架构模型
在这里插入图片描述
可以看到一条消息的传递过程:

  1. 发布者和RabbitMQ建立连接发送消息至交换机。
  2. 交换机和队列绑定,将消息路由到队列中。
  3. 消费者和RabbitMQ建立连接指定某个队列的消息进行消费。

在这过程中以下几个环节可能会丢失消息:

  1. 发布者到交换机环节。
  2. 交换机到队列环节。
  3. 队列到消费者环节。

如下图
在这里插入图片描述


可靠性方案

所以要保证消息的可靠性需要做到以下几点:

  1. 发布者需确认交换机接收到消息。
  2. 发布者需确认队列接收到消息。
  3. 保证队列及其中的数据持久化。
  4. 保证消费者的正常消费。

如何做到以上几点?RabbitMQ为了适应各个场景的使用,以上的功能需要开发者按照定义自行设置实现。


可靠性实现

以下实现为Java整合RabbitMQ实现,参考Java整合RabbitMQ实现生产消费(7种通讯方式)

确认Exchange接收到消息

构建channel时添加确认监听机制,当消息未发送至交换机时做补偿措施。

 channel.addConfirmListener((sequenceNumber, multiple) -> {
   
   
            System.out.println("消息成功发送到交换机");
        }, (sequenceNumber, multiple) -> {
   
   
            System.err.println("消息未发送到交换机,补偿操作。");
        });

确认Queue接收到消息

  1. 构建channel时添加return监听机制,当消息未路由至队列时做补偿措施。
channel.addReturnListener((replyCode, replyText, exchange, routingKey, basicProperties, body) -> {
   
   
            System.err.format("消息 %s 未路由到指定队列: %s, replyText: %s,replyCode: %d%n", body, routingKey, replyText, replyCode);
        });
  1. 发布消息时设置消息mandatory为true,开启return机制。
 channel.basicPublish("", "",true, "", "");

保证Queue及其数据持久化

  1. 构建队列时持久化队列。
    //构建队列,queueDeclare("队列名称","是否持久化队列","是否只允许一个队列消费","长时间未使用是否删除","其他参数")
    channel.queueDeclare("", true, false, false, null);
    
  2. 发布消息时设置消息持久化属性。
    //设置消息持久化
    AMQP.BasicProperties basicProperties = new AMQP.BasicProperties.Builder().deliveryMode(2).build();
    channel.basicPublish("", "",true, basicProperties, "");
    

    保证消费者的正常消费

    当消息持久化至队列时已经保证了消息的可靠投递,为保证消息的正常消费,需要解决重复消费和消息丢失问题。

重复消费问题

  1. 业务处理完成,但是ack失败,消息被扔进队列,导致重复消费。
  2. 业务处理过程中,进程宕机,恢复进程后消费未ACK的消息导致重复消费。

针对第一个场景的解决方案:设置手动ACK,并且业务处理和ack操作在一个事务中。

DeliverCallback deliverCallback = (consumerTag, delivery) -> {
   
   
            String message = new String(delivery.getBody(), "UTF-8");
            //消息处理后手动ACK
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
        };
// ack为false
channel.basicConsume("", false, deliverCallback, consumerTag -> {
   
   
});

针对第二个场景的解决方案:发布消息时设置业务唯一标识,在消费后进行存储,如果有相同标识前来消费直接拒绝即可(具体业务具体分析)。设置业务唯一标识方式:

String correlationId = UUID.randomUUID().toString();
AMQP.BasicProperties basicProperties = new AMQP.BasicProperties.Builder().correlationId(correlationId).build();
channel.basicPublish("", "", basicProperties, "");

消息丢失问题

在以下场景中可能会发生消息丢失问题:

  1. 业务处理失败,但是ack成功,导致消息丢失。

解决方案:设置手动ACK,并且业务处理和ack操作在一个事务中。

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
3月前
|
消息中间件 存储 监控
|
3月前
|
消息中间件 安全 Java
【RabbitMQ高级篇】消息可靠性问题
【RabbitMQ高级篇】消息可靠性问题
91 0
|
3月前
|
消息中间件 存储 数据库
RabbitMQ之MQ的可靠性
RabbitMQ之MQ的可靠性
|
3月前
|
消息中间件 存储 运维
|
3月前
|
消息中间件 SQL Java
RabbitMQ之消费者可靠性
RabbitMQ之消费者可靠性
|
3月前
|
消息中间件 存储 运维
深入理解MQ消息队列的高可用与可靠性策略
深入理解MQ消息队列的高可用与可靠性策略
1290 3
|
3月前
|
消息中间件 Java API
【微服务系列笔记】MQ消息可靠性
消息可靠性涉及防止丢失,包括生产者发送时丢失、未到达队列以及消费者消费失败处理后丢失。 确保RabbitMQ消息可靠性的方法有:开启生产者确认机制,确保消息到达队列;启用消息持久化以防止未消费时丢失;使用消费者确认机制,如设置为auto,由Spring确认处理成功后ack。此外,可开启消费者失败重试机制,多次失败后将消息投递到异常交换机。
66 1
|
3月前
|
消息中间件 供应链 Java
RabbitMQ入门指南(九):消费者可靠性
RabbitMQ是一个高效、可靠的开源消息队列系统,广泛用于软件开发、数据传输、微服务等领域。本文主要介绍了消费者确认机制、失败重试机制、失败处理策略、业务幂等性等内容。
163 0
RabbitMQ入门指南(九):消费者可靠性
|
3月前
|
消息中间件 Java 微服务
RabbitMQ入门指南(七):生产者可靠性
RabbitMQ是一个高效、可靠的开源消息队列系统,广泛用于软件开发、数据传输、微服务等领域。本文主要介绍了消息丢失的可能性、生产者可靠性中的生产者重试机制和生产者确认机制等内容。
99 0
RabbitMQ入门指南(七):生产者可靠性
|
3月前
|
消息中间件 安全 Java
SpringBoot基于RabbitMQ实现消息可靠性
SpringBoot基于RabbitMQ实现消息可靠性
69 0