【源码解读】|SparkContext源码解读(上)

本文涉及的产品
日志服务 SLS,月写入数据量 50GB 1个月
简介: 【源码解读】|SparkContext源码解读

本文针对于「Spark2.4」系列分析

/**
 * Main entry point for Spark functionality. A SparkContext represents the connection to a Spark
 * cluster, and can be used to create RDDs, accumulators and broadcast variables on that cluster.
 *  Spark功能的主要入口点。SparkContext表示与Spark集群的连接,可用于在该集群上创建RDD,累加器和广播变量。
 *
 * Only one SparkContext may be active per JVM.  You must `stop()` the active SparkContext before
 * creating a new one.  This limitation may eventually be removed; see SPARK-2243 for more details.
 * 每个JVM只能激活一个SparkContext。在创建新的SparkContext之前,您必须“停止()”活动的SparkContext。
 * 此限制可能最终会消除;有关更多详细信息,请参见SPARK-2243
 *
 * @param config a Spark Config object describing the application configuration. Any settings in
 *   this config overrides the default configs as well as system properties.
 *   传入参数config是一个描述应用程序配置的Spark Config对象。此配置中的任何设置都会覆盖默认配置以及系统属性
 */

640.png

SparkDriver核心组件

须知

一、 CallSite创建

什么叫CallSite?CallSite有什么用?

/** CallSite represents a place in user code. It can have a short and a long form. */
CallSite表示用户代码中的一个位置。它可以有短的和长的形式。(最短栈、最长栈)
private[spark] case class CallSite(shortForm: String, longForm: String)

源码中通过「getCallSite()」 方法配置返回CallSite

参数示意:

参数英文名 参数含义
lastSparkMethod 方法存入
firstUserFile 类名存入
firstUserLine 行号存入

源码如下:

def getCallSite(skipClass: String => Boolean = sparkInternalExclusionFunction): CallSite = {
    // Keep crawling up the stack trace until we find the first function not inside of the spark
    // package. We track the last (shallowest) contiguous Spark method. This might be an RDD
    // transformation, a SparkContext function (such as parallelize), or anything else that leads
    // to instantiation of an RDD. We also track the first (deepest) user method, file, and line.
    var lastSparkMethod = "<unknown>"
    var firstUserFile = "<unknown>"
    var firstUserLine = 0
    var insideSpark = true
    val callStack = new ArrayBuffer[String]() :+ "<unknown>"
    Thread.currentThread.getStackTrace().foreach { ste: StackTraceElement =>
      // When running under some profilers, the current stack trace might contain some bogus
      // frames. This is intended to ensure that we don't crash in these situations by
      // ignoring any frames that we can't examine.
      if (ste != null && ste.getMethodName != null
        && !ste.getMethodName.contains("getStackTrace")) {
        if (insideSpark) {
          if (skipClass(ste.getClassName)) {
            lastSparkMethod = if (ste.getMethodName == "<init>") {
              // Spark method is a constructor; get its class name
              ste.getClassName.substring(ste.getClassName.lastIndexOf('.') + 1)
            } else {
              ste.getMethodName
            }
            callStack(0) = ste.toString // Put last Spark method on top of the stack trace.
          } else {
            if (ste.getFileName != null) {
              firstUserFile = ste.getFileName
              if (ste.getLineNumber >= 0) {
                firstUserLine = ste.getLineNumber
              }
            }
            callStack += ste.toString
            insideSpark = false
          }
        } else {
          callStack += ste.toString
        }
      }
    }
    val callStackDepth = System.getProperty("spark.callstack.depth", "20").toInt
    val shortForm =
      if (firstUserFile == "HiveSessionImpl.java") {
        // To be more user friendly, show a nicer string for queries submitted from the JDBC
        // server.
        "Spark JDBC Server Query"
      } else {
        s"$lastSparkMethod at $firstUserFile:$firstUserLine"
      }
    val longForm = callStack.take(callStackDepth).mkString("\n")
    CallSite(shortForm, longForm)
  }

客户端结果:

举例:WordCount例子中,获得数据如下
  最短栈:SparkContext at MyWorkCount.scala:7
  最长栈:org.apache.spark.SparkContext.<init>(SparkContext.scala:76)
            com.spark.MyWorkCount$.main(MyWorkCount.scala:7)
          com.spark.MyWorkCount.main(MyWorkCount.scala)

二、ActiveContext取舍

// If true, log warnings instead of throwing exceptions when multiple SparkContexts are active
  // 如果为true,则在多个SparkContext处于活动状态时记录警告而不是引发异常  默认false
  private val allowMultipleContexts: Boolean =
    config.getBoolean("spark.driver.allowMultipleContexts", false)
  // In order to prevent multiple SparkContexts from being active at the same time, mark this
  // context as having started construction.
  // NOTE: this must be placed at the beginning of the SparkContext constructor.
  // 为了防止同时激活多个SparkContext,将此上下文标记为active。以防止多个SparkContext实例同时成为active级别的。
  SparkContext.markPartiallyConstructed(this, allowMultipleContexts)
  val startTime = System.currentTimeMillis()
  private[spark] val stopped: AtomicBoolean = new AtomicBoolean(false)
  //断言存活的SparkCntext
  private[spark] def assertNotStopped(): Unit = {
    if (stopped.get()) {
      val activeContext = SparkContext.activeContext.get()
      val activeCreationSite =
        if (activeContext == null) {
          "(No active SparkContext.)"
        } else {
          activeContext.creationSite.longForm
        }
      throw new IllegalStateException(
        s"""Cannot call methods on a stopped SparkContext.
           |This stopped SparkContext was created at:
           |
           |${creationSite.longForm}
           |
           |The currently active SparkContext was created at:
           |
           |$activeCreationSite
         """.stripMargin)
    }
  }

