【万字长文】详解Flink作业提交流程(一)

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
容器服务 Serverless 版 ACK Serverless,952元额度 多规格
容器服务 Serverless 版 ACK Serverless,317元额度 多规格
简介: 【万字长文】详解Flink作业提交流程

一、提交流程

Flink 作业在开发完毕之后,需要提交到 Flink 集群执行。ClientFronted 是入口,触发用户开发的 Flink 应用 Jar 文件中的 main 方法,然后交给 PipelineExecutor(流水线执行器,在 FlinkClient 升成 JobGraph 之后,将作业提交给集群的重要环节。)#execue 方法,最终会选择一个触发一个具体的 PiplineExecutor 执行。

640.png

640.png


提交模式又可分为:

  • Detached:Flink Client 创建完集群之后,可以退出命令行窗口,集群独立运行。
  • Attached:不能关闭命令行窗口,需要与集群之间维持连接。

1.1 Yarn Session 提交流程

640.png


启动集群:

  • 使用 bin/yarn-session.sh 提交会话模式的作业。如果提交到已经存在的集群,则获取 Yarn 集群信息、应用 ID,并准备提交作业。如果启动新的 Yarn Session 集群,则进入步骤(2)
  • Yarn 启动新 Flink 集群
    1)如果没有集群,则创建一个新的 Session 模式的集群。首先将应用配置(flink-conf.yaml、logback.xml、log4j.properties)和相关文件(Flink Jar、配置类文件、用户 Jar 文件、JobGraph 对象等)上传至分布式存储(如 HDFS)的应用暂存目录。
    2)通过 Yarn Client 向 Yarn 提交 Flink 创建集群的申请,Yarn 分配资源,在申请的 Yarn Container 中初始化并启动 FlinkJobManager 进程,在 JobManager 进程中运行 YarnSessionClusterEntrypoint 作为集群启动入口(不同的集群部署模式有不同的 ClusterEntrypoint 实现),初始化 Dispatcher、ResourceManager,启动相关的 RPC 服务,等待 Client 通过 Rest 接口提交作业。

作业提交:

Yarn 集群准备好后,开始作业提交。

1)Flink Client 通过 Rest 向 Dispatcher 提交 JobGraph。

2)Dispatcher 是 Rest 接口,不负责实际的调度、执行方面的工作,当收到 JobGraph 后,为作业创建一个 JobMaster,将工作交给 JobManager(负责作业调度、管理作业和 Task 的生命周期),构建 ExecutionGraph(JobGraph 的并行化版本,调度层最核心的数据结构)。

这两个步骤结束后,作业进入调度执行阶段。

作业调度执行:

1)JobMaster 向 YarnResourceManager 申请资源,开始调度 ExecutionGraph 执行,向 YarnResourceManager 申请资源;初次提交作业集群中尚没有 TaskManager,此时资源不足,开始申请资源。

2)YarnResourceManager 收到 JobManager 的资源请求,如果当前有空闲 Slot 则将 Slot 分配给 JobMaster.,否则 YarnResourceManager 将向 YarnMaster 请求创建 TaskManager。

3)YarnResourceManager 将资源请求加入到等待请求队列,并通过心跳向 Yarn RM 申请新的 Container 资源来启动 TaskManager 进程,Yarn 分配新的 Container 给 TaskManager。

4)YarnResourceManager 启动,然后从 HDFS 加载 Jar 文件等所需要的的相关资源,在容器中启动 TaskManager。

5)TaskManager 启动之后,向 ResourceManager 注册,并把自己的 Slot 资源情况汇报给 ResouceManager。

6)ResourceManager 从等待队列中取出 Slot 请求,向 TaskManager 确认资源可用情况,并告知 TaskManager 将 Slot 分配给哪个 JobMaster。

7)TaskManager 向 JobMaster 提供 Slot,JobMaster 调度 Task 到 TaskManager 的此 Slot 上执行。

1.2 Yarn Per-Job 提交流程

640.png

