Spark修炼之道(高级篇)——Spark源码阅读:第四节 Stage划分

简介: Stage划分在上一节中我们讲了 Spark Job的提交,在该讲中我们提到,当rdd触发action操作之后,会调用SparkContext的runJob方法,最后调用的DAGScheduler.handleJobSubmitted方法完成整个job的提交。然后DAGScheduler根据RDD的lineage进行Stage划分,再生成TaskSet,由TaskSche

Stage划分

在上一节中我们讲了 Spark Job的提交,在该讲中我们提到,当rdd触发action操作之后,会调用SparkContext的runJob方法,最后调用的DAGScheduler.handleJobSubmitted方法完成整个job的提交。然后DAGScheduler根据RDD的lineage进行Stage划分,再生成TaskSet,由TaskScheduler向集群申请资源,最终在Woker节点的Executor进程中执行Task。

今天我们先来看一下如何进行Stage划分。下图给出的是对应Spark应用程序代码生成的Stage。它的具体划分依据是根据RDD的依赖关系进行,在遇到宽依赖时将两个RDD划分为不同的Stage。

这里写图片描述

从上图中可以看到,RDD G与RDD F间的依赖是宽依赖,所以RDD F与 RDD G被划分为不同的Stage,而RDD G 与 RDD 间为窄依赖,因此 RDD B 与 RDD G被划分为同一个Stage。通过这种递归的调用方式,将所有RDD进行划分。


//DAGScheduler.handleJobSubmitted方法 
//参数finalRDD为触发action操作时最后一个RDD
private[scheduler] def handleJobSubmitted(jobId: Int,
      finalRDD: RDD[_],
      func: (TaskContext, Iterator[_]) => _,
      partitions: Array[Int],
      callSite: CallSite,
      listener: JobListener,
      properties: Properties) {
    //finalStage表示最后一个Stage
    var finalStage: ResultStage = null
    try {
      // New stage creation may throw an exception if, for example, jobs are run on a
      // HadoopRDD whose underlying HDFS files have been deleted.
      //调用newResultStage创建Final Stage
      finalStage = newResultStage(finalRDD, partitions.length, jobId, callSite)
    } catch {
      case e: Exception =>
        logWarning("Creating new stage failed due to exception - job: " + jobId, e)
        listener.jobFailed(e)
        return
    }
    //如果存在finalStage 
    if (finalStage != null) {
      val job = new ActiveJob(jobId, finalStage, func, partitions, callSite, listener, properties)
      clearCacheLocs()
      logInfo("Got job %s (%s) with %d output partitions".format(
        job.jobId, callSite.shortForm, partitions.length))
      logInfo("Final stage: " + finalStage + "(" + finalStage.name + ")")
      logInfo("Parents of final stage: " + finalStage.parents)
      logInfo("Missing parents: " + getMissingParentStages(finalStage))
      val jobSubmissionTime = clock.getTimeMillis()
      jobIdToActiveJob(jobId) = job
      activeJobs += job
      finalStage.resultOfJob = Some(job)
      val stageIds = jobIdToStageIds(jobId).toArray
      val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo))
      //向LiveListenerBus post SparkListenerJobStart
      //listenerThread后台线程处理该事件
      listenerBus.post(
        SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties))
      //提交finalStage,该方法会提交所有关联的未提交的stage
      submitStage(finalStage)
    }
    //检查是否有等待或失败的Stage需要重新提交
    submitWaitingStages()
  }

这里重点关注newResultStage方法,具体代码如下

/**
   * Create a ResultStage associated with the provided jobId.
   */
  private def newResultStage(
      rdd: RDD[_],
      numTasks: Int,
      jobId: Int,
      callSite: CallSite): ResultStage = {
     //获取Parent Stages及ID
    val (parentStages: List[Stage], id: Int) = getParentStagesAndId(rdd, jobId)
    //创建ResultStage 
    val stage: ResultStage = new ResultStage(id, rdd, numTasks, parentStages, jobId, callSite)

    stageIdToStage(id) = stage
    updateJobIdStageIdMaps(jobId, stage)
    stage
  }

跳到getParentStagesAndId方法中可以看到如下代码:

 /**
   * Helper function to eliminate some code re-use when creating new stages.
   */
  private def getParentStagesAndId(rdd: RDD[_], firstJobId: Int): (List[Stage], Int) = {
    //根据宽依赖中的ShuffleDependency生成ParentStage
    val parentStages = getParentStages(rdd, firstJobId)
    //生成StageID
    val id = nextStageId.getAndIncrement()
    (parentStages, id)
  }

跳到getParentStages方法中,可以看到下面的代码:

