rabbitmq 重复ACK导致消息丢失

简介: rabbitmq 重复确认导致消息丢失 背景 rabbitmq 在应用场景中,大多采用工作队列 work-queue的模式。 在一个常见的工作队列模式中,消费者 worker 将不断的轮询从队列中拉取最新消息,当队列负载压力增大时允许添加多个worker 进行处理。

rabbitmq 重复确认导致消息丢失

背景

rabbitmq 在应用场景中,大多采用工作队列 work-queue的模式。

在一个常见的工作队列模式中,消费者 worker 将不断的轮询从队列中拉取最新消息,当队列负载压力增大时允许添加多个worker 进行处理。
然而执行一个任务可能需要相当的时长,这是由业务特性所决定的;如果 worker执行任务过程中出现异常甚至宕机,此时消息便会丢失,这是简单消息队列难以解决的问题。

rabbitmq 采用了消息确认机制来防止此类问题,在该机制中,worker需要向 MQ Server 返回 ACK响应以表示消息已确认处理;
在以下情况下,rabbitmq 会对消息进行重新投递:
1 client 未响应ACK, 主动关闭 Channel;
2 client 未响应ACk, 网络异常断开;

消息的重发机制没有超时限制,只要client 不响应ACK,那么会一直投递;
如果启用了消息持久化机制,那么消息将有进一步的保障。

问题描述及分析

1 客户端为简化应答处理,可以设置自动应答选项,如:

  boolean autoAck = false;
  channel.basicConsume(TASK_QUEUE_NAME, autoAck, consumer);

2 如果不启用自动应答,需要应用代码手动进行应答:

    try {
          doWork(message);
        } finally {
          logger.info(" xxx work done");
          channel.basicAck(envelope.getDeliveryTag(), false);
        }

3 当两种方案同时存在

由于客户端的编码失误,先启用了自动应答选项,又在应用代码执行了应答的代码:
     // enable autoAck
     boolean autoAck = true;
     consumerChannel.basicConsume(queueName, autoAck, this);

     //...

     // snipper from Consumer.handleDelivery method
     // send ack to server  
     try {
            consumerChannel.basicAck(deliveryTag, true);
        } catch (Exception e) {
     }

多了一次确认,应用代码貌似一切如常。 但在频繁进行消息收发测试时发现 消息存在随机性丢失处理的情况!
检查 rabbitmq server日志发现以下异常:

  {amqp_error,precondition_failed,"unknown delivery tag 1",'basic.ack'}
  ...
  {amqp_error,precondition_failed,"unknown delivery tag 1",'basic.ack'}
  ...
  {amqp_error,precondition_failed,"unknown delivery tag 1",'basic.ack'}
  ...

提示未知的 delivery tag=1,该字段为MQ server 用于消息确认的标记,服务端因无法识别而打印错误。
另外一个现象则是,连续收发消息 5次,其中丢失消息处理1次,而 rabbitmq server错误日志出现 4次!

经过分析,发现问题原因所在:
rabbitmq 为每一个channel维护了一个delivery tag的计数器,这里采用正向自增,新消息投递时自增,当消息响应时自减;
在连续收发的场景中,由于消息发送的间隔较短,部分消息因 consumer的重复确认被rabbitmq 当做已处理而丢弃。

解决方案

取消consumer 的自动应答机制,仅保留手动应答的处理,问题解决。

参考资料

关于 rabbitmq 消息确认机制:
http://www.rabbitmq.com/confirms.html#when

img_9b09a36f6de95886f52ce82fa1e89c88.jpe

作者: zale

出处: http://www.cnblogs.com/littleatp/, 如果喜欢我的文章,请关注我的公众号

本文版权归作者和博客园共有,欢迎转载,但未经作者同意必须保留此段声明,且在文章页面明显位置给出 原文链接  如有问题, 可留言咨询.

相关实践学习
深入解析Docker容器化技术
Docker是一个开源的应用容器引擎,让开发者可以打包他们的应用以及依赖包到一个可移植的容器中,然后发布到任何流行的Linux机器上,也可以实现虚拟化,容器是完全使用沙箱机制,相互之间不会有任何接口。Docker是世界领先的软件容器平台。开发人员利用Docker可以消除协作编码时“在我的机器上可正常工作”的问题。运维人员利用Docker可以在隔离容器中并行运行和管理应用,获得更好的计算密度。企业利用Docker可以构建敏捷的软件交付管道,以更快的速度、更高的安全性和可靠的信誉为Linux和Windows Server应用发布新功能。 在本套课程中,我们将全面的讲解Docker技术栈,从环境安装到容器、镜像操作以及生产环境如何部署开发的微服务应用。本课程由黑马程序员提供。     相关的阿里云产品:容器服务 ACK 容器服务 Kubernetes 版(简称 ACK)提供高性能可伸缩的容器应用管理能力,支持企业级容器化应用的全生命周期管理。整合阿里云虚拟化、存储、网络和安全能力,打造云端最佳容器化应用运行环境。 了解产品详情: https://www.aliyun.com/product/kubernetes
目录
相关文章
|
消息中间件 存储 Kubernetes
k8s1.20版本部署RabbitMQ集群(持久化)——2023.05
k8s1.20版本部署RabbitMQ集群(持久化)——2023.05
1353 1
|
11月前
|
消息中间件 数据采集 数据库
小说爬虫-02 爬取小说详细内容和章节列表 推送至RabbitMQ 消费ACK确认 Scrapy爬取 SQLite
小说爬虫-02 爬取小说详细内容和章节列表 推送至RabbitMQ 消费ACK确认 Scrapy爬取 SQLite
105 1
|
消息中间件 NoSQL 关系型数据库
【Kubernetes部署Shardingsphere、Mycat、Mysql、Redis、中间件Rocketmq、Rabbitmq、Nacos】
【Kubernetes部署Shardingsphere、Mycat、Mysql、Redis、中间件Rocketmq、Rabbitmq、Nacos】
437 0
|
消息中间件 存储 安全
|
消息中间件
rabbitmq消息的确认机制ack
rabbitmq消息的确认机制ack
|
消息中间件 存储 前端开发
硬核!SpringBoot集成RabbitMQ消息队列搭建与ACK消息确认入门
硬核!SpringBoot集成RabbitMQ消息队列搭建与ACK消息确认入门
557 0
硬核!SpringBoot集成RabbitMQ消息队列搭建与ACK消息确认入门
|
消息中间件
RabbitMQ手动ACK与死信队列
RabbitMQ手动ACK与死信队列
544 0
|
消息中间件 JavaScript 前端开发
JavaScript 连接消息(RabbitMQ)
JavaScript 连接消息(RabbitMQ)
JavaScript 连接消息(RabbitMQ)
|
消息中间件 NoSQL 关系型数据库
RabbitMQ消息丢失、积压、重复等解决方案
RabbitMQ消息丢失、积压、重复等解决方案
RabbitMQ消息丢失、积压、重复等解决方案
|
消息中间件 Java 网络架构
SpringBoot整合RabbitMQ 实现五种消息模型
SpringBoot整合RabbitMQ 实现五种消息模型
307 2