Spark RDD持久化的三种方式

简介: Spark RDD持久化的三种方式

前言

在RDD中是不存储数据的,如果一个RDD需要重复使用,只是这个RDD对象是可以重用的,但是数据无法重用,那么需要从头再次执行来获取数据进行计算。Spark为了避免这种重复计算的情况,实现了RDD持久化功能。在Spark中,RDD的持久化算子有三个:cache、persist和checkpoint。

缓存

缓存:

  • 数据保存位置:保存在task所在主机的内存/本地磁盘
  • 应用场景:某个RDD在多个job中重复使用的场景

如何缓存:

  • cache
  • persist

Cache缓存

RDD通过Cache或者Persist方法将前面的计算结果缓存,默认情况下会把数据缓存在内存中。但是这两个方法被调用时并不会立即缓存,而是触发后面的action算子时,该RDD将会被缓存到计算节点的内存中,提供给后面的算子重用。

语法:rdd.cache()

val conf =new SparkConf().setMaster("local[4]").setAppName("test")
val sc=new SparkContext(conf)
val rdd1=sc.parallelize(List(1,2,3),4)
//进行一个map算子操作
val rdd2=rdd1.map(x=>{
    println("*"*10)
    x*10
})
//添加 rdd2的缓存
val rddx=rdd2.cache()
//进行一个reduce算子操作
val rdd3=rddx.map(x=>x+10)
val rdd4=rddx.map(x=>x+10)
//打印 rdd2 的结果
println(rdd3.collect.toList)
// 输出 sum值
println(rdd4.collect.toList)

数据直接从缓存中取,运行结果只会打印三次:

**********
**********
**********

Persist缓存

用法

语法:rdd.persist()

val conf =new SparkConf().setMaster("local[4]").setAppName("test")
val sc=new SparkContext(conf)
val rdd1=sc.parallelize(List(1,2,3),4)
//进行一个map算子操作
val rdd2=rdd1.map(x=>{
    println("*"*10)
    x*10
})
//添加 rdd2的缓存
val rddx=rdd2.persist()
//进行一个reduce算子操作
val rdd3=rddx.map(x=>x+10)
val rdd4=rddx.map(x=>x+10)
//打印 rdd2 的结果
println(rdd3.collect.toList)
// 输出 sum值
println(rdd4.collect.toList)

运行结果还是3次!

其实,cache源码就是调用的persist无参函数。

def cache(): this.type = persist()

缓存级别

StorageLevel 中定义了persist的缓存级别。

object StorageLevel {
  val NONE = new StorageLevel(false, false, false, false)
  val DISK_ONLY = new StorageLevel(true, false, false, false)
  val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2)
  val MEMORY_ONLY = new StorageLevel(false, true, false, true)
  val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)
  val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false)
  val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2)
  val MEMORY_AND_DISK = new StorageLevel(true, true, false, true)
  val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)
  val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false)
  val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)
  val OFF_HEAP = new StorageLevel(true, true, true, false, 1)

说明:

NONE: 不存储

DISK_ONLY : 只保存在磁盘中

DISK_ONLY_2 : 只保存在磁盘中,数据保存两份

MEMORY_ONLY : 只保存在内存中

MEMORY_ONLY_2 : 只保存在内存中,数据保存两份

MEMORY_ONLY_SER :只保存在内存中,以序列化形式存储

MEMORY_ONLY_SER_2 : 只保存在内存中,以序列化形式存储,数据保存两份

MEMORY_AND_DISK : 数据保存在内存/磁盘中,可以动态调整

MEMORY_AND_DISK_2 : 数据保存在内存/磁盘中,可以动态调整,数据保存两份

MEMORY_AND_DISK_SER :数据保存在内存/磁盘中,可以动态调整,以序列化形式存储

MEMORY_AND_DISK_SER_2 : 数据保存在内存/磁盘中,可以动态调整,以序列化形式存储,数据保存两份

OFF_HEAP :数据保存在堆外内存中

常用的存储级别:

  • MEMORY_ONLY<只适用于小数据量场景>
  • MEMORY_AND_DISK<适用于大数据量场景>

注意:

  • 缓存有可能丢失,或者存储于内存的数据由于内存不足而被删除,RDD的缓存容错机制保证了即使缓存丢失也能保证计算的正确执行。通过基于RDD的一系列转换,丢失的数据会被重算,由于RDD的各个Partition是相对独立的,因此只需要计算丢失的部分即可,并不需要重算全部Partition。
  • Spark会自动对一些Shuffle操作的中间数据做持久化操作(比如:reduceByKey)。这样做的目的是为了当一个节点Shuffle失败了避免重新计算整个输入。但是,在实际使用的时候,如果想重用数据,仍然建议调用persist或cache。

CheckPoint检查点

检查点:其实就是通过将RDD中间结果写入磁盘(比如分布式文件系统HDFS),同时切断RDD之间的血缘关系。

由于血缘关系过长会造成容错成本过高,就可以在中间阶段做checkpoint容错,如果checkpoint之后有节点出现问题,就可以从checkpoint开始重做血缘,减少开销。

checkpoint操作之后也是不会马上被执行,必须执行action算子后才能触发。

checkpoint需要指定保存路径,当作业执行完成时,路径中保存的文件是不会被删除的。

示例:

