Spark学习---day06、Spark内核(源码提交流程、任务执行)

简介: Spark学习---day06、Spark内核(源码提交流程、任务执行)

这是本人的学习过程,看到的同道中人祝福你们心若有所向往,何惧道阻且长;

但愿每一个人都像星星一样安详而从容的,不断沿着既定的目标走完自己的路程;

最后想说一句君子不隐其短,不知则问,不能则学。

如果大家觉得我写的还不错的话希望可以收获关注、点赞、收藏(谢谢大家)

org.apache.spark.deploy.SparkSubmit

前言

Spark内核泛指Spark的核心运行机制,包括Spark核心组件的运行机制、Spark任务调度机制、Spark内存管理机制、Spark核心功能的运行原理等,熟练掌握Spark内核原理,能够帮助我们更好地完成Spark代码设计,并能够帮助我们准确锁定项目运行过程中出现的问题的症结所在。

Driver

Spark驱动器节点,用于执行Spark任务中的main方法,负责实际代码的执行工作。Driver在Spark作业执行时主要负责:

1) 将用户程序转化为作业(Job);

2) 在Executor之间调度任务(Task);

3) 跟踪Executor的执行情况;

4) 通过UI展示查询运行情况;

2 Executor

Spark Executor对象是负责在Spark作业中运行具体任务,任务彼此之间相互独立。Spark 应用启动时,ExecutorBackend节点被同时启动,并且始终伴随着整个Spark应用的生命周期而存在。如果有ExecutorBackend节点发生了故障或崩溃,Spark应用也可以继续执行,会将出错节点上的任务调度到其他Executor节点上继续运行。

Executor有两个核心功能:

1) 负责运行组成Spark应用的任务,并将结果返回给驱动器(Driver);

2) 它们通过自身的块管理器(Block Manager)为用户程序中要求缓存的 RDD 提供内存式存储。RDD是直接缓存在Executor进程内的,因此任务可以在运行时充分利用缓存数据加速运算。

 Spark通用运行流程概述

image.png

上图为Spark通用运行流程体现了基本的Spark应用程序在部署中的基本提交流程。

这个流程是按照如下的核心步骤进行工作的:

1) 任务提交后,都会先启动Driver程序;

2) 随后Driver向集群管理器注册应用程序;

3) 之后集群管理器根据此任务的配置文件分配Executor并启动;

4) Driver开始执行main函数,Spark查询为懒执行,当执行到Action算子时开始反向推算,根据宽依赖进行Stage的划分,随后每一个Stage对应一个Taskset,Taskset中有多个Task查找可用资源Executor进行调度

根据本地化原则,Task会被分发到指定的Executor去执行,在任务执行的过程中,Executor也会不断与Driver进行通信,报告任务运行情况。

1.1 Spark提交流程(YarnCluster)

image.png

 YARN模式运行机制

YARN Cluster模式

1) 执行脚本提交任务,实际是启动一个SparkSubmit的JVM进程;

2) SparkSubmit类中的main方法反射调用YarnClusterApplication的main方法;

3) YarnClusterApplication创建Yarn客户端,然后向Yarn服务器发送执行指令:bin/java ApplicationMaster

4) Yarn框架收到指令后会在指定的NM中启动ApplicationMaster;

5) ApplicationMaster启动Driver线程,执行用户的作业;

6) AM向RM注册,申请资源;

7) 获取资源后AM向NM发送指令:bin/java YarnCoarseGrainedExecutorBackend

8) CoarseGrainedExecutorBackend进程会接收消息,跟Driver通信,注册已经启动的Executor;然后启动计算对象Executor等待接收任务

9) Driver线程继续执行完成作业的调度和任务的执行。

10) Driver分配任务并监控任务的执行。

