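This article is based on Spark 3.1.2.
While debugging a Spark SQL job, I found that one job produced more than 200 exchanges and ran for a long time without ever finishing.
The approach is the same as in the earlier article "Too many Spark tasks make a job run slowly or even time out" (the memory figures shown here are snapshots taken after tuning). The three steps are as follows:
First, use jstat -gcutil to check the GC activity: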
    S0     S1      E      O      M    CCS   YGC    YGCT  FGC    FGCT     GCT
  0.00 100.00  96.35  75.69  91.07  93.57   761   8.476   37   1.073   9.548
100.00   0.00  28.08  76.17  91.07  93.57   762   8.509   37   1.073   9.581
100.00   0.00  66.38  76.17  91.07  93.57   762   8.509   38   1.148   9.656
  0.00  89.11  15.98  60.79  91.04  93.46   763   8.529   38   1.148   9.676
  0.00  89.11  67.15  60.79  91.04  93.46   763   8.529   38   1.148   9.676
 53.32   0.00   0.00  60.79  91.04  93.46   764   8.536   38   1.148   9.683
 53.32   0.00  32.68  60.79  91.04  93.46   764   8.536   38   1.148   9.683
 53.32   0.00  66.07  60.79  91.04  93.46   764   8.536   38   1.148   9.683
 53.32   0.00  98.66  60.79  91.04  93.46   764   8.536   38   1.148   9.683
  0.00  90.36  25.66  60.79  91.04  93.46   765   8.543   38   1.148   9.691
  0.00  90.36  59.88  60.79  91.04  93.46   765   8.543   38   1.148   9.691
  0.00  90.36  95.83  60.79  91.04  93.46   765   8.543   38   1.148   9.691
 97.11   0.00  25.22  61.08  91.04  93.46   766   8.555   38   1.148   9.702
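To read samples like the ones above more systematically, the rows can be parsed and compared. The following is a minimal sketch, assuming the 11-column JDK 8 layout of jstat -gcutil shown above (newer JDKs add columns). The object JstatSummary and its methods are hypothetical helpers written for this illustration; they are not part of jstat or Spark.

```scala
// Hypothetical helper for illustration only; not part of jstat or Spark.
object JstatSummary {
  // The fields of one `jstat -gcutil` row that matter for a quick GC summary.
  final case class Sample(oldPct: Double, ygc: Long, ygct: Double, fgc: Long, fgct: Double)

  // Parse one whitespace-separated data row.
  // Assumed column order: S0 S1 E O M CCS YGC YGCT FGC FGCT GCT.
  def parse(row: String): Sample = {
    val f = row.trim.split("\\s+")
    require(f.length == 11, s"expected 11 columns, got ${f.length}")
    Sample(f(3).toDouble, f(6).toLong, f(7).toDouble, f(8).toLong, f(9).toDouble)
  }

  // Number of (young, full) collections between two samples.
  def delta(first: Sample, last: Sample): (Long, Long) =
    (last.ygc - first.ygc, last.fgc - first.fgc)

  def main(args: Array[String]): Unit = {
    // First and last rows of the output above.
    val first = parse("0.00 100.00 96.35 75.69 91.07 93.57 761 8.476 37 1.073 9.548")
    val last  = parse("97.11 0.00 25.22 61.08 91.04 93.46 766 8.555 38 1.148 9.702")
    val (young, full) = delta(first, last)
    println(s"young GCs: $young, full GCs: $full, old gen: ${first.oldPct}% -> ${last.oldPct}%")
  }
}
```

Comparing the first and last rows this way makes the trend easier to see than scanning the raw columns: the counters only ever increase, so their differences give the GC activity over the sampling window.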
Second, use jmap -heap to check the heap:
Heap Configuration:
   MinHeapFreeRatio         = 40
   MaxHeapFreeRatio         = 70
   MaxHeapSize              = 6442450944 (6144.0MB)
   NewSize                  = 172621824 (164.625MB)
   MaxNewSize               = 523436032 (499.1875MB)
   OldSize                  = 345374720 (329.375MB)
   NewRatio                 = 2
   SurvivorRatio            = 8
   MetaspaceSize            = 21807104 (20.796875MB)
   CompressedClassSpaceSize = 1073741824 (1024.0MB)
   MaxMetaspaceSize         = 17592186044415 MB
   G1HeapRegionSize         = 0 (0.0MB)

Heap Usage:
New Generation (Eden + 1 Survivor Space):
   capacity = 155385856 (148.1875MB)
   used     = 125461056 (119.64898681640625MB)
   free     = 29924800 (28.53851318359375MB)
   80.74161910849853% used
Eden Space:
   capacity = 138149888 (131.75MB)
   used     = 108225088 (103.21148681640625MB)
   free     = 29924800 (28.53851318359375MB)
   78.33888942421727% used
From Space:
   capacity = 17235968 (16.4375MB)
   used     = 17235968 (16.4375MB)
   free     = 0 (0.0MB)
   100.0% used
To Space:
   capacity = 17235968 (16.4375MB)
   used     = 0 (0.0MB)
   free     = 17235968 (16.4375MB)
   0.0% used
concurrent mark-sweep generation:
   capacity = 1930588160 (1841.15234375MB)
   used     = 1187447176 (1132.437873840332MB)
   free     = 743140984 (708.714469909668MB)
   61.50701639027974% used
Third, dump the heap with jmap -dump:format=b,file=heapdump.hprof and open the dump in MAT for analysis. It shows the following:
class SQLAppStatusListener(
    conf: SparkConf,
    kvstore: ElementTrackingStore,
    live: Boolean) extends SparkListener with Logging {
val statusStore: SQLAppStatusStore = {
  val kvStore = sparkContext.statusStore.store.asInstanceOf[ElementTrackingStore]
  val listener = new SQLAppStatusListener(conf, kvStore, live = true)
  sparkContext.listenerBus.addToStatusQueue(listener)
  val statusStore = new SQLAppStatusStore(kvStore, Some(listener))
  sparkContext.ui.foreach(new SQLTab(statusStore, _))
  statusStore
}
SQLAppStatusListener is registered on the listener bus, which means it receives every event; it simply filters for the ones it cares about.
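The "receive everything, filter locally" idea can be illustrated with a toy model. This is only a sketch: Event, Bus, Listener and SqlOnlyListener are hypothetical types invented for the example and are not Spark's classes. The bus delivers every posted event to every listener, and the listener itself decides what to ignore, similar in spirit to an early-return check such as isSQLStage.

```scala
// Toy model only: these types are hypothetical and are NOT Spark's classes.
object ListenerBusSketch {
  sealed trait Event
  final case class StageSubmitted(stageId: Int) extends Event
  final case class JobEnd(jobId: Int) extends Event

  trait Listener { def onEvent(e: Event): Unit }

  // The bus fans every posted event out to every registered listener.
  final class Bus {
    private var listeners = Vector.empty[Listener]
    def add(l: Listener): Unit = listeners = listeners :+ l
    def post(e: Event): Unit = listeners.foreach(_.onEvent(e))
  }

  // Sees every event, but only records stages that belong to its own set.
  final class SqlOnlyListener(sqlStages: Set[Int]) extends Listener {
    var handled: Vector[Int] = Vector.empty
    def onEvent(e: Event): Unit = e match {
      case StageSubmitted(id) if sqlStages.contains(id) => handled = handled :+ id
      case _ => () // delivered to the listener, but filtered out
    }
  }

  def main(args: Array[String]): Unit = {
    val bus = new Bus
    val listener = new SqlOnlyListener(Set(1, 3))
    bus.add(listener)
    Seq[Event](StageSubmitted(1), StageSubmitted(2), JobEnd(0), StageSubmitted(3))
      .foreach(e => bus.post(e))
    println(listener.handled)
  }
}
```

The point of the model is that filtering does not reduce what the listener receives; every event still reaches onEvent, so the listener does some work for each one.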
override def onStageSubmitted(event: SparkListenerStageSubmitted): Unit = {
  if (!isSQLStage(event.stageInfo.stageId)) {
    return
  }

  // Reset the metrics tracking object for the new attempt.
  Option(stageMetrics.get(event.stageInfo.stageId)).foreach { stage =>
    if (stage.attemptId != event.stageInfo.attemptNumber) {
      stageMetrics.put(event.stageInfo.stageId,
        new LiveStageMetrics(event.stageInfo.stageId, event.stageInfo.attemptNumber,
          stage.numTasks, stage.accumIdsToMetricType))
    }
  }
}

...

private def onExecutionEnd(event: SparkListenerSQLExecutionEnd): Unit = {
  val SparkListenerSQLExecutionEnd(executionId, time) = event
  Option(liveExecutions.get(executionId)).foreach { exec =>
    exec.completionTime = Some(new Date(time))
    update(exec)

    // Aggregating metrics can be expensive for large queries, so do it asynchronously. The end
    // event count is updated after the metrics have been aggregated, to prevent a job end event
    // arriving during aggregation from cleaning up the metrics data.
    kvstore.doAsync {
      exec.metricsValues = aggregateMetrics(exec)
      removeStaleMetricsData(exec)
      exec.endEvents.incrementAndGet()
      update(exec, force = true)
    }
  }
}

private def removeStaleMetricsData(exec: LiveExecutionData): Unit = {
  // Remove stale LiveStageMetrics objects for stages that are not active anymore.
  val activeStages = liveExecutions.values().asScala.flatMap { other =>
    if (other != exec) other.stages else Nil
  }.toSet
  stageMetrics.keySet().asScala
    .filter(!activeStages.contains(_))
    .foreach(stageMetrics.remove)
}
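The cleanup rule in removeStaleMetricsData can be modelled in a few lines of plain Scala. This is a simplified sketch, not Spark's implementation: Execution and the String-valued metrics map are hypothetical stand-ins for LiveExecutionData and LiveStageMetrics. It shows that when one execution ends, only the stage entries still referenced by other live executions are kept.

```scala
import scala.collection.mutable

// Toy model only: hypothetical stand-ins, not Spark's LiveExecutionData/LiveStageMetrics.
object StaleMetricsSketch {
  final case class Execution(id: Long, stages: Set[Int])

  // Drop metrics for every stage that no live execution other than `ended` still references.
  def removeStale(
      ended: Execution,
      live: Iterable[Execution],
      stageMetrics: mutable.Map[Int, String]): Unit = {
    val active = live.filter(_ != ended).flatMap(_.stages).toSet
    // Take a snapshot of the keys first so the map is not mutated while being iterated.
    stageMetrics.keys.toList
      .filterNot(active.contains)
      .foreach(k => stageMetrics.remove(k))
  }

  def main(args: Array[String]): Unit = {
    val e1 = Execution(1L, Set(10, 11))
    val e2 = Execution(2L, Set(12))
    val metrics = mutable.Map(10 -> "m10", 11 -> "m11", 12 -> "m12")
    removeStale(e1, Seq(e1, e2), metrics) // e1 has ended, e2 is still running
    println(metrics.keySet)
  }
}
```

In this model, entries for a stage stay in the map until the execution that owns them has ended, so an execution with many stages keeps all of their entries alive for its whole lifetime.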