工作经验分享:Spark调优【优化后性能提升1200%】

简介: 工作经验分享:Spark调优【优化后性能提升1200%】

优化后效果



1.业务处理中存在复杂的多表关联和计算逻辑(原始数据达百亿数量级)

2.优化后,spark计算性能提升了约12倍(6h-->30min)

3.最终,业务的性能瓶颈存在于ES写入(计算结果,ES索引document数约为21亿 pri.store.size约 300gb)



c1a2e19983a9954eccb078e8d9864b4b.png

1. 背景


业务数据不断增大, Spark运行时间越来越长, 从最初的半小时到6个多小时

某日Spark程序运行6.5个小时后, 报“Too large frame...”的异常

org.apache.spark.shuffle.FetchFailedException: Too large frame: 2624680416


2. 原因分析


2.1. 抛出异常的原因

Spark uses custom frame decoder

(TransportFrameDecoder) which does not support frames larger than 2G.

This lead to fails when shuffling using large partitions.

根本原因: 源数据的某一列(或某几列)分布不均匀,当某个shuffle操作是根据此列数据进行shuffle时,就会造成整个数据集发生倾斜,即某些partition包含了大量数据,超出了2G的限制。

异常,就是发生在业务数据处理的最后一步left join操作


2.2. 粗暴的临时解决方法

增大partition数, 让partition中的数据量<2g

由于是left join触发了shuffle操作, 而spark默认join时的分区数为200(即spark.sql.shuffle.partitions=200), 所以增大这个分区数, 即调整该参数为800, 即spark.sql.shuffle.partitions=800


2.3. 解决效果

Spark不再报错,而且“艰难”的跑完了, 跑了近6个小时!

通过Spark UI页面的监控发现, 由于数据倾斜导致, 整个Spark任务的运行时间是被少数的几个Task“拖累的”


59b1889dbffb05f731494f21c58e02db.png

3. 思考优化



3.1. 确认数据倾斜

方法一: 通过sample算子对DataSet/DataFrame/RDD进行采样, 找出top n的key值及数量

方法二: 源数据/中间数据落到存储中(如HIVE), 直接查询观察


3.2. 可选方法


1.HIVE ETL 数据预处理

把数据倾斜提前到 HIVE ETL中, 避免Spark发生数据倾斜

这个其实很有用


2.过滤无效的数据 (where / filter)

NULL值数据

“脏数据”(非法数据)

业务无关的数据


3.分析join操作, 左右表的特征, 判断是否可以进行小表广播 broadcast

(1)这样可避免shuffle操作,特别是当大表特别大

(2)默认情况下, join时候, 如果表的数据量低于spark.sql.autoBroadcastJoinThreshold参数值时(默认值为10 MB), spark会自动进行broadcast, 但也可以通过强制手动指定广播

visitor_df.join(broadcast(campaign_df), Seq("random_bucket", "uuid", "time_range"), "left_outer")

业务数据量是100MB

(3)Driver上有一个campaign_df全量的副本, 每个Executor上也会有一个campaign_df的副本

(4)JOIN操作, Spark默认都会进行 merge_sort (也需要避免倾斜)


4.数据打散, 扩容join

分散倾斜的数据, 给key加上随机数前缀

A.join(B)


09908370b67d6f773333e168af3cc899.png

1.提高shuffle操作并行度

spark.sql.shuffle.partitions


2.多阶段

aggregate操作: 先局部聚合, 再全局聚合

给key打随机值, 如打上1-10, 先分别针对10个组做聚合

最后再统一聚合

join操作: 切成多个部分, 分开join, 最后union

判断出,造成数据倾斜的一些key值 (可通过观察或者sample取样)

如主号

单独拎出来上述key值的记录做join, 剩余记录再做join

独立做优化, 如broadcast

结果数据union即可


3.3. 实际采用的方法

HIVE 预处理

过滤无效的数据

broadcast

打散 --> 随机数

shuffle 并行度

Example:

1. ......
2. visitor_leads_fans_df.repartition($"random_index")
3.     .join(broadcast(campaign_df), Seq("random_bucket", "uuid", "time_range"), "left_outer")
4.     .drop("random_bucket", "random_index")
5. ......
复制代码


目录
相关文章
|
4月前
|
分布式计算 Java 数据库连接
回答粉丝疑问:Spark为什么调优需要降低过多小任务,降低单条记录的资源开销?
回答粉丝疑问:Spark为什么调优需要降低过多小任务,降低单条记录的资源开销?
37 1
|
4月前
|
分布式计算 资源调度 大数据
大数据开发岗面试复习30天冲刺 - 日积月累,每日五题【Day30】——Spark数据调优(文末附完整文档)
大数据开发岗面试复习30天冲刺 - 日积月累,每日五题【Day30】——Spark数据调优(文末附完整文档)
65 0
|
11月前
|
SQL 机器学习/深度学习 分布式计算
「大数据架构」Spark 3.0发布,重大变化,性能提升18倍
「大数据架构」Spark 3.0发布,重大变化,性能提升18倍
|
10月前
|
分布式计算 并行计算 Spark
|
SQL 存储 缓存
工作常用之Spark调优【二】资源调优
使用 kryo 序列化并且使用 rdd 序列化缓存级别。使用 kryo 序列化需要修改 spark 的序列化模式,并且需要进程注册类操作。
168 1
工作常用之Spark调优【二】资源调优
|
SQL 存储 分布式计算
工作常用之Spark调优【一】
Spark 3.0 大版本发布, Spark SQL 的优化占比将近 50% 。 Spark SQL 取代 Spark Core ,成为新一代的引擎内核,所有其他子框架如 Mllib 、 Streaming 和 Graph ,都可以共享 SparkSQL 的性能优化,都能从 Spark 社区对于 Spark SQL 的投入中受益。
169 0
工作常用之Spark调优【一】
|
SQL 消息中间件 存储
每日积累【Day2】SPARK调优
每日积累【Day2】SPARK调优
每日积累【Day2】SPARK调优
|
存储 分布式计算 资源调度
Spark on k8s 在阿里云 EMR 的优化实践
本文整理自阿里云技术专家范佚伦在7月17日阿里云数据湖技术专场交流会的分享。
Spark on k8s 在阿里云 EMR 的优化实践
|
分布式计算 Java API
Spark常见优化原则
提交任务参数请参考这篇文章(包括优化建议):Spark部署模式、任务提交 - GoAl
117 0
Spark常见优化原则
|
分布式计算 Spark
【Spark 调优】Spark 开发调优的十大原则
【Spark 调优】Spark 开发调优的十大原则
179 0
【Spark 调优】Spark 开发调优的十大原则