[Source Code Reading] | LiveListenerBus Source Code Walkthrough (Part 2)

Summary: [Source Code Reading] | A walkthrough of the LiveListenerBus source code

Task launch and getting the task result

private[scheduler] def handleBeginEvent(task: Task[_], taskInfo: TaskInfo) {
    // Note that there is a chance that this task is launched after the stage is cancelled.
    // In that case, we wouldn't have the stage anymore in stageIdToStage.
    val stageAttemptId =
      stageIdToStage.get(task.stageId).map(_.latestInfo.attemptNumber).getOrElse(-1)
    listenerBus.post(SparkListenerTaskStart(task.stageId, stageAttemptId, taskInfo))
  }
  private[scheduler] def handleSpeculativeTaskSubmitted(task: Task[_]): Unit = {
    listenerBus.post(SparkListenerSpeculativeTaskSubmitted(task.stageId))
  }
  private[scheduler] def handleGetTaskResult(taskInfo: TaskInfo) {
    listenerBus.post(SparkListenerTaskGettingResult(taskInfo))
  }
  // ... (other event handlers omitted)
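These SparkListenerTaskStart / SparkListenerTaskGettingResult events are posted to the LiveListenerBus and eventually delivered to every registered listener. As a minimal sketch (the class name and the `sc` variable are hypothetical, not part of Spark), a custom SparkListener can observe them like this:

import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd, SparkListenerTaskGettingResult, SparkListenerTaskStart}

// Hypothetical listener that simply logs the task events posted above.
class TaskEventLogger extends SparkListener {
  override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = {
    val info = taskStart.taskInfo
    println(s"Task ${info.taskId} started in stage ${taskStart.stageId} " +
      s"(attempt ${taskStart.stageAttemptId}) on host ${info.host}")
  }

  override def onTaskGettingResult(event: SparkListenerTaskGettingResult): Unit = {
    println(s"Fetching result of task ${event.taskInfo.taskId}")
  }

  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
    println(s"Task ${taskEnd.taskInfo.taskId} ended with reason ${taskEnd.reason}")
  }
}

// Registration (assuming an existing SparkContext `sc`); events then flow
// DAGScheduler -> listenerBus.post(...) -> AsyncEventQueue -> this listener:
// sc.addSparkListener(new TaskEventLogger)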


Stage start and stop

/** Called when stage's parents are available and we can now do its task. */
  private def submitMissingTasks(stage: Stage, jobId: Int) {
    logDebug("submitMissingTasks(" + stage + ")")
    // First figure out the indexes of partition ids to compute.
    // 1. Indexes of the partitions of this stage that still need to be computed
    val partitionsToCompute: Seq[Int] = stage.findMissingPartitions()
    // Use the scheduling pool, job group, description, etc. from an ActiveJob associated
    // with this Stage
    // 2. Scheduling pool, job group, description, etc. from the ActiveJob associated with this stage
    val properties = jobIdToActiveJob(jobId).properties
    // 3. Add this stage to the runningStages set
    runningStages += stage
    // SparkListenerStageSubmitted should be posted before testing whether tasks are
    // serializable. If tasks are not serializable, a SparkListenerStageCompleted event
    // will be posted, which should always come after a corresponding SparkListenerStageSubmitted
    // event.
    // 4. Depending on the stage type, register the stage with the output commit coordinator,
    //    then compute the preferred locations for each partition
    stage match {
      case s: ShuffleMapStage =>
        outputCommitCoordinator.stageStart(stage = s.id, maxPartitionId = s.numPartitions - 1)
      case s: ResultStage =>
        outputCommitCoordinator.stageStart(
          stage = s.id, maxPartitionId = s.rdd.partitions.length - 1)
    }
    val taskIdToLocations: Map[Int, Seq[TaskLocation]] = try {
      stage match {
        case s: ShuffleMapStage =>
          partitionsToCompute.map { id => (id, getPreferredLocs(stage.rdd, id))}.toMap
        case s: ResultStage =>
          partitionsToCompute.map { id =>
            val p = s.partitions(id)
            (id, getPreferredLocs(stage.rdd, p))
          }.toMap
      }
    } catch {
      case NonFatal(e) =>
        stage.makeNewStageAttempt(partitionsToCompute.size)
        listenerBus.post(SparkListenerStageSubmitted(stage.latestInfo, properties))
        abortStage(stage, s"Task creation failed: $e\n${Utils.exceptionString(e)}", Some(e))
        runningStages -= stage
        return
    }
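On the listener side, the SparkListenerStageSubmitted event posted here can be paired with the matching SparkListenerStageCompleted event to measure how long a stage ran. A small sketch (the class name is hypothetical, not part of Spark):

import scala.collection.mutable
import org.apache.spark.scheduler.{SparkListener, SparkListenerStageCompleted, SparkListenerStageSubmitted}

// Hypothetical listener that times stages by pairing submitted/completed events.
class StageTimingListener extends SparkListener {
  private val submittedAt = mutable.Map[Int, Long]()

  override def onStageSubmitted(event: SparkListenerStageSubmitted): Unit = synchronized {
    submittedAt(event.stageInfo.stageId) = System.currentTimeMillis()
  }

  override def onStageCompleted(event: SparkListenerStageCompleted): Unit = synchronized {
    val info = event.stageInfo
    val elapsed = submittedAt.remove(info.stageId)
      .map(t => s"${System.currentTimeMillis() - t} ms")
      .getOrElse("unknown")
    val status = info.failureReason.map(r => s"failed: $r").getOrElse("succeeded")
    println(s"Stage ${info.stageId} (${info.name}) $status after $elapsed")
  }
}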

Job start and stop

private[scheduler] def handleJobSubmitted(jobId: Int,
      finalRDD: RDD[_],
      func: (TaskContext, Iterator[_]) => _,
      partitions: Array[Int],
      callSite: CallSite,
      listener: JobListener,
      properties: Properties) {
    var finalStage: ResultStage = null
    try {
      // New stage creation may throw an exception if, for example, jobs are run on a
      // HadoopRDD whose underlying HDFS files have been deleted.
      finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite) //1.创建最终FinalStage(ResultStage)
    } catch {
      case e: BarrierJobSlotsNumberCheckFailed =>
        logWarning(s"The job $jobId requires to run a barrier stage that requires more slots " +
          "than the total number of slots in the cluster currently.")
        // If jobId doesn't exist in the map, Scala converts its value null to 0: Int automatically.
        val numCheckFailures = barrierJobIdToNumTasksCheckFailures.compute(jobId,
          new BiFunction[Int, Int, Int] {
            override def apply(key: Int, value: Int): Int = value + 1
          })
        if (numCheckFailures <= maxFailureNumTasksCheck) {
          messageScheduler.schedule(
            new Runnable {
              override def run(): Unit = eventProcessLoop.post(JobSubmitted(jobId, finalRDD, func,
                partitions, callSite, listener, properties))
            },
            timeIntervalNumTasksCheck,
            TimeUnit.SECONDS
          )
          return
        } else {
          // Job failed, clear internal data.....
          ...
    private[scheduler] def cleanUpAfterSchedulerStop() {
    for (job <- activeJobs) {
      val error =
        new SparkException(s"Job ${job.jobId} cancelled because SparkContext was shut down")
      job.listener.jobFailed(error)
      // Tell the listeners that all of the running stages have ended.  Don't bother
      // cancelling the stages because if the DAG scheduler is stopped, the entire application
      // is in the process of getting stopped.
      val stageFailedMessage = "Stage cancelled because SparkContext was shut down"
      // The `toArray` here is necessary so that we don't iterate over `runningStages` while
      // mutating it.
      runningStages.toArray.foreach { stage =>
        markStageAsFinished(stage, Some(stageFailedMessage))
      }
      listenerBus.post(SparkListenerJobEnd(job.jobId, clock.getTimeMillis(), JobFailed(error)))
    }
  }
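From a listener's point of view, the SparkListenerJobEnd events posted here carry either JobSucceeded or a JobFailed result (the latter is what cleanUpAfterSchedulerStop posts for every active job). A rough sketch of counting job outcomes (the class name is hypothetical, not part of Spark):

import java.util.concurrent.atomic.AtomicInteger
import org.apache.spark.scheduler.{JobSucceeded, SparkListener, SparkListenerJobEnd, SparkListenerJobStart}

// Hypothetical listener counting job outcomes reported through SparkListenerJobEnd.
class JobOutcomeListener extends SparkListener {
  private val succeeded = new AtomicInteger(0)
  private val failed = new AtomicInteger(0)

  override def onJobStart(event: SparkListenerJobStart): Unit = {
    println(s"Job ${event.jobId} started with ${event.stageInfos.size} stages")
  }

  override def onJobEnd(event: SparkListenerJobEnd): Unit = {
    event.jobResult match {
      case JobSucceeded => succeeded.incrementAndGet()
      case _            => failed.incrementAndGet()   // e.g. JobFailed(error)
    }
    println(s"Job ${event.jobId} ended; ok=${succeeded.get} failed=${failed.get}")
  }
}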

AsyncEventQueue: asynchronous event handling


AsyncEventQueue class inheritance hierarchy


AsyncEventQueue method list


Key points of AsyncEventQueue

  • dispatchThread
    AsyncEventQueue runs a single internal dispatch thread that keeps calling dispatch() -> postToAll() -> doPostEvent() to drain the events in eventQueue, so that every registered listener gets a chance to react to each event (see the sketch below).
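The sketch below mirrors that design in a self-contained way: a bounded queue plus one daemon thread that drains it and fans each event out to all registered listeners. All names are hypothetical; the real AsyncEventQueue additionally tracks dropped-event counts, timing metrics, and listener removal on error.

import java.util.concurrent.{LinkedBlockingQueue, TimeUnit}
import java.util.concurrent.atomic.AtomicBoolean
import scala.util.control.NonFatal

// Hypothetical, simplified mirror of AsyncEventQueue's dispatch loop.
class SimpleAsyncQueue[E <: AnyRef](capacity: Int = 10000) {
  private val queue = new LinkedBlockingQueue[E](capacity)
  private val stopped = new AtomicBoolean(false)
  @volatile private var listeners = List.empty[E => Unit]

  def addListener(l: E => Unit): Unit = synchronized { listeners = l :: listeners }

  // post() never blocks the producer: if the queue is full the event is dropped,
  // similar to how the real queue logs and counts dropped events.
  def post(event: E): Unit = {
    if (!stopped.get() && !queue.offer(event)) {
      System.err.println(s"Dropping event because the queue is full: $event")
    }
  }

  private val dispatchThread = new Thread("simple-dispatch-thread") {
    setDaemon(true)
    override def run(): Unit = {
      // Keep draining until stop() is called and the queue is empty.
      while (!stopped.get() || !queue.isEmpty) {
        val event = queue.poll(100, TimeUnit.MILLISECONDS)
        if (event != null) {
          // The postToAll()/doPostEvent() step: every listener sees every event.
          listeners.foreach { l =>
            try l(event) catch { case NonFatal(t) => t.printStackTrace() }
          }
        }
      }
    }
  }
  dispatchThread.start()

  def stop(): Unit = stopped.set(true)
}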

doPostEvent implementation in AsyncEventQueue's parent class

StreamingListenerBus and StreamingQueryListenerBus override doPostEvent() so that they only care about and handle streaming-related events.

As the method shows, besides matching on the event type, it invokes the corresponding SparkListenerInterface callback for each event:

protected override def doPostEvent(
      listener: SparkListenerInterface,
      event: SparkListenerEvent): Unit = {
    event match {
      case stageSubmitted: SparkListenerStageSubmitted =>
        listener.onStageSubmitted(stageSubmitted)
      case stageCompleted: SparkListenerStageCompleted =>
        listener.onStageCompleted(stageCompleted)
      case jobStart: SparkListenerJobStart =>
        listener.onJobStart(jobStart)
      case jobEnd: SparkListenerJobEnd =>
        listener.onJobEnd(jobEnd)
      case taskStart: SparkListenerTaskStart =>
        listener.onTaskStart(taskStart)
      case taskGettingResult: SparkListenerTaskGettingResult =>
        listener.onTaskGettingResult(taskGettingResult)
      case taskEnd: SparkListenerTaskEnd =>
        listener.onTaskEnd(taskEnd)
      case environmentUpdate: SparkListenerEnvironmentUpdate =>
        listener.onEnvironmentUpdate(environmentUpdate)
      case blockManagerAdded: SparkListenerBlockManagerAdded =>
        listener.onBlockManagerAdded(blockManagerAdded)
      case blockManagerRemoved: SparkListenerBlockManagerRemoved =>
        listener.onBlockManagerRemoved(blockManagerRemoved)
      case unpersistRDD: SparkListenerUnpersistRDD =>
        listener.onUnpersistRDD(unpersistRDD)
      case applicationStart: SparkListenerApplicationStart =>
        listener.onApplicationStart(applicationStart)
      case applicationEnd: SparkListenerApplicationEnd =>
        listener.onApplicationEnd(applicationEnd)
      case metricsUpdate: SparkListenerExecutorMetricsUpdate =>
        listener.onExecutorMetricsUpdate(metricsUpdate)
      case executorAdded: SparkListenerExecutorAdded =>
        listener.onExecutorAdded(executorAdded)
      case executorRemoved: SparkListenerExecutorRemoved =>
        listener.onExecutorRemoved(executorRemoved)
      case executorBlacklistedForStage: SparkListenerExecutorBlacklistedForStage =>
        listener.onExecutorBlacklistedForStage(executorBlacklistedForStage)
      case nodeBlacklistedForStage: SparkListenerNodeBlacklistedForStage =>
        listener.onNodeBlacklistedForStage(nodeBlacklistedForStage)
      case executorBlacklisted: SparkListenerExecutorBlacklisted =>
        listener.onExecutorBlacklisted(executorBlacklisted)
      case executorUnblacklisted: SparkListenerExecutorUnblacklisted =>
        listener.onExecutorUnblacklisted(executorUnblacklisted)
      case nodeBlacklisted: SparkListenerNodeBlacklisted =>
        listener.onNodeBlacklisted(nodeBlacklisted)
      case nodeUnblacklisted: SparkListenerNodeUnblacklisted =>
        listener.onNodeUnblacklisted(nodeUnblacklisted)
      case blockUpdated: SparkListenerBlockUpdated =>
        listener.onBlockUpdated(blockUpdated)
      case speculativeTaskSubmitted: SparkListenerSpeculativeTaskSubmitted =>
        listener.onSpeculativeTaskSubmitted(speculativeTaskSubmitted)
      case _ => listener.onOtherEvent(event)
    }
  }
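The overriding buses follow the same shape with a much narrower match. A self-contained sketch of the pattern with toy event and listener types (all names here are hypothetical and unrelated to Spark's classes):

// Toy event hierarchy and listener trait, mirroring the doPostEvent dispatch pattern.
sealed trait BusEvent
case class TaskStarted(taskId: Long) extends BusEvent
case class TaskFinished(taskId: Long, succeeded: Boolean) extends BusEvent
case class CustomEvent(payload: String) extends BusEvent

trait BusListener {
  def onTaskStarted(e: TaskStarted): Unit = {}
  def onTaskFinished(e: TaskFinished): Unit = {}
  def onOtherEvent(e: BusEvent): Unit = {}
}

object MiniBus {
  // The equivalent of doPostEvent: match on the concrete event type and call
  // the matching callback; anything unrecognized falls through to onOtherEvent.
  def doPostEvent(listener: BusListener, event: BusEvent): Unit = event match {
    case e: TaskStarted  => listener.onTaskStarted(e)
    case e: TaskFinished => listener.onTaskFinished(e)
    case other           => listener.onOtherEvent(other)
  }
}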

AsyncEventQueue event processing flow


SparkListenerInterface analysis

The Streaming listeners will be analyzed in detail in a later post.
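Any implementation of SparkListenerInterface (usually written against the SparkListener adapter) can be attached to the bus either programmatically or through configuration. A sketch, assuming a hypothetical listener class com.example.TaskEventLogger is on the classpath:

import org.apache.spark.sql.SparkSession

// spark.extraListeners instantiates the named classes (zero-arg constructor, or one
// taking a SparkConf) and registers them on the LiveListenerBus at startup.
val spark = SparkSession.builder()
  .appName("listener-demo")
  .master("local[*]")
  .config("spark.extraListeners", "com.example.TaskEventLogger")  // hypothetical class
  .getOrCreate()

// Alternatively, register on an already running context:
// spark.sparkContext.addSparkListener(new com.example.TaskEventLogger)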


AppStatusListener

The Job, Stage, and Task pages of the Spark UI call the methods exposed by AppStatusStore to read the RDD/task-related information that AppStatusListener has written into the kvstore.

/**
 * A Spark listener that writes application information to a data store. The types written to the
 * store are defined in the `storeTypes.scala` file and are based on the public REST API.
 * @param lastUpdateTime When replaying logs, the log's last update time, so that the duration of
 *                       unfinished tasks can be more accurately calculated (see SPARK-21922).
 */
private[spark] class AppStatusListener(
    kvstore: ElementTrackingStore,
    conf: SparkConf,
    live: Boolean,
    lastUpdateTime: Option[Long] = None) extends SparkListener with Logging {
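The information AppStatusListener writes into the kvstore can be read back through public APIs that, in recent Spark versions, are backed by the same AppStatusStore, for example SparkStatusTracker. A small sketch (assuming an existing SparkSession named spark):

// Query live job/stage state; SparkStatusTracker reads from the status store
// populated by AppStatusListener.
val tracker = spark.sparkContext.statusTracker

for {
  jobId   <- tracker.getActiveJobIds()
  jobInfo <- tracker.getJobInfo(jobId)
} {
  println(s"Job $jobId status=${jobInfo.status} stages=${jobInfo.stageIds.mkString(",")}")
}

for {
  stageId   <- tracker.getActiveStageIds()
  stageInfo <- tracker.getStageInfo(stageId)
} {
  println(s"Stage $stageId: ${stageInfo.numCompletedTasks()}/${stageInfo.numTasks()} tasks completed")
}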

SQLAppStatusListener

The SQL page of the Spark UI calls the methods exposed by SQLAppStatusStore to read the SparkPlan physical-plan information (the actual SQL execution flow) that SQLAppStatusListener has written into the kvstore.

class SQLAppStatusListener(
    conf: SparkConf,
    kvstore: ElementTrackingStore,
    live: Boolean) extends SparkListener with Logging {
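The SQL-specific events that SQLAppStatusListener consumes arrive through onOtherEvent, so a plain SparkListener can observe them as well. A sketch (the class name is hypothetical; the event classes live in org.apache.spark.sql.execution.ui):

import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent}
import org.apache.spark.sql.execution.ui.{SparkListenerSQLExecutionEnd, SparkListenerSQLExecutionStart}

// Hypothetical listener logging SQL execution start/end events.
class SqlExecutionLogger extends SparkListener {
  override def onOtherEvent(event: SparkListenerEvent): Unit = event match {
    case start: SparkListenerSQLExecutionStart =>
      println(s"SQL execution ${start.executionId} started: ${start.description}")
      println(start.physicalPlanDescription)   // the physical plan shown on the SQL page
    case end: SparkListenerSQLExecutionEnd =>
      println(s"SQL execution ${end.executionId} finished at ${end.time}")
    case _ => // not a SQL event, ignore
  }
}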

