Gluten + Celeborn: 让 Native Spark 拥抱 Cloud Native

本文涉及的产品
智能开放搜索 OpenSearch行业算法版,1GB 20LCU 1个月
实时计算 Flink 版,5000CU*H 3个月
实时数仓Hologres,5000CU*H 100GB 3个月
简介: 本篇文章介绍了 Gluten 项目的背景和目标,以及它如何解决基于 Apache Spark 的数据负载场景中的 CPU 计算瓶颈。此外,还详细介绍了 Gluten 与 Celeborn 的集成。Celeborn 采用了 Push Shuffle 的设计,通过远端存储、数据重组、内存缓存、多副本等设计,不仅进一步提升 Gluten Shuffle 的性能和稳定性,还使得 Gluten 拥有更好的弹性,从而更好的拥抱云原生。

作者:张凯@阿里云、陳韋廷@Intel、周渊@Intel


简介

Apache Celeborn(Incubating) 是阿里云捐赠给 Apache 的通用 Remote Shuffle Service,旨在提升大数据计算引擎的性能/稳定性/弹性,目前已广泛应用于生产场景。Gluten 是 Intel 开源的引擎加速项目,旨在通过把 Spark Java Engine 替换为 Native Engine(Velox, ClickHouse, Arrow 等)来加速 Spark 引擎。过去一段时间,Gluten 社区和 Celeborn 社区相互合作,成功把 Celeborn 集成进 Gluten,本文将对此加以介绍。


Gluten: 给 Spark 换上 Native 引擎

Gluten 项目旨在解决基于 Apache Spark 的数据负载场景中的 CPU 计算瓶颈。随着 IO 技术的提升,特别是SSD和万兆网卡的普及,CPU 计算瓶颈逐渐成为限制性能的主要因素。然而,基于 JVM 进行 CPU 指令优化相对困难,因为与其他本地语言(如C++)相比,JVM 提供的优化功能较少。


为了解决这个问题,开源社区已经有一些成熟的本地引擎(如 ClickHouse、Velox)具备了优秀的向量化执行能力,可以带来显著的性能优势。但是,这些引擎通常与 Spark 生态系统脱离,对于那些已经严重依赖 Spark 计算框架且无法承受大量运维和迁移成本的用户来说,不够友好。Gluten 项目的目标是使 Spark 用户能够享受这些成熟的本地引擎带来的性能优势,而无需迁移。


Gluten 项目利用 Spark 插件机制,拦截并将查询计划发送给本地引擎执行,从而跳过 Spark 本身不够高效的执行路径。该项目支持多个本地引擎作为后端,包括 Velox、ClickHouse 和 Apache Arrow。对于本地引擎无法处理的操作,Gluten 会回退到 Spark 的正常执行路径。在线程模型方面,Gluten 使用 JNI 调用库的形式,在 Spark 执行器任务线程中直接调用本地代码,避免引入复杂的线程模型。


在内存管理方面,Gluten 能够统一管理本地内存和 JVM 内存,通过本地内存池和任务内存管理器分配内存。当内存不足时,会触发溢出操作,释放内存。此外,Gluten 还提供了完整的列式 Shuffle 机制及及统一API接口用于衔接市场受欢迎的第三方 RemoteShuffleService 如 Celeborn,避免了数据转换开销及提供服务。


为了兼容不同的本地引擎,Gluten 定义了清晰的JNI接口,作为 Spark 框架和底层引擎之间的桥梁。这些接口用于请求传递、数据传输和能力检测等方面的需求。开发者只需实现这些接口,并满足相应的语义要求,即可利用 Gluten 完成 Spark 和本地引擎的整合工作。此外, Spark 的架构设计中还预留了 Shim Layer 来适配支持不同版本的 Spark。




Gluten Columnar Shuffle

Shuffle 本身是影响 Spark 性能的重要一环,这里会引入多次序列化/反序列化,网络传输,磁盘读写,因此要想实现高性能才不至于成为瓶颈。由于 Native Engine 采用列式(Columnar)数据结构暂存数据,如果简单的沿用 Spark 的基于行数据模型的 Shuffle,则会在 Shuffle Write 阶段引入数据列转行的环节,在 Shuffle Read 阶段引入数据行转列的环节,才能使数据可以流畅周转。但是无论行转列,还是列转行的成本都不低。因此,Gluten 必须提供完整的 Columnar Shuffle 机制以避开这里的转化开销。具体到 columnar shuffle 实现层,主要分成 shuffle 数据写入和shuffle 数据读取两块。


