EMR Spark-SQL性能极致优化揭秘 Native Codegen Framework

本文涉及的产品
EMR Serverless StarRocks,5000CU*H 48000GB*H
EMR Serverless Spark 免费试用,1000 CU*H 有效期3个月
简介: SparkSQL多年来的性能优化集中在Optimizer和Runtime两个领域。前者的目的是为了获得最优的执行计划,后者的目的是针对既定的计划尽可能执行的更快。

作者:周克勇,花名一锤,阿里巴巴计算平台事业部EMR团队技术专家,大数据领域技术爱好者,对Spark有浓厚兴趣和一定的了解,目前主要专注于EMR产品中开源计算引擎的优化工作。


背景和动机

SparkSQL多年来的性能优化集中在Optimizer和Runtime两个领域。前者的目的是为了获得最优的执行计划,后者的目的是针对既定的计划尽可能执行的更快。

相比于Runtime,Optimizer是更加通用的、跟实现无关的优化。无论是Java世界(Spark, Hive)还是C++世界(Impala, MaxCompute),无论是Batch-Based(Spark, Hive)还是MPP-Based(Impala, Presto),甚至无论是大数据领域还是传统数据库领域亦或HTAP领域(HyPer, ADB),在Optimizer层面考虑的都是非常类似的问题: Stats收集,Cost评估以及计划选择;采用的优化技术也比较类似,如JoinReorder, CTE, GroupKey Elimination等。尽管因为上下文不同(如是否有索引)在Cost Model的构造上会有不同,或者特定场景下采用不同的空间搜索策略(如遗传算法 vs. 动态规划),但方法大体是相同的。

长期以来,Runtime的优化工作基本聚焦在解决当时的硬件瓶颈。如MapReduce刚出来时网络带宽是瓶颈,所以Google做了很多Locality方面的优化;Spark刚出来时解决的问题是磁盘IO,内存缓存的设计使得性能相比MapReduce有了数量级的提升;后来CPU成为了新的瓶颈[1],因此提升CPU性能成了近年来Runtime领域重要的优化方向。

提升CPU性能的两个主流技术是以MonetDB/X100[2](如今演化为VectorWise[3])为代表的向量化(Vectorized Processing)技术和以HyPer[5][6]为代表的代码生成(CodeGen)技术(其中Spark跟进的是CodeGen[9])。简单来说,向量化技术沿用了火山模型,但与其让SQL算子每次计算一条Record,向量化技术会积攒一批数据后再执行。逐批计算相比于逐条计算有了更大的优化空间,例如虚函数的开销分摊,SIMD优化,更加Cache友好等。这个技术的劣势在于算子之间传递的数据从条变成了批,因此增大了中间数据的物化开销。CodeGen技术从另外一个角度解决虚函数开销和中间数据物化问题:算子融合。简单来说,CodeGen框架通过打破算子之间的界限把火山模型“压平”了,把原来迭代器链压缩成了大的for循环,同时生成语义相同的代码(Java/C++/LLVM),紧接着用对应的工具链编译生成的代码,最后用编译后的class(Java)或so(C++,LLVM)去执行,从而把解释执行转变成了编译执行。此外,尽管还是逐条执行,由于抹去了函数调用,一条Record从(Stage内的)初始算子一直执行到结束算子都基本处于寄存器中,不会物化到内存。CodeGen技术的劣势在于难以应用SIMD等优化。

两个门派相爱相杀,在经历了互相发论文验证自家优于对方后[4][8]两家走向了合作,合作产出了一系列项目和论文,而目前学界的主流看法也是两者融合是最优解,一些采用融合做法的项目也应运而生,如进化版HyPer[6], Pelonton[7]等。

尽管学界已走到了融合,业界主流却没有很强的动力往融合的路子走,探究其主要原因一是目前融合的做法相比单独的优化并没有质的提升;二是融合技术目前没有一个广为接受的最优做法,还在探索阶段;三是业界在单一的技术上还没有发挥出最大潜力。以SparkSQL为例,从2015年SparkSQL首次露面自带的Expression级别的Codegen,到后来参考HyPer实现的WholeStage Codegen,再经过多年的打磨,SparkSQL的Codegen技术已趋成熟,性能也获得了两次数量级的跃升。然而,也许是出于可维护性或开发者接受度的考虑,SparkSQL的Codegen一直限制在生成Java代码,并没有尝试过NativeCode(C/C++, LLVM)。尽管Java的性能已经很优,但相比于Native Code还是有一定的Overhead,并缺乏SIMD(Java在做这方面feature),Prefetch等语义,更重要的是,Native Code直接操作裸金属,易于极致压榨硬件性能,对一些加速器(如GPU)或新硬件(如AEP)的支持也更方便。

