SpringAMQP开启“可靠性”机制

本文涉及的产品
容器服务 Serverless 版 ACK Serverless,952元额度 多规格
容器服务 Serverless 版 ACK Serverless,317元额度 多规格
简介: SpringAMQP开启“可靠性”机制

前言

上一篇介绍了如何在 《SpringBoot 中集成和使用消息队列》,看过这一篇就基本上可以在SpringBoot中使用消息队列了,但是消息队列他归根结底是一个客户端服务器模式的中间件,面对复杂的网络环境和分布式使用环境,难免会出现各种问题。出现问题不可怕,重点在于如何预防和处理,本章就重点介绍一下如何预防和处理使用SpringAMQP时可能出现的问题。

一、消息堆积

1、什么是消息堆积?

消息堆积指的是消费者这边的处理能力低于生产者这边生产消息的能力,导致大量的消息积压在MQ的一种现象。消息堆积可能导致短时间内队列达到最大容量,导致使新消息无法进入队列;对于时间敏感的消息可能成为死信。

2、使用 work 模式同时开启prefetch

work模式:简单来说就是让多个消息队列绑定到一个队列,共同消费队列中的消息。

默认情况下,消息队列是通过轮询的方式将消息推送给消费者的,完全不考虑消费者的消费能力。举个例子:假设生产者生产了50条消息,消费者1的处理能力是1秒50条,消费者2的消费能力是1秒5条,实际这五十条消息会通过轮询各分配给两个消费者25条,如果消费者还没处理完就会阻塞等待,处理完之后再继续推送。

所以默认情况并没有考虑到消费者是否已经处理完消息,可能也会造成消息堆积。怎么解决呢?可以通过修改配置文件:将prefetch设置为1,即每次给消费者投递一条消息,处理完了再投递下一条,这样可以尽可能发挥每个消费者的最大处理能力。

spring:
  rabbitmq:
    listener:
      simple:
        prefetch: 1 #每次投递一条消息,消费完在投递下一条

3、对消息进行异步处理

再者就是可以在代码上进行优化,比如在消息处理的时候使用线程池进行异步消费,这样可以缩短每个消息的处理时间,降低消息堆积的可能性。

二、发送者可靠性

1、发送者重连机制

有时候可能因为网络波动,可能会出现客户端连接MQ失败的情况。这里可以通过重试机制来提高消息发送的成功率。

spring:
  rabbitmq:
    connection-timeout: 1s # 设置MQ的连接超时时间
    template:
      retry:
        enabled: true # 开启超时重试机制
        initial-interval: 1000ms # 失败后等初始待时间
        multiplier: 1 # 失败后下次等待时长倍数
        max-attempts: 3 # 最大重试次数

2、发送者确认机制

在生产者这边,是可以开启确认机制的,就是MQ他在接收到消息成功后会返回一个ack给生产者,接收失败就返回nack,生产者这边就可以根据返回的结果,如果失败了就可以进行重发。RabbitMQ这边提供了两种确认机制:

  • Publisher Confirm:当生产者向消息队列发送消息时,如果有设备或网络故障导致消息丢失或其他错误,AMQP 协议会自动触发 Confirm 机制,将消息发送失败的信息返回给生产者。生产者可以根据返回的信息进行相应的处理,例如重发、记录日志等。
  • Publisher Return:消息路由失败时触发,一般不开启,因为路由失败是自己业务的问题
spring:
  rabbitmq:
    publisher-confirm-type: CORRELATED 
    # none: 关闭confirm机制 
    # simple: 同步阻塞等待MQ回执消息 
    # correlated: MQ异步回调方式返回回执消息
    publisher-returns: true 


对于ReturnCallback整个项目中配置一次即可:

@Slf4j
@Configuration
public class MqCommonConfig implements ApplicationContextAware {
    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        // 获取RabbitTemplate
        RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
        // 设置ReturnCallback(路由失败时触发)
        rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
            @Override
            public void returnedMessage(ReturnedMessage returnedMessage) {
                log.debug("收到消息的 return callback,exchange:{}, key:{}, msg:{}, code:{}, text:{}",
                        returnedMessage.getExchange(),returnedMessage.getRoutingKey(),
                        returnedMessage.getReplyCode(),returnedMessage.getReplyText());
            }
        });

    }
}

