大数据-89 Spark 集群 RDD 编程-高阶 编写代码、RDD依赖关系、RDD持久化/缓存

本文涉及的产品
云原生大数据计算服务 MaxCompute,5000CU*H 100GB 3个月
云原生大数据计算服务MaxCompute,500CU*H 100GB 3个月
简介: 大数据-89 Spark 集群 RDD 编程-高阶 编写代码、RDD依赖关系、RDD持久化/缓存

点一下关注吧!!!非常感谢!!持续更新!!!

目前已经更新到了:

Hadoop(已更完)

HDFS(已更完)

MapReduce(已更完)

Hive(已更完)

Flume(已更完)

Sqoop(已更完)

Zookeeper(已更完)

HBase(已更完)

Redis (已更完)

Kafka(已更完)

Spark(正在更新!)

章节内容

上节完成的内容如下:


Spark Super Word Count 程序 Scala语言编写

将数据写入MySQL、不写入MySQL等编码方式

代码的详细解释与结果

背景介绍

这涉及到进程通信,是需要序列化的,可以简单的认为:SparkContext代表Drive

在实际的开发过程中会自定一些RDD的操作,此时需要注意的是:

  • 初始化工作是Driver端进行的
  • 实际运行程序是Executor端进行的

测试代码

遇到问题

class MyClass1(x: Int) {
  val num = x
}

object SerializableDemo {
  def main (args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("SerializableDemo")
      .setMaster("local[*]")
    val sc = new SparkContext(conf)
    sc.setLogLevel("WARN")

    val rdd1 = sc.makeRDD(1 to 20)
    def add1(x: Int) = x + 10
    val add2 = add1 _

    // 过程和方法 都具备序列化的能力
    rdd1.map(add1(_)).foreach(println)
    rdd1.map(add2(_)).foreach(println)

    // 普通的类不具备序列化能力
    val object1 = new MyClass1(10)
    // 报错 提示无法序列化
    // rdd1.map(x => object1.num + x).foreach(println)
  }
}

解决方案1

case class MyClass2(num: Int)
val object2 = MyClass2(20)
rdd1.map(x => object2.num + x).foreach(println)

解决方案2

class MyClass3(x: Int) extends Serializable {
  val num = x
}
val object3 = new MyClass3(30)
rdd1.map(x => object3.num + x).foreach(println)

解决方案3

class MyClass1(x: Int) {
  val num = x
}

lazy val object4 = new MyClass1(40)
rdd1.map(x => object4.num + x).foreach(println)

完整代码

package icu.wzk

import org.apache.spark.{SparkConf, SparkContext}

class MyClass1(x: Int) {
  val num = x
}

case class MyClass2(num: Int)

class MyClass3(x: Int) extends Serializable {
  val num = x
}

object SerializableDemo {
  def main (args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("SerializableDemo")
      .setMaster("local[*]")
    val sc = new SparkContext(conf)
    sc.setLogLevel("WARN")

    val rdd1 = sc.makeRDD(1 to 20)
    def add1(x: Int) = x + 10
    val add2 = add1 _

    // 过程和方法 都具备序列化的能力
    rdd1.map(add1(_)).foreach(println)
    rdd1.map(add2(_)).foreach(println)

    // 普通的类不具备序列化能力
    val object1 = new MyClass1(10)
    // 报错 提示无法序列化
    // rdd1.map(x => object1.num + x).foreach(println)

    // 解决方案1 使用 case class
    val object2 = MyClass2(20)
    rdd1.map(x => object2.num + x).foreach(println)

    // 解决方案2 实现 Serializable
    val object3 = new MyClass3(30)
    rdd1.map(x => object3.num + x).foreach(println)

    // 解决方法3 延迟创建
    lazy val object4 = new MyClass1(40)
    rdd1.map(x => object4.num + x).foreach(println)

    sc.stop()
  }
}

注意事项

如果在方法、函数的定义中引入了不可序列化的对象,也会导致任务不能够序列化

延迟创建的解决方案比较简单,且实用性广

RDD依赖关系

基本概念

RDD 只支持粗粒度转换,即在大量记录上执行的单个操作。将创建RDD的一系列Lineage(血统)记录下来,以便恢复丢失的分区。

RDD的Lineage会记录RDD的元数据信息和转换行为,当该RDD的部分分区数据丢失时,可根据这些信息来重新运算和恢复丢失的数据分区。


RDD和它的依赖的父RDDs的关系有两种不同的类型:

  • 窄依赖(narrow dependency):1:1或n:1
  • 宽依赖(wide dependency):n:m 意味着有 shuflle

RDD任务切分中间分为:Driver program、Job、Stage(TaskSet) 和 Task


Driver program:初始化一个SparkContext即生成一个Spark应用

