RDD上的持久化操作

简介: RDD上的持久化操作

1. 实验室名称:

大数据实验教学系统

2. 实验项目名称:

RDD上的持久化操作

3. 实验学时:

4. 实验原理:

Spark非常重要的一个功能特性就是可以将RDD 持久化在内存中,当对RDD执行持久化操作时,每个节点都会将自己操作的RDD的partition持久化到内存中,并且在之后对该RDD的反复使用中,直接使用内存缓存的partition,这样的话,对于针对一个RDD反复执行多个操作的场景,就只要对RDD计算一次即可,后面直接使用该RDD ,而不需要计算多次该RDD

 巧妙使用RDD持久化,甚至在某些场景下,可以将spark应用程序的性能提升10倍。对于迭代式算法和快速交互式应用来说,RDD持久化,是非常重要的。

 要持久化一个RDD,只要调用其cache()或者persist()方法即可。在该RDD第一次被计算出来时,就会直接缓存在每个节点中。而且Spark的持久化机制还是自动容错的,如果持久化的RDD的任何partition丢失了,那么Spark会自动通过其源RDD,使用transformation操作重新计算该partition。

 cache()和persist()的区别在于,cache()是persist()的一种简化方式,cache()的底层就是调用的persist()的无参版本,同时就是调用persist(MEMORY_ONLY),将数据持久化到内存中。如果需要从内存中去除缓存,那么可以使用unpersist()方法。

5. 实验目的:

掌握RDD Cache缓存的实现。

 掌握RDD persist持久化和unpersist实现。

 掌握RDD checkpoint检查点设置。


6. 实验内容:

1、在内存中缓存RDD。

 2、设置checkpoint检查点。


7. 实验器材(设备、虚拟机名称):

硬件:x86_64 ubuntu 16.04服务器

软件:JDK 1.8,Spark-2.3.2,Hadoop-2.7.3,zeppelin-0.8.1,Scala-2.11.11


8. 实验步骤:

8.1 环境准备

1 启动Spark集群

 在终端窗口下,输入以下命令,启动Spark集群:

1.  $ cd /opt/spark
2.  $ ./sbin/start-all.sh

然后使用jps命令查看进程,确保Spark的Master进程和Worker进程已经启动。


8.2 启动HDFS集群,并上传数据文件

1、在终端窗口下,输入以下命令,启动HDFS集群:

1.  $ start-dfs.sh

然后使用jps命令查看进程,确保HDFS的NameNode进程和DataNode进程已经启动。

 2、在终端窗口下,输入以下命令,将本实验要用到的数据文件上传到HDFS上:

1.  $ hdfs dfs -mkdir -p /data/dataset
2.  $ hdfs dfs -put /data/dataset/resources /data/dataset/

8.3 启动zeppelin服务器

在终端窗口下,输入以下命令,启动zeppelin服务器:

1.  $ zeppelin-daemon.sh start

然后使用jps命令查看进程,确保zeppelin服务器已经启动。


8.4 创建notebook文档

1、首先启动浏览器,在地址栏中输入以下url地址,连接zeppelin服务器:

http://localhost:9090

 2、如果zeppelin服务器已正确建立连接,则会看到如下的zeppelin notebook首页:

5558b711ef7a4f09a45feaa4f4ec307d.png


3、点击【Create new note】链接,创建一个新的笔记本,并命名为”rdd_demo”,解释器默认使用”spark”,如下图所示:

4900ba52dd8b4011b85a27defe354d42.png


8.5 在内存中缓存RDD

将多次调用的RDD缓存起来,可以避免多次访问数据源。

 1、查看创建的rdd是否缓存。在zeppelin中执行如下代码:

1.  // 数据路径
2.  val data = "/data/dataset/resources/people.txt"
3.       
4.  // 创建RDD
5.  val rdd = sc.parallelize(data)
6.       
7.  // 判断是否缓存
8.  rdd.getStorageLevel.useMemory

同时按下【shift+enter】,执行以上代码,输出内容如下:

false

 2、对rdd进行缓存。在zeppelin中执行如下代码:

1.  // 数据路径
2.  val data = "/data/dataset/resources/people.txt"
3.       
4.  // 创建RDD
5.  val rdd = sc.parallelize(data)
6.       
7.  // RDD创建缓存
8.  val rdd1 = rdd.cache()
9.       
10. // 判断RDD是否缓存
11. rdd1.getStorageLevel.useMemory

同时按下【shift+enter】,执行以上代码,输出内容如下:

true

 3、查看RDD的缓存。在zeppelin中执行如下代码:


1.  val data = "/data/dataset/resources/people.txt"
2.       
3.  // 创建RDD
4.  val rdd = sc.parallelize(data)
5.       
6.  // RDD创建缓存
7.  val rdd1 = rdd.cache()
8.       
9.  // 查看RDD的存储级别
10. rdd1.getStorageLevel

同时按下【shift+enter】,执行以上代码,输出内容如下:

StorageLevel(memory, deserialized, 1 replicas)

 4、清除RDD的缓存。在zeppelin中执行如下代码:

1.  val data = "/data/dataset/resources/people.txt"
2.       
3.  // 创建RDD
4.  val rdd = sc.parallelize(data)
5.       
6.  // RDD创建缓存
7.  val rdd1 = rdd.cache()
8.       
9.  // 清除RDD的缓存
10. rdd1.unpersist()
11.      
12. // 查看RDD是否缓存
13. rdd1.getStorageLevel.useMemory

同时按下【shift+enter】,执行以上代码,输出内容如下:

false


8.6 持久化RDD

在Spark中,RDD采用惰性求值的机制,每次遇到action操作,Spark都会从头重新计算RDD及其所有的依赖。

这对于迭代计算而言,代价是很大的,迭代计算经常需要多次重复使用同一组数据。可以通过持久化(缓存)机制避免这种重复计算的开销。

 1、在zeppelin中编写读取数据代码。代码如下:

1.  // 数据路径
2.  val data_path = "/data/dataset/resources/people.txt"
3.       
4.  // 读取txt文件
5.  val rdd = sc.textFile(data_path)

2、如果不对数据进行持久化,则因为每次都要重新从HDFS读取数据,所以程序运行时间会较长。请在zeppelin中执行如下的代码:

1.  val time1=System.currentTimeMillis()
2.  rdd.count
3.  val time2=System.currentTimeMillis()
4.       
5.  val res=time2-time1
6.  print(res)

为了更好查看程序运行时间,获取开始与结束时的系统当前时间。

 同时按下【shift+enter】,执行以上代码,输出内容如下:

res2: Long = 1212

3、使用cache()函数对RDD进行缓存(数据持久化)。在zeppelin中执行如下的代码:

1.  rdd.cache()

然后再次对rdd执行count操作。在zeppelin中执行如下的代码:

1.  val time1=System.currentTimeMillis()
2.  rdd.count
3.  val time2=System.currentTimeMillis()
4.       
5.  val res2=time2-time1
6.  print(res2)

同时按下【shift+enter】,执行以上代码,输出内容如下:

res2: Long = 139

可以看出,当对rdd进行了缓存后,以后再多次执行count操作的话,就不必再从头加载文件,而是直接在缓存的rdd上进行计算,因此性能有了很大的改善。

 5、当cache()不满足需求时,可以使用persist函数开更改缓存级别,代码如下:

1.  // 导入spark 缓存级别类库
2.  import org.apache.spark.storage.StorageLevel
3.       
4.  val time1=System.currentTimeMillis()
5.       
6.  val data_path = "/data/dataset/resources/people.txt"
7.  val rdd = sc.textFile(data_path)
8.       
9.  # 对rdd进行缓存 参数中可以对级别进行修改
10. rdd.persist(StorageLevel.MEMORY_AND_DISK_2 )
11.      
12. rdd.count
13.      
14. val time2=System.currentTimeMillis()
15.      
16. val res2=time2-time1
17. print(res2)

同时按下【shift+enter】,执行以上代码,输出内容如下:


res2: Long = 295

8.6 checkpoint检查点实现

对于执行大量数据时RDD缓存有可能丢失,或者存储存储于内存的数据由于内存不足而被删除。通过基于RDD的一系列转换,丢失的数据会被重算,由于RDD的各个Partition是相对独立的,因此只需要计算丢失的部分即可,并不需要重算全部Partition。但是,多次迭代后数据丢失的重新计算,会影响效率。因此,RDD的缓存容错机制保证了即使缓存丢失也能保证快速的恢复,而不是重新计算。

 1、设置检查点目录,用来存储中间结果数据。在zeppelin中执行如下代码:


1.  sc.setCheckpointDir("/data/bigdata/checkpoint_2019")

2、代码提交后检查点目录会自动创建,并且已经创建完成。当rdd执行action操作时触发checkpoint,然将结果保存到目录中去。在终端窗口中执行如下代码,查看创建的检查点目录:

1.  $ hdfs dfs -ls  /data/bigdata/checkpoint_2019/

3、上面结果可以看出在目录有一个文件夹,此文件夹是自动生成的一个id文件夹,用来存储任务结果。对RDD执行count()操作,会把结果缓存到checkpoint_2019目录下。在zeppelin中执行如下代码:

1.  val data_path = "/data/dataset/resources/people.txt"
2.  val rdd = sc.textFile(data_path)
3.       
4.  rdd.checkpoint()
5.  println(rdd.count())

4、再此查看checkpoint_2019目录下的id文件下会发现有part开头的文件生成,说明结果保存成功。(注意:换成本机实际的路径)


hdfs dfs -ls /data/bigdata/checkpoint_2019//a4cb2bf8-f312-4c83-b54c-7e62eb45f827/rdd-25

 5、当再次执行count时,会直接去checkpoint中查找结果,提示一定运行速度。

1.  println(rdd.count())

6、使用checkpoint持久化数据是,最好保存检查目录到hdfs中。


