大数据-89 Spark 集群 RDD 编程-高阶 编写代码、RDD依赖关系、RDD持久化/缓存

本文涉及的产品
云原生大数据计算服务 MaxCompute,5000CU*H 100GB 3个月
云原生大数据计算服务MaxCompute,500CU*H 100GB 3个月
简介: 大数据-89 Spark 集群 RDD 编程-高阶 编写代码、RDD依赖关系、RDD持久化/缓存

点一下关注吧!!!非常感谢!!持续更新!!!

目前已经更新到了:

Hadoop(已更完)

HDFS(已更完)

MapReduce(已更完)

Hive(已更完)

Flume(已更完)

Sqoop(已更完)

Zookeeper(已更完)

HBase(已更完)

Redis (已更完)

Kafka(已更完)

Spark(正在更新!)

章节内容

上节完成的内容如下:


Spark Super Word Count 程序 Scala语言编写

将数据写入MySQL、不写入MySQL等编码方式

代码的详细解释与结果

背景介绍

这涉及到进程通信,是需要序列化的,可以简单的认为:SparkContext代表Drive

在实际的开发过程中会自定一些RDD的操作,此时需要注意的是:

  • 初始化工作是Driver端进行的
  • 实际运行程序是Executor端进行的

测试代码

遇到问题

class MyClass1(x: Int) {
  val num = x
}

object SerializableDemo {
  def main (args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("SerializableDemo")
      .setMaster("local[*]")
    val sc = new SparkContext(conf)
    sc.setLogLevel("WARN")

    val rdd1 = sc.makeRDD(1 to 20)
    def add1(x: Int) = x + 10
    val add2 = add1 _

    // 过程和方法 都具备序列化的能力
    rdd1.map(add1(_)).foreach(println)
    rdd1.map(add2(_)).foreach(println)

    // 普通的类不具备序列化能力
    val object1 = new MyClass1(10)
    // 报错 提示无法序列化
    // rdd1.map(x => object1.num + x).foreach(println)
  }
}

解决方案1

case class MyClass2(num: Int)
val object2 = MyClass2(20)
rdd1.map(x => object2.num + x).foreach(println)

解决方案2

class MyClass3(x: Int) extends Serializable {
  val num = x
}
val object3 = new MyClass3(30)
rdd1.map(x => object3.num + x).foreach(println)

解决方案3

class MyClass1(x: Int) {
  val num = x
}

lazy val object4 = new MyClass1(40)
rdd1.map(x => object4.num + x).foreach(println)

完整代码

package icu.wzk

import org.apache.spark.{SparkConf, SparkContext}

class MyClass1(x: Int) {
  val num = x
}

case class MyClass2(num: Int)

class MyClass3(x: Int) extends Serializable {
  val num = x
}

object SerializableDemo {
  def main (args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("SerializableDemo")
      .setMaster("local[*]")
    val sc = new SparkContext(conf)
    sc.setLogLevel("WARN")

    val rdd1 = sc.makeRDD(1 to 20)
    def add1(x: Int) = x + 10
    val add2 = add1 _

    // 过程和方法 都具备序列化的能力
    rdd1.map(add1(_)).foreach(println)
    rdd1.map(add2(_)).foreach(println)

    // 普通的类不具备序列化能力
    val object1 = new MyClass1(10)
    // 报错 提示无法序列化
    // rdd1.map(x => object1.num + x).foreach(println)

    // 解决方案1 使用 case class
    val object2 = MyClass2(20)
    rdd1.map(x => object2.num + x).foreach(println)

    // 解决方案2 实现 Serializable
    val object3 = new MyClass3(30)
    rdd1.map(x => object3.num + x).foreach(println)

    // 解决方法3 延迟创建
    lazy val object4 = new MyClass1(40)
    rdd1.map(x => object4.num + x).foreach(println)

    sc.stop()
  }
}

注意事项

如果在方法、函数的定义中引入了不可序列化的对象,也会导致任务不能够序列化

延迟创建的解决方案比较简单,且实用性广

RDD依赖关系

基本概念

RDD 只支持粗粒度转换,即在大量记录上执行的单个操作。将创建RDD的一系列Lineage(血统)记录下来,以便恢复丢失的分区。

RDD的Lineage会记录RDD的元数据信息和转换行为,当该RDD的部分分区数据丢失时,可根据这些信息来重新运算和恢复丢失的数据分区。


