5万字长文!搞定Spark方方面面(三)

简介: 5万字长文!搞定Spark方方面面

扩展:mapPartitionsWithIndex(同时获取分区号)

功能:取分区中对应的数据时,还可以将分区的编号取出来,这样就可以知道数据是属于哪个分区的
val rdd1 = sc.parallelize(List(1,2,3,4,5,6,7,8,9), 3)
//该函数的功能是将对应分区中的数据取出来,并且带上分区编号
// 一个index 分区编号
// 一个iter分区内的数据
val func = (index: Int, iter: Iterator[Int]) => {
iter.map(x => “[partID:” + index + ", val: " + x + “]”)
}
rdd1.mapPartitionsWithIndex(func).collect
//Array[String] = Array(
[partID:0, val: 1], [partID:0, val: 2], [partID:0, val: 3],
[partID:1, val: 4], [partID:1, val: 5], [partID:1, val: 6],
[partID:2, val: 7], [partID:2, val: 8], [partID:2, val: 9]
)

扩展:aggregate

val rdd1 = sc.parallelize(List(1,2,3,4,5,6,7,8,9), 3)
//0表示初始值
//第一个_+,表示区内聚合,第一个_表示历史值,第二个_表示当前值
//第二个+_,表示区间聚合,第一个_表示历史值,第二个_表示当前值
val result1: Int = rdd1.aggregate(0)( _ + _ , _ + _) //45 ==> 6 + 15 + 24 = 45
//10表示初始值,每个分区有初始值,区间聚合的时候也有初始值
val result2: Int = rdd1.aggregate(10)( _ + _ , _ + _) //85 ==> 10+ (10+6 + 10+15 + 10+24)=85

扩展:combineByKey

val rdd1 = sc.textFile(“hdfs://node01:8020/wordcount/input/words.txt”).flatMap(.split(" ")).map((, 1))
//Array((hello,1), (me,1), (hello,1), (you,1), (hello,1), (her,1))
//x => x,表示key不变
//(a: Int, b: Int) => a + b:表示区内聚合
//(m: Int, n: Int) => m + n:表示区间聚合
val rdd2 = rdd1.combineByKey(x => x, (a: Int, b: Int) => a + b, (m: Int, n: Int) => m + n)
//val rdd2 = rdd1.combineByKey(x => x, _ + _ , _ + _ )//注意这里简写错误,原则:能省则省,不能省则不要偷懒
rdd2.collect
//Array[(String, Int)] = Array((hello,3), (me,1), (you,1), (her,1))
val rddData1: RDD[(String, Float)] = sc.parallelize(
Array(
(“班级1”, 95f),
(“班级2”, 80f),
(“班级1”, 75f),
(“班级3”, 97f),
(“班级2”, 88f)),
2)
val rddData2 = rddData1.combineByKey(
grade => (grade, 1),
(gc: (Float, Int), grade) => (gc._1 + grade, gc._2 + 1),
(gc1: (Float, Int), gc2: (Float, Int)) => (gc1._1 + gc2._1, gc1._2 + gc2._2)
)
val rddData3 = rddData2.map(t => (t._1, t._2._1 / t._2._2))
rddData3.collect

扩展:aggregateByKey

val pairRDD = sc.parallelize(List( (“cat”,2), (“cat”, 5), (“mouse”, 4),(“cat”, 12), (“dog”, 12), (“mouse”, 2)), 2)
def func(index: Int, iter: Iterator[(String, Int)]) : Iterator[String] = {
iter.map(x => “[partID:” + index + ", val: " + x + “]”)
}
pairRDD.mapPartitionsWithIndex(func).collect
//Array(
[partID:0, val: (cat,2)], [partID:0, val: (cat,5)], [partID:0, val: (mouse,4)],
[partID:1, val: (cat,12)], [partID:1, val: (dog,12)], [partID:1, val: (mouse,2)]
)
pairRDD.aggregateByKey(0)(math.max( _ , _ ), _ + _ ).collect
// Array[(String, Int)] = Array((dog,12), (cat,17), (mouse,6))
//100表示区内初始值,区间聚合没有
pairRDD.aggregateByKey(100)(math.max(_, _), _ + _).collect
//Array[(String, Int)] = Array((dog,100), (cat,200), (mouse,200))
pairRDD.aggregateByKey(5)(math.max(_, _), _ + _).collect
//Array[(String, Int)] = Array((dog,12), (cat,17), (mouse,10))
pairRDD.aggregateByKey(10)(math.max(_, _), _ + _).collect
//Array[(String, Int)] = Array((dog,12), (cat,22), (mouse,20))
val rddData1 = sc.parallelize(
Array(
(“用户1”, “接口1”),
(“用户2”, “接口1”),
(“用户1”, “接口1”),
(“用户1”, “接口2”),
(“用户2”, “接口3”)),
2)
val rddData2 = rddData1.aggregateByKey(collection.mutable.SetString)(
(urlSet, url) => urlSet += url,
(urlSet1, urlSet2) => urlSet1 ++= urlSet2)
rddData2.collect

