Flink 遇见 Apache Celeborn:统一的数据 Shuffle 服务

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 最新发布的 0.3.0 版本新增对 Flink 批作业 Shuffle 的支持,从此 Flink、Spark 可以同时使用统一的数据 Shuffle 服务,更大程度节省资源、降低运维成本。

作者| 熊佳树( 履霜)

我们非常高兴的宣布 Apache Celeborn(Inclubating)[1] 正式支持 Flink,Celeborn 于去年 12 月份正式进入 Apache 软件基金会 (ASF) 的孵化器,一直致力打造统一的中间数据服务,助力引擎全方位提升性能、稳定性和弹性,最新发布的 0.3.0 版本新增对 Flink 批作业 Shuffle 的支持,从此 Flink、Spark 可以同时使用统一的数据 Shuffle 服务,更大程度节省资源、降低运维成本。

Celeborn 目前已支持 Flink 大部分版本,设计上 Celeborn 强调与 Flink 在内存管理、调度策略、流控机制、Metrics 等方面全面融合,确保引入 Celeborn 不会导致 Flink 在现有机制上回退,同时能够复用 Celeborn Master HA、多层存储、优雅升级等能力,在弹性、稳定性和性能上等多方面获得收益。

一、为什么需要 Apache Celeborn

Flink、Spark 作为流批一体的大数据计算引擎,Shuffle 是影响计算性能的关键阶段,同时越来越多的用户选择计算存储分离的架构,并将引擎部署在 K8s 集群上,而存算分离架构下计算节点 Local 磁盘不可能很大,另外 Flink、Spark 引擎还提供了根据资源量进行动态伸缩的 Adaptive Scheduler 的能力, 这都要求计算节点能够将中间的 Shuffle 数据及时的卸载到外部存储服务上,以提高资源的利用效率,所以非常有必要使用独立的 Shuffle 服务。

同时 Celeborn 支持多种高效数据 Shuffle 方式,适配多种部署模式,其具备的 HA 架构、优雅下线等能力,也使得 Celeborn 自身具备弹性。所以引入 Apache Celeborn 这样独立的 ShuffleService,是做到真正的资源弹性、提升稳定性和资源效率必经之路。接下来我们重点介绍 Celeborn 如何支持 Flink 及 Celeborn 的重要特性。

二、Celeborn支持引擎及特性一览

  • 支持 Flink 1.14 及以上版本,支持 Flink Adaptive Batch Job Scheduler

  • 支持 Spark 2.4/Spark 3.x,支持 Spark AQE

  • 稳定性 - Flink
    • 内存管理:融合 Flink 内存管理机制、支持类似 Flink Credit-based 流量控制机制
    • 容忍 JM、TM 重启恢复
    • 支持负载均衡、连接复用、心跳机制
    • 支持 Celeborn Worker 故障后,批量 Reproduce 数据
  • 支持多种部署模式:支持 Kubernetes 以及 Standalone 环境下部署

  • 高可用性:Celeborn Master 支持 HA,支持故障自动切换

  • 滚动升级:Celeborn 集群支持 Master/Worker 滚动升级

  • Shuffle 机制:支持 MapPartition/ReducePartition 两种数据 Shuffle 机制,未来在支持 Flink 上将融合两种机制

  • 性能: MapPartition 支持类似 Flink 的 IO 调度机制

  • 多层存储支持 SSD/HDD/HDFS 多层存储

三、支持 Flink 关键设计和重要特性说明

3.1 内存稳定性及协议优化

Celeborn 致力于服务多引擎成为统一的 Shuffle 数据服务,在设计上 Celeborn 通过增强框架和协议的扩展性,采用插件化的方式支持多引擎,这样大大提高了组件的复用性和降低了 Celeborn 的复杂性,但相比于 Spark 而言如何在 Flink 严格的内存管理模型之下支持 Flink 是 Celeborn 一个关键挑战。因为 Celeborn 出于统一的目的复用了之前所有的接口及协议,所以无法在网络栈上与 Flink 统一,这导致 Celeborn 并不能直接使用 Flink 的 NetworkBuffer。所以为了尽可能的使用受管理的内存避免 OOM,提高系统稳定性,Celeborn 在数据读写过程中做了多种优化:

  • 在写出数据时,对持有数据的 Flink 的 NettyBuffer 进行 Wrapper,实现了零拷贝数据传输,发送即释放
  • 在读取数据时,Celeborn 在 FlinkPluginClient 中实现了可以直接在数据读取时使用 Flink Buffer 的 Decoder,这样数据的写出、读取使用内存都是受 FlinkMemory 管理。这一点保持了与原生 Flink 内存模型一致。避免用户在采用了 Celeborn 之后对于作业参数的修改和可能导致的内存稳定性问题。

