【spark系列10】spark logicalPlan Statistics (逻辑计划阶段的统计信息)

简介: 【spark系列10】spark logicalPlan Statistics (逻辑计划阶段的统计信息)

背景


本文版本是spark 3.0.1


分析


逻辑阶段的统计信息,对于逻辑阶段的优化也是很重要的,比如broadcathashJoin,dynamic partitions pruning,本文分析一下spark 是怎么获取stastatics信息的

直接到LogicalPlanStats:

trait LogicalPlanStats { self: LogicalPlan =>
  /**
   * Returns the estimated statistics for the current logical plan node. Under the hood, this
   * method caches the return value, which is computed based on the configuration passed in the
   * first time. If the configuration changes, the cache can be invalidated by calling
   * [[invalidateStatsCache()]].
   */
  def stats: Statistics = statsCache.getOrElse {
    if (conf.cboEnabled) {
      statsCache = Option(BasicStatsPlanVisitor.visit(self))
    } else {
      statsCache = Option(SizeInBytesOnlyStatsPlanVisitor.visit(self))
    }
    statsCache.get
  }
  /** A cache for the estimated statistics, such that it will only be computed once. */
  protected var statsCache: Option[Statistics] = None
  /** Invalidates the stats cache. See [[stats]] for more information. */
  final def invalidateStatsCache(): Unit = {
    statsCache = None
    children.foreach(_.invalidateStatsCache())
  }
}

该stats方法用来计算statistics,如果开启了cbo,则用BasicStatsPlanVisitor的visit,否则调用SizeInBytesOnlyStatsPlanVisitor的visit方法。我们可以看一下SizeInBytesOnlyStatsPlanVisitor.visit方法,因为BasicStatsPlanVisitor的很多方法都是调用SizeInBytesOnlyStatsPlanVisitor方法。而我们可以重点看一下default方法:

override def default(p: LogicalPlan): Statistics = p match {
    case p: LeafNode => p.computeStats()
    case _: LogicalPlan => Statistics(sizeInBytes = p.children.map(_.stats.sizeInBytes).product)
  }

因为统计信息都是一层一层从叶子节点往上传递的,当匹配到叶子节点的时候,则直接调用该computeStats方法,对于不同版本的dataSource是有区别的:

  • 对于v1版本的,拿hiveTableRelation举例:
override def computeStats(): Statistics = {
   tableMeta.stats.map(_.toPlanStats(output, conf.cboEnabled || conf.planStatsEnabled))
     .orElse(tableStats)
     .getOrElse {
     throw new IllegalStateException("table stats must be specified.")
   }
 }

直接从元数据中获取信息,如果开启了cbo或者planstats,则还会获取行信息和列的统计信息

  • 对于v2版本的, 拿DataSourceV2Relation举例:
 override def computeStats(): Statistics = {
    if (Utils.isTesting) {
      // when testing, throw an exception if this computeStats method is called because stats should
      // not be accessed before pushing the projection and filters to create a scan. otherwise, the
      // stats are not accurate because they are based on a full table scan of all columns.
      throw new IllegalStateException(
        s"BUG: computeStats called before pushdown on DSv2 relation: $name")
    } else {
      // when not testing, return stats because bad stats are better than failing a query
      table.asReadable.newScanBuilder(options) match {
        case r: SupportsReportStatistics =>
          val statistics = r.estimateStatistics()
          DataSourceV2Relation.transformV2Stats(statistics, None, conf.defaultSizeInBytes)
        case _ =>
          Statistics(sizeInBytes = conf.defaultSizeInBytes)
      }
    }

直接调用table.newScanBuilder.如果继承了SupportsReportStatistics,则调用该estimateStatistics方法,这里涉及到的Table SupportsRead SupportsReportStatistics 都是spark 3引入的新类,我们直接看ParquetScan,默认是继承FileScan的estimateStatistics方法

override def estimateStatistics(): Statistics = {
    new Statistics {
      override def sizeInBytes(): OptionalLong = {
        val compressionFactor = sparkSession.sessionState.conf.fileCompressionFactor
        val size = (compressionFactor * fileIndex.sizeInBytes).toLong
        OptionalLong.of(size)
      }
      override def numRows(): OptionalLong = OptionalLong.empty()
    }
  }

其实可以看出v2版本的没有列统计信息,至少目前是没有,而v1版本的部分是有列统计信息的, 毕竟统计每一列的信息是耗时的.

相关文章
|
6月前
|
消息中间件 分布式计算 Kafka
195 Spark Streaming整合Kafka完成网站点击流实时统计
195 Spark Streaming整合Kafka完成网站点击流实时统计
40 0
|
分布式计算 关系型数据库 MySQL
Spark Streaming实时流处理项目实战笔记——将统计结果写入到MySQL数据库中
Spark Streaming实时流处理项目实战笔记——将统计结果写入到MySQL数据库中
Spark Streaming实时流处理项目实战笔记——将统计结果写入到MySQL数据库中
|
5月前
|
机器学习/深度学习 分布式计算 大数据
【大数据技术】Spark MLlib机器学习特征抽取 TF-IDF统计词频实战(附源码和数据集)
【大数据技术】Spark MLlib机器学习特征抽取 TF-IDF统计词频实战(附源码和数据集)
31 0
|
11月前
|
SQL JSON 分布式计算
【大数据学习篇10】Spark项目实战~网站转化率统计
【大数据学习篇10】Spark项目实战~网站转化率统计
415 0
【大数据学习篇10】Spark项目实战~网站转化率统计
|
12月前
|
存储 分布式计算 Java
JAVA Spark rdd使用Spark编程实现:统计出每个省份广 告被点击次数的TOP3
JAVA Spark rdd使用Spark编程实现:统计出每个省份广 告被点击次数的TOP3
|
机器学习/深度学习 分布式计算 算法
Spark机器学习库(MLlib)指南之简介及基础统计
Spark机器学习库(MLlib)指南之简介及基础统计
281 0
|
机器学习/深度学习 存储 分布式计算
Spark 机器学习 概括统计 summary statistics [摘要统计]
概括统计 summary statistics [摘要统计] 单词 linalg 分开 linear + algebra: 线性代数
143 0
|
分布式计算 大数据 Hadoop
大数据实验——用Spark实现wordcount单词统计
大数据实验——用Spark实现wordcount单词统计
大数据实验——用Spark实现wordcount单词统计
|
分布式计算 大数据 数据处理
Spark 原理_总体介绍_逻辑执行图 | 学习笔记
快速学习 Spark 原理_总体介绍_逻辑执行图
99 0
Spark 原理_总体介绍_逻辑执行图 | 学习笔记
|
分布式计算 大数据 Scala
Spark 原理_逻辑图_窄依赖的分类_看源码 | 学习笔记
快速学习 Spark 原理_逻辑图_窄依赖的分类_看源码
68 0
Spark 原理_逻辑图_窄依赖的分类_看源码 | 学习笔记