小练习

需求
给定一个键值对RDD
val rdd = sc.parallelize(Array((“spark”,2),(“hadoop”,6),(“hadoop”,4),(“spark”,6)))
key表示图书名称,
value表示某天图书销量,
请计算每个键对应的平均值,也就是计算每种图书的每天平均销量。
最终结果:(“spark”,4),(“hadoop”,5)
val rdd1 = rdd.groupByKey
rdd1.collect
//Array((spark,CompactBuffer(6, 2)), (hadoop,CompactBuffer(4, 6)))
val rdd2 = rdd1.mapValues(v => v.sum / v.size)
rdd2.collect
答案
val rdd = sc.parallelize(Array((“spark”,2),(“hadoop”,6),(“hadoop”,4),(“spark”,6)))
val rdd2 = rdd.groupByKey()
rdd2.collect
//Array[(String, Iterable[Int])] = Array((spark,CompactBuffer(2, 6)), (hadoop,CompactBuffer(6, 4)))
val rdd3 = rdd2.map(t=>(t._1,t._2.sum /t._2.size))
rdd3.collect
//Array[(String, Int)] = Array((spark,4), (hadoop,5))

总结

1)分类

RDD的算子分为两类,一类是Transformation转换操作,一类是Action动作操作

2)如何区分 Transformation 和 Action

返回值是RDD的为Transformation转换操作,延迟执行/懒执行/惰性执行
返回值不是RDD(如Unit、Array、Int)的为Action动作操作

3)面试题:

1.Transformation操作的API有哪些? --map/flatMap/filter…
2.Action操作的API有哪些? --collect/reduce/saveAsTextFile…
3.reduceByKey是Transformation还是Action? --Transformation
4.reduce是Transformation还是Action? – Action
5.foreach和foreachPartition的区别? foreach作用于每个元素,foreachPartition作用于每个分区

4)注意:

RDD不实际存储真正要计算的数据,而只是记录了RDD的转换关系(调用了什么方法,传入什么函数,依赖哪些RDD,分区器是什么,数量块来源机器列表)
RDD中的所有转换操作都是延迟执行(懒执行)的,也就是说并不会直接计算。只有当发生Action操作的时候,这些转换才会真正运行。

3、RDD 的持久化/缓存

3.1 引入

在实际开发中某些 RDD 的计算或转换可能会比较耗费时间,如果这些 RDD 后续还会频繁的被使用到,那么可以将这些 RDD 进行持久化/缓存,这样下次再使用到的时候就不用再重新计算了,提高了程序运行的效率。

3.2 持久化/缓存 API 详解

persist 方法和 cache 方法

RDD通过persist或cache方法可以将前面的计算结果缓存,但是并不是这两个方法被调用时立即缓存,而是触发后面的action时,该RDD将会被缓存在计算节点的内存中,并供后面重用。
通过查看RDD的源码发现cache最终也是调用了persist无参方法(默认存储只存在内存中)

640.png

3.3 代码演示

1)启动集群和 spark-shell

/export/servers/spark/sbin/start-all.sh
/export/servers/spark/bin/spark-shell \
--master spark://node01:7077,node02:7077 \
--executor-memory 1g \
--total-executor-cores 2

2)将一个 RDD 持久化,后续操作该 RDD 就可以直接从缓存中拿

val rdd1 = sc.textFile("hdfs://node01:8020/wordcount/input/words.txt")
val rdd2 = rdd1.flatMap(x=>x.split(" ")).map((_,1)).reduceByKey(_+_)
rdd2.cache //缓存/持久化
rdd2.sortBy(_._2,false).collect//触发action,会去读取HDFS的文件,rdd2会真正执行持久化
rdd2.sortBy(_._2,false).collect//触发action,会去读缓存中的数据,执行速度会比之前快,因为rdd2已经持久化到内存中了

3)存储级别

默认的存储级别都是仅在内存存储一份,Spark的存储级别还有好多种,存储级别在object StorageLevel中定义的

640.png

