Spring Cloud Stream消费失败后的处理策略(一):自动重试

简介: Spring Cloud Stream消费失败后的处理策略(一):自动重试

之前写了几篇关于Spring Cloud Stream使用中的常见问题,比如:

下面几天就集中来详细聊聊,当消息消费失败之后该如何处理的几种方式。不过不论哪种方式,都需要与具体业务结合,解决不同业务场景可能出现的问题。

今天第一节,介绍一下Spring Cloud Stream中默认就已经配置了的一个异常解决方案:重试!

应用场景

依然要明确一点,任何解决方案都要结合具体的业务实现来确定,不要有了锤子看什么问题都是钉子。那么重试可以解决什么问题呢?由于重试的基础逻辑并不会改变,所以通常重试只能解决因环境不稳定等外在因素导致的失败情况,比如:当我们接收到某个消息之后,需要调用一个外部的Web Service做一些事情,这个时候如果与外部系统的网络出现了抖动,导致调用失败而抛出异常。这个时候,通过重试消息消费的具体逻辑,可能在下一次调用的时候,就能完成整合业务动作,从而解决刚才所述的问题。

动手试试

先通过一个小例子来看看Spring Cloud Stream默认的重试机制是如何运作的。之前在如何消费自己生产的消息一文中的例子,我们可以继续沿用,或者也可以精简一些,都写到一个主类中,比如下面这样:

@EnableBinding(TestApplication.TestTopic.class)
@SpringBootApplication
public class TestApplication {
    public static void main(String[] args) {
        SpringApplication.run(TestApplication.class, args);
    }
    @RestController
    static class TestController {
        @Autowired
        private TestTopic testTopic;
        /**
         * 消息生产接口
         * @param message
         * @return
         */
        @GetMapping("/sendMessage")
        public String messageWithMQ(@RequestParam String message) {
            testTopic.output().send(MessageBuilder.withPayload(message).build());
            return "ok";
        }
    }
    /**
     * 消息消费逻辑
     */
    @Slf4j
    @Component
    static class TestListener {
        @StreamListener(TestTopic.INPUT)
        public void receive(String payload) {
            log.info("Received: " + payload);
            throw new RuntimeException("Message consumer failed!");
        }
    }
    interface TestTopic {
        String OUTPUT = "example-topic-output";
        String INPUT = "example-topic-input";
        @Output(OUTPUT)
        MessageChannel output();
        @Input(INPUT)
        SubscribableChannel input();
    }
}

内容很简单,既包含了消息的生产,也包含了消息消费。与之前例子不同的就是在消息消费逻辑中,主动的抛出了一个异常来模拟消息的消费失败。

在启动应用之前,还要记得配置一下输入输出通道对应的物理目标(exchange或topic名),比如:

spring.cloud.stream.bindings.example-topic-input.destination=test-topic
spring.cloud.stream.bindings.example-topic-output.destination=test-topic

完成了上面配置之后,就可以启动应用,并尝试访问localhost:8080/sendMessage?message=hello接口来发送一个消息到MQ中了。此时可以看到类似下面的日志:

