RabbitMQ——高级篇

本文涉及的产品
容器服务 Serverless 版 ACK Serverless,952元额度 多规格
容器服务 Serverless 版 ACK Serverless,317元额度 多规格
简介: RabbitMQ——高级篇

一、MQ的常见问题

  • 消息可靠性问题:如何确保发送的消息至少被消费一次
  • 延迟消息问题:如何实现的延迟投递
  • 消息堆积问题:解决数百万的消息堆积无法及时消费的问题
  • 高可用问题:如何避免单点的MQ故障而导致的不可用问题

二、消息可靠性问题

消息从生产者发送到exchange,再到queue,再到消费者,这个过程中有可能会导致消息丢失:

  • 发送时丢失:生产者发送的消息未送达exchange,消息到达exchange后未到达queue
  • MQ宕机,queue将消息丢失
  • consumer接收到消息后未消费就宕机

宕机指的是计算机系统或设备因为各种原因(如硬件故障、软件错误、网络问题等)而无法正常运行或停止工作的状态。

生产者消息确认

RabbitMQ提供了publisher confirm机制来避免消息在发送到MQ过程中丢失,消息发送到MQ后会返回一个结果给发送者来表示消息是否处理成功,结果有如下的两种请求:

  • publisher-confirm:发送者确认,消息成功投递到交换机返回ack,消息未投递到交换机,返回nack
  • publisher-return:返回者回执,消息投递到交换机,但是没有路由到队列就会返回ACK以及路由失败的原因

注意:在确认机制发送消息时,需要给每个消息设置一个全局唯一id,以区分不同的消息,避免ack冲突

消息持久化

MQ默认是内存存储消息,开启持久化就可以确保MQ的消息不会丢失。

SpringAMQP中的消息默认是持久的,也可以通过以下方式实现消息持久化:

交换机持久化:

    @Bean
    public DirectExchange simpleDirect(){
        // 三个参数:交换机名称、是否持久化、当没有queue与其绑定时是否自动删除 
        return new DirectExchange("simple.direct",true,false);
    }

队列持久化:

    @Bean
    public Queue simpleQueue(){
        // 使用QueueBuilder构建队列,durable就是持久化的
        return QueueBuilder.durable("simple.queue").build();
    }

消息持久化:SpringAMQP中的消息默认是持久的,可以通过MessageProperties中的DeliveryMode来指定

    @Test
    public void testDurableMessage() {
        // 1.准备消息
        Message message = MessageBuilder.withBody("hello, spring".getBytes(StandardCharsets.UTF_8))
                .setDeliveryMode(MessageDeliveryMode.PERSISTENT)
                .build();
        // 2.发送消息
        rabbitTemplate.convertAndSend("simple.queue", message);
    }

消费者消息确认

RabbitMQ支持消费者确认机制的,消费者处理消息后可以向MQ发送ack回执,MQ收到ack回执后才会删除消息,在SpringAMQP中允许配置三种确认模式:

  • manual:手动ack,需要在业务代码结束后,调用api发送ack
  • auto:自动ack,由spring监测listener代码是否出现异常,没有异常则返回ack;抛出异常则返回nack
  • none:关闭ackMQ假定消费者获取消息后会成功处理,因此消息投递后立即被删除

需要在配置文件中进行修改:

spring:
  rabbitmq:
    listener:
      simple:
        prefetch: 1
        acknowledge-mode: none # none,关ack; manual,手动ack; auto: 自动ack

失败重试机制

当消费者出现异常后,消息会不断重新入队列,再重新发送给消费者,然后再次异常,再次重新入队,会出现无限循环导致mq的消息处理飙升,带来不必要的压力。

那么我们就可以使用Spring的retry机制,在消费者出现异常时先利用本地重试,而不是无限制的重新入队列。

在配置文件中进行如下修改:

spring:
  rabbitmq:
    listener:
      simple:
        retry:
          enabled: true # 开启消费者失败重试机制
          initial-interval: 1000 # 初始的失败等待时长是1s
          multiplier: 3  ## 下次失败的等待时长倍数,下次等待时长 = multiplier * last-interval
          max-attempts: 3 # 最大重试次数
          stateless:true  # true无状态;false有状态。如果业务中包合事务,这里改为false

在开启重试模式后,重试次数耗尽,如果消息依然失败,则需要有MessageRecoverer接口来处理,它包含三种不同的实现:

  • RejectAndDontRequeueRecoverer:重试耗尽后,直接reject,丢弃消息。默认就是这种方式
  • ImmediateRequeueMessageRecoverer:重试耗尽后,返回nack,消息重新入队
  • RepublishMessageRecoverer:重试耗尽后,将失败消息投递到指定的交换机(推荐使用)

RepublishMessageRecoverer的具体代码实现

定义交换机和队列,并进行绑定

    @Bean
    public DirectExchange errorMessageExchange(){
        return new DirectExchange("error.direct");
    }
 
    @Bean
    public Queue errorQueue(){
        return new Queue("error.queue");
    }
 
    @Bean
    public Binding errorMessageBinding(){
        return BindingBuilder.bind(errorQueue()).to(errorMessageExchange()).with("error");
    }

定义RepublishMessageRecoverer:

    @Bean
    public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){
        return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");
    }

综上所述, 可以从如下方面实现RabbitMQ消息的可靠性:

  • 开启生产者确认机制,确保生产者的消息能够到达队列
  • 开启持久化功能,确保队列中的消息在未消费前不会丢失
  • 开启消费者确认机制为auto,又spring确认消息处理成功之后返回ack
  • 开启消费者失败重试机制,设置MessageRecoverer,多次重试失败后将消息投递到异常交换机,由人工进行处理

