Spark修炼之道(高级篇)——Spark源码阅读:第十节 Standalone运行模式解析

简介: Spark Standalone采用的是Master/Slave架构,主要涉及到的类包括:类:org.apache.spark.deploy.master.Master说明:负责整个集群的资源调度及Application的管理。消息类型:接收Worker发送的消息1. RegisterWorker2. ExecutorStateChanged3. Work

Spark Standalone采用的是Master/Slave架构,主要涉及到的类包括:

类:org.apache.spark.deploy.master.Master
说明:负责整个集群的资源调度及Application的管理。
消息类型:
接收Worker发送的消息
1. RegisterWorker
2. ExecutorStateChanged
3. WorkerSchedulerStateResponse
4. Heartbeat

向Worker发送的消息
1. RegisteredWorker
2. RegisterWorkerFailed
3. ReconnectWorker
4. KillExecutor
5.LaunchExecutor
6.LaunchDriver
7.KillDriver
8.ApplicationFinished

向AppClient发送的消息
1. RegisteredApplication
2. ExecutorAdded
3. ExecutorUpdated
4. ApplicationRemoved
接收AppClient发送的消息
1. RegisterApplication
2. UnregisterApplication
3. MasterChangeAcknowledged
4. RequestExecutors
5. KillExecutors

向Driver Client发送的消息
1.SubmitDriverResponse
2.KillDriverResponse
3.DriverStatusResponse

接收Driver Client发送的消息
1.RequestSubmitDriver 
2.RequestKillDriver         
3.RequestDriverStatus
类org.apache.spark.deploy.worker.Worker
说明:向Master注册自己并启动CoarseGrainedExecutorBackend,在运行时启动Executor运行Task任务
消息类型:
向Master发送的消息
1. RegisterWorker
2. ExecutorStateChanged
3. WorkerSchedulerStateResponse
4. Heartbeat
接收Master发送的消息
1. RegisteredWorker
2. RegisterWorkerFailed
3. ReconnectWorker
4. KillExecutor
5.LaunchExecutor
6.LaunchDriver
7.KillDriver
8.ApplicationFinished
类org.apache.spark.deploy.client.AppClient.ClientEndpoint
说明:向Master注册并监控Application,请求或杀死Executors等
消息类型:
向Master发送的消息
1. RegisterApplication
2. UnregisterApplication
3. MasterChangeAcknowledged
4. RequestExecutors
5. KillExecutors
接收Master发送的消息
1. RegisteredApplication
2. ExecutorAdded
3. ExecutorUpdated
4. ApplicationRemoved
类:org.apache.spark.scheduler.cluster.DriverEndpoint
说明:运行时注册Executor并启动Task的运行并处理Executor发送来的状态更新等
消息类型:
向Executor发送的消息
1.LaunchTask
2.KillTask
3.RegisteredExecutor
4.RegisterExecutorFailed
接收Executor发送的消息
1.RegisterExecutor
2.StatusUpdate
类:org.apache.spark.deploy.ClientEndpoint 
说明:管理Driver包括提交Driver、Kill掉Driver及获取Driver状态信息
向Master发送的消息
1.RequestSubmitDriver 
2.RequestKillDriver         
3.RequestDriverStatus

接收Master 发送的消息
1.SubmitDriverResponse
2.KillDriverResponse
3.DriverStatusResponse

上面所有的类都继承自org.apache.spark.rpc.ThreadSafeRpcEndpoint,其底层实现目前都是通过AKKA来实现的,具体如下图所示:

这里写图片描述

各类之间的交互关系如下图所示:
这里写图片描述

1. AppClient与Master间的交互

SparkContext在创建时,会调用,createTaskScheduler方法创建相应的TaskScheduler及SchedulerBackend

 // Create and start the scheduler
    val (sched, ts) = SparkContext.createTaskScheduler(this, master)
    _schedulerBackend = sched
    _taskScheduler = ts
    _dagScheduler = new DAGScheduler(this)
    _heartbeatReceiver.ask[Boolean](TaskSchedulerIsSet)

    // start TaskScheduler after taskScheduler sets DAGScheduler reference in DAGScheduler‘s
    // constructor
    _taskScheduler.start()
Standalone运行模式创建的TaskScheduler及SchedulerBackend具体源码如下:

/**
   * Create a task scheduler based on a given master URL.
   * Return a 2-tuple of the scheduler backend and the task scheduler.
   */
  private def createTaskScheduler(
      sc: SparkContext,
      master: String): (SchedulerBackend, TaskScheduler) = {

   //省略其它非关键代码
   case SPARK_REGEX(sparkUrl) =>
        val scheduler = new TaskSchedulerImpl(sc)
        val masterUrls = sparkUrl.split(",").map("spark://" + _)
        val backend = new SparkDeploySchedulerBackend(scheduler, sc, masterUrls)
        scheduler.initialize(backend)
        (backend, scheduler)
    //省略其它非关键代码

}