基于以上动机,EMR团队探索并开发了SparkSQL Native Codegen框架,为SparkSQL换了引擎,新引擎带来20%左右的性能提升,为EMR再次获取世界第一立下汗马功劳,本文讲详细介绍Native Codegen框架。

核心问题

做Native Codegen,核心问题有三个:
1.生成什么?
2.怎么生成?
3.如何集成到Spark?

生成什么

针对生成什么代码,结合调研的结果以及开发同学的技术栈,有三个候选项:C/C++, LLVM, Weld IR。C/C++的优势是实现相对简单,只需对照Spark生成的Java代码逻辑改写即可,劣势是编译时间过长,下图是HyPer的测评数据,C++的编译时间比LLVM高了一个数量级。compile time.jpg
编译时间过长对小query很不友好,极端case编译时间比运行时间还要长。基于这个考虑,我们排除了C/C++选项。上图看上去LLVM的编译时间非常友好,而且很多Native CodeGen的引擎,如HyPer, Impala, 以及阿里云自研大数据引擎MaxCompute,ADB等,均采用了LLVM作为目标代码。LLVM对我们来说(对你们则不一定:D)最大的劣势就是过于底层,语法接近于汇编,试想用汇编重写SparkSQL算子的工作量会有多酸爽。大多数引擎也不会用LLVM写全量代码,如HyPer仅把算子核心逻辑用LLVM生成,其他通用功能(如spill,复杂数据结构管理等)用C++编写并提前编译好。即使LLVM+C++节省了不少工作量,对我们来说依然不可接受,因此我们把目光转向了第三个选项: Weld IR(Intermediate Representation)。

首先简短介绍以下Weld。Weld的作者Shoumik Palkar是 Matei Zaharia的学生,后者大家一定很熟悉,Spark的作者。Weld最初想解决的问题是不同lib之间互相调用时数据传输的开销,例如要在pandas里调用numpy的接口,首先pandas把数据写入内存,然后numpy读取内存进行计算,对于极度优化的lib来说,内存的写入和读取的时间可能会远超计算本身。针对这个问题,Weld开发了Common Runtime并配套提供了一组IR,再加上惰性求值的特性,只需(简单)修改lib使其符合Weld的规范,便可以做到不同lib共用Weld Runtime,Weld Runtime利用惰性求值实现跨lib的Pipeline,从而省去数据物化的开销。Weld Runtime还做了若干优化,如循环融合,循环展开,向量化,自适应执行等。此外,Weld支持调用C代码,可以方便调用三方库。

我们感兴趣的是Weld提供的IR和对应的Runtime。Weld IR面向数据分析进行设计,因此语义上跟SQL非常接近,能较好的表达算子。数据结构层面,Weld IR最核心的数据结构是vec和struct,能较好地表达SparkSQL的UnsafeRow Batch;基于struct和vec可以构造dict,能较好的表达SQL里重度使用的Hash结构。操作层面,Weld IR提供了类函数式语言的语义,如map, filter, iterator等,配合builder语义,能方便的表达Project, Filter, Agg, BroadCastJoin等算子语义。例如,以下IR表达了Filter + Project语义,具体含义是若第二列大于10,则返回第一列:

|v: vec[{i32,i32}]| for(v,appender,|b,i,n| if(n.$1 > 10, merge(b,n.$0), b))

以下IR表达了groupBy的语义,具体含义是按照第一列做groupBy来计算第二列的sum:

|v: vec[{i32,i32}]| for(v,dictmerger[i32,i32,+],|b,i,n| merge(b,{n.$0,n.$1}))

