开发者学堂课程【大数据Spark2020最新课程(知识精讲与实战演练)第二阶段:RDD 的缓存_缓存的 API】学习笔记,与课程紧密联系,让用户快速学习知识。
课程地址:https://developer.aliyun.com/learning/course/689/detail/11999
RDD 的缓存_缓存的 API
缓存有两个方法,第一个叫做 catche,第二个叫做 Persist。
可以使用 catche 方法进行级存
val conf = new SparkConf (). setMaster (" local [6]"). setAppName (" debug _ string ")
val sc= new SparkContext ( conf )
val interimRDD = sc . textFile (" dataset / access _ log _ sample . txt ")
. map ( item =>( item . split (
"")(0),1))
. filter ( item => StringUtils . isNotBlank ( item ._1)). reduceByKey (( curr , agg )=> curr + agg )
. cache ()①
val resultLess = interimRDD . sortBy ( item => item ._2, ascending = true ). first ()
val resultMore = interimRDD . sortBy ( item => item ._2, ascending = false ). first ()
println ( s "出现次数最少的 IP :$ resultLess ,出现次数最多的 IP : SresultMore ")
sc . stop ()
①缓存
方法签名如下
cache (): this . type = persist ()
Catche 其实是 persist 方法的一个别名
也可以使用persist方法进行缓存
val conf = new SparkConf (). setMaster (" local [6]"). setAppName (" debug _ string ")
val sc= new SparkContext ( conf )
val interimRDD = sc . textFile (" dataset / access _ log _ sample . txt ")
. map ( item =>( item . split (
"")(0),1))
. filter ( item => StringUtils . isNotBlank ( item ._1)). reduceByKey((curr,agg)=>curr+agg)
. cache ()①
val resultLess = interimRDD . sortBy ( item => item ._2, ascending = true ). first ()
val resultMore = interimRDD . sortBy ( item => item ._2, ascending = false ). first ()
println ( s "出现次数最少的 IP :$ resultLess ,出现次数最多的 IP : SresultMore ")
sc . stop ()
①缓存
方法签名如下
cache (): this . type = persist ()
有一个方法叫做 catche,catche 是 RDD 的一个方法,在 RDD 上调用 catche 把 RDD 缓存掉,这个 RDD 会变为一个被缓存过的 RDD。这个方法后面是 persist,所以 catche 可以理解为是 persist 方法的一个别名。
还有一个方法叫做 persist,Persist 和 catche 的调用方法是一模一样的,在一个 RDD 上调用 persist 这个方法。调用这个方法以后,persist 也会变成一个缓存过的 RDD,它们之间的区别是 Persist 可以向其中传入一个存储级别,可以修改缓存的存储级别。
Persist 方法有一个重载,有一个方法的声明叫做 Persist,还有一个叫做 Persist 后面可以传一个 newlevel ,这就是Persist 的方法。其实在用 Persist 方法的时候,Persist 的不传存储级别的这个方法和 catche 是一模一样的,Persist其实就是可以传入级别的一个 Persist。
进入代码以后还是在 Cache Op 这个文件当中复制刚才所编写的内容。这个新的方法把注意力放在缓存上不需要再去看周边的一些东西。首先从这个文件当中读出 SourceRDD,接下来对这个 sourceRDD 做了一个初始词频的赋予,得到了一个 countRDD,接下来对它进行一些清洗,最后对它进行了一个聚合。
前面这一部分是 RDD 的处理部分。后面这个地方是有两个 RDD 的 action 操作,所以其实现在主要在去优化的就是这两个 Action 操作,因为任何一个 Action 都会完整运行一下这个 RDD 的依赖的整个血统。
在 First 调用的时候它是一个 Action,直接拿到这个 aggRDD,把 aggRDD 改成一个 var。
接下来调用 Catche 方法,把这个 aggRDD 再给它赋予回来。注意 catche 也是一个算子,也是一个操作。所以在调用完这个 Catche 以后还要再去返回一个新的 RDD,他也会生成一个新的 RDD。拿到这个 RDD 在进行处理的时候才会真正的使用到那个被缓存的 RDD。所以这个方法也改名叫做 Catche。
结果注意到,这是 lessIP,是最大的 IP 和最小的 IP,能注意到前面是有一个时间的,对比一下这个程序的运行和前面的程序的运行这个程序的运行的速度会更快一些。
但是这个快的不是特别明显,因为现在数据量比较小,但是如果你的数据量相对较大,会发现时间上会节省很多,那么这就是 catcheKPI 的使用。
这次再给它拷贝出来,拷贝出来以后不再使用这个 Catche,可以使用一个叫做 Persist 的方法。
点进去看一下 catche, 进入到上个方法当中点一下 catche 方法,大家注意到这个 catche 方法其实就是一个Persist。
这个时候注意到 Catche 方法其实就是 Persist 方法的一个别名,在 catche 方法当中其实也就返回了一个 Persist,一个就是调用 Persist 这样的方法。在 persist 的方法当中,发现它其实是调用了里面一个参数的 persist 方法。
首先这个方法名要改一下,然后 Persist 方法可以传入一个存储级别,这个存储级别可以通过这个 Storage 来去调用,这个 storagelevel 是一个枚举。可以只存到内存当中或者存在磁盘当中,可以先暂时选择一个 memory only,找到这个 Catche,然后再找到 Persist,默认其实就是一个 memory only 这样的一个存储级别,然后回到 CacheO 当中,接下来去运行一下这个方法。
整个的这个编写方式其实就说了三个方法,一个是 Catche,还有一个是无参 persist,还有一个是有参 persist。