大数据-89 Spark 集群 RDD 编程-高阶 编写代码、RDD依赖关系、RDD持久化/缓存

本文涉及的产品
云原生大数据计算服务MaxCompute,500CU*H 100GB 3个月
简介: 大数据-89 Spark 集群 RDD 编程-高阶 编写代码、RDD依赖关系、RDD持久化/缓存

点一下关注吧!!!非常感谢!!持续更新!!!

目前已经更新到了:

Hadoop(已更完)

HDFS(已更完)

MapReduce(已更完)

Hive(已更完)

Flume(已更完)

Sqoop(已更完)

Zookeeper(已更完)

HBase(已更完)

Redis (已更完)

Kafka(已更完)

Spark(正在更新!)

章节内容

上节完成的内容如下:


Spark Super Word Count 程序 Scala语言编写

将数据写入MySQL、不写入MySQL等编码方式

代码的详细解释与结果

背景介绍

这涉及到进程通信,是需要序列化的,可以简单的认为:SparkContext代表Drive

在实际的开发过程中会自定一些RDD的操作,此时需要注意的是:

  • 初始化工作是Driver端进行的
  • 实际运行程序是Executor端进行的

测试代码

遇到问题

class MyClass1(x: Int) {
  val num = x
}

object SerializableDemo {
  def main (args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("SerializableDemo")
      .setMaster("local[*]")
    val sc = new SparkContext(conf)
    sc.setLogLevel("WARN")

    val rdd1 = sc.makeRDD(1 to 20)
    def add1(x: Int) = x + 10
    val add2 = add1 _

    // 过程和方法 都具备序列化的能力
    rdd1.map(add1(_)).foreach(println)
    rdd1.map(add2(_)).foreach(println)

    // 普通的类不具备序列化能力
    val object1 = new MyClass1(10)
    // 报错 提示无法序列化
    // rdd1.map(x => object1.num + x).foreach(println)
  }
}

解决方案1

case class MyClass2(num: Int)
val object2 = MyClass2(20)
rdd1.map(x => object2.num + x).foreach(println)

解决方案2

class MyClass3(x: Int) extends Serializable {
  val num = x
}
val object3 = new MyClass3(30)
rdd1.map(x => object3.num + x).foreach(println)

解决方案3

class MyClass1(x: Int) {
  val num = x
}

lazy val object4 = new MyClass1(40)
rdd1.map(x => object4.num + x).foreach(println)

完整代码

package icu.wzk

import org.apache.spark.{SparkConf, SparkContext}

class MyClass1(x: Int) {
  val num = x
}

case class MyClass2(num: Int)

class MyClass3(x: Int) extends Serializable {
  val num = x
}

object SerializableDemo {
  def main (args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("SerializableDemo")
      .setMaster("local[*]")
    val sc = new SparkContext(conf)
    sc.setLogLevel("WARN")

    val rdd1 = sc.makeRDD(1 to 20)
    def add1(x: Int) = x + 10
    val add2 = add1 _

    // 过程和方法 都具备序列化的能力
    rdd1.map(add1(_)).foreach(println)
    rdd1.map(add2(_)).foreach(println)

    // 普通的类不具备序列化能力
    val object1 = new MyClass1(10)
    // 报错 提示无法序列化
    // rdd1.map(x => object1.num + x).foreach(println)

    // 解决方案1 使用 case class
    val object2 = MyClass2(20)
    rdd1.map(x => object2.num + x).foreach(println)

    // 解决方案2 实现 Serializable
    val object3 = new MyClass3(30)
    rdd1.map(x => object3.num + x).foreach(println)

    // 解决方法3 延迟创建
    lazy val object4 = new MyClass1(40)
    rdd1.map(x => object4.num + x).foreach(println)

    sc.stop()
  }
}

注意事项

如果在方法、函数的定义中引入了不可序列化的对象,也会导致任务不能够序列化

延迟创建的解决方案比较简单,且实用性广

RDD依赖关系

基本概念

RDD 只支持粗粒度转换,即在大量记录上执行的单个操作。将创建RDD的一系列Lineage(血统)记录下来,以便恢复丢失的分区。

RDD的Lineage会记录RDD的元数据信息和转换行为,当该RDD的部分分区数据丢失时,可根据这些信息来重新运算和恢复丢失的数据分区。


RDD和它的依赖的父RDDs的关系有两种不同的类型:

  • 窄依赖(narrow dependency):1:1或n:1
  • 宽依赖(wide dependency):n:m 意味着有 shuflle

RDD任务切分中间分为:Driver program、Job、Stage(TaskSet) 和 Task


Driver program:初始化一个SparkContext即生成一个Spark应用

