Spark Source Code Analysis – DAGScheduler


The architecture of the DAGScheduler is actually quite simple:

1. eventQueue: everything the DAGScheduler needs to handle is posted to the eventQueue as an event.

2. eventLoop thread: continuously takes events from the eventQueue and processes them (a minimal sketch of this pattern follows the list).

3. It implements TaskSchedulerListener and registers itself with the TaskScheduler, so the TaskScheduler can call the TaskSchedulerListener callbacks at any time to report status changes.
The TaskSchedulerListener implementation simply posts the corresponding events to the eventQueue.
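
At its core this is just a blocking queue consumed by a single daemon thread. Below is a minimal, self-contained sketch of that pattern with toy event types (purely illustrative, not Spark code; the real DAGSchedulerEvent types and processEvent are shown later in this post):

import java.util.concurrent.{LinkedBlockingQueue, TimeUnit}

// Toy event types, for illustration only
sealed trait ToyEvent
case class ToySubmitted(name: String) extends ToyEvent
case object ToyStop extends ToyEvent

class ToyEventLoop {
  private val eventQueue = new LinkedBlockingQueue[ToyEvent]

  // producers (listener callbacks, runJob, ...) just put events on the queue
  def post(event: ToyEvent) { eventQueue.put(event) }

  // a single daemon thread is the only consumer
  def start() {
    val t = new Thread("toy-event-loop") {
      override def run() {
        while (true) {
          eventQueue.poll(10, TimeUnit.MILLISECONDS) match {
            case ToySubmitted(name) => println("processing " + name)
            case ToyStop            => return                   // leave the loop
            case null               => // poll timed out; periodic housekeeping could go here
          }
        }
      }
    }
    t.setDaemon(true)
    t.start()
  }
}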

/**
 * The high-level scheduling layer that implements stage-oriented scheduling. It computes a DAG of
 * stages for each job, keeps track of which RDDs and stage outputs are materialized, and finds a
 * minimal schedule to run the job. It then submits stages as TaskSets to an underlying
 * TaskScheduler implementation that runs them on the cluster.
 *
 * In addition to coming up with a DAG of stages, this class also determines the preferred
 * locations to run each task on, based on the current cache status, and passes these to the
 * low-level TaskScheduler. Furthermore, it handles failures due to shuffle output files being
 * lost, in which case old stages may need to be resubmitted. Failures *within* a stage that are
 * not caused by shuffle file loss are handled by the TaskScheduler, which will retry each task
 * a small number of times before cancelling the whole stage.
 *
 * THREADING: This class runs all its logic in a single thread executing the run() method, to which
 * events are submitted using a synchronized queue (eventQueue). The public API methods, such as
 * runJob, taskEnded and executorLost, post events asynchronously to this queue. All other methods
 * should be private.
 */
private[spark]
class DAGScheduler(
    taskSched: TaskScheduler, // the TaskScheduler this DAGScheduler is bound to
    mapOutputTracker: MapOutputTracker,
    blockManagerMaster: BlockManagerMaster,
    env: SparkEnv)
  extends TaskSchedulerListener with Logging {

  def this(taskSched: TaskScheduler) {
    this(taskSched, SparkEnv.get.mapOutputTracker, SparkEnv.get.blockManager.master, SparkEnv.get)
  }
  // Tasks need to report their execution status back to the DAGScheduler, so the DAGScheduler
  // registers itself as a listener on the TaskScheduler
  taskSched.setListener(this)
  // It also implements the various TaskSchedulerListener callbacks so that the TaskScheduler
  // can invoke them whenever the state changes
  // Called by TaskScheduler to report task's starting.
  override def taskStarted(task: Task[_], taskInfo: TaskInfo) {
    eventQueue.put(BeginEvent(task, taskInfo))
  }
  // ……the other listener callbacks are omitted
 
  private val eventQueue = new LinkedBlockingQueue[DAGSchedulerEvent] // the DAGScheduler's core event queue

  val nextJobId = new AtomicInteger(0)
  val nextStageId = new AtomicInteger(0)
  val stageIdToStage = new TimeStampedHashMap[Int, Stage]
  val shuffleToMapStage = new TimeStampedHashMap[Int, Stage]
  private[spark] val stageToInfos = new TimeStampedHashMap[Stage, StageInfo]

  private val listenerBus = new SparkListenerBus() // the DAGScheduler also exposes a SparkListenerBus so other modules can listen to it

  // Contains the locations that each RDD's partitions are cached on
  private val cacheLocs = new HashMap[Int, Array[Seq[TaskLocation]]]
 
  // Start a thread to run the DAGScheduler event loop
  def start() {
    new Thread("DAGScheduler") { // create the event-processing thread
      setDaemon(true)
      override def run() {
        DAGScheduler.this.run()
      }
    }.start()
  }
 
  /**
   * The main event loop of the DAG scheduler, which waits for new-job / task-finished / failure
   * events and responds by launching tasks. This runs in a dedicated thread and receives events
   * via the eventQueue.
   */
  private def run() {
    SparkEnv.set(env)

    while (true) {
      val event = eventQueue.poll(POLL_TIMEOUT, TimeUnit.MILLISECONDS)
      if (event != null) {
        logDebug("Got event of type " + event.getClass.getName)
      }
      this.synchronized { // needed in case other threads make calls into methods of this class
        if (event != null) {
          if (processEvent(event)) {
            return
          }
        }

        val time = System.currentTimeMillis() // TODO: use a pluggable clock for testability
        // Periodically resubmit failed stages if some map output fetches have failed and we have
        // waited at least RESUBMIT_TIMEOUT. We wait for this short time because when a node fails,
        // tasks on many other nodes are bound to get a fetch failure, and they won't all get it at
        // the same time, so we want to make sure we've identified all the reduce tasks that depend
        // on the failed node.
        if (failed.size > 0 && time > lastFetchFailureTime + RESUBMIT_TIMEOUT) {
          resubmitFailedStages()
        } else {
          submitWaitingStages()
        }
      }
    }
  }
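 
  // The loop above exits when processEvent returns true, which happens for the
  // StopDAGScheduler event defined later in this post. Shutting the scheduler down
  // therefore looks roughly like this (a sketch; the exact cleanup performed in
  // stop() may differ across versions):
  def stop() {
    eventQueue.put(StopDAGScheduler) // processEvent returns true for this event, ending run()
    taskSched.stop()                 // also shut down the underlying TaskScheduler
  }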
 
  /**
   * Process one event retrieved from the event queue.
   * Returns true if we should stop the event loop.
   */
  private[scheduler] def processEvent(event: DAGSchedulerEvent): Boolean = {
    event match {
      case JobSubmitted(finalRDD, func, partitions, allowLocal, callSite, listener, properties) =>
        val jobId = nextJobId.getAndIncrement()  // get a new jobId; nextJobId is an AtomicInteger
        val finalStage = newStage(finalRDD, None, jobId, Some(callSite)) // create the finalStage from finalRDD; any parent stages/RDDs are inferred from its dependencies
        val job = new ActiveJob(jobId, finalStage, func, partitions, callSite, listener, properties) // create the Job from the finalStage
        clearCacheLocs()
        if (allowLocal && finalStage.parents.size == 0 && partitions.length == 1) {
          // Compute very short actions like first() or take() with no parent stages locally.
          runLocally(job) // for trivial jobs, just run them locally
        } else {
          listenerBus.post(SparkListenerJobStart(job, properties))
          idToActiveJob(jobId) = job
          activeJobs += job
          resultStageToJob(finalStage) = job
          submitStage(finalStage)
        }
      // Handling of the other event types is omitted here; only JobSubmitted is shown
      case _ => // ……
    }
    false
  }
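
newStage, used above to build the finalStage, essentially allocates a stage id and wires up the parent stages derived from the RDD's dependency graph. A simplified sketch (bookkeeping details and exact signatures may differ between versions):

  private def newStage(
      rdd: RDD[_],
      shuffleDep: Option[ShuffleDependency[_,_]],
      jobId: Int,
      callSite: Option[String] = None): Stage = {
    val id = nextStageId.getAndIncrement()  // nextStageId is the AtomicInteger shown earlier
    // getParentStages walks the RDD's dependencies and creates a shuffle-map stage
    // for every ShuffleDependency it encounters
    val stage = new Stage(id, rdd, shuffleDep, getParentStages(rdd, jobId), jobId, callSite)
    stageIdToStage(id) = stage              // register the new stage for later lookups
    stage
  }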

 

1. dagScheduler.runJob

Continuing from before: calling runJob on the SparkContext ultimately calls dagScheduler.runJob.
dagScheduler.runJob itself is simple: it puts the toSubmit event onto the eventQueue and waits for the job to finish.
prepareJob in turn just creates the JobWaiter and the JobSubmitted event (a sketch of it follows the runJob code below).

  def runJob[T, U: ClassManifest](
      finalRdd: RDD[T],
      func: (TaskContext, Iterator[T]) => U,
      partitions: Seq[Int],
      callSite: String,
      allowLocal: Boolean,
      resultHandler: (Int, U) => Unit,
      properties: Properties = null)
  {
    if (partitions.size == 0) {
      return
    }

    val (toSubmit: JobSubmitted, waiter: JobWaiter[_]) = prepareJob(
        finalRdd, func, partitions, callSite, allowLocal, resultHandler, properties)
    eventQueue.put(toSubmit)
    waiter.awaitResult() match {
      case JobSucceeded => {}
      case JobFailed(exception: Exception, _) =>
        logInfo("Failed to run " + callSite)
        throw exception
    }
  }
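
prepareJob, called at the top of runJob, essentially pairs a JobWaiter (one result slot per requested partition) with the JobSubmitted event that carries the waiter as its JobListener. A sketch of what it does, matching the JobSubmitted signature shown below (the exact signature may vary by version):

  private[scheduler] def prepareJob[T, U: ClassManifest](
      finalRdd: RDD[T],
      func: (TaskContext, Iterator[T]) => U,
      partitions: Seq[Int],
      callSite: String,
      allowLocal: Boolean,
      resultHandler: (Int, U) => Unit,
      properties: Properties = null): (JobSubmitted, JobWaiter[U]) = {
    val waiter = new JobWaiter(partitions.size, resultHandler)       // one slot per partition
    val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]   // erase the element type for the event
    (JobSubmitted(finalRdd, func2, partitions.toArray, allowLocal, callSite, waiter, properties), waiter)
  }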

 

1.1 JobWaiter

JobWaiter is fairly simple. It implements the JobListener functions taskSucceeded and jobFailed, which the DAGScheduler calls when it receives a task-success or failure event.
taskSucceeded checks whether all tasks have succeeded; if so, the job is marked as finished (jobFinished).
awaitResult simply blocks until jobFinished has been set.

private[spark] class JobWaiter[T](totalTasks: Int, resultHandler: (Int, T) => Unit)
  extends JobListener {
  override def taskSucceeded(index: Int, result: Any) {
    synchronized {
      if (jobFinished) {
        throw new UnsupportedOperationException("taskSucceeded() called on a finished JobWaiter")
      }
      resultHandler(index, result.asInstanceOf[T]) // hand the task result to resultHandler
      finishedTasks += 1
      if (finishedTasks == totalTasks) {
        jobFinished = true
        jobResult = JobSucceeded
        this.notifyAll()
      }
    }
  }

  override def jobFailed(exception: Exception) {……}

  def awaitResult(): JobResult = synchronized {
    while (!jobFinished) {
      this.wait()
    }
    return jobResult
  }
}
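
To see how the waiter is actually woken up: when a ResultTask of the final stage completes successfully, the DAGScheduler's completion handling forwards the result to the job's JobListener, which for a blocking runJob is exactly the JobWaiter above. A simplified sketch of that Success branch (field names and surrounding bookkeeping may differ by version):

  // inside the handling of a CompletionEvent whose task is a ResultTask rt
  // and whose reason is Success (simplified)
  val job = resultStageToJob(stage)          // the ActiveJob behind this final stage
  if (!job.finished(rt.outputId)) {
    job.finished(rt.outputId) = true
    job.numFinished += 1
    // for a blocking runJob, job.listener is the JobWaiter shown above
    job.listener.taskSucceeded(rt.outputId, event.result)
  }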

 

1.2 JobSubmitted

JobSubmitted is just one kind of DAGSchedulerEvent, a textbook case for pattern matching.
As you can see, there are quite a few other DAGSchedulerEvents besides JobSubmitted (a small dispatcher illustrating the pattern matching follows the definitions below).

private[spark] sealed trait DAGSchedulerEvent

private[spark] case class JobSubmitted(
    finalRDD: RDD[_],
    func: (TaskContext, Iterator[_]) => _,
    partitions: Array[Int],
    allowLocal: Boolean,
    callSite: String,
    listener: JobListener,
    properties: Properties = null)
  extends DAGSchedulerEvent

private[spark] case class BeginEvent(task: Task[_], taskInfo: TaskInfo) extends DAGSchedulerEvent

private[spark] case class CompletionEvent(
    task: Task[_],
    reason: TaskEndReason,
    result: Any,
    accumUpdates: Map[Long, Any],
    taskInfo: TaskInfo,
    taskMetrics: TaskMetrics)
  extends DAGSchedulerEvent

private[spark] case class ExecutorGained(execId: String, host: String) extends DAGSchedulerEvent

private[spark] case class ExecutorLost(execId: String) extends DAGSchedulerEvent

private[spark] case class TaskSetFailed(taskSet: TaskSet, reason: String) extends DAGSchedulerEvent

private[spark] case object StopDAGScheduler extends DAGSchedulerEvent
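
Because DAGSchedulerEvent is a sealed trait, the compiler can check a match over its cases for exhaustiveness, which is what makes dispatching in processEvent safe. A hypothetical helper, purely for illustration (not part of Spark):

def describe(event: DAGSchedulerEvent): String = event match {
  case JobSubmitted(rdd, _, partitions, _, callSite, _, _) =>
    "job on " + rdd + " (" + partitions.length + " partitions) from " + callSite
  case BeginEvent(_, taskInfo)                => "task " + taskInfo.taskId + " started"
  case CompletionEvent(_, reason, _, _, _, _) => "task finished: " + reason
  case ExecutorGained(execId, host)           => "executor " + execId + " gained on " + host
  case ExecutorLost(execId)                   => "executor " + execId + " lost"
  case TaskSetFailed(taskSet, reason)         => "taskset " + taskSet.id + " failed: " + reason
  case StopDAGScheduler                       => "stopping the DAGScheduler"
}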

 

2 processEvent.JobSubmitted

Handling JobSubmitted means first creating the final stage and then submitting that final stage.

For the stage-related operations, see: Spark 源码分析 -- Stage

2.1 submitStage

submitStage first builds up the DAG of stages, and then submits the tasks of each stage in dependency order.

  /** Submits stage, but first recursively submits any missing parents. */
  private def submitStage(stage: Stage) {
    logDebug("submitStage(" + stage + ")")
    if (!waiting(stage) && !running(stage) && !failed(stage)) {
      val missing = getMissingParentStages(stage).sortBy(_.id) // starting from this (final) stage, find any missing parent stages
      logDebug("missing: " + missing)
      if (missing == Nil) {
        logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents")
        submitMissingTasks(stage) // no parent stages need to run, so submit this stage directly
        running += stage
      } else {
        for (parent <- missing) {
          submitStage(parent) // parent stages must be submitted first, since stages have to run in order
        }
        waiting += stage // put the current stage on the waiting list
      }
    }
  }
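
getMissingParentStages, used above, conceptually walks the RDD lineage of the stage: narrow dependencies stay inside the same stage, while every ShuffleDependency marks a parent (shuffle-map) stage, which counts as missing if its map outputs are not yet available. A simplified sketch of that traversal (caching checks and version-specific details omitted):

  private def getMissingParentStages(stage: Stage): List[Stage] = {
    val missing = new HashSet[Stage]
    val visited = new HashSet[RDD[_]]
    def visit(rdd: RDD[_]) {
      if (!visited(rdd)) {
        visited += rdd
        for (dep <- rdd.dependencies) {
          dep match {
            case shufDep: ShuffleDependency[_, _] =>
              // a shuffle dependency marks a stage boundary
              val mapStage = getShuffleMapStage(shufDep, stage.jobId)
              if (!mapStage.isAvailable) {
                missing += mapStage
              }
            case narrowDep: NarrowDependency[_] =>
              // narrow dependencies stay within the same stage, keep walking up the lineage
              visit(narrowDep.rdd)
          }
        }
      }
    }
    visit(stage.rdd)
    missing.toList
  }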

 

2.2 submitMissingTasks

For task-related details, see: Spark 源码分析 -- Task

As you can see, whichever kind of stage it is, a task is created for every missing partition of the stage;
the tasks are then wrapped into a TaskSet and the stage is handed to the TaskScheduler.

  /** Called when stage's parents are available and we can now do its task. */
  private def submitMissingTasks(stage: Stage) {
    // Get our pending tasks and remember them in our pendingTasks entry
    var tasks = ArrayBuffer[Task[_]]()
    if (stage.isShuffleMap) { // ShuffleMap stage
      for (p <- 0 until stage.numPartitions if stage.outputLocs(p) == Nil) {
        val locs = getPreferredLocs(stage.rdd, p)
        tasks += new ShuffleMapTask(stage.id, stage.rdd, stage.shuffleDep.get, p, locs)
      }
    } else { // Result (final) stage
      // This is a final stage; figure out its job's missing partitions
      val job = resultStageToJob(stage)
      for (id <- 0 until job.numPartitions if !job.finished(id)) {
        val partition = job.partitions(id)
        val locs = getPreferredLocs(stage.rdd, partition)
        tasks += new ResultTask(stage.id, stage.rdd, job.func, partition, locs, id)
      }
    }

    // (elided in this excerpt: the code that obtains `properties` for this job)
    if (tasks.size > 0) { // there are missing tasks, submit them to the TaskScheduler as one TaskSet
      taskSched.submitTasks(
        new TaskSet(tasks.toArray, stage.id, stage.newAttemptId(), stage.jobId, properties))
      if (!stage.submissionTime.isDefined) {
        stage.submissionTime = Some(System.currentTimeMillis())
      }
    } else {
      logDebug("Stage " + stage + " is actually done; %b %d %d".format(
        stage.isAvailable, stage.numAvailableOutputs, stage.numPartitions))
      running -= stage
    }
  }
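
Putting the pieces together, the driver-side control flow for a simple action looks roughly like this:

// Conceptual trace, not literal code:
//   rdd.count()
//     -> SparkContext.runJob(rdd, ...)
//       -> dagScheduler.runJob(...)            // posts JobSubmitted, blocks on the JobWaiter
//         -> processEvent(JobSubmitted(...))   // builds the finalStage via newStage
//           -> submitStage(finalStage)         // recursively submits missing parent stages first
//             -> submitMissingTasks(stage)     // one task per missing partition, wrapped into a TaskSet
//               -> taskSched.submitTasks(taskSet)
//   ...tasks complete -> CompletionEvent -> JobWaiter.taskSucceeded -> awaitResult() returns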

This post is reproduced from 博客园 (cnblogs); originally published on 2013-12-30.

