大数据-89 Spark 集群 RDD 编程-高阶 编写代码、RDD依赖关系、RDD持久化/缓存

简介: 大数据-89 Spark 集群 RDD 编程-高阶 编写代码、RDD依赖关系、RDD持久化/缓存

点一下关注吧!!!非常感谢!!持续更新!!!

目前已经更新到了:

Hadoop(已更完)

HDFS(已更完)

MapReduce(已更完)

Hive(已更完)

Flume(已更完)

Sqoop(已更完)

Zookeeper(已更完)

HBase(已更完)

Redis (已更完)

Kafka(已更完)

Spark(正在更新!)

章节内容

上节完成的内容如下:


Spark Super Word Count 程序 Scala语言编写

将数据写入MySQL、不写入MySQL等编码方式

代码的详细解释与结果

背景介绍

这涉及到进程通信,是需要序列化的,可以简单的认为:SparkContext代表Drive

在实际的开发过程中会自定一些RDD的操作,此时需要注意的是:

  • 初始化工作是Driver端进行的
  • 实际运行程序是Executor端进行的

测试代码

遇到问题

class MyClass1(x: Int) {
  val num = x
}

object SerializableDemo {
  def main (args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("SerializableDemo")
      .setMaster("local[*]")
    val sc = new SparkContext(conf)
    sc.setLogLevel("WARN")

    val rdd1 = sc.makeRDD(1 to 20)
    def add1(x: Int) = x + 10
    val add2 = add1 _

    // 过程和方法 都具备序列化的能力
    rdd1.map(add1(_)).foreach(println)
    rdd1.map(add2(_)).foreach(println)

    // 普通的类不具备序列化能力
    val object1 = new MyClass1(10)
    // 报错 提示无法序列化
    // rdd1.map(x => object1.num + x).foreach(println)
  }
}

解决方案1

case class MyClass2(num: Int)
val object2 = MyClass2(20)
rdd1.map(x => object2.num + x).foreach(println)

解决方案2

class MyClass3(x: Int) extends Serializable {
  val num = x
}
val object3 = new MyClass3(30)
rdd1.map(x => object3.num + x).foreach(println)

解决方案3

class MyClass1(x: Int) {
  val num = x
}

lazy val object4 = new MyClass1(40)
rdd1.map(x => object4.num + x).foreach(println)

完整代码

package icu.wzk

import org.apache.spark.{SparkConf, SparkContext}

class MyClass1(x: Int) {
  val num = x
}

case class MyClass2(num: Int)

class MyClass3(x: Int) extends Serializable {
  val num = x
}

object SerializableDemo {
  def main (args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("SerializableDemo")
      .setMaster("local[*]")
    val sc = new SparkContext(conf)
    sc.setLogLevel("WARN")

    val rdd1 = sc.makeRDD(1 to 20)
    def add1(x: Int) = x + 10
    val add2 = add1 _

    // 过程和方法 都具备序列化的能力
    rdd1.map(add1(_)).foreach(println)
    rdd1.map(add2(_)).foreach(println)

    // 普通的类不具备序列化能力
    val object1 = new MyClass1(10)
    // 报错 提示无法序列化
    // rdd1.map(x => object1.num + x).foreach(println)

    // 解决方案1 使用 case class
    val object2 = MyClass2(20)
    rdd1.map(x => object2.num + x).foreach(println)

    // 解决方案2 实现 Serializable
    val object3 = new MyClass3(30)
    rdd1.map(x => object3.num + x).foreach(println)

    // 解决方法3 延迟创建
    lazy val object4 = new MyClass1(40)
    rdd1.map(x => object4.num + x).foreach(println)

    sc.stop()
  }
}

注意事项

如果在方法、函数的定义中引入了不可序列化的对象,也会导致任务不能够序列化

延迟创建的解决方案比较简单,且实用性广

RDD依赖关系

基本概念

RDD 只支持粗粒度转换,即在大量记录上执行的单个操作。将创建RDD的一系列Lineage(血统)记录下来,以便恢复丢失的分区。

RDD的Lineage会记录RDD的元数据信息和转换行为,当该RDD的部分分区数据丢失时,可根据这些信息来重新运算和恢复丢失的数据分区。


RDD和它的依赖的父RDDs的关系有两种不同的类型:

  • 窄依赖(narrow dependency):1:1或n:1
  • 宽依赖(wide dependency):n:m 意味着有 shuflle

