SparkSQL中产生笛卡尔积的几种典型场景以及处理策略

简介: 本文介绍都有哪些情况会产生笛卡尔积,以及如何事前"预测"写的SQL会产生笛卡尔积从而避免

本文转载自公众号: 大数据学习与分享
原文链接


【前言:如果你经常使用Spark SQL进行数据的处理分析,那么对笛卡尔积的危害性一定不陌生,比如大量占用集群资源导致其他任务无法正常执行,甚至导致节点宕机。那么都有哪些情况会产生笛卡尔积,以及如何事前"预测"写的SQL会产生笛卡尔积从而避免呢?(以下不考虑业务需求确实需要笛卡尔积的场景)】

Spark SQL几种产生笛卡尔积的典型场景

首先来看一下在Spark SQL中产生笛卡尔积的几种典型SQL:

  1. join语句中不指定on条件
select * from test_partition1 join test_partition2;
  1. join语句中指定不等值连接
select * from test_partition1 t1 inner join test_partition2 t2 on t1.name <> t2.name;
  1. join语句on中用or指定连接条件
select * from test_partition1 t1 join test_partition2 t2 on t1.id = t2.id or t1.name = t2.name;
  1. join语句on中用||指定连接条件
select * from test_partition1 t1 join test_partition2 t2 on t1.id = t2.id || t1.name = t2.name;

除了上述举的几个典型例子,实际业务开发中产生笛卡尔积的原因多种多样。

同时需要注意,在一些SQL中即使满足了上述4种规则之一也不一定产生笛卡尔积。比如,对于join语句中指定不等值连接条件的下述SQL不会产生笛卡尔积:
--在Spark SQL内部优化过程中针对join策略的选择,最终会通过SortMergeJoin进行处理。

select * from test_partition1 t1 join test_partition2 t2 on t1.id = t2.id and t1.name<>t2.name;

此外,对于直接在SQL中使用cross join的方式,也不一定产生笛卡尔积。比如下述SQL:

-- Spark SQL内部优化过程中选择了SortMergeJoin方式进行处理
select * from test_partition1 t1 cross  join test_partition2 t2 on t1.id = t2.id;

但是如果cross join没有指定on条件同样会产生笛卡尔积。
那么如何判断一个SQL是否产生了笛卡尔积呢?

Spark SQL是否产生了笛卡尔积

以join语句不指定on条件产生笛卡尔积的SQL为例:

-- test_partition1和test_partition2是Hive分区表
select * from test_partition1 join test_partition2;

通过Spark UI上SQL一栏查看上述SQL执行图,如下:
56.png

可以看出,因为该join语句中没有指定on连接查询条件,导致了CartesianProduct即笛卡尔积。
再来看一下该join语句的逻辑计划和物理计划:



== Parsed Logical Plan ==
'GlobalLimit 1000
+- 'LocalLimit 1000
   +- 'Project [*]
      +- 'UnresolvedRelation `t`
