Spark+Celeborn:更快,更稳,更弹性

本文涉及的产品
EMR Serverless StarRocks,5000CU*H 48000GB*H
简介: 本文整理自阿里云 EMR Spark 团队的周克勇(一锤),在 Spark&DS Meetup 的分享。


摘要:本文整理自阿里云 EMR Spark 团队的周克勇(一锤),在 Spark&DS Meetup 的分享。本篇内容主要分为三个部分:


  1. 传统 Shuffle 的问题
  2. Apache Celeborn (Incubating)简介
  3. Celeborn 在性能、稳定性、弹性上的设计


点击查看直播回放


一、传统 Shuffle 的问题

1.jpg

Apache Spark 是广为流行的大数据处理引擎,它有很多使用场景: Spark SQL、批处理、流处理、MLLIB、GraphX 等。在所有组件下是统一的 RDD 抽象,RDD 血缘通过两种依赖关系描述,窄依赖和宽依赖。其中宽依赖是支撑复杂算子(Join, Agg 等)的关键,而宽依赖实现机制就是 Shuffle。


传统的 Shuffle 实现如上图中间部分所示,每个 Mapper 对 Shuffle Output 的数据,根据 Partition ID 做排序,然后把排序好的数据和索引写入本地盘。Shuffle Read 阶段,Reducer 从所有 Mapper 的 Shuffle 文件里读取属于自己的 Partition 数据。但这种实现有如下几个缺陷:


  • 第一,依赖大容量的本地盘或云盘存储 Shuffle 数据,数据需要驻留直至消费完成。这就限制了存算分离,因为存算分离架构下,计算节点通常不希望有大容量的本地盘,希望计算结束就可以释放节点。
  • 第二,Mapper 做排序会占用较大内存,甚至触发堆外排序,引入额外的磁盘 IO。
  • 第三,Shuffle Read 有大量的网络连接,逻辑连接数是 m×n。
  • 第四,存在大量的随机读盘。假设一个 Mapper 的 Shuffle 数据是 128M,Reducer 的并发是 2000,那么每个文件将会被读 2000 次,每次只随机读 64k,这就很容易达到磁盘 IOPS 的瓶颈。
  • 第五,数据单副本,容错性不高。


以上五点缺陷最终导致不够高效、不够稳定以及不够弹性。


二、Apache Celeborn (Incubating)

2.jpg

Apache Celeborn (Incubating)是我们团队早期为了解决上述问题开发的 Remote Shuffle Service,已经于2022 年 10 月捐赠给了 Apache 基金会。Celeborn 的定位是大数据引擎统一中间数据服务,它是引擎无关的,并且除了支持 Shuffle,未来还会支持 Spilled data,这样计算节点就能真正解除对大容量本地盘的依赖。


在正式介绍 Celeborn 设计之前,简单介绍一些历史。Celeborn 最早诞生于 2020 年,当时的名字是 Remote Shuffle Service,主要为了满足客户需求,2021 年 12 月正式对外开源。开源之后我们吸引了来自小米、Shopee、网易等开发者共建,其中很多人已经成为了核心贡献者。


2022 年 10 月正式进入 Apache 孵化器,截至目前我们积累了 600+的 commits, 32 个 contributor,330+的 star,也希望更多感兴趣的开发者参与共建。


三、Celeborn 的性能、稳定性、弹性


Celeborn 针对性能提升的设计,主要包括核心设计、如何对接 Spark AQE、列式 Shuffle、多层存储。


1. 性能

3.jpg

Celeborn 采用了 Push Shuffle + Partition 数据聚合的核心设计。简单来讲,每个 Mapper 的内部维护一个 Buffer 来缓存 Shuffle 数据,当 Buffer 超过阈值之后会触发推送,Mapper 把属于同一个 Partition 的数据推给预先分配好的 Worker。