1

  • 支持 Credit-based 的流控机制: 为了进一步提升 Flink 侧的稳定性,Celeborn 在读数据时也引入了类似于 Flink 的 Credit-based 流量控制机制 [2],即只有在数据接收端(TaskManager)有足够缓冲区来接收数据时,数据发送端(Celeborn Worker)才会发送这些数据。而数据接收端在不断处理数据的过程中,也会将释放的缓冲区(Credit)反馈给发送端继续发送新的数据,而写数据则完全复用了 Celeborn 原有高效的多层存储实现。

2

3.2 MapPartition、ReducePartition 双 Shuffle 机制

Celeborn 支持两种数据的 Shuffle 方式,其中 MapPartition 是指 Partition 数据由同一个上游 Map Task 的写入,由下游多个 Reduce Task 读取,而 ReducePartition 则是由多个上游 Map Task 将属于同一个 Partition 数据推给同一个 Worker 做聚合,下游由 Reduce Task 直接读取,两种方式各有优劣:

  • ReducePartition 重算代价高,通常情况下,上游所有 Mapper 都会往某个 Reduce Partition 文件推送数据,当文件丢失时需要重算上游所有的 Task。尽管 Celeborn 的多副本机制可以降低数据丢失的概率,但可能性依然存在,MapPartition 利于容错恢复,出错重跑对应 Map Task 即可。
  • ReducePartition 能够将随机读转换为顺序读,所以 Reduce Task 在 Shuffle Read 时的网络效率和磁盘 IO 效率都能大幅提高,而 Map Partition 更灵活可以支持各种类型的 Shuffle 边。

为了更好的支持 Flink(新引入 Rescale 和 Forward 两种 Shuffle 类型),以及满足更小的重算代价,Celeborn0.3.0支持了 MapPartition 的 Shuffle 类型。在当前的版本 Celeborn 采用了 MapPartition 支持 Flink,ReducePartition 支持 Spark,不过在未来的版本中将考虑结合 Flink 边实现动态切换 Shuffle 机制,以达到最佳的性能。

3

3.3 MapPartition 数据读写与优化

根据 Flink 当前 Shuffle、调度及容错的特点,MapPartition 的方式也采用了目前 Flink 的 Sort-Shuffle 实现,即计算任务的输出数据在输出前对数据进行排序 ,排序后的数据追加写出到 CelebornWorker 的同一个文件中,而在数据读取的过程中,增加对数据读取请求的调度,始终按照文件的偏移顺序读取数据,满足读取请求,在最优的情况下可以实现数据的完全顺序读取。下图展示了数据文件的存储结构与 IO 调度流程。

4

3.4 面向多引擎的 Celeborn

根据上文描述,应该可以看出 Flink、Spark 对于 Celeborn 服务来说只是客户端的区别,两者完全可以复用一套 Celeborn 服务,不仅节省资源、提高运维效率,而且在架构上也会更加的清晰。

我们简单看一下 Celeborn 的架构:Celeborn 整个组成分为三个重要部分:CelebornMaster、CelebornWorker 及 CelebornPlugin(Flink、Spark),其中 CelebornMaster 负责管理整个 Shuffle 集群包括 Worker、Shuffle 资源管理及各种元数据等。Worker 则负责 Shuffle 数据写入读取,前文提到的 Flink 使用的 MapPartition 和 Spark 使用的 ReducePartition 模式复用了所有的服务端组件并在协议上达到了统一,Celeborn 服务端不感知引擎侧的差异。一套 Celeborn 集群可以同时为多种引擎提供服务。下面展现了 Flink、Spark 与 Celeborn 集群的交互架构图。

5

同时 Celeborn Master 使用 raft 协议同步集群元数据、Worker 及 App 信息,客户端/Worker 与 Leader 节点交互,不依赖外部组件即可实现 HA,客户端/Worker 在 Master 升级或故障时可自动切换至新的 Leader 节点。在设计上 Celeborn 抽象 Register Shuffle、Reserve Slots、Partition Split 及 Commit 等概念和接口,引擎侧完全可以使用这些接口插件化的实现管理逻辑,所以 Celeborn 自身具备的高可用、可扩展等特点非常适合接入新的引擎,欢迎有需求的用户和开发者一起丰富和发展 Celeborn。

3.5 Celeborn 更多特性和优化

Celeborn 0.3.0 版本还增加了诸如多级存储、多级黑名单等特性,优化了 RPC 请求数量和缩短了优雅升级的时间及进行了大量的 corner case 的修复稳定性,社区正在进行该版本的 release 流程,大家可以关注 Celeborn 的邮件组或 Apache Celeborn 官网 [3] 获得最新的 Release 信息。

四、Celeborn 在阿里内部生产实践及未来之路

Celeborn 支持 Flink 已经得到生产作业的验证。在阿里内部,Celeborn 承接的最大 Flink Batch 作业单 Shuffle 超过 600T,作业运行平稳、稳定性和性能优异。

