作者| 熊佳树( 履霜)
我们非常高兴的宣布 Apache Celeborn(Inclubating)[1] 正式支持 Flink,Celeborn 于去年 12 月份正式进入 Apache 软件基金会 (ASF) 的孵化器,一直致力打造统一的中间数据服务,助力引擎全方位提升性能、稳定性和弹性,最新发布的 0.3.0 版本新增对 Flink 批作业 Shuffle 的支持,从此 Flink、Spark 可以同时使用统一的数据 Shuffle 服务,更大程度节省资源、降低运维成本。
Celeborn 目前已支持 Flink 大部分版本,设计上 Celeborn 强调与 Flink 在内存管理、调度策略、流控机制、Metrics 等方面全面融合,确保引入 Celeborn 不会导致 Flink 在现有机制上回退,同时能够复用 Celeborn Master HA、多层存储、优雅升级等能力,在弹性、稳定性和性能上等多方面获得收益。
一、为什么需要 Apache Celeborn
Flink、Spark 作为流批一体的大数据计算引擎,Shuffle 是影响计算性能的关键阶段,同时越来越多的用户选择计算存储分离的架构,并将引擎部署在 K8s 集群上,而存算分离架构下计算节点 Local 磁盘不可能很大,另外 Flink、Spark 引擎还提供了根据资源量进行动态伸缩的 Adaptive Scheduler 的能力, 这都要求计算节点能够将中间的 Shuffle 数据及时的卸载到外部存储服务上,以提高资源的利用效率,所以非常有必要使用独立的 Shuffle 服务。
同时 Celeborn 支持多种高效数据 Shuffle 方式,适配多种部署模式,其具备的 HA 架构、优雅下线等能力,也使得 Celeborn 自身具备弹性。所以引入 Apache Celeborn 这样独立的 ShuffleService,是做到真正的资源弹性、提升稳定性和资源效率必经之路。接下来我们重点介绍 Celeborn 如何支持 Flink 及 Celeborn 的重要特性。
二、Celeborn支持引擎及特性一览
支持 Flink 1.14 及以上版本,支持 Flink Adaptive Batch Job Scheduler
支持 Spark 2.4/Spark 3.x,支持 Spark AQE
- 稳定性 - Flink
- 内存管理:融合 Flink 内存管理机制、支持类似 Flink Credit-based 流量控制机制
- 容忍 JM、TM 重启恢复
- 支持负载均衡、连接复用、心跳机制
- 支持 Celeborn Worker 故障后,批量 Reproduce 数据
支持多种部署模式:支持 Kubernetes 以及 Standalone 环境下部署
高可用性:Celeborn Master 支持 HA,支持故障自动切换
滚动升级:Celeborn 集群支持 Master/Worker 滚动升级
Shuffle 机制:支持 MapPartition/ReducePartition 两种数据 Shuffle 机制,未来在支持 Flink 上将融合两种机制
性能: MapPartition 支持类似 Flink 的 IO 调度机制
多层存储支持 SSD/HDD/HDFS 多层存储
三、支持 Flink 关键设计和重要特性说明
3.1 内存稳定性及协议优化
Celeborn 致力于服务多引擎成为统一的 Shuffle 数据服务,在设计上 Celeborn 通过增强框架和协议的扩展性,采用插件化的方式支持多引擎,这样大大提高了组件的复用性和降低了 Celeborn 的复杂性,但相比于 Spark 而言如何在 Flink 严格的内存管理模型之下支持 Flink 是 Celeborn 一个关键挑战。因为 Celeborn 出于统一的目的复用了之前所有的接口及协议,所以无法在网络栈上与 Flink 统一,这导致 Celeborn 并不能直接使用 Flink 的 NetworkBuffer。所以为了尽可能的使用受管理的内存避免 OOM,提高系统稳定性,Celeborn 在数据读写过程中做了多种优化:
- 在写出数据时,对持有数据的 Flink 的 NettyBuffer 进行 Wrapper,实现了零拷贝数据传输,发送即释放
- 在读取数据时,Celeborn 在 FlinkPluginClient 中实现了可以直接在数据读取时使用 Flink Buffer 的 Decoder,这样数据的写出、读取使用内存都是受 FlinkMemory 管理。这一点保持了与原生 Flink 内存模型一致。避免用户在采用了 Celeborn 之后对于作业参数的修改和可能导致的内存稳定性问题。
- 支持 Credit-based 的流控机制: 为了进一步提升 Flink 侧的稳定性,Celeborn 在读数据时也引入了类似于 Flink 的 Credit-based 流量控制机制 [2],即只有在数据接收端(TaskManager)有足够缓冲区来接收数据时,数据发送端(Celeborn Worker)才会发送这些数据。而数据接收端在不断处理数据的过程中,也会将释放的缓冲区(Credit)反馈给发送端继续发送新的数据,而写数据则完全复用了 Celeborn 原有高效的多层存储实现。
3.2 MapPartition、ReducePartition 双 Shuffle 机制
Celeborn 支持两种数据的 Shuffle 方式,其中 MapPartition 是指 Partition 数据由同一个上游 Map Task 的写入,由下游多个 Reduce Task 读取,而 ReducePartition 则是由多个上游 Map Task 将属于同一个 Partition 数据推给同一个 Worker 做聚合,下游由 Reduce Task 直接读取,两种方式各有优劣:
- ReducePartition 重算代价高,通常情况下,上游所有 Mapper 都会往某个 Reduce Partition 文件推送数据,当文件丢失时需要重算上游所有的 Task。尽管 Celeborn 的多副本机制可以降低数据丢失的概率,但可能性依然存在,MapPartition 利于容错恢复,出错重跑对应 Map Task 即可。
- ReducePartition 能够将随机读转换为顺序读,所以 Reduce Task 在 Shuffle Read 时的网络效率和磁盘 IO 效率都能大幅提高,而 Map Partition 更灵活可以支持各种类型的 Shuffle 边。
为了更好的支持 Flink(新引入 Rescale 和 Forward 两种 Shuffle 类型),以及满足更小的重算代价,Celeborn0.3.0支持了 MapPartition 的 Shuffle 类型。在当前的版本 Celeborn 采用了 MapPartition 支持 Flink,ReducePartition 支持 Spark,不过在未来的版本中将考虑结合 Flink 边实现动态切换 Shuffle 机制,以达到最佳的性能。
3.3 MapPartition 数据读写与优化
根据 Flink 当前 Shuffle、调度及容错的特点,MapPartition 的方式也采用了目前 Flink 的 Sort-Shuffle 实现,即计算任务的输出数据在输出前对数据进行排序 ,排序后的数据追加写出到 CelebornWorker 的同一个文件中,而在数据读取的过程中,增加对数据读取请求的调度,始终按照文件的偏移顺序读取数据,满足读取请求,在最优的情况下可以实现数据的完全顺序读取。下图展示了数据文件的存储结构与 IO 调度流程。
3.4 面向多引擎的 Celeborn
根据上文描述,应该可以看出 Flink、Spark 对于 Celeborn 服务来说只是客户端的区别,两者完全可以复用一套 Celeborn 服务,不仅节省资源、提高运维效率,而且在架构上也会更加的清晰。
我们简单看一下 Celeborn 的架构:Celeborn 整个组成分为三个重要部分:CelebornMaster、CelebornWorker 及 CelebornPlugin(Flink、Spark),其中 CelebornMaster 负责管理整个 Shuffle 集群包括 Worker、Shuffle 资源管理及各种元数据等。Worker 则负责 Shuffle 数据写入读取,前文提到的 Flink 使用的 MapPartition 和 Spark 使用的 ReducePartition 模式复用了所有的服务端组件并在协议上达到了统一,Celeborn 服务端不感知引擎侧的差异。一套 Celeborn 集群可以同时为多种引擎提供服务。下面展现了 Flink、Spark 与 Celeborn 集群的交互架构图。
同时 Celeborn Master 使用 raft 协议同步集群元数据、Worker 及 App 信息,客户端/Worker 与 Leader 节点交互,不依赖外部组件即可实现 HA,客户端/Worker 在 Master 升级或故障时可自动切换至新的 Leader 节点。在设计上 Celeborn 抽象 Register Shuffle、Reserve Slots、Partition Split 及 Commit 等概念和接口,引擎侧完全可以使用这些接口插件化的实现管理逻辑,所以 Celeborn 自身具备的高可用、可扩展等特点非常适合接入新的引擎,欢迎有需求的用户和开发者一起丰富和发展 Celeborn。
3.5 Celeborn 更多特性和优化
Celeborn 0.3.0 版本还增加了诸如多级存储、多级黑名单等特性,优化了 RPC 请求数量和缩短了优雅升级的时间及进行了大量的 corner case 的修复稳定性,社区正在进行该版本的 release 流程,大家可以关注 Celeborn 的邮件组或 Apache Celeborn 官网 [3] 获得最新的 Release 信息。
四、Celeborn 在阿里内部生产实践及未来之路
Celeborn 支持 Flink 已经得到生产作业的验证。在阿里内部,Celeborn 承接的最大 Flink Batch 作业单 Shuffle 超过 600T,作业运行平稳、稳定性和性能优异。
另外 Apache Celeborn 对 Flink 的支持得到了 flink-remote-shuffle 社区 [4] 的大力支持,很多设计也源于 flink-remote-shuffle 项目,我们对此表示诚挚的感谢。
未来除了前文提到的 Celeborn 社区将结合 Flink 特点实现动态切换 Shuffle 的机制,还规划多级存储引入内存、支持 Flink Hybird Shuffle 等特性,最后感谢 Celeborn 的用户和开发者,并欢迎更多的用户和开发者加入!
Reference
[1] https://github.com/apache/incubator-celeborn
[3] https://celeborn.apache.org/
[4] https://github.com/flink-extended/flink-remote-shuffle
更多 Celeborn 相关技术问题,可加入社区钉钉交流群 41594456~
更多内容
活动推荐
阿里云基于 Apache Flink 构建的企业级产品-实时计算 Flink 版现开启活动:
0 元试用 实时计算 Flink 版(5000CU*小时,3 个月内)
了解活动详情:https://free.aliyun.com/?pipCode=sc