Spring Cloud Stream消费失败后的处理策略(一):自动重试

本文涉及的产品
日志服务 SLS,月写入数据量 50GB 1个月
简介: Spring Cloud Stream消费失败后的处理策略(一):自动重试

之前写了几篇关于Spring Cloud Stream使用中的常见问题,比如:

下面几天就集中来详细聊聊,当消息消费失败之后该如何处理的几种方式。不过不论哪种方式,都需要与具体业务结合,解决不同业务场景可能出现的问题。

今天第一节,介绍一下Spring Cloud Stream中默认就已经配置了的一个异常解决方案:重试!

应用场景

依然要明确一点,任何解决方案都要结合具体的业务实现来确定,不要有了锤子看什么问题都是钉子。那么重试可以解决什么问题呢?由于重试的基础逻辑并不会改变,所以通常重试只能解决因环境不稳定等外在因素导致的失败情况,比如:当我们接收到某个消息之后,需要调用一个外部的Web Service做一些事情,这个时候如果与外部系统的网络出现了抖动,导致调用失败而抛出异常。这个时候,通过重试消息消费的具体逻辑,可能在下一次调用的时候,就能完成整合业务动作,从而解决刚才所述的问题。

动手试试

先通过一个小例子来看看Spring Cloud Stream默认的重试机制是如何运作的。之前在如何消费自己生产的消息一文中的例子,我们可以继续沿用,或者也可以精简一些,都写到一个主类中,比如下面这样:

@EnableBinding(TestApplication.TestTopic.class)
@SpringBootApplication
public class TestApplication {
    public static void main(String[] args) {
        SpringApplication.run(TestApplication.class, args);
    }
    @RestController
    static class TestController {
        @Autowired
        private TestTopic testTopic;
        /**
         * 消息生产接口
         * @param message
         * @return
         */
        @GetMapping("/sendMessage")
        public String messageWithMQ(@RequestParam String message) {
            testTopic.output().send(MessageBuilder.withPayload(message).build());
            return "ok";
        }
    }
    /**
     * 消息消费逻辑
     */
    @Slf4j
    @Component
    static class TestListener {
        @StreamListener(TestTopic.INPUT)
        public void receive(String payload) {
            log.info("Received: " + payload);
            throw new RuntimeException("Message consumer failed!");
        }
    }
    interface TestTopic {
        String OUTPUT = "example-topic-output";
        String INPUT = "example-topic-input";
        @Output(OUTPUT)
        MessageChannel output();
        @Input(INPUT)
        SubscribableChannel input();
    }
}

内容很简单,既包含了消息的生产,也包含了消息消费。与之前例子不同的就是在消息消费逻辑中,主动的抛出了一个异常来模拟消息的消费失败。

在启动应用之前,还要记得配置一下输入输出通道对应的物理目标(exchange或topic名),比如:

spring.cloud.stream.bindings.example-topic-input.destination=test-topic
spring.cloud.stream.bindings.example-topic-output.destination=test-topic

完成了上面配置之后,就可以启动应用,并尝试访问localhost:8080/sendMessage?message=hello接口来发送一个消息到MQ中了。此时可以看到类似下面的日志:

