Spark结合源码解决数据倾斜造成Too Large Frame

简介:
新公司遇到的第一个spark的坑,寻找原因的过程其实还挺有意思,最终在源码和spark ui上的统计数据的帮助下找到根源,具体如下。
先说下问题
由于严重的数据倾斜,大量数据集中在单个task中,导致shuffle过程中发生异常
完整的exeception是这样的
a6260f80eeb9bd20b5b2742f93088bff80804dfb
但奇怪的是,经过尝试减小executor数量后任务反而成功,增大反而失败,经过多次测试,问题稳定复现。
成功的executor数量是7,失败的则是15,集群的active node是7
这结果直接改变了认知,也没爆内存,cpu也够,怎么会这样,executor数量应该越多越快啊,居然还失败了。
解决过程
这个数在几个失败里不一样,但是都超过了Integer.MaxValue。在spark源码中,这条异常发生在TransportFrameDecoder这个类中

d9742eef63ad6bcff47755849944d6464d1b3526
检查发现是frame的大小超过了MAX_FRAME_SIZE,而MAX_FRAME_SIZE的大小定义如下
8120318c8516da2d665813ca38b5407e61282a38
这个TransportFrameDecoder继承自ChannelInboundHandlerAdapter,这是Netty里的类,好了,看到这就明白了,原来错误发生在网络传输过程中,是数据量超大了。
但是对比了成功与失败的任务,都是单个task严重倾斜啊。再看下两个任务的executor分配。
失败的任务
99c0b107b0ba53ff14a0e0b2e5762b401f2cb3b6
成功的任务
85d4ed621c401ccdbf3873ef0a8e4d7d1fc796c2
失败的任务里,分配到的节点上都有多个executor;成功的任务里则每个节点只有一个executor。
再看下stage,失败的任务失败在stage26,这个stage依赖于stage24。看图说话
9e565d06f30de1fcbcc366ef49a504d55e6b95f7
两个任务的stage24都是成功的,看下24的executor的数据量情况
a26bcad17bfe4a1e10a574dc664efc8ed1eff914
1ad2231f25e432dc79ecf4b8de3fecd6b4b20e96
可以看到,两个任务在这个stage上由于数据倾斜,所有数据输入输出都在一个executor中完成。但在stage26中,区别来了
为了提升性能,在hadoop和spark中都会尽量选择数据本地性,尽量让数据local,不行再选择rack等其他方案。而24的输出会作为26的输入。所以24之后自然会选择相同节点上的executor,看下stage26的情况
成功的任务
1f7b3dcd6992877a974fe784eed50639a6f5d007
失败的任务
f75e6810fa397b743bac5663f86e6a439e78b210
在成功的任务里,stage26与24的executor完全是同一个,这样数据是完全本地化的,甚至是同一个进程,因而经过优化不再需要通过网络传输
而在失败的任务里,stage26在执行时发现这个node上有3个executor,为了性能的提升,将数据分配给3个executor执行计算。可见其中也成功了一半,32686这个端口的executor是24中执行的那个,因而虽然它要处理3.3g的数据,但是因为不需要网络传输,也仍然可以成功。可是对于另外两个,即使是同一个节点,但是由于是不同进程,仍然需要通过netty的server拉取数据,而这一次的拉取最大不能超过int最大值,因而失败了一个,导致整个stage失败,也就导致了整个job的失败。
总结
由此可见在数据极度倾斜的情况下,增大executor的数量未见得是好事,还是要根据具体情况而来。减小了数量解决了问题,但是这其实并不是最好的解决方案,因为这种情况下,可见数据基本等同于本地执行,完全浪费了集群的并发性,更好的解决方案还需要再继续深入理解。
目录
相关文章
|
5月前
|
SQL 分布式计算 HIVE
Spark数据倾斜问题分析和解决
Spark数据倾斜问题分析和解决
43 0
|
3月前
|
SQL 分布式计算 API
Spark学习------SparkSQL(概述、编程、数据的加载和保存)
Spark学习------SparkSQL(概述、编程、数据的加载和保存)
|
2天前
|
分布式计算 Java 关系型数据库
|
7天前
|
SQL 分布式计算 数据可视化
数据分享|Python、Spark SQL、MapReduce决策树、回归对车祸发生率影响因素可视化分析
数据分享|Python、Spark SQL、MapReduce决策树、回归对车祸发生率影响因素可视化分析
|
16天前
|
新零售 分布式计算 数据可视化
数据分享|基于Python、Hadoop零售交易数据的Spark数据处理与Echarts可视化分析
数据分享|基于Python、Hadoop零售交易数据的Spark数据处理与Echarts可视化分析
|
2月前
|
SQL 分布式计算 Java
Spark学习---SparkSQL(概述、编程、数据的加载和保存、自定义UDFA、项目实战)
Spark学习---SparkSQL(概述、编程、数据的加载和保存、自定义UDFA、项目实战)
139 1
|
2月前
|
存储 分布式计算 API
adb spark的lakehouse api访问内表数据,还支持算子下推吗
【2月更文挑战第21天】adb spark的lakehouse api访问内表数据,还支持算子下推吗
108 2
|
2月前
|
分布式计算 监控 Java
Spark学习---day06、Spark内核(源码提交流程、任务执行)
Spark学习---day06、Spark内核(源码提交流程、任务执行)
|
4月前
|
分布式计算 分布式数据库 API
Spark与HBase的集成与数据访问
Spark与HBase的集成与数据访问
|
4月前
|
JSON 分布式计算 关系型数据库
Spark中使用DataFrame进行数据转换和操作
Spark中使用DataFrame进行数据转换和操作