SparkSQL Catalyst解析

本文涉及的产品
EMR Serverless StarRocks,5000CU*H 48000GB*H
简介: Catalyst Optimizer是SparkSQL的核心组件(查询优化器),它负责将SQL语句转换成物理执行计划,Catalyst的优劣决定了SQL执行的性能。

Catalyst Optimizer是SparkSQL的核心组件(查询优化器),它负责将SQL语句转换成物理执行计划,Catalyst的优劣决定了SQL执行的性能。

查询优化器是一个SQL引擎的核心,开源常用的有Apache Calcite(很多开源组件都通过引入Calcite来实现查询优化,如Hive/Phoenix/Drill等),另外一个是orca(HAWQ/GreenPlum中使用)。

关系代数是查询优化器的理论基础。常见的查询优化技术:查询重用(ReuseSubquery/ReuseExchange等)/RBO/CBO等。

SparkSQL执行流程

sql1

SparkSQL中对一条SQL语句的处理过程如上图所示:
1) SqlParser将SQL语句解析成一个逻辑执行计划(未解析)
2) Analyzer利用HiveMeta中表/列等信息,对逻辑执行计划进行解析(如表/列是否存在等)
3) SparkOptimizer利用Rule Based(基于经验规则RBO)/Cost Based(基于代价CBO)的优化方法,对逻辑执行计划进行优化(如谓词下推/JoinReorder)
4) SparkPlanner将逻辑执行计划转换成物理执行计划(如Filter -> FilterExec),
同时从某些逻辑算子的多种物理算子实现中根据RBO/CBO选择其中一个合适的物理算子(如Join的多个实现BroadcastJoin/SortMergeJoin/HashJoin中选择一个实现)
5) PrepareForExecution是执行物理执行计划之前做的一些事情,比如ReuseExchange/WholeStageCodegen的处理等等
6) 最终在SparkCore中执行该物理执行计划。

接下来介绍Catalyst中的核心模块SparkOptimizer/SparkPlanner.

SparkOptimizer

使用已有的规则对逻辑执行计划进行优化,该过程是基于经验/启发式的优化方法,得到优化过的逻辑执行计划。

444

如上图所示,Optimizer中有很多Batch,每个Batch中包含1个或多个Rule,Batch的另外一个属性是迭代次数(Once/FixPoint默认100次),每个Batch内部Rule有前后执行顺序,Batch之间也是按照顺序来执行的。目前Optimizer中有60多个Rule。
备注: 从Rule看JoinReorder在这个过程就已经处理了。

SparkPlanner

参考: https://issues.apache.org/jira/browse/SPARK-1251
SparkPlanner将逻辑执行计划转换成物理执行计划,即将逻辑执行计划树中的逻辑节点转换成物理节点,如Join转换成HashJoinExec/SortMergeJoinExec...,Filter转成FilterExec等

666

Spark的Stragety有8个:

  • DataSourceV2Strategy
  • FileSourceStrategy
  • DataSourceStrategy
  • SpecialLimits
  • Aggregation
  • JoinSelection
  • InMemoryScans
  • BasicOperators

上述很多Stragety都是基于规则的策略。
JoinSelection用到了相关的统计信息来选择将Join转换为BroadcastHashJoinExec还是ShuffledHashJoinExec还是SortMergeJoinExec,属于CBO基于代价的策略。

PrepareForExecution

在执行之前,对物理执行计划做一些处理,这些处理都是基于规则的,包括

  • PlanSubqueries
  • EnsureRequirements
  • CollapseCodegenStages
  • ReuseExchange
  • ReuseSubquery

经过上述步骤之后生成的最终物理执行计划提交到Spark执行。

CBO(基于代价)实现

CBO的实现有三个步骤如下,可以大致了解一下:

1. 统计信息采集

Optimizer/Planner中CBO(基于代价)的优化需要采集统计信息,包括表维度和列维度。

//包含表/列
case class Statistics(
    sizeInBytes: BigInt,
    rowCount: Option[BigInt] = None,
    attributeStats: AttributeMap[ColumnStat] = AttributeMap(Nil),
    hints: HintInfo = HintInfo())

//列
case class ColumnStat(
    distinctCount: BigInt,
    min: Option[Any],
    max: Option[Any],
    nullCount: BigInt,
    avgLen: Long,
    maxLen: Long,
    histogram: Option[Histogram] = None)

上面结构体用来存储统计信息,可以看出:
表维度: 大小/条数
列维度: NDV/min/max/Null/平均长度/最大长度/直方图

