EMR Spark-SQL性能极致优化揭秘 RuntimeFilter Plus

本文涉及的产品
EMR Serverless StarRocks,5000CU*H 48000GB*H
简介: 在 2019 年的打榜测试中,我们基于 Spark SQL Catalyst Optimizer 开发的 RuntimeFilter 优化 对于 10TB 数据 99 query 的整体性能达到 35% 左右的提升。

作者:陆路,花名世仪,阿里巴巴计算平台事业部EMR团队高级开发工程师,大数据领域技术爱好者,对Spark、Hive等有浓厚兴趣和一定的了解,目前主要专注于EMR产品中开源计算引擎的优化工作。


背景介绍

TPC-DS 测试集采用星型和雪花型等多维数据模型,包含 7 张事实表和 17 张维度表,以 store channel 为例,事实表和维度表的关联关系如下所示:
image.png

分析 TPC-DS 全部 99 个查询语句不难发现,绝大部分语句的过滤条件都不是直接作用于事实表,而是通过过滤维度表并将结果集与事实表 join 来间接完成。因此,优化器很难直接利用事实表索引来减少数据扫描量。如何利用好查询执行时的维度表过滤信息,并将这些信息下推至存储层来完成事实表的过滤,对于性能提升至关重要。

在 2019 年的打榜测试中,我们基于 Spark SQL Catalyst Optimizer 开发的 RuntimeFilter 优化 对于 10TB 数据 99 query 的整体性能达到 35% 左右的提升。简单来说,RuntimeFilter 包括两点核心优化:

  1. 动态分区裁剪:事实表以日期列(date_sk)为分区列建表,当事实表与 date_dim 表 join 时,optimizer 在运行时收集 date_dim 过滤结果集的所有 date_sk 取值,并在扫描事实表前过滤掉所有未命中的分区文件。
  2. 非分区列动态过滤:当事实表与维度表的 join 列为非分区列时,optimizer 动态构建和收集维度表结果集中 join 列的 Min-Max Range 或 BloomFilter,并在扫描事实表时下推至存储层,利用存储层索引(如 Parquet、ORCFile 的 zone map 索引)来减少扫描数据量。

问题分析

为了进一步挖掘 RuntimeFilter 优化的潜力,我们选取了部分执行时间较长的 query 进行了细致的性能剖析。这些 query 均包含大于一个事实表和多个维度表的复杂 join。在分析了 RuntimeFilter 对各个 query 的性能提升效果后,我们发现:

  1. 动态分区裁剪的性能提升效果明显,但很难有进一步的优化空间
  2. 非分区列动态过滤对整体提升贡献相比分区裁剪小很多,主要是因为很多下推至存储层的过滤条件并没有达到索引扫描的效果

聪明的同学应该已经发现,只有 date_dim 这一张维度表和分区列相关,那么所有与其它维度表的 join 查询从 RuntimeFilter 优化中受益都较为有限。对于这种情况,我们做了进一步的拆解分析:

  1. 绝大部分 join 列均为维度表的自增主键,且与过滤条件没有相关性,因此结果集取值常常均匀稀疏地散布在该列的整个取值空间中
  2. 对于事实表,考虑最常见的 Zone Map 索引方式,由于 load 阶段没有针对非分区列做任何聚集操作(Clustering),每个 zone 的取值一般也稀疏分散在各个列的值域中。
  3. 相比 BloomFilter,Min-Max Range 的构建开销和索引查询开销要低得多,但由于信息粒度太粗,索引过滤命中的效果也会差很多

综合以上几点考虑,一种可能的优化方向是在 load 阶段按照 join 列对事实表进行 Z-Order 排序。但是这种方式会显著增加 load 阶段执行时间,有可能导致 TPC-DS 评测总分反而下降。同时,由于建表阶段优化的复杂性,实际生产环境的推广使用也会比较受限。

RuntimeFilter Plus

基于上述分析,我们认为依赖过滤条件下推至存储层这一方式很难再提升查询性能,尝试往其它方向进行探索:

  1. 不依赖存储层索引
  2. 不仅优化事实表与维度表 join

最终我们提炼两个新的运行时过滤优化点:维度表过滤广播和事实表 join 动态过滤,并在原版 RuntimeFilter 优化的基础上进行了扩展实现。

维度表过滤广播