2018-12-10 11:20:21.345  INFO 30499 --- [w2p2yKethOsqg-1] c.d.stream.TestApplication$TestListener  : Received: hello
2018-12-10 11:20:22.350  INFO 30499 --- [w2p2yKethOsqg-1] c.d.stream.TestApplication$TestListener  : Received: hello
2018-12-10 11:20:24.354  INFO 30499 --- [w2p2yKethOsqg-1] c.d.stream.TestApplication$TestListener  : Received: hello
2018-12-10 11:20:54.651 ERROR 30499 --- [w2p2yKethOsqg-1] o.s.integration.handler.LoggingHandler   : org.springframework.messaging.MessagingException: Exception thrown while invoking com.didispace.stream.TestApplication$TestListener#receive[1 args]; nested exception is java.lang.RuntimeException: Message consumer failed!, failedMessage=GenericMessage [payload=byte[5], headers={amqp_receivedDeliveryMode=PERSISTENT, amqp_receivedRoutingKey=test-topic, amqp_receivedExchange=test-topic, amqp_deliveryTag=2, deliveryAttempt=3, amqp_consumerQueue=test-topic.anonymous.EuqBJu66Qw2p2yKethOsqg, amqp_redelivered=false, id=a89adf96-7de2-f29d-20b6-2fcb0c64cd8c, amqp_consumerTag=amq.ctag-XFy6vXU2w4RB_NRBzImWTA, contentType=application/json, timestamp=1544412051638}]
  at org.springframework.cloud.stream.binding.StreamListenerMessageHandler.handleRequestMessage(StreamListenerMessageHandler.java:63)
  at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:109)
  at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:158)
  at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116)
  at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:132)
  at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:105)
  at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73)
  at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:445)
  at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:394)
  at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:181)
  at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:160)
  at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
  at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:108)
  at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:203)
  at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter.access$1100(AmqpInboundChannelAdapter.java:60)
  at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter$Listener.lambda$onMessage$0(AmqpInboundChannelAdapter.java:214)
  at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:287)
  at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:180)
  at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter$Listener.onMessage(AmqpInboundChannelAdapter.java:211)
  at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1414)
  at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1337)
  at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1324)
  at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1303)
  at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:817)
  at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:801)
  at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$700(SimpleMessageListenerContainer.java:77)
  at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1042)
  at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.RuntimeException: Message consumer failed!
  at com.didispace.stream.TestApplication$TestListener.receive(TestApplication.java:65)
  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
  at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  at java.lang.reflect.Method.invoke(Method.java:498)
  at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:181)
  at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:114)
  at org.springframework.cloud.stream.binding.StreamListenerMessageHandler.handleRequestMessage(StreamListenerMessageHandler.java:55)
  ... 27 more
从日志中可以看到,一共输出了三次Received: hello,也就是说消息消费逻辑执行了3次,然后抛出了最终执行失败的异常。
设置重复次数
默认情况下Spring Cloud Stream会重试3次,我们也可以通过配置的方式修改这个默认配置,比如下面的配置可以将重试次数调整为1次:
spring.cloud.stream.bindings.example-topic-input.consumer.max-attempts=1
对于一些纯内部计算逻辑,不需要依赖外部环境,如果出错通常是代码逻辑错误的情况下,不论我们如何重试都会继续错误的业务逻辑可以将该参数设置为0,避免不必要的重试影响消息处理的速度。
深入思考
完成了上面的基础尝试之后,再思考下面两个问题:
问题一:如果在重试过程中消息处理成功了,还会有异常信息吗?
答案是不会。因为重试过程是消息处理的一个整体,如果某一次重试成功了,会任务对所收到消息的消费成功了。
这个问题可以在上述例子中做一些小改动来验证,比如:
@Slf4j
@Component
static class TestListener {
    int counter = 1;
    @StreamListener(TestTopic.INPUT)
    public void receive(String payload) {
        log.info("Received: " + payload + ", " + counter);
        if (counter == 3) {
            counter = 1;
            return;
        } else {
            counter++;
            throw new RuntimeException("Message consumer failed!");
        }
    }
}
通过加入一个计数器,当重试是第3次的时候,不抛出异常来模拟消费逻辑处理成功了。此时重新运行程序,并调用接口localhost:8080/sendMessage?message=hello,可以获得如下日志结果,并没有异常打印出来。
2018-12-10 16:07:38.390  INFO 66468 --- [L6MGAj-MAj7QA-1] c.d.stream.TestApplication$TestListener  : Received: hello, 1
2018-12-10 16:07:39.398  INFO 66468 --- [L6MGAj-MAj7QA-1] c.d.stream.TestApplication$TestListener  : Received: hello, 2
2018-12-10 16:07:41.402  INFO 66468 --- [L6MGAj-MAj7QA-1] c.d.stream.TestApplication$TestListener  : Received: hello, 3
也就是,虽然前两次消费抛出了异常,但是并不影响最终的结果,也不会打印中间过程的异常,避免了对日志告警产生误报等问题。
问题二:如果重试都失败之后应该怎么办呢?
如果消息在重试了还是失败之后,目前的配置唯一能做的就是将异常信息记录下来,进行告警。由于日志中有消息的消息信息描述,所以应用维护者可以根据这些信息来做一些补救措施。
当然,这样的做法显然不是最好的,因为太过麻烦。那么怎么做才好呢?且听下回分解!
代码示例
本文示例读者可以通过查看下面仓库的中的stream-exception-handler-1项目:
Github
Gitee
如果您对这些感兴趣,欢迎star、follow、收藏、转发给予支持!
以下专题教程也许您会有兴趣
Spring Boot基础教程
Spring Cloud基础教程
 本文由 程序猿DD 创作
