Preface
Lately I have really been wrestling with Spark jobs. The business team reported that a scheduled Spark job in production was hitting a broadcast timeout; based on past experience, it sounded fairly easy to fix.
Locating the Problem
The next step was to pin down the problem. First, here is the job log with the exception that was thrown:
ERROR exchange.BroadcastExchangeExec: Could not execute broadcast in 600 secs.
java.util.concurrent.TimeoutException: Futures timed out after [600 seconds]
    at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:223)
    at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:227)
    at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:220)
    at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec.doExecuteBroadcast(BroadcastExchangeExec.scala:146)
    at org.apache.spark.sql.execution.InputAdapter.doExecuteBroadcast(WholeStageCodegenExec.scala:388)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeBroadcast$1.apply(SparkPlan.scala:154)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeBroadcast$1.apply(SparkPlan.scala:150)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:165)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:162)
    at org.apache.spark.sql.execution.SparkPlan.executeBroadcast(SparkPlan.scala:150)
    at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.prepareBroadcast(BroadcastHashJoinExec.scala:117)
    at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.codegenOuter(BroadcastHashJoinExec.scala:259)
    at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.doConsume(BroadcastHashJoinExec.scala:102)
    at org.apache.spark.sql.execution.CodegenSupport$class.consume(WholeStageCodegenExec.scala:190)
    at org.apache.spark.sql.execution.ProjectExec.consume(basicPhysicalOperators.scala:38)
    at org.apache.spark.sql.execution.ProjectExec.doConsume(basicPhysicalOperators.scala:71)
    at org.apache.spark.sql.execution.CodegenSupport$class.consume(WholeStageCodegenExec.scala:190)
    at org.apache.spark.sql.execution.FileSourceScanExec.consume(DataSourceScanExec.scala:160)
    at org.apache.spark.sql.execution.ColumnarBatchScan$class.produceBatches(ColumnarBatchScan.scala:144)
    at org.apache.spark.sql.execution.ColumnarBatchScan$class.doProduce(ColumnarBatchScan.scala:83)
    at org.apache.spark.sql.execution.FileSourceScanExec.doProduce(DataSourceScanExec.scala:160)
    at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:91)
    at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:86)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:165)
From past experience, a timeout like this usually comes down to a few causes, so I checked these first:
- Network transfer issues
- The broadcast timeout threshold is set too low
- The broadcast data volume is too large
After checking with the cluster operations team, the first cause was ruled out.
For the second, the log shows the broadcast timeout threshold is set to 600 seconds (10 minutes).
For the third, the platform sets autoBroadcastJoinThreshold to 30 MB: if a small table's estimated size is below this value, the table is broadcast to every executor. Note that ORC tables are compressed, typically at a ratio of around 2 to 3, though some tables compress far more, sometimes as high as 10. Setting the threshold too high therefore makes broadcasts take longer and potentially exceed the broadcast timeout; it also puts heavy memory pressure on executors and can easily cause OOM.
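As a quick sanity check, you can read the values in effect for the current session and compare a suspect table's statistics against the threshold. A minimal Scala sketch, where dim.regions is a hypothetical dimension table:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().getOrCreate()
// Read the two settings in effect for this session
println(spark.conf.get("spark.sql.autoBroadcastJoinThreshold"))
println(spark.conf.get("spark.sql.broadcastTimeout"))
// Compute and inspect size statistics for the suspect table; note that ORC
// sizes are post-compression, so the decompressed data Spark actually
// broadcasts can be several times larger than what 'Statistics' reports
spark.sql("ANALYZE TABLE dim.regions COMPUTE STATISTICS")
spark.sql("DESCRIBE EXTENDED dim.regions").show(truncate = false)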
Broadcast Join
When a large table is joined with a small table and the small table is small enough, the large table can be split into partitions, each partition joined independently against the (broadcast) small table, and the results combined at the end. Because the large table is never shuffled, this can greatly improve job performance.
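To make this concrete, here is a sketch of forcing a broadcast join from the DataFrame API; the table names dw.orders and dim.regions are hypothetical:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.broadcast

val spark = SparkSession.builder().getOrCreate()
val orders  = spark.table("dw.orders")   // large table (hypothetical name)
val regions = spark.table("dim.regions") // small table (hypothetical name)
// broadcast() marks the small side; each partition of the large table is then
// joined locally against the broadcast copy, with no shuffle of the large table
val joined = orders.join(broadcast(regions), Seq("region_id"), "left_outer")
joined.explain() // the physical plan should show BroadcastHashJoin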
Solution
From the diagnosis above, we know this can be fixed by adjusting the relevant parameters.
Option 1: Increase the broadcast timeout
Set spark.sql.broadcastTimeout, which is in seconds; a value of 600 means 10 minutes.
To raise the broadcast timeout to 15 minutes, set it like this:
set spark.sql.broadcastTimeout = 900;
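If the job constructs its own SparkSession, the same setting can also be applied at the session level from Scala; a minimal sketch:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().getOrCreate()
// 900 seconds = 15 minutes; equivalent to the SQL 'set' statement above
spark.conf.set("spark.sql.broadcastTimeout", "900")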
Option 2: Disable or lower the auto-broadcast threshold
-- Disable automatic broadcast
set spark.sql.autoBroadcastJoinThreshold=-1;
-- Lower the auto-broadcast threshold; the official default is 10 MB,
-- the platform default here is 31457280 (30 MB)
set spark.sql.autoBroadcastJoinThreshold=10485760;
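After disabling auto-broadcast, joins fall back to shuffle-based plans unless you opt in per query; you can verify this, and still broadcast a table you know is small, with an explicit hint. A sketch with hypothetical table names:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().getOrCreate()
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1") // no automatic broadcasts
// With auto-broadcast off, only an explicit hint triggers a broadcast join
spark.sql(
  """SELECT /*+ BROADCAST(r) */ o.*, r.region_name
    |FROM dw.orders o JOIN dim.regions r ON o.region_id = r.region_id""".stripMargin
).explain()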
Summary
When developing Spark jobs, configure the spark.sql.broadcastTimeout and spark.sql.autoBroadcastJoinThreshold parameters sensibly, together with spark.executor.memory, so that jobs can run to completion smoothly.
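For reference, all three knobs can be set in one place when the session is created; the values below are illustrative, not recommendations:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("my-spark-job") // hypothetical application name
  .config("spark.sql.broadcastTimeout", "900")                // 15-minute broadcast timeout
  .config("spark.sql.autoBroadcastJoinThreshold", "31457280") // 30 MB threshold
  // Executor memory must be fixed before the application starts (here or via
  // spark-submit --conf); it cannot be changed on a running session
  .config("spark.executor.memory", "4g")
  .getOrCreate()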