启动集群:

  • 使用./flink run -m yarn-cluster 提交 Per-Job 模式的作业。
  • Yarn 启动 Flink 集群。该模式下 Flink 集群的启动入口是 YarnJobClusterEntryPoint,其他与 YarnSession 模式下集群的启动类似。

作业提交:

该步骤与 Seesion 模式下的不同,Client 并不会通过 Rest 向 Dispatcher 提交 JobGraph,由 Dispatcher 从本地文件系统获取 JObGraph,其后的不好走与 Session 模式的一样

作业调度执行:

与 Yarn Session 模式下一致。

1.3 K8s Session 提交流程

640.png

启动集群:

  • Flink 客户端首先连接 Kubernetes API Server,提交 Flink 集群的资源描述文件,包括 flink-configuration-configmap.yaml、jobmanager-service.yaml、jobmanager-deployment.yaml 和 taskmanager-deployment.yaml 等。
  • Kubernets Master 会根据这些资源描述文件去创建对应的 Kubernetes 实体。以 JobManager 部署为例,Kubernetes 集群中的某个节点收到请求后,Kubelet 进程会从中央仓库下载 Flink 镜像,准备和挂载卷,然后执行启动命令。Pod 启动后 Flink Master(JobManager)进程随之启动,初始化 Dispacher 和 KubernetesResourceManager。并通过 K8s 服务对外暴露 FlinkMaster 的端口,K8s 服务类似于路由服务。

两个步骤完成之后,Session 模式的集群就创建成功,集群可以接收作业提交请求,但是此时还没有 JobManager、TaskManager,当作业需要执行时,才会按需创建。

作业提交:

  • Client 用户可以通过 Flink 命令行(即 Flink Client)向这个会话模式的集群提交任务。此时 JobGraph 会在 FlinkClient 端生成,然后和用户 Jar 包一起通过 RestClient 上传。
  • 作业提交成功,Dispatcher 会为每个作业启动一个 JobMaster,将 JobGraph 交给 JobMaster 调度执行。

两个步骤完成之后,作业进入调度执行阶段。

作业调度执行:

K8s Session 模式集群下,ResourceManager 向 k8sMaster 申请和释放 TaskManager,除此之外,作业的调度与执行和 Yarn 模式是一样的。

1)JobMaster 向 KubernetesResourceManager 请求 Slot。

2)KubernetesResourceManager 从 kubernetes 集群分配 TaskManager。每个 TaskManager 都是具有唯一标识的 Pod。KubernetesResourceManager 会为 TaskManager 生成一份新的配置文件,里面有 Flink Master 的 service name 作为地址。这样在 FLInkMaster failover 之后,TaskManager 仍然可以重新连上。

3)Kubernetes 集群分配一个新的 Pod 后,在上面启动 TaskManager。

4)TaskManager 启动后注册到 SlotManager。

5)SlotManager 向 TaskManager 请求 Slot.

6)TaskManager 提供 Slot 给 JobMaster,然后任务就会被分配到这个 Slot 上运行。

二、Graph 总览

640.png

  • 流计算应用的 Graph 转换:StreamGraph-->JobGraph-->ExecutionGraph-->物理执行图(启动计算任务)
  • 批处理应用的 Graph 转换:OptimizedPlan-->JobGraph
  • Table & SQL API 的 Graph 转换:Blink Table Planner /Flink Table Planner。

2.1 流图

使用 DataStreamAPI 开发的应用程序,首先被转换为 Transformation,然后被映射为 StreamGraph。

2.1.1 SteramGraph 核心对象

  • StreamNode
    StreamNode 是 StremGraph 中的节点 ,从 Transformation 转换而来,可以简单理解为一个 StreamNode 表示一个算子,从逻辑上来说,SteramNode 在 StreamGraph 中存在实体和虚拟的 StreamNode。StremNode 可以有多个输入,也可以有多个输出。
    实体的 StreamNode 会最终变成物理算子。虚拟的 StreamNode 会附着在 StreamEdge 上。
  • StreamEdge
    StreamEdge 是 StreamGraph 中的边,用来连接两个 StreamNode,一个 StreamNode 可以有多个出边、入边,StreamEdge 中包含了旁路输出、分区器、字段筛选输出等信息。