这一优化的思想来源于 Lookahead Information Passing(LIPimage.png

当事实表(lineorder)连续与多个维度表过滤结果做 multi-join 时,可将所有维度表的过滤信息下推至 join 之前。该方法与我们的 RuntimeFilter 的主要不同在于下推时考虑了完整的 multi-join tree 而不是局部 binary-join tree。其优化效果是即使 join ordering 为 bad case,无用的事实表数据也能够被尽早过滤掉,即让查询执行更加 robust。

我们参考论文算法实现了第一版过滤下推规则,但并没有达到预期的性能提升,主要原因在于:

  1. Spark CBO Join-Reorder 结合我们的遗传算法优化,已经达到了接近最优的 join ordering 效果
  2. 前置的 LIP filters 执行性能并没有明显优于 Spark BroadcastHashJoin 算子

基于过滤条件可以传递至复杂 multi-join tree 的任意节点这一思想去发散思考,我们发现,当 multi-join tree 中存在多个事实表时,可将维度表过滤条件广播至所有的事实表 scan,从而减少后续事实表 SortMergeJoin 等耗时算子执行时所需处理的数据量。以一个简化版的 query 64 为例:

with cs_ui as
(select cs_item_sk
,sum(cs_ext_list_price) as sale
from catalog_sales
,catalog_returns
where cs_item_sk = cr_item_sk
and cs_order_number = cr_order_number
group by cs_item_sk)
select i_product_name product_name
,i_item_sk item_sk
,sum(ss_wholesale_cost) s1
from store_sales
,store_returns
,cs_ui
,item
where ss_item_sk = i_item_sk and
ss_item_sk = sr_item_sk and
ss_ticket_number = sr_ticket_number and
ss_item_sk = cs_ui.cs_item_sk and
i_color in ('almond','indian','sienna','blue','floral','rosy') and
i_current_price between 19 and 19 + 10 and
i_current_price between 19 + 1 and 19 + 15
group by i_product_name
,i_item_sk

该查询的 plan tree 如下图所示:
image.png

考虑未实现维度表过滤广播的执行流程,store_sales 数据经过 RuntimeFilter 和 BroadcastHashJoin 算子进行过滤,但由于过滤后数据仍然较大,后续的所有 join 都需要走昂贵的 SortMergeJoin 算子。但如果将 LIP filter 下推至 4 张事实表的 scan 算子(无需下推至存储层),不仅减少了 join 数据量,也减少了 catalog_sales 和 catalog_returns 表 join 后的 group-by aggregation 数据量 。

LIP 实现

在 optimizer 层,我们在原版 RuntimeFilter 的 SyntheticJoinPredicate 规则后插入 PropagateDynamicValueFilter 规则,将合成的动态谓词广播至所有合法的 join 子树中;同时结合原有的谓词下推逻辑,保证动态谓词最终传播到所有相关的 scan 算子上。在算子层,LIP filters 的底层实现可以是 HashMap 或 BloomFilter,针对 TPC-DS 的数据特性,我们选择 BitMap 作为广播过滤条件的底层实现。由于 BitMap 本身是精确的(Exact Filter),可以结合主外键约束信息进一步做 semi-join 消除优化。基于主外键约束的优化规则将在系列后续文章做详细介绍。

应用该优化后,query 64 执行时间由 177 秒降低至 63 秒,加速比达到 2.8 倍。

事实表 Join 动态过滤

使用 BloomFilter 来优化大表 join 是一种常见的查询优化技术,比如在论文《Building a Hybrid Warehouse: Efficient Joins between Data Storedin HDFS and Enterprise Warehouse》中提出对 join 两表交替应用 BloomFilter 的 zig-zag join 方法,降低分布式 join 中的数据传输总量。对于 TPC-DS 测试集,以 query 93 为例,store_sales 与 store_returns join 后的结果集大小远小于 store_sales 原始数据量,非常适合应用这一优化。

BloomFilter 的构建和应用都存在较高的计算开销,对于 selectivity 较大的join,盲目使用这一优化可能反而导致性能回退。基于静态 stats 的 join selectivity 估算往往误差,Spark 现有的 CBO 优化规则难以胜任鲁棒的 BloomFilter join 优化决策。因此,我们基于 Spark Adaptive Execution(AE) 运行时重优化机制来实现动态的 BloomFilter join 优化规则。AE 的基本原理是在查询作业的每个 stage 执行完成后,允许优化器根据运行时采集的 stage stats 信息重新调整后续的物理执行计划。目前主要支持三种优化:
(1)reduce stage 并发度调整;
(2)针对 skew 情况的 shuffle 数据均衡分布;
(3)SortMergeJoin 转换为 BroadcastHashJoin

基于 AE 的优化规则流程如下:

  1. 根据静态 stats 判断 join 的一端的 size 是否可能适合构建 BloomFilter( build side),如果是,则 build side 和 stream side 的 scan stage 会依次串行提交执行;否则这两个 stage 将并行执行。
  2. 在 build side 的 scan stage 执行完成后,AE 根据运行时收集的 size 和 join 列 histogram 进行代价估算,并决定最终走 BroadcastHashJoin、BloomFilter-SortMergeJoinJoin 还是原本的 SortMergeJoin。
  3. 当物理执行计划为 BloomFilter-SortMergeJoinJoin,优化器会插入一个新的作业并行扫描 build side 的 shuffle 数据来构建 BloomFilter,并下推至 stream side 的 scan stage 中。

BloomFilter 算子实现

为了减少 BloomFilter 带来的额外开销,我们重新实现了高效的 BuildBloomFiler 和 Native-InBloomFilter 的算子。在构建阶段,使用 RDD aggregate 来合并各个数据分片的 BloomFiler 会导致 driver 成为数据传输和 bitmap 合并计算的性能瓶颈;使用 RDD treeAggregate 实现并行分层合并显著降低了整体的构建延迟。在过滤阶段,Native-InBloomFilter 的算子会被推入 scan 算子中合并执行。该算子直接访问 Spark 列式读取内存格式,按批量数据来调用 SIMD 优化的 native 函数,降低 CPU 执行开销;同时,我们将原版算法替换为 Blocked BloomFilter 算法实现,该算法通过牺牲少量的 bitmap 存储空间来换取访存时更低的 CPU cache miss 率。

应用该优化后,query 93 执行时间由 225 秒降低至 50 秒,加速比达到 4.5 倍。


阿里巴巴开源大数据技术团队成立Apache Spark中国技术社区,定期推送精彩案例,技术专家直播,问答区近万人Spark技术同学在线提问答疑,只为营造纯粹的Spark氛围,欢迎钉钉扫码加入!

image.png

对开源大数据和感兴趣的同学可以加小编微信(下图二维码,备注“进群”)进入技术交流微信群。
image.png

Apache Spark技术交流社区公众号,微信扫一扫关注

image.png

相关实践学习
基于EMR Serverless StarRocks一键玩转世界杯
基于StarRocks构建极速统一OLAP平台
快速掌握阿里云 E-MapReduce
E-MapReduce 是构建于阿里云 ECS 弹性虚拟机之上,利用开源大数据生态系统,包括 Hadoop、Spark、HBase,为用户提供集群、作业、数据等管理的一站式大数据处理分析服务。 本课程主要介绍阿里云 E-MapReduce 的使用方法。
相关文章
|
25天前
|
SQL 存储 关系型数据库
如何巧用索引优化SQL语句性能?
本文从索引角度探讨了如何优化MySQL中的SQL语句性能。首先介绍了如何通过查看执行时间和执行计划定位慢SQL,并详细解析了EXPLAIN命令的各个字段含义。接着讲解了索引优化的关键点,包括聚簇索引、索引覆盖、联合索引及最左前缀原则等。最后,通过具体示例展示了索引如何提升查询速度,并提供了三层B+树的存储容量计算方法。通过这些技巧,可以帮助开发者有效提升数据库查询效率。
40 2
|
5天前
|
SQL 分布式计算 Serverless
EMR Serverless Spark:一站式全托管湖仓分析利器
本文根据2024云栖大会阿里云 EMR 团队负责人李钰(绝顶) 演讲实录整理而成
53 2
|
11天前
|
SQL 资源调度 分布式计算
如何让SQL跑快一点?(优化指南)
这篇文章主要探讨了如何在阿里云MaxCompute(原ODPS)平台上对SQL任务进行优化,特别是针对大数据处理和分析场景下的性能优化。
|
16天前
|
存储 分布式计算 监控
Spark如何优化?需要注意哪些方面?
【10月更文挑战第10天】Spark如何优化?需要注意哪些方面?
24 6
|
20天前
|
SQL 监控 数据库
慢SQL对数据库写入性能的影响及优化技巧
在数据库管理系统中,慢SQL(即执行缓慢的SQL语句)不仅会影响查询性能,还可能对数据库的写入性能产生显著的不利影响
|
2月前
|
SQL 分布式计算 Serverless
阿里云 EMR Serverless Spark 版正式开启商业化
阿里云 EMR Serverless Spark 版正式开启商业化,内置 Fusion Engine,100% 兼容开源 Spark 编程接口,相比于开源 Spark 性能提升300%;提供 Notebook 及 SQL 开发、调试、发布、调度、监控诊断等一站式数据开发体验!
104 3
阿里云 EMR Serverless Spark 版正式开启商业化
|
23天前
|
SQL 关系型数据库 PostgreSQL
遇到SQL 子查询性能很差?其实可以这样优化
遇到SQL 子查询性能很差?其实可以这样优化
67 2
|
22天前
|
分布式计算 Java 大数据
大数据-92 Spark 集群 SparkRDD 原理 Standalone详解 ShuffleV1V2详解 RDD编程优化
大数据-92 Spark 集群 SparkRDD 原理 Standalone详解 ShuffleV1V2详解 RDD编程优化
30 0
大数据-92 Spark 集群 SparkRDD 原理 Standalone详解 ShuffleV1V2详解 RDD编程优化
|
20天前
|
SQL 存储 数据库
慢SQL对数据库写入性能的影响及优化技巧
在数据库管理系统中,慢SQL(即执行缓慢的SQL语句)不仅会影响查询性能,还可能对数据库的写入性能产生显著的不利影响
|
22天前
|
SQL 分布式计算 大数据
大数据-91 Spark 集群 RDD 编程-高阶 RDD广播变量 RDD累加器 Spark程序优化
大数据-91 Spark 集群 RDD 编程-高阶 RDD广播变量 RDD累加器 Spark程序优化
28 0