Spark技术内幕:Master的故障恢复

简介:

Spark技术内幕:Master基于ZooKeeper的High Availability(HA)源码实现  详细阐述了使用ZK实现的Master的HA,那么Master是如何快速故障恢复的呢?

处于Standby状态的Master在接收到org.apache.spark.deploy.master.ZooKeeperLeaderElectionAgent发送的ElectedLeader消息后,就开始通过ZK中保存的Application,Driver和Worker的元数据信息进行故障恢复了,它的状态也从RecoveryState.STANDBY变为RecoveryState.RECOVERING了。当然了,如果没有任何需要恢复的数据,Master的状态就直接变为RecoveryState.ALIVE,开始对外服务了。

一方面Master通过

beginRecovery(storedApps, storedDrivers, storedWorkers) 


恢复Application,Driver和Worker的状态,一方面通过

recoveryCompletionTask = context.system.scheduler.scheduleOnce(WORKER_TIMEOUT millis, self,
          CompleteRecovery)


在60s后主动向自己发送CompleteRecovery的消息,开始恢复数据完成后的操作。

首先看一下如何通过ZooKeeperLeaderElectionAgent提供的接口恢复数据。

  override def readPersistedData(): (Seq[ApplicationInfo], Seq[DriverInfo], Seq[WorkerInfo]) = {
    val sortedFiles = zk.getChildren().forPath(WORKING_DIR).toList.sorted // 获取所有的文件
    val appFiles = sortedFiles.filter(_.startsWith("app_")) //获取Application的序列化文件
    val apps = appFiles.map(deserializeFromFile[ApplicationInfo]).flatten //将Application的元数据反序列化
    val driverFiles = sortedFiles.filter(_.startsWith("driver_")) //获取Driver的序列化文件
    val drivers = driverFiles.map(deserializeFromFile[DriverInfo]).flatten //将Driver的元数据反序列化
    val workerFiles = sortedFiles.filter(_.startsWith("worker_")) // 获取Worker的序列化文件
    val workers = workerFiles.map(deserializeFromFile[WorkerInfo]).flatten // 将Worker的元数据反序列化
    (apps, drivers, workers)
  }

获取了原来的Master维护的Application,Driver和Worker的列表后,当前的Master通过beginRecovery来恢复它们的状态。

恢复Application的步骤:

  1. 置待恢复的Application的状态为UNKNOWN,向AppClient发送MasterChanged的消息
  2. AppClient收到后改变其保存的Master的信息,包括URL和Master actor的信息,回复MasterChangeAcknowledged(appId)
  3. Master收到后通过appId后将Application的状态置为WAITING
  4. 检查如果所有的worker和Application的状态都不是UNKNOWN,那么恢复结束,调用completeRecovery()

恢复Worker的步骤:

  1. 重新注册Worker(实际上是更新Master本地维护的数据结构),置状态为UNKNOWN
  2. 向Worker发送MasterChanged的消息
  3. Worker收到消息后,向Master回复 消息WorkerSchedulerStateResponse,并通过该消息上报executor和driver的信息。
  4. Master收到消息后,会置该Worker的状态为ALIVE,并且会检查该Worker上报的信息是否与自己从ZK中获取的数据一致,包括executor和driver。一致的executor和driver将被恢复。对于Driver,其状态被置为RUNNING。
  5. 检查如果所有的worker和Application的状态都不是UNKNOWN,那么恢复结束,调用completeRecovery()
