SPARK SQL中 CTE(with表达式)会影响性能么?

简介: SPARK SQL中 CTE(with表达式)会影响性能么?

背景及问题


本文基于spark 3.1.2

最近在排查spark sql问题的时候,出现了一系列的(CTE)with操作,导致该任务运行不出来,而把对应的(CTE)with 替换成了临时表以后,任务很快的就能运行出来

对应的最简化的sql如下:

with temp1 as (
  select 
    null as user_id
    ,a.sku_id
  from xxx.xxx `a`
  where pt between '20211228' and '20220313'
  group by 
    a.sku_id),
temp2 as (
  select  
    a.xxx_code user_id
    ,a.sku_id 
  from xxx.xxx_1`a`
  left join xxx.xxx_2 `c` on c.pt='20220313' and a.xxx_code=c.xxx_code and c.xxx_id=1
  where a.pt='20220313'
  and TO_CHAR(upper_time,'yyyymmdd') >= '20220230'
  group by 
     a.xxx_code 
    ,a.sku_id)
select 
 *
 from (
 select 
 a1.sku_id,
 a1.user_id
 from temp1 `a1`
 -- BroadcastNestedLoopJoin
 full join temp2 `a5` on a1.user_id=a5.user_id and a1.sku_id=a5.sku_id 
 );

先说结论,其实是null as user_id 这块代码在作为join条件的时候被优化成布尔表达式false

分析

运行此sql,我们可以得到一下的物理计划:

image.png

我们看到 temp1和temp2的join的居然是BroadcastNestedLoopJoin,要知道BroadcastNestedLoopJoin的时间复杂度是O(M*N)的,这在数据大的情况下是很难计算出来的。

并且我们查看对应的代码JoinSelection.scala的时候,发现对于有等值条件的join的情况下,而且join的条件是可排序的情况下,最次也是会变成SortMergeJoin,对应的代码如下:

def createJoinWithoutHint() = {
          createBroadcastHashJoin(false)
            .orElse {
              if (!conf.preferSortMergeJoin) {
                createShuffleHashJoin(false)
              } else {
                None
              }
            }
            .orElse(createSortMergeJoin())
            .orElse(createCartesianProduct())
            .getOrElse {
              // This join could be very slow or OOM
              val buildSide = getSmallerSide(left, right)
              Seq(joins.BroadcastNestedLoopJoinExec(
                planLater(left), planLater(right), buildSide, joinType, nonEquiCond))
            }
        }

这部分的代码比较简单,暂且跳过。

就在百思不得其解的时候,还是最重要的一步,查看对应的逻辑计划日志:

直接重点(我们这里只说join条件部分的变化):

  • 解析完后的初始计划 为
 Join FullOuter, (('a1.user_id = 'a5.user_id) AND ('a1.sku_id = 'a5.sku_id))
  • 经过PromoteStrings规则
  Join FullOuter, ((user_id#3 = user_id#13) AND (sku_id#15 = sku_id#98)) 
                   ||
                   \/ 
  Join FullOuter, ((null = user_id#13) AND (sku_id#15 = sku_id#98))
  • 经过NullPropagation规则
Join FullOuter, ((null = user_id#13) AND (sku_id#15 = sku_id#98)) 
                   ||
                   \/
Join FullOuter, (null AND (sku_id#15 = sku_id#98))
  • 经过ReplaceNullWithFalseInPredicate规则
Join FullOuter, (null AND (sku_id#15 = sku_id#98))
                   ||
                   \/ 
Join FullOuter, (false AND (sku_id#15 = sku_id#98))
  • 经过BooleanSimplification规则
Join FullOuter, (false AND (sku_id#15 = sku_id#98))
                   ||
                   \/  
Join FullOuter, false

这样一步一步下来,其实最终的join条件就变成了 布尔表达式 false。

我们再看JoinSelection.scala 中join对应非等值条件case的判断:

      case logical.Join(left, right, joinType, condition, hint) =>
        val desiredBuildSide = if (joinType.isInstanceOf[InnerLike] || joinType == FullOuter) {
          getSmallerSide(left, right)
        } else {
          // For perf reasons, `BroadcastNestedLoopJoinExec` prefers to broadcast left side if
          // it's a right join, and broadcast right side if it's a left join.
          // TODO: revisit it. If left side is much smaller than the right side, it may be better
          // to broadcast the left side even if it's a left join.
          if (canBuildBroadcastLeft(joinType)) BuildLeft else BuildRight
        }
...
      def createJoinWithoutHint() = {
          createBroadcastNLJoin(canBroadcastBySize(left, conf), canBroadcastBySize(right, conf))
            .orElse(createCartesianProduct())
            .getOrElse {
              // This join could be very slow or OOM
              Seq(joins.BroadcastNestedLoopJoinExec(
                planLater(left), planLater(right), desiredBuildSide, joinType, condition))
            }
        }
     createBroadcastNLJoin(hintToBroadcastLeft(hint), hintToBroadcastRight(hint))
       .orElse { if (hintToShuffleReplicateNL(hint)) createCartesianProduct() else None }
       .getOrElse(createJoinWithoutHint())

最终还是会调用createJoinWithoutHint生成BroadcastNestedLoopJoinExec。


解决方法及总结


改写成临时表

把with改写成临时表,这是有益处的,因为在某些场景下会触发到AQE中的特性,而且改写成临时表后,任务是串行的,能够减少因为资源问题导致的任务运行缓慢问题(笔者曾经有遇到过)

注意:改成临时表的情况下,不能存在null as user_id的语句,否则会报错:

Caused by: org.apache.spark.sql.AnalysisException: Cannot create tables with null type.

把null as user_id改写成0 as user_id

根据之前的分析,导致变成BroadcastNestedLoopJoinExec的原因是null作为了join条件引发的,我们可以改写就好

其实CTE操作并不是影响性能的主要原因,主要原因还是在于spark对于某种case的处理,这种还会得具体case具体分析处理。

当然也可以参考Why is my CTE so slow?.


相关文章
|
2月前
|
SQL 数据库 UED
SQL性能提升秘籍:5步优化法与10个实战案例
在数据库管理和应用开发中,SQL查询的性能优化至关重要。高效的SQL查询不仅可以提高应用的响应速度,还能降低服务器负载,提升用户体验。本文将分享SQL优化的五大步骤和十个实战案例,帮助构建高效、稳定的数据库应用。
75 3
|
2月前
|
SQL IDE 数据库连接
IntelliJ IDEA处理大文件SQL:性能优势解析
在数据库开发和管理工作中,执行大型SQL文件是一个常见的任务。传统的数据库管理工具如Navicat在处理大型SQL文件时可能会遇到性能瓶颈。而IntelliJ IDEA,作为一个强大的集成开发环境,提供了一些高级功能,使其在执行大文件SQL时表现出色。本文将探讨IntelliJ IDEA在处理大文件SQL时的性能优势,并与Navicat进行比较。
35 4
|
2月前
|
SQL 存储 缓存
如何优化SQL查询性能?
【10月更文挑战第28天】如何优化SQL查询性能?
167 10
|
2月前
|
SQL 关系型数据库 MySQL
惊呆:where 1=1 可能严重影响性能,差了10多倍,快去排查你的 sql
老架构师尼恩在读者交流群中分享了关于MySQL中“where 1=1”条件的性能影响及其解决方案。该条件在动态SQL中常用,但可能在无真实条件时导致全表扫描,严重影响性能。尼恩建议通过其他条件或SQL子句命中索引,或使用MyBatis的`<where>`标签来避免性能问题。他还提供了详细的执行计划分析和优化建议,帮助大家在面试中展示深厚的技术功底,赢得面试官的青睐。更多内容可参考《尼恩Java面试宝典PDF》。
|
2月前
|
SQL 缓存 监控
SQL性能提升指南:五大优化策略与十个实战案例
在数据库性能优化的世界里,SQL优化是提升查询效率的关键。一个高效的SQL查询可以显著减少数据库的负载,提高应用响应速度,甚至影响整个系统的稳定性和扩展性。本文将介绍SQL优化的五大步骤,并结合十个实战案例,为你提供一份详尽的性能提升指南。
55 0
|
2月前
|
SQL JSON 分布式计算
【赵渝强老师】Spark SQL的数据模型:DataFrame
本文介绍了在Spark SQL中创建DataFrame的三种方法。首先,通过定义case class来创建表结构,然后将CSV文件读入RDD并关联Schema生成DataFrame。其次,使用StructType定义表结构,同样将CSV文件读入RDD并转换为Row对象后创建DataFrame。最后,直接加载带有格式的数据文件(如JSON),通过读取文件内容直接创建DataFrame。每种方法都包含详细的代码示例和解释。
|
3月前
|
SQL 监控 数据库
慢SQL对数据库写入性能的影响及优化技巧
在数据库管理系统中,慢SQL(即执行缓慢的SQL语句)不仅会影响查询性能,还可能对数据库的写入性能产生显著的不利影响
|
3月前
|
SQL 存储 数据库
慢SQL对数据库写入性能的影响及优化技巧
在数据库管理系统中,慢SQL(即执行缓慢的SQL语句)不仅会影响查询性能,还可能对数据库的写入性能产生显著的不利影响
|
3月前
|
SQL 分布式计算 大数据
大数据-97 Spark 集群 SparkSQL 原理详细解析 Broadcast Shuffle SQL解析过程(一)
大数据-97 Spark 集群 SparkSQL 原理详细解析 Broadcast Shuffle SQL解析过程(一)
80 0
|
分布式计算 大数据 Spark
7月30日产品直播【EMR Spark-SQL性能极致优化揭秘 Native Codegen Framework】
EMR团队探索并开发了SparkSQL Native Codegen框架,为SparkSQL换了引擎,新引擎带来最高4倍性能提升,为EMR再次获取世界第一立下汗马功劳,本次直播将详细介绍Native Codegen框架。
7月30日产品直播【EMR Spark-SQL性能极致优化揭秘 Native Codegen Framework】