《深入理解Spark:核心思想与源码分析》——3.11节ContextCleaner的创建与启动

简介:

本节书摘来自华章社区《深入理解Spark:核心思想与源码分析》一书中的第3章,第3.11节ContextCleaner的创建与启动,作者耿嘉安,更多章节内容可以访问云栖社区“华章社区”公众号查看

3.11 ContextCleaner的创建与启动
ContextCleaner用于清理那些超出应用范围的RDD、ShuffleDependency和Broadcast对象。由于配置属性spark.cleaner.referenceTracking默认是true,所以会构造并启动ContextCleaner,代码如下。

private[spark] val cleaner: Option[ContextCleaner] = {
    if (conf.getBoolean("spark.cleaner.referenceTracking", true)) {
        Some(new ContextCleaner(this))
    } else {
        None
    }
}
cleaner.foreach(_.start())
``
ContextCleaner的组成如下:
referenceQueue:缓存顶级的AnyRef引用;
referenceBuffer:缓存AnyRef的虚引用;
listeners:缓存清理工作的监听器数组;
cleaningThread:用于具体清理工作的线程。
ContextCleaner的工作原理和listenerBus一样,也采用监听器模式,由线程来处理,此线程实际只是调用keepCleaning方法。keepCleaning的实现见代码清单3-48。
代码清单3-48 keep Cleaning的实现

private def keepCleaning(): Unit = Utils.logUncaughtExceptions {

while (!stopped) {
    try {
        val reference = Option(referenceQueue.remove(ContextCleaner.REF_QUEUE_POLL_TIMEOUT))
            .map(_.asInstanceOf[CleanupTaskWeakReference])
        // Synchronize here to avoid being interrupted on stop()
        synchronized {
            reference.map(_.task).foreach { task =>
            logDebug("Got cleaning task " + task)
            referenceBuffer -= reference.get
            task match {
                case CleanRDD(rddId) =>
                    doCleanupRDD(rddId, blocking = blockOnCleanupTasks)
                case CleanShuffle(shuffleId) =>
                    doCleanupShuffle(shuffleId, blocking = blockOnShuffleCleanupTasks)
                case CleanBroadcast(broadcastId) =>
                    doCleanupBroadcast(broadcastId, blocking = blockOnCleanupTasks)
                }
            }
        }
    } catch {
        case ie: InterruptedException if stopped => // ignore
        case e: Exception => logError("Error in cleaning thread", e)
    }
}

}

相关文章
|
分布式计算 Spark
【Spark】【RDD】从本地文件系统创建RDD
【Spark】【RDD】从本地文件系统创建RDD
156 0
【Spark】【RDD】从本地文件系统创建RDD
|
分布式计算 Spark
【Spark】【RDD】从HDFS创建RDD
【Spark】【RDD】从HDFS创建RDD
117 0
【Spark】【RDD】从HDFS创建RDD
|
SQL JSON 分布式计算
Spark SQL DataFrame创建一文详解运用与方法
Spark SQL DataFrame创建一文详解运用与方法
398 0
Spark SQL DataFrame创建一文详解运用与方法
|
分布式计算 Scala Spark
【Spark】【RDD】从内存(集合)创建RDD
【Spark】【RDD】从内存(集合)创建RDD
167 0
|
Apache 分布式计算 Spark
Apache Spark Delta Lake 事务日志实现源码分析
Apache Spark Delta Lake 事务日志实现源码分析 我们已经在这篇文章详细介绍了 Apache Spark Delta Lake 的事务日志是什么、主要用途以及如何工作的。那篇文章已经可以很好地给大家介绍 Delta Lake 的内部工作原理,原子性保证,本文为了学习的目的,带领大家从源码级别来看看 Delta Lake 事务日志的实现。
2050 0
|
分布式计算 Java Shell
Spark源码分析之Spark Shell(上)
终于开始看Spark源码了,先从最常用的spark-shell脚本开始吧。不要觉得一个启动脚本有什么东东,其实里面还是有很多知识点的。另外,从启动脚本入手,是寻找代码入口最简单的方法,很多开源框架,其实都可以通过这种方式来寻找源码入口。
943 0
Spark2.4.0源码分析之WorldCount 默认shuffling并行度为200(九)
Spark2.4.0源码分析之WorldCount 默认shuffling并行度为200(九)
1296 0
|
分布式计算 Spark 索引
Spark2.4.0源码分析之WorldCount ShuffleMapTask处理(八)
- 理解Executor中是如何调用Task的过程 - 理解ShuffleMapTask是处理过程
1632 0
|
调度 算法
Spark2.4.0源码分析之WorldCount 任务调度器(七)
- 理解TaskSet是如何提交到任务调度器池,任务集如何被调度 - 理解Worker可用资源算法,Worker可用资源分配任务调度池中的任务 - 任务发送给executor去执行
905 0