注意:SparkSubmit、ApplicationMasterCoarseGrainedExecutorBackend是独立的进程;Driver是独立的线程;ExecutorYarnClusterApplication是对象。

 讲解了Spark YARN-Cluster模式下的任务提交流程,但是我们并没有具体说明Driver的工作流程, Driver线程主要是初始化SparkContext对象,准备运行所需的上下文,然后一方面保持与ApplicationMaster的RPC连接,通过ApplicationMaster申请资源,另一方面根据用户业务逻辑开始调度任务,将任务下发到已有的空闲Executor上。

当ResourceManager向ApplicationMaster返回Container资源时,ApplicationMaster就尝试在对应的Container上启动Executor进程,Executor进程起来后,会向Driver反向注册,注册成功后保持与Driver的心跳,同时等待Driver分发任务,当分发的任务执行完毕后,将任务状态上报给Driver。

YARN Client模式

1) 执行脚本提交任务,实际是启动一个SparkSubmit的JVM进程;

2) SparkSubmit类中的main方法反射调用用户代码的main方法;

3) 启动Driver线程,执行用户的作业,并创建ScheduleBackend;

4) YarnClientSchedulerBackend向RM发送指令:bin/java ExecutorLauncher;

5) Yarn框架收到指令后会在指定的NM中启动ExecutorLauncher(实际上还是调用ApplicationMaster的main方法);

object ExecutorLauncher {

 

 def main(args: Array[String]): Unit = {

   ApplicationMaster.main(args)

 }

 

}

6) AM向RM注册,申请资源;

7) 获取资源后AM向NM发送指令:bin/java CoarseGrainedExecutorBackend

8) CoarseGrainedExecutorBackend进程会接收消息,跟Driver通信,注册已经启动的Executor;然后启动计算对象Executor等待接收任务

9) Driver分配任务并监控任务的执行。

注意:SparkSubmit、ApplicationMasterYarnCoarseGrainedExecutorBackend是独立的进程;Executor和Driver是对象。

image.png

Spark通讯架构

image.png

RpcEndpoint:RPC通信终端。Spark针对每个节点(Client/Master/Worker)都称之为一个RPC终,且都实现RpcEndpoint接口,内部根据不同端点的需求,设计不同的消息和不同的业务处理,如果需要发送(询问)则调用Dispatcher在Spark中,所有的终端都存在生命周期:

 Constructor

 onStart

 receive*

 onStop

 RpcEnv:RPC上下文环境,每个RPC运行时依赖的上下文环境称为RpcEnv;在把当前Spark版本中使用的NettyRpcEnv

Dispatcher:消息调度(分发器,针对于RPC需要发送远程消息或者从远程RPC接收到的消息,分发至对应的指令收件箱发件箱。如果指令接收方是自己则存入收件箱,如果指令接收方不是自己,则放入发件箱;

 Inbox:指令消息收件箱。一个本地RpcEndpoint对应一个收件箱,Dispatcher在每次向Inbox存入消息时,都将对应EndpointData加入内部ReceiverQueue中,另外Dispatcher创建时会启动一个单独线程进行轮询ReceiverQueue,进行收件箱消息消费;

Ø RpcEndpointRefRpcEndpointRef是对远程RpcEndpoint的一个引用。当我们需要向一个具体的RpcEndpoint发送消息时,一般我们需要获取到该RpcEndpoint的引用,然后通过该应用发送消息。

Ø OutBox:指令消息发件箱。对于当前RpcEndpoint来说,一个目标RpcEndpoint对应一个发件箱,如果向多个目标RpcEndpoint发送信息,则有多个OutBox。当消息放入Outbox后,紧接着通过TransportClient将消息发送出去。消息放入发件箱以及发送过程是在同一个线程中进行;

RpcAddress表示远程的RpcEndpointRef的地址,Host + Port。

TransportClient:Netty通信客户端,一个OutBox对应一个TransportClient,TransportClient不断轮询OutBox,根据OutBox消息的receiver信息,请求对应的远程TransportServer;

 TransportServer:Netty通信服务端,一个RpcEndpoint对应一个TransportServer,接受远程消息后调用Dispatcher分发消息至对应收发件箱;

Spark任务划分

image.png


一个Spark应用程序包括Job、Stage以及Task三个概念:

1) Job是以Action方法为界,遇到一个Action方法则触发一个Job;

