Flink 中的应用部署:当前状态与新应用模式

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 作为现代企业的重要工具,流处理和实时分析这类工具逐渐兴起,越来越多的企业以 Apache Flink 为核心构建平台,并将其作为服务在内部提供。

作为现代企业的重要工具,流处理和实时分析这类工具逐渐兴起,越来越多的企业以 Apache Flink 为核心构建平台,并将其作为服务在内部提供。在最新举办的 Flink Forward 会议中, Uber、 Netflix 和阿里巴巴等公司的许多相关主题演讲进一步说明了这一趋势。

众多平台旨在通过减轻最终用户的所有运营负担来简化内部的 Application (应用)提交。为了提交 Flink 应用程序,这些平台通常只公开一个集中式或低并行度端点(例如 Web 前端)用于应用提交,我们将调用 Deployer(部署器)。

平台开发人员和维护人员经常提到的障碍之一是,Deployer 可能是一个很难配置的大量资源消耗者。如果按照平均负载进行配置,可能会导致 Deployer 服务被部署请求淹没(在最坏的情况下,短时间内对所有生产应用程序都是如此),而按照最高负载进行规划的话,又会带来不必要的成本。根据这一观察结果,Flink 1.11 引入了 Application 模式(应用模式)作为部署选项,它允许一个轻量级、更可伸缩性的应用提交过程,从而使应用程序部署负载更均匀地分布在集群的各个节点上。

为了理解这个问题以及了解 Application 模式如何解决该问题,我们首先简要概述 Flink 中应用程序执行的当前状态,然后再阐述部署模式引入的架构变化以及如何利用它们。

Flink 中的应用程序执行

在 Flink 中执行应用程序主要涉及三个实体:Client(客户端)、JobManager(作业管理器)和 TaskManager(任务管理器)。Client 负责将应用提交给集群,JobManager 负责执行期间必要的 bookkeeping,而 TaskManager 则负责实际的计算。更多细节请参考 Flink 的架构 文档。

当前部署模式

在 1.11 版本中引入 Application 模式之前,Flink 允许用户在 Session(会话)或 Per-Job 集群上执行应用程序。两者之间的差异与集群生命周期和它们提供的资源隔离保证有关。

Session 模式

Session 模式(会话模式)假定集群已经运行,并使用该集群的资源来执行任何提交的应用程序。在同一(Session)集群中执行的应用程序使用相同的资源,并因此相互竞争。这样做的好处是,你无需为每个提交的作业分配整个集群的资源开销。但是,如果其中一个作业行为不正常或者关闭了 TaskManager,那么在该 TaskManager 上运行的所有作业都将受到故障的影响。除了对导致故障的作业产生负面影响之外,这还意味着潜在的大规模恢复过程,即所有重新启动的作业同时访问文件系统,并使其不可用于其他服务。此外,单个集群运行多个作业意味着 JobManager 的负载更大,它负责集群中所有作业的 bookkeeping。这种模式非常适合启动短作业,例如交互式查询。

Per-Job 模式

在 Per-Job 模式中,可用的集群管理器框架(如 YARN 或 Kubernetes)用于为每个提交的作业启动 Flink 集群,该集群仅对该作业可用。当作业完成后,集群将关闭,并清理所有延迟的资源(例如文件)。这种模式允许更好的资源隔离,因为行为不正常的作业不会影响任何其他作业。另外,由于每个应用程序都有自己的 JobManager,因此它将 bookkeeping 负载分散到多个实体。考虑到前面提到的 Session 模式中的资源隔离问题,对于长时间运行的作业,用户经常选择 Per-Job 模式,因为这些作业愿意接受一定程度的启动延迟的增加,以支持弹性。

总之,在 Session 模式中,集群生命周期独立于集群中运行的任何作业,并且集群中运行的所有作业共享其资源。Per-Job 模式选择为每个提交的作业分配一个集群,已提供更好的资源隔离保证,因为资源不会在作业之间共享。在这种情况下,集群的生命周期与作业的生命周期相关。

Application 提交

Flink 应用程序的执行包括两个阶段:pre-flight,即当用户的 main() 方法被调用时;runtime,即用户代码调用 execute()时立即触发。main() 方法使用 Flink 的 API(DataStream API、Table API、DataSet API)之一构造用户程序。当 main() 方法调用 env.execute() 时,用户定义的管道将被转换成一种 Flink 运行时可以理解的形式,称为 Job Graph(作业图),并将其传递给集群。

