SPARK SQL中 CTE(with表达式)会影响性能么?

简介: SPARK SQL中 CTE(with表达式)会影响性能么?

背景及问题


本文基于spark 3.1.2

最近在排查spark sql问题的时候,出现了一系列的(CTE)with操作,导致该任务运行不出来,而把对应的(CTE)with 替换成了临时表以后,任务很快的就能运行出来

对应的最简化的sql如下:

with temp1 as (
  select 
    null as user_id
    ,a.sku_id
  from xxx.xxx `a`
  where pt between '20211228' and '20220313'
  group by 
    a.sku_id),
temp2 as (
  select  
    a.xxx_code user_id
    ,a.sku_id 
  from xxx.xxx_1`a`
  left join xxx.xxx_2 `c` on c.pt='20220313' and a.xxx_code=c.xxx_code and c.xxx_id=1
  where a.pt='20220313'
  and TO_CHAR(upper_time,'yyyymmdd') >= '20220230'
  group by 
     a.xxx_code 
    ,a.sku_id)
select 
 *
 from (
 select 
 a1.sku_id,
 a1.user_id
 from temp1 `a1`
 -- BroadcastNestedLoopJoin
 full join temp2 `a5` on a1.user_id=a5.user_id and a1.sku_id=a5.sku_id 
 );

先说结论,其实是null as user_id 这块代码在作为join条件的时候被优化成布尔表达式false

分析

运行此sql,我们可以得到一下的物理计划:

image.png

我们看到 temp1和temp2的join的居然是BroadcastNestedLoopJoin,要知道BroadcastNestedLoopJoin的时间复杂度是O(M*N)的,这在数据大的情况下是很难计算出来的。

并且我们查看对应的代码JoinSelection.scala的时候,发现对于有等值条件的join的情况下,而且join的条件是可排序的情况下,最次也是会变成SortMergeJoin,对应的代码如下:

def createJoinWithoutHint() = {
          createBroadcastHashJoin(false)
            .orElse {
              if (!conf.preferSortMergeJoin) {
                createShuffleHashJoin(false)
              } else {
                None
              }
            }
            .orElse(createSortMergeJoin())
            .orElse(createCartesianProduct())
            .getOrElse {
              // This join could be very slow or OOM
              val buildSide = getSmallerSide(left, right)
              Seq(joins.BroadcastNestedLoopJoinExec(
                planLater(left), planLater(right), buildSide, joinType, nonEquiCond))
            }
        }

这部分的代码比较简单,暂且跳过。

就在百思不得其解的时候,还是最重要的一步,查看对应的逻辑计划日志:

直接重点(我们这里只说join条件部分的变化):

  • 解析完后的初始计划 为
 Join FullOuter, (('a1.user_id = 'a5.user_id) AND ('a1.sku_id = 'a5.sku_id))
  • 经过PromoteStrings规则
  Join FullOuter, ((user_id#3 = user_id#13) AND (sku_id#15 = sku_id#98)) 
                   ||
                   \/ 
  Join FullOuter, ((null = user_id#13) AND (sku_id#15 = sku_id#98))
  • 经过NullPropagation规则
Join FullOuter, ((null = user_id#13) AND (sku_id#15 = sku_id#98)) 
                   ||
                   \/
Join FullOuter, (null AND (sku_id#15 = sku_id#98))
  • 经过ReplaceNullWithFalseInPredicate规则
Join FullOuter, (null AND (sku_id#15 = sku_id#98))
                   ||
                   \/ 
Join FullOuter, (false AND (sku_id#15 = sku_id#98))
  • 经过BooleanSimplification规则
Join FullOuter, (false AND (sku_id#15 = sku_id#98))
                   ||
                   \/  
Join FullOuter, false

这样一步一步下来,其实最终的join条件就变成了 布尔表达式 false。

我们再看JoinSelection.scala 中join对应非等值条件case的判断:

      case logical.Join(left, right, joinType, condition, hint) =>
        val desiredBuildSide = if (joinType.isInstanceOf[InnerLike] || joinType == FullOuter) {
          getSmallerSide(left, right)
        } else {
          // For perf reasons, `BroadcastNestedLoopJoinExec` prefers to broadcast left side if
          // it's a right join, and broadcast right side if it's a left join.
          // TODO: revisit it. If left side is much smaller than the right side, it may be better
          // to broadcast the left side even if it's a left join.
          if (canBuildBroadcastLeft(joinType)) BuildLeft else BuildRight
        }
...
      def createJoinWithoutHint() = {
          createBroadcastNLJoin(canBroadcastBySize(left, conf), canBroadcastBySize(right, conf))
            .orElse(createCartesianProduct())
            .getOrElse {
              // This join could be very slow or OOM
              Seq(joins.BroadcastNestedLoopJoinExec(
                planLater(left), planLater(right), desiredBuildSide, joinType, condition))
            }
        }
     createBroadcastNLJoin(hintToBroadcastLeft(hint), hintToBroadcastRight(hint))
       .orElse { if (hintToShuffleReplicateNL(hint)) createCartesianProduct() else None }
       .getOrElse(createJoinWithoutHint())

最终还是会调用createJoinWithoutHint生成BroadcastNestedLoopJoinExec。


解决方法及总结


改写成临时表

把with改写成临时表,这是有益处的,因为在某些场景下会触发到AQE中的特性,而且改写成临时表后,任务是串行的,能够减少因为资源问题导致的任务运行缓慢问题(笔者曾经有遇到过)

注意:改成临时表的情况下,不能存在null as user_id的语句,否则会报错:

Caused by: org.apache.spark.sql.AnalysisException: Cannot create tables with null type.

把null as user_id改写成0 as user_id

根据之前的分析,导致变成BroadcastNestedLoopJoinExec的原因是null作为了join条件引发的,我们可以改写就好

其实CTE操作并不是影响性能的主要原因,主要原因还是在于spark对于某种case的处理,这种还会得具体case具体分析处理。

当然也可以参考Why is my CTE so slow?.


相关文章
|
2月前
|
SQL
SQL如何在CTE中使用Order By的功能
SQL Server如何在CTE中使用Order By的功能
|
2月前
|
SQL JSON 分布式计算
Spark SQL架构及高级用法
Spark SQL基于Catalyst优化器与Tungsten引擎,提供高效的数据处理能力。其架构涵盖SQL解析、逻辑计划优化、物理计划生成及分布式执行,支持复杂数据类型、窗口函数与多样化聚合操作,结合自适应查询与代码生成技术,实现高性能大数据分析。
|
3月前
|
SQL 关系型数据库 PostgreSQL
CTE vs 子查询:深入拆解PostgreSQL复杂SQL的隐藏性能差异
本文深入探讨了PostgreSQL中CTE(公共表表达式)与子查询的选择对SQL性能的影响。通过分析两者底层机制,揭示CTE的物化特性及子查询的优化融合优势,并结合多场景案例对比执行效率。最终给出决策指南,帮助开发者根据数据量、引用次数和复杂度选择最优方案,同时提供高级优化技巧和版本演进建议,助力SQL性能调优。
306 1
|
4月前
|
消息中间件 分布式计算 监控
从InfluxDB到StarRocks:Grab实现Spark监控平台10倍性能提升
Grab 是东南亚领先的超级应用,其 Spark 可观测平台 Iris 核心存储迁移到 StarRocks 后性能显著提升。新架构统一了实时与历史数据分析,减少多平台切换复杂性,查询速度提升 10 倍以上,资源使用效率提高 40%。通过物化视图、动态分区和直接 Kafka 摄取数据等优化,简化数据管道并降低运维成本。未来 Grab 将进一步增强推荐系统、集成机器学习,持续优化用户体验与系统可扩展性。
|
6月前
|
SQL 关系型数据库 MySQL
如何优化SQL查询以提高数据库性能?
这篇文章以生动的比喻介绍了优化SQL查询的重要性及方法。它首先将未优化的SQL查询比作在自助餐厅贪多嚼不烂的行为,强调了只获取必要数据的必要性。接着,文章详细讲解了四种优化策略:**精简选择**(避免使用`SELECT *`)、**专业筛选**(利用`WHERE`缩小范围)、**高效联接**(索引和限制数据量)以及**使用索引**(加速搜索)。此外,还探讨了如何避免N+1查询问题、使用分页限制结果、理解执行计划以及定期维护数据库健康。通过这些技巧,可以显著提升数据库性能,让查询更高效流畅。
|
6月前
|
SQL 分布式计算 资源调度
Dataphin功能Tips系列(48)-如何根据Hive SQL/Spark SQL的任务优先级指定YARN资源队列
如何根据Hive SQL/Spark SQL的任务优先级指定YARN资源队列
197 4
|
7月前
|
SQL 关系型数据库 OLAP
云原生数据仓库AnalyticDB PostgreSQL同一个SQL可以实现向量索引、全文索引GIN、普通索引BTREE混合查询,简化业务实现逻辑、提升查询性能
本文档介绍了如何在AnalyticDB for PostgreSQL中创建表、向量索引及混合检索的实现步骤。主要内容包括:创建`articles`表并设置向量存储格式,创建ANN向量索引,为表增加`username`和`time`列,建立BTREE索引和GIN全文检索索引,并展示了查询结果。参考文档提供了详细的SQL语句和配置说明。
181 2
|
8月前
|
SQL Oracle 关系型数据库
如何在 Oracle 中配置和使用 SQL Profiles 来优化查询性能?
在 Oracle 数据库中,SQL Profiles 是优化查询性能的工具,通过提供额外统计信息帮助生成更有效的执行计划。配置和使用步骤包括:1. 启用自动 SQL 调优;2. 手动创建 SQL Profile,涉及收集、执行调优任务、查看报告及应用建议;3. 验证效果;4. 使用 `DBA_SQL_PROFILES` 视图管理 Profile。
|
8月前
|
SQL 分布式计算 Java
Spark SQL向量化执行引擎框架Gluten-Velox在AArch64使能和优化
本文摘自 Arm China的工程师顾煜祺关于“在 Arm 平台上使用 Native 算子库加速 Spark”的分享,主要内容包括以下四个部分: 1.技术背景 2.算子库构成 3.算子操作优化 4.未来工作
928 0
|
SQL 分布式计算 数据库
【大数据技术Spark】Spark SQL操作Dataframe、读写MySQL、Hive数据库实战(附源码)
【大数据技术Spark】Spark SQL操作Dataframe、读写MySQL、Hive数据库实战(附源码)
584 0

热门文章

最新文章