「Spring和Kafka」Kafka整合Spring 深入挖掘第2部分:Kafka和Spring Cloud Stream

简介: 「Spring和Kafka」Kafka整合Spring 深入挖掘第2部分:Kafka和Spring Cloud Stream

在这个博客系列的第1部分之后,Apache Kafka的Spring——第1部分:错误处理、消息转换和事务支持「Spring和Kafka」Kafka整合Spring 深入挖掘 -第1部分,在这里的第2部分中,我们将关注另一个增强开发者在Kafka上构建流应用程序时体验的项目:Spring Cloud Stream。

我们将在这篇文章中讨论以下内容:

  • Spring云流及其编程模型概述
  • Apache Kafka®集成在Spring云流
  • Spring Cloud Stream如何让Kafka开发人员更轻松地开发应用程序
  • 使用Kafka流和Spring云流进行流处理

让我们首先看看什么是Spring Cloud Stream,以及它如何与Apache Kafka一起工作。

什么是Spring Cloud Stream?

Spring Cloud Stream是一个框架,它允许应用程序开发人员编写消息驱动的微服务。这是通过使用Spring Boot提供的基础来实现的,同时还支持其他Spring组合项目(如Spring Integration、Spring Cloud函数和Project Reactor)公开的编程模型和范例。它支持使用描述输入和输出组件的类型安全编程模型编写应用程序。应用程序的常见示例包括源(生产者)、接收(消费者)和处理器(生产者和消费者)。

典型的Spring cloud stream 应用程序包括用于通信的输入和输出组件。这些输入和输出被映射到Kafka主题。Spring cloud stream应用程序可以接收来自Kafka主题的输入数据,它可以选择生成另一个Kafka主题的输出。这些与Kafka连接接收器和源不同。有关各种Spring Cloud流开箱即用应用程序的更多信息,请访问项目页面。

消息传递系统和Spring cloud stream之间的桥梁是通过绑定器抽象实现的。绑定器适用于多个消息传递系统,但最常用的绑定器之一适用于Apache Kafka。

Kafka绑定器扩展了Spring Boot、Apache Kafka的Spring和Spring集成的坚实基础。由于绑定器是一个抽象,所以其他消息传递系统也有可用的实现。

Spring Cloud Stream支持发布/订阅语义、消费者组和本机分区,并尽可能将这些职责委派给消息传递系统。对于Kafka绑定器,这些概念在内部映射并委托给Kafka,因为Kafka本身就支持它们。当消息传递系统本身不支持这些概念时,Spring Cloud Stream将它们作为核心特性提供。

以下是绑定器抽象如何与输入和输出工作的图示:

使用Spring Cloud Stream创建Kafka应用程序

Spring Initializr是使用Spring Cloud Stream创建新应用程序的最佳场所。这篇博文介绍了如何在Spring启动应用程序中使用Apache Kafka,涵盖了从Spring Initializr创建应用程序所需的所有步骤。对于Spring Cloud Stream,惟一的区别是您需要“Cloud Stream”和“Kafka”作为组件。以下是你需要选择的一个例子:

initializr包含开发流应用程序所需的所有依赖项。通过使用Initializr,您还可以选择构建工具(如Maven或Gradle)和目标JVM语言(如Java或Kotlin)。

该构建将生成一个能够作为独立应用程序(例如,从命令行)运行的uber JAR。

Apache Kafka的Spring cloud stream编程模型

Spring Cloud Stream提供了一个编程模型,支持与Apache Kafka的即时连接。应用程序需要在其类路径中包含Kafka绑定,并添加一个名为@EnableBinding的注释,该注释将Kafka主题绑定到它的输入或输出(或两者)。

Spring Cloud Stream提供了三个与@EnableBinding绑定的方便接口:Source(单个输出)、Sink(单个输入)和Processor(单个输入和输出)。它还可以扩展到具有多个输入和输出的自定义接口。

下面的代码片段展示了Spring Cloud Stream的基本编程模型:

@SpringBootApplication

@EnableBinding(Processor.class)

public class UppercaseProcessor {

 @StreamListener(Processor.INPUT)

 @SendTo(Processor.OUTPUT)

 public String process(String s) {

    return s.toUpperCase();

 }

}

在这个应用程序中,注意这个方法是用@StreamListener注释的,它是由Spring Cloud Stream提供的,用于接收来自Kafka主题的消息。同样的方法也使用SendTo进行注释,SendTo是将消息发送到输出目的地的方便注释。这是一个Spring云流处理器应用程序,它使用来自输入的消息并将消息生成到输出。

