用了这么久的RabbitMQ异步编程竟然都是错的!(下)

本文涉及的产品
日志服务 SLS,月写入数据量 50GB 1个月
简介: 用了这么久的RabbitMQ异步编程竟然都是错的!

2.3 死信堵塞MQ之坑

始终无法处理的死信消息,可能会引发堵塞MQ。

若线程池的任务队列无上限,最终可能导致OOM,类似的MQ也要注意任务堆积问题。对于突发流量引起的MQ堆积,问题并不大,适当调整消费者的消费能力应该就可以解决。但在很多时候,消息队列的堆积堵塞,是因为有大量始终无法处理的消息。

2.3.1 案例

用户服务在用户注册后发出一条消息,会员服务监听到消息后给用户派发优惠券,但因用户并没有保存成功,会员服务处理消息始终失败,消息重新进入队列,然后还是处理失败。这种在MQ中回荡的同一条消息,就是死信。


随着MQ被越来越多的死信填满,消费者需花费大量时间反复处理死信,导致正常消息的消费受阻,最终MQ可能因数据量过大而崩溃。


定义一个队列、一个直接交换器,然后把队列绑定到交换器

image.png

sendMessage发送消息到MQ,访问一次提交一条消息,使用自增标识作为消息内容

image.png

收到消息后,直接NPE,模拟处理出错

image.png

调用sendMessage接口发送两条消息,然后来到RabbitMQ管理台,可以看到这两条消息始终在队列,不断被重新投递,导致重新投递QPS达到1063。

image.png

在日志中也可看到大量异常信息。

修复方案

  • 解决死信无限重复进入队列最简单方案
    在程序处理出错时,直接抛AmqpRejectAndDontRequeueException,避免消息重新进入队列
throw new AmqpRejectAndDontRequeueException("error");

但更希望对同一消息,能够先进行几次重试,解决因为网络问题导致的偶发消息处理失败,若依旧失败,再把消息投递到专门设置的DLX。对于来自DLX的数据,可能只是记录日志发送报警,即使出现异常也不会再重复投递。

  • 逻辑如下
  • image.png
  • 针对该问题,我们来看

Spring AMQP的简便解决方案

  1. 定义死信交换器、死信队列。其实都是普通交换器和队列,只不过专门用于处理死信消息
  2. 通过RetryInterceptorBuilder构建一个RetryOperationsInterceptor以处理失败时候的重试。策略是最多尝试5次(重试4次);并且采取指数退避重试,首次重试延迟1秒,第二次2秒,以此类推,最大延迟是10秒;如果第4次重试还是失败,则使用RepublishMessageRecoverer把消息重新投入一个DLX
  3. 定义死信队列的处理程序。本案例只记录日志

代码

image.png

执行程序,发送两条消息,查看日志:

image.png

msg2的4次重试间隔分别是1秒、2秒、4秒、8秒,再加上首次的失败,所以最大尝试次数是5

4次重试后,RepublishMessageRecoverer把消息发往DLX

死信处理程序输出了got dead message msg2。

虽然几乎同时发俩消息,但msg2在msg1四次重试全部结束后才开始处理,因为默认SimpleMessageListenerContainer只有一个消费线程。可通过增加消费线程避免性能问题:


直接设置concurrentConsumers参数为10,来增加到10个工作线程

image.png

  • 也可设置maxConcurrentConsumers参数,让SimpleMessageListenerContainer动态调整消费者线程数。

小结

一般在遇到消息处理失败的时候,可设置重试。若重试还是不行,可把该消息扔到专门的死信队列处理,不要让死信影响到正常消息处理。

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
8月前
|
消息中间件 存储 监控
RabbitMQ 面试题及答案整理,最新面试题
RabbitMQ 面试题及答案整理,最新面试题
233 1
|
消息中间件 Java Kafka
计算机应届生一定要会的JAVA面试题:RabbitMQ是如何实现消息路由的?
一个应届生去面试,可能没有什么实战经验,今天被问到一个这样的面试题,说“RabbitMQ是如何实现消息路由的?“一下子竟然不知道如何组织语言了。今天我给大家分享一下我的理解。
109 1
|
消息中间件 Java Maven
消息中间件系列教程(08) -RabbitMQ -案例代码(工作队列模式)
消息中间件系列教程(08) -RabbitMQ -案例代码(工作队列模式)
68 0
|
8月前
|
消息中间件 缓存 NoSQL
非常强悍的 RabbitMQ 总结,写得真好
非常强悍的 RabbitMQ 总结,写得真好
33 0
|
8月前
|
消息中间件 数据库
面试题解析:RabbitMQ在多线程秒杀系统中的关键作用
面试题解析:RabbitMQ在多线程秒杀系统中的关键作用
88 0
|
消息中间件 存储 网络协议
RabbitMQ 26问,基本涵盖了面试官必问的面试题
RabbitMQ 26问,基本涵盖了面试官必问的面试题
1277 1
|
消息中间件 算法 Java
RabbitMQ灵活运用,怎么理解五种消息模型
RabbitMQ灵活运用,怎么理解五种消息模型
167 0
|
消息中间件 存储 网络协议
我用ChatGPT,给RabbitMQ加了个连接池
上次我把 RabbitMQ 集成到项目中,但是每次使用 RabbitMQ 时都去 New 一个连接,导致并发起不来,所以这次我们就给 RabbitMQ 加一个连接池。 为了偷个懒,我直接用 ChatGPT 教我加。
|
消息中间件 存储 数据可视化
面试必问:RabbitMQ 有哪几种消息模式?
面试必问:RabbitMQ 有哪几种消息模式?