大数据SQL数据倾斜与数据膨胀的优化与经验总结

简介: 目前市面上大数据查询分析引擎层出不穷,但在业务使用过程中,大多含有性能瓶颈的SQL,主要集中在数据倾斜与数据膨胀问题中。本文结合业界对大数据SQL的使用与优化,尝试给出相对系统性的解决方案。

1.背景

目前市面上大数据查询分析引擎层出不穷,如Spark,Hive,Presto等,因其友好的SQL语法,被广泛应用于各领域分析,公司内部也有优秀的ODPS SQL供用户使用。笔者所在团队的项目也借用ODPS SQL去检测业务中潜在的安全风险。在给业务方使用与答疑过程中,我们发现大多含有性能瓶颈的SQL,主要集中在数据倾斜与数据膨胀问题中。因此,本文主要基于团队实际开发经验与积累,并结合业界对大数据SQL的使用与优化,尝试给出相对系统性的解决方案。本文主要涉及业务SQL执行层面的优化,暂不涉及参数优化。若设置参数,首先确定执行层面哪个阶段(Map/Reduce/Join)任务执行时间较长,从而设置对应参数。

本文主要分为以下三个部分:第一部分,会引入数据倾斜与数据膨胀问题。第二部分,介绍当数据倾斜与数据膨胀发生时,如何排查与定位。第三部分,会从系统层面给出常见优化思路。

2.问题篇

2.1 数据倾斜

数据倾斜是指在分布式计算时,大量相同的key被分发到同一个reduce节点中。针对某个key值的数据量比较多,会导致该节点的任务数据量远大于其他节点的平均数据量,运行时间远高于其他节点的平均运行时间,拖累了整体SQL执行时间。其主要原因是key值分布不均导致的Reduce处理数据不均匀。本文将从Map端优化,Reduce端优化和Join端优化三方面给出相应解决方案。

2.2 数据膨胀

数据膨胀是指任务的输出条数/数据量级比输入条数/数据量级大很多,如100M的数据作为任务输入,最后输出1T的数据。这种情况不仅运行效率会降低,部分任务节点在运行key值量级过大时,有可能发生资源不足或失败情况。

3.排查定位篇

本节主要关注于业务SQL本身引起的长时间运行或者失败,对于集群资源情况,平台故障本身暂不考虑在内。1.首先检查输入数据量级。与其他天相比有无明显量级变化,是否因为数据量级的问题天然引起任务运行时间过长,如双11,双十二等大促节点。2.观察执行任务拆分后各个阶段运行时间。与其他天相比有无明显量级变化;在整个执行任务中时间耗时占比情况。3.最耗时阶段中,观察各个Task的运行情况。Task列表中,观察是否存在某几个Task实例耗时明显比平均耗时更长,是否存在某几个Task实例处理输入/输出数据量级比平均数据量级消费产出更多。4.根据步骤3中定位代码行数,定位问题业务处理逻辑。

4.优化篇

4.1数据倾斜

4.1.1Map端优化

4.1.1.1读取数据合并

在数据源读取查询时,动态分区数过多可能造成小文件数过多,每个小文件至少都会作为一个块启动一个Map任务来完成。对于文件数量而言,等于 map数量 * 分区数。对于一个Map任务而言,其初始化的时间可能远远大于逻辑处理时间,因此通过调整Map参数把小文件合并成大文件进行处理,避免造成很大的资源浪费。

4.1.1.2列裁剪

减少使用select * from table语句,过多选择无用列会增加数据在集群上传输的IO开销;对于数据选择,需要加上分区过滤条件进行筛选数据。

4.1.1.3谓词下推

在不影响结果的情况下,尽可能将过滤条件表达式靠近数据源位置,使之提前执行。通过在map端过滤减少数据输出,降低集群IO传输,从而提升任务的性能。

4.1.1.4数据重分布

在Map阶段做聚合时,使用随机分布函数distribute by rand(),控制Map端输出结果的分发,即map端如何拆分数据给reduce端(默认hash算法),打乱数据分布,至少不会在Map端发生数据倾斜。

4.1.2Reduce端优化

4.1.2.1关联key空值检验

部分实例发生长尾效应,很大程度上由于null值,空值导致,使得Reduce时含有脏值的数据被分发到同一台机器中。针对这种问题SQL,首先确认包含无效值的数据源表是否可以在Map阶段直接过滤掉这些异常数据;如果后续SQL逻辑仍然需要这些数据,可以通过将空值转变成随机值,既不影响关联也可以避免聚集。

SELECT  ta.id
FROM    ta
LEFT JOIN tb
ON      coalesce(ta.id , rand()) = tb.id;

4.1.2.2排序优化

Order by为全局排序,当表数据量过大时,性能可能会出现瓶颈;

Sort by为局部排序,确保Reduce任务内结果有序,全局排序不保证;

Distribute by按照指定字段进行Hash分片,把数据划分到不同的Reducer中;

CLUSTER BY:根据指定的字段进行分桶,并在桶内进行排序,可以认为cluster by是distribute by+sort by。对于排序而言,尝试用distribute by+sort by确保reduce中结果有序,最后在全局有序。

-- 原始脚本
select *
from user_pay_table
where dt = '20221015'
order by amt
limit 500
;

-- 改进脚本
SELECT  *
FROM    user_pay_table
WHERE   dt = '20221015'
DISTRIBUTE BY ( CASE
                   WHEN amt < 100                  THEN 0
                   WHEN amt >= 100 AND age <= 2000 THEN 1
                   ELSE 2
                 END )
 SORT BY amt
LIMIT 500
;

4.1.3Join端优化

4.1.3.1大表join小表

通过将需要join的小表分发至map端内存中,将Join操作提前至map端执行,避免因分发key值不均匀引发的长尾效应,复杂度从(M*N)降至(M+N),从而提高执行效率。ODPS SQL与Hive SQL使用mapjoin,SPARK使用broadcast。

4.1.3.2大表join大表

长尾效应由热点数据导致,可以将热点数据加入白名单中,通过对白名单数据和非白名单数据分别处理,再合并数据。

具体表现为打散倾斜key,进行两端聚合(针对聚合)或者拆分倾斜key进行打散然后再合并数据。

4.2数据膨胀

4.2.1避免笛卡尔积

Join关联条件有误,表Join进行笛卡尔积,造成数据量爆炸。

4.2.2关联key区分度校验

关注JoinKey区分度,key值区分度越低(distinct数量少),越有可能造成数据爆炸情况。如用户下的性别列,交易下的省市列等。

4.2.3聚合操作误用

部分聚合操作需要将中间结果记录下来,最后再生成最终结果,这使得在select操作时,按照不同维度去重Distinct、不同维度开窗计算over Partition By可能会导致数据膨胀。针对这种业务逻辑,可以将一个SQL拆分成多个SQL分别进行处理操作。

5.总结

大数据SQL优化是一项涉及知识面较广的工作,除了分析现有执行计划之外,还需要学习相应查询分析引擎设计原理。针对我们日常遇到的问题现总结分享给大家,供大家查阅。


参考资料

ODPS SELECT语法

Presto Query Lifecycle

A Definitive Guide To Hive Performance Tuning- 10 Excellent Tips

Presto Performance: Speed, Optimization & Tuning

Hive Optimizing Joins

作者介绍
目录