深入理解Spark:核心思想与源码分析. 3.11 ContextCleaner的创建与启动

简介:

3.11 ContextCleaner的创建与启动

ContextCleaner用于清理那些超出应用范围的RDD、ShuffleDependency和Broadcast对象。由于配置属性spark.cleaner.referenceTracking默认是true,所以会构造并启动ContextCleaner,代码如下。

private[spark] val cleaner: Option[ContextCleaner] = {

    if (conf.getBoolean("spark.cleaner.referenceTracking", true)) {

        Some(new ContextCleaner(this))

    } else {

        None

    }

}

cleaner.foreach(_.start())

ContextCleaner的组成如下:

referenceQueue:缓存顶级的AnyRef引用;

referenceBuffer:缓存AnyRef的虚引用;

listeners:缓存清理工作的监听器数组;

cleaningThread:用于具体清理工作的线程。

ContextCleaner的工作原理和listenerBus一样,也采用监听器模式,由线程来处理,此线程实际只是调用keepCleaning方法。keepCleaning的实现见代码清单3-48。

代码清单3-48 keep Cleaning的实现

private def keepCleaning(): Unit = Utils.logUncaughtExceptions {

    while (!stopped) {

        try {

            val reference = Option(referenceQueue.remove(ContextCleaner.REF_QUEUE_POLL_TIMEOUT))

                .map(_.asInstanceOf[CleanupTaskWeakReference])

            // Synchronize here to avoid being interrupted on stop()

            synchronized {

                reference.map(_.task).foreach { task =>

                logDebug("Got cleaning task " + task)

                referenceBuffer -= reference.get

                task match {

                    case CleanRDD(rddId) =>

                        doCleanupRDD(rddId, blocking = blockOnCleanupTasks)

                    case CleanShuffle(shuffleId) =>

                        doCleanupShuffle(shuffleId, blocking = blockOnShuffleCleanupTasks)

                    case CleanBroadcast(broadcastId) =>

                        doCleanupBroadcast(broadcastId, blocking = blockOnCleanupTasks)

                    }

                }

            }

        } catch {

            case ie: InterruptedException if stopped => // ignore

            case e: Exception => logError("Error in cleaning thread", e)

        }

    }

}

相关文章
|
分布式计算 Spark
【Spark】【RDD】从本地文件系统创建RDD
【Spark】【RDD】从本地文件系统创建RDD
219 0
【Spark】【RDD】从本地文件系统创建RDD
|
分布式计算 Spark
【Spark】【RDD】从HDFS创建RDD
【Spark】【RDD】从HDFS创建RDD
166 0
【Spark】【RDD】从HDFS创建RDD
|
SQL JSON 分布式计算
Spark SQL DataFrame创建一文详解运用与方法
Spark SQL DataFrame创建一文详解运用与方法
536 0
Spark SQL DataFrame创建一文详解运用与方法
|
分布式计算 Scala Spark
【Spark】【RDD】从内存(集合)创建RDD
【Spark】【RDD】从内存(集合)创建RDD
208 0
|
Apache 分布式计算 Spark
Apache Spark Delta Lake 事务日志实现源码分析
Apache Spark Delta Lake 事务日志实现源码分析 我们已经在这篇文章详细介绍了 Apache Spark Delta Lake 的事务日志是什么、主要用途以及如何工作的。那篇文章已经可以很好地给大家介绍 Delta Lake 的内部工作原理,原子性保证,本文为了学习的目的,带领大家从源码级别来看看 Delta Lake 事务日志的实现。
2268 0
|
分布式计算 Java Shell
Spark源码分析之Spark Shell(上)
终于开始看Spark源码了,先从最常用的spark-shell脚本开始吧。不要觉得一个启动脚本有什么东东,其实里面还是有很多知识点的。另外,从启动脚本入手,是寻找代码入口最简单的方法,很多开源框架,其实都可以通过这种方式来寻找源码入口。
1020 0
Spark2.4.0源码分析之WorldCount 默认shuffling并行度为200(九)
Spark2.4.0源码分析之WorldCount 默认shuffling并行度为200(九)
1378 0
|
分布式计算 Spark 索引
Spark2.4.0源码分析之WorldCount ShuffleMapTask处理(八)
- 理解Executor中是如何调用Task的过程 - 理解ShuffleMapTask是处理过程
1766 0
|
调度 算法
Spark2.4.0源码分析之WorldCount 任务调度器(七)
- 理解TaskSet是如何提交到任务调度器池,任务集如何被调度 - 理解Worker可用资源算法,Worker可用资源分配任务调度池中的任务 - 任务发送给executor去执行
961 0

热门文章

最新文章