Spark 源码分析 -- Stage

简介:

理解stage, 关键就是理解Narrow DependencyWide Dependency, 可能还是觉得比较难理解 
关键在于是否需要shuffle, 不需要shuffle是可以随意并发的, 所以stage的边界就是需要shuffle的地方, 如下图很清楚

image

并且Stage分为两种,

shuffle map stage, in which case its tasks' results are input for another stage 
其实就是,非最终stage, 后面还有其他的stage, 所以它的输出一定是需要shuffle并作为后续的输入 
result stage, in which case its tasks directly compute the action that initiated a job (e.g. count(), save(), etc) 
最终的stage, 没有输出, 而是直接产生结果或存储

 

1 stage class

这个注释写的很清楚 
可以看到stage的RDD参数只有一个RDD, final RDD, 而不是一系列的RDD 
因为在一个stage中的所有RDD都是map, partition不会有任何改变, 只是在data依次执行不同的map function 
所以对于task scheduler而言, 一个RDD的状况就可以代表这个stage

/**
 * A stage is a set of independent tasks all computing the same function that need to run as part
 * of a Spark job, where all the tasks have the same shuffle dependencies. Each DAG of tasks run
 * by the scheduler is split up into stages at the boundaries where shuffle occurs, and then the
 * DAGScheduler runs these stages in topological order.
 *
 * Each Stage can either be a shuffle map stage, in which case its tasks' results are input for
 * another stage, or a result stage, in which case its tasks directly compute the action that
 * initiated a job (e.g. count(), save(), etc). For shuffle map stages, we also track the nodes
 * that each output partition is on.
 *
 * Each Stage also has a jobId, identifying the job that first submitted the stage.  When FIFO
 * scheduling is used, this allows Stages from earlier jobs to be computed first or recovered
 * faster on failure.
 */
private[spark] class Stage(
    val id: Int,
    val rdd: RDD[_], // final RDD
    val shuffleDep: Option[ShuffleDependency[_,_]],  // Output shuffle if stage is a map stage
    val parents: List[Stage], // 父stage
    val jobId: Int,
    callSite: Option[String])
  extends Logging {

  val isShuffleMap = shuffleDep != None  // 是否是shuffle map stage, 取决于是否有shuffleDep 
  val numPartitions = rdd.partitions.size
  val outputLocs = Array.fill[List[MapStatus]](numPartitions)(Nil) // 用于buffer每个shuffle中每个maptask的MapStatus
  var numAvailableOutputs = 0

  private var nextAttemptId = 0
 
  def isAvailable: Boolean = {
    if (!isShuffleMap) {
      true
    } else {
      numAvailableOutputs == numPartitions
    }
  }
}

 

2 newStage

如果是shuffle map stage, 需要在这里向mapOutputTracker注册shuffle

  /**
   * Create a Stage for the given RDD, either as a shuffle map stage (for a ShuffleDependency) or
   * as a result stage for the final RDD used directly in an action. The stage will also be
   * associated with the provided jobId.
   */
  private def newStage(
      rdd: RDD[_],
      shuffleDep: Option[ShuffleDependency[_,_]],
      jobId: Int,
      callSite: Option[String] = None)
    : Stage =
  {
    if (shuffleDep != None) {
      // Kind of ugly: need to register RDDs with the cache and map output tracker here
      // since we can't do it in the RDD constructor because # of partitions is unknown
      logInfo("Registering RDD " + rdd.id + " (" + rdd.origin + ")")
      mapOutputTracker.registerShuffle(shuffleDep.get.shuffleId, rdd.partitions.size)
    }
    val id = nextStageId.getAndIncrement()
    val stage = new Stage(id, rdd, shuffleDep, getParentStages(rdd, jobId), jobId, callSite)
    stageIdToStage(id) = stage
    stageToInfos(stage) = StageInfo(stage)
    stage
  }

3 getMissingParentStages