RDD任务切分中间分为:Driver program、Job、Stage(TaskSet) 和 Task


Driver program:初始化一个SparkContext即生成一个Spark应用

Job:一个Action算子就会生成一个Job

Stage:根据RDD之间的依赖关系不同将Job划分成不同的Stage,遇到一个宽依赖则划分一个Stage

Task:Stage是一个TaskSet,将Stage划分的结果发送到不同的Executor执行即为一个Task

Task是Spark中任务调度的最小单位,每个Stage包含许多Task,这些Task执行的计算逻辑是相同的,计算的数据是不同的

DriverProgram -> Job -> Stage -> Task 每一层都是 1 对 N 的关系

再回WordCount

代码部分

你可以用代码执行,也可以在 SparkShell 中执行。

package icu.wzk

import org.apache.spark.{SparkConf, SparkContext}

object ReWordCount {

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("SparkFindFriends")
      .setMaster("local[*]")
    val sc = new SparkContext(conf)
    sc.setLogLevel("WARN")

    val rdd1 = sc.textFile("goodtbl.java")
    val rdd2 = rdd1.flatMap(_.split("\\+"))
    val rdd3 = rdd2.map((_, 1))
    val rdd4 = rdd3.reduceByKey(_ + _)
    val rdd5 = rdd4.sortByKey()
    rdd5.count()

    // 查看RDD的血缘关系
    rdd1.toDebugString
    rdd5.toDebugString

    // 查看依赖
    rdd1.dependencies
    rdd1.dependencies(0).rdd

    rdd5.dependencies
    rdd5.dependencies(0).rdd
    
    sc.stop()
  }

}

提出问题

上面的WordCount中,一共有几个Job,几个Stage,几个Task?

答案:1个Job,3个Stage,6个Task

RDD持久化/缓存

基本概念

涉及到的算子:persis、cache、unpersist 都是 Transformation


缓存是将计算结果写入不同的介质,用户定义可定义存储级别(存储级别定义了缓存存储的介质,目前支持内存、堆外内存、磁盘)

通过缓存,Spark避免了RDD上的重复计算,能够极大提升计算的速度。

RDD持久化或缓存,是Spark最重要的特征之一,Spark构建迭代算法和快速交互式查询的关键所在

Spark非常快的原因之一就是在内存、缓存中持久化,当持久化一个RDD后,每一个节点都将把计算的分片结果保存在内存中,并在对此数据集进行其他动作(Action),这使得后续更加迅速

使用 persist() 方法将一个RDD标记为持久化,之所以说“标记持久化”是因为使用persist()的地方,并不会马上计算生成RDD并把它持久化,而是要等遇到第一个行动操作出发真正计算后,才会把计算结果进行持久化。

一般情况下,如果多个动作需要用到某个RDD,而它的计算代价又比较高,那么就应该把这个RDD缓存起来

缓存有可能丢失,或者存储在内存由于空间不足而被删除,RDD的缓存的容错机制保证了即使缓存丢失也可以完成计算。通过基于RDD的一系列的转换,丢失的数据会被重算。

RDD各个Partition是相对独立的,因为只需要计算丢失的部分即可,而不是需要重算全部的Partition

持久化级别

使用 cache() 方法时,会调用 persist(MEMORY_ONLY),即

cache() == persist(StorageLevel.Memory.ONLY)

对于其他的存储级别,如下图:

  • MEMORY_ONLY
  • MEMORY_AND_DISK
  • MEMORY_ONLY_SER
  • MEMORY_AND_DISK_SER
  • DISK_ONLY
  • DISK_ONLY_2
