Spark TaskSchedulerImpl 任务调度方式(FIFO)

简介: Spark任务调度器,实现stage放到调度池中,和取时对调度池是队列进行排序的FIFO先进先出算法

Spark TaskSchedulerImpl 任务调度方式(FIFO)

更多资源

视频分享

图解

FIFO

TaskSchedulerImpl提交任务集

  • 在DAGScheduler.scal中件中的submitMissingTasks()方法中调用 taskScheduler.submitTasks
  • 把任务集通过任务调度器进行提交
 taskScheduler.submitTasks(new TaskSet(
        tasks.toArray, stage.id, stage.latestInfo.attemptId, jobId, properties))
  • 任务调度器实现
override def submitTasks(taskSet: TaskSet) {
    val tasks = taskSet.tasks
    logInfo("Adding task set " + taskSet.id + " with " + tasks.length + " tasks")
    this.synchronized {
      val manager = createTaskSetManager(taskSet, maxTaskFailures)
      val stage = taskSet.stageId
      val stageTaskSets =
        taskSetsByStageIdAndAttempt.getOrElseUpdate(stage, new HashMap[Int, TaskSetManager])
      stageTaskSets(taskSet.stageAttemptId) = manager
      val conflictingTaskSet = stageTaskSets.exists { case (_, ts) =>
        ts.taskSet != taskSet && !ts.isZombie
      }
      if (conflictingTaskSet) {
        throw new IllegalStateException(s"more than one active taskSet for stage $stage:" +
          s" ${stageTaskSets.toSeq.map{_._2.taskSet.id}.mkString(",")}")
      }
      schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)

      if (!isLocal && !hasReceivedTask) {
        starvationTimer.scheduleAtFixedRate(new TimerTask() {
          override def run() {
            if (!hasLaunchedTask) {
              logWarning("Initial job has not accepted any resources; " +
                "check your cluster UI to ensure that workers are registered " +
                "and have sufficient resources")
            } else {
              this.cancel()
            }
          }
        }, STARVATION_TIMEOUT_MS, STARVATION_TIMEOUT_MS)
      }
      hasReceivedTask = true
    }
    backend.reviveOffers()
  }
  • 把任务集放到TaskSetManager(任务集管理器)中
  • TaskSetManager(任务集管理器)继承 Schedulable,(可调度元素,就是把到调度池队列中的一个元素,供调度使用)
val manager = createTaskSetManager(taskSet, maxTaskFailures)
  • 把任务集管理器增加到指定调度类型(FIFO,PAIR)的调度池中,也就是调度池中的调度队列中schedulableQueue
  • 此时,相当于需要调度的任务已有了,存放在调度池中,下面是用具体的调度算法,按指定的顺序调度池中的任务
schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)
  • 任务调度器的submitTasks()方法中调用 backend.reviveOffers()方法,backend为SparkDeploySchedulerBackend,继承CoarseGrainedSchedulerBackend,所以调用的是CoarseGrainedSchedulerBackend中的reviveOffers()方法
 backend.reviveOffers()
  • 相当于是给Driver发送消息ReviveOffers
   override def reviveOffers() {
    driverEndpoint.send(ReviveOffers)
  }
  • driverEndpoint 中receive()方法处理消息,调用makeOffers()方法
     case ReviveOffers =>
        makeOffers()
  • scheduler.resourceOffers(workOffers)会计算出需要启动的任务序列
  • resourceOffers()方法中调用方法得到调度任务的队列(按指定顺序的) rootPool.getSortedTaskSetQueue()
  • launchTasks()方法把启动任务消息发送给executor
   // Make fake resource offers on all executors
    private def makeOffers() {
      // Filter out executors under killing
      val activeExecutors = executorDataMap.filterKeys(executorIsAlive)
      val workOffers = activeExecutors.map { case (id, executorData) =>
        new WorkerOffer(id, executorData.executorHost, executorData.freeCores)
      }.toIndexedSeq
      launchTasks(scheduler.resourceOffers(workOffers))
    }
  • 按指定的调度算法,对调度池中的调度任务进行排序
  • 返回排序后调度队列
  override def getSortedTaskSetQueue: ArrayBuffer[TaskSetManager] = {
    var sortedTaskSetQueue = new ArrayBuffer[TaskSetManager]
    val sortedSchedulableQueue =
      schedulableQueue.asScala.toSeq.sortWith(taskSetSchedulingAlgorithm.comparator)
    for (schedulable <- sortedSchedulableQueue) {
      sortedTaskSetQueue ++= schedulable.getSortedTaskSetQueue
    }
    sortedTaskSetQueue
  }

