SparkSQL中产生笛卡尔积的几种典型场景以及处理策略

简介: 本文介绍都有哪些情况会产生笛卡尔积,以及如何事前"预测"写的SQL会产生笛卡尔积从而避免

本文转载自公众号: 大数据学习与分享
原文链接


【前言:如果你经常使用Spark SQL进行数据的处理分析,那么对笛卡尔积的危害性一定不陌生,比如大量占用集群资源导致其他任务无法正常执行,甚至导致节点宕机。那么都有哪些情况会产生笛卡尔积,以及如何事前"预测"写的SQL会产生笛卡尔积从而避免呢?(以下不考虑业务需求确实需要笛卡尔积的场景)】

Spark SQL几种产生笛卡尔积的典型场景

首先来看一下在Spark SQL中产生笛卡尔积的几种典型SQL:

  1. join语句中不指定on条件
select * from test_partition1 join test_partition2;
  1. join语句中指定不等值连接
select * from test_partition1 t1 inner join test_partition2 t2 on t1.name <> t2.name;
  1. join语句on中用or指定连接条件
select * from test_partition1 t1 join test_partition2 t2 on t1.id = t2.id or t1.name = t2.name;
  1. join语句on中用||指定连接条件
select * from test_partition1 t1 join test_partition2 t2 on t1.id = t2.id || t1.name = t2.name;

除了上述举的几个典型例子,实际业务开发中产生笛卡尔积的原因多种多样。

同时需要注意,在一些SQL中即使满足了上述4种规则之一也不一定产生笛卡尔积。比如,对于join语句中指定不等值连接条件的下述SQL不会产生笛卡尔积:
--在Spark SQL内部优化过程中针对join策略的选择,最终会通过SortMergeJoin进行处理。

select * from test_partition1 t1 join test_partition2 t2 on t1.id = t2.id and t1.name<>t2.name;

此外,对于直接在SQL中使用cross join的方式,也不一定产生笛卡尔积。比如下述SQL:

-- Spark SQL内部优化过程中选择了SortMergeJoin方式进行处理
select * from test_partition1 t1 cross  join test_partition2 t2 on t1.id = t2.id;

但是如果cross join没有指定on条件同样会产生笛卡尔积。
那么如何判断一个SQL是否产生了笛卡尔积呢?

Spark SQL是否产生了笛卡尔积

以join语句不指定on条件产生笛卡尔积的SQL为例:

-- test_partition1和test_partition2是Hive分区表
select * from test_partition1 join test_partition2;

通过Spark UI上SQL一栏查看上述SQL执行图,如下:
56.png

可以看出,因为该join语句中没有指定on连接查询条件,导致了CartesianProduct即笛卡尔积。
再来看一下该join语句的逻辑计划和物理计划:



== Parsed Logical Plan ==
'GlobalLimit 1000
+- 'LocalLimit 1000
   +- 'Project [*]
      +- 'UnresolvedRelation `t`