ConfirmCallback 每次发送消息都需要编写

@Test
void testConfirmCallback() throws InterruptedException {
    // 1.创建CorrelationData,并指定消息ID
    CorrelationData cd = new CorrelationData(UUID.randomUUID().toString());
    // 2.给Future添加ConfirmCallback
    cd.getFuture().addCallback(new ListenableFutureCallback<CorrelationData.Confirm>() {
        @Override
        public void onFailure(Throwable ex) {
            // 基本不会触发
            log.error("消息回调失败",ex);
        }

        @Override
        public void onSuccess(CorrelationData.Confirm result) {
            // 这里执行回调
            if (result.isAck()) {
                // 消息发送成功
                log.debug("消息发送成功,收到 ack");
            } else {
                // 消息发送失败
                log.debug("消息发送失败,收到 nack");
                // 重传等业务逻辑...
            }
        }
    });

    rabbitTemplate.convertAndSend("forum.direct","red","hello",cd);
}


虽然上述确认机制可以基本保证生产者发送消息的可靠性,但是会增加系统额外的负担和资源开销,因为生产者确认也需要通过MQ来执行回调,如果需要使用,不需要开启publisher return(自己代码写的有问题),对于nack也可以有限次重试,失败多了直接记录异常即可。

三、MQ可靠性

对于MQ本身是提供了持久化的功能的,可以给保证MQ重启数据不丢失。并且在持久化情况下开启生产者确认时,RabbitMQ只有在消息持久化完成之后才会给生产者返回ACK回执。

四、消费者可靠性

1、消息者确认机制

为了确认消费者是否成功处理消息,RabbitMQ提供了消费者确认机制,当消费者处理结束消息之后,向RabbitMQ发送一个回执,告知RabbitMQ自己消息的处理状态:

  1. ack:成功处理消息,RabbitMQ从队列中删除消息
  2. nack:消息处理失败,RabbitMQ需要再次投递消息
  3. reject:消息处理失败并拒绝消息,RabbitMQ从队列中删除该消息
spring:
  rabbitmq:
    listener:
      simple:
        acknowledge-mode: AUTO
        # none:不处理。消息投递给消费者后立即ack,消息立即从MQ中删除,不安全
    # manual:手动模式。需要自己在业务代码中调用api,发送ack或reject
    # auto:自动模式。SpringAMQP使用AOP对我们的消息处理逻辑进行了环绕增强,当业务正常执行时自动返回ack,异常时,如果是业务异常会返回nack,如果是消息处理或校验异常会返回reject

2、失败重试机制

当消费者处理消息出现异常后,MQ这边会再次将消息投递给消费者,如果无限失败就会无限重试,对于MQ和消费者来讲压力就比较大,可以利用SpringAMQP的retry进制,当消费者出现异常时限制重试次数:

spring:
  rabbitmq:
    listener:
      simple:
        acknowledge-mode: AUTO
        retry:
          enabled: true # 开启消费者失败重试
          initial-interval: 1000ms # 初始的失败等待时间为1s
          multiplier: 1 # 下次失败的等待时长的倍数
          max-attempts: 3 # 最大重试次数
          stateless: true # true 无状态,false 有状态

开启重试机制后,如果重试次数耗尽,消息依然失败,就需要被MessageRecoverer接口来处理:


RejectAndDontRequeueRecoverer:重试耗尽后,直接reject,默认的方式

ImmediateRequeueMessageRecoverer:重试耗尽后,返回nack,消息重新入队

RepublishMessageRecoverer:重试耗尽后,将失败消息投递到指定交换机(建议)

下面演示第三中策略的接口配置实现:

@Configuration
public class ErrorConfiguration {
    @Bean
    public DirectExchange errorExchange() {
        return new DirectExchange("error.exchange");
    }

    @Bean
    public Queue errorQueue() {
        return new Queue("error.queue");
    }

    @Bean
    public Binding errorBinding(Queue errorQueue,DirectExchange errorExchange) {
        return BindingBuilder.bind(errorQueue).to(errorExchange).with("error");
    }