/**
   * Get or create the list of parent stages for a given RDD.  The new Stages will be created with
   * the provided firstJobId.
   */
  private def getParentStages(rdd: RDD[_], firstJobId: Int): List[Stage] = {
    val parents = new HashSet[Stage]
    val visited = new HashSet[RDD[_]]
    // We are manually maintaining a stack here to prevent StackOverflowError
    // caused by recursively visiting
    val waitingForVisit = new Stack[RDD[_]]
    //广度优先遍历方式
    def visit(r: RDD[_]) {
      if (!visited(r)) {
        visited += r
        // Kind of ugly: need to register RDDs with the cache here since
        // we can't do it in its constructor because # of partitions is unknown
        for (dep <- r.dependencies) {
          dep match {
            //如何是ShuffleDependency,则生成一个新的stage
            case shufDep: ShuffleDependency[_, _, _] =>
              parents += getShuffleMapStage(shufDep, firstJobId)
            case _ =>
              waitingForVisit.push(dep.rdd)
          }
        }
      }
    }
    waitingForVisit.push(rdd)
    while (waitingForVisit.nonEmpty) {
      visit(waitingForVisit.pop())
    }
    parents.toList
  }

再跳到getShuffleMapStage方法,可以看到如下代码:

/**
   * Get or create a shuffle map stage for the given shuffle dependency's map side.
   */
  private def getShuffleMapStage(
      shuffleDep: ShuffleDependency[_, _, _],
      firstJobId: Int): ShuffleMapStage = {
    shuffleToMapStage.get(shuffleDep.shuffleId) match {
      //如何存在,则不需要创建,原样返回
      case Some(stage) => stage
      //如果不存在,则创建
      case None =>
        // We are going to register ancestor shuffle dependencies
        //查看其依赖的父类Stage是否存在,没有的话创建
        registerShuffleDependencies(shuffleDep, firstJobId)
        // Then register current shuffleDep
        //创建当前shuffleDep RDD的Stage
        val stage = newOrUsedShuffleStage(shuffleDep, firstJobId)
        shuffleToMapStage(shuffleDep.shuffleId) = stage

        stage
    }
  }

Stage整体划分的逻辑讲清楚了,在下一节中我们将介绍submitStage及submitWaitingStages方法进行Stage的提交。

目录
相关文章
|
5月前
|
SQL 分布式计算 大数据
【大数据技术Spark】DStream编程操作讲解实战(图文解释 附源码)
【大数据技术Spark】DStream编程操作讲解实战(图文解释 附源码)
43 0
|
5月前
|
Java Shell 分布式数据库
【大数据技术Hadoop+Spark】HBase数据模型、Shell操作、Java API示例程序讲解(附源码 超详细)
【大数据技术Hadoop+Spark】HBase数据模型、Shell操作、Java API示例程序讲解(附源码 超详细)
87 0
|
5月前
|
分布式计算 Java 大数据
【大数据技术Hadoop+Spark】HDFS Shell常用命令及HDFS Java API详解及实战(超详细 附源码)
【大数据技术Hadoop+Spark】HDFS Shell常用命令及HDFS Java API详解及实战(超详细 附源码)
271 0
|
5月前
|
SQL 分布式计算 数据库
【大数据技术Spark】Spark SQL操作Dataframe、读写MySQL、Hive数据库实战(附源码)
【大数据技术Spark】Spark SQL操作Dataframe、读写MySQL、Hive数据库实战(附源码)
114 0
|
5月前
|
分布式计算 大数据 Scala
【大数据技术Hadoop+Spark】Spark RDD创建、操作及词频统计、倒排索引实战(超详细 附源码)
【大数据技术Hadoop+Spark】Spark RDD创建、操作及词频统计、倒排索引实战(超详细 附源码)
98 1
|
2月前
|
分布式计算 监控 Java
Spark学习---day06、Spark内核(源码提交流程、任务执行)
Spark学习---day06、Spark内核(源码提交流程、任务执行)
|
5月前
|
SQL 分布式计算 HIVE
[已解决]Job failed with org.apache.spark.SparkException: Job aborted due to stage failure: Task 3 in st
[已解决]Job failed with org.apache.spark.SparkException: Job aborted due to stage failure: Task 3 in st
97 0
|
5月前
|
存储 Java 关系型数据库
【Kafka+Flume+Mysql+Spark】实现新闻话题实时统计分析系统(附源码)
【Kafka+Flume+Mysql+Spark】实现新闻话题实时统计分析系统(附源码)
52 1
【Kafka+Flume+Mysql+Spark】实现新闻话题实时统计分析系统(附源码)
|
5月前
|
机器学习/深度学习 分布式计算 搜索推荐
【大数据技术】Spark MLlib机器学习协同过滤电影推荐实战(附源码和数据集)
【大数据技术】Spark MLlib机器学习协同过滤电影推荐实战(附源码和数据集)
97 0
|
5月前
|
机器学习/深度学习 分布式计算 前端开发
【大数据技术】Spark MLlib机器学习线性回归、逻辑回归预测胃癌是否转移实战(附源码和数据集)
【大数据技术】Spark MLlib机器学习线性回归、逻辑回归预测胃癌是否转移实战(附源码和数据集)
36 0