SPARK统计信息的来源-通过优化规则来分析

简介: SPARK统计信息的来源-通过优化规则来分析

背景


此文的分析基于spark 3.1.2

且set spark.sql.catalogImplementation = hive 且表是分区的情况下


在之前翻译的文章Spark SQL explaind中的统计信息-深入了解CBO优化里,我们说到,如果一个hive表是分区的,没有开启CBO,没有进行ATC,那么该逻辑计划的sizeInBytes就是8EB。其实这是不对的。我来分析一下。


分析


就如前面的图所示:

image.png

这只是一个大概的流程,在spark的实现中,是有细微的区别的(至少在spark 3.1.2是不一样的)。

我们运行,之前SPARK SQL中 CTE(with表达式)会影响性能么?提到的sql,我们就会发现该sql会进行如下的规则(只列举relation及统计信息的部分):

经过ResolveRelations规则(代码比较简单,不做解释):

UnresolvedRelation
         ||
         \/ 
UnresolvedCatalogRelation(CatalogTable)

经过FindDataSourceTable规则(代码比较简单,不做解释):

UnresolvedCatalogRelation(CatalogTable)
         ||
         \/ 
HiveTableRelation(CatalogTable)

经过DetermineTableStats规则(增加统计信息sizeInBytes):

HiveTableRelation(CatalogTable) 
         ||
         \/ 
HiveTableRelation(tableStats=Some(Statistics(sizeInBytes = BigInt(sizeInBytes))))

这部分代码如下:

private def hiveTableWithStats(relation: HiveTableRelation): HiveTableRelation = {
    val table = relation.tableMeta
    val partitionCols = relation.partitionCols
    // For partitioned tables, the partition directory may be outside of the table directory.
    // Which is expensive to get table size. Please see how we implemented it in the AnalyzeTable.
    val sizeInBytes = if (conf.fallBackToHdfsForStatsEnabled && partitionCols.isEmpty) {
      try {
        val hadoopConf = session.sessionState.newHadoopConf()
        val tablePath = new Path(table.location)
        val fs: FileSystem = tablePath.getFileSystem(hadoopConf)
        fs.getContentSummary(tablePath).getLength
      } catch {
        case e: IOException =>
          logWarning("Failed to get table size from HDFS.", e)
          conf.defaultSizeInBytes
      }
    } else {
      conf.defaultSizeInBytes
    }
    val stats = Some(Statistics(sizeInBytes = BigInt(sizeInBytes)))
    relation.copy(tableStats = stats)

也就是说如果hive表如果是非分区的话,而且开启了spark.sql.statistics.fallBackToHdfs(默认是关闭),

就会从hdfs获取统计信息。

如果是分区表的话,直接默认为Long.MaxValue。

经过RelationConversions规则:

HiveTableRelation(tableStats=Some(Statistics(sizeInBytes = BigInt(sizeInBytes))))
         ||
         \/ 
LogicalRelation(HadoopFsRelation(CatalogFileIndex(sizeInBytes)))

其中sizeInBytes是HiveTableRelation的LogicalPlanVisitor 计算出来的sizeInBytes

这个规则主要是把元数据的relation 转换成基于source的relation,这会提高性能。

因为后续的规则,会基于relation做进一步的优化,比如分区下推filter。


经过PruneFileSourcePartitions规则:

LogicalRelation(HadoopFsRelation(CatalogFileIndex()))
         ||
         \/ 
LogicalRelation(HadoopFsRelation(InMemoryFileIndex(partitionSpec,sizeInBytes=allFiles().map(_.getLen).sum)))

该规则主要是针对LogicalRelation把CatalogFileIndex转换为InMemoryFileIndex,InMemoryFileIndex这里就包括了用户指定分区的路径,以及sizeInBytes,

这就是在SPARK UI 为什么能看到scan的数据明细,而且sizeInBytes在后续做优化判断的时候,具有很好的指导意义。

image.png

再结合visit,就可以知道统计信息的来源了,代码如下:

trait LogicalPlanStats { self: LogicalPlan =>
  /**
   * Returns the estimated statistics for the current logical plan node. Under the hood, this
   * method caches the return value, which is computed based on the configuration passed in the
   * first time. If the configuration changes, the cache can be invalidated by calling
   * [[invalidateStatsCache()]].
   */
  def stats: Statistics = statsCache.getOrElse {
    if (conf.cboEnabled) {
      statsCache = Option(BasicStatsPlanVisitor.visit(self))
    } else {
      statsCache = Option(SizeInBytesOnlyStatsPlanVisitor.visit(self))
    }
    statsCache.get
  }

结论


其中最重要的规则还是DetermineTableStats RelationConversions和PruneFileSourcePartitions。它们把基于元数据的relatoin转换成基于datasource的relation,这样我们能够在datasource上做更进一步的分析和优化。

当然具体的case还是得具体分析。


相关文章
|
6月前
|
机器学习/深度学习 分布式计算 算法
Spark快速大数据分析PDF下载读书分享推荐
《Spark快速大数据分析》适合初学者,聚焦Spark实用技巧,同时深入核心概念。作者团队来自Databricks,书中详述Spark 3.0新特性,结合机器学习展示大数据分析。Spark是大数据分析的首选工具,本书助你驾驭这一利器。[PDF下载链接][1]。 ![Spark Book Cover][2] [1]: https://zhangfeidezhu.com/?p=345 [2]: https://i-blog.csdnimg.cn/direct/6b851489ad1944548602766ea9d62136.png#pic_center
211 1
Spark快速大数据分析PDF下载读书分享推荐
|
4天前
|
SQL 分布式计算 Java
Spark SQL向量化执行引擎框架Gluten-Velox在AArch64使能和优化
本文摘自 Arm China的工程师顾煜祺关于“在 Arm 平台上使用 Native 算子库加速 Spark”的分享,主要内容包括以下四个部分: 1.技术背景 2.算子库构成 3.算子操作优化 4.未来工作
|
2月前
|
分布式计算 监控 大数据
如何优化Spark中的shuffle操作?
【10月更文挑战第18天】
|
3月前
|
SQL 分布式计算 Serverless
EMR Serverless Spark:一站式全托管湖仓分析利器
本文根据2024云栖大会阿里云 EMR 团队负责人李钰(绝顶) 演讲实录整理而成
217 2
|
3月前
|
存储 分布式计算 监控
Spark如何优化?需要注意哪些方面?
【10月更文挑战第10天】Spark如何优化?需要注意哪些方面?
59 6
|
3月前
|
分布式计算 Java 大数据
大数据-92 Spark 集群 SparkRDD 原理 Standalone详解 ShuffleV1V2详解 RDD编程优化
大数据-92 Spark 集群 SparkRDD 原理 Standalone详解 ShuffleV1V2详解 RDD编程优化
54 0
大数据-92 Spark 集群 SparkRDD 原理 Standalone详解 ShuffleV1V2详解 RDD编程优化
|
3月前
|
设计模式 数据采集 分布式计算
企业spark案例 —出租车轨迹分析
企业spark案例 —出租车轨迹分析
136 0
|
3月前
|
SQL 分布式计算 大数据
大数据-91 Spark 集群 RDD 编程-高阶 RDD广播变量 RDD累加器 Spark程序优化
大数据-91 Spark 集群 RDD 编程-高阶 RDD广播变量 RDD累加器 Spark程序优化
59 0
|
5月前
|
存储 分布式计算 供应链
Spark在供应链核算中应用问题之通过Spark UI进行任务优化如何解决
Spark在供应链核算中应用问题之通过Spark UI进行任务优化如何解决
|
6月前
|
弹性计算 分布式计算 Serverless
全托管一站式大规模数据处理和分析Serverless平台 | EMR Serverless Spark 评测
【7月更文挑战第6天】全托管一站式大规模数据处理和分析Serverless平台 | EMR Serverless Spark 评测
23749 42