【源码解读】|SparkContext源码解读(下)

简介: 【源码解读】|SparkContext源码解读


三、创建SparkENV对象(DriverENV)

SparkContext中非常重要的类,它维护着Spark的执行环境,所有的线程都可以通过SparkContext访问到同一个SparkEnv对象。包含一些rpc创建……etc.

「LiveListenerBus」 生命周期监听总线

// Create the Spark execution environment (cache, map output tracker, etc)
    // 创建SparkEev 执行环境(cache, map输出追踪器, 等等)
    _env = createSparkEnv(_conf, isLocal, listenerBus)
    SparkEnv.set(_env)
 // If running the REPL, register the repl's output dir with the file server.
 //REPL-> “读取-求值-输出”循环(英语:Read-Eval-Print Loop,简称REPL)指的是一个简单的,交互式的编程环境
    // 如果运行REPL,请向文件服务器注册repl的输出目录。
    _conf.getOption("spark.repl.class.outputDir").foreach { path =>
      val replUri = _env.rpcEnv.fileServer.addDirectory("/classes", new File(path))
      _conf.set("spark.repl.class.uri", replUri)
    }
 --------------------------------------------------------------------------
 private[spark] def createSparkEnv(
      conf: SparkConf,
      isLocal: Boolean,
      listenerBus: LiveListenerBus): SparkEnv = {
    SparkEnv.createDriverEnv(conf, isLocal, listenerBus, SparkContext.numDriverCores(master, conf))
  }

四、初始化SparkStatusTracker

低级别的状态报告API,只能提供非常脆弱的一致性机制,对Job(作业)、Stage(阶段)的状态进行监控。

//用于监视job和stage的进度
 //注意SparkStatusTracker中API提供非常弱的一致性语义,在Active阶段中有可能返回'None'
    _statusTracker = new SparkStatusTracker(this, _statusStore)

五、初始化ConsoleProgressBar

进度条  [stage1]====================>

//说白了就是console print的那个线。。。。。。
 _progressBar =  
      if (_conf.get(UI_SHOW_CONSOLE_PROGRESS) && !log.isInfoEnabled) {
        Some(new ConsoleProgressBar(this))
      } else {
        None
      }
     --------------------------------------------------------------------
    private def show(now: Long, stages: Seq[StageData]) {
    val width = TerminalWidth / stages.size
    val bar = stages.map { s =>
      val total = s.numTasks
      val header = s"[Stage ${s.stageId}:"
      val tailer = s"(${s.numCompleteTasks} + ${s.numActiveTasks}) / $total]"
      val w = width - header.length - tailer.length
      val bar = if (w > 0) {
        val percent = w * s.numCompleteTasks / total
        (0 until w).map { i =>
          if (i < percent) "=" else if (i == percent) ">" else " "
        }.mkString("")
      } else {
        ""
      }
      header + bar + tailer
    }.mkString("")
    // only refresh if it's changed OR after 1 minute (or the ssh connection will be closed
    // after idle some time)
    if (bar != lastProgressBar || now - lastUpdateTime > 60 * 1000L) {
      System.err.print(CR + bar)
      lastUpdateTime = now
    }
    lastProgressBar = bar
  }

六、创建&初始化 Spark UI

Spark监控的web平台,提供了整个生命周期的监控包括任务、环境。

//是否允许UI开启
 _ui = 
      if (conf.getBoolean("spark.ui.enabled", true)) {
        Some(SparkUI.create(Some(this), _statusStore, _conf, _env.securityManager, appName, "",
          startTime))
      } else {
        // For tests, do not enable the UI
        None
      }
    // Bind the UI before starting the task scheduler to communicate
    // the bound port to the cluster manager properly
    // 在启动任务计划程序以将绑定的端口正确通信到集群管理器之前,先绑定UI
    _ui.foreach(_.bind())
 //默认生成hadoop配置
    _hadoopConfiguration = SparkHadoopUtil.get.newConfiguration(_conf) 
    // Add each JAR given through the constructor
    // jar和file添加
    if (jars != null) {
      jars.foreach(addJar)
    }
    if (files != null) {
      files.foreach(addFile)
    }

