「Spring和Kafka」Kafka整合Spring 深入挖掘 -第1部分

简介: 「Spring和Kafka」Kafka整合Spring 深入挖掘 -第1部分

接下来是《如何在您的Spring启动应用程序中使用Apache Kafka》「Spring和Kafka」如何在您的Spring启动应用程序中使用Kafka ,这展示了如何开始使用Spring启动和Apache Kafka®,这里我们将更深入地挖掘Apache Kafka项目的Spring提供的一些附加功能。

Apache Kafka的Spring为Kafka带来了熟悉的Spring编程模型。它提供了用于发布记录的KafkaTemplate和用于异步执行POJO侦听器的侦听器容器。Spring引导自动配置连接了许多基础设施,因此您可以将精力集中在业务逻辑上。


错误恢复

考虑一下这个简单的POJO监听器方法:

@KafkaListener(id = "fooGroup", topics = "topic1")

public void listen(String in) {

logger.info("Received: " + in);

if (in.startsWith("foo")) {

throw new RuntimeException("failed");

}

}

默认情况下,失败的记录会被简单地记录下来,然后我们继续下一个。但是,我们可以在侦听器容器中配置一个错误处理程序来执行一些其他操作。为此,我们用我们自己的来覆盖Spring Boot的自动配置容器工厂:


@Bean

public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory(

ConcurrentKafkaListenerContainerFactoryConfigurer configurer,

ConsumerFactory<Object, Object> kafkaConsumerFactory) {

ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();

configurer.configure(factory, kafkaConsumerFactory);

factory.setErrorHandler(new SeekToCurrentErrorHandler()); // <<<<<<

return factory;

}

注意,我们仍然可以利用大部分的自动配置。

SeekToCurrentErrorHandler丢弃轮询()中的剩余记录,并在使用者上执行查找操作来重置偏移量,以便在下一次轮询时再次获取被丢弃的记录。默认情况下,错误处理程序跟踪失败的记录,在10次提交尝试后放弃,并记录失败的记录。但是,我们也可以将失败的消息发送到另一个主题。我们称这是一个毫无意义的话题。

下面的例子把这一切放在一起:

@Bean

public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory(

ConcurrentKafkaListenerContainerFactoryConfigurer configurer,

ConsumerFactory<Object, Object> kafkaConsumerFactory,

KafkaTemplate<Object, Object> template) {

ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();

configurer.configure(factory, kafkaConsumerFactory);

factory.setErrorHandler(new SeekToCurrentErrorHandler(

new DeadLetterPublishingRecoverer(template), 3));

return factory;

}

@KafkaListener(id = "fooGroup", topics = "topic1")

public void listen(String in) {

logger.info("Received: " + in);

if (in.startsWith("foo")) {

throw new RuntimeException("failed");

}

}

@KafkaListener(id = "dltGroup", topics = "topic1.DLT")

public void dltListen(String in) {

logger.info("Received from DLT: " + in);

}

反序列化错误

但是,在Spring获得记录之前发生的反序列化异常又如何呢?进入ErrorHandlingDeserializer。此反序列化器包装委托反序列化器并捕获任何异常。然后将它们转发给侦听器容器,后者将它们直接发送给错误处理程序。异常包含源数据,因此可以诊断问题。

域对象并推断类型

考虑下面的例子:

@Bean

public RecordMessageConverter converter() {

return new StringJsonMessageConverter();

}

@KafkaListener(id = "fooGroup", topics = "topic1")

public void listen(Foo2 foo) {

logger.info("Received: " + foo);

if (foo.getFoo().startsWith("fail")) {

throw new RuntimeException("failed");

}

}

@KafkaListener(id = "dltGroup", topics = "topic1.DLT")

public void dltListen(Foo2 in) {

logger.info("Received from DLT: " + in);

}

注意,我们现在正在使用类型为Foo2的对象。消息转换器bean推断要转换为方法签名中的参数类型的类型。

转换器自动“信任”类型。Spring Boot自动将转换器配置到侦听器容器中。

在生产者方面,发送的对象可以是一个不同的类(只要它的类型兼容):

@RestController

public class Controller {

@Autowired

private KafkaTemplate<Object, Object> template;

@PostMapping(path = "/send/foo/{what}")

public void sendFoo(@PathVariable String what) {

this.template.send("topic1", new Foo1(what));

}

}

和:

spring:

kafka:

producer:

value-serializer: org.springframework.kafka.support.serializer.JsonSerializer

$ curl -X POST http://localhost:8080/send/foo/fail

这里,我们在消费者端使用StringDeserializer和“智能”消息转换器。

多种监听器

我们还可以使用单个侦听器容器,并根据类型路由到特定的方法。这次我们不能推断类型,因为类型是用来选择要调用的方法的。

