RabbitMQ入门指南(九):消费者可靠性

简介: RabbitMQ是一个高效、可靠的开源消息队列系统,广泛用于软件开发、数据传输、微服务等领域。本文主要介绍了消费者确认机制、失败重试机制、失败处理策略、业务幂等性等内容。

当RabbitMQ向消费者投递消息后,了解消费者的处理状态是非常重要的。因为消息的投递并不代表消费者一定能够正确地消费这些消息,可能会出现各种故障:

  • 网络故障:在消息投递过程中,如果RabbitMQ和消费者之间的网络连接出现故障,可能会导致消息无法正确投递给消费者。
  • 消费者宕机:如果消费者在接收消息后突然宕机,那么消息可能无法被正确处理。
  • 消费者处理异常:消费者在接收到消息后,由于处理不当或者出现异常情况,可能会导致消息处理失败。

以上情况都可能导致消息丢失,因此RabbitMQ需要知道消费者的处理状态,以便在消息处理失败时重新投递消息。


一、消费者确认机制

RabbitMQ的消费者确认机制Consumer Acknowledgement)是一种确保消息被成功处理的机制。当消费者处理消息结束后,需要向RabbitMQ发送一个回执,以告知消息的处理状态。这个机制对于确保消息的可靠传递非常重要,因为它可以防止消息在消费者端处理失败而没有被正确处理的情况。

回执有三种可选值:

  • ACK(确认):表示消费者成功处理了消息,RabbitMQ会从队列中删除该消息。
  • NACK(否定确认):表示消息处理失败,RabbitMQ需要再次投递该消息。
  • REJECT(拒绝):表示消息处理失败并且被拒绝,RabbitMQ会从队列中删除该消息。

在实际应用中,一般使用ACK或NACK两种方式。REJECT方式的使用相对较少,通常只在消息格式存在问题,即存在开发错误的情况下使用。因此大多数情况下需要将消息处理的代码通过try catch机制捕获,消息处理成功时返回ACK,处理失败时返回NACK。

在consumer服务的application.yml文件中添加配置修改Spring AMQP的ACK处理方式 :

spring:
  rabbitmq:
    listener:
      simple:
        acknowledge-mode: auto

image.gif

RabbitMQ 支持三种不同的确认模式,这些模式通过acknowledge-mode属性进行配置:

  • none:关闭ACK。消费者接收到消息后不需要发送任何确认给发送者,发送者将继续发送下一条消息。在这种模式下,如果消费者处理消息失败,消息将会丢失,无法保证消息的可靠性。
  • manual:手动ACK。消费者接收到消息后需要手动发送确认给发送者,发送者才会继续发送下一条消息。在这种模式下,如果消费者处理消息失败,可以手动发送NACK给发送者,告诉发送者这条消息处理失败,以便发送者重新发送消息。这种模式可以保证消息的可靠性,但需要消费者手动处理确认和NACK。
  • auto:自动ACK。Spring AMQP提供了一种自动的消息确认机制。它利用AOP(面向切面编程)对消息处理逻辑做了环绕增强。当业务正常执行时,Spring AMQP会自动返回ACK。当业务出现异常时,根据异常判断返回不同结果:业务异常,自动返回NACK;消息处理或校验异常,自动返回REJECT。

二、失败重试机制

当消费者出现异常后,消息会不断requeue(重入队)到队列,再重新发送给消费者。如果消费者再次执行依然出错,消息会再次requeue到队列,再次投递,直到消息处理成功为止。 如果消费者持续出现异常,消息会不断地在队列中重新排队并重新发送,这可能会导致消息处理延迟和队列持续增长,给系统带来不必要的压力。

为了解决这个问题,Spring框架提供了消费者失败重试机制,在消费者出现异常时利用本地重试,而不是无限制的requeue到MQ队列 。

在consumer服务的application.yml文件中添加配置:

spring:
  rabbitmq:
    listener:
      simple:
        retry:
          enabled: true
          initial-interval: 1000ms
          multiplier: 1
          max-attempts: 3
          stateless: true

image.gif

enabled: true 开启消费者失败重试
initial-interval: 1000ms 初始的等待时长
multiplier: 1 每次重试的等待时长是上次等待时长的倍数
max-attempts: 3 最大重试次数
stateless: true true表示重试是无状态的,即每次重试都是独立的,不会考虑之前的重试状态。如果业务中包含事务,需要改为false。

通过这样的配置,当消费者出现异常时,消息会在本地进行重试,而不是无限期地重新排队发送。在达到最大重试次数后,SpringAMQP会抛出AmqpRejectAndDontRequeueException异常,并将消息从队列中删除。这意味着最后一次处理消息的结果是失败的,并且消息不会被重新排队发送给消费者。

