Spark结合源码解决数据倾斜造成Too Large Frame

简介:
新公司遇到的第一个spark的坑,寻找原因的过程其实还挺有意思,最终在源码和spark ui上的统计数据的帮助下找到根源,具体如下。
先说下问题
由于严重的数据倾斜,大量数据集中在单个task中,导致shuffle过程中发生异常
完整的exeception是这样的
a6260f80eeb9bd20b5b2742f93088bff80804dfb
但奇怪的是,经过尝试减小executor数量后任务反而成功,增大反而失败,经过多次测试,问题稳定复现。
成功的executor数量是7,失败的则是15,集群的active node是7
这结果直接改变了认知,也没爆内存,cpu也够,怎么会这样,executor数量应该越多越快啊,居然还失败了。
解决过程
这个数在几个失败里不一样,但是都超过了Integer.MaxValue。在spark源码中,这条异常发生在TransportFrameDecoder这个类中

d9742eef63ad6bcff47755849944d6464d1b3526
检查发现是frame的大小超过了MAX_FRAME_SIZE,而MAX_FRAME_SIZE的大小定义如下
8120318c8516da2d665813ca38b5407e61282a38
这个TransportFrameDecoder继承自ChannelInboundHandlerAdapter,这是Netty里的类,好了,看到这就明白了,原来错误发生在网络传输过程中,是数据量超大了。
但是对比了成功与失败的任务,都是单个task严重倾斜啊。再看下两个任务的executor分配。
失败的任务
99c0b107b0ba53ff14a0e0b2e5762b401f2cb3b6
成功的任务
85d4ed621c401ccdbf3873ef0a8e4d7d1fc796c2
失败的任务里,分配到的节点上都有多个executor;成功的任务里则每个节点只有一个executor。
再看下stage,失败的任务失败在stage26,这个stage依赖于stage24。看图说话
9e565d06f30de1fcbcc366ef49a504d55e6b95f7
两个任务的stage24都是成功的,看下24的executor的数据量情况
a26bcad17bfe4a1e10a574dc664efc8ed1eff914
1ad2231f25e432dc79ecf4b8de3fecd6b4b20e96
可以看到,两个任务在这个stage上由于数据倾斜,所有数据输入输出都在一个executor中完成。但在stage26中,区别来了
为了提升性能,在hadoop和spark中都会尽量选择数据本地性,尽量让数据local,不行再选择rack等其他方案。而24的输出会作为26的输入。所以24之后自然会选择相同节点上的executor,看下stage26的情况
成功的任务
1f7b3dcd6992877a974fe784eed50639a6f5d007
失败的任务
f75e6810fa397b743bac5663f86e6a439e78b210
在成功的任务里,stage26与24的executor完全是同一个,这样数据是完全本地化的,甚至是同一个进程,因而经过优化不再需要通过网络传输
而在失败的任务里,stage26在执行时发现这个node上有3个executor,为了性能的提升,将数据分配给3个executor执行计算。可见其中也成功了一半,32686这个端口的executor是24中执行的那个,因而虽然它要处理3.3g的数据,但是因为不需要网络传输,也仍然可以成功。可是对于另外两个,即使是同一个节点,但是由于是不同进程,仍然需要通过netty的server拉取数据,而这一次的拉取最大不能超过int最大值,因而失败了一个,导致整个stage失败,也就导致了整个job的失败。
总结
由此可见在数据极度倾斜的情况下,增大executor的数量未见得是好事,还是要根据具体情况而来。减小了数量解决了问题,但是这其实并不是最好的解决方案,因为这种情况下,可见数据基本等同于本地执行,完全浪费了集群的并发性,更好的解决方案还需要再继续深入理解。
目录
相关文章
|
1月前
|
分布式计算 关系型数据库 MySQL
大数据-88 Spark 集群 案例学习 Spark Scala 案例 SuperWordCount 计算结果数据写入MySQL
大数据-88 Spark 集群 案例学习 Spark Scala 案例 SuperWordCount 计算结果数据写入MySQL
48 3
|
3月前
|
存储 分布式计算 Java
|
3月前
|
分布式计算 监控 大数据
如何处理 Spark 中的倾斜数据?
【8月更文挑战第13天】
246 4
|
3月前
|
存储 缓存 分布式计算
|
3月前
|
SQL 存储 分布式计算
|
3月前
|
分布式计算 Apache 数据安全/隐私保护
流计算引擎数据问题之在 Spark Structured Streaming 中水印计算和使用如何解决
流计算引擎数据问题之在 Spark Structured Streaming 中水印计算和使用如何解决
49 1
|
4月前
|
分布式计算 数据处理 流计算
实时计算 Flink版产品使用问题之使用Spark ThriftServer查询同步到Hudi的数据时,如何实时查看数据变化
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
5月前
|
弹性计算 分布式计算 DataWorks
DataWorks产品使用合集之spark任务如何跨空间取表数据
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
41 1
|
5月前
|
SQL 分布式计算 HIVE
实时计算 Flink版产品使用问题之同步到Hudi的数据是否可以被Hive或Spark直接读取
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
5月前
|
SQL 分布式计算 NoSQL
使用Spark高效将数据从Hive写入Redis (功能最全)
使用Spark高效将数据从Hive写入Redis (功能最全)
375 1