Background
This article is based on Spark 3.3.0.
We sometimes come across the `exchangeReuseEnabled` and `subqueryReuseEnabled` configurations in the Spark code base. What exactly do these configurations do? Let's analyze them against the Spark source code.
Analysis
exchangeReuseEnabled
In PlanDynamicPruningFilters we can see:
```scala
val canReuseExchange = conf.exchangeReuseEnabled && buildKeys.nonEmpty &&
  plan.exists {
    case BroadcastHashJoinExec(_, _, _, BuildLeft, _, left, _, _) =>
      left.sameResult(sparkPlan)
    case BroadcastHashJoinExec(_, _, _, BuildRight, _, _, right, _) =>
      right.sameResult(sparkPlan)
    case _ => false
  }

if (canReuseExchange) {
  val executedPlan = QueryExecution.prepareExecutedPlan(sparkSession, sparkPlan)
  val mode = broadcastMode(buildKeys, executedPlan.output)
  // plan a broadcast exchange of the build side of the join
  val exchange = BroadcastExchangeExec(mode, executedPlan)
  val name = s"dynamicpruning#${exprId.id}"
  // place the broadcast adaptor for reusing the broadcast results on the probe side
  val broadcastValues = SubqueryBroadcastExec(name, broadcastKeyIndex, buildKeys, exchange)
  DynamicPruningExpression(InSubqueryExec(value, broadcastValues, exprId))
```
Dynamic partition pruning is attempted only when exchange reuse is enabled and the plan contains a BroadcastHashJoinExec whose build side produces the same result as the pruning plan (this is the case where reuse yields the biggest benefit). Nothing much actually happens here: a BroadcastExchangeExec is generated for the build side, but at this point the benefit of reusing the exchange is not yet visible.
The answer lies in the rule ReuseExchangeAndSubquery: whenever an identical exchange already exists in the plan, this rule substitutes a reused reference for it, and that is how the BroadcastExchangeExec wrapped inside the `DynamicPruningExpression(InSubqueryExec(value, broadcastValues, exprId))` above gets reused.
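The core idea of that reuse rule can be sketched in a few lines. The following is a simplified, hypothetical model, not Spark's actual implementation (the real rule walks the whole physical plan and keys its cache on the canonicalized plan of each exchange), but it shows the dedup-by-canonical-form mechanism:

```scala
// Hypothetical sketch of the idea behind ReuseExchangeAndSubquery (not Spark's
// actual code): keep a cache keyed by the canonicalized form of each exchange
// and replace later, identical exchanges with a reference to the first one,
// so the same result is computed only once.
import scala.collection.mutable

sealed trait Plan { def canonicalized: String }
case class Exchange(child: String) extends Plan {
  def canonicalized: String = s"exchange($child)"
}
case class ReusedExchange(original: Exchange) extends Plan {
  def canonicalized: String = original.canonicalized
}

def reuseExchanges(plans: Seq[Plan]): Seq[Plan] = {
  val cache = mutable.Map.empty[String, Exchange]
  plans.map {
    case e: Exchange => cache.get(e.canonicalized) match {
      case Some(first) => ReusedExchange(first) // identical exchange: reuse it
      case None        => cache(e.canonicalized) = e; e
    }
    case p => p
  }
}
```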
But why does reusing a BroadcastExchangeExec reduce Spark's computation?
Again take `DynamicPruningExpression(InSubqueryExec(value, broadcastValues, exprId))` as an example:
In InSubqueryExec, `broadcastValues` is of type SubqueryBroadcastExec, and SubqueryBroadcastExec's computation logic is:
```scala
@transient
private lazy val relationFuture: Future[Array[InternalRow]] = {
  // relationFuture is used in "doExecute". Therefore we can get the execution id correctly here.
  val executionId = sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
  Future {
    // This will run in another thread. Set the execution id so that we can connect these jobs
    // with the correct execution.
    SQLExecution.withExecutionId(session, executionId) {
      val beforeCollect = System.nanoTime()
      val broadcastRelation = child.executeBroadcast[HashedRelation]().value
```
Note that `relationFuture` is declared as a `lazy val`, so no matter how many times it is referenced, it is initialized only once, which reduces the amount of computation on the driver. It is first touched in:
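The initialize-at-most-once behavior of `lazy val` can be seen in isolation. The snippet below is a toy stand-in (an `Int` instead of the real `Future`), not Spark code:

```scala
// Toy demonstration that a `lazy val` body runs at most once per instance:
// reading it repeatedly does not re-trigger the computation, which is why a
// reused SubqueryBroadcastExec kicks off its work only once on the driver.
class Node {
  var initCount = 0
  lazy val relationFuture: Int = { initCount += 1; 42 } // stand-in for the real Future
}

val n = new Node
n.relationFuture // first read: the body runs
n.relationFuture // second read: the cached value is returned
assert(n.initCount == 1)
```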
```scala
protected override def doPrepare(): Unit = {
  relationFuture
}
```
and `doPrepare` is invoked from `executeQuery`, on the driver, when the RDD is being built:
```scala
protected final def executeQuery[T](query: => T): T = {
  RDDOperationScope.withScope(sparkContext, nodeName, false, true) {
    prepare()
    waitForSubqueries()
    query
  }
}
```
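The ordering matters here: `query` is a by-name parameter, so `prepare()` (which forces `relationFuture` via `doPrepare`) runs before the query body is evaluated. A minimal sketch of that call order, with stand-in functions rather than Spark's real ones:

```scala
// Hypothetical sketch of the executeQuery ordering: prepare() runs first
// (kicking off the broadcast future), then waitForSubqueries(), and only
// then is the by-name `query` body evaluated.
import scala.collection.mutable.ArrayBuffer

val calls = ArrayBuffer.empty[String]
def prepare(): Unit = calls += "prepare"           // would force relationFuture
def waitForSubqueries(): Unit = calls += "wait"    // would block on subquery results
def executeQuery[T](query: => T): T = {
  prepare()
  waitForSubqueries()
  query
}

executeQuery { calls += "query"; () }
assert(calls == ArrayBuffer("prepare", "wait", "query"))
```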
And in `val broadcastRelation = child.executeBroadcast[HashedRelation]().value`, `child` is of type BroadcastExchangeExec, so this is what actually triggers the broadcast.
subqueryReuseEnabled
Take InjectRuntimeFilter as an example. This rule eventually produces the following logical plan:
```scala
val filter = InSubquery(Seq(mayWrapWithHash(filterApplicationSideExp)),
  ListQuery(aggregate, childOutputs = aggregate.output))
Filter(filter, filterApplicationSidePlan)
```
The InSubquery expression is eventually turned into a physical plan by the PlanSubqueries rule (InjectRuntimeFilter itself does not produce an InSubqueryExec; this is just an illustration):
```scala
val executedPlan = QueryExecution.prepareExecutedPlan(sparkSession, query)
InSubqueryExec(expr, SubqueryExec(s"subquery#${exprId.id}", executedPlan), exprId)
```
For InSubqueryExec:
```scala
def updateResult(): Unit = {
  val rows = plan.executeCollect()
  result = if (plan.output.length > 1) {
    rows.asInstanceOf[Array[Any]]
  } else {
    rows.map(_.get(0, child.dataType))
  }
  if (shouldBroadcast) {
    resultBroadcast = plan.session.sparkContext.broadcast(result)
  }
}
```
This `updateResult` method is likewise called from `executeQuery` (again, on the driver). In `plan.executeCollect()`, `plan` is of type BaseSubqueryExec, and the implementations of that type declare `relationFuture` as follows (taking SubqueryBroadcastExec as an example):
```scala
private lazy val relationFuture
```
It is also a `lazy val`, and therefore initialized only once. So when ReuseExchangeAndSubquery makes the plan reuse a single BaseSubqueryExec instance, the subquery's computation is performed once on the driver instead of once per reference, which is how subquery reuse reduces driver-side work.
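Putting the two pieces together, the saving comes from sharing one subquery instance between several consumers. The following is a hypothetical toy model (a plain `Seq` instead of a real collected result), not Spark code:

```scala
// Hypothetical sketch of why reusing a BaseSubqueryExec saves driver work:
// when two consumers share one subquery instance, the shared lazy val means
// the underlying collect happens only once, however many times it is read.
class Subquery {
  var collectCount = 0
  private lazy val relationFuture: Seq[Int] = { collectCount += 1; Seq(1, 2, 3) }
  def executeCollect(): Seq[Int] = relationFuture
}

val shared = new Subquery        // one instance, reused by two consumers
val a = shared.executeCollect()  // triggers the (simulated) collect
val b = shared.executeCollect()  // served from the cached lazy val
assert(shared.collectCount == 1 && a == b)
```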