1. 自动补偿机制
在RabbitMQ里,如果消费者在处理消息时,业务逻辑出现异常,默认会执行补偿机制(也就是消息重试机制)。如果业务逻辑出现异常,是不会消费消息的。基于上一篇博客的例子《消息中间件系列教程(13) -RabbitMQ-SpringBoot集成RabbitMQ》来演示一下。
现在消费者处理消息的地方模拟一个异常:
生产者发送消息给消费者,会发现控制台一直在打印错误日志,也就是说,消费者一直在重试消费(补偿):
也可以在RabbitMQ控制台看到,消息时没有被消费的:
那么RabbitMQ消费者的补偿原理是怎样的呢?
- @RabbitListener 底层使用了AOP进行拦截,如果程序没有抛异常,自动提交事务。
- 如果AOP使用异常通知拦截获取异常信息的话,自动实现补偿机制,该消息会缓存到RabbitMQ服务端进行缓存,一直重试到不抛异常为准。
其实在application.yml配置文件可以配置重试的次数与时间的,如重试5次,每次3秒:
运行程序后,发现重试了5次,3秒重试一次,而且RabbitMQ控制的消息也消费了:
2. 如何合理选择重试机制
下面有两种情况:
- 情况1: 消费者获取到消息后,调用第三方接口,但接口暂时无法访问,是否需要重试? (需要重试机制)
- 情况2: 消费者获取到消息后,抛出数据转换异常,是否需要重试?(不需要重试机制)。
对于情况一是需要重试机制的,因为第三方API接口无法访问可能是多种原因造成的,比如网络延时等。而情况二不需要重试机制,因为异常的代码是一直都会有异常的,只能通过下一次发布版本解决。
那么该如何解决呢?
- 可以采用“日志记录”+“定时任务健康检查”+“人工补偿”来解决。比如:出现了错误,可以日志记录下来,定时任务去定时查日志,人工进行补偿。
对于情况一,也许还会有疑问,如何实现手动重试呢?其实在模拟抛出异常就可以了,如下伪代码:
@Component @RabbitListener(queues = "fanout_sms_queue") public class FanoutSmsConsumer { @RabbitHandler public void process(String msg) { System.out.println("短信消费者获取生产者消息msg:" + msg); Result result = OkHttpCientUtils(url,json); if(result == null){ throw new Exception("调用第三方接口失败,准备执行重试机制"); } System.out.println("调用第三方接口与成功"); } }