大数据-89 Spark 集群 RDD 编程-高阶 编写代码、RDD依赖关系、RDD持久化/缓存

本文涉及的产品
云原生大数据计算服务MaxCompute,500CU*H 100GB 3个月
云原生大数据计算服务 MaxCompute,5000CU*H 100GB 3个月
简介: 大数据-89 Spark 集群 RDD 编程-高阶 编写代码、RDD依赖关系、RDD持久化/缓存

点一下关注吧!!!非常感谢!!持续更新!!!

目前已经更新到了:

Hadoop(已更完)

HDFS(已更完)

MapReduce(已更完)

Hive(已更完)

Flume(已更完)

Sqoop(已更完)

Zookeeper(已更完)

HBase(已更完)

Redis (已更完)

Kafka(已更完)

Spark(正在更新!)

章节内容

上节完成的内容如下:


Spark Super Word Count 程序 Scala语言编写

将数据写入MySQL、不写入MySQL等编码方式

代码的详细解释与结果

背景介绍

这涉及到进程通信,是需要序列化的,可以简单的认为:SparkContext代表Drive

在实际的开发过程中会自定一些RDD的操作,此时需要注意的是:

  • 初始化工作是Driver端进行的
  • 实际运行程序是Executor端进行的

测试代码

遇到问题

class MyClass1(x: Int) {
  val num = x
}

object SerializableDemo {
  def main (args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("SerializableDemo")
      .setMaster("local[*]")
    val sc = new SparkContext(conf)
    sc.setLogLevel("WARN")

    val rdd1 = sc.makeRDD(1 to 20)
    def add1(x: Int) = x + 10
    val add2 = add1 _

    // 过程和方法 都具备序列化的能力
    rdd1.map(add1(_)).foreach(println)
    rdd1.map(add2(_)).foreach(println)

    // 普通的类不具备序列化能力
    val object1 = new MyClass1(10)
    // 报错 提示无法序列化
    // rdd1.map(x => object1.num + x).foreach(println)
  }
}

解决方案1

case class MyClass2(num: Int)
val object2 = MyClass2(20)
rdd1.map(x => object2.num + x).foreach(println)

解决方案2

class MyClass3(x: Int) extends Serializable {
  val num = x
}
val object3 = new MyClass3(30)
rdd1.map(x => object3.num + x).foreach(println)

解决方案3

class MyClass1(x: Int) {
  val num = x
}

lazy val object4 = new MyClass1(40)
rdd1.map(x => object4.num + x).foreach(println)

完整代码

package icu.wzk

import org.apache.spark.{SparkConf, SparkContext}

class MyClass1(x: Int) {
  val num = x
}

case class MyClass2(num: Int)

class MyClass3(x: Int) extends Serializable {
  val num = x
}

object SerializableDemo {
  def main (args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("SerializableDemo")
      .setMaster("local[*]")
    val sc = new SparkContext(conf)
    sc.setLogLevel("WARN")

    val rdd1 = sc.makeRDD(1 to 20)
    def add1(x: Int) = x + 10
    val add2 = add1 _

    // 过程和方法 都具备序列化的能力
    rdd1.map(add1(_)).foreach(println)
    rdd1.map(add2(_)).foreach(println)

    // 普通的类不具备序列化能力
    val object1 = new MyClass1(10)
    // 报错 提示无法序列化
    // rdd1.map(x => object1.num + x).foreach(println)

    // 解决方案1 使用 case class
    val object2 = MyClass2(20)
    rdd1.map(x => object2.num + x).foreach(println)

    // 解决方案2 实现 Serializable
    val object3 = new MyClass3(30)
    rdd1.map(x => object3.num + x).foreach(println)

    // 解决方法3 延迟创建
    lazy val object4 = new MyClass1(40)
    rdd1.map(x => object4.num + x).foreach(println)

    sc.stop()
  }
}

注意事项

如果在方法、函数的定义中引入了不可序列化的对象,也会导致任务不能够序列化

延迟创建的解决方案比较简单,且实用性广

RDD依赖关系

基本概念

RDD 只支持粗粒度转换,即在大量记录上执行的单个操作。将创建RDD的一系列Lineage(血统)记录下来,以便恢复丢失的分区。

RDD的Lineage会记录RDD的元数据信息和转换行为,当该RDD的部分分区数据丢失时,可根据这些信息来重新运算和恢复丢失的数据分区。


RDD和它的依赖的父RDDs的关系有两种不同的类型:

  • 窄依赖(narrow dependency):1:1或n:1
  • 宽依赖(wide dependency):n:m 意味着有 shuflle

