Spark DAG源码剖析_2 | 学习笔记

简介: 快速学习Spark DAG源码剖析_2

开发者学堂课程【大数据实时计算框架Spark快速入门:Spark DAG源码剖析_2】学习笔记,与课程紧密联系,让用户快速学习知识。

课程地址:https://developer.aliyun.com/learning/course/100/detail/1674


Spark DAG源码剖析_2


//First figure out the indexes of indexes of partition ids to compute.

Val partitionsToCompute:Seq[Int] = stage.findMissingpartitions()

Val taskIdToLocations:Map[Int,Seq[TaskLocation]] = try {

 Stage match {

Case s: ShuffleMapStage =>

 partitionsToCompute.map { id => (id,getPreferredLocs(stage.rdd,id))}.toMap

Case s: ResultStage =>

 Val job = s.activeJob.get

 partitionsToCompute.map ( id =>

   Val p = s.partitions(id)

   (id,getPreferredLocs(stage.rdd,p))

 }.toMap

}

}

rdd在上图Stage2中代表的是F表格

// If the partition is cached,return the cache locations

Val cached  “ getCacheLocs(rdd)(partition)

If (cached.nonEmpty) {

 Return cached

}

看rdd有没有自动返存,如果rdd和partition有自动返存直接返存到原来的位置,如果没有自动返存的话:

// If the RDD has some placement pceferences(as is the case for input RDDs),get those

Val rddPrefs = rdd.preferredLocations(rdd.partitions(partition)).toList

If (rddPrefs.nonEmpty) {

 Return rddPrefs.map(TaskLocation(_))

}

·Get the array of partitions of this RDD,taking into account whether the

·RDD is checkpointed or not.

·/

Final def partitions:Array[Partition] = {

 checkpointRDD.map(_.partitions).getorElse {

  If (partitions_== null) {

Partitions_= getpartitions

  }

 Partitions_

}

}

Protected def getInputFormat(conf:Jobconf):InputFormat{K,V} = {

Val newInputFormat = Reflect:onutils,newInstance(inputFormatClass,asInstanceOf[class[_]],conf).asInstanceOf{InputFormat[K,V]]

newInputFormat match {

 Cass c:configurable => c.setconf(conf)

 Cass_=>

}

newInputFormat

}

Override def getPartitions:Array[Partition] = {

Val jobconf = getJobconf()

//add the credentials here as this can be called before SparkContext initiallzed

SparkHadooputil.get.addcredentials(jobconf)

Val inputFormat = getInputFormat(jobconf)

Val inputSplits = inputFormat.getSplits(jobconf,minPartitions)

Val array = new Array[Partition](inputSplits.size)

For(i <- 0 until inputSplits.size) {

Arry(i) = new  HadoopPartition(id,i,inputSplits(i))

}

Array

}

Def textFile

hadoopFile(path,classof[TextInputFormat],classof[LongWritable],classof[Text]

hadoopFile方法返回的是new HadoopRDD

相关文章
|
7月前
|
SQL 分布式计算 大数据
【大数据技术Spark】DStream编程操作讲解实战(图文解释 附源码)
【大数据技术Spark】DStream编程操作讲解实战(图文解释 附源码)
159 0
|
7月前
|
Java Shell 分布式数据库
【大数据技术Hadoop+Spark】HBase数据模型、Shell操作、Java API示例程序讲解(附源码 超详细)
【大数据技术Hadoop+Spark】HBase数据模型、Shell操作、Java API示例程序讲解(附源码 超详细)
162 0
|
7月前
|
分布式计算 Java 大数据
【大数据技术Hadoop+Spark】HDFS Shell常用命令及HDFS Java API详解及实战(超详细 附源码)
【大数据技术Hadoop+Spark】HDFS Shell常用命令及HDFS Java API详解及实战(超详细 附源码)
724 0
|
7月前
|
SQL 分布式计算 数据库
【大数据技术Spark】Spark SQL操作Dataframe、读写MySQL、Hive数据库实战(附源码)
【大数据技术Spark】Spark SQL操作Dataframe、读写MySQL、Hive数据库实战(附源码)
279 0
|
7月前
|
分布式计算 大数据 Scala
【大数据技术Hadoop+Spark】Spark RDD创建、操作及词频统计、倒排索引实战(超详细 附源码)
【大数据技术Hadoop+Spark】Spark RDD创建、操作及词频统计、倒排索引实战(超详细 附源码)
326 1
|
4月前
|
分布式计算 资源调度 监控
什么是 Spark DAG?
【8月更文挑战第14天】
247 5
|
6月前
|
机器学习/深度学习 分布式计算 API
技术好文:Spark机器学习笔记一
技术好文:Spark机器学习笔记一
43 0
|
7月前
|
分布式计算 Java Hadoop
Spark3.3.0源码编译补充篇-抓狂的证书问题
Spark3.3.0源码编译补充篇-抓狂的证书问题
47 0
|
7月前
|
分布式计算 Java 测试技术
肝Spark源码的若干骚操作
肝Spark源码的若干骚操作
51 0
|
7月前
|
分布式计算 Java 程序员
Spark3.0源码编译打包
Spark3.0源码编译打包
42 0