如上图所示,Partition1 和 Partition2 的数据推给 Worker1,Partition3 的数据推给 Worker2,每个 Partition 最终会生成一个文件。在 Shuffle Read 阶段 Reducer 只需从一个 Worker 上读取属于自己的数据。在这个设计下 Shuffle 数据不落盘,也不需要做排序。同时 Shuffle Read 从随机读转换成了顺序读,网络的连接数也从乘数关系变成了线性关系。这就解决了传统 Shuffle 的主要缺陷。


Partition 切分的设计动机是,对于大作业或者存在数据倾斜的数据,一个 Partition 的文件会变得非常大。我们遇到单 Partition 超过百 G 的情况,很容易把磁盘打爆,也会导致磁盘负载不均衡。


针对这种情况,Celeborn 实现了 Partition 切分。具体来讲,Worker 会动态监测每个 Partition 文件的大小,当超过阈值的时候会返回给 Client 一个 Split 标记。Client 收到 Split 标记后,会异步申请新的 Worker,等新的 Worker Ready 后,Client 会往新的 Worker 推送数据。这样就可以保证单个 Partition 的 Split 文件不会过大,在Shuffle Read 的时候会读取这两个 Split 文件。

4.jpg

接下来介绍 Celeborn 如何支持 Spark AQE。AQE 是近几年 Apache Spark 最重要的优化,它主要有三个场景,Partition 合并、Join Strategy 切换、Skew Join 优化。AQE 对 Shuffle 模块的要求是要能够按 Partition 的范围和 Mapper 的范围去读,按 Partition 的范围读会比较自然,如上图中右上角所示,Reducer1 直接读取 Partition2、3、4 的数据。


而根据 Mapper 的范围读,实现起来稍微复杂,可以分为以下三个步骤:


  • 第一步,Split 切分。Skew Join 意味着数据有倾斜,大概率会触发 Partition 切分,例如 Partition1 切分成了 Split1 和 Split2。
  • 第二步,Sort On Read。在首次 Read 某个 Partition Split 文件的时候,会触发 Sort On Read,Worker 会根据 Partition ID 对这个文件做排序。排序之后,Mapper 的范围读就会从排序之前的随机读变成顺序读。比如我要读 Mapper1 到 Mapper2 的数据,如果是排序之前的文件,我需要对这个文件 seek 四次,但如果是排序之后我只需要 seek 一次。
  • 第三步,Range Read。Sub Reducer 从这两个 Partition 里顺序读取属于自己的 Mapper 范围的数据。同时,Split 文件会记录自己的 Mapper 列表,这样就可以裁剪掉不必要的 Split 文件。

5.jpg

接下来介绍 Celeborn 的列式 Shuffle。众所周知,行存和列存是两种常见的数据布局方式。列存的好处是相同类型的数据放在一起,易于编码,如字典编码、行程编码、Delta 编码、前缀编码等,可以非常大程度降低数据量。以往列存主要用于存储源表数据,而计算引擎算子内的中间数据大多用行存,因为以往算子的实现大多基于行存数据。


但近几年向量化引擎越来越流行,包括 Velox、ClickHouse、DuckDB 等,他们都使用了向量化的算子实现,因此算子的中间数据也使用了列存。虽然 Databricks 的 photon 引擎使用了向量化技术,但 Apache Spark 依然是基于行存的引擎。


为了在 Apache Spark 中实现列式 Shuffle,Celeborn 引入了行列转换和代码生成,在 Shuffle Write 的时候把行存的数据转化成列存,在 Shuffle Read 的时候把列转化为行存。同时为了提升行列转换的效率,Celeborn 引入了代码生成的技术来消除解释执行的开销。在 3T TPCDS 的测试中开启列式 Shuffle 后,整体的 Shuffle Size 可以减少 40%,行列转换的开销低于 5%。


6.jpg


