开发者学堂课程【开源 Flink 极客训练营:Flink Runtime Architecture】学习笔记,与课程紧密联系,让用户快速学习知识。
课程地址:https://developer.aliyun.com/learning/course/760/detail/13339
Flink Runtime Architecture
4、出错恢复
重启出错失败的任务以及可能受其影响的任务
-停止相关任务-FAILED/CANCELED
-重置任务状态-CREATED
-通知调度策略重新调度
由出错恢复策略(FailoverStrategy)决定需要重启的任务 -RestartPipelinedRegionFailoverStrategy
-RestartAllFailoverStrategy
-单点重启??
当任务出现错误的时候,Jobmaster 策略或者基本思路是通过重启出错、失败的任务,以及可能受到其影响的任务来恢复作业的数据处理。包含三个步骤,第一步是停止相关的任务,包括出错、失败、任务和可能受影响,失败任务可能已经是 failed ,其他影响任务会被 canceled ,最终进 canceled 的状态。接下来会把任务重治回created 状态,最后会通知调度策略来重新调度任务。需要重启可能受影响的任务是由出错恢复策略 FailoverStrategy 决定的。Flink 默认的 FailoverStrategy 是RestartPipelinedRegionFailover Strategy,采用策略后一个 task failed 会重启,region跟 pipeline的数据交换有关系,有 pipeline 数据交换的节点之间如果任意一个节点失败,相关联的其他节点也会跟着失败,主要为了防止出现数据的不一致,因此为了避免单个 task failed 导致多次 failover ,一般会在第一个 task failed 的时候,把其他的 task一并给 cancel掉来一并的进行重启。因为 RestartPipelinedRegion 策略除了重启失败任务所在的 region 之外,还会重启下游 region 。原因在于任务的产出很多时候是非确定性的,比如一条 record 这次分发到了下游的第一个并发,重跑一次可能会分发到下游的第二个并发,一旦两个下游在不同 region 中,可能会导致 record 重复或者丢失,产生数据的不一致,为了避免当任一个 region 发生 failed 时候,也会重启所有的下游,采用 PipelinedRegionfavorstrategy 的会重启失败任务所在 region 以及所有的下游 region,此外还有PipelinedRegion 策略会在任意 task failed 的时候重启作业中所有的任务,一般并不需要但是在一些特殊情况下,比如用户希望有任意任务失败的时候,不希望作业在局部的运行,而是整个任务结束等到全部都能启动起来之后再一块运行,只重启失败的任务本身及单点重启是肯定的,目前的话还有一些技术难点需要慢慢的去解决,比如接下来一到三个版本内,可能会推出 des effort的单点重启的策略,因为在这种策略下不保证数据的一致性,优势在于可以重启失败的任务本身,对作业吞吐的影响最小。
三、TaskExecutor 任务的运行存储
1、TaskExecutor-任务的运行容器
·TaskExecutor 的资源模型
· Process memory
· JVM memory Flink memory
Framework memory Task memory
·TaskSlot 对应 Task 资源的子集,目前为 TE 的资源按照 slot 数量进行均分
· Task Heap memory
.Task Off-heap memory
.Network memory
.Managed memory
·TaskSlot 是任务运行的逻辑容器,需要满足任务的资源需求
运行任务具有各种各样的资源主要看到的是 memory 的资源,所有的内存资源都可以进行单独配置。Taskmanager 也对配置进行分层的管理,最外层是 Process Memory 对应整个 Task scooter,JVM的总资源,内存又包含了 JVM 自身占用内存以及 Flink 占用内存, Flink 占用内存又包含了框架占用内存和任务占用内存,任务占用内存包括 Taskmemory 任务 Java 对象内存 ,Taskmemory 一般适用于 native 的第三方库,natural buffer memory 用来创建natural buffer服务于任务的输入和输出。Managed memory是受管控的 Off-Heap memory ,会被一些组件用到比如 rocks DB、个别 flash operator、slot、join等,task 的资源会被划分成一个一个的 slot, slot 是任务运行的逻辑容器。当前 slot 大小直接把整个 task slot 资源按照 slot 数量进行均分得到。
2、Task Slot 共享
同一个共享组(SlotSharingGroup)中的不同类型(JobVertex)的任务,可以同时在一个 slot 中运行
-降低数据交换开销
-方便用户配置资源
-均衡负载
Slot 是 task 的运行容器,一个 Slot 里可以运行一个到多个任务,有一定约束即同一个共享组中不同类型的人物才可以同在一个 Slot 中运行。同一个 region 中的任务都是在一个共享组,流失作业的所有任务也是在一个共享组之中,不同类型是会需要不同的 JobVertex。右边示例是 source、map、sink 的作业,经过部署之后有三个Slot 中有三个任务,分别是 source、map、sink 各一份,source 中有两个任务,因为 source 组有三个并发,没有更多并发补进来。进行SlotSharing 的好处是可以降低数据交换的开销, map 和 sink之间一对一的数据交换。实际上有物理的数据交换的节点都被共享在一块,可以使得它们数据交换在内存中进行,比如在网络中进行的开销更低。也方便用户配置资源,通过 SlotSharing 用户只需要配置 n 给 Slot 就可以保证作业总能跑起来,n 是最大算子的并发度,最后在各个算子的并发度差异不大的情况下,提高负载均衡,因为每个 Slot 会有不同类型的算子各一份,可以避免某些负载很重的算子全体在同一个 tasktracker 中,从而避免个别 tasktracker 的负载特别重。
3、任务执行模型
任务(Task)在一个独占的线程中执行
Task 从 InputGate 中读取数据,喂给 OperatorChain,Operator Chain 会将产出数据输出到 ResultPartition 中
Sourcetask 直接通过 SourceFunction 产出数据
每个任务对应的一个 OperatorChain , OperatorChain 都有自己的输入和输出,输入是 InputGate ,输出是 ResultPartition ,做一个任务总体会在独占的线程中执行,任务从 InputGate 中读取数据喂给 OperatorChain 进行业务逻辑的处理,最后会将产出的数据输出到 ResultPartition 。有一个例外是 Source task 不从 InputGate 读取数据直接通过 SourceFunction 产出数据,上游的 ResultPartition 和下游的InputGate 之间是通过 Flink 的 Shuffle Service 进行数据交换。Shuffle Service 是一个插件,Flink 的 ResultPartition 及下游的 InputGate 会通过 netty 从从上游的 ResultPartition 中获取数据。 ResultPartition 是一个一个的sub-partition 组成,对应着数据不同的并发下游消费者,InputGate 也是由一个一个 input-Channel 组成,每个不同的 input-Channel 都对应输入不同并发的上游。
四、ResourceManager 资源的管理中心
1、ResourceManager-资源的管理中心
· ResourceManager 管理 TaskExecutor
· SlotManager 管理 Slot
· 通过 TE-RM 心跳更新 Slot状态,心跳信息中包含TE中所有的slot状态
· Slot 申请流程
·JM->RM->TE->JM,以 slot offer 的结果为准
TaskExecutor 包含了各种各样的资源,ResourceManager 管理着 TaskExecutor ,新启动的 TaskExecutor需要向Resource Manager 进行注册之后,里的资源才能服务于作业的请求。ResourceManager 有个关键组件叫 SlotManager,管理着 slot的状态。 slot 状态是通过 TaskExecutor 到 ResourceManager之间的心跳来进行更新的,包含了 TaskExecutor 中所有 slot 的状态状态,有了所有的 slot 状态之后, ResourceManager 就可以服务作业的资源申请,当 JobMaster 调动一个任务的时候, 向ResourceManager 发起 Slot 请求,收到请求的ResourceManager 会转交给 SlotManager , SlotManager 回去检查可用的 Slot 有没有符合请求条件的。如果有就会向相应的TaskExecutor 发起 Slot 的申请,如果请求成功 TaskExecutor 会向主动向 JobMaster offer slot。绕一圈是因为避免分布式带来不一致的问题, SlotManager 中的 Slot 状态是通过心跳来进行更新的,所以存在一定的延迟,在整个 Slot 申请过程中,Slot 状态也可能发生变化,所以需要以 Slot offer 以及 ACK 来作为 Slot 申请的最终结果。
2、ResourseManager 的多种实现
·StandaloneResourceManager
·手动拉起 Worker
·YarnResourceManager
·MesosResourceManager
·KubernetesResourceManager
·Slot 申请过程中自动拉起 Worker
ResourceManager 有多种不同的实现,一般情况下直接跑在机器上,
从一个集群 ResourceManager,StandaloneResourceManager 需要用户手动的拉起 worker 节点,要求用户要了解作业会需要多少的总资源。除此之外还有一些会去自动生成资源的ResourceManager ,包括YarnResourceManager 、Mesos ResourceManager、KubernetesResourceManager,采用 ResourceManager 之后, ResourceManager 会在 Slot 的请求过程中,如果不能满足就会自动的去拉起 worker 节点,Yarn ResourceManager 叫 master 去为一个任务去请求 slot ,YarnResourceManager 将请求交给 ResourceManager, ResourceManager 发觉没有 Slot 申请会告知YarnResource Manager,YarnResourceManager 会像真正的外部的Yarn ResourceManager 去请求 container,拿到 container 之后会启动 TaskExecutor 起来之后会注册到 ResourceManager 中,并去告知可用的 slot 的信息, SlotManager 拿到信息之后会尝试去满足当前判定的KubernetesResourceManager 。如果能够满足会向TaskExecutor 发起 Slot 请求,请求成功 TaskExecutor 会向 JobMaster offer slot,用户不需要一开始去计算作业总的需要的资源量,只需要保证单个 Slot 的大小能够满足任务的执行就可以。