Columnar Shuffle 写入

和原生 Spark 一样,Columnar Shuffle 目标是将当前 DAG 产生的临时数据写入磁盘,在下一个 stage 需要将数据读出,也需要支持内存不足时的 spill 操作,优先保证查询的健壮性。与Spark里不同的地方主要有以下几点:

  • Spark 默认采用 row based 格式存储数据,Gluten 里 shuffle 采用了 columnar format 来保存数据。
  • 目前的实现是基于 Arrow format,来做序列化的工作,并且支持自定义压缩格式。采用 columnar format 来实现可以方便的引入 SIMD 指令来做优化
  • Spark 默认采用 sort-based shuffle,而 gluten 里默认采用 hash based shuffle
  • Sort based shuffle 比,hash based 算法复杂度更低,但需要占用更多内存,并且引入很多小文件问题。gluten 里实现 hash based shuffle 时,也参考了 sort based shuffle 的部分设计,减少了小文件过多带来的影响


在一个 TPC-H Like Scale Factor 6TB的测试场景中,Columnar Shuffle Write 和原生Spark 的 row based shuffle 相比,可以达到减少约~12%的 Shuffle Size 的效果。


Columnar Shuffle 读取

在实现 Columnar Shuffle 读取时,Gluten 复用了 Spark 里的 netty based shuffle transfer 机制,只需要提供对应的 de-serializer,将已经写到磁盘上的 shuffle 文件读取上来,并反序列化交给 reducer。Spark 里引入了很多软件栈比如 netty, kryo,导致 reducer 读取时有重复的内存拷贝,Gluten 里也做了一些零拷贝优化来减少这里的软件开销。



Celeborn:解决本地 Shuffle 的限制

Gluten 本地 Shuffle 限制


上图展示了 Gluten Columnar Shuffle 的主流程,其中 Hash-based Shuffle、Native Partitioner、零拷贝等设计是其获得高性能的关键。然而,Gluten 沿用了 Spark 的本地 Shuffle 框架,存在以下主要限制。

  • 依赖大容量本地盘存储 shuffle 数据,一方面无法应用存算分离架构,另一方面计算节点“有状态”无法及时缩容,从而导致难以兼容云原生架构,资源利用率低。
  • Shuffle Write 内存紧张时 Spill 到磁盘,增加额外的磁盘 I/O。
  • Shuffle Read 有大量的网络连接和大量磁盘随机读,导致较差的稳定性和性能。


Celeborn 简介

Apache Celeborn(Incubating) 是较成熟的通用 Remote Shuffle Service,可以很好的解决大数据引擎本地 Shuffle 存在的稳定性、性能、弹性的问题,详见文末索引[1][2][3][4]。Apache Celeborn 社区和 Gluten 社区过去一段时间相互配合,成功把 Celeborn 集成进 Gluten,使得 Native Spark 能更好的拥抱 Cloud Native。接下来将介绍 Gluten 如何集成Celeborn。



Gluten + Celeborn

整体设计

Gluten 集成 Celeborn 的设计目标是同时保留 Gluten Columnar Shuffle 和 Celeborn Remote Shuffle 的核心设计,让两者的优势叠加。


上图描述了 Gluten+Celeborn Columnar Shuffle 的整体设计:Shuffle Writer 复用 Native Partitioner,拦截本地 IO 并改为推向 Celeborn 集群;Celeborn 集群做数据重组(聚合相同 Partition 的数据)和多备份;Shuffle  Reader 从特定 Celeborn Worker 上顺序读取数据并反序列化为 Column Batch。这个设计不仅保留了 Gluten Columnar Shuffle 的高性能设计,又充分利用了 Celeborn 远端存储、数据重组和多副本的能力。


具体而言,Gluten 集成 Celeborn 主要在于实现对应的 ShuffleManager,ShuffleWriter 以及ShuffleReader,接下来将分别介绍。


CelebornShuffleManager

CelebornShuffleManager 继承了 Spark ShuffleManager 接口,作为 Gluten 对接 Celeborn 的 ShuffleManager,主要做了以下工作:

  • 向 Celeborn register shuffle,失败则回退到 Gluten 的本地 Columnar Shuffle。
  • 与 Celeborn 集群建立连接并初始化 Celeborn Shuffle Client。
  • 提供 getWriter 方法获取 CelebornShuffleWriter。
  • 提供 getReader 方法获取 CelebornShuffleReader。