beginRecovery的源码实现:


  def beginRecovery(storedApps: Seq[ApplicationInfo], storedDrivers: Seq[DriverInfo],
      storedWorkers: Seq[WorkerInfo]) {
    for (app <- storedApps) { // 逐个恢复Application
      logInfo("Trying to recover app: " + app.id)
      try {
        registerApplication(app)
        app.state = ApplicationState.UNKNOWN
        app.driver ! MasterChanged(masterUrl, masterWebUiUrl) //向AppClient发送Master变化的消息,AppClient会回复MasterChangeAcknowledged
      } catch {
        case e: Exception => logInfo("App " + app.id + " had exception on reconnect")
      }
    }

    for (driver <- storedDrivers) {
      // Here we just read in the list of drivers. Any drivers associated with now-lost workers
      // will be re-launched when we detect that the worker is missing.
      drivers += driver // 在Worker恢复后,Worker会主动上报运行其上的executors和drivers从而使得Master恢复executor和driver的信息。
    }

    for (worker <- storedWorkers) { //逐个恢复Worker
      logInfo("Trying to recover worker: " + worker.id)
      try {
        registerWorker(worker) //重新注册Worker
        worker.state = WorkerState.UNKNOWN
        worker.actor ! MasterChanged(masterUrl, masterWebUiUrl) //向Worker发送Master变化的消息,Worker会回复WorkerSchedulerStateResponse
      } catch {
        case e: Exception => logInfo("Worker " + worker.id + " had exception on reconnect")
      }
    }
  }

通过下面的流程图可以更加清晰的理解这个过程:


如何判断恢复是否结束?
在上面介绍Application和Worker的恢复时,提到了每次收到他们的回应,都要检查是否当前所有的Worker和Application的状态都不为UNKNOWN,如果是,那么恢复结束,调用completeRecovery()。这个机制并不能完全起作用,如果有一个Worker恰好也是宕机了,那么该Worker的状态会一直是UNKNOWN,那么会导致上述策略一直不会起作用。这时候第二个判断恢复结束的标准就其作用了:超时机制,选择是设定了60s得超时,在60s后,不管是否有Worker或者AppClient未返回相应,都会强制标记当前的恢复结束。对于那些状态仍然是UNKNOWN的app和worker,Master会丢弃这些数据。具体实现如下:

  //调用时机
  // 1. 在恢复开始后的60s会被强制调用
  // 2. 在每次收到AppClient和Worker的消息回复后会检查如果Application和worker的状态都不为UNKNOWN,则调用
  def completeRecovery() {
    // Ensure "only-once" recovery semantics using a short synchronization period.
    synchronized {
      if (state != RecoveryState.RECOVERING) { return }
      state = RecoveryState.COMPLETING_RECOVERY
    }

    // Kill off any workers and apps that didn't respond to us. 删除在60s内没有回应的app和worker
    workers.filter(_.state == WorkerState.UNKNOWN).foreach(removeWorker)
    apps.filter(_.state == ApplicationState.UNKNOWN).foreach(finishApplication)

    // Reschedule drivers which were not claimed by any workers
    drivers.filter(_.worker.isEmpty).foreach { d => // 如果driver的worker为空,则relaunch。
      logWarning(s"Driver ${d.id} was not found after master recovery")
      if (d.desc.supervise) {
        logWarning(s"Re-launching ${d.id}")
        relaunchDriver(d)
      } else {
        removeDriver(d.id, DriverState.ERROR, None)
        logWarning(s"Did not re-launch ${d.id} because it was not supervised")
      }
    }

    state = RecoveryState.ALIVE
    schedule() 
    logInfo("Recovery complete - resuming operations!")
  }

但是对于一个拥有几千个节点的集群来说,60s设置的是否合理?毕竟现在没有使用Standalone模式部署几千个节点的吧?因此硬编码60s看上去也十分合理,毕竟都是逻辑很简单的调用,如果一些节点60S没有返回,那么下线这部分机器也是合理的。

通过设置spark.worker.timeout,可以自定义超时时间。


