「Spring和Kafka」Kafka深挖第4部分:事件流管道的连续交付

本文涉及的产品
日志服务 SLS,月写入数据量 50GB 1个月
简介: 「Spring和Kafka」Kafka深挖第4部分:事件流管道的连续交付




对于事件流应用程序开发人员,根据管道中各个应用程序的更改需要不断更新流管道非常重要。理解流开发人员用于构建事件流管道的一些常见流拓扑也很重要。

在Apache Kafka Deep Dive博客系列的Spring的第4部分中,我们将讨论:

  • Spring云数据流支持的通用事件流拓扑模式
  • 在Spring云数据流中持续部署事件流应用程序

第3部分向您展示了如何:

  • 为Spring Cloud数据流设置本地开发环境
  • 创建和管理事件流管道,包括使用Spring Cloud数据流的Kafka Streams应用程序

有关如何设置Spring Cloud data flow的本地开发的详细信息,请参阅第3部分。

在这篇博客文章中,让我们尝试另一个REST客户机实现来访问Spring Cloud数据流服务器,即用于处理事件流部署的Spring Cloud数据流shell,正如您在第3部分中已经看到的Spring Cloud数据流仪表板的用法。

首先,下载并启动Spring云数据流shell:

wget http://central.maven.org/maven2/org/springframework/cloud/spring-cloud-dataflow-shell/2.1.0.RELEASE/spring-cloud-dataflow-shell-2.1.0.RELEASE.jar java -jar spring-cloud-dataflow-shell-2.1.0.RELEASE.jar

Spring cloud data flow 中常见的事件流拓扑

命名的目的地

在Spring Cloud Stream术语中,指定的目的地是消息传递中间件或事件流平台中的特定目的地名称。例如,在Apache Kafka®中,它是Kafka主题本身的名称。在Spring Cloud数据流中,根据目的地(Kafka主题)是作为发布者还是消费者,指定的目的地(Kafka主题)既可以作为直接源,也可以作为接收器。

这对于Apache Kafka用户尤其有用,因为在大多数情况下,事件流平台是Apache Kafka本身。您可以使用来自Kafka主题的数据,也可以将数据生成到Kafka主题。Spring Cloud Data Flow允许使用指定的目的地支持构建从/到Kafka主题的事件流管道。如果事件流部署时主题不存在,则由Spring Cloud Data Flow使用Spring Cloud stream自动创建。

流DSL语法要求指定的目的地以冒号(:)作为前缀。

假设您希望从HTTP web端点收集用户/单击事件,并在将这些事件发布到名为user-click-events的Kafka主题之前应用一些过滤逻辑。在这种情况下,Spring Cloud数据流中的流定义如下:

http | filter > :user-click-events

现在,Kafka主题用户点击事件被设置为从HTTP web端点接收过滤的用户点击事件。让我们假设您希望创建另一个事件流管道,该管道使用这些过滤的用户单击事件,在将它们存储到RDBMS之前应用一些业务逻辑。这种情况下的流DSL应该是这样的:

:user-click-events > transform | jdbc

以上两种流实际上形成了一个事件流管道,它接收来自http源的用户/单击事件——通过过滤器处理器过滤不需要的过滤数据,通过转换处理器应用一些业务逻辑,最终使用jdbc接收器将转换后的数据存储到RDBMS中。

在为扇入/扇出用例开发事件流管道时,命名目的地也很有用。

并行事件流管道

通过从主流处理管道的事件发布者分叉相同的数据来构造并行事件流管道是一种常见的用例。采取一个主要的事件流,如:

mainstream=http | filter --expression= | transform --expression= | jdbc

在部署名为主流的流时,由Spring Cloud Data Flow使用Spring Cloud stream自动创建连接每个应用程序的Kafka主题。Spring Cloud数据流根据流和应用程序命名约定为这些主题命名,您可以使用适当的Spring Cloud流绑定属性覆盖这些名称。在这种情况下,将创建三个Kafka主题:

  • mainstream.http:连接http源的输出和过滤器处理器的输入的Kafka主题
  • mainstream.filter:连接过滤器处理器的输出和转换处理器的输入的Kafka主题
  • mainstream.transform:将转换处理器的输出连接到jdbc接收器的输入的Kafka主题

