Spark 算子

简介:

==> RDD是什么?

    ---> RDD(Resilient Distributed Dataset) 弹性分布式数据集 , 是 Spark 中最基本的数据抽象,它代表一个不可变,可分区,里面的元素可并行计算的集合

    ---> 特点:

        ---- 自动容错

        ---- 位置感知性高度

        ---- 可伸缩性

        ---- 允许用户在执行多个查询时显示的将工作集缓存在内存中,后续的查询能够重用工作集,极大的提升了查询速度

    ---> RDD 的属性

        ---- A list of partitions


一个组分片,即数据集的基本组成单位
对于 RDD 来说,每个分片都会被一个计算任务处理,并决定并行计算的粒度,用户可以在创建 RDD 时指定 RDD的分片个数,如果没有指定,那么就会采用默认值,默认值就是程序所分配 到的 CPU  Core 的数目 


        ---- A function for computing each split

一个计算每个分区的函数
Spark 中 RDD 的计算是以分片为单位的,每个 RDD 都会实现 compute 函数以达到这个目的, compute 函数会对迭代器进行复合,不需要保存每次计算的结果


        ---- A list of dependencies on other RDDs

RDD 之间的依赖关系
RDD 每次转换都会生成一个新的RDD, 所以 RDD 之间就会形成类似于流水线一样的前后依赖关系。在部分数据丢失时, Spark 可以通过这个依赖关系重新计算丢失 的分区数据,而不是对 RDD的所有分区进行重新计算


        ---- Optionally, a Partitioner for key-value RDDs(e.g. to say that the RDD is hash-partitioned)

一个 Partitioner, 即 RDD的分片函数
Spark 中实现 了两种类型的分片函数, 一个是基于哈希的 HashPartitioner, 另外一个是基于 RangePartitioner, 只有对于 key-value 的 RDD, 才会有 Partitioner, 非 key-value的 RDD的 Partitioner的值是None, Partitioner函数不但决定 了 RDD 本身的分片数量,也决定了 parents RDD Shuffle 输出时的分片数量


        ---- Optionally, a list of preferred locations to compute each split on (e.g. block locations for an HDFS file )

一个列表,存储存取每个 Partion 的优先位置(preferred location)

对于一个 HDFS 文件来说,这个列表 保存的就是每个Partition 所在的块的位置

按照“移动数据不如移动计算”的理念, Spark 在进行任务调度的时候,会尽可能的将计算任务分配到其所要处理数据块的存储位置


==> RDD 的创建方式

    ---> 通过外部的数据文件创建 (HDFS)

1
val  rdd 1  =  sc.textFile( "hdfs://192.168.10.210:9000/data/data.txt" )

    ---> 通过 sc.parallelize 进行创建

1
val  rdd 2  =  sc.parallelize(Array( 1 , 2 , 3 , 4 , 5 , 6 ))

==> RDD的基本原理

    ---> 创建一个 RDD: 

1
2
//                          3代表分三个分区
val  rdd 1  =  sc.parallelize(Array( 1 , 2 , 3 , 4 , 5 , 6 , 7 , 8 ),  3 )

    ---> 一个分区运行在一个Worker 节点上, 一个 Worker 上可以运行多个分区

==> RDD  的类型

    ---> Trasformation

RDD 中的所有转换都是延迟加载的,即,不会返回计算结果,只记住这些应用到基础数据集(如,一个文件,一个列表等)上的转换动作,只有当发生一个要求返回结果给 Driver 时,这些转换才会执行(个人理解,与 Scala 中的 lazy (懒值)比较相似)

