背景
本文基于spark 3.1.2
分析
HighlyCompressedMapStatus
是属于MapStatus
的子类,也就是在每个ShuffleMapTask
写完数据以后,会返回给Driver
端的结果,以便记录该次MapTask的任务情况,以及shuffle数据在整个集群的分布情况。
MapStatus在Map任务怎么被写入的
在每个ShuffleMapTask结束以后,都会生成MapStatus的数据结构,如下:
/** Write a bunch of records to this task's output */ override def write(records: Iterator[Product2[K, V]]): Unit = { mapStatus = MapStatus(blockManager.shuffleServerId, partitionLengths, mapId)
|| \/
def apply( loc: BlockManagerId, uncompressedSizes: Array[Long], mapTaskId: Long): MapStatus = { if (uncompressedSizes.length > minPartitionsToUseHighlyCompressMapStatus) { HighlyCompressedMapStatus(loc, uncompressedSizes, mapTaskId) } else { new CompressedMapStatus(loc, uncompressedSizes, mapTaskId) } }
其中minPartitionsToUseHighlyCompressMapStatus
也就是spark.shuffle.minNumPartitionsToHighlyCompress
,默认是2000,只要超过这个阈值,就会生成HighlyCompressedMapStatus
实例,否则就是CompressedMapStatus
这是因为MapStatus的存储会占用driver端太多的内存,这在下文中会解释到。
我们转到HighlyCompressedMapStatus构造方法中,如下:
def apply( loc: BlockManagerId, uncompressedSizes: Array[Long], mapTaskId: Long): HighlyCompressedMapStatus = { ... val threshold = Option(SparkEnv.get) .map(_.conf.get(config.SHUFFLE_ACCURATE_BLOCK_THRESHOLD)) .getOrElse(config.SHUFFLE_ACCURATE_BLOCK_THRESHOLD.defaultValue.get) val avgSize = if (numSmallBlocks > 0) { totalSmallBlockSize / numSmallBlocks } else { 0 } emptyBlocks.trim() emptyBlocks.runOptimize() new HighlyCompressedMapStatus(loc, numNonEmptyBlocks, emptyBlocks, avgSize, hugeBlockSizes, mapTaskId)
这里有个阈值判断SHUFFLE_ACCURATE_BLOCK_THRESHOLD
,也就是spark.shuffle.accurateBlockThreshold
,这是来map端任务记录精确分区的阈值,如果大于该阈值,则会记录真实的reduce数据的分区大小,如果小于则记录的是每个reduce大小的平均值(这导致会在reduce获取运行时的数据大小信息时数据不准确的问题,从而导致AQE的效果不是很理想)。
MapStatus在Driver端怎么被记录的
之前说过为啥下游reduce的个数超过2000时,就会生成压缩的MapStatus
实例,这是跟MapStatus在Driver端的存储有关。对于MapStatus的信息都会通过ExecutorBackend
的statusUpdate
方法传给driver,最终是DAGScheduler
的方法片段:
mapOutputTracker.registerMapOutput( shuffleStage.shuffleDep.shuffleId, smt.partitionId, status)
被mapOutputTracker所注册,保存在shuffleStatuses
Map结构中,如果说CompressedMapStatus
类型的数据结构的化,回导致CompressedMapStatus
记录的信息很多,最终会导致Driver
的OOM问题。
CompressedMapStatus
的数据结构如下:
private[spark] class CompressedMapStatus( private[this] var loc: BlockManagerId, private[this] var compressedSizes: Array[Byte], private[this] var _mapTaskId: Long) extends MapStatus with Externalizable
而HighlyCompressedMapStatus
采用的是对于数据量小的reduce分区数据采用公用平均值的方式,这在一定程度上能够减缓Driver OOM的概率,
HighlyCompressedMapStatus
的数据结构如下:
private[spark] class HighlyCompressedMapStatus private ( private[this] var loc: BlockManagerId, private[this] var numNonEmptyBlocks: Int, private[this] var emptyBlocks: RoaringBitmap, private[this] var avgSize: Long, private[this] var hugeBlockSizes: scala.collection.Map[Int, Byte], private[this] var _mapTaskId: Long) extends MapStatus with Externalizable
MapStatus在Driver端怎么被使用的
我们知道MapStatus
的信息最终会被保存在MapOutputTrackerMaster
中,这样下游的reduce任务如果需要获取上游MapTask的运行情况的时候就会最终调用到MapOutputTrackerMaster
对应的方法中,最终会调用到MapStatus
的getSizeForBlock
方法,(当然AQE中的数据倾斜处理规则OptimizeSkewedJoin
也会用到)
对于getSizeForBlock
方法的实现对于不同的子类行为是不一样的:
- CompressedMapStatus
override def getSizeForBlock(reduceId: Int): Long = { MapStatus.decompressSize(compressedSizes(reduceId)) }
CompressedMapStatus
直接返回每个reduce的真实数据大小
- HighlyCompressedMapStatus
override def getSizeForBlock(reduceId: Int): Long = { assert(hugeBlockSizes != null) if (emptyBlocks.contains(reduceId)) { 0 } else { hugeBlockSizes.get(reduceId) match { case Some(size) => MapStatus.decompressSize(size) case None => avgSize } } }
HighlyCompressedMapStatus
对于reduce数据量比较小的,就直接返回一个平均值(这会对数据统计造成误导),对于数据量比较大的reduce分区(spark.shuffle.accurateBlockThreshold
参数控制的,默认100MB)就会返回真实值。
为什么HighlyCompressedMapStatus更加节约存储
因为对于小数据量的分区,只需要存储一个平均值,而不像 CompressedMapStatus
那样都会存储具体的数值(用map存储)