接下来介绍 Celeborn 的多层存储。多层存储的设计目标是让 Celeborn 能够灵活适配多种硬件环境,并尽可能让数据存放在更快的存储层。Celeborn 定义了三种存储介质:内存、本地盘、分布式存储(OSS/HDFS)。用户可以任意选择 1-3 种存储,比如可以只用本地盘,也可以只用内存和 OSS。


上图展示了同时选择三种介质的存储机制,首先内存会被划分为两个逻辑区域,Push Data Region 和 Cache Region。Map 推送的数据会先落在 Push Data Region,当某个 Partition 的数据超过预设阈值会触发 Flush,这个时候 Celeborn 会去判断 Partition 的目标存储层,如果是本地盘(P3),这部分数据将被刷到本地;如果是内存 Cache(p4),这部分数据会被逻辑划分给 Cache Region(不会有真正的内存拷贝)。


当 Cache Region 满了时,Celeborn 会把最大的 Partition Evict 到下一层存储,例如 P4 会被刷到本地盘。一旦某个 Partition 的数据被刷盘,它后续的数据将不会被移到 Cache Region。


当本地盘满了时,我们有两种策略,第一种是把本地文件 Evict 到 OSS。第二种不用动本地文件,数据直接从内存 Flush 到 OSS。


多层存储既可以通过内存提升小 Shuffle 的性能,也可以利用 OSS 的海量存储空间,支持超大的 Shuffle,还还可以让 Celeborn 不依赖本地盘,比如只选择内存和 OSS,那么 Celeborn 就没有本地盘,这样就可以更好的对 Celeborn 服务本身实现弹性。


2. 稳定性


Celeborn 针对服务本身稳定性的设计,主要包括绍原地升级、拥塞控制、负载均衡。


7.jpg


首先介绍原地升级。可用性是服务必须满足的要求,蓝绿切换的方式虽然可以满足大部分场景,但需要较多人工介入和临时资源扩张。Celeborn 通过协议向前兼容和优雅重启实现了应用无感的原地升级。向前兼容我们通过协议的 PB 化实现,而优雅重启我们利用了 Partition 主动切分的特性,上图展示了优雅重启的过程。


首先,外部系统触发优雅重启,Worker 收到信号后,把自己标记为 graceful shutdown 状态,并上报给Master,此后 Master 将不会向 Worker 分配新的 slots。然后 Worker 给 PushData 的返回打上 HardSplit 的标,Client 收到这个标记后将不会继续向这个 Worker 推送数据,同时向该 Worker 会发起 CommitFile 的消息,当 Worker 上所有缓存在内存中的 Partition 数据完成 CommitFile 后,Worker 会把内存的状态序列化并存到本地的 LevelDB,然后重启。之后从 LevelDB 里读取并恢复状态,最后向Master重新注册。


从这个流程我们可以看到,由于有主动 Split 机制的存在,Celeborn 的优雅重启比起其他系统要更加高效,基本上可以在秒级别完成,且完全不影响作业运行。

8.jpg

接下来介绍 Celeborn 在 Shuffle Write 阶段的拥塞控制。为了避免瞬时的大作业把 Worker 内存打爆,Celeborn 参考了 TCP 的拥塞控制机制,包括慢启动、拥塞避免、拥塞控制三个环节。


Pusher 初始的时候处于慢启动状态,推送数据的速率很慢,但这个速率会以指数级上涨,当它到达某个阈值后会进入拥塞避免阶段。这时推送速率的上涨速度会变慢,变成固定的斜率。而这时如果 Worker 内存达到警戒线,会触发拥塞控制,给每个 Client 发一条标记。Client 收到之后会回到一开始的慢启动状态,Pusher 的速率也相应降到非常低。


流量控制的另一种常见设计是 Credit Based 的流控,简单来说就是每当我推送数据之前,要先向 Worker 拿到一定的 Credit,这意味着 Worker 会为我预留一部分内存,我只能推送不超过我手里的 Credit 的数据。这种机制可以保证对内存的精准控制,但它的 Tradeoff 是增加了控制流,对性能有一定的影响。