​
== Analyzed Logical Plan ==
id: string, name: string, dt: string, id: string, name: string, dt: string
GlobalLimit 1000
+- LocalLimit 1000
   +- Project [id#84, name#85, dt#86, id#87, name#88, dt#89]
      +- SubqueryAlias `t`
         +- Project [id#84, name#85, dt#86, id#87, name#88, dt#89]
            +- Join Inner
               :- SubqueryAlias `default`.`test_partition1`
               :  +- HiveTableRelation `default`.`test_partition1`, org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe, [id#84, name#85], [dt#86]
               +- SubqueryAlias `default`.`test_partition2`
                  +- HiveTableRelation `default`.`test_partition2`, org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe, [id#87, name#88], [dt#89]
​
== Optimized Logical Plan ==
GlobalLimit 1000
+- LocalLimit 1000
   +- Join Inner
      :- HiveTableRelation `default`.`test_partition1`, org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe, [id#84, name#85], [dt#86]
      +- HiveTableRelation `default`.`test_partition2`, org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe, [id#87, name#88], [dt#89]
​
== Physical Plan ==
CollectLimit 1000
+- CartesianProduct
   :- Scan hive default.test_partition1 [id#84, name#85, dt#86], HiveTableRelation `default`.`test_partition1`, org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe, [id#84, name#85], [dt#86]
   +- Scan hive default.test_partition2 [id#87, name#88, dt#89], HiveTableRelation `default`.`test_partition2`, org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe, [id#87, name#88], [dt#89]

通过逻辑计划到物理计划,以及最终的物理计划选择CartesianProduct,可以分析得出该SQL最终确实产生了笛卡尔积。

Spark SQL中产生笛卡尔积的处理策略

在之前的文章中《Spark SQL如何选择join策略》已经介绍过,Spark SQL中主要有
1.ExtractEquiJoinKeys(Broadcast Hash Join、Shuffle Hash Join、Sort Merge Join,这3种是我们比较熟知的Spark SQL join)和Without joining keys(CartesianProduct、BroadcastNestedLoopJoin)join策略。

那么,如何判断SQL是否产生了笛卡尔积就迎刃而解。
在利用Spark SQL执行SQL任务时,通过查看SQL的执行图来分析是否产生了笛卡尔积。如果产生笛卡尔积,则将任务杀死,进行任务优化避免笛卡尔积。【不推荐。用户需要到Spark UI上查看执行图,并且需要对Spark UI界面功能等要了解,需要一定的专业性。(注意:这里之所以这样说,是因为Spark SQL是计算引擎,面向的用户角色不同,用户不一定对Spark本身了解透彻,但熟悉SQL。对于做平台的小伙伴儿,想必深有感触)】

2.分析Spark SQL的逻辑计划和物理计划,通过程序解析计划推断SQL最终是否选择了笛卡尔积执行策略。如果是,及时提示风险。

具体可以参考Spark SQL join策略选择的源码:

def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
// --- BroadcastHashJoin --------------------------------------------------------------------
// broadcast hints were specified
case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, condition, left, right)
if canBroadcastByHints(joinType, left, right) =>
        val buildSide = broadcastSideByHints(joinType, left, right)
Seq(joins.BroadcastHashJoinExec(
          leftKeys, rightKeys, joinType, buildSide, condition, planLater(left), planLater(right)))
// broadcast hints were not specified, so need to infer it from size and configuration.
case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, condition, left, right)
if canBroadcastBySizes(joinType, left, right) =>
        val buildSide = broadcastSideBySizes(joinType, left, right)
Seq(joins.BroadcastHashJoinExec(
          leftKeys, rightKeys, joinType, buildSide, condition, planLater(left), planLater(right)))
// --- ShuffledHashJoin ---------------------------------------------------------------------
case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, condition, left, right)
if !conf.preferSortMergeJoin && canBuildRight(joinType) && canBuildLocalHashMap(right)
           && muchSmaller(right, left) ||
           !RowOrdering.isOrderable(leftKeys) =>
Seq(joins.ShuffledHashJoinExec(
          leftKeys, rightKeys, joinType, BuildRight, condition, planLater(left), planLater(right)))
case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, condition, left, right)
if !conf.preferSortMergeJoin && canBuildLeft(joinType) && canBuildLocalHashMap(left)
           && muchSmaller(left, right) ||
           !RowOrdering.isOrderable(leftKeys) =>
Seq(joins.ShuffledHashJoinExec(
          leftKeys, rightKeys, joinType, BuildLeft, condition, planLater(left), planLater(right)))
// --- SortMergeJoin ------------------------------------------------------------
case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, condition, left, right)
if RowOrdering.isOrderable(leftKeys) =>
        joins.SortMergeJoinExec(
          leftKeys, rightKeys, joinType, condition, planLater(left), planLater(right)) :: Nil
// --- Without joining keys ------------------------------------------------------------
// Pick BroadcastNestedLoopJoin if one side could be broadcast
case j @ logical.Join(left, right, joinType, condition)
if canBroadcastByHints(joinType, left, right) =>
        val buildSide = broadcastSideByHints(joinType, left, right)
        joins.BroadcastNestedLoopJoinExec(
          planLater(left), planLater(right), buildSide, joinType, condition) :: Nil
case j @ logical.Join(left, right, joinType, condition)
if canBroadcastBySizes(joinType, left, right) =>
        val buildSide = broadcastSideBySizes(joinType, left, right)
        joins.BroadcastNestedLoopJoinExec(
          planLater(left), planLater(right), buildSide, joinType, condition) :: Nil
// Pick CartesianProduct for InnerJoin
case logical.Join(left, right, _: InnerLike, condition) =>
        joins.CartesianProductExec(planLater(left), planLater(right), condition) :: Nil
case logical.Join(left, right, joinType, condition) =>
        val buildSide = broadcastSide(
left.stats.hints.broadcast, right.stats.hints.broadcast, left, right)
// This join could be very slow or OOM
        joins.BroadcastNestedLoopJoinExec(
          planLater(left), planLater(right), buildSide, joinType, condition) :: Nil
// --- Cases where this strategy does not apply ---------------------------------------------
case _ => Nil
    }

此外,在业务开发中,要不断总结归纳产生笛卡尔积的情况,形成知识文档,以便在后续业务开发中避免类似的情况出现。
除了笛卡尔积效率比较低,BroadcastNestedLoopJoin效率也相对低效,尤其是当数据量大的时候还很容易造成driver端的OOM,这种情况也是需要极力避免的。


阿里巴巴开源大数据技术团队成立Apache Spark中国技术社区,定期推送精彩案例,技术专家直播,问答区近万人Spark技术同学在线提问答疑,只为营造纯粹的Spark氛围,欢迎钉钉扫码加入!
image.png

对开源大数据和感兴趣的同学可以加小编微信(下图二维码,备注“进群”)进入技术交流微信群。

image.png

Apache Spark技术交流社区公众号,微信扫一扫关注

image.png

相关实践学习
数据湖构建DLF快速入门
本教程通过使⽤数据湖构建DLF产品对于淘宝用户行为样例数据的分析,介绍数据湖构建DLF产品的数据发现和数据探索功能。
快速掌握阿里云 E-MapReduce
E-MapReduce 是构建于阿里云 ECS 弹性虚拟机之上,利用开源大数据生态系统,包括 Hadoop、Spark、HBase,为用户提供集群、作业、数据等管理的一站式大数据处理分析服务。 本课程主要介绍阿里云 E-MapReduce 的使用方法。
相关文章
|
1月前
kettle开发篇-记录关联(笛卡尔积)
kettle开发篇-记录关联(笛卡尔积)
25 0
|
4月前
|
分布式计算 负载均衡 算法
Flink---5、聚合算子、用户自定义函数、物理分区算子、分流、合流
Flink---5、聚合算子、用户自定义函数、物理分区算子、分流、合流
|
9月前
|
算法 索引
关系查询处理和查询优化
关系查询处理和查询优化
|
SQL Oracle 架构师
PolarDB for MySQL优化器查询变换系列 - join条件下推
本篇是PolarDB 优化器查询变换系列的第四篇,之前的文章请见:窗口函数解相关:https://ata.alibaba-inc.com/articles/194578IN-list变换:https://ata.alibaba-inc.com/articles/254779Join消除:https://ata.alibaba-inc.com/articles/252403引言在数据库的查询优化特性
185 0
PolarDB for MySQL优化器查询变换系列 - join条件下推
es 平行多次聚合查询
es 平行多次聚合查询
91 0
|
SQL 存储 数据库
工作总结之因为笛卡尔积问题写SQL搞了半天[害](附笛卡尔积总结)
在关系数据库中,一个查询往往会涉及多个表,因为很少有数据库只有一个表,而如果大多查询只涉及到一个表的,那么那个表也往往低于第三范式,存在大量冗余和异常。
210 0
工作总结之因为笛卡尔积问题写SQL搞了半天[害](附笛卡尔积总结)
|
SQL Cloud Native 算法
PolarDB 优化器查询变换系列 - join消除
背景众所周知,数据库的查询优化器可以说是整个系统的"大脑",一条查询语句执行的是否高效,在不同的优化器决策下,可能会产生几个数量级的性能差异,因此优化器也是数据库系统中最为核心的组件和核心竞争力之一。对于各个商业数据库,其优化器通过常年积累下来的能力,是其最为核心的商业机密,而另一方面从现有的开源数据库来看,很可惜大多数产品的优化器还都十分初级,也包括老牌的MySQL/Post
184 0
|
SQL 分布式计算 BI
MaxCompute笛卡尔积逻辑的参数优化&复杂JOIN逻辑优化
这篇文章主要讲一个SQL优化反映的两个优化点。分别是: 一、笛卡尔积逻辑的参数优化。 二、一个复杂JOIN逻辑的优化思路。
1979 2
MaxCompute笛卡尔积逻辑的参数优化&复杂JOIN逻辑优化
|
SQL Java 数据库连接
[MyBatisPlus]DQL编程控制②(查询投影、查询条件)
目前我们在查询数据的时候,什么都没有做默认就是查询表中所有字段的内容,我们所说的查询投影即不查询所有字段,只查询出指定内容的数据。
[MyBatisPlus]DQL编程控制②(查询投影、查询条件)
PyODPS DataFrame 处理笛卡尔积的几种方式
PyODPS 提供了 DataFrame API 来用类似 pandas 的接口进行大规模数据分析以及预处理,本文主要介绍如何使用 PyODPS 执行笛卡尔积的操作。 笛卡尔积最常出现的场景是两两之间需要比较或者运算。
13013 0

相关产品

  • 开源大数据平台 E-MapReduce