    @Bean
    public MessageRecoverer messageRecoverer(RabbitTemplate rabbitTemplate) {
        return new RepublishMessageRecoverer(rabbitTemplate,"error.exchange","error");
    }
}

这样即使重试次数耗尽,消息也不会丢失,而是投递到了 error.queue 的队列里面。

五、保证幂等性

1、什么是幂等性?

幂等性就是重复执行相同的操作,系统的状态不会发送变化。比如查询和删除这些操作本身就是幂等的,它们多次操作不会给系统造成状态不一致的影响。


上述机制可以保证消息“至少”被消费1次,但是由于网络的复杂性,可能生产者收不到ack,导致消息的重发,或者MQ这边没有收到消费者的ack,导致消息的重复投递,这些都可能造成消息的重复消费,所以这个时候就要考虑幂等性问题了。

2、使用唯一 ID

生产者这边在给RabbitMQ投递消息的时候,附带一个唯一消息的ID,RabbitMQ这边它是自带去重功能的,就是相同ID的消息它是只存储一份的.

消费者这里,他就可以消费完一条消息后,先将消息ID存起来,然后后面的消息根据ID进行判断是否是重复消息,如果重复直接丢弃就行了.

给消息设置ID的方法:

@Configuration
public class Config {
  @Bean
  public MessageConverter messageConverter() {
     Jackson2JsonMessageConverter jjmc =  new Jackson2JsonMessageConverter();
     jjmc.setCreateMessageIds(true);
     return jjmc;
  }
}

3、针对业务进行判断

以支付扣减余额和修改订单状态为例:

  1. 首先支付服务会在余额扣减成功后利用MQ将消息通知给修改订单状态的服务.
  2. 修改订单状态之前,先查询订单状态,只有已支付的订单才做修改,这样就可以在业务上保证幂等.

六、实现延迟消息

1、借助死信对列

因为对于那些超时为处理的消息,MQ会投递到死信对列,我们就可以借助这个特性,先将消息投递到到一个普通的对列中,然后如果超时就直接投到了死信对列,然后就让消费者监听死信对列,就可以实现延迟消息了。(PS:对列通过dead-letter-exchange属性绑定的交换机就称为 死信交换机。)

发送延迟消息:

void testSendTTLMessage() {
  Message message = MessageBuilder
  .withBody("hello".getBytes(StandardCharsets.UTF_8))
  .setExpiration(5000).build(); // 5秒钟的延迟消息
  rabbitTemplate.convertAndSend("simple.direct1","testKey",message);
}

2、使用RabbitMQ官方提供的插件

在RabbitMQ中,官方是推出了一种原生支持延迟消息的插件的。原理是设计了一种支持延迟消息功能的交换机,当消息投递到交换机后暂存一段时间,到期后投递到队列。下面讲解插件使用:

消费者声明

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "delay.queue",durable = "true"),
            exchange = @Exchange(name = "delay.direct",delayed = "true",type = ExchangeTypes.DIRECT),
            key = "delay"
    ))
    public void listenDelayMessage(String msg) {
        log.info("接收到delay.queue的延迟消息 {}"+msg);
    }

生产者发送

    @Test
    void testSendDelayMessage() throws InterruptedException {
        rabbitTemplate.convertAndSend("delay.direct", "delay", "hello,delay!", new MessagePostProcessor() {
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                // 延迟10秒
                message.getMessageProperties().setDelay(5000);
                return message;
            }
        });
        log.info("延迟消息发送成功!");
    }

如果感觉这一大串太麻烦,可以将 new MessagePostProcessor() 分离出来:

// 封装专门用来发送延迟消息的处理器
@RequiredArgsConstructor
public class DelayMessageProcessor implements MessagePostProcessor {

    private final int delay;

    @Override
    public Message postProcessMessage(Message message) throws AmqpException {
        message.getMessageProperties().setDelay(delay);
        return message;
    }
}
    @Test
    void testSendDelayMessage2() throws InterruptedException {
        rabbitTemplate.convertAndSend("delay.direct", "delay", "hello,delay!", new DelayMessageProcessor(5000));
        log.info("延迟消息发送成功!");
        Thread.sleep(1000);
    }


