SPARK最新特性Runtime Filtering(运行时过滤)以及与动态分区裁剪的区别

简介: SPARK最新特性Runtime Filtering(运行时过滤)以及与动态分区裁剪的区别

背景

本文基于 SPARK 3.3.0

在最新发布的SPARK RELEASE,第一个显著的特性就是row-level Runtime Filtering,我们来分析一下


分析

直接转到对应的Jira SPARK-32268,里面涉及到的TPC benchmark,在数据行数比较大的情况下,BloomFilter带来的性能提升还是很明显的,最重要的设计文档在Row-level Runtime Filters in Spark,

里面讲了两个关键点:

  • Runtime Filter涉及两种模式,一种是In Filter(In Filter会转换为Semi Join),一种是Bloom Filter
  • Bloom Filter的参数控制 n_items(多少行数据),n_bits(多个位来标识)


2974a8f9de5c493dbad939043ad566d9.png


代码实现

该功能涉及到两个主要的Rule InjectRuntimeFilter和 RewritePredicateSubquery

      Batch("InjectRuntimeFilter", FixedPoint(1),
      InjectRuntimeFilter,
      RewritePredicateSubquery) :+

第一个规则InjectRuntimeFilter,会根据spark.sql.optimizer.runtimeFilter.semiJoinReduction.enabled和spark.sql.optimizer.runtime.bloomFilter.enabled 配置项来开启是否进行RunTime Filter的转换(默认情况下是关闭的)。

private def tryInjectRuntimeFilter(plan: LogicalPlan): LogicalPlan = {
    var filterCounter = 0
    val numFilterThreshold = conf.getConf(SQLConf.RUNTIME_FILTER_NUMBER_THRESHOLD)
    plan transformUp {
      case join @ ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, _, _, left, right, hint) =>
        var newLeft = left
        var newRight = right
        (leftKeys, rightKeys).zipped.foreach((l, r) => {
          // Check if:
          // 1. There is already a DPP filter on the key
          // 2. There is already a runtime filter (Bloom filter or IN subquery) on the key
          // 3. The keys are simple cheap expressions
          if (filterCounter < numFilterThreshold &&
            !hasDynamicPruningSubquery(left, right, l, r) &&
            !hasRuntimeFilter(newLeft, newRight, l, r) &&
            isSimpleExpression(l) && isSimpleExpression(r)) {
            val oldLeft = newLeft
            val oldRight = newRight
            if (canPruneLeft(joinType) && filteringHasBenefit(left, right, l, hint)) {
              newLeft = injectFilter(l, newLeft, r, right)
            }
            // Did we actually inject on the left? If not, try on the right
            if (newLeft.fastEquals(oldLeft) && canPruneRight(joinType) &&
              filteringHasBenefit(right, left, r, hint)) {
              newRight = injectFilter(r, newRight, l, left)
            }
            if (!newLeft.fastEquals(oldLeft) || !newRight.fastEquals(oldRight)) {
              filterCounter = filterCounter + 1
            }
          }
        })
        join.withNewChildren(Seq(newLeft, newRight))
    }
  }


适用RunTime Filter得有一下几个前提:


没超过最大插入Runtime Filter的阈值,spark.sql.optimizer.runtimeFilter.number.threshold(

默认是10)

逻辑计划没有插入动态分区,没有插入Runtime Filter以及该key仅仅是简单的join条件(先做简单的)

应用于Filter的join的一方能够通过join和aggerate 下推

Filter创建的join一方有一个可选择行的操作(用来过滤应用于Filter一方的数据)

应用于Filter的join的一方的scan的文件大小必须大于某个阈值spark.sql.optimizer.runtime.bloomFilter.applicationSideScanSizeThreshold默认是10GB(这样才能达到好的过滤效果)

要么当前的是一个shuffle Join或者是一个存在shuffle的broadcast join只有在这些情况下才能够插入Filter进行过滤)

注意:最后一个条件,单从代码上是看不出来为什么是broadcast join的,难道hash join不行么?.


下面的分析会解释:

  private def injectFilter(
      filterApplicationSideExp: Expression,
      filterApplicationSidePlan: LogicalPlan,
      filterCreationSideExp: Expression,
      filterCreationSidePlan: LogicalPlan): LogicalPlan = {
    require(conf.runtimeFilterBloomFilterEnabled || conf.runtimeFilterSemiJoinReductionEnabled)
    if (conf.runtimeFilterBloomFilterEnabled) {
      injectBloomFilter(
        filterApplicationSideExp,
        filterApplicationSidePlan,
        filterCreationSideExp,
        filterCreationSidePlan
      )
    } else {
      injectInSubqueryFilter(
        filterApplicationSideExp,
        filterApplicationSidePlan,
        filterCreationSideExp,
        filterCreationSidePlan
      )
    }
  }

默认情况下是会插入InSubquery节点的,这里有个很重要的判断:

  private def injectInSubqueryFilter(
      filterApplicationSideExp: Expression,
      filterApplicationSidePlan: LogicalPlan,
      filterCreationSideExp: Expression,
      filterCreationSidePlan: LogicalPlan): LogicalPlan = {
    require(filterApplicationSideExp.dataType == filterCreationSideExp.dataType)
    val actualFilterKeyExpr = mayWrapWithHash(filterCreationSideExp)
    val alias = Alias(actualFilterKeyExpr, actualFilterKeyExpr.toString)()
    val aggregate = Aggregate(Seq(alias), Seq(alias), filterCreationSidePlan)
    if (!canBroadcastBySize(aggregate, conf)) {
      // Skip the InSubquery filter if the size of `aggregate` is beyond broadcast join threshold,
      // i.e., the semi-join will be a shuffled join, which is not worthwhile.
      return filterApplicationSidePlan
    }
    val filter = InSubquery(Seq(mayWrapWithHash(filterApplicationSideExp)),
      ListQuery(aggregate, childOutputs = aggregate.output))
    Filter(filter, filterApplicationSidePlan)
  }

