Spring Cloud Stream消费失败后的处理策略(四):重新入队(RabbitMQ)

简介: Spring Cloud Stream消费失败后的处理策略(四):重新入队(RabbitMQ)

应用场景

之前我们已经通过《Spring Cloud Stream消费失败后的处理策略(一):自动重试》文介绍了Spring Cloud Stream默认的消息重试功能。本文将介绍RabbitMQ的binder提供的另外一种重试功能:重新入队。

动手试试

准备一个会消费失败的例子,可以直接沿用前文的工程,也可以新建一个,然后创建如下代码的逻辑:

@EnableBinding(TestApplication.TestTopic.class)
@SpringBootApplication
public class TestApplication {
    public static void main(String[] args) {
        SpringApplication.run(TestApplication.class, args);
    }
    @RestController
    static class TestController {
        @Autowired
        private TestTopic testTopic;
        /**
         * 消息生产接口
         *
         * @param message
         * @return
         */
        @GetMapping("/sendMessage")
        public String messageWithMQ(@RequestParam String message) {
            testTopic.output().send(MessageBuilder.withPayload(message).build());
            return "ok";
        }
    }
    /**
     * 消息消费逻辑
     */
    @Slf4j
    @Component
    static class TestListener {
        private int count = 1;
        @StreamListener(TestTopic.INPUT)
        public void receive(String payload) {
            log.info("Received payload : " + payload + ", " + count);
            throw new RuntimeException("Message consumer failed!");
        }
    }
    interface TestTopic {
        String OUTPUT = "example-topic-output";
        String INPUT = "example-topic-input";
        @Output(OUTPUT)
        MessageChannel output();
        @Input(INPUT)
        SubscribableChannel input();
    }
}

内容很简单,既包含了消息的生产,也包含了消息消费。消息消费的时候主动抛出了一个异常来模拟消息的消费失败。

在启动应用之前,还要记得配置一下输入输出通道对应的物理目标(exchange或topic名)、并设置一下分组,比如:

spring.cloud.stream.bindings.example-topic-input.destination=test-topic
spring.cloud.stream.bindings.example-topic-input.group=stream-exception-handler
spring.cloud.stream.bindings.example-topic-input.consumer.max-attempts=1
spring.cloud.stream.rabbit.bindings.example-topic-input.consumer.requeue-rejected=true
spring.cloud.stream.bindings.example-topic-output.destination=test-topic

完成了上面配置之后,启动应用并访问localhost:8080/sendMessage?message=hello接口来发送一个消息到MQ中了,此时可以看到程序不断的抛出了消息消费异常。这是由于这里我们多加了一个配置:spring.cloud.stream.rabbit.bindings.example-topic-input.consumer.requeue-rejected=true。在该配置作用之下,消息消费失败之后,并不会将该消息抛弃,而是将消息重新放入队列,所以消息的消费逻辑会被重复执行,直到这条消息消费成功为止。

深入思考

在完成了上面的这个例子之后,可能读者会有下面两个常见问题:

问题一:之前介绍的Spring Cloud Stream默认提供的默认功能(spring.cloud.stream.bindings.example-topic-input.consumer.max-attempts)与本文所说的重入队列实现的重试有什么区别?

Spring Cloud Stream默认提供的默认功能只是对处理逻辑的重试,它们的处理逻辑是由同一条消息触发的。而本文所介绍的重新入队史通过重新将消息放入队列而触发的,所以实际上是收到了多次消息而实现的重试。

问题二:如上面的例子那样,消费一直不成功,这些不成功的消息会被不断堆积起来,如何解决这个问题?

对于这个问题,我们可以联合前文介绍的DLQ队列来完善消息的异常处理。

我们只需要增加如下配置,自动绑定dlq队列:

spring.cloud.stream.rabbit.bindings.example-topic-input.consumer.auto-bind-dlq=true

然后改造一下消息处理程序,可以根据业务情况,为进入dlq队列增加一个条件,比如下面的例子:

@StreamListener(TestTopic.INPUT)
public void receive(String payload) {
    log.info("Received payload : " + payload + ", " + count);
    if (count == 3) {
        count = 1;
        throw new AmqpRejectAndDontRequeueException("tried 3 times failed, send to dlq!");
    } else {
        count ++;
        throw new RuntimeException("Message consumer failed!");
    }
}

设定了计数器count,当count为3的时候抛出AmqpRejectAndDontRequeueException这个特定的异常。此时,当只有当抛出这个异常的时候,才会将消息放入DLQ队列,从而不会造成严重的堆积问题。

代码示例

本文示例读者可以通过查看下面仓库的中的stream-exception-handler-4项目:

如果您对这些感兴趣,欢迎star、follow、收藏、转发给予支持!