具体的语法定义请参考Weld文档(https://github.com/weld-project/weld/blob/master/docs/language.md)。
Weld 开发者API提供了两个核型接口:

  1. weld_module_compile, 把Weld IR编译成可执行模块(module)。
  2. weld_module_run, 执行编译好的模块。

基本流程如下图所示,最终也是生成LLVM代码。
weld pipeline.png

由此,Weld IR的优势就显然易见了,既兼顾了性能(最终生成LLVM代码),又兼顾了易用性(CodeGen Weld IR相比LLVM, C++方便很多)。基于这些考虑,我们最终选择Weld IR作为目标代码。

怎么生成

SparkSQL原有的CodeGen框架之前简单介绍过了,详见https://developer.aliyun.com/article/727277。我们参考了Spark原有的做法,支持了表达式级别,算子级别,以及WholeStage级别的Codegen。复用Producer-Consumer框架,每个算子负责生成自己的代码,最后由WholeStageCodeGenExec负责组装。

这个过程有两个关键问题:

1.算子之间传输的介质是什么?
2.如何处理Weld不支持的算子?

传输介质

不同于Java,Weld IR不提供循环结构,取而代之的是vec结构和其上的泛迭代器操作,因此Weld IR难以借鉴Java Codegen在Stage外层套个大循环,然后每个算子处理一条Record的模式,取而代之的做法是每个算子处理一批数据,IR层面做假物化,然后依赖Weld的Loop-Fusion优化去消除物化。例如前面提到的Filter后接Project,Filter算子生成的IR如下,过滤掉第二列<=10的数据:

|v:vec[{i32,i32}]| let res_fil = for(v,appender,|b,i,n| if(n.$1>10, merge(b,n), b)

Project算子生成的IR如下,返回第一列数据:

let res_proj = for(res_fil,appender,|b,i,n| merge(b,n.$0))

表面上看上去Filter算子会把中间结果做物化,实际上Weld的Loop-Fusion优化器会消除此次物化,优化后代码如下:

|v: vec[{i32,i32}]| for(v,appender,|b,i,n| if(n.$1 > 10, merge(b,n.$0), b))

尽管依赖Weld的Loop-Fusion优化可以极大简化CodeGen的逻辑,但开发中我们发现Loop-Fusion过程非常耗时,对于复杂SQL(嵌套3层以上)甚至无法在有限时间给出结果。当时面临两个选择:修改Weld的实现,或者修改CodeGen直接生成Loop-Fusion之后的代码,我们选择了后者。重构后生成的代码如下,其中1,2,11行由Scan算子生成,3,4,5,6,8,9,10行由Filter算子生成,7行由Project算子生成。

|v: vec[{i32,i32}]|
    for(v,appender,|b,i,n|
        if(
            n.$1 > 10, 
            merge(
            b,
            n.$0
            ), 
            b
        )
    )

这个优化使得编译时间重回亚秒级别。

Fallback机制

受限于Weld当前的表达能力,一些算子无法用Weld实现,例如SortMergeJoin,Rollup等。即使是原版的Java CodeGen,一些算子如Outter Join也不支持CodeGen,因此如何做好Fallback是保证正确性的前提。我们采用的策略很直观:若当前算子不支持Native CodeGen,则由Java CodeGen接管。这里涉及的关键问题是Fallback的粒度:是算子级别还是Stage级别?

抛去实现难度不谈,虽然直观上算子粒度的Fallback更加合理,但实际上却会导致更严重的问题:Stage内部Pipeline的断裂。如上文所述,CodeGen的一个优势是把整个Stage的逻辑Pipeline化,打破算子之间的界限,单条Record从初始算子执行到结束算子,整个过程不存在物化。而算子粒度的Fallback则会导致Stage内部一部分走Native Runtime,另一部分走Java Runtime,则两者连接处无可避免存在中间数据物化,这个开销通常会大于Native Runtime带来的收益。

基于以上考虑,我们选择了Stage级别的Fallback,在CodeGen阶段一旦遇到不支持的算子,则整个Stage都Fallback到Java CodeGen。统计显示,整个TPCDS Benchmark,命中Native CodeGen的Stage达到80%。

Spark集成

完成了代码生成和Fallback机制,最后的问题就是如何跟Spark集成了。Spark的WholeStageCodegenExec的执行可以理解为一个黑盒,无论上游是Table Scan,Shuffle Read,还是BroadCast,给到黑盒的输入类型只有两种: RowBatch(上游是Table Scan)或Row Iterator(上游非Table Scan),而黑盒的输出固定为Row Iterator,如下图所示:
Spark Task.png

上文介绍我们选择了Stage级别的Fallback,也就决定了黑盒要么是Java Runtime,要么是Native Runtime,不存在混合的情况,因此我们只需要关心如何把Row Batch/Row Iterator转化为Weld认识的内存布局,以及如何把Weld的输出转化成Row Iterator即可。为了进一步简化问题,我们注意到,尽管Shuffle Reader/BroadCast的输入是Row Iterator,但本质上远端序列化的数据结构是Row Batch,只不过Spark反序列化后转换成Row Iterator后再喂给CodeGen Module,RowBatch包装成Row Iterator非常简易。因此Native Runtime的输入输出可以统一成RowBatch。

解决办法呼之欲出了:把RowBatch转换成Weld vec!但我们更进了一步,何不直接把Row Batch喂给Weld从而省去内存转换呢?本质上Row Batch也是满足某种规范的字节流而已,Spark也提供了OffHeap模式把内存直接存堆外(仅针对Scan Stage。Shuffle数据和Broadcast数据需要读到堆外),Weld可以直接访问。Spark UnsafeRow的内存布局大致如下:
Unsaferow.png
针对确定的schema,null bitmap和fixed-length data的结构是固定的,可以映射成struct,而针对var-length data我们的做法是把这些数据copy到连续的内存地址中。如此一来,针对无变长数据的RowBatch,我们直接把内存块喂给Weld;针对有变长部分的数据,我们也只需做大粒度的内存拷贝(把定长部分和变长部分分别拷出来),而无需做列级别的细粒度拷贝转换。

继续举前文的Filter+Project的例子,一条Record包含两个int列,其UnsafeRow的内存布局如下(为了对齐,Spark里定长部分最少使用8字节)。
unsaferow.jpg
显而易见,这个结构可以很方便映射成Weld struct:

{i64,i64,i64}

而整个Row Batch便映射成Weld vec:

vec[{i64,i64,i64}]

如此便解决了Input的问题。而Weld Output转RowBatch本质是以上过程的逆向操作,不再赘述。

解决了Java和Native之间的数据转换问题,剩下的就是如何执行了。首先我们根据当前Stage的Mode来决定走Java Runtime还是Native Runtime。在Native分支,首先会执行StageInit做Stage级别的初始化工作,包括初始化Weld,加载编译好的Weld Module,拉取Broadcast数据(若有)等;接着是一个循环,每个循环读取一个RowBatch(来自Scan或Shuffle Reader)喂给Native Runtime执行,Output转换并喂给Shuffle Writer。如下图所示:
native execution.png

总结

本文介绍了EMR团队在Spark Native Codegen方向的探索实践,限于篇幅若干技术点和优化没有展开,后续可另开文详解,例如:

1.极致Native算子优化
2.数据转换详解
3.Weld Dict优化

大家感兴趣的任何内容欢迎沟通: )

[1] Making Sense of Performance in Data Analytics Frameworks. Kay Ousterhout
[2] MonetDB/X100: Hyper-Pipelining Query Execution. Peter Boncz
[3] Vectorwise: a Vectorized Analytical DBMS. Marcin Zukowski
[4] Efficiently Compiling Efficient Query Plans for Modern Hardware. Thomas Neumann
[5] HyPer: A Hybrid OLTP&OLAP Main Memory Database System Based on Virtual Memory Snapshots. Alfons Kemper
[6] Data Blocks: Hybrid OLTP and OLAP on Compressed Storage using both Vectorization and Compilation. Harald Lang
[7] Relaxed Operator Fusion for In-Memory Databases: Making Compilation, Vectorization, and Prefetching Work Together At Last. Prashanth Menon
[8] Vectorization vs. Compilation in Query Execution. Juliusz Sompolski
[9] https://databricks.com/blog/2016/05/23/apache-spark-as-a-compiler-joining-a-billion-rows-per-second-on-a-laptop.html


相关阅读推荐:
EMR Spark-SQL性能极致优化揭秘 RuntimeFilter Plus
EMR Spark-SQL性能极致优化揭秘 概览篇


阿里巴巴开源大数据技术团队成立Apache Spark中国技术社区,定期推送精彩案例,技术专家直播,问答区近万人Spark技术同学在线提问答疑,只为营造纯粹的Spark氛围,欢迎钉钉扫码加入!
image.png

对开源大数据和感兴趣的同学可以加小编微信(下图二维码,备注“进群”)进入技术交流微信群。

image.png

相关文章
|
8月前
|
SQL 分布式计算 Serverless
鹰角网络:EMR Serverless Spark 在《明日方舟》游戏业务的应用
鹰角网络为应对游戏业务高频活动带来的数据潮汐、资源弹性及稳定性需求,采用阿里云 EMR Serverless Spark 替代原有架构。迁移后实现研发效率提升,支持业务快速发展、计算效率提升,增强SLA保障,稳定性提升,降低运维成本,并支撑全球化数据架构部署。
803 56
鹰角网络:EMR Serverless Spark 在《明日方舟》游戏业务的应用
|
2月前
|
SQL 存储 监控
SQL日志优化策略:提升数据库日志记录效率
通过以上方法结合起来运行调整方案, 可以显著地提升SQL环境下面向各种搜索引擎服务平台所需要满足标准条件下之数据库登记作业流程综合表现; 同时还能确保系统稳健运行并满越用户体验预期目标.
196 6
|
2月前
|
SQL 关系型数据库 MySQL
为什么这些 SQL 语句逻辑相同,性能却差异巨大?
我是小假 期待与你的下一次相遇 ~
151 0
|
6月前
|
SQL 关系型数据库 PostgreSQL
CTE vs 子查询:深入拆解PostgreSQL复杂SQL的隐藏性能差异
本文深入探讨了PostgreSQL中CTE(公共表表达式)与子查询的选择对SQL性能的影响。通过分析两者底层机制,揭示CTE的物化特性及子查询的优化融合优势,并结合多场景案例对比执行效率。最终给出决策指南,帮助开发者根据数据量、引用次数和复杂度选择最优方案,同时提供高级优化技巧和版本演进建议,助力SQL性能调优。
594 1
|
6月前
|
人工智能 分布式计算 DataWorks
一体系数据平台的进化:基于阿里云 EMR Serverless Spark 的持续演进
本文介绍了一体系汽配供应链平台如何借助阿里云EMR Serverless Spark实现从传统Hadoop平台向云原生架构的迁移。通过融合高质量零部件供应与创新互联网科技,一体系利用EMR Serverless Spark和DataWorks构建高效数据分析体系,解决大规模数据处理瓶颈。方案涵盖实时数据集成、Lakehouse搭建、数仓分层设计及BI/ML应用支持,显著提升数据处理性能与业务响应速度,降低运维成本,为数字化转型奠定基础。最终实现研发效率提升、运维压力减轻,并推动AI技术深度整合,迈向智能化云原生数据平台。
228 4
|
7月前
|
SQL 存储 自然语言处理
SQL的解析和优化的原理:一条sql 执行过程是什么?
SQL的解析和优化的原理:一条sql 执行过程是什么?
SQL的解析和优化的原理:一条sql 执行过程是什么?
|
9月前
|
SQL 关系型数据库 MySQL
如何优化SQL查询以提高数据库性能?
这篇文章以生动的比喻介绍了优化SQL查询的重要性及方法。它首先将未优化的SQL查询比作在自助餐厅贪多嚼不烂的行为,强调了只获取必要数据的必要性。接着,文章详细讲解了四种优化策略:**精简选择**(避免使用`SELECT *`)、**专业筛选**(利用`WHERE`缩小范围)、**高效联接**(索引和限制数据量)以及**使用索引**(加速搜索)。此外,还探讨了如何避免N+1查询问题、使用分页限制结果、理解执行计划以及定期维护数据库健康。通过这些技巧,可以显著提升数据库性能,让查询更高效流畅。
|
DataWorks 数据挖掘 Serverless
阿里云EMR Serverless StarRocks 内容合集
阿里云 EMR StarRocks 提供存算分离架构,支持实时湖仓分析,适用于多种 OLAP 场景。结合 Paimon 与 Flink,助力企业高效处理海量数据,广泛应用于游戏、教育、生活服务等领域,显著提升数据分析效率与业务响应速度。
271 1
|
8月前
|
存储 分布式计算 OLAP
百观科技基于阿里云 EMR 的数据湖实践分享
百观科技为应对海量复杂数据处理的算力与成本挑战,基于阿里云 EMR 构建数据湖。EMR 依托高可用的 OSS 存储、开箱即用的 Hadoop/Spark/Iceberg 等开源技术生态及弹性调度,实现数据接入、清洗、聚合与分析全流程。通过 DLF 与 Iceberg 的优化、阶梯式弹性调度(资源利用率提升至70%)及倚天 ARM 机型搭配 EMR Trino 方案,兼顾性能与成本,支撑数据分析需求,降低算力成本。
542 59