Flink 遇见 Apache Celeborn:统一的数据 Shuffle 服务

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 最新发布的 0.3.0 版本新增对 Flink 批作业 Shuffle 的支持,从此 Flink、Spark 可以同时使用统一的数据 Shuffle 服务,更大程度节省资源、降低运维成本。

作者| 熊佳树( 履霜)

我们非常高兴的宣布 Apache Celeborn(Inclubating)[1] 正式支持 Flink,Celeborn 于去年 12 月份正式进入 Apache 软件基金会 (ASF) 的孵化器,一直致力打造统一的中间数据服务,助力引擎全方位提升性能、稳定性和弹性,最新发布的 0.3.0 版本新增对 Flink 批作业 Shuffle 的支持,从此 Flink、Spark 可以同时使用统一的数据 Shuffle 服务,更大程度节省资源、降低运维成本。

Celeborn 目前已支持 Flink 大部分版本,设计上 Celeborn 强调与 Flink 在内存管理、调度策略、流控机制、Metrics 等方面全面融合,确保引入 Celeborn 不会导致 Flink 在现有机制上回退,同时能够复用 Celeborn Master HA、多层存储、优雅升级等能力,在弹性、稳定性和性能上等多方面获得收益。

一、为什么需要 Apache Celeborn

Flink、Spark 作为流批一体的大数据计算引擎,Shuffle 是影响计算性能的关键阶段,同时越来越多的用户选择计算存储分离的架构,并将引擎部署在 K8s 集群上,而存算分离架构下计算节点 Local 磁盘不可能很大,另外 Flink、Spark 引擎还提供了根据资源量进行动态伸缩的 Adaptive Scheduler 的能力, 这都要求计算节点能够将中间的 Shuffle 数据及时的卸载到外部存储服务上,以提高资源的利用效率,所以非常有必要使用独立的 Shuffle 服务。

同时 Celeborn 支持多种高效数据 Shuffle 方式,适配多种部署模式,其具备的 HA 架构、优雅下线等能力,也使得 Celeborn 自身具备弹性。所以引入 Apache Celeborn 这样独立的 ShuffleService,是做到真正的资源弹性、提升稳定性和资源效率必经之路。接下来我们重点介绍 Celeborn 如何支持 Flink 及 Celeborn 的重要特性。

二、Celeborn支持引擎及特性一览

  • 支持 Flink 1.14 及以上版本,支持 Flink Adaptive Batch Job Scheduler

  • 支持 Spark 2.4/Spark 3.x,支持 Spark AQE

  • 稳定性 - Flink
    • 内存管理:融合 Flink 内存管理机制、支持类似 Flink Credit-based 流量控制机制
    • 容忍 JM、TM 重启恢复
    • 支持负载均衡、连接复用、心跳机制
    • 支持 Celeborn Worker 故障后,批量 Reproduce 数据
  • 支持多种部署模式:支持 Kubernetes 以及 Standalone 环境下部署

  • 高可用性:Celeborn Master 支持 HA,支持故障自动切换

  • 滚动升级:Celeborn 集群支持 Master/Worker 滚动升级

  • Shuffle 机制:支持 MapPartition/ReducePartition 两种数据 Shuffle 机制,未来在支持 Flink 上将融合两种机制

  • 性能: MapPartition 支持类似 Flink 的 IO 调度机制

  • 多层存储支持 SSD/HDD/HDFS 多层存储

三、支持 Flink 关键设计和重要特性说明

3.1 内存稳定性及协议优化