RDD和它的依赖的父RDDs的关系有两种不同的类型:

  • 窄依赖(narrow dependency):1:1或n:1
  • 宽依赖(wide dependency):n:m 意味着有 shuflle

RDD任务切分中间分为:Driver program、Job、Stage(TaskSet) 和 Task


Driver program:初始化一个SparkContext即生成一个Spark应用

Job:一个Action算子就会生成一个Job

Stage:根据RDD之间的依赖关系不同将Job划分成不同的Stage,遇到一个宽依赖则划分一个Stage

Task:Stage是一个TaskSet,将Stage划分的结果发送到不同的Executor执行即为一个Task

Task是Spark中任务调度的最小单位,每个Stage包含许多Task,这些Task执行的计算逻辑是相同的,计算的数据是不同的

DriverProgram -> Job -> Stage -> Task 每一层都是 1 对 N 的关系

再回WordCount

代码部分

你可以用代码执行,也可以在 SparkShell 中执行。

package icu.wzk

import org.apache.spark.{SparkConf, SparkContext}

object ReWordCount {

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("SparkFindFriends")
      .setMaster("local[*]")
    val sc = new SparkContext(conf)
    sc.setLogLevel("WARN")

    val rdd1 = sc.textFile("goodtbl.java")
    val rdd2 = rdd1.flatMap(_.split("\\+"))
    val rdd3 = rdd2.map((_, 1))
    val rdd4 = rdd3.reduceByKey(_ + _)
    val rdd5 = rdd4.sortByKey()
    rdd5.count()

    // 查看RDD的血缘关系
    rdd1.toDebugString
    rdd5.toDebugString

    // 查看依赖
    rdd1.dependencies
    rdd1.dependencies(0).rdd

    rdd5.dependencies
    rdd5.dependencies(0).rdd
    
    sc.stop()
  }

}

提出问题

上面的WordCount中,一共有几个Job,几个Stage,几个Task?

答案:1个Job,3个Stage,6个Task

RDD持久化/缓存

基本概念

涉及到的算子:persis、cache、unpersist 都是 Transformation


缓存是将计算结果写入不同的介质,用户定义可定义存储级别(存储级别定义了缓存存储的介质,目前支持内存、堆外内存、磁盘)

通过缓存,Spark避免了RDD上的重复计算,能够极大提升计算的速度。

RDD持久化或缓存,是Spark最重要的特征之一,Spark构建迭代算法和快速交互式查询的关键所在

Spark非常快的原因之一就是在内存、缓存中持久化,当持久化一个RDD后,每一个节点都将把计算的分片结果保存在内存中,并在对此数据集进行其他动作(Action),这使得后续更加迅速

使用 persist() 方法将一个RDD标记为持久化,之所以说“标记持久化”是因为使用persist()的地方,并不会马上计算生成RDD并把它持久化,而是要等遇到第一个行动操作出发真正计算后,才会把计算结果进行持久化。

一般情况下,如果多个动作需要用到某个RDD,而它的计算代价又比较高,那么就应该把这个RDD缓存起来

缓存有可能丢失,或者存储在内存由于空间不足而被删除,RDD的缓存的容错机制保证了即使缓存丢失也可以完成计算。通过基于RDD的一系列的转换,丢失的数据会被重算。

RDD各个Partition是相对独立的,因为只需要计算丢失的部分即可,而不是需要重算全部的Partition

持久化级别

使用 cache() 方法时,会调用 persist(MEMORY_ONLY),即

cache() == persist(StorageLevel.Memory.ONLY)

对于其他的存储级别,如下图:

  • MEMORY_ONLY
  • MEMORY_AND_DISK
  • MEMORY_ONLY_SER
  • MEMORY_AND_DISK_SER
  • DISK_ONLY
  • DISK_ONLY_2