总结

640.png

3.4 总结
1.RDD持久化/缓存的目的是为了提高后续操作的速度
2.缓存的级别有很多,默认只存在内存中,开发中使用memory_and_disk
3.只有执行action操作的时候才会真正将RDD数据进行持久化/缓存
4.实际开发中如果某一个RDD后续会被频繁的使用,可以将该RDD进行持久化/缓存

4、RDD 容错机制 Checkpoint

4.1 引入

1)持久化的局限

持久化/缓存可以把数据放在内存中,虽然是快速的,但是也是最不可靠的;也可以把数据放在磁盘上,也不是完全可靠的!例如磁盘会损坏等。

2)问题解决

Checkpoint的产生就是为了更加可靠的数据持久化,在Checkpoint的时候一般把数据放在在HDFS上,这就天然的借助了HDFS天生的高容错、高可靠来实现数据最大程度上的安全,实现了RDD的容错和高可用

3)使用步骤

1.SparkContext.setCheckpointDir("目录") //HDFS的目录
2.RDD.checkpoint()
4.2 代码演示
sc.setCheckpointDir(“hdfs://node01:8020/ckpdir”)
//设置检查点目录,会立即在HDFS上创建一个空目录
val rdd1 = sc.textFile(“hdfs://node01:8020/wordcount/input/words.txt”).flatMap(.split(" ")).map(( _ , 1)).reduceByKey( _ +)
rdd1.checkpoint() //对rdd1进行检查点保存
rdd1.collect //Action操作才会真正执行checkpoint
//后续如果要使用到rdd1可以从checkpoint中读取

查看结果

hdfs dfs -ls /
或者通过web界面查看
http://192.168.1.101:50070/dfshealth.html#tab-overview
4.3 总结

1)开发中如何保证数据的安全性性及读取效率

可以对频繁使用且重要的数据,先做缓存/持久化,再做checkpint操作

2)持久化和 Checkpoint 的区别

1.位置
Persist 和 Cache 只能保存在本地的磁盘和内存中(或者堆外内存–实验中)
Checkpoint 可以保存数据到 HDFS 这类可靠的存储上
2.生命周期
Cache和Persist的RDD会在程序结束后会被清除或者手动调用unpersist方法
Checkpoint的RDD在程序结束后依然存在,不会被删除
3.Lineage(血统、依赖链–其实就是依赖关系)
Persist和Cache,不会丢掉RDD间的依赖链/依赖关系,因为这种缓存是不可靠的,如果出现了一些错误(例如 Executor 宕机),需要通过回溯依赖链重新计算出来

Checkpoint 会斩断依赖链,因为 Checkpoint 会把结果保存在 HDFS 这类存储中,更加的安全可靠,一般不需要回溯依赖链。

407549054520b2f823d725f4c54dcf12.png

补充:Lineage RDD的Lineage(血统、依赖链)会记录RDD的元数据信息和转换行为,当该RDD的部分分区数据丢失时,它可以根据这些信息来重新运算和恢复丢失的数据分区。
在进行故障恢复时,Spark会对读取Checkpoint的开销和重新计算RDD分区的开销进行比较,从而自动选择最优的恢复策略。

5、RDD 依赖关系

5.1. 宽窄依赖

1)两种依赖关系类型

RDD和它依赖的父RDD的关系有两种不同的类型,即
宽依赖(wide dependency/shuffle dependency)
窄依赖(narrow dependency)

0c92a83f556480d83f6ff4b2daa77573.png

2)图解

0bbe36faf7bf809fa4e2ca28362c33be.png

1f33d42c286b76bce6b907391800eb89.png

3)如何区分宽窄依赖

窄依赖:父RDD的一个分区只会被子RDD的一个分区依赖
宽依赖:父RDD的一个分区会被子RDD的多个分区依赖(涉及到shuffle)

4)面试题

子RDD的一个分区依赖多个父RDD是宽依赖还是窄依赖?
不能确定,也就是宽窄依赖的划分依据是父RDD的一个分区是否被子RDD的多个分区所依赖,是,就是宽依赖,或者从shuffle的角度去判断,有shuffle就是宽依赖
5.2 为什么要设计宽窄依赖
1)对于窄依赖
Spark可以并行计算
如果有一个分区数据丢失,只需要从父RDD的对应1个分区重新计算即可,不需要重新计算整个任务,提高容错。
2)对于宽依赖
是划分Stage的依据

6、DAG 的生成和划分 Stage(上面也有提到 DAG)

5b77ba4185c697e6e8ce48bff8fd8897.png

