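《深入理解Spark:核心思想与源码分析》 (Understanding Spark in Depth: Core Ideas and Source Code Analysis), Section 3.8: Starting the TaskScheduler

Summary:

This excerpt is Section 3.8, "Starting the TaskScheduler", from Chapter 3 of the book 《深入理解Spark:核心思想与源码分析》 by Geng Jia'an (耿嘉安), published through the Huazhang community. More chapters are available through the "Huazhang community" (华章社区) official account on the Yunqi community.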

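3.8 Starting the TaskScheduler
Section 3.6 described how the task scheduler, TaskScheduler, is created. For the TaskScheduler to do any work it must be started, as follows.
taskScheduler.start()
When the TaskScheduler starts, it actually calls the start method of its backend.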

override def start() {
    backend.start()
}

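Take LocalBackend as an example: when LocalBackend starts, it registers a LocalActor with the actorSystem, as shown in Listing 3-30.
3.8.1 Creating the LocalActor
Creating the LocalActor mainly consists of building the local Executor, as shown in Listing 3-36.
Listing 3-36 Implementation of LocalActor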

private[spark] class LocalActor(scheduler: TaskSchedulerImpl, executorBackend: LocalBackend,
    private val totalCores: Int) extends Actor with ActorLogReceive with Logging {
    import context.dispatcher   // to use Akka's scheduler.scheduleOnce()
    private var freeCores = totalCores
    private val localExecutorId = SparkContext.DRIVER_IDENTIFIER
    private val localExecutorHostname = "localhost"

    val executor = new Executor(
        localExecutorId, localExecutorHostname, scheduler.conf.getAll, totalCores, isLocal = true)

    override def receiveWithLogging = {
        case ReviveOffers =>
            reviveOffers()

        case StatusUpdate(taskId, state, serializedData) =>
            scheduler.statusUpdate(taskId, state, serializedData)
            if (TaskState.isFinished(state)) {
                freeCores += scheduler.CPUS_PER_TASK
                reviveOffers()
            }

        case KillTask(taskId, interruptThread) =>
            executor.killTask(taskId, interruptThread)

        case StopExecutor =>
            executor.stop()
    }

}
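
The core bookkeeping in LocalActor is the freeCores counter: a finished task returns its cores and triggers another round of offers. The sketch below models only that accounting, without Akka or a real Executor. The names LocalCoreTracker, submit, and the simplified StatusUpdate are hypothetical, and the body of reviveOffers is an assumption, since the listing does not show it.

```scala
import scala.collection.mutable

// Hypothetical, simplified stand-ins for the messages handled by LocalActor.
sealed trait LocalMessage
case object ReviveOffers extends LocalMessage
final case class StatusUpdate(taskId: Long, finished: Boolean) extends LocalMessage

// Models only the freeCores accounting of the listing.
class LocalCoreTracker(totalCores: Int, cpusPerTask: Int) {
  private var freeCores = totalCores
  private val pending = mutable.Queue.empty[Long]
  val running = mutable.Set.empty[Long]

  def free: Int = freeCores
  def submit(taskId: Long): Unit = pending.enqueue(taskId)

  // Assumption: launch pending tasks while enough cores remain.
  private def reviveOffers(): Unit =
    while (pending.nonEmpty && freeCores >= cpusPerTask) {
      freeCores -= cpusPerTask
      running += pending.dequeue()
    }

  def receive(msg: LocalMessage): Unit = msg match {
    case ReviveOffers => reviveOffers()
    case StatusUpdate(taskId, finished) =>
      // A finished task frees its cores, then offers are revived.
      if (finished && running.remove(taskId)) {
        freeCores += cpusPerTask
        reviveOffers()
      }
  }
}
```

With two cores and three submitted tasks, the third task starts only after a StatusUpdate reports one of the first two as finished.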

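The construction of the Executor is shown in Listing 3-37 and consists mainly of the following steps.
1) Create and register the ExecutorSource. What the ExecutorSource does is covered in detail in Section 3.8.2.
2) Obtain the SparkEnv. In non-local mode, a new SparkEnv must be created when the CoarseGrainedExecutorBackend on a Worker registers its Executor with the CoarseGrainedSchedulerBackend on the Driver. The property spark.executor.port (default 0, meaning a randomly chosen port) configures the port of the ActorSystem inside the Executor.
3) Create and register the ExecutorActor. The ExecutorActor receives the messages sent to the Executor.
4) Create the urlClassLoader. Why is this ClassLoader needed? In non-local mode, a Driver or Worker may host several Executors. Each Executor sets up its own urlClassLoader to load classes from the jars uploaded with its tasks, which isolates the class-loading environment of those tasks.
5) Create the thread pool in which the Executor runs Tasks.
6) Start the Executor's heartbeat thread, which sends heartbeats to the Driver.
In addition, the constructor sets the frame size for Akka messages (10 485 760 bytes, i.e. 10 MB), the limit on the total size of results (1 073 741 824 bytes, i.e. 1 GB), the list of running tasks, and the serializer's default ClassLoader, which is set to the newly created ClassLoader.
Listing 3-37 Construction of the Executor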

    val executorSource = new ExecutorSource(this, executorId)

    private val env = {
        if (!isLocal) {
            val port = conf.getInt("spark.executor.port", 0)
            val _env = SparkEnv.createExecutorEnv(
                conf, executorId, executorHostname, port, numCores, isLocal, actorSystem)
            SparkEnv.set(_env)
            _env.metricsSystem.registerSource(executorSource)
            _env.blockManager.initialize(conf.getAppId)
            _env
        } else {
            SparkEnv.get
        }
    }

    private val executorActor = env.actorSystem.actorOf(
        Props(new ExecutorActor(executorId)), "ExecutorActor")

    private val urlClassLoader = createClassLoader()
    private val replClassLoader = addReplClassLoaderIfNeeded(urlClassLoader)
    env.serializer.setDefaultClassLoader(urlClassLoader)

    private val akkaFrameSize = AkkaUtils.maxFrameSizeBytes(conf)
    private val maxResultSize = Utils.getMaxResultSize(conf)

    val threadPool = Utils.newDaemonCachedThreadPool("Executor task launch worker")
    private val runningTasks = new ConcurrentHashMap[Long, TaskRunner]
    startDriverHeartbeater()
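
The task thread pool in the listing comes from Utils.newDaemonCachedThreadPool, whose body is not shown in this section. The following is a minimal sketch of what such a helper could look like with plain java.util.concurrent. The object name DaemonPools and the "prefix-n" naming scheme are assumptions, not Spark's actual implementation.

```scala
import java.util.concurrent.{Executors, ThreadFactory, ThreadPoolExecutor}
import java.util.concurrent.atomic.AtomicInteger

object DaemonPools {
  // Hypothetical helper: a cached pool whose threads are daemons named "<prefix>-<n>".
  def newDaemonCachedThreadPool(prefix: String): ThreadPoolExecutor = {
    val counter = new AtomicInteger(0)
    val factory = new ThreadFactory {
      override def newThread(r: Runnable): Thread = {
        val t = new Thread(r, prefix + "-" + counter.getAndIncrement())
        t.setDaemon(true) // daemon threads do not keep the JVM alive
        t
      }
    }
    // The cast mirrors the listing's need for ThreadPoolExecutor getters
    // such as getActiveCount, which ExecutorSource reads.
    Executors.newCachedThreadPool(factory).asInstanceOf[ThreadPoolExecutor]
  }
}
```

Returning a ThreadPoolExecutor rather than a plain ExecutorService is what lets the gauges in Section 3.8.2 query the pool's active count and sizes.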

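3.8.2 Creating and Registering the ExecutorSource
ExecutorSource feeds the metrics system. It registers gauges through the register method of metricRegistry. These gauges include threadpool.activeTasks, threadpool.completeTasks, threadpool.currentPool_size, threadpool.maxPool_size, filesystem.hdfs.write_bytes, filesystem.hdfs.read_ops, filesystem.file.write_bytes, filesystem.hdfs.largeRead_ops, filesystem.hdfs.write_ops, and others. The implementation of ExecutorSource is shown in Listing 3-38. For the concrete implementations of the Metric interface, see Appendix D.
Listing 3-38 Implementation of ExecutorSource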

private[spark] class ExecutorSource(val executor: Executor, executorId: String) extends Source {
    private def fileStats(scheme: String) : Option[FileSystem.Statistics] =
        FileSystem.getAllStatistics().filter(s => s.getScheme.equals(scheme)).headOption

    private def registerFileSystemStat[T](
            scheme: String, name: String, f: FileSystem.Statistics => T, defaultValue: T) = {
        metricRegistry.register(MetricRegistry.name("filesystem", scheme, name), new Gauge[T] {
            override def getValue: T = fileStats(scheme).map(f).getOrElse (defaultValue)
        })
    }
    override val metricRegistry = new MetricRegistry()
    override val sourceName = "executor"

    metricRegistry.register(MetricRegistry.name("threadpool", "activeTasks"), new Gauge[Int] {
        override def getValue: Int = executor.threadPool.getActiveCount()
    })
    metricRegistry.register(MetricRegistry.name("threadpool", "completeTasks"), new Gauge[Long] {
        override def getValue: Long = executor.threadPool.getCompletedTaskCount()
    })
    metricRegistry.register(MetricRegistry.name("threadpool", "currentPool_size"), new Gauge[Int] {
        override def getValue: Int = executor.threadPool.getPoolSize()
    })
    metricRegistry.register(MetricRegistry.name("threadpool", "maxPool_size"), new Gauge[Int] {
        override def getValue: Int = executor.threadPool.getMaximumPoolSize()
    })

    // Gauge for file system stats of this executor
    for (scheme <- Array("hdfs", "file")) {
        registerFileSystemStat(scheme, "read_bytes", _.getBytesRead(), 0L)
        registerFileSystemStat(scheme, "write_bytes", _.getBytesWritten(), 0L)
        registerFileSystemStat(scheme, "read_ops", _.getReadOps(), 0)
        registerFileSystemStat(scheme, "largeRead_ops", _.getLargeReadOps(), 0)
        registerFileSystemStat(scheme, "write_ops", _.getWriteOps(), 0)
    }
}
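
To make the gauge pattern concrete without pulling in the metrics library, the sketch below uses hypothetical stand-ins (SimpleGauge, SimpleRegistry, MetricNames, PoolSource) for Gauge, MetricRegistry, and ExecutorSource. It assumes that names are joined with dots, which matches the metric names listed above, and it registers only two of the thread pool gauges.

```scala
import java.util.concurrent.ThreadPoolExecutor
import scala.collection.mutable

// Hypothetical stand-ins for the metrics library types used in the listing.
trait SimpleGauge[T] { def getValue: T }

class SimpleRegistry {
  private val gauges = mutable.LinkedHashMap.empty[String, SimpleGauge[_]]
  def register(name: String, gauge: SimpleGauge[_]): Unit = {
    require(!gauges.contains(name), "A metric named " + name + " already exists")
    gauges(name) = gauge
  }
  def names: Seq[String] = gauges.keys.toSeq
  // Gauges are read lazily: the value is computed at query time.
  def value(name: String): Any = gauges(name).getValue
}

object MetricNames {
  // Assumption: name parts are joined with ".", e.g. "threadpool.activeTasks".
  def name(first: String, rest: String*): String = (first +: rest).mkString(".")
}

// Simplified analogue of ExecutorSource, limited to two thread pool gauges.
class PoolSource(pool: ThreadPoolExecutor) {
  val registry = new SimpleRegistry
  registry.register(MetricNames.name("threadpool", "activeTasks"),
    new SimpleGauge[Int] { def getValue: Int = pool.getActiveCount })
  registry.register(MetricNames.name("threadpool", "maxPool_size"),
    new SimpleGauge[Int] { def getValue: Int = pool.getMaximumPoolSize })
}
```

Because a gauge wraps a function rather than a stored number, the registry always reports the pool's state at the moment it is polled.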

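Once the ExecutorSource has been created, the registerSource method of MetricsSystem is called to register it with the MetricsSystem. registerSource uses the register method of MetricRegistry to register the Source with the MetricRegistry, as shown in Listing 3-39. For more on MetricRegistry, see Appendix D.
Listing 3-39 How MetricsSystem registers a Source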

def registerSource(source: Source) {
    sources += source
    try {
        val regName = buildRegistryName(source)
        registry.register(regName, source.metricRegistry)
    } catch {
        case e: IllegalArgumentException => logInfo("Metrics already registered", e)
    }
}
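
One detail of registerSource is easy to miss: the source is appended to sources before the registry call, so a duplicate name is logged and skipped in the registry but still recorded in sources. The sketch below reproduces that control flow with hypothetical types (NameRegistry, SourceRegistrar). A counter stands in for logInfo.

```scala
import scala.collection.mutable

// Hypothetical registry that rejects duplicate names with IllegalArgumentException,
// which is the failure the catch clause in the listing expects.
class NameRegistry {
  private val entries = mutable.Map.empty[String, AnyRef]
  def register(name: String, metrics: AnyRef): Unit = {
    if (entries.contains(name)) {
      throw new IllegalArgumentException("A metric named " + name + " already exists")
    }
    entries(name) = metrics
  }
  def size: Int = entries.size
}

class SourceRegistrar(registry: NameRegistry) {
  val sources = mutable.ArrayBuffer.empty[String]
  var duplicateWarnings = 0 // stands in for logInfo("Metrics already registered", e)

  def registerSource(sourceName: String, metrics: AnyRef): Unit = {
    sources += sourceName // recorded first, as in the listing
    try registry.register(sourceName, metrics)
    catch { case _: IllegalArgumentException => duplicateWarnings += 1 }
  }
}
```

Registering the same name twice therefore leaves one registry entry, two entries in sources, and one logged warning.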

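3.8.3 Building and Registering the ExecutorActor
The ExecutorActor is very simple: when it receives a message from the SparkUI, it sends back the stack traces of all threads. The implementation is as follows.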

override def receiveWithLogging = {
    case TriggerThreadDump =>
        sender ! Utils.getThreadDump()
}
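
The body of Utils.getThreadDump is not shown in this section. As a rough idea of how such a dump can be collected on the JVM, the sketch below uses the standard ThreadMXBean. The ThreadTrace case class and the ThreadDumps object are hypothetical names, and the result shape is an assumption.

```scala
import java.lang.management.ManagementFactory

// Hypothetical result type: one entry per live thread.
final case class ThreadTrace(id: Long, name: String, state: Thread.State, stack: String)

object ThreadDumps {
  def getThreadDump(): Seq[ThreadTrace] = {
    // dumpAllThreads(lockedMonitors, lockedSynchronizers) returns info for all live threads.
    val infos = ManagementFactory.getThreadMXBean.dumpAllThreads(true, true)
    infos.toSeq
      .filter(_ != null) // a thread may terminate while the dump is taken
      .sortBy(_.getThreadId)
      .map { info =>
        ThreadTrace(info.getThreadId, info.getThreadName, info.getThreadState,
          info.getStackTrace.mkString("\n"))
      }
  }
}
```

An actor that replies with the result of such a call gives the UI a point-in-time snapshot, so no state has to be kept between requests.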
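
3.8.4 Creating Spark's Own ClassLoader
First obtain currentLoader, the parent loader of the ClassLoader to be created, and then build an array of URLs from currentJars. The property spark.files.userClassPathFirst specifies whether classes are loaded from the user's classpath first. Finally, either an ExecutorURLClassLoader or a ChildExecutorURLClassLoader is created, as shown in Listing 3-40.
Listing 3-40 Creating Spark's own ClassLoader
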
private def createClassLoader(): MutableURLClassLoader = {
    val currentLoader = Utils.getContextOrSparkClassLoader

    val urls = currentJars.keySet.map { uri =>
        new File(uri.split("/").last).toURI.toURL
    }.toArray
    val userClassPathFirst = conf.getBoolean("spark.files.userClassPathFirst", false)
    userClassPathFirst match {
        case true => new ChildExecutorURLClassLoader(urls, currentLoader)
        case false => new ExecutorURLClassLoader(urls, currentLoader)
    }
}

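The implementation of Utils.getContextOrSparkClassLoader is given in Appendix A. Both ExecutorURLClassLoader and ChildExecutorURLClassLoader are built on URLClassLoader: ExecutorURLClassLoader extends it directly, while ChildExecutorURLClassLoader wraps an inner URLClassLoader, as shown in Listing 3-41.
Listing 3-41 Implementation of ChildExecutorURLClassLoader and ExecutorURLClassLoader
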
private[spark] class ChildExecutorURLClassLoader(urls: Array[URL], parent: ClassLoader)
    extends MutableURLClassLoader {

    private object userClassLoader extends URLClassLoader(urls, null) {
        override def addURL(url: URL) {
            super.addURL(url)
        }
        override def findClass(name: String): Class[_] = {
            super.findClass(name)
        }
    }

    private val parentClassLoader = new ParentClassLoader(parent)

    override def findClass(name: String): Class[_] = {
        try {
            userClassLoader.findClass(name)
        } catch {
            case e: ClassNotFoundException => {
                parentClassLoader.loadClass(name)
            }
        }
    }

    def addURL(url: URL) {
        userClassLoader.addURL(url)
    }

    def getURLs() = {
        userClassLoader.getURLs()
    }
}

private[spark] class ExecutorURLClassLoader(urls: Array[URL], parent: ClassLoader)
    extends URLClassLoader(urls, parent) with MutableURLClassLoader {

    override def addURL(url: URL) {
        super.addURL(url)
    }
}

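
The child-first lookup order of ChildExecutorURLClassLoader can be tried out in isolation. The sketch below is a simplified, self-contained variant: ChildFirstLoader is a hypothetical name, an anonymous ClassLoader replaces ParentClassLoader, and the class extends ClassLoader(null) so that only bootstrap classes bypass the user URLs. That last point is a choice made for this sketch, not something the listing shows.

```scala
import java.net.{URL, URLClassLoader}

class ChildFirstLoader(urls: Array[URL], parent: ClassLoader) extends ClassLoader(null) {

  // Searches only the user-supplied URLs; the overrides widen protected methods to public.
  private object userLoader extends URLClassLoader(urls, null) {
    override def addURL(url: URL): Unit = super.addURL(url)
    override def findClass(name: String): Class[_] = super.findClass(name)
  }

  // Wraps the real parent so that it is consulted only as a fallback.
  private val parentLoader = new ClassLoader(parent) {}

  override def findClass(name: String): Class[_] =
    try userLoader.findClass(name) // user jars first
    catch { case _: ClassNotFoundException => parentLoader.loadClass(name) }

  def addURL(url: URL): Unit = userLoader.addURL(url)
  def getURLs: Array[URL] = userLoader.getURLs
}
```

With an empty URL array every lookup falls through to the parent, so loading scala.Option returns the very same Class object that the parent loader provides. Once a jar containing a class of the same name is added via addURL, the user's copy would win instead.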
If REPL interaction is required, addReplClassLoaderIfNeeded is also called to create the replClassLoader, as shown in Listing 3-42.
Listing 3-42 Implementation of addReplClassLoaderIfNeeded

private def addReplClassLoaderIfNeeded(parent: ClassLoader): ClassLoader = {
    val classUri = conf.get("spark.repl.class.uri", null)
    if (classUri != null) {
        logInfo("Using REPL class URI: " + classUri)
        val userClassPathFirst: java.lang.Boolean =
            conf.getBoolean("spark.files.userClassPathFirst", false)
        try {
            val klass = Class.forName("org.apache.spark.repl.ExecutorClassLoader")
                .asInstanceOf[Class[_ <: ClassLoader]]
            val constructor = klass.getConstructor(classOf[SparkConf], classOf[String],
                classOf[ClassLoader], classOf[Boolean])
            constructor.newInstance(conf, classUri, parent, userClassPathFirst)
        } catch {
            case _: ClassNotFoundException =>
                logError("Could not find org.apache.spark.repl.ExecutorClassLoader on classpath!")
                System.exit(1)
                null
        }
    } else {
        parent
    }
}
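
addReplClassLoaderIfNeeded instantiates its loader reflectively, presumably so that the core module has no compile-time dependency on the REPL module. The same pattern is sketched below with a class that ships with the JDK. OptionalLoaders and wrapIfAvailable are hypothetical names, the target class is assumed to have an (Array[URL], ClassLoader) constructor, and, unlike the listing, the sketch falls back to the parent instead of calling System.exit(1).

```scala
import java.net.URL

object OptionalLoaders {
  // Builds `className` reflectively; returns `parent` unchanged when the class is absent.
  def wrapIfAvailable(className: String, parent: ClassLoader): ClassLoader =
    try {
      val klass = Class.forName(className).asInstanceOf[Class[_ <: ClassLoader]]
      // Look up the constructor by its parameter types, as the listing does.
      val ctor = klass.getConstructor(classOf[Array[URL]], classOf[ClassLoader])
      ctor.newInstance(Array.empty[URL], parent)
    } catch {
      case _: ClassNotFoundException => parent // assumption: degrade instead of exiting
    }
}
```

For example, wrapIfAvailable("java.net.URLClassLoader", parent) yields a URLClassLoader whose parent is the given loader, while an unknown class name returns parent itself.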

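3.8.5 Starting the Executor's Heartbeat Thread
The Executor's heartbeat is started by startDriverHeartbeater, as shown in Listing 3-43. The heartbeat interval is configured by the property spark.executor.heartbeatInterval and defaults to 10 000 milliseconds. In addition, the timeout is 30 seconds, the number of retries is 3, and the retry interval is 3000 milliseconds. The actorSystem.actorSelection(url) method is used to look up the matching Actor reference, where url is akka.tcp://sparkDriver@$driverHost:$driverPort/user/HeartbeatReceiver. Finally, a thread is created that first sleeps for a random 10 000 to 20 000 milliseconds and then sleeps for 10 000 milliseconds after each heartbeat. On each iteration the thread collects the latest metrics of the Tasks in runningTasks, wraps them together with the executorId and blockManagerId in a Heartbeat message, and sends that message to the HeartbeatReceiver.
Listing 3-43 Starting the Executor's heartbeat thread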

def startDriverHeartbeater() {
    val interval = conf.getInt("spark.executor.heartbeatInterval", 10000)
    val timeout = AkkaUtils.lookupTimeout(conf)
    val retryAttempts = AkkaUtils.numRetries(conf)
    val retryIntervalMs = AkkaUtils.retryWaitMs(conf)
    val heartbeatReceiverRef = AkkaUtils.makeDriverRef("HeartbeatReceiver", conf, env.actorSystem)
    val t = new Thread() {
        override def run() {
            // Sleep a random interval so the heartbeats don't end up in sync
            Thread.sleep(interval + (math.random * interval).asInstanceOf[Int])
            while (!isStopped) {
                val tasksMetrics = new ArrayBuffer[(Long, TaskMetrics)]()
                val curGCTime = gcTime
                for (taskRunner <- runningTasks.values()) {
                    if (!taskRunner.attemptedTask.isEmpty) {
                        Option(taskRunner.task).flatMap(_.metrics).foreach { metrics =>
                            metrics.updateShuffleReadMetrics
                            metrics.jvmGCTime = curGCTime - taskRunner.startGCTime
                            if (isLocal) {
                                val copiedMetrics = Utils.deserialize[TaskMetrics](Utils.serialize(metrics))
                                tasksMetrics += ((taskRunner.taskId, copiedMetrics))
                            } else {
                                // It will be copied by serialization
                                tasksMetrics += ((taskRunner.taskId, metrics))
                            }
                        }
                    }
                }
                val message = Heartbeat(executorId, tasksMetrics.toArray, env.blockManager.blockManagerId)
                try {
                    val response = AkkaUtils.askWithReply[HeartbeatResponse](message, heartbeatReceiverRef,
                        retryAttempts, retryIntervalMs, timeout)
                    if (response.reregisterBlockManager) {
                        logWarning("Told to re-register on heartbeat")
                        env.blockManager.reregister()
                    }
                } catch {
                    case NonFatal(t) => logWarning("Issue communicating with driver in heartbeater", t)
                }

                Thread.sleep(interval)
            }
        }
    }
    t.setDaemon(true)
    t.setName("Driver Heartbeater")
    t.start()
}
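
The timing behaviour of the listing (one jittered initial sleep, then a fixed interval) can be isolated from Akka and the metrics collection. In the sketch below, Heartbeats, initialDelayMs, and the beat callback are hypothetical names, and a swallowed NonFatal exception stands in for the logWarning call.

```scala
import java.util.concurrent.atomic.AtomicBoolean
import scala.util.control.NonFatal

object Heartbeats {
  // First sleep: the interval plus a random share of it, i.e. [interval, 2 * interval).
  def initialDelayMs(intervalMs: Int, random: Double): Int =
    intervalMs + (random * intervalMs).toInt

  // Starts a daemon thread that calls `beat` every `intervalMs` after the jittered first sleep.
  def start(intervalMs: Int, stopped: AtomicBoolean)(beat: () => Unit): Thread = {
    val t = new Thread(new Runnable {
      override def run(): Unit = {
        Thread.sleep(initialDelayMs(intervalMs, scala.util.Random.nextDouble()).toLong)
        while (!stopped.get()) {
          // A failed heartbeat must not kill the loop; the listing logs a warning here.
          try beat() catch { case NonFatal(_) => () }
          Thread.sleep(intervalMs.toLong)
        }
      }
    })
    t.setDaemon(true)
    t.setName("Driver Heartbeater")
    t.start()
    t
  }
}
```

With the default interval of 10 000 ms, initialDelayMs gives 10 000 ms for a random draw of 0.0 and 15 000 ms for 0.5. The jitter keeps Executors that start together from sending their heartbeats in lockstep.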

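What is this heartbeat thread for? It serves two purposes:
1) updating the metrics of the tasks that are currently running;
2) telling the BlockManagerMaster that the BlockManager on this Executor is still alive.
The implementation of the heartbeat is analyzed in detail below. Readers may skip this part if they wish.
After TaskSchedulerImpl is initialized, the heartbeat receiver HeartbeatReceiver is created. HeartbeatReceiver receives the heartbeats of all Executors assigned to the current Driver Application and hands the Tasks, Task metrics, and heartbeats to TaskSchedulerImpl and DAGScheduler for further processing. The heartbeat receiver is created as follows.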

private val heartbeatReceiver = env.actorSystem.actorOf(
    Props(new HeartbeatReceiver(taskScheduler)), "HeartbeatReceiver")

After receiving a heartbeat message, HeartbeatReceiver calls the executorHeartbeatReceived method of the TaskScheduler, as follows.

override def receiveWithLogging = {
    case Heartbeat(executorId, taskMetrics, blockManagerId) =>
        val response = HeartbeatResponse(
            !scheduler.executorHeartbeatReceived(executorId, taskMetrics, blockManagerId))
        sender ! response
}

executorHeartbeatReceived is implemented as follows.

val metricsWithStageIds: Array[(Long, Int, Int, TaskMetrics)] = synchronized {
    taskMetrics.flatMap { case (id, metrics) =>
        taskIdToTaskSetId.get(id)
            .flatMap(activeTaskSets.get)
            .map(taskSetMgr => (id, taskSetMgr.stageId, taskSetMgr.taskSet.attempt, metrics))
    }
}
dagScheduler.executorHeartbeatReceived(execId, metricsWithStageIds, blockManagerId)
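
The flatMap over two Option lookups in executorHeartbeatReceived silently drops metrics for tasks whose task set is unknown or no longer active. The sketch below shows that behaviour with simplified, hypothetical types: TaskSetInfo replaces TaskSetManager, a type parameter M replaces TaskMetrics, and Seq replaces Array.

```scala
// Hypothetical simplified stand-in for TaskSetManager: only the two fields that are read.
final case class TaskSetInfo(stageId: Int, attempt: Int)

object HeartbeatJoin {
  def withStageIds[M](
      taskMetrics: Seq[(Long, M)],
      taskIdToTaskSetId: Map[Long, String],
      activeTaskSets: Map[String, TaskSetInfo]): Seq[(Long, Int, Int, M)] =
    taskMetrics.flatMap { case (id, metrics) =>
      taskIdToTaskSetId.get(id)       // None if the task id is unknown
        .flatMap(activeTaskSets.get)  // None if its task set is not active
        .map(ts => (id, ts.stageId, ts.attempt, metrics))
    }
}
```

Given metrics for tasks 1, 2, and 3, where only task 1 maps to an active task set, the result contains a single tuple for task 1. The other two entries disappear without raising an error.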
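The executorHeartbeatReceived code iterates over taskMetrics and uses taskIdToTaskSetId and activeTaskSets to find the TaskSetManager of each task. It then packs taskId, TaskSetManager.stageId, TaskSetManager.taskSet.attempt, and TaskMetrics into metricsWithStageIds, an array of type Array[(Long, Int, Int, TaskMetrics)]. Finally it calls the executorHeartbeatReceived method of dagScheduler, which is implemented as follows.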
listenerBus.post(SparkListenerExecutorMetricsUpdate(execId, taskMetrics))
implicit val timeout = Timeout(600 seconds)

Await.result(
    blockManagerMaster.driverActor ? BlockManagerHeartbeat(blockManagerId),
    timeout.duration).asInstanceOf[Boolean]

dagScheduler wraps executorId and metricsWithStageIds in a SparkListenerExecutorMetricsUpdate event and posts it to the listenerBus. This event is used to update the various metrics of a Stage. Finally, it sends a BlockManagerHeartbeat message to the BlockManagerMasterActor held by the BlockManagerMaster. On receiving the message, the BlockManagerMasterActor matches it and runs the heartbeatReceived method (see Section 4.3.1). heartbeatReceived ultimately updates the time at which the BlockManagerMaster last saw the BlockManager, that is, the _lastSeenMs of the BlockManagerInfo that corresponds to the BlockManagerId (see Listing 3-44).
Listing 3-44 Heartbeat handling in BlockManagerMasterActor
private def heartbeatReceived(blockManagerId: BlockManagerId): Boolean = {
    if (!blockManagerInfo.contains(blockManagerId)) {
        blockManagerId.isDriver && !isLocal
    } else {
        blockManagerInfo(blockManagerId).updateLastSeenMs()
        true
    }
}
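
The Boolean returned by heartbeatReceived travels back up the call chain, and HeartbeatReceiver negates it to fill HeartbeatResponse. An unknown BlockManager is therefore told to re-register. The sketch below models both ends with hypothetical types (BmId, BlockManagerTracker) and an injected clock. It is an illustration of the branch structure, not the actual BlockManagerMasterActor.

```scala
import scala.collection.mutable

// Hypothetical stand-in for BlockManagerId.
final case class BmId(executorId: String, isDriver: Boolean)

class BlockManagerTracker(isLocal: Boolean, clock: () => Long) {
  private val lastSeenMs = mutable.Map.empty[BmId, Long]

  def register(id: BmId): Unit = { lastSeenMs(id) = clock() }
  def lastSeen(id: BmId): Option[Long] = lastSeenMs.get(id)

  // Same branch structure as heartbeatReceived in the listing.
  def heartbeatReceived(id: BmId): Boolean =
    if (!lastSeenMs.contains(id)) {
      id.isDriver && !isLocal
    } else {
      lastSeenMs(id) = clock() // counterpart of updateLastSeenMs()
      true
    }

  // The value HeartbeatReceiver places in HeartbeatResponse.
  def reregisterBlockManager(id: BmId): Boolean = !heartbeatReceived(id)
}
```

An executor that has never registered gets reregisterBlockManager = true. After registration, each heartbeat only refreshes its last-seen timestamp.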

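The heartbeat communication of an Executor in local mode is illustrated in Figure 3-3.
In non-local mode, the Executor sends heartbeats in the same way. The main difference is that the Executor does not run in the same process as the Driver, and may not even run on the same node.
Next, the block manager BlockManager is initialized, with the following code.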

<div style="text-align: center">
 <img src="https://yqfile.alicdn.com/93f63c658f44effe7c306f7f257d9daf7ab51f0d.png" >
</div>