前言
在RDD中是不存储数据的,如果一个RDD需要重复使用,只是这个RDD对象是可以重用的,但是数据无法重用,那么需要从头再次执行来获取数据进行计算。Spark为了避免这种重复计算的情况,实现了RDD持久化功能。在Spark中,RDD的持久化算子有三个:cache、persist和checkpoint。
缓存
缓存:
- 数据保存位置:保存在task所在主机的内存/本地磁盘
- 应用场景:某个RDD在多个job中重复使用的场景
如何缓存:
- cache
- persist
Cache缓存
RDD通过Cache或者Persist方法将前面的计算结果缓存,默认情况下会把数据缓存在内存中。但是这两个方法被调用时并不会立即缓存,而是触发后面的action算子时,该RDD将会被缓存到计算节点的内存中,提供给后面的算子重用。
语法:rdd.cache()
val conf =new SparkConf().setMaster("local[4]").setAppName("test") val sc=new SparkContext(conf) val rdd1=sc.parallelize(List(1,2,3),4) //进行一个map算子操作 val rdd2=rdd1.map(x=>{ println("*"*10) x*10 }) //添加 rdd2的缓存 val rddx=rdd2.cache() //进行一个reduce算子操作 val rdd3=rddx.map(x=>x+10) val rdd4=rddx.map(x=>x+10) //打印 rdd2 的结果 println(rdd3.collect.toList) // 输出 sum值 println(rdd4.collect.toList)
数据直接从缓存中取,运行结果只会打印三次:
********** ********** **********
Persist缓存
用法
语法:rdd.persist()
val conf =new SparkConf().setMaster("local[4]").setAppName("test") val sc=new SparkContext(conf) val rdd1=sc.parallelize(List(1,2,3),4) //进行一个map算子操作 val rdd2=rdd1.map(x=>{ println("*"*10) x*10 }) //添加 rdd2的缓存 val rddx=rdd2.persist() //进行一个reduce算子操作 val rdd3=rddx.map(x=>x+10) val rdd4=rddx.map(x=>x+10) //打印 rdd2 的结果 println(rdd3.collect.toList) // 输出 sum值 println(rdd4.collect.toList)
运行结果还是3次!
其实,cache源码就是调用的persist无参函数。
def cache(): this.type = persist()
缓存级别
StorageLevel 中定义了persist
的缓存级别。
object StorageLevel { val NONE = new StorageLevel(false, false, false, false) val DISK_ONLY = new StorageLevel(true, false, false, false) val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2) val MEMORY_ONLY = new StorageLevel(false, true, false, true) val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2) val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false) val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2) val MEMORY_AND_DISK = new StorageLevel(true, true, false, true) val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2) val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false) val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2) val OFF_HEAP = new StorageLevel(true, true, true, false, 1)
说明:
NONE: 不存储
DISK_ONLY : 只保存在磁盘中
DISK_ONLY_2 : 只保存在磁盘中,数据保存两份
MEMORY_ONLY : 只保存在内存中
MEMORY_ONLY_2 : 只保存在内存中,数据保存两份
MEMORY_ONLY_SER :只保存在内存中,以序列化形式存储
MEMORY_ONLY_SER_2 : 只保存在内存中,以序列化形式存储,数据保存两份
MEMORY_AND_DISK : 数据保存在内存/磁盘中,可以动态调整
MEMORY_AND_DISK_2 : 数据保存在内存/磁盘中,可以动态调整,数据保存两份
MEMORY_AND_DISK_SER :数据保存在内存/磁盘中,可以动态调整,以序列化形式存储
MEMORY_AND_DISK_SER_2 : 数据保存在内存/磁盘中,可以动态调整,以序列化形式存储,数据保存两份
OFF_HEAP :数据保存在堆外内存中
常用的存储级别:
- MEMORY_ONLY<只适用于小数据量场景>
- MEMORY_AND_DISK<适用于大数据量场景>
注意:
- 缓存有可能丢失,或者存储于内存的数据由于内存不足而被删除,RDD的缓存容错机制保证了即使缓存丢失也能保证计算的正确执行。通过基于RDD的一系列转换,丢失的数据会被重算,由于RDD的各个Partition是相对独立的,因此只需要计算丢失的部分即可,并不需要重算全部Partition。
- Spark会自动对一些Shuffle操作的中间数据做持久化操作(比如:reduceByKey)。这样做的目的是为了当一个节点Shuffle失败了避免重新计算整个输入。但是,在实际使用的时候,如果想重用数据,仍然建议调用persist或cache。
CheckPoint检查点
检查点:其实就是通过将RDD中间结果写入磁盘(比如分布式文件系统HDFS),同时切断RDD之间的血缘关系。
由于血缘关系过长会造成容错成本过高,就可以在中间阶段做checkpoint容错,如果checkpoint之后有节点出现问题,就可以从checkpoint开始重做血缘,减少开销。
checkpoint操作之后也是不会马上被执行,必须执行action算子后才能触发。
checkpoint需要指定保存路径,当作业执行完成时,路径中保存的文件是不会被删除的。
示例:
val conf =new SparkConf().setMaster("local[4]").setAppName("test") val sc=new SparkContext(conf) //设置ck存储路径 sc.setCheckpointDir("hdfs://hadoop1:9820/output/a") val rdd1=sc.parallelize(List(1,2,3),4) //进行一个map算子操作 val rdd2=rdd1.map(x=>{ println("*"*10) x*10 }) //将rdd2作为检查点 rdd2.checkpoint() //进行一个reduce算子操作 val rdd3=rdd2.map(x=>x+10) val rdd4=rdd2.map(x=>x+10) //打印 rdd2 的结果 println(rdd3.collect.toList) // 输出 sum值 println(rdd4.collect.toList) //释放 rddx.unpersist(true) //关闭链接 sc.stop()
运行结果却打印了6次!这是什么原因呢?
最终在checkpoint的源码中找到了答案:
sc.runJob(originalRDD, writePartitionToCheckpointFile[T](checkpointDirPath.toString, broadcastedConf) _)
当程序运行到这里,会再执行一个job任务,并将数据保存到缓存中。该job操作是在checkpoint所属RDD第一个job执行完成之后才会触发。
明明只需要跑一次的任务就可以缓存,现在需要多跑一次,有没有办法只跑一次呢?答案是肯定有的,其实cache可以checkpoint配合使用
val conf =new SparkConf().setMaster("local[4]").setAppName("test") val sc=new SparkContext(conf) //设置ck存储路径 sc.setCheckpointDir("hdfs://hadoop1:9820/output/a") val rdd1=sc.parallelize(List(1,2,3),4) //进行一个map算子操作 val rdd2=rdd1.map(x=>{ println("*"*10) x*10 }) //将rdd2作为检查点 val rddx=rdd2.cache() rddx.checkpoint() //进行一个reduce算子操作 val rdd3=rddx.map(x=>x+10) val rdd4=rddx.map(x=>x+10) //打印 rdd2 的结果 println(rdd3.collect.toList) // 输出 sum值 println(rdd4.collect.toList) //释放 rddx.unpersist(true) //关闭链接 sc.stop()
运行结果就只会跑一次了!
注意:checkpoint
还是会运行一个job,但是程序不用从头开始了,而是直接从rddx
中取。
总结:
1、为了避免checkpoint触发的job重复执行之前的数据处理逻辑,可以在checkpoint之间将rdd通过cache缓存数据,后续checkpoint触发的job就可以直接使用缓存的数据。
2、使用cache时,job结束之后,缓存会被自动释放。
3、使用checkpoint时,需要手动进行释放,需要设置unpersist
为true默认为false。
三个算子的区别
1、cache和persist只是将数据保存起来,不会切断血缘依赖;而checkpoint会切断RDD之间的血缘依赖。
2、cache是将数据临时保存在内存中进行数据重用,可靠性低;
persist是可以将数据临时保存在磁盘文件或者内存中进行数据重用,作业执行完毕,临时保存的文件就会丢失,可靠性低;
checkpoint是将数据永久保存在HDFS等容错、高可用的文件系统,可靠性高。
3、建议对某个RDD执行checkpoint之前,对该RDD执行cache,这样checkpoint的job只需从cache缓存中读取数据并上传到HDFS中即可,不需要重新计算。
4、如果使用完了缓存,可以通过unpersist()方法释放缓存
结语
好了,今天就为大家分享到这里了。咱们下期见!
如果本文对你有帮助的话,欢迎点赞&收藏&分享,这对我继续分享&创作优质文章非常重要。感谢🙏🏻