Celeborn 致力于服务多引擎成为统一的 Shuffle 数据服务,在设计上 Celeborn 通过增强框架和协议的扩展性,采用插件化的方式支持多引擎,这样大大提高了组件的复用性和降低了 Celeborn 的复杂性,但相比于 Spark 而言如何在 Flink 严格的内存管理模型之下支持 Flink 是 Celeborn 一个关键挑战。因为 Celeborn 出于统一的目的复用了之前所有的接口及协议,所以无法在网络栈上与 Flink 统一,这导致 Celeborn 并不能直接使用 Flink 的 NetworkBuffer。所以为了尽可能的使用受管理的内存避免 OOM,提高系统稳定性,Celeborn 在数据读写过程中做了多种优化:

  • 在写出数据时,对持有数据的 Flink 的 NettyBuffer 进行 Wrapper,实现了零拷贝数据传输,发送即释放
  • 在读取数据时,Celeborn 在 FlinkPluginClient 中实现了可以直接在数据读取时使用 Flink Buffer 的 Decoder,这样数据的写出、读取使用内存都是受 FlinkMemory 管理。这一点保持了与原生 Flink 内存模型一致。避免用户在采用了 Celeborn 之后对于作业参数的修改和可能导致的内存稳定性问题。

1

  • 支持 Credit-based 的流控机制: 为了进一步提升 Flink 侧的稳定性,Celeborn 在读数据时也引入了类似于 Flink 的 Credit-based 流量控制机制 [2],即只有在数据接收端(TaskManager)有足够缓冲区来接收数据时,数据发送端(Celeborn Worker)才会发送这些数据。而数据接收端在不断处理数据的过程中,也会将释放的缓冲区(Credit)反馈给发送端继续发送新的数据,而写数据则完全复用了 Celeborn 原有高效的多层存储实现。

2

3.2 MapPartition、ReducePartition 双 Shuffle 机制

Celeborn 支持两种数据的 Shuffle 方式,其中 MapPartition 是指 Partition 数据由同一个上游 Map Task 的写入,由下游多个 Reduce Task 读取,而 ReducePartition 则是由多个上游 Map Task 将属于同一个 Partition 数据推给同一个 Worker 做聚合,下游由 Reduce Task 直接读取,两种方式各有优劣:

  • ReducePartition 重算代价高,通常情况下,上游所有 Mapper 都会往某个 Reduce Partition 文件推送数据,当文件丢失时需要重算上游所有的 Task。尽管 Celeborn 的多副本机制可以降低数据丢失的概率,但可能性依然存在,MapPartition 利于容错恢复,出错重跑对应 Map Task 即可。
  • ReducePartition 能够将随机读转换为顺序读,所以 Reduce Task 在 Shuffle Read 时的网络效率和磁盘 IO 效率都能大幅提高,而 Map Partition 更灵活可以支持各种类型的 Shuffle 边。

为了更好的支持 Flink(新引入 Rescale 和 Forward 两种 Shuffle 类型),以及满足更小的重算代价,Celeborn0.3.0支持了 MapPartition 的 Shuffle 类型。在当前的版本 Celeborn 采用了 MapPartition 支持 Flink,ReducePartition 支持 Spark,不过在未来的版本中将考虑结合 Flink 边实现动态切换 Shuffle 机制,以达到最佳的性能。

3

3.3 MapPartition 数据读写与优化

根据 Flink 当前 Shuffle、调度及容错的特点,MapPartition 的方式也采用了目前 Flink 的 Sort-Shuffle 实现,即计算任务的输出数据在输出前对数据进行排序 ,排序后的数据追加写出到 CelebornWorker 的同一个文件中,而在数据读取的过程中,增加对数据读取请求的调度,始终按照文件的偏移顺序读取数据,满足读取请求,在最优的情况下可以实现数据的完全顺序读取。下图展示了数据文件的存储结构与 IO 调度流程。

4

3.4 面向多引擎的 Celeborn

根据上文描述,应该可以看出 Flink、Spark 对于 Celeborn 服务来说只是客户端的区别,两者完全可以复用一套 Celeborn 服务,不仅节省资源、提高运维效率,而且在架构上也会更加的清晰。