相反,我们依赖于在记录头中传递的类型信息来将源类型映射到目标类型。此外,由于我们没有推断类型,所以需要将消息转换器配置为“信任”映射类型的包。


在本例中,我们将在两端使用消息转换器(以及StringSerializer和StringDeserializer)。下面是消费者端转换器的例子:

@Bean

public RecordMessageConverter converter() {

StringJsonMessageConverter converter = new StringJsonMessageConverter();

DefaultJackson2JavaTypeMapper typeMapper = new DefaultJackson2JavaTypeMapper();

typeMapper.setTypePrecedence(TypePrecedence.TYPE_ID);

typeMapper.addTrustedPackages("com.common");

Map<String, Class<?>> mappings = new HashMap<>();

mappings.put("foo", Foo2.class);

mappings.put("bar", Bar2.class);

typeMapper.setIdClassMapping(mappings);

converter.setTypeMapper(typeMapper);

return converter;

}


在这里,我们从“foo”映射到类Foo2,从“bar”映射到类Bar2。注意,我们必须告诉它使用TYPE_ID头来确定转换的类型。同样,Spring Boot会自动将消息转换器配置到容器中。下面是应用程序片段中的生产端类型映射。yml文件;格式是一个逗号分隔的令牌列表:FQCN:

spring:

kafka:

producer:

value-serializer: org.springframework.kafka.support.serializer.JsonSerializer

properties:

spring.json.type.mapping: foo:com.common.Foo1,bar:com.common.Bar1

这个配置将类Foo1映射到“foo”,将类Bar1映射到“bar”。

监听器:

@Component

@KafkaListener(id = "multiGroup", topics = { "foos", "bars" })

public class MultiMethods {

@KafkaHandler

public void foo(Foo1 foo) {

System.out.println("Received: " + foo);

}

@KafkaHandler

public void bar(Bar bar) {

System.out.println("Received: " + bar);

}

@KafkaHandler(isDefault = true)

public void unknown(Object object) {

System.out.println("Received unknown: " + object);

}

}

生产者:

@RestController

public class Controller {

@Autowired

private KafkaTemplate<Object, Object> template;

@PostMapping(path = "/send/foo/{what}")

public void sendFoo(@PathVariable String what) {

this.template.send(new GenericMessage<>(new Foo1(what),

Collections.singletonMap(KafkaHeaders.TOPIC, "foos")));

}

@PostMapping(path = "/send/bar/{what}")

public void sendBar(@PathVariable String what) {

this.template.send(new GenericMessage<>(new Bar(what),

Collections.singletonMap(KafkaHeaders.TOPIC, "bars")));

}

@PostMapping(path = "/send/unknown/{what}")

public void sendUnknown(@PathVariable String what) {

this.template.send(new GenericMessage<>(what,

Collections.singletonMap(KafkaHeaders.TOPIC, "bars")));

}

}

事务

通过在应用程序中设置transactional-id前缀来启用事务。yml文件:

spring:

kafka:

producer:

value-serializer: org.springframework.kafka.support.serializer.JsonSerializer

transaction-id-prefix: tx.

consumer:

properties:

isolation.level: read_committed

当使用spring-kafka 1.3时。x或更高版本和支持事务的kafka-clients版本(0.11或更高版本),在@KafkaListener方法中执行的任何KafkaTemplate操作都将参与事务,而侦听器容器将在提交事务之前向事务发送偏移量。请注意,我们还为使用者设置了隔离级别,使其无法看到未提交的记录。下面的例子暂停监听器,这样我们可以看到效果:


@KafkaListener(id = "fooGroup2", topics = "topic2")

public void listen(List foos) throws IOException {

logger.info("Received: " + foos);

foos.forEach(f -> kafkaTemplate.send("topic3", f.getFoo().toUpperCase()));

logger.info("Messages sent, hit enter to commit tx");

System.in.read();

}

@KafkaListener(id = "fooGroup3", topics = "topic3")

public void listen(String in) {

logger.info("Received: " + in);

}

本例中的生产者在一个事务中发送多条记录:

@PostMapping(path = "/send/foos/{what}")

public void sendFoo(@PathVariable String what) {

this.template.executeInTransaction(kafkaTemplate -> {

StringUtils.commaDelimitedListToSet(what).stream()

.map(s -> new Foo1(s))

.forEach(foo -> kafkaTemplate.send("topic2", foo));

return null;

});

}

curl -X POST http://localhost:8080/send/foos/a,b,c,d,e

Received: [Foo2 [foo=a], Foo2 [foo=b], Foo2 [foo=c], Foo2 [foo=d], Foo2 [foo=e]]

Messages sent, hit Enter to commit tx

Received: [A, B, C, D, E]

结论

在Apache Kafka中使用Spring可以消除很多样板代码。它还增加了诸如错误处理、重试和记录筛选等功能——而我们只是触及了表面。