创建完成TaskScheduler及SchedulerBackend后,调用TaskScheduler的start方法,启动SchedulerBackend(Standalone模式对应SparkDeploySchedulerBackend)

//TaskSchedulerImpl中的start方法 
override def start() {
    //调用SchedulerBackend的start方法 
    backend.start()
    //省略其它非关键代码
  }

对应SparkDeploySchedulerBackend中的start方法源码如下:

override def start() {
    super.start()

    //省略其它非关键代码
    //Application相关信息(包括应用程序名、executor运行内存等)
    val appDesc = new ApplicationDescription(sc.appName, maxCores, sc.executorMemory,
      command, appUIAddress, sc.eventLogDir, sc.eventLogCodec, coresPerExecutor)
    //创建AppClient,传入相应启动参数
    client = new AppClient(sc.env.rpcEnv, masters, appDesc, this, conf)
    client.start()
    waitForRegistration()
  }

AppClient类中的start方法原码如下:

//AppClient Start方法
  def start() {
    // Just launch an rpcEndpoint; it will call back into the listener.
    //ClientEndpoint,该ClientEndpoint为AppClient的内部类
    //它是AppClient的rpcEndpoint
    endpoint = rpcEnv.setupEndpoint("AppClient", new ClientEndpoint(rpcEnv))
  }
ClientEndpoint在启动时,会向Master注册Application

override def onStart(): Unit = {
      try {
        registerWithMaster(1)
      } catch {
        case e: Exception =>
          logWarning("Failed to connect to master", e)
          markDisconnected()
          stop()
      }
    }

registerWithMaster方法便是向Master注册Application,其源码如下:

/**
     * Register with all masters asynchronously. It will call `registerWithMaster` every
     * REGISTRATION_TIMEOUT_SECONDS seconds until exceeding REGISTRATION_RETRIES times.
     * Once we connect to a master successfully, all scheduling work and Futures will be cancelled.
     *
     * nthRetry means this is the nth attempt to register with master.
     */
    private def registerWithMaster(nthRetry: Int) {
      registerMasterFutures = tryRegisterAllMasters()
      //注册失败时重试
      registrationRetryTimer = registrationRetryThread.scheduleAtFixedRate(new Runnable {
        override def run(): Unit = {
          Utils.tryOrExit {
            if (registered) {
              registerMasterFutures.foreach(_.cancel(true))
              registerMasterThreadPool.shutdownNow()
            } else if (nthRetry >= REGISTRATION_RETRIES) {
              markDead("All masters are unresponsive! Giving up.")
            } else {
              registerMasterFutures.foreach(_.cancel(true))
              registerWithMaster(nthRetry + 1)
            }
          }
        }
      }, REGISTRATION_TIMEOUT_SECONDS, REGISTRATION_TIMEOUT_SECONDS, TimeUnit.SECONDS)
    }

向所有Masters注册,因为Master可能实现了高可靠(HA),例如ZooKeeper的HA方式,所以存在多个Master,但最终只有Active Master响应,具体源码如下:

 /**
     *  Register with all masters asynchronously and returns an array `Future`s for cancellation.
     */
    private def tryRegisterAllMasters(): Array[JFuture[_]] = {
      for (masterAddress <- masterRpcAddresses) yield {
        registerMasterThreadPool.submit(new Runnable {
          override def run(): Unit = try {
            if (registered) {
              return
            }
            logInfo("Connecting to master " + masterAddress.toSparkURL + "...")
            //获取Master rpcEndpoint
            val masterRef =
              rpcEnv.setupEndpointRef(Master.SYSTEM_NAME, masterAddress, Master.ENDPOINT_NAME)
            //向Master发送RegisterApplication信息            masterRef.send(RegisterApplication(appDescription, self))
          } catch {
            case ie: InterruptedException => // Cancelled
            case NonFatal(e) => logWarning(s"Failed to connect to master $masterAddress", e)
          }
        })
      }
    }

Master会接收来自AppClient的RegisterApplication消息,具体源码如下:

