调用CoarseGrainedExecutorBackend的生命周期方法以及作用是什么?
CoarseGrainedExecutorBackend的生命周期方法 调用CoarseGrainedExecutorBackend的生命周期方法,在preStart()方法主要代码如下:
//TODO 生命周期方法,首先和Driver建立连接 override def preStart() { logInfo("Connecting to driver: " + driverUrl) driver = context.actorSelection(driverUrl) //TODO Executor向Driver发送消息,注册Executor driver ! RegisterExecutor(executorId, hostPort, cores, extractLogUrls) context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent]) }
这个方法主要是和Driver进行通信,向Driver发送信息,注册Executor我们这里需要看的是DriverActor的代码
//TODO Executor向DriverActor发送的消息 case RegisterExecutor(executorId, hostPort, cores, logUrls) => Utils.checkHostPort(hostPort, "Host port expected " + hostPort) if (executorDataMap.contains(executorId)) { //TODO DriverActor向Executor发送消息,告诉Executor注册失败 sender ! RegisterExecutorFailed("Duplicate executor ID: " + executorId) } else { logInfo("Registered executor: " + sender + " with ID " + executorId) //TODO DriverActor向Executor发送消息,告诉Executor注册成功 sender ! RegisteredExecutor
addressToExecutorId(sender.path.address) = executorId
totalCoreCount.addAndGet(cores)
totalRegisteredExecutors.addAndGet(1)
val (host, _) = Utils.parseHostPort(hostPort)
val data = new ExecutorData(sender, sender.path.address, host, cores, cores, logUrls)
// This must be synchronized because variables mutated
// in this block are read when requesting executors
CoarseGrainedSchedulerBackend.this.synchronized {
executorDataMap.put(executorId, data)
if (numPendingExecutors > 0) {
numPendingExecutors -= 1
logDebug(s"Decremented number of pending executors ($numPendingExecutors left)")
}
}
listenerBus.post(
SparkListenerExecutorAdded(System.currentTimeMillis(), executorId, data))
//TODO 查看是否有任务需要提交(DriverActor->Executor)
makeOffers()
}
这里进行一个判断后,向Executor发送注册成功后,然后调用makeOffers()查看是否有任务需要提交。这里我们首先看DriverActor向Executor 向Executor发送消息,表示注册成功,然后再次查看makeOffers()方法。
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。