​
== Analyzed Logical Plan ==
id: string, name: string, dt: string, id: string, name: string, dt: string
GlobalLimit 1000
+- LocalLimit 1000
   +- Project [id#84, name#85, dt#86, id#87, name#88, dt#89]
      +- SubqueryAlias `t`
         +- Project [id#84, name#85, dt#86, id#87, name#88, dt#89]
            +- Join Inner
               :- SubqueryAlias `default`.`test_partition1`
               :  +- HiveTableRelation `default`.`test_partition1`, org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe, [id#84, name#85], [dt#86]
               +- SubqueryAlias `default`.`test_partition2`
                  +- HiveTableRelation `default`.`test_partition2`, org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe, [id#87, name#88], [dt#89]
​
== Optimized Logical Plan ==
GlobalLimit 1000
+- LocalLimit 1000
   +- Join Inner
      :- HiveTableRelation `default`.`test_partition1`, org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe, [id#84, name#85], [dt#86]
      +- HiveTableRelation `default`.`test_partition2`, org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe, [id#87, name#88], [dt#89]
​
== Physical Plan ==
CollectLimit 1000
+- CartesianProduct
   :- Scan hive default.test_partition1 [id#84, name#85, dt#86], HiveTableRelation `default`.`test_partition1`, org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe, [id#84, name#85], [dt#86]
   +- Scan hive default.test_partition2 [id#87, name#88, dt#89], HiveTableRelation `default`.`test_partition2`, org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe, [id#87, name#88], [dt#89]

通过逻辑计划到物理计划,以及最终的物理计划选择CartesianProduct,可以分析得出该SQL最终确实产生了笛卡尔积。

Spark SQL中产生笛卡尔积的处理策略

在之前的文章中《Spark SQL如何选择join策略》已经介绍过,Spark SQL中主要有
1.ExtractEquiJoinKeys(Broadcast Hash Join、Shuffle Hash Join、Sort Merge Join,这3种是我们比较熟知的Spark SQL join)和Without joining keys(CartesianProduct、BroadcastNestedLoopJoin)join策略。

那么,如何判断SQL是否产生了笛卡尔积就迎刃而解。
在利用Spark SQL执行SQL任务时,通过查看SQL的执行图来分析是否产生了笛卡尔积。如果产生笛卡尔积,则将任务杀死,进行任务优化避免笛卡尔积。【不推荐。用户需要到Spark UI上查看执行图,并且需要对Spark UI界面功能等要了解,需要一定的专业性。(注意:这里之所以这样说,是因为Spark SQL是计算引擎,面向的用户角色不同,用户不一定对Spark本身了解透彻,但熟悉SQL。对于做平台的小伙伴儿,想必深有感触)】

2.分析Spark SQL的逻辑计划和物理计划,通过程序解析计划推断SQL最终是否选择了笛卡尔积执行策略。如果是,及时提示风险。

具体可以参考Spark SQL join策略选择的源码:

def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
// --- BroadcastHashJoin --------------------------------------------------------------------
// broadcast hints were specified
case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, condition, left, right)
if canBroadcastByHints(joinType, left, right) =>
        val buildSide = broadcastSideByHints(joinType, left, right)
Seq(joins.BroadcastHashJoinExec(
          leftKeys, rightKeys, joinType, buildSide, condition, planLater(left), planLater(right)))
// broadcast hints were not specified, so need to infer it from size and configuration.
case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, condition, left, right)
if canBroadcastBySizes(joinType, left, right) =>
        val buildSide = broadcastSideBySizes(joinType, left, right)
Seq(joins.BroadcastHashJoinExec(
          leftKeys, rightKeys, joinType, buildSide, condition, planLater(left), planLater(right)))
// --- ShuffledHashJoin ---------------------------------------------------------------------
case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, condition, left, right)
if !conf.preferSortMergeJoin && canBuildRight(joinType) && canBuildLocalHashMap(right)
           && muchSmaller(right, left) ||
           !RowOrdering.isOrderable(leftKeys) =>
Seq(joins.ShuffledHashJoinExec(
          leftKeys, rightKeys, joinType, BuildRight, condition, planLater(left), planLater(right)))
case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, condition, left, right)
if !conf.preferSortMergeJoin && canBuildLeft(joinType) && canBuildLocalHashMap(left)
           && muchSmaller(left, right) ||
           !RowOrdering.isOrderable(leftKeys) =>
Seq(joins.ShuffledHashJoinExec(
          leftKeys, rightKeys, joinType, BuildLeft, condition, planLater(left), planLater(right)))
// --- SortMergeJoin ------------------------------------------------------------
case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, condition, left, right)
if RowOrdering.isOrderable(leftKeys) =>
        joins.SortMergeJoinExec(
          leftKeys, rightKeys, joinType, condition, planLater(left), planLater(right)) :: Nil
// --- Without joining keys ------------------------------------------------------------
// Pick BroadcastNestedLoopJoin if one side could be broadcast
case j @ logical.Join(left, right, joinType, condition)
if canBroadcastByHints(joinType, left, right) =>
        val buildSide = broadcastSideByHints(joinType, left, right)
        joins.BroadcastNestedLoopJoinExec(
          planLater(left), planLater(right), buildSide, joinType, condition) :: Nil
case j @ logical.Join(left, right, joinType, condition)
if canBroadcastBySizes(joinType, left, right) =>
        val buildSide = broadcastSideBySizes(joinType, left, right)
        joins.BroadcastNestedLoopJoinExec(
          planLater(left), planLater(right), buildSide, joinType, condition) :: Nil
// Pick CartesianProduct for InnerJoin
case logical.Join(left, right, _: InnerLike, condition) =>
        joins.CartesianProductExec(planLater(left), planLater(right), condition) :: Nil
case logical.Join(left, right, joinType, condition) =>
        val buildSide = broadcastSide(
left.stats.hints.broadcast, right.stats.hints.broadcast, left, right)
// This join could be very slow or OOM
        joins.BroadcastNestedLoopJoinExec(
          planLater(left), planLater(right), buildSide, joinType, condition) :: Nil
// --- Cases where this strategy does not apply ---------------------------------------------
case _ => Nil
    }

此外,在业务开发中,要不断总结归纳产生笛卡尔积的情况,形成知识文档,以便在后续业务开发中避免类似的情况出现。
除了笛卡尔积效率比较低,BroadcastNestedLoopJoin效率也相对低效,尤其是当数据量大的时候还很容易造成driver端的OOM,这种情况也是需要极力避免的。


阿里巴巴开源大数据技术团队成立Apache Spark中国技术社区,定期推送精彩案例,技术专家直播,问答区近万人Spark技术同学在线提问答疑,只为营造纯粹的Spark氛围,欢迎钉钉扫码加入!
image.png

对开源大数据和感兴趣的同学可以加小编微信(下图二维码,备注“进群”)进入技术交流微信群。

image.png

Apache Spark技术交流社区公众号,微信扫一扫关注

image.png

相关文章
|
SQL 缓存 分布式计算
SparkSQL与Hive metastore Parquet转换
Spark SQL为了更好的性能,在读写Hive metastore parquet格式的表时,会默认使用自己的Parquet SerDe,而不是采用Hive的SerDe进行序列化和反序列化
SparkSQL与Hive metastore Parquet转换
|
SQL 存储 HIVE
hive分区与分桶
hive分区与分桶
853 1
|
机器学习/深度学习 缓存 编解码
AIGC 商业化道路探索 - Stable Diffusion 商业化应用(上)
Stable Diffusion 应用到商业领域的案例越来越多,商用场景下的技术架构应当如何构建?本文基于阿里云近期的一个 Stable Diffusion 商业案例,对大规模底模切换、大量 LoRA 调优的场景提出一个商业场景适用的技术架构,并已实现部署交付,稳定运行。
|
Java 数据库
史上最全的 IDEA Debug 调试技巧(超详细案例)(二)
史上最全的 IDEA Debug 调试技巧(超详细案例)(二)
史上最全的 IDEA Debug 调试技巧(超详细案例)(二)
|
SQL 分布式计算 算法
聊聊 Spark 作业的 commit 提交机制 - Spark并发更新ORC表失败的问题原因与解决方法
聊聊 Spark 作业的 commit 提交机制 - Spark并发更新ORC表失败的问题原因与解决方法
聊聊 Spark 作业的 commit 提交机制 - Spark并发更新ORC表失败的问题原因与解决方法
|
Java 机器人 程序员
从入门到精通:五种 List 遍历方法对比与实战指南
小米是一位热爱分享技术的程序员,本文详细介绍了 Java 中遍历 List 的五种方式:经典 for 循环、增强 for 循环、Iterator 和 ListIterator、Stream API 以及 forEach 方法。每种方式都有其适用场景和优缺点,例如 for 循环适合频繁访问索引,增强 for 循环和 forEach 方法代码简洁,Stream API 适合大数据量操作,ListIterator 支持双向遍历。文章通过生动的小故事和代码示例,帮助读者更好地理解和选择合适的遍历方式。
770 2
|
JavaScript 前端开发 API
Vue 3新特性详解:Composition API的威力
【10月更文挑战第25天】Vue 3 引入的 Composition API 是一组用于组织和复用组件逻辑的新 API。相比 Options API,它提供了更灵活的结构,便于逻辑复用和代码组织,特别适合复杂组件。本文将探讨 Composition API 的优势,并通过示例代码展示其基本用法,帮助开发者更好地理解和应用这一强大工具。
416 2
Resource cmudict not found. Please use the NLTK Downloader to obtain the resource:
这篇文章介绍了在使用DeepVoice3_pytorch时遇到的“Resource cmudict not found”错误,以及如何通过NLTK Downloader下载缺失的cmudict资源来解决该问题。
Resource cmudict not found. Please use the NLTK Downloader to obtain the resource:
|
机器学习/深度学习 存储 人工智能
政务部门人工智能OCR智能化升级:3大技术架构与4项核心功能解析
本项目针对政务服务数字化需求,建设智能文档处理平台,利用OCR、信息抽取和深度学习技术,实现文件自动解析、分类、比对与审核,提升效率与准确性。平台强调本地部署,确保数据安全,解决低质量扫描件、复杂表格等痛点,降低人工成本与错误率,助力智慧政务发展。
717 0
|
安全 Java Maven
优化Maven镜像配置:使用阿里云加速依赖下载
更新Maven镜像配置至关重要,尤其使用阿里云仓库时。在`settings.xml`中加入特定镜像配置可显著提升依赖下载速度。示例配置指定了阿里云镜像ID、替代表态仓库、安全的URL、默认布局及启用版本管理。需定位至用户目录下的`.m2/`文件夹编辑`settings.xml`,添加镜像信息后保存测试。若下载仍慢,考虑网络状况或备选镜像。多镜像设置时需注意避免冲突。
3330 3
下一篇
开通oss服务