SparkSQL Catalyst解析

本文涉及的产品
EMR Serverless Spark 免费试用,1000 CU*H 有效期3个月
EMR Serverless StarRocks,5000CU*H 48000GB*H
简介: Catalyst Optimizer是SparkSQL的核心组件(查询优化器),它负责将SQL语句转换成物理执行计划,Catalyst的优劣决定了SQL执行的性能。

Catalyst Optimizer是SparkSQL的核心组件(查询优化器),它负责将SQL语句转换成物理执行计划,Catalyst的优劣决定了SQL执行的性能。

查询优化器是一个SQL引擎的核心,开源常用的有Apache Calcite(很多开源组件都通过引入Calcite来实现查询优化,如Hive/Phoenix/Drill等),另外一个是orca(HAWQ/GreenPlum中使用)。

关系代数是查询优化器的理论基础。常见的查询优化技术:查询重用(ReuseSubquery/ReuseExchange等)/RBO/CBO等。

SparkSQL执行流程

sql1

SparkSQL中对一条SQL语句的处理过程如上图所示:
1) SqlParser将SQL语句解析成一个逻辑执行计划(未解析)
2) Analyzer利用HiveMeta中表/列等信息,对逻辑执行计划进行解析(如表/列是否存在等)
3) SparkOptimizer利用Rule Based(基于经验规则RBO)/Cost Based(基于代价CBO)的优化方法,对逻辑执行计划进行优化(如谓词下推/JoinReorder)
4) SparkPlanner将逻辑执行计划转换成物理执行计划(如Filter -> FilterExec),
同时从某些逻辑算子的多种物理算子实现中根据RBO/CBO选择其中一个合适的物理算子(如Join的多个实现BroadcastJoin/SortMergeJoin/HashJoin中选择一个实现)
5) PrepareForExecution是执行物理执行计划之前做的一些事情,比如ReuseExchange/WholeStageCodegen的处理等等
6) 最终在SparkCore中执行该物理执行计划。

接下来介绍Catalyst中的核心模块SparkOptimizer/SparkPlanner.

SparkOptimizer

使用已有的规则对逻辑执行计划进行优化,该过程是基于经验/启发式的优化方法,得到优化过的逻辑执行计划。

444

如上图所示,Optimizer中有很多Batch,每个Batch中包含1个或多个Rule,Batch的另外一个属性是迭代次数(Once/FixPoint默认100次),每个Batch内部Rule有前后执行顺序,Batch之间也是按照顺序来执行的。目前Optimizer中有60多个Rule。
备注: 从Rule看JoinReorder在这个过程就已经处理了。

SparkPlanner

参考: https://issues.apache.org/jira/browse/SPARK-1251
SparkPlanner将逻辑执行计划转换成物理执行计划,即将逻辑执行计划树中的逻辑节点转换成物理节点,如Join转换成HashJoinExec/SortMergeJoinExec...,Filter转成FilterExec等

666

Spark的Stragety有8个:

  • DataSourceV2Strategy
  • FileSourceStrategy
  • DataSourceStrategy
  • SpecialLimits
  • Aggregation
  • JoinSelection
  • InMemoryScans
  • BasicOperators

上述很多Stragety都是基于规则的策略。
JoinSelection用到了相关的统计信息来选择将Join转换为BroadcastHashJoinExec还是ShuffledHashJoinExec还是SortMergeJoinExec,属于CBO基于代价的策略。

PrepareForExecution

在执行之前,对物理执行计划做一些处理,这些处理都是基于规则的,包括

  • PlanSubqueries
  • EnsureRequirements
  • CollapseCodegenStages
  • ReuseExchange
  • ReuseSubquery

经过上述步骤之后生成的最终物理执行计划提交到Spark执行。

CBO(基于代价)实现

CBO的实现有三个步骤如下,可以大致了解一下:

1. 统计信息采集

Optimizer/Planner中CBO(基于代价)的优化需要采集统计信息,包括表维度和列维度。

//包含表/列
case class Statistics(
    sizeInBytes: BigInt,
    rowCount: Option[BigInt] = None,
    attributeStats: AttributeMap[ColumnStat] = AttributeMap(Nil),
    hints: HintInfo = HintInfo())

//列
case class ColumnStat(
    distinctCount: BigInt,
    min: Option[Any],
    max: Option[Any],
    nullCount: BigInt,
    avgLen: Long,
    maxLen: Long,
    histogram: Option[Histogram] = None)
AI 代码解读

上面结构体用来存储统计信息,可以看出:
表维度: 大小/条数
列维度: NDV/min/max/Null/平均长度/最大长度/直方图

上述信息需要提前使用Analyze命令进行采集

// 采集表维度的统计信息,NOSCAN表示不扫描表(即只有表大小信息,不采集表条数信息)
ANALYZE TABLE [db_name.]tablename [PARTITION (partcol1[=val1], partcol2[=val2], ...)]
COMPUTE STATISTICS [NOSCAN];

