/**
 * Posts a `SparkListenerTaskStart` event for a task that has begun running.
 *
 * @param task     the task that started; its `stageId` is used to look up the owning stage
 * @param taskInfo information about the task, forwarded unchanged in the posted event
 */
private[scheduler] def handleBeginEvent(task: Task[_], taskInfo: TaskInfo): Unit = {
  // Note that there is a chance that this task is launched after the stage is cancelled.
  // In that case, we wouldn't have the stage anymore in stageIdToStage, and -1 is reported
  // as the stage attempt number.
  val stageAttemptId =
    stageIdToStage.get(task.stageId).map(_.latestInfo.attemptNumber).getOrElse(-1)
  listenerBus.post(SparkListenerTaskStart(task.stageId, stageAttemptId, taskInfo))
}

/**
 * Posts a `SparkListenerSpeculativeTaskSubmitted` event carrying the stage id of the task.
 *
 * @param task the task whose `stageId` is reported in the event
 */
private[scheduler] def handleSpeculativeTaskSubmitted(task: Task[_]): Unit = {
  listenerBus.post(SparkListenerSpeculativeTaskSubmitted(task.stageId))
}

/**
 * Posts a `SparkListenerTaskGettingResult` event for the given task.
 *
 * @param taskInfo information about the task, forwarded unchanged in the posted event
 */
private[scheduler] def handleGetTaskResult(taskInfo: TaskInfo): Unit = {
  listenerBus.post(SparkListenerTaskGettingResult(taskInfo))
}

