【Flink】Flink作业调度流程分析

本文涉及的产品
实时计算 Flink 版,1000CU*H 3个月
简介: 当向Flink集群提交用户作业时,从用户角度看,只需要作业处理逻辑正确,输出正确的结果即可;而不用关心作业何时被调度的,作业申请的资源又是如何被分配的以及作业何时会结束;但是了解作业在运行时的具体行为对于我们深入了解Flink原理有非常大的帮助,并且对我们如何编写更合理的作业逻辑有指导意义,因此本文详细分析作业的调度及资源分配以及作业的生命周期。

1. 概述


当向Flink集群提交用户作业时,从用户角度看,只需要作业处理逻辑正确,输出正确的结果即可;而不用关心作业何时被调度的,作业申请的资源又是如何被分配的以及作业何时会结束;但是了解作业在运行时的具体行为对于我们深入了解Flink原理有非常大的帮助,并且对我们如何编写更合理的作业逻辑有指导意义,因此本文详细分析作业的调度及资源分配以及作业的生命周期。


2. 流程分析


基于社区master主线(1.11-SNAPSHOT),commit: 12f7873db54cfbc5bf853d66ccd4093f9b749c9a ,HA基于ZK实现分析

网络异常,图片无法展示
|

上图概括了Flink作业从Client端提交到到Flink集群的提交的基本流程[1]。

当运行./flink run脚本提交用户作业至Dispathcer后,Dispatcher会拉起JobManagerRunner,而后JobManagerRunner会向Zookeeper注册竞争Leader。对于之前流程有兴趣可以参考 深入理解Flink-On-Yarn模式

JobManagerRunner竞争成为Leader时,会调用JobManagerRunnerImpl#grantLeadership,此时便开始处理作业,会通过如下的代码调用路径启动JobMaster。

  • JobManagerRunnerImpl#grantLeadership
  • JobManagerRunnerImpl#verifyJobSchedulingStatusAndStartJobManager
  • JobManagerRunnerImpl#startJobMaster。
    startJobMaster方法会首先将该作业的ID写入对应的ZK目录并置为RUNNING状态,写入该目录可用于在Dispathcer接收作业时,判断该作业是否重复提交或恢复作业时使用;在JobManagerRunner调度作业时也在从ZK上拉取作业信息来判断作业状态,若为DONE状态,则无需调度。启动JobMaster时会先启动其RPC Endpoint,以便与其他组件进行RPC调用,之后JobMaster便通过JobMaster#startJobExecution开始执行作业,执行作业前会有些前置校验,如必须确保运行在主线程中;启动JobMaster上的一些服务(组件),如TaskManager和ResourceManager的心跳管理;启动SlotPool、Scheduler;重连至ResourceManager,并且在ZK中注册监听ResourceManager Leader的变化的Retriever等。
    当初始化完JobMaster上相应服务(组件)后,便开始调度,会有如下代码调用路径
  • JobMaster#start
  • JobMaster#startJobExecution
  • JobMaster#resetAndStartScheduler
  • JobMaster#startScheduling
  • SchedulerBase#startScheduling。

我们知道用户编写的作业是以JobGraph提交到Dispatcher,但是在实际调度时会将JobGraph转化为ExecutionGraph,JobGraph生成ExecutionGraph是在SchedulerBase对象初始化的时候完成转化,如下图所示表示了典型的转化过程(JobVertex与ExecutionJobVertex一一对应),而具体的转化逻辑实现可参考如何生成ExecutionGraph及物理执行图

1.png

在SchedulerBase初始化时生成ExecutionGraph后,之后便基于ExecutionGraph调度,而调度基类SchedulerBase默认实现为DefaultScheduler,会继续通过DefaultScheduler#startSchedulingInternal调度作业,此时会将作业(ExecutionGraph)的状态从CREATED状态变更为RUNNING状态,此时在Flink web界面查看任务的状态便已经为RUNNING,但注意此时作业(各顶点)实际并未开始调度,顶点还是处于CREATED状态,任作业状态与顶点状态不完全相关联,有其各自的演化生命周期,具体可参考Flink作业调度[2];然后根据不同的策略EagerSchedulingStrategy(主要用于流式作业,所有顶点(ExecutionVertex)同时开始调度)和LazyFromSourcesSchedulingStrategy(主要用于批作业,从Source开始开始调度,其他顶点延迟调度)调度。