CelebornShuffleWriter

CelebornShuffleWriter 与 Gluten Columnar Shuffle 一致,都采用了 Hash-based Shuffle。核心功能是复用 Gluten 中的 Native Partitioner,并将磁盘 IO 操作(Spill,写 Shuffle 文件)替换为推向 Celeborn 集群。主要流程如下:

  • 通过 JNI 向 Native 模块传递 CelebornPartitionPusher,使得 Native 模块可向 Celeborn 集群推送数据。
  • 复用 Native Partitioner 对列式数据进行 Partition,与 Gluten Columnar Shuffle 保持一致。
  • 向 GlutenMemoryConsumer 注册 Spiller,保证在 Spark 监测到内存不足触发 Spill 时,可以通过 Celeborn SDK 把数据推送到 Celeborn 集群,从而避免额外的磁盘 IO。
  • 在 Native 模块,当全部数据完成 Partition 后,将写文件操作替换成通过 Celeborn SDK 推送到 Celeborn 集群。


CelebornShuffleReader

CelebornShuffleReader 跟 Celeborn 集群建立连接,读取 Shuffle 数据。在 Gluten 侧实现 CelebornColumnarBatchSerializer,通过 deserializeStream 方法定制 InputStream 的 deserialize 流程,最后将反序列化的 ColumnarBatch 交给 Gluten 继续处理。从上述两图对比可知,本地 Shuffle 的 Reducer 从多个文件读取数据,而 Celeborn Reducer 只需从一个 Worker 上读取,随机读转换成了顺序读,网络的连接数也从乘数关系变成了线性关系,从而提升了 Shuffle Read 的性能。


性能测试

Celeborn 在磁盘资源受限时有最好的性能表现。我们测试了三组硬件环境:SDD 环境,充分 HDD 环境,有限 HDD 环境。整体结论是:在 SDD 环境, Gluten + Celeborn Columnar Shuffle 性能跟 Gluten 本地 Columnar Shuffle 持平;在充分 HDD 和有限 HDD 环境,Gluten + Celeborn Columnar Shuffle 性能比 Gluten 本地 Columnar Shuffle 分别提升8%12%


充分 HDD 环境

部署方式:Celeborn 集群和 Yarn 集群混部。

硬件环境:1 x Master(64 vCPU, 256 GiB)  5 x worker(40 vCPU, 176 GiB, 15x7300GB HDD)

Spark 版本:3.3.1

Benchmark:3T TPCDS


下图是 Gluten+Celeborn 相比 Gluten 的Top20的加速比:


下图是完整 TPCDS 的时间对比,整体提升8%:



受限 HDD 环境

部署方式:Celeborn 集群和 Yarn 集群混部。

硬件环境:1 x Master(64 vCPU, 256 GiB)  5 x worker(40 vCPU, 176 GiB, 2x7300GB HDD)

Spark 版本:3.3.1

Benchmark:3T TPCDS


下图是 Gluten+Celeborn 相比 Gluten 的Top20的加速比:


下图是完整 TPCDS 的时间对比,整体提升12%:



SSD 环境

最后把磁盘全部换成 SSD,Gluten+Celeborn 在不额外消耗机器资源的情况下,比 Gluten 性能提升 1.2%,性能基本持平。



总结

本篇文章介绍了 Gluten 项目的背景和目标,以及它如何解决基于 Apache Spark 的数据负载场景中的 CPU 计算瓶颈。Gluten 利用 Spark 插件机制,将查询计划发送给本地引擎执行,从而跳过 Spark 本身不够高效的执行路径。该项目支持多个本地引擎作为后端,引入 Columnar Shuffle 设计,并统一管理本地内存和 JVM 内存。此外,Gluten 集成了 Celeborn作为 Remote Shuffle Service,Celeborn 采用了 Push Shuffle 的设计,通过远端存储、数据重组、内存缓存、多副本等设计,不仅进一步提升 Gluten Shuffle 的性能和稳定性,还使得 Gluten 拥有更好的弹性,从而更好的拥抱云原生。


欢迎加入我们的开源项目,并贡献你的代码!我们的项目位于

Gluten: https://github.com/oap-project/gluten

Celeborn: https://github.com/apache/incubator-celeborn


Celeborn 用户交流钉群:41594456


Reference

[1]https://developer.aliyun.com/article/779686

[2]https://developer.aliyun.com/article/857757