// 采集列信息
// 若spark.sql.statistics.histogram.enabled设置为true,则会采集直方图信息
// 采集直方图信息需要额外一次的表扫描
// 使用的是等高直方图
// 只支持IntegralType/DoubleType/DecimalType/FloatType/DateType/TimestampType的列采集直方图
ANALYZE TABLE [db_name.]tablename COMPUTE STATISTICS FOR COLUMNS column1, column2;
AI 代码解读

2.估算算子统计信息

逻辑执行计划树中只有叶子节点(表)有实际的统计信息(通过Analyze获取), 逻辑执行计划树中非叶子节点会根据子节点信息以及估算方法获取本节点的统计信息。

/**
 * Returns the estimated statistics for the current logical plan node. Under the hood, this
 * method caches the return value, which is computed based on the configuration passed in the
 * first time. If the configuration changes, the cache can be invalidated by calling
 * [[invalidateStatsCache()]].
 */
def stats: Statistics = statsCache.getOrElse {
  if (conf.cboEnabled) {
    statsCache = Option(BasicStatsPlanVisitor.visit(self))
  } else {
    statsCache = Option(SizeInBytesOnlyStatsPlanVisitor.visit(self))
  }
  statsCache.get
}

def visit(p: LogicalPlan): T = p match {
    case p: Aggregate => visitAggregate(p)
    case p: Distinct => visitDistinct(p)
    case p: Except => visitExcept(p)
    case p: Expand => visitExpand(p)
    case p: Filter => visitFilter(p)
    case p: Generate => visitGenerate(p)
    case p: GlobalLimit => visitGlobalLimit(p)
    case p: Intersect => visitIntersect(p)
    case p: Join => visitJoin(p)
    case p: LocalLimit => visitLocalLimit(p)
    case p: Pivot => visitPivot(p)
    case p: Project => visitProject(p)
    case p: Repartition => visitRepartition(p)
    case p: RepartitionByExpression => visitRepartitionByExpr(p)
    case p: ResolvedHint => visitHint(p)
    case p: Sample => visitSample(p)
    case p: ScriptTransformation => visitScriptTransform(p)
    case p: Union => visitUnion(p)
    case p: Window => visitWindow(p)
    case p: LogicalPlan => default(p)
  }
AI 代码解读

每个算子都有自己的预估方法
CBO打开/关闭,有些算子的预估方法不一样,如AggregateEstimation/FilterEstimation/JoinEstimation/ProjectEstimation,其它算子CBO打开/关闭使用一套预估方法。

3.基于统计信息的优化

统计信息越准确,基于统计信息的优化更准确,从目前代码看只有下面三种场景使用到了统计信息。

JoinReorder

动态规划

//代价函数
//weight可以通过参数控制spark.sql.cbo.joinReorder.card.weight,默认0.7
//根据行数/大小来计算代价
cost = rows * weight + size * (1 - weight)

// 比较两种Join的代价大小
def betterThan(other: JoinPlan, conf: SQLConf): Boolean = {
      if (other.planCost.card == 0 || other.planCost.size == 0) {
        false
      } else {
        val relativeRows = BigDecimal(this.planCost.card) / BigDecimal(other.planCost.card)
        val relativeSize = BigDecimal(this.planCost.size) / BigDecimal(other.planCost.size)
        relativeRows * conf.joinReorderCardWeight +
          relativeSize * (1 - conf.joinReorderCardWeight) < 1
      }
}
AI 代码解读
JoinSelection

根据Join两个子节点的统计信息,判断使用BroadcastHashJoinExec还是ShuffledHashJoinExec还是SortMergeJoinExec,比如其中一个表(size)很小则可以使用BroadcastHashJoinExec。

StarSchemaDetection

探测星型模型,判断一个列是否是表的主键(因为SparkSQL不支持主键设置)

/**
 * Determines if a column referenced by a base table access is a primary key.
 * A column is a PK if it is not nullable and has unique values.
 * To determine if a column has unique values in the absence of informational
 * RI constraints, the number of distinct values is compared to the total
 * number of rows in the table. If their relative difference
 * is within the expected limits (i.e. 2 * spark.sql.statistics.ndv.maxError based
 * on TPC-DS data results), the column is assumed to have unique values.
 */
  private def isUnique(
      column: Attribute,
      plan: LogicalPlan): Boolean = plan match {
    case PhysicalOperation(_, _, t: LeafNode) =>
      val leafCol = findLeafNodeCol(column, plan)
      leafCol match {
        case Some(col) if t.outputSet.contains(col) =>
          val stats = t.stats
          stats.rowCount match {
            case Some(rowCount) if rowCount >= 0 =>
              if (stats.attributeStats.nonEmpty && stats.attributeStats.contains(col)) {
                val colStats = stats.attributeStats.get(col)
                if (colStats.get.nullCount > 0) {
                  false
                } else {
                  val distinctCount = colStats.get.distinctCount
                  val relDiff = math.abs((distinctCount.toDouble / rowCount.toDouble) - 1.0d)
                  // ndvMaxErr adjusted based on TPCDS 1TB data results
                  relDiff <= conf.ndvMaxError * 2
                }
              } else {
                false
              }
            case None => false
          }
        case None => false
      }
    case _ => false
  }