2018-12-10 11:20:21.345  INFO 30499 --- [w2p2yKethOsqg-1] c.d.stream.TestApplication$TestListener  : Received: hello
2018-12-10 11:20:22.350  INFO 30499 --- [w2p2yKethOsqg-1] c.d.stream.TestApplication$TestListener  : Received: hello
2018-12-10 11:20:24.354  INFO 30499 --- [w2p2yKethOsqg-1] c.d.stream.TestApplication$TestListener  : Received: hello
2018-12-10 11:20:54.651 ERROR 30499 --- [w2p2yKethOsqg-1] o.s.integration.handler.LoggingHandler   : org.springframework.messaging.MessagingException: Exception thrown while invoking com.didispace.stream.TestApplication$TestListener#receive[1 args]; nested exception is java.lang.RuntimeException: Message consumer failed!, failedMessage=GenericMessage [payload=byte[5], headers={amqp_receivedDeliveryMode=PERSISTENT, amqp_receivedRoutingKey=test-topic, amqp_receivedExchange=test-topic, amqp_deliveryTag=2, deliveryAttempt=3, amqp_consumerQueue=test-topic.anonymous.EuqBJu66Qw2p2yKethOsqg, amqp_redelivered=false, id=a89adf96-7de2-f29d-20b6-2fcb0c64cd8c, amqp_consumerTag=amq.ctag-XFy6vXU2w4RB_NRBzImWTA, contentType=application/json, timestamp=1544412051638}]
  at org.springframework.cloud.stream.binding.StreamListenerMessageHandler.handleRequestMessage(StreamListenerMessageHandler.java:63)
  at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:109)
  at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:158)
  at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116)
  at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:132)
  at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:105)
  at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73)
  at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:445)
  at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:394)
  at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:181)
  at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:160)
  at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
  at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:108)
  at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:203)
  at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter.access$1100(AmqpInboundChannelAdapter.java:60)
  at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter$Listener.lambda$onMessage$0(AmqpInboundChannelAdapter.java:214)
  at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:287)
  at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:180)
  at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter$Listener.onMessage(AmqpInboundChannelAdapter.java:211)
  at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1414)
  at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1337)
  at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1324)
  at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1303)
  at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:817)
  at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:801)
  at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$700(SimpleMessageListenerContainer.java:77)
  at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1042)
  at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.RuntimeException: Message consumer failed!
  at com.didispace.stream.TestApplication$TestListener.receive(TestApplication.java:65)
  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
  at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  at java.lang.reflect.Method.invoke(Method.java:498)
  at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:181)
  at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:114)
  at org.springframework.cloud.stream.binding.StreamListenerMessageHandler.handleRequestMessage(StreamListenerMessageHandler.java:55)
  ... 27 more
从日志中可以看到,一共输出了三次Received: hello,也就是说消息消费逻辑执行了3次,然后抛出了最终执行失败的异常。
设置重复次数
默认情况下Spring Cloud Stream会重试3次,我们也可以通过配置的方式修改这个默认配置,比如下面的配置可以将重试次数调整为1次:
spring.cloud.stream.bindings.example-topic-input.consumer.max-attempts=1
对于一些纯内部计算逻辑,不需要依赖外部环境,如果出错通常是代码逻辑错误的情况下,不论我们如何重试都会继续错误的业务逻辑可以将该参数设置为0,避免不必要的重试影响消息处理的速度。
深入思考
完成了上面的基础尝试之后,再思考下面两个问题:
问题一:如果在重试过程中消息处理成功了,还会有异常信息吗?
答案是不会。因为重试过程是消息处理的一个整体,如果某一次重试成功了,会任务对所收到消息的消费成功了。
这个问题可以在上述例子中做一些小改动来验证,比如:
@Slf4j
@Component
static class TestListener {
    int counter = 1;
    @StreamListener(TestTopic.INPUT)
    public void receive(String payload) {
        log.info("Received: " + payload + ", " + counter);
        if (counter == 3) {
            counter = 1;
            return;
        } else {
            counter++;
            throw new RuntimeException("Message consumer failed!");
        }
    }
}
通过加入一个计数器,当重试是第3次的时候,不抛出异常来模拟消费逻辑处理成功了。此时重新运行程序,并调用接口localhost:8080/sendMessage?message=hello,可以获得如下日志结果,并没有异常打印出来。
2018-12-10 16:07:38.390  INFO 66468 --- [L6MGAj-MAj7QA-1] c.d.stream.TestApplication$TestListener  : Received: hello, 1
2018-12-10 16:07:39.398  INFO 66468 --- [L6MGAj-MAj7QA-1] c.d.stream.TestApplication$TestListener  : Received: hello, 2
2018-12-10 16:07:41.402  INFO 66468 --- [L6MGAj-MAj7QA-1] c.d.stream.TestApplication$TestListener  : Received: hello, 3
也就是,虽然前两次消费抛出了异常,但是并不影响最终的结果,也不会打印中间过程的异常,避免了对日志告警产生误报等问题。
问题二:如果重试都失败之后应该怎么办呢?
如果消息在重试了还是失败之后,目前的配置唯一能做的就是将异常信息记录下来,进行告警。由于日志中有消息的消息信息描述,所以应用维护者可以根据这些信息来做一些补救措施。
当然,这样的做法显然不是最好的,因为太过麻烦。那么怎么做才好呢?且听下回分解!
代码示例
本文示例读者可以通过查看下面仓库的中的stream-exception-handler-1项目:
Github
Gitee
如果您对这些感兴趣,欢迎star、follow、收藏、转发给予支持!
以下专题教程也许您会有兴趣
Spring Boot基础教程
Spring Cloud基础教程
 本文由 程序猿DD 创作
