1. Paimon
Apache Paimon (incubating) 是一项流式数据湖存储技术,可以为用户提供高吞吐、低延迟的数据摄入、流式订阅以及实时查询能力。Paimon 采用开放的数据格式和技术理念,可以与 Flink / Spark / Trino 等诸多业界主流计算引擎进行对接,共同推进 Streaming Lakehouse 架构的普及和发展。
2. Paimon x Spark
Apache Spark,作为大数据处理的统一计算分析引擎,不仅支持多种语言的高级API使用,也支持了丰富的大数据场景应用,包括结构化数据处理的 Spark SQL、用于机器学习的 MLlib,用于图形处理的 GraphX,以及用于增量计算和流处理的 Structured Streaming。Spark 已经成为了大数据领域软件栈中必不可少的组成部分。对 Paimon 来说,为了在准实时和离线湖仓场景更加便利的落地,与 Spark 深度、全面的集成势在必行。
在之前的 Paimon Release 版本,我们着重丰富 Paimon 在功能上和 Spark SQL 生态的集成,包括 Schema Evolution,Structured Streaming Read/Write,Dynamic Insert Overwrite Partition,Update/Merge Into 等等。在最近发布的0.6和0.7版本,我们开始在 Paimon 基于 Spark SQL 查询性能上做一些工作。在初期我们会结合 Spark SQL 已有的优化规则和框架,让 Paimon 充分利用到这些。通过一系列优化,我们将 Paimon x Spark 在 TpcDS 上的性能提高了37+%,已基本和 Parquet x Spark 持平。下文将对其中的关键优化点进行详细介绍。
2.1 动态分区裁剪
动态分区裁剪(Dynamic Partition Prunning,DPP)在 SQL 优化中是常见的优化点,本质上是谓词下推(Predicate PushDown)的一种拓展,其目的是最小化从数据源中读取数据的 IO 成本,也进而减少了计算成本。
在数仓中,常常将较大的事实表和很小的维度表关联查询,且事实表需要根据维表中的字段信息来进行过滤,如下面TpcDS Q14中的 SQL 片段:
select ss_quantity quantity ,ss_list_price list_price from store_sales, date_dim where ss_sold_date_sk = d_date_sk and d_year between 1999 and 1999 + 2 order by quantity limit 10;
在不支持 DPP 的情况下的执行计划简化如下:
Paimon 应用的是 Spark DataSource V2的查询框架,该框架在 Spark3.2 后提供了 SupportsRuntimeFiltering
接口用于 V2 表实现运行时的动态过滤。理论上,任何字段(包括普通数据字段和分区字段)的过滤条件都能被应用,但一般而言仅分区字段的过滤条件能够被完全应用,即无需上层的 Filter 的节点再使用该过滤条件去选择数据。 Paimon 表通过该接口实现了动态分区裁剪的能力。在支持 DPP 后执行计划如下所示:
在1T的 TpcDS 数据集下,应用 DPP 后store_sales
表参与 join 的数据量从27亿 减少到16亿。仅应用到该优化后,Q14运行时间减少到原来的~55%,1T TpcDS 数据集的查询性能整体提升20+%;
相关代码:
2.2 Exchange 复用
Exchange 是 Spark 中物理计划中一个关键的操作,对应逻辑计划中的 Shuffle。在执行阶段,Exchange 可以代表某个 SQL 中部分 Plan 输出的数据。在复杂的 SQL 中,我们可以通过公共表表达式(Common Table Expression,CTE)语法定义一个 SQL 片段,用于简化整个 SQL 或者被多次使用。以下面简化的 TpcDS Q23 为例,定义的其中一个 CTEfrequent_ss_items
在整个 SQL 中被两次使用。
with frequent_ss_items as ( select substr(i_item_desc,1,30) itemdesc, i_item_sk item_sk, d_date solddate, count(*) cnt from store_sales, date_dim, item where ss_sold_date_sk = d_date_sk and ss_item_sk = i_item_sk and d_year in (2000,2000+1,2000+2,2000+3) group by substr(i_item_desc,1,30),i_item_sk,d_date having count(*) >4 ), max_store_sales as (...), best_ss_customer as (...) select sum(sales) from ( select cs_quantity*cs_list_price sales from catalog_sales ,date_dim where d_year = 2000 and d_moy = 2 and cs_sold_date_sk = d_date_sk and cs_item_sk in (select item_sk from frequent_ss_items) and cs_bill_customer_sk in (select c_customer_sk from best_ss_customer) union all select ws_quantity*ws_list_price sales from web_sales ,date_dim where d_year = 2000 and d_moy = 2 and ws_sold_date_sk = d_date_sk and ws_item_sk in (select item_sk from frequent_ss_items) and ws_bill_customer_sk in (select c_customer_sk from best_ss_customer) ) y limit 100;
显然在执行阶段,我们希望frequent_ss_items
仅被执行一次,执行后的数据可以缓存,然后分别执行后续和catalog_sales
以及web_sales
表的 Join 操作。针对这个场景,Spark 提供了 Exchange 复用的优化,期待的执行计划简化如下所示:
但该优化依赖算子 Plan 中各个物理操作的hashCode
来确定实际运行时是否可以复用。我们定位并解决了 Paimon 中存在的实现问题,使得 Paimon 可以使用到 Spark 提供的 Exchange 复用的优化,从而减少不必要的冗余计算,也降低了 IO 和网络的开销。仅应用到该优化后,Q23运行时间减少到原来的~50%,1T TpcDS 数据集的查询性能整体提升13+%;
2.3 动态调整 Scan 并发
任务实际执行时的并发度是影响作业运行性能的关键之一。Spark 提供了spark.sql.shuffle.partitions
参数来调整 Join 或者 Agg 等算子的并发,也提供了自适应查询执行(Adative Query Execution,AQE)框架动态调整并发,但这些都无法影响到读取数据源 Scan 阶段的并发。
在 DataSource V2的框架下,数据源的 Scan 方式包括并发完全由 DataSource 自己决定。我们以 TpcDS Q19为例:
select i_brand_id brand_id, i_brand brand, i_manufact_id, i_manufact, sum(ss_ext_sales_price) ext_price from date_dim, store_sales, item,customer,customer_address,store where d_date_sk = ss_sold_date_sk and ss_item_sk = i_item_sk and i_manager_id=8 and d_moy=11 and d_year=1998 and ss_customer_sk = c_customer_sk and c_current_addr_sk = ca_address_sk and substr(ca_zip,1,5) <> substr(s_zip,1,5) and ss_store_sk = s_store_sk group by i_brand ,i_brand_id ,i_manufact_id ,i_manufact order by ext_price desc ,i_brand ,i_brand_id ,i_manufact_id ,i_manufact limit 100 ;
其中customer_address
和store
基于substr(ca_zip,1,5) <> substr(s_zip,1,5)
条件 Join。
在未引入 CBO 对 join 重排序的情况下,这两张表通过 BroadcastNestedLoopJoin 来实现,没有引入 Exchange 调整 Join 的并发。执行计划如下图所示:
在未引入优化之前,由于customer_address
表的数据分片较小,但任务计算负载较高(数据 Join 后严重膨胀),整体执行性能很差。
Paimon 根据这种问题提供了基于当前作业的可用 core 数来动态调整数据源的数据分片的能力,也进而调整并发,从而提升查询效率。
仅应用该优化后,Q19运行时间减少到原来的~25%,1T TpcDS 数据集的查询性能整体提升14+%;
2.4 合并标量子查询
类似于 Exchange 复用,合并标量子查询优化会遍历整个 SQL 逻辑执行计划,提取出标量子查询(ScalarSubQuery),尝试将多个标量子查询合并起来,使得仅执行一次子查询得到多个标量值。
我们以 TpcDS Q9 的片段为例,整个 Q9 由5个 case-when 语句构成。
select case when (select count(*) from store_sales where ss_quantity between 1 and 20) > 74129 then (select avg(ss_ext_discount_amt) from store_sales where ss_quantity between 1 and 20) else (select avg(ss_net_paid) from store_sales where ss_quantity between 1 and 20) end bucket1 from reason where r_reason_sk = 1;
在该 SQL 中,case when
的条件,then
和else
语句三个部分使用同样的过滤条件读取同一张表,仅聚合表达式不同。在没有应用到这个优化的情况下,执行计划如下所示:
Spark 本身提供了MergeScalarSubQueries
的优化规则,但从实现上没法更好的对接到 Paimon 这样的 DataSource V2 表,因此我们在 Paimon 侧单独实现,并通过 Spark 提供的 Extensions 的接口将 Paimon 自实现的优化注入到了 Spark 优化器中。在应用该优化后,执行计划如下所示:
由此可见,合并标量子查询优化有效的减少了冗余的计算,提升了 Paimon 在该场景下的查询性能。仅应用该优化后,Q9 运行时间减少到原来的~57%。
2.5 Cost-Based 优化
Spark SQL 允许使用基于成本的优化(Cost-Based Optimizer,CBO)来提升查询性能,主要用于多路 Join 的场景,使用动态规划算法来选择 Cost 最低的 Join 顺序。要想使得这个优化能更有效,依赖于计算 Cost 的模型,以及表的表级和列级统计信息的收集,而其中列级统计信息在评估 Plan 算子节点的运行时统计信息中尤为重要。
新版本的 Paimon 在元数据中增加了 statistics 的信息,可以通过原生的 Spark Analyze 命令完成收集,并对接到了 Spark SQL,使得 Spark SQL 可以利用 Paimon 的表级/列级信息进行查询优化。我们以 TpcDS Q24a 为例:
with ssales as (select c_last_name, c_first_name, s_store_name, ca_state, s_state, i_color, i_current_price, i_manager_id, i_units, i_size, sum(ss_net_paid) netpaid from store_sales, store_returns, store, item, customer, customer_address where ss_ticket_number = sr_ticket_number and ss_item_sk = sr_item_sk and ss_customer_sk = c_customer_sk and ss_item_sk = i_item_sk and ss_store_sk = s_store_sk and c_current_addr_sk = ca_address_sk and c_birth_country <> upper(ca_country) and s_zip = ca_zip and s_market_id=8 group by c_last_name, c_first_name, s_store_name, ca_state, s_state, i_color, i_current_price, i_manager_id, i_units, i_size) select c_last_name, c_first_name, s_store_name, sum(netpaid) paid from ssales where i_color = 'peach' group by c_last_name, c_first_name, s_store_name having sum(netpaid) > (select 0.05 * avg(netpaid) from ssales) order by c_last_name, c_first_name, s_store_name;
其中 CTEssales
的部分,仅提供表级统计信息的情况下的执行计划大致如下所示,包括两个 SortMergeJoin,其中左侧虚线框更是大数据量间的 Join 操作,严重影响性能。
而执行 Analyze 提供了列级统计信息后执行计划大致如下所示,对参与 Join 的表进行了重排序,且所有 Join 都是 BroadcastHashJoin 的方式执行。
Paimon 提供了完整的 statistics,借助于 CBO 框架,不仅可以提升相应的查询性能,也可以使得在正常资源配置下无法跑通的 SQL 能够正常运行,比如 TpcDS Q72。在之前优化项基础上叠加应用该优化后,Q24运行时间减少到原来的~23%,1T TpcDS 数据集的查询性能整体提升~30%;
相关代码:
3. 优化效果
本文使用阿里云 EMR 5.16.0版本,集群节点的属性如下:
- master: 1 * ecs.g7.8xlarge 32 vCPU 128 GiB
- core: 6 * ecs.g7.8xlarge 32 vCPU 128 GiB
使用的组件及版本如下:
- Paimon: 0.8-SNAPSHOT (对应到 commit:193df7345aa520f8b45125cdd85588a91a3fc3a9)
- Spark: 3.3.1 (额外 cherry-pick SPARK-41378,以支持 DataSource V2 下的 stats 相关功能)
启用的 Spark 相关配置:
spark.executor.cores |
4 |
spark.executor.memory |
14g |
spark.executor.memoryOverhead |
2g |
spark.dynamicAllocation.enabled |
true |
spark.sql.cbo.enabled |
true |
spark.sql.cbo.joinReorder.enabled |
true |
spark.sql.autoBroadcastJoinThreshold |
128m |
Paimon 表选用 append 表(无主键表),使用 parquet 作为文件格式,设置 bucket=-1(最新代码已经默认设置:PAIMON-2829),这样便于和 Spark parquet 表进行对比。
上图为我们使用 parquet 表(带有表级统计信息,即 rowCount 和 sizeInByte 两个指标)作为基准,以此向右分别为优化前和应用这些优化后的 Paimon 表(仅带表统计信息),以及 Parquet 表和 Paimon 表在收集到 Column 级别统计信息时的查询较基准的性能对比。
对比可见,在一般情况下(无 column 级统计信息)优化后的 Paimon 和 Parquet 已经基本持平。开启 column 级统计信息后,Paimon 较 Parquet 慢~8%,这中间的差距也将是性能优化继续跟进的方向之一。
4. 后续规划
在湖仓体系下,我们认为读写查询优化一直是一项任重而道远的事情。当前的优化主要集中在让 Paimon 充分利用到 Spark SQL 现有的优化规则或者优化框架。在继续推进的同时,我们也会利用 Paimon 自身的特性,比如 Index 或者 Clustering 等,以及优化 Scan 等进一步提升 Paimon 性能。
另外,在当前湖仓场景下,依然有很多无主键表的使用,后续对 append 表支持 Upsert 能力也是重要的规划之一。
EMR Serverless Spark 是 EMR 基于 Spark 提供的一款全托管、一站式的数据计算平台。它为用户提供任务开发、调试、发布、调度和运维等全方位的产品化服务,显著简化了大数据计算的工作流程,使用户能更专注于数据分析和价值提炼。
免费测试 >>
阿里巴巴数据湖技术交流群