另外 Apache Celeborn 对 Flink 的支持得到了 flink-remote-shuffle 社区 [4] 的大力支持,很多设计也源于 flink-remote-shuffle 项目,我们对此表示诚挚的感谢。

未来除了前文提到的 Celeborn 社区将结合 Flink 特点实现动态切换 Shuffle 的机制,还规划多级存储引入内存、支持 Flink Hybird Shuffle 等特性,最后感谢 Celeborn 的用户和开发者,并欢迎更多的用户和开发者加入!

Reference

[1] https://github.com/apache/incubator-celeborn

[2] https://www.alibabacloud.com/blog/analysis-of-network-flow-control-and-back-pressure-flink-advanced-tutorials_596632

[3] https://celeborn.apache.org/

[4] https://github.com/flink-extended/flink-remote-shuffle

更多 Celeborn 相关技术问题,可加入社区钉钉交流群 41594456~


更多内容

img


活动推荐

阿里云基于 Apache Flink 构建的企业级产品-实时计算 Flink 版现开启活动:
0 元试用 实时计算 Flink 版(5000CU*小时,3 个月内)
了解活动详情:https://free.aliyun.com/?pipCode=sc

image.png

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
21天前
|
存储 人工智能 大数据
The Past, Present and Future of Apache Flink
本文整理自阿里云开源大数据负责人王峰(莫问)在 Flink Forward Asia 2024 上海站主论坛开场的分享,今年正值 Flink 开源项目诞生的第 10 周年,借此时机,王峰回顾了 Flink 在过去 10 年的发展历程以及 Flink社区当前最新的技术成果,最后展望下一个十年 Flink 路向何方。
309 33
The Past, Present and Future of Apache Flink
|
3月前
|
SQL Java API
Apache Flink 2.0-preview released
Apache Flink 社区正积极筹备 Flink 2.0 的发布,这是自 Flink 1.0 发布以来的首个重大更新。Flink 2.0 将引入多项激动人心的功能和改进,包括存算分离状态管理、物化表、批作业自适应执行等,同时也包含了一些不兼容的变更。目前提供的预览版旨在让用户提前尝试新功能并收集反馈,但不建议在生产环境中使用。
877 13
Apache Flink 2.0-preview released
|
3月前
|
存储 缓存 算法
分布式锁服务深度解析:以Apache Flink的Checkpointing机制为例
【10月更文挑战第7天】在分布式系统中,多个进程或节点可能需要同时访问和操作共享资源。为了确保数据的一致性和系统的稳定性,我们需要一种机制来协调这些进程或节点的访问,避免并发冲突和竞态条件。分布式锁服务正是为此而生的一种解决方案。它通过在网络环境中实现锁机制,确保同一时间只有一个进程或节点能够访问和操作共享资源。
107 3
|
2月前
|
存储 监控 数据处理
flink 向doris 数据库写入数据时出现背压如何排查?
本文介绍了如何确定和解决Flink任务向Doris数据库写入数据时遇到的背压问题。首先通过Flink Web UI和性能指标监控识别背压,然后从Doris数据库性能、网络连接稳定性、Flink任务数据处理逻辑及资源配置等方面排查原因,并通过分析相关日志进一步定位问题。
188 61
|
3月前
|
运维 数据处理 Apache
数据实时计算产品对比测评报告:阿里云实时计算Flink版
数据实时计算产品对比测评报告:阿里云实时计算Flink版
|
3月前
|
SQL 消息中间件 大数据
大数据-159 Apache Kylin 构建Cube 准备和测试数据(一)
大数据-159 Apache Kylin 构建Cube 准备和测试数据(一)
84 1
|
3月前
|
SQL 大数据 Apache
大数据-159 Apache Kylin 构建Cube 准备和测试数据(二)
大数据-159 Apache Kylin 构建Cube 准备和测试数据(二)
93 1
|
3月前
|
数据挖掘 物联网 数据处理
深入探讨Apache Flink:实时数据流处理的强大框架
在数据驱动时代,企业需高效处理实时数据流。Apache Flink作为开源流处理框架,以其高性能和灵活性成为首选平台。本文详细介绍Flink的核心特性和应用场景,包括实时流处理、强大的状态管理、灵活的窗口机制及批处理兼容性。无论在实时数据分析、金融服务、物联网还是广告技术领域,Flink均展现出巨大潜力,是企业实时数据处理的理想选择。随着大数据需求增长,Flink将继续在数据处理领域发挥重要作用。
215 0
|
5月前
|
存储 消息中间件 Java
Apache Flink 实践问题之原生TM UI日志问题如何解决
Apache Flink 实践问题之原生TM UI日志问题如何解决
52 1
|
4月前
|
SQL 消息中间件 关系型数据库
Apache Doris Flink Connector 24.0.0 版本正式发布
该版本新增了对 Flink 1.20 的支持,并支持通过 Arrow Flight SQL 高速读取 Doris 中数据。

相关产品

  • 实时计算 Flink版
  • 推荐镜像

    更多