AI 代码解读

image

目录
打赏
0
0
0
0
1306
分享
相关文章
大数据-97 Spark 集群 SparkSQL 原理详细解析 Broadcast Shuffle SQL解析过程(一)
大数据-97 Spark 集群 SparkSQL 原理详细解析 Broadcast Shuffle SQL解析过程(一)
150 0
大数据-97 Spark 集群 SparkSQL 原理详细解析 Broadcast Shuffle SQL解析过程(二)
大数据-97 Spark 集群 SparkSQL 原理详细解析 Broadcast Shuffle SQL解析过程(二)
135 0
SparkSQL的解析详解
  SparkSQL继承自Hive的接口,由于hive是基于MapReduce进行计算的,在计算过程中大量的中间数据要落地于磁盘,从而消耗了大量的I/O,降低了运行的效率,从而基于内存运算的SparkSQL应运而生。
1081 0
高级java面试---spring.factories文件的解析源码API机制
【11月更文挑战第20天】Spring Boot是一个用于快速构建基于Spring框架的应用程序的开源框架。它通过自动配置、起步依赖和内嵌服务器等特性,极大地简化了Spring应用的开发和部署过程。本文将深入探讨Spring Boot的背景历史、业务场景、功能点以及底层原理,并通过Java代码手写模拟Spring Boot的启动过程,特别是spring.factories文件的解析源码API机制。
142 2
从入门到精通:H5游戏源码开发技术全解析与未来趋势洞察
H5游戏凭借其跨平台、易传播和开发成本低的优势,近年来发展迅猛。接下来,让我们深入了解 H5 游戏源码开发的技术教程以及未来的发展趋势。
在线教育网课系统源码开发指南:功能设计与技术实现深度解析
在线教育网课系统是近年来发展迅猛的教育形式的核心载体,具备用户管理、课程管理、教学互动、学习评估等功能。本文从功能和技术两方面解析其源码开发,涵盖前端(HTML5、CSS3、JavaScript等)、后端(Java、Python等)、流媒体及云计算技术,并强调安全性、稳定性和用户体验的重要性。
【23种设计模式·全精解析 | 创建型模式篇】5种创建型模式的结构概述、实现、优缺点、扩展、使用场景、源码解析
创建型模式的主要关注点是“怎样创建对象?”,它的主要特点是"将对象的创建与使用分离”。这样可以降低系统的耦合度,使用者不需要关注对象的创建细节。创建型模式分为5种:单例模式、工厂方法模式抽象工厂式、原型模式、建造者模式。
【23种设计模式·全精解析 | 创建型模式篇】5种创建型模式的结构概述、实现、优缺点、扩展、使用场景、源码解析
【23种设计模式·全精解析 | 行为型模式篇】11种行为型模式的结构概述、案例实现、优缺点、扩展对比、使用场景、源码解析
行为型模式用于描述程序在运行时复杂的流程控制,即描述多个类或对象之间怎样相互协作共同完成单个对象都无法单独完成的任务,它涉及算法与对象间职责的分配。行为型模式分为类行为模式和对象行为模式,前者采用继承机制来在类间分派行为,后者采用组合或聚合在对象间分配行为。由于组合关系或聚合关系比继承关系耦合度低,满足“合成复用原则”,所以对象行为模式比类行为模式具有更大的灵活性。 行为型模式分为: • 模板方法模式 • 策略模式 • 命令模式 • 职责链模式 • 状态模式 • 观察者模式 • 中介者模式 • 迭代器模式 • 访问者模式 • 备忘录模式 • 解释器模式
【23种设计模式·全精解析 | 行为型模式篇】11种行为型模式的结构概述、案例实现、优缺点、扩展对比、使用场景、源码解析
【23种设计模式·全精解析 | 创建型模式篇】5种创建型模式的结构概述、实现、优缺点、扩展、使用场景、源码解析
结构型模式描述如何将类或对象按某种布局组成更大的结构。它分为类结构型模式和对象结构型模式,前者采用继承机制来组织接口和类,后者釆用组合或聚合来组合对象。由于组合关系或聚合关系比继承关系耦合度低,满足“合成复用原则”,所以对象结构型模式比类结构型模式具有更大的灵活性。 结构型模式分为以下 7 种: • 代理模式 • 适配器模式 • 装饰者模式 • 桥接模式 • 外观模式 • 组合模式 • 享元模式
【23种设计模式·全精解析 | 创建型模式篇】5种创建型模式的结构概述、实现、优缺点、扩展、使用场景、源码解析

推荐镜像

更多