Spark RDD持久化的三种方式

简介: Spark RDD持久化的三种方式

前言

在RDD中是不存储数据的,如果一个RDD需要重复使用,只是这个RDD对象是可以重用的,但是数据无法重用,那么需要从头再次执行来获取数据进行计算。Spark为了避免这种重复计算的情况,实现了RDD持久化功能。在Spark中,RDD的持久化算子有三个:cache、persist和checkpoint。

缓存

缓存:

  • 数据保存位置:保存在task所在主机的内存/本地磁盘
  • 应用场景:某个RDD在多个job中重复使用的场景

如何缓存:

  • cache
  • persist

Cache缓存

RDD通过Cache或者Persist方法将前面的计算结果缓存,默认情况下会把数据缓存在内存中。但是这两个方法被调用时并不会立即缓存,而是触发后面的action算子时,该RDD将会被缓存到计算节点的内存中,提供给后面的算子重用。

语法:rdd.cache()

val conf =new SparkConf().setMaster("local[4]").setAppName("test")
val sc=new SparkContext(conf)
val rdd1=sc.parallelize(List(1,2,3),4)
//进行一个map算子操作
val rdd2=rdd1.map(x=>{
    println("*"*10)
    x*10
})
//添加 rdd2的缓存
val rddx=rdd2.cache()
//进行一个reduce算子操作
val rdd3=rddx.map(x=>x+10)
val rdd4=rddx.map(x=>x+10)
//打印 rdd2 的结果
println(rdd3.collect.toList)
// 输出 sum值
println(rdd4.collect.toList)

数据直接从缓存中取,运行结果只会打印三次:

**********
**********
**********

Persist缓存

用法

语法:rdd.persist()

val conf =new SparkConf().setMaster("local[4]").setAppName("test")
val sc=new SparkContext(conf)
val rdd1=sc.parallelize(List(1,2,3),4)
//进行一个map算子操作
val rdd2=rdd1.map(x=>{
    println("*"*10)
    x*10
})
//添加 rdd2的缓存
val rddx=rdd2.persist()
//进行一个reduce算子操作
val rdd3=rddx.map(x=>x+10)
val rdd4=rddx.map(x=>x+10)
//打印 rdd2 的结果
println(rdd3.collect.toList)
// 输出 sum值
println(rdd4.collect.toList)

运行结果还是3次!

其实,cache源码就是调用的persist无参函数。

def cache(): this.type = persist()

缓存级别

StorageLevel 中定义了persist的缓存级别。

object StorageLevel {
  val NONE = new StorageLevel(false, false, false, false)
  val DISK_ONLY = new StorageLevel(true, false, false, false)
  val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2)
  val MEMORY_ONLY = new StorageLevel(false, true, false, true)
  val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)
  val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false)
  val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2)
  val MEMORY_AND_DISK = new StorageLevel(true, true, false, true)
  val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)
  val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false)
  val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)
  val OFF_HEAP = new StorageLevel(true, true, true, false, 1)

说明:

NONE: 不存储

DISK_ONLY : 只保存在磁盘中

DISK_ONLY_2 : 只保存在磁盘中,数据保存两份

MEMORY_ONLY : 只保存在内存中

MEMORY_ONLY_2 : 只保存在内存中,数据保存两份

MEMORY_ONLY_SER :只保存在内存中,以序列化形式存储

MEMORY_ONLY_SER_2 : 只保存在内存中,以序列化形式存储,数据保存两份

MEMORY_AND_DISK : 数据保存在内存/磁盘中,可以动态调整

MEMORY_AND_DISK_2 : 数据保存在内存/磁盘中,可以动态调整,数据保存两份

MEMORY_AND_DISK_SER :数据保存在内存/磁盘中,可以动态调整,以序列化形式存储

MEMORY_AND_DISK_SER_2 : 数据保存在内存/磁盘中,可以动态调整,以序列化形式存储,数据保存两份

OFF_HEAP :数据保存在堆外内存中

常用的存储级别:

  • MEMORY_ONLY<只适用于小数据量场景>
  • MEMORY_AND_DISK<适用于大数据量场景>

