目前有两种情况:
A
val rdd1 = sc.textFile("...")
rdd1.cache() // rdd1缓存
val rdd2 = rdd1.filter(...)
val rdd3 = rdd1.map(...)
rdd2.cache()
rdd3.cache()
rdd1.unpersist()
B
val rdd1 = sc.textFile("...")
rdd1.cache() // rdd1缓存
val rdd2 = rdd1.filter(...)
val rdd3 = rdd1.map(...)
rdd2.cache()
rdd3.cache()
rdd2.saveAsTextFile("...")
rdd3.saveAsTextFile("...")
rdd1.unpersist()
由于Spark在执行过程中是懒加载模式,RDD转换仅仅是构建DAG描述而不执行,只有遇到action算子才会真正的运行。因此在选项A中,调用cache后,对算子进行转换操作,再调用unpersist时,仍然只有作业描述,而没有正在运行的执行。
我们在实际开发中应该选择B写法。在action算子之前调用cache进行缓存rdd,在action执行完成后才真正的调用unpersist释放缓存。
备注:cache 和 persist调用只是将RDD添加到在作业执行期间标记为持久化的RDD的Map中。但是,unpersist直接告诉blockManger从存储中释放RDD并且删除持久化RDD Map中的引用。