Spark常见错误剖析与应对策略

本文涉及的产品
日志服务 SLS,月写入数据量 50GB 1个月
简介: Spark常见错误剖析与应对策略

问题一:

日志中出现:org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 0

原因分析:

shuffle分为shuffle write和shuffle read两部分。

shuffle write的分区数由上一阶段的RDD分区数控制,shuffle read的分区数则是由Spark提供的一些参数控制。

shuffle write可以简单理解为类似于saveAsLocalDiskFile的操作,将计算的中间结果按某种规则临时放到各个executor所在的本地磁盘上。

shuffle read的时候数据的分区数则是由spark提供的一些参数控制。可以想到的是,如果这个参数值设置的很小,同时shuffle read的量很大,那么将会导致一个task需要处理的数据非常大。结果导致JVM crash,从而导致取shuffle数据失败,同时executor也丢失了,看到Failed to connect to host的错误,也就是executor lost的意思。有时候即使不会导致JVM crash也会造成长时间的gc。

解决方案:

1、减少shuffle数据

主要从代码层面着手,可以将不必要的数据在shuffle前进行过滤,比如原始数据有20个字段,只要选取需要的字段进行处理即可,将会减少一定的shuffle数据。

2、修改分区

通过spark.sql.shuffle.partitions控制分区数,默认为200,根据shuffle的量以及计算的复杂度适当提高这个值,例如500。

3、增加失败的重试次数和重试的时间间隔

通过spark.shuffle.io.maxRetries控制重试次数,默认是3,可适当增加,例如10。

通过spark.shuffle.io.retryWait控制重试的时间间隔,默认是5s,可适当增加,例如10s。

4、提高executor的内存

在spark-submit提交任务时,适当提高executor的memory值,例如15G或者20G。

问题二: 日志中出现:Caused by: org.apache.spark.SparkException: Could not execute broadcast in 300 secs. You can increase the timeout for broadcasts via spark.sql.broadcastTimeout or disable broadcast join by setting spark.sql.autoBroadcastJoinThreshold to -1

原因分析:

从上述日志中可以看出在ShuffleMapStage阶段,也就是ShuffleRead阶段,在Driver在向各个Executor广播输入数据时候,出现了超时现象。

解决方案:

1、适当增加超时时间:spark.sql.broadcastTimeout=800

2、适当增加重试次数:spark.sql.broadcastMaxRetries=3

3、关闭广播变量join:set spark.sql.autoBroadcastJoinThreshold = -1

问题三: 日志中出现:org.apache.spark.sql.catalyst.parser.ParseException

原因分析:

spark在做sql转化时报错。

解决方案:

检查sql是否书写正确

问题四: 日志中出现:SparkException: Could not find CoarseGrainedScheduler

原因分析:

这是一个资源问题应该给任务分配更多的cores和executors,并且分配更多的内存。并且需要给RDD分配更多的分区

解决方案:

1、调大一下资源和cores和executers的数量

2、在配置资源中加入这句话也许能解决你的问题:

–conf spark.dynamicAllocation.enabled=false

问题五: 日志中出现:Exception in thread “main”java.lang.NoSuchMethodError: scala.collection.immutable.c o l o n coloncoloncolon.tl$1()Lscala/collection/immutable/List;

原因分析:

scala版本不一致问题

解决方案:

1、通过给spark任务指定相同版本的镜像

–conf spark.kubernetes.container.image=镜像地址

问题六: 日志中出现:org.apache.spark.SparkException: Job aborted due to stage failure: Total size of serialized results of 9478 tasks (1024.1 MiB) is bigger than spark.driver.maxResultSize (1024.0 MiB)

原因分析:

序列化结果集的大小超过了spark任务默认的最大结果集大小(默认spark.driver.maxResultSize为1g)

解决方案:

1、增加spark.driver.maxResultSize的大小

–conf spark.driver.maxResultSize=2g


问题七: 日志中出现:The executor with id 12 exited with exit code 137

原因分析:

executor内存溢出(oom)

解决方案:

1、增加executor内存

示例参数:–conf spark.executor.memory=10g

注:少部分情况为堆外内存(overhead memory)不足,需要增加堆外内存

示例参数:–conf spark.executor.memoryOverhead=5g


