Spark学习--day04、RDD依赖关系、RDD持久化、RDD分区器、RDD文件读取与保存

本文涉及的产品
检索分析服务 Elasticsearch 版,2核4GB开发者规格 1个月
智能开放搜索 OpenSearch行业算法版,1GB 20LCU 1个月
实时计算 Flink 版,5000CU*H 3个月
简介: Spark学习--day04、RDD依赖关系、RDD持久化、RDD分区器、RDD文件读取与保存

RDD依赖关系

查看血缘关系

RDD只支持粗粒度转换,每一个转换操作都是对上游RDD的元素执行函数f得到一个新的RDD,所以RDD之间就会形成类似流水线的前后依赖关系。

将创建RDD的一系列Lineage(血统)记录下来,以便恢复丢失的分区。RDD的Lineage会记录RDD的元数据信息和转换行为,当该RDD的部分分区数据丢失时,它可以根据这些信息来重新运算丢失的RDD的数据分区所依赖的父RDD分区数据以实现恢复,这样就避免了从头再次开始计算了。

image.png

image.png

package dep

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object Spark01_RDD_dep {
  def main(args: Array[String]): Unit = {

    val sparkConf: SparkConf = new SparkConf().setAppName("wc").setMaster("local")
    val sc = new SparkContext(sparkConf)


    val datas: RDD[String] = sc.textFile("F:\\SparkCore代码\\Spark-core\\input\\2.txt")
    println(datas.dependencies) //OneToOneDependency
    // 窄依赖 : 上游RDD的一个分区的数据只能被下有RDD一个分区的数据独享.
    //println(datas.toDebugString)
    println("-------------------------------")
    val words = datas.flatMap(_.split(" "))
    println(words.dependencies) //OneToOneDependency
    //println(words.toDebugString)
    println("-------------------------------")


    val word2One = words.map((_, 1))
    println(word2One.dependencies) //OneToOneDependency
    //println(word2One.toDebugString)
    println("-------------------------------")

    val wordcount = word2One.reduceByKey(_ + _)
    println(wordcount.dependencies) //ShuffleDependency
    // 宽依赖(shuffle依赖): 上游RDD的一个分区的数据被下游RDD的多个分区共享.
    //println(wordcount.toDebugString)
    println("-------------------------------")


    wordcount.collect().foreach(println)

    //wordcount.collect()


    sc.stop()
  }

}

image.png

RDD 阶段划分

DAG(Directed Acyclic Graph)有向无环图是由点和线组成的拓扑图形,该图形具有方向,不会闭环。例如,DAG记录了RDD的转换过程和任务的阶段。

image.png

image.png

RDD 任务划分

RDD任务切分中间分为:Application、Job、Stage和Task

l Application:初始化一个SparkContext即生成一个Application;

l Job:一个Action算子就会生成一个Job;

l Stage:Stage等于宽依赖(ShuffleDependency)的个数加1;

Task:一个Stage阶段中,最后一个RDD的分区个数就是Task的个数。

注意:Application->Job->Stage->Task每一层都是1对n的关系。

image.png

RDD持久化

1) RDD Cache缓存

RDD通过Cache或者Persist方法将前面的计算结果缓存,默认情况下会把数据以缓存在JVM的堆内存中。但是并不是这两个方法被调用时立即缓存,而是触发后面的action算子时,该RDD将会被缓存在计算节点的内存中,并供后面重用,LRU算法,当内存过多的时候,spark内存中会自动采用LRU机制去删除元素。

// cache操作会增加血缘关系,不改变原有的血缘关系
println(wordToOneRdd.toDebugString)

// 数据缓存。
wordToOneRdd.cache()

// 可以更改存储级别
//mapRdd.persist(StorageLevel.MEMORY_AND_DISK_2)

存储级别

image.png

image.png

缓存有可能丢失,或者存储于内存的数据由于内存不足而被删除,RDD的缓存容错机制保证了即使缓存丢失也能保证计算的正确执行。通过基于RDD的一系列转换,丢失的数据会被重算,由于RDD的各个Partition是相对独立的,因此只需要计算丢失的部分即可,并不需要重算全部Partition。

