1 缓存函数
在实际开发中某些RDD的计算或转换可能会比较耗费时间,如果这些RDD后续还会频繁的被使用到,那么可以将这些RDD进行持久化/缓存,这样下次再使用到的时候就不用再重新计算了,提高了程序运行的效率。
可以将RDD数据直接缓存到内存中,函数声明如下:
但是实际项目中,不会直接使用上述的缓存函数,RDD数据量往往很多,内存放不下的。在实
际的项目中缓存RDD数据时,往往使用如下函数,依据具体的业务和数据量,指定缓存的级别:
2 缓存级别
在Spark框架中对数据缓存可以指定不同的级别,对于开发来说至关重要,如下所示:
实际项目中缓存数据时,往往选择如下两种级别:
缓存函数与Transformation函数一样,都是Lazy操作,需要Action函数触发,通常使用count
函数触发。
3 释放缓存
当缓存的RDD数据,不再被使用时,考虑释资源,使用如下函数:
4 何时缓存数据
在实际项目开发中,什么时候缓存RDD数据,最好呢???
- 第一点:当某个RDD被使用多次的时候,建议缓存此RDD数据
- 比如,从HDFS上读取网站行为日志数据,进行多维度的分析,最好缓存数据
- 第二点:当某个RDD来之不易,并且使用不止一次,建议缓存此RDD数据
- 比如,从HBase表中读取历史订单数据,与从MySQL表中商品和用户维度信息数据,进行
关联Join等聚合操作,获取RDD:etlRDD,后续的报表分析使用此RDD,此时建议缓存RDD
数据 - 案例:etlRDD.persist(StoageLeval.MEMORY_AND_DISK_2)
演示范例代码:
import org.apache.spark.rdd.RDD import org.apache.spark.storage.StorageLevel import org.apache.spark.{SparkConf, SparkContext} /** * RDD中缓存函数,将数据缓存到内存或磁盘、释放缓存 */ object SparkCacheTest { def main(args: Array[String]): Unit = { // 创建应用程序入口SparkContext实例对象 val sc: SparkContext = { // 1.a 创建SparkConf对象,设置应用的配置信息 val sparkConf: SparkConf = new SparkConf() .setAppName(this.getClass.getSimpleName.stripSuffix("$")) .setMaster("local[2]") // 1.b 传递SparkConf对象,构建Context实例 new SparkContext(sparkConf) } // 读取文本文件数据 val inputRDD: RDD[String] = sc.textFile("datas/wordcount/wordcount.data", minPartitions = 2) // 缓存数据: 将数据缓存至内存 inputRDD.cache() inputRDD.persist() // 使用Action函数触发缓存 println(s"Count = ${inputRDD.count()}") // 释放缓存 inputRDD.unpersist() // 缓存数据:选择缓存级别 /* val NONE = new StorageLevel(false, false, false, false) val DISK_ONLY = new StorageLevel(true, false, false, false) val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2) val MEMORY_ONLY = new StorageLevel(false, true, false, true) val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2) val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false) val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2) val MEMORY_AND_DISK = new StorageLevel(true, true, false, true) val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2) val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false) val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2) val OFF_HEAP = new StorageLevel(true, true, true, false, 1) */ inputRDD.persist(StorageLevel.MEMORY_AND_DISK) println(s"count: ${inputRDD.count()}") // 应用程序运行结束,关闭资源 sc.stop() } }
5 RDD Checkpoint
RDD 数据可以持久化,但是持久化/缓存可以把数据放在内存中,虽然是快速的,但是也是最不可靠的;也可以把数据放在磁盘上,也不是完全可靠的!例如磁盘会损坏等。
Checkpoint的产生就是为了更加可靠的数据持久化,在Checkpoint的时候一般把数据放在在HDFS上,这就天然的借助了HDFS天生的高容错、高可靠来实现数据最大程度上的安全,实现了RDD的容错和高可用。
在Spark Core中对RDD做checkpoint,可以切断做checkpoint RDD的依赖关系,将RDD数据
保存到可靠存储(如HDFS)以便数据恢复;
演示范例代码如下:
import org.apache.spark.{SparkConf, SparkContext} /** * RDD数据Checkpoint设置,案例演示 */ object SparkCkptTest { def main(args: Array[String]): Unit = { // 创建应用程序入口SparkContext实例对象 val sc: SparkContext = { // 1.a 创建SparkConf对象,设置应用的配置信息 val sparkConf: SparkConf = new SparkConf() .setAppName(this.getClass.getSimpleName.stripSuffix("$")) .setMaster("local[2]") // 1.b 传递SparkConf对象,构建Context实例 new SparkContext(sparkConf) } sc.setLogLevel("WARN") // TODO: 设置检查点目录,将RDD数据保存到那个目录 sc.setCheckpointDir("datas/spark/ckpt/") // 读取文件数据 val datasRDD = sc.textFile("datas/wordcount/wordcount.data") // TODO: 调用checkpoint函数,将RDD进行备份,需要RDD中Action函数触发 datasRDD.checkpoint() datasRDD.count() // TODO: 再次执行count函数, 此时从checkpoint读取数据 datasRDD.count() // 应用程序运行结束,关闭资源 Thread.sleep(100000) sc.stop() } }
持久化和Checkpoint的区别:
1)、存储位置
Persist 和 Cache 只能保存在本地的磁盘和内存中(或者堆外内存);
Checkpoint 可以保存数据到 HDFS 这类可靠的存储上;
2)、生命周期
Cache和Persist的RDD会在程序结束后会被清除或者手动调用unpersist方法;
Checkpoint的RDD在程序结束后依然存在,不会被删除;
3)、Lineage(血统、依赖链、依赖关系)
Persist和Cache,不会丢掉RDD间的依赖链/依赖关系,因为这种缓存是不可靠的,如果出
现了一些错误(例如 Executor 宕机),需要通过回溯依赖链重新计算出来;
Checkpoint会斩断依赖链,因为Checkpoint会把结果保存在HDFS这类存储中,更加的安全可靠,一般不需要回溯依赖链;