在前面的代码中没有提到Kafka主题。此时可能出现的一个自然问题是,“这个应用程序如何与Kafka通信?”答案是:入站和出站主题是通过使用Spring Boot支持的许多配置选项之一来配置的。在本例中,我们使用一个名为application的YAML配置文件。yml,它是默认搜索的。下面是输入和输出目的地的配置:

spring.cloud.stream.bindings:

 input:

   destination: topic1

 output:

   destination: topic2

 

Spring Cloud Stream将输入映射到topic1,将输出映射到topic2。这是一组非常少的配置,但是可以使用更多的选项来进一步定制应用程序。默认情况下,主题是用单个分区创建的,但是可以由应用程序覆盖。更多信息请参考这些文档。

最重要的是,开发人员可以简单地专注于编写核心业务逻辑,让Spring Cloud Stream和Spring Boot来处理基础设施问题(比如连接到Kafka、配置和调优应用程序等等)。

下面的例子展示了另一个简单的应用程序(消费者):

@SpringBootApplication

@EnableBinding(Sink.class)

public class LoggingConsumerApplication {

 @StreamListener(Sink.INPUT)

 public void handle(Person person) {

    System.out.println("Received: " + person);

 }

 public static class Person {

    private String name;

    public String getName() {

       return name;

    }

    public void setName(String name) {

       this.name = name;

    }

    public String toString() {

       return this.name;

    }

 }

}

 

注意,@EnableBinding提供了一个接收器,这表明这是一个消费者。与前一个应用程序的一个主要区别是,使用@StreamListener注释的方法将一个名为Person的POJO作为参数,而不是字符串。来自Kafka主题的消息是如何转换成这个POJO的?Spring Cloud Stream提供了自动的内容类型转换。默认情况下,它使用application/JSON作为内容类型,但也支持其他内容类型。您可以通过使用属性spring.cloud.stream.binding .input来提供内容类型。然后将其设置为适当的内容类型,如application/Avro。

适当的消息转换器由Spring Cloud Stream根据这个配置来选择。如果应用程序希望使用Kafka提供的本地序列化和反序列化,而不是使用Spring Cloud Stream提供的消息转换器,那么可以设置以下属性。

序列化:

spring.cloud.stream.bindings.output.useNativeEncoding=true

反序列化:

spring.cloud.stream.bindings.input.useNativeDecoding=true

Auto-provisioning of topic

Apache Kafka绑定器提供了一个在启动时配置主题的配置程序。如果在代理上启用了主题创建,Spring Cloud Stream应用程序可以在应用程序启动时创建和配置Kafka主题。

例如,可以向供应者提供分区和其他主题级配置。这些定制可以在绑定器级别进行,绑定器级别将应用于应用程序中使用的所有主题,也可以在单独的生产者和消费者级别进行。这非常方便,特别是在应用程序的开发和测试期间。有许多关于如何为多个分区配置主题的示例。

支持使用者组和分区

可以使用Spring Cloud Stream配置众所周知的属性,如用户组和分区。消费者组可以通过属性设置:

spring.cloud.stream.bindings.input.group =组名称

如前所述,在内部,这个组将被翻译成Kafka的消费者组。

在编写生产者应用程序时,Spring Cloud Stream提供了将数据发送到特定分区的选项。同样,在内部,框架将这些职责委托给Kafka。

对于使用者,如果禁用自动再平衡(这是一个需要覆盖的简单配置属性),则特定的应用程序实例可以限制为使用来自一组特定分区的消息。有关详细信息,请参阅这些配置选项。

绑定可视化和控制

通过使用Spring Boot的致动器机制,我们现在能够控制Spring cloud stream中的各个绑定。

在运行时,可以使用执行器端点来停止、暂停、恢复等,执行器端点是Spring Boot的机制,用于在将应用程序推向生产环境时监视和管理应用程序。该特性使用户能够对应用程序处理来自Kafka的数据的方式有更多的控制。如果应用程序因绑定而暂停,那么来自该特定主题的处理记录将暂停,直到恢复。

Spring Cloud Stream还集成了Micrometer,以启用更丰富的指标、发出混乱的速率并提供其他与监视相关的功能。这些系统可以与许多其他监测系统进一步集成。Kafka绑定器提供了扩展的度量功能,为主题的消费者滞后提供了额外的见解。

