扩展:mapPartitionsWithIndex(同时获取分区号)
功能:取分区中对应的数据时,还可以将分区的编号取出来,这样就可以知道数据是属于哪个分区的 val rdd1 = sc.parallelize(List(1,2,3,4,5,6,7,8,9), 3) //该函数的功能是将对应分区中的数据取出来,并且带上分区编号 // 一个index 分区编号 // 一个iter分区内的数据 val func = (index: Int, iter: Iterator[Int]) => { iter.map(x => “[partID:” + index + ", val: " + x + “]”) } rdd1.mapPartitionsWithIndex(func).collect //Array[String] = Array( [partID:0, val: 1], [partID:0, val: 2], [partID:0, val: 3], [partID:1, val: 4], [partID:1, val: 5], [partID:1, val: 6], [partID:2, val: 7], [partID:2, val: 8], [partID:2, val: 9] )
扩展:aggregate
val rdd1 = sc.parallelize(List(1,2,3,4,5,6,7,8,9), 3) //0表示初始值 //第一个_+,表示区内聚合,第一个_表示历史值,第二个_表示当前值 //第二个+_,表示区间聚合,第一个_表示历史值,第二个_表示当前值 val result1: Int = rdd1.aggregate(0)( _ + _ , _ + _) //45 ==> 6 + 15 + 24 = 45 //10表示初始值,每个分区有初始值,区间聚合的时候也有初始值 val result2: Int = rdd1.aggregate(10)( _ + _ , _ + _) //85 ==> 10+ (10+6 + 10+15 + 10+24)=85
扩展:combineByKey
val rdd1 = sc.textFile(“hdfs://node01:8020/wordcount/input/words.txt”).flatMap(.split(" ")).map((, 1)) //Array((hello,1), (me,1), (hello,1), (you,1), (hello,1), (her,1)) //x => x,表示key不变 //(a: Int, b: Int) => a + b:表示区内聚合 //(m: Int, n: Int) => m + n:表示区间聚合 val rdd2 = rdd1.combineByKey(x => x, (a: Int, b: Int) => a + b, (m: Int, n: Int) => m + n) //val rdd2 = rdd1.combineByKey(x => x, _ + _ , _ + _ )//注意这里简写错误,原则:能省则省,不能省则不要偷懒 rdd2.collect //Array[(String, Int)] = Array((hello,3), (me,1), (you,1), (her,1)) val rddData1: RDD[(String, Float)] = sc.parallelize( Array( (“班级1”, 95f), (“班级2”, 80f), (“班级1”, 75f), (“班级3”, 97f), (“班级2”, 88f)), 2) val rddData2 = rddData1.combineByKey( grade => (grade, 1), (gc: (Float, Int), grade) => (gc._1 + grade, gc._2 + 1), (gc1: (Float, Int), gc2: (Float, Int)) => (gc1._1 + gc2._1, gc1._2 + gc2._2) ) val rddData3 = rddData2.map(t => (t._1, t._2._1 / t._2._2)) rddData3.collect
扩展:aggregateByKey
val pairRDD = sc.parallelize(List( (“cat”,2), (“cat”, 5), (“mouse”, 4),(“cat”, 12), (“dog”, 12), (“mouse”, 2)), 2) def func(index: Int, iter: Iterator[(String, Int)]) : Iterator[String] = { iter.map(x => “[partID:” + index + ", val: " + x + “]”) } pairRDD.mapPartitionsWithIndex(func).collect //Array( [partID:0, val: (cat,2)], [partID:0, val: (cat,5)], [partID:0, val: (mouse,4)], [partID:1, val: (cat,12)], [partID:1, val: (dog,12)], [partID:1, val: (mouse,2)] ) pairRDD.aggregateByKey(0)(math.max( _ , _ ), _ + _ ).collect // Array[(String, Int)] = Array((dog,12), (cat,17), (mouse,6)) //100表示区内初始值,区间聚合没有 pairRDD.aggregateByKey(100)(math.max(_, _), _ + _).collect //Array[(String, Int)] = Array((dog,100), (cat,200), (mouse,200)) pairRDD.aggregateByKey(5)(math.max(_, _), _ + _).collect //Array[(String, Int)] = Array((dog,12), (cat,17), (mouse,10)) pairRDD.aggregateByKey(10)(math.max(_, _), _ + _).collect //Array[(String, Int)] = Array((dog,12), (cat,22), (mouse,20)) val rddData1 = sc.parallelize( Array( (“用户1”, “接口1”), (“用户2”, “接口1”), (“用户1”, “接口1”), (“用户1”, “接口2”), (“用户2”, “接口3”)), 2) val rddData2 = rddData1.aggregateByKey(collection.mutable.SetString)( (urlSet, url) => urlSet += url, (urlSet1, urlSet2) => urlSet1 ++= urlSet2) rddData2.collect
小练习
需求 给定一个键值对RDD val rdd = sc.parallelize(Array((“spark”,2),(“hadoop”,6),(“hadoop”,4),(“spark”,6))) key表示图书名称, value表示某天图书销量, 请计算每个键对应的平均值,也就是计算每种图书的每天平均销量。 最终结果:(“spark”,4),(“hadoop”,5) val rdd1 = rdd.groupByKey rdd1.collect //Array((spark,CompactBuffer(6, 2)), (hadoop,CompactBuffer(4, 6))) val rdd2 = rdd1.mapValues(v => v.sum / v.size) rdd2.collect
答案 val rdd = sc.parallelize(Array((“spark”,2),(“hadoop”,6),(“hadoop”,4),(“spark”,6))) val rdd2 = rdd.groupByKey() rdd2.collect //Array[(String, Iterable[Int])] = Array((spark,CompactBuffer(2, 6)), (hadoop,CompactBuffer(6, 4))) val rdd3 = rdd2.map(t=>(t._1,t._2.sum /t._2.size)) rdd3.collect //Array[(String, Int)] = Array((spark,4), (hadoop,5))
总结
1)分类
RDD的算子分为两类,一类是Transformation转换操作,一类是Action动作操作
2)如何区分 Transformation 和 Action
返回值是RDD的为Transformation转换操作,延迟执行/懒执行/惰性执行 返回值不是RDD(如Unit、Array、Int)的为Action动作操作
3)面试题:
1.Transformation操作的API有哪些? --map/flatMap/filter… 2.Action操作的API有哪些? --collect/reduce/saveAsTextFile… 3.reduceByKey是Transformation还是Action? --Transformation 4.reduce是Transformation还是Action? – Action 5.foreach和foreachPartition的区别? foreach作用于每个元素,foreachPartition作用于每个分区
4)注意:
RDD不实际存储真正要计算的数据,而只是记录了RDD的转换关系(调用了什么方法,传入什么函数,依赖哪些RDD,分区器是什么,数量块来源机器列表) RDD中的所有转换操作都是延迟执行(懒执行)的,也就是说并不会直接计算。只有当发生Action操作的时候,这些转换才会真正运行。
3、RDD 的持久化/缓存
3.1 引入
在实际开发中某些 RDD 的计算或转换可能会比较耗费时间,如果这些 RDD 后续还会频繁的被使用到,那么可以将这些 RDD 进行持久化/缓存,这样下次再使用到的时候就不用再重新计算了,提高了程序运行的效率。
3.2 持久化/缓存 API 详解
persist 方法和 cache 方法
RDD通过persist或cache方法可以将前面的计算结果缓存,但是并不是这两个方法被调用时立即缓存,而是触发后面的action时,该RDD将会被缓存在计算节点的内存中,并供后面重用。 通过查看RDD的源码发现cache最终也是调用了persist无参方法(默认存储只存在内存中)
3.3 代码演示
1)启动集群和 spark-shell
/export/servers/spark/sbin/start-all.sh /export/servers/spark/bin/spark-shell \ --master spark://node01:7077,node02:7077 \ --executor-memory 1g \ --total-executor-cores 2
2)将一个 RDD 持久化,后续操作该 RDD 就可以直接从缓存中拿
val rdd1 = sc.textFile("hdfs://node01:8020/wordcount/input/words.txt") val rdd2 = rdd1.flatMap(x=>x.split(" ")).map((_,1)).reduceByKey(_+_) rdd2.cache //缓存/持久化 rdd2.sortBy(_._2,false).collect//触发action,会去读取HDFS的文件,rdd2会真正执行持久化 rdd2.sortBy(_._2,false).collect//触发action,会去读缓存中的数据,执行速度会比之前快,因为rdd2已经持久化到内存中了
3)存储级别
默认的存储级别都是仅在内存存储一份,Spark的存储级别还有好多种,存储级别在object StorageLevel中定义的
总结
3.4 总结
1.RDD持久化/缓存的目的是为了提高后续操作的速度 2.缓存的级别有很多,默认只存在内存中,开发中使用memory_and_disk 3.只有执行action操作的时候才会真正将RDD数据进行持久化/缓存 4.实际开发中如果某一个RDD后续会被频繁的使用,可以将该RDD进行持久化/缓存
4、RDD 容错机制 Checkpoint
4.1 引入
1)持久化的局限
持久化/缓存可以把数据放在内存中,虽然是快速的,但是也是最不可靠的;也可以把数据放在磁盘上,也不是完全可靠的!例如磁盘会损坏等。
2)问题解决
Checkpoint的产生就是为了更加可靠的数据持久化,在Checkpoint的时候一般把数据放在在HDFS上,这就天然的借助了HDFS天生的高容错、高可靠来实现数据最大程度上的安全,实现了RDD的容错和高可用
3)使用步骤
1.SparkContext.setCheckpointDir("目录") //HDFS的目录 2.RDD.checkpoint()
4.2 代码演示
sc.setCheckpointDir(“hdfs://node01:8020/ckpdir”) //设置检查点目录,会立即在HDFS上创建一个空目录 val rdd1 = sc.textFile(“hdfs://node01:8020/wordcount/input/words.txt”).flatMap(.split(" ")).map(( _ , 1)).reduceByKey( _ +) rdd1.checkpoint() //对rdd1进行检查点保存 rdd1.collect //Action操作才会真正执行checkpoint //后续如果要使用到rdd1可以从checkpoint中读取
查看结果
hdfs dfs -ls / 或者通过web界面查看 http://192.168.1.101:50070/dfshealth.html#tab-overview
4.3 总结
1)开发中如何保证数据的安全性性及读取效率
可以对频繁使用且重要的数据,先做缓存/持久化,再做checkpint操作
2)持久化和 Checkpoint 的区别
1.位置 Persist 和 Cache 只能保存在本地的磁盘和内存中(或者堆外内存–实验中) Checkpoint 可以保存数据到 HDFS 这类可靠的存储上 2.生命周期 Cache和Persist的RDD会在程序结束后会被清除或者手动调用unpersist方法 Checkpoint的RDD在程序结束后依然存在,不会被删除 3.Lineage(血统、依赖链–其实就是依赖关系) Persist和Cache,不会丢掉RDD间的依赖链/依赖关系,因为这种缓存是不可靠的,如果出现了一些错误(例如 Executor 宕机),需要通过回溯依赖链重新计算出来
Checkpoint 会斩断依赖链,因为 Checkpoint 会把结果保存在 HDFS 这类存储中,更加的安全可靠,一般不需要回溯依赖链。
补充:Lineage RDD的Lineage(血统、依赖链)会记录RDD的元数据信息和转换行为,当该RDD的部分分区数据丢失时,它可以根据这些信息来重新运算和恢复丢失的数据分区。 在进行故障恢复时,Spark会对读取Checkpoint的开销和重新计算RDD分区的开销进行比较,从而自动选择最优的恢复策略。
5、RDD 依赖关系
5.1. 宽窄依赖
1)两种依赖关系类型
RDD和它依赖的父RDD的关系有两种不同的类型,即 宽依赖(wide dependency/shuffle dependency) 窄依赖(narrow dependency)
2)图解
3)如何区分宽窄依赖
窄依赖:父RDD的一个分区只会被子RDD的一个分区依赖 宽依赖:父RDD的一个分区会被子RDD的多个分区依赖(涉及到shuffle)
4)面试题
子RDD的一个分区依赖多个父RDD是宽依赖还是窄依赖? 不能确定,也就是宽窄依赖的划分依据是父RDD的一个分区是否被子RDD的多个分区所依赖,是,就是宽依赖,或者从shuffle的角度去判断,有shuffle就是宽依赖
5.2 为什么要设计宽窄依赖
1)对于窄依赖 Spark可以并行计算 如果有一个分区数据丢失,只需要从父RDD的对应1个分区重新计算即可,不需要重新计算整个任务,提高容错。 2)对于宽依赖 是划分Stage的依据
6、DAG 的生成和划分 Stage(上面也有提到 DAG)
6.1 为什么要划分 Stage? --并行计算
一个复杂的业务逻辑如果有shuffle,那么就意味着前面阶段产生结果后,才能执行下一个阶段,即下一个阶段的计算要依赖上一个阶段的数据。那么我们按照shuffle进行划分(也就是按照宽依赖就行划分),就可以将一个DAG划分成多个Stage/阶段,在同一个Stage中,会有多个算子操作,可以形成一个pipeline流水线,流水线内的多个平行的分区可以并行执行
6.2 如何划分 DAG 的 stage
对于窄依赖,partition的转换处理在stage中完成计算,不划分(将窄依赖尽量放在在同一个stage中,可以实现流水线计算) 对于宽依赖,由于有shuffle的存在,只能在父RDD处理完成后,才能开始接下来的计算,也就是说需要要划分stage(出现宽依赖即拆分)
6.3 总结
Spark会根据shuffle/宽依赖使用回溯算法来对DAG进行Stage划分,从后往前,遇到宽依赖就断开,遇到窄依赖就把当前的RDD加入到当前的stage/阶段中 具体的划分算法请参见AMP实验室发表的论文 《Resilient Distributed Datasets: A Fault-Tolerant Abstraction for In-Memory Cluster Computing》 http://xueshu.baidu.com/usercenter/paper/show?paperid=b33564e60f0a7e7a1889a9da10963461&site=xueshu_se
7、Spark 运行原理及流程
7.1. 基本流程
Spark 运行基本流程
1.当一个Spark应用被提交时,首先需要为这个Spark Application构建基本的运行环境,即由任务控制节点(Driver)创建一个SparkContext, 2.SparkContext向资源管理器注册并申请运行Executor资源; 3.资源管理器为Executor分配资源并启动Executor进程,Executor运行情况将随着心跳发送到资源管理器上; 4.SparkContext根据RDD的依赖关系构建成DAG图,并提交给DAGScheduler进行解析划分成Stage,并把该Stage中的Task组成Taskset发送给TaskScheduler。 5.TaskScheduler将Task发放给Executor运行,同时SparkContext将应用程序代码发放给Executor。 6.Executor将Task丢入到线程池中执行,把执行结果反馈给任务调度器,然后反馈给DAG调度器,运行完毕后写入数据并释放所有资源。
7.2 流程图解
7.3 总结
1.Spark应用被提交–>SparkContext向资源管理器注册并申请资源–>启动Executor 2.RDD–>构建DAG–>DAGScheduler划分Stage形成TaskSet–>TaskScheduler提交Task–>Worker上的Executor执行Task
8、RDD 累加器和广播变量
在默认情况下,当Spark在集群的多个不同节点的多个任务上并行运行一个函数时,它会把函数中涉及到的每个变量,在每个任务上都生成一个副本。但是,有时候需要在多个任务之间共享变量,或者在任务(Task)和任务控制节点(Driver Program)之间共享变量。
为了满足这种需求,Spark 提供了两种类型的变量:
1.累加器accumulators:累加器支持在所有不同节点之间进行累加计算(比如计数或者求和) 2.广播变量broadcast variables:广播变量用来把变量在所有节点的内存之间进行共享,在每个机器上缓存一个只读的变量,而不是为机器上的每个任务都生成一个副本。
8.1 累加器
8.1.1 不使用累加器
8.1.2 使用累加器
通常在向 Spark 传递函数时,比如使用 map() 函数或者用 filter() 传条件时,可以使用驱动器程序中定义的变量,但是集群中运行的每个任务都会得到这些变量的一份新的副本,更新这些副本的值也不会影响驱动器中的对应变量。这时使用累加器就可以实现我们想要的效果。 val xx: Accumulator[Int] = sc.accumulator(0)
8.1.3 代码演示
package cn.itcast.core import org.apache.spark.rdd.RDD import org.apache.spark.{Accumulator, SparkConf, SparkContext} object AccumulatorTest { def main(args: Array[String]): Unit = { val conf: SparkConf = new SparkConf().setAppName("wc").setMaster("local[*]") val sc: SparkContext = new SparkContext(conf) sc.setLogLevel("WARN") //使用scala集合完成累加 var counter1: Int = 0; var data = Seq(1,2,3) data.foreach(x => counter1 += x ) println(counter1)//6 println("+++++++++++++++++++++++++") //使用RDD进行累加 var counter2: Int = 0; val dataRDD: RDD[Int] = sc.parallelize(data) //分布式集合的[1,2,3] dataRDD.foreach(x => counter2 += x) println(counter2)//0 //注意:上面的RDD操作运行结果是0 //因为foreach中的函数是传递给Worker中的Executor执行,用到了counter2变量 //而counter2变量在Driver端定义的,在传递给Executor的时候,各个Executor都有了一份counter2 //最后各个Executor将各自个x加到自己的counter2上面了,和Driver端的counter2没有关系 //那这个问题得解决啊!不能因为使用了Spark连累加都做不了了啊! //如果解决?---使用累加器 val counter3: Accumulator[Int] = sc.accumulator(0) dataRDD.foreach(x => counter3 += x) println(counter3)//6 } }
8.2 广播变量
8.2.1 不使用广播变量
8.2.2 使用广播变量
8.2.3 代码演示
package cn.itcast.core import org.apache.spark.broadcast.Broadcast import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} object BroadcastVariablesTest { def main(args: Array[String]): Unit = { val conf: SparkConf = new SparkConf().setAppName("wc").setMaster("local[*]") val sc: SparkContext = new SparkContext(conf) sc.setLogLevel("WARN") //不使用广播变量 val kvFruit: RDD[(Int, String)] = sc.parallelize(List((1,"apple"),(2,"orange"),(3,"banana"),(4,"grape"))) val fruitMap: collection.Map[Int, String] =kvFruit.collectAsMap //scala.collection.Map[Int,String] = Map(2 -> orange, 4 -> grape, 1 -> apple, 3 -> banana) val fruitIds: RDD[Int] = sc.parallelize(List(2,4,1,3)) //根据水果编号取水果名称 val fruitNames: RDD[String] = fruitIds.map(x=>fruitMap(x)) fruitNames.foreach(println) //注意:以上代码看似一点问题没有,但是考虑到数据量如果较大,且Task数较多, //那么会导致,被各个Task共用到的fruitMap会被多次传输 //应该要减少fruitMap的传输,一台机器上一个,被该台机器中的Task共用即可 //如何做到?---使用广播变量 println("=====================") val BroadcastFruitMap: Broadcast[collection.Map[Int, String]] = sc.broadcast(fruitMap) val fruitNames2: RDD[String] = fruitIds.map(x=>BroadcastFruitMap.value(x)) fruitNames2.foreach(println) } }