七、ExecutorMemory配置

// executor内存 根据以下属性逐级查找 如果都没有的话最后使用1024MB
    _executorMemory = _conf.getOption("spark.executor.memory")
      .orElse(Option(System.getenv("SPARK_EXECUTOR_MEMORY")))
      .orElse(Option(System.getenv("SPARK_MEM"))
      .map(warnSparkMem))
      .map(Utils.memoryStringToMb)
      .getOrElse(1024)
    // Convert java options to env vars as a work around
    // since we can't set env vars directly in sbt.
    for { (envKey, propKey) <- Seq(("SPARK_TESTING", "spark.testing"))
      value <- Option(System.getenv(envKey)).orElse(Option(System.getProperty(propKey)))} {
      executorEnvs(envKey) = value
    }
    Option(System.getenv("SPARK_PREPEND_CLASSES")).foreach { v =>
      executorEnvs("SPARK_PREPEND_CLASSES") = v
    }
    // The Mesos scheduler backend relies on this environment variable to set executor memory.
    // Mesos调度程序后端依赖于此环境变量来设置执行程序内存。
    // TODO: Set this only in the Mesos scheduler.
    executorEnvs("SPARK_EXECUTOR_MEMORY") = executorMemory + "m"
    executorEnvs ++= _conf.getExecutorEnv
    executorEnvs("SPARK_USER") = sparkUser

八、注册HeartbeatReceiver

心跳接收器,所有 Executor 都会向HeartbeatReceiver 发送心跳,当其接收到 Executor 的心跳信息后,首先更新 Executor 的最后可见时间,然后将此信息交给 TaskScheduler 进一步处理。

// We need to register "HeartbeatReceiver" before "createTaskScheduler" because Executor will
    // retrieve "HeartbeatReceiver" in the constructor. (SPARK-6640)
    // 我们需要在“ createTaskScheduler”之前注册“ HeartbeatReceiver”,
    // 因为执行器将在构造函数中检索“ HeartbeatReceiver”。 (SPARK-6640)
    _heartbeatReceiver = env.rpcEnv.setupEndpoint(
      HeartbeatReceiver.ENDPOINT_NAME, new HeartbeatReceiver(this))

九、创建TaskScheduler

Spark任务调度器,负责任务的提交,并且请求集群管理器对任务调度。由于它调度的Task是有DagScheduler创建,所以DagScheduler是它的前置调度器。

val (sched, ts) = SparkContext.createTaskScheduler(this, master, deployMode)
    _schedulerBackend = sched
    _taskScheduler = ts

十、创建&启动DAGScheduler

一个基于Stage的调度器, 负责创建 Job,将 DAG 中的 RDD 划分到不同的 Stage,并将Stage作为Tasksets提交给底层调度器TaskScheduler执行。

//创建DAGScheduler 传入当前SparkContext对象,然后又去取出taskScheduler
 // def this(sc: SparkContext) = this(sc, sc.taskScheduler)
  _dagScheduler = new DAGScheduler(this)  
    //绑定心跳执行器                              
    _heartbeatReceiver.ask[Boolean](TaskSchedulerIsSet) 
    // start TaskScheduler after taskScheduler sets DAGScheduler reference in DAGScheduler's constructor
    // 在taskScheduler在DAGScheduler的构造函数中设置DAGScheduler引用之后,初始化TaskScheduler
    _taskScheduler.start()
    _applicationId = _taskScheduler.applicationId()
    _applicationAttemptId = taskScheduler.applicationAttemptId()
    _conf.set("spark.app.id", _applicationId)
    if (_conf.getBoolean("spark.ui.reverseProxy", false)) {
      System.setProperty("spark.ui.proxyBase", "/proxy/" + _applicationId)
    }
    _ui.foreach(_.setAppId(_applicationId)) // 啪啪啪一丢设置后  UI和任务关联