三、死信交换机

简介死信交换机

当一个队列中的消息满足下列情况之一时,可以成为死信(dead letter):

  • 消费者使用basic.reject或 basic.nack声明消费失败,并且消息的requeue参数设置为false
  • 消息是一个过期消息,超时无人消费
  • 要投递的队列消息堆积满了,最早的消息可能成为死信

如果该队列配置了dead-letter-exchange属性,指定了一个交换机,那么队列中的死信就会投递到这个交换机中,而这个交换机称为死信交换机(Dead Letter Exchange,简称DLX)。


TTL超时机制

TTL,也就是Time-To-Live(存活时间)如果一个队列中的消息TTL结束仍未消费,则会变为死信,ttl超时分为两种情况:

  • 消息所在的队列设置了存活时间
  • 消息本身设置了存活时间

当队列和消息本身都设置了存活时间时,以时间短的ttl为准

延迟队列

利用TTL结合死信交换机,我们实现了消息发出后,消费者延迟收到消息的效果。这种消息模式就称为延迟队列模式。

延迟队列的使用场景包括:

  • 延迟发送短信
  • 用户下单,如果用户在15 分钟内未支付,则自动取消
  • 预约工作会议,20分钟后自动通知所有参会人员

RabbitMQ的官方也推出了rabbitmq_delayed_message_exchange插件,原生支持延迟队列效果RabbitMQ的官方插件社区进行下载安装到云服务器上。

在Java代码中就可以使用SpringAMQP的延迟队列插件DelayExchange,其本质还是官方的三种交换机,只是添加了延迟功能。因此使用时只需要声明一个交换机,交换机的类型可以是任意类型,然后设定delayed属性为true即可。然后我们向这个delay为true的交换机中发送消息,一定要给消息添加一个header:x-delay,值为延迟的时间,单位为毫秒

四、惰性队列

消息堆积问题

当生产者发送消息的速度超过了消费者处理消息的速度,就会导致队列中的消息堆积,直到队列存储消息达到上限。最早接收到的消息,可能就会成为死信,会被丢弃,这就是消息堆积问题。

解决消息堆积有三种种思路:

  • 增加更多消费者,提高消费速度
  • 在消费者内开启线程池加快消息处理速度
  • 扩大队列容积,提高堆积上限

惰性队列

惰性队列具备如下的特征:

  • 接收消息后直接存入的是磁盘而不是内存
  • 消费者消费消息是从磁盘中进行读取再加载到内存中的
  • 支持数百万条的消息存储

设置一个队列为惰性队列,只需要在声明队列时,指定x-queue-mode属性为lazy即可。

用SpringAMQP声明惰性队列分两种方式:

基于Bean的方式:

    @Bean
    public Queue lazyQueue() {
        return QueueBuilder.durable("lazy.queue")
                .lazy() # 开启x—queue-mode为lazy
                .build();
    }

基于注解的方式:

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "delay.queue", durable = "true"),
            exchange = @Exchange(name = "delay.direct", delayed = "true"),
            key = "delay"
    ))
    public void listenDelayExchange(String msg) {
        log.info("消费者接收到了delay.queue的延迟消息");
    }

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
6月前
|
消息中间件 存储 Java
RabbitMQ之发布确认高级
【1月更文挑战第10天】 在生产环境中由于一些不明原因,导致 rabbitmq 重启,在 RabbitMQ 重启期间生产者消息投递失败,导致消息丢失,需要手动处理和恢复。于是,我们开始思考,如何才能进行 RabbitMQ 的消息可靠投递呢?特别是在这样比较极端的情况,RabbitMQ 集群不可用的时候,无法投递的消息该如何处理呢:
229 9
|
6月前
|
消息中间件 安全 Java
【RabbitMQ高级篇】消息可靠性问题
【RabbitMQ高级篇】消息可靠性问题
162 0
|
6月前
|
消息中间件 存储 Java
RabbitMQ高级
RabbitMQ高级
44 0
|
6月前
|
消息中间件 存储 Java
【RabbitMQ教程】第七章 —— RabbitMQ - 发布确认高级
【RabbitMQ教程】第七章 —— RabbitMQ - 发布确认高级
|
消息中间件 存储 负载均衡
工作八年?是高级开发?竟然答不出:如何保证RabbitMQ的高可用?
一个8年工作经验的小伙伴,被问到这样一个问题,说如何保证RabbitMQ的高可用。关于这个问题呢,这位小伙伴倒是有个实操经验,就是不知道如何组织语言。所以,当时面试结果不太理想。今天,我给大家分享一下我的理解。
152 0
|
消息中间件 Java 测试技术
【RabbitMQ高级篇】消息可靠性问题(1)(下)
【RabbitMQ高级篇】消息可靠性问题(1)(下)
122 0
|
消息中间件 安全 Java
【RabbitMQ高级篇】消息可靠性问题(1)(上)
【RabbitMQ高级篇】消息可靠性问题(1)
100 0
|
存储 消息中间件
十二、RabbitMQ高级 - 惰性队列
十二、RabbitMQ高级 - 惰性队列
|
消息中间件
十一、RabbitMQ高级 - 延迟队列
十一、RabbitMQ高级 - 延迟队列
|
消息中间件
十、RabbitMQ高级 - 死信交换机
十、RabbitMQ高级 - 死信交换机