2) Stage是Job的子集,以RDD宽依赖(即Shuffle)为界,遇到Shuffle做一次划分;

Task是Stage的子集,以并行度(分区数)来衡量,分区数是多少,则有多少个task。

Spark的任务调度总体来说分两路进行,一路是Stage级的调度,一路是Task级的调度,总体调度流程如下图所示:

image.png

Spark RDD通过其Transactions操作,形成了RDD血缘依赖关系图,即DAG,最后通过Action的调用,触发Job并调度执行执行过程中会创建两个调度器:DAGSchedulerTaskScheduler

 DAGScheduler负责Stage级的调度,主要是将job切分成若干Stages,并将每个Stage打包成TaskSet交给TaskScheduler调度。

Ø TaskScheduler负责Task级的调度,将DAGScheduler给过来的TaskSet按照指定的调度策略分发到Executor上执行,调度过程中SchedulerBackend负责提供可用资源,其中SchedulerBackend有多种实现,分别对接不同的资源管理系统。

image.png

Driver初始化SparkContext过程中,会分别初始化DAGScheduler、TaskScheduler、SchedulerBackend以及HeartbeatReceiver,并启动SchedulerBackend以及HeartbeatReceiver。SchedulerBackend通过ApplicationMaster申请资源,并不断从TaskScheduler中拿到合适的Task分发到Executor执行。HeartbeatReceiver负责接收Executor的心跳信息,监控Executor的存活状况,并通知到TaskScheduler。

Spark Stage级调度

Spark的任务调度是从DAG切割开始,主要是由DAGScheduler来完成。当遇到一个Action操作后就会触发一个Job的计算,并交给DAGScheduler来提交,下图是涉及到Job提交的相关方法调用流程图。

image.png

1) Job由最终的RDD和Action方法封装而成

SparkContext将Job交给DAGScheduler提交,它会根据RDD的血缘关系构成的DAG进行切分,将一个Job划分为若干Stages,具体划分策略是,由最终的RDD不断通过依赖回溯判断父依赖是否是依赖,即以Shuffle为界,划分Stage,窄依赖的RDD之间被划分到同一个Stage中,可以进行pipeline式的计算。划分的Stages分两类,一类叫做ResultStage,为DAG最下游的Stage,由Action方法决定,另一类叫做ShuffleMapStage,为下游Stage准备数据,下面看一个简单的例子WordCount。

image.png

Job由saveAsTextFile触发,该Job由RDD-3和saveAsTextFile方法组成,根据RDD之间的依赖关系从RDD-3开始回溯搜索,直到没有依赖的RDD-0,在回溯搜索过程中,RDD-3依赖RDD-2,并且是宽依赖,所以在RDD-2和RDD-3之间划分Stage,RDD-3被划到最后一个Stage,即ResultStage中,RDD-2依赖RDD-1,RDD-1依赖RDD-0,这些依赖都是窄依赖,所以将RDD-0、RDD-1和RDD-2划分到同一个Stage,形成pipeline操作,。即ShuffleMapStage中,实际执行的时候,数据记录会一气呵成地执行RDD-0到RDD-2的转化。不难看出,其本质上是一个深度优先搜索Depth First Search算法。

一个Stage是否被提交,需要判断它的父Stage是否执行,只有在父Stage执行完毕才能提交当前Stage,如果一个Stage没有父Stage,那么从该Stage开始提交。Stage提交时会将Task信息(分区信息以及方法等)序列化并被打包成TaskSet交给TaskScheduler,一个Partition对应一个Task,另一方面TaskScheduler会监控Stage的运行状态,只有Executor丢失或者Task由于Fetch失败才需要重新提交失败的Stage以调度运行失败的任务,其他类型的Task失败会在TaskScheduler的调度过程中重试。

相对来说DAGScheduler做的事情较为简单,仅仅是在Stage层面上划分DAG,提交Stage并监控相关状态信息。TaskScheduler则相对较为复杂,下面详细阐述其细节。

