上万字详解Spark Core(建议收藏) (二)

简介: 详解Spark Core

4. Action动作算子


动作算子 含义
reduce(func) 通过func函数聚集RDD中的所有元素,这个功能必须是可交换且可并联的
collect() 在驱动程序中,以数组的形式返回数据集的所有元素
count() 返回RDD的元素个数
first() 返回RDD的第一个元素(类似于take(1))
take(n) 返回一个由数据集的前n个元素组成的数组
takeSample(withReplacement,num, [seed]) 返回一个数组,该数组由从数据集中随机采样的num个元素组成,可以选择是否用随机数替换不足的部分,seed用于指定随机数生成器种子
takeOrdered(n, [ordering]) 返回自然顺序或者自定义顺序的前 n 个元素
saveAsTextFile(path) 将数据集的元素以textfile的形式保存到HDFS文件系统或者其他支持的文件系统,对于每个元素,Spark将会调用toString方法,将它装换为文件中的文本
saveAsSequenceFile(path) 将数据集中的元素以Hadoop sequencefile的格式保存到指定的目录下,可以使HDFS或者其他Hadoop支持的文件系统
saveAsObjectFile(path) 将数据集的元素,以 Java 序列化的方式保存到指定的目录下
countByKey() 针对(K,V)类型的RDD,返回一个(K,Int)的map,表示每一个key对应的元素个数
foreach(func) 在数据集的每一个元素上,运行函数func进行更新
foreachPartition(func) 在数据集的每一个分区上,运行函数func


统计操作:


算子 含义
count 个数
mean 均值
sum 求和
max 最大值
min 最小值
variance 方差
sampleVariance 从采样中计算方差
stdev 标准差:衡量数据的离散程度
sampleStdev 采样的标准差
stats 查看统计结果


三、RDD的持久化/缓存


在实际开发中某些RDD的计算或转换可能会比较耗费时间,如果这些RDD后续还会频繁的被使用到,那么可以将这些RDD进行持久化/缓存,这样下次再使用到的时候就不用再重新计算了,提高了程序运行的效率。


val rdd1 = sc.textFile("hdfs://node01:8020/words.txt")
val rdd2 = rdd1.flatMap(x=>x.split(" ")).map((_,1)).reduceByKey(_+_)
rdd2.cache //缓存/持久化
rdd2.sortBy(_._2,false).collect//触发action,会去读取HDFS的文件,rdd2会真正执行持久化
rdd2.sortBy(_._2,false).collect//触发action,会去读缓存中的数据,执行速度会比之前快,因为rdd2已经持久化到内存中了


持久化/缓存API详解


  • ersist方法和cache方法


RDD通过persist或cache方法可以将前面的计算结果缓存,但是并不是这两个方法被调用时立即缓存,而是触发后面的action时,该RDD将会被缓存在计算节点的内存中,并供后面重用。


通过查看RDD的源码发现cache最终也是调用了persist无参方法(默认存储只存在内存中):


image.png


  • 存储级别


默认的存储级别都是仅在内存存储一份,Spark的存储级别还有好多种,存储级别在object StorageLevel中定义的。


持久化级别 说明
MORY_ONLY(默认) 将RDD以非序列化的Java对象存储在JVM中。 如果没有足够的内存存储RDD,则某些分区将不会被缓存,每次需要时都会重新计算。 这是默认级别
MORY_AND_DISK(开发中可以使用这个) 将RDD以非序列化的Java对象存储在JVM中。如果数据在内存中放不下,则溢写到磁盘上.需要时则会从磁盘上读取
MEMORY_ONLY_SER (Java and Scala) 将RDD以序列化的Java对象(每个分区一个字节数组)的方式存储.这通常比非序列化对象(deserialized objects)更具空间效率,特别是在使用快速序列化的情况下,但是这种方式读取数据会消耗更多的CPU
MEMORY_AND_DISK_SER (Java and Scala) 与MEMORY_ONLY_SER类似,但如果数据在内存中放不下,则溢写到磁盘上,而不是每次需要重新计算它们
DISK_ONLY 将RDD分区存储在磁盘上
MEMORY_ONLY_2, MEMORY_AND_DISK_2等 与上面的储存级别相同,只不过将持久化数据存为两份,备份每个分区存储在两个集群节点上
OFF_HEAP(实验中) 与MEMORY_ONLY_SER类似,但将数据存储在堆外内存中。 (即不是直接存储在JVM内存中)


