开发者社区> 问答> 正文

调用CoarseGrainedExecutorBackend的生命周期方法以及作用是什么?

调用CoarseGrainedExecutorBackend的生命周期方法以及作用是什么?

展开
收起
游客fbdr25iajcjto 2021-12-06 22:16:39 595 0
1 条回答
写回答
取消 提交回答
  • CoarseGrainedExecutorBackend的生命周期方法 调用CoarseGrainedExecutorBackend的生命周期方法,在preStart()方法主要代码如下:

    //TODO 生命周期方法,首先和Driver建立连接 override def preStart() { logInfo("Connecting to driver: " + driverUrl) driver = context.actorSelection(driverUrl) //TODO Executor向Driver发送消息,注册Executor driver ! RegisterExecutor(executorId, hostPort, cores, extractLogUrls) context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent]) }

    这个方法主要是和Driver进行通信,向Driver发送信息,注册Executor我们这里需要看的是DriverActor的代码

    //TODO Executor向DriverActor发送的消息 case RegisterExecutor(executorId, hostPort, cores, logUrls) => Utils.checkHostPort(hostPort, "Host port expected " + hostPort) if (executorDataMap.contains(executorId)) { //TODO DriverActor向Executor发送消息,告诉Executor注册失败 sender ! RegisterExecutorFailed("Duplicate executor ID: " + executorId) } else { logInfo("Registered executor: " + sender + " with ID " + executorId) //TODO DriverActor向Executor发送消息,告诉Executor注册成功 sender ! RegisteredExecutor

          addressToExecutorId(sender.path.address) = executorId
          totalCoreCount.addAndGet(cores)
          totalRegisteredExecutors.addAndGet(1)
          val (host, _) = Utils.parseHostPort(hostPort)
          val data = new ExecutorData(sender, sender.path.address, host, cores, cores, logUrls)
          // This must be synchronized because variables mutated
          // in this block are read when requesting executors
          CoarseGrainedSchedulerBackend.this.synchronized {
            executorDataMap.put(executorId, data)
            if (numPendingExecutors > 0) {
              numPendingExecutors -= 1
              logDebug(s"Decremented number of pending executors ($numPendingExecutors left)")
            }
          }
          listenerBus.post(
            SparkListenerExecutorAdded(System.currentTimeMillis(), executorId, data))
          //TODO 查看是否有任务需要提交(DriverActor->Executor)
          makeOffers()
        }
    

    这里进行一个判断后,向Executor发送注册成功后,然后调用makeOffers()查看是否有任务需要提交。这里我们首先看DriverActor向Executor 向Executor发送消息,表示注册成功,然后再次查看makeOffers()方法。

    2021-12-06 22:17:35
    赞同 展开评论 打赏
问答地址:
问答排行榜
最热
最新

相关电子书

更多
建立联系方法之一 立即下载
对象的生命期管理 立即下载
fibjs 模块重构从回调到协程--陈垒 立即下载