【源码解读】| LiveListenerBus源码解读(上)

简介: 【源码解读】| LiveListenerBus源码解读

导读

/**
 * Asynchronously passes SparkListenerEvents to registered SparkListeners.
 * 异步将SparkListenerEvents传递给已注册的SparkListeners。
 *
 * Until `start()` is called, all posted events are only buffered. Only after this listener bus
 * has started will events be actually propagated to all attached listeners. This listener bus
 * is stopped when `stop()` is called, and it will drop further events after stopping.
 * 在调用“ start()”之前,所有已发布的事件仅被缓冲。
 * 仅在此侦听器总线启动之后,事件才会实际传播到所有连接的侦听器。
 * 当调用`stop()`时,此侦听器总线停止,并且停止后它将丢弃其他事件。
 */

为什么要使用事件监听机制?

设想如果Spark事件通知采用Scala函数调用方式,随着集群规模的增加,会对函数调用的越来越多,最终会受到JVM线程数量的限制而影响监控数据的更新,甚至出现无法提供监控数据给用户。函数调用多数情况是同步调用,这样还会导致线程阻塞,并被长时间占用。

使用事件监听机制的好处是什么?

会将函数调用更换成事件发送或者事件投递,事件的处理是异步的,当前线程可以继续执行后续逻辑,线程池中的线程还可以被重用,整个系统的并发将会大大的增加。发送的事件会进入缓存,由定时调度取出,分配给监听此事件的监听器对监控数据更新。

队列

异步事件队列

异步事件列队主要由LinkedBlockingQueue[SparkListenerEvent] 构建,默认大小为10000

事件监听线程会不断从LinkedBlockingQueue中获取事件。任何事件都会在LinkedBlockingQueue中存放一段时间,当线程处理完这个事件后,会将其清除。

// LiveListenerBus.scala
   private val queues = new CopyOnWriteArrayList[AsyncEventQueue]()
   // AsyncEventQueue.scala
  // Cap the capacity of the queue so we get an explicit error (rather than an OOM exception) if
// it's perpetually being added to more quickly than it's being drained.
private val eventQueue = new LinkedBlockingQueue[SparkListenerEvent](
  conf.get(LISTENER_BUS_EVENT_QUEUE_CAPACITY))  //默认值10000

监听器队列

/** Add a listener to queue shared by all non-internal listeners. */
  /**
   * 主要由SparkContext调用,即用户可以在代码中增加Listener,
   * 或从配单中增加Listener并反射调用[实现在SparkContext中的setupAndStartListenerBus()]
   * */
  def addToSharedQueue(listener: SparkListenerInterface): Unit = {
    addToQueue(listener, SHARED_QUEUE)
  }
  /** Add a listener to the executor management queue. */
  /**
   * 分别可增加HeartbeatReceiver(用于监听Executor的Add和Remove,并使用线程定期判断各Executor的心跳时间,超时则Kill
   * Executor),另外可通过ExecutorAllocationManager增加ExecutorAllocationListener
   * (通过计算总task数和Excutor并行度进行匹配,动态增加、减少Executor,需要配置,默认关闭)
   * */
  def addToManagementQueue(listener: SparkListenerInterface): Unit = {
    addToQueue(listener, EXECUTOR_MANAGEMENT_QUEUE)
  }
  /** Add a listener to the application status queue. */
  /**
   * 主要增加了AppStatusListener,为AppStatusStore提供Job、Stage、Task的UI展示数据,
   * 以及增加了SQLAppStatusListener,为SQLAppStatesStore提供SQLUI展示数据
   * */
  def addToStatusQueue(listener: SparkListenerInterface): Unit = {
    addToQueue(listener, APP_STATUS_QUEUE)
  }
  /** Add a listener to the event log queue. */
  /**将监听到的事件以Json方式写出到日志存储,需要配置,默认为关闭*/
  def addToEventLogQueue(listener: SparkListenerInterface): Unit = {
    addToQueue(listener, EVENT_LOG_QUEUE)
  }
  /**
   * Add a listener to a specific queue, creating a new queue if needed. Queues are independent
   * of each other (each one uses a separate thread for delivering events), allowing slower
   * listeners to be somewhat isolated from others.
   * 前面几个方法内部均调用此方法
   * 另外:spark structured streaming流式计算对应的StreamingQueryListenerBus通过addToQueue()方法加入"streams"队列
   * (用于监听流的start、process、terminate时间,其中process事件能获取到流处理的详细进度,包括流名称、id、水印时间、
   * source offsets、sink offsets等)
   */
  private[spark] def addToQueue(
      listener: SparkListenerInterface,
      queue: String): Unit = synchronized {
    if (stopped.get()) {
      throw new IllegalStateException("LiveListenerBus is stopped.")
    }
    queues.asScala.find(_.name == queue) match {
      case Some(queue) =>
        queue.addListener(listener)
      case None =>
        val newQueue = new AsyncEventQueue(queue, conf, metrics, this)
        newQueue.addListener(listener)
        if (started.get()) {
          newQueue.start(sparkContext)
        }
        queues.add(newQueue)
    }
  }

事件投递


SparkListenerEvent事件类型

SparkListenerEvent 是一个特质,如下是一些子类,可以用于事件的展示、记录。

@DeveloperApi
@JsonTypeInfo(use = JsonTypeInfo.Id.CLASS, include = JsonTypeInfo.As.PROPERTY, property = "Event")
trait SparkListenerEvent {
  /* Whether output this event to the event log */
  protected[spark] def logEvent: Boolean = true
}
@DeveloperApi
case class SparkListenerStageSubmitted(stageInfo: StageInfo, properties: Properties = null)
  extends SparkListenerEvent