十一、初始化BlockManager

在DAGShceduler中有一个BlockManagerMaster对象,该对象的工作就是负责管理全局所有BlockManager的元数据,当集群中有BlockManager注册完成的时候,其会向BlockManagerMaster发送自己元数据信息;BlockManagerMaster会为BlockManager创建一个属于这个BlockManager的BlockManagerInfo,用于存放BlockManager的信息。

_env.blockManager.initialize(_applicationId)

十二、初始化MericsSystem

Spark webui 监控指标。包括Shuffle read/wirte gc...etc。spark运行时监控。

// The metrics system for Driver need to be set spark.app.id to app ID.
    // So it should start after we get app ID from the task scheduler and set spark.app.id.
     // 需要将驱动程序的指标系统设置为spark.app.id到应用程序ID。
     // 因此,它应该在我们从任务计划程序获取应用程序ID并设置spark.app.id之后开始。
    //启动指标监控系统 gc时间,shuffler read/write...etc.
    _env.metricsSystem.start()
    // Attach the driver metrics servlet handler to the web ui after the metrics system is started.
    // 启动指标系统后,将驱动程序指标servlet处理程序附加到Web ui。
    _env.metricsSystem.getServletHandlers.foreach(handler => ui.foreach(_.attachHandler(handler)))

十三、创建EventLoggingListener

事件监听器将各种事件进行json转换

//创建事件日志监听 添加到总线列队中去(总线列队后面会详细讲~~~)
 _eventLogger = 
      if (isEventLogEnabled) {
        val logger =
          new EventLoggingListener(_applicationId, _applicationAttemptId, _eventLogDir.get,
            _conf, _hadoopConfiguration)
        logger.start()
        listenerBus.addToEventLogQueue(logger)
        Some(logger)
      } else {
        None
      }

十四、创建&启动资源划分管理器

根据配置判断是否开启动态资源管理器

// Optionally scale number of executors dynamically based on workload. Exposed for testing.
    // 可以根据工作负载动态伸缩执行器的数量spark.dynamicAllocation.enabled
    val dynamicAllocationEnabled = Utils.isDynamicAllocationEnabled(_conf)
    _executorAllocationManager =
      if (dynamicAllocationEnabled) {
        schedulerBackend match {
          case b: ExecutorAllocationClient =>
            Some(new ExecutorAllocationManager(
              schedulerBackend.asInstanceOf[ExecutorAllocationClient], listenerBus, _conf,
              _env.blockManager.master))
          case _ =>
            None
        }
      } else {
        None
      }
    _executorAllocationManager.foreach(_.start())

十五、创建ContextCleaner

上下文清理器,为RDD、shuffle、broadcast状态的异步清理器,清理超出应用范围的RDD、ShuffleDependency、Broadcast对象。

//根据spark.cleaner.referenceTracking 默认是true 创建ContextCleaner
 _cleaner =  
      if (_conf.getBoolean("spark.cleaner.referenceTracking", true)) {
        Some(new ContextCleaner(this))
      } else {
        None
      }
    _cleaner.foreach(_.start())

十六、ExtraListeners配置

setupAndStartListenerBus()

十七、环境更新

postEnvironmentUpdate()
---------------------------------------------------------------
//环境更新,这个就是在UI Environment界面显示的数据
private def postEnvironmentUpdate() {
    if (taskScheduler != null) {
      val schedulingMode = getSchedulingMode.toString
      val addedJarPaths = addedJars.keys.toSeq
      val addedFilePaths = addedFiles.keys.toSeq
      val environmentDetails = SparkEnv.environmentDetails(conf, schedulingMode, addedJarPaths,
        addedFilePaths)
      val environmentUpdate = SparkListenerEnvironmentUpdate(environmentDetails)
      listenerBus.post(environmentUpdate)
    }
  }

