Spark修炼之道(高级篇)——Spark源码阅读:第十节 Standalone运行模式解析

简介: Spark Standalone采用的是Master/Slave架构,主要涉及到的类包括:类:org.apache.spark.deploy.master.Master说明:负责整个集群的资源调度及Application的管理。消息类型:接收Worker发送的消息1. RegisterWorker2. ExecutorStateChanged3. Work

Spark Standalone采用的是Master/Slave架构,主要涉及到的类包括:

类:org.apache.spark.deploy.master.Master
说明:负责整个集群的资源调度及Application的管理。
消息类型:
接收Worker发送的消息
1. RegisterWorker
2. ExecutorStateChanged
3. WorkerSchedulerStateResponse
4. Heartbeat

向Worker发送的消息
1. RegisteredWorker
2. RegisterWorkerFailed
3. ReconnectWorker
4. KillExecutor
5.LaunchExecutor
6.LaunchDriver
7.KillDriver
8.ApplicationFinished

向AppClient发送的消息
1. RegisteredApplication
2. ExecutorAdded
3. ExecutorUpdated
4. ApplicationRemoved
接收AppClient发送的消息
1. RegisterApplication
2. UnregisterApplication
3. MasterChangeAcknowledged
4. RequestExecutors
5. KillExecutors

向Driver Client发送的消息
1.SubmitDriverResponse
2.KillDriverResponse
3.DriverStatusResponse

接收Driver Client发送的消息
1.RequestSubmitDriver 
2.RequestKillDriver         
3.RequestDriverStatus
类org.apache.spark.deploy.worker.Worker
说明:向Master注册自己并启动CoarseGrainedExecutorBackend,在运行时启动Executor运行Task任务
消息类型:
向Master发送的消息
1. RegisterWorker
2. ExecutorStateChanged
3. WorkerSchedulerStateResponse
4. Heartbeat
接收Master发送的消息
1. RegisteredWorker
2. RegisterWorkerFailed
3. ReconnectWorker
4. KillExecutor
5.LaunchExecutor
6.LaunchDriver
7.KillDriver
8.ApplicationFinished
类org.apache.spark.deploy.client.AppClient.ClientEndpoint
说明:向Master注册并监控Application,请求或杀死Executors等
消息类型:
向Master发送的消息
1. RegisterApplication
2. UnregisterApplication
3. MasterChangeAcknowledged
4. RequestExecutors
5. KillExecutors
接收Master发送的消息
1. RegisteredApplication
2. ExecutorAdded
3. ExecutorUpdated
4. ApplicationRemoved
类:org.apache.spark.scheduler.cluster.DriverEndpoint
说明:运行时注册Executor并启动Task的运行并处理Executor发送来的状态更新等
消息类型:
向Executor发送的消息
1.LaunchTask
2.KillTask
3.RegisteredExecutor
4.RegisterExecutorFailed
接收Executor发送的消息
1.RegisterExecutor
2.StatusUpdate
类:org.apache.spark.deploy.ClientEndpoint 
说明:管理Driver包括提交Driver、Kill掉Driver及获取Driver状态信息
向Master发送的消息
1.RequestSubmitDriver 
2.RequestKillDriver         
3.RequestDriverStatus

接收Master 发送的消息
1.SubmitDriverResponse
2.KillDriverResponse
3.DriverStatusResponse

上面所有的类都继承自org.apache.spark.rpc.ThreadSafeRpcEndpoint,其底层实现目前都是通过AKKA来实现的,具体如下图所示:

这里写图片描述

各类之间的交互关系如下图所示:
这里写图片描述

1. AppClient与Master间的交互