上述信息需要提前使用Analyze命令进行采集

// 采集表维度的统计信息,NOSCAN表示不扫描表(即只有表大小信息,不采集表条数信息)
ANALYZE TABLE [db_name.]tablename [PARTITION (partcol1[=val1], partcol2[=val2], ...)]
COMPUTE STATISTICS [NOSCAN];

// 采集列信息
// 若spark.sql.statistics.histogram.enabled设置为true,则会采集直方图信息
// 采集直方图信息需要额外一次的表扫描
// 使用的是等高直方图
// 只支持IntegralType/DoubleType/DecimalType/FloatType/DateType/TimestampType的列采集直方图
ANALYZE TABLE [db_name.]tablename COMPUTE STATISTICS FOR COLUMNS column1, column2;

2.估算算子统计信息

逻辑执行计划树中只有叶子节点(表)有实际的统计信息(通过Analyze获取), 逻辑执行计划树中非叶子节点会根据子节点信息以及估算方法获取本节点的统计信息。

/**
 * Returns the estimated statistics for the current logical plan node. Under the hood, this
 * method caches the return value, which is computed based on the configuration passed in the
 * first time. If the configuration changes, the cache can be invalidated by calling
 * [[invalidateStatsCache()]].
 */
def stats: Statistics = statsCache.getOrElse {
  if (conf.cboEnabled) {
    statsCache = Option(BasicStatsPlanVisitor.visit(self))
  } else {
    statsCache = Option(SizeInBytesOnlyStatsPlanVisitor.visit(self))
  }
  statsCache.get
}

def visit(p: LogicalPlan): T = p match {
    case p: Aggregate => visitAggregate(p)
    case p: Distinct => visitDistinct(p)
    case p: Except => visitExcept(p)
    case p: Expand => visitExpand(p)
    case p: Filter => visitFilter(p)
    case p: Generate => visitGenerate(p)
    case p: GlobalLimit => visitGlobalLimit(p)
    case p: Intersect => visitIntersect(p)
    case p: Join => visitJoin(p)
    case p: LocalLimit => visitLocalLimit(p)
    case p: Pivot => visitPivot(p)
    case p: Project => visitProject(p)
    case p: Repartition => visitRepartition(p)
    case p: RepartitionByExpression => visitRepartitionByExpr(p)
    case p: ResolvedHint => visitHint(p)
    case p: Sample => visitSample(p)
    case p: ScriptTransformation => visitScriptTransform(p)
    case p: Union => visitUnion(p)
    case p: Window => visitWindow(p)
    case p: LogicalPlan => default(p)
  }

每个算子都有自己的预估方法
CBO打开/关闭,有些算子的预估方法不一样,如AggregateEstimation/FilterEstimation/JoinEstimation/ProjectEstimation,其它算子CBO打开/关闭使用一套预估方法。

3.基于统计信息的优化

统计信息越准确,基于统计信息的优化更准确,从目前代码看只有下面三种场景使用到了统计信息。

JoinReorder

动态规划

//代价函数
//weight可以通过参数控制spark.sql.cbo.joinReorder.card.weight,默认0.7
//根据行数/大小来计算代价
cost = rows * weight + size * (1 - weight)

// 比较两种Join的代价大小
def betterThan(other: JoinPlan, conf: SQLConf): Boolean = {
      if (other.planCost.card == 0 || other.planCost.size == 0) {
        false
      } else {
        val relativeRows = BigDecimal(this.planCost.card) / BigDecimal(other.planCost.card)
        val relativeSize = BigDecimal(this.planCost.size) / BigDecimal(other.planCost.size)
        relativeRows * conf.joinReorderCardWeight +
          relativeSize * (1 - conf.joinReorderCardWeight) < 1
      }
}
JoinSelection

根据Join两个子节点的统计信息,判断使用BroadcastHashJoinExec还是ShuffledHashJoinExec还是SortMergeJoinExec,比如其中一个表(size)很小则可以使用BroadcastHashJoinExec。

StarSchemaDetection

探测星型模型,判断一个列是否是表的主键(因为SparkSQL不支持主键设置)

