Spark Source Code Analysis – Executor


ExecutorBackend

A very simple interface; a small usage sketch follows the trait.

package org.apache.spark.executor
/**
 * A pluggable interface used by the Executor to send updates to the cluster scheduler.
 */
private[spark] trait ExecutorBackend {
  def statusUpdate(taskId: Long, state: TaskState, data: ByteBuffer)
}
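
To make the pluggable part concrete, here is a minimal sketch of an implementation that only logs each update. The TaskState stand-in and the LoggingExecutorBackend name are assumptions added just so the sketch compiles on its own; the real trait and TaskState live in org.apache.spark.

import java.nio.ByteBuffer

// Hypothetical stand-ins, only so this sketch is self-contained;
// the real trait and TaskState are defined in Spark itself.
object TaskState extends Enumeration {
  val LAUNCHING, RUNNING, FINISHED, FAILED, KILLED = Value
}
trait ExecutorBackend {
  def statusUpdate(taskId: Long, state: TaskState.Value, data: ByteBuffer)
}

// A backend that only logs status updates -- enough to see the contract:
// the Executor calls statusUpdate, and the backend decides where the
// update goes (to a driver actor in the standalone case below).
class LoggingExecutorBackend extends ExecutorBackend {
  override def statusUpdate(taskId: Long, state: TaskState.Value, data: ByteBuffer) {
    println("task " + taskId + " -> " + state + " (" + data.remaining() + " bytes)")
  }
}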

 

StandaloneExecutorBackend

Holds the executor, and is responsible for registering it with the driver and for the communication between the executor and the driver.

private[spark] class StandaloneExecutorBackend(
    driverUrl: String,
    executorId: String,
    hostPort: String,
    cores: Int)
  extends Actor
  with ExecutorBackend
  with Logging {
  var executor: Executor = null
  var driver: ActorRef = null

  override def preStart() {
    logInfo("Connecting to driver: " + driverUrl)
    driver = context.actorFor(driverUrl) // create an ActorRef for the driver so we can communicate with it
    driver ! RegisterExecutor(executorId, hostPort, cores) // register this executor with the driver
  }

  override def receive = {
    case RegisteredExecutor(sparkProperties) =>
      logInfo("Successfully registered with driver")
      // Make this host instead of hostPort ? 
      executor = new Executor(executorId, Utils.parseHostPort(hostPort)._1, sparkProperties) // once registration succeeds, create the Executor

    case RegisterExecutorFailed(message) =>
      logError("Slave registration failed: " + message)
      System.exit(1)

    case LaunchTask(taskDesc) =>
      logInfo("Got assigned task " + taskDesc.taskId)
      if (executor == null) {
        logError("Received launchTask but executor was null")
        System.exit(1)
      } else {
        executor.launchTask(this, taskDesc.taskId, taskDesc.serializedTask) // call executor.launchTask to start the task
      }

    case Terminated(_) | RemoteClientDisconnected(_, _) | RemoteClientShutdown(_, _) =>
      logError("Driver terminated or disconnected! Shutting down.")
      System.exit(1)
  }

  override def statusUpdate(taskId: Long, state: TaskState, data: ByteBuffer) {
    driver ! StatusUpdate(executorId, taskId, state, data) // when a task's state changes, report it to the driver actor
  }
}
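
For context, the other end of this handshake is an actor inside the driver's standalone scheduler backend. The following is only a rough sketch of what that counterpart does, not the actual Spark code: the message fields are abridged, TaskState is replaced with a String, and DriverActorSketch is a made-up name.

import java.nio.ByteBuffer
import akka.actor.{Actor, ActorRef}
import scala.collection.mutable.HashMap

// Simplified message shapes; the real ones carry more fields.
case class RegisterExecutor(executorId: String, hostPort: String, cores: Int)
case class RegisteredExecutor(sparkProperties: Seq[(String, String)])
case class RegisterExecutorFailed(message: String)
case class StatusUpdate(executorId: String, taskId: Long, state: String, data: ByteBuffer)

// Rough sketch of the driver-side actor that the backend above talks to.
class DriverActorSketch(sparkProperties: Seq[(String, String)]) extends Actor {
  private val executors = HashMap[String, ActorRef]()

  def receive = {
    case RegisterExecutor(executorId, hostPort, cores) =>
      if (executors.contains(executorId)) {
        sender ! RegisterExecutorFailed("Duplicate executor ID: " + executorId)
      } else {
        executors(executorId) = sender               // remember the executor's actor ref
        sender ! RegisteredExecutor(sparkProperties)  // complete the handshake
      }

    case StatusUpdate(executorId, taskId, state, data) =>
      // The real driver hands this to the TaskScheduler; here we just log it.
      println("executor " + executorId + ": task " + taskId + " is " + state)
  }
}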

Executor

The Executor maintains a thread pool and can run multiple tasks at once, depending on the number of cores;
so launchTask simply picks a thread from the pool to run a TaskRunner.

private[spark] class Executor(
    executorId: String,
    slaveHostname: String,
    properties: Seq[(String, String)])
  extends Logging
{
  // Initialize Spark environment (using system properties read above)
  val env = SparkEnv.createFromSystemProperties(executorId, slaveHostname, 0, false, false)
  SparkEnv.set(env)

  // Start worker thread pool
  val threadPool = new ThreadPoolExecutor(
    1, 128, 600, TimeUnit.SECONDS, new SynchronousQueue[Runnable])

  def launchTask(context: ExecutorBackend, taskId: Long, serializedTask: ByteBuffer) {
    threadPool.execute(new TaskRunner(context, taskId, serializedTask))
  }
}
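
Note that the pool itself does not cap concurrency at cores: with a SynchronousQueue and a maximum of 128 threads it behaves much like a cached thread pool, and it is the driver-side scheduler that only assigns an executor up to cores tasks at a time. The self-contained sketch below (ThreadPoolDemo is just an illustrative name) shows how this configuration gives each submitted task its own worker thread.

import java.util.concurrent.{SynchronousQueue, ThreadPoolExecutor, TimeUnit}

// Stand-alone illustration of the pool configuration used above:
// core size 1, at most 128 threads, idle workers die after 600 s, and a
// SynchronousQueue so execute() either reuses an idle worker or spawns
// a new one -- effectively a (bounded) cached thread pool.
object ThreadPoolDemo extends App {
  val threadPool = new ThreadPoolExecutor(
    1, 128, 600, TimeUnit.SECONDS, new SynchronousQueue[Runnable])

  // Submit a few fake "tasks"; each one runs on its own worker thread,
  // just like Executor.launchTask submits one TaskRunner per task.
  for (taskId <- 1 to 3) {
    threadPool.execute(new Runnable {
      override def run() {
        println("task " + taskId + " running on " + Thread.currentThread().getName)
      }
    })
  }
  threadPool.shutdown()
}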

 

TaskRunner

  class TaskRunner(context: ExecutorBackend, taskId: Long, serializedTask: ByteBuffer)
    extends Runnable {

    override def run() {
      try {
        SparkEnv.set(env)
        Accumulators.clear()
        val (taskFiles, taskJars, taskBytes) = Task.deserializeWithDependencies(serializedTask) // deserialize: split out the task's file/jar dependencies and the task bytes
        updateDependencies(taskFiles, taskJars)
        val task = ser.deserialize[Task[Any]](taskBytes, Thread.currentThread.getContextClassLoader) // deserialize the Task itself
        attemptedTask = Some(task)
        logInfo("Its epoch is " + task.epoch)
        env.mapOutputTracker.updateEpoch(task.epoch)
        taskStart = System.currentTimeMillis()
        val value = task.run(taskId.toInt)  // call task.run to execute the real task logic
        val taskFinish = System.currentTimeMillis()
        val accumUpdates = Accumulators.values
        val result = new TaskResult(value, accumUpdates, task.metrics.getOrElse(null)) // build the TaskResult
        val serializedResult = ser.serialize(result) // serialize the TaskResult
        logInfo("Serialized size of result for " + taskId + " is " + serializedResult.limit)
        context.statusUpdate(taskId, TaskState.FINISHED, serializedResult) // report completion and the TaskResult to the driver via statusUpdate
        logInfo("Finished task ID " + taskId)
      } catch { // handle the various failures; these, too, are reported to the driver via statusUpdate
        case ffe: FetchFailedException => {
          val reason = ffe.toTaskEndReason
          context.statusUpdate(taskId, TaskState.FAILED, ser.serialize(reason))
        }

        case t: Throwable => {
          val serviceTime = (System.currentTimeMillis() - taskStart).toInt
          val metrics = attemptedTask.flatMap(t => t.metrics)
          for (m <- metrics) {
            m.executorRunTime = serviceTime
            m.jvmGCTime = getTotalGCTime - startGCTime
          }
          val reason = ExceptionFailure(t.getClass.getName, t.toString, t.getStackTrace, metrics)
          context.statusUpdate(taskId, TaskState.FAILED, ser.serialize(reason))

          // TODO: Should we exit the whole executor here? On the one hand, the failed task may
          // have left some weird state around depending on when the exception was thrown, but on
          // the other hand, maybe we could detect that when future tasks fail and exit then.
          logError("Exception in task ID " + taskId, t)
          //System.exit(1)
        }
      }
    }
  }
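
The ser used above comes from the SparkEnv (Spark's pluggable serializer); the key point is simply that both the result and any failure reason travel back to the driver as a ByteBuffer through the same statusUpdate call. Below is a minimal sketch of that step, using plain Java serialization only to keep it self-contained (ResultSerialization is an illustrative name, not a Spark class).

import java.io.{ByteArrayOutputStream, ObjectOutputStream}
import java.nio.ByteBuffer

// Rough stand-in for "ser.serialize(result)": turn a serializable object
// into the ByteBuffer that statusUpdate expects. Spark goes through its
// own Serializer here; Java serialization keeps this sketch self-contained.
object ResultSerialization {
  def serialize(obj: AnyRef): ByteBuffer = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(obj)
    out.close()
    ByteBuffer.wrap(bytes.toByteArray)
  }
}

// Usage inside a TaskRunner-like flow, for any ExecutorBackend `backend`:
//   val serializedResult = ResultSerialization.serialize(result)
//   backend.statusUpdate(taskId, TaskState.FINISHED, serializedResult)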


This article is excerpted from 博客园 (cnblogs); originally published on 2014-01-07.