我们简单看一下 Celeborn 的架构:Celeborn 整个组成分为三个重要部分:CelebornMaster、CelebornWorker 及 CelebornPlugin(Flink、Spark),其中 CelebornMaster 负责管理整个 Shuffle 集群包括 Worker、Shuffle 资源管理及各种元数据等。Worker 则负责 Shuffle 数据写入读取,前文提到的 Flink 使用的 MapPartition 和 Spark 使用的 ReducePartition 模式复用了所有的服务端组件并在协议上达到了统一,Celeborn 服务端不感知引擎侧的差异。一套 Celeborn 集群可以同时为多种引擎提供服务。下面展现了 Flink、Spark 与 Celeborn 集群的交互架构图。

5

同时 Celeborn Master 使用 raft 协议同步集群元数据、Worker 及 App 信息,客户端/Worker 与 Leader 节点交互,不依赖外部组件即可实现 HA,客户端/Worker 在 Master 升级或故障时可自动切换至新的 Leader 节点。在设计上 Celeborn 抽象 Register Shuffle、Reserve Slots、Partition Split 及 Commit 等概念和接口,引擎侧完全可以使用这些接口插件化的实现管理逻辑,所以 Celeborn 自身具备的高可用、可扩展等特点非常适合接入新的引擎,欢迎有需求的用户和开发者一起丰富和发展 Celeborn。

3.5 Celeborn 更多特性和优化

Celeborn 0.3.0 版本还增加了诸如多级存储、多级黑名单等特性,优化了 RPC 请求数量和缩短了优雅升级的时间及进行了大量的 corner case 的修复稳定性,社区正在进行该版本的 release 流程,大家可以关注 Celeborn 的邮件组或 Apache Celeborn 官网 [3] 获得最新的 Release 信息。

四、Celeborn 在阿里内部生产实践及未来之路

Celeborn 支持 Flink 已经得到生产作业的验证。在阿里内部,Celeborn 承接的最大 Flink Batch 作业单 Shuffle 超过 600T,作业运行平稳、稳定性和性能优异。

另外 Apache Celeborn 对 Flink 的支持得到了 flink-remote-shuffle 社区 [4] 的大力支持,很多设计也源于 flink-remote-shuffle 项目,我们对此表示诚挚的感谢。

未来除了前文提到的 Celeborn 社区将结合 Flink 特点实现动态切换 Shuffle 的机制,还规划多级存储引入内存、支持 Flink Hybird Shuffle 等特性,最后感谢 Celeborn 的用户和开发者,并欢迎更多的用户和开发者加入!

Reference

[1] https://github.com/apache/incubator-celeborn

[2] https://www.alibabacloud.com/blog/analysis-of-network-flow-control-and-back-pressure-flink-advanced-tutorials_596632

[3] https://celeborn.apache.org/

[4] https://github.com/flink-extended/flink-remote-shuffle

更多 Celeborn 相关技术问题,可加入社区钉钉交流群 41594456~


更多内容

img


活动推荐

阿里云基于 Apache Flink 构建的企业级产品-实时计算 Flink 版现开启活动:
0 元试用 实时计算 Flink 版(5000CU*小时,3 个月内)
了解活动详情:https://free.aliyun.com/?pipCode=sc