相关实践学习
基于MaxCompute的热门话题分析
Apsara Clouder大数据专项技能认证配套课程:基于MaxCompute的热门话题分析
目录
相关文章
|
12月前
|
存储 缓存 数据处理
Pandas高级数据处理:缓存与持久化
本文介绍 Pandas 中的缓存与持久化技术,涵盖其意义、常见方式及问题解决方案。缓存可提高效率、减少重复计算;持久化则优化资源使用。文中探讨内存缓存、文件系统和数据库持久化,并提供代码示例,如 LRU 缓存、Parquet 格式保存及 SQLite 数据库交互,帮助读者理解和应用这些技术。
523 73
|
存储 分布式计算 并行计算
【赵渝强老师】Spark中的RDD
RDD(弹性分布式数据集)是Spark的核心数据模型,支持分布式并行计算。RDD由分区组成,每个分区由Spark Worker节点处理,具备自动容错、位置感知调度和缓存机制等特性。通过创建RDD,可以指定分区数量,并实现计算函数、依赖关系、分区器和优先位置列表等功能。视频讲解和示例代码进一步详细介绍了RDD的组成和特性。
246 0
|
11月前
|
分布式计算 Spark
【赵渝强老师】Spark RDD的依赖关系和任务阶段
Spark RDD之间的依赖关系分为窄依赖和宽依赖。窄依赖指父RDD的每个分区最多被一个子RDD分区使用,如map、filter操作;宽依赖则指父RDD的每个分区被多个子RDD分区使用,如分组和某些join操作。窄依赖任务可在同一阶段完成,而宽依赖因Shuffle的存在需划分不同阶段执行。借助Spark Web Console可查看任务的DAG图及阶段划分。
586 15
|
11月前
|
存储 缓存 分布式计算
【赵渝强老师】Spark RDD的缓存机制
Spark RDD通过`persist`或`cache`方法可将计算结果缓存,但并非立即生效,而是在触发action时才缓存到内存中供重用。`cache`方法实际调用了`persist(StorageLevel.MEMORY_ONLY)`。RDD缓存可能因内存不足被删除,建议结合检查点机制保证容错。示例中,读取大文件并多次调用`count`,使用缓存后执行效率显著提升,最后一次计算仅耗时98ms。
339 0
【赵渝强老师】Spark RDD的缓存机制
|
缓存 NoSQL PHP
Redis作为PHP缓存解决方案的优势、实现方式及注意事项。Redis凭借其高性能、丰富的数据结构、数据持久化和分布式支持等特点,在提升应用响应速度和处理能力方面表现突出
本文深入探讨了Redis作为PHP缓存解决方案的优势、实现方式及注意事项。Redis凭借其高性能、丰富的数据结构、数据持久化和分布式支持等特点,在提升应用响应速度和处理能力方面表现突出。文章还介绍了Redis在页面缓存、数据缓存和会话缓存等应用场景中的使用,并强调了缓存数据一致性、过期时间设置、容量控制和安全问题的重要性。
279 5
|
缓存 监控 安全
检测 Webpack 5 持久化缓存是否存在安全漏洞
【10月更文挑战第23天】通过全面、系统地检测和评估,能够及时发现 Webpack 5 持久化缓存的安全漏洞,并采取有效的措施进行修复,保障项目的安全稳定运行。同时,要持续关注安全技术的发展和变化,不断提升安全检测能力,以应对日益复杂的安全挑战。
|
分布式计算 Java 开发工具
阿里云MaxCompute-XGBoost on Spark 极限梯度提升算法的分布式训练与模型持久化oss的实现与代码浅析
本文介绍了XGBoost在MaxCompute+OSS架构下模型持久化遇到的问题及其解决方案。首先简要介绍了XGBoost的特点和应用场景,随后详细描述了客户在将XGBoost on Spark任务从HDFS迁移到OSS时遇到的异常情况。通过分析异常堆栈和源代码,发现使用的`nativeBooster.saveModel`方法不支持OSS路径,而使用`write.overwrite().save`方法则能成功保存模型。最后提供了完整的Scala代码示例、Maven配置和提交命令,帮助用户顺利迁移模型存储路径。
|
存储 缓存 监控
配置 Webpack 5 持久化缓存时需要注意哪些安全问题?
【10月更文挑战第23天】通过全面、系统地分析和应对安全问题,能够更好地保障 Webpack 5 持久化缓存的安全,为项目的成功构建和运行提供坚实的安全基础。同时,要保持对安全技术的关注和学习,不断提升安全防范能力,以应对日益复杂的安全挑战。
|
存储 缓存 前端开发
利用 Webpack 5 的持久化缓存来提高构建效率
【10月更文挑战第23天】利用 Webpack 5 的持久化缓存是提高构建效率的有效手段。通过合理的配置和管理,我们可以充分发挥缓存的优势,为项目的构建和开发带来更大的便利和效率提升。你可以根据项目的实际情况,结合以上步骤和方法,进一步优化和完善利用持久化缓存的策略,以达到最佳的构建效果。同时,不断探索和实践新的方法和技术,以适应不断变化的前端开发环境和需求。
|
4月前
|
机器学习/深度学习 传感器 分布式计算
数据才是真救命的:聊聊如何用大数据提升灾难预警的精准度
数据才是真救命的:聊聊如何用大数据提升灾难预警的精准度
359 14