bigdata-37-SparkRDD

简介: bigdata-37-SparkRDD

RDD持久化原理

Spark中有一个非常重要的功能就是可以对RDD进行持久化。当对RDD执行持久化操作时,每个节点都会将自己操作的RDD的partition数据持久化到内存中,并且在之后对该RDD的反复使用中,直接使用内存中缓存的partition数据。

这样的话,针对一个RDD反复执行多个操作的场景,就只需要对RDD计算一次即可,后面直接使用该RDD,而不需要反复计算多次该RDD。

因为正常情况下这个RDD的数据使用过后内存中是不会一直保存的。

例如这样的操作:针对mapRDD需要多次使用的

val dataRDD = sc.parallelize(Array(1,2,3,4,5))

val mapRDD = dataRDD.map(...)

mapRDD.foreach(...)

mapRDD.saveAsTextFile(...)

mapRDD.collect()

巧妙使用RDD持久化,在某些场景下,对spark应用程序的性能有很大提升。

特别是对于迭代式算法和快速交互式应用来说,RDD持久化,是非常重要的。

要持久化一个RDD,只需要调用它的cache()或者persist()方法就可以了。

在该RDD第一次被计算出来时,就会直接缓存在每个节点中。而且Spark的持久化机制还是自动容错的,如果持久化的RDD的任何partition数据丢失了,那么Spark会自动通过其源RDD,使用transformation算子重新计算该partition的数据。

cache()和persist()的区别在于:

cache()是persist()的一种简化方式,cache()的底层就是调用的persist()的无参版本,也就是调用persist(MEMORY_ONLY),将数据持久化到内存中。

如果需要从内存中清除缓存,那么可以使用unpersist()方法。

RDD持久化策略

下面看一下目前Spark支持的一些持久化策略

MEMORY_ONLY:以非序列化的Java对象的方式持久化在JVM内存中。如果内存无法完全存储RDD所有的partition,那么那些没有持久化的partition就会在下一次需要使用它的时候,重新被计算。

MEMORY_AND_DISK:当某些partition无法存储在内存中时,会持久化到磁盘中。下次需要使用这些partition时,需要从磁盘上读取,不需要重新计算

MEMORY_ONLY_SER:同MEMORY_ONLY,但是会使用Java的序列化方式,将Java对象序列化后进行持久化。可以减少内存开销,但是在使用的时候需要进行反序列化,因此会增加CPU开销。

MEMORY_AND_DISK_SER:同MEMORY_AND_DSK。但是会使用序列化方式持久化Java对象。

DISK_ONLY:使用非序列化Java对象的方式持久化,完全存储到磁盘上。

MEMORY_ONLY_2、MEMORY_AND_DISK_2等:如果是尾部加了2的持久化级别,表示会将持久化数据复制一份,保存到其它节点,从而在数据丢失时,不需要重新计算,只需要使用备份数据即可。

如何选择RDD持久化策略

Spark提供了多种持久化级别,主要是为了在CPU和内存消耗之间进行取舍。

下面是一些通用的持久化级别的选择建议:

  1. 优先使用MEMORY_ONLY,纯内存速度最快,而且没有序列化不需要消耗CPU进行反序列化操作,缺点就是比较耗内存
  2. MEMORY_ONLY_SER,将数据进行序列化存储,纯内存操作还是非常快,只是在使用的时候需要消耗CPU进行反序列化

如果需要进行数据的快速失败恢复,那么就选择带后缀为_2的策略,进行数据的备份,这样在失败时,就不需要重新计算了能不使用DISK相关的策略,就不要使用,因为有的时候,从磁盘读取数据,还不如重新计算一次。

案例:Scala实现RDD持久化

package com.bigdata.scala

import org.apache.spark.{SparkConf, SparkContext}

/**

* 需求:RDD持久化

*/

object PersistRddScala {

def main(args: Array[String]): Unit = {

   val conf = new SparkConf()

   conf.setAppName("PersistRddScala")

     .setMaster("local")

   val sc = new SparkContext(conf)

   //注意cache的用法和位置

   //cache默认是基于内存的持久化

   // cache()=persist()=persist(StorageLevel.MEMORY_ONLY)

   val dataRDD = sc.textFile("/Users/a/keep-learning/test_data/data.txt").cache()

   var start_time = System.currentTimeMillis()

   var count = dataRDD.count()

   println(count)

   var end_time = System.currentTimeMillis()

   println("第一次耗时:"+(end_time-start_time))

   start_time = System.currentTimeMillis()

   count = dataRDD.count()

   println(count)

   end_time = System.currentTimeMillis()

   println("第二次耗时:"+(end_time-start_time))

   sc.stop()

}

}