相关文章
|
3月前
|
消息中间件 开发框架 Java
掌握这一招,Spring Boot与Kafka完美融合,顺序消费不再是难题,让你轻松应对业务挑战!
【8月更文挑战第29天】Spring Boot与Kafka集成广泛用于处理分布式消息队列。本文探讨了在Spring Boot中实现Kafka顺序消费的方法,包括使用单个Partition或消息Key确保消息路由到同一Partition,并设置Consumer并发数为1以保证顺序消费。通过示例代码展示了如何配置Kafka Producer和Consumer,并自定义Partitioner。为确保数据正确性,还建议在业务逻辑中增加顺序校验机制。
105 3
|
3月前
|
消息中间件 Java Kafka
|
3月前
|
消息中间件 Java Kafka
|
3月前
|
消息中间件 安全 Java
Spring Boot 基于 SCRAM 认证集成 Kafka 的详解
【8月更文挑战第4天】本文详解Spring Boot结合SCRAM认证集成Kafka的过程。SCRAM为Kafka提供安全身份验证。首先确认Kafka服务已启用SCRAM,并准备认证凭据。接着,在`pom.xml`添加`spring-kafka`依赖,并在`application.properties`中配置Kafka属性,包括SASL_SSL协议与SCRAM-SHA-256机制。创建生产者与消费者类以实现消息的发送与接收功能。最后,通过实际消息传递测试集成效果与认证机制的有效性。
143 4
|
3月前
|
消息中间件 Kafka Java
Spring 框架与 Kafka 联姻,竟引发软件世界的革命风暴!事件驱动架构震撼登场!
【8月更文挑战第31天】《Spring 框架与 Kafka 集成:实现事件驱动架构》介绍如何利用 Spring 框架的强大功能与 Kafka 分布式流平台结合,构建灵活且可扩展的事件驱动系统。通过添加 Spring Kafka 依赖并配置 Kafka 连接信息,可以轻松实现消息的生产和消费。文中详细展示了如何设置 `KafkaTemplate`、`ProducerFactory` 和 `ConsumerFactory`,并通过示例代码说明了生产者发送消息及消费者接收消息的具体实现。这一组合为构建高效可靠的分布式应用程序提供了有力支持。
109 0
|
3月前
|
消息中间件 Java Kafka
SpringBoot Kafka SSL接入点PLAIN机制收发消息
SpringBoot Kafka SSL接入点PLAIN机制收发消息
38 0
|
1月前
|
消息中间件 存储 运维
为什么说Kafka还不是完美的实时数据通道
【10月更文挑战第19天】Kafka 虽然作为数据通道被广泛应用,但在实时性、数据一致性、性能及管理方面存在局限。数据延迟受消息堆积和分区再平衡影响;数据一致性难以达到恰好一次;性能瓶颈在于网络和磁盘I/O;管理复杂性涉及集群配置与版本升级。
|
1月前
|
消息中间件 Java Kafka
Flink-04 Flink Java 3分钟上手 FlinkKafkaConsumer消费Kafka数据 进行计算SingleOutputStreamOperatorDataStreamSource
Flink-04 Flink Java 3分钟上手 FlinkKafkaConsumer消费Kafka数据 进行计算SingleOutputStreamOperatorDataStreamSource
47 1
|
3月前
|
消息中间件 Java Kafka
Kafka不重复消费的终极秘籍!解锁幂等性、偏移量、去重神器,让你的数据流稳如老狗,告别数据混乱时代!
【8月更文挑战第24天】Apache Kafka作为一款领先的分布式流处理平台,凭借其卓越的高吞吐量与低延迟特性,在大数据处理领域中占据重要地位。然而,在利用Kafka进行数据处理时,如何有效避免重复消费成为众多开发者关注的焦点。本文深入探讨了Kafka中可能出现重复消费的原因,并提出了四种实用的解决方案:利用消息偏移量手动控制消费进度;启用幂等性生产者确保消息不被重复发送;在消费者端实施去重机制;以及借助Kafka的事务支持实现精确的一次性处理。通过这些方法,开发者可根据不同的应用场景灵活选择最适合的策略,从而保障数据处理的准确性和一致性。
278 9
|
3月前
|
消息中间件 负载均衡 Java
"Kafka核心机制揭秘:深入探索Producer的高效数据发布策略与Java实战应用"
【8月更文挑战第10天】Apache Kafka作为顶级分布式流处理平台,其Producer组件是数据高效发布的引擎。Producer遵循高吞吐、低延迟等设计原则,采用分批发送、异步处理及数据压缩等技术提升性能。它支持按消息键值分区,确保数据有序并实现负载均衡;提供多种确认机制保证可靠性;具备失败重试功能确保消息最终送达。Java示例展示了基本配置与消息发送流程,体现了Producer的强大与灵活性。
67 3