《深入理解Spark:核心思想与源码分析》——3.3节创建metadataCleaner

简介:

本节书摘来自华章社区《深入理解Spark:核心思想与源码分析》一书中的第3章,第3.3节创建metadataCleaner,作者耿嘉安,更多章节内容可以访问云栖社区“华章社区”公众号查看

3.3 创建metadataCleaner
SparkContext为了保持对所有持久化的RDD的跟踪,使用类型是TimeStamped-WeakValueHashMap的persistentRdds缓存。metadataCleaner的功能是清除过期的持久化RDD。创建metadataCleaner的代码如下。

private[spark] val persistentRdds = new TimeStampedWeakValueHashMap[Int, RDD[_]]
private[spark] val metadataCleaner =
    new MetadataCleaner(MetadataCleanerType.SPARK_CONTEXT, this.cleanup, conf)
我们仔细看看MetadataCleaner的实现,见代码清单3-14。
代码清单3-14 MetadataCleaner的实现
private[spark] class MetadataCleaner(
        cleanerType: MetadataCleanerType.MetadataCleanerType,
        cleanupFunc: (Long) => Unit,
        conf: SparkConf)
    extends Logging
{
    val name = cleanerType.toString

    private val delaySeconds = MetadataCleaner.getDelaySeconds(conf, cleanerType)
    private val periodSeconds = math.max(10, delaySeconds / 10)
    private val timer = new Timer(name + " cleanup timer", true)

    private val task = new TimerTask {
        override def run() {
        try {
            cleanupFunc(System.currentTimeMillis() - (delaySeconds * 1000))
            logInfo("Ran metadata cleaner for " + name)
        } catch {
            case e: Exception => logError("Error running cleanup task for " + name, e)
        }
      }
    }

    if (delaySeconds > 0) {
        timer.schedule(task, delaySeconds * 1000, periodSeconds * 1000)
    }

    def cancel() {
        timer.cancel()
    }
}
从MetadataCleaner的实现可以看出其实质是一个用TimerTask实现的定时器,不断调用cleanupFunc: (Long) => Unit这样的函数参数。构造metadataCleaner时的函数参数是cleanup,用于清理persistentRdds中的过期内容,代码如下。
private[spark] def cleanup(cleanupTime: Long) {
    persistentRdds.clearOldValues(cleanupTime)
}
相关文章
|
分布式计算 Scala Spark
【Spark】【RDD】从内存(集合)创建RDD
【Spark】【RDD】从内存(集合)创建RDD
113 0
|
分布式计算 Spark
【Spark】【RDD】从本地文件系统创建RDD
【Spark】【RDD】从本地文件系统创建RDD
103 0
【Spark】【RDD】从本地文件系统创建RDD
|
分布式计算 Spark
【Spark】【RDD】从HDFS创建RDD
【Spark】【RDD】从HDFS创建RDD
85 0
【Spark】【RDD】从HDFS创建RDD
|
SQL JSON 分布式计算
Spark SQL DataFrame创建一文详解运用与方法
Spark SQL DataFrame创建一文详解运用与方法
316 0
Spark SQL DataFrame创建一文详解运用与方法
|
Apache 分布式计算 Spark
Apache Spark Delta Lake 事务日志实现源码分析
Apache Spark Delta Lake 事务日志实现源码分析 我们已经在这篇文章详细介绍了 Apache Spark Delta Lake 的事务日志是什么、主要用途以及如何工作的。那篇文章已经可以很好地给大家介绍 Delta Lake 的内部工作原理,原子性保证,本文为了学习的目的,带领大家从源码级别来看看 Delta Lake 事务日志的实现。
1962 0
|
分布式计算 Java Shell
Spark源码分析之Spark Shell(上)
终于开始看Spark源码了,先从最常用的spark-shell脚本开始吧。不要觉得一个启动脚本有什么东东,其实里面还是有很多知识点的。另外,从启动脚本入手,是寻找代码入口最简单的方法,很多开源框架,其实都可以通过这种方式来寻找源码入口。
901 0
|
分布式计算 Spark Hadoop
Spark MapOutputTracker源码分析
## 技能标签 - Spark ShuffleMapTask处理完成后,把MapStatus数据(BlockManagerId,[compressSize])发送给MapOutputTrackerMaster.
1663 0
|
分布式计算 搜索推荐 Spark
Spark 源码分析之ShuffleMapTask内存数据Spill和合并
- Spark ShuffleMapTask 内存中的数据Spill到临时文件 - 临时文件中的数据是如何定入的,如何按partition升序排序,再按Key升序排序写入(key,value)数据 - 每个临时文件,都存入对应的每个分区有多少个(key,value)对,有多少次流提交数组,数组中...
1778 0
|
分布式计算 Scala Spark
Spark源码分析之ResultTask处理
ResultTask 执行当前分区的计算,首先从ShuffleMapTask拿到当前分区的数据,会从所有的ShuffleMapTask都拿一遍当前的分区数据,然后调用reduceByKey自定义的函数进行计算,最后合并所有的ResultTask输出结果,进行输出
2275 0