版权声明:自由转载-非商用-非衍生-保持署名 (CC BY-NC-ND 3.0)
原创不易,转载请注明出处,感谢支持!如果本文对您有用,欢迎转发分享!
 Spring Cloud, Spring Cloud Stream
上一篇
Spring Cloud Stream消费失败后的处理策略(二):自定义错误处理逻辑
应用场景上一篇《Spring Cloud Stream消费失败后的处理策略(一):自动重试》介绍了默认就会生效的消息重试功能。对于一些因环境原因、网络抖动等不稳定因素引发的问题可以起到比较好的作用。但是对于诸如代码本身存在的逻辑错误等,无论重试多少次都不可能成功的问题,是无法修复的。对于这样的情况,前文中说了可以利用日志记录消息内容,配合告警来做补救,但是很显然,这样做非常原始,并且太过笨拙...
下一篇
Spring Cloud Stream如何消费自己生产的消息
在上一篇《Spring Cloud Stream如何处理消息重复消费》中,我们通过消费组的配置解决了多实例部署情况下消息重复消费这一入门时的常见问题。本文将继续说说在另外一个被经常问到的问题:如果微服务生产的消息自己也想要消费一份,应该如何实现呢? 常见错误在放出标准答案前,先放出一个常见的错误姿势和告警信息(以便您可以通过搜索引擎找到这里^_^)。以下错误基于Spring Boot 2.0...
本周福利
加入圈子
  加入微信群
  加入QQ群
免费资源
  设计模式学习笔记
  Java基础核心知识大总结
  算法刷题学习笔记
  优秀的简历模版
  免费领《1000G精选资料》
热门导航
  Spring Cloud中文资料站
  腾讯云:每日特惠秒杀专场
  阿里云:新用户最低0.6折
  Markdown写作分发工具
  Java进阶必备:源码解析
  微服务开源项目:Onemall
  4K高清电影资源
  宝塔面板:轻松管理云主机
热门专题
Spring Boot教程
全网Star最多的Spring Boot免费教程,,目前正在更新2.x版本内容哦!
Spring Cloud教程
全网最早最全的Spring Cloud免费教程,目前正在更新Alibaba模块与G版本哦!
微服务专题
最热门的分布式架构设计,微服务到底好不好?您的团队到底适合不适合?
免费资源
  设计模式学习笔记
  Java基础核心知识大总结
  算法刷题学习笔记
  优秀的简历模版
  免费领《1000G精选资料》
分类
Chrome插件2
Fleet1
Git6
IntelliJ IDEA23
Java44
Maven1
Spring12
Spring Boot176
Spring Cloud132
Spring Data1
优惠活动5
前沿资讯11
前端技术14
博客攻略7
开源推荐28
敏捷管理10
日常记录9
架构运维57
程序人生36

从日志中可以看到,一共输出了三次Received: hello,也就是说消息消费逻辑执行了3次,然后抛出了最终执行失败的异常。

设置重复次数

默认情况下Spring Cloud Stream会重试3次,我们也可以通过配置的方式修改这个默认配置,比如下面的配置可以将重试次数调整为1次:

spring.cloud.stream.bindings.example-topic-input.consumer.max-attempts=1

对于一些纯内部计算逻辑,不需要依赖外部环境,如果出错通常是代码逻辑错误的情况下,不论我们如何重试都会继续错误的业务逻辑可以将该参数设置为0,避免不必要的重试影响消息处理的速度。

深入思考

完成了上面的基础尝试之后,再思考下面两个问题:

问题一:如果在重试过程中消息处理成功了,还会有异常信息吗?

答案是不会。因为重试过程是消息处理的一个整体,如果某一次重试成功了,会任务对所收到消息的消费成功了。

这个问题可以在上述例子中做一些小改动来验证,比如:

Slf4j
@Component
static class TestListener {
    int counter = 1;
    @StreamListener(TestTopic.INPUT)
    public void receive(String payload) {
        log.info("Received: " + payload + ", " + counter);
        if (counter == 3) {
            counter = 1;
            return;
        } else {
            counter++;
            throw new RuntimeException("Message consumer failed!");
        }
    }
}

通过加入一个计数器,当重试是第3次的时候,不抛出异常来模拟消费逻辑处理成功了。此时重新运行程序,并调用接口localhost:8080/sendMessage?message=hello,可以获得如下日志结果,并没有异常打印出来。

