RabbitMQ实战-消费端ACK、NACK及重回队列机制(上)

本文涉及的产品
容器服务 Serverless 版 ACK Serverless,317元额度 多规格
容器服务 Serverless 版 ACK Serverless,952元额度 多规格
简介: RabbitMQ实战-消费端ACK、NACK及重回队列机制(上)

当连接失败时,消息可能还在客户端和服务器之间传输 - 它们可能处于两侧的解码或编码的中间过程,在 TCP 堆栈缓冲区中,或在电线上飞行。

在这种情况下,传输中的信息将无法正常投递 - 它们需要被重新投递。Acknowledgements机制让服务器和客户端知道何时需要重新投递。


根据定义,使用消息代理(如RabbitMQ)的系统是分布式的。由于发送的协议方法(消息)不能保证到达协作方或由其成功处理,因此发布者和消费者都需要一个投递和处理确认的机制。

  • 从消费者到 RabbitMQ 的递送处理确认,在消息协议中称为acknowledgements
  • broker对publishers的确认是一个协议扩展,称为publisher confirms


这两个功能都启发于 TCP。它们对于从publish到MQ节点和从MQ节点到消费者的可靠投递都至关重要。即对于数据安全至关重要,应用程序对数据安全的责任与MQ节点一样多。


当 RabbitMQ 向 Con 传递消息时,它需要知道何时考虑该消息才能成功发送。什么样的逻辑是最佳的取决于系统。因此,它主要是应用决定的。在 AMQP 0-9-1 中,当 Con:


  • 使用basicConsume方法进行注册
    image.png
  • 或使用basicGet 方法按需获取消息

image.png

就会进行。

Consumer Acknowledgement Modes and Data Safety Considerations

当节点向消费者传递消息时,它必须决定该消息是否应由消费者考虑处理(或至少接收)。由于多种内容(客户端连接、消费者应用等)可能会失败,因此此决定是数据安全问题。消息传递协议通常提供一个确认机制,允许消费者确认交付到他们连接到的节点。是否使用该机制由消费者订阅时决定。根据使用的确认模式,RabbitMQ 可以考虑在消息发出后立即成功传递(写入 TCP 插座)或收到明确(‘手册’)客户确认时。手动发送的确认可能是正面的或负面的,并使用以下协议方法之一:

basic.ack

积极地确认

basic.nack

消极地确认。

basicReject

消极地确认,但还有一个limitation

image.png

Delivery Identifiers: Delivery Tags

如何确定投递(确认表明他们各自的投递)。当一个 Con(订阅)被注册,MQ将使用basic.deliver方法发送(推送)消息。该方法带有delivery tag,该tag可唯一标识channel上的投递。因此,Delivery tags作用域在每个 channel 内。交


Delivery Tags是单调增长的正整数,并由客户库提供。客户端库方法,承认交付以交付标签作为参数。由于每个通道的递送标签范围很广,因此必须在接收的同一通道上确认交付。在不同的通道上确认将导致’未知交货标签’协议异常并关闭通道。


Positively Acknowledging Deliveries

用于交付确认的 API 方法通常暴露为客户库中通道上的操作。Java 客户端用户将使用channel:

// 假设已有channel实例
boolean autoAck = false;
channel.basicConsume(queueName, autoAck, "a-consumer-tag",
     new DefaultConsumer(channel) {
         @Override
         public void handleDelivery(String consumerTag,
                                    Envelope envelope,
                                    AMQP.BasicProperties properties,
                                    byte[] body)
             throws IOException
         {
             long deliveryTag = envelope.getDeliveryTag();
             // positively acknowledge a single delivery, the message will
             // be discarded
             channel.basicAck(deliveryTag, false);
         }
     });

Acknowledging Multiple Deliveries at Once

Manual确认模式可批量进行,以减少网络流量。这是通过将acknowledgement方法的multiple字段设置为true来实现的。


basicReject在历史上都没有该字段,这就是为什么basicNack被MQ引入,作为协议的扩展。


当multiple=true,MQ 将确认所有未完成的delivery tag,并包括确认中指定的tag。

与确认相关的其他所有内容一样,这个作用域是channel内。比如,假定channel Ch 上有未确认的delivery tag 5、6、7 和 8,当一个delivery tag=8、multiple=true的acknowledgement frame到达该channel时,则从 5 到 8 的所有投递都将被确认。

若multiple=false,则仍不确认投递 5、6 和 7。


要确认与MQ Java客户端的多次投递,将Channel#basicAck的multiple参数设置为 true。

boolean autoAck = false;
channel.basicConsume(queueName, autoAck, "a-consumer-tag",
     new DefaultConsumer(channel) {
         @Override
         public void handleDelivery(String consumerTag,
                                    Envelope envelope,
                                    AMQP.BasicProperties properties,
                                    byte[] body)
             throws IOException
         {
             long deliveryTag = envelope.getDeliveryTag();
             // positively acknowledge all deliveries up to
             // this delivery tag
             channel.basicAck(deliveryTag, true);
         }
     });

Negative Acknowledgement and Requeuing of Deliveries

有时,消费者无法及时处理投递,但其他实例可能能够处理。这时可能更想让它重新入队,让其他Con接收和处理它。

basicReject和basicNack就是用于实现这种想法的两个协议方法。这些方法通常用于消极地确认投递。


此类投递可被Broker丢弃或重新入队。此行为由requeue字段控制:


  • 当字段设置为true,Broker将用指定的delivery tag重新入队投递(或多个投递)。