注意:

  • 缓存有可能丢失,或者存储于内存的数据由于内存不足而被删除,RDD的缓存容错机制保证了即使缓存丢失也能保证计算的正确执行。通过基于RDD的一系列转换,丢失的数据会被重算,由于RDD的各个Partition是相对独立的,因此只需要计算丢失的部分即可,并不需要重算全部Partition。
  • Spark会自动对一些Shuffle操作的中间数据做持久化操作(比如:reduceByKey)。这样做的目的是为了当一个节点Shuffle失败了避免重新计算整个输入。但是,在实际使用的时候,如果想重用数据,仍然建议调用persist或cache。

CheckPoint检查点

检查点:其实就是通过将RDD中间结果写入磁盘(比如分布式文件系统HDFS),同时切断RDD之间的血缘关系。

由于血缘关系过长会造成容错成本过高,就可以在中间阶段做checkpoint容错,如果checkpoint之后有节点出现问题,就可以从checkpoint开始重做血缘,减少开销。

checkpoint操作之后也是不会马上被执行,必须执行action算子后才能触发。

checkpoint需要指定保存路径,当作业执行完成时,路径中保存的文件是不会被删除的。

示例:

val conf =new SparkConf().setMaster("local[4]").setAppName("test")
val sc=new SparkContext(conf)
//设置ck存储路径
sc.setCheckpointDir("hdfs://hadoop1:9820/output/a")
val rdd1=sc.parallelize(List(1,2,3),4)
//进行一个map算子操作
val rdd2=rdd1.map(x=>{
println("*"*10)
x*10
})
//将rdd2作为检查点
rdd2.checkpoint()
//进行一个reduce算子操作
val rdd3=rdd2.map(x=>x+10)
val rdd4=rdd2.map(x=>x+10)
//打印 rdd2 的结果
println(rdd3.collect.toList)
// 输出 sum值
println(rdd4.collect.toList)
//释放
rddx.unpersist(true)
//关闭链接
sc.stop()

运行结果却打印了6次!这是什么原因呢?

最终在checkpoint的源码中找到了答案:

sc.runJob(originalRDD, writePartitionToCheckpointFile[T](checkpointDirPath.toString, broadcastedConf) _)

当程序运行到这里,会再执行一个job任务,并将数据保存到缓存中。该job操作是在checkpoint所属RDD第一个job执行完成之后才会触发。

明明只需要跑一次的任务就可以缓存,现在需要多跑一次,有没有办法只跑一次呢?答案是肯定有的,其实cache可以checkpoint配合使用

val conf =new SparkConf().setMaster("local[4]").setAppName("test")
val sc=new SparkContext(conf)
//设置ck存储路径
sc.setCheckpointDir("hdfs://hadoop1:9820/output/a")
val rdd1=sc.parallelize(List(1,2,3),4)
//进行一个map算子操作
val rdd2=rdd1.map(x=>{
println("*"*10)
x*10
})
//将rdd2作为检查点
val rddx=rdd2.cache()
rddx.checkpoint()
//进行一个reduce算子操作
val rdd3=rddx.map(x=>x+10)
val rdd4=rddx.map(x=>x+10)
//打印 rdd2 的结果
println(rdd3.collect.toList)
// 输出 sum值
println(rdd4.collect.toList)
//释放
rddx.unpersist(true)
//关闭链接
sc.stop()

运行结果就只会跑一次了!

注意:checkpoint还是会运行一个job,但是程序不用从头开始了,而是直接从rddx中取。

总结:

1、为了避免checkpoint触发的job重复执行之前的数据处理逻辑,可以在checkpoint之间将rdd通过cache缓存数据,后续checkpoint触发的job就可以直接使用缓存的数据。

2、使用cache时,job结束之后,缓存会被自动释放。

3、使用checkpoint时,需要手动进行释放,需要设置unpersist为true默认为false。

三个算子的区别

1、cache和persist只是将数据保存起来,不会切断血缘依赖;而checkpoint会切断RDD之间的血缘依赖。

2、cache是将数据临时保存在内存中进行数据重用,可靠性低;

persist是可以将数据临时保存在磁盘文件或者内存中进行数据重用,作业执行完毕,临时保存的文件就会丢失,可靠性低;

