Study notes for the Developer Academy course "Big Data Real-Time Computing Framework Spark Quick Start: Spark DAG Source Code Analysis, Part 2". The notes follow the course closely so that readers can pick up the material quickly.
Course link: https://developer.aliyun.com/learning/course/100/detail/1674
Spark DAG Source Code Analysis, Part 2
// First figure out the indexes of partition ids to compute.
val partitionsToCompute: Seq[Int] = stage.findMissingPartitions()

val taskIdToLocations: Map[Int, Seq[TaskLocation]] = try {
  stage match {
    case s: ShuffleMapStage =>
      partitionsToCompute.map { id => (id, getPreferredLocs(stage.rdd, id)) }.toMap
    case s: ResultStage =>
      val job = s.activeJob.get
      partitionsToCompute.map { id =>
        val p = s.partitions(id)
        (id, getPreferredLocs(stage.rdd, p))
      }.toMap
  }
}
Here, stage.rdd corresponds to RDD F in Stage 2 of the DAG figure shown earlier.
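To make the control flow above easier to follow, here is a minimal, self-contained sketch of the same pattern: each missing partition id is mapped to its preferred locations, and a match on the stage type decides how the partition index is resolved. The SketchStage types and the preferredLocs lookup are simplified stand-ins for illustration, not Spark's real classes.

sealed trait SketchStage { def rdd: String }
case class SketchShuffleMapStage(rdd: String) extends SketchStage
case class SketchResultStage(rdd: String, partitions: Array[Int]) extends SketchStage

// placeholder locality lookup: pretend partitions rotate across three hosts
def preferredLocs(rdd: String, partition: Int): Seq[String] = Seq(s"host-${partition % 3}")

def taskIdToLocations(stage: SketchStage, missing: Seq[Int]): Map[Int, Seq[String]] =
  stage match {
    case s: SketchShuffleMapStage =>
      missing.map(id => (id, preferredLocs(s.rdd, id))).toMap
    case s: SketchResultStage =>
      missing.map { id =>
        val p = s.partitions(id)   // a ResultStage indexes into the job's partitions
        (id, preferredLocs(s.rdd, p))
      }.toMap
  }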
// If the partition is cached, return the cache locations
val cached = getCacheLocs(rdd)(partition)
if (cached.nonEmpty) {
  return cached
}
This checks whether the rdd is cached: if this partition of the rdd is already cached, the cached locations are returned directly, so the task is sent back to where the data already lives. If it is not cached, the code falls through to the next check:
// If the RDD has some placement preferences (as is the case for input RDDs), get those
val rddPrefs = rdd.preferredLocations(rdd.partitions(partition)).toList
if (rddPrefs.nonEmpty) {
  return rddPrefs.map(TaskLocation(_))
}
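Taken together, the two snippets above form a fallback chain inside getPreferredLocsInternal. Below is a hedged sketch of that ordering, with plain Seq[String] standing in for Spark's TaskLocation; the real method additionally recurses into narrow dependencies when both checks come up empty.

// Sketch of the lookup order: cache first, then the RDD's own preferences.
def sketchPreferredLocs(
    cacheLocs: Seq[String],   // what getCacheLocs(rdd)(partition) would return
    rddPrefs: Seq[String]     // what rdd.preferredLocations(split) would return
  ): Seq[String] = {
  if (cacheLocs.nonEmpty) return cacheLocs  // a cached copy wins
  if (rddPrefs.nonEmpty) return rddPrefs    // e.g. HDFS block locations for input RDDs
  Nil                                       // real code would now walk narrow dependencies
}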
/**
 * Get the array of partitions of this RDD, taking into account whether the
 * RDD is checkpointed or not.
 */
final def partitions: Array[Partition] = {
  checkpointRDD.map(_.partitions).getOrElse {
    if (partitions_ == null) {
      partitions_ = getPartitions
    }
    partitions_
  }
}
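The method above combines two ideas: a checkpoint, when present, completely overrides the partition computation, and otherwise the result of getPartitions is computed once and memoized in a field. A small stand-alone sketch of that pattern, using plain Scala types rather than the RDD internals:

final class PartitionsHolder(checkpointed: Option[Array[Int]]) {
  private var partitions_ : Array[Int] = null
  private def getPartitions: Array[Int] = Array(0, 1, 2)  // stand-in for the subclass hook
  def partitions: Array[Int] =
    checkpointed.getOrElse {
      if (partitions_ == null) {
        partitions_ = getPartitions  // computed at most once, then cached in the field
      }
      partitions_
    }
}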
protected def getInputFormat(conf: JobConf): InputFormat[K, V] = {
  val newInputFormat = ReflectionUtils.newInstance(inputFormatClass.asInstanceOf[Class[_]], conf)
    .asInstanceOf[InputFormat[K, V]]
  newInputFormat match {
    case c: Configurable => c.setConf(conf)
    case _ =>
  }
  newInputFormat
}
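The subtlety here is the Configurable check: some input formats need the JobConf pushed into them after reflective construction. A self-contained version of the same idiom, assuming Hadoop's old mapred API is on the classpath (TextInputFormat is just an example class):

import org.apache.hadoop.conf.Configurable
import org.apache.hadoop.mapred.{InputFormat, JobConf, TextInputFormat}
import org.apache.hadoop.util.ReflectionUtils

def makeInputFormat(clazz: Class[_], conf: JobConf): InputFormat[_, _] = {
  // instantiate reflectively, as HadoopRDD does for inputFormatClass
  val fmt = ReflectionUtils.newInstance(clazz, conf).asInstanceOf[InputFormat[_, _]]
  fmt match {
    case c: Configurable => c.setConf(conf)  // push the conf into Configurable formats
    case _ =>                                // other formats need no extra wiring
  }
  fmt
}

// usage: val tif = makeInputFormat(classOf[TextInputFormat], new JobConf())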
override def getPartitions: Array[Partition] = {
  val jobConf = getJobConf()
  // add the credentials here as this can be called before SparkContext initialized
  SparkHadoopUtil.get.addCredentials(jobConf)
  val inputFormat = getInputFormat(jobConf)
  val inputSplits = inputFormat.getSplits(jobConf, minPartitions)
  val array = new Array[Partition](inputSplits.size)
  for (i <- 0 until inputSplits.size) {
    array(i) = new HadoopPartition(id, i, inputSplits(i))
  }
  array
}
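So the number of partitions of a HadoopRDD equals the number of InputSplits the input format returns, and minPartitions is only a hint passed to getSplits. For the old mapred FileInputFormat, the split size works out as max(minSize, min(goalSize, blockSize)); the constants below are illustrative:

// FileInputFormat's split sizing, written out as plain Scala.
def computeSplitSize(goalSize: Long, minSize: Long, blockSize: Long): Long =
  math.max(minSize, math.min(goalSize, blockSize))

val fileSize  = 1L << 30            // a 1 GiB file, for illustration
val blockSize = 128L * 1024 * 1024  // 128 MiB HDFS block size
val numSplits = 4                   // the minPartitions hint
val goalSize  = fileSize / numSplits             // 256 MiB
val splitSize = computeSplitSize(goalSize, 1L, blockSize)
// splitSize == 128 MiB, so the file yields 8 splits, i.e. more than minPartitions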
def textFile(path: String, minPartitions: Int = defaultMinPartitions): RDD[String] =
  hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text],
    minPartitions).map(pair => pair._2.toString)
The hadoopFile method returns a new HadoopRDD.
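To tie this together, here is a small runnable usage example (the local master, app name and file path are illustrative): textFile produces a MapPartitionsRDD sitting on top of a HadoopRDD, which toDebugString makes visible.

import org.apache.spark.{SparkConf, SparkContext}

object TextFileLineageDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("demo").setMaster("local[*]"))
    val lines = sc.textFile("README.md", minPartitions = 4)  // any text file works
    println(lines.toDebugString)      // MapPartitionsRDD on top of HadoopRDD
    println(lines.getNumPartitions)   // >= 4 for a splittable file
    sc.stop()
  }
}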