6.1 为什么要划分 Stage? --并行计算
一个复杂的业务逻辑如果有shuffle,那么就意味着前面阶段产生结果后,才能执行下一个阶段,即下一个阶段的计算要依赖上一个阶段的数据。那么我们按照shuffle进行划分(也就是按照宽依赖就行划分),就可以将一个DAG划分成多个Stage/阶段,在同一个Stage中,会有多个算子操作,可以形成一个pipeline流水线,流水线内的多个平行的分区可以并行执行
6.2 如何划分 DAG 的 stage
对于窄依赖,partition的转换处理在stage中完成计算,不划分(将窄依赖尽量放在在同一个stage中,可以实现流水线计算)
对于宽依赖,由于有shuffle的存在,只能在父RDD处理完成后,才能开始接下来的计算,也就是说需要要划分stage(出现宽依赖即拆分)
6.3 总结
Spark会根据shuffle/宽依赖使用回溯算法来对DAG进行Stage划分,从后往前,遇到宽依赖就断开,遇到窄依赖就把当前的RDD加入到当前的stage/阶段中
具体的划分算法请参见AMP实验室发表的论文
《Resilient Distributed Datasets: A Fault-Tolerant Abstraction for In-Memory Cluster Computing》
http://xueshu.baidu.com/usercenter/paper/show?paperid=b33564e60f0a7e7a1889a9da10963461&site=xueshu_se

7、Spark 运行原理及流程

7.1. 基本流程

Spark 运行基本流程

1.当一个Spark应用被提交时,首先需要为这个Spark Application构建基本的运行环境,即由任务控制节点(Driver)创建一个SparkContext,
2.SparkContext向资源管理器注册并申请运行Executor资源;
3.资源管理器为Executor分配资源并启动Executor进程,Executor运行情况将随着心跳发送到资源管理器上;
4.SparkContext根据RDD的依赖关系构建成DAG图,并提交给DAGScheduler进行解析划分成Stage,并把该Stage中的Task组成Taskset发送给TaskScheduler。
5.TaskScheduler将Task发放给Executor运行,同时SparkContext将应用程序代码发放给Executor。
6.Executor将Task丢入到线程池中执行,把执行结果反馈给任务调度器,然后反馈给DAG调度器,运行完毕后写入数据并释放所有资源。
7.2 流程图解

ca8d25878ae763c30f7f3fb407ea5e8e.png

ca560a036255964628768dc78e8dcea6.png

6d096dd92cc43daaedc225f7d7a02271.png

7.3 总结
1.Spark应用被提交–>SparkContext向资源管理器注册并申请资源–>启动Executor
2.RDD–>构建DAG–>DAGScheduler划分Stage形成TaskSet–>TaskScheduler提交Task–>Worker上的Executor执行Task

6d17af920a84d50c1051dfc54b295be7.png

8、RDD 累加器和广播变量

在默认情况下,当Spark在集群的多个不同节点的多个任务上并行运行一个函数时,它会把函数中涉及到的每个变量,在每个任务上都生成一个副本。但是,有时候需要在多个任务之间共享变量,或者在任务(Task)和任务控制节点(Driver Program)之间共享变量。

为了满足这种需求,Spark 提供了两种类型的变量:

1.累加器accumulators:累加器支持在所有不同节点之间进行累加计算(比如计数或者求和)
2.广播变量broadcast variables:广播变量用来把变量在所有节点的内存之间进行共享,在每个机器上缓存一个只读的变量,而不是为机器上的每个任务都生成一个副本。
8.1 累加器

8.1.1 不使用累加器

c13f1e72b25fd2d64240f2e30106ad88.png

8.1.2 使用累加器

通常在向 Spark 传递函数时,比如使用 map() 函数或者用 filter() 传条件时,可以使用驱动器程序中定义的变量,但是集群中运行的每个任务都会得到这些变量的一份新的副本,更新这些副本的值也不会影响驱动器中的对应变量。这时使用累加器就可以实现我们想要的效果。
val xx: Accumulator[Int] = sc.accumulator(0)

8.1.3 代码演示