相关实践学习
基于MaxCompute的热门话题分析
本实验围绕社交用户发布的文章做了详尽的分析,通过分析能得到用户群体年龄分布,性别分布,地理位置分布,以及热门话题的热度。
SaaS 模式云数据仓库必修课
本课程由阿里云开发者社区和阿里云大数据团队共同出品,是SaaS模式云原生数据仓库领导者MaxCompute核心课程。本课程由阿里云资深产品和技术专家们从概念到方法,从场景到实践,体系化的将阿里巴巴飞天大数据平台10多年的经过验证的方法与实践深入浅出的讲给开发者们。帮助大数据开发者快速了解并掌握SaaS模式的云原生的数据仓库,助力开发者学习了解先进的技术栈,并能在实际业务中敏捷的进行大数据分析,赋能企业业务。 通过本课程可以了解SaaS模式云原生数据仓库领导者MaxCompute核心功能及典型适用场景,可应用MaxCompute实现数仓搭建,快速进行大数据分析。适合大数据工程师、大数据分析师 大量数据需要处理、存储和管理,需要搭建数据仓库?学它! 没有足够人员和经验来运维大数据平台,不想自建IDC买机器,需要免运维的大数据平台?会SQL就等于会大数据?学它! 想知道大数据用得对不对,想用更少的钱得到持续演进的数仓能力?获得极致弹性的计算资源和更好的性能,以及持续保护数据安全的生产环境?学它! 想要获得灵活的分析能力,快速洞察数据规律特征?想要兼得数据湖的灵活性与数据仓库的成长性?学它! 出品人:阿里云大数据产品及研发团队专家 产品 MaxCompute 官网 https://www.aliyun.com/product/odps 
目录
相关文章
|
5天前
|
存储 分布式计算 并行计算
【赵渝强老师】Spark中的RDD
RDD(弹性分布式数据集)是Spark的核心数据模型,支持分布式并行计算。RDD由分区组成,每个分区由Spark Worker节点处理,具备自动容错、位置感知调度和缓存机制等特性。通过创建RDD,可以指定分区数量,并实现计算函数、依赖关系、分区器和优先位置列表等功能。视频讲解和示例代码进一步详细介绍了RDD的组成和特性。
|
1月前
|
缓存 资源调度 持续交付
在清空NPM缓存后,我如何检查是否所有依赖都已正确安装?
【10月更文挑战第5天】在清空NPM缓存后,我如何检查是否所有依赖都已正确安装?
|
1月前
|
分布式计算 Java 大数据
大数据-92 Spark 集群 SparkRDD 原理 Standalone详解 ShuffleV1V2详解 RDD编程优化
大数据-92 Spark 集群 SparkRDD 原理 Standalone详解 ShuffleV1V2详解 RDD编程优化
36 0
大数据-92 Spark 集群 SparkRDD 原理 Standalone详解 ShuffleV1V2详解 RDD编程优化
|
1月前
|
消息中间件 分布式计算 Kafka
大数据-99 Spark 集群 Spark Streaming DStream 文件数据流、Socket、RDD队列流
大数据-99 Spark 集群 Spark Streaming DStream 文件数据流、Socket、RDD队列流
29 0
|
1月前
|
SQL 分布式计算 大数据
大数据-94 Spark 集群 SQL DataFrame & DataSet & RDD 创建与相互转换 SparkSQL
大数据-94 Spark 集群 SQL DataFrame & DataSet & RDD 创建与相互转换 SparkSQL
49 0
|
1月前
|
SQL 分布式计算 大数据
大数据-91 Spark 集群 RDD 编程-高阶 RDD广播变量 RDD累加器 Spark程序优化
大数据-91 Spark 集群 RDD 编程-高阶 RDD广播变量 RDD累加器 Spark程序优化
36 0
|
存储 缓存 分布式计算
《Spark与Hadoop大数据分析》——3.5 持久化与缓存
本节书摘来自华章计算机《Spark与Hadoop大数据分析》一书中的第3章,第3.5节,作者 [美]文卡特·安卡姆(Venkat Ankam),译 吴今朝,更多章节内容可以访问云栖社区“华章计算机”公众号查看。
1215 0
|
14天前
|
分布式计算 大数据 Apache
ClickHouse与大数据生态集成:Spark & Flink 实战
【10月更文挑战第26天】在当今这个数据爆炸的时代,能够高效地处理和分析海量数据成为了企业和组织提升竞争力的关键。作为一款高性能的列式数据库系统,ClickHouse 在大数据分析领域展现出了卓越的能力。然而,为了充分利用ClickHouse的优势,将其与现有的大数据处理框架(如Apache Spark和Apache Flink)进行集成变得尤为重要。本文将从我个人的角度出发,探讨如何通过这些技术的结合,实现对大规模数据的实时处理和分析。
46 2
ClickHouse与大数据生态集成:Spark & Flink 实战
|
1月前
|
存储 分布式计算 算法
大数据-106 Spark Graph X 计算学习 案例:1图的基本计算、2连通图算法、3寻找相同的用户
大数据-106 Spark Graph X 计算学习 案例:1图的基本计算、2连通图算法、3寻找相同的用户
58 0