Spring Boot通过一个特殊的健康状况端点提供应用程序健康状况检查。Kafka绑定器提供了一个健康指示器的特殊实现,它考虑到代理的连接性,并检查所有的分区是否都是健康的。如果发现任何分区没有leader,或者代理无法连接,那么health check将报告相应的状态。

Kafka流在Spring cloud stream中的支持概述

在编写流处理应用程序时,Spring Cloud stream提供了另一个专门用于Kafka流的绑定器。与常规的Kafka绑定器一样,Kafka Streams绑定器也关注开发人员的生产力,因此开发人员可以专注于为KStream、KTable、GlobalKTable等编写业务逻辑,而不是编写基础结构代码。绑定器负责连接到Kafka,以及创建、配置和维护流和主题。例如,如果应用程序方法具有KStream签名,则绑定器将连接到目标主题,并在后台从该主题生成流。应用程序开发人员不必显式地这样做,因为绑定器已经为应用程序提供了绑定。

其他类型(如KTable和GlobalKTable)也是如此。底层的KafkaStreams对象由绑定器提供,用于依赖注入,因此,应用程序不直接维护它。更确切地说,它是由春天的云流为你做的。

要使用Spring Cloud Stream开始Kafka流,请转到Spring Initializr并选择如下图所示的选项,以生成一个应用程序,该应用程序带有使用Spring Cloud Stream编写Kafka流应用程序的依赖项:

上面的例子展示了一个用Spring Cloud Stream编写的Kafka Streams应用程序:

@SpringBootApplication

public class KafkaStreamsTableJoin {

 @EnableBinding(StreamTableProcessor.class)

 public static class KStreamToTableJoinApplication {

    @StreamListener

    @SendTo("output")

    public KStream<String, Long> process(@Input("input1") KStream<String, Long> userClicksStream,

                                @Input("input2") KTable<String, String> userRegionsTable) {

       return userClicksStream

             .leftJoin(userRegionsTable,

                   (clicks, region) -> new RegionWithClicks(region == null ? "UNKNOWN" : region, clicks),

                   Joined.with(Serdes.String(), Serdes.Long(), null))

             .map((user, regionWithClicks) -> new KeyValue<>(regionWithClicks.getRegion(), regionWithClicks.getClicks()))

             .groupByKey(Serialized.with(Serdes.String(), Serdes.Long()))

             .reduce((firstClicks, secondClicks) -> firstClicks + secondClicks)

             .toStream();

    }

 }

 interface StreamTableProcessor {

    @Input("input1")

    KStream inputStream();

    @Output("output")

    KStreamoutputStream();

    @Input("input2")

    KTable inputTable();

 }

}

在前面的代码中有几件事情需要注意。在@StreamListener方法中,没有用于设置Kafka流组件的代码。应用程序不需要构建流拓扑,以便将KStream或KTable与Kafka主题关联起来,启动和停止流,等等。所有这些机制都是由Kafka流的Spring Cloud Stream binder处理的。在调用该方法时,已经创建了一个KStream和一个KTable供应用程序使用。

应用程序创建一个名为StreamTableProcessor的自定义接口,该接口指定用于输入和输出绑定的Kafka流类型。此接口与@EnableBinding一起使用。此接口的使用方式与我们在前面的处理器和接收器接口示例中使用的方式相同。与常规的Kafka绑定器类似,Kafka上的目的地也是通过使用Spring云流属性指定的。您可以为前面的应用程序提供这些配置选项来创建必要的流和表:

spring.cloud.stream.bindings.input1.destination=userClicksTopic spring.cloud.stream.bindings.input2.destination=userRegionsTopic spring.cloud-stream.bindings.output.destination=userClickRegionsTopic

我们使用两个Kafka主题来创建传入流:一个用于将消息消费为KStream,另一个用于消费为KTable。框架根据自定义接口StreamTableProcessor中提供的绑定适当地使用所需的类型。然后,这些类型将与方法签名配对,以便在应用程序代码中使用。在出站时,出站的KStream被发送到输出Kafka主题。

Kafka流中可查询的状态存储支持

Kafka流为编写有状态应用程序提供了第一类原语。当使用Spring Cloud Stream和Kafka流构建有状态应用程序时,就有可能使用RESTful应用程序从RocksDB的持久状态存储中提取信息。下面是一个Spring REST应用程序的例子,它依赖于Kafka流中的状态存储:

@RestController

public class FooController {

 private final Log logger = LogFactory.getLog(getClass());

 @Autowired