可以根据final stage的deps找出所有的parent stage

  private def getMissingParentStages(stage: Stage): List[Stage] = {
    val missing = new HashSet[Stage]
    val visited = new HashSet[RDD[_]]
    def visit(rdd: RDD[_]) {
      if (!visited(rdd)) {
        visited += rdd
        if (getCacheLocs(rdd).contains(Nil)) {
          for (dep <- rdd.dependencies) {
            dep match {
              case shufDep: ShuffleDependency[_,_] => // 如果发现ShuffleDependency, 说明遇到新的stage
                val mapStage = getShuffleMapStage(shufDep, stage.jobId) // check shuffleToMapStage, 如果该stage已经被创建则直接返回, 否则newStage
                if (!mapStage.isAvailable) {
                  missing += mapStage
                }
              case narrowDep: NarrowDependency[_] => // 对于NarrowDependency, 说明仍然在这个stage中
                visit(narrowDep.rdd)
            }
          }
        }
      }
    }
    visit(stage.rdd)
    missing.toList
  }

本文章摘自博客园,原文发布日期:2013-12-26
目录
相关文章
|
5月前
|
SQL 分布式计算 HIVE
[已解决]Job failed with org.apache.spark.SparkException: Job aborted due to stage failure: Task 3 in st
[已解决]Job failed with org.apache.spark.SparkException: Job aborted due to stage failure: Task 3 in st
98 0
|
存储 分布式计算 大数据
Spark 原理_运行过程_stage 和 task 的关系 | 学习笔记
快速学习 Spark 原理_运行过程_stage 和 task 的关系
164 0
Spark 原理_运行过程_stage 和 task 的关系 | 学习笔记
|
分布式计算 大数据 调度
Spark 原理_运行过程_Job 和 Stage 的关系 | 学习笔记
快速学习 Spark 原理_运行过程_Job 和 Stage 的关系
111 0
Spark 原理_运行过程_Job 和 Stage 的关系 | 学习笔记
|
分布式计算 大数据 Spark
Spark 原理_物理图_Stage 划分 | 学习笔记
快速学习 Spark 原理_物理图_Stage 划分
86 0
Spark 原理_物理图_Stage 划分 | 学习笔记
|
分布式计算 调度 Spark
Spark作业调度中stage的划分
Spark在接收到提交的作业后,会进行RDD依赖分析并划分成多个stage,以stage为单位生成taskset并提交调度。
Spark作业调度中stage的划分
|
机器学习/深度学习 分布式计算 算法
Apache Spark 将支持 Stage 级别的资源控制和调度
我们需要对不同 Stage 设置不同的资源。但是目前的 Spark 不支持这种细粒度的资源配置,导致我们不得不在作业启动的时候设置大量的资源,从而导致资源可能浪费,特别是在机器学习的场景下。
Apache Spark 将支持 Stage 级别的资源控制和调度
|
Apache 分布式计算 Spark
Apache Spark Delta Lake 事务日志实现源码分析
Apache Spark Delta Lake 事务日志实现源码分析 我们已经在这篇文章详细介绍了 Apache Spark Delta Lake 的事务日志是什么、主要用途以及如何工作的。那篇文章已经可以很好地给大家介绍 Delta Lake 的内部工作原理,原子性保证,本文为了学习的目的,带领大家从源码级别来看看 Delta Lake 事务日志的实现。
1966 0
|
分布式计算 Java Shell
Spark源码分析之Spark Shell(上)
终于开始看Spark源码了,先从最常用的spark-shell脚本开始吧。不要觉得一个启动脚本有什么东东,其实里面还是有很多知识点的。另外,从启动脚本入手,是寻找代码入口最简单的方法,很多开源框架,其实都可以通过这种方式来寻找源码入口。
902 0
|
分布式计算 Spark Hadoop
Spark2.4.0源码分析之WorldCount Stage提交(DAGScheduler)(六)
- 理解ShuffuleMapStage是如何转化为ShuffleMapTask并作为TaskSet提交 - 理解ResultStage是如何转化为ResultTask并作为TaskSet提交
1145 0
|
分布式计算 Apache Spark
Spark2.4.0源码分析之WorldCount Stage提交顺序(DAGScheduler)(五)
理解FinalStage是如何按stage从前到后依次提交顺序
2172 0