Job:一个Action算子就会生成一个Job

Stage:根据RDD之间的依赖关系不同将Job划分成不同的Stage,遇到一个宽依赖则划分一个Stage

Task:Stage是一个TaskSet,将Stage划分的结果发送到不同的Executor执行即为一个Task

Task是Spark中任务调度的最小单位,每个Stage包含许多Task,这些Task执行的计算逻辑是相同的,计算的数据是不同的

DriverProgram -> Job -> Stage -> Task 每一层都是 1 对 N 的关系

再回WordCount

代码部分

你可以用代码执行,也可以在 SparkShell 中执行。

package icu.wzk

import org.apache.spark.{SparkConf, SparkContext}

object ReWordCount {

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("SparkFindFriends")
      .setMaster("local[*]")
    val sc = new SparkContext(conf)
    sc.setLogLevel("WARN")

    val rdd1 = sc.textFile("goodtbl.java")
    val rdd2 = rdd1.flatMap(_.split("\\+"))
    val rdd3 = rdd2.map((_, 1))
    val rdd4 = rdd3.reduceByKey(_ + _)
    val rdd5 = rdd4.sortByKey()
    rdd5.count()

    // 查看RDD的血缘关系
    rdd1.toDebugString
    rdd5.toDebugString

    // 查看依赖
    rdd1.dependencies
    rdd1.dependencies(0).rdd

    rdd5.dependencies
    rdd5.dependencies(0).rdd
    
    sc.stop()
  }

}

提出问题

上面的WordCount中,一共有几个Job,几个Stage,几个Task?

答案:1个Job,3个Stage,6个Task

RDD持久化/缓存

基本概念

涉及到的算子:persis、cache、unpersist 都是 Transformation


缓存是将计算结果写入不同的介质,用户定义可定义存储级别(存储级别定义了缓存存储的介质,目前支持内存、堆外内存、磁盘)

通过缓存,Spark避免了RDD上的重复计算,能够极大提升计算的速度。

RDD持久化或缓存,是Spark最重要的特征之一,Spark构建迭代算法和快速交互式查询的关键所在

Spark非常快的原因之一就是在内存、缓存中持久化,当持久化一个RDD后,每一个节点都将把计算的分片结果保存在内存中,并在对此数据集进行其他动作(Action),这使得后续更加迅速

使用 persist() 方法将一个RDD标记为持久化,之所以说“标记持久化”是因为使用persist()的地方,并不会马上计算生成RDD并把它持久化,而是要等遇到第一个行动操作出发真正计算后,才会把计算结果进行持久化。

一般情况下,如果多个动作需要用到某个RDD,而它的计算代价又比较高,那么就应该把这个RDD缓存起来

缓存有可能丢失,或者存储在内存由于空间不足而被删除,RDD的缓存的容错机制保证了即使缓存丢失也可以完成计算。通过基于RDD的一系列的转换,丢失的数据会被重算。

RDD各个Partition是相对独立的,因为只需要计算丢失的部分即可,而不是需要重算全部的Partition

持久化级别

使用 cache() 方法时,会调用 persist(MEMORY_ONLY),即

cache() == persist(StorageLevel.Memory.ONLY)

对于其他的存储级别,如下图:

  • MEMORY_ONLY
  • MEMORY_AND_DISK
  • MEMORY_ONLY_SER
  • MEMORY_AND_DISK_SER
  • DISK_ONLY
  • DISK_ONLY_2
