Spark——成功解决java.util.concurrent.TimeoutException: Futures timed out after [600 seconds]

简介: Spark——成功解决java.util.concurrent.TimeoutException: Futures timed out after [600 seconds]

前言

最近真是和 Spark 任务杠上了,业务团队说是线上有个Spark调度任务出现了广播超时问题,根据经验来看应该比较好解决。

定位问题

接着就是定位问题了,先给大家看下抛出异常的任务日志信息:

ERROR exchange.BroadcastExchangeExec: Could not execute broadcast in 600 secs.
java.util.concurrent.TimeoutException: Futures timed out after [600 seconds]
at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:223)
at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:227)
at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:220)
at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec.doExecuteBroadcast(BroadcastExchangeExec.scala:146)
at org.apache.spark.sql.execution.InputAdapter.doExecuteBroadcast(WholeStageCodegenExec.scala:388)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeBroadcast$1.apply(SparkPlan.scala:154)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeBroadcast$1.apply(SparkPlan.scala:150)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:165)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:162)
at org.apache.spark.sql.execution.SparkPlan.executeBroadcast(SparkPlan.scala:150)
at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.prepareBroadcast(BroadcastHashJoinExec.scala:117)
at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.codegenOuter(BroadcastHashJoinExec.scala:259)
at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.doConsume(BroadcastHashJoinExec.scala:102)
at org.apache.spark.sql.execution.CodegenSupport$class.consume(WholeStageCodegenExec.scala:190)
at org.apache.spark.sql.execution.ProjectExec.consume(basicPhysicalOperators.scala:38)
at org.apache.spark.sql.execution.ProjectExec.doConsume(basicPhysicalOperators.scala:71)
at org.apache.spark.sql.execution.CodegenSupport$class.consume(WholeStageCodegenExec.scala:190)
at org.apache.spark.sql.execution.FileSourceScanExec.consume(DataSourceScanExec.scala:160)
at org.apache.spark.sql.execution.ColumnarBatchScan$class.produceBatches(ColumnarBatchScan.scala:144)
at org.apache.spark.sql.execution.ColumnarBatchScan$class.doProduce(ColumnarBatchScan.scala:83)
at org.apache.spark.sql.execution.FileSourceScanExec.doProduce(DataSourceScanExec.scala:160)
at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:91)
at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:86)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:165)

根据之前的经验,时间超时一般有这几种情况先排查一下:

  1. 网络传输
  2. 广播超时时间阈值太小
  3. 广播变量的数据量是否太大

通过询问集群运维人员,第一个可以排除了。

第二个从日志中可以看到,广播超时时间阈值设置的是600(10分钟)

第三个,从上面的两个图中我们看到,系统设置的 autoBroadcastJoinThreshold 大小为30M,如果小表的大小小于该值,则会将小表广播到所有executor中,需要注意的是ORC格式的表会对数据进行压缩,通常压缩比为2到3左右,但有些表的压缩比就会很高,有时可以达到10。那么设置过大的话,就会导致广播的时间变长,超过广播超时时间阈值;另外还会导致executor内存压力过大,容易出现OOM。

Broadcast Join

当大表 JOIN 小表时,如果小表足够小,可以将大表分片,分别用小表和每个大表的分片进行

JOIN,最后汇总,能够大大提升作业性能。

解决方法

从定位问题中可以得知,我们可以调整相关的参数来解决这个问题!

方法1:调高广播的超时时间

设置 spark.sql.broadcastTimeout ,单位是秒,假如设置是600,那么就是10分钟。

假如我们要调高广播的超时时间为15分钟,可以进行如下设置:

set spark.sql.broadcastTimeout = 900;

方法2:禁用或者调低自动广播的阈值

# 禁止使用自动广播
set spark.sql.autoBroadcastJoinThreshold=-1;
# 调低自动广播的阈值,官方默认值10M,平台默认值31457280(30M)
set spark.sql.autoBroadcastJoinThreshold=10485760;

总结

在进行Spark 任务开发中需要合理配置 spark.sql.broadcastTimeoutspark.sql.autoBroadcastJoinThreshold 参数,并配合 spark.executor.memory,使作业能够顺利执行。

相关文章
|
3月前
|
分布式计算 大数据 Java
大数据-86 Spark 集群 WordCount 用 Scala & Java 调用Spark 编译并打包上传运行 梦开始的地方
大数据-86 Spark 集群 WordCount 用 Scala & Java 调用Spark 编译并打包上传运行 梦开始的地方
52 1
大数据-86 Spark 集群 WordCount 用 Scala & Java 调用Spark 编译并打包上传运行 梦开始的地方
|
4月前
|
消息中间件 分布式计算 Java
Linux环境下 java程序提交spark任务到Yarn报错
Linux环境下 java程序提交spark任务到Yarn报错
56 5
|
5月前
|
缓存 NoSQL Java
【Azure Redis 缓存 Azure Cache For Redis】Redis出现 java.net.SocketTimeoutException: Read timed out 异常
【Azure Redis 缓存 Azure Cache For Redis】Redis出现 java.net.SocketTimeoutException: Read timed out 异常
|
7月前
|
分布式计算 资源调度 Hadoop
Java大数据处理:Spark与Hadoop整合
Java大数据处理:Spark与Hadoop整合
|
7月前
|
运维 监控 网络安全
com.jcraft.jsch.JSchException: Session.connect: java.net.SocketTimeoutException: Read timed out 问题
【6月更文挑战第5天】com.jcraft.jsch.JSchException: Session.connect: java.net.SocketTimeoutException: Read timed out 问题
962 1
|
8月前
|
SQL 分布式计算 Java
HiveOnSpark 报错:java.lang.IllegalStateException(Connection to remote Spark driver was lost)‘ Last kno
Hive On Spark 测试时遇到`java.lang.IllegalStateException`和`FileNotFoundException`,问题根源是 Spark 缺少 `hive-exec-3.1.3.jar`。解决方法:从 `$HIVE_HOME/lib/`复制该 jar 到 `$SPARK_HOME/jars/`,并使用 `hdfs dfs -put`命令将其上传至 HDFS 的 `/spark-jars/`(根据实际情况调整路径)。重启 Hive 元数据服务后问题解决。
333 0
HiveOnSpark 报错:java.lang.IllegalStateException(Connection to remote Spark driver was lost)‘ Last kno
|
8月前
|
Java
Java并发Futures和Callables类
Java程序`TestThread`演示了如何在多线程环境中使用`Futures`和`Callables`。它创建了一个单线程`ExecutorService`,然后提交两个`FactorialService`任务,分别计算10和20的阶乘。每个任务返回一个`Future`对象,通过`get`方法获取结果,该方法会阻塞直到计算完成。计算过程中模拟延迟以展示异步执行。最终,打印出10!和20!的结果。
50 10
|
7月前
|
SQL JSON 分布式计算
|
7月前
|
SQL 分布式计算 Java
|
8月前
|
分布式计算 Java 测试技术
Spark 单元测试报Error:(26, 16) java: 程序包sun.misc不存在
Spark 单元测试报Error:(26, 16) java: 程序包sun.misc不存在
157 0