深入理解Spark:核心思想与源码分析. 3.3 创建metadataCleaner

简介:

3.3 创建metadataCleaner

SparkContext为了保持对所有持久化的RDD的跟踪,使用类型是TimeStamped-WeakValueHashMap的persistentRdds缓存。metadataCleaner的功能是清除过期的持久化RDD。创建metadataCleaner的代码如下。

private[spark] val persistentRdds = new TimeStampedWeakValueHashMap[Int, RDD[_]]

private[spark] val metadataCleaner =

    new MetadataCleaner(MetadataCleanerType.SPARK_CONTEXT, this.cleanup, conf)

我们仔细看看MetadataCleaner的实现,见代码清单3-14。

代码清单3-14 MetadataCleaner的实现

private[spark] class MetadataCleaner(

        cleanerType: MetadataCleanerType.MetadataCleanerType,

        cleanupFunc: (Long) => Unit,

        conf: SparkConf)

    extends Logging

{

    val name = cleanerType.toString

 

    private val delaySeconds = MetadataCleaner.getDelaySeconds(conf, cleanerType)

    private val periodSeconds = math.max(10, delaySeconds / 10)

    private val timer = new Timer(name + " cleanup timer", true)

 

    private val task = new TimerTask {

        override def run() {

        try {

            cleanupFunc(System.currentTimeMillis() - (delaySeconds * 1000))

            logInfo("Ran metadata cleaner for " + name)

        } catch {

            case e: Exception => logError("Error running cleanup task for " + name, e)

        }

      }

    }

 

    if (delaySeconds > 0) {

        timer.schedule(task, delaySeconds * 1000, periodSeconds * 1000)

    }

 

    def cancel() {

        timer.cancel()

    }

}

从MetadataCleaner的实现可以看出其实质是一个用TimerTask实现的定时器,不断调用cleanupFunc: (Long) => Unit这样的函数参数。构造metadataCleaner时的函数参数是cleanup,用于清理persistentRdds中的过期内容,代码如下。

private[spark] def cleanup(cleanupTime: Long) {

    persistentRdds.clearOldValues(cleanupTime)

}

相关文章
|
分布式计算 Spark
【Spark】【RDD】从本地文件系统创建RDD
【Spark】【RDD】从本地文件系统创建RDD
194 0
【Spark】【RDD】从本地文件系统创建RDD
|
分布式计算 Spark
【Spark】【RDD】从HDFS创建RDD
【Spark】【RDD】从HDFS创建RDD
151 0
【Spark】【RDD】从HDFS创建RDD
|
SQL JSON 分布式计算
Spark SQL DataFrame创建一文详解运用与方法
Spark SQL DataFrame创建一文详解运用与方法
514 0
Spark SQL DataFrame创建一文详解运用与方法
|
分布式计算 Scala Spark
【Spark】【RDD】从内存(集合)创建RDD
【Spark】【RDD】从内存(集合)创建RDD
195 0
|
Apache 分布式计算 Spark
Apache Spark Delta Lake 事务日志实现源码分析
Apache Spark Delta Lake 事务日志实现源码分析 我们已经在这篇文章详细介绍了 Apache Spark Delta Lake 的事务日志是什么、主要用途以及如何工作的。那篇文章已经可以很好地给大家介绍 Delta Lake 的内部工作原理,原子性保证,本文为了学习的目的,带领大家从源码级别来看看 Delta Lake 事务日志的实现。
2201 0
|
分布式计算 Java Shell
Spark源码分析之Spark Shell(上)
终于开始看Spark源码了,先从最常用的spark-shell脚本开始吧。不要觉得一个启动脚本有什么东东,其实里面还是有很多知识点的。另外,从启动脚本入手,是寻找代码入口最简单的方法,很多开源框架,其实都可以通过这种方式来寻找源码入口。
977 0
Spark2.4.0源码分析之WorldCount 默认shuffling并行度为200(九)
Spark2.4.0源码分析之WorldCount 默认shuffling并行度为200(九)
1337 0
|
分布式计算 Spark 索引
Spark2.4.0源码分析之WorldCount ShuffleMapTask处理(八)
- 理解Executor中是如何调用Task的过程 - 理解ShuffleMapTask是处理过程
1711 0
|
调度 算法
Spark2.4.0源码分析之WorldCount 任务调度器(七)
- 理解TaskSet是如何提交到任务调度器池,任务集如何被调度 - 理解Worker可用资源算法,Worker可用资源分配任务调度池中的任务 - 任务发送给executor去执行
941 0

热门文章

最新文章