Overview
The class comment on LiveListenerBus summarizes the contract:

/**
 * Asynchronously passes SparkListenerEvents to registered SparkListeners.
 *
 * Until `start()` is called, all posted events are only buffered. Only after this listener bus
 * has started will events be actually propagated to all attached listeners. This listener bus
 * is stopped when `stop()` is called, and it will drop further events after stopping.
 */
Why use an event-listening mechanism?
Imagine that Spark delivered event notifications through plain Scala method calls. As the cluster grows, the number of such calls grows with it; the limit on JVM thread counts would eventually delay updates to the monitoring data, or even make it impossible to serve monitoring data to users at all. Method calls are also mostly synchronous, so the calling thread blocks and stays occupied for long stretches.
What are the benefits of an event-listening mechanism?
Method calls are replaced by event posting, or event delivery. Events are handled asynchronously: the current thread can move straight on to its next task, threads in the pool can be reused, and the concurrency of the whole system increases substantially. A posted event goes into a buffer, from which a dispatcher thread takes it and hands it to the listeners registered for that event, which then update the monitoring data.
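To make the pattern concrete, here is a minimal sketch, not Spark code and with purely illustrative names: post() is a cheap enqueue that returns immediately, while a dedicated dispatcher thread drains the buffer and notifies listeners.

import java.util.concurrent.LinkedBlockingQueue
import scala.collection.mutable.ListBuffer

trait Listener { def onEvent(event: String): Unit }

class ToyEventBus {
  private val buffer = new LinkedBlockingQueue[String](10000)
  private val listeners = ListBuffer.empty[Listener]

  def addListener(l: Listener): Unit = synchronized { listeners += l }

  // Non-blocking for the caller: enqueue the event and return immediately.
  def post(event: String): Unit = buffer.offer(event)

  // The dispatcher thread does the potentially slow listener work asynchronously.
  private val dispatcher = new Thread(new Runnable {
    override def run(): Unit = while (true) {
      val event = buffer.take() // blocks until an event is available
      ToyEventBus.this.synchronized { listeners.foreach(_.onEvent(event)) }
    }
  })
  dispatcher.setDaemon(true)
  dispatcher.start()
}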
Queues
The asynchronous event queue
The asynchronous event queue is built on a LinkedBlockingQueue[SparkListenerEvent] with a default capacity of 10000. The event-dispatch thread keeps taking events off the LinkedBlockingQueue; an event sits in the LinkedBlockingQueue until the thread has processed it, at which point it is removed.
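The loop itself is small. The following is a condensed paraphrase of the dispatch thread in AsyncEventQueue, simplified from memory (the real method also updates metrics and sets a thread-local flag); a sentinel POISON_PILL event posted by stop() tells the thread to exit once the queue has drained:

private def dispatch(): Unit = {
  var next: SparkListenerEvent = eventQueue.take() // blocks while the queue is empty
  while (next != POISON_PILL) {
    postToAll(next)          // deliver to every listener attached to this queue
    next = eventQueue.take() // an event leaves the queue once it has been processed
  }
}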
// LiveListenerBus.scala
private val queues = new CopyOnWriteArrayList[AsyncEventQueue]()

// AsyncEventQueue.scala
// Cap the capacity of the queue so we get an explicit error (rather than an OOM exception) if
// it's perpetually being added to more quickly than it's being drained.
private val eventQueue = new LinkedBlockingQueue[SparkListenerEvent](
  conf.get(LISTENER_BUS_EVENT_QUEUE_CAPACITY)) // default: 10000
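When listeners fall behind and a queue fills up, further events for that queue are dropped rather than blocking producers. A small tuning sketch, assuming only the standard SparkConf API (this snippet is not part of LiveListenerBus):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("listener-bus-tuning")
  // LISTENER_BUS_EVENT_QUEUE_CAPACITY reads this key; the default is 10000.
  // Raise it if the driver log warns about dropped events under heavy load.
  .set("spark.scheduler.listenerbus.eventqueue.capacity", "20000")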
Listener queues
/** Add a listener to queue shared by all non-internal listeners. */
/**
 * Called mainly from SparkContext: users can register listeners in code, or
 * declare them in configuration, in which case they are instantiated via
 * reflection (see setupAndStartListenerBus() in SparkContext).
 */
def addToSharedQueue(listener: SparkListenerInterface): Unit = {
  addToQueue(listener, SHARED_QUEUE)
}

/** Add a listener to the executor management queue. */
/**
 * Registers HeartbeatReceiver (which listens for executor add/remove events and
 * uses a thread to periodically check each executor's heartbeat, killing any
 * executor that times out). ExecutorAllocationManager can additionally register
 * an ExecutorAllocationListener here (it matches the total task count against
 * executor parallelism to add or remove executors dynamically; this must be
 * enabled in configuration and is off by default).
 */
def addToManagementQueue(listener: SparkListenerInterface): Unit = {
  addToQueue(listener, EXECUTOR_MANAGEMENT_QUEUE)
}

/** Add a listener to the application status queue. */
/**
 * Registers AppStatusListener, which supplies AppStatusStore with the job,
 * stage, and task data shown in the UI, and SQLAppStatusListener, which
 * supplies SQLAppStatusStore with the SQL UI data.
 */
def addToStatusQueue(listener: SparkListenerInterface): Unit = {
  addToQueue(listener, APP_STATUS_QUEUE)
}

/** Add a listener to the event log queue. */
/** Writes observed events out to log storage as JSON; must be enabled in configuration, off by default. */
def addToEventLogQueue(listener: SparkListenerInterface): Unit = {
  addToQueue(listener, EVENT_LOG_QUEUE)
}

/**
 * Add a listener to a specific queue, creating a new queue if needed. Queues are independent
 * of each other (each one uses a separate thread for delivering events), allowing slower
 * listeners to be somewhat isolated from others.
 *
 * All of the methods above delegate to this one. In addition, Spark Structured
 * Streaming's StreamingQueryListenerBus joins the "streams" queue via addToQueue()
 * (it listens for a stream's start, progress, and termination events; the progress
 * event carries detailed progress of the stream, including its name, id, watermark
 * time, source offsets, sink offsets, and so on).
 */
private[spark] def addToQueue(
    listener: SparkListenerInterface,
    queue: String): Unit = synchronized {
  if (stopped.get()) {
    throw new IllegalStateException("LiveListenerBus is stopped.")
  }

  queues.asScala.find(_.name == queue) match {
    case Some(queue) =>
      queue.addListener(listener)

    case None =>
      val newQueue = new AsyncEventQueue(queue, conf, metrics, this)
      newQueue.addListener(listener)
      if (started.get()) {
        newQueue.start(sparkContext)
      }
      queues.add(newQueue)
  }
}
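For completeness, a usage sketch of how a user-defined listener reaches the shared queue (MyListener is an illustrative name, and sc is assumed to be an existing SparkContext): SparkContext.addSparkListener hands the listener to addToSharedQueue, while class names listed in the spark.extraListeners configuration are instantiated via reflection in setupAndStartListenerBus().

import org.apache.spark.scheduler.{SparkListener, SparkListenerJobEnd}

class MyListener extends SparkListener {
  override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit =
    println(s"Job ${jobEnd.jobId} finished with result ${jobEnd.jobResult}")
}

sc.addSparkListener(new MyListener) // ends up in SHARED_QUEUE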
Event delivery
SparkListenerEvent event types
SparkListenerEvent is a trait. Some of its subclasses are shown below; they can be used to display and log events.
@DeveloperApi
@JsonTypeInfo(use = JsonTypeInfo.Id.CLASS, include = JsonTypeInfo.As.PROPERTY, property = "Event")
trait SparkListenerEvent {
  /* Whether output this event to the event log */
  protected[spark] def logEvent: Boolean = true
}

@DeveloperApi
case class SparkListenerStageSubmitted(stageInfo: StageInfo, properties: Properties = null)
  extends SparkListenerEvent

@DeveloperApi
case class SparkListenerStageCompleted(stageInfo: StageInfo) extends SparkListenerEvent

@DeveloperApi
case class SparkListenerTaskStart(stageId: Int, stageAttemptId: Int, taskInfo: TaskInfo)
  extends SparkListenerEvent

@DeveloperApi
case class SparkListenerTaskGettingResult(taskInfo: TaskInfo) extends SparkListenerEvent

@DeveloperApi
case class SparkListenerSpeculativeTaskSubmitted(stageId: Int) extends SparkListenerEvent
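A listener consumes these event types by overriding the matching callbacks on SparkListener, which provides a no-op default for each event. A minimal sketch (StageTimingListener is an illustrative name):

import org.apache.spark.scheduler.{SparkListener, SparkListenerStageCompleted, SparkListenerTaskStart}

class StageTimingListener extends SparkListener {
  override def onTaskStart(taskStart: SparkListenerTaskStart): Unit =
    println(s"Task started in stage ${taskStart.stageId}, attempt ${taskStart.stageAttemptId}")

  override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = {
    val info = stageCompleted.stageInfo
    println(s"Stage ${info.stageId} completed, ${info.numTasks} tasks")
  }
}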
The public event-posting interface: post()
post() is the bus's external delivery interface: SparkContext, DAGScheduler, CoarseGrainedSchedulerBackend, and others all submit events to the bus through it.
The delivery process:
- If the bus has started, postToQueues() is called to drop the event into the matching named queue.
- If the bus has not started, the event is stored in a ListBuffer[SparkListenerEvent]; once the bus starts, the buffered events are delivered and the buffer is cleared.
The delivery code is as follows:
// SparkContext calls start() to start the bus.
def start(sc: SparkContext, metricsSystem: MetricsSystem): Unit = synchronized {
  // Mark the bus as started.
  if (!started.compareAndSet(false, true)) {
    throw new IllegalStateException("LiveListenerBus already started.")
  }

  this.sparkContext = sc
  // Once the bus is started, deliver the events buffered in queuedEvents, then clear the buffer.
  queues.asScala.foreach { q =>
    q.start(sc)
    queuedEvents.foreach(q.post)
  }
  queuedEvents = null
  metricsSystem.registerSource(metrics)
}

// post() checks whether the bus has started and delivers accordingly.
def post(event: SparkListenerEvent): Unit = {
  if (stopped.get()) {
    return
  }

  metrics.numEventsPosted.inc()

  // If the event buffer is null, it means the bus has been started and we can avoid
  // synchronization and post events directly to the queues. This should be the most
  // common case during the life of the bus.
  // The bus has started and queuedEvents has been set to null, so post directly.
  if (queuedEvents == null) {
    postToQueues(event)
    return
  }

  // Otherwise, need to synchronize to check whether the bus is started, to make sure the thread
  // calling start() picks up the new event.
  synchronized {
    if (!started.get()) {
      // The bus has not started yet, so buffer the event first.
      queuedEvents += event
      return
    }
  }

  // If the bus was already started when the check above was made, just post directly to the queues.
  postToQueues(event)
}
How DAGScheduler posts events
Updating monitoring metrics
// DAGScheduler.scala: called for each executor heartbeat; posts the accumulated
// task metrics to the bus and checks in with the BlockManagerMaster.
def executorHeartbeatReceived(
    execId: String,
    // (taskId, stageId, stageAttemptId, accumUpdates)
    accumUpdates: Array[(Long, Int, Int, Seq[AccumulableInfo])],
    blockManagerId: BlockManagerId): Boolean = {
  listenerBus.post(SparkListenerExecutorMetricsUpdate(execId, accumUpdates))
  blockManagerMaster.driverEndpoint.askSync[Boolean](
    BlockManagerHeartbeat(blockManagerId),
    new RpcTimeout(600 seconds, "BlockManagerHeartbeat"))
}
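On the receiving side, a listener can observe these heartbeat-driven updates by overriding onExecutorMetricsUpdate. A minimal sketch (HeartbeatLogger is an illustrative name):

import org.apache.spark.scheduler.{SparkListener, SparkListenerExecutorMetricsUpdate}

class HeartbeatLogger extends SparkListener {
  override def onExecutorMetricsUpdate(update: SparkListenerExecutorMetricsUpdate): Unit =
    println(s"Heartbeat from executor ${update.execId}: ${update.accumUpdates.size} task update(s)")
}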