SparkContext在创建时,会调用,createTaskScheduler方法创建相应的TaskScheduler及SchedulerBackend

 // Create and start the scheduler
    val (sched, ts) = SparkContext.createTaskScheduler(this, master)
    _schedulerBackend = sched
    _taskScheduler = ts
    _dagScheduler = new DAGScheduler(this)
    _heartbeatReceiver.ask[Boolean](TaskSchedulerIsSet)

    // start TaskScheduler after taskScheduler sets DAGScheduler reference in DAGScheduler‘s
    // constructor
    _taskScheduler.start()
Standalone运行模式创建的TaskScheduler及SchedulerBackend具体源码如下:

/**
   * Create a task scheduler based on a given master URL.
   * Return a 2-tuple of the scheduler backend and the task scheduler.
   */
  private def createTaskScheduler(
      sc: SparkContext,
      master: String): (SchedulerBackend, TaskScheduler) = {

   //省略其它非关键代码
   case SPARK_REGEX(sparkUrl) =>
        val scheduler = new TaskSchedulerImpl(sc)
        val masterUrls = sparkUrl.split(",").map("spark://" + _)
        val backend = new SparkDeploySchedulerBackend(scheduler, sc, masterUrls)
        scheduler.initialize(backend)
        (backend, scheduler)
    //省略其它非关键代码

}

创建完成TaskScheduler及SchedulerBackend后,调用TaskScheduler的start方法,启动SchedulerBackend(Standalone模式对应SparkDeploySchedulerBackend)

//TaskSchedulerImpl中的start方法 
override def start() {
    //调用SchedulerBackend的start方法 
    backend.start()
    //省略其它非关键代码
  }

对应SparkDeploySchedulerBackend中的start方法源码如下:

override def start() {
    super.start()

    //省略其它非关键代码
    //Application相关信息(包括应用程序名、executor运行内存等)
    val appDesc = new ApplicationDescription(sc.appName, maxCores, sc.executorMemory,
      command, appUIAddress, sc.eventLogDir, sc.eventLogCodec, coresPerExecutor)
    //创建AppClient,传入相应启动参数
    client = new AppClient(sc.env.rpcEnv, masters, appDesc, this, conf)
    client.start()
    waitForRegistration()
  }

AppClient类中的start方法原码如下:

//AppClient Start方法
  def start() {
    // Just launch an rpcEndpoint; it will call back into the listener.
    //ClientEndpoint,该ClientEndpoint为AppClient的内部类
    //它是AppClient的rpcEndpoint
    endpoint = rpcEnv.setupEndpoint("AppClient", new ClientEndpoint(rpcEnv))
  }
ClientEndpoint在启动时,会向Master注册Application

override def onStart(): Unit = {
      try {
        registerWithMaster(1)
      } catch {
        case e: Exception =>
          logWarning("Failed to connect to master", e)
          markDisconnected()
          stop()
      }
    }

registerWithMaster方法便是向Master注册Application,其源码如下:

/**
     * Register with all masters asynchronously. It will call `registerWithMaster` every
     * REGISTRATION_TIMEOUT_SECONDS seconds until exceeding REGISTRATION_RETRIES times.
     * Once we connect to a master successfully, all scheduling work and Futures will be cancelled.
     *
     * nthRetry means this is the nth attempt to register with master.
     */
    private def registerWithMaster(nthRetry: Int) {
      registerMasterFutures = tryRegisterAllMasters()
      //注册失败时重试
      registrationRetryTimer = registrationRetryThread.scheduleAtFixedRate(new Runnable {
        override def run(): Unit = {
          Utils.tryOrExit {
            if (registered) {
              registerMasterFutures.foreach(_.cancel(true))
              registerMasterThreadPool.shutdownNow()
            } else if (nthRetry >= REGISTRATION_RETRIES) {
              markDead("All masters are unresponsive! Giving up.")
            } else {
              registerMasterFutures.foreach(_.cancel(true))
              registerWithMaster(nthRetry + 1)
            }
          }
        }
      }, REGISTRATION_TIMEOUT_SECONDS, REGISTRATION_TIMEOUT_SECONDS, TimeUnit.SECONDS)
    }