十八、投递程序启动事件

postApplicationStart()
--------------------------------------------------------------
 //主要关注post方法,发送事件。同样关注start() stop()
 private def postApplicationStart() {
    // Note: this code assumes that the task scheduler has been initialized and has contacted
    // the cluster manager to get an application ID (in case the cluster manager provides one).
    listenerBus.post(SparkListenerApplicationStart(appName, Some(applicationId),
      startTime, sparkUser, applicationAttemptId, schedulerBackend.getDriverLogUrls))
  }

十九、最后绑定注册

//关注postStartHook
  _taskScheduler.postStartHook()
    _env.metricsSystem.registerSource(_dagScheduler.metricsSource)
    _env.metricsSystem.registerSource(new BlockManagerSource(_env.blockManager))
    _executorAllocationManager.foreach { e =>
      _env.metricsSystem.registerSource(e.executorAllocationManagerSource)

二十、结束清理

// Make sure the context is stopped if the user forgets about it. This avoids leaving
    // unfinished event logs around after the JVM exits cleanly. It doesn't help if the JVM
    // is killed, though.
    // 如果用户忘记上下文,请确保上下文已停止。这样可以避免在JVM干净退出之后
    // 保留未完成的事件日志。但是,如果杀死了JVM 则无济于事
   logDebug("Adding shutdown hook") 
    _shutdownHookRef = ShutdownHookManager.addShutdownHook(
      ShutdownHookManager.SPARK_CONTEXT_SHUTDOWN_PRIORITY) { () =>
      logInfo("Invoking stop() from shutdown hook")
      try {
        stop()
      } catch {
        case e: Throwable =>
          logWarning("Ignoring Exception while stopping SparkContext from shutdown hook", e)
      }
    }
  } catch {
    case NonFatal(e) =>
      logError("Error initializing SparkContext.", e)
      try {
        stop()
      } catch {
        case NonFatal(inner) =>
          logError("Error stopping SparkContext after init error.", inner)
      } finally {
        throw e
      }
  }

总结

  • Spark 中的组件很多,涉及网络通信、分布式、消息、存储、计算、缓存、测量、清理、文件服务、Web UI 的等等。
  • Spark中大量采用事件监听方式,实现driver端的组件之间的通信。
相关文章
|
存储 分布式计算 Spark
【源码解读】|SparkContext源码解读(上)
【源码解读】|SparkContext源码解读
198 0
【源码解读】|SparkContext源码解读(上)
|
存储 分布式计算 监控
【源码解读】|SparkEnv源码解读
【源码解读】|SparkEnv源码解读
138 0
|
SQL 分布式计算 NoSQL
Spark从入门到入土(五):SparkSQL原理与实战
Spark从入门到入土(五):SparkSQL原理与实战
Spark从入门到入土(五):SparkSQL原理与实战
|
分布式计算 Spark 索引
Spark2.4.0源码分析之WorldCount ShuffleMapTask处理(八)
- 理解Executor中是如何调用Task的过程 - 理解ShuffleMapTask是处理过程
1617 0
|
分布式计算 Spark 机器学习/深度学习
Spark2.4.0源码分析之WorldCount FinalRDD构建(一)
- Spark dataSet执行计算转成FinalRDD - FinalRdd从第一个RDD到最到一个RDD的转化过程 - RDD之间的依赖引用关系 - ShuffleRowRDD默认分区器为HashPartitioning,实际new Partitioner,分区个数为200
1110 0
|
分布式计算 Spark Hadoop
Spark MapOutputTracker源码分析
## 技能标签 - Spark ShuffleMapTask处理完成后,把MapStatus数据(BlockManagerId,[compressSize])发送给MapOutputTrackerMaster.
1693 0
|
分布式计算 Apache Scala
SparkContext 源码分析
SparkContext 源码分析 更多资源 github: https://github.com/opensourceteams/spark-scala-maven csdn(汇总视频在线看): https://blog.
1076 0