尽管它们方法有所不同,Session 模式和 Per-Job 模式都会在 Client 执行应用程序的 main() 方法,即 pre-flight 阶段。

对于已经在本地拥有作业的所有依赖项,然后通过在其机器上运行的 Client 提交其应用程序的单个用户来说,这通常不是问题。但是,对于通过远程实体(如 Deployer)提交的情况下,这个过程包括:

  • 本地下载应用程序的依赖项;
  • 执行 main() 方法提取 Job Graph;
  • 将 Job Graph 及其依赖项发送到集群以便执行;
  • 等待结果。

这使得 Client 消耗了大量的资源,因为它可能需要大量的网络带宽来下载依赖项或将二进制文件发送到集群,并且需要 CPU 周期来执行 main() 方法。随着越来越多的用户共享同一个 Client,这个问题甚至会变得更加突出。

1.jpg

上图展示了使用红色、蓝色和绿色表示的三个应用程序的两种部署模式。每个矩形都有三个并行项。黑色矩形表示不同的进程,分别是 TaskManager、JobManager 和 Deployer。我们假设在所有情况下只有一个 Deployer 进程。彩色三角形表示提交进程的负载,而彩色矩形表示 TaskManager 和 JobManager 进程的负载。如图所示,Per-Job 模式和 Session 模式下的 Deployer 共享相同的负载。它们的不同之处在于任务的分配和 JobManager 负载。在 Session 模式下,集群中的所有作业都有一个 JobManager,而在 Per-Job 模式下,每个作业都有一个 JobManager。此外,Session 模式下的任务会随机分配给 TaskManager,而在 Per-Job 模式下,每个 TaskManager 只能有单个作业任务。

Application 模式

Application 模式建立在上述观察结果的基础上,并尝试将 Per-Job 模式的资源隔离与轻量级且可伸缩的应用提交过程结合起来。为实现这一点,它为每个提交的应用程序创建一个集群,但是这一次,应用程序的 main() 方法在 JobManager 上执行。

2.jpg

为每个应用程序创建一个集群可以看作是创建一个只在特定应用程序的作业之间共享的 Session 集群,并在应用程序结束时关闭。使用这种架构,Application 模式提供与 Per-Job 模式相同的资源隔离和负载平衡保证,但在整个应用程序的粒度上。这是有道理的,因为属于同一应用程序的工作应该相互关联,并被视为一个单元。

在 JobManager 上执行 main() 方法不仅可以节省提取 Job Graph 所需的 CPU 周期,也可以节省 Client 本地下载依赖项并将 Job Graph 及其依赖项发送到集群所需的带宽。此外,由于每个应用程序只有一个 JobManager,因此,它可以更均匀地分散网络负载。上图对此进行了说明,其中我们具有与 “Session 和 Per-Job 部署模式” 部分中相同的场景,但是这一次 Client 负载已经转移到了每个应用程序的 JobManager。

注:在 Application 模式下, main() 方法是在集群上执行的,而不是像在其他模式中那样在 Client 上执行。和可能对代码产生影响,例如,使用 regsiterCachedFile() 在环境中注册的任何路径都必须由应用程序的 JobManager 进行访问。

与 Per-Job 模式相比,Application 模式允许提交由多个作业组成的应用程序。作业执行的顺序不受部署模式的影响,而是受用于启动作业的调用的影响。使用阻塞 execute() 方法建立一个顺序,并将导致下一个作业的执行被延迟到“这个”作业完成为止。相反,一旦提交了当前作业,非阻塞 executeAsync() 方法将立即继续提交“下一个”作业。

降低网络需求

如上所述,通过在 JobManager 上执行应用程序的 main() 方法,Application 模式可以节省以前在提交作业时所需的大量资源。但仍有改进的余地。

重点关注 YARN,它已经支持所有提到的 here 2,即使 Application 模式已经就绪,Client 仍然需要发送用户 Jar 到 JobManager。此外,对于每个应用程序,Client 必须将 “flink-dist” 目录发送到集群,该目录包含框架本身的二进制文件,包括 flink-dist.jar 、 lib/ 和 plugin/ 目录。这两者可能会在 Client 占用大量带宽。此外,在每次提交时发送相同的 flink-dist 二进制文件既是对带宽的浪费,也是对存储空间的浪费。只需允许应用程序共享相同的二进制文件即可减少存储空间的浪费。

