Spark SQL向量化执行引擎框架Gluten-Velox在AArch64使能和优化
内容分析:
1. 技术背景
2. 算子库构成
3. 算子操作优化
4. 未来工作
01、技术背景
下图描述了当今大数据相关典型使用场景下,用户使用较多的大数据软件栈的构成:
1.1 相关技术
在大数据领域,Hadoop 三件套(HDFS 用于分布式文件系统、YARN 用于集群的资源管理、MapReduce 分布式计算框架)广为人知。
在此基础之上,用于使用类似与 Query的 Work 会在这三件套之上部署Hive 的Query 引擎。同时,在 Hive 中集成相应的 Mysql 作为元数据的存储。对于 Hive 而言,其前端有一些 Query的输入,其内置了 Metastore 存储 Table,还有自己的 Schema Management和 Plan的转化模块。
1.2 技术瓶颈
该架构主要的性能瓶颈在于,MapReduce作为分布式的计算框架,其在中间处理环节以及整个 MapReduce的过程中都会进行不停的读写磁盘,这会严重影响性能。此外,MapReduce对错误会实时进行 Snapshop 到磁盘,出错后会读回,这些无效的磁盘 IO 使之受困于用户夜间处理 Log 的 ETL 场景,对于实时在线分析场景十分受限,性能影响非常大。
Spark作为热门大数据处理框架应运而生,现今几乎所有的用户都从 MapReduce 切换到 Spark。
1.3 Spark 框架
其所有的数据都 Cache 在内存中,虽也会做少量的磁盘读写,但在消费过程中,相较于 Map Reduce,其减少的磁盘的读写约少于后者两到三个数量级,速度非常快。同时,Spark 也内置了基于 RDD 内存的模型Spark core。此外,SparkSQL 作为高性能的 Query 引擎逐步替代了 Hive。社区对 Hive 做过很多历史性的部署,包含很多Udf Query的代码。为实现代码兼容,可以通过原生的 Hive代码直接使用Spark SQL加速查询执行。
用户使用较多的还有消息处理 Kafka 和实时的 Streaming工具 Apache Flink。Spark 也在做Spark streaming 模块,用户可基于自身需求进行选择。此外,还有类似于 NoSQL 的 Hbase,可提供低代码的数据库访问。
在大数据模块中,每个用户都可以构建自己的大数据栈,使数据在其中流转。在数据的流转过程中,还存在很多历史问题。如序列化和反序列化,其格式不同,会存在某些序列化和反序列化的无效 CPU 的 SQL,社区在底部采用Apache Arrow作为统一内存列格式,大大减少序列化和反序列化的过程。
最初,Arrow 在 JAVA的系统中是作为了一种 JNI 的方式。而 JNI 与使用 Native Gluten-Velox 加速密切相关。由于大数据系统基于 JVM,它支持通过 JNI 的方式直接调用 CPU 新的 Extension 指令,而 JDK对新的 CPU Extension 指令的支持有滞后。因此,JNI可作为一种加速系统的重要手段,使新的指令快速 upgrade到新的JDK版本中。
1.4 SparkSQL
(1)执行流程
Spark SQL 属于 Spark Query的模块,可以通过 JDBC 连接数据库,它也提供了Console,用户可以直接通过控制台进行 Query。还有非常多的类似于 UDF 的用户自定义代码做 Query 查询,这些代码可以是 Scala的。通过这些Query,Sparksql可以将其转化为 Dataframe(Query最初步的一种软件抽象)的形式,将所有的内容落地到代码上。因此,Query 可能较小,其 DataFrame 的接口可以实现数据帧的抽象。然后,通过 DataFrame 在SparkSQL 中用逻辑的plan 输入到 catalyst引擎作为物理 plan 的优化。最后,通过 SparkSQL自身的 Execution 引擎做全过程的 Cover 件。由于 Plan 是JAVA base的,所以 Cover出的都是JAVA 相关的代码。整个过程把前端的 Query 输出到 JAVA代码。
上图中右下角过程涉及很多 Operators 算子库。当用户查询时,可从高到低排序,最终调用 JAVA 实现的 Operators。每个客户对自己的SparkSQL都有不同的优化。从公开的资料看,其优化最终都会遇到纯JAVA 优化共同的瓶颈,较难利用 CPU 的 SMID 实现相应的优化。
因此,面对客户优化的需求,有两种应对方式。其一,使用较新的 JDK future,类似于Vector API;其二,直观调用 SMID或Crippled Extension指令的 Native 库实现。
(2)JNI与Vector API
针对以上情况,Luhenry 是Spark的Commuter,他基于 Netlib 数据库利用原生的 C++ 实现了数据库的加速。同时,也通过Vector API 特性在 JDK上实现了向量化的Netlib。在提供Benchmark 后,内部的 JDK 工程师在Arm N2 上做过 Benchmark,如下图所示:
蓝色部分是Vector API 实现,橘色部分是 JNI 实现,灰色是 JAVA 实现。分析结果可以看到,当数据量较小时,Vector API性能较好,因为JNI因使用 JAVA 调用Native,开销远超于对 CPU 加速的性能提升。
当数据量逐步变大后,在 JDK 17上的测试结果显示,JNI 优势更加明显。这是因为 JDK 的 Vector API 受 Java 语义限制,无法高效调用底层的代码。此外,JDK 还有JAVA代码以及 GC 的Overhead。
综上,当数据量较大时,选择通过 JNI 的方式直接优化调用的 SMID 的指令;当数据量较小时,可以使用 Vector API。
在同客户联调时发现,使用 JDK 23,Vector API 的性能会由于 JNI。在JDK 20上测试时,我们在发现了 Vector API 的短板之后,在JAVA 中进行了各类子项目的逐步优化,在最近的 JDK 23中,其性能得到了长足的进步。但是,若要选用 Vector API,JDK需升级至最新版本。这种大规模的用户场景难以轻易实现最新版本的升级。其次,Vector API中所有的JAVA 代码都要做适配和重构。因为原本是 Scala 的形式,而现在需要调用不同 API 进行优化。在优化结束后,还需要额外承担调优、测试以及 JDK 的成本,且还可能伴随其他的风险,如 JDK 可能在这个Work Lose上提升,在其他Work Lose上反而会回退。
目前,国内外业界主流各大互联网公司对SparkSQL多通过 JNI 的方式直接在大数据量的情况下以 Gluten+Velox 的形式进行 native 算子库加速而不选择 Vector API 。这种情况可能在将来会发生变化,但在遭遇瓶颈之后,算子库开源是业界共同的需求。
综上,不难发现 JNI 在Gluten+Velox项目在社区和应用场景中广泛使用。
02、算子库构成
2.1 Gluten
在拉丁语中,Glue 是粘合剂。因此,Gluten可以粘合后端的算子(如Velox),以及粘合前端的JAVA SQL。它不需要 Spark的开发者做任何的代码迁移,可以直接使用后端的算子库。其算子库除 Velox 之外,还有 ClickHouse、Arrow(可提供自己的 Compute 算子库)。如果成本预算较高,也可以使用 FPGA 硬件逻辑上的算子。
(1)Fallback
Fallback 是 Gluten的主要功能之一,所有的算子可在Spark Executor到每个Task。如果 Clickhouse 和 Velox 算子能被承接,所有的算子都Offload到 JNI 三方库上,否则 Fallback 到Spark原生算子。网易的相关工程师也在公开的会议上分享了关于 Velox 和 Gluten 加速SparkSQL 的情况,他们提到,现在对于 Fallback 情况下性能的惩罚很高。因此,他们建议先运行主要业务逻辑,再将有无 Fallback 的进行区分。进行了 Fallback的按照原始方案继续运行,否则全部进行 Gluten,一定程度上提升了 Gluten。
(2)Memory Management
Gluten 会把 Native 内存空间纳入到 JDK 中的Memory Manager中进行统一管理。对于真正的用户说来,可以忽略。
2.2 Velox
Velox是基于 Meta 开源出的算子库,其特别之处在于:运行 Benchmark后需要内嵌Substtrait Plan(Physical Plan的再优化版)。然后,Gluten承接并进行 Fallback 做 Plan 转换工作。
Gluten 和 Velox 的具体实现细节可在GitHub上查看。
03、算子操作优化
基于客户需求,开始关注到 Gluten + Velox 在 Arm 上的使能和优化。
3.1 优化工作
首先,做了 Upstream,即对 Gluten + Velox 的编译使能优化。Gluten的构建基于英特尔,很多开发者只在 X86 平台上进行验证。因此,后续的工作重点也可能在于CSED的建立上。其次,第三方库的编译。由于目前 Gluten 项目非常大,编译系统纷繁复杂,分为动态编译和静态编译,动态编译需 Local 编译的 Host,即相关的第三方库,可能会出现 Arm不兼容。此外,还实现了 TPC-DS 的Bench Mapped Script 的使能。
3.2 性能评估
上图是在倚天 710 ECS云实例上的性能评估结果。图中显示,相较于纯 Java 实现的后端,Gluten + Velox后端在运行多个查询时性能提升显著,平均最低提升 25% 以上,最高的 Query 4 提升达 80+%。
3.3 优化方向
上图是微软公开分享的TPC-DS工作负载热点函数占比图,该图可以指导后续 Velox 算子的性能优化方向分析表明,其后端 Velox算子库中近 60% 是与 IO 相关的 Scan 操作。
算子库中有很多类似于 Hash Join、Hash Build 的操作,由于 Hash Join需要等值判断应扫描的键值,把两个表 Join 并 Output,因此,需要进行大量的 Memory 相关的工作。CPU 相关的包括Hash Aggregation、Hash Join 等操作。对于 Memory-Bound 的优化,微软仍未开始,提供的只限于 CPU-Bound。该饼图可作为后续的验证参考。
此外,还要思考Memory Latency 相关的问题。在 Arm 上的 Memory Bond Wise的Profiling 是一项极具挑战的工作。
3.4 Velox 算子优化案例
以 TPC-DS 的 Query 23 为例:
运行 Velox 的 Benchmark,需输入 Substrait Plan,然后,在Spark中配置相应的 Task Stage ID,不需要配置 Partition ID,因为 RDD 不做分区,默认为零。选取 Query 23 中最耗时的 Stage37 和 Task53,运行 TPC-DS 相应的 Benchmark,得到上图底部的数据,包括 Data 和Plan 的数据以及 Plan 的 Configure 文件作为 Benchmark 的输入。Hash Join中有相关的子操作HashProbe和Aggregation,其中有很多 TobitMask 操作。
对于操作 x86,通过一条指令 Pmovmskb 实现,Latency较低,Arm 未做相应的优化,其中有很多 For 循环以及逻辑或、右移等操作,即使完成了编译器优化,其性能相较于 X86 仍较差。
针对以上问题,采取了如下操作:
假定了一个 128 Bit的向量(16进制),Epi8数据宽度是 1 个 Byte,通过命令_mm_movemask_epi8(comp) = 0b0000000010010000获取每个 byte 最高位,但如果大部分应用场景都以该字节是否被置位做 flag 用,如果与 flag端发生了置位,后续在调用端做代码的分支,可能会执行其他的代码流程。如果 128 Bit 的向量数据宽度是 1 个 Byte,最后返回 16 Bit;如果数据宽度是 4 个 Byte,最后返回 4 Bit。通过这条指令可以最多返回 16 Bit的位图。原生的代码只对数据宽度为 1 Byte的做等效替代,其中有近似的表进行 Load,较为耗时,还有 Add操作。其他的数据宽度则没有等效替代。
针对上述情况,我们采用 Arm Neon shrn 指令,通过逻辑右移和截取操作,并保持两者的位数统一,案例中以 4 位为例,如果前面是 8 个 0,右移四位,再截取四位,最终形成 64 位的位图,每四位对应上面一位。调用侧只做轻量化适配即可等效替代 X86 的 Pmovmskb 指令。有的调用在获取位图后判断零或一的数量等操作,与原来调用的区别仅在于另外除以 4,多右移两位,性能提升较大。其 Pipeline只用到了Neon的 SMID的,但效能的提升效率足以掩盖这个不足。
以下是一个 HashProbe 操作:
经过子操作的优化,性能提升2.74% - 6.7%,Velox 中有大量的操作,我们的后续目标也是对于每个类似的操作进行优化,聚沙成塔,形成合力,在 Arm 上提升 Velox 性能。
对于 Velox 自带的Filter Benchmark,可以理解为其自带的Micro Benchmark,代表统计位图中一的个数,其性能提升约 20%。
04、未来工作
未来,我们将与CE团队合作,在 Arm 上持续使能 Velox 社区。首先,建立 CI/CD,因多数工程师在 x86 环境验证,建立后可更好支持 Arm 开发。其次,深入优化 Velox 中的 CPU 相关操作,如 SMID 等相关的可优化点。同时,适配底层接口变化带来的Benchmark 变化,提升Benchmark 测试套件。最后,针对 Velox 在 Arm 上的瓶颈(主要是Memory Bound 问题)进行 EQE 测试,做针对性优化,降低相关工作负载。
以上就是本次分享的全部内容。