checkpoint是将数据永久保存在HDFS等容错、高可用的文件系统,可靠性高。

3、建议对某个RDD执行checkpoint之前,对该RDD执行cache,这样checkpoint的job只需从cache缓存中读取数据并上传到HDFS中即可,不需要重新计算。

4、如果使用完了缓存,可以通过unpersist()方法释放缓存

结语

好了,今天就为大家分享到这里了。咱们下期见!

如果本文对你有帮助的话,欢迎点赞&收藏&分享,这对我继续分享&创作优质文章非常重要。感谢🙏🏻

相关文章
|
1月前
|
存储 分布式计算 并行计算
【赵渝强老师】Spark中的RDD
RDD(弹性分布式数据集)是Spark的核心数据模型,支持分布式并行计算。RDD由分区组成,每个分区由Spark Worker节点处理,具备自动容错、位置感知调度和缓存机制等特性。通过创建RDD,可以指定分区数量,并实现计算函数、依赖关系、分区器和优先位置列表等功能。视频讲解和示例代码进一步详细介绍了RDD的组成和特性。
|
2月前
|
SQL 消息中间件 分布式计算
大数据-84 Spark 集群 RDD创建 RDD-Transformation操作算子 详解(一)
大数据-84 Spark 集群 RDD创建 RDD-Transformation操作算子 详解(一)
55 5
|
2月前
|
分布式计算 大数据 数据处理
大数据-84 Spark 集群 RDD创建 RDD-Transformation操作算子 详解(二)
大数据-84 Spark 集群 RDD创建 RDD-Transformation操作算子 详解(二)
56 4
|
2月前
|
存储 缓存 分布式计算
大数据-83 Spark 集群 RDD编程简介 RDD特点 Spark编程模型介绍
大数据-83 Spark 集群 RDD编程简介 RDD特点 Spark编程模型介绍
45 4
|
1月前
|
分布式计算 Java 开发工具
阿里云MaxCompute-XGBoost on Spark 极限梯度提升算法的分布式训练与模型持久化oss的实现与代码浅析
本文介绍了XGBoost在MaxCompute+OSS架构下模型持久化遇到的问题及其解决方案。首先简要介绍了XGBoost的特点和应用场景,随后详细描述了客户在将XGBoost on Spark任务从HDFS迁移到OSS时遇到的异常情况。通过分析异常堆栈和源代码,发现使用的`nativeBooster.saveModel`方法不支持OSS路径,而使用`write.overwrite().save`方法则能成功保存模型。最后提供了完整的Scala代码示例、Maven配置和提交命令,帮助用户顺利迁移模型存储路径。
|
2月前
|
存储 缓存 分布式计算
大数据-89 Spark 集群 RDD 编程-高阶 编写代码、RDD依赖关系、RDD持久化/缓存
大数据-89 Spark 集群 RDD 编程-高阶 编写代码、RDD依赖关系、RDD持久化/缓存
49 4
|
2月前
|
JSON 分布式计算 大数据
大数据-85 Spark 集群 RDD创建 RDD-Action Key-Value RDD详解 RDD的文件输入输出
大数据-85 Spark 集群 RDD创建 RDD-Action Key-Value RDD详解 RDD的文件输入输出
42 1
|
2月前
|
分布式计算 Java 大数据
大数据-92 Spark 集群 SparkRDD 原理 Standalone详解 ShuffleV1V2详解 RDD编程优化
大数据-92 Spark 集群 SparkRDD 原理 Standalone详解 ShuffleV1V2详解 RDD编程优化
45 0
大数据-92 Spark 集群 SparkRDD 原理 Standalone详解 ShuffleV1V2详解 RDD编程优化
|
2月前
|
消息中间件 分布式计算 Kafka
大数据-99 Spark 集群 Spark Streaming DStream 文件数据流、Socket、RDD队列流
大数据-99 Spark 集群 Spark Streaming DStream 文件数据流、Socket、RDD队列流
38 0
|
2月前
|
SQL 分布式计算 大数据
大数据-94 Spark 集群 SQL DataFrame & DataSet & RDD 创建与相互转换 SparkSQL
大数据-94 Spark 集群 SQL DataFrame & DataSet & RDD 创建与相互转换 SparkSQL
85 0