Background
This article is based on Spark 3.3.0.
In the newly released Spark version, the first notable feature is row-level Runtime Filtering. Let's take a closer look.
Analysis
Going straight to the corresponding JIRA ticket, SPARK-32268: the TPC benchmark results attached there show that when the row count is large, the Bloom filter brings a very noticeable performance improvement. The most important design document is "Row-level Runtime Filters in Spark", which makes two key points:
- A runtime filter comes in two flavors: an In filter (which gets converted into a semi join) and a Bloom filter
- A Bloom filter is controlled by two parameters: n_items (how many rows it is expected to hold) and n_bits (how many bits are used to represent them); see the sketch after this list
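To get a feel for the two knobs, here is a minimal sketch using Spark's own sketch library (org.apache.spark.util.sketch.BloomFilter, the implementation the runtime-filter aggregate builds on); the concrete numbers are illustrative assumptions:

import org.apache.spark.util.sketch.BloomFilter

// n_items: the expected number of distinct join keys the filter must hold
val nItems = 1000000L
// n_bits: the size of the bit array; more bits per item lowers the false-positive rate
val nBits = 8L * nItems
val bf = BloomFilter.create(nItems, nBits)

bf.putLong(42L)
assert(bf.mightContainLong(42L)) // a Bloom filter never produces false negatives
// bf.mightContainLong(7L) may occasionally return true even though 7 was never
// inserted -- that is the false-positive trade-off governed by n_items and n_bits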
Implementation
This feature involves two main rules, InjectRuntimeFilter and RewritePredicateSubquery:
Batch("InjectRuntimeFilter", FixedPoint(1), InjectRuntimeFilter, RewritePredicateSubquery) :+
The first rule, InjectRuntimeFilter, decides whether to attempt the runtime-filter rewrite based on the spark.sql.optimizer.runtimeFilter.semiJoinReduction.enabled and spark.sql.optimizer.runtime.bloomFilter.enabled configuration options (both are off by default).
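A minimal sketch of turning the feature on in a Spark 3.3+ session (the application name and local master are assumptions for a quick test):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("runtime-filter-demo")
  .master("local[*]")
  // enable the Bloom filter flavor; injectFilter prefers it when both flags are on
  .config("spark.sql.optimizer.runtime.bloomFilter.enabled", "true")
  // alternatively, enable the In-filter / semi-join-reduction flavor
  .config("spark.sql.optimizer.runtimeFilter.semiJoinReduction.enabled", "false")
  .getOrCreate()

With either flag on, the rule walks the plan through tryInjectRuntimeFilter: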
private def tryInjectRuntimeFilter(plan: LogicalPlan): LogicalPlan = {
  var filterCounter = 0
  val numFilterThreshold = conf.getConf(SQLConf.RUNTIME_FILTER_NUMBER_THRESHOLD)
  plan transformUp {
    case join @ ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, _, _, left, right, hint) =>
      var newLeft = left
      var newRight = right
      (leftKeys, rightKeys).zipped.foreach((l, r) => {
        // Check if:
        // 1. There is already a DPP filter on the key
        // 2. There is already a runtime filter (Bloom filter or IN subquery) on the key
        // 3. The keys are simple cheap expressions
        if (filterCounter < numFilterThreshold &&
            !hasDynamicPruningSubquery(left, right, l, r) &&
            !hasRuntimeFilter(newLeft, newRight, l, r) &&
            isSimpleExpression(l) && isSimpleExpression(r)) {
          val oldLeft = newLeft
          val oldRight = newRight
          if (canPruneLeft(joinType) && filteringHasBenefit(left, right, l, hint)) {
            newLeft = injectFilter(l, newLeft, r, right)
          }
          // Did we actually inject on the left? If not, try on the right
          if (newLeft.fastEquals(oldLeft) && canPruneRight(joinType) &&
              filteringHasBenefit(right, left, r, hint)) {
            newRight = injectFilter(r, newRight, l, left)
          }
          if (!newLeft.fastEquals(oldLeft) || !newRight.fastEquals(oldRight)) {
            filterCounter = filterCounter + 1
          }
        }
      })
      join.withNewChildren(Seq(newLeft, newRight))
  }
}
Applying a runtime filter has the following preconditions (an example query shape follows the list):
- The number of runtime filters already injected has not exceeded spark.sql.optimizer.runtimeFilter.number.threshold (default 10).
- The key has neither a dynamic-pruning subquery nor an existing runtime filter on it, and both join keys are simple, cheap expressions (handle the easy cases first).
- The filter can be pushed down through joins and aggregates on the side of the join it is applied to.
- The creation side of the join has a selective predicate (which is what makes it worth filtering the application side's data).
- The scanned files on the application side must be larger than spark.sql.optimizer.runtime.bloomFilter.applicationSideScanSizeThreshold (default 10GB), so the filtering actually pays off.
- The join is either a shuffle join or a broadcast join that contains a shuffle; only in those cases is a filter injected.
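As a concrete illustration, the typical shape that satisfies these conditions is a large fact table joined to a small, selectively filtered dimension table; all table and column names here are made-up assumptions:

val qualifyingQuery = spark.sql("""
  SELECT f.*
  FROM fact f                      -- large scan: the filter application side
  JOIN dim d ON f.dim_id = d.id
  WHERE d.category = 'books'       -- selective predicate: the filter creation side
""")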
Note that the last condition is not obvious from the code alone: why must it be a broadcast join? Wouldn't a plain hash join do? The analysis below explains it:
private def injectFilter(
    filterApplicationSideExp: Expression,
    filterApplicationSidePlan: LogicalPlan,
    filterCreationSideExp: Expression,
    filterCreationSidePlan: LogicalPlan): LogicalPlan = {
  require(conf.runtimeFilterBloomFilterEnabled || conf.runtimeFilterSemiJoinReductionEnabled)
  if (conf.runtimeFilterBloomFilterEnabled) {
    injectBloomFilter(
      filterApplicationSideExp,
      filterApplicationSidePlan,
      filterCreationSideExp,
      filterCreationSidePlan
    )
  } else {
    injectInSubqueryFilter(
      filterApplicationSideExp,
      filterApplicationSidePlan,
      filterCreationSideExp,
      filterCreationSidePlan
    )
  }
}
When the Bloom filter flavor is not enabled (i.e., only semi-join reduction is on), an InSubquery node is injected instead, and there is a very important check inside:
private def injectInSubqueryFilter(
    filterApplicationSideExp: Expression,
    filterApplicationSidePlan: LogicalPlan,
    filterCreationSideExp: Expression,
    filterCreationSidePlan: LogicalPlan): LogicalPlan = {
  require(filterApplicationSideExp.dataType == filterCreationSideExp.dataType)
  val actualFilterKeyExpr = mayWrapWithHash(filterCreationSideExp)
  val alias = Alias(actualFilterKeyExpr, actualFilterKeyExpr.toString)()
  val aggregate = Aggregate(Seq(alias), Seq(alias), filterCreationSidePlan)
  if (!canBroadcastBySize(aggregate, conf)) {
    // Skip the InSubquery filter if the size of `aggregate` is beyond broadcast join threshold,
    // i.e., the semi-join will be a shuffled join, which is not worthwhile.
    return filterApplicationSidePlan
  }
  val filter = InSubquery(Seq(mayWrapWithHash(filterApplicationSideExp)),
    ListQuery(aggregate, childOutputs = aggregate.output))
  Filter(filter, filterApplicationSidePlan)
}
The key check is if (!canBroadcastBySize(aggregate, conf)): if the deduplicated keys built on the filter creation side are too large to be broadcast at runtime, the rewrite is skipped. The reasoning is that without a broadcast join the semi join would be planned as a shuffled join, and the extra shuffle would cost more than the filtering saves. The InSubquery itself is converted into a semi join later by the RewritePredicateSubquery rule (and that semi join can then become a broadcast join), which answers the earlier question about why broadcast matters.
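To make the rewrite concrete, here is a hedged sketch of what the injected predicate is roughly equivalent to, reusing the made-up fact/dim tables from above (mayWrapWithHash only wraps a key in xxhash64 when its type is wider than an integer, so a plain key is shown here):

val inFilterShape = spark.sql("""
  SELECT f.*
  FROM fact f
  WHERE f.dim_id IN (SELECT DISTINCT d.id FROM dim d WHERE d.category = 'books')
""")
// RewritePredicateSubquery later turns this IN predicate into a LEFT SEMI JOIN,
// which -- thanks to the canBroadcastBySize check -- can be planned as a broadcast join.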
When the Bloom filter is enabled, the injectBloomFilter code runs instead:
private def injectBloomFilter(
    filterApplicationSideExp: Expression,
    filterApplicationSidePlan: LogicalPlan,
    filterCreationSideExp: Expression,
    filterCreationSidePlan: LogicalPlan): LogicalPlan = {
  // Skip if the filter creation side is too big
  if (filterCreationSidePlan.stats.sizeInBytes > conf.runtimeFilterCreationSideThreshold) {
    return filterApplicationSidePlan
  }
  val rowCount = filterCreationSidePlan.stats.rowCount
  val bloomFilterAgg =
    if (rowCount.isDefined && rowCount.get.longValue > 0L) {
      new BloomFilterAggregate(new XxHash64(Seq(filterCreationSideExp)),
        Literal(rowCount.get.longValue))
    } else {
      new BloomFilterAggregate(new XxHash64(Seq(filterCreationSideExp)))
    }
  val aggExp = AggregateExpression(bloomFilterAgg, Complete, isDistinct = false, None)
  val alias = Alias(aggExp, "bloomFilter")()
  val aggregate = ConstantFolding(ColumnPruning(Aggregate(Nil, Seq(alias), filterCreationSidePlan)))
  val bloomFilterSubquery = ScalarSubquery(aggregate, Nil)
  val filter = BloomFilterMightContain(bloomFilterSubquery,
    new XxHash64(Seq(filterApplicationSideExp)))
  Filter(filter, filterApplicationSidePlan)
}
There is a spark.sql.optimizer.runtime.bloomFilter.creationSideThreshold check here (default 10MB): if the creation side is larger than that, the rewrite is skipped, because with too many rows on the creation side the Bloom filter's false-positive rate climbs and the filtering becomes much less effective (and the filter itself gets more expensive to build and ship).
This builds a BloomFilterAggregate, a BloomFilterMightContain, and a ScalarSubquery. The ScalarSubquery is eventually converted by PlanSubqueries into an execution.ScalarSubquery (backed by SubqueryExec), so the aggregated Bloom filter is collected on the driver and then evaluated inside BloomFilterMightContain on the application side.
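Once the flag is on, the injected predicate is visible in the optimized plan; a hedged sketch, again with the made-up tables from above (the exact plan strings may differ by version):

spark.sql("SET spark.sql.optimizer.runtime.bloomFilter.enabled=true")
spark.sql("""
  SELECT f.* FROM fact f JOIN dim d ON f.dim_id = d.id WHERE d.category = 'books'
""").explain(true)
// On the fact side of the optimized plan you should see something like:
//   Filter might_contain(scalar-subquery#..., xxhash64(dim_id#...))
// where the scalar subquery computes the bloomFilter aggregate over the dim side.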
Differences from dynamic partition pruning
- Dynamic partition pruning requires the pruned side of the join to be partitioned, requires a filtering predicate on the other side before the exchange, and by default only fires when there is a broadcast join. The runtime filter has none of these structural restrictions, but its own applicability conditions (listed above) are stricter.
- Dynamic partition pruning can cut source-scan I/O because it filters at partition granularity; the row-level runtime filter cannot, since it is applied to rows after the scan.
- The runtime filter can use a non-partition column as the join key, whereas dynamic partition pruning requires the join key to be a partition column. A sketch contrasting the two follows.
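A minimal, hedged local setup to see the contrast (paths, sizes, and names are all assumptions; with data this small you would also need to lower applicationSideScanSizeThreshold for the runtime filter to actually fire):

// DPP candidate: the large side is physically partitioned by the join key
spark.range(0L, 10000000L).selectExpr("id", "id % 100 AS dim_id")
  .write.partitionBy("dim_id").mode("overwrite").parquet("/tmp/fact_partitioned")
// Runtime-filter candidate: the same data, but not partitioned by the join key
spark.range(0L, 10000000L).selectExpr("id", "id % 100 AS dim_id")
  .write.mode("overwrite").parquet("/tmp/fact_plain")
spark.range(0L, 100L)
  .selectExpr("id", "CASE WHEN id < 5 THEN 'books' ELSE 'other' END AS category")
  .write.mode("overwrite").parquet("/tmp/dim")
// Joining fact_partitioned on dim_id can trigger dynamic partition pruning and skip
// whole partitions at scan time; joining fact_plain cannot, but the row-level
// runtime filter can still discard non-matching rows right after the scan.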