要创建从主流接收副本的并行事件流管道,需要使用Kafka主题名称来构造事件流管道。例如:

  • 您可能希望利用http应用程序的输出构建一个接收未过滤数据的新事件流管道。流DSL应该是这样的:
  • unfiltered-http-events=:mainstream.http > jdbc
  • 你可能还想要进入过滤器应用程序的输出,以获得过滤后的数据的副本,用于另一个下游持久性:
  • filtered-http-events=:mainstream.filter > mongodb

在Spring Cloud数据流中,事件流的名称是惟一的。因此,它被用作从给定Kafka主题消费的应用程序的消费者组名。这允许多个事件流管道获取相同数据的副本,而不是竞争消息。要了解更多关于tap支持的信息,请参阅Spring Cloud数据流文档。

分区的事件流

分区支持允许在事件流管道中基于内容将有效负载路由到下游应用程序实例。当您希望下游应用程序实例处理来自特定分区的数据时,这尤其有用。例如,如果数据管道中的处理器应用程序基于来自有效负载的唯一标识符(例如,customerId)执行操作,则可以基于该唯一标识对事件流进行分区。

有关Spring Cloud数据流中分区支持的更多信息,请参阅Spring Cloud数据流文档。

函数组合

通过函数组合,可以将功能逻辑动态地附加到现有的事件流应用程序。业务逻辑仅仅是java.util的实现。函数,java.util。供应商或java.util。分别映射到处理器、源和接收器的消费者接口。

如果您有一个使用java.util实现的函数逻辑。函数,您可以表示这个java.util。充当Spring云数据流处理器,并将其附加到现有的源或接收器应用程序。在这个上下文中,函数组合可以是源和处理器组合成一个应用程序:一个新源,也可以是处理器和接收器组合成一个应用程序:一个新接收器。不管采用哪种方式,都可以将处理器应用程序中表示的转换逻辑组合成源或接收应用程序,而不必开发单独的处理器应用程序。

这种灵活性为事件流应用程序开发人员提供了有趣的新机会。在Spring Cloud数据流中组成函数支持的博客文章提供了一个用例,演示了该功能。您还可以参考Spring Cloud数据流文档中的功能组合特性。

多个输入/输出目的地

默认情况下,Spring Cloud数据流表示事件流管道中的生产者(源或处理器)和消费者(处理器或接收器)应用程序之间的一对一连接。

如果事件流管道需要多个输入和输出绑定,Spring Cloud数据流将不会自动配置这些绑定。相反,开发人员负责在应用程序本身中更显式地配置多个绑定。在事件流管道中也可以有一个非spring - cloud - stream应用程序(例如Kafka Connect应用程序或polyglot应用程序),开发人员可以在其中显式地配置输入/输出绑定。为了突出这一区别,Spring Cloud数据流提供了流DSL的另一种变体,其中双管道符号(||)表示事件流管道中的自定义绑定配置。

下面的示例具有多个事件流管道,演示了上述一些事件流拓扑。这个示例在第2部分中使用了Kafka Streams应用程序,它分别根据从userClicks和userRegions Kafka主题接收到的用户/点击和用户/区域事件计算每个区域的用户点击数量。用户区域数据维护在KTable状态存储中,而用户单击数据被解释为KStreams记录。Kafka Streams应用程序的输出被发送到一个名为log-user-click -per-region的演示应用程序,它记录结果。为了模拟用户单击/区域事件,构建在现有开箱即用HTTP应用程序之上的示例HTTP -ingest应用程序将用户单击和用户/区域事件分别放入userClicks和userRegions Kafka主题。

640.jpg


640.jpg


640.jpg


HTTP -ingest应用程序侦听配置的HTTP web端点,并以键/值对发布事件。默认情况下,键是从名为username的HTTP请求头的值派生出来的,而值是从HTTP请求有效负载派生出来的。默认情况下,HTTP请求有效负载类型被认为是字符串类型。

例如,如果HTTP -ingest接收到以下HTTP请求:

curl -X POST http://localhost:9002 -H "username: Glenn" -d "americas" -H "Content-Type: text/plain"

它将键/值输出发布为Glenn/americas,方法是通过从名为username的HTTP请求头获取键Glenn,并从HTTP有效负载获取americas值。通过这种方式,可以使用http-ingest应用程序发布用户/区域数据。

http-ingest还有一个这样的函数bean定义:

@Bean

public Function<String, Long> sendAsUserClicks() {

return value -> Long.parseLong(value);

}

启用此函数bean并将其附加到http-ingest应用程序(通过前面介绍的函数组合支持),每个有效负载都可以从字符串转换为Long,而不是默认的有效负载类型字符串。通过这种方式,在运行时支持函数组合,可以使用相同的http-ingest应用程序发送用户/单击事件。

例如,如果HTTP -ingest(启用了上述函数)接收到以下HTTP请求:

curl -X POST http://localhost:9003 -H "username: Glenn" -d 9 -H "Content-Type: text/plain"

它将键/值输出发布为Glenn/9,方法是从名为username的HTTP请求头中派生出键Glenn,并将HTTP有效负载字符串转换为Long(通过启用上面的sendAsUserClicks函数)来获得值9。

这是演示Spring Cloud数据流中的功能组合的最简单的方法之一,因为可以使用同一个http-ingest应用程序在运行时发布用户/区域和用户/单击数据。

对于那些好奇的人来说,本文中讨论的所有示例应用程序都可以在spring-cloud-dataflow-samples中找到。您可以将这些Maven构件注册为Spring Cloud数据流中的事件流应用程序。

让我们在使用Spring Cloud Data Flow shell注册各个应用程序之后创建事件流管道。如果您还没有安装Spring Cloud Data Flow,请在设置Spring Cloud Data Flow之后下载并启动Spring Cloud Data Flow shell。

wget http://central.maven.org/maven2/org/springframework/cloud/spring-cloud-dataflow-shell/2.1.0.RELEASE/spring-cloud-dataflow-shell-2.1.0.RELEASE.jar java -jar spring-cloud-dataflow-shell-2.1.0.RELEASE.jar

您可以将http-ingest构件注册为Spring Cloud Data Flow source应用程序,该应用程序来自Spring Cloud Data Flow shell:

dataflow:>app register --name http-ingest --type source --uri maven://org.springframework.cloud.dataflow.samples:http-ingest:1.0.0.BUILD-SNAPSHOT

由于我们希望显式地将Kafka主题命名为userregion和userclick,所以在创建事件流时,让我们使用指定的目的地支持来摄取用户/区域和用户/单击事件到相应的Kafka主题中。

首先创建一个事件流,摄取用户的区域:

dataflow:>stream create ingest-user-regions --definition "http-ingest --server.port=9002 > :userRegions" --deploy

由于我们希望显式地将Kafka主题命名为userregion和userclick,所以在创建事件流时,让我们使用指定的目的地支持来摄取用户/区域和用户/单击事件到相应的Kafka主题中。

首先创建一个事件流,摄取用户的区域:

dataflow:>stream create ingest-user-regions --definition "http-ingest --server.port=9002 > :userRegions" --deploy

创建一个事件流摄取用户点击事件:

dataflow:>stream create ingest-user-clicks --definition "http-ingest --server.port=9003 --spring.cloud.stream.function.definition=sendAsUserClicks --spring.cloud.stream.kafka.binder.configuration.value.serializer=org.apache.kafka.common.serialization.LongSerializer > :userClicks" --deploy

通过将Spring Cloud流属性Spring . Cloud . Stream .function.definition设置为函数名sendAsUserClicks,可以启用函数组合,该函数将HTTP有效载荷从字符串转换为Long。我们还需要设置Kafka配置属性值。序列化到org.apache.kafka.common. serialize . longserializer来处理长类型。

由于Kafka Streams应用程序kstreams-join-user-click -and-region有多个输入(一个用于用户单击事件,另一个用于用户区域事件),因此需要将该应用程序部署为应用程序类型。此外,开发人员有责任显式地将绑定配置到适当的Kafka主题。

将Kafka Streams应用程序注册为Spring Cloud数据流中的应用程序类型:

dataflow:> app register --name join-user-clicks-and-regions --type app --uri maven://org.springframework.cloud.dataflow.samples:kstreams-join-user-clicks-and-region:1.0.0.BUILD-SNAPSHOT

我们还有演示应用程序log-user-click -per-region,它记录Kafka Streams应用程序join-user-click -and-regions的结果。由于app类型与其他事件流应用程序类型source、sink和processor不兼容,因此此应用程序还需要注册为app类型,以便作为一个连贯的事件流管道一起工作。

dataflow:> app register --name log-user-clicks-per-region --type app --uri maven://org.springframework.cloud.dataflow.samples:kstreams-log-user-clicks-per-region:1.0.0.BUILD-SNAPSHOT

现在两个应用程序都已注册,让我们创建一个流,捆绑Kafka Streams应用程序和它的结果记录器:

dataflow:> stream create clicks-per-region --definition "join-user-clicks-and-regions || log-user-clicks-per-region" dataflow:>stream deploy clicks-per-region --properties "deployer.log-user-clicks-per-region.local.inheritLogging=true"

您可以使用Spring Cloud Data Flow shell中的stream list命令来确认所有三个事件流(用户区域、用户点击、每个区域点击)都已成功部署。

让我们发送一些示例数据来观察动作中的Kafka流聚合。

ingest-user-regions事件流中的http-ingest应用程序通过http://localhost:9002接收用户区域数据:

curl -X POST http://localhost:9002 -H "username: Glenn" -d "americas" -H "Content-Type: text/plain"

curl -X POST http://localhost:9002 -H "username: Soby" -d "americas" -H "Content-Type: text/plain"

curl -X POST http://localhost:9002 -H "username: Janne" -d "europe" -H "Content-Type: text/plain"

curl -X POST http://localhost:9002 -H "username: David" -d "americas" -H "Content-Type: text/plain"

curl -X POST http://localhost:9002 -H "username: Ilaya" -d "americas" -H "Content-Type: text/plain"

curl -X POST http://localhost:9002 -H "username: Mark" -d "americas" -H "Content-Type: text/plain"

curl -X POST http://localhost:9002 -H "username: Sabby" -d "americas" -H "Content-Type: text/plain"

curl -X POST http://localhost:9002 -H "username: Gunnar" -d "americas" -H "Content-Type: text/plain"

curl -X POST http://localhost:9002 -H "username: Ilaya" -d "asia" -H "Content-Type: text/plain"

curl -X POST http://localhost:9002 -H "username: Chris" -d "americas" -H "Content-Type: text/plain"

curl -X POST http://localhost:9002 -H "username: Damien" -d "europe" -H "Content-Type: text/plain"

curl -X POST http://localhost:9002 -H "username: Christian" -d "europe" -H "Content-Type: text/plain"

ingest-user-click事件流中的http-ingest应用程序接受用户在http://localhost:9003处单击事件:

curl -X POST http://localhost:9003 -H "username: Glenn" -d 9 -H "Content-Type: text/plain"

curl -X POST http://localhost:9003 -H "username: Soby" -d 15 -H "Content-Type: text/plain"

curl -X POST http://localhost:9003 -H "username: Janne" -d 10 -H "Content-Type: text/plain"

curl -X POST http://localhost:9003 -H "username: Mark" -d 7 -H "Content-Type: text/plain"

curl -X POST http://localhost:9003 -H "username: David" -d 15 -H "Content-Type: text/plain"

curl -X POST http://localhost:9003 -H "username: Sabby" -d 20 -H "Content-Type: text/plain"

curl -X POST http://localhost:9003 -H "username: Gunnar" -d 18 -H "Content-Type: text/plain"

curl -X POST http://localhost:9003 -H "username: Ilaya" -d 10 -H "Content-Type: text/plain"

curl -X POST http://localhost:9003 -H "username: Chris" -d 5 -H "Content-Type: text/plain"

curl -X POST http://localhost:9003 -H "username: Damien" -d 21 -H "Content-Type: text/plain"

curl -X POST http://localhost:9003 -H "username: Christian" -d 12 -H "Content-Type: text/plain"

