spark full outer join 数据倾斜导致OOM

简介: spark full outer join 数据倾斜导致OOM

spark full outer join目前存在一个问题,那就是在数据倾斜的时候,会导致Execuotr OOM:具体的问题描述,可以见SPARK-24985,转述一下就是:


SortMergeJoinExec类以下代码块的处理:

doExecute 
   ||
   \/
case FullOuter =>
     val leftNullRow = new GenericInternalRow(left.output.length)
     val rightNullRow = new GenericInternalRow(right.output.length)
     val smjScanner = new SortMergeFullOuterJoinScanner(
     leftKeyGenerator = createLeftKeyGenerator(),
     rightKeyGenerator = createRightKeyGenerator(),
     keyOrdering,
     leftIter = RowIterator.fromScala(leftIter),
     rightIter = RowIterator.fromScala(rightIter),
     boundCondition,
     leftNullRow,
     rightNullRow)
     new FullOuterIterator(
      smjScanner,
      resultProj,
      numOutputRows).toScala

其中SortMergeFullOuterJoinScanner在迭代的时候对左右两边匹配的数据保存在内存中,这个内存是没有边界的,具体的处理方法如下:

private def findMatchingRows(matchingKey: InternalRow): Unit = {
    leftMatches.clear()
    rightMatches.clear()
    leftIndex = 0
    rightIndex = 0
    while (leftRowKey != null && keyOrdering.compare(leftRowKey, matchingKey) == 0) {
      leftMatches += leftRow.copy()
      advancedLeft()
    }
    while (rightRowKey != null && keyOrdering.compare(rightRowKey, matchingKey) == 0) {
      rightMatches += rightRow.copy()
      advancedRight()
    }
    if (leftMatches.size <= leftMatched.capacity) {
      leftMatched.clearUntil(leftMatches.size)
    } else {
      leftMatched = new BitSet(leftMatches.size)
    }
    if (rightMatches.size <= rightMatched.capacity) {
      rightMatched.clearUntil(rightMatches.size)
    } else {
      rightMatched = new BitSet(rightMatches.size)
    }
  }


相关文章
|
7月前
|
SQL 分布式计算 HIVE
Spark数据倾斜问题分析和解决
Spark数据倾斜问题分析和解决
94 0
|
分布式计算 API 流计算
22MyCat - Spark/Storm 对join扩展(简略)
22MyCat - Spark/Storm 对join扩展(简略)
53 0
|
分布式计算 数据安全/隐私保护 Spark
spark 数据倾斜遇到过吗,如何解决数据倾斜?【重要】
spark 数据倾斜遇到过吗,如何解决数据倾斜?【重要】
118 0
|
SQL 存储 分布式计算
spark outer join push down filter rule(spark 外连接中的下推规则)
spark outer join push down filter rule(spark 外连接中的下推规则)
281 0
spark outer join push down filter rule(spark 外连接中的下推规则)
|
SQL 消息中间件 分布式计算
Spark面试题(五)——数据倾斜调优
数据倾斜指的是,并行处理的数据集中,某一部分(如Spark或Kafka的一个Partition)的数据显著多于其它部分,从而使得该部分的处理速度成为整个数据集处理的瓶颈。
288 0
Spark面试题(五)——数据倾斜调优
|
SQL 人工智能 分布式计算
Spark 数据倾斜及其解决方案
本文从数据倾斜的危害、现象、原因等方面,由浅入深阐述Spark数据倾斜及其解决方案。
Spark 数据倾斜及其解决方案
|
分布式计算 Spark
Spark面对OOM问题的解决方法及优化总结
https://blog.csdn.net/yhb315279058/article/details/51035631
1848 0
|
SQL 分布式计算 HIVE
|
1月前
|
分布式计算 大数据 Apache
ClickHouse与大数据生态集成:Spark & Flink 实战
【10月更文挑战第26天】在当今这个数据爆炸的时代,能够高效地处理和分析海量数据成为了企业和组织提升竞争力的关键。作为一款高性能的列式数据库系统,ClickHouse 在大数据分析领域展现出了卓越的能力。然而,为了充分利用ClickHouse的优势,将其与现有的大数据处理框架(如Apache Spark和Apache Flink)进行集成变得尤为重要。本文将从我个人的角度出发,探讨如何通过这些技术的结合,实现对大规模数据的实时处理和分析。
131 2
ClickHouse与大数据生态集成:Spark & Flink 实战