 private InteractiveQueryService interactiveQueryService;

@RequestMapping("/song/id")

public SongBean song(@RequestParam(value="id") Long id) {

    final ReadOnlyKeyValueStore<Long, Song> songStore =

          interactiveQueryService.getQueryableStore(“STORE-NAME”,

QueryableStoreTypes.<Long, Song>keyValueStore());

    final Song song = songStore.get(id);

    if (song == null) {

       throw new IllegalArgumentException("Song not found.");

    }

    return new SongBean(song.getArtist(), song.getAlbum(), song.getName());

 }

}

 

InteractiveQueryService是Apache Kafka Streams绑定器提供的一个API,应用程序可以使用它从状态存储中检索数据。应用程序可以使用此服务按名称查询状态存储,而不是直接通过底层流基础设施访问状态存储。当Kafka Streams应用程序的多个实例运行时,该服务还提供了用户友好的方式来访问服务器主机信息,这些实例之间有分区。

通常在这种情况下,应用程序必须通过直接访问Kafka Streams API来找到密钥所在的分区所在的主机。InteractiveQueryService提供了这些API方法的包装器。一旦应用程序获得了对状态存储的访问权,它就可以通过查询来形成进一步的见解。最终,可以通过上面所示的REST端点来提供这些见解。您可以在GitHub上找到一个使用Spring Cloud Stream编写的Kafka Streams应用程序的示例,在这个示例中,它使用本节中提到的特性来适应Kafka音乐示例。

Branching in Kafka Streams

通过使用SendTo注释,可以在Spring Cloud流中原生地使用Kafka流的分支特性。

@StreamListener("input")

@SendTo({“englishTopic”, “frenchTopic”, “spanishTopic”})

public KStream<?, WordCount>[] process(KStream<Object, String> input) {

 Predicate<Object, WordCount> isEnglish = (k, v) -> v.word.equals("english");

 Predicate<Object, WordCount> isFrench =  (k, v) -> v.word.equals("french");

 Predicate<Object, WordCount> isSpanish = (k, v) -> v.word.equals("spanish");

 return input

       .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))

       .groupBy((key, value) -> value)

       .windowedBy(timeWindows)

       .count(Materialized.as("WordCounts-1"))

       .toStream()

       .map((key, value) -> new KeyValue<>(null, new WordCount(key.key(), value, new Date(key.window().start()), new Date(key.window().end()))))

       .branch(isEnglish, isFrench, isSpanish);

}

注意,SendTo注释有三个不同输出的绑定,方法本身返回一个KStream[]。Spring Cloud Stream在内部将分支发送到输出绑定到的Kafka主题。观察SendTo注释中指定的输出顺序。这些输出绑定将与输出的KStream[]按其在数组中的顺序配对。

数组的第一个索引中的第一个KStream可以映射到englishTopic,然后将下一个映射到frenchTopic,以此类推。这里的想法是,应用程序可以专注于功能方面的事情,并使用Spring Cloud Stream设置所有这些输出流,否则开发人员将不得不为每个流单独做这些工作。

Spring cloud stream中的错误处理

Spring Cloud Stream提供了错误处理机制来处理失败的消息。它们可以被发送到死信队列(DLQ),这是Spring Cloud Stream创建的一个特殊的Kafka主题。当失败的记录被发送到DLQ时,头信息被添加到记录中,其中包含关于失败的更多信息,如异常堆栈跟踪、消息等。

发送到DLQ是可选的,框架提供各种配置选项来定制它。

对于Spring Cloud Stream中的Kafka Streams应用程序,错误处理主要集中在反序列化错误上。Apache Kafka Streams绑定器提供了使用Kafka Streams提供的反序列化处理程序的能力。它还提供了在主流继续处理时将失败的记录发送到DLQ的能力。当应用程序需要返回来访问错误记录时,这是非常有用的。

模式演化和Confluent 模式注册

Spring Cloud Stream支持模式演化,它提供了与Confluent模式注册中心以及Spring Cloud Stream提供的本地模式注册中心服务器一起工作的功能。应用程序通过在应用程序级别上包含@EnableSchemaRegistryClient注释来启用模式注册表。Spring Cloud Stream提供了各种基于Avro的消息转换器,可以方便地与模式演化一起使用。在使用Confluent模式注册表时,Spring Cloud Stream提供了一个应用程序需要作为SchemaRegistryClient bean提供的特殊客户端实现(ConfluentSchemaRegistryClient)。

结论

