Spark常见错误剖析与应对策略

本文涉及的产品
日志服务 SLS,月写入数据量 50GB 1个月
简介: Spark常见错误剖析与应对策略

问题一:

日志中出现:org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 0

原因分析:

shuffle分为shuffle write和shuffle read两部分。

shuffle write的分区数由上一阶段的RDD分区数控制,shuffle read的分区数则是由Spark提供的一些参数控制。

shuffle write可以简单理解为类似于saveAsLocalDiskFile的操作,将计算的中间结果按某种规则临时放到各个executor所在的本地磁盘上。

shuffle read的时候数据的分区数则是由spark提供的一些参数控制。可以想到的是,如果这个参数值设置的很小,同时shuffle read的量很大,那么将会导致一个task需要处理的数据非常大。结果导致JVM crash,从而导致取shuffle数据失败,同时executor也丢失了,看到Failed to connect to host的错误,也就是executor lost的意思。有时候即使不会导致JVM crash也会造成长时间的gc。

解决方案:

1、减少shuffle数据

主要从代码层面着手,可以将不必要的数据在shuffle前进行过滤,比如原始数据有20个字段,只要选取需要的字段进行处理即可,将会减少一定的shuffle数据。

2、修改分区

通过spark.sql.shuffle.partitions控制分区数,默认为200,根据shuffle的量以及计算的复杂度适当提高这个值,例如500。

3、增加失败的重试次数和重试的时间间隔

通过spark.shuffle.io.maxRetries控制重试次数,默认是3,可适当增加,例如10。

通过spark.shuffle.io.retryWait控制重试的时间间隔,默认是5s,可适当增加,例如10s。

4、提高executor的内存

在spark-submit提交任务时,适当提高executor的memory值,例如15G或者20G。

问题二: 日志中出现:Caused by: org.apache.spark.SparkException: Could not execute broadcast in 300 secs. You can increase the timeout for broadcasts via spark.sql.broadcastTimeout or disable broadcast join by setting spark.sql.autoBroadcastJoinThreshold to -1

原因分析:

从上述日志中可以看出在ShuffleMapStage阶段,也就是ShuffleRead阶段,在Driver在向各个Executor广播输入数据时候,出现了超时现象。

解决方案:

1、适当增加超时时间:spark.sql.broadcastTimeout=800

2、适当增加重试次数:spark.sql.broadcastMaxRetries=3

3、关闭广播变量join:set spark.sql.autoBroadcastJoinThreshold = -1

问题三: 日志中出现:org.apache.spark.sql.catalyst.parser.ParseException

原因分析:

spark在做sql转化时报错。

解决方案:

检查sql是否书写正确

问题四: 日志中出现:SparkException: Could not find CoarseGrainedScheduler

原因分析:

这是一个资源问题应该给任务分配更多的cores和executors,并且分配更多的内存。并且需要给RDD分配更多的分区

解决方案:

1、调大一下资源和cores和executers的数量

2、在配置资源中加入这句话也许能解决你的问题:

–conf spark.dynamicAllocation.enabled=false

问题五: 日志中出现:Exception in thread “main”java.lang.NoSuchMethodError: scala.collection.immutable.c o l o n coloncoloncolon.tl$1()Lscala/collection/immutable/List;

原因分析:

scala版本不一致问题

解决方案:

1、通过给spark任务指定相同版本的镜像

–conf spark.kubernetes.container.image=镜像地址

问题六: 日志中出现:org.apache.spark.SparkException: Job aborted due to stage failure: Total size of serialized results of 9478 tasks (1024.1 MiB) is bigger than spark.driver.maxResultSize (1024.0 MiB)

原因分析:

序列化结果集的大小超过了spark任务默认的最大结果集大小(默认spark.driver.maxResultSize为1g)

解决方案:

1、增加spark.driver.maxResultSize的大小

–conf spark.driver.maxResultSize=2g


问题七: 日志中出现:The executor with id 12 exited with exit code 137

原因分析:

executor内存溢出(oom)

解决方案:

1、增加executor内存

示例参数:–conf spark.executor.memory=10g

注:少部分情况为堆外内存(overhead memory)不足,需要增加堆外内存

示例参数:–conf spark.executor.memoryOverhead=5g


