作为Apache Kafka深挖的博客系列第1部分和第2部分的后续,在第3部分中我们将讨论另一个Spring 团队的项目:Spring Cloud Data Flow,其重点是使开发人员能够轻松地开发、部署和协调事件流管道基于Apache Kafka。作为前一篇博客系列文章的延续,本文解释了Spring Cloud数据流如何帮助您提高开发人员的工作效率并管理基于apache - kafka的事件流应用程序开发。
我们将在这篇文章中讨论以下内容:
- Spring云数据流生态系统概述
- 如何使用Spring云数据流来开发、部署和编排事件流管道和应用程序
Spring Cloud Data Flow生态系统
Spring Cloud Data Flow是一个用于设计、开发和持续交付数据管道的工具包。它支持从设计到生产部署的事件流应用程序开发的集中管理。在Spring Cloud数据流中,数据管道可以是事件流(实时长时间运行)或任务/批处理(短期)数据密集型应用程序的组合。与Spring Cloud数据流交互的方式多种多样:
- 仪表板GUI
- 命令行Shell
- 流Java DSL(领域特定语言)
- 通过curl的RESTful api,等等。
为了将事件流管道部署到Cloud Foundry (CF)和Kubernetes (K8s)等平台,Spring Cloud数据流将应用程序生命周期操作(部署、更新、回滚)委托给另一个名为Spring Cloud Skipper的服务器组件。虽然事件流管道部署由Spring Cloud Skipper处理,但将短时间(任务/批处理)数据管道部署到目标平台则由Spring Cloud数据流本身管理。
Spring Cloud数据流和Spring Cloud Skipper运行时都配置为通过OAuth 2.0和OpenID连接提供身份验证和授权。Spring Cloud Data Flow使用基于微米的集成来帮助监视事件流应用程序,并提供Grafana仪表板,您可以安装和定制它。
开发事件流应用程序
在Spring Cloud Data Flow中,事件流管道通常由Spring Cloud Stream应用程序组成,不过任何定制构建的应用程序都可以安装在管道中。开发人员可以直接使用或扩展任何开箱即用的实用程序事件流应用程序来覆盖常见的用例,或者使用Spring Cloud Stream编写自定义应用程序。
所有开箱即用的事件流应用程序是:
- 可作为Apache Maven构件或Docker映像使用
- 使用RabbitMQ或Apache Kafka Spring云流绑定器构建
- 内置 Prometheus和InfluxDB 监测系统
开箱即用的应用程序与Kafka Connect应用程序类似,不同之处是它们使用Spring Cloud Stream框架进行集成和调试。
为了构建一个事件流管道,Spring Cloud数据流提供了一组应用程序类型:
- 源表示数据管道中的第一步,它是一个生产者,从数据库、文件系统、FTP服务器、物联网设备等外部系统中提取数据。
- 处理器表示可以从上游生产者(源或处理器)消费的应用程序,对消费的数据执行业务操作,并将处理后的数据发出供下游消费
- sink表示数据管道的最后一个阶段,它可以将消耗的数据写入外部系统,如Cassandra、PostgreSQL、Amazon S3等。
需要注意的是,在Spring Cloud数据流中,事件流数据管道默认是线性的。这意味着管道中的每个应用程序使用单个目的地(例如Kafka主题)与另一个应用程序通信,数据从生产者线性地流向消费者。然而,在某些用例中,流管道是非线性的,并且可以有多个输入和输出——这是Kafka Streams应用程序的典型设置。
在事件流数据管道中也可以有非spring - cloud - stream应用程序(Kafka连接应用程序、Polygot应用程序等)。
Spring Cloud Data Flow使用流应用程序DSL支持这些情况,并使用应用程序类型app突出显示这些应用程序。
上面的可视化演示了一个由两个应用程序组成的事件流管道,其中可以使用Spring Cloud数据流部署http和jdbc。这两个应用程序都是使用Spring Cloud Stream框架构建的,我们在第2部分中介绍了这个框架,它们都可以在公共Maven存储库/Docker Hub中使用。管道符号|(即。在流DSL中表示一个事件流平台,如Apache Kafka,配置为事件流应用程序的通信。
事件流平台或消息传递中间件提供了流的生产者http源和消费者jdbc接收器应用程序之间的松散耦合。这种松散耦合对于云本地部署模型至关重要,因为管道内的应用程序可以独立地发展、扩展或执行滚动升级,而不会影响上游生产者或下游消费者。当Spring Cloud数据流将Apache Kafka用于事件流应用程序时,它与流媒体平台上的各种产品产生了良好的共鸣。
Spring Cloud data flow环境设置
Spring Cloud Data Flow网站已经开始为本地、Kubernetes和Cloud Foundry提供指南。对于本博客,让我们使用Docker在本地运行这个设置。首先,您需要从Spring Cloud数据流GitHub repo下载Docker撰写文件。
这个Docker的撰写配置有:
- Apache Kafka
- Spring Cloud Data Flow server
- Spring Cloud Skipper server
- Prometheus (application metrics and monitoring)
- Grafana (data visualization)
- Automatic registration of out-of-the-box event streaming applications
由于以上所有组件将与事件流应用程序一起在我们的Docker环境中运行,请确保为您的Docker设置分配最少6GB的空间。
接下来,安装docker-compose并运行以下命令:
export DATAFLOW_VERSION=2.1.0.RELEASE
export SKIPPER_VERSION=2.0.2.RELEASE
docker-compose up
启动所有组件后,可以通过http://localhost:9393/dashboard访问Spring Cloud Data flow仪表板,并注册以下开箱即用的事件流应用程序:
创建事件流管道
让我们使用上一篇博客文章中介绍的相同的大写处理器和日志接收应用程序在Spring Cloud数据流中创建一个事件管道。使用这些应用程序,让我们创建一个简单的流http-events-transformer,如下所示:
- http源侦听http web端点以获取传入数据,并将它们发布到Kafka主题。
- 转换处理器使用来自Kafka主题的事件,其中http源发布步骤1中的数据。然后应用转换逻辑—将传入的有效负载转换为大写,并将处理后的数据发布到另一个Kafka主题。
- 日志接收器使用第2步中转换处理器的输出Kafka主题中的事件,它的职责只是在日志中显示结果。
Spring Cloud数据流中的流DSL语法应该是这样的:
http | transform | log
在Spring Cloud数据流仪表板的“Streams”页面中,您可以创建一个新的流,如下所示。
输入以下流DSL文本:
http-events-transformer=http --server.port=9000 | transform --expression=payload.toUpperCase() | log
当部署流时,有两种类型的属性可以被覆盖:
- 应用程序级属性,这是Spring云流应用程序的配置属性
- 部署目标平台的属性,如本地、Kubernetes或Cloud Foundry
在Spring Cloud Data Flow dashboard的“Streams”页面中,选择stream http-events-transformer,然后单击“deploy”。
在部署流时,请确保将平台选择为本地平台,以便在本地环境中部署流。将日志应用程序的本地平台部署者属性inheritLogging设置为true(如下面的屏幕截图所示),这样可以将日志应用程序的日志文件复制到Spring Cloud Skipper服务器日志中。将应用程序日志放在Skipper服务器日志下可以简化演示。
在部署流时,将检索各个应用程序的http、转换和日志,并将每个应用程序的部署请求发送到目标平台(即、本地、Kubernetes和CloudFoundry)的数据流。同样,当应用程序引导时,以下Kafka主题由Spring Cloud Stream框架自动创建,这就是这些应用程序如何在运行时作为连贯的事件流管道组合在一起。
- http-events-transformer.http(将http源的输出连接到转换处理器的输入的主题)
- http-events-transformer.transform(将转换处理器的输出连接到日志接收器的输入的主题)
Kafka主题名是由Spring云数据流根据流和应用程序命名约定派生的。您可以通过使用适当的Spring云流绑定属性来覆盖这些名称。
要查看所有的运行时流应用程序,请参阅“运行时”页面:
成功部署流之后,HTTP应用程序就可以接收http://localhost:9000上的数据了。让我们发布一些测试数据到http web端点:
curl -X POST http://localhost:9000 -d "spring" -H "Content-Type: text/plain"
因为我们继承了日志应用的日志,所以Spring Cloud Skipper server日志中日志应用的输出可以看作:
log-sink : SPRING
调试流应用程序
您可以在运行时调试部署的应用程序。调试配置根据目标平台而异。请参阅在本地、Kubernetes和Cloud Foundry目标环境中调试部署的应用程序的文档。要在本地开发环境中调试应用程序,只需传递本地部署器属性debugPort即可。
监控事件流应用程序
对于当前的设置,我们使用基于prometheus的应用程序监控,并在缺省情况下使用admin/admin设置一个Grafana仪表板。
通过从Spring Cloud数据流仪表板的“Streams”页面单击事件流http-events-transformer的“Grafana dashboard”图标,可以从Grafana仪表板监视事件流部署。
审计用户操作
Spring Cloud Data Flow server涉及的所有操作都经过审计,审计记录可以从Spring Cloud Data Flow dashboard中的“审计记录”页面访问。
您可以通过单击“Streams”页面中http-events-transformer的Destroy stream选项来删除流。
有关事件流应用程序开发和部署的详细信息,请参阅流开发人员指南。
使用Kafka Streams应用程序开发事件流管道
当您有一个使用Kafka Streams应用程序的事件流管道时,它们可以在Spring Cloud数据流事件流管道中用作处理器应用程序。在下面的示例中,您将看到如何将Kafka Streams应用程序注册为Spring Cloud数据流处理器应用程序,并随后在事件流管道中使用。
本博客中使用的所有样例应用程序都可以在GitHub上找到。应用程序kstreams-word-count是一个Kafka Streams应用程序,它使用Spring Cloud Stream框架来计算给定时间窗口内输入的单词。该应用程序被构建并发布到Spring Maven repo中。
在Spring Cloud Data Flow“Apps”页面的“Add Application(s)”中,您可以通过选择其应用程序类型作为处理器来注册kstreams-word-count应用程序,以及其Maven URI:
让我们使用开箱即用的http源应用程序,它在http web端点http://localhost:9001处侦听传入的数据,并将使用的数据发布到上面步骤中注册的kstream-wordcount处理器。Kafka Streams处理器根据时间窗口计算字数,然后将其输出传播到开箱即用的日志应用程序,该应用程序将字数计数Kafka Streams处理器的结果记录下来。
从Spring Cloud数据流仪表板中的“Streams”页面,使用stream DSL创建一个流:
通过将平台指定为本地,从“Streams”页面部署kstream-wc-sample流。另外,指定部署程序属性local。将日志应用程序的继承日志记录设置为true。
当流成功部署后,所有http、kstream-word-count和log都作为分布式应用程序运行,通过事件流管道中配置的特定Kafka主题连接。
现在,你可以张贴一些字卡夫卡流的应用程序来处理:
curl -X POST http://localhost:9001 -H "Content-Type: text/plain" -d "Baby shark, doo doo doo doo doo doo"
你可以看到日志应用程序现在有以下:
skipper | 2019-03-25 09:53:37.228 INFO 66 --- [container-0-C-1] log-sink : {"word":"baby","count":1,"start":"2019-03-25T09:53:30.000+0000","end":"2019-03-25T09:54:00.000+0000"}
skipper | 2019-03-25 09:53:37.229 INFO 66 --- [container-0-C-1] log-sink : {"word":"shark","count":1,"start":"2019-03-25T09:53:30.000+0000","end":"2019-03-25T09:54:00.000+0000"}
skipper | 2019-03-25 09:53:37.234 INFO 66 --- [container-0-C-1] log-sink : {"word":"doo","count":6,"start":"2019-03-25T09:53:30.000+0000","end":"2019-03-25T09:54:00.000+0000"}
从上面的示例中,您可以看到Kafka Streams应用程序如何适应事件流数据管道。您还看到了如何在Spring Cloud数据流中管理这样的事件流管道。此时,您可以从kstream-wc-sample流页面取消部署并删除流。
结论
对于使用Apache Kafka的事件流应用程序开发人员和数据爱好者来说,本博客提供了Spring Cloud数据流如何帮助开发和部署具有所有基本特性的事件流应用程序,如易于开发和管理、监控和安全性。
Spring Cloud Data Flow提供了一系列工具和自动化来跨云原生平台部署和管理事件流管道。本系列的第4部分将提供通用的事件流拓扑和连续部署模式,作为Spring Cloud数据流中的事件流应用程序的原生集。请继续关注!