val conf =new SparkConf().setMaster("local[4]").setAppName("test")
val sc=new SparkContext(conf)
//设置ck存储路径
sc.setCheckpointDir("hdfs://hadoop1:9820/output/a")
val rdd1=sc.parallelize(List(1,2,3),4)
//进行一个map算子操作
val rdd2=rdd1.map(x=>{
println("*"*10)
x*10
})
//将rdd2作为检查点
rdd2.checkpoint()
//进行一个reduce算子操作
val rdd3=rdd2.map(x=>x+10)
val rdd4=rdd2.map(x=>x+10)
//打印 rdd2 的结果
println(rdd3.collect.toList)
// 输出 sum值
println(rdd4.collect.toList)
//释放
rddx.unpersist(true)
//关闭链接
sc.stop()

运行结果却打印了6次!这是什么原因呢?

最终在checkpoint的源码中找到了答案:

sc.runJob(originalRDD, writePartitionToCheckpointFile[T](checkpointDirPath.toString, broadcastedConf) _)

当程序运行到这里,会再执行一个job任务,并将数据保存到缓存中。该job操作是在checkpoint所属RDD第一个job执行完成之后才会触发。

明明只需要跑一次的任务就可以缓存,现在需要多跑一次,有没有办法只跑一次呢?答案是肯定有的,其实cache可以checkpoint配合使用

val conf =new SparkConf().setMaster("local[4]").setAppName("test")
val sc=new SparkContext(conf)
//设置ck存储路径
sc.setCheckpointDir("hdfs://hadoop1:9820/output/a")
val rdd1=sc.parallelize(List(1,2,3),4)
//进行一个map算子操作
val rdd2=rdd1.map(x=>{
println("*"*10)
x*10
})
//将rdd2作为检查点
val rddx=rdd2.cache()
rddx.checkpoint()
//进行一个reduce算子操作
val rdd3=rddx.map(x=>x+10)
val rdd4=rddx.map(x=>x+10)
//打印 rdd2 的结果
println(rdd3.collect.toList)
// 输出 sum值
println(rdd4.collect.toList)
//释放
rddx.unpersist(true)
//关闭链接
sc.stop()

运行结果就只会跑一次了!

注意:checkpoint还是会运行一个job,但是程序不用从头开始了,而是直接从rddx中取。

总结:

1、为了避免checkpoint触发的job重复执行之前的数据处理逻辑,可以在checkpoint之间将rdd通过cache缓存数据,后续checkpoint触发的job就可以直接使用缓存的数据。

2、使用cache时,job结束之后,缓存会被自动释放。

3、使用checkpoint时,需要手动进行释放,需要设置unpersist为true默认为false。

三个算子的区别

1、cache和persist只是将数据保存起来,不会切断血缘依赖;而checkpoint会切断RDD之间的血缘依赖。

2、cache是将数据临时保存在内存中进行数据重用,可靠性低;

persist是可以将数据临时保存在磁盘文件或者内存中进行数据重用,作业执行完毕,临时保存的文件就会丢失,可靠性低;

checkpoint是将数据永久保存在HDFS等容错、高可用的文件系统,可靠性高。

3、建议对某个RDD执行checkpoint之前,对该RDD执行cache,这样checkpoint的job只需从cache缓存中读取数据并上传到HDFS中即可,不需要重新计算。

4、如果使用完了缓存,可以通过unpersist()方法释放缓存

结语

好了,今天就为大家分享到这里了。咱们下期见!

如果本文对你有帮助的话,欢迎点赞&收藏&分享,这对我继续分享&创作优质文章非常重要。感谢🙏🏻

相关文章
|
1月前
|
分布式计算 并行计算 大数据
Spark学习---day02、Spark核心编程(RDD概述、RDD编程(创建、分区规则、转换算子、Action算子))(一)
Spark学习---day02、Spark核心编程 RDD概述、RDD编程(创建、分区规则、转换算子、Action算子))(一)
70 1
|
1月前
|
分布式计算 Java Scala
Spark学习---day03、Spark核心编程(RDD概述、RDD编程(创建、分区规则、转换算子、Action算子))(二)
Spark学习---day03、Spark核心编程(RDD概述、RDD编程(创建、分区规则、转换算子、Action算子))(二)
41 1
|
1月前
|
分布式计算 Spark
Spark【Spark学习大纲】简介+生态+RDD+安装+使用(xmind分享)
【2月更文挑战第14天】Spark【Spark学习大纲】简介+生态+RDD+安装+使用(xmind分享)
30 1
|
1月前
|
分布式计算 Hadoop Java
Spark【基础知识 03】【RDD常用算子详解】(图片来源于网络)
【2月更文挑战第14天】Spark【基础知识 03】【RDD常用算子详解】(图片来源于网络)
56 1
|
1月前
|
存储 缓存 分布式计算
Spark学习--day04、RDD依赖关系、RDD持久化、RDD分区器、RDD文件读取与保存
Spark学习--day04、RDD依赖关系、RDD持久化、RDD分区器、RDD文件读取与保存
39 1
|
2月前
|
分布式计算 并行计算 Hadoop
Spark学习---day02、Spark核心编程(RDD概述、RDD编程(创建、分区规则、转换算子、Action算子))(一)
Spark学习---day02、Spark核心编程 RDD概述、RDD编程(创建、分区规则、转换算子、Action算子))(一)
41 1
|
2月前
|
分布式计算 大数据 Java
Spark 大数据实战:基于 RDD 的大数据处理分析
Spark 大数据实战:基于 RDD 的大数据处理分析
120 0
|
3月前
|
缓存 分布式计算 监控
Spark RDD操作性能优化技巧
Spark RDD操作性能优化技巧
|
3月前
|
分布式计算 监控 大数据
Spark RDD分区和数据分布:优化大数据处理
Spark RDD分区和数据分布:优化大数据处理
|
3月前
|
存储 缓存 分布式计算
Spark RDD持久化与缓存:提高性能的关键
Spark RDD持久化与缓存:提高性能的关键