本节书摘来自华章社区《深入理解Spark:核心思想与源码分析》一书中的第3章,第3.10节创建和启动ExecutorAllocationManager,作者耿嘉安,更多章节内容可以访问云栖社区“华章社区”公众号查看
3.10 创建和启动ExecutorAllocationManager
ExecutorAllocationManager用于对已分配的Executor进行管理,创建和启动Executor-AllocationManager的代码如下。
private[spark] val executorAllocationManager: Option[ExecutorAllocationManager] =
if (conf.getBoolean("spark.dynamicAllocation.enabled", false)) {
Some(new ExecutorAllocationManager(this, listenerBus, conf))
} else {
None
}
executorAllocationManager.foreach(_.start())
默认情况下不会创建ExecutorAllocationManager,可以修改属性spark.dynamicAllocation.enabled为true来创建。ExecutorAllocationManager可以设置动态分配最小Executor数量、动态分配最大Executor数量、每个Executor可以运行的Task数量等配置信息,并对配置信息进行校验。start方法将ExecutorAllocationListener加入listenerBus中,ExecutorAllocationListener通过监听listenerBus里的事件,动态添加、删除Executor。并且通过Thread不断添加Executor,遍历Executor,将超时的Executor杀掉并移除。ExecutorAllocationListener的实现与其他SparkListener类似,不再赘述。ExecutorAllocationManager的关键代码见代码清单3-47。
代码清单3-47 ExecutorAllocationManager的关键代码
private val intervalMillis: Long = 100
private var clock: Clock = new RealClock
private val listener = new ExecutorAllocationListener
def start(): Unit = {
listenerBus.addListener(listener)
startPolling()
}
private def startPolling(): Unit = {
val t = new Thread {
override def run(): Unit = {
while (true) {
try {
schedule()
} catch {
case e: Exception => logError("Exception in dynamic executor allocation thread!", e)
}
Thread.sleep(intervalMillis)
}
}
}
t.setName("spark-dynamic-executor-allocation")
t.setDaemon(true)
t.start()
}
根据3.4.1节的内容,我们知道listenerBus内置了线程listenerThread,此线程不断从eventQueue中拉出事件对象,调用监听器的监听方法。要启动此线程,需要调用listenerBus的start方法,代码如下。
listenerBus.start()