一旦发布了上述数据,您将看到Kafka Streams应用程序计算每个区域的用户单击的实时聚合,并将结果发送给下游应用程序。在这种情况下,它是日志记录器应用程序,并在日志中有以下结果:

o.s.c.s.a.l.UserClicksPerRegion$Logger$1 : europe : 43

o.s.c.s.a.l.UserClicksPerRegion$Logger$1 : asia : 10

o.s.c.s.a.l.UserClicksPerRegion$Logger$1 : americas : 89

考虑到这是一个实时的事件流管道,您可以发布更多的用户/单击和用户/区域事件,并看到结果继续实时更新。

因为我们在Docker中运行了所有的事件流应用,所以在我们移动到下一个例子之前,我们先删除事件流:

dataflow:>stream destroy ingest-user-regions

dataflow:>stream destroy ingest-user-clicks

dataflow:>stream destroy clicks-per-region

连续部署事件流应用程序

在事件流管道中组成的应用程序可以自主地进行更改,比如切换启用功能或修复bug。为了避免流处理的停机时间,必须在不影响整个数据管道的情况下更新或回滚所需应用程序的此类更改。

Spring Cloud数据流为事件流应用程序的持续部署提供了本机支持。Spring Cloud Data Flow中的应用程序注册表允许您为同一个事件流应用程序注册多个版本。这样,当更新在生产环境中运行的事件流管道时,您可以选择切换到应用程序的特定版本或更改在事件流管道中组成的应用程序的任何配置属性。

让我们使用第3部分中使用的事件流(即开箱即用的事件流应用程序)来体验一下开发人员的体验。

如果您还没有创建和部署事件流http-events-transformer,您可以执行以下shell命令来创建和部署事件流。下面的例子使用开箱即用的事件流应用程序是注册在你的Docker撰写设置:

dataflow:>stream create http-events-transformer --definition "http --server.port=9000 | transform --expression=payload.toUpperCase() | log"

dataflow:>stream deploy http-events-transformer --properties "deployer.log.local.inheritLogging=true"

一旦流被成功部署,发布一些来自Spring Cloud数据流shell的数据:

dataflow:>http post --target "http://localhost:9000" --data "spring"

在船长日志,你会看到以下内容:

log-sink : SPRING

命令流manifest http-events-transformer显示该事件流的所有应用程序。例如,您可以看到转换应用程序具有“transformer”属性。表达”:“payload.toUpperCase ()”。命令流history http-events-transformer显示了该事件流的历史记录,列出了所有可用的版本。

现在,假设您希望更改转换应用程序中使用的转换逻辑,而不需要重新部署整个流,并独立地更新转换应用程序。

dataflow:>stream update http-events-transformer --properties "app.transform.expression=payload.toUpperCase().concat('!!!')"

当您再次运行流清单http-events-transformer命令时,您将看到转换应用程序现在已更改为包含expression属性,该属性通过附加!!在最后。

一旦流被更新,发布一些数据来测试更新:

dataflow:>http post --target "http://localhost:9000" --data "spring"

在skipper 日志,你将看到以下内容:

log-sink : SPRING!!!

命令流历史http-events-transformer将在该流的历史中包含新的事件。如果希望将事件流回滚到特定的版本,可以使用命令流回滚http-events-transformer——releaseVersion。

回滚到事件流的初始版本后(转换应用程序只是做了大写转换):

dataflow:>stream rollback http-events-transformer --releaseVersion 1 Rollback request has been sent for the stream 'http-events-transformer'

已为流“http-event -transformer”发送回滚请求

一旦回滚完成,发布一些数据:

dataflow:>http post --target "http://localhost:9000" --data "spring"

> POST (text/plain) http://localhost:9000 spring

> 202 ACCEPTED

在日志中,您将看到使用初始transformer表达式的输出:

log-sink : SPRING

你可以删除事件流如下:

dataflow:>stream destroy http-events-transformer

Destroyed stream 'http-events-transformer'

请注意,所有这些操作都可以在Spring Cloud数据流仪表板上执行。

结论