Spark会自动对一些Shuffle操作的中间数据做持久化操作(比如:reduceByKey)。这样做的目的是为了当一个节点Shuffle失败了避免重新计算整个输入。但是,在实际使用的时候,如果想重用数据,仍然建议调用persist或cache。

image.png

package dep
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
import org.apache.spark.{SparkConf, SparkContext}
object Spark01_RDD_persist {
  def main(args: Array[String]): Unit = {

    val sparkConf: SparkConf = new SparkConf().setAppName("wc").setMaster("local")
    val sc = new SparkContext(sparkConf)

    val fileRDD: RDD[String] = sc.textFile("data/word.txt")

    val wordRDD: RDD[String] = fileRDD.flatMap(_.split(" "))

    val word2OneRDD: RDD[(String, Int)] = wordRDD.map(
      word => {
        println("**********")
        (word, 1)
      }
    )


    //缓存
    //word2OneRDD.cache()

    //持久化
    word2OneRDD.persist(StorageLevel.MEMORY_ONLY)

    val reduceRDD: RDD[(String, Int)] = word2OneRDD.reduceByKey(_ + _)
    println(reduceRDD.toDebugString)

    reduceRDD.collect().foreach(println)

    println("--------------------------------------------------")

    val groupRDD: RDD[(String, Iterable[Int])] = word2OneRDD.groupByKey()

    println(groupRDD.toDebugString)
    groupRDD.collect().foreach(println)

    sc.stop()
  }

}

RDD CheckPoint检查点

所谓的检查点其实就是通过将RDD中间结果写入磁盘

由于血缘依赖过长会造成容错成本过高,这样就不如在中间阶段做检查点容错,如果检查点之后有节点出现问题,可以从检查点开始重做血缘,减少开销。

对RDD进行checkpoint操作并不会马上被执行,必须执行Action操作才能触发。

package dep

import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
import org.apache.spark.{SparkConf, SparkContext}
object Spark02_RDD_checkpoint {
  def main(args: Array[String]): Unit = {

    val sparkConf: SparkConf = new SparkConf().setAppName("wc").setMaster("local")
    val sc = new SparkContext(sparkConf)

    sc.setCheckpointDir("cp")
    //sc.setCheckpointDir("hdfs://hadoop102:8020/spark/cp")

    val fileRDD: RDD[String] = sc.textFile("data/word.txt")

    val wordRDD: RDD[String] = fileRDD.flatMap(_.split(" "))

    val word2OneRDD: RDD[(String, Int)] = wordRDD.map(
      word => {
        println("**********")
        (word, 1)
      }
    )


    //缓存
    word2OneRDD.cache()

    //持久化
    //word2OneRDD.persist(StorageLevel.MEMORY_ONLY)

    //检查点  配合 cache使用, 检查点可以从cache中获取数据
    word2OneRDD.checkpoint()

    val reduceRDD: RDD[(String, Int)] = word2OneRDD.reduceByKey(_ + _)
    println(reduceRDD.toDebugString)

    reduceRDD.collect().foreach(println)

    println("--------------------------------------------------")

    val groupRDD: RDD[(String, Iterable[Int])] = word2OneRDD.groupByKey()

    println(groupRDD.toDebugString)
    groupRDD.collect().foreach(println)

    sc.stop()
  }

}

缓存和检查点区别

1)Cache缓存只是将数据保存起来,不切断血缘依赖。Checkpoint检查点切断血缘依赖。

2)Cache缓存的数据通常存储在磁盘、内存等地方,可靠性低。Checkpoint的数据通常存储在HDFS等容错、高可用的文件系统,可靠性高。

3)建议对checkpoint()的RDD使用Cache缓存,这样checkpoint的job只需从Cache缓存中读取数据即可,否则需要再从头计算一次RDD。

4)persist 涉及到磁盘I/O,性能较低,但是数据安全,会独立运行,所以要和cache联合使用

image.png

 RDD分区器

Spark目前支持Hash分区和Range分区,和用户自定义分区。Hash分区为当前的默认分区。分区器直接决定了RDD中分区的个数、RDD中每条数据经过Shuffle后进入哪个分区,进而决定了Reduce的个数。