以下专题教程也许您会有兴趣

相关实践学习
快速体验阿里云云消息队列RocketMQ版
本实验将带您快速体验使用云消息队列RocketMQ版Serverless系列实例进行获取接入点、创建Topic、创建订阅组、收发消息、查看消息轨迹和仪表盘。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
2月前
|
消息中间件 Java Kafka
消息队列比较:Spring 微服务中的 Kafka 与 RabbitMQ
本文深入解析了 Kafka 和 RabbitMQ 两大主流消息队列在 Spring 微服务中的应用与对比。内容涵盖消息队列的基本原理、Kafka 与 RabbitMQ 的核心概念、各自优势及典型用例,并结合 Spring 生态的集成方式,帮助开发者根据实际需求选择合适的消息中间件,提升系统解耦、可扩展性与可靠性。
209 1
消息队列比较:Spring 微服务中的 Kafka 与 RabbitMQ
|
2月前
|
消息中间件 存储 Java
RabbitMQ 和 Spring Cloud Stream 实现异步通信
本文介绍了在微服务架构中,如何利用 RabbitMQ 作为消息代理,并结合 Spring Cloud Stream 实现高效的异步通信。内容涵盖异步通信的优势、RabbitMQ 的核心概念与特性、Spring Cloud Stream 的功能及其与 RabbitMQ 的集成方式。通过这种组合,开发者可以构建出具备高可用性、可扩展性和弹性的分布式系统,满足现代应用对快速响应和可靠消息传递的需求。
185 2
RabbitMQ 和 Spring Cloud Stream 实现异步通信
|
6月前
|
消息中间件 缓存 NoSQL
基于Spring Data Redis与RabbitMQ实现字符串缓存和计数功能(数据同步)
总的来说,借助Spring Data Redis和RabbitMQ,我们可以轻松实现字符串缓存和计数的功能。而关键的部分不过是一些"厨房的套路",一旦你掌握了这些套路,那么你就像厨师一样可以准备出一道道饕餮美食了。通过这种方式促进数据处理效率无疑将大大提高我们的生产力。
236 32
|
9月前
|
消息中间件 Java Kafka
【Azure Kafka】使用Spring Cloud Stream Binder Kafka 发送并接收 Event Hub 消息及解决并发报错
reactor.core.publisher.Sinks$EmissionException: Spec. Rule 1.3 - onSubscribe, onNext, onError and onComplete signaled to a Subscriber MUST be signaled serially.
167 5
|
11月前
|
消息中间件 监控 Java
如何将Spring Boot + RabbitMQ应用程序部署到Pivotal Cloud Foundry (PCF)
如何将Spring Boot + RabbitMQ应用程序部署到Pivotal Cloud Foundry (PCF)
252 6
|
消息中间件 存储 Java
SpringCloud基础4——RabbitMQ和SpringAMQP
消息队列MQ、RabbitMQ、SpringAMQP高级消息队列协议、发布/订阅模型、fanout、direct、topic模式
SpringCloud基础4——RabbitMQ和SpringAMQP
|
消息中间件 存储 容灾
RabbitMQ的故障恢复与容灾策略
【8月更文第28天】RabbitMQ是一个开源的消息代理软件,它支持多种消息协议,如AMQP(Advanced Message Queuing Protocol)。在实际应用中,为了保证服务的连续性,需要实施一系列的故障恢复与容灾策略。
794 2
|
消息中间件 Java RocketMQ
微服务架构师的福音:深度解析Spring Cloud RocketMQ,打造高可靠消息驱动系统的不二之选!
【8月更文挑战第29天】Spring Cloud RocketMQ结合了Spring Cloud生态与RocketMQ消息中间件的优势,简化了RocketMQ在微服务中的集成,使开发者能更专注业务逻辑。通过配置依赖和连接信息,可轻松搭建消息生产和消费流程,支持消息过滤、转换及分布式事务等功能,确保微服务间解耦的同时,提升了系统的稳定性和效率。掌握其应用,有助于构建复杂分布式系统。
348 0
|
消息中间件 Java 开发工具
【Azure 事件中心】Spring Cloud Stream Event Hubs Binder 发送Event Hub消息遇见 Spec. Rule 1.3 - onSubscribe, onNext, onError and onComplete signaled to a Subscriber MUST be signaled serially 异常
【Azure 事件中心】Spring Cloud Stream Event Hubs Binder 发送Event Hub消息遇见 Spec. Rule 1.3 - onSubscribe, onNext, onError and onComplete signaled to a Subscriber MUST be signaled serially 异常
176 0
|
消息中间件 存储 Kubernetes
消息队列 MQ使用问题之支持哪些消息分配策略
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。

热门文章

最新文章