image.png

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
2月前
|
消息中间件 关系型数据库 Kafka
flink cdc 数据问题之数据丢失如何解决
Flink CDC(Change Data Capture)是一个基于Apache Flink的实时数据变更捕获库,用于实现数据库的实时同步和变更流的处理;在本汇总中,我们组织了关于Flink CDC产品在实践中用户经常提出的问题及其解答,目的是辅助用户更好地理解和应用这一技术,优化实时数据处理流程。
116 0
|
8天前
|
关系型数据库 Apache 流计算
手把手教你实现 OceanBase 数据到阿里云数据库 SelectDB 内核版 Apache Doris 的便捷迁移|实用指南
本文介绍了如何将数据从 OceanBase 迁移到阿里云数据库 SelectDB 内核版 Apache Doris。提供 3 种数据同步方法 1. 使用 DataX,下载 DataX 并编写配置文件,通过 OceanBaseReader 和 DorisWriter 进行数据迁移。 2. 利用 Apache Doris 的 Catalog功 能,将 OceanBase 表映射到 Doris 并插入数据。 3. 通过Flink CDC,设置 OceanBase 环境,配置 Flink 连接器,实现实时数据同步。
手把手教你实现 OceanBase 数据到阿里云数据库 SelectDB 内核版 Apache Doris 的便捷迁移|实用指南
|
2月前
|
API 数据库 流计算
有大佬知道在使用flink cdc实现数据同步,如何实现如果服务停止了对数据源表的某个数据进行删除操作,重启服务之后目标表能进行对源表删除的数据进行删除吗?
【2月更文挑战第27天】有大佬知道在使用flink cdc实现数据同步,如何实现如果服务停止了对数据源表的某个数据进行删除操作,重启服务之后目标表能进行对源表删除的数据进行删除吗?
55 3
|
2月前
|
Oracle 关系型数据库 MySQL
Flink CDC产品常见问题之flink Oraclecdc 捕获19C数据时报错错如何解决
Flink CDC(Change Data Capture)是一个基于Apache Flink的实时数据变更捕获库,用于实现数据库的实时同步和变更流的处理;在本汇总中,我们组织了关于Flink CDC产品在实践中用户经常提出的问题及其解答,目的是辅助用户更好地理解和应用这一技术,优化实时数据处理流程。
|
14天前
|
存储 Linux 数据库
ZooKeeper【搭建 01】apache-zookeeper-3.6.2 单机版安装+配置+添加到service服务+开机启动配置+验证+chkconfig配置(一篇入门zookeeper)
【4月更文挑战第8天】ZooKeeper【搭建 01】apache-zookeeper-3.6.2 单机版安装+配置+添加到service服务+开机启动配置+验证+chkconfig配置(一篇入门zookeeper)
25 0
|
18天前
|
Linux 网络安全 Apache
Redhat 9 搭建Apache服务
Apache HTTP Server,开源且广泛使用的Web服务器,以其高效、可靠和可扩展性著称。它有两种工作模式:prefork(多进程单线程)和worker(多进程多线程)。在Redhat 9.2系统上安装Apache,涉及安装httpd服务及相关依赖,配置文件位于`/etc/httpd/conf/httpd.conf`。安装后,需关闭防火墙和SELinux,重启服务并设置开机启动,确保80端口监听。最后,通过IP地址访问测试页面以验证配置成功。
22 0
Redhat 9 搭建Apache服务
|
2月前
|
消息中间件 API Apache
官宣|阿里巴巴捐赠的 Flink CDC 项目正式加入 Apache 基金会
本文整理自阿里云开源大数据平台徐榜江 (雪尽),关于阿里巴巴捐赠的 Flink CDC 项目正式加入 Apache 基金会。
1425 1
官宣|阿里巴巴捐赠的 Flink CDC 项目正式加入 Apache 基金会
|
2月前
|
SQL Java API
官宣|Apache Flink 1.19 发布公告
Apache Flink PMC(项目管理委员)很高兴地宣布发布 Apache Flink 1.19.0。
1370 1
官宣|Apache Flink 1.19 发布公告
|
2月前
|
分布式计算 Hadoop Java
Flink CDC产品常见问题之tidb cdc 数据量大了就疯狂报空指针如何解决
Flink CDC(Change Data Capture)是一个基于Apache Flink的实时数据变更捕获库,用于实现数据库的实时同步和变更流的处理;在本汇总中,我们组织了关于Flink CDC产品在实践中用户经常提出的问题及其解答,目的是辅助用户更好地理解和应用这一技术,优化实时数据处理流程。
|
2月前
|
资源调度 关系型数据库 测试技术
Flink CDC产品常见问题之没有报错但是一直监听不到数据如何解决
Flink CDC(Change Data Capture)是一个基于Apache Flink的实时数据变更捕获库,用于实现数据库的实时同步和变更流的处理;在本汇总中,我们组织了关于Flink CDC产品在实践中用户经常提出的问题及其解答,目的是辅助用户更好地理解和应用这一技术,优化实时数据处理流程。

相关产品

  • 实时计算 Flink版
  • 推荐镜像

    更多