版权声明:自由转载-非商用-非衍生-保持署名 (CC BY-NC-ND 3.0)
原创不易,转载请注明出处,感谢支持!如果本文对您有用,欢迎转发分享!
 Spring Cloud, Spring Cloud Stream
上一篇
Spring Cloud Stream消费失败后的处理策略(二):自定义错误处理逻辑
应用场景上一篇《Spring Cloud Stream消费失败后的处理策略(一):自动重试》介绍了默认就会生效的消息重试功能。对于一些因环境原因、网络抖动等不稳定因素引发的问题可以起到比较好的作用。但是对于诸如代码本身存在的逻辑错误等,无论重试多少次都不可能成功的问题,是无法修复的。对于这样的情况,前文中说了可以利用日志记录消息内容,配合告警来做补救,但是很显然,这样做非常原始,并且太过笨拙...
下一篇
Spring Cloud Stream如何消费自己生产的消息
在上一篇《Spring Cloud Stream如何处理消息重复消费》中,我们通过消费组的配置解决了多实例部署情况下消息重复消费这一入门时的常见问题。本文将继续说说在另外一个被经常问到的问题:如果微服务生产的消息自己也想要消费一份,应该如何实现呢? 常见错误在放出标准答案前,先放出一个常见的错误姿势和告警信息(以便您可以通过搜索引擎找到这里^_^)。以下错误基于Spring Boot 2.0...
本周福利
加入圈子
  加入微信群
  加入QQ群
免费资源
  设计模式学习笔记
  Java基础核心知识大总结
  算法刷题学习笔记
  优秀的简历模版
  免费领《1000G精选资料》
热门导航
  Spring Cloud中文资料站
  腾讯云:每日特惠秒杀专场
  阿里云:新用户最低0.6折
  Markdown写作分发工具
  Java进阶必备:源码解析
  微服务开源项目:Onemall
  4K高清电影资源
  宝塔面板:轻松管理云主机
热门专题
Spring Boot教程
全网Star最多的Spring Boot免费教程,,目前正在更新2.x版本内容哦!
Spring Cloud教程
全网最早最全的Spring Cloud免费教程,目前正在更新Alibaba模块与G版本哦!
微服务专题
最热门的分布式架构设计,微服务到底好不好?您的团队到底适合不适合?
免费资源
  设计模式学习笔记
  Java基础核心知识大总结
  算法刷题学习笔记
  优秀的简历模版
  免费领《1000G精选资料》
分类
Chrome插件2
Fleet1
Git6
IntelliJ IDEA23
Java44
Maven1
Spring12
Spring Boot176
Spring Cloud132
Spring Data1
优惠活动5
前沿资讯11
前端技术14
博客攻略7
开源推荐28
敏捷管理10
日常记录9
架构运维57
程序人生36

从日志中可以看到,一共输出了三次Received: hello,也就是说消息消费逻辑执行了3次,然后抛出了最终执行失败的异常。

设置重复次数

默认情况下Spring Cloud Stream会重试3次,我们也可以通过配置的方式修改这个默认配置,比如下面的配置可以将重试次数调整为1次:

spring.cloud.stream.bindings.example-topic-input.consumer.max-attempts=1

对于一些纯内部计算逻辑,不需要依赖外部环境,如果出错通常是代码逻辑错误的情况下,不论我们如何重试都会继续错误的业务逻辑可以将该参数设置为0,避免不必要的重试影响消息处理的速度。

深入思考

完成了上面的基础尝试之后,再思考下面两个问题:

问题一:如果在重试过程中消息处理成功了,还会有异常信息吗?

答案是不会。因为重试过程是消息处理的一个整体,如果某一次重试成功了,会任务对所收到消息的消费成功了。

这个问题可以在上述例子中做一些小改动来验证,比如:

Slf4j
@Component
static class TestListener {
    int counter = 1;
    @StreamListener(TestTopic.INPUT)
    public void receive(String payload) {
        log.info("Received: " + payload + ", " + counter);
        if (counter == 3) {
            counter = 1;
            return;
        } else {
            counter++;
            throw new RuntimeException("Message consumer failed!");
        }
    }
}