这种失败重试机制可以有效地减少消息处理的延迟和队列的增长,提高系统的稳定性和可用性。当然,在使用失败重试机制时,也需要考虑到业务逻辑和异常处理的合理性,避免因过度重试而导致的问题。

三、失败处理策略

在失败重试机制中,当达到最大重试次数后,消息会被直接丢弃。尽管这在某些场景中可能是可接受的,但对于那些对消息可靠性要求极高的业务来说,这显然是一个潜在的风险点。

Spring AMQP为此提供了强大的支持,允许开发人员自定义重试次数耗尽后的消息处理策略。这个策略是由MessageRecovery接口来定义的,它有三种不同的实现方式:

  • RejectAndDontRequeueRecoverer:当重试次数耗尽后,直接拒绝消息,并丢弃该消息。这是默认的处理方式。
  • ImmediateRequeueMessageRecoverer:当重试次数耗尽后,返回一个NACK给生产者,使消息重新入队,以便再次发送。
  • RepublishMessageRecoverer:当重试次数耗尽后,可以将失败的消息投递到一个指定的交换机和队列中,这个交换机和队列专门用来存放异常的消息。

在处理策略中,一种比较合适的方式是使用RepublishMessageRecoverer。当消息失败后,它会被投递到一个特定的、专门用于存放异常消息的队列中。这个队列可以由人工进行集中处理,使得开发人员可以更精细地处理和诊断问题。

在consumer服务中定义处理失败消息的交换机和队列:

@Bean
    public DirectExchange errorExchange(){
        return new DirectExchange("error.direct");
    }
    @Bean
    public Queue errorQueue(){
        return new Queue("error.queue");
    }
    @Bean
    public Binding errorBinding(Queue errorQueue, DirectExchange errorExchange){
        return BindingBuilder.bind(errorQueue).to(errorExchange).with("error");
    }

image.gif

定义一个RepublishMessageRecoverer:

@Bean
public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){
    return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");
}

image.gif

完整代码如下:

@Configuration
@ConditionalOnProperty(name = "spring.rabbitmq.listener.simple.retry.enabled", havingValue = "true")
public class ErrorConfiguration {
    @Bean
    public DirectExchange errorExchange() {
        return new DirectExchange("error.direct");
    }
    @Bean
    public Queue errorQueue() {
        return new Queue("error.queue");
    }
    @Bean
    public Binding errorBinding(Queue errorQueue, DirectExchange errorExchange) {
        return BindingBuilder.bind(errorQueue).to(errorExchange).with("error");
    }
    @Bean
    public MessageRecoverer messageRecoverer(RabbitTemplate rabbitTemplate) {
        return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");
    }
}

image.gif

通过这样的配置,当消息在尝试多次重试后仍然失败时,它们会被自动投递到定义的异常消息队列中。这样就可以集中处理这些异常消息,进行进一步的诊断或处理。这种策略为开发人员在处理复杂分布式系统中的消息问题提供了一种更加专业和灵活的方式。

四、业务幂等性

在计算机科学和软件开发中,幂等性是一个重要的概念。简单来说,如果一个操作或函数不论执行一次还是多次,其结果都是相同的,那么称这个操作或函数是幂等的。在业务处理中,幂等性尤其关键。它可以保证系统的稳定性,确保在某些异常情况下,多次执行某个业务操作不会对业务状态产生不一致的结果。幂等性的重要性在于它能够避免因重复执行操作而产生的数据不一致、状态冲突等问题。在涉及金融交易、库存管理、用户认证等关键领域,幂等性是确保系统稳定和数据准确的重要前提。

1.通过唯一标识符保证操作的幂等性

为每个操作生成唯一的标识符(如ID),并在系统中跟踪这些标识符以检测重复操作。当接收到具有已知标识符的操作时,可以跳过重复的操作。Spring AMQP的MessageConverter自带了MessageID的功能,只要开启这个功能即可。

Jackson的消息转换器示例:

@Bean
public MessageConverter messageConverter(){
    // 定义消息转换器
    Jackson2JsonMessageConverter jjmc = new Jackson2JsonMessageConverter();
    // 配置自动创建消息ID,用于识别不同消息,也可以在业务中基于ID判断是否是重复消息
    jjmc.setCreateMessageIds(true);
    return jjmc;
}

image.gif

在publisher服务中编写测试类,并利用RabbitTemplate实现消息发送:

@Test
    void testSendMessage2Queue() {
        String queueName = "demo.queue";
        String msg = "Idempotent Test";
        rabbitTemplate.convertAndSend(queueName, msg);
    }

image.gif

运行测试用例,查看结果:


2.通过业务判断保证操作的幂等性

业务判断,是一种基于业务逻辑和状态的检查,以确定是否对重复的请求或消息进行处理。在多种业务场景中,这一策略的思路各有不同。

