Spark RDD概念学习系列之RDD的checkpoint(九)

简介:

RDD的检查点

  首先,要清楚。为什么spark要引入检查点机制?引入RDD的检查点?

   如果缓存丢失了,则需要重新计算。如果计算特别复杂或者计算耗时特别多,那么缓存丢失对于整个Job的影响是不容忽视的。为了避免缓存丢失重新计算带来的开销,Spark又引入检查点机制。

 

 

      RDD的缓存能够在第一次计算完成后,将计算结果保存到内存、本地文件系统或者Tachyon(分布式内存文件系统)中。通过缓存,Spark避免了RDD上的重复计算,能够极大地提升计算速度。但是,如果缓存丢失了,则需要重新计算。如果计算特别复杂或者计算耗时特别多,那么缓存丢失对于整个Job的影响是不容忽视的。为了避免缓存丢失重新计算带来的开销,Spark又引入检查点(checkpoint)机制。

 

 

 

 

RDD的缓存和RDD的checkpoint的区别

      RDD的缓存是在计算结束后,直接将计算结果通过用户定义的存储级别(存储级别定义了缓存存储的介质,现在支持内存、本地文件系统和Tachyon)写入不同的介质。

      而RDD的检查点不同,它是在计算完成后,重新建立一个Job来计算。

      为了避免重复计算,推荐先将RDD缓存,这样就能保证检查点的操作可以快速完成。

    

 

 

 

 

 

RDD的checkpoint的处理

  在缓存没有命中的情况下,首先会判断是否保存了RDD的checkpoint,如果有,则读取checkpoint。为了理解checkpoint的RDD是如何读取计算结果的,需要先看一下checkpoint的数据是如何写入的。
首先在Job结束后,会判断是否需要checkpoint。如果需要,就调用org.apache.spark.rdd.RDDCheckpointData#doCheckpoint。doCheckpoint首先为数据创建一个目录;然后启动一个新的Job来计算,并且将计算结果写入新创建的目录;接着创建一个org.apache.spark.rdd.CheckpointRDD;最后,原始RDD的所有依赖被清除,这就意味着RDD的转换的计算链(compute chain)等信息都被清除。这个处理逻辑中,数据写入的实现在org.apache.spark.rdd.CheckpointRDD$#writeToFile。

 

简要的核心逻辑如下:

复制代码
//创建一个保存checkpoint数据的目录
val path = new Path(rdd.context.checkpointDir.get, "rdd-" + rdd.id)
val fs = path.getFileSystem(rdd.context.hadoopConfiguration)

if (!fs.mkdirs(path)) {
throw new SparkException("Failed to create checkpoint path " + path)
}
//创建广播变量 val broadcastedConf = rdd.context.broadcast( new SerializableWritable(rdd.context.hadoopConfiguration))
//开始一个新的Job进行计算,计算结果存入路径path中 rdd.context.runJob(rdd, CheckpointRDD.writeToFile[T](path.toString, broadcastedConf) _)
//根据结果的路径path来创建CheckpointRDD val newRDD = new CheckpointRDD[T](rdd.context, path.toString)
//保存结果,清除原始RDD的依赖、Partition信息等 RDDCheckpointData.synchronized { cpFile = Some(path.toString) cpRDD = Some(newRDD) // RDDCheckpointData对应的CheckpointRDD rdd.markCheckpointed(newRDD) //清除原始RDD的依赖,Partition cpState = Checkpointed //标记checkpoint的状态为完成 }
复制代码

  至此,RDD的checkpoint完成,其中checkpoint的数据可以通过checkpointRDD的readFromFile读取。但是,上述逻辑在清除了RDD的依赖后,并没有和check-pointRDD建立联系。

 

 

 

 

 

 

  那么Spark是如何确定一个RDD是否被checkpoint了,而且正确读取checkpoint的数据呢?
答案就在org.apache.spark.rdd.RDD#dependencies的实现,它会首先判断当前的RDD是否已经Checkpoint过,如果有,那么RDD的依赖就变成了对应的Ch

复制代码
eckpointRDD:

privatedefcheckpointRDD: Option[RDD[T]]=checkpointData.flatMap(_.checkpointRDD)
final def dependencies: Seq[Dependency[_]] = {
checkpointRDD.map(r => List(new OneToOneDependency(r))).getOrElse {
if (dependencies_ == null) { //没有checkpoint
dependencies_ = getDependencies
}

 

dependencies_
}
}
复制代码

  理解了Checkpoint的实现过程。

 

 

 

  接下来看一下computeOrReadCheckpoint的实现。前面提到了,它一共在两个地方被调用,org.apache.spark.rdd.RDD#iterator和org.apache.spark.CacheManager#getOrCompute。它实现的逻辑比较简单,首先检查当前RDD是否被Checkpoint过,如果有,读取Checkpoint的数据;否则开始计算。

 

实现如下:

复制代码
private[spark] def computeOrReadCheckpoint(split: Partition, context: TaskContext)
: Iterator[T] =

 

{
if (isCheckpointed) firstParent[T].iterator(split, context) else compute(split, context)
}
firstParent[T].iterator(split,context)会调用对应CheckpointRDD的iterator,最终调用到它的compute:
override def compute(split: Partition, context: TaskContext): Iterator[T] = {
val file=new Path(checkpointPath, CheckpointRDD.splitIdToFile(split.index))
CheckpointRDD.readFromFile(file, broadcastedConf, context) //

 
读取Checkpoint的数据
}
复制代码

 

 


本文转自大数据躺过的坑博客园博客,原文链接:http://www.cnblogs.com/zlslch/p/5723758.html,如需转载请自行联系原作者

相关文章
|
4月前
|
机器学习/深度学习 人工智能 自然语言处理
3 秒音频也能克隆?拆解 Spark-TTS 架构的极致小样本学习
本文深入解析了 Spark-TTS 模型的架构与原理,该模型仅需 3 秒语音样本即可实现高质量的零样本语音克隆。其核心创新在于 BiCodec 单流语音编码架构,将语音信号分解为语义 Token 和全局 Token,实现内容与音色解耦。结合大型语言模型(如 Qwen 2.5),Spark-TTS 能直接生成语义 Token 并还原波形,简化推理流程。实验表明,它不仅能克隆音色、语速和语调,还支持跨语言朗读及情感调整。尽管面临相似度提升、样本鲁棒性等挑战,但其技术突破为定制化 AI 声音提供了全新可能。
361 35
|
10月前
|
存储 分布式计算 并行计算
【赵渝强老师】Spark中的RDD
RDD(弹性分布式数据集)是Spark的核心数据模型,支持分布式并行计算。RDD由分区组成,每个分区由Spark Worker节点处理,具备自动容错、位置感知调度和缓存机制等特性。通过创建RDD,可以指定分区数量,并实现计算函数、依赖关系、分区器和优先位置列表等功能。视频讲解和示例代码进一步详细介绍了RDD的组成和特性。
183 0
|
11月前
|
存储 分布式计算 算法
大数据-106 Spark Graph X 计算学习 案例:1图的基本计算、2连通图算法、3寻找相同的用户
大数据-106 Spark Graph X 计算学习 案例:1图的基本计算、2连通图算法、3寻找相同的用户
184 0
|
7月前
|
分布式计算 Spark
【赵渝强老师】Spark RDD的依赖关系和任务阶段
Spark RDD之间的依赖关系分为窄依赖和宽依赖。窄依赖指父RDD的每个分区最多被一个子RDD分区使用,如map、filter操作;宽依赖则指父RDD的每个分区被多个子RDD分区使用,如分组和某些join操作。窄依赖任务可在同一阶段完成,而宽依赖因Shuffle的存在需划分不同阶段执行。借助Spark Web Console可查看任务的DAG图及阶段划分。
286 15
|
7月前
|
存储 缓存 分布式计算
【赵渝强老师】Spark RDD的缓存机制
Spark RDD通过`persist`或`cache`方法可将计算结果缓存,但并非立即生效,而是在触发action时才缓存到内存中供重用。`cache`方法实际调用了`persist(StorageLevel.MEMORY_ONLY)`。RDD缓存可能因内存不足被删除,建议结合检查点机制保证容错。示例中,读取大文件并多次调用`count`,使用缓存后执行效率显著提升,最后一次计算仅耗时98ms。
156 0
【赵渝强老师】Spark RDD的缓存机制
|
11月前
|
分布式计算 Java 大数据
大数据-92 Spark 集群 SparkRDD 原理 Standalone详解 ShuffleV1V2详解 RDD编程优化
大数据-92 Spark 集群 SparkRDD 原理 Standalone详解 ShuffleV1V2详解 RDD编程优化
128 0
大数据-92 Spark 集群 SparkRDD 原理 Standalone详解 ShuffleV1V2详解 RDD编程优化
|
11月前
|
分布式计算 算法 Spark
spark学习之 GraphX—预测社交圈子
spark学习之 GraphX—预测社交圈子
295 0
|
11月前
|
分布式计算 Scala Spark
educoder的spark算子学习
educoder的spark算子学习
139 0
|
11月前
|
消息中间件 分布式计算 Kafka
大数据-99 Spark 集群 Spark Streaming DStream 文件数据流、Socket、RDD队列流
大数据-99 Spark 集群 Spark Streaming DStream 文件数据流、Socket、RDD队列流
123 0
|
11月前
|
SQL 分布式计算 大数据
大数据-94 Spark 集群 SQL DataFrame & DataSet & RDD 创建与相互转换 SparkSQL
大数据-94 Spark 集群 SQL DataFrame & DataSet & RDD 创建与相互转换 SparkSQL
271 0

热门文章

最新文章