if (!canBroadcastBySize(aggregate, conf)),如果filter的create的一方不能够进行在运行时进行broadcast的转换,那么就跳过。原因是如果不能够进行broadcast join的转换的话,那么就会是shuffle join,这种情况下会得不偿失。对于InSubquery转换为SemiJoin是在RewritePredicateSubquery 规则中的(后续SemiJoin会转BroadcastJoin)。

对于开启了BloomFilter的情况下,就会运行injectBloomFilter代码:

  private def injectBloomFilter(
      filterApplicationSideExp: Expression,
      filterApplicationSidePlan: LogicalPlan,
      filterCreationSideExp: Expression,
      filterCreationSidePlan: LogicalPlan): LogicalPlan = {
    // Skip if the filter creation side is too big
    if (filterCreationSidePlan.stats.sizeInBytes > conf.runtimeFilterCreationSideThreshold) {
      return filterApplicationSidePlan
    }
    val rowCount = filterCreationSidePlan.stats.rowCount
    val bloomFilterAgg =
      if (rowCount.isDefined && rowCount.get.longValue > 0L) {
        new BloomFilterAggregate(new XxHash64(Seq(filterCreationSideExp)),
          Literal(rowCount.get.longValue))
      } else {
        new BloomFilterAggregate(new XxHash64(Seq(filterCreationSideExp)))
      }
    val aggExp = AggregateExpression(bloomFilterAgg, Complete, isDistinct = false, None)
    val alias = Alias(aggExp, "bloomFilter")()
    val aggregate =
      ConstantFolding(ColumnPruning(Aggregate(Nil, Seq(alias), filterCreationSidePlan)))
    val bloomFilterSubquery = ScalarSubquery(aggregate, Nil)
    val filter = BloomFilterMightContain(bloomFilterSubquery,
      new XxHash64(Seq(filterApplicationSideExp)))
    Filter(filter, filterApplicationSidePlan)
  }


这里有个spark.sql.optimizer.runtime.bloomFilter.creationSideThreshold的判断(默认是10M),如果大于该值,就直接跳过,因为如果Filter 创建的一方过大的话,Bloom Filter的误报率会上升,达到的效果就没有那么好了。

这里会构建BloomFilterAggregate和BloomFilterMightContain以及ScalarSubquery,ScalarSubquery最终会通过PlanSubqueries转化为execution.ScalarSubquery(SubqueryExec)形式,从而在driver端把数据收集过来,继而在BloomFilterMightContain进行计算


与动态分区裁剪的区别

  • 动态分区裁剪是被裁剪的join一边必须是分区的,而且join另一边在exchange之前存在条件过滤,而且默认是存在broadcastJoin的时候,才会进行分区裁剪;Runtime Filter 没有限制,但是Runtime Filter的适用条件更加严格
  • 动态分区剪裁能够减少source scan的IO,而Runtime Filter不行,因为动态分区裁剪是基于分区进行过滤的。
  • Runtime Filter是可以基于非分区的字段作为join key,而动态分区裁剪必须是基于分区字段的join key


994fb8044f8f7c7862a417c9b1a875bd.png

相关文章
|
3月前
|
分布式计算 监控 Spark
Spark 任务运行时日志分析
Spark 任务运行时日志分析
75 0
|
3月前
|
分布式计算 并行计算 大数据
Spark学习---day02、Spark核心编程(RDD概述、RDD编程(创建、分区规则、转换算子、Action算子))(一)
Spark学习---day02、Spark核心编程 RDD概述、RDD编程(创建、分区规则、转换算子、Action算子))(一)
151 1
|
6天前
|
分布式计算 并行计算 数据处理
|
6天前
|
缓存 分布式计算 数据处理
|
6天前
|
分布式计算 Serverless 数据处理
|
2月前
|
资源调度 分布式计算 监控
Spark Standalone与YARN的区别?
【6月更文挑战第17天】Spark Standalone与YARN的区别?
181 57
|
20天前
|
SQL 弹性计算 资源调度
云服务器 ECS产品使用问题之bin/spark-sql --master yarn如何进行集群模式运行
云服务器ECS(Elastic Compute Service)是各大云服务商阿里云提供的一种基础云计算服务,它允许用户租用云端计算资源来部署和运行各种应用程序。以下是一个关于如何使用ECS产品的综合指南。
|
3月前
|
分布式计算 Java Scala
Spark学习---day03、Spark核心编程(RDD概述、RDD编程(创建、分区规则、转换算子、Action算子))(二)
Spark学习---day03、Spark核心编程(RDD概述、RDD编程(创建、分区规则、转换算子、Action算子))(二)
|
2月前
|
SQL 分布式计算 大数据
MaxCompute操作报错合集之使用spark.sql执行rename分区操作,遇到任务报错退出的情况,该怎么办
MaxCompute是阿里云提供的大规模离线数据处理服务,用于大数据分析、挖掘和报表生成等场景。在使用MaxCompute进行数据处理时,可能会遇到各种操作报错。以下是一些常见的MaxCompute操作报错及其可能的原因与解决措施的合集。
|
2月前
|
分布式计算 Hadoop 大数据
Spark与Hadoop的区别?
【6月更文挑战第15天】Spark与Hadoop的区别?
32 8