Spark Mastery (Advanced) — Spark Source Code Reading, Part 10: Standalone Mode Analysis

Spark Standalone uses a Master/Slave architecture. The main classes involved are:

Class: org.apache.spark.deploy.master.Master
Role: responsible for resource scheduling across the whole cluster and for managing Applications.
Message types:
Messages received from the Worker
1. RegisterWorker
2. ExecutorStateChanged
3. WorkerSchedulerStateResponse
4. Heartbeat
5. DriverStateChanged

Messages sent to the Worker
1. RegisteredWorker
2. RegisterWorkerFailed
3. ReconnectWorker
4. KillExecutor
5. LaunchExecutor
6. LaunchDriver
7. KillDriver
8. ApplicationFinished

Messages sent to the AppClient
1. RegisteredApplication
2. ExecutorAdded
3. ExecutorUpdated
4. ApplicationRemoved
Messages received from the AppClient
1. RegisterApplication
2. UnregisterApplication
3. MasterChangeAcknowledged
4. RequestExecutors
5. KillExecutors

Messages sent to the Driver Client
1. SubmitDriverResponse
2. KillDriverResponse
3. DriverStatusResponse

Messages received from the Driver Client
1. RequestSubmitDriver
2. RequestKillDriver
3. RequestDriverStatus
Class: org.apache.spark.deploy.worker.Worker
Role: registers itself with the Master and launches CoarseGrainedExecutorBackend; at runtime it starts Executors to run Tasks.
Message types:
Messages sent to the Master
1. RegisterWorker
2. ExecutorStateChanged
3. WorkerSchedulerStateResponse
4. Heartbeat
5. DriverStateChanged
Messages received from the Master
1. RegisteredWorker
2. RegisterWorkerFailed
3. ReconnectWorker
4. KillExecutor
5. LaunchExecutor
6. LaunchDriver
7. KillDriver
8. ApplicationFinished
Class: org.apache.spark.deploy.client.AppClient.ClientEndpoint
Role: registers the Application with the Master and monitors it; requests or kills Executors, etc.
Message types:
Messages sent to the Master
1. RegisterApplication
2. UnregisterApplication
3. MasterChangeAcknowledged
4. RequestExecutors
5. KillExecutors
Messages received from the Master
1. RegisteredApplication
2. ExecutorAdded
3. ExecutorUpdated
4. ApplicationRemoved
Class: org.apache.spark.scheduler.cluster.DriverEndpoint
Role: registers Executors at runtime, launches Tasks, and handles status updates sent back by Executors.
Message types:
Messages sent to the Executor
1. LaunchTask
2. KillTask
3. RegisteredExecutor
4. RegisterExecutorFailed
Messages received from the Executor
1. RegisterExecutor
2. StatusUpdate
Class: org.apache.spark.deploy.ClientEndpoint
Role: manages Drivers, including submitting a Driver, killing a Driver, and querying a Driver's status.
Messages sent to the Master
1. RequestSubmitDriver
2. RequestKillDriver
3. RequestDriverStatus

Messages received from the Master
1. SubmitDriverResponse
2. KillDriverResponse
3. DriverStatusResponse

All of the classes above extend org.apache.spark.rpc.ThreadSafeRpcEndpoint, whose underlying implementation is currently based on Akka, as shown in the figure below:

[figure omitted]
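
To make the endpoint pattern concrete, here is a minimal sketch of a ThreadSafeRpcEndpoint. This is illustrative code, not Spark source: the EchoEndpoint class and Echo message are made-up names, and the org.apache.spark.rpc package is private to Spark, so this shows the shape of the API rather than something to compile against. ThreadSafeRpcEndpoint guarantees that messages are handled one at a time, so the handlers need no extra synchronization.

import org.apache.spark.rpc.{RpcCallContext, RpcEnv, ThreadSafeRpcEndpoint}

// hypothetical message type, for illustration only
case class Echo(text: String)

class EchoEndpoint(override val rpcEnv: RpcEnv) extends ThreadSafeRpcEndpoint {
  // one-way messages (sent via RpcEndpointRef.send) arrive here
  override def receive: PartialFunction[Any, Unit] = {
    case Echo(text) => println(s"got: $text")
  }
  // request/reply messages (sent via RpcEndpointRef.ask) arrive here
  override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
    case Echo(text) => context.reply(s"echo: $text")
  }
}

// Registration mirrors what AppClient.start() does later in this article:
//   val ref = rpcEnv.setupEndpoint("Echo", new EchoEndpoint(rpcEnv))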

The interactions among these classes are shown in the figure below:
[figure omitted]

1. Interaction between AppClient and Master

When a SparkContext is created, it calls the createTaskScheduler method to create the corresponding TaskScheduler and SchedulerBackend:

 // Create and start the scheduler
    val (sched, ts) = SparkContext.createTaskScheduler(this, master)
    _schedulerBackend = sched
    _taskScheduler = ts
    _dagScheduler = new DAGScheduler(this)
    _heartbeatReceiver.ask[Boolean](TaskSchedulerIsSet)

    // start TaskScheduler after taskScheduler sets DAGScheduler reference in DAGScheduler's
    // constructor
    _taskScheduler.start()
The source for creating the TaskScheduler and SchedulerBackend in Standalone mode is as follows:

/**
   * Create a task scheduler based on a given master URL.
   * Return a 2-tuple of the scheduler backend and the task scheduler.
   */
  private def createTaskScheduler(
      sc: SparkContext,
      master: String): (SchedulerBackend, TaskScheduler) = {

   // other non-essential code omitted
   case SPARK_REGEX(sparkUrl) =>
        val scheduler = new TaskSchedulerImpl(sc)
        val masterUrls = sparkUrl.split(",").map("spark://" + _)
        val backend = new SparkDeploySchedulerBackend(scheduler, sc, masterUrls)
        scheduler.initialize(backend)
        (backend, scheduler)
    // other non-essential code omitted

}

Once the TaskScheduler and SchedulerBackend have been created, the TaskScheduler's start method is called, which in turn starts the SchedulerBackend (SparkDeploySchedulerBackend in Standalone mode):

// the start method in TaskSchedulerImpl
override def start() {
    // call the SchedulerBackend's start method
    backend.start()
    // other non-essential code omitted
  }

The corresponding start method in SparkDeploySchedulerBackend is as follows:

override def start() {
    super.start()

    // other non-essential code omitted
    // Application-related information (application name, executor memory, etc.)
    val appDesc = new ApplicationDescription(sc.appName, maxCores, sc.executorMemory,
      command, appUIAddress, sc.eventLogDir, sc.eventLogCodec, coresPerExecutor)
    // create the AppClient, passing in the corresponding launch parameters
    client = new AppClient(sc.env.rpcEnv, masters, appDesc, this, conf)
    client.start()
    waitForRegistration()
  }

The start method of the AppClient class is as follows:

// AppClient's start method
  def start() {
    // Just launch an rpcEndpoint; it will call back into the listener.
    // ClientEndpoint is an inner class of AppClient;
    // it is AppClient's rpcEndpoint
    endpoint = rpcEnv.setupEndpoint("AppClient", new ClientEndpoint(rpcEnv))
  }
When the ClientEndpoint starts, it registers the Application with the Master:

override def onStart(): Unit = {
      try {
        registerWithMaster(1)
      } catch {
        case e: Exception =>
          logWarning("Failed to connect to master", e)
          markDisconnected()
          stop()
      }
    }

The registerWithMaster method performs the Application registration with the Master; its source is as follows:

/**
     * Register with all masters asynchronously. It will call `registerWithMaster` every
     * REGISTRATION_TIMEOUT_SECONDS seconds until exceeding REGISTRATION_RETRIES times.
     * Once we connect to a master successfully, all scheduling work and Futures will be cancelled.
     *
     * nthRetry means this is the nth attempt to register with master.
     */
    private def registerWithMaster(nthRetry: Int) {
      registerMasterFutures = tryRegisterAllMasters()
      // retry if registration fails
      registrationRetryTimer = registrationRetryThread.scheduleAtFixedRate(new Runnable {
        override def run(): Unit = {
          Utils.tryOrExit {
            if (registered) {
              registerMasterFutures.foreach(_.cancel(true))
              registerMasterThreadPool.shutdownNow()
            } else if (nthRetry >= REGISTRATION_RETRIES) {
              markDead("All masters are unresponsive! Giving up.")
            } else {
              registerMasterFutures.foreach(_.cancel(true))
              registerWithMaster(nthRetry + 1)
            }
          }
        }
      }, REGISTRATION_TIMEOUT_SECONDS, REGISTRATION_TIMEOUT_SECONDS, TimeUnit.SECONDS)
    }

The client registers with all Masters because the Master may be deployed for high availability (HA), e.g. ZooKeeper-based HA, in which case multiple Masters exist; the application's master URL is then a comma-separated list such as spark://master1:7077,master2:7077, which is what the sparkUrl.split(",") in createTaskScheduler above handles. Only the active Master will respond. The source is as follows:

 /**
     *  Register with all masters asynchronously and returns an array `Future`s for cancellation.
     */
    private def tryRegisterAllMasters(): Array[JFuture[_]] = {
      for (masterAddress <- masterRpcAddresses) yield {
        registerMasterThreadPool.submit(new Runnable {
          override def run(): Unit = try {
            if (registered) {
              return
            }
            logInfo("Connecting to master " + masterAddress.toSparkURL + "...")
            // obtain a reference to the Master's rpcEndpoint
            val masterRef =
              rpcEnv.setupEndpointRef(Master.SYSTEM_NAME, masterAddress, Master.ENDPOINT_NAME)
            // send the RegisterApplication message to the Master
            masterRef.send(RegisterApplication(appDescription, self))
          } catch {
            case ie: InterruptedException => // Cancelled
            case NonFatal(e) => logWarning(s"Failed to connect to master $masterAddress", e)
          }
        })
      }
    }

The Master receives the RegisterApplication message from the AppClient; the relevant source is as follows:

// the org.apache.spark.deploy.master.Master.receive method handles the RegisterApplication message sent by the AppClient
override def receive: PartialFunction[Any, Unit] = {

    case RegisterApplication(description, driver) => {
      // TODO Prevent repeated registrations from some driver
      if (state == RecoveryState.STANDBY) {
        // ignore, don't send a response
      } else {
        logInfo("Registering app " + description.name)
        // create the ApplicationInfo
        val app = createApplication(description, driver)
        // register the Application
        registerApplication(app)
        logInfo("Registered app " + description.name + " with ID " + app.id)
        persistenceEngine.addApplication(app)
        // send the RegisteredApplication message back to the AppClient
        driver.send(RegisteredApplication(app.id, self))
        schedule()
      }
    }
    // other cases omitted
}

The AppClient inner class ClientEndpoint receives the RegisteredApplication message sent back by the Master:

 override def receive: PartialFunction[Any, Unit] = {
      case RegisteredApplication(appId_, masterRef) =>
        // FIXME How to handle the following cases?
        // 1. A master receives multiple registrations and sends back multiple
        // RegisteredApplications due to an unstable network.
        // 2. Receive multiple RegisteredApplication from different masters because the master is
        // changing.
        appId = appId_
        registered = true
        master = Some(masterRef)
        listener.connected(appId)
       // other non-essential code omitted
}

This completes the registration of the Application. The other messages exchanged between AppClient and Master are listed below (a short usage sketch follows the list):

//------------------ Messages sent from AppClient to Master ------------------//

  // the AppClient registers the Application with the Master
  case class RegisterApplication(appDescription: ApplicationDescription, driver: RpcEndpointRef)
    extends DeployMessage

 // the AppClient unregisters the Application from the Master
  case class UnregisterApplication(appId: String)

 // after the Master recovers from a failure it sends a MasterChanged message to the AppClient; on receiving it, the AppClient updates its saved Master information and then sends MasterChangeAcknowledged back to the Master
  case class MasterChangeAcknowledged(appId: String)

// request a total of requestedTotal Executors for the Application's execution
  case class RequestExecutors(appId: String, requestedTotal: Int)

// kill the specified Executors of the Application
  case class KillExecutors(appId: String, executorIds: Seq[String])
//------------------ Messages sent from Master to AppClient ------------------//

  // tells the AppClient that the Application registered successfully
  case class RegisteredApplication(appId: String, master: RpcEndpointRef) extends DeployMessage

  // TODO(matei): replace hostPort with host
  // after a Worker launches an Executor, this message notifies the AppClient
  case class ExecutorAdded(id: Int, workerId: String, hostPort: String, cores: Int, memory: Int) {
    Utils.checkHostPort(hostPort, "Required hostport")
  }
   // when an Executor's state is updated, this message notifies the AppClient
  case class ExecutorUpdated(id: Int, state: ExecutorState, message: Option[String],
    exitStatus: Option[Int])

  // when the Application finishes successfully or fails, the Master sends this message to the AppClient;
  // on receiving it, the AppClient stops the Application
  case class ApplicationRemoved(message: String)

   // when the Master changes, it notifies the Workers and AppClients with a MasterChanged message
  case class MasterChanged(master: RpcEndpointRef, masterWebUiUrl: String)
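
As an example of how these messages are used in practice: when the application wants to change its executor count (e.g. under dynamic allocation), the request travels to the active Master via the ask pattern. The following is a simplified sketch of that exchange, not verbatim Spark source; it assumes the master: Option[RpcEndpointRef] and appId fields maintained by ClientEndpoint as shown earlier:

  // inside the AppClient: ask the active Master for a new total number of executors
  def requestTotalExecutors(requestedTotal: Int): Boolean = {
    master match {
      case Some(masterRef) =>
        // blocks until the Master replies with success or failure
        masterRef.askWithRetry[Boolean](RequestExecutors(appId, requestedTotal))
      case None =>
        false // not registered with any Master yet
    }
  }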

2. Interaction between Master and Worker

Only the basic message exchange is given here (with a sketch of the heartbeat handling after the list); a more detailed analysis is left for a later article.

 //------------------ Messages sent from Worker to Master ------------------//

  // registers the Worker with the Master; once registration completes, the Master sends RegisteredWorker to the Worker, which can then be scheduled by the Master
  case class RegisterWorker(
      id: String,
      host: String,
      port: Int,
      worker: RpcEndpointRef,
      cores: Int,
      memory: Int,
      webUiPort: Int,
      publicAddress: String)
    extends DeployMessage {
    Utils.checkHost(host, "Required hostname")
    assert (port > 0)
  }

  // reports an Executor state change to the Master
  case class ExecutorStateChanged(
      appId: String,
      execId: Int,
      state: ExecutorState,
      message: Option[String],
      exitStatus: Option[Int])
    extends DeployMessage

  // reports a Driver state change to the Master
  case class DriverStateChanged(
      driverId: String,
      state: DriverState,
      exception: Option[Exception])
    extends DeployMessage

  // the Worker reports to the Master the Executors and Drivers it is running
  case class WorkerSchedulerStateResponse(id: String, executors: List[ExecutorDescription],
     driverIds: Seq[String])

  // the heartbeat the Worker sends to the Master, mainly to signal that it is alive
  case class Heartbeat(workerId: String, worker: RpcEndpointRef) extends DeployMessage
 //------------------ Messages sent from Master to Worker ------------------//

  // the Worker registers by sending RegisterWorker; on success the Master replies to the Worker with RegisteredWorker
  case class RegisteredWorker(master: RpcEndpointRef, masterWebUiUrl: String) extends DeployMessage

  // the Worker registers by sending RegisterWorker; on failure the Master replies to the Worker with RegisterWorkerFailed
  case class RegisterWorkerFailed(message: String) extends DeployMessage

   // when a Worker's heartbeat times out, the Master sends it a ReconnectWorker message telling the Worker node to re-register
  case class ReconnectWorker(masterUrl: String) extends DeployMessage

  // after an application finishes, the Master sends KillExecutor to the Worker; on receiving it, the Worker kills the Executor with the given execId
  case class KillExecutor(masterUrl: String, appId: String, execId: Int) extends DeployMessage
   // tells the Worker node to launch an Executor
  case class LaunchExecutor(
      masterUrl: String,
      appId: String,
      execId: Int,
      appDesc: ApplicationDescription,
      cores: Int,
      memory: Int)
    extends DeployMessage

  // tells the Worker node to launch a Driver
  case class LaunchDriver(driverId: String, driverDesc: DriverDescription) extends DeployMessage

  // kills the specified Driver
  case class KillDriver(driverId: String) extends DeployMessage

  case class ApplicationFinished(id: String)
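
To illustrate the Heartbeat/ReconnectWorker pair: on the Master side, a heartbeat from a known Worker simply refreshes its last-seen timestamp, while a heartbeat from a Worker the Master no longer knows (e.g. one removed after a heartbeat timeout) triggers a ReconnectWorker reply. A condensed sketch of the Master's handler follows (simplified from the actual receive method, which also distinguishes workers pending recovery):

  case Heartbeat(workerId, worker) => {
    idToWorker.get(workerId) match {
      case Some(workerInfo) =>
        // known Worker: record that it is still alive
        workerInfo.lastHeartbeat = System.currentTimeMillis()
      case None =>
        // unknown Worker: tell it to re-register with the Master
        worker.send(ReconnectWorker(masterUrl))
    }
  }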

3. Message interaction between Driver Client and Master

The Driver Client's job is to manage Drivers: submitting a Driver to the Master, requesting that a Driver be killed, querying a Driver's status, and so on. Its source lives in the Client.scala file in the org.apache.spark.deploy package, and the class name is org.apache.spark.deploy.ClientEndpoint. Note that it is fundamentally different from the org.apache.spark.deploy.client.AppClient.ClientEndpoint class. A sketch of the submission exchange follows the message definitions below.

 //------------------ Messages between Driver Client and Master ------------------//

  // the Driver Client asks the Master to submit a Driver
  case class RequestSubmitDriver(driverDescription: DriverDescription) extends DeployMessage
  // the Master replies to the Driver Client with whether the submission succeeded
  case class SubmitDriverResponse(
      master: RpcEndpointRef, success: Boolean, driverId: Option[String], message: String)
    extends DeployMessage


  // the Driver Client asks the Master to kill a Driver
  case class RequestKillDriver(driverId: String) extends DeployMessage
  // the Master replies with whether the Driver was killed successfully
  case class KillDriverResponse(
      master: RpcEndpointRef, driverId: String, success: Boolean, message: String)
    extends DeployMessage


  // the Driver Client asks the Master for a Driver's status
  case class RequestDriverStatus(driverId: String) extends DeployMessage
  // the Master returns the requested status information to the Driver Client
  case class DriverStatusResponse(found: Boolean, state: Option[DriverState],
    workerId: Option[String], workerHostPort: Option[String], exception: Option[Exception])
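
Putting these messages together, submitting a Driver in cluster deploy mode boils down to the Driver Client asking the Master with RequestSubmitDriver and inspecting the SubmitDriverResponse. A hedged sketch of the exchange (masterRef and driverDescription are assumed to be already constructed; the real ClientEndpoint additionally handles multiple masters and forwards replies asynchronously):

  // ask the Master to launch the Driver, then check the reply
  val response = masterRef.askWithRetry[SubmitDriverResponse](
    RequestSubmitDriver(driverDescription))
  if (response.success) {
    logInfo(s"Driver successfully submitted as ${response.driverId.get}")
  } else {
    logError(s"Driver submission failed: ${response.message}")
  }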

4. Message interaction between Driver and Executor

//------------------ Messages sent from Driver to Executor ------------------//
  // launch a Task
  case class LaunchTask(data: SerializableBuffer) extends CoarseGrainedClusterMessage
  // kill a Task
  case class KillTask(taskId: Long, executor: String, interruptThread: Boolean)
    extends CoarseGrainedClusterMessage
 // the Executor was registered successfully
case object RegisteredExecutor extends CoarseGrainedClusterMessage
 // the Executor registration failed
case class RegisterExecutorFailed(message: String) extends CoarseGrainedClusterMessage



//------------------ Messages sent from Executor to Driver ------------------//
 // register the Executor with the Driver
 case class RegisterExecutor(
      executorId: String,
      executorRef: RpcEndpointRef,
      hostPort: String,
      cores: Int,
      logUrls: Map[String, String])
    extends CoarseGrainedClusterMessage {
    Utils.checkHostPort(hostPort, "Expected host port")
  }

 // report a status change to the Driver
 case class StatusUpdate(executorId: String, taskId: Long, state: TaskState,
    data: SerializableBuffer) extends CoarseGrainedClusterMessage

  object StatusUpdate {
    /** Alternate factory method that takes a ByteBuffer directly for the data field */
    def apply(executorId: String, taskId: Long, state: TaskState, data: ByteBuffer)
      : StatusUpdate = {
      StatusUpdate(executorId, taskId, state, new SerializableBuffer(data))
    }
  }
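
The ByteBuffer-based apply in the companion object above exists so the executor side can report a task's serialized result directly. Below is a minimal sketch of how an executor backend might report a finished task to the DriverEndpoint; driverRef, executorId and the result buffer are assumed to be in scope, and the real CoarseGrainedExecutorBackend does this inside its statusUpdate method:

  import java.nio.ByteBuffer
  import org.apache.spark.TaskState

  def reportFinished(taskId: Long, serializedResult: ByteBuffer): Unit = {
    // uses the ByteBuffer factory method defined above
    driverRef.send(StatusUpdate(executorId, taskId, TaskState.FINISHED, serializedResult))
  }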

Author: Zhou Zhihu
Online alias: 摇摆少年梦