通过加入一个计数器,当重试是第3次的时候,不抛出异常来模拟消费逻辑处理成功了。此时重新运行程序,并调用接口localhost:8080/sendMessage?message=hello,可以获得如下日志结果,并没有异常打印出来。

2018-12-10 16:07:38.390  INFO 66468 --- [L6MGAj-MAj7QA-1] c.d.stream.TestApplication$TestListener  : Received: hello, 1
2018-12-10 16:07:39.398  INFO 66468 --- [L6MGAj-MAj7QA-1] c.d.stream.TestApplication$TestListener  : Received: hello, 2
2018-12-10 16:07:41.402  INFO 66468 --- [L6MGAj-MAj7QA-1] c.d.stream.TestApplication$TestListener  : Received: hello, 3

也就是,虽然前两次消费抛出了异常,但是并不影响最终的结果,也不会打印中间过程的异常,避免了对日志告警产生误报等问题。

问题二:如果重试都失败之后应该怎么办呢?

如果消息在重试了还是失败之后,目前的配置唯一能做的就是将异常信息记录下来,进行告警。由于日志中有消息的消息信息描述,所以应用维护者可以根据这些信息来做一些补救措施。

当然,这样的做法显然不是最好的,因为太过麻烦。那么怎么做才好呢?且听下回分解!

代码示例

本文示例读者可以通过查看下面仓库的中的stream-exception-handler-1项目:

如果您对这些感兴趣,欢迎star、follow、收藏、转发给予支持!

以下专题教程也许您会有兴趣

相关实践学习
日志服务之使用Nginx模式采集日志
本文介绍如何通过日志服务控制台创建Nginx模式的Logtail配置快速采集Nginx日志并进行多维度分析。
目录
相关文章
|
3天前
|
存储 缓存 Java
nacos常见问题之超时异常如何解决
Nacos是阿里云开源的服务发现和配置管理平台,用于构建动态微服务应用架构;本汇总针对Nacos在实际应用中用户常遇到的问题进行了归纳和解答,旨在帮助开发者和运维人员高效解决使用Nacos时的各类疑难杂症。
165 0
|
8月前
|
Java Maven Spring
spring-retry实现方法请求重试
spring-retry实现方法请求重试
36 0
|
12月前
|
Java API 微服务
Spring Cloud Alibaba - 14 OpenFeign自定义配置 + 调用优化 + 超时时间
Spring Cloud Alibaba - 14 OpenFeign自定义配置 + 调用优化 + 超时时间
259 0
|
安全 Dubbo Java
【Spring专题】「实战系列」重新回顾一下异常重试框架Spring Retry的功能指南
【Spring专题】「实战系列」重新回顾一下异常重试框架Spring Retry的功能指南
147 0
【Spring专题】「实战系列」重新回顾一下异常重试框架Spring Retry的功能指南
|
Java Spring
Spring Cloud学习 之 Spring Cloud Ribbon 重试机制及超时设置不生效
Spring Cloud学习 之 Spring Cloud Ribbon 重试机制及超时设置不生效
538 1
|
缓存 Java API
Spring Cloud升级之路 - Hoxton - 10. 网关重试Body丢失的问题
Spring Cloud升级之路 - Hoxton - 10. 网关重试Body丢失的问题
|
Java 微服务 Spring
Spring Cloud升级之路 - Hoxton - 8. 修改实例级别的熔断为实例+方法级别
Spring Cloud升级之路 - Hoxton - 8. 修改实例级别的熔断为实例+方法级别
|
分布式计算 Dubbo Java
Spring 中的重试机制,简单、实用!
Spring实现了一套重试机制,功能简单实用。Spring Retry是从Spring Batch独立出来的一个功能,已经广泛应用于Spring Batch,Spring Integration, Spring for Apache Hadoop等Spring项目。
Spring 中的重试机制,简单、实用!
|
Java Spring
Spring Cloud Stream消费失败后的处理策略(一):自动重试
Spring Cloud Stream消费失败后的处理策略(一):自动重试
251 0
探讨通过Feign配合Hystrix进行调用时异常的处理
探讨通过Feign配合Hystrix进行调用时异常的处理
462 0