//org.apache.spark.deploy.master.Master.receive方法接受AppClient发送来的RegisterApplication消息
override def receive: PartialFunction[Any, Unit] = {

    case RegisterApplication(description, driver) => {
      // TODO Prevent repeated registrations from some driver
      if (state == RecoveryState.STANDBY) {
        // ignore, don‘t send response
      } else {
        logInfo("Registering app " + description.name)
        //创建ApplicationInfo
        val app = createApplication(description, driver)
        //注册Application
        registerApplication(app)
        logInfo("Registered app " + description.name + " with ID " + app.id)
        persistenceEngine.addApplication(app)
        //向AppClient发送RegisteredApplication消息
        driver.send(RegisteredApplication(app.id, self))
        schedule()
      }
    }

AppClient内部类ClientEndpoint接收Master发来的RegisteredApplication消息

 override def receive: PartialFunction[Any, Unit] = {
      case RegisteredApplication(appId_, masterRef) =>
        // FIXME How to handle the following cases?
        // 1. A master receives multiple registrations and sends back multiple
        // RegisteredApplications due to an unstable network.
        // 2. Receive multiple RegisteredApplication from different masters because the master is
        // changing.
        appId = appId_
        registered = true
        master = Some(masterRef)
        listener.connected(appId)
       //省略其它非关键代码
}

通过上述过程便完成Application的注册。其它交互信息如下

//------------------AppClient 向 Master  发送的消息------------------//

  //AppClient向Master注册Application
  case class RegisterApplication(appDescription: ApplicationDescription, driver: RpcEndpointRef)
    extends DeployMessage

 //AppClient向Master注销Application
  case class UnregisterApplication(appId: String)

 //Master从故障中恢复后,发送MasterChange消息给AppClient,AppClient接收到该消息后,更改保存的Master信息,然后发送MasterChangeAcknowledged给Master
  case class MasterChangeAcknowledged(appId: String)

//为Application的运行申请数量为requestedTotal的Executor
  case class RequestExecutors(appId: String, requestedTotal: Int)

//杀死Application对应的Executors
  case class KillExecutors(appId: String, executorIds: Seq[String])
//------------------Master  向 AppClient 发送的消息------------------//

  //向AppClient发送Application注册成功的消息
  case class RegisteredApplication(appId: String, master: RpcEndpointRef) extends DeployMessage

  // TODO(matei): replace hostPort with host
  //Worker启动了Executor后,发送该消息通知AppClient
  case class ExecutorAdded(id: Int, workerId: String, hostPort: String, cores: Int, memory: Int) {
    Utils.checkHostPort(hostPort, "Required hostport")
  }
   //Executor状态更新后,发送该消息通知AppClient
  case class ExecutorUpdated(id: Int, state: ExecutorState, message: Option[String],
    exitStatus: Option[Int])

  //Application成功运行或失败时,Master发送该消息给AppClient
  //AppClient接收该消息后,停止Application的运行
  case class ApplicationRemoved(message: String)

   // Master 发生变化时,会利用MasterChanged消息通知Worker及AppClient
  case class MasterChanged(master: RpcEndpointRef, masterWebUiUrl: String)

2. Master与Worker间的交互

这里只给出其基本的消息交互,后面有时间再来具体分析。

 //------------------Worker向Master发送的消息------------------//

  //向Master注册Worker,Master在完成Worker注册后,向Worker发送RegisteredWorker消息,此后便可接收来自Master的调度
  case class RegisterWorker(
      id: String,
      host: String,
      port: Int,
      worker: RpcEndpointRef,
      cores: Int,
      memory: Int,
      webUiPort: Int,
      publicAddress: String)
    extends DeployMessage {
    Utils.checkHost(host, "Required hostname")
    assert (port > 0)
  }

  //向Master汇报Executor的状态变化
  case class ExecutorStateChanged(
      appId: String,
      execId: Int,
      state: ExecutorState,
      message: Option[String],
      exitStatus: Option[Int])
    extends DeployMessage

  //向Master汇报Driver状态变化
  case class DriverStateChanged(
      driverId: String,
      state: DriverState,
      exception: Option[Exception])
    extends DeployMessage

  //Worker向Master汇报其运行的Executor及Driver信息
  case class WorkerSchedulerStateResponse(id: String, executors: List[ExecutorDescription],
     driverIds: Seq[String])

  //Worker向Master发送的心跳信息,主要向Master报活
  case class Heartbeat(workerId: String, worker: RpcEndpointRef) extends DeployMessage
 //------------------Master向Worker发送的消息------------------//

  //Worker发送RegisterWorker消息注册Worker,注册成功后Master回复RegisteredWorker消息给Worker
  case class RegisteredWorker(master: RpcEndpointRef, masterWebUiUrl: String) extends DeployMessage

  //Worker发送RegisterWorker消息注册Worker,注册失败后Master回复RegisterWorkerFailed消息给Worker
  case class RegisterWorkerFailed(message: String) extends DeployMessage

   //Worker心跳超时后,Master向Worker发送ReconnectWorker消息,通知Worker节点需要重新注册
  case class ReconnectWorker(masterUrl: String) extends DeployMessage

  //application运行完毕后,Master向Worker发送KillExecutor消息,Worker接收到消息后,删除对应execId的Executor
  case class KillExecutor(masterUrl: String, appId: String, execId: Int) extends DeployMessage
   //向Worker节点发送启动Executor消息
  case class LaunchExecutor(
      masterUrl: String,
      appId: String,
      execId: Int,
      appDesc: ApplicationDescription,
      cores: Int,
      memory: Int)
    extends DeployMessage

  //向Worker节点发送启动Driver消息
  case class LaunchDriver(driverId: String, driverDesc: DriverDescription) extends DeployMessage

  //杀死对应Driver
  case class KillDriver(driverId: String) extends DeployMessage

  case class ApplicationFinished(id: String)

3. Driver Client与 Master间的消息交互

Driver Client主要是管理Driver,包括向Master提交Driver、请求杀死Driver等,其源码位于org.apache.spark.deploy.client.scala源码文件当中,类名为:org.apache.spark.deploy.ClientEndpoint。要注意其与org.apache.spark.deploy.client.AppClient.ClientEndpoint类的本质不同。

 //------------------Driver Client间Master信息的交互------------------//

  //Driver Client向Master请求提交Driver
  case class RequestSubmitDriver(driverDescription: DriverDescription) extends DeployMessage
  //Master向Driver Client返回注册是否成功的消息
  case class SubmitDriverResponse(
      master: RpcEndpointRef, success: Boolean, driverId: Option[String], message: String)
    extends DeployMessage


  //Driver Client向Master请求Kill Driver
  case class RequestKillDriver(driverId: String) extends DeployMessage
  // Master回复Kill Driver是否成功
  case class KillDriverResponse(
      master: RpcEndpointRef, driverId: String, success: Boolean, message: String)
    extends DeployMessage


  //Driver Client向Master请求Driver状态
  case class RequestDriverStatus(driverId: String) extends DeployMessage
  //Master向Driver Client返回状态请求信息
  case class DriverStatusResponse(found: Boolean, state: Option[DriverState],
    workerId: Option[String], workerHostPort: Option[String], exception: Option[Exception])

4. Driver与Executor间的消息交互

//------------------Driver向Executor发送的消息------------------//
  //启动Task
  case class LaunchTask(data: SerializableBuffer) extends CoarseGrainedClusterMessage
  //杀死Task
  case class KillTask(taskId: Long, executor: String, interruptThread: Boolean)
    extends CoarseGrainedClusterMessage
 //Executor注册成功
case object RegisteredExecutor extends CoarseGrainedClusterMessage
 //Executor注册失败
case class RegisterExecutorFailed(message: String) extends CoarseGrainedClusterMessage



//------------------Executor向Driver发送的消息------------------//
 //向Driver注册Executor
 case class RegisterExecutor(
      executorId: String,
      executorRef: RpcEndpointRef,
      hostPort: String,
      cores: Int,
      logUrls: Map[String, String])
    extends CoarseGrainedClusterMessage {
    Utils.checkHostPort(hostPort, "Expected host port")
  }

 //向Driver汇报状态变化
 case class StatusUpdate(executorId: String, taskId: Long, state: TaskState,
    data: SerializableBuffer) extends CoarseGrainedClusterMessage

  object StatusUpdate {
    /** Alternate factory method that takes a ByteBuffer directly for the data field */
    def apply(executorId: String, taskId: Long, state: TaskState, data: ByteBuffer)
      : StatusUpdate = {
      StatusUpdate(executorId, taskId, state, new SerializableBuffer(data))
    }
  }

作者:周志湖
网名:摇摆少年梦

目录
相关文章
|
监控 安全 数据可视化
哪些项目适合采用BOT+EPC模式?深度解析
2分钟了解什么是BOT+EPC项目管理模式以及该模式适用于哪些类型的项目。
1194 1
哪些项目适合采用BOT+EPC模式?深度解析
|
数据可视化 算法 数据挖掘
用傅里叶变换解码时间序列:从频域视角解析季节性模式
本文介绍了如何使用傅里叶变换和周期图分析来识别时间序列中的季节性模式,特别是在能源消耗数据中。通过Python实现傅里叶变换和周期图,可以有效提取并量化时间序列中的主要和次要频率成分,克服传统可视化分析的局限性。这对于准确捕捉时间序列中的季节性变化具有重要意义。文章以AEP能源消耗数据为例,展示了如何应用这些方法识别日、周、半年等周期模式。
566 3
用傅里叶变换解码时间序列:从频域视角解析季节性模式
|
数据采集 Web App开发 存储
深度解析:使用 Headless 模式 ChromeDriver 进行无界面浏览器操作
本文介绍了基于无界面浏览器(如ChromeDriver)和代理IP技术的现代爬虫解决方案,以应对传统爬虫面临的反爬机制和动态加载内容等问题。通过Selenium驱动ChromeDriver,并结合亿牛云爬虫代理、自定义Cookie和User-Agent设置,实现高效的数据采集。代码示例展示了如何配置ChromeDriver、处理代理认证、添加Cookie及捕获异常,确保爬虫稳定运行。性能对比显示,Headless模式下的ChromeDriver在数据采集成功率、响应时间和反爬规避能力上显著优于传统爬虫。该方案广泛应用于电商、金融和新闻媒体等行业。
651 0
深度解析:使用 Headless 模式 ChromeDriver 进行无界面浏览器操作
阿里云CDN怎么收费?看这一篇就够了,CDN不同计费模式收费价格全解析
阿里云CDN的费用由基础费用和增值费用组成。基础费用有三种计费方式:按流量、按带宽峰值和月结95带宽峰值,默认为按流量计费,价格根据使用量阶梯递减。增值费用包括静态HTTPS请求、QUIC请求等,按实际使用量收费,不使用不收费。具体收费标准和详细规则可参考阿里云官方页面。
|
分布式计算 资源调度 Hadoop
Spark Standalone与YARN的区别?
本文详细解析了 Apache Spark 的两种常见部署模式:Standalone 和 YARN。Standalone 模式自带轻量级集群管理服务,适合小规模集群;YARN 模式与 Hadoop 生态系统集成,适合大规模生产环境。文章通过示例代码展示了如何在两种模式下运行 Spark 应用程序,并总结了两者的优缺点,帮助读者根据需求选择合适的部署模式。
675 3
|
数据采集 机器学习/深度学习 数据挖掘
10种数据预处理中的数据泄露模式解析:识别与避免策略
在机器学习中,数据泄露是一个常见问题,指的是测试数据在数据准备阶段无意中混入训练数据,导致模型在测试集上的表现失真。本文详细探讨了数据预处理步骤中的数据泄露问题,包括缺失值填充、分类编码、数据缩放、离散化和重采样,并提供了具体的代码示例,展示了如何避免数据泄露,确保模型的测试结果可靠。
1086 2
|
人工智能 数据挖掘 大数据
排队免单与消费增值模式:融合玩法与优势解析
排队免单模式通过订单排队、奖励分配、加速与退出机制等,结合消费增值模式中的积分制度、利润入池与积分增值等,共同提升消费者参与度和忠诚度,促进商家销售增长。具体包括订单自动排队、大单拆小单、异业联盟、线上线下融合及数据分析优化等进阶玩法,以及积分增值模型演算,形成一套完整的消费者激励体系。
|
分布式计算 资源调度 Hadoop
Spark Standalone与YARN的区别?
【10月更文挑战第5天】随着大数据处理需求的增长,Apache Spark 成为了广泛采用的大数据处理框架。本文详细解析了 Spark Standalone 与 YARN 两种常见部署模式的区别,并通过示例代码展示了如何在不同模式下运行 Spark 应用程序。Standalone 模式自带轻量级集群管理,适合小规模集群或独立部署;YARN 则作为外部资源管理器,能够与 Hadoop 生态系统中的其他应用共享资源,更适合大规模生产环境。文章对比了两者的资源管理、部署灵活性、扩展性和集成能力,帮助读者根据需求选择合适的部署模式。
330 1
|
分布式计算 Java 大数据
大数据-92 Spark 集群 SparkRDD 原理 Standalone详解 ShuffleV1V2详解 RDD编程优化
大数据-92 Spark 集群 SparkRDD 原理 Standalone详解 ShuffleV1V2详解 RDD编程优化
221 0
大数据-92 Spark 集群 SparkRDD 原理 Standalone详解 ShuffleV1V2详解 RDD编程优化
|
设计模式 存储 安全
PHP中单例模式的深入解析与实践指南
在PHP开发领域,设计模式是构建高效、可维护代码的重要工具。本文聚焦于单例模式——一种确保类仅有一个实例,并提供全局访问点的模式。我们将从理论出发,探讨单例模式的基本概念、应用场景,并通过实际案例分析其在PHP中的实现技巧。最后,讨论单例模式的优势、潜在缺陷及如何在实际项目中合理运用。

推荐镜像

更多
  • DNS