在 Flink 1.11 中,我们引入了医学选项,允许用户进行如下操作:

  1. 指定一个目录的远程路径,在该目录中,YARN 可以找到 Flink 分发二进制文件,以及
  2. 指定一个远程路径,YARN 可以在其中找到用户 Jar。

对于第一步,我们利用了 YARN 的分布式缓存,并允许应用程序共享这些二进制文件。因此,如果一个应用程序碰巧在它的 TaskManager 的本地存储中找到了 Flink 的副本,由于之前的一个应用程序在同一个 TaskManager 上执行,它甚至都不需要在内部下载它。

注:这两种优化都可以用于 YARN 上的所有部署模式,而不仅仅是 Application 模式。

示例:YARN 上的 Application 模式

有关完整说明,请参阅 Flink 的官方文档,更具体地说,请参阅引用集群管理框架的页面,例如 YARN 或 Kubernetes。接下来我将给出一些关于 YARN 的例子,其中上述所有功能都是可用的。

要以 Application 模式启动用用程序,可以使用:

./bin/flink run-application -t yarn-application ./MyApplication.jar

使用这条命令,所有的配置参数,例如用于引导应用程序状态的保存点的路径,或者所需的 JobManager/TaskManager 内存大小,都可以通过它们的配置选项(以 -d 作为前缀)来指定。有关可用配置选项的目录,请参阅 Flink 的 配置页面。

例如,指定 JobManager 和 TaskManager 内存大小的命令如下所示:

./bin/flink run-application -t yarn-application \
-Djobmanager.memory.process.size=2048m \
-Dtaskmanager.memory.process.size=4096m \
./MyApplication.jar

最后,为了进一步节省提交应用程序 Jar 所需的带宽,可以预先将其上传到 HDFS,并指定指向 ./MyApplication.jar 的远程路径,如下所示:

./bin/flink run-application -t yarn-application \
-Djobmanager.memory.process.size=2048m \
-Dtaskmanager.memory.process.size=4096m \
-Dyarn.provided.lib.dirs="hdfs://myhdfs/remote-flink-dist-dir" \
hdfs://myhdfs/jars/MyApplication.jar

这将使作业提交变得更加轻量级,因为所需的 Flink jar 和应用程序 Jar 将从指定的远程位置提取,而不是由 Client 发送到集群。Client 将唯一提供给集群的是应用程序的配置,其中包括上述提到的所有路径。

总结

我们希望本文的讨论能够帮助你理解 Flink 提供的各种部署模式之间的差异,并且能够帮助你作出明智的决定,究竟哪一种模式适合你自己的设置。

作者介绍:

Kostas Kloudas,Apache Flink PMC Member、Committer,Ververica 软件工程师。

原文链接:

https://flink.apache.org/news/2020/07/14/application-mode.html

