本节书摘来自华章社区《深入理解Spark:核心思想与源码分析》一书中的第3章,第3.13节创建DAGSchedulerSource和BlockManagerSource,作者耿嘉安,更多章节内容可以访问云栖社区“华章社区”公众号查看
3.13 创建DAGSchedulerSource和BlockManagerSource
在创建DAGSchedulerSource、BlockManagerSource之前首先调用taskScheduler的post-StartHook方法,其目的是为了等待backend就绪,见代码清单3-53。postStartHook的实现见代码清单3-54。
创建DAGSchedulerSource和BlockManagerSource的过程类似于ExecutorSource,只不过DAGSchedulerSource测量的信息是stage. failedStages、stage. runningStages、stage. waiting-Stages、stage. allJobs、stage. activeJobs,BlockManagerSource测量的信息是memory. maxMem_MB、memory. remainingMem_MB、memory. memUsed_MB、memory. diskSpace-Used_MB。
代码清单3-53 创建DAGSchedulerSource和BlockManagerSource
taskScheduler.postStartHook()
private val dagSchedulerSource = new DAGSchedulerSource(this.dagScheduler)
private val blockManagerSource = new BlockManagerSource(SparkEnv.get.blockManager)
private def initDriverMetrics() {
SparkEnv.get.metricsSystem.registerSource(dagSchedulerSource)
SparkEnv.get.metricsSystem.registerSource(blockManagerSource)
}
initDriverMetrics()
代码清单3-54 postStartHook的实现
override def postStartHook() {
waitBackendReady()
}
private def waitBackendReady(): Unit = {
if (backend.isReady) {
return
}
while (!backend.isReady) {
synchronized {
this.wait(100)
}
}
}