3. Creating the SparkEnv Object (the Driver's Env)

SparkEnv is a key class created by SparkContext: it holds Spark's execution environment (RPC environment, cache, map output tracker, and so on), and every thread can reach the same SparkEnv instance through the SparkContext.

LiveListenerBus: the event bus that delivers application lifecycle events to registered listeners.
```scala
// Create the Spark execution environment (cache, map output tracker, etc.)
_env = createSparkEnv(_conf, isLocal, listenerBus)
SparkEnv.set(_env)

// REPL: a Read-Eval-Print Loop, i.e. a simple interactive programming environment.
// If running the REPL, register the repl's output dir with the file server.
_conf.getOption("spark.repl.class.outputDir").foreach { path =>
  val replUri = _env.rpcEnv.fileServer.addDirectory("/classes", new File(path))
  _conf.set("spark.repl.class.uri", replUri)
}
```

```scala
private[spark] def createSparkEnv(
    conf: SparkConf,
    isLocal: Boolean,
    listenerBus: LiveListenerBus): SparkEnv = {
  SparkEnv.createDriverEnv(conf, isLocal, listenerBus,
    SparkContext.numDriverCores(master, conf))
}
```
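As a small illustration of the point above, any code running on the driver can reach this same environment through the `SparkEnv.get` accessor. A minimal local-mode sketch (demo code, not part of the Spark source; `SparkEnv` is a DeveloperApi, so its shape can vary across versions):

```scala
import org.apache.spark.{SparkConf, SparkContext, SparkEnv}

object SparkEnvDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("env-demo"))

    // SparkEnv.set(_env) above makes the driver env visible to any driver thread.
    println(SparkEnv.get.executorId)          // prints "driver"

    // Inside a task, the same accessor returns the SparkEnv of the executor running it.
    sc.parallelize(1 to 2, 2)
      .map(_ => SparkEnv.get.executorId)
      .collect()
      .foreach(println)                       // in local mode this is also "driver"

    sc.stop()
  }
}
```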
4. Initializing the SparkStatusTracker

A low-level status reporting API that offers only very weak consistency guarantees; it monitors the state of jobs and stages.
```scala
// Used to monitor job and stage progress.
// Note: the SparkStatusTracker API provides very weak consistency semantics;
// it may return None even for stages that are still active.
_statusTracker = new SparkStatusTracker(this, _statusStore)
```
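For context, user code reaches this tracker through `sc.statusTracker`. Below is a minimal usage sketch; the background job and the sleep are only there to give the tracker something to report, and the results may be empty or stale because of the weak consistency noted above:

```scala
import org.apache.spark.{SparkConf, SparkContext}

object StatusTrackerDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("status-demo"))
    val tracker = sc.statusTracker

    // Run a job in the background so there is progress to observe.
    val background = new Thread(new Runnable {
      override def run(): Unit = { sc.parallelize(1 to 1000000, 8).map(_ * 2).count() }
    })
    background.start()

    Thread.sleep(500)
    for (stageId <- tracker.getActiveStageIds;
         info    <- tracker.getStageInfo(stageId)) {
      // Any of these counts may lag behind reality; the API only promises weak consistency.
      println(s"stage $stageId: ${info.numCompletedTasks()}/${info.numTasks()} tasks done")
    }

    background.join()
    sc.stop()
  }
}
```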
5. Initializing the ConsoleProgressBar

The progress bar printed to the console, e.g. `[Stage 1]====================>`
```scala
// Put plainly: this is the progress line printed to the console.
_progressBar =
  if (_conf.get(UI_SHOW_CONSOLE_PROGRESS) && !log.isInfoEnabled) {
    Some(new ConsoleProgressBar(this))
  } else {
    None
  }
```

```scala
private def show(now: Long, stages: Seq[StageData]) {
  val width = TerminalWidth / stages.size
  val bar = stages.map { s =>
    val total = s.numTasks
    val header = s"[Stage ${s.stageId}:"
    val tailer = s"(${s.numCompleteTasks} + ${s.numActiveTasks}) / $total]"
    val w = width - header.length - tailer.length
    val bar = if (w > 0) {
      val percent = w * s.numCompleteTasks / total
      (0 until w).map { i =>
        if (i < percent) "=" else if (i == percent) ">" else " "
      }.mkString("")
    } else {
      ""
    }
    header + bar + tailer
  }.mkString("")

  // only refresh if it's changed OR after 1 minute (or the ssh connection will be closed
  // after idle some time)
  if (bar != lastProgressBar || now - lastUpdateTime > 60 * 1000L) {
    System.err.print(CR + bar)
    lastUpdateTime = now
  }
  lastProgressBar = bar
}
```
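As the guard above shows, the bar only appears when `spark.ui.showConsoleProgress` is true and the logging level is above INFO. A small configuration sketch:

```scala
import org.apache.spark.SparkConf

// Shown only when spark.ui.showConsoleProgress is true AND the root log level is above INFO
// (e.g. WARN), which matches the !log.isInfoEnabled check above.
val conf = new SparkConf()
  .setAppName("progress-demo")
  .setMaster("local[2]")
  .set("spark.ui.showConsoleProgress", "true")   // set to "false" to silence the bar
```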
6. Creating & Initializing the Spark UI

Spark's web-based monitoring platform; it covers the whole application lifecycle, including jobs and the environment.
```scala
// Whether the UI is enabled.
_ui =
  if (conf.getBoolean("spark.ui.enabled", true)) {
    Some(SparkUI.create(Some(this), _statusStore, _conf, _env.securityManager, appName, "",
      startTime))
  } else {
    // For tests, do not enable the UI
    None
  }
// Bind the UI before starting the task scheduler to communicate
// the bound port to the cluster manager properly
_ui.foreach(_.bind())

// Build the default Hadoop configuration.
_hadoopConfiguration = SparkHadoopUtil.get.newConfiguration(_conf)

// Add each JAR and file given through the constructor.
if (jars != null) {
  jars.foreach(addJar)
}
if (files != null) {
  files.foreach(addFile)
}
```
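For reference, the UI can be disabled or moved to another port purely through configuration; a small sketch (the port value is just an example):

```scala
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("ui-demo")
  .set("spark.ui.enabled", "true")   // false skips SparkUI.create entirely (as in tests)
  .set("spark.ui.port", "4050")      // the port bind() tries first; defaults to 4040
```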
7. Executor Memory Configuration
```scala
// Executor memory: look up the properties below in order; fall back to 1024 MB if none is set.
_executorMemory = _conf.getOption("spark.executor.memory")
  .orElse(Option(System.getenv("SPARK_EXECUTOR_MEMORY")))
  .orElse(Option(System.getenv("SPARK_MEM"))
  .map(warnSparkMem))
  .map(Utils.memoryStringToMb)
  .getOrElse(1024)

// Convert java options to env vars as a work around
// since we can't set env vars directly in sbt.
for { (envKey, propKey) <- Seq(("SPARK_TESTING", "spark.testing"))
  value <- Option(System.getenv(envKey)).orElse(Option(System.getProperty(propKey)))} {
  executorEnvs(envKey) = value
}
Option(System.getenv("SPARK_PREPEND_CLASSES")).foreach { v =>
  executorEnvs("SPARK_PREPEND_CLASSES") = v
}

// The Mesos scheduler backend relies on this environment variable to set executor memory.
// TODO: Set this only in the Mesos scheduler.
executorEnvs("SPARK_EXECUTOR_MEMORY") = executorMemory + "m"
executorEnvs ++= _conf.getExecutorEnv
executorEnvs("SPARK_USER") = sparkUser
```
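To make the fallback order concrete, here is a simplified, hypothetical re-implementation of the lookup chain; `toMb` is a toy stand-in for Spark's `Utils.memoryStringToMb` and only handles a couple of suffixes:

```scala
// Toy parser: "512m" -> 512, "4g" -> 4096; Spark's Utils.memoryStringToMb handles more formats.
def toMb(s: String): Int = s.toLowerCase match {
  case g if g.endsWith("g") => g.dropRight(1).toInt * 1024
  case m if m.endsWith("m") => m.dropRight(1).toInt
  case other                => other.toInt   // assume plain MB
}

def resolveExecutorMemoryMb(conf: Map[String, String], env: Map[String, String]): Int =
  conf.get("spark.executor.memory")
    .orElse(env.get("SPARK_EXECUTOR_MEMORY"))
    .orElse(env.get("SPARK_MEM"))          // deprecated; Spark logs a warning when it is used
    .map(toMb)
    .getOrElse(1024)                       // final fallback: 1024 MB

// resolveExecutorMemoryMb(Map("spark.executor.memory" -> "4g"), Map.empty) == 4096
```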
8. Registering the HeartbeatReceiver

The heartbeat receiver. Every Executor sends heartbeats to the HeartbeatReceiver; when it receives a heartbeat, it first updates that Executor's last-seen time and then hands the information to the TaskScheduler for further processing.
```scala
// We need to register "HeartbeatReceiver" before "createTaskScheduler" because Executor will
// retrieve "HeartbeatReceiver" in the constructor. (SPARK-6640)
_heartbeatReceiver = env.rpcEnv.setupEndpoint(
  HeartbeatReceiver.ENDPOINT_NAME, new HeartbeatReceiver(this))
```
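Heartbeat behaviour is driven by configuration; a small hedged example showing the two most relevant settings with their commonly documented defaults:

```scala
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.executor.heartbeatInterval", "10s")  // how often executors report to HeartbeatReceiver
  .set("spark.network.timeout", "120s")            // executors silent longer than this are considered lost
```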
9. Creating the TaskScheduler

Spark's task scheduler. It is responsible for submitting tasks and asking the cluster manager to schedule them. Because the tasks it schedules are created by the DAGScheduler, the DAGScheduler acts as its upstream scheduler.
```scala
val (sched, ts) = SparkContext.createTaskScheduler(this, master, deployMode)
_schedulerBackend = sched
_taskScheduler = ts
```
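`createTaskScheduler` chooses the concrete TaskScheduler and SchedulerBackend by pattern-matching the master URL. The sketch below is a simplified, hypothetical illustration of that dispatch; the real method also handles `local[N, maxFailures]`, `yarn`, and pluggable cluster managers:

```scala
// Simplified illustration of how the master string drives scheduler selection.
def describeScheduler(master: String): String = master match {
  case "local"                       => "TaskSchedulerImpl + LocalSchedulerBackend, 1 thread"
  case m if m.startsWith("local[")   => "TaskSchedulerImpl + LocalSchedulerBackend, N threads"
  case m if m.startsWith("spark://") => "TaskSchedulerImpl + StandaloneSchedulerBackend"
  case other                         => s"resolved via the external cluster manager for '$other'"
}
```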
10. Creating & Starting the DAGScheduler

A stage-oriented scheduler: it creates jobs, splits the RDDs in the DAG into stages, and submits each stage as a TaskSet to the underlying TaskScheduler for execution.
```scala
// Create the DAGScheduler, passing in the current SparkContext; it then pulls out the taskScheduler:
// def this(sc: SparkContext) = this(sc, sc.taskScheduler)
_dagScheduler = new DAGScheduler(this)

// Tell the HeartbeatReceiver that the task scheduler is now set.
_heartbeatReceiver.ask[Boolean](TaskSchedulerIsSet)

// start TaskScheduler after taskScheduler sets DAGScheduler reference in DAGScheduler's constructor
_taskScheduler.start()

_applicationId = _taskScheduler.applicationId()
_applicationAttemptId = taskScheduler.applicationAttemptId()
_conf.set("spark.app.id", _applicationId)
if (_conf.getBoolean("spark.ui.reverseProxy", false)) {
  System.setProperty("spark.ui.proxyBase", "/proxy/" + _applicationId)
}
// After all these settings, the UI is tied to this application.
_ui.foreach(_.setAppId(_applicationId))
```
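A quick user-level example of what "splitting the DAG into stages" means: the shuffle introduced by `reduceByKey` makes the DAGScheduler create two stages for a single job. A minimal sketch:

```scala
import org.apache.spark.{SparkConf, SparkContext}

object StageDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("stage-demo"))

    val counts = sc.parallelize(Seq("a", "b", "a", "c", "b", "a"))   // Stage 0: map side
      .map(word => (word, 1))
      .reduceByKey(_ + _)                                            // shuffle boundary
      .collect()                                                     // Stage 1: reduce side

    counts.foreach(println)
    sc.stop()
  }
}
```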
11. Initializing the BlockManager

The DAGScheduler holds a BlockManagerMaster object whose job is to manage the metadata of every BlockManager in the cluster. When a BlockManager in the cluster finishes registering, it sends its own metadata to the BlockManagerMaster, and the BlockManagerMaster creates a BlockManagerInfo for that BlockManager to store its information.
```scala
_env.blockManager.initialize(_applicationId)
```
12. Initializing the MetricsSystem

The metrics shown in the Spark web UI, such as shuffle read/write, GC time, and so on; runtime monitoring of Spark.
```scala
// The metrics system for Driver need to be set spark.app.id to app ID.
// So it should start after we get app ID from the task scheduler and set spark.app.id.
// Start the metrics system: GC time, shuffle read/write, etc.
_env.metricsSystem.start()

// Attach the driver metrics servlet handler to the web ui after the metrics system is started.
_env.metricsSystem.getServletHandlers.foreach(handler => ui.foreach(_.attachHandler(handler)))
```
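Sinks for this metrics system are normally declared in `conf/metrics.properties`, but the same keys can also be passed with the `spark.metrics.conf.` prefix. A hedged example enabling the built-in console sink:

```scala
import org.apache.spark.SparkConf

val conf = new SparkConf()
  // Equivalent to putting these lines in conf/metrics.properties.
  .set("spark.metrics.conf.*.sink.console.class",
       "org.apache.spark.metrics.sink.ConsoleSink")
  .set("spark.metrics.conf.*.sink.console.period", "10")
  .set("spark.metrics.conf.*.sink.console.unit", "seconds")
```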
13. Creating the EventLoggingListener

An event listener that serializes the various events to JSON for the event log.
```scala
// Create the event-log listener and add it to the listener bus's event-log queue
// (the bus queues are covered in detail later).
_eventLogger =
  if (isEventLogEnabled) {
    val logger =
      new EventLoggingListener(_applicationId, _applicationAttemptId, _eventLogDir.get,
        _conf, _hadoopConfiguration)
    logger.start()
    listenerBus.addToEventLogQueue(logger)
    Some(logger)
  } else {
    None
  }
```
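`isEventLogEnabled` above is driven by configuration; a small example of switching event logging on so that the history server can replay the application later (the HDFS path is only an example):

```scala
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.eventLog.enabled", "true")                 // makes isEventLogEnabled true
  .set("spark.eventLog.dir", "hdfs:///spark-events")     // where EventLoggingListener writes
  .set("spark.eventLog.compress", "true")                // optional: compress the log files
```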
14. Creating & Starting the ExecutorAllocationManager

Based on the configuration, decide whether to enable dynamic resource allocation.
```scala
// Optionally scale number of executors dynamically based on workload. Exposed for testing.
// Controlled by spark.dynamicAllocation.enabled.
val dynamicAllocationEnabled = Utils.isDynamicAllocationEnabled(_conf)
_executorAllocationManager =
  if (dynamicAllocationEnabled) {
    schedulerBackend match {
      case b: ExecutorAllocationClient =>
        Some(new ExecutorAllocationManager(
          schedulerBackend.asInstanceOf[ExecutorAllocationClient], listenerBus, _conf,
          _env.blockManager.master))
      case _ =>
        None
    }
  } else {
    None
  }
_executorAllocationManager.foreach(_.start())
```
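A typical configuration that makes `dynamicAllocationEnabled` true (the executor counts are examples; on most cluster managers the external shuffle service must also be enabled):

```scala
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.dynamicAllocation.enabled", "true")
  .set("spark.dynamicAllocation.minExecutors", "1")
  .set("spark.dynamicAllocation.maxExecutors", "20")
  .set("spark.shuffle.service.enabled", "true")   // usually required so shuffle files survive executor removal
```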
15. Creating the ContextCleaner

The context cleaner: an asynchronous cleaner for RDD, shuffle, and broadcast state. It cleans up RDDs, ShuffleDependencies, and Broadcast objects that have gone out of application scope.
```scala
// Create the ContextCleaner based on spark.cleaner.referenceTracking (defaults to true).
_cleaner =
  if (_conf.getBoolean("spark.cleaner.referenceTracking", true)) {
    Some(new ContextCleaner(this))
  } else {
    None
  }
_cleaner.foreach(_.start())
```
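To illustrate what "out of application scope" means: once the last reference to a cached RDD is dropped and a GC runs, the ContextCleaner unpersists its blocks asynchronously. A hedged local-mode sketch:

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel

object CleanerDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("cleaner-demo"))

    def buildAndForget(): Unit = {
      val cached = sc.parallelize(1 to 1000).persist(StorageLevel.MEMORY_ONLY)
      cached.count()
      // `cached` goes out of scope here; no explicit unpersist() is called.
    }
    buildAndForget()

    // After a GC, the cleaner's reference tracking notices the dead RDD
    // and removes its cached blocks in the background.
    System.gc()
    Thread.sleep(2000)
    println(sc.getPersistentRDDs.size)   // typically 0 once the cleanup has run
    sc.stop()
  }
}
```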
16. Configuring Extra Listeners
```scala
setupAndStartListenerBus()
```
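`setupAndStartListenerBus()` instantiates any listener classes named in `spark.extraListeners` and registers them with the LiveListenerBus. Below is a minimal custom listener and the two usual ways of hooking it up (the `MyListener` class is hypothetical):

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationStart, SparkListenerJobEnd}

// Must have a zero-arg (or SparkConf) constructor to be usable from spark.extraListeners.
class MyListener extends SparkListener {
  override def onApplicationStart(event: SparkListenerApplicationStart): Unit =
    println(s"app started: ${event.appName}")
  override def onJobEnd(event: SparkListenerJobEnd): Unit =
    println(s"job ${event.jobId} finished: ${event.jobResult}")
}

object ListenerDemo {
  def main(args: Array[String]): Unit = {
    // Option 1: declare it in configuration; setupAndStartListenerBus() picks it up.
    val conf = new SparkConf().setMaster("local[2]").setAppName("listener-demo")
      .set("spark.extraListeners", "MyListener")

    val sc = new SparkContext(conf)
    // Option 2: register programmatically after the context is up.
    sc.addSparkListener(new MyListener)

    sc.parallelize(1 to 10).count()
    sc.stop()
  }
}
```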
17. Posting the Environment Update
```scala
postEnvironmentUpdate()
```

```scala
// Environment update: this is the data shown on the UI's Environment page.
private def postEnvironmentUpdate() {
  if (taskScheduler != null) {
    val schedulingMode = getSchedulingMode.toString
    val addedJarPaths = addedJars.keys.toSeq
    val addedFilePaths = addedFiles.keys.toSeq
    val environmentDetails = SparkEnv.environmentDetails(conf, schedulingMode, addedJarPaths,
      addedFilePaths)
    val environmentUpdate = SparkListenerEnvironmentUpdate(environmentDetails)
    listenerBus.post(environmentUpdate)
  }
}
```
18. Posting the Application Start Event
```scala
postApplicationStart()
```

```scala
// Focus on the post method, which sends the event; start() and stop() are worth a look as well.
private def postApplicationStart() {
  // Note: this code assumes that the task scheduler has been initialized and has contacted
  // the cluster manager to get an application ID (in case the cluster manager provides one).
  listenerBus.post(SparkListenerApplicationStart(appName, Some(applicationId), startTime,
    sparkUser, applicationAttemptId, schedulerBackend.getDriverLogUrls))
}
```
19. Final Registration & Post-Start Hook
```scala
// Note postStartHook.
_taskScheduler.postStartHook()
_env.metricsSystem.registerSource(_dagScheduler.metricsSource)
_env.metricsSystem.registerSource(new BlockManagerSource(_env.blockManager))
_executorAllocationManager.foreach { e =>
  _env.metricsSystem.registerSource(e.executorAllocationManagerSource)
}
```
20. Shutdown Cleanup
```scala
  // Make sure the context is stopped if the user forgets about it. This avoids leaving
  // unfinished event logs around after the JVM exits cleanly. It doesn't help if the JVM
  // is killed, though.
  logDebug("Adding shutdown hook")
  _shutdownHookRef = ShutdownHookManager.addShutdownHook(
    ShutdownHookManager.SPARK_CONTEXT_SHUTDOWN_PRIORITY) { () =>
    logInfo("Invoking stop() from shutdown hook")
    try {
      stop()
    } catch {
      case e: Throwable =>
        logWarning("Ignoring Exception while stopping SparkContext from shutdown hook", e)
    }
  }
} catch {
  case NonFatal(e) =>
    logError("Error initializing SparkContext.", e)
    try {
      stop()
    } catch {
      case NonFatal(inner) =>
        logError("Error stopping SparkContext after init error.", inner)
    } finally {
      throw e
    }
}
```
Summary

- Spark is made up of many components, spanning network communication, distributed coordination, messaging, storage, computation, caching, metrics, cleanup, file serving, the web UI, and more.
- Spark makes heavy use of event listeners to implement communication between components on the driver side.