2.3 死信堵塞MQ之坑
始终无法处理的死信消息,可能会引发堵塞MQ。
若线程池的任务队列无上限,最终可能导致OOM,类似的MQ也要注意任务堆积问题。对于突发流量引起的MQ堆积,问题并不大,适当调整消费者的消费能力应该就可以解决。但在很多时候,消息队列的堆积堵塞,是因为有大量始终无法处理的消息。
2.3.1 案例
用户服务在用户注册后发出一条消息,会员服务监听到消息后给用户派发优惠券,但因用户并没有保存成功,会员服务处理消息始终失败,消息重新进入队列,然后还是处理失败。这种在MQ中回荡的同一条消息,就是死信。
随着MQ被越来越多的死信填满,消费者需花费大量时间反复处理死信,导致正常消息的消费受阻,最终MQ可能因数据量过大而崩溃。
定义一个队列、一个直接交换器,然后把队列绑定到交换器
sendMessage发送消息到MQ,访问一次提交一条消息,使用自增标识作为消息内容
收到消息后,直接NPE,模拟处理出错
调用sendMessage接口发送两条消息,然后来到RabbitMQ管理台,可以看到这两条消息始终在队列,不断被重新投递,导致重新投递QPS达到1063。
在日志中也可看到大量异常信息。
修复方案
- 解决死信无限重复进入队列最简单方案
在程序处理出错时,直接抛AmqpRejectAndDontRequeueException
,避免消息重新进入队列
throw new AmqpRejectAndDontRequeueException("error");
但更希望对同一消息,能够先进行几次重试,解决因为网络问题导致的偶发消息处理失败,若依旧失败,再把消息投递到专门设置的DLX。对于来自DLX的数据,可能只是记录日志发送报警,即使出现异常也不会再重复投递。
- 逻辑如下
- 针对该问题,我们来看
Spring AMQP的简便解决方案
- 定义死信交换器、死信队列。其实都是普通交换器和队列,只不过专门用于处理死信消息
- 通过RetryInterceptorBuilder构建一个RetryOperationsInterceptor以处理失败时候的重试。策略是最多尝试5次(重试4次);并且采取指数退避重试,首次重试延迟1秒,第二次2秒,以此类推,最大延迟是10秒;如果第4次重试还是失败,则使用RepublishMessageRecoverer把消息重新投入一个DLX
- 定义死信队列的处理程序。本案例只记录日志
代码
执行程序,发送两条消息,查看日志:
msg2的4次重试间隔分别是1秒、2秒、4秒、8秒,再加上首次的失败,所以最大尝试次数是5
4次重试后,RepublishMessageRecoverer把消息发往DLX
死信处理程序输出了got dead message msg2。
虽然几乎同时发俩消息,但msg2在msg1四次重试全部结束后才开始处理,因为默认SimpleMessageListenerContainer只有一个消费线程。可通过增加消费线程避免性能问题:
直接设置concurrentConsumers参数为10,来增加到10个工作线程
- 也可设置
maxConcurrentConsumers
参数,让SimpleMessageListenerContainer
动态调整消费者线程数。
小结
一般在遇到消息处理失败的时候,可设置重试。若重试还是不行,可把该消息扔到专门的死信队列处理,不要让死信影响到正常消息处理。