spark shuffle(ExchangeExec)过多导致任务运行过慢甚至超时

简介: spark shuffle(ExchangeExec)过多导致任务运行过慢甚至超时

背景以及现象


本文基于 spark 3.1.2

设置spark.driver.memory=2g

在调试spark sql任务的时候,发现有任务产生了200多个exchange,而且任务长期运行不出来。


分析


运行对应的sql(多个连续的join操作,且join的key都不一样),得到如下的物理计划(我们只截取了一部分):

image.png

和之前的文章spark task过多导致任务运行过慢甚至超时 做法一样(对应的内存都是调优完后的镜像信息),三步曲如下:

用jstat -gcutil查看一下对应的gc情况,如下:

  S0     S1     E      O      M     CCS    YGC     YGCT    FGC    FGCT     GCT
  0.00 100.00  96.35  75.69  91.07  93.57    761    8.476    37    1.073    9.548
100.00   0.00  28.08  76.17  91.07  93.57    762    8.509    37    1.073    9.581
100.00   0.00  66.38  76.17  91.07  93.57    762    8.509    38    1.148    9.656
  0.00  89.11  15.98  60.79  91.04  93.46    763    8.529    38    1.148    9.676
  0.00  89.11  67.15  60.79  91.04  93.46    763    8.529    38    1.148    9.676
 53.32   0.00   0.00  60.79  91.04  93.46    764    8.536    38    1.148    9.683
 53.32   0.00  32.68  60.79  91.04  93.46    764    8.536    38    1.148    9.683
 53.32   0.00  66.07  60.79  91.04  93.46    764    8.536    38    1.148    9.683
 53.32   0.00  98.66  60.79  91.04  93.46    764    8.536    38    1.148    9.683
  0.00  90.36  25.66  60.79  91.04  93.46    765    8.543    38    1.148    9.691
  0.00  90.36  59.88  60.79  91.04  93.46    765    8.543    38    1.148    9.691
  0.00  90.36  95.83  60.79  91.04  93.46    765    8.543    38    1.148    9.691
 97.11   0.00  25.22  61.08  91.04  93.46    766    8.555    38    1.148    9.702

用jmap -heap 命令查看一下对应的堆情况:

Heap Configuration:
   MinHeapFreeRatio         = 40
   MaxHeapFreeRatio         = 70
   MaxHeapSize              = 6442450944 (6144.0MB)
   NewSize                  = 172621824 (164.625MB)
   MaxNewSize               = 523436032 (499.1875MB)
   OldSize                  = 345374720 (329.375MB)
   NewRatio                 = 2
   SurvivorRatio            = 8
   MetaspaceSize            = 21807104 (20.796875MB)
   CompressedClassSpaceSize = 1073741824 (1024.0MB)
   MaxMetaspaceSize         = 17592186044415 MB
   G1HeapRegionSize         = 0 (0.0MB)
Heap Usage:
New Generation (Eden + 1 Survivor Space):
   capacity = 155385856 (148.1875MB)
   used     = 125461056 (119.64898681640625MB)
   free     = 29924800 (28.53851318359375MB)
   80.74161910849853% used
Eden Space:
   capacity = 138149888 (131.75MB)
   used     = 108225088 (103.21148681640625MB)
   free     = 29924800 (28.53851318359375MB)
   78.33888942421727% used
From Space:
   capacity = 17235968 (16.4375MB)
   used     = 17235968 (16.4375MB)
   free     = 0 (0.0MB)
   100.0% used
To Space:
   capacity = 17235968 (16.4375MB)
   used     = 0 (0.0MB)
   free     = 17235968 (16.4375MB)
   0.0% used
concurrent mark-sweep generation:
   capacity = 1930588160 (1841.15234375MB)
   used     = 1187447176 (1132.437873840332MB)
   free     = 743140984 (708.714469909668MB)
   61.50701639027974% used

再次 我们用jmap -dump:format=b,file=heapdump.hprof命令dump内存的堆信息,我们分析一下,用MAT打开,我们可以看到如下的信息:

image.png

image.png

可以看到SQLAppStatusListener这个对象占用的内存达到了500多M,而且只是在任务的一开始,后续很长一段时间,该内存会增加,且会持续占用。

我们可以稍微分析一下SQLAppStatusListener这个类:

class SQLAppStatusListener(
    conf: SparkConf,
    kvstore: ElementTrackingStore,
    live: Boolean) extends SparkListener with Logging {

该类继承自SparkListener,并且该类会在

SharedState中会被初始化:

val statusStore: SQLAppStatusStore = {
    val kvStore = sparkContext.statusStore.store.asInstanceOf[ElementTrackingStore]
    val listener = new SQLAppStatusListener(conf, kvStore, live = true)
    sparkContext.listenerBus.addToStatusQueue(listener)
    val statusStore = new SQLAppStatusStore(kvStore, Some(listener))
    sparkContext.ui.foreach(new SQLTab(statusStore, _))
    statusStore
  }

SQLAppStatusListener 会被加到数据总线中,也就是说所有的event的事件都会被接受,只不过可以自己进行过滤。

而最终改listener存储的状态都会被ui给展示出来。


SQLAppStatusListener存储的组件最主要的也是stageMetrics,也是在内存中占用比较多的对象。


查看stageMetrics被调用的地方,主要是在onStageSubmitted方法,onExecutorMetricsUpdate方法,onExecutionEnd方法中:

override def onStageSubmitted(event: SparkListenerStageSubmitted): Unit = {
    if (!isSQLStage(event.stageInfo.stageId)) {
      return
    }
    // Reset the metrics tracking object for the new attempt.
    Option(stageMetrics.get(event.stageInfo.stageId)).foreach { stage =>
      if (stage.attemptId != event.stageInfo.attemptNumber) {
        stageMetrics.put(event.stageInfo.stageId,
          new LiveStageMetrics(event.stageInfo.stageId, event.stageInfo.attemptNumber,
            stage.numTasks, stage.accumIdsToMetricType))
      }
    }
  }
  ...
  private def onExecutionEnd(event: SparkListenerSQLExecutionEnd): Unit = {
    val SparkListenerSQLExecutionEnd(executionId, time) = event
    Option(liveExecutions.get(executionId)).foreach { exec =>
      exec.completionTime = Some(new Date(time))
      update(exec)
      // Aggregating metrics can be expensive for large queries, so do it asynchronously. The end
      // event count is updated after the metrics have been aggregated, to prevent a job end event
      // arriving during aggregation from cleaning up the metrics data.
      kvstore.doAsync {
        exec.metricsValues = aggregateMetrics(exec)
        removeStaleMetricsData(exec)
        exec.endEvents.incrementAndGet()
        update(exec, force = true)
      }
    }
  }
  private def removeStaleMetricsData(exec: LiveExecutionData): Unit = {
    // Remove stale LiveStageMetrics objects for stages that are not active anymore.
    val activeStages = liveExecutions.values().asScala.flatMap { other =>
      if (other != exec) other.stages else Nil
    }.toSet
    stageMetrics.keySet().asScala
      .filter(!activeStages.contains(_))
      .foreach(stageMetrics.remove)
  }

这里只截取了onStageSubmitted方法和onExecutionEnd方法,

因为onStageSubmitted这是在有stage提交的时候,spark会发出SparkListenerJobStart事件,这会往stageMetrics写入对应的信息。

而onExecutionEnd方法是在executor被移除的时候,spark会发出SparkListenerSQLExecutionEnd事件,这个时候会清除stageMetrics对应的信息。


所以说在stage很多的情况下(也就是exchange很多的情况下),stageMetrics会存储大量的信息,而这种情况下,executor会被长期占用而得不到释放,

所以导致了driver端内存持续增加。


结论以及解决方法


所以在这种情况下,如果业务上改变不了,我们就得增加内存,在笔者的情况下,增加driver内存到6g就能很好的解决,且运行的速度很顺畅。

spark.driver.memory=6g
相关文章
|
9月前
|
存储 缓存 分布式计算
Spark任务OOM问题如何解决?
大家好,我是V哥。在实际业务中,Spark任务常因数据量过大、资源分配不合理或代码瓶颈导致OOM(Out of Memory)。本文详细分析了各种业务场景下的OOM原因,并提供了优化方案,包括调整Executor内存和CPU资源、优化内存管理策略、数据切分及减少宽依赖等。通过综合运用这些方法,可有效解决Spark任务中的OOM问题。关注威哥爱编程,让编码更顺畅!
408 3
|
4月前
|
SQL 分布式计算 资源调度
Dataphin功能Tips系列(48)-如何根据Hive SQL/Spark SQL的任务优先级指定YARN资源队列
如何根据Hive SQL/Spark SQL的任务优先级指定YARN资源队列
120 4
|
4月前
|
分布式计算 监控 Java
|
5月前
|
分布式计算 Spark
【赵渝强老师】Spark RDD的依赖关系和任务阶段
Spark RDD之间的依赖关系分为窄依赖和宽依赖。窄依赖指父RDD的每个分区最多被一个子RDD分区使用,如map、filter操作;宽依赖则指父RDD的每个分区被多个子RDD分区使用,如分组和某些join操作。窄依赖任务可在同一阶段完成,而宽依赖因Shuffle的存在需划分不同阶段执行。借助Spark Web Console可查看任务的DAG图及阶段划分。
213 15
|
6月前
|
存储 分布式计算 调度
Spark Master HA 主从切换过程不会影响到集群已有作业的运行, 为什么?
Spark Master 的高可用性(HA)机制确保主节点故障时,备用主节点能无缝接管集群管理,保障稳定运行。关键在于: 1. **Driver 和 Executor 独立**:任务执行不依赖 Master。 2. **应用状态保持**:备用 Master 通过 ZooKeeper 恢复集群状态。 3. **ZooKeeper 协调**:快速选举新 Master 并同步状态。 4. **容错机制**:任务可在其他 Executor 上重新调度。 这些特性保证了集群在 Master 故障时仍能正常运行。
|
6月前
|
缓存 分布式计算 资源调度
Spark 与 MapReduce 的 Shuffle 的区别?
MapReduce 和 Spark 在 Shuffle 过程中有显著区别。MapReduce 采用两阶段模型,中间数据写入磁盘,I/O 开销大;而 Spark 使用基于内存的多阶段执行模型,支持操作合并和内存缓存,减少 I/O。Spark 的 RDD 转换优化减少了 Shuffle 次数,提升了性能。此外,Spark 通过 lineage 实现容错,资源管理更灵活,整体大数据处理效率更高。
|
9月前
|
分布式计算 大数据 Java
大数据-86 Spark 集群 WordCount 用 Scala & Java 调用Spark 编译并打包上传运行 梦开始的地方
大数据-86 Spark 集群 WordCount 用 Scala & Java 调用Spark 编译并打包上传运行 梦开始的地方
151 1
大数据-86 Spark 集群 WordCount 用 Scala & Java 调用Spark 编译并打包上传运行 梦开始的地方
|
8月前
|
分布式计算 监控 大数据
如何优化Spark中的shuffle操作?
【10月更文挑战第18天】
|
9月前
|
SQL 分布式计算 大数据
大数据-97 Spark 集群 SparkSQL 原理详细解析 Broadcast Shuffle SQL解析过程(一)
大数据-97 Spark 集群 SparkSQL 原理详细解析 Broadcast Shuffle SQL解析过程(一)
220 0
|
9月前
|
SQL 分布式计算 算法
大数据-97 Spark 集群 SparkSQL 原理详细解析 Broadcast Shuffle SQL解析过程(二)
大数据-97 Spark 集群 SparkSQL 原理详细解析 Broadcast Shuffle SQL解析过程(二)
168 0

热门文章

最新文章