Celeborn 在 Shuffle Write 阶段采用的类 TCP 的拥塞控制,能同时兼顾瞬时流量的高峰和稳定态的性能。同时,Celeborn 在支持 Flink 的 Shuffle Read 阶段采用了 Credit Based 的设计。


9.jpg


接下来介绍 Celeborn 的负载均衡设计。当前 Celeborn 关注的负载均衡主要集中在磁盘,设计目标是隔离坏盘,并尽量把负载分配给更快的、空间更足的盘。具体来说,Worker 会监控本地每块可用盘的状态,包括健康度、刷盘速率、预测未来的用量,这些状态信息随心跳发给 Master。Master 维护了整个集群所有可用盘的状态信息,并根据某个算法模型对磁盘进行分组。级别高的组会分配更多的工作负载,如果属于同一个组,会尽量分配给可用容量更大的盘。Celeborn 这种负载均衡的设计在异构环境下有更稳定的表现。


3. 弹性


Celeborn 针对弹性的设计,主要包括 Spark on K8s + Celeborn 方案。


10.jpg

在 Yarn 的场景,External Shuffle Service 是 Spark 开启动态资源伸缩的前提,Shuffle 数据托管给 ESS 后,Executor 就可以释放。


但 Spark on K8s 场景不存在 ESS,为了服务后续的 Shuffle Read,Pod 即使处于空闲状态也无法释放。开源方案为了优化这个场景,加了一个参数 spark.dynamicAllocation.shuffleTracking.enabled,通过跟踪 Shuffle 文件是否被读取来决定是否释放。但根据我们的测试,这个参数的效果有限。集成 Celeborn 之后,Shuffle 数据托管给 Celeborn 集群,Pod 就可以在空闲后立即释放,从而做到真正的弹性。


4. 典型场景

11.jpg

Celeborn 有以下三种典型的场景。


  • 第一种是完全混部。也就是 HDFS、Yarn、Celeborn 分布在同一个集群,它的主要收益是可以提升性能和稳定性。
  • 第二种是 Celeborn 独立部署,HDFS 和 Yarn 混部。它除了能提升性能和稳定性,还能隔离源表数据的 IO 和 Shuffle 数据的 IO 对磁盘的抢占,提供了一定的资源隔离,以及 Celeborn 集群的部分弹性。
  • 第三种是存算分离。源表的数据存在对象存储,计算节点运行在 K8s 或者 Yarn 集群,Celeborn 的集群也独立部署,这种场景下计算集群和 Celeborn 集群都可以享用完整的弹性。


5. Evaluation

12.jpg

接下来分享两个案例,第一个是混部的案例。一位用户把 Celeborn 混部在计算集群中,Celeborn 部署的整体规模达到 1000 台以上,但每个 Worker 给的资源比较有限。


这位用户每天的 Shuffle 数据量在经过压缩后可以达到 4PB,对大数据稳定性的提升也非常的明显。从上图可以看到,存在 8 万多并发,单个 Shuffle 有 16T 规模的作业,在 HDD 环境下也可以稳定的运行,在上 Celeborn 之前这个作业是跑不过的。


13.jpg

第二个是一个存算分离的案例。一位用户采用了完全存算分离的架构,它的计算节点跑在 K8s 上,源表数据存在OSS,Celeborn 集群独立部署。他们的计算节点每天 Pod 的数量有好几万,默认开启 Spark 的动态资源伸缩功能,有非常好的弹性,除此之外,性能和稳定性也有显著提升。

14.jpg


上图是我们在标准测试集 TPCDS 3T 的混部环境的测试结果。Celeborn 在不额外消耗机器资源的情况下,单副本比 External Shuffle Service 性能提升 20%,双副本有 13% 的提升。



Celeborn 用户交流钉群

