RDD依赖关系
查看血缘关系
RDD只支持粗粒度转换,每一个转换操作都是对上游RDD的元素执行函数f得到一个新的RDD,所以RDD之间就会形成类似流水线的前后依赖关系。
将创建RDD的一系列Lineage(血统)记录下来,以便恢复丢失的分区。RDD的Lineage会记录RDD的元数据信息和转换行为,当该RDD的部分分区数据丢失时,它可以根据这些信息来重新运算丢失的RDD的数据分区所依赖的父RDD分区数据以实现恢复,这样就避免了从头再次开始计算了。
package dep import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} object Spark01_RDD_dep { def main(args: Array[String]): Unit = { val sparkConf: SparkConf = new SparkConf().setAppName("wc").setMaster("local") val sc = new SparkContext(sparkConf) val datas: RDD[String] = sc.textFile("F:\\SparkCore代码\\Spark-core\\input\\2.txt") println(datas.dependencies) //OneToOneDependency // 窄依赖 : 上游RDD的一个分区的数据只能被下有RDD一个分区的数据独享. //println(datas.toDebugString) println("-------------------------------") val words = datas.flatMap(_.split(" ")) println(words.dependencies) //OneToOneDependency //println(words.toDebugString) println("-------------------------------") val word2One = words.map((_, 1)) println(word2One.dependencies) //OneToOneDependency //println(word2One.toDebugString) println("-------------------------------") val wordcount = word2One.reduceByKey(_ + _) println(wordcount.dependencies) //ShuffleDependency // 宽依赖(shuffle依赖): 上游RDD的一个分区的数据被下游RDD的多个分区共享. //println(wordcount.toDebugString) println("-------------------------------") wordcount.collect().foreach(println) //wordcount.collect() sc.stop() } }
RDD 阶段划分
DAG(Directed Acyclic Graph)有向无环图是由点和线组成的拓扑图形,该图形具有方向,不会闭环。例如,DAG记录了RDD的转换过程和任务的阶段。
RDD 任务划分
RDD任务切分中间分为:Application、Job、Stage和Task
l Application:初始化一个SparkContext即生成一个Application;
l Job:一个Action算子就会生成一个Job;
l Stage:Stage等于宽依赖(ShuffleDependency)的个数加1;
Task:一个Stage阶段中,最后一个RDD的分区个数就是Task的个数。
注意:Application->Job->Stage->Task每一层都是1对n的关系。
RDD持久化
1) RDD Cache缓存
RDD通过Cache或者Persist方法将前面的计算结果缓存,默认情况下会把数据以缓存在JVM的堆内存中。但是并不是这两个方法被调用时立即缓存,而是触发后面的action算子时,该RDD将会被缓存在计算节点的内存中,并供后面重用,LRU算法,当内存过多的时候,spark内存中会自动采用LRU机制去删除元素。
// cache操作会增加血缘关系,不改变原有的血缘关系 println(wordToOneRdd.toDebugString) // 数据缓存。 wordToOneRdd.cache() // 可以更改存储级别 //mapRdd.persist(StorageLevel.MEMORY_AND_DISK_2)
存储级别
缓存有可能丢失,或者存储于内存的数据由于内存不足而被删除,RDD的缓存容错机制保证了即使缓存丢失也能保证计算的正确执行。通过基于RDD的一系列转换,丢失的数据会被重算,由于RDD的各个Partition是相对独立的,因此只需要计算丢失的部分即可,并不需要重算全部Partition。
Spark会自动对一些Shuffle操作的中间数据做持久化操作(比如:reduceByKey)。这样做的目的是为了当一个节点Shuffle失败了避免重新计算整个输入。但是,在实际使用的时候,如果想重用数据,仍然建议调用persist或cache。
package dep import org.apache.spark.rdd.RDD import org.apache.spark.storage.StorageLevel import org.apache.spark.{SparkConf, SparkContext} object Spark01_RDD_persist { def main(args: Array[String]): Unit = { val sparkConf: SparkConf = new SparkConf().setAppName("wc").setMaster("local") val sc = new SparkContext(sparkConf) val fileRDD: RDD[String] = sc.textFile("data/word.txt") val wordRDD: RDD[String] = fileRDD.flatMap(_.split(" ")) val word2OneRDD: RDD[(String, Int)] = wordRDD.map( word => { println("**********") (word, 1) } ) //缓存 //word2OneRDD.cache() //持久化 word2OneRDD.persist(StorageLevel.MEMORY_ONLY) val reduceRDD: RDD[(String, Int)] = word2OneRDD.reduceByKey(_ + _) println(reduceRDD.toDebugString) reduceRDD.collect().foreach(println) println("--------------------------------------------------") val groupRDD: RDD[(String, Iterable[Int])] = word2OneRDD.groupByKey() println(groupRDD.toDebugString) groupRDD.collect().foreach(println) sc.stop() } }
RDD CheckPoint检查点
所谓的检查点其实就是通过将RDD中间结果写入磁盘
由于血缘依赖过长会造成容错成本过高,这样就不如在中间阶段做检查点容错,如果检查点之后有节点出现问题,可以从检查点开始重做血缘,减少了开销。
对RDD进行checkpoint操作并不会马上被执行,必须执行Action操作才能触发。
package dep import org.apache.spark.rdd.RDD import org.apache.spark.storage.StorageLevel import org.apache.spark.{SparkConf, SparkContext} object Spark02_RDD_checkpoint { def main(args: Array[String]): Unit = { val sparkConf: SparkConf = new SparkConf().setAppName("wc").setMaster("local") val sc = new SparkContext(sparkConf) sc.setCheckpointDir("cp") //sc.setCheckpointDir("hdfs://hadoop102:8020/spark/cp") val fileRDD: RDD[String] = sc.textFile("data/word.txt") val wordRDD: RDD[String] = fileRDD.flatMap(_.split(" ")) val word2OneRDD: RDD[(String, Int)] = wordRDD.map( word => { println("**********") (word, 1) } ) //缓存 word2OneRDD.cache() //持久化 //word2OneRDD.persist(StorageLevel.MEMORY_ONLY) //检查点 配合 cache使用, 检查点可以从cache中获取数据 word2OneRDD.checkpoint() val reduceRDD: RDD[(String, Int)] = word2OneRDD.reduceByKey(_ + _) println(reduceRDD.toDebugString) reduceRDD.collect().foreach(println) println("--------------------------------------------------") val groupRDD: RDD[(String, Iterable[Int])] = word2OneRDD.groupByKey() println(groupRDD.toDebugString) groupRDD.collect().foreach(println) sc.stop() } }
缓存和检查点区别
1)Cache缓存只是将数据保存起来,不切断血缘依赖。Checkpoint检查点切断血缘依赖。
2)Cache缓存的数据通常存储在磁盘、内存等地方,可靠性低。Checkpoint的数据通常存储在HDFS等容错、高可用的文件系统,可靠性高。
3)建议对checkpoint()的RDD使用Cache缓存,这样checkpoint的job只需从Cache缓存中读取数据即可,否则需要再从头计算一次RDD。
4)persist 涉及到磁盘I/O,性能较低,但是数据安全,会独立运行,所以要和cache联合使用
RDD分区器
Spark目前支持Hash分区和Range分区,和用户自定义分区。Hash分区为当前的默认分区。分区器直接决定了RDD中分区的个数、RDD中每条数据经过Shuffle后进入哪个分区,进而决定了Reduce的个数。
Ø 只有Key-Value类型的RDD才有分区器,非Key-Value类型的RDD分区的值是None
Ø 每个RDD的分区ID范围:0 ~ (numPartitions - 1),决定这个值是属于那个分区的。
1) Hash分区:对于给定的key,计算其hashCode,并除以分区个数取余
1) Range分区:将一定范围内的数据映射到一个分区中,尽量保证每个分区数据均匀,而且分区间有序
package dep import org.apache.spark.rdd.RDD import org.apache.spark.{HashPartitioner, Partitioner, SparkConf, SparkContext} /** * Spark - 分区 * * HashPartitioner: * MR : key % 分区数 (key.hashCode() & 2147483647) % numReduceTasks * * Kafka : Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions; * * Spark : Utils.nonNegativeMod(key.hashCode, numPartitions) * * * HashMap : * key.hashcode & (length -1 ) ,长度必须是2^n * * 01010001 * & * 00000100 * ------------------ * 000000100 -> 4 * 000000000000 -> 0 * */ object Spark01_RDD_partitioner { def main(args: Array[String]): Unit = { val sparkConf: SparkConf = new SparkConf().setAppName("wc").setMaster("local") val sc = new SparkContext(sparkConf) val rdd: RDD[(Int, String)] = sc.makeRDD(List( (1, "a"), (3, "d"), (2, "b"), (4, "e") ), 2) //val partitionRDD: RDD[(Int, String)] = rdd.partitionBy(new HashPartitioner(2)) //partitionRDD.saveAsTextFile("output1") val myRDD: RDD[(Int, String)] = rdd.partitionBy(new MyPartitioner(2)) myRDD.saveAsTextFile("output2") sc.stop() } class MyPartitioner(num: Int) extends Partitioner { //分区数量 override def numPartitions: Int = num //根据数据的key值返回数据的分区索引(从0开始) override def getPartition(key: Any): Int = { val keyInt: Int = key.asInstanceOf[Int] // key match { // case // } if (keyInt <= 2) { 0 } else { 1 } } } }
RDD文件读取与保存
Spark的数据读取及数据保存可以从两个维度来作区分:文件格式以及文件系统。
文件格式分为:text文件、csv文件、sequence文件以及Object文件;
文件系统分为:本地文件系统、HDFS、HBASE以及数据库。
text文件
// 读取输入文件 val inputRDD: RDD[String] = sc.textFile("input/1.txt") // 保存数据 inputRDD.saveAsTextFile("output")
sequence文件
SequenceFile文件是Hadoop用来存储二进制形式的key-value对而设计的一种平面文件(Flat File)。在SparkContext中,可以调用sequenceFile[keyClass, valueClass](path)。
// 保存数据为SequenceFile
dataRDD.saveAsSequenceFile("output") // 读取SequenceFile文件 sc.sequenceFile[Int,Int]("output").collect().foreach(println)
object对象文件
对象文件是将对象序列化后保存的文件,采用Java的序列化机制。可以通过objectFile[T: ClassTag](path)函数接收一个路径,读取对象文件,返回对应的RDD,也可以通过调用saveAsObjectFile()实现对对象文件的输出。因为是序列化所以要指定类型。
// 保存数据 dataRDD.saveAsObjectFile("output") // 读取数据 sc.objectFile[Int]("output").collect().foreach(println)
package dep import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} object Spark01_RDD_ReadAndWrite { def main(args: Array[String]): Unit = { val sparkConf: SparkConf = new SparkConf().setAppName("wc").setMaster("local") val sc = new SparkContext(sparkConf) //val rdd: RDD[String] = sc.makeRDD( List("Hello","Spark", "Scala","Hello"),1) //写text //rdd.saveAsTextFile("textoutput") //写sequence //rdd.map((_,1)).saveAsSequenceFile("sequenceoutput") //写object //rdd.saveAsObjectFile("objoutput") //读text // val rdd: RDD[String] = sc.textFile("textoutput") // rdd.collect().foreach(println) //读sequence // val rdd: RDD[(String, Int)] = sc.sequenceFile[String,Int]("sequenceoutput") // rdd.collect().foreach(println) //读object val rdd: RDD[String] = sc.objectFile[String]("objoutput") rdd.collect().foreach(println) sc.stop() } }