记生产环境 rabbitmq 部分客户端 channel 持续积压消息不进行ack

简介: 记生产环境 rabbitmq 部分客户端 channel 持续积压消息不进行ack

0. 服务配置

rabbitmq 集群(普通集群模式)
消费者 三台 消费线程各消费者 10
消费者配置 使用 spring-amqp|auto-ack 模式

1. 故障发现

近日有同学发现一个业务队列存在上千个 unacked 消息,并且有持续上涨的趋势。

2. 故障表现

队列下其中两个客户端的各一个 channel 分别阻塞几百条数据,并且在持续累加,重启应用后队列 unacked 消息全部进入 ready 状态等待重消费,但是重启后客户端依然有 channel 重新开始堆积并且在趋势上涨。

3. 问题排查

排查思路

检查 mq 控制台是否是队列创建问题
消费者阻塞是否有规律可循(未ack数据是否有共同特征、阻塞客户端配置是否有问题)
客户端代码是否有问题、应用是否有jvm级别故障

4. 问题定位

经过一番筛查,问题定位到了代码部分,队列消费代码并非刚上线,而是在前一日服务重启后出现的这个问题,重新 review 代码后发现消费者有使用 CountDownLatch 等待多线程消费结果, CountDownLatch#countDown 的调用没有放到 finally 中执行,并且提交到线程池的任务也没有使用 try catch 进行包裹,到此怀疑是消费线程阻塞到了 CountDownLatch#await 处,异步任务处理时由于偶现异常代码并未执行到 CountDownLatch#countDown 处,再者由于异步任务未捕获异常导致错误直接抛到 jvm 日志无法记录错误。为了验证这个问题,我们又dump了阻塞服务的栈信息,发现确实有消费者线程阻塞到 CountDownLatch#await 处,问题定位结束。

5. 解决方案

从任务处下手添加 catch 记录日志,并将 CountDownLatch#await 放到 finally 中执行。重启应用再次观察,并未出现 unacked 消息,观察日志也并未出现新添加的 error 日志。

6.问题拓展

同一个 channel 为何会阻塞那么多数据?

线上生产环境采用推模式,rabbitmq 通过 channel 推送消息到客户端,客户端采用 LinkedBlockingQueue 做缓存,一个 channel 对应一个消费者线程,当消费者线程阻塞时 LinkedBlockingQueue 作为中转一直在预存消息,所以会出现很多 unacked 消息。

为什么仅有部分一两个 channel 出现堆积?

线上添加错误日志后实际并未出现错误打印,怀疑之前异常可能是由于重启后第一次请求 rpc 偶现调用失败,猜测暂无法复现,后续需观察日志。

总结

谨慎使用线程同步,谨防线程死锁,务必保证线程不会 hang死。
自建线程池做好错误兜底,不要将异常抛给jvm。

相关实践学习
深入解析Docker容器化技术
Docker是一个开源的应用容器引擎,让开发者可以打包他们的应用以及依赖包到一个可移植的容器中,然后发布到任何流行的Linux机器上,也可以实现虚拟化,容器是完全使用沙箱机制,相互之间不会有任何接口。Docker是世界领先的软件容器平台。开发人员利用Docker可以消除协作编码时“在我的机器上可正常工作”的问题。运维人员利用Docker可以在隔离容器中并行运行和管理应用,获得更好的计算密度。企业利用Docker可以构建敏捷的软件交付管道,以更快的速度、更高的安全性和可靠的信誉为Linux和Windows Server应用发布新功能。 在本套课程中,我们将全面的讲解Docker技术栈,从环境安装到容器、镜像操作以及生产环境如何部署开发的微服务应用。本课程由黑马程序员提供。     相关的阿里云产品:容器服务 ACK 容器服务 Kubernetes 版(简称 ACK)提供高性能可伸缩的容器应用管理能力,支持企业级容器化应用的全生命周期管理。整合阿里云虚拟化、存储、网络和安全能力,打造云端最佳容器化应用运行环境。 了解产品详情: https://www.aliyun.com/product/kubernetes
目录
相关文章
|
消息中间件 存储 数据采集
4步实现状态机驱动的MQTT客户端,快速接入OneNet (1)
本文介绍了基于状态机驱动的MQTT客户端快速接入OneNet平台的实现方法,通过4步完成模块设计。文章以开源项目`Sparrow`为基础,引入`OneNetMqtt`业务模块,采用事件驱动模型和双层状态机设计,实现设备状态管理、消息处理及定时任务等功能。模块分为三层:`OneNetManager`负责核心逻辑,`OneNetDevice`管理设备信息,`OneNetDriver`处理Socket与MQTT通信。验证结果显示设备连接、数据上报及下线功能正常,稳定性良好。该设计简化了复杂条件判断,增强了系统灵活性与可扩展性,适用于实际项目参考。文末提供源码获取方式,助力读者实践与学习。
741 117
|
Kubernetes 监控 API
深入解析Kubernetes及其在生产环境中的最佳实践
深入解析Kubernetes及其在生产环境中的最佳实践
874 93
|
消息中间件 开发工具 RocketMQ
消息队列 MQ产品使用合集之如何关闭客户端的日志记录
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
消息中间件 Java C语言
消息队列 MQ使用问题之在使用C++客户端和GBase的ESQL进行编译时出现core dump,该怎么办
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
消息中间件 存储 监控
消息队列 MQ使用问题之客户端重启后仍然出现broker接收消息不均匀,该怎么办
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
存储 Kubernetes 测试技术
企业级LLM推理部署新范式:基于ACK的DeepSeek蒸馏模型生产环境落地指南
企业级LLM推理部署新范式:基于ACK的DeepSeek蒸馏模型生产环境落地指南
806 12
|
存储 Kubernetes 测试技术
企业级LLM推理部署新范式:基于ACK的DeepSeek蒸馏模型生产环境落地指南
本教程演示如何在ACK中使用vLLM框架快速部署DeepSeek R1模型推理服务。
|
消息中间件 数据采集 数据库
小说爬虫-02 爬取小说详细内容和章节列表 推送至RabbitMQ 消费ACK确认 Scrapy爬取 SQLite
小说爬虫-02 爬取小说详细内容和章节列表 推送至RabbitMQ 消费ACK确认 Scrapy爬取 SQLite
239 2
|
存储 Kubernetes Cloud Native
部署Kubernetes客户端和Docker私有仓库的步骤
这个指南涵盖了部署Kubernetes客户端和配置Docker私有仓库的基本步骤,是基于最新的实践和工具。根据具体的需求和环境,还可能需要额外的配置和调整。
427 1
|
消息中间件 安全 PHP
消息队列 MQ使用问题之如何获取PHP客户端代码
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。