lADPJwKt3Ydp9mvNBWXNBD4_1086_1381.jpg

相关实践学习
基于EMR Serverless StarRocks一键玩转世界杯
基于StarRocks构建极速统一OLAP平台
快速掌握阿里云 E-MapReduce
E-MapReduce 是构建于阿里云 ECS 弹性虚拟机之上,利用开源大数据生态系统,包括 Hadoop、Spark、HBase,为用户提供集群、作业、数据等管理的一站式大数据处理分析服务。 本课程主要介绍阿里云 E-MapReduce 的使用方法。
目录
相关文章
|
存储 分布式计算 Apache
Apache Celeborn 让 Spark 和 Flink 更快更稳更弹性
阿里云/数据湖 Spark 引擎负责人周克勇(一锤)在 Streaming Lakehouse Meetup 的分享。
1037 2
Apache Celeborn 让 Spark 和 Flink 更快更稳更弹性
|
存储 分布式计算 Kubernetes
Spark+Celeborn:更快,更稳,更弹性
本文整理自阿里云 EMR Spark 团队的周克勇(一锤),在 Spark&DS Meetup 的分享。
|
存储 弹性计算 分布式计算
Hadoop集群伸缩难?DLA Spark助力集群快速加弹性
由于历史原因,很多用户的业务运行在用户自建的Hadoop集群上,随着业务的发展会遇到扩容难,缩容慢,弹不出等问题。DLA团队将Serverless、云原生、Spark技术优势深度整合到一起,提供Serverless Spark产品,可以无缝连接用户Hadoop集群,快捷稳定地为传统Hadoop集群增加弹性算力
|
存储 缓存 分布式计算
Serverless Spark的弹性利器 - EMR Shuffle Service
在传统计算存储混合的架构中,为了兼顾计算和存储,CPU和存储设备都不能太差,因此牺牲了灵活性,提高了成本。在计算存储分离架构中,可以独立配置计算机型和存储机型,具有极大的灵活性,从而降低成本。
Serverless Spark的弹性利器 - EMR Shuffle Service
|
分布式计算 Java Hadoop
[Spark]Spark RDD 指南三 弹性分布式数据集(RDD)
Spark2.3.0 版本: Spark2.3.0 创建RDD Spark的核心概念是弹性分布式数据集(RDD),RDD是一个可容错、可并行操作的分布式元素集合。
1459 0
|
18天前
|
分布式计算 大数据 Apache
ClickHouse与大数据生态集成:Spark & Flink 实战
【10月更文挑战第26天】在当今这个数据爆炸的时代,能够高效地处理和分析海量数据成为了企业和组织提升竞争力的关键。作为一款高性能的列式数据库系统,ClickHouse 在大数据分析领域展现出了卓越的能力。然而,为了充分利用ClickHouse的优势,将其与现有的大数据处理框架(如Apache Spark和Apache Flink)进行集成变得尤为重要。本文将从我个人的角度出发,探讨如何通过这些技术的结合,实现对大规模数据的实时处理和分析。
53 2
ClickHouse与大数据生态集成:Spark & Flink 实战
|
1月前
|
存储 分布式计算 算法
大数据-106 Spark Graph X 计算学习 案例:1图的基本计算、2连通图算法、3寻找相同的用户
大数据-106 Spark Graph X 计算学习 案例:1图的基本计算、2连通图算法、3寻找相同的用户
60 0
|
1月前
|
消息中间件 分布式计算 NoSQL
大数据-104 Spark Streaming Kafka Offset Scala实现Redis管理Offset并更新
大数据-104 Spark Streaming Kafka Offset Scala实现Redis管理Offset并更新
40 0
|
1月前
|
消息中间件 存储 分布式计算
大数据-103 Spark Streaming Kafka Offset管理详解 Scala自定义Offset
大数据-103 Spark Streaming Kafka Offset管理详解 Scala自定义Offset
82 0