This article is part of a series of analyses based on Spark 2.4.

As an overview, the scaladoc of `SparkContext` reads:
```scala
/**
 * Main entry point for Spark functionality. A SparkContext represents the connection to a Spark
 * cluster, and can be used to create RDDs, accumulators and broadcast variables on that cluster.
 *
 * Only one SparkContext may be active per JVM. You must `stop()` the active SparkContext before
 * creating a new one. This limitation may eventually be removed; see SPARK-2243 for more details.
 *
 * @param config a Spark Config object describing the application configuration. Any settings in
 *   this config overrides the default configs as well as system properties.
 */
```
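Below is a minimal sketch of what this looks like from user code, assuming a local master; the object name, master URL and app name are illustrative:

```scala
import org.apache.spark.{SparkConf, SparkContext}

object MyApp {
  def main(args: Array[String]): Unit = {
    // Settings placed in this conf override defaults and system properties
    val conf = new SparkConf()
      .setMaster("local[*]")
      .setAppName("MyApp")
    val sc = new SparkContext(conf)

    // ... create RDDs, accumulators and broadcast variables here ...

    // Only one SparkContext may be active per JVM, so stop it before creating another
    sc.stop()
  }
}
```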
SparkDriver Core Components
Preliminaries
一、Creating the CallSite
What is a CallSite, and what is it used for?
A CallSite represents a place in user code. It has a short form and a long form (the shortest and the longest stack descriptions):

```scala
/** CallSite represents a place in user code. It can have a short and a long form. */
private[spark] case class CallSite(shortForm: String, longForm: String)
```
In the source, the CallSite is built and returned by the `getCallSite()` method.
The key local variables it fills in are:

| Variable | Meaning |
| --- | --- |
| lastSparkMethod | the last (shallowest) Spark method on the stack; for a constructor, its class name is recorded instead |
| firstUserFile | the file name of the first (deepest) user frame |
| firstUserLine | the line number of that user frame |
The source code is as follows:
```scala
def getCallSite(skipClass: String => Boolean = sparkInternalExclusionFunction): CallSite = {
  // Keep crawling up the stack trace until we find the first function not inside of the spark
  // package. We track the last (shallowest) contiguous Spark method. This might be an RDD
  // transformation, a SparkContext function (such as parallelize), or anything else that leads
  // to instantiation of an RDD. We also track the first (deepest) user method, file, and line.
  var lastSparkMethod = "<unknown>"
  var firstUserFile = "<unknown>"
  var firstUserLine = 0
  var insideSpark = true
  val callStack = new ArrayBuffer[String]() :+ "<unknown>"

  Thread.currentThread.getStackTrace().foreach { ste: StackTraceElement =>
    // When running under some profilers, the current stack trace might contain some bogus
    // frames. This is intended to ensure that we don't crash in these situations by
    // ignoring any frames that we can't examine.
    if (ste != null && ste.getMethodName != null
      && !ste.getMethodName.contains("getStackTrace")) {
      if (insideSpark) {
        if (skipClass(ste.getClassName)) {
          lastSparkMethod = if (ste.getMethodName == "<init>") {
            // Spark method is a constructor; get its class name
            ste.getClassName.substring(ste.getClassName.lastIndexOf('.') + 1)
          } else {
            ste.getMethodName
          }
          callStack(0) = ste.toString // Put last Spark method on top of the stack trace.
        } else {
          if (ste.getFileName != null) {
            firstUserFile = ste.getFileName
            if (ste.getLineNumber >= 0) {
              firstUserLine = ste.getLineNumber
            }
          }
          callStack += ste.toString
          insideSpark = false
        }
      } else {
        callStack += ste.toString
      }
    }
  }

  val callStackDepth = System.getProperty("spark.callstack.depth", "20").toInt
  val shortForm =
    if (firstUserFile == "HiveSessionImpl.java") {
      // To be more user friendly, show a nicer string for queries submitted from the JDBC
      // server.
      "Spark JDBC Server Query"
    } else {
      s"$lastSparkMethod at $firstUserFile:$firstUserLine"
    }
  val longForm = callStack.take(callStackDepth).mkString("\n")

  CallSite(shortForm, longForm)
}
```
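As the code above shows, the depth of the long form is capped by the `spark.callstack.depth` JVM system property (default 20). A sketch of raising it, assuming it is set before the stack trace is captured (for example at the very start of the driver's `main`):

```scala
// Keep up to 50 frames in CallSite.longForm instead of the default 20.
// Note: this is read via System.getProperty, not via SparkConf.
System.setProperty("spark.callstack.depth", "50")
```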
Result on the client side:

For example, in a WordCount application the captured CallSite looks like this:

Short form: `SparkContext at MyWorkCount.scala:7`

Long form:

```
org.apache.spark.SparkContext.<init>(SparkContext.scala:76)
com.spark.MyWorkCount$.main(MyWorkCount.scala:7)
com.spark.MyWorkCount.main(MyWorkCount.scala)
```
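For context, a hypothetical reconstruction of such a `MyWorkCount` driver is sketched below; the input path and exact line numbers are illustrative, but if the `new SparkContext(conf)` call sits on line 7 of `MyWorkCount.scala`, the short form above is exactly what `getCallSite()` produces:

```scala
package com.spark

import org.apache.spark.{SparkConf, SparkContext}

object MyWorkCount {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("MyWorkCount")
    val sc = new SparkContext(conf) // suppose this is line 7 of MyWorkCount.scala

    sc.textFile("input.txt")        // path is illustrative
      .flatMap(_.split(" "))
      .map((_, 1))
      .reduceByKey(_ + _)
      .collect()
      .foreach(println)

    sc.stop()
  }
}
```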
二、Handling the Active SparkContext
```scala
// If true, log warnings instead of throwing exceptions when multiple SparkContexts are active.
// Defaults to false.
private val allowMultipleContexts: Boolean =
  config.getBoolean("spark.driver.allowMultipleContexts", false)

// In order to prevent multiple SparkContexts from being active at the same time, mark this
// context as having started construction.
// NOTE: this must be placed at the beginning of the SparkContext constructor.
SparkContext.markPartiallyConstructed(this, allowMultipleContexts)

val startTime = System.currentTimeMillis()

private[spark] val stopped: AtomicBoolean = new AtomicBoolean(false)

// Assert that this SparkContext is still alive (has not been stopped)
private[spark] def assertNotStopped(): Unit = {
  if (stopped.get()) {
    val activeContext = SparkContext.activeContext.get()
    val activeCreationSite =
      if (activeContext == null) {
        "(No active SparkContext.)"
      } else {
        activeContext.creationSite.longForm
      }
    throw new IllegalStateException(
      s"""Cannot call methods on a stopped SparkContext.
         |This stopped SparkContext was created at:
         |
         |${creationSite.longForm}
         |
         |The currently active SparkContext was created at:
         |
         |$activeCreationSite
       """.stripMargin)
  }
}
```
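A small sketch of how this plays out in user code, assuming a local master for illustration:

```scala
val conf = new SparkConf().setMaster("local[*]").setAppName("demo")
val sc1 = new SparkContext(conf)

// Constructing a second SparkContext now would normally fail, because the
// still-active sc1 is detected; with spark.driver.allowMultipleContexts=true
// Spark only logs a warning instead.

sc1.stop()                        // flips the stopped flag to true
val sc2 = new SparkContext(conf)  // fine: no other context is active any more

// Calling methods on the stopped context trips assertNotStopped():
// sc1.parallelize(1 to 10)  // IllegalStateException: Cannot call methods on a stopped SparkContext

sc2.stop()
```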
Main Walkthrough
一、Reading SparkConf and the Event Log / Compression Settings
SparkConf is Spark's configuration class. Configuration is stored as key-value pairs, wrapped in a ConcurrentHashMap instance named settings that holds all of Spark's configuration.
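A minimal sketch of working with SparkConf directly (the key names shown are standard Spark settings; the values are illustrative):

```scala
import org.apache.spark.SparkConf

// Every setter simply puts a string key-value pair into the internal map
val conf = new SparkConf()            // also loads any spark.* JVM system properties
  .setMaster("yarn")
  .setAppName("conf-demo")
  .set("spark.executor.memory", "2g")

conf.get("spark.app.name")              // "conf-demo"
conf.getOption("spark.executor.cores")  // None if not set
val copy = conf.clone()                 // the same kind of copy the constructor makes (_conf = config.clone())
```

The SparkContext constructor clones and validates exactly such a conf object: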
```scala
// Clone the passed-in configuration
_conf = config.clone()
// Sanity checks: validate the submitted settings and deploy mode
_conf.validateSettings()

// The deploy mode spark.master must be configured
if (!_conf.contains("spark.master")) {
  throw new SparkException("A master URL must be set in your configuration")
}
// spark.app.name must be configured
if (!_conf.contains("spark.app.name")) {
  throw new SparkException("An application name must be set in your configuration")
}

// log out spark.app.name in the Spark driver logs
logInfo(s"Submitted application: $appName")

// System property spark.yarn.app.id must be set if user code ran by AM on a YARN cluster
if (master == "yarn" && deployMode == "cluster" && !_conf.contains("spark.yarn.app.id")) {
  throw new SparkException("Detected yarn cluster mode, but isn't running on a cluster. " +
    "Deployment to YARN is not supported directly by SparkContext. Please use spark-submit.")
}

// Log the full configuration if spark.logConf is enabled
if (_conf.getBoolean("spark.logConf", false)) {
  logInfo("Spark configuration:\n" + _conf.toDebugString)
}

// Set Spark driver host and port system properties. This explicitly sets the configuration
// instead of relying on the default value of the config constant.
_conf.set(DRIVER_HOST_ADDRESS, _conf.get(DRIVER_HOST_ADDRESS))
_conf.setIfMissing("spark.driver.port", "0")

_conf.set("spark.executor.id", SparkContext.DRIVER_IDENTIFIER)

// Jars passed in by the user.
// In YARN mode this returns an empty list, because YARN has its own mechanism for distributing jars.
_jars = Utils.getUserJars(_conf)
// Files passed in by the user
_files = _conf.getOption("spark.files").map(_.split(",")).map(_.filter(_.nonEmpty))
  .toSeq.flatten

// Event log directory
_eventLogDir =
  if (isEventLogEnabled) {
    val unresolvedDir = conf.get("spark.eventLog.dir", EventLoggingListener.DEFAULT_LOG_DIR)
      .stripSuffix("/")
    Some(Utils.resolveURI(unresolvedDir))
  } else {
    None
  }

// Event log compression; disabled (false) by default
_eventLogCodec = {
  val compress = _conf.getBoolean("spark.eventLog.compress", false)
  if (compress && isEventLogEnabled) {
    Some(CompressionCodec.getCodecName(_conf)).map(CompressionCodec.getShortName)
  } else {
    None
  }
}
```
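As an illustration, a configuration like the following (the directory path is made up) would make the event-log branch above produce a resolved `_eventLogDir` and a codec short name in `_eventLogCodec`:

```scala
val conf = new SparkConf()
  .setAppName("event-log-demo")
  .set("spark.eventLog.enabled", "true")            // makes isEventLogEnabled true
  .set("spark.eventLog.dir", "hdfs:///spark-logs")  // resolved by Utils.resolveURI; path is illustrative
  .set("spark.eventLog.compress", "true")           // enables the compression branch
  .set("spark.io.compression.codec", "lz4")         // the codec CompressionCodec.getCodecName picks up
```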
二、Initializing the LiveListenerBus
LiveListenerBus is the event bus inside SparkContext. It receives events from the various producers and asynchronously delivers them to the registered SparkListener instances.
```scala
// Create the lifecycle listener bus
_listenerBus = new LiveListenerBus(_conf)

// Initialize the app status store and listener before SparkEnv is created so that it gets
// all events.
_statusStore = AppStatusStore.createLiveStore(conf)
listenerBus.addToStatusQueue(_statusStore.listener.get)
```
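To see the bus from the user's side, here is a sketch of a custom listener, assuming `sc` is an already-created SparkContext; the listener class name and log messages are illustrative:

```scala
import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationStart, SparkListenerJobEnd}

// Events reach this listener asynchronously through the LiveListenerBus queues
class MyListener extends SparkListener {
  override def onApplicationStart(event: SparkListenerApplicationStart): Unit =
    println(s"application started: ${event.appName}")

  override def onJobEnd(event: SparkListenerJobEnd): Unit =
    println(s"job ${event.jobId} finished with ${event.jobResult}")
}

// Register programmatically ...
sc.addSparkListener(new MyListener)
// ... or via configuration: spark.extraListeners=com.example.MyListener
```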