Job:一个Action算子就会生成一个Job

Stage:根据RDD之间的依赖关系不同将Job划分成不同的Stage,遇到一个宽依赖则划分一个Stage

Task:Stage是一个TaskSet,将Stage划分的结果发送到不同的Executor执行即为一个Task

Task是Spark中任务调度的最小单位,每个Stage包含许多Task,这些Task执行的计算逻辑是相同的,计算的数据是不同的

DriverProgram -> Job -> Stage -> Task 每一层都是 1 对 N 的关系

再回WordCount

代码部分

你可以用代码执行,也可以在 SparkShell 中执行。

package icu.wzk

import org.apache.spark.{SparkConf, SparkContext}

object ReWordCount {

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("SparkFindFriends")
      .setMaster("local[*]")
    val sc = new SparkContext(conf)
    sc.setLogLevel("WARN")

    val rdd1 = sc.textFile("goodtbl.java")
    val rdd2 = rdd1.flatMap(_.split("\\+"))
    val rdd3 = rdd2.map((_, 1))
    val rdd4 = rdd3.reduceByKey(_ + _)
    val rdd5 = rdd4.sortByKey()
    rdd5.count()

    // 查看RDD的血缘关系
    rdd1.toDebugString
    rdd5.toDebugString

    // 查看依赖
    rdd1.dependencies
    rdd1.dependencies(0).rdd

    rdd5.dependencies
    rdd5.dependencies(0).rdd
    
    sc.stop()
  }

}

提出问题

上面的WordCount中,一共有几个Job,几个Stage,几个Task?

答案:1个Job,3个Stage,6个Task

RDD持久化/缓存

基本概念

涉及到的算子:persis、cache、unpersist 都是 Transformation


缓存是将计算结果写入不同的介质,用户定义可定义存储级别(存储级别定义了缓存存储的介质,目前支持内存、堆外内存、磁盘)

通过缓存,Spark避免了RDD上的重复计算,能够极大提升计算的速度。

RDD持久化或缓存,是Spark最重要的特征之一,Spark构建迭代算法和快速交互式查询的关键所在

Spark非常快的原因之一就是在内存、缓存中持久化,当持久化一个RDD后,每一个节点都将把计算的分片结果保存在内存中,并在对此数据集进行其他动作(Action),这使得后续更加迅速

使用 persist() 方法将一个RDD标记为持久化,之所以说“标记持久化”是因为使用persist()的地方,并不会马上计算生成RDD并把它持久化,而是要等遇到第一个行动操作出发真正计算后,才会把计算结果进行持久化。

一般情况下,如果多个动作需要用到某个RDD,而它的计算代价又比较高,那么就应该把这个RDD缓存起来

缓存有可能丢失,或者存储在内存由于空间不足而被删除,RDD的缓存的容错机制保证了即使缓存丢失也可以完成计算。通过基于RDD的一系列的转换,丢失的数据会被重算。

RDD各个Partition是相对独立的,因为只需要计算丢失的部分即可,而不是需要重算全部的Partition

持久化级别

使用 cache() 方法时,会调用 persist(MEMORY_ONLY),即

cache() == persist(StorageLevel.Memory.ONLY)

对于其他的存储级别,如下图:

  • MEMORY_ONLY
  • MEMORY_AND_DISK
  • MEMORY_ONLY_SER
  • MEMORY_AND_DISK_SER
  • DISK_ONLY
  • DISK_ONLY_2
