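Spark's full outer join currently has a problem: when the data is skewed, it can cause an Executor OOM. The issue is described in detail in SPARK-24985. To paraphrase:
It comes from how the following code block in the SortMergeJoinExec class handles the join: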
    doExecute
       ||
       \/
    case FullOuter =>
      val leftNullRow = new GenericInternalRow(left.output.length)
      val rightNullRow = new GenericInternalRow(right.output.length)
      val smjScanner = new SortMergeFullOuterJoinScanner(
        leftKeyGenerator = createLeftKeyGenerator(),
        rightKeyGenerator = createRightKeyGenerator(),
        keyOrdering,
        leftIter = RowIterator.fromScala(leftIter),
        rightIter = RowIterator.fromScala(rightIter),
        boundCondition,
        leftNullRow,
        rightNullRow)

      new FullOuterIterator(
        smjScanner,
        resultProj,
        numOutputRows).toScala
While iterating, SortMergeFullOuterJoinScanner keeps the rows that match on the left and right sides in memory, and this memory is unbounded. The method that does this is:
    private def findMatchingRows(matchingKey: InternalRow): Unit = {
      leftMatches.clear()
      rightMatches.clear()
      leftIndex = 0
      rightIndex = 0

      // Buffer every left row that shares matchingKey; there is no size limit here.
      while (leftRowKey != null && keyOrdering.compare(leftRowKey, matchingKey) == 0) {
        leftMatches += leftRow.copy()
        advancedLeft()
      }
      // Buffer every right row that shares matchingKey, again without a limit.
      while (rightRowKey != null && keyOrdering.compare(rightRowKey, matchingKey) == 0) {
        rightMatches += rightRow.copy()
        advancedRight()
      }

      // Reset or grow the bit sets that track which buffered rows have been matched.
      if (leftMatches.size <= leftMatched.capacity) {
        leftMatched.clearUntil(leftMatches.size)
      } else {
        leftMatched = new BitSet(leftMatches.size)
      }
      if (rightMatches.size <= rightMatched.capacity) {
        rightMatched.clearUntil(rightMatches.size)
      } else {
        rightMatched = new BitSet(rightMatches.size)
      }
    }
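To make the effect of skew concrete, the buffering pattern above can be modelled outside of Spark. The following is only a rough sketch, not Spark's implementation: the object `UnboundedMatchBufferSketch`, the `Row` alias, and the helper `collectMatches` are hypothetical names introduced for illustration, and rows are reduced to `(key, payload)` pairs that are assumed to be already sorted by key. It shows that the in-memory buffer grows to the number of rows sharing one key, which is exactly what happens for a hot key.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical, simplified model of the buffering done per join key.
// Not Spark code: rows are plain (key, payload) pairs sorted by key.
object UnboundedMatchBufferSketch {
  type Row = (Int, String)

  // Copies every row at the head of `rows` whose key equals `matchingKey`
  // into an in-memory buffer. Nothing caps the buffer size, so a skewed
  // key makes the buffer as large as the number of rows with that key.
  def collectMatches(rows: BufferedIterator[Row], matchingKey: Int): ArrayBuffer[Row] = {
    val matches = ArrayBuffer.empty[Row]
    while (rows.hasNext && rows.head._1 == matchingKey) {
      matches += rows.next()
    }
    matches
  }

  def main(args: Array[String]): Unit = {
    val skewedRows = 100000 // assumed number of rows sharing the hot key 1

    val left = (Iterator.fill(skewedRows)((1, "l")) ++ Iterator.single((2, "l"))).buffered
    val right = (Iterator.fill(skewedRows)((1, "r")) ++ Iterator.single((3, "r"))).buffered

    val leftMatches = collectMatches(left, 1)
    val rightMatches = collectMatches(right, 1)

    // Both buffers hold every row of the hot key at the same time.
    println(s"buffered left=${leftMatches.size}, right=${rightMatches.size}")
  }
}
```

In this model both sides of the hot key are held in memory at once, so memory use scales with the size of the largest key group rather than with the partition's average key group.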