2.1.2 StreamGraph 生成过程

StreamGraph 在 FlinkClient 中生成,由 FlinkClient 在提交的时候触发 Flink 应用的 main 方法,用户编写的业务逻辑组装成 Transformation 流水线,在最后调用 StreamExecutionEnvironment.execute() 的时候开始触发 StreamGraph 构建。

640.png

StreamGraph 实际上是在 StreamGraphGenerator 中生成的,从 SinkTransformation(输出) 向前追溯到 SourceTransformation。在遍历过程中一边遍历一边构建 StreamGraph。

640.png

在遍历 Transformation 的过程中,会对不同类型的 Transformation 分别进行转换。对于物理 Transformation 则转换为 StreamNode 实体,对于虚拟 Transformation 则作为虚拟 StreamNode。

640.png

针对于某一种类型的 Transformation,会调用其相应的 transformxxx()函数进行转换。transfromxxx()首先转换上游 Transformation 进行递归转换,确保上游的都已经完成了转换。然后通过 addOperator()方法构造出 StreamNode,通过 addEdge()方法与上游的 transform 进行连接,构造出 StreamEdge。

在添加 StreamEdge 的过程中,如果 ShuffleMode 为 null,则使用 ShuffleMode PIPELINED 模式,在流计算中,只有 PIPLINED 模式才会在批处理中设计其他模式。构建 StreamEdge 的时候,在转换 Transformation 过程中生成的 虚拟 StreamNode 会将虚拟 StreamNode 的信息附着在 StreamEdge 上

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
1月前
|
Oracle 关系型数据库 MySQL
实时计算 Flink版操作报错之当将两个连接器放在同一个作业中时,MySQL作业无法启动,该怎么解决
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
21天前
|
存储 SQL 测试技术
Flink⼤状态作业调优实践指南:状态报错与启停慢篇
本文整理自俞航翔、陈婧敏、黄鹏程老师所撰写的大状态作业调优实践指南。由于内容丰富,本文分享终篇状态报错与启停慢篇.
50361 58
Flink⼤状态作业调优实践指南:状态报错与启停慢篇
|
23天前
|
SQL 存储 运维
Flink⼤状态作业调优实践指南:Flink SQL 作业篇
本文整理自俞航翔、陈婧敏、黄鹏程老师所撰写的大状态作业调优实践指南。由于内容丰富,本文中篇内容分享 Flink SQL 作业大状态导致反压的调优原理与方法。
69306 7
Flink⼤状态作业调优实践指南:Flink SQL 作业篇
|
24天前
|
存储 监控 数据处理
Flink⼤状态作业调优实践指南:Datastream 作业篇
本文整理自俞航翔、陈婧敏、黄鹏程老师所撰写的大状态作业调优实践指南。
56242 5
Flink⼤状态作业调优实践指南:Datastream 作业篇
|
12天前
|
SQL 资源调度 数据处理
实时计算 Flink版产品使用问题之在DolphinScheduler调度Flink批作业时,遇到作业提交后状态立即变为成功,但实际上作业还在后台运行的情况,如何解决
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
12天前
|
SQL 资源调度 Oracle
实时计算 Flink版产品使用问题之在将作业提交到双网卡集群时,如何不绑定内网IP
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
12天前
|
SQL 关系型数据库 MySQL
实时计算 Flink版产品使用问题之运行run-application --target kubernetes-application执行,通过进程的返回码来决定作业是否成功,任务返回码都是0,该怎么办
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
10天前
|
Kubernetes Oracle 关系型数据库
实时计算 Flink版操作报错合集之用dinky在k8s上提交作业,会报错:Caused by: org.apache.flink.table.api.ValidationException:,是什么原因
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
101 0
|
12天前
|
SQL Java API
实时计算 Flink版产品使用问题之如何在本地运行和调试包含VVR DataStream连接器的作业
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
1月前
|
SQL Oracle 关系型数据库
实时计算 Flink版操作报错合集之连接器换成2.4.2之后,mysql作业一直报错如何解决
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
52 3