正式篇

一、读取SparkConf、日志压缩配置

Spark配置类,配置已键值对形式存储,封装了一个ConcurrentHashMap类实例settings用于存储Spark的配置信息。

//copy一份配置文件
  _conf = config.clone()
  //必要信息检查,验证提交配置项参数、提交方式 
    _conf.validateSettings()
  //检查部署模式spark.master配置
    if (!_conf.contains("spark.master")) { 
      throw new SparkException("A master URL must be set in your configuration")
    }
    //检查spark.app.name配置
    if (!_conf.contains("spark.app.name")) {  
      throw new SparkException("An application name must be set in your configuration")
    }
    // log out spark.app.name in the Spark driver logs 打印应用程序名称
    logInfo(s"Submitted application: $appName")
    // System property spark.yarn.app.id must be set if user code ran by AM on a YARN cluster
    // 如果用户代码由AM在YARN群集上运行,则必须设置系统属性spark.yarn.app.id
    if (master == "yarn" && deployMode == "cluster" && !_conf.contains("spark.yarn.app.id")) {
      throw new SparkException("Detected yarn cluster mode, but isn't running on a cluster. " +
        "Deployment to YARN is not supported directly by SparkContext. Please use spark-submit.")
    }
    // 检查日志配置
    if (_conf.getBoolean("spark.logConf", false)) {
      logInfo("Spark configuration:\n" + _conf.toDebugString)
    }
    // Set Spark driver host and port system properties. This explicitly sets the configuration
    // instead of relying on the default value of the config constant.
    // 设置Spark驱动程序主机和端口系统属性。这将显式设置配置而不依赖于config常量的默认值
    _conf.set(DRIVER_HOST_ADDRESS, _conf.get(DRIVER_HOST_ADDRESS))
    _conf.setIfMissing("spark.driver.port", "0")
    _conf.set("spark.executor.id", SparkContext.DRIVER_IDENTIFIER)
 //获取用户传入jar包
 //在YARN模式下,它将返回一个空列表,因为YARN 具有自己的分发jar的机制。
    _jars = Utils.getUserJars(_conf)  
    //获取用户传入的文件
    _files = _conf.getOption("spark.files").map(_.split(",")).map(_.filter(_.nonEmpty))
      .toSeq.flatten  
 //事件日志目录
    _eventLogDir =    
      if (isEventLogEnabled) {
        val unresolvedDir = conf.get("spark.eventLog.dir", EventLoggingListener.DEFAULT_LOG_DIR)
          .stripSuffix("/")
        Some(Utils.resolveURI(unresolvedDir))
      } else {
        None
      }
 //事件日志压缩 默认flase不压缩
    _eventLogCodec = {   
      val compress = _conf.getBoolean("spark.eventLog.compress", false)
      if (compress && isEventLogEnabled) {
        Some(CompressionCodec.getCodecName(_conf)).map(CompressionCodec.getShortName)
      } else {
        None
      }
    }

二、初始化LiveListenerBus

SparkContext 中的事件总线,可以接收各种使用方的事件,并且异步传递Spark事件监听与SparkListeners监听器的注册。

//创建生命周期监听总线
    _listenerBus = new LiveListenerBus(_conf) 
    // Initialize the app status store and listener before SparkEnv is created so that it gets
    // all events.
    // 在创建SparkEnv之前 初始化 应用程序状态存储 和 侦听器,以便获取所有事件
    _statusStore = AppStatusStore.createLiveStore(conf)
    listenerBus.addToStatusQueue(_statusStore.listener.get)
相关实践学习
日志服务之使用Nginx模式采集日志
本文介绍如何通过日志服务控制台创建Nginx模式的Logtail配置快速采集Nginx日志并进行多维度分析。
相关文章
|
缓存 分布式计算 监控
【源码解读】| LiveListenerBus源码解读(上)
【源码解读】| LiveListenerBus源码解读
177 0
【源码解读】| LiveListenerBus源码解读(上)
|
存储 SQL 分布式计算
【源码解读】| LiveListenerBus源码解读(下)
【源码解读】| LiveListenerBus源码解读
169 0
【源码解读】| LiveListenerBus源码解读(下)
|
存储 JSON 缓存
【源码解读】|SparkContext源码解读(下)
【源码解读】|SparkContext源码解读
|
存储 分布式计算 监控
【源码解读】|SparkEnv源码解读
【源码解读】|SparkEnv源码解读
146 0
|
XML 缓存 Java
Apache OFbiz MiniLang 源码解读
MiniLang所有元素的父类——MiniLangElement MiniLang 是基于XML的“描述型语言”。所有的元素,包括节点、属性都继承自该类。它包含三个属性: lineNumber:表示解析MiniLang的源码(通常是Java)所处的行号,主要是为了便于日志记录 tagName:当前元素的tag名称,主要用于日志记录 simpleMethod:simpleMethod是一个大的“传输对象”,里面实现了MiniLang支持的所有执行方式,其作用类似于serviceengine中的serviceDispatcher。
2005 0
|
监控 前端开发 调度
swoft 源码解读【转】
官网: https://www.swoft.org/ 源码解读: http://naotu.baidu.com/file/814e81c9781b733e04218ac7a0494e2a?token=f009094c71a791c5 号外号外, 欢迎大家...
2012 0
深入ggtree:ggtree()源码解读
GGTREE的快速使用函数就是ggtree,源码如下, function (tr, mapping = NULL, layout = "rectangular", open.
1541 0