Spring实现Kafka重试Topic,真的太香了

简介: Kafka的Consumer偏移值支持手动管理,便于消息处理失败时重试。通过RetryableTopic注解,可设定重试次数与间隔,错误达上限后消息转入DLT队列。此方法提升处理灵活性,但也可能导致消息顺序混乱或重复处理,需谨慎应用。

概述

Kafka的强大功能之一是每个分区都有一个Consumer的偏移值。该偏移值是消费者将读取的下一条消息的值。可以自动或手动增加该值。如果我们由于错误而无法处理消息并想重试,我们可以选择手动管理,并在成功的情况下增加偏移量。但是,这会暂时阻止队列消息的处理。我们可以选择异步方法。

为什么我们需要它?

如果发生错误,而不是停止队列消息的处理;我们可以将错误消息转移到不同的主题并再次处理。

如果在处理 Kafka 消息时出现错误,可以使用 RetryableTopic 注解以一定的时间间隔和一定的次数再次处理消息。如果完成尝试次数后错误仍然存在,则消息将发送到 DLT 队列。

如何使用?

我们首先回顾一下RetryableTopic注解可以取的一些值,以便您可以做出最适合您的设置:

attempts:尝试处理消息的次数。它的默认值为 3。如果完成所有尝试后仍然收到错误,则消息将发送到 DLT 队列。

backoff:用于确定处理消息的时间间隔。从 Backoff 类获取一个值。您可以在下面找到退避的详细示例。

排除/排除名称:允许您排除指定的异常类。当您添加到列表中的任何错误被抛出时,重试机制将不会被激活。

include / includeNames:仅当抛出指定的异常时才会激活重试机制。

kafkaTemplate:虽然您可以给出现有 kafkaTemplate bean 的名称,但您也可以为特定于重试的 Kafka 模板定义不同的 bean。

autoCreateTopics:决定是否自动创建Retry和DLT主题。

retryTopicSuffix / dltTopicSuffix:用于确定要添加到自动创建的主题末尾的后缀。

dltStrategy:如果不需要DLT,可以定义为NO_DLT。

SameIntervalTopicReuseStrategy/fixedDelayTopicStrategy(3.0.4之前):用于确定要创建的重试主题策略。创建 (SINGLE_TOPIC) 或尽可能多的尝试值 (MULTIPLE_TOPICS) 重试主题。

Backoff的示例:

  • 具有固定的增量值
Backoff(delay = 600000 ) // 每 10 分钟
  • 具有指数价值
Backoff(delay = 60000 , multiplier = 2 ) // 1、2、4、8... 分钟后重复。
  • 用占位符定义值
Backoff(delayExpression = "${delay}", multiplierExpression = "${multiplier}")

@RetryableTopic 示例:

@RetryableTopic(
     backoff = @Backoff(delay = 300000),
     attempts = 12,
     sameIntervalTopicReuseStrategy = 
         SameIntervalTopicReuseStrategy.SINGLE_TOPIC,
     kafkaTemplate = "kafkaRetryableTopicTemplate",
     exclude = { SerializationException.class, 
                 DeserializationException.class, 
                 NullPointerException.class 
               }
 )
 @KafkaListener(topics = "my-topic")
 public void processMessage(RetryableDto retryableDto) {
     log.info("Retrying process RetryableDto : {}", retryableDto);
     // process message
 }

在上面的例子中,消息将每5分钟重新处理一次,总共12次,即1小时。如果任何尝试均顺利完成,则试用将终止。

由于定义了 SINGLE_TOPIC,因此将创建单个主题以进行重试。如果没有进行此定义,则会创建 12 个重试主题。

如果抛出了排除中定义的任何错误,则不会执行重做。

如果需要,您可以编写自己的 RetryableException 并在包含中定义此值,以便仅在引发此错误时才重试。

DLT队列处理

如果完成了定义的尝试次数并且继续收到错误,则消息将发送到 DLT 队列。如果要处理这些消息,可以使用DltHandler注解。

用法示例:

@DltHandler 
 public  void  handleDltMessage (RetryableDto retryableDto) { 
     log.error("DLT处理程序消息:{}", retryableDto); 
}

注意事项

虽然使用 RetryableTopic 的异步处理优势为我们带来了性能提升,但这种使用也有一些缺点。

使用RetryableTopic可能会破坏消息的处理顺序。