当提交流式作业时,会有如下代码调用路径:

  • EagerSchedulingStrategy#startScheduling
  • EagerSchedulingStrategy#allocateSlotsAndDeploy,在部署之前会根据待部署的ExecutionVertex生成对应的ExecutionVertexDeploymentOption,然后调用DefaultScheduler#allocateSlotsAndDeploy开始部署。同样,在部署之前也需要进行一些前置校验(ExecutionVertex对应的Execution的状态必须为CREATED),接着将待部署的ExecutionVertex对应的Execution状态变更为SCHEDULED,然后开始为ExecutionVertex分配Slot。会有如下的调用代码路径:
  • DefaultScheduler#allocateSlots(该过程会ExecutionVertex转化为ExecutionVertexSchedulingRequirements,会封装包含一些location信息、sharing信息、资源信息等)
  • DefaultExecutionSlotAllocator#allocateSlotsFor,该方法会开始逐一异步部署各ExecutionVertex,部署也是根据不同的Slot提供策略来分配,接着会经过如下代码调用路径层层转发,SlotProviderStrategy#allocateSlot -> SlotProvider#allocateSlot(SlotProvider默认实现为SchedulerImpl) -> SchedulerImpl#allocateSlotInternal -> SchedulerImpl#internalAllocateSlot(该方法会根据vertex是否共享slot来分配singleSlot/SharedSlot),以singleSlot为例说明。
    在分配slot时,首先会在JobMaster中SlotPool中进行分配,具体是先SlotPool中获取所有slot,然后尝试选择一个最合适的slot进行分配,这里的选择有两种策略,即按照位置优先和按照之前已分配的slot优先;若从SlotPool无法分配,则通过RPC请求向ResourceManager请求slot,若此时并未连接上ResourceManager,则会将请求缓存起来,待连接上ResourceManager后再申请。

当ResourceManager收到申请slot请求时,若发现该JobManager未注册,则直接抛出异常;否则将请求转发给SlotManager处理,SlotManager中维护了集群所有空闲的slot(TaskManager会向ResourceManager上报自己的信息,在ResourceManager中由SlotManager保存Slot和TaskManager对应关系),并从其中找出符合条件的slot,然后向TaskManager发送RPC请求申请对应的slot。

等待所有的slot申请完成后,然后会将ExecutionVertex对应的Execution分配给对应的Slot,即从Slot中分配对应的资源给Execution,完成分配后可开始部署作业。

部署作业代码调用路径如下:

  • DefaultScheduler#waitForAllSlotsAndDeploy
  • DefaultScheduler#deployAll
  • DefaultScheduler#deployOrHandleError
  • DefaultScheduler#deployTaskSafe
  • DefaultExecutionVertexOperations#deploy
  • ExecutionVertex#deploy
  • Execution#deploy(每次调度ExecutionVertex,都会有一个Execute,在此阶段会将Execution的状态变更为DEPLOYING状态,并且为该ExecutionVertex生成对应的部署描述信息,然后从对应的slot中获取对应的TaskManagerGateway,以便向对应的TaskManager提交Task)
  • RpcTaskManagerGateway#submitTask(此时便将Task通过RPC提交给了TaskManager)。

TaskManager(TaskExecutor)在接收到提交Task的请求后,会经过一些初始化(如从BlobServer拉取文件,反序列化作业和Task信息、LibaryCacheManager等),然后这些初始化的信息会用于生成Task(Runnable对象),然后启动该Task,其代码调用路径如下 Task#startTaskThread(启动Task线程)-> Task#run(将ExecutionVertex状态变更为RUNNING状态,此时在FLINK web前台查看顶点状态会变更为RUNNING状态,另外还会生成了一个AbstractInvokable对象,该对象是FLINK衔接执行用户代码的关键,而后会经过如下调用

  • AbstractInvokable#invoke(AbstractInvokable有几个关键的子类实现, BatchTask/BoundedStreamTask/DataSinkTask/DataSourceTask/StreamTask/SourceStreamTask。对于streaming类型的Source,会调用StreamTask#invoke)
  • StreamTask#invoke
  • StreamTask#beforeInvoke
  • StreamTask#initializeStateAndOpen(初始化状态和进行初始化,这里会调用用户的open方法(如自定义实现的source))-> StreamTask#runMailboxLoop,便开始处理Source端消费的数据,并流入下游算子处理。

至此作业从提交到资源分配及调度运行整体流程就已经分析完毕,对于流式作业而言,正常情况下其会一直运行,不会结束。


3. 总结


对于作业的运行,会先提交至Dispatcher,由Dispatcher拉起JobManagerRunner,在JobManagerRunner成为Leader后,便开始处理作业,首先会根据JobGraph生成对应的ExecutionGraph,然后开始调度,作业的状态首先会变更为RUNNING,然后对各ExecutionVertex申请slot,申请slot会涉及JM与RM、TM之间的通信,当在TM上分配完slot后,便可将Task提交至TaskManager,然后TaskManager会为每个提交的Task生成一个单独的线程处理。