相关实践学习
基于MaxCompute的热门话题分析
本实验围绕社交用户发布的文章做了详尽的分析,通过分析能得到用户群体年龄分布,性别分布,地理位置分布,以及热门话题的热度。
SaaS 模式云数据仓库必修课
本课程由阿里云开发者社区和阿里云大数据团队共同出品,是SaaS模式云原生数据仓库领导者MaxCompute核心课程。本课程由阿里云资深产品和技术专家们从概念到方法,从场景到实践,体系化的将阿里巴巴飞天大数据平台10多年的经过验证的方法与实践深入浅出的讲给开发者们。帮助大数据开发者快速了解并掌握SaaS模式的云原生的数据仓库,助力开发者学习了解先进的技术栈,并能在实际业务中敏捷的进行大数据分析,赋能企业业务。 通过本课程可以了解SaaS模式云原生数据仓库领导者MaxCompute核心功能及典型适用场景,可应用MaxCompute实现数仓搭建,快速进行大数据分析。适合大数据工程师、大数据分析师 大量数据需要处理、存储和管理,需要搭建数据仓库?学它! 没有足够人员和经验来运维大数据平台,不想自建IDC买机器,需要免运维的大数据平台?会SQL就等于会大数据?学它! 想知道大数据用得对不对,想用更少的钱得到持续演进的数仓能力?获得极致弹性的计算资源和更好的性能,以及持续保护数据安全的生产环境?学它! 想要获得灵活的分析能力,快速洞察数据规律特征?想要兼得数据湖的灵活性与数据仓库的成长性?学它! 出品人:阿里云大数据产品及研发团队专家 产品 MaxCompute 官网 https://www.aliyun.com/product/odps 
目录
相关文章
|
2月前
|
存储 分布式计算 并行计算
【赵渝强老师】Spark中的RDD
RDD(弹性分布式数据集)是Spark的核心数据模型,支持分布式并行计算。RDD由分区组成,每个分区由Spark Worker节点处理,具备自动容错、位置感知调度和缓存机制等特性。通过创建RDD,可以指定分区数量,并实现计算函数、依赖关系、分区器和优先位置列表等功能。视频讲解和示例代码进一步详细介绍了RDD的组成和特性。
|
3月前
|
分布式计算 Java 大数据
大数据-92 Spark 集群 SparkRDD 原理 Standalone详解 ShuffleV1V2详解 RDD编程优化
大数据-92 Spark 集群 SparkRDD 原理 Standalone详解 ShuffleV1V2详解 RDD编程优化
50 0
大数据-92 Spark 集群 SparkRDD 原理 Standalone详解 ShuffleV1V2详解 RDD编程优化
|
3月前
|
消息中间件 分布式计算 Kafka
大数据-99 Spark 集群 Spark Streaming DStream 文件数据流、Socket、RDD队列流
大数据-99 Spark 集群 Spark Streaming DStream 文件数据流、Socket、RDD队列流
43 0
|
3月前
|
SQL 分布式计算 大数据
大数据-94 Spark 集群 SQL DataFrame & DataSet & RDD 创建与相互转换 SparkSQL
大数据-94 Spark 集群 SQL DataFrame & DataSet & RDD 创建与相互转换 SparkSQL
94 0
|
19天前
|
存储 缓存 NoSQL
解决Redis缓存数据类型丢失问题
解决Redis缓存数据类型丢失问题
164 85
|
3月前
|
消息中间件 缓存 NoSQL
Redis 是一个高性能的键值对存储系统,常用于缓存、消息队列和会话管理等场景。
【10月更文挑战第4天】Redis 是一个高性能的键值对存储系统,常用于缓存、消息队列和会话管理等场景。随着数据增长,有时需要将 Redis 数据导出以进行分析、备份或迁移。本文详细介绍几种导出方法:1)使用 Redis 命令与重定向;2)利用 Redis 的 RDB 和 AOF 持久化功能;3)借助第三方工具如 `redis-dump`。每种方法均附有示例代码,帮助你轻松完成数据导出任务。无论数据量大小,总有一款适合你。
87 6
|
16天前
|
缓存 监控 NoSQL
Redis经典问题:缓存穿透
本文详细探讨了分布式系统和缓存应用中的经典问题——缓存穿透。缓存穿透是指用户请求的数据在缓存和数据库中都不存在,导致大量请求直接落到数据库上,可能引发数据库崩溃或性能下降。文章介绍了几种有效的解决方案,包括接口层增加校验、缓存空值、使用布隆过滤器、优化数据库查询以及加强监控报警机制。通过这些方法,可以有效缓解缓存穿透对系统的影响,提升系统的稳定性和性能。
|
2月前
|
缓存 NoSQL 关系型数据库
大厂面试高频:如何解决Redis缓存雪崩、缓存穿透、缓存并发等5大难题
本文详解缓存雪崩、缓存穿透、缓存并发及缓存预热等问题,提供高可用解决方案,帮助你在大厂面试和实际工作中应对这些常见并发场景。关注【mikechen的互联网架构】,10年+BAT架构经验倾囊相授。
大厂面试高频:如何解决Redis缓存雪崩、缓存穿透、缓存并发等5大难题
|
2月前
|
存储 缓存 NoSQL
【赵渝强老师】基于Redis的旁路缓存架构
本文介绍了引入缓存后的系统架构,通过缓存可以提升访问性能、降低网络拥堵、减轻服务负载和增强可扩展性。文中提供了相关图片和视频讲解,并讨论了数据库读写分离、分库分表等方法来减轻数据库压力。同时,文章也指出了缓存可能带来的复杂度增加、成本提高和数据一致性问题。
【赵渝强老师】基于Redis的旁路缓存架构
|
2月前
|
缓存 NoSQL Redis
Redis 缓存使用的实践
《Redis缓存最佳实践指南》涵盖缓存更新策略、缓存击穿防护、大key处理和性能优化。包括Cache Aside Pattern、Write Through、分布式锁、大key拆分和批量操作等技术,帮助你在项目中高效使用Redis缓存。
356 22
下一篇
开通oss服务