这里通过对比我们可以看出

在没有添加cache之前,每一次都耗时很长

加上cache之后,第二次计算耗时就很少了

共享变量

共享变量的工作原理

Spark还有一个非常重要的特性就是共享变量

默认情况下,如果在一个算子函数中使用到了某个外部的变量,那么这个变量的值会被拷贝到每个task中。此时每个task只能操作自己的那份变量数据。如果多个task想要共享某个变量,那么这种方式是做不到的。

Spark为此提供了两种共享变量

一种是Broadcast Variable(广播变量)

另一种是Accumulator(累加变量)

Broadcast Variable

Broadcast Variable会将使用到的变量,仅仅为每个节点拷贝一份,而不会为每个task都拷贝一份副本,因此其最大的作用,就是减少变量到各个节点的网络传输消耗,以及在各个节点上的内存消耗

通过调用SparkContext的broadcast()方法,针对某个变量创建广播变量

注意:广播变量,是只读的

然后在算子函数内,使用到广播变量时,每个节点只会拷贝一份副本。可以使用广播变量的value()方法获取值。

从左边分析:

如果是普通的外部变量,算子函数内如果使用到,就会往每个task中拷贝一份,假设这个外部变量是一个集合,集合中有上亿数据,每个task都拷贝一份就会极大的增加网络传输的时间以及内存空间的占用。

从右边分析:

假设使用广播变量,每个变量只会拷贝到一个节点上,存在于每个节点的task变量会共享这一广播变量,从而极大的减少网络传输以及内存占用,提升效率。

基于Scala实现广播变量案例

package com.bigdata.scala

import org.apache.spark.{SparkConf, SparkContext}

object BoradcastOpScala {

def main(args: Array[String]): Unit = {

   val conf = new SparkConf()

   conf.setAppName("BoradcastOpScala")

     .setMaster("local")

   val sc = new SparkContext(conf)

   val dataRDD = sc.parallelize(Array(1,2,3,4,5))

   val varable = 2

  //dataRDD.map(_ * varable)

   //1:定义广播变量

   val varableBroadcast = sc.broadcast(varable)

   //2:使用广播变量,调用其value方法

   dataRDD.map(_ * varableBroadcast.value).foreach(println(_))

   sc.stop()

}

}

结果:

Accumulator

Spark提供的Accumulator,主要用于多个节点对一个变量进行共享性的操作。

正常情况下在Spark的任务中,由于一个算子可能会产生多个task并行执行,所以在这个算子内部执行的聚合计算都是局部的,想要实现多个task进行全局聚合计算,此时需要使用到Accumulator这个共享的累加变量。

注意:Accumulator只提供了累加的功能。在task只能对Accumulator进行累加操作,不能读取它的值。只有在Driver进程中才可以读取Accumulator的值。

package com.bigdata.scala

import org.apache.spark.{SparkConf, SparkContext}

/**

* 需求:使用累加变量

*/

object AccumulatorOpScala {

def main(args: Array[String]): Unit = {

   val conf = new SparkConf()

   conf.setAppName("AccumulatorOpScala")

     .setMaster("local")

   val sc = new SparkContext(conf)

   val dataRDD = sc.parallelize(Array(1,2,3,4,5))

   //这种写法是错误的,因为foreach代码是在worker节点上执行的

   //var total = 0 和 println(total) 是在Driver进程中执行的

   //所以无法实现累加操作

   //并且foreach算子可能会在多个task中执行,这样foreach内部实现的累加也不是最终全局累加的结果

   /*var total = 0

   dataRDD.foreach(num=>total += num)

   println(total)*/

   //所以此时想要实现累加操作就需要使用累加变量了

   //1:定义累加变量

   val sumAccumulator = sc.longAccumulator

   //2:使用累加变量

   dataRDD.foreach(num=>sumAccumulator.add(num))

   //注意:只能在Driver进程中获取累加变量的结果

   println(sumAccumulator.value)

   sc.stop()

}

}