我们通过一个示例应用程序介绍了使用Apache Kafka和Spring云数据流的一些常见事件流拓扑。您还了解了Spring Cloud数据流如何支持事件流应用程序的持续部署。

这个Spring for Apache Kafka Deep Dive博客系列向您展示了Spring项目组合(如Spring Kafka、Spring Cloud Stream和Spring Cloud Data Flow)如何帮助您在Apache Kafka上高效地构建和管理应用程序。

相关文章
|
4月前
|
消息中间件 开发框架 Java
掌握这一招,Spring Boot与Kafka完美融合,顺序消费不再是难题,让你轻松应对业务挑战!
【8月更文挑战第29天】Spring Boot与Kafka集成广泛用于处理分布式消息队列。本文探讨了在Spring Boot中实现Kafka顺序消费的方法,包括使用单个Partition或消息Key确保消息路由到同一Partition,并设置Consumer并发数为1以保证顺序消费。通过示例代码展示了如何配置Kafka Producer和Consumer,并自定义Partitioner。为确保数据正确性,还建议在业务逻辑中增加顺序校验机制。
124 3
|
4月前
|
消息中间件 Java Kafka
|
4月前
|
消息中间件 Java Kafka
|
4月前
|
消息中间件 安全 Java
Spring Boot 基于 SCRAM 认证集成 Kafka 的详解
【8月更文挑战第4天】本文详解Spring Boot结合SCRAM认证集成Kafka的过程。SCRAM为Kafka提供安全身份验证。首先确认Kafka服务已启用SCRAM,并准备认证凭据。接着,在`pom.xml`添加`spring-kafka`依赖,并在`application.properties`中配置Kafka属性,包括SASL_SSL协议与SCRAM-SHA-256机制。创建生产者与消费者类以实现消息的发送与接收功能。最后,通过实际消息传递测试集成效果与认证机制的有效性。
153 4
|
4月前
|
消息中间件 Kafka Java
Spring 框架与 Kafka 联姻,竟引发软件世界的革命风暴!事件驱动架构震撼登场!
【8月更文挑战第31天】《Spring 框架与 Kafka 集成:实现事件驱动架构》介绍如何利用 Spring 框架的强大功能与 Kafka 分布式流平台结合,构建灵活且可扩展的事件驱动系统。通过添加 Spring Kafka 依赖并配置 Kafka 连接信息,可以轻松实现消息的生产和消费。文中详细展示了如何设置 `KafkaTemplate`、`ProducerFactory` 和 `ConsumerFactory`,并通过示例代码说明了生产者发送消息及消费者接收消息的具体实现。这一组合为构建高效可靠的分布式应用程序提供了有力支持。
112 0
|
4月前
|
消息中间件 Java Kafka
SpringBoot Kafka SSL接入点PLAIN机制收发消息
SpringBoot Kafka SSL接入点PLAIN机制收发消息
38 0
|
4月前
|
消息中间件 存储 Kafka
Kafka 与 SQS:事件流工具深入比较
【8月更文挑战第13天】
126 0
|
3月前
|
SQL 监控 druid
springboot-druid数据源的配置方式及配置后台监控-自定义和导入stater(推荐-简单方便使用)两种方式配置druid数据源
这篇文章介绍了如何在Spring Boot项目中配置和监控Druid数据源,包括自定义配置和使用Spring Boot Starter两种方法。
|
2月前
|
人工智能 自然语言处理 前端开发
SpringBoot + 通义千问 + 自定义React组件:支持EventStream数据解析的技术实践
【10月更文挑战第7天】在现代Web开发中,集成多种技术栈以实现复杂的功能需求已成为常态。本文将详细介绍如何使用SpringBoot作为后端框架,结合阿里巴巴的通义千问(一个强大的自然语言处理服务),并通过自定义React组件来支持服务器发送事件(SSE, Server-Sent Events)的EventStream数据解析。这一组合不仅能够实现高效的实时通信,还能利用AI技术提升用户体验。
183 2
|
18天前
|
缓存 IDE Java
SpringBoot入门(7)- 配置热部署devtools工具
SpringBoot入门(7)- 配置热部署devtools工具
28 2
 SpringBoot入门(7)- 配置热部署devtools工具