2018-12-10 16:07:38.390  INFO 66468 --- [L6MGAj-MAj7QA-1] c.d.stream.TestApplication$TestListener  : Received: hello, 1
2018-12-10 16:07:39.398  INFO 66468 --- [L6MGAj-MAj7QA-1] c.d.stream.TestApplication$TestListener  : Received: hello, 2
2018-12-10 16:07:41.402  INFO 66468 --- [L6MGAj-MAj7QA-1] c.d.stream.TestApplication$TestListener  : Received: hello, 3

也就是,虽然前两次消费抛出了异常,但是并不影响最终的结果,也不会打印中间过程的异常,避免了对日志告警产生误报等问题。

问题二:如果重试都失败之后应该怎么办呢?

如果消息在重试了还是失败之后,目前的配置唯一能做的就是将异常信息记录下来,进行告警。由于日志中有消息的消息信息描述,所以应用维护者可以根据这些信息来做一些补救措施。

当然,这样的做法显然不是最好的,因为太过麻烦。那么怎么做才好呢?且听下回分解!

代码示例

本文示例读者可以通过查看下面仓库的中的stream-exception-handler-1项目:

如果您对这些感兴趣,欢迎star、follow、收藏、转发给予支持!

以下专题教程也许您会有兴趣

相关实践学习
日志服务之使用Nginx模式采集日志
本文介绍如何通过日志服务控制台创建Nginx模式的Logtail配置快速采集Nginx日志并进行多维度分析。
目录
相关文章
|
Dubbo Java 应用服务中间件
深入了解Spring Cloud Alibaba Dubbo
在现代分布式系统开发中,构建高性能、可伸缩性和弹性的微服务架构变得越来越重要。Spring Cloud Alibaba Dubbo(简称Dubbo)是一个开源的分布式服务框架,可以帮助开发者构建强大的微服务架构,具备负载均衡、服务治理、远程调用等强大功能。本文将深入介绍Spring Cloud Alibaba Dubbo,帮助你理解它的核心概念、工作原理以及如何在你的项目中使用它。
|
Kubernetes Java 微服务
Spring Boot 单体应用一键升级成 Spring Cloud Alibaba(1)
Spring Boot 单体应用一键升级成 Spring Cloud Alibaba(1)
138 0
Spring Boot 单体应用一键升级成 Spring Cloud Alibaba(1)
|
7月前
|
Java 中间件 开发者
Spring Cloud Alibaba
【1月更文挑战第27天】【1月更文挑战第127篇】Spring Cloud Alibaba
141 1
|
6月前
|
消息中间件 Java 持续交付
Spring Cloud Alibaba 项目搭建步骤和注意事项
Spring Cloud Alibaba 项目搭建步骤和注意事项
804 0
Spring Cloud Alibaba 项目搭建步骤和注意事项
|
Java Nacos Sentinel
Spring Cloud Alibaba学习指南
由于在2018年Netflix公司宣布对其核心组件Hystrix、Ribbon、zuul、Eureka等进入维护状态,也就是Spring Cloud Netflix系列。由此Spring Cloud Alibaba就诞生了,值得注意的是Spring Cloud Alibaba完全兼容了Spring Cloud Netflix中的Ribbon、Feign、Eureka等组件,所以基于Spring Cloud Netflix的项目可以无缝迁移到Spring Cloud Alibaba。
552 0
|
7月前
|
消息中间件 Java Maven
Spring Cloud Alibaba 简介
Spring Cloud Alibaba 简介
318 1
|
7月前
|
消息中间件 负载均衡 Java
【从Spring Cloud到Spring Cloud Alibaba,这些改变你都知道吗?】—— 每天一点小知识
【从Spring Cloud到Spring Cloud Alibaba,这些改变你都知道吗?】—— 每天一点小知识
173 0
【从Spring Cloud到Spring Cloud Alibaba,这些改变你都知道吗?】—— 每天一点小知识
|
SQL 消息中间件 Oracle
|
负载均衡 Java Spring
业界良心啊!第五次更新的Spring Cloud Alibaba升级太多内容
Spring Cloud Alibaba经历的这几年,每次的更新都会掀起更大的水花,就在近日进行了第五次的重大更新,除了修改、升级了一些内容之外,还有增加了很多细节的内容。在质上可以说是有了一个很大的飞跃!
|
Kubernetes Cloud Native 安全
Spring Cloud Alibaba 在 Proxyless Mesh 上的探索
站在 2023 年的今天,Service Mesh 早已不是一个新兴的概念, 回顾过去 6 年多的发展历程,Service Mesh 从一经推出就受到来自全世界的主流技术公司关注和追捧。
268 5
Spring Cloud Alibaba 在 Proxyless Mesh 上的探索