SPARK中关于HighlyCompressedMapStatus的说明(会造成运行时的数据不精确)

简介: SPARK中关于HighlyCompressedMapStatus的说明(会造成运行时的数据不精确)

背景


本文基于spark 3.1.2


分析


HighlyCompressedMapStatus 是属于MapStatus的子类,也就是在每个ShuffleMapTask写完数据以后,会返回给Driver端的结果,以便记录该次MapTask的任务情况,以及shuffle数据在整个集群的分布情况。


MapStatus在Map任务怎么被写入的


在每个ShuffleMapTask结束以后,都会生成MapStatus的数据结构,如下:

/** Write a bunch of records to this task's output */
  override def write(records: Iterator[Product2[K, V]]): Unit = {
    mapStatus = MapStatus(blockManager.shuffleServerId, partitionLengths, mapId)
                                ||
                                \/
 def apply(
      loc: BlockManagerId,
      uncompressedSizes: Array[Long],
      mapTaskId: Long): MapStatus = {
    if (uncompressedSizes.length > minPartitionsToUseHighlyCompressMapStatus) {
      HighlyCompressedMapStatus(loc, uncompressedSizes, mapTaskId)
    } else {
      new CompressedMapStatus(loc, uncompressedSizes, mapTaskId)
    }
  }

其中minPartitionsToUseHighlyCompressMapStatus也就是spark.shuffle.minNumPartitionsToHighlyCompress,默认是2000,只要超过这个阈值,就会生成HighlyCompressedMapStatus实例,否则就是CompressedMapStatus


这是因为MapStatus的存储会占用driver端太多的内存,这在下文中会解释到。


我们转到HighlyCompressedMapStatus构造方法中,如下:

def apply(
      loc: BlockManagerId,
      uncompressedSizes: Array[Long],
      mapTaskId: Long): HighlyCompressedMapStatus = {
        ...
        val threshold = Option(SparkEnv.get)
      .map(_.conf.get(config.SHUFFLE_ACCURATE_BLOCK_THRESHOLD))
      .getOrElse(config.SHUFFLE_ACCURATE_BLOCK_THRESHOLD.defaultValue.get)
       val avgSize = if (numSmallBlocks > 0) {
      totalSmallBlockSize / numSmallBlocks
    } else {
      0
    }
    emptyBlocks.trim()
    emptyBlocks.runOptimize()
    new HighlyCompressedMapStatus(loc, numNonEmptyBlocks, emptyBlocks, avgSize,
      hugeBlockSizes, mapTaskId)

这里有个阈值判断SHUFFLE_ACCURATE_BLOCK_THRESHOLD,也就是spark.shuffle.accurateBlockThreshold,这是来map端任务记录精确分区的阈值,如果大于该阈值,则会记录真实的reduce数据的分区大小,如果小于则记录的是每个reduce大小的平均值(这导致会在reduce获取运行时的数据大小信息时数据不准确的问题,从而导致AQE的效果不是很理想)。


MapStatus在Driver端怎么被记录的


之前说过为啥下游reduce的个数超过2000时,就会生成压缩的MapStatus实例,这是跟MapStatus在Driver端的存储有关。对于MapStatus的信息都会通过ExecutorBackendstatusUpdate方法传给driver,最终是DAGScheduler的方法片段:

mapOutputTracker.registerMapOutput(
                shuffleStage.shuffleDep.shuffleId, smt.partitionId, status)

被mapOutputTracker所注册,保存在shuffleStatuses Map结构中,如果说CompressedMapStatus类型的数据结构的化,回导致CompressedMapStatus记录的信息很多,最终会导致Driver的OOM问题。


CompressedMapStatus的数据结构如下:

private[spark] class CompressedMapStatus(
    private[this] var loc: BlockManagerId,
    private[this] var compressedSizes: Array[Byte],
    private[this] var _mapTaskId: Long)
  extends MapStatus with Externalizable

HighlyCompressedMapStatus采用的是对于数据量小的reduce分区数据采用公用平均值的方式,这在一定程度上能够减缓Driver OOM的概率,


HighlyCompressedMapStatus的数据结构如下:

private[spark] class HighlyCompressedMapStatus private (
    private[this] var loc: BlockManagerId,
    private[this] var numNonEmptyBlocks: Int,
    private[this] var emptyBlocks: RoaringBitmap,
    private[this] var avgSize: Long,
    private[this] var hugeBlockSizes: scala.collection.Map[Int, Byte],
    private[this] var _mapTaskId: Long)
  extends MapStatus with Externalizable

MapStatus在Driver端怎么被使用的


我们知道MapStatus的信息最终会被保存在MapOutputTrackerMaster中,这样下游的reduce任务如果需要获取上游MapTask的运行情况的时候就会最终调用到MapOutputTrackerMaster对应的方法中,最终会调用到MapStatusgetSizeForBlock方法,(当然AQE中的数据倾斜处理规则OptimizeSkewedJoin也会用到)


对于getSizeForBlock方法的实现对于不同的子类行为是不一样的:


  • CompressedMapStatus
 override def getSizeForBlock(reduceId: Int): Long = {
    MapStatus.decompressSize(compressedSizes(reduceId))
  }

CompressedMapStatus 直接返回每个reduce的真实数据大小


  • HighlyCompressedMapStatus
 override def getSizeForBlock(reduceId: Int): Long = {
    assert(hugeBlockSizes != null)
    if (emptyBlocks.contains(reduceId)) {
      0
    } else {
      hugeBlockSizes.get(reduceId) match {
        case Some(size) => MapStatus.decompressSize(size)
        case None => avgSize
      }
    }
  }

HighlyCompressedMapStatus对于reduce数据量比较小的,就直接返回一个平均值(这会对数据统计造成误导),对于数据量比较大的reduce分区(spark.shuffle.accurateBlockThreshold参数控制的,默认100MB)就会返回真实值。


为什么HighlyCompressedMapStatus更加节约存储


因为对于小数据量的分区,只需要存储一个平均值,而不像 CompressedMapStatus那样都会存储具体的数值(用map存储)

相关文章
|
2月前
|
分布式计算 关系型数据库 MySQL
大数据-88 Spark 集群 案例学习 Spark Scala 案例 SuperWordCount 计算结果数据写入MySQL
大数据-88 Spark 集群 案例学习 Spark Scala 案例 SuperWordCount 计算结果数据写入MySQL
56 3
|
2月前
|
分布式计算 大数据 Java
大数据-86 Spark 集群 WordCount 用 Scala & Java 调用Spark 编译并打包上传运行 梦开始的地方
大数据-86 Spark 集群 WordCount 用 Scala & Java 调用Spark 编译并打包上传运行 梦开始的地方
42 1
大数据-86 Spark 集群 WordCount 用 Scala & Java 调用Spark 编译并打包上传运行 梦开始的地方
|
4月前
|
存储 分布式计算 Java
|
4月前
|
分布式计算 监控 大数据
如何处理 Spark 中的倾斜数据?
【8月更文挑战第13天】
261 4
|
4月前
|
存储 缓存 分布式计算
|
4月前
|
SQL 存储 分布式计算
|
4月前
|
分布式计算 Apache 数据安全/隐私保护
流计算引擎数据问题之在 Spark Structured Streaming 中水印计算和使用如何解决
流计算引擎数据问题之在 Spark Structured Streaming 中水印计算和使用如何解决
61 1
|
5月前
|
SQL 弹性计算 资源调度
云服务器 ECS产品使用问题之bin/spark-sql --master yarn如何进行集群模式运行
云服务器ECS(Elastic Compute Service)是各大云服务商阿里云提供的一种基础云计算服务,它允许用户租用云端计算资源来部署和运行各种应用程序。以下是一个关于如何使用ECS产品的综合指南。
|
5月前
|
分布式计算 数据处理 流计算
实时计算 Flink版产品使用问题之使用Spark ThriftServer查询同步到Hudi的数据时,如何实时查看数据变化
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
6月前
|
弹性计算 分布式计算 DataWorks
DataWorks产品使用合集之spark任务如何跨空间取表数据
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
47 1