相关实践学习
基于MaxCompute的热门话题分析
Apsara Clouder大数据专项技能认证配套课程:基于MaxCompute的热门话题分析
目录
相关文章
|
6月前
|
负载均衡 算法 关系型数据库
大数据大厂之MySQL数据库课程设计:揭秘MySQL集群架构负载均衡核心算法:从理论到Java代码实战,让你的数据库性能飙升!
本文聚焦 MySQL 集群架构中的负载均衡算法,阐述其重要性。详细介绍轮询、加权轮询、最少连接、加权最少连接、随机、源地址哈希等常用算法,分析各自优缺点及适用场景。并提供 Java 语言代码实现示例,助力直观理解。文章结构清晰,语言通俗易懂,对理解和应用负载均衡算法具有实用价值和参考价值。
大数据大厂之MySQL数据库课程设计:揭秘MySQL集群架构负载均衡核心算法:从理论到Java代码实战,让你的数据库性能飙升!
|
5月前
|
人工智能 分布式计算 大数据
大数据≠大样本:基于Spark的特征降维实战(提升10倍训练效率)
本文探讨了大数据场景下降维的核心问题与解决方案,重点分析了“维度灾难”对模型性能的影响及特征冗余的陷阱。通过数学证明与实际案例,揭示高维空间中样本稀疏性问题,并提出基于Spark的分布式降维技术选型与优化策略。文章详细展示了PCA在亿级用户画像中的应用,包括数据准备、核心实现与效果评估,同时深入探讨了协方差矩阵计算与特征值分解的并行优化方法。此外,还介绍了动态维度调整、非线性特征处理及降维与其他AI技术的协同效应,为生产环境提供了最佳实践指南。最终总结出降维的本质与工程实践原则,展望未来发展方向。
306 0
|
2月前
|
存储 并行计算 算法
【动态多目标优化算法】基于自适应启动策略的混合交叉动态约束多目标优化算法(MC-DCMOEA)求解CEC2023研究(Matlab代码实现)
【动态多目标优化算法】基于自适应启动策略的混合交叉动态约束多目标优化算法(MC-DCMOEA)求解CEC2023研究(Matlab代码实现)
168 4
|
5月前
|
机器学习/深度学习 数据挖掘 大数据
大数据集特征工程实践:将54万样本预测误差降低68%的技术路径与代码实现详解
本文通过实际案例演示特征工程在回归任务中的应用效果,重点分析包含数值型、分类型和时间序列特征的大规模表格数据集的处理方法。
201 0
大数据集特征工程实践:将54万样本预测误差降低68%的技术路径与代码实现详解
|
8月前
|
存储 分布式计算 Hadoop
从“笨重大象”到“敏捷火花”:Hadoop与Spark的大数据技术进化之路
从“笨重大象”到“敏捷火花”:Hadoop与Spark的大数据技术进化之路
395 79
|
分布式计算 大数据 Apache
ClickHouse与大数据生态集成:Spark & Flink 实战
【10月更文挑战第26天】在当今这个数据爆炸的时代,能够高效地处理和分析海量数据成为了企业和组织提升竞争力的关键。作为一款高性能的列式数据库系统,ClickHouse 在大数据分析领域展现出了卓越的能力。然而,为了充分利用ClickHouse的优势,将其与现有的大数据处理框架(如Apache Spark和Apache Flink)进行集成变得尤为重要。本文将从我个人的角度出发,探讨如何通过这些技术的结合,实现对大规模数据的实时处理和分析。
864 2
ClickHouse与大数据生态集成:Spark & Flink 实战
|
10月前
|
存储 缓存 数据处理
Pandas高级数据处理:缓存与持久化
本文介绍 Pandas 中的缓存与持久化技术,涵盖其意义、常见方式及问题解决方案。缓存可提高效率、减少重复计算;持久化则优化资源使用。文中探讨内存缓存、文件系统和数据库持久化,并提供代码示例,如 LRU 缓存、Parquet 格式保存及 SQLite 数据库交互,帮助读者理解和应用这些技术。
397 73
|
9月前
|
存储 缓存 分布式计算
【赵渝强老师】Spark RDD的缓存机制
Spark RDD通过`persist`或`cache`方法可将计算结果缓存,但并非立即生效,而是在触发action时才缓存到内存中供重用。`cache`方法实际调用了`persist(StorageLevel.MEMORY_ONLY)`。RDD缓存可能因内存不足被删除,建议结合检查点机制保证容错。示例中,读取大文件并多次调用`count`,使用缓存后执行效率显著提升,最后一次计算仅耗时98ms。
260 0
【赵渝强老师】Spark RDD的缓存机制
|
缓存 NoSQL PHP
Redis作为PHP缓存解决方案的优势、实现方式及注意事项。Redis凭借其高性能、丰富的数据结构、数据持久化和分布式支持等特点,在提升应用响应速度和处理能力方面表现突出
本文深入探讨了Redis作为PHP缓存解决方案的优势、实现方式及注意事项。Redis凭借其高性能、丰富的数据结构、数据持久化和分布式支持等特点,在提升应用响应速度和处理能力方面表现突出。文章还介绍了Redis在页面缓存、数据缓存和会话缓存等应用场景中的使用,并强调了缓存数据一致性、过期时间设置、容量控制和安全问题的重要性。
246 5
|
存储 分布式计算 Hadoop
数据湖技术:Hadoop与Spark在大数据处理中的协同作用
【10月更文挑战第27天】在大数据时代,数据湖技术凭借其灵活性和成本效益成为企业存储和分析大规模异构数据的首选。Hadoop和Spark作为数据湖技术的核心组件,通过HDFS存储数据和Spark进行高效计算,实现了数据处理的优化。本文探讨了Hadoop与Spark的最佳实践,包括数据存储、处理、安全和可视化等方面,展示了它们在实际应用中的协同效应。
539 2

热门文章

最新文章

下一篇
oss云网关配置