目录
相关文章
|
6月前
|
分布式计算 Hadoop 大数据
大数据技术与Python:结合Spark和Hadoop进行分布式计算
【4月更文挑战第12天】本文介绍了大数据技术及其4V特性,阐述了Hadoop和Spark在大数据处理中的作用。Hadoop提供分布式文件系统和MapReduce,Spark则为内存计算提供快速处理能力。通过Python结合Spark和Hadoop,可在分布式环境中进行数据处理和分析。文章详细讲解了如何配置Python环境、安装Spark和Hadoop,以及使用Python编写和提交代码到集群进行计算。掌握这些技能有助于应对大数据挑战。
711 1
|
6月前
|
消息中间件 分布式计算 大数据
【大数据技术Hadoop+Spark】Flume、Kafka的简介及安装(图文解释 超详细)
【大数据技术Hadoop+Spark】Flume、Kafka的简介及安装(图文解释 超详细)
260 0
|
6月前
|
SQL 分布式计算 大数据
【大数据技术Spark】DStream编程操作讲解实战(图文解释 附源码)
【大数据技术Spark】DStream编程操作讲解实战(图文解释 附源码)
135 0
|
6月前
|
SQL 分布式计算 数据库
【大数据技术Spark】Spark SQL操作Dataframe、读写MySQL、Hive数据库实战(附源码)
【大数据技术Spark】Spark SQL操作Dataframe、读写MySQL、Hive数据库实战(附源码)
226 0
|
6月前
|
分布式计算 大数据 Apache
【大数据技术】流数据、流计算、Spark Streaming、DStream的讲解(图文解释 超详细)
【大数据技术】流数据、流计算、Spark Streaming、DStream的讲解(图文解释 超详细)
153 0
|
2天前
|
存储 分布式计算 Hadoop
数据湖技术:Hadoop与Spark在大数据处理中的协同作用
【10月更文挑战第27天】在大数据时代,数据湖技术凭借其灵活性和成本效益成为企业存储和分析大规模异构数据的首选。Hadoop和Spark作为数据湖技术的核心组件,通过HDFS存储数据和Spark进行高效计算,实现了数据处理的优化。本文探讨了Hadoop与Spark的最佳实践,包括数据存储、处理、安全和可视化等方面,展示了它们在实际应用中的协同效应。
17 2
|
3天前
|
存储 分布式计算 Hadoop
数据湖技术:Hadoop与Spark在大数据处理中的协同作用
【10月更文挑战第26天】本文详细探讨了Hadoop与Spark在大数据处理中的协同作用,通过具体案例展示了两者的最佳实践。Hadoop的HDFS和MapReduce负责数据存储和预处理,确保高可靠性和容错性;Spark则凭借其高性能和丰富的API,进行深度分析和机器学习,实现高效的批处理和实时处理。
18 1
|
2月前
|
分布式计算 Java Apache
Apache Spark Streaming技术深度解析
【9月更文挑战第4天】Apache Spark Streaming是Apache Spark生态系统中用于处理实时数据流的一个重要组件。它将输入数据分成小批次(micro-batch),然后利用Spark的批处理引擎进行处理,从而结合了批处理和流处理的优点。这种处理方式使得Spark Streaming既能够保持高吞吐量,又能够处理实时数据流。
68 0
|
4月前
|
SQL 弹性计算 资源调度
云服务器 ECS产品使用问题之bin/spark-sql --master yarn如何进行集群模式运行
云服务器ECS(Elastic Compute Service)是各大云服务商阿里云提供的一种基础云计算服务,它允许用户租用云端计算资源来部署和运行各种应用程序。以下是一个关于如何使用ECS产品的综合指南。
|
4月前
|
分布式计算 大数据 Spark
Spark大数据处理:技术、应用与性能优化(全)PDF书籍推荐分享
《Spark大数据处理:技术、应用与性能优化》深入浅出介绍Spark核心,涵盖部署、实战与性能调优,适合初学者。作者基于微软和IBM经验,解析Spark工作机制,探讨BDAS生态,提供实践案例,助力快速掌握。书中亦讨论性能优化策略。[PDF下载链接](https://zhangfeidezhu.com/?p=347)。![Spark Web UI](https://img-blog.csdnimg.cn/direct/16aaadbb4e13410f8cb2727c3786cc9e.png#pic_center)
141 1
Spark大数据处理:技术、应用与性能优化(全)PDF书籍推荐分享