《循序渐进学Spark》一3.2 Spark调度机制-阿里云开发者社区

开发者社区> 华章出版社> 正文

《循序渐进学Spark》一3.2 Spark调度机制

简介:

本节书摘来自华章出版社《循序渐进学Spark》一书中的第3章,第3.2节,作者 小象学院 杨 磊,更多章节内容可以访问云栖社区“华章计算机”公众号查看。



3.2 Spark调度机制

Spark调度机制是保证Spark应用高效执行的关键。本节从Application、job、stage和task的维度,从上层到底层来一步一步揭示Spark的调度策略。

3.2.1 Application的调度

Spark中,每个Application对应一个SparkContext。SparkContext之间的调度关系取决于Spark的运行模式。对Standalone模式而言,Spark Master节点先计算集群内的计算资源能否满足等待队列中的应用对内存和CPU资源的需求,如果可以,则Master创建Spark Driver,启动应用的执行。宏观上来讲,这种对应用的调度类似于FIFO策略。在Mesos和YARN模式下,底层的资源调度系统的调度策略都是由Mesos和YARN决定的。具体分类描述如下:

1. Standalone模式

默认以用户提交Application的顺序来调度,即FIFO策略。每个应用执行时独占所有资源。如果有多个用户要共享集群资源,则可以使用参数spark.cores.max来配置应用在集群中可以使用的最大CPU核数。如果不配置,则采用默认参数spark.deploy.defaultCore的值来确定。

2. Mesos模式

如果在Mesos上运行Spark,用户想要静态配置资源的话,可以设置spark.mesos.coarse为true,这样Mesos变为粗粒度调度模式,然后可以设置spark.cores.max指定集群中可以使用的最大核数,与上面的Standalone模式类似。同时,在Mesos模式下,用户还可以设置参数spark.executor.memory来配置每个executor的内存使用量。如果想使Mesos在细粒度模式下运行,可以通过mesos://<url-info>设置动态共享cpu core的执行模式。在这种模式下,应用不执行时的空闲CPU资源得以被其他用户使用,提升了CPU使用率。

3. YARN模式

如果在YARN上运行Spark,用户可以在YARN的客户端上设置--num-executors 来控制为应用分配的Executor数量,然后设置--executor-memory指定每个Executor的内存大小,设置--executor-cores指定Executor占用的CPU核数。

3.2.2 job的调度

前面章节提到过,Spark应用程序实际上是一系列对RDD的操作,这些操作直至遇见Action算子,才触发Job的提交。事实上,在底层实现中,Action算子最后调用了runJob函数提交Job给Spark。其他的操作只是生成对应的RDD关系链。如在RDD.scala程序文件中,count函数源码所示。

def count(): Long = sc.runJob(this, Utils.getIteratorSize _).sum

其中sc为SparkContext的对象。可见在Spark中,对Job的提交都是在Action算子中隐式完成的,并不需要用户显式地提交作业。在SparkContext中Job提交的实现中,最后会调用DAGScheduler中的Job提交接口。DAGScheduler最重要的任务之一就是计算Job与Task的依赖关系,制定调度逻辑。

Job调度的基本工作流程如图3-4所示,每个Job从提交到完成,都要经历一系列步骤,拆分成以Tsk为最小单位,按照一定逻辑依赖关系的执行序列。

38fb6d701b77e781861612fae464409e38c12fb8

图3-4 Job的调度流程

图3-5则从Job调度流程中的细节模块出发,揭示了工作流程与对应模块之间的关系。从整体上描述了各个类在Job调度流程中的交互关系。

b14778293256dd10d4021e84b4bfe38ceb678911

图3-5 Job调度流程细节

在Spark1.5.0的调度目录下的SchedulingAlgorithm.scala文件中,描述了Spark对Job的调度模式。

1. FIFO模式

默认情况下,Spark对Job以FIFO(先进先出)的模式进行调度。在SchedulingAlgorithm.scala文件中声明了FIFO算法实现。


3. 配置调度池

DAGScheduler构建了具有依赖关系的任务集。TaskScheduler负责提供任务给Task-SetManager作为调度的先决条件。TaskSetManager负责具体任务集内部的调度任务。调度池(pool)则用于调度每个SparkContext运行时并存的多个互相独立无依赖关系的任务集。调度池负责管理下一级的调度池和TaskSetManager对象。

用户可以通过配置文件定义调度池的属性。一般调度池支持如下3个参数:

1)调度模式Scheduling mode:用户可以设置FIFO或者FAIR调度方式。

2)weight:调度池的权重,在获取集群资源上权重高的可以获取多个资源。

3)miniShare:代表计算资源中的CPU核数。

用户可以通过conf/fairscheduler.xml配置调度池的属性,同时要在SparkConf对象中配置属性。

3.2.3 stage(调度阶段)和TasksetManager的调度

1. Stage划分

当一个Job被提交后,DAGScheduler会从RDD依赖链的末端触发,遍历整个RDD依赖链,划分Stage(调度阶段)。划分依据主要基于ShuffleDependency依赖关系。换句话说,当某RDD在计算中需要将数据进行Shuffle操作时,这个包含Shuffle操作的RDD将会被用来作为输入信息,构成一个新的Stage。以这个基准作为划分Stage,可以保证存在依赖关系的数据按照正确数据得到处理和运算。在Spark1.5.0的源代码中,DAGScheduler.scala中的getParentStages函数的实现从一定角度揭示了Stage的划分逻辑。


2. Stage调度

在第一步的Stage划分过程中,会产生一个或者多个互相关联的Stage。其中,真正执行Action算子的RDD所在的Stage被称为Final Stage。DAGScheduler会从这个final stage生成作业实例。

在Stage提交时,DAGScheduler首先会判断该Stage的父Stage的执行结果是否可用。如果所有父Stage的执行结果都可用,则提交该Stage。如果有任意一个父Stage的结果不可用,则尝试迭代提交该父Stage。所有结果不可用的Stage都将会被加入waiting队列,等待执行,如图3-6所示。


b14778293256dd10d4021e84b4bfe38ceb678911

图3-6 Stage依赖

在图3-6中,虚箭头表示依赖关系。Stage序号越小,表示Stage越靠近上游。

图3-6中的Stage调度运行顺序如图3-7所示。

118d344241fed943191b39e29d8bf3776cf47d19

图3-7 Stage执行顺序

从图3-7可以看出,上游父Stage先得到执行,waiting queue中的stage随后得到执行。

3. TasksetManager

每个Stage的提交会被转化为一组task的提交。DAGScheduler最终通过调用taskscheduler的接口来提交这组任务。在taskScheduler内部实现中创建了taskSetManager实例来管理任务集taskSet的生命周期。事实上可以说每个stage对应一个tasksetmanager。

至此,DAGScheduler的工作基本完毕。taskScheduler在得到集群计算资源时,taskSet-Manager会分配task到具体worker节点上执行。在Spark1.5.0的taskSchedulerImpl.scala文件中,提交task的函数实现如下:


当taskSetManager进入到调度池中时,会依据job id对taskSetManager排序,总体上先进入的taskSetManager先得到调度。对于同一job内的taskSetManager而言,job id较小的先得到调度。如果有的taskSetManager父Stage还未执行完,则该taskSet-Manager不会被放到调度池。

3.2.4 task的调度

在DAGScheduler.scala中,定义了函数submitMissingTasks,读者阅读完整实现,从中可以看到task的调度方式。限于篇幅,以下截取部分代码。





版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。

分享:

华章出版社

官方博客
官网链接