Spring Cloud Stream通过自动处理其他同等重要的非功能需求(如供应、自动内容转换、错误处理、配置管理、用户组、分区、监视、健康检查等),使应用程序开发人员更容易关注业务逻辑,从而提高了使用Apache Kafka的生产率。


相关文章
|
1月前
|
负载均衡 Java API
【Spring Cloud生态】Spring Cloud Gateway基本配置
【Spring Cloud生态】Spring Cloud Gateway基本配置
37 0
|
3月前
|
消息中间件 开发框架 Java
掌握这一招,Spring Boot与Kafka完美融合,顺序消费不再是难题,让你轻松应对业务挑战!
【8月更文挑战第29天】Spring Boot与Kafka集成广泛用于处理分布式消息队列。本文探讨了在Spring Boot中实现Kafka顺序消费的方法,包括使用单个Partition或消息Key确保消息路由到同一Partition,并设置Consumer并发数为1以保证顺序消费。通过示例代码展示了如何配置Kafka Producer和Consumer,并自定义Partitioner。为确保数据正确性,还建议在业务逻辑中增加顺序校验机制。
78 3
|
3月前
|
Java Spring
【Azure Spring Cloud】Spring Cloud Azure 4.0 调用Key Vault遇见认证错误 AADSTS90002: Tenant not found.
【Azure Spring Cloud】Spring Cloud Azure 4.0 调用Key Vault遇见认证错误 AADSTS90002: Tenant not found.
|
3月前
|
消息中间件 Kafka Java
Spring 框架与 Kafka 联姻,竟引发软件世界的革命风暴!事件驱动架构震撼登场!
【8月更文挑战第31天】《Spring 框架与 Kafka 集成:实现事件驱动架构》介绍如何利用 Spring 框架的强大功能与 Kafka 分布式流平台结合,构建灵活且可扩展的事件驱动系统。通过添加 Spring Kafka 依赖并配置 Kafka 连接信息,可以轻松实现消息的生产和消费。文中详细展示了如何设置 `KafkaTemplate`、`ProducerFactory` 和 `ConsumerFactory`,并通过示例代码说明了生产者发送消息及消费者接收消息的具体实现。这一组合为构建高效可靠的分布式应用程序提供了有力支持。
107 0
|
3月前
|
Java Spring 容器
【Azure Spring Cloud】在Azure Spring Apps上看见 App Memory Usage 和 jvm.menory.use 的指标的疑问及OOM
【Azure Spring Cloud】在Azure Spring Apps上看见 App Memory Usage 和 jvm.menory.use 的指标的疑问及OOM
|
3月前
|
存储 Java Spring
【Azure Spring Cloud】Azure Spring Cloud服务,如何获取应用程序日志文件呢?
【Azure Spring Cloud】Azure Spring Cloud服务,如何获取应用程序日志文件呢?
|
3月前
|
SQL Java 数据库连接
【Azure Spring Cloud】Azure Spring Cloud connect to SQL using MSI
【Azure Spring Cloud】Azure Spring Cloud connect to SQL using MSI
|
3月前
|
Java 开发工具 Spring
【Azure Spring Cloud】使用azure-spring-boot-starter-storage来上传文件报错: java.net.UnknownHostException: xxxxxxxx.blob.core.windows.net: Name or service not known
【Azure Spring Cloud】使用azure-spring-boot-starter-storage来上传文件报错: java.net.UnknownHostException: xxxxxxxx.blob.core.windows.net: Name or service not known
|
3月前
|
NoSQL Java Redis
【Azure Spring Cloud】Java Spring Cloud 应用部署到Azure上后,发现大量的 java.lang.NullPointerException: null at io.lettuce.core.protocol.CommandHandler.writeSingleCommand(CommandHandler.java:426) at ... 异常
【Azure Spring Cloud】Java Spring Cloud 应用部署到Azure上后,发现大量的 java.lang.NullPointerException: null at io.lettuce.core.protocol.CommandHandler.writeSingleCommand(CommandHandler.java:426) at ... 异常
|
3月前
|
Java Spring
【Azure 应用服务】记一次Azure Spring Cloud 的部署错误 (az spring-cloud app deploy -g dev -s testdemo -n demo -p ./hellospring-0.0.1-SNAPSHOT.jar --->>> Failed to wait for deployment instances to be ready)
【Azure 应用服务】记一次Azure Spring Cloud 的部署错误 (az spring-cloud app deploy -g dev -s testdemo -n demo -p ./hellospring-0.0.1-SNAPSHOT.jar --->>> Failed to wait for deployment instances to be ready)