9. 实验结果及分析:

8ebb656fe0e4432985928c76e876b5a8.png


b3bf07bf79f94e5abc5801b2d14b48e9.png



实验结果运行准确,无误


10. 实验结论:

经过本节实验的学习,通过学习RDD上的持久化操作,进一步巩固了我们的Spark基础。


11. 总结及心得体会:

在Spark中,RDD采用惰性求值的机制,每次遇到action操作,Spark都会从头重新计算RDD及其所有的依赖。

这对于迭代计算而言,代价是很大的,迭代计算经常需要多次重复使用同一组数据。可以通过持久化(缓存)机制避免这种重复计算的开销。


12、实验知识测试

下列哪个算子能够触发checkpoint操作()

 A、 map

 B、 flatMap

 C、 filter

 D、 collect


13、实验拓展

自主查阅spark缓存相关的源码


a7039e8387c04315bb09064a369428f7.png

相关文章
|
安全 算法 区块链
花无涯带你走进黑客世界之Tor洋葱网络
Tor本来是为用户提供匿名上网保护用户隐私的工具,但是对于一些用户来说,他们可以利用Tor的隐蔽性进行黑客攻击或非法交易活动。总结Tor的恶意应用主要表现在以下几方面。
830 0
|
Arthas 运维 Java
arthas 的使用场景、优点和缺点
Arthas 是Alibaba开源的Java诊断工具,它可以帮助开发人员或者运维人员查找问题、分析性能和bug追踪。以下是Arthas的一些使用场景: 1. 查看目标服务器应用程序的JVM信息。 2. 方法性能的排查和跟踪。例如,在实际使用过程中发现某个接口很耗时,但是无法在本地环境复现的时候,可以通过Arthas的trace来跟踪,它会输出方法内部路径每个节点的耗时。 3. 查找全局视角查看系统的运行状况、健康状况的信息。 4. 反编译源码,查看JVM加载的是否为预期的文件内容。
1267 0
|
4月前
|
监控 Linux 测试技术
C++零拷贝网络编程实战:从理论到生产环境的性能优化之路
🌟 蒋星熠Jaxonic,技术宇宙中的星际旅人。深耕C++与零拷贝网络编程,从sendfile到DPDK,实战优化服务器性能,毫秒级响应、CPU降60%。分享架构思维,共探代码星辰大海!
|
人工智能 自然语言处理 搜索推荐
智能语音助手的发展与未来:开启人机交互的新篇章
智能语音助手的发展与未来:开启人机交互的新篇章
2183 28
|
7月前
|
缓存 NoSQL 数据库
什么是缓存击穿
缓存击穿是指热点缓存key突然失效,导致大量并发请求直接冲击数据库,造成巨大压力。常见于高并发场景,如热门商品信息失效时。解决方法包括设置热点key永不过期、使用分布式锁、预热数据、熔断降级等,以保障系统稳定性。
807 0
|
缓存 算法 数据挖掘
深入理解缓存更新策略:从LRU到LFU
【10月更文挑战第7天】 在本文中,我们将探讨计算机系统中缓存机制的核心——缓存更新策略。缓存是提高数据检索速度的关键技术之一,无论是在硬件还是软件层面都扮演着重要角色。我们会详细介绍最常用的两种缓存算法:最近最少使用(LRU)和最少使用频率(LFU),并讨论它们的优缺点及适用场景。通过对比分析,旨在帮助读者更好地理解如何选择和实现适合自己需求的缓存策略,从而优化系统性能。
539 3
|
缓存 NoSQL 关系型数据库
redis和缓存及相关问题和解决办法 什么是缓存预热、缓存穿透、缓存雪崩、缓存击穿
本文深入探讨了Redis缓存的相关知识,包括缓存的概念、使用场景、可能出现的问题(缓存预热、缓存穿透、缓存雪崩、缓存击穿)及其解决方案。
973 0
redis和缓存及相关问题和解决办法 什么是缓存预热、缓存穿透、缓存雪崩、缓存击穿
|
10月前
|
BI Windows
目前企业用得比较多的AD域管理工具是什么?
随着互联网发展,传统工作习惯已无法满足高效需求。企业AD域管理中,人工处理方式效率低下,而AD域管理工具成为优选。ManageEngine卓豪ADManager Plus是一款广受青睐的AD域管理软件,具备高效事件处理能力、强大的报表生成功能及批量用户管理等优势。
281 1
|
存储 算法
算法系列之搜索算法-广度优先搜索BFS
广度优先搜索(BFS)是一种非常强大的算法,特别适用于解决最短路径、层次遍历和连通性问题。在面试中,掌握BFS的基本实现和应用场景,能够帮助你高效解决许多与图或树相关的问题。
1190 1
算法系列之搜索算法-广度优先搜索BFS
|
存储 缓存 安全
阿里云EMR数据湖文件系统: 面向开源和云打造下一代 HDFS
本文作者详细地介绍了阿里云EMR数据湖文件系统JindoFS的起源、发展迭代以及性能。
73191 79