问题八: WARN TaskSetManager: Lost task 1.0 in stage 0.0 (TID 1, aa.local): ExecutorLostFailure (executor lost) WARN TaskSetManager: Lost task 69.2 in stage 7.0 (TID 1145, 192.168.47.217): java.io.IOException: Connection from /192.168.47.217:55483 closed java.util.concurrent.TimeoutException: Futures timed out after [120 second ERROR TransportChannelHandler: Connection to /192.168.47.212:35409 has been quiet for 120000 ms while there are outstanding requests. Assuming connection is dead; please adjust spark.network.timeout if this is wrong

原因分析:

TaskSetManager: Lost task & TimeoutException

因为网络或者gc的原因,worker或executor没有接收到executor或task的心跳反馈

解决方案:

1、提高 spark.network.timeout 的值,根据情况改成300(5min)或更高

2、配置所有网络传输的延时,如果没有主动设置以下参数,默认覆盖其属性


问题九: 日志中出现:java.lang.OutOfMemoryError: Not enough memory to build and broadcast

原因分析:

Driver 端OOM。

Driver 端的 OOM 逃不出 2 类病灶:

创建的数据集超过内存上限

收集的结果集超过内存上限

广播变量在创建的过程中,需要先把分布在所有 Executors 的数据分片拉取到 Driver 端,然后在 Driver 端构建广播变量,最后 Driver 端把封装好的广播变量再分发给各个 Executors。第一步的数据拉取其实就是用 collect 实现的。如果 Executors 中数据分片的总大小超过 Driver 端内存上限也会报 OOM。

解决方案:

增加driver端的内存大小


问题十: java.lang.OutOfMemoryError: Java heap space at java.util.Arrays.copyOf java.lang.OutOfMemoryError: Java heap space at java.lang.reflect.Array.newInstance

原因分析:

executor端OOM

User Memory 用于存储用户自定义的数据结构,如数组、列表、字典等。因此,如果这些数据结构的总大小超出了 User Memory 内存区域的上限,就会出现这样的报错。


问题十一: spark sql 执行insert overwrite的时候,出现数据重复。

原因分析:

Spark SQL在执行SQL的overwrite的时候并没有删除旧的的数据文件(Spark SQL生成的数据文件),Spark SQL写入Hive的流程如下:


1.Spark写入Hive会先生成一个临时的_temporary目录用于存储生成的数据文件,全部生成完毕后全部移动到输出目录,然后删除_temporary目录,最后创建Hive元数据(写分区);

2.Spark写入数据任务使用了同一个_temporary目录,导致其中一个完成数据生成和移动到Hive路径之后删除_temporary目录失败(任务被kill掉了),进一步导致数据已经到了但是元数据没有创建。

3.上一个任务虽然生成了数据文件但是没有元数据,则后一个任务的overwrite找不到元数据因此无法删除Hive路径下的数据文件(第二个任务会任务目录下没有数据生成)

4.当最后一个执行完成的Spark插入任务结束后,此时Hive路径下已经移动过来多个任务的数据文件,由于已经没有正在执行的Spark写任务,因此删除_temporary目录成功,创建元数据成功,结果就是这个元数据对应了该Hive路径下所有版本的数据文件。


问题十二: Spark任务正常执行10分钟左右,但是偶尔会出现任务运行时间过长比如5个小时左右

原因分析:


通过spark ui看到spark任务的task运行都是在10分钟左右,有一个task运行时间达到了5.4h一直没有运行完成。

解决方案:

设置这个参数spark.speculation=true;

原理:在Spark中任务会以DAG图的方式并行执行,每个节点都会并行的运行在不同的executor中,但是有的任务可能执行很快,有的任务执行很慢,比如网络抖动、性能不同、数据倾斜等等。有的Task很慢就会成为整个任务的瓶颈,此时可以触发 推测执行 (speculative) 功能,为长时间的task重新启动一个task,哪个先完成就使用哪个的结果,并Kill掉另一个task。


问题十三: org.apache.spark.shuffle.FetchFailedException: The relative remote executor(Id: 21), which maintains the block data to fetch is dead.

原因分析:

资源不足导致executor没有心跳,driver就判定其丢失,就去连其他的executor,但其他的因为配置都一样,所以也连不上。重试n次后,就会报错


解决方案:

减少使用触发shuffle的操作,例如reduceByKey,从而减少使用内存

增大spark.network.timeout,从而允许有更多时间去等待心跳响应

增加spark.executor.cores,从而减少创建的Executor数量,使得总使用内存减少

同时增大spark.executor.memory,保证每个Executor有足够的可用内存

增大spark.shuffle.memoryFraction,默认为0.2(需要spark.memory.useLegacyMode配置为true,适用于1.5或更旧版本,已经deprecated)

例:

-conf spark.driver.memory=10g —conf spark.executor.cores=2 --conf spark.executor.memory=24g --conf spark.executor.memoryOverhead=4g --conf spark.default.parallelism=1500 --conf spark.sql.shuffle.partitions=1500 —conf spark.network.timeout=300


问题十四: java.io.IOException: java.io.EOFException: Unexpected end of input stream

原因分析:

spark任务输入数据异常,spark任务读取gz格式压缩的csv文件时,由于存在异常数据发生报错。gz格式压缩的文件存在空数据


解决方案:

1.定位到异常数据清除即可

2.过滤异常数据直接写入


问题十五: Exception in thread “main” java.lang.NoSuchMethodError: scala.Predef$.refArrayOps([Ljava/lang/Object;)[Ljava/lang/Object;

原因分析:

scala版本不一致


解决方案:

更换 服务scala版本一致的镜像


相关实践学习
日志服务之数据清洗与入湖
本教程介绍如何使用日志服务接入NGINX模拟数据,通过数据加工对数据进行清洗并归档至OSS中进行存储。
相关文章
|
9月前
|
SQL 存储 分布式计算
Hive 和 Spark 分区策略剖析
Hive 和 Spark 分区策略剖析
|
分布式计算 调度 Spark
|
消息中间件 缓存 分布式计算
Spark调优策略
在利用Spark处理数据时,如果数据量不大,那么Spark的默认配置基本就能满足实际的业务场景。但是当数据量大的时候,就需要做一定的参数配置调整和优化,以保证业务的安全、稳定的运行。并且在实际优化中,要考虑不同的场景,采取不同的优化策略。
|
9天前
|
机器学习/深度学习 分布式计算 算法
Spark快速大数据分析PDF下载读书分享推荐
《Spark快速大数据分析》适合初学者,聚焦Spark实用技巧,同时深入核心概念。作者团队来自Databricks,书中详述Spark 3.0新特性,结合机器学习展示大数据分析。Spark是大数据分析的首选工具,本书助你驾驭这一利器。[PDF下载链接][1]。 ![Spark Book Cover][2] [1]: https://zhangfeidezhu.com/?p=345 [2]: https://i-blog.csdnimg.cn/direct/6b851489ad1944548602766ea9d62136.png#pic_center
26 1
Spark快速大数据分析PDF下载读书分享推荐
|
2月前
|
分布式计算 Hadoop 大数据
大数据技术与Python:结合Spark和Hadoop进行分布式计算
【4月更文挑战第12天】本文介绍了大数据技术及其4V特性,阐述了Hadoop和Spark在大数据处理中的作用。Hadoop提供分布式文件系统和MapReduce,Spark则为内存计算提供快速处理能力。通过Python结合Spark和Hadoop,可在分布式环境中进行数据处理和分析。文章详细讲解了如何配置Python环境、安装Spark和Hadoop,以及使用Python编写和提交代码到集群进行计算。掌握这些技能有助于应对大数据挑战。
|
28天前
|
存储 分布式计算 Hadoop
Spark和Hadoop都是大数据处理领域的重要工具
【6月更文挑战第17天】Spark和Hadoop都是大数据处理领域的重要工具
124 59
|
11天前
|
分布式计算 大数据 Spark
Spark大数据处理:技术、应用与性能优化(全)PDF书籍推荐分享
《Spark大数据处理:技术、应用与性能优化》深入浅出介绍Spark核心,涵盖部署、实战与性能调优,适合初学者。作者基于微软和IBM经验,解析Spark工作机制,探讨BDAS生态,提供实践案例,助力快速掌握。书中亦讨论性能优化策略。[PDF下载链接](https://zhangfeidezhu.com/?p=347)。![Spark Web UI](https://img-blog.csdnimg.cn/direct/16aaadbb4e13410f8cb2727c3786cc9e.png#pic_center)
26 1
Spark大数据处理:技术、应用与性能优化(全)PDF书籍推荐分享
|
1月前
|
分布式计算 大数据 数据处理
Apache Spark在大数据处理中的应用
Apache Spark是大数据处理的热门工具,由AMPLab开发并捐赠给Apache软件基金会。它以内存计算和优化的执行引擎著称,提供比Hadoop更快的处理速度,支持批处理、交互式查询、流处理和机器学习。Spark架构包括Driver、Master、Worker Node和Executor,核心组件有RDD、DataFrame、Dataset、Spark SQL、Spark Streaming、MLlib和GraphX。文章通过代码示例展示了Spark在批处理、交互式查询和实时数据处理中的应用,并讨论了其优势(高性能、易用性、通用性和集成性)和挑战。【6月更文挑战第11天】
50 6
|
29天前
|
分布式计算 Hadoop 大数据
大数据技术:Hadoop与Spark的对比
【6月更文挑战第15天】**Hadoop与Spark对比摘要** Hadoop是分布式系统基础架构,擅长处理大规模批处理任务,依赖HDFS和MapReduce,具有高可靠性和生态多样性。Spark是快速数据处理引擎,侧重内存计算,提供多语言接口,支持机器学习和流处理,处理速度远超Hadoop,适合实时分析和交互式查询。两者在资源占用和生态系统上有差异,适用于不同应用场景。选择时需依据具体需求。