/**
 * Determines if a column referenced by a base table access is a primary key.
 * A column is a PK if it is not nullable and has unique values.
 * To determine if a column has unique values in the absence of informational
 * RI constraints, the number of distinct values is compared to the total
 * number of rows in the table. If their relative difference
 * is within the expected limits (i.e. 2 * spark.sql.statistics.ndv.maxError based
 * on TPC-DS data results), the column is assumed to have unique values.
 */
  private def isUnique(
      column: Attribute,
      plan: LogicalPlan): Boolean = plan match {
    case PhysicalOperation(_, _, t: LeafNode) =>
      val leafCol = findLeafNodeCol(column, plan)
      leafCol match {
        case Some(col) if t.outputSet.contains(col) =>
          val stats = t.stats
          stats.rowCount match {
            case Some(rowCount) if rowCount >= 0 =>
              if (stats.attributeStats.nonEmpty && stats.attributeStats.contains(col)) {
                val colStats = stats.attributeStats.get(col)
                if (colStats.get.nullCount > 0) {
                  false
                } else {
                  val distinctCount = colStats.get.distinctCount
                  val relDiff = math.abs((distinctCount.toDouble / rowCount.toDouble) - 1.0d)
                  // ndvMaxErr adjusted based on TPCDS 1TB data results
                  relDiff <= conf.ndvMaxError * 2
                }
              } else {
                false
              }
            case None => false
          }
        case None => false
      }
    case _ => false
  }

image

目录
相关文章
|
1月前
|
SQL 分布式计算 大数据
大数据-97 Spark 集群 SparkSQL 原理详细解析 Broadcast Shuffle SQL解析过程(一)
大数据-97 Spark 集群 SparkSQL 原理详细解析 Broadcast Shuffle SQL解析过程(一)
42 0
|
1月前
|
SQL 分布式计算 算法
大数据-97 Spark 集群 SparkSQL 原理详细解析 Broadcast Shuffle SQL解析过程(二)
大数据-97 Spark 集群 SparkSQL 原理详细解析 Broadcast Shuffle SQL解析过程(二)
78 0
|
SQL 分布式计算 数据库
SparkSQL的解析详解
  SparkSQL继承自Hive的接口,由于hive是基于MapReduce进行计算的,在计算过程中大量的中间数据要落地于磁盘,从而消耗了大量的I/O,降低了运行的效率,从而基于内存运算的SparkSQL应运而生。
1061 0
|
3天前
|
监控 Java 应用服务中间件
高级java面试---spring.factories文件的解析源码API机制
【11月更文挑战第20天】Spring Boot是一个用于快速构建基于Spring框架的应用程序的开源框架。它通过自动配置、起步依赖和内嵌服务器等特性,极大地简化了Spring应用的开发和部署过程。本文将深入探讨Spring Boot的背景历史、业务场景、功能点以及底层原理,并通过Java代码手写模拟Spring Boot的启动过程,特别是spring.factories文件的解析源码API机制。
14 2
|
1月前
|
缓存 Java 程序员
Map - LinkedHashSet&Map源码解析
Map - LinkedHashSet&Map源码解析
66 0
|
1月前
|
算法 Java 容器
Map - HashSet & HashMap 源码解析
Map - HashSet & HashMap 源码解析
52 0
|
1月前
|
存储 Java C++
Collection-PriorityQueue源码解析
Collection-PriorityQueue源码解析
60 0
|
1月前
|
安全 Java 程序员
Collection-Stack&Queue源码解析
Collection-Stack&Queue源码解析
80 0
|
3天前
|
存储 安全 Linux
Golang的GMP调度模型与源码解析
【11月更文挑战第11天】GMP 调度模型是 Go 语言运行时系统的核心部分,用于高效管理和调度大量协程(goroutine)。它通过少量的操作系统线程(M)和逻辑处理器(P)来调度大量的轻量级协程(G),从而实现高性能的并发处理。GMP 模型通过本地队列和全局队列来减少锁竞争,提高调度效率。在 Go 源码中,`runtime.h` 文件定义了关键数据结构,`schedule()` 和 `findrunnable()` 函数实现了核心调度逻辑。通过深入研究 GMP 模型,可以更好地理解 Go 语言的并发机制。
|
16天前
|
消息中间件 缓存 安全
Future与FutureTask源码解析,接口阻塞问题及解决方案
【11月更文挑战第5天】在Java开发中,多线程编程是提高系统并发性能和资源利用率的重要手段。然而,多线程编程也带来了诸如线程安全、死锁、接口阻塞等一系列复杂问题。本文将深度剖析多线程优化技巧、Future与FutureTask的源码、接口阻塞问题及解决方案,并通过具体业务场景和Java代码示例进行实战演示。
36 3

推荐镜像

更多