package cn.itcast.core
import org.apache.spark.rdd.RDD
import org.apache.spark.{Accumulator, SparkConf, SparkContext}
object AccumulatorTest {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName("wc").setMaster("local[*]")
    val sc: SparkContext = new SparkContext(conf)
    sc.setLogLevel("WARN")
    //使用scala集合完成累加
    var counter1: Int = 0;
    var data = Seq(1,2,3)
    data.foreach(x => counter1 += x )
    println(counter1)//6
    println("+++++++++++++++++++++++++")
    //使用RDD进行累加
    var counter2: Int = 0;
    val dataRDD: RDD[Int] = sc.parallelize(data) //分布式集合的[1,2,3]
    dataRDD.foreach(x => counter2 += x)
    println(counter2)//0
    //注意:上面的RDD操作运行结果是0
    //因为foreach中的函数是传递给Worker中的Executor执行,用到了counter2变量
    //而counter2变量在Driver端定义的,在传递给Executor的时候,各个Executor都有了一份counter2
    //最后各个Executor将各自个x加到自己的counter2上面了,和Driver端的counter2没有关系
    //那这个问题得解决啊!不能因为使用了Spark连累加都做不了了啊!
    //如果解决?---使用累加器
    val counter3: Accumulator[Int] = sc.accumulator(0)
    dataRDD.foreach(x => counter3 += x)
    println(counter3)//6
  }
}
8.2 广播变量

8.2.1 不使用广播变量

a4a2fc8b19fe1f58a72be11fd2aed0e4.png

8.2.2 使用广播变量

2687df8ad6cca02810f328c1c4fce5a2.png

8.2.3 代码演示

package cn.itcast.core
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object BroadcastVariablesTest {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName("wc").setMaster("local[*]")
    val sc: SparkContext = new SparkContext(conf)
    sc.setLogLevel("WARN")
    //不使用广播变量
    val kvFruit: RDD[(Int, String)] = sc.parallelize(List((1,"apple"),(2,"orange"),(3,"banana"),(4,"grape")))
    val fruitMap: collection.Map[Int, String] =kvFruit.collectAsMap
    //scala.collection.Map[Int,String] = Map(2 -> orange, 4 -> grape, 1 -> apple, 3 -> banana)
    val fruitIds: RDD[Int] = sc.parallelize(List(2,4,1,3))
    //根据水果编号取水果名称
    val fruitNames: RDD[String] = fruitIds.map(x=>fruitMap(x))
    fruitNames.foreach(println)
    //注意:以上代码看似一点问题没有,但是考虑到数据量如果较大,且Task数较多,
    //那么会导致,被各个Task共用到的fruitMap会被多次传输
    //应该要减少fruitMap的传输,一台机器上一个,被该台机器中的Task共用即可
    //如何做到?---使用广播变量
    println("=====================")
    val BroadcastFruitMap: Broadcast[collection.Map[Int, String]] = sc.broadcast(fruitMap)
    val fruitNames2: RDD[String] = fruitIds.map(x=>BroadcastFruitMap.value(x))
    fruitNames2.foreach(println)
  }
}
相关文章
|
3月前
|
存储 编译器 C++
【非常详细!】QT基础【二万字长文】
【非常详细!】QT基础【二万字长文】
|
6月前
|
消息中间件 分布式计算 Kafka
Spark面试干货总结!(8千字长文、27个知识点、21张图)
Spark面试干货总结!(8千字长文、27个知识点、21张图)
141 1
|
9月前
|
信息无障碍 C++
万字长文,深度分析 c++的前景
万字长文,深度分析 c++的前景
89 0
|
12月前
|
存储 缓存 算法
【Java虚拟机】万字长文,搞定Java虚拟机方方面面!2
【Java虚拟机】万字长文,搞定Java虚拟机方方面面!
|
12月前
|
存储 前端开发 安全
【Java虚拟机】万字长文,搞定Java虚拟机方方面面!4
【Java虚拟机】万字长文,搞定Java虚拟机方方面面!
|
12月前
|
存储 JSON 安全
【Java虚拟机】万字长文,搞定Java虚拟机方方面面!1
【Java虚拟机】万字长文,搞定Java虚拟机方方面面!
|
12月前
|
存储 Java 编译器
【Java虚拟机】万字长文,搞定Java虚拟机方方面面!3
【Java虚拟机】万字长文,搞定Java虚拟机方方面面!
|
SQL 分布式计算 监控
5万字长文!搞定Spark方方面面(六)
5万字长文!搞定Spark方方面面
173 0
5万字长文!搞定Spark方方面面(六)
|
SQL 分布式计算 HIVE
5万字长文!搞定Spark方方面面(五)
5万字长文!搞定Spark方方面面
124 0
|
分布式计算 资源调度 算法
5万字长文!搞定Spark方方面面(一)
5万字长文!搞定Spark方方面面
300 0
5万字长文!搞定Spark方方面面(一)