一、提交流程
Flink 作业在开发完毕之后,需要提交到 Flink 集群执行。ClientFronted 是入口,触发用户开发的 Flink 应用 Jar 文件中的 main 方法,然后交给 PipelineExecutor(流水线执行器,在 FlinkClient 升成 JobGraph 之后,将作业提交给集群的重要环节。)#execue 方法,最终会选择一个触发一个具体的 PiplineExecutor 执行。
提交模式又可分为:
- Detached:Flink Client 创建完集群之后,可以退出命令行窗口,集群独立运行。
- Attached:不能关闭命令行窗口,需要与集群之间维持连接。
1.1 Yarn Session 提交流程
启动集群:
- 使用 bin/yarn-session.sh 提交会话模式的作业。如果提交到已经存在的集群,则获取 Yarn 集群信息、应用 ID,并准备提交作业。如果启动新的 Yarn Session 集群,则进入步骤(2)
- Yarn 启动新 Flink 集群
1)如果没有集群,则创建一个新的 Session 模式的集群。首先将应用配置(flink-conf.yaml、logback.xml、log4j.properties)和相关文件(Flink Jar、配置类文件、用户 Jar 文件、JobGraph 对象等)上传至分布式存储(如 HDFS)的应用暂存目录。
2)通过 Yarn Client 向 Yarn 提交 Flink 创建集群的申请,Yarn 分配资源,在申请的 Yarn Container 中初始化并启动 FlinkJobManager 进程,在 JobManager 进程中运行 YarnSessionClusterEntrypoint 作为集群启动入口(不同的集群部署模式有不同的 ClusterEntrypoint 实现),初始化 Dispatcher、ResourceManager,启动相关的 RPC 服务,等待 Client 通过 Rest 接口提交作业。
作业提交:
Yarn 集群准备好后,开始作业提交。
1)Flink Client 通过 Rest 向 Dispatcher 提交 JobGraph。
2)Dispatcher 是 Rest 接口,不负责实际的调度、执行方面的工作,当收到 JobGraph 后,为作业创建一个 JobMaster,将工作交给 JobManager(负责作业调度、管理作业和 Task 的生命周期),构建 ExecutionGraph(JobGraph 的并行化版本,调度层最核心的数据结构)。
这两个步骤结束后,作业进入调度执行阶段。
作业调度执行:
1)JobMaster 向 YarnResourceManager 申请资源,开始调度 ExecutionGraph 执行,向 YarnResourceManager 申请资源;初次提交作业集群中尚没有 TaskManager,此时资源不足,开始申请资源。
2)YarnResourceManager 收到 JobManager 的资源请求,如果当前有空闲 Slot 则将 Slot 分配给 JobMaster.,否则 YarnResourceManager 将向 YarnMaster 请求创建 TaskManager。
3)YarnResourceManager 将资源请求加入到等待请求队列,并通过心跳向 Yarn RM 申请新的 Container 资源来启动 TaskManager 进程,Yarn 分配新的 Container 给 TaskManager。
4)YarnResourceManager 启动,然后从 HDFS 加载 Jar 文件等所需要的的相关资源,在容器中启动 TaskManager。
5)TaskManager 启动之后,向 ResourceManager 注册,并把自己的 Slot 资源情况汇报给 ResouceManager。
6)ResourceManager 从等待队列中取出 Slot 请求,向 TaskManager 确认资源可用情况,并告知 TaskManager 将 Slot 分配给哪个 JobMaster。
7)TaskManager 向 JobMaster 提供 Slot,JobMaster 调度 Task 到 TaskManager 的此 Slot 上执行。
1.2 Yarn Per-Job 提交流程
启动集群:
- 使用./flink run -m yarn-cluster 提交 Per-Job 模式的作业。
- Yarn 启动 Flink 集群。该模式下 Flink 集群的启动入口是 YarnJobClusterEntryPoint,其他与 YarnSession 模式下集群的启动类似。
作业提交:
该步骤与 Seesion 模式下的不同,Client 并不会通过 Rest 向 Dispatcher 提交 JobGraph,由 Dispatcher 从本地文件系统获取 JObGraph,其后的不好走与 Session 模式的一样
作业调度执行:
与 Yarn Session 模式下一致。
1.3 K8s Session 提交流程
启动集群:
- Flink 客户端首先连接 Kubernetes API Server,提交 Flink 集群的资源描述文件,包括 flink-configuration-configmap.yaml、jobmanager-service.yaml、jobmanager-deployment.yaml 和 taskmanager-deployment.yaml 等。
- Kubernets Master 会根据这些资源描述文件去创建对应的 Kubernetes 实体。以 JobManager 部署为例,Kubernetes 集群中的某个节点收到请求后,Kubelet 进程会从中央仓库下载 Flink 镜像,准备和挂载卷,然后执行启动命令。Pod 启动后 Flink Master(JobManager)进程随之启动,初始化 Dispacher 和 KubernetesResourceManager。并通过 K8s 服务对外暴露 FlinkMaster 的端口,K8s 服务类似于路由服务。
两个步骤完成之后,Session 模式的集群就创建成功,集群可以接收作业提交请求,但是此时还没有 JobManager、TaskManager,当作业需要执行时,才会按需创建。
作业提交:
- Client 用户可以通过 Flink 命令行(即 Flink Client)向这个会话模式的集群提交任务。此时 JobGraph 会在 FlinkClient 端生成,然后和用户 Jar 包一起通过 RestClient 上传。
- 作业提交成功,Dispatcher 会为每个作业启动一个 JobMaster,将 JobGraph 交给 JobMaster 调度执行。
两个步骤完成之后,作业进入调度执行阶段。
作业调度执行:
K8s Session 模式集群下,ResourceManager 向 k8sMaster 申请和释放 TaskManager,除此之外,作业的调度与执行和 Yarn 模式是一样的。
1)JobMaster 向 KubernetesResourceManager 请求 Slot。
2)KubernetesResourceManager 从 kubernetes 集群分配 TaskManager。每个 TaskManager 都是具有唯一标识的 Pod。KubernetesResourceManager 会为 TaskManager 生成一份新的配置文件,里面有 Flink Master 的 service name 作为地址。这样在 FLInkMaster failover 之后,TaskManager 仍然可以重新连上。
3)Kubernetes 集群分配一个新的 Pod 后,在上面启动 TaskManager。
4)TaskManager 启动后注册到 SlotManager。
5)SlotManager 向 TaskManager 请求 Slot.
6)TaskManager 提供 Slot 给 JobMaster,然后任务就会被分配到这个 Slot 上运行。
二、Graph 总览
- 流计算应用的 Graph 转换:StreamGraph-->JobGraph-->ExecutionGraph-->物理执行图(启动计算任务)
- 批处理应用的 Graph 转换:OptimizedPlan-->JobGraph
- Table & SQL API 的 Graph 转换:Blink Table Planner /Flink Table Planner。
2.1 流图
使用 DataStreamAPI 开发的应用程序,首先被转换为 Transformation,然后被映射为 StreamGraph。
2.1.1 SteramGraph 核心对象
- StreamNode
StreamNode 是 StremGraph 中的节点 ,从 Transformation 转换而来,可以简单理解为一个 StreamNode 表示一个算子,从逻辑上来说,SteramNode 在 StreamGraph 中存在实体和虚拟的 StreamNode。StremNode 可以有多个输入,也可以有多个输出。
实体的 StreamNode 会最终变成物理算子。虚拟的 StreamNode 会附着在 StreamEdge 上。 - StreamEdge
StreamEdge 是 StreamGraph 中的边,用来连接两个 StreamNode,一个 StreamNode 可以有多个出边、入边,StreamEdge 中包含了旁路输出、分区器、字段筛选输出等信息。
2.1.2 StreamGraph 生成过程
StreamGraph 在 FlinkClient 中生成,由 FlinkClient 在提交的时候触发 Flink 应用的 main 方法,用户编写的业务逻辑组装成 Transformation 流水线,在最后调用 StreamExecutionEnvironment.execute() 的时候开始触发 StreamGraph 构建。
StreamGraph 实际上是在 StreamGraphGenerator 中生成的,从 SinkTransformation(输出) 向前追溯到 SourceTransformation。在遍历过程中一边遍历一边构建 StreamGraph。
在遍历 Transformation 的过程中,会对不同类型的 Transformation 分别进行转换。对于物理 Transformation 则转换为 StreamNode 实体,对于虚拟 Transformation 则作为虚拟 StreamNode。
针对于某一种类型的 Transformation,会调用其相应的 transformxxx()函数进行转换。transfromxxx()首先转换上游 Transformation 进行递归转换,确保上游的都已经完成了转换。然后通过 addOperator()方法构造出 StreamNode,通过 addEdge()方法与上游的 transform 进行连接,构造出 StreamEdge。
在添加 StreamEdge 的过程中,如果 ShuffleMode 为 null,则使用 ShuffleMode PIPELINED 模式,在流计算中,只有 PIPLINED 模式才会在批处理中设计其他模式。构建 StreamEdge 的时候,在转换 Transformation 过程中生成的 虚拟 StreamNode 会将虚拟 StreamNode 的信息附着在 StreamEdge 上