比如在支付订单的案例中,业务逻辑主要是支付并将订单状态从“未支付”修改为“已支付”(需要防止重复支付)。因此,在执行这一业务时,可以判断订单的状态是否为“未支付”。若状态不是“未支付”,则说明该订单已经被处理过,无需重复处理。与基于唯一标识符的方案相比,业务判断方案无需对原有数据库进行改造,因此更为推荐。

以支付修改订单的业务为例:

@Override
    public void markOrderPaySuccess(Long orderId) {
        // 查询订单
        Order order = getById(orderId);
        // 判断订单状态,订单不存在或者订单状态不是1,放弃处理
        if (order == null || order.getStatus() != 1) {
            return;
        }
        // 尝试更新订单
        order.setStatus(2);
        order.setPayTime(LocalDateTime.now());
        orderService.updateById(order);
    }

image.gif

以上代码示例判断和更新是两步动作 ,极小概率下可能存在线程安全问题,所以可以进行以下修改:

@Override
    public void markOrderPaySuccess(Long orderId) {
        // UPDATE `order` SET status = ? , pay_time = ? WHERE id = ? AND status = 1
        orderService.lambdaUpdate()
                .set(Order::getStatus, 2)
                .set(Order::getPayTime, LocalDateTime.now())
                .eq(Order::getId, orderId)
                .eq(Order::getStatus, 1)
                .update();
    }

image.gif


总结

RabbitMQ是一个开源的消息队列软件,旨在提供可靠的消息传递和消息队列功能。本文主要介绍了消费者确认机制、失败重试机制、失败处理策略、业务幂等性等内容,希望对大家有所帮助。

相关实践学习
RocketMQ一站式入门使用
从源码编译、部署broker、部署namesrv,使用java客户端首发消息等一站式入门RocketMQ。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
13天前
|
消息中间件 Java RocketMQ
MQ产品使用合集之在同一个 Java 进程内建立三个消费对象并设置三个消费者组订阅同一主题和标签的情况下,是否会发生其中一个消费者组无法接收到消息的现象
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
13天前
|
消息中间件 存储 运维
深入理解MQ消息队列的高可用与可靠性策略
深入理解MQ消息队列的高可用与可靠性策略
999 3
|
13天前
|
消息中间件 安全 Java
SpringBoot基于RabbitMQ实现消息可靠性
SpringBoot基于RabbitMQ实现消息可靠性
46 0
|
13天前
|
消息中间件 RocketMQ
RocketMq消费者/生产者配置
RocketMq消费者/生产者配置
|
13天前
|
消息中间件 Java API
【微服务系列笔记】MQ消息可靠性
消息可靠性涉及防止丢失,包括生产者发送时丢失、未到达队列以及消费者消费失败处理后丢失。 确保RabbitMQ消息可靠性的方法有:开启生产者确认机制,确保消息到达队列;启用消息持久化以防止未消费时丢失;使用消费者确认机制,如设置为auto,由Spring确认处理成功后ack。此外,可开启消费者失败重试机制,多次失败后将消息投递到异常交换机。
44 1
|
13天前
|
消息中间件 Docker 微服务
RabbitMQ入门指南(十一):延迟消息-延迟消息插件
RabbitMQ是一个高效、可靠的开源消息队列系统,广泛用于软件开发、数据传输、微服务等领域。本文主要介绍了DelayExchange插件、延迟消息插件实现延迟消息等内容。
87 0
|
13天前
|
消息中间件 微服务
RabbitMQ入门指南(十):延迟消息-死信交换机
RabbitMQ是一个高效、可靠的开源消息队列系统,广泛用于软件开发、数据传输、微服务等领域。本文主要介绍了死信交换机、死信交换机实现延迟消息等内容。
60 0
|
13天前
|
消息中间件 网络协议 JavaScript
MQTT常见问题之微消息队列mqtt支持ipv6失败如何解决
MQTT(Message Queuing Telemetry Transport)是一个轻量级的、基于发布/订阅模式的消息协议,广泛用于物联网(IoT)中设备间的通信。以下是MQTT使用过程中可能遇到的一些常见问题及其答案的汇总:
|
13天前
|
消息中间件 物联网 Java
MQTT常见问题之微消息队列配置失败如何解决
MQTT(Message Queuing Telemetry Transport)是一个轻量级的、基于发布/订阅模式的消息协议,广泛用于物联网(IoT)中设备间的通信。以下是MQTT使用过程中可能遇到的一些常见问题及其答案的汇总:
|
13天前
|
消息中间件 分布式计算 监控
Python面试:消息队列(RabbitMQ、Kafka)基础知识与应用
【4月更文挑战第18天】本文探讨了Python面试中RabbitMQ与Kafka的常见问题和易错点,包括两者的基础概念、特性对比、Python客户端使用、消息队列应用场景及消息可靠性保证。重点讲解了消息丢失与重复的避免策略,并提供了实战代码示例,帮助读者提升在分布式系统中使用消息队列的能力。
46 2