相关实践学习
容器服务Serverless版ACK Serverless 快速入门:在线魔方应用部署和监控
通过本实验,您将了解到容器服务Serverless版ACK Serverless 的基本产品能力,即可以实现快速部署一个在线魔方应用,并借助阿里云容器服务成熟的产品生态,实现在线应用的企业级监控,提升应用稳定性。
云原生实践公开课
课程大纲 开篇:如何学习并实践云原生技术 基础篇: 5 步上手 Kubernetes 进阶篇:生产环境下的 K8s 实践 相关的阿里云产品:容器服务&nbsp;ACK 容器服务&nbsp;Kubernetes&nbsp;版(简称&nbsp;ACK)提供高性能可伸缩的容器应用管理能力,支持企业级容器化应用的全生命周期管理。整合阿里云虚拟化、存储、网络和安全能力,打造云端最佳容器化应用运行环境。 了解产品详情:&nbsp;https://www.aliyun.com/product/kubernetes
相关文章
|
1月前
|
消息中间件 存储 监控
|
1月前
|
网络协议 算法 网络性能优化
|
8月前
|
存储 Linux 调度
确保并发执行的安全性:探索多线程和锁机制以构建可靠的程序
在当今计算机系统中,多线程编程已成为常见的需求,然而,同时也带来了并发执行的挑战。为了避免数据竞争和其他并发问题,正确使用适当的锁机制是至关重要的。通过阅读本文,读者将了解到多线程和锁机制在并发编程中的重要性,以及如何避免常见的并发问题,确保程序的安全性和可靠性。通过实际案例和代码示例来说明如何正确地使用多线程和锁机制来构建可靠的程序。
23 1
|
28天前
|
监控 物联网 Java
打造高可用系统:深入了解心跳检测机制
本文介绍了分布式系统中**心跳检测**的重要机制,用于监测系统节点的健康状态和通信畅通。心跳检测通过定期发送信号,若节点在预定期限内未响应则视为可能失效。处理机制包括重试、报警和自动修复。文章还提到了**周期检测**和**累计失效检测**两种策略,并给出Java代码示例展示心跳检测实现。此外,列举了心跳检测在分布式数据库、微服务和物联网等场景的应用,以及优化策略如动态调整心跳频率和优化超时机制。最后,强调了心跳检测对系统稳定性和高可用性的关键作用。
102 2
|
12天前
|
消息中间件 Kafka API
深入解析Kafka消息传递的可靠性保证机制
深入解析Kafka消息传递的可靠性保证机制
16 0
|
11月前
|
消息中间件 存储 Kafka
MQ 学习日志(六) 保证消息的可靠性传输
消息的可靠性传输 简述
83 0
|
程序员 C++ 开发者
C++异常和错误处理机制:如何使您的程序更加稳定和可靠
在C++编程中,异常处理和错误处理机制是非常重要的。它们可以帮助程序员有效地处理运行时错误和异常情况。本文将介绍C++中的异常处理和错误处理机制。
93 0
|
网络协议 算法
如何保证TCP的稳定性和流速控制
TCP粘包和拆包中保证顺序的具体算法是TCP滑动窗口算法。 TCP作为一个传输层协议,最核心的能力是传输。传输需要保证可靠性,还需要控制流速,这两个核心能力均由滑动窗口提供。 滑动窗口数据结构
75 0
|
Java 测试技术 API
可靠性利器-重试机制
在日常开发中,我们经常会遇到需要调用外部服务和接口的场景。外部服务对于调用者来说一般都是不可靠的,尤其是在网络环境比较差的情况下,网络抖动很容易导致请求超时等异常情况,这时候就需要使用失败重试策略重新调用 API 接口来获取。重试策略在服务治理方面也有很广泛的使用,通过定时检测,来查看服务是否存活。
856 0
|
人工智能 算法 BI
3.4 预读机制
<div class="bct fc05 fc11 nbw-blog ztag"><div> <p style="TEXT-INDENT: 21pt;"><span style="FONT-FAMILY: 宋体; mso-ascii-font-family: 'Times new roman'; mso-hansi-font-family: 'Times new roman';">随着处理器
2053 0