Ø 只有Key-Value类型的RDD才有分区器,非Key-Value类型的RDD分区的值是None

Ø 每个RDD的分区ID范围:0 ~ (numPartitions - 1),决定这个值是属于那个分区的。

1) Hash分区:对于给定的key,计算其hashCode,并除以分区个数取余

1) Range分区:将一定范围内的数据映射到一个分区中,尽量保证每个分区数据均匀,而且分区间有序

package dep
import org.apache.spark.rdd.RDD
import org.apache.spark.{HashPartitioner, Partitioner, SparkConf, SparkContext}

/**
 * Spark - 分区
 *
 * HashPartitioner:
 *    MR    :  key % 分区数  (key.hashCode() & 2147483647) % numReduceTasks
 *
 *    Kafka : Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
 *
 *    Spark : Utils.nonNegativeMod(key.hashCode, numPartitions)
 *
 *
 * HashMap :
 *    key.hashcode  & (length -1 )    ,长度必须是2^n
 *
 *    01010001
 *  &
 *    00000100
 *  ------------------
 *   000000100    ->  4
 *  000000000000  ->  0
 *
 */
object Spark01_RDD_partitioner {
  def main(args: Array[String]): Unit = {

    val sparkConf: SparkConf = new SparkConf().setAppName("wc").setMaster("local")
    val sc = new SparkContext(sparkConf)


    val rdd: RDD[(Int, String)] = sc.makeRDD(List(
      (1, "a"), (3, "d"), (2, "b"), (4, "e")
    ), 2)

    //val partitionRDD: RDD[(Int, String)] = rdd.partitionBy(new HashPartitioner(2))

    //partitionRDD.saveAsTextFile("output1")

    val myRDD: RDD[(Int, String)] = rdd.partitionBy(new MyPartitioner(2))
    myRDD.saveAsTextFile("output2")


    sc.stop()
  }

  class MyPartitioner(num: Int) extends Partitioner {
    //分区数量
    override def numPartitions: Int = num
    //根据数据的key值返回数据的分区索引(从0开始)
    override def getPartition(key: Any): Int = {
      val keyInt: Int = key.asInstanceOf[Int]
//      key match {
//        case 
//      }
      if (keyInt <= 2) {
        0
      } else {
        1
      }

    }
  }

}

 RDD文件读取与保存

Spark的数据读取及数据保存可以从两个维度来作区分:文件格式以及文件系统。

      文件格式分为:text文件、csv文件、sequence文件以及Object文件;

      文件系统分为:本地文件系统、HDFS、HBASE以及数据库。

text文件

// 读取输入文件
val inputRDD: RDD[String] = sc.textFile("input/1.txt")
 
// 保存数据
inputRDD.saveAsTextFile("output")

 sequence文件

SequenceFile文件是Hadoop用来存储二进制形式的key-value对而设计的一种平面文件(Flat File)在SparkContext中,可以调用sequenceFile[keyClass, valueClass](path)。

// 保存数据为SequenceFile

dataRDD.saveAsSequenceFile("output")
 
// 读取SequenceFile文件
sc.sequenceFile[Int,Int]("output").collect().foreach(println)

object对象文件

对象文件是将对象序列化后保存的文件,采用Java的序列化机制。可以通过objectFile[T: ClassTag](path)函数接收一个路径,读取对象文件,返回对应的RDD,也可以通过调用saveAsObjectFile()实现对对象文件的输出。因为是序列化所以要指定类型。