目录
相关文章
淘宝批量复制宝贝提示“当前类目大于48小时发货的发货时间不能大于15天,请调整”怎么解决?
要复制这个宝贝上传到淘宝店铺,只需要重新复制一次,然后在大淘营淘宝宝贝复制专家下载配置的第二步,选择一个小于或等于15天的发货时间(见下图),这样就可以复制宝贝上传到淘宝店铺了。
|
边缘计算 运维 容灾
重磅发布!阿里云发布《应用多活技术白皮书》,并开源首个多活项目AppActive
1月11日,在上海的云原生实战峰会上,阿里云智能研究员丁宇发布了“应用多活技术白皮书”,同时为了推动业界容灾的发展,建立云原生业务容灾标准,阿里云开源了“应用多活”项目AppActive。
62823 101
重磅发布!阿里云发布《应用多活技术白皮书》,并开源首个多活项目AppActive
|
存储 监控 Cloud Native
物联网时代的数据挑战|学习笔记
快速学习物联网时代的数据挑战
305 21
物联网时代的数据挑战|学习笔记
|
jenkins Java 持续交付
Jenkins----CentOS7系统搭安装与卸载Jenkins
Jenkins----CentOS7系统搭安装与卸载Jenkins
1177 2
Jenkins----CentOS7系统搭安装与卸载Jenkins
|
网络协议 物联网 数据安全/隐私保护
Wifi-nodeMCU-esp8266 建立热点同时作为服务器完成设备连接控制 | 学习笔记
快速学习 Wifi-nodeMCU-esp8266 建立热点同时作为服务器完成设备连接控制
Wifi-nodeMCU-esp8266 建立热点同时作为服务器完成设备连接控制 | 学习笔记
|
缓存 前端开发 安全
SpringBoot 开发抖音开放平台获取用户的粉丝统计和短视频数据(二篇)
最近有朋友问起我有没有做过抖音开放平台,让我有了些思考,其实之前做过的。虽然抖音APP很火,但是毕竟不像微信开放平台那样,已沉淀多年,基本上每个API只要肯用心查找,网上都有很多资料可以参考。而抖音开放平台则不然,刚面世不久,资料比较少。即使对于一个开发人员来说,接入第三方接口都大同小异,不会太难,但我还是想把这些记录下来,特别是遇到的坑,会列在下面,一起参考学习。限于水平有限,若有错误,不吝赐教哈。那么,我们就开始正文吧。
1242 0
SpringBoot 开发抖音开放平台获取用户的粉丝统计和短视频数据(二篇)
|
开发工具 Android开发
RxBus 一个简易、非反射的Android事件通知库
RxBus 一个简易、非反射的Android事件通知库
1034 0
RxBus 一个简易、非反射的Android事件通知库
|
安全 Java 开发者
并行流ParallelStream中隐藏的陷阱
这篇文章介绍一下日常开发中并行流ParallelStream中隐藏的陷阱,这个问题其实离我们很近,特别是喜欢使用JDK1.8+的流式编程的伙伴,应该会深有感触。标题中所谓的"陷阱",其实并不是ParallelStream自身的陷阱,而一般是开发者错误使用ParallelStream给自己埋下的陷阱。
781 0
并行流ParallelStream中隐藏的陷阱
|
监控 前端开发 JavaScript
SVG实现流程动态效果
可以通过 div 的方式 拼凑 一张图,然后 js 代码控制不同的 div 部件进行颜色变化。可以通过 canvas 的方式绘制整张图,然后对画布内的元素进行控制。可以找个第三方库,自定义形状 组合 一张图出来,然后进行控制。可以通过 svg 图的方式展示内容,然后对 svg 中的元素进行控制。
514 0
SVG实现流程动态效果
|
机器学习/深度学习 Web App开发 人工智能
当自监督遇上语言-图像预训练,UC伯克利提出多任务框架SLIP
为了探究 CV 领域的自监督学习是否会影响 NLP 领域,来自加州大学伯克利分校和 Facebook AI 研究院的研究者提出了一种结合语言监督和图像自监督的新框架 SLIP。
447 0
当自监督遇上语言-图像预训练,UC伯克利提出多任务框架SLIP

热门文章

最新文章