相关实践学习
基于Hologres+Flink搭建GitHub实时数据大屏
通过使用Flink、Hologres构建实时数仓,并通过Hologres对接BI分析工具(以DataV为例),实现海量数据实时分析.
实时计算 Flink 实战课程
如何使用实时计算 Flink 搞定数据处理难题?实时计算 Flink 极客训练营产品、技术专家齐上阵,从开源 Flink功能介绍到实时计算 Flink 优势详解,现场实操,5天即可上手! 欢迎开通实时计算 Flink 版: https://cn.aliyun.com/product/bigdata/sc Flink Forward Asia 介绍: Flink Forward 是由 Apache 官方授权,Apache Flink Community China 支持的会议,通过参会不仅可以了解到 Flink 社区的最新动态和发展计划,还可以了解到国内外一线大厂围绕 Flink 生态的生产实践经验,是 Flink 开发者和使用者不可错过的盛会。 去年经过品牌升级后的 Flink Forward Asia 吸引了超过2000人线下参与,一举成为国内最大的 Apache 顶级项目会议。结合2020年的特殊情况,Flink Forward Asia 2020 将在12月26日以线上峰会的形式与大家见面。
目录
相关文章
|
4月前
|
消息中间件 运维 Kafka
直播预告|Kafka+Flink双引擎实战:手把手带你搭建分布式实时分析平台!
在数字化转型中,企业亟需从海量数据中快速提取价值并转化为业务增长动力。5月15日19:00-21:00,阿里云三位技术专家将讲解Kafka与Flink的强强联合方案,帮助企业零门槛构建分布式实时分析平台。此组合广泛应用于实时风控、用户行为追踪等场景,具备高吞吐、弹性扩缩容及亚秒级响应优势。直播适合初学者、开发者和数据工程师,参与还有机会领取定制好礼!扫描海报二维码或点击链接预约直播:[https://developer.aliyun.com/live/255088](https://developer.aliyun.com/live/255088)
320 35
直播预告|Kafka+Flink双引擎实战:手把手带你搭建分布式实时分析平台!
|
4月前
|
消息中间件 运维 Kafka
直播预告|Kafka+Flink 双引擎实战:手把手带你搭建分布式实时分析平台!
直播预告|Kafka+Flink 双引擎实战:手把手带你搭建分布式实时分析平台!
149 11
|
11月前
|
SQL 消息中间件 分布式计算
大数据-124 - Flink State 01篇 状态原理和原理剖析:状态类型 执行分析
大数据-124 - Flink State 01篇 状态原理和原理剖析:状态类型 执行分析
207 5
|
11月前
|
消息中间件 分布式计算 大数据
大数据-123 - Flink 并行度 相关概念 全局、作业、算子、Slot并行度 Flink并行度设置与测试
大数据-123 - Flink 并行度 相关概念 全局、作业、算子、Slot并行度 Flink并行度设置与测试
425 0
|
10月前
|
SQL 流计算 关系型数据库
基于OpenLake的Flink+Paimon+EMR StarRocks流式湖仓分析
阿里云OpenLake解决方案建立在开放可控的OpenLake湖仓之上,提供大数据搜索与AI一体化服务。通过元数据管理平台DLF管理结构化、半结构化和非结构化数据,提供湖仓数据表和文件的安全访问及IO加速,并支持大数据、搜索和AI多引擎对接。本文为您介绍以Flink作为Openlake方案的核心计算引擎,通过流式数据湖仓Paimon(使用DLF 2.0存储)和EMR StarRocks搭建流式湖仓。
947 5
基于OpenLake的Flink+Paimon+EMR StarRocks流式湖仓分析
|
10月前
|
运维 数据挖掘 网络安全
场景实践 | 基于Flink+Hologres搭建GitHub实时数据分析
基于Flink和Hologres构建的实时数仓方案在数据开发运维体验、成本与收益等方面均表现出色。同时,该产品还具有与其他产品联动组合的可能性,能够为企业提供更全面、更智能的数据处理和分析解决方案。
|
11月前
|
消息中间件 分布式计算 大数据
大数据-128 - Flink 并行度设置 细节详解 全局、作业、算子、Slot
大数据-128 - Flink 并行度设置 细节详解 全局、作业、算子、Slot
611 0
|
11月前
|
消息中间件 druid Kafka
从Apache Flink到Kafka再到Druid的实时数据传输,用于分析/决策
从Apache Flink到Kafka再到Druid的实时数据传输,用于分析/决策
227 0
|
Oracle 关系型数据库 MySQL
实时计算 Flink版产品使用问题之如何从savepoint重新启动作业
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
消息中间件 数据挖掘 Kafka
揭秘大数据时代的极速王者!Flink:颠覆性流处理引擎,让实时数据分析燃爆你的想象力!
【8月更文挑战第29天】Apache Flink 是一个高性能的分布式流处理框架,适用于高吞吐量和低延迟的实时数据处理。它采用统一执行引擎处理有界和无界数据流,具备精确状态管理和灵活窗口操作等特性。Flink 支持毫秒级处理和广泛生态集成,但学习曲线较陡峭,社区相对较小。通过实时日志分析示例,我们展示了如何利用 Flink 从 Kafka 中读取数据并进行词频统计,体现了其强大功能和灵活性。
285 0