让我们用一个例子来解释这种情况:当主主题在时间 t 处理时,一条消息出错并被发送到重试主题。在时间 t + 1 时,另一条消息来到主主题并成功处理。让我们在重试主题中的消息在时间 t + 2 时被成功处理。在这种情况下,第一条传入消息将在第二条消息之后处理。如果订购对您很重要,我建议您在消息处理过程中进行必要的检查。

另一个缺点是消息双重处理的风险。您可以通过考虑这种可能性来进行改进。

相关文章
|
5月前
|
消息中间件 Java Kafka
消息队列比较:Spring 微服务中的 Kafka 与 RabbitMQ
本文深入解析了 Kafka 和 RabbitMQ 两大主流消息队列在 Spring 微服务中的应用与对比。内容涵盖消息队列的基本原理、Kafka 与 RabbitMQ 的核心概念、各自优势及典型用例,并结合 Spring 生态的集成方式,帮助开发者根据实际需求选择合适的消息中间件,提升系统解耦、可扩展性与可靠性。
368 1
消息队列比较:Spring 微服务中的 Kafka 与 RabbitMQ
|
5月前
|
Java 测试技术 数据库
使用Spring的@Retryable注解进行自动重试
在现代软件开发中,容错性和弹性至关重要。Spring框架提供的`@Retryable`注解为处理瞬时故障提供了一种声明式、可配置的重试机制,使开发者能够以简洁的方式增强应用的自我恢复能力。本文深入解析了`@Retryable`的使用方法及其参数配置,并结合`@Recover`实现失败回退策略,帮助构建更健壮、可靠的应用程序。
666 1
使用Spring的@Retryable注解进行自动重试
|
10月前
|
消息中间件 Java Kafka
Spring Boot整合kafka
本文简要记录了Spring Boot与Kafka的整合过程。首先通过Docker搭建Kafka环境,包括Zookeeper和Kafka服务的配置文件。接着引入Spring Kafka依赖,并在`application.properties`中配置生产者和消费者参数。随后创建Kafka配置类,定义Topic及重试机制。最后实现生产者发送消息和消费者监听消息的功能,支持手动ACK确认。此方案适用于快速构建基于Spring Boot的Kafka消息系统。
1674 7
|
11月前
|
消息中间件 Java Kafka
SpringBoot使用Kafka生产者、消费者
SpringBoot使用Kafka生产者、消费者
556 10
|
12月前
|
消息中间件 Java Kafka
【Azure Kafka】使用Spring Cloud Stream Binder Kafka 发送并接收 Event Hub 消息及解决并发报错
reactor.core.publisher.Sinks$EmissionException: Spec. Rule 1.3 - onSubscribe, onNext, onError and onComplete signaled to a Subscriber MUST be signaled serially.
219 5
|
11月前
|
消息中间件 人工智能 安全
秒级灾备恢复:Kafka 2025 AI自愈集群下载及跨云Topic迁移终极教程
Apache Kafka 2025作为企业级实时数据中枢,实现五大革新:量子安全传输(CRYSTALS-Kyber抗量子加密算法)、联邦学习总线(支持TensorFlow Federated/Horizontal FL框架)、AI自愈集群(MTTR缩短至30秒内)、多模态数据处理(原生支持视频流、3D点云等)和跨云弹性扩展(AWS/GCP/Azure间自动迁移)。平台采用混合云基础设施矩阵与软件依赖拓扑设计,提供智能部署架构。安装流程涵盖抗量子安装包获取、量子密钥配置及联邦学习总线设置。
|
消息中间件 Java Kafka
什么是Apache Kafka?如何将其与Spring Boot集成?
什么是Apache Kafka?如何将其与Spring Boot集成?
689 5
|
消息中间件 Java Kafka
Spring Boot 与 Apache Kafka 集成详解:构建高效消息驱动应用
Spring Boot 与 Apache Kafka 集成详解:构建高效消息驱动应用
574 1
|
消息中间件 NoSQL 大数据
大数据-77 Kafka 高级特性-稳定性-延时队列、重试队列 概念学习 JavaAPI实现(一)
大数据-77 Kafka 高级特性-稳定性-延时队列、重试队列 概念学习 JavaAPI实现(一)
323 1
|
消息中间件 存储 缓存
kafka 的数据是放在磁盘上还是内存上,为什么速度会快?
Kafka的数据存储机制通过将数据同时写入磁盘和内存,确保高吞吐量与持久性。其日志文件按主题和分区组织,使用预写日志(WAL)保证数据持久性,并借助操作系统的页缓存加速读取。Kafka采用顺序I/O、零拷贝技术和批量处理优化性能,支持分区分段以实现并行处理。示例代码展示了如何使用KafkaProducer发送消息。