相关实践学习
基于Hologres+Flink搭建GitHub实时数据大屏
通过使用Flink、Hologres构建实时数仓,并通过Hologres对接BI分析工具(以DataV为例),实现海量数据实时分析.
实时计算 Flink 实战课程
如何使用实时计算 Flink 搞定数据处理难题?实时计算 Flink 极客训练营产品、技术专家齐上阵,从开源 Flink功能介绍到实时计算 Flink 优势详解,现场实操,5天即可上手! 欢迎开通实时计算 Flink 版: https://cn.aliyun.com/product/bigdata/sc Flink Forward Asia 介绍: Flink Forward 是由 Apache 官方授权,Apache Flink Community China 支持的会议,通过参会不仅可以了解到 Flink 社区的最新动态和发展计划,还可以了解到国内外一线大厂围绕 Flink 生态的生产实践经验,是 Flink 开发者和使用者不可错过的盛会。 去年经过品牌升级后的 Flink Forward Asia 吸引了超过2000人线下参与,一举成为国内最大的 Apache 顶级项目会议。结合2020年的特殊情况,Flink Forward Asia 2020 将在12月26日以线上峰会的形式与大家见面。
相关文章
|
11月前
|
分布式计算 数据处理 Apache
Spark和Flink的区别是什么?如何选择?都应用在哪些行业?
【10月更文挑战第10天】Spark和Flink的区别是什么?如何选择?都应用在哪些行业?
984 1
|
4月前
|
SQL 关系型数据库 MySQL
Flink CDC 3.4 发布, 优化高频 DDL 处理,支持 Batch 模式,新增 Iceberg 支持
Apache Flink CDC 3.4.0 版本正式发布!经过4个月的开发,此版本强化了对高频表结构变更的支持,新增 batch 执行模式和 Apache Iceberg Sink 连接器,可将数据库数据全增量实时写入 Iceberg 数据湖。51位贡献者完成了259次代码提交,优化了 MySQL、MongoDB 等连接器,并修复多个缺陷。未来 3.5 版本将聚焦脏数据处理、数据限流等能力及 AI 生态对接。欢迎下载体验并提出反馈!
738 1
Flink CDC 3.4 发布, 优化高频 DDL 处理,支持 Batch 模式,新增 Iceberg 支持
|
6月前
|
存储 运维 监控
阿里妈妈基于 Flink+Paimon 的 Lakehouse 应用实践
本文总结了阿里妈妈数据技术专家陈亮在Flink Forward Asia 2024大会上的分享,围绕广告业务背景、架构设计及湖仓方案演进展开。内容涵盖广告生态运作、实时数仓挑战与优化,以及基于Paimon的湖仓方案优势。通过分层设计与技术优化,实现业务交付周期缩短30%以上,资源开销降低40%,并大幅提升系统稳定性和运营效率。文章还介绍了阿里云实时计算Flink版的免费试用活动,助力企业探索实时计算与湖仓一体化解决方案。
768 3
阿里妈妈基于 Flink+Paimon 的 Lakehouse 应用实践
|
6月前
|
关系型数据库 MySQL 数据库
基于Flink CDC 开发,支持Web-UI的实时KingBase 连接器,三大模式无缝切换,效率翻倍!
TIS 是一款基于Web-UI的开源大数据集成工具,通过与人大金仓Kingbase的深度整合,提供高效、灵活的实时数据集成方案。它支持增量数据监听和实时写入,兼容MySQL、PostgreSQL和Oracle模式,无需编写复杂脚本,操作简单直观,特别适合非专业开发人员使用。TIS率先实现了Kingbase CDC连接器的整合,成为业界首个开箱即用的Kingbase CDC数据同步解决方案,助力企业数字化转型。
985 5
基于Flink CDC 开发,支持Web-UI的实时KingBase 连接器,三大模式无缝切换,效率翻倍!
|
11月前
|
分布式计算 资源调度 大数据
大数据-110 Flink 安装部署 下载解压配置 Standalone模式启动 打包依赖(一)
大数据-110 Flink 安装部署 下载解压配置 Standalone模式启动 打包依赖(一)
286 0
|
11月前
|
分布式计算 资源调度 大数据
大数据-110 Flink 安装部署 下载解压配置 Standalone模式启动 打包依赖(二)
大数据-110 Flink 安装部署 下载解压配置 Standalone模式启动 打包依赖(二)
218 0
|
6月前
|
SQL 弹性计算 DataWorks
Flink CDC 在阿里云 DataWorks 数据集成入湖场景的应用实践
Flink CDC 在阿里云 DataWorks 数据集成入湖场景的应用实践
237 6
|
9月前
|
消息中间件 JSON 数据库
探索Flink动态CEP:杭州银行的实战案例
本文由杭州银行大数据工程师唐占峰、欧阳武林撰写,介绍Flink动态CEP的定义、应用场景、技术实现及使用方式。Flink动态CEP是基于Flink的复杂事件处理库,支持在不重启服务的情况下动态更新规则,适应快速变化的业务需求。文章详细阐述了其在反洗钱、反欺诈和实时营销等金融领域的应用,并展示了某金融机构的实际应用案例。通过动态CEP,用户可以实时调整规则,提高系统的灵活性和响应速度,降低维护成本。文中还提供了具体的代码示例和技术细节,帮助读者理解和使用Flink动态CEP。
1049 2
探索Flink动态CEP:杭州银行的实战案例
|
11月前
|
Kubernetes Cloud Native 流计算
Flink-12 Flink Java 3分钟上手 Kubernetes云原生下的Flink集群 Rancher Stateful Set yaml详细 扩容缩容部署 Docker容器编排
Flink-12 Flink Java 3分钟上手 Kubernetes云原生下的Flink集群 Rancher Stateful Set yaml详细 扩容缩容部署 Docker容器编排
285 3
|
11月前
|
资源调度 分布式计算 大数据
大数据-111 Flink 安装部署 YARN部署模式 FlinkYARN模式申请资源、提交任务
大数据-111 Flink 安装部署 YARN部署模式 FlinkYARN模式申请资源、提交任务
296 0

相关产品

  • 实时计算 Flink版