《深入理解Spark:核心思想与源码分析》——3.10节创建和启动ExecutorAllocationManager

简介:

本节书摘来自华章社区《深入理解Spark:核心思想与源码分析》一书中的第3章,第3.10节创建和启动ExecutorAllocationManager,作者耿嘉安,更多章节内容可以访问云栖社区“华章社区”公众号查看

3.10 创建和启动ExecutorAllocationManager

ExecutorAllocationManager用于对已分配的Executor进行管理,创建和启动Executor-AllocationManager的代码如下。
private[spark] val executorAllocationManager: Option[ExecutorAllocationManager] =
    if (conf.getBoolean("spark.dynamicAllocation.enabled", false)) {
        Some(new ExecutorAllocationManager(this, listenerBus, conf))
    } else {
        None
    }
executorAllocationManager.foreach(_.start())

默认情况下不会创建ExecutorAllocationManager,可以修改属性spark.dynamicAllocation.enabled为true来创建。ExecutorAllocationManager可以设置动态分配最小Executor数量、动态分配最大Executor数量、每个Executor可以运行的Task数量等配置信息,并对配置信息进行校验。start方法将ExecutorAllocationListener加入listenerBus中,ExecutorAllocationListener通过监听listenerBus里的事件,动态添加、删除Executor。并且通过Thread不断添加Executor,遍历Executor,将超时的Executor杀掉并移除。ExecutorAllocationListener的实现与其他SparkListener类似,不再赘述。ExecutorAllocationManager的关键代码见代码清单3-47。
代码清单3-47 ExecutorAllocationManager的关键代码

private val intervalMillis: Long = 100
private var clock: Clock = new RealClock
private val listener = new ExecutorAllocationListener
def start(): Unit = {
    listenerBus.addListener(listener)
    startPolling()
}

private def startPolling(): Unit = {
    val t = new Thread {
        override def run(): Unit = {
            while (true) {
                try {
                    schedule()
                } catch {
                    case e: Exception => logError("Exception in dynamic executor allocation thread!", e)
                }
                Thread.sleep(intervalMillis)
            }
        }
    }
    t.setName("spark-dynamic-executor-allocation")
    t.setDaemon(true)
    t.start()
}

根据3.4.1节的内容,我们知道listenerBus内置了线程listenerThread,此线程不断从eventQueue中拉出事件对象,调用监听器的监听方法。要启动此线程,需要调用listenerBus的start方法,代码如下。

listenerBus.start()
相关文章
|
分布式计算 Spark
【Spark】【RDD】从本地文件系统创建RDD
【Spark】【RDD】从本地文件系统创建RDD
151 0
【Spark】【RDD】从本地文件系统创建RDD
|
分布式计算 Spark
【Spark】【RDD】从HDFS创建RDD
【Spark】【RDD】从HDFS创建RDD
115 0
【Spark】【RDD】从HDFS创建RDD
|
SQL JSON 分布式计算
Spark SQL DataFrame创建一文详解运用与方法
Spark SQL DataFrame创建一文详解运用与方法
394 0
Spark SQL DataFrame创建一文详解运用与方法
|
分布式计算 Scala Spark
【Spark】【RDD】从内存(集合)创建RDD
【Spark】【RDD】从内存(集合)创建RDD
165 0
|
Apache 分布式计算 Spark
Apache Spark Delta Lake 事务日志实现源码分析
Apache Spark Delta Lake 事务日志实现源码分析 我们已经在这篇文章详细介绍了 Apache Spark Delta Lake 的事务日志是什么、主要用途以及如何工作的。那篇文章已经可以很好地给大家介绍 Delta Lake 的内部工作原理,原子性保证,本文为了学习的目的,带领大家从源码级别来看看 Delta Lake 事务日志的实现。
2041 0
|
分布式计算 Java Shell
Spark源码分析之Spark Shell(上)
终于开始看Spark源码了,先从最常用的spark-shell脚本开始吧。不要觉得一个启动脚本有什么东东,其实里面还是有很多知识点的。另外,从启动脚本入手,是寻找代码入口最简单的方法,很多开源框架,其实都可以通过这种方式来寻找源码入口。
941 0
Spark2.4.0源码分析之WorldCount 默认shuffling并行度为200(九)
Spark2.4.0源码分析之WorldCount 默认shuffling并行度为200(九)
1292 0
|
分布式计算 Spark 索引
Spark2.4.0源码分析之WorldCount ShuffleMapTask处理(八)
- 理解Executor中是如何调用Task的过程 - 理解ShuffleMapTask是处理过程
1627 0
|
调度 算法
Spark2.4.0源码分析之WorldCount 任务调度器(七)
- 理解TaskSet是如何提交到任务调度器池,任务集如何被调度 - 理解Worker可用资源算法,Worker可用资源分配任务调度池中的任务 - 任务发送给executor去执行
899 0