// 保存数据
dataRDD.saveAsObjectFile("output")
// 读取数据
sc.objectFile[Int]("output").collect().foreach(println)
package dep
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object Spark01_RDD_ReadAndWrite {
  def main(args: Array[String]): Unit = {

    val sparkConf: SparkConf = new SparkConf().setAppName("wc").setMaster("local")
    val sc = new SparkContext(sparkConf)

    //val rdd: RDD[String] = sc.makeRDD( List("Hello","Spark", "Scala","Hello"),1)

    //写text
    //rdd.saveAsTextFile("textoutput")

    //写sequence
    //rdd.map((_,1)).saveAsSequenceFile("sequenceoutput")

    //写object
    //rdd.saveAsObjectFile("objoutput")


    //读text
    //    val rdd: RDD[String] = sc.textFile("textoutput")
    //    rdd.collect().foreach(println)

    //读sequence
    //    val rdd: RDD[(String, Int)] = sc.sequenceFile[String,Int]("sequenceoutput")
    //    rdd.collect().foreach(println)

    //读object
    val rdd: RDD[String] = sc.objectFile[String]("objoutput")
    rdd.collect().foreach(println)


    sc.stop()
  }

}
相关文章
|
1月前
|
存储 分布式计算 并行计算
【赵渝强老师】Spark中的RDD
RDD(弹性分布式数据集)是Spark的核心数据模型,支持分布式并行计算。RDD由分区组成,每个分区由Spark Worker节点处理,具备自动容错、位置感知调度和缓存机制等特性。通过创建RDD,可以指定分区数量,并实现计算函数、依赖关系、分区器和优先位置列表等功能。视频讲解和示例代码进一步详细介绍了RDD的组成和特性。
|
2月前
|
存储 分布式计算 算法
大数据-106 Spark Graph X 计算学习 案例:1图的基本计算、2连通图算法、3寻找相同的用户
大数据-106 Spark Graph X 计算学习 案例:1图的基本计算、2连通图算法、3寻找相同的用户
74 0
|
1月前
|
分布式计算 Java 开发工具
阿里云MaxCompute-XGBoost on Spark 极限梯度提升算法的分布式训练与模型持久化oss的实现与代码浅析
本文介绍了XGBoost在MaxCompute+OSS架构下模型持久化遇到的问题及其解决方案。首先简要介绍了XGBoost的特点和应用场景,随后详细描述了客户在将XGBoost on Spark任务从HDFS迁移到OSS时遇到的异常情况。通过分析异常堆栈和源代码,发现使用的`nativeBooster.saveModel`方法不支持OSS路径,而使用`write.overwrite().save`方法则能成功保存模型。最后提供了完整的Scala代码示例、Maven配置和提交命令,帮助用户顺利迁移模型存储路径。
|
2月前
|
分布式计算 算法 Spark
spark学习之 GraphX—预测社交圈子
spark学习之 GraphX—预测社交圈子
65 0
|
2月前
|
分布式计算 Scala Spark
educoder的spark算子学习
educoder的spark算子学习
24 0
|
2月前
|
消息中间件 分布式计算 Kafka
大数据-99 Spark 集群 Spark Streaming DStream 文件数据流、Socket、RDD队列流
大数据-99 Spark 集群 Spark Streaming DStream 文件数据流、Socket、RDD队列流
38 0
|
2月前
|
SQL 分布式计算 大数据
大数据-94 Spark 集群 SQL DataFrame & DataSet & RDD 创建与相互转换 SparkSQL
大数据-94 Spark 集群 SQL DataFrame & DataSet & RDD 创建与相互转换 SparkSQL
87 0
|
存储 缓存 分布式计算
《Spark与Hadoop大数据分析》——3.5 持久化与缓存
本节书摘来自华章计算机《Spark与Hadoop大数据分析》一书中的第3章,第3.5节,作者 [美]文卡特·安卡姆(Venkat Ankam),译 吴今朝,更多章节内容可以访问云栖社区“华章计算机”公众号查看。
1223 0
|
1月前
|
分布式计算 大数据 Apache
ClickHouse与大数据生态集成:Spark & Flink 实战
【10月更文挑战第26天】在当今这个数据爆炸的时代,能够高效地处理和分析海量数据成为了企业和组织提升竞争力的关键。作为一款高性能的列式数据库系统,ClickHouse 在大数据分析领域展现出了卓越的能力。然而,为了充分利用ClickHouse的优势,将其与现有的大数据处理框架(如Apache Spark和Apache Flink)进行集成变得尤为重要。本文将从我个人的角度出发,探讨如何通过这些技术的结合,实现对大规模数据的实时处理和分析。
144 2
ClickHouse与大数据生态集成:Spark & Flink 实战