这两个方法通常暴露作为客户端库中channel上的操作。Java 客户端用户可以调用:

  • Channel#basicReject
  • Channel#basicNack
boolean autoAck = false;
channel.basicConsume(queueName, autoAck, "a-consumer-tag",
     new DefaultConsumer(channel) {
         @Override
         public void handleDelivery(String consumerTag,
                                    Envelope envelope,
                                    AMQP.BasicProperties properties,
                                    byte[] body)
             throws IOException
         {
             long deliveryTag = envelope.getDeliveryTag();
             // negatively acknowledge, the message will
             // be discarded
             channel.basicReject(deliveryTag, false);
         }
     });

RabbitMQ ACK 机制的意义

ACK机制可以保证Con拉取到了消息,若处理失败了,则队列中还有这个消息,仍然可以给Con处理。


ack机制是 Con 告诉 Broker 当前消息是否成功消费,至于 Broker 如何处理 NACK,取决于 Con 是否设置了 requeue:如果 requeue=false, 则NACK 后 Broker 还是会删除消息的。


但一般处理消息失败都是因为代码逻辑出bug,即使队列中后来仍然保留该消息,然后再给Con消费,依旧报错。

当然,若一台机器宕机,消息还有,还可以给另外机器消费,这种情景下 ACK 很有用。


如果不使用 ACK 机制,直接把出错消息存库,便于日后查bug或重新执行。 参考 Quartz 定时任务调度,Quartz可以让失败的任务重新执行一次,或者不管,或者怎么怎么样,但是 RabbitMQ 好像缺了这一点。

1 ACK和NACK

当设置autoACK=false 时,就可以使用手工ACK。

其实手工方式包括了手工ACK、手工NACK。

  • 手工 ACK 时,会发送给Broker一个应答,代表消息处理成功,Broker就可回送响应给Pro
  • NACK 则表示消息处理失败,如果设置了重回队列,Broker端就会将没有成功处理的消息重新发送

使用方式

Con消费时,若由于业务异常,可手工 NACK 记录日志,然后进行补偿

void basicNack(long deliveryTag, 
         boolean multiple,
         boolean requeue)

如果由于服务器宕机等严重问题,就需要手工 ACK 保障Con消费成功

void basicAck(long deliveryTag, boolean multiple)


相关实践学习
通过Ingress进行灰度发布
本场景您将运行一个简单的应用,部署一个新的应用用于新的发布,并通过Ingress能力实现灰度发布。
容器应用与集群管理
欢迎来到《容器应用与集群管理》课程,本课程是“云原生容器Clouder认证“系列中的第二阶段。课程将向您介绍与容器集群相关的概念和技术,这些概念和技术可以帮助您了解阿里云容器服务ACK/ACK Serverless的使用。同时,本课程也会向您介绍可以采取的工具、方法和可操作步骤,以帮助您了解如何基于容器服务ACK Serverless构建和管理企业级应用。 学习完本课程后,您将能够: 掌握容器集群、容器编排的基本概念 掌握Kubernetes的基础概念及核心思想 掌握阿里云容器服务ACK/ACK Serverless概念及使用方法 基于容器服务ACK Serverless搭建和管理企业级网站应用
目录
相关文章
|
1月前
|
消息中间件 新零售 弹性计算
云消息队列 RabbitMQ 版入门训练营,解锁对比开源优势与零基础实战
欢迎加入「云消息队列 RabbitMQ 版入门训练营」。
|
22天前
|
消息中间件 JavaScript RocketMQ
消息队列 MQ使用问题之过期删除机制的触发条件是什么
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
消息队列 MQ使用问题之过期删除机制的触发条件是什么
|
27天前
|
消息中间件 RocketMQ
MetaQ/RocketMQ 原理问题之当消费集群规模较大时,处理分配不到队列的Consumer的问题如何解决
MetaQ/RocketMQ 原理问题之当消费集群规模较大时,处理分配不到队列的Consumer的问题如何解决
|
5天前
|
消息中间件 RocketMQ
RocketMQ - 消费者进度保存机制
RocketMQ - 消费者进度保存机制
13 0
|
5天前
|
消息中间件 RocketMQ
RocketMQ - 消费者Rebalance机制
RocketMQ - 消费者Rebalance机制
11 0
|
5天前
|
消息中间件 存储 缓存
RocketMQ - 消费者启动机制
RocketMQ - 消费者启动机制
10 0
|
1月前
|
消息中间件 Java Kafka
说说RabbitMQ延迟队列实现原理?
说说RabbitMQ延迟队列实现原理?
35 0
说说RabbitMQ延迟队列实现原理?
|
1月前
|
消息中间件 NoSQL 关系型数据库
【RocketMQ系列十三】RocketMQ的集群核心概念之消费重试&死信队列&幂等消息的出现以及处理
【RocketMQ系列十三】RocketMQ的集群核心概念之消费重试&死信队列&幂等消息的出现以及处理
43 1
|
2月前
|
消息中间件 存储 监控
RabbitMQ 死信队列
RabbitMQ的死信队列(DLQ)是存储无法正常消费消息的特殊队列,常见于消息被拒绝、过期或队列满时。DLQ用于异常处理、任务调度和监控,通过绑定到普通队列自动路由死信消息。通过监听死信队列,可以对异常消息进行补偿和进一步处理,提升系统稳定性和可维护性。
33 1
|
2月前
|
消息中间件
RabbitMQ配置单活模式队列
RabbitMQ配置单活模式队列
39 0

推荐镜像

更多