向所有Masters注册,因为Master可能实现了高可靠(HA),例如ZooKeeper的HA方式,所以存在多个Master,但最终只有Active Master响应,具体源码如下:

 /**
     *  Register with all masters asynchronously and returns an array `Future`s for cancellation.
     */
    private def tryRegisterAllMasters(): Array[JFuture[_]] = {
      for (masterAddress <- masterRpcAddresses) yield {
        registerMasterThreadPool.submit(new Runnable {
          override def run(): Unit = try {
            if (registered) {
              return
            }
            logInfo("Connecting to master " + masterAddress.toSparkURL + "...")
            //获取Master rpcEndpoint
            val masterRef =
              rpcEnv.setupEndpointRef(Master.SYSTEM_NAME, masterAddress, Master.ENDPOINT_NAME)
            //向Master发送RegisterApplication信息            masterRef.send(RegisterApplication(appDescription, self))
          } catch {
            case ie: InterruptedException => // Cancelled
            case NonFatal(e) => logWarning(s"Failed to connect to master $masterAddress", e)
          }
        })
      }
    }

Master会接收来自AppClient的RegisterApplication消息,具体源码如下:

//org.apache.spark.deploy.master.Master.receive方法接受AppClient发送来的RegisterApplication消息
override def receive: PartialFunction[Any, Unit] = {

    case RegisterApplication(description, driver) => {
      // TODO Prevent repeated registrations from some driver
      if (state == RecoveryState.STANDBY) {
        // ignore, don‘t send response
      } else {
        logInfo("Registering app " + description.name)
        //创建ApplicationInfo
        val app = createApplication(description, driver)
        //注册Application
        registerApplication(app)
        logInfo("Registered app " + description.name + " with ID " + app.id)
        persistenceEngine.addApplication(app)
        //向AppClient发送RegisteredApplication消息
        driver.send(RegisteredApplication(app.id, self))
        schedule()
      }
    }

AppClient内部类ClientEndpoint接收Master发来的RegisteredApplication消息

 override def receive: PartialFunction[Any, Unit] = {
      case RegisteredApplication(appId_, masterRef) =>
        // FIXME How to handle the following cases?
        // 1. A master receives multiple registrations and sends back multiple
        // RegisteredApplications due to an unstable network.
        // 2. Receive multiple RegisteredApplication from different masters because the master is
        // changing.
        appId = appId_
        registered = true
        master = Some(masterRef)
        listener.connected(appId)
       //省略其它非关键代码
}

通过上述过程便完成Application的注册。其它交互信息如下

//------------------AppClient 向 Master  发送的消息------------------//

  //AppClient向Master注册Application
  case class RegisterApplication(appDescription: ApplicationDescription, driver: RpcEndpointRef)
    extends DeployMessage

 //AppClient向Master注销Application
  case class UnregisterApplication(appId: String)

 //Master从故障中恢复后,发送MasterChange消息给AppClient,AppClient接收到该消息后,更改保存的Master信息,然后发送MasterChangeAcknowledged给Master
  case class MasterChangeAcknowledged(appId: String)

//为Application的运行申请数量为requestedTotal的Executor
  case class RequestExecutors(appId: String, requestedTotal: Int)

//杀死Application对应的Executors
  case class KillExecutors(appId: String, executorIds: Seq[String])
//------------------Master  向 AppClient 发送的消息------------------//

  //向AppClient发送Application注册成功的消息
  case class RegisteredApplication(appId: String, master: RpcEndpointRef) extends DeployMessage

  // TODO(matei): replace hostPort with host
  //Worker启动了Executor后,发送该消息通知AppClient
  case class ExecutorAdded(id: Int, workerId: String, hostPort: String, cores: Int, memory: Int) {
    Utils.checkHostPort(hostPort, "Required hostport")
  }
   //Executor状态更新后,发送该消息通知AppClient
  case class ExecutorUpdated(id: Int, state: ExecutorState, message: Option[String],
    exitStatus: Option[Int])

  //Application成功运行或失败时,Master发送该消息给AppClient
  //AppClient接收该消息后,停止Application的运行
  case class ApplicationRemoved(message: String)

   // Master 发生变化时,会利用MasterChanged消息通知Worker及AppClient
  case class MasterChanged(master: RpcEndpointRef, masterWebUiUrl: String)

2. Master与Worker间的交互

这里只给出其基本的消息交互,后面有时间再来具体分析。

 //------------------Worker向Master发送的消息------------------//

  //向Master注册Worker,Master在完成Worker注册后,向Worker发送RegisteredWorker消息,此后便可接收来自Master的调度
  case class RegisterWorker(
      id: String,
      host: String,
      port: Int,
      worker: RpcEndpointRef,
      cores: Int,
      memory: Int,
      webUiPort: Int,
      publicAddress: String)
    extends DeployMessage {
    Utils.checkHost(host, "Required hostname")
    assert (port > 0)
  }

  //向Master汇报Executor的状态变化
  case class ExecutorStateChanged(
      appId: String,
      execId: Int,
      state: ExecutorState,
      message: Option[String],
      exitStatus: Option[Int])
    extends DeployMessage

  //向Master汇报Driver状态变化
  case class DriverStateChanged(
      driverId: String,
      state: DriverState,
      exception: Option[Exception])
    extends DeployMessage

  //Worker向Master汇报其运行的Executor及Driver信息
  case class WorkerSchedulerStateResponse(id: String, executors: List[ExecutorDescription],
     driverIds: Seq[String])

  //Worker向Master发送的心跳信息,主要向Master报活
  case class Heartbeat(workerId: String, worker: RpcEndpointRef) extends DeployMessage
 //------------------Master向Worker发送的消息------------------//

  //Worker发送RegisterWorker消息注册Worker,注册成功后Master回复RegisteredWorker消息给Worker
  case class RegisteredWorker(master: RpcEndpointRef, masterWebUiUrl: String) extends DeployMessage

  //Worker发送RegisterWorker消息注册Worker,注册失败后Master回复RegisterWorkerFailed消息给Worker
  case class RegisterWorkerFailed(message: String) extends DeployMessage

   //Worker心跳超时后,Master向Worker发送ReconnectWorker消息,通知Worker节点需要重新注册
  case class ReconnectWorker(masterUrl: String) extends DeployMessage

  //application运行完毕后,Master向Worker发送KillExecutor消息,Worker接收到消息后,删除对应execId的Executor
  case class KillExecutor(masterUrl: String, appId: String, execId: Int) extends DeployMessage
   //向Worker节点发送启动Executor消息
  case class LaunchExecutor(
      masterUrl: String,
      appId: String,
      execId: Int,
      appDesc: ApplicationDescription,
      cores: Int,
      memory: Int)
    extends DeployMessage

  //向Worker节点发送启动Driver消息
  case class LaunchDriver(driverId: String, driverDesc: DriverDescription) extends DeployMessage

  //杀死对应Driver
  case class KillDriver(driverId: String) extends DeployMessage

  case class ApplicationFinished(id: String)

3. Driver Client与 Master间的消息交互

Driver Client主要是管理Driver,包括向Master提交Driver、请求杀死Driver等,其源码位于org.apache.spark.deploy.client.scala源码文件当中,类名为:org.apache.spark.deploy.ClientEndpoint。要注意其与org.apache.spark.deploy.client.AppClient.ClientEndpoint类的本质不同。

 //------------------Driver Client间Master信息的交互------------------//

  //Driver Client向Master请求提交Driver
  case class RequestSubmitDriver(driverDescription: DriverDescription) extends DeployMessage
  //Master向Driver Client返回注册是否成功的消息
  case class SubmitDriverResponse(
      master: RpcEndpointRef, success: Boolean, driverId: Option[String], message: String)
    extends DeployMessage


  //Driver Client向Master请求Kill Driver
  case class RequestKillDriver(driverId: String) extends DeployMessage
  // Master回复Kill Driver是否成功
  case class KillDriverResponse(
      master: RpcEndpointRef, driverId: String, success: Boolean, message: String)
    extends DeployMessage


  //Driver Client向Master请求Driver状态
  case class RequestDriverStatus(driverId: String) extends DeployMessage
  //Master向Driver Client返回状态请求信息
  case class DriverStatusResponse(found: Boolean, state: Option[DriverState],
    workerId: Option[String], workerHostPort: Option[String], exception: Option[Exception])

4. Driver与Executor间的消息交互

//------------------Driver向Executor发送的消息------------------//
  //启动Task
  case class LaunchTask(data: SerializableBuffer) extends CoarseGrainedClusterMessage
  //杀死Task
  case class KillTask(taskId: Long, executor: String, interruptThread: Boolean)
    extends CoarseGrainedClusterMessage
 //Executor注册成功
case object RegisteredExecutor extends CoarseGrainedClusterMessage
 //Executor注册失败
case class RegisterExecutorFailed(message: String) extends CoarseGrainedClusterMessage



//------------------Executor向Driver发送的消息------------------//
 //向Driver注册Executor
 case class RegisterExecutor(
      executorId: String,
      executorRef: RpcEndpointRef,
      hostPort: String,
      cores: Int,
      logUrls: Map[String, String])
    extends CoarseGrainedClusterMessage {
    Utils.checkHostPort(hostPort, "Expected host port")
  }

 //向Driver汇报状态变化
 case class StatusUpdate(executorId: String, taskId: Long, state: TaskState,
    data: SerializableBuffer) extends CoarseGrainedClusterMessage

  object StatusUpdate {
    /** Alternate factory method that takes a ByteBuffer directly for the data field */
    def apply(executorId: String, taskId: Long, state: TaskState, data: ByteBuffer)
      : StatusUpdate = {
      StatusUpdate(executorId, taskId, state, new SerializableBuffer(data))
    }
  }

作者:周志湖
网名:摇摆少年梦

目录
相关文章
|
11月前
|
算法 测试技术 C语言
深入理解HTTP/2:nghttp2库源码解析及客户端实现示例
通过解析nghttp2库的源码和实现一个简单的HTTP/2客户端示例,本文详细介绍了HTTP/2的关键特性和nghttp2的核心实现。了解这些内容可以帮助开发者更好地理解HTTP/2协议,提高Web应用的性能和用户体验。对于实际开发中的应用,可以根据需要进一步优化和扩展代码,以满足具体需求。
1066 29
|
11月前
|
前端开发 数据安全/隐私保护 CDN
二次元聚合短视频解析去水印系统源码
二次元聚合短视频解析去水印系统源码
454 4
|
11月前
|
JavaScript 算法 前端开发
JS数组操作方法全景图,全网最全构建完整知识网络!js数组操作方法全集(实现筛选转换、随机排序洗牌算法、复杂数据处理统计等情景详解,附大量源码和易错点解析)
这些方法提供了对数组的全面操作,包括搜索、遍历、转换和聚合等。通过分为原地操作方法、非原地操作方法和其他方法便于您理解和记忆,并熟悉他们各自的使用方法与使用范围。详细的案例与进阶使用,方便您理解数组操作的底层原理。链式调用的几个案例,让您玩转数组操作。 只有锻炼思维才能可持续地解决问题,只有思维才是真正值得学习和分享的核心要素。如果这篇博客能给您带来一点帮助,麻烦您点个赞支持一下,还可以收藏起来以备不时之需,有疑问和错误欢迎在评论区指出~
|
11月前
|
移动开发 前端开发 JavaScript
从入门到精通:H5游戏源码开发技术全解析与未来趋势洞察
H5游戏凭借其跨平台、易传播和开发成本低的优势,近年来发展迅猛。接下来,让我们深入了解 H5 游戏源码开发的技术教程以及未来的发展趋势。
|
11月前
|
存储 前端开发 JavaScript
在线教育网课系统源码开发指南:功能设计与技术实现深度解析
在线教育网课系统是近年来发展迅猛的教育形式的核心载体,具备用户管理、课程管理、教学互动、学习评估等功能。本文从功能和技术两方面解析其源码开发,涵盖前端(HTML5、CSS3、JavaScript等)、后端(Java、Python等)、流媒体及云计算技术,并强调安全性、稳定性和用户体验的重要性。
|
12月前
|
机器学习/深度学习 自然语言处理 算法
生成式 AI 大语言模型(LLMs)核心算法及源码解析:预训练篇
生成式 AI 大语言模型(LLMs)核心算法及源码解析:预训练篇
3246 1
|
11月前
|
负载均衡 JavaScript 前端开发
分片上传技术全解析:原理、优势与应用(含简单实现源码)
分片上传通过将大文件分割成多个小的片段或块,然后并行或顺序地上传这些片段,从而提高上传效率和可靠性,特别适用于大文件的上传场景,尤其是在网络环境不佳时,分片上传能有效提高上传体验。 博客不应该只有代码和解决方案,重点应该在于给出解决方案的同时分享思维模式,只有思维才能可持续地解决问题,只有思维才是真正值得学习和分享的核心要素。如果这篇博客能给您带来一点帮助,麻烦您点个赞支持一下,还可以收藏起来以备不时之需,有疑问和错误欢迎在评论区指出~
|
监控 Java 应用服务中间件
高级java面试---spring.factories文件的解析源码API机制
【11月更文挑战第20天】Spring Boot是一个用于快速构建基于Spring框架的应用程序的开源框架。它通过自动配置、起步依赖和内嵌服务器等特性,极大地简化了Spring应用的开发和部署过程。本文将深入探讨Spring Boot的背景历史、业务场景、功能点以及底层原理,并通过Java代码手写模拟Spring Boot的启动过程,特别是spring.factories文件的解析源码API机制。
424 2
|
设计模式 存储 安全
【23种设计模式·全精解析 | 创建型模式篇】5种创建型模式的结构概述、实现、优缺点、扩展、使用场景、源码解析
创建型模式的主要关注点是“怎样创建对象?”,它的主要特点是"将对象的创建与使用分离”。这样可以降低系统的耦合度,使用者不需要关注对象的创建细节。创建型模式分为5种:单例模式、工厂方法模式抽象工厂式、原型模式、建造者模式。
1028 2
【23种设计模式·全精解析 | 创建型模式篇】5种创建型模式的结构概述、实现、优缺点、扩展、使用场景、源码解析
|
存储 设计模式 算法
【23种设计模式·全精解析 | 行为型模式篇】11种行为型模式的结构概述、案例实现、优缺点、扩展对比、使用场景、源码解析
行为型模式用于描述程序在运行时复杂的流程控制,即描述多个类或对象之间怎样相互协作共同完成单个对象都无法单独完成的任务,它涉及算法与对象间职责的分配。行为型模式分为类行为模式和对象行为模式,前者采用继承机制来在类间分派行为,后者采用组合或聚合在对象间分配行为。由于组合关系或聚合关系比继承关系耦合度低,满足“合成复用原则”,所以对象行为模式比类行为模式具有更大的灵活性。 行为型模式分为: • 模板方法模式 • 策略模式 • 命令模式 • 职责链模式 • 状态模式 • 观察者模式 • 中介者模式 • 迭代器模式 • 访问者模式 • 备忘录模式 • 解释器模式
1181 1
【23种设计模式·全精解析 | 行为型模式篇】11种行为型模式的结构概述、案例实现、优缺点、扩展对比、使用场景、源码解析

推荐镜像

更多
  • DNS