4.3 Spark Task级调度

Spark Task的调度是由TaskScheduler来完成,由前文可知,DAGScheduler将Stage打包到交给TaskScheTaskSetduler,TaskScheduler会将TaskSet封装为TaskSetManager加入到调度队列中,TaskSetManager结构如下图所示。

image.png

TaskSetManager负责监控管理同一个Stage中的Tasks,TaskScheduler就是以TaskSetManager为单元来调度任务。

前面也提到,TaskScheduler初始化后会启动SchedulerBackend,它负责跟外界打交道,接收Executor的注册信息,并维护Executor的状态,所以说SchedulerBackend是管“粮食”的,同时它在启动后会定期地去“询问”TaskScheduler有没有任务要运行,也就是说,它会定期地“问”TaskScheduler“我有这么余,你要不要啊”,TaskScheduler在SchedulerBackend“问”它的时候,会从调度队列中按照指定的调度策略选择TaskSetManager去调度运行,大致方法调用流程如下图所示

image.png

上图中,将TaskSetManager加入rootPool调度池中之后,调用SchedulerBackend的riviveOffers方法给driverEndpoint发送ReviveOffer消息driverEndpoint收到ReviveOffer消息后调用makeOffers方法,过滤出活跃状态的Executor(这些Executor都是任务启动时反向注册到Driver的Executor),然后将Executor封装成WorkerOffer对象;准备好计算资源(WorkerOffer)后,taskScheduler基于这些资源调用resourceOffer在Executor上分配task。

image.png


image.png image.png image.png image.png

相关文章
|
29天前
|
存储 缓存 分布式计算
Spark任务OOM问题如何解决?
大家好,我是V哥。在实际业务中,Spark任务常因数据量过大、资源分配不合理或代码瓶颈导致OOM(Out of Memory)。本文详细分析了各种业务场景下的OOM原因,并提供了优化方案,包括调整Executor内存和CPU资源、优化内存管理策略、数据切分及减少宽依赖等。通过综合运用这些方法,可有效解决Spark任务中的OOM问题。关注威哥爱编程,让编码更顺畅!
167 3
|
29天前
|
分布式计算 大数据 Java
大数据-87 Spark 集群 案例学习 Spark Scala 案例 手写计算圆周率、计算共同好友
大数据-87 Spark 集群 案例学习 Spark Scala 案例 手写计算圆周率、计算共同好友
45 5
|
29天前
|
分布式计算 关系型数据库 MySQL
大数据-88 Spark 集群 案例学习 Spark Scala 案例 SuperWordCount 计算结果数据写入MySQL
大数据-88 Spark 集群 案例学习 Spark Scala 案例 SuperWordCount 计算结果数据写入MySQL
46 3
|
29天前
|
存储 分布式计算 算法
大数据-106 Spark Graph X 计算学习 案例:1图的基本计算、2连通图算法、3寻找相同的用户
大数据-106 Spark Graph X 计算学习 案例:1图的基本计算、2连通图算法、3寻找相同的用户
54 0
|
3月前
|
SQL 分布式计算 DataWorks
DataWorks产品使用合集之如何开发ODPS Spark任务
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
|
2月前
|
消息中间件 分布式计算 Java
Linux环境下 java程序提交spark任务到Yarn报错
Linux环境下 java程序提交spark任务到Yarn报错
38 5
|
22天前
|
分布式计算 算法 Spark
spark学习之 GraphX—预测社交圈子
spark学习之 GraphX—预测社交圈子
25 0
|
22天前
|
分布式计算 Scala Spark
educoder的spark算子学习
educoder的spark算子学习
11 0
|
29天前
|
消息中间件 分布式计算 Kafka
大数据-98 Spark 集群 Spark Streaming 基础概述 架构概念 执行流程 优缺点
大数据-98 Spark 集群 Spark Streaming 基础概述 架构概念 执行流程 优缺点
37 0
|
2月前
|
分布式计算 Shell Scala
学习使用Spark
学习使用Spark
97 3