开发者社区> 问答> 正文

在PySpark的文字列中检测到INNER连接的笛卡尔积

以下代码引发“检测到INNER联接的笛卡尔积”异常:

first_df = spark.createDataFrame([{"first_id": "1"}, {"first_id": "1"}, {"first_id": "1"}, ])
second_df = spark.createDataFrame([{"some_value": "????"}, ])

second_df = second_df.withColumn("second_id", F.lit("1"))

If the next line is uncommented, then the JOIN is working fine.

second_df.persist()

result_df = first_df.join(second_df,

                      first_df.first_id == second_df.second_id,
                      'inner')

data = result_df.collect()

result_df.explain()
并告诉我逻辑计划如下所示:

Filter (first_id#0 = 1)
+- LogicalRDD [first_id#0], false
and
Project [some_value#2, 1 AS second_id#4]
+- LogicalRDD [some_value#2], false
Join condition is missing or trivial.
Use the CROSS JOIN syntax to allow cartesian products between these relations.;
看起来有一个原因,当RuleExecutor应用名为CheckCartesianProducts的优化规则集时,这些逻辑计划的JOIN条件中不存在列(请参阅https://github.com/apache/spark/blob/v2.3.0/sql/ catalyst / src / main / scala / org / apache / spark / sql / catalyst / optimizer / Optimizer.scala#L1114)。

但是,如果我在JOIN之前使用“persist”方法,它的工作原理和物理计划是:

*(3) SortMergeJoin [first_id#0], [second_id#4], Inner
:- *(1) Sort [first_id#0 ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(first_id#0, 10)
: +- Scan ExistingRDD[first_id#0]
+- *(2) Sort [second_id#4 ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(second_id#4, 10)

  +- InMemoryTableScan [some_value#2, second_id#4]
        +- InMemoryRelation [some_value#2, second_id#4], true, 10000, StorageLevel(disk, memory, 1 replicas)
              +- *(1) Project [some_value#2, 1 AS second_id#4]
                 +- Scan ExistingRDD[some_value#2]

所以,可能有人可以解释内部导致这样的结果,因为持久化数据框架看起来并不像解决方案。

展开
收起
社区小助手 2018-12-11 17:23:44 2461 0
1 条回答
写回答
取消 提交回答
  • 社区小助手是spark中国社区的管理员,我会定期更新直播回顾等资料和文章干货,还整合了大家在钉群提出的有关spark的问题及回答。

    问题是,一旦您持久保存数据,second_id就会将其合并到缓存表中,不再被视为常量。因此,计划程序无法再推断查询应该表示为笛卡尔积,并使用标准SortMergeJoin的哈希分区second_id。

    在不使用持久性的情况下实现相同的结果将是微不足道的 udf

    from pyspark.sql.functions import lit, pandas_udf, PandasUDFType

    @pandas_udf('integer', PandasUDFType.SCALAR)
    def identity(x):

    return x    
    

    second_df = second_df.withColumn('second_id', identity(lit(1)))

    result_df = first_df.join(second_df,

                         first_df.first_id == second_df.second_id,
                         'inner')
    

    result_df.explain()

    == Physical Plan ==
    *(6) SortMergeJoin [cast(first_id#4 as int)], [second_id#129], Inner
    :- *(2) Sort [cast(first_id#4 as int) ASC NULLS FIRST], false, 0
    : +- Exchange hashpartitioning(cast(first_id#4 as int), 200)
    : +- *(1) Filter isnotnull(first_id#4)
    : +- Scan ExistingRDD[first_id#4]
    +- *(5) Sort [second_id#129 ASC NULLS FIRST], false, 0
    +- Exchange hashpartitioning(second_id#129, 200)

      +- *(4) Project [some_value#6, pythonUDF0#154 AS second_id#129]
         +- ArrowEvalPython [identity(1)], [some_value#6, pythonUDF0#154]
            +- *(3) Project [some_value#6]
               +- *(3) Filter isnotnull(pythonUDF0#153)
                  +- ArrowEvalPython [identity(1)], [some_value#6, pythonUDF0#153]
                     +- Scan ExistingRDD[some_value#6]

    然而,SortMergeJoin这不是你应该尝试在这里实现的。使用常量键,除了玩具数据外,它会导致极端的数据偏差,并且可能会失败。

    然而,笛卡尔积虽然价格昂贵,但不会遇到这个问题,因此这里应该优先考虑。因此,它建议启用交叉连接或使用显式交叉连接语法(Spark 2.x的spark.sql.crossJoin.enabled)并继续。

    一个悬而未决的问题仍然是如何在缓存数据时防止意外行为。不幸的是,我没有为此做好准备。我相当确定可以使用自定义优化器规则,但这不是单独用Python完成的事情。

    2019-07-17 23:19:56
    赞同 展开评论 打赏
问答地址:
问答排行榜
最热
最新

相关电子书

更多
低代码开发师(初级)实战教程 立即下载
冬季实战营第三期:MySQL数据库进阶实战 立即下载
阿里巴巴DevOps 最佳实践手册 立即下载