背景
此文的分析基于spark 3.1.2
且set spark.sql.catalogImplementation = hive 且表是分区的情况下
在之前翻译的文章Spark SQL explaind中的统计信息-深入了解CBO优化里,我们说到,如果一个hive表是分区的,没有开启CBO,没有进行ATC,那么该逻辑计划的sizeInBytes就是8EB。其实这是不对的。我来分析一下。
分析
就如前面的图所示:
这只是一个大概的流程,在spark的实现中,是有细微的区别的(至少在spark 3.1.2是不一样的)。
我们运行,之前SPARK SQL中 CTE(with表达式)会影响性能么?提到的sql,我们就会发现该sql会进行如下的规则(只列举relation及统计信息的部分):
经过ResolveRelations规则(代码比较简单,不做解释):
UnresolvedRelation || \/ UnresolvedCatalogRelation(CatalogTable)
经过FindDataSourceTable规则(代码比较简单,不做解释):
UnresolvedCatalogRelation(CatalogTable) || \/ HiveTableRelation(CatalogTable)
经过DetermineTableStats规则(增加统计信息sizeInBytes):
HiveTableRelation(CatalogTable) || \/ HiveTableRelation(tableStats=Some(Statistics(sizeInBytes = BigInt(sizeInBytes))))
这部分代码如下:
private def hiveTableWithStats(relation: HiveTableRelation): HiveTableRelation = { val table = relation.tableMeta val partitionCols = relation.partitionCols // For partitioned tables, the partition directory may be outside of the table directory. // Which is expensive to get table size. Please see how we implemented it in the AnalyzeTable. val sizeInBytes = if (conf.fallBackToHdfsForStatsEnabled && partitionCols.isEmpty) { try { val hadoopConf = session.sessionState.newHadoopConf() val tablePath = new Path(table.location) val fs: FileSystem = tablePath.getFileSystem(hadoopConf) fs.getContentSummary(tablePath).getLength } catch { case e: IOException => logWarning("Failed to get table size from HDFS.", e) conf.defaultSizeInBytes } } else { conf.defaultSizeInBytes } val stats = Some(Statistics(sizeInBytes = BigInt(sizeInBytes))) relation.copy(tableStats = stats)
也就是说如果hive表如果是非分区的话,而且开启了spark.sql.statistics.fallBackToHdfs(默认是关闭),
就会从hdfs获取统计信息。
如果是分区表的话,直接默认为Long.MaxValue。
经过RelationConversions规则:
HiveTableRelation(tableStats=Some(Statistics(sizeInBytes = BigInt(sizeInBytes)))) || \/ LogicalRelation(HadoopFsRelation(CatalogFileIndex(sizeInBytes)))
其中sizeInBytes是HiveTableRelation的LogicalPlanVisitor 计算出来的sizeInBytes
这个规则主要是把元数据的relation 转换成基于source的relation,这会提高性能。
因为后续的规则,会基于relation做进一步的优化,比如分区下推filter。
经过PruneFileSourcePartitions规则:
LogicalRelation(HadoopFsRelation(CatalogFileIndex())) || \/ LogicalRelation(HadoopFsRelation(InMemoryFileIndex(partitionSpec,sizeInBytes=allFiles().map(_.getLen).sum)))
该规则主要是针对LogicalRelation把CatalogFileIndex转换为InMemoryFileIndex,InMemoryFileIndex这里就包括了用户指定分区的路径,以及sizeInBytes,
这就是在SPARK UI 为什么能看到scan的数据明细,而且sizeInBytes在后续做优化判断的时候,具有很好的指导意义。
再结合visit,就可以知道统计信息的来源了,代码如下:
trait LogicalPlanStats { self: LogicalPlan => /** * Returns the estimated statistics for the current logical plan node. Under the hood, this * method caches the return value, which is computed based on the configuration passed in the * first time. If the configuration changes, the cache can be invalidated by calling * [[invalidateStatsCache()]]. */ def stats: Statistics = statsCache.getOrElse { if (conf.cboEnabled) { statsCache = Option(BasicStatsPlanVisitor.visit(self)) } else { statsCache = Option(SizeInBytesOnlyStatsPlanVisitor.visit(self)) } statsCache.get }
结论
其中最重要的规则还是DetermineTableStats RelationConversions和PruneFileSourcePartitions。它们把基于元数据的relatoin转换成基于datasource的relation,这样我们能够在datasource上做更进一步的分析和优化。
当然具体的case还是得具体分析。