问题八: WARN TaskSetManager: Lost task 1.0 in stage 0.0 (TID 1, aa.local): ExecutorLostFailure (executor lost) WARN TaskSetManager: Lost task 69.2 in stage 7.0 (TID 1145, 192.168.47.217): java.io.IOException: Connection from /192.168.47.217:55483 closed java.util.concurrent.TimeoutException: Futures timed out after [120 second ERROR TransportChannelHandler: Connection to /192.168.47.212:35409 has been quiet for 120000 ms while there are outstanding requests. Assuming connection is dead; please adjust spark.network.timeout if this is wrong

原因分析:

TaskSetManager: Lost task & TimeoutException

因为网络或者gc的原因,worker或executor没有接收到executor或task的心跳反馈

解决方案:

1、提高 spark.network.timeout 的值,根据情况改成300(5min)或更高

2、配置所有网络传输的延时,如果没有主动设置以下参数,默认覆盖其属性


问题九: 日志中出现:java.lang.OutOfMemoryError: Not enough memory to build and broadcast

原因分析:

Driver 端OOM。

Driver 端的 OOM 逃不出 2 类病灶:

创建的数据集超过内存上限

收集的结果集超过内存上限

广播变量在创建的过程中,需要先把分布在所有 Executors 的数据分片拉取到 Driver 端,然后在 Driver 端构建广播变量,最后 Driver 端把封装好的广播变量再分发给各个 Executors。第一步的数据拉取其实就是用 collect 实现的。如果 Executors 中数据分片的总大小超过 Driver 端内存上限也会报 OOM。

解决方案:

增加driver端的内存大小


问题十: java.lang.OutOfMemoryError: Java heap space at java.util.Arrays.copyOf java.lang.OutOfMemoryError: Java heap space at java.lang.reflect.Array.newInstance

原因分析:

executor端OOM

User Memory 用于存储用户自定义的数据结构,如数组、列表、字典等。因此,如果这些数据结构的总大小超出了 User Memory 内存区域的上限,就会出现这样的报错。


问题十一: spark sql 执行insert overwrite的时候,出现数据重复。

原因分析:

Spark SQL在执行SQL的overwrite的时候并没有删除旧的的数据文件(Spark SQL生成的数据文件),Spark SQL写入Hive的流程如下:


1.Spark写入Hive会先生成一个临时的_temporary目录用于存储生成的数据文件,全部生成完毕后全部移动到输出目录,然后删除_temporary目录,最后创建Hive元数据(写分区);

2.Spark写入数据任务使用了同一个_temporary目录,导致其中一个完成数据生成和移动到Hive路径之后删除_temporary目录失败(任务被kill掉了),进一步导致数据已经到了但是元数据没有创建。

3.上一个任务虽然生成了数据文件但是没有元数据,则后一个任务的overwrite找不到元数据因此无法删除Hive路径下的数据文件(第二个任务会任务目录下没有数据生成)

4.当最后一个执行完成的Spark插入任务结束后,此时Hive路径下已经移动过来多个任务的数据文件,由于已经没有正在执行的Spark写任务,因此删除_temporary目录成功,创建元数据成功,结果就是这个元数据对应了该Hive路径下所有版本的数据文件。


问题十二: Spark任务正常执行10分钟左右,但是偶尔会出现任务运行时间过长比如5个小时左右

原因分析:


通过spark ui看到spark任务的task运行都是在10分钟左右,有一个task运行时间达到了5.4h一直没有运行完成。

解决方案:

设置这个参数spark.speculation=true;

原理:在Spark中任务会以DAG图的方式并行执行,每个节点都会并行的运行在不同的executor中,但是有的任务可能执行很快,有的任务执行很慢,比如网络抖动、性能不同、数据倾斜等等。有的Task很慢就会成为整个任务的瓶颈,此时可以触发 推测执行 (speculative) 功能,为长时间的task重新启动一个task,哪个先完成就使用哪个的结果,并Kill掉另一个task。


问题十三: org.apache.spark.shuffle.FetchFailedException: The relative remote executor(Id: 21), which maintains the block data to fetch is dead.

原因分析:

资源不足导致executor没有心跳,driver就判定其丢失,就去连其他的executor,但其他的因为配置都一样,所以也连不上。重试n次后,就会报错


解决方案:

减少使用触发shuffle的操作,例如reduceByKey,从而减少使用内存

增大spark.network.timeout,从而允许有更多时间去等待心跳响应

增加spark.executor.cores,从而减少创建的Executor数量,使得总使用内存减少

同时增大spark.executor.memory,保证每个Executor有足够的可用内存

增大spark.shuffle.memoryFraction,默认为0.2(需要spark.memory.useLegacyMode配置为true,适用于1.5或更旧版本,已经deprecated)

例:

-conf spark.driver.memory=10g —conf spark.executor.cores=2 --conf spark.executor.memory=24g --conf spark.executor.memoryOverhead=4g --conf spark.default.parallelism=1500 --conf spark.sql.shuffle.partitions=1500 —conf spark.network.timeout=300


问题十四: java.io.IOException: java.io.EOFException: Unexpected end of input stream

原因分析:

spark任务输入数据异常,spark任务读取gz格式压缩的csv文件时,由于存在异常数据发生报错。gz格式压缩的文件存在空数据


解决方案:

1.定位到异常数据清除即可

2.过滤异常数据直接写入


问题十五: Exception in thread “main” java.lang.NoSuchMethodError: scala.Predef$.refArrayOps([Ljava/lang/Object;)[Ljava/lang/Object;

原因分析:

scala版本不一致


解决方案:

更换 服务scala版本一致的镜像


相关实践学习
日志服务之使用Nginx模式采集日志
本文介绍如何通过日志服务控制台创建Nginx模式的Logtail配置快速采集Nginx日志并进行多维度分析。
相关文章
|
分布式计算 调度 Spark
|
消息中间件 缓存 分布式计算
Spark调优策略
在利用Spark处理数据时,如果数据量不大,那么Spark的默认配置基本就能满足实际的业务场景。但是当数据量大的时候,就需要做一定的参数配置调整和优化,以保证业务的安全、稳定的运行。并且在实际优化中,要考虑不同的场景,采取不同的优化策略。
|
9天前
|
分布式计算 大数据 Apache
ClickHouse与大数据生态集成:Spark & Flink 实战
【10月更文挑战第26天】在当今这个数据爆炸的时代,能够高效地处理和分析海量数据成为了企业和组织提升竞争力的关键。作为一款高性能的列式数据库系统,ClickHouse 在大数据分析领域展现出了卓越的能力。然而,为了充分利用ClickHouse的优势,将其与现有的大数据处理框架(如Apache Spark和Apache Flink)进行集成变得尤为重要。本文将从我个人的角度出发,探讨如何通过这些技术的结合,实现对大规模数据的实时处理和分析。
37 2
ClickHouse与大数据生态集成:Spark & Flink 实战
|
1月前
|
存储 分布式计算 算法
大数据-106 Spark Graph X 计算学习 案例:1图的基本计算、2连通图算法、3寻找相同的用户
大数据-106 Spark Graph X 计算学习 案例:1图的基本计算、2连通图算法、3寻找相同的用户
58 0
|
1月前
|
消息中间件 分布式计算 NoSQL
大数据-104 Spark Streaming Kafka Offset Scala实现Redis管理Offset并更新
大数据-104 Spark Streaming Kafka Offset Scala实现Redis管理Offset并更新
38 0
|
1月前
|
消息中间件 存储 分布式计算
大数据-103 Spark Streaming Kafka Offset管理详解 Scala自定义Offset
大数据-103 Spark Streaming Kafka Offset管理详解 Scala自定义Offset
75 0
|
10天前
|
SQL 机器学习/深度学习 分布式计算
Spark快速上手:揭秘大数据处理的高效秘密,让你轻松应对海量数据
【10月更文挑战第25天】本文全面介绍了大数据处理框架 Spark,涵盖其基本概念、安装配置、编程模型及实际应用。Spark 是一个高效的分布式计算平台,支持批处理、实时流处理、SQL 查询和机器学习等任务。通过详细的技术综述和示例代码,帮助读者快速掌握 Spark 的核心技能。
35 6
|
8天前
|
存储 分布式计算 Hadoop
数据湖技术:Hadoop与Spark在大数据处理中的协同作用
【10月更文挑战第27天】在大数据时代,数据湖技术凭借其灵活性和成本效益成为企业存储和分析大规模异构数据的首选。Hadoop和Spark作为数据湖技术的核心组件,通过HDFS存储数据和Spark进行高效计算,实现了数据处理的优化。本文探讨了Hadoop与Spark的最佳实践,包括数据存储、处理、安全和可视化等方面,展示了它们在实际应用中的协同效应。
41 2
|
9天前
|
存储 分布式计算 Hadoop
数据湖技术:Hadoop与Spark在大数据处理中的协同作用
【10月更文挑战第26天】本文详细探讨了Hadoop与Spark在大数据处理中的协同作用,通过具体案例展示了两者的最佳实践。Hadoop的HDFS和MapReduce负责数据存储和预处理,确保高可靠性和容错性;Spark则凭借其高性能和丰富的API,进行深度分析和机器学习,实现高效的批处理和实时处理。
39 1