转换 含义
map(func)
返回一个新的 RDD,该 RDD 由每一个输入元素经过 func 函数转换后组成
filter(func) 返回一个新的RDD,该 RDD 由经过 func 函数计算后返回值为 true 的输入元素组成
flatMap(func) 类似于 map ,但是每个输入元素可以被映射为 0 或多个输出元素(返回一个序列)
mapPartitions(func) 类似于 map, 但是独立的在RDD 的每一个分片上运行,因此在类型为T  的 RDD 上运行时,func 的函数类型必须 是Iterator[T] => Iterator[U]
mapPartitionsWithIndex(func) 类似于 mapPartitions,但 func 带有一个整数参数表示分片的索引值,因此在类型为T 的 RDD 上运行时, func 的函数类型必须是(Int, Interator[T])= > Iterator[U]
sample(withReplacement, fraction, seed) 根据 fraction 指定的比例对数据进行采样, 可以选择是否使用随机数进行替换, seed 用于指定随机数生成器种子
union(otherDataset) 对源RDD 和 参数 RDD求并集后返回一个新的RDD
intersection(otherDataset) 对源RDD 和参数 RDD 求交集后返回一个新的RDD
distinct([numTasks]) 对源RDD 去重后返回一个新的RDD
groupByKey([numTasks]) 在 (k, v) 的RDD 上调用,返回一个(K, iterator[V]) 的 RDD
reduceByKey(func, [numTasks]) 在(k, v) 的RDD上调用,返回一个(k, v) 的 RDD, 使用指定的reduce函数,将相同key 的值聚合到一起,与 groupByKey 类似,reduce 任务的个数可以通过第二个可选的参数来设置
aggregateByKey(zeroValue)(seqOp, combOp, [numTasks])
sortByKey([ascending], [numTasks]) 在一个(k, v)上调用,k 必须实现 Ordered 接口,返回一个按照 key 进行排序的(k, v) 的RDD
sortBy(func, [ascending], [numTasks]) 与 sortByKey 类似,但是更灵活
join(otherDataset, [numTasks]) 在类型为(k, v)和(k, w) 的RDD 上调用,返回一个相同key 对应的所有元素堆在一起的(k, (v, w)) 的 RDD
cogroup(otherDataset, [numTasks]) 在类型为(k, v)和(k, w)的 RDD 上调用 ,返回一个(k, (Iterable<v>, Iterable<w>)) 类型的 RDD
cartesian(otherDataset) 笛卡尔积
pipe(command, [envVars])
coalesce(numPartitions)
repartitionAndSortWithinPartitions(partitions)

    ---> Action



reduce(fun) 通过 func 函数聚集RDD中的所有元素,这个功能必须是可交换且可并联的
collect() 在驱动程序中,以数组的形式返回数据集的所有元素
count() 返回元素个数
first() 反回 RDD 的第一个元素(类似于 take(1))
take(n) 返回一个由数据集的前 n 个元素组成的数组
takeSample(withReplacement, num, [seed]) 返回一个数组,该 数组由从数据集中随机采样的num 个元素组成,可以选择是否用随机数替换不足的部分, seed用于指定随机数生成器种子
takeOrdered(n, [ordering])
saveAsTextFile(path) 将数据集的元素以 textfile 的形式保存到HDFS文件系统或者其它支持的文件系统,对每个元素,Spark 将会调用 toString 方法将它转换为文件中的文本
saveAsSequenceFile(path) 将数据集中的元素以 Hadoop sequencefile 的格式 保存到指定的目录下,可以使HDFS 或者其它 Hadoop 支持的文件系统
saveAsObjectFile(path)
countByKey() 针对(k, v ) 类型的RDD, 返回一个(k, Int) 的 map, 表示每一个key 对应的元素个数
foreach(func) 在数据集的每一个元素上运行函数 func 进行更新


 

==> RDD  的缓存机制

    ---> 作用:缓存有可能丢失,或由于存储于内存中的数据由于内存不足而被删除,缓存容错机制保证了即使缓存丢失也能保证计算的正确执行

    ---> 实现原理:通过基于 RDD 的一系列转换,丢失的数据会被重算,由于 RDD 的各个 Partition 是相对独立的,因此只需要计算丢失的部分即可, 不用全部重新计算

    ---> 运行方式:RDD通过 persist方法或 cache方法可以将前面的计算结果缓存,但并不会调用时便立缓存,而是触发后面的action 时,此RDD会被缓存到计算机内存中,供后面重用

    ---> 通过查看源码可以发现,cache 最终调用的也是 parsist

1
2
3
def  persist() : this . type  =  persist(StorageLevel.MEMORY _ ONLY)
 
def  cache() : this . type  =  persist()

    ---> 缓存使用:

1
2
3
4
5
6
val  rdd 1  =  sc.textFile( "hdfs://192.168.10.210:9000/data/data.txt" )
rdd 1 .count           // 没有缓存,直接执行
 
rdd 1 .cache
rdd 1 .count         // 第一次执行会慢一些
rdd 1 .count         // 第二次会很快


    ---> 存储级别在 object  StorageLevel 中定义

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
object  StorageLevel{
     val  NONE  =  new  StorageLevel( false false false false )
     val  DISK _ ONLY  =  new  StorageLevel( true false false false )
     val  DISK _ ONLY _ 2  =  new  StorageLevel( true false false false )
     val  MEMORY _ ONLY  =  new  StorageLevel( false true false true )
     val  MEMORY _ ONLY _ 2  =  new  StorageLevel( false true false true )
     val  MEMORY _ ONLY _ SET  =  new  StorageLevel( false true false false )
     val  MEMORY _ ONLY _ SET _ 2  =  new  StorageLevel( false true false false )
     val  MEMORY _ AND _ DISK  =  new  StorageLevel( true true false true )
     val  MEMORY _ AND _ DISK _ 2  =  new  StorageLevel( true true false true )
     val  MEMORY _ AND _ DISK _ SET  =  new  StorageLevel( true true false false )
     val  MEMORY _ ADN _ DISK _ SET _ 2  =  new  StorageLevel( true true false false )
     val  OFF _ HEAP  =  new  StorageLevel  =  new  StorageLevel( true true true false )
 
}


==> RDD的 Checkpoint(检查点)机制: 容错机制

    ---> 检查点本质是通过将 RDD 写入 Disk 做检查点

    ---> 作用: 通过做 lineage 做容错的辅助

    ---> 运行机制: 在RDD 的中间阶段做检查点容错,之后如果有节点出现问题而丢失分区,从做检查点的 RDD 开始重新做 Lineage,以达到减少开销的目的

    ---> 设置检查点的方式: 本地目录, HDFS

        ---- 本地目录(需要将 spark-shell 运行在本地模式上)

1
2
3
4
5
6
7
8
// 设置检查点目录 
sc.setCheckpointDir( "/data/checkpoint" )
// 创建一个RDD
val  rdd 1  =  sc.textFile( "hdfs://192.168.10.210:9000/data/data.txt" )
// 设置检查点
rdd 1 .checkpoint
// 执行,触发 Action ,会在检查点目录生成检查点
rdd 1 .count


        ---- HDFS(需要将 Spark-shell 运行在集群模式上)

1
2
3
4
5
6
7
8
// 设置检查点目录 
sc.setCheckpointDir( "hdfs://192.168.10.210:9000/data/checkpoint" )
// 创建一个RDD
val  rdd 1  =  sc.textFile( "hdfs://192.168.10.210:9000/data/data.txt" )
// 设置检查点
rdd 1 .checkpoint
// 执行,触发 Action ,会在检查点目录生成检查点
rdd 1 .count


==> RDD 的依赖关系 和 Spark 任务中的 Stage

    ---> RDD 依赖关系    RDD和它的父 RDD(s)的关系有两种不同的类型

        ---- 窄依赖    每个 父 RDD 的 partition 只能被子 RDD 的一个 partition 使用    一个子RDD

        ---- 宽依赖    多个子RDD 的 partition 会依赖同一个父 RDD        多个子RDD

    ---> Stage    划分Stage 的依据是:宽依赖

 image.png



本文转自 菜鸟的征程 51CTO博客,原文链接:http://blog.51cto.com/songqinglong/2074380


相关文章
|
2月前
|
分布式计算 并行计算 大数据
Spark学习---day02、Spark核心编程(RDD概述、RDD编程(创建、分区规则、转换算子、Action算子))(一)
Spark学习---day02、Spark核心编程 RDD概述、RDD编程(创建、分区规则、转换算子、Action算子))(一)
128 1
|
2月前
|
分布式计算 Java Scala
Spark学习---day03、Spark核心编程(RDD概述、RDD编程(创建、分区规则、转换算子、Action算子))(二)
Spark学习---day03、Spark核心编程(RDD概述、RDD编程(创建、分区规则、转换算子、Action算子))(二)
|
2月前
|
存储 分布式计算 API
adb spark的lakehouse api访问内表数据,还支持算子下推吗
【2月更文挑战第21天】adb spark的lakehouse api访问内表数据,还支持算子下推吗
118 2
|
2月前
|
分布式计算 Hadoop Java
Spark【基础知识 03】【RDD常用算子详解】(图片来源于网络)
【2月更文挑战第14天】Spark【基础知识 03】【RDD常用算子详解】(图片来源于网络)
79 1
|
2月前
|
存储 分布式计算 Scala
bigdata-36-Spark转换算子与动作算子
bigdata-36-Spark转换算子与动作算子
17 0
|
2月前
|
分布式计算 并行计算 Hadoop
Spark学习---day02、Spark核心编程(RDD概述、RDD编程(创建、分区规则、转换算子、Action算子))(一)
Spark学习---day02、Spark核心编程 RDD概述、RDD编程(创建、分区规则、转换算子、Action算子))(一)
|
11月前
|
存储 分布式计算 并行计算
Spark学习---2、SparkCore(RDD概述、RDD编程(创建、分区规则、转换算子、Action算子))(一)
Spark学习---2、SparkCore(RDD概述、RDD编程(创建、分区规则、转换算子、Action算子))(一)
|
2月前
|
机器学习/深度学习 分布式计算 数据库连接
[Spark精进]必须掌握的4个RDD算子之filter算子
[Spark精进]必须掌握的4个RDD算子之filter算子
74 2
|
2月前
|
分布式计算 Spark
[Spark精进]必须掌握的4个RDD算子之flatMap算子
[Spark精进]必须掌握的4个RDD算子之flatMap算子
66 0
|
2月前
|
机器学习/深度学习 分布式计算 数据处理
[Spark精进]必须掌握的4个RDD算子之mapPartitions算子
[Spark精进]必须掌握的4个RDD算子之mapPartitions算子
72 0