[3]https://developer.aliyun.com/article/891951

[4]https://developer.aliyun.com/article/1153123

目录
相关文章
|
存储 分布式计算 Apache
Apache Celeborn 让 Spark 和 Flink 更快更稳更弹性
阿里云/数据湖 Spark 引擎负责人周克勇(一锤)在 Streaming Lakehouse Meetup 的分享。
1037 2
Apache Celeborn 让 Spark 和 Flink 更快更稳更弹性
|
SQL 分布式计算 Cloud Native
杭州 Meetup| Apache Kyuubi & Celeborn,助力 Spark 拥抱云原生
10月14日13:00-17:30,Apache Kyuubi & Celeborn 社区将在杭州举办「Apache Kyuubi & Celeborn (Incubating) 助力 Spark 拥抱云原生」Meetup,欢迎报名参会!
814 0
杭州 Meetup| Apache Kyuubi & Celeborn,助力 Spark 拥抱云原生
|
分布式计算 资源调度 Kubernetes
Apache Kyuubi & Celeborn (Incubating) 助力 Spark 拥抱云原生
网易数帆软件工程师潘成,在 ASF CommunityOverCode Asia 2023(北京)的分享。
737 0
Apache Kyuubi & Celeborn (Incubating) 助力 Spark 拥抱云原生
|
存储 分布式计算 Cloud Native
[实战系列]SelectDB Cloud Spark Connector 最佳实践
Spark SelectDB Connector 以 Spark 这个大数据计算的优秀组件作为核心,实现了利用 Spark 将外部数据源的大数据量同步到 SelectDB Cloud,便于我们实现大批量数据的快速同步,继而利用 SelectDB Cloud 为基石构建新一代的云原生数据仓库,结合 SelectDB Cloud 强大的分析计算性能,能够为企业带来业务便捷性以及增效将本的目标。
152 0
|
17天前
|
分布式计算 大数据 Apache
ClickHouse与大数据生态集成:Spark & Flink 实战
【10月更文挑战第26天】在当今这个数据爆炸的时代,能够高效地处理和分析海量数据成为了企业和组织提升竞争力的关键。作为一款高性能的列式数据库系统,ClickHouse 在大数据分析领域展现出了卓越的能力。然而,为了充分利用ClickHouse的优势,将其与现有的大数据处理框架(如Apache Spark和Apache Flink)进行集成变得尤为重要。本文将从我个人的角度出发,探讨如何通过这些技术的结合,实现对大规模数据的实时处理和分析。
52 2
ClickHouse与大数据生态集成:Spark & Flink 实战
|
1月前
|
存储 分布式计算 算法
大数据-106 Spark Graph X 计算学习 案例:1图的基本计算、2连通图算法、3寻找相同的用户
大数据-106 Spark Graph X 计算学习 案例:1图的基本计算、2连通图算法、3寻找相同的用户
60 0
|
1月前
|
消息中间件 分布式计算 NoSQL
大数据-104 Spark Streaming Kafka Offset Scala实现Redis管理Offset并更新
大数据-104 Spark Streaming Kafka Offset Scala实现Redis管理Offset并更新
40 0
|
1月前
|
消息中间件 存储 分布式计算
大数据-103 Spark Streaming Kafka Offset管理详解 Scala自定义Offset
大数据-103 Spark Streaming Kafka Offset管理详解 Scala自定义Offset
82 0
|
18天前
|
SQL 机器学习/深度学习 分布式计算
Spark快速上手:揭秘大数据处理的高效秘密,让你轻松应对海量数据
【10月更文挑战第25天】本文全面介绍了大数据处理框架 Spark,涵盖其基本概念、安装配置、编程模型及实际应用。Spark 是一个高效的分布式计算平台,支持批处理、实时流处理、SQL 查询和机器学习等任务。通过详细的技术综述和示例代码,帮助读者快速掌握 Spark 的核心技能。
47 6
|
16天前
|
存储 分布式计算 Hadoop
数据湖技术:Hadoop与Spark在大数据处理中的协同作用
【10月更文挑战第27天】在大数据时代,数据湖技术凭借其灵活性和成本效益成为企业存储和分析大规模异构数据的首选。Hadoop和Spark作为数据湖技术的核心组件,通过HDFS存储数据和Spark进行高效计算,实现了数据处理的优化。本文探讨了Hadoop与Spark的最佳实践,包括数据存储、处理、安全和可视化等方面,展示了它们在实际应用中的协同效应。
62 2