SPARK中metrics是怎么传递的

简介: SPARK中metrics是怎么传递的

背景


本文基于spark 3.3.0


在看spark源码的时候,总是会看到类似longMetric("numOutputRows")的信息,但是一般来说这种metrics的定义一般是在Driver端,而真正的+1或者-1操作都是在executor进行的,这种指标到底是怎么传递的呢?我们分析一下


分析


FilterExec物理计划为例:

case class FilterExec(condition: Expression, child: SparkPlan)
  extends UnaryExecNode with CodegenSupport with GeneratePredicateHelper {
  ...
  override lazy val metrics = Map(
    "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"))
  ...
  protected override def doExecute(): RDD[InternalRow] = {
    val numOutputRows = longMetric("numOutputRows")
    child.execute().mapPartitionsWithIndexInternal { (index, iter) =>
      val predicate = Predicate.create(condition, child.output)
      predicate.initialize(0)
      iter.filter { row =>
        val r = predicate.eval(row)
        if (r) numOutputRows += 1
        r
      }
    }
  }

为什么这么写可以


  • "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows")),这里只定义了一个numOutputRows的指标,用来记录该物理操作过滤了多少行的数据


  • if (r) numOutputRows += 1 这个操作会在executor端执行
    其实要看懂这个操作,我们要深入一下SQLMetrics.createMetric的实现
 def createMetric(sc: SparkContext, name: String): SQLMetric = {
    val acc = new SQLMetric(SUM_METRIC)
    acc.register(sc, name = metricsCache.get(name), countFailedValues = false)
    acc
  }
 ...
 abstract class AccumulatorV2[IN, OUT] extends Serializable {

其中SQLMetric类是继承AccumulatorV2,从而继承了Serializable,所以这个类是可序列化的,而且是可java序列化的,这一点很重要。再看SQLMetricregister方法,

 private[spark] def register(
    sc: SparkContext,
    name: Option[String] = None,
    countFailedValues: Boolean = false): Unit = {
  if (this.metadata != null) {
    throw new IllegalStateException("Cannot register an Accumulator twice.")
  }
  this.metadata = AccumulatorMetadata(AccumulatorContext.newId(), name, countFailedValues)
  AccumulatorContext.register(this)
  sc.cleaner.foreach(_.registerAccumulatorForCleanup(this))
}
  • this.metadata = AccumulatorMetadata(AccumulatorContext.newId(), name, countFailedValues)分配一个拥有全局唯一的id的AccumulatorMetadata实例


  • AccumulatorContext.register(this) 这个调用了往map中登记了以全局唯一id为key,value为WeakReference的值,这里登记到map的作用就是后续Task会对该metrics的值进行操作,下面会说到

  • sc.cleaner.foreach(_.registerAccumulatorForCleanup(this)) 这步操作和之前的文章说的一样SPARK 是怎么清除Shuffle中间结果数据的,只不过这里只是清理了Driver端的metrics


这里很重要


在scala里会有闭包的概念(这里可以自己网上查找原理),但是spark也会对闭包进一步进行处理,详见ClosureCleaner.clean方法。总结一下,简单来说,就是exeuctor会序列化用到的变量,所以说SQLMetric必须是可java序列化的(同时全局唯一的id也会被序列化)。


executor端的变量怎么传递到Driver端的


  1. 1.我们先来看AccumulatorV2的readObject方法:
 private def readObject(in: ObjectInputStream): Unit = Utils.tryOrIOException {
  in.defaultReadObject()
  if (atDriverSide) {
    atDriverSide = false
    // Automatically register the accumulator when it is deserialized with the task closure.
    // This is for external accumulators and internal ones that do not represent task level
    // metrics, e.g. internal SQL metrics, which are per-operator.
    val taskContext = TaskContext.get()
    if (taskContext != null) {
      taskContext.registerAccumulator(this)
    }
  } else {
    atDriverSide = true
  }
}

这个代码会在Executor执行,所以会执行taskContext.registerAccumulator(this)从而调用taskMetrics.registerAccumulator(a),从而保存在名为externalAccumsArrayBuffer


2. 再看task端的执行TaskRunnerrun()方法:


task = ser.deserialize[Task[Any]]( taskDescription.serializedTask, Thread.currentThread.getContextClassLoader) ... valaccumUpdates = task.collectAccumulatorUpdates()


  • 这里会调用ser.deserialize方法,从而触发AccumulatorV2readObject方法,从而该AccumulatorV2变量会保存在executor端,且保留了全局唯一id

  • val accumUpdates = task.collectAccumulatorUpdates() 收集spark内置的metrics(如remoteBlocksFetched)和自定义的metrics,这个会通过execBackend.statusUpdate方法,传达Driver端,最终调用到DAGSchedulerupdateAccumulators方法更新指标:
private def updateAccumulators(event: CompletionEvent): Unit = {
val task = event.task
val stage = stageIdToStage(task.stageId)
event.accumUpdates.foreach { updates =>
  val id = updates.id
  try {
  // Find the corresponding accumulator on the driver and update it
  val acc: AccumulatorV2[Any, Any] = AccumulatorContext.get(id) match {
    case Some(accum) => accum.asInstanceOf[AccumulatorV2[Any, Any]]
    case None =>
      throw SparkCoreErrors.accessNonExistentAccumulatorError(id)
  }
  acc.merge(updates.asInstanceOf[AccumulatorV2[Any, Any]])
  // To avoid UI cruft, ignore cases where value wasn't updated
  if (acc.name.isDefined && !updates.isZero) {
    stage.latestInfo.accumulables(id) = acc.toInfo(None, Some(acc.value))
    event.taskInfo.setAccumulables(
      acc.toInfo(Some(updates.value), Some(acc.value)) +: event.taskInfo.accumulables)
  }

acc.merge这个方法就完成了指标的更新。


event.taskInfo.setAccumulables这个是给当前event更新到最新的metrics,因为最终driver调用SparkListenerTaskEnd方法,从而被 AppStatusListeneronTaskEnd方法接受,从而完成Spark UI的更新(被AppStatusStore调用)。


同时也被SQLAppStatusListeneronTaskEnd方法接受,这里读者自己看代码即可,结果也是完成Spark UI的更新(被SQLAppStatusStore调用)


3.再看Executor端的reportHeartBeat方法:

   private def reportHeartBeat(): Unit = {
    ...
    val accumulatorsToReport =
      if (HEARTBEAT_DROP_ZEROES) {
        taskRunner.task.metrics.accumulators().filterNot(_.isZero)
      } else {
        taskRunner.task.metrics.accumulators()
      }
    accumUpdates += ((taskRunner.taskId, accumulatorsToReport))
    }
    ... 
     val message = Heartbeat(executorId, accumUpdates.toArray, env.blockManager.blockManagerId,
  executorUpdates)
  try {
  val response = heartbeatReceiverRef.askSync[HeartbeatResponse](
    message, new RpcTimeout(HEARTBEAT_INTERVAL_MS.millis, EXECUTOR_HEARTBEAT_INTERVAL.key))

这个reportHeartBeat会被周期的性的调用,用来向driver发送心跳信息,同时会带上metrics信息(包括spark内置的metrics和自定义的metrics),该方法通过向driver发送Heartbeat消息,最终会调用到DAGSchedulerexecutorHeartbeatReceived方法,从而被AppStatusListeneronExecutorMetricsUpdate方法接受:

  override def onExecutorMetricsUpdate(event: SparkListenerExecutorMetricsUpdate): Unit = {
  val now = System.nanoTime()
  event.accumUpdates.foreach { case (taskId, sid, sAttempt, accumUpdates) =>
    liveTasks.get(taskId).foreach { task =>
      val metrics = TaskMetrics.fromAccumulatorInfos(accumUpdates)
      val delta = task.updateMetrics(metrics)
      maybeUpdate(task, now)
      Option(liveStages.get((sid, sAttempt))).foreach { stage =>
       stage.metrics = LiveEntityHelpers.addMetrics(stage.metrics, delta)
       maybeUpdate(stage, now)
       val esummary = stage.executorSummary(event.execId)
       esummary.metrics = LiveEntityHelpers.addMetrics(esummary.metrics, delta)
       maybeUpdate(esummary, now)
     }
   }
}

这里更新的是正在运行的task的指标更新,从而更新到Spark UI界面(被AppStatusStore调用)。


还有被SQLAppStatusListeneronExecutorMetricsUpdate方法接受,这里读者自己看代码即可,结果也是完成Spark UI的更新(被SQLAppStatusStore调用)


总结


在Driver端定义的metrics,会被反序列化到Executor端,在Executor端,通过两种方式传回Driver端:


  • 在任务运行期间,利用heartbeat心跳来传递metrics

  • 在任务结束以后,利用任务结果的更新来传递metrics

  • 最终,都是通过sparkListener:SQLAppStatusListener和 AppStatusListener分别完成Spark UI状态的更新。
相关文章
|
5天前
|
分布式计算 大数据 数据处理
如何在 PySpark 中实现自定义转换
【8月更文挑战第14天】
14 4
|
6天前
|
SQL 机器学习/深度学习 分布式计算
|
3月前
|
分布式计算 Scala Spark
Spark参数解析之MasterArguments
Spark参数解析之MasterArguments
28 0
|
存储 SQL JSON
Spark - Task 与 Partition 一一对应与参数详解
使用 spark 读取 parquet 文件,共有 M个 parquet 文件,于是启动了 PExecutor x QCores 进行如下 WordCount 代码测试,其中 P x Q = M 即 Core 数目与 parquet 文件数一一对应。
548 0
Spark - Task 与 Partition 一一对应与参数详解
|
SQL JSON 分布式计算
【Spark】(task2)PySpark数据统计和分组聚合
1.2 保存读取的信息 步骤2:将读取的进行保存,表头也需要保存,这里可保存为csv或者json格式文件。
642 0
【Spark】(task2)PySpark数据统计和分组聚合
jMeter parallel controller 无法使用 CSV Data config 提供的变量?
jMeter parallel controller 无法使用 CSV Data config 提供的变量?
125 0
jMeter parallel controller 无法使用 CSV Data config 提供的变量?
|
分布式计算 Java API
Spark 2.4.0编程指南--spark dataSet action
## 技能标签 - Spark session 创建 - 在Spark 2.0之后,RDD被数据集(Dataset)取代 ,保留RDD旧api - 数据集数据集介绍 - 读取本地文件(txt,json),HDFS文件 - 对txt格式文件数据遍历(行数据转成对象) - 对json格式文件...
1603 0
|
Web App开发 分布式计算 Java
|
机器学习/深度学习 分布式计算 算法