spark full outer join 数据倾斜导致OOM

简介: spark full outer join 数据倾斜导致OOM

spark full outer join目前存在一个问题,那就是在数据倾斜的时候,会导致Execuotr OOM:具体的问题描述,可以见SPARK-24985,转述一下就是:


SortMergeJoinExec类以下代码块的处理:

doExecute 
   ||
   \/
case FullOuter =>
     val leftNullRow = new GenericInternalRow(left.output.length)
     val rightNullRow = new GenericInternalRow(right.output.length)
     val smjScanner = new SortMergeFullOuterJoinScanner(
     leftKeyGenerator = createLeftKeyGenerator(),
     rightKeyGenerator = createRightKeyGenerator(),
     keyOrdering,
     leftIter = RowIterator.fromScala(leftIter),
     rightIter = RowIterator.fromScala(rightIter),
     boundCondition,
     leftNullRow,
     rightNullRow)
     new FullOuterIterator(
      smjScanner,
      resultProj,
      numOutputRows).toScala

其中SortMergeFullOuterJoinScanner在迭代的时候对左右两边匹配的数据保存在内存中,这个内存是没有边界的,具体的处理方法如下:

private def findMatchingRows(matchingKey: InternalRow): Unit = {
    leftMatches.clear()
    rightMatches.clear()
    leftIndex = 0
    rightIndex = 0
    while (leftRowKey != null && keyOrdering.compare(leftRowKey, matchingKey) == 0) {
      leftMatches += leftRow.copy()
      advancedLeft()
    }
    while (rightRowKey != null && keyOrdering.compare(rightRowKey, matchingKey) == 0) {
      rightMatches += rightRow.copy()
      advancedRight()
    }
    if (leftMatches.size <= leftMatched.capacity) {
      leftMatched.clearUntil(leftMatches.size)
    } else {
      leftMatched = new BitSet(leftMatches.size)
    }
    if (rightMatches.size <= rightMatched.capacity) {
      rightMatched.clearUntil(rightMatches.size)
    } else {
      rightMatched = new BitSet(rightMatches.size)
    }
  }


相关文章
|
4月前
|
SQL 分布式计算 HIVE
Spark数据倾斜问题分析和解决
Spark数据倾斜问题分析和解决
43 0
|
6月前
|
分布式计算 API 流计算
22MyCat - Spark/Storm 对join扩展(简略)
22MyCat - Spark/Storm 对join扩展(简略)
30 0
|
9月前
|
分布式计算 数据安全/隐私保护 Spark
spark 数据倾斜遇到过吗,如何解决数据倾斜?【重要】
spark 数据倾斜遇到过吗,如何解决数据倾斜?【重要】
81 0
|
SQL 存储 分布式计算
spark outer join push down filter rule(spark 外连接中的下推规则)
spark outer join push down filter rule(spark 外连接中的下推规则)
216 0
spark outer join push down filter rule(spark 外连接中的下推规则)
|
SQL 消息中间件 分布式计算
Spark面试题(五)——数据倾斜调优
数据倾斜指的是,并行处理的数据集中,某一部分(如Spark或Kafka的一个Partition)的数据显著多于其它部分,从而使得该部分的处理速度成为整个数据集处理的瓶颈。
235 0
Spark面试题(五)——数据倾斜调优
|
SQL 人工智能 分布式计算
Spark 数据倾斜及其解决方案
本文从数据倾斜的危害、现象、原因等方面,由浅入深阐述Spark数据倾斜及其解决方案。
Spark 数据倾斜及其解决方案
|
SQL 分布式计算 并行计算
Spark 数据倾斜及其解决方案
本文从数据倾斜的危害、现象、原因等方面,由浅入深阐述Spark数据倾斜及其解决方案。
1493 0
|
SQL 分布式计算 HIVE
|
分布式计算 Spark Python