1. 实验室名称:
大数据实验教学系统
2. 实验项目名称:
RDD上的持久化操作
3. 实验学时:
4. 实验原理:
Spark非常重要的一个功能特性就是可以将RDD 持久化在内存中,当对RDD执行持久化操作时,每个节点都会将自己操作的RDD的partition持久化到内存中,并且在之后对该RDD的反复使用中,直接使用内存缓存的partition,这样的话,对于针对一个RDD反复执行多个操作的场景,就只要对RDD计算一次即可,后面直接使用该RDD ,而不需要计算多次该RDD
巧妙使用RDD持久化,甚至在某些场景下,可以将spark应用程序的性能提升10倍。对于迭代式算法和快速交互式应用来说,RDD持久化,是非常重要的。
要持久化一个RDD,只要调用其cache()或者persist()方法即可。在该RDD第一次被计算出来时,就会直接缓存在每个节点中。而且Spark的持久化机制还是自动容错的,如果持久化的RDD的任何partition丢失了,那么Spark会自动通过其源RDD,使用transformation操作重新计算该partition。
cache()和persist()的区别在于,cache()是persist()的一种简化方式,cache()的底层就是调用的persist()的无参版本,同时就是调用persist(MEMORY_ONLY),将数据持久化到内存中。如果需要从内存中去除缓存,那么可以使用unpersist()方法。
5. 实验目的:
掌握RDD Cache缓存的实现。
掌握RDD persist持久化和unpersist实现。
掌握RDD checkpoint检查点设置。
6. 实验内容:
1、在内存中缓存RDD。
2、设置checkpoint检查点。
7. 实验器材(设备、虚拟机名称):
硬件:x86_64 ubuntu 16.04服务器
软件:JDK 1.8,Spark-2.3.2,Hadoop-2.7.3,zeppelin-0.8.1,Scala-2.11.11
8. 实验步骤:
8.1 环境准备
1 启动Spark集群
在终端窗口下,输入以下命令,启动Spark集群:
1. $ cd /opt/spark 2. $ ./sbin/start-all.sh
然后使用jps命令查看进程,确保Spark的Master进程和Worker进程已经启动。
8.2 启动HDFS集群,并上传数据文件
1、在终端窗口下,输入以下命令,启动HDFS集群:
1. $ start-dfs.sh
然后使用jps命令查看进程,确保HDFS的NameNode进程和DataNode进程已经启动。
2、在终端窗口下,输入以下命令,将本实验要用到的数据文件上传到HDFS上:
1. $ hdfs dfs -mkdir -p /data/dataset 2. $ hdfs dfs -put /data/dataset/resources /data/dataset/
8.3 启动zeppelin服务器
在终端窗口下,输入以下命令,启动zeppelin服务器:
1. $ zeppelin-daemon.sh start
然后使用jps命令查看进程,确保zeppelin服务器已经启动。
8.4 创建notebook文档
1、首先启动浏览器,在地址栏中输入以下url地址,连接zeppelin服务器:
http://localhost:9090
2、如果zeppelin服务器已正确建立连接,则会看到如下的zeppelin notebook首页:
3、点击【Create new note】链接,创建一个新的笔记本,并命名为”rdd_demo”,解释器默认使用”spark”,如下图所示:
8.5 在内存中缓存RDD
将多次调用的RDD缓存起来,可以避免多次访问数据源。
1、查看创建的rdd是否缓存。在zeppelin中执行如下代码:
1. // 数据路径 2. val data = "/data/dataset/resources/people.txt" 3. 4. // 创建RDD 5. val rdd = sc.parallelize(data) 6. 7. // 判断是否缓存 8. rdd.getStorageLevel.useMemory
同时按下【shift+enter】,执行以上代码,输出内容如下:
false
2、对rdd进行缓存。在zeppelin中执行如下代码:
1. // 数据路径 2. val data = "/data/dataset/resources/people.txt" 3. 4. // 创建RDD 5. val rdd = sc.parallelize(data) 6. 7. // RDD创建缓存 8. val rdd1 = rdd.cache() 9. 10. // 判断RDD是否缓存 11. rdd1.getStorageLevel.useMemory
同时按下【shift+enter】,执行以上代码,输出内容如下:
true
3、查看RDD的缓存。在zeppelin中执行如下代码:
1. val data = "/data/dataset/resources/people.txt" 2. 3. // 创建RDD 4. val rdd = sc.parallelize(data) 5. 6. // RDD创建缓存 7. val rdd1 = rdd.cache() 8. 9. // 查看RDD的存储级别 10. rdd1.getStorageLevel
同时按下【shift+enter】,执行以上代码,输出内容如下:
StorageLevel(memory, deserialized, 1 replicas)
4、清除RDD的缓存。在zeppelin中执行如下代码:
1. val data = "/data/dataset/resources/people.txt" 2. 3. // 创建RDD 4. val rdd = sc.parallelize(data) 5. 6. // RDD创建缓存 7. val rdd1 = rdd.cache() 8. 9. // 清除RDD的缓存 10. rdd1.unpersist() 11. 12. // 查看RDD是否缓存 13. rdd1.getStorageLevel.useMemory
同时按下【shift+enter】,执行以上代码,输出内容如下:
false
8.6 持久化RDD
在Spark中,RDD采用惰性求值的机制,每次遇到action操作,Spark都会从头重新计算RDD及其所有的依赖。
这对于迭代计算而言,代价是很大的,迭代计算经常需要多次重复使用同一组数据。可以通过持久化(缓存)机制避免这种重复计算的开销。
1、在zeppelin中编写读取数据代码。代码如下:
1. // 数据路径 2. val data_path = "/data/dataset/resources/people.txt" 3. 4. // 读取txt文件 5. val rdd = sc.textFile(data_path)
2、如果不对数据进行持久化,则因为每次都要重新从HDFS读取数据,所以程序运行时间会较长。请在zeppelin中执行如下的代码:
1. val time1=System.currentTimeMillis() 2. rdd.count 3. val time2=System.currentTimeMillis() 4. 5. val res=time2-time1 6. print(res)
为了更好查看程序运行时间,获取开始与结束时的系统当前时间。
同时按下【shift+enter】,执行以上代码,输出内容如下:
res2: Long = 1212
3、使用cache()函数对RDD进行缓存(数据持久化)。在zeppelin中执行如下的代码:
1. rdd.cache()
然后再次对rdd执行count操作。在zeppelin中执行如下的代码:
1. val time1=System.currentTimeMillis() 2. rdd.count 3. val time2=System.currentTimeMillis() 4. 5. val res2=time2-time1 6. print(res2)
同时按下【shift+enter】,执行以上代码,输出内容如下:
res2: Long = 139
可以看出,当对rdd进行了缓存后,以后再多次执行count操作的话,就不必再从头加载文件,而是直接在缓存的rdd上进行计算,因此性能有了很大的改善。
5、当cache()不满足需求时,可以使用persist函数开更改缓存级别,代码如下:
1. // 导入spark 缓存级别类库 2. import org.apache.spark.storage.StorageLevel 3. 4. val time1=System.currentTimeMillis() 5. 6. val data_path = "/data/dataset/resources/people.txt" 7. val rdd = sc.textFile(data_path) 8. 9. # 对rdd进行缓存 参数中可以对级别进行修改 10. rdd.persist(StorageLevel.MEMORY_AND_DISK_2 ) 11. 12. rdd.count 13. 14. val time2=System.currentTimeMillis() 15. 16. val res2=time2-time1 17. print(res2)
同时按下【shift+enter】,执行以上代码,输出内容如下:
res2: Long = 295
8.6 checkpoint检查点实现
对于执行大量数据时RDD缓存有可能丢失,或者存储存储于内存的数据由于内存不足而被删除。通过基于RDD的一系列转换,丢失的数据会被重算,由于RDD的各个Partition是相对独立的,因此只需要计算丢失的部分即可,并不需要重算全部Partition。但是,多次迭代后数据丢失的重新计算,会影响效率。因此,RDD的缓存容错机制保证了即使缓存丢失也能保证快速的恢复,而不是重新计算。
1、设置检查点目录,用来存储中间结果数据。在zeppelin中执行如下代码:
1. sc.setCheckpointDir("/data/bigdata/checkpoint_2019")
2、代码提交后检查点目录会自动创建,并且已经创建完成。当rdd执行action操作时触发checkpoint,然将结果保存到目录中去。在终端窗口中执行如下代码,查看创建的检查点目录:
1. $ hdfs dfs -ls /data/bigdata/checkpoint_2019/
3、上面结果可以看出在目录有一个文件夹,此文件夹是自动生成的一个id文件夹,用来存储任务结果。对RDD执行count()操作,会把结果缓存到checkpoint_2019目录下。在zeppelin中执行如下代码:
1. val data_path = "/data/dataset/resources/people.txt" 2. val rdd = sc.textFile(data_path) 3. 4. rdd.checkpoint() 5. println(rdd.count())
4、再此查看checkpoint_2019目录下的id文件下会发现有part开头的文件生成,说明结果保存成功。(注意:换成本机实际的路径)
hdfs dfs -ls /data/bigdata/checkpoint_2019//a4cb2bf8-f312-4c83-b54c-7e62eb45f827/rdd-25
5、当再次执行count时,会直接去checkpoint中查找结果,提示一定运行速度。
1. println(rdd.count())
6、使用checkpoint持久化数据是,最好保存检查目录到hdfs中。
9. 实验结果及分析:
实验结果运行准确,无误
10. 实验结论:
经过本节实验的学习,通过学习RDD上的持久化操作,进一步巩固了我们的Spark基础。
11. 总结及心得体会:
在Spark中,RDD采用惰性求值的机制,每次遇到action操作,Spark都会从头重新计算RDD及其所有的依赖。
这对于迭代计算而言,代价是很大的,迭代计算经常需要多次重复使用同一组数据。可以通过持久化(缓存)机制避免这种重复计算的开销。
12、实验知识测试
下列哪个算子能够触发checkpoint操作()
A、 map
B、 flatMap
C、 filter
D、 collect
13、实验拓展
自主查阅spark缓存相关的源码