FIFO调度算法的实现

  • 默认的调度算法FIFO
  • 按作业id进行比较,id小的放在前,也就是先进来的作业先处理
  • 如果作业id相同,就按stageId比较,StageId小的放在前,也就是从第一个Stage依次开始排列

private[spark] class FIFOSchedulingAlgorithm extends SchedulingAlgorithm {
  override def comparator(s1: Schedulable, s2: Schedulable): Boolean = {
    val priority1 = s1.priority
    val priority2 = s2.priority
    var res = math.signum(priority1 - priority2)
    if (res == 0) {
      val stageId1 = s1.stageId
      val stageId2 = s2.stageId
      res = math.signum(stageId1 - stageId2)
    }
    if (res < 0) {
      true
    } else {
      false
    }
  }
}

定时任务处理调度池中的任务

  • DriverEndpoint 的 onStart()方法中会每秒调用一次处理调度池中调度任务的方法
  • 通过发送Driver消息ReviveOffers 来触发
   override def onStart() {
      // Periodically revive offers to allow delay scheduling to work
      val reviveIntervalMs = conf.getTimeAsMs("spark.scheduler.revive.interval", "1s")

      reviveThread.scheduleAtFixedRate(new Runnable {
        override def run(): Unit = Utils.tryLogNonFatalError {
          Option(self).foreach(_.send(ReviveOffers))
        }
      }, 0, reviveIntervalMs, TimeUnit.MILLISECONDS)
    }
相关文章
|
6月前
|
存储 分布式计算 调度
Spark任务调度与数据本地性
Spark任务调度与数据本地性
|
分布式计算 资源调度 Kubernetes
[翻译]Spark on MR3——运行 Apache Spark 的新方式
> 此文是对 Spark on MR3 资料的翻译 原文链接:https://www.datamonad.com/post/2021-08-18-spark-mr3/ 代码链接:https://github.com/mr3project/spark-mr3 MR3 是一个通用的执行引擎,原生支持 Hadoop 和 Kubernetes。虽然 Hive on MR3 是主要应用,但 MR3 也可以
559 0
[翻译]Spark on MR3——运行 Apache Spark 的新方式
|
分布式计算 数据挖掘 大数据
Spark 入门_代码编写方式|学习笔记
快速学习 Spark 入门_代码编写方式
Spark 入门_代码编写方式|学习笔记
|
分布式计算 算法 调度
spark2.2以后版本任务调度将增加黑名单机制
spark2.2以后版本任务调度将增加黑名单机制
330 0
|
SQL 分布式计算 Scala
Pandas vs Spark:获取指定列的N种方式
本篇继续Pandas与Spark常用操作对比系列,针对常用到的获取指定列的多种实现做以对比。 注:此处的Pandas特指DataFrame数据结构,Spark特指spark.sql下的DataFrame数据结构。
619 0
Pandas vs Spark:获取指定列的N种方式
|
调度 算法
Spark2.4.0源码分析之WorldCount 任务调度器(七)
- 理解TaskSet是如何提交到任务调度器池,任务集如何被调度 - 理解Worker可用资源算法,Worker可用资源分配任务调度池中的任务 - 任务发送给executor去执行
897 0
|
分布式计算 算法 搜索推荐
Spark排序算法系列之GBTs使用方式介绍
在本篇文章中你可以学到: Spark MLLib包中的GBDT使用方式 模型的通过保存、加载、预测 PipeLine ML包中的GBDT
|
分布式计算 调度 Spark
Spark TaskSchedulerImpl TaskSet处理
Spark 任务调度调度器,实现将任务集拆分成任务发送给executor启动
1905 0