摘要:本文整理自阿里云 EMR Spark 团队的周克勇(一锤),在 Spark&DS Meetup 的分享。本篇内容主要分为三个部分:
- 传统 Shuffle 的问题
- Apache Celeborn (Incubating)简介
- Celeborn 在性能、稳定性、弹性上的设计
点击链接查看直播回放:https://developer.aliyun.com/live/251090
一、传统 Shuffle 的问题
Apache Spark 是广为流行的大数据处理引擎,它有很多使用场景: Spark SQL、批处理、流处理、MLLIB、GraphX 等。在所有组件下是统一的 RDD 抽象,RDD 血缘通过两种依赖关系描述,窄依赖和宽依赖。其中宽依赖是支撑复杂算子(Join, Agg 等)的关键,而宽依赖实现机制就是 Shuffle。
传统的 Shuffle 实现如上图中间部分所示,每个 Mapper 对 Shuffle Output 的数据,根据 Partition ID 做排序,然后把排序好的数据和索引写入本地盘。Shuffle Read 阶段,Reducer 从所有 Mapper 的 Shuffle 文件里读取属于自己的 Partition 数据。但这种实现有如下几个缺陷:
- 第一,依赖大容量的本地盘或云盘存储 Shuffle 数据,数据需要驻留直至消费完成。这就限制了存算分离,因为存算分离架构下,计算节点通常不希望有大容量的本地盘,希望计算结束就可以释放节点。
- 第二,Mapper 做排序会占用较大内存,甚至触发堆外排序,引入额外的磁盘 IO。
- 第三,Shuffle Read 有大量的网络连接,逻辑连接数是 m×n。
- 第四,存在大量的随机读盘。假设一个 Mapper 的 Shuffle 数据是 128M,Reducer 的并发是 2000,那么每个文件将会被读 2000 次,每次只随机读 64k,这就很容易达到磁盘 IOPS 的瓶颈。
- 第五,数据单副本,容错性不高。
以上五点缺陷最终导致不够高效、不够稳定以及不够弹性。
二、Apache Celeborn (Incubating)
Apache Celeborn (Incubating)是我们团队早期为了解决上述问题开发的 Remote Shuffle Service,已经于2022 年 10 月捐赠给了 Apache 基金会。Celeborn 的定位是大数据引擎统一中间数据服务,它是引擎无关的,并且除了支持 Shuffle,未来还会支持 Spilled data,这样计算节点就能真正解除对大容量本地盘的依赖。
在正式介绍 Celeborn 设计之前,简单介绍一些历史。Celeborn 最早诞生于 2020 年,当时的名字是 Remote Shuffle Service,主要为了满足客户需求,2021 年 12 月正式对外开源。开源之后我们吸引了来自小米、Shopee、网易等开发者共建,其中很多人已经成为了核心贡献者。
2022 年 10 月正式进入 Apache 孵化器,截至目前我们积累了 600+的 commits, 32 个 contributor,330+的 star,也希望更多感兴趣的开发者参与共建。
三、Celeborn 的性能、稳定性、弹性
Celeborn 针对性能提升的设计,主要包括核心设计、如何对接 Spark AQE、列式 Shuffle、多层存储。
1. 性能
Celeborn 采用了 Push Shuffle + Partition 数据聚合的核心设计。简单来讲,每个 Mapper 的内部维护一个 Buffer 来缓存 Shuffle 数据,当 Buffer 超过阈值之后会触发推送,Mapper 把属于同一个 Partition 的数据推给预先分配好的 Worker。
如上图所示,Partition1 和 Partition2 的数据推给 Worker1,Partition3 的数据推给 Worker2,每个 Partition 最终会生成一个文件。在 Shuffle Read 阶段 Reducer 只需从一个 Worker 上读取属于自己的数据。在这个设计下 Shuffle 数据不落盘,也不需要做排序。同时 Shuffle Read 从随机读转换成了顺序读,网络的连接数也从乘数关系变成了线性关系。这就解决了传统 Shuffle 的主要缺陷。
Partition 切分的设计动机是,对于大作业或者存在数据倾斜的数据,一个 Partition 的文件会变得非常大。我们遇到单 Partition 超过百 G 的情况,很容易把磁盘打爆,也会导致磁盘负载不均衡。
针对这种情况,Celeborn 实现了 Partition 切分。具体来讲,Worker 会动态监测每个 Partition 文件的大小,当超过阈值的时候会返回给 Client 一个 Split 标记。Client 收到 Split 标记后,会异步申请新的 Worker,等新的 Worker Ready 后,Client 会往新的 Worker 推送数据。这样就可以保证单个 Partition 的 Split 文件不会过大,在Shuffle Read 的时候会读取这两个 Split 文件。
接下来介绍 Celeborn 如何支持 Spark AQE。AQE 是近几年 Apache Spark 最重要的优化,它主要有三个场景,Partition 合并、Join Strategy 切换、Skew Join 优化。AQE 对 Shuffle 模块的要求是要能够按 Partition 的范围和 Mapper 的范围去读,按 Partition 的范围读会比较自然,如上图中右上角所示,Reducer1 直接读取 Partition2、3、4 的数据。
而根据 Mapper 的范围读,实现起来稍微复杂,可以分为以下三个步骤:
- 第一步,Split 切分。Skew Join 意味着数据有倾斜,大概率会触发 Partition 切分,例如 Partition1 切分成了 Split1 和 Split2。
- 第二步,Sort On Read。在首次 Read 某个 Partition Split 文件的时候,会触发 Sort On Read,Worker 会根据 Partition ID 对这个文件做排序。排序之后,Mapper 的范围读就会从排序之前的随机读变成顺序读。比如我要读 Mapper1 到 Mapper2 的数据,如果是排序之前的文件,我需要对这个文件 seek 四次,但如果是排序之后我只需要 seek 一次。
- 第三步,Range Read。Sub Reducer 从这两个 Partition 里顺序读取属于自己的 Mapper 范围的数据。同时,Split 文件会记录自己的 Mapper 列表,这样就可以裁剪掉不必要的 Split 文件。
接下来介绍 Celeborn 的列式 Shuffle。众所周知,行存和列存是两种常见的数据布局方式。列存的好处是相同类型的数据放在一起,易于编码,如字典编码、行程编码、Delta 编码、前缀编码等,可以非常大程度降低数据量。以往列存主要用于存储源表数据,而计算引擎算子内的中间数据大多用行存,因为以往算子的实现大多基于行存数据。
但近几年向量化引擎越来越流行,包括 Velox、ClickHouse、DuckDB 等,他们都使用了向量化的算子实现,因此算子的中间数据也使用了列存。虽然 Databricks 的 photon 引擎使用了向量化技术,但 Apache Spark 依然是基于行存的引擎。
为了在 Apache Spark 中实现列式 Shuffle,Celeborn 引入了行列转换和代码生成,在 Shuffle Write 的时候把行存的数据转化成列存,在 Shuffle Read 的时候把列转化为行存。同时为了提升行列转换的效率,Celeborn 引入了代码生成的技术来消除解释执行的开销。在 3T TPCDS 的测试中开启列式 Shuffle 后,整体的 Shuffle Size 可以减少 40%,行列转换的开销低于 5%。
接下来介绍 Celeborn 的多层存储。多层存储的设计目标是让 Celeborn 能够灵活适配多种硬件环境,并尽可能让数据存放在更快的存储层。Celeborn 定义了三种存储介质:内存、本地盘、分布式存储(OSS/HDFS)。用户可以任意选择 1-3 种存储,比如可以只用本地盘,也可以只用内存和 OSS。
上图展示了同时选择三种介质的存储机制,首先内存会被划分为两个逻辑区域,Push Data Region 和 Cache Region。Map 推送的数据会先落在 Push Data Region,当某个 Partition 的数据超过预设阈值会触发 Flush,这个时候 Celeborn 会去判断 Partition 的目标存储层,如果是本地盘(P3),这部分数据将被刷到本地;如果是内存 Cache(p4),这部分数据会被逻辑划分给 Cache Region(不会有真正的内存拷贝)。
当 Cache Region 满了时,Celeborn 会把最大的 Partition Evict 到下一层存储,例如 P4 会被刷到本地盘。一旦某个 Partition 的数据被刷盘,它后续的数据将不会被移到 Cache Region。
当本地盘满了时,我们有两种策略,第一种是把本地文件 Evict 到 OSS。第二种不用动本地文件,数据直接从内存 Flush 到 OSS。
多层存储既可以通过内存提升小 Shuffle 的性能,也可以利用 OSS 的海量存储空间,支持超大的 Shuffle,还还可以让 Celeborn 不依赖本地盘,比如只选择内存和 OSS,那么 Celeborn 就没有本地盘,这样就可以更好的对 Celeborn 服务本身实现弹性。
2. 稳定性
Celeborn 针对服务本身稳定性的设计,主要包括绍原地升级、拥塞控制、负载均衡。
首先介绍原地升级。可用性是服务必须满足的要求,蓝绿切换的方式虽然可以满足大部分场景,但需要较多人工介入和临时资源扩张。Celeborn 通过协议向前兼容和优雅重启实现了应用无感的原地升级。向前兼容我们通过协议的 PB 化实现,而优雅重启我们利用了 Partition 主动切分的特性,上图展示了优雅重启的过程。
首先,外部系统触发优雅重启,Worker 收到信号后,把自己标记为 graceful shutdown 状态,并上报给Master,此后 Master 将不会向 Worker 分配新的 slots。然后 Worker 给 PushData 的返回打上 HardSplit 的标,Client 收到这个标记后将不会继续向这个 Worker 推送数据,同时向该 Worker 会发起 CommitFile 的消息,当 Worker 上所有缓存在内存中的 Partition 数据完成 CommitFile 后,Worker 会把内存的状态序列化并存到本地的 LevelDB,然后重启。之后从 LevelDB 里读取并恢复状态,最后向Master重新注册。
从这个流程我们可以看到,由于有主动 Split 机制的存在,Celeborn 的优雅重启比起其他系统要更加高效,基本上可以在秒级别完成,且完全不影响作业运行。
接下来介绍 Celeborn 在 Shuffle Write 阶段的拥塞控制。为了避免瞬时的大作业把 Worker 内存打爆,Celeborn 参考了 TCP 的拥塞控制机制,包括慢启动、拥塞避免、拥塞控制三个环节。
Pusher 初始的时候处于慢启动状态,推送数据的速率很慢,但这个速率会以指数级上涨,当它到达某个阈值后会进入拥塞避免阶段。这时推送速率的上涨速度会变慢,变成固定的斜率。而这时如果 Worker 内存达到警戒线,会触发拥塞控制,给每个 Client 发一条标记。Client 收到之后会回到一开始的慢启动状态,Pusher 的速率也相应降到非常低。
流量控制的另一种常见设计是 Credit Based 的流控,简单来说就是每当我推送数据之前,要先向 Worker 拿到一定的 Credit,这意味着 Worker 会为我预留一部分内存,我只能推送不超过我手里的 Credit 的数据。这种机制可以保证对内存的精准控制,但它的 Tradeoff 是增加了控制流,对性能有一定的影响。
Celeborn 在 Shuffle Write 阶段采用的类 TCP 的拥塞控制,能同时兼顾瞬时流量的高峰和稳定态的性能。同时,Celeborn 在支持 Flink 的 Shuffle Read 阶段采用了 Credit Based 的设计。
接下来介绍 Celeborn 的负载均衡设计。当前 Celeborn 关注的负载均衡主要集中在磁盘,设计目标是隔离坏盘,并尽量把负载分配给更快的、空间更足的盘。具体来说,Worker 会监控本地每块可用盘的状态,包括健康度、刷盘速率、预测未来的用量,这些状态信息随心跳发给 Master。Master 维护了整个集群所有可用盘的状态信息,并根据某个算法模型对磁盘进行分组。级别高的组会分配更多的工作负载,如果属于同一个组,会尽量分配给可用容量更大的盘。Celeborn 这种负载均衡的设计在异构环境下有更稳定的表现。
3. 弹性
Celeborn 针对弹性的设计,主要包括 Spark on K8s + Celeborn 方案。
但 Spark on K8s 场景不存在 ESS,为了服务后续的 Shuffle Read,Pod 即使处于空闲状态也无法释放。开源方案为了优化这个场景,加了一个参数 spark.dynamicAllocation.shuffleTracking.enabled,通过跟踪 Shuffle 文件是否被读取来决定是否释放。但根据我们的测试,这个参数的效果有限。集成 Celeborn 之后,Shuffle 数据托管给 Celeborn 集群,Pod 就可以在空闲后立即释放,从而做到真正的弹性。
4. 典型场景
Celeborn 有以下三种典型的场景。
- 第一种是完全混部。也就是 HDFS、Yarn、Celeborn 分布在同一个集群,它的主要收益是可以提升性能和稳定性。
- 第二种是 Celeborn 独立部署,HDFS 和 Yarn 混部。它除了能提升性能和稳定性,还能隔离源表数据的 IO 和 Shuffle 数据的 IO 对磁盘的抢占,提供了一定的资源隔离,以及 Celeborn 集群的部分弹性。
- 第三种是存算分离。源表的数据存在对象存储,计算节点运行在 K8s 或者 Yarn 集群,Celeborn 的集群也独立部署,这种场景下计算集群和 Celeborn 集群都可以享用完整的弹性。
5. Evaluation
接下来分享两个案例,第一个是混部的案例。一位用户把 Celeborn 混部在计算集群中,Celeborn 部署的整体规模达到 1000 台以上,但每个 Worker 给的资源比较有限。
这位用户每天的 Shuffle 数据量在经过压缩后可以达到 4PB,对大数据稳定性的提升也非常的明显。从上图可以看到,存在 8 万多并发,单个 Shuffle 有 16T 规模的作业,在 HDD 环境下也可以稳定的运行,在上 Celeborn 之前这个作业是跑不过的。
第二个是一个存算分离的案例。一位用户采用了完全存算分离的架构,它的计算节点跑在 K8s 上,源表数据存在OSS,Celeborn 集群独立部署。他们的计算节点每天 Pod 的数量有好几万,默认开启 Spark 的动态资源伸缩功能,有非常好的弹性,除此之外,性能和稳定性也有显著提升。
上图是我们在标准测试集 TPCDS 3T 的混部环境的测试结果。Celeborn 在不额外消耗机器资源的情况下,单副本比 External Shuffle Service 性能提升 20%,双副本有 13% 的提升。
点击链接查看直播回放:https://developer.aliyun.com/live/251090