// .........................................
/** Called when stage's parents are available and we can now do its task. * 在stages父类有空闲的时候,就可以去执行task * */ private def submitMissingTasks(stage: Stage, jobId: Int) { logDebug("submitMissingTasks(" + stage + ")") // First figure out the indexes of partition ids to compute. //1. 当前Stage没有计算完的分区对应的索引 val partitionsToCompute: Seq[Int] = stage.findMissingPartitions() // Use the scheduling pool, job group, description, etc. from an ActiveJob associated // with this Stage //2. 关联ActiveJob中的调度池,作业组,描述等 val properties = jobIdToActiveJob(jobId).properties //3. 将当前stage加入runningStages集合 runningStages += stage // SparkListenerStageSubmitted should be posted before testing whether tasks are // serializable. If tasks are not serializable, a SparkListenerStageCompleted event // will be posted, which should always come after a corresponding SparkListenerStageSubmitted // event. //4. 根据Stage类别,计算分区位置 stage match { case s: ShuffleMapStage => outputCommitCoordinator.stageStart(stage = s.id, maxPartitionId = s.numPartitions - 1) case s: ResultStage => outputCommitCoordinator.stageStart( stage = s.id, maxPartitionId = s.rdd.partitions.length - 1) } val taskIdToLocations: Map[Int, Seq[TaskLocation]] = try { stage match { case s: ShuffleMapStage => partitionsToCompute.map { id => (id, getPreferredLocs(stage.rdd, id))}.toMap case s: ResultStage => partitionsToCompute.map { id => val p = s.partitions(id) (id, getPreferredLocs(stage.rdd, p)) }.toMap } } catch { case NonFatal(e) => stage.makeNewStageAttempt(partitionsToCompute.size) listenerBus.post(SparkListenerStageSubmitted(stage.latestInfo, properties)) abortStage(stage, s"Task creation failed: $e\n${Utils.exceptionString(e)}", Some(e)) runningStages -= stage return }
private[scheduler] def handleJobSubmitted(jobId: Int,
    finalRDD: RDD[_], //
    func: (TaskContext, Iterator[_]) => _,
    partitions: Array[Int],
    callSite: CallSite,
    listener: JobListener,
    properties: Properties) {
  var finalStage: ResultStage = null
  try {
    // New stage creation may throw an exception if, for example, jobs are run on a
    // HadoopRDD whose underlying HDFS files have been deleted.
    // 1. Create the final stage (a ResultStage).
    finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite)
  } catch {
    case e: BarrierJobSlotsNumberCheckFailed =>
      logWarning(s"The job $jobId requires to run a barrier stage that requires more slots " +
        "than the total number of slots in the cluster currently.")
      // If jobId doesn't exist in the map, Scala converts its value null to 0: Int automatically.
      val numCheckFailures = barrierJobIdToNumTasksCheckFailures.compute(jobId,
        new BiFunction[Int, Int, Int] {
          override def apply(key: Int, value: Int): Int = value + 1
        })
      if (numCheckFailures <= maxFailureNumTasksCheck) {
        messageScheduler.schedule(
          new Runnable {
            override def run(): Unit = eventProcessLoop.post(JobSubmitted(jobId, finalRDD, func,
              partitions, callSite, listener, properties))
          },
          timeIntervalNumTasksCheck,
          TimeUnit.SECONDS
        )
        return
      } else {
        // Job failed, clear internal data.....
        // ...

/**
 * Fails every active job and marks every running stage as finished.
 *
 * For each job in `activeJobs` this notifies the job's listener of the failure, marks all
 * stages currently in `runningStages` as finished with a shutdown message, and posts a
 * `SparkListenerJobEnd` event carrying a `JobFailed` result to the listener bus.
 */
private[scheduler] def cleanUpAfterSchedulerStop(): Unit = {
  for (job <- activeJobs) {
    val error =
      new SparkException(s"Job ${job.jobId} cancelled because SparkContext was shut down")
    job.listener.jobFailed(error)
    // Tell the listeners that all of the running stages have ended. Don't bother
    // cancelling the stages because if the DAG scheduler is stopped, the entire application
    // is in the process of getting stopped.
    val stageFailedMessage = "Stage cancelled because SparkContext was shut down"
    // The `toArray` here is necessary so that we don't iterate over `runningStages` while
    // mutating it.
    runningStages.toArray.foreach { stage =>
      markStageAsFinished(stage, Some(stageFailedMessage))
    }
    listenerBus.post(SparkListenerJobEnd(job.jobId, clock.getTimeMillis(), JobFailed(error)))
  }
}
AsyncEventQueue 异步事件处理
AsyncEventQueue 功能点
- dispatchThread
AsyncEventQueue 父类doPostEvent方法实现
/**
 * Delivers one event to one listener by invoking the callback that matches the event's
 * runtime type. Any event without a dedicated case below is handed to `onOtherEvent`.
 *
 * @param listener the listener whose callback is invoked
 * @param event    the event to deliver
 */
protected override def doPostEvent(
    listener: SparkListenerInterface,
    event: SparkListenerEvent): Unit = event match {
  // Stage, job and task lifecycle
  case e: SparkListenerStageSubmitted => listener.onStageSubmitted(e)
  case e: SparkListenerStageCompleted => listener.onStageCompleted(e)
  case e: SparkListenerJobStart => listener.onJobStart(e)
  case e: SparkListenerJobEnd => listener.onJobEnd(e)
  case e: SparkListenerTaskStart => listener.onTaskStart(e)
  case e: SparkListenerTaskGettingResult => listener.onTaskGettingResult(e)
  case e: SparkListenerTaskEnd => listener.onTaskEnd(e)

  // Environment, block managers and cached RDDs
  case e: SparkListenerEnvironmentUpdate => listener.onEnvironmentUpdate(e)
  case e: SparkListenerBlockManagerAdded => listener.onBlockManagerAdded(e)
  case e: SparkListenerBlockManagerRemoved => listener.onBlockManagerRemoved(e)
  case e: SparkListenerUnpersistRDD => listener.onUnpersistRDD(e)

  // Application lifecycle
  case e: SparkListenerApplicationStart => listener.onApplicationStart(e)
  case e: SparkListenerApplicationEnd => listener.onApplicationEnd(e)

  // Executors
  case e: SparkListenerExecutorMetricsUpdate => listener.onExecutorMetricsUpdate(e)
  case e: SparkListenerExecutorAdded => listener.onExecutorAdded(e)
  case e: SparkListenerExecutorRemoved => listener.onExecutorRemoved(e)

  // Blacklisting of executors and nodes
  case e: SparkListenerExecutorBlacklistedForStage =>
    listener.onExecutorBlacklistedForStage(e)
  case e: SparkListenerNodeBlacklistedForStage => listener.onNodeBlacklistedForStage(e)
  case e: SparkListenerExecutorBlacklisted => listener.onExecutorBlacklisted(e)
  case e: SparkListenerExecutorUnblacklisted => listener.onExecutorUnblacklisted(e)
  case e: SparkListenerNodeBlacklisted => listener.onNodeBlacklisted(e)
  case e: SparkListenerNodeUnblacklisted => listener.onNodeUnblacklisted(e)

  // Block updates and speculation
  case e: SparkListenerBlockUpdated => listener.onBlockUpdated(e)
  case e: SparkListenerSpeculativeTaskSubmitted => listener.onSpeculativeTaskSubmitted(e)

  // Everything else
  case other => listener.onOtherEvent(other)
}
Streaming 后续会详细分析
Spark UI 中的 Job、Stage、Task 页面通过调用 AppStatusStore 提供的方法,读取 kvstore 中存储的 RDD 任务相关信息。
/** * A Spark listener that writes application information to a data store. The types written to the * store are defined in the `storeTypes.scala` file and are based on the public REST API. * Spark监听器,将应用程序信息写入数据存储。写入存储的类型 * 定义在 `storeTypes.scala` 文件中,并且基于公共REST API。 * @param lastUpdateTime When replaying logs, the log's last update time, so that the duration of * unfinished tasks can be more accurately calculated (see SPARK-21922). */ private[spark] class AppStatusListener( kvstore: ElementTrackingStore, conf: SparkConf, live: Boolean, lastUpdateTime: Option[Long] = None) extends SparkListener with Logging {
Spark UI中SQL页面,调用SQLAppStatusStore提供的方法,读取kvstore中存储的SparkPlan物理计划(SQL真实执行流程)相关信息。
class SQLAppStatusListener( conf: SparkConf, kvstore: ElementTrackingStore, live: Boolean) extends SparkListener with Logging {