RDD任务切分中间分为:Driver program、Job、Stage(TaskSet) 和 Task


Driver program:初始化一个SparkContext即生成一个Spark应用

Job:一个Action算子就会生成一个Job

Stage:根据RDD之间的依赖关系不同将Job划分成不同的Stage,遇到一个宽依赖则划分一个Stage

Task:Stage是一个TaskSet,将Stage划分的结果发送到不同的Executor执行即为一个Task

Task是Spark中任务调度的最小单位,每个Stage包含许多Task,这些Task执行的计算逻辑是相同的,计算的数据是不同的

DriverProgram -> Job -> Stage -> Task 每一层都是 1 对 N 的关系

再回WordCount

代码部分

你可以用代码执行,也可以在 SparkShell 中执行。

package icu.wzk

import org.apache.spark.{SparkConf, SparkContext}

object ReWordCount {

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("SparkFindFriends")
      .setMaster("local[*]")
    val sc = new SparkContext(conf)
    sc.setLogLevel("WARN")

    val rdd1 = sc.textFile("goodtbl.java")
    val rdd2 = rdd1.flatMap(_.split("\\+"))
    val rdd3 = rdd2.map((_, 1))
    val rdd4 = rdd3.reduceByKey(_ + _)
    val rdd5 = rdd4.sortByKey()
    rdd5.count()

    // 查看RDD的血缘关系
    rdd1.toDebugString
    rdd5.toDebugString

    // 查看依赖
    rdd1.dependencies
    rdd1.dependencies(0).rdd

    rdd5.dependencies
    rdd5.dependencies(0).rdd
    
    sc.stop()
  }

}

提出问题

上面的WordCount中,一共有几个Job,几个Stage,几个Task?

答案:1个Job,3个Stage,6个Task

RDD持久化/缓存

基本概念

涉及到的算子:persis、cache、unpersist 都是 Transformation


缓存是将计算结果写入不同的介质,用户定义可定义存储级别(存储级别定义了缓存存储的介质,目前支持内存、堆外内存、磁盘)

通过缓存,Spark避免了RDD上的重复计算,能够极大提升计算的速度。

RDD持久化或缓存,是Spark最重要的特征之一,Spark构建迭代算法和快速交互式查询的关键所在

Spark非常快的原因之一就是在内存、缓存中持久化,当持久化一个RDD后,每一个节点都将把计算的分片结果保存在内存中,并在对此数据集进行其他动作(Action),这使得后续更加迅速

使用 persist() 方法将一个RDD标记为持久化,之所以说“标记持久化”是因为使用persist()的地方,并不会马上计算生成RDD并把它持久化,而是要等遇到第一个行动操作出发真正计算后,才会把计算结果进行持久化。

一般情况下,如果多个动作需要用到某个RDD,而它的计算代价又比较高,那么就应该把这个RDD缓存起来

缓存有可能丢失,或者存储在内存由于空间不足而被删除,RDD的缓存的容错机制保证了即使缓存丢失也可以完成计算。通过基于RDD的一系列的转换,丢失的数据会被重算。

RDD各个Partition是相对独立的,因为只需要计算丢失的部分即可,而不是需要重算全部的Partition

持久化级别

使用 cache() 方法时,会调用 persist(MEMORY_ONLY),即

cache() == persist(StorageLevel.Memory.ONLY)

对于其他的存储级别,如下图:

  • MEMORY_ONLY
  • MEMORY_AND_DISK
  • MEMORY_ONLY_SER
  • MEMORY_AND_DISK_SER
  • DISK_ONLY
  • DISK_ONLY_2