总结:


  1. RDD持久化/缓存的目的是为了提高后续操作的速度
  2. 缓存的级别有很多,默认只存在内存中,开发中使用memory_and_disk
  3. 只有执行action操作的时候才会真正将RDD数据进行持久化/缓存
  4. 实际开发中如果某一个RDD后续会被频繁的使用,可以将该RDD进行持久化/缓存


四、RDD容错机制Checkpoint


  • 持久化的局限:


持久化/缓存可以把数据放在内存中,虽然是快速的,但是也是最不可靠的;也可以把数据放在磁盘上,也不是完全可靠的!例如磁盘会损坏等。


  • 问题解决:


Checkpoint的产生就是为了更加可靠的数据持久化,在Checkpoint的时候一般把数据放在在HDFS上,这就天然的借助了HDFS天生的高容错、高可靠来实现数据最大程度上的安全,实现了RDD的容错和高可用。


用法:


SparkContext.setCheckpointDir("目录") //HDFS的目录
RDD.checkpoint


  • 总结:


  • 开发中如何保证数据的安全性性及读取效率:


可以对频繁使用且重要的数据,先做缓存/持久化,再做checkpint操作。


  • 持久化和Checkpoint的区别:


  1. 位置:


Persist 和 Cache 只能保存在本地的磁盘和内存中(或者堆外内存--实验中)
Checkpoint 可以保存数据到 HDFS 这类可靠的存储上。


  1. 生命周期:


Cache和Persist的RDD会在程序结束后会被清除或者手动调用unpersist方法
Checkpoint的RDD在程序结束后依然存在,不会被删除。


五、RDD依赖关系


1. 宽窄依赖


  • 两种依赖关系类型:


RDD和它依赖的父RDD的关系有两种不同的类型,即


宽依赖(wide dependency/shuffle dependency)


窄依赖(narrow dependency)


image.png


  • 图解:


image.png


  • 如何区分宽窄依赖:


窄依赖:父RDD的一个分区只会被子RDD的一个分区依赖;


宽依赖:父RDD的一个分区会被子RDD的多个分区依赖(涉及到shuffle)。


2. 为什么要设计宽窄依赖


  1. 对于窄依赖:


窄依赖的多个分区可以并行计算;

窄依赖的一个分区的数据如果丢失只需要重新计算对应的分区的数据就可以了。


  1. 对于宽依赖:


划分Stage(阶段)的依据:对于宽依赖,必须等到上一阶段计算完成才能计算下一阶段。


六、DAG的生成和划分Stage


1. DAG介绍


  • DAG是什么:


DAG(Directed Acyclic Graph有向无环图)指的是数据转换执行的过程,有方向,无闭环(其实就是RDD执行的流程);


原始的RDD通过一系列的转换操作就形成了DAG有向无环图,任务执行时,可以按照DAG的描述,执行真正的计算(数据被操作的一个过程)。


  • DAG的边界


开始:通过SparkContext创建的RDD;


结束:触发Action,一旦触发Action就形成了一个完整的DAG。


2.DAG划分Stage


image.png


一个Spark程序可以有多个DAG(有几个Action,就有几个DAG,上图最后只有一个Action(图中未表现),那么就是一个DAG)。


一个DAG可以有多个Stage(根据宽依赖/shuffle进行划分)。


同一个Stage可以有多个Task并行执行(task数=分区数,如上图,Stage1 中有三个分区P1、P2、P3,对应的也有三个 Task)。


可以看到这个DAG中只reduceByKey操作是一个宽依赖,Spark内核会以此为边界将其前后划分成不同的Stage。


同时我们可以注意到,在图中Stage1中,从textFile到flatMap到map都是窄依赖,这几步操作可以形成一个流水线操作,通过flatMap操作生成的partition可以不用等待整个RDD计算结束,而是继续进行map操作,这样大大提高了计算的效率。


  • 为什么要划分Stage? --并行计算


一个复杂的业务逻辑如果有shuffle,那么就意味着前面阶段产生结果后,才能执行下一个阶段,即下一个阶段的计算要依赖上一个阶段的数据。那么我们按照shuffle进行划分(也就是按照宽依赖就行划分),就可以将一个DAG划分成多个Stage/阶段,在同一个Stage中,会有多个算子操作,可以形成一个pipeline流水线,流水线内的多个平行的分区可以并行执行。


  • 如何划分DAG的stage?


对于窄依赖,partition的转换处理在stage中完成计算,不划分(将窄依赖尽量放在在同一个stage中,可以实现流水线计算)。


对于宽依赖,由于有shuffle的存在,只能在父RDD处理完成后,才能开始接下来的计算,也就是说需要要划分stage。


总结:


Spark会根据shuffle/宽依赖使用回溯算法来对DAG进行Stage划分,从后往前,遇到宽依赖就断开,遇到窄依赖就把当前的RDD加入到当前的stage/阶段中


具体的划分算法请参见AMP实验室发表的论文:


《Resilient Distributed Datasets: A Fault-Tolerant Abstraction for In-Memory Cluster Computing》


http://xueshu.baidu.com/usercenter/paper/show?


paperid=b33564e60f0a7e7a1889a9da10963461&site=xueshu_se

相关文章
|
4月前
|
设计模式 SQL 分布式计算
Spark Day06:Spark Core之Spark 内核调度和SparkSQL快速入门
Spark Day06:Spark Core之Spark 内核调度和SparkSQL快速入门
43 0
|
4月前
|
分布式计算 监控 分布式数据库
Spark Day05:Spark Core之Sougou日志分析、外部数据源和共享变量
Spark Day05:Spark Core之Sougou日志分析、外部数据源和共享变量
54 0
|
4月前
|
存储 缓存 分布式计算
【Spark】Spark Core Day04
【Spark】Spark Core Day04
33 1
|
SQL 机器学习/深度学习 存储
Spark Core
Spark Core
196 0
|
存储 SQL 分布式计算
|
分布式计算 Spark C++
Spark的一个经典问题(1个Core5个Executor和5个Core1个Executor有什么区别)
Spark的一个经典问题(1个Core5个Executor和5个Core1个Executor有什么区别)
|
消息中间件 JSON 分布式计算
Spark Core读取ES的分区问题分析
写这篇文章的原因是前两天星球球友去面试,面试管问了一下,Spark 分析ES的数据,生成的RDD分区数跟什么有关系呢? 稍微猜测一下就能想到跟分片数有关,但是具体是什么关系呢? 可想的具体关系可能是以下两种: 1).就像KafkaRDD的分区与kafka topic分区数的关系一样,一对一。 2).ES支持游标查询,那么是不是也可以对比较大ES 索引的分片进行拆分成多个RDD分区呢? 那么下面浪尖带着大家翻一下源码看看具体情况。
231 0
|
分布式计算 Spark 存储
Spark中Task,Partition,RDD、节点数、Executor数、core数目的关系
梳理一下Spark中关于并发度涉及的几个概念File,Block,Split,Task,Partition,RDD以及节点数、Executor数、core数目的关系。
1496 0
|
存储 分布式计算 API
Spark Core组件:RDD、DataFrame和DataSet
1. 介绍 spark生态系统中,Spark Core,包括各种Spark的各种核心组件,它们能够对内存和硬盘进行操作,或者调用CPU进行计算。
1580 0