RDD
RDD的概念在Spark中十分重要,上面只是简单的介绍了一下,下面详细的对RDD展开介绍。
RDD是“Resilient Distributed Dataset”的缩写,从全称就可以了解到RDD的一些典型特性:
- Resilient(弹性):RDD之间会形成有向无环图(DAG),如果RDD丢失了或者失效了,可以从父RDD重新计算得到。即容错性。
- Distributed(分布式):RDD的数据是以逻辑分区的形式分布在集群的不同节点的。
- Dataset(数据集):即RDD存储的数据记录,可以从外部数据生成RDD,例如Json文件,CSV文件,文本文件,数据库等。
RDD里面的数据集会被逻辑分成若干个分区,这些分区是分布在集群的不同节点的,基于这样的特性,RDD才能在集群不同节点并行计算。
RDD特性
- 内存计算
Spark RDD运算数据是在内存中进行的,在内存足够的情况下,不会把中间结果存储在磁盘,所以计算速度非常高效。
- 惰性求值
所有的转换操作都是惰性的,也就是说不会立即执行任务,只是把对数据的转换操作记录下来而已。只有碰到action操作才会被真正的执行。
- 容错性
Spark RDD具备容错特性,在RDD失效或者数据丢失的时候,可以根据DAG从父RDD重新把数据集计算出来,以达到数据容错的效果。
- 不变性
RDD是进程安全的,因为RDD是不可修改的。它可以在任何时间点被创建和查询,使得缓存,共享,备份都非常简单。在计算过程中,是RDD的不可修改特性保证了数据的一致性。
- 持久化
可以调用cache或者persist函数,把RDD缓存在内存、磁盘,下次使用的时候不需要重新计算而是直接使用。
RDD操作
RDD支持两种操作:
- 转换操作(Transformation)
- 行动操作(Actions)
转换操作(Transformation)
转换操作以RDD做为输入参数,然后输出一个或者多个RDD。转换操作不会修改输入RDD。Map()
、Filter()
这些都属于转换操作。
转换操作是惰性求值操作,只有在碰到行动操作(Actions)的时候,转换操作才会真正实行。转换操作分两种:窄依赖和宽依赖(上文提到过)。
下面是一些常见的转换操作:
转换操作 | 描述 |
map | 将函数应用于 RDD 中的每个元素,并返回一个新的 RDD |
filter | 返回一个新的 RDD,其中包含满足给定谓词的元素 |
flatMap | 将函数应用于 RDD 中的每个元素,并将返回的迭代器展平为一个新的 RDD |
union | 返回一个新的 RDD,其中包含两个 RDD 的元素 |
distinct | 返回一个新的 RDD,其中包含原始 RDD 中不同的元素 |
groupByKey | 将键值对 RDD 中具有相同键的元素分组到一起,并返回一个新的 RDD |
reduceByKey | 将键值对 RDD 中具有相同键的元素聚合到一起,并返回一个新的 RDD |
sortByKey | 返回一个新的键值对 RDD,其中元素按照键排序 |
行动操作(Action)
Action是数据执行部分,其通过执行count,reduce,collect等方法真正执行数据的计算部分。
Action 操作 | 描述 |
reduce | 通过函数聚合 RDD 中的所有元素 |
collect | 将 RDD 中的所有元素返回到驱动程序 |
count | 返回 RDD 中的元素个数 |
first | 返回 RDD 中的第一个元素 |
take | 返回 RDD 中的前 n 个元素 |
takeOrdered | 返回 RDD 中的前 n 个元素,按照自然顺序或指定的顺序排序 |
saveAsTextFile | 将 RDD 中的元素保存到文本文件中 |
foreach | 将函数应用于 RDD 中的每个元素 |
RDD 的创建方式
创建RDD有3种不同方式:
- 从外部存储系统
- 从其他RDD
- 由一个已经存在的 Scala 集合创建
从外部存储系统
由外部存储系统的数据集创建,包括本地的文件系统,还有所有 Hadoop
支持的数据集,比如 HDFS、Cassandra、HBase
等:
val rdd1 = sc.textFile("hdfs://node1:8020/wordcount/input/words.txt")
从其他RDD
通过已有的 RDD 经过算子转换生成新的 RDD:
val rdd2=rdd1.flatMap(_.split(" "))
由一个已经存在的 Scala 集合创建
val rdd3 = sc.parallelize(Array(1,2,3,4,5,6,7,8)) 或者 val rdd4 = sc.makeRDD(List(1,2,3,4,5,6,7,8))
其实makeRDD
方法底层调用了 parallelize
方法:
RDD 缓存机制
RDD 缓存是在内存存储RDD计算结果的一种优化技术。把中间结果缓存起来以便在需要的时候重复使用,这样才能有效减轻计算压力,提升运算性能。
要持久化一个RDD,只要调用其cache()
或者persist()
方法即可。在该RDD第一次被计算出来时,就会直接缓存在每个节点中。而且Spark的持久化机制还是自动容错的,如果持久化的RDD的任何partition丢失了,那么Spark会自动通过其源RDD,使用transformation操作重新计算该partition。
val rdd1 = sc.textFile("hdfs://node01:8020/words.txt") val rdd2 = rdd1.flatMap(x=>x.split(" ")).map((_,1)).reduceByKey(_+_) rdd2.cache //缓存/持久化 rdd2.sortBy(_._2,false).collect//触发action,会去读取HDFS的文件,rdd2会真正执行持久化 rdd2.sortBy(_._2,false).collect//触发action,会去读缓存中的数据,执行速度会比之前快,因为rdd2已经持久化到内存中了
需要注意的是,在触发action的时候,才会去执行持久化。
cache()
和persist()
的区别在于,cache()
是persist()
的一种简化方式,cache()
的底层就是调用的persist()
的无参版本,就是调用persist(MEMORY_ONLY)
,将数据持久化到内存中。如果需要从内存中去除缓存,那么可以使用unpersist()
方法。
rdd.persist(StorageLevel.MEMORY_ONLY) rdd.unpersist()
存储级别
RDD存储级别主要有以下几种。
级别 | 使用空间 | CPU时间 | 是否在内存中 | 是否在磁盘上 | 备注 |
MEMORY_ONLY | 高 | 低 | 是 | 否 | 使用未序列化的Java对象格式,将数据保存在内存中。如果内存不够存放所有的数据,则数据可能就不会进行持久化。 |
MEMORY_ONLY_2 | 高 | 低 | 是 | 否 | 数据存2份 |
MEMORY_ONLY_SER | 低 | 高 | 是 | 否 | 基本含义同MEMORY_ONLY。唯一的区别是,会将RDD中的数据进行序列化。这种方式更加节省内存 |
MEMORY_ONLY_SER_2 | 低 | 高 | 是 | 否 | 数据序列化,数据存2份 |
MEMORY_AND_DISK | 高 | 中等 | 部分 | 部分 | 如果数据在内存中放不下,则溢写到磁盘 |
MEMORY_AND_DISK_2 | 高 | 中等 | 部分 | 部分 | 数据存2份 |
MEMORY_AND_DISK_SER | 低 | 高 | 部分 | 部分 | 基本含义同MEMORY_AND_DISK。唯一的区别是,会将RDD中的数据进行序列化 |
MEMORY_AND_DISK_SER_2 | 低 | 高 | 部分 | 部分 | 数据存2份 |
DISK_ONLY | 低 | 高 | 否 | 是 | 使用未序列化的Java对象格式,将数据全部写入磁盘文件中。 |
DISK_ONLY_2 | 低 | 高 | 否 | 是 | 数据存2份 |
OFF_HEAP | 这个目前是试验型选项,类似MEMORY_ONLY_SER,但是数据是存储在堆外内存的。 |
对于上述任意一种持久化策略,如果加上后缀_2,代表的是将每个持久化的数据,都复制一份副本,并将副本保存到其他节点上。这种基于副本的持久化机制主要用于进行容错。假如某个节点挂掉了,节点的内存或磁盘中的持久化数据丢失了,那么后续对RDD计算时还可以使用该数据在其他节点上的副本。如果没有副本的话,就只能将这些数据从源头处重新计算一遍了。
RDD的血缘关系
血缘关系是指 RDD 之间的依赖关系。当你对一个 RDD 执行转换操作时,Spark 会生成一个新的 RDD,并记录这两个 RDD 之间的依赖关系。这种依赖关系就是血缘关系。
血缘关系可以帮助 Spark 在发生故障时恢复数据。当一个分区丢失时,Spark 可以根据血缘关系重新计算丢失的分区,而不需要从头开始重新计算整个 RDD。
血缘关系还可以帮助 Spark 优化计算过程。Spark 可以根据血缘关系合并多个连续的窄依赖转换,减少数据传输和通信开销。
我们可以执行toDebugString
打印RDD的依赖关系。
下面是一个简单的例子:
val conf = new SparkConf().setAppName("Lineage Example").setMaster("local") val sc = new SparkContext(conf) val data = sc.parallelize(List(1, 2, 3, 4, 5)) val mappedData = data.map(x => x + 1) val filteredData = mappedData.filter(x => x % 2 == 0) println(filteredData.toDebugString)
在这个例子中,我们首先创建了一个包含 5 个元素的 RDD,并对它执行了两个转换操作:map
和 filter
。然后,我们使用 toDebugString
方法打印了最终 RDD 的血缘关系。
运行这段代码后,你会看到类似下面的输出:
(2) MapPartitionsRDD[2] at filter at <console>:26 [] | MapPartitionsRDD[1] at map at <console>:24 [] | ParallelCollectionRDD[0] at parallelize at <console>:22 []
这个输出表示最终的 RDD 是通过两个转换操作(map
和 filter
)从原始的 ParallelCollectionRDD
转换而来的。