相关实践学习
基于MaxCompute的热门话题分析
本实验围绕社交用户发布的文章做了详尽的分析,通过分析能得到用户群体年龄分布,性别分布,地理位置分布,以及热门话题的热度。
SaaS 模式云数据仓库必修课
本课程由阿里云开发者社区和阿里云大数据团队共同出品,是SaaS模式云原生数据仓库领导者MaxCompute核心课程。本课程由阿里云资深产品和技术专家们从概念到方法,从场景到实践,体系化的将阿里巴巴飞天大数据平台10多年的经过验证的方法与实践深入浅出的讲给开发者们。帮助大数据开发者快速了解并掌握SaaS模式的云原生的数据仓库,助力开发者学习了解先进的技术栈,并能在实际业务中敏捷的进行大数据分析,赋能企业业务。 通过本课程可以了解SaaS模式云原生数据仓库领导者MaxCompute核心功能及典型适用场景,可应用MaxCompute实现数仓搭建,快速进行大数据分析。适合大数据工程师、大数据分析师 大量数据需要处理、存储和管理,需要搭建数据仓库?学它! 没有足够人员和经验来运维大数据平台,不想自建IDC买机器,需要免运维的大数据平台?会SQL就等于会大数据?学它! 想知道大数据用得对不对,想用更少的钱得到持续演进的数仓能力?获得极致弹性的计算资源和更好的性能,以及持续保护数据安全的生产环境?学它! 想要获得灵活的分析能力,快速洞察数据规律特征?想要兼得数据湖的灵活性与数据仓库的成长性?学它! 出品人:阿里云大数据产品及研发团队专家 产品 MaxCompute 官网 https://www.aliyun.com/product/odps 
目录
相关文章
|
1月前
|
分布式计算 资源调度 大数据
大数据-110 Flink 安装部署 下载解压配置 Standalone模式启动 打包依赖(一)
大数据-110 Flink 安装部署 下载解压配置 Standalone模式启动 打包依赖(一)
52 0
|
1月前
|
分布式计算 资源调度 大数据
大数据-110 Flink 安装部署 下载解压配置 Standalone模式启动 打包依赖(二)
大数据-110 Flink 安装部署 下载解压配置 Standalone模式启动 打包依赖(二)
69 0
|
1月前
|
数据采集 数据可视化 大数据
大数据体系知识学习(三):数据清洗_箱线图的概念以及代码实现
这篇文章介绍了如何使用Python中的matplotlib和numpy库来创建箱线图,以检测和处理数据集中的异常值。
43 1
大数据体系知识学习(三):数据清洗_箱线图的概念以及代码实现
|
16天前
|
分布式计算 Java MaxCompute
ODPS MR节点跑graph连通分量计算代码报错java heap space如何解决
任务启动命令:jar -resources odps-graph-connect-family-2.0-SNAPSHOT.jar -classpath ./odps-graph-connect-family-2.0-SNAPSHOT.jar ConnectFamily 若是设置参数该如何设置
|
17天前
|
分布式计算 Java 开发工具
阿里云MaxCompute-XGBoost on Spark 极限梯度提升算法的分布式训练与模型持久化oss的实现与代码浅析
本文介绍了XGBoost在MaxCompute+OSS架构下模型持久化遇到的问题及其解决方案。首先简要介绍了XGBoost的特点和应用场景,随后详细描述了客户在将XGBoost on Spark任务从HDFS迁移到OSS时遇到的异常情况。通过分析异常堆栈和源代码,发现使用的`nativeBooster.saveModel`方法不支持OSS路径,而使用`write.overwrite().save`方法则能成功保存模型。最后提供了完整的Scala代码示例、Maven配置和提交命令,帮助用户顺利迁移模型存储路径。
|
1月前
|
分布式计算 Java 大数据
大数据-92 Spark 集群 SparkRDD 原理 Standalone详解 ShuffleV1V2详解 RDD编程优化
大数据-92 Spark 集群 SparkRDD 原理 Standalone详解 ShuffleV1V2详解 RDD编程优化
38 0
大数据-92 Spark 集群 SparkRDD 原理 Standalone详解 ShuffleV1V2详解 RDD编程优化
|
1月前
|
消息中间件 分布式计算 Kafka
大数据-102 Spark Streaming Kafka ReceiveApproach DirectApproach 附带Producer、DStream代码案例
大数据-102 Spark Streaming Kafka ReceiveApproach DirectApproach 附带Producer、DStream代码案例
55 0
|
1月前
|
SQL 分布式计算 大数据
大数据-91 Spark 集群 RDD 编程-高阶 RDD广播变量 RDD累加器 Spark程序优化
大数据-91 Spark 集群 RDD 编程-高阶 RDD广播变量 RDD累加器 Spark程序优化
37 0
|
1月前
|
缓存 分布式计算 大数据
大数据-90 Spark 集群 RDD 编程-高阶 RDD容错机制、RDD的分区、自定义分区器(Scala编写)、RDD创建方式(一)
大数据-90 Spark 集群 RDD 编程-高阶 RDD容错机制、RDD的分区、自定义分区器(Scala编写)、RDD创建方式(一)
45 0
|
1月前
|
分布式计算 算法 大数据
大数据-90 Spark 集群 RDD 编程-高阶 RDD容错机制、RDD的分区、自定义分区器(Scala编写)、RDD创建方式(二)
大数据-90 Spark 集群 RDD 编程-高阶 RDD容错机制、RDD的分区、自定义分区器(Scala编写)、RDD创建方式(二)
50 0