@DeveloperApi
case class SparkListenerStageCompleted(stageInfo: StageInfo) extends SparkListenerEvent
@DeveloperApi
case class SparkListenerTaskStart(stageId: Int, stageAttemptId: Int, taskInfo: TaskInfo)
  extends SparkListenerEvent
@DeveloperApi
case class SparkListenerTaskGettingResult(taskInfo: TaskInfo) extends SparkListenerEvent
@DeveloperApi
case class SparkListenerSpeculativeTaskSubmitted(stageId: Int) extends SparkListenerEvent

外部公用事件投递接口POST

外部事件投递接口,SparkContext、DAGScheduler 、CoarseGrainedSchedulerBackend等都通过post,提交事件到总线。

投递过程:

  • 总线启动,调用postToQueues()方法将事件投入到对应的命名队列中。
  • 总线未启动,将事件保存到ListBuffer[SparkListenerEvent]队列中,等待总线启动时投递事件,清空缓存

640.png

事件投递过程代码如下

// 在SparkContext中会调用事件的start方法启动总线
  def start(sc: SparkContext, metricsSystem: MetricsSystem): Unit = synchronized {
    // 标记总线为已启动
    if (!started.compareAndSet(false, true)) {
      throw new IllegalStateException("LiveListenerBus already started.")
    }
    this.sparkContext = sc
    // 总线启动后,将queuedEvents缓存队列投递后清空
    queues.asScala.foreach { q =>
      q.start(sc)
      queuedEvents.foreach(q.post)
    }
    queuedEvents = null
    metricsSystem.registerSource(metrics)
  }
  //在post方法中,会判断总线是否启动及投递
    def post(event: SparkListenerEvent): Unit = {
    if (stopped.get()) {
      return
    }
    metrics.numEventsPosted.inc()
    // If the event buffer is null, it means the bus has been started and we can avoid
    // synchronization and post events directly to the queues. This should be the most
    // common case during the life of the bus.
    // 总线已经启动,缓存队列queuedEvents已置为null,则直接投递
    if (queuedEvents == null) {
      postToQueues(event)
      return
    }
    // Otherwise, need to synchronize to check whether the bus is started, to make sure the thread
    // calling start() picks up the new event.
    synchronized {
      if (!started.get()) {
        // 总线未启动,则将事件先放入缓存队列
        queuedEvents += event
        return
      }
    }
    // If the bus was already started when the check above was made, just post directly to the queues.
    // 投递事件
    postToQueues(event)
  }

DAGScheduler投递事件分析

640.png

更新监控指标

def executorHeartbeatReceived(
      execId: String,
      // (taskId, stageId, stageAttemptId, accumUpdates)
      accumUpdates: Array[(Long, Int, Int, Seq[AccumulableInfo])],
      blockManagerId: BlockManagerId): Boolean = {
    listenerBus.post(SparkListenerExecutorMetricsUpdate(execId, accumUpdates))
    blockManagerMaster.driverEndpoint.askSync[Boolean](
      BlockManagerHeartbeat(blockManagerId), new RpcTimeout(600 seconds, "BlockManagerHeartbeat"))
  }
相关文章
|
8月前
|
JavaScript 前端开发
new原理解析
本文深入解析了JavaScript中的new关键字,介绍了其作用、原理,并提供了一些代码示例来帮助读者更好地理解。通过对new关键字的详细解析,我们可以更好地理解JavaScript中对象实例的创建过程,从而更加灵活地运用new关键字来构建复杂的应用程序。
97 0
|
存储 SQL 分布式计算
【源码解读】| LiveListenerBus源码解读(下)
【源码解读】| LiveListenerBus源码解读
167 0
【源码解读】| LiveListenerBus源码解读(下)
|
JSON Java 数据格式
|
存储
HashMap源码解读(下篇)
HashMap源码解读(下篇)
109 0
HashMap源码解读(下篇)
|
存储 Java 对象存储
HashMap源码解读(上篇)
HashMap源码解读(上篇)
127 0
HashMap源码解读(上篇)
|
存储 分布式计算 监控
【源码解读】|SparkEnv源码解读
【源码解读】|SparkEnv源码解读
146 0
lowbit(x)的原理解析
快速学习lowbit(x)的原理解析
|
SQL cobar 自然语言处理
Cobar源码分析之AST
Cobar是阿里开源的数据库中间件,关于它的介绍这里不再赘述,可以参考之前的文章《Cobar SQL审计的设计与实现》 Cobar中利用AST可以获取table名、列名、比较的值进行分库分表,这也是Cobar最重要的功能 线上写了一条没有where条件的update或delete,这时可以利用AST进行表达式计算,对没有where条件和where条件恒为true的SQL进行拦截。
263 0
Cobar源码分析之AST
|
存储 Java 数据库
Java集合源码分析之开篇
初衷 Java集合是我们使用最频繁的工具,也是面试的热点,但我们对它的理解仅限于使用上,而且大多数情况没有考虑过其使用规范。本系列文章将跟随源码的思路,分析实现的每个细节,以期在使用时避免各种不规范的坑。在这里,我们会惊艳于开发者优秀的设计,也会感激先辈们付出的艰辛努力,更重要的是知其所以然,少犯错误,写出优秀的代码。 许多人对集合类的理解是暴力的,当需要保存对象时就使用ArrayList,当需要保存键值对时就使用HashMap,当需要不可重复时就使用HashSet,等等。而且使用方式也比较单一:
217 0
kprobe原理解析
kprobe的出现就很有必要,它可以在运行的内核中动态插入探测点,执行你预定义的操作。
4974 1

热门文章

最新文章