Flink 2.0 状态存算分离改造实践

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 本文整理自阿里云智能 Flink 存储引擎团队兰兆千在 FFA 2023 核心技术(一)中 的分享,内容关于 Flink 2.0 状态存算分离改造实践的研究。

本文整理自阿里云智能 Flink 存储引擎团队兰兆千在 FFA 2023 核心技术(一)中 的分享,内容关于 Flink 2.0 状态存算分离改造实践的研究,主要分为以下四部分:

  1. Flink 大状态管理痛点

  2. 阿里云自研状态存储后端 Gemini 的存算分离实践

  3. 存算分离的进一步探索

  4. 批量化存算分离适用场景

点击查看原文视频

一、Flink 大状态管理痛点

1.1 Flink 状态管理

状态管理是有状态流计算的核心。目前在 Flink 生产环境中使用的最多的状态后端是基于 RocksDB 的实现,它是一个以本地磁盘为主的状态管理,将状态文件存储于本地,同时在进行检查点的时候将文件周期性写入 DFS 。这是一种存算一体的架构,它足够简单,在小状态作业下能够保证稳定高效,可以满足绝大部分场景的需求。随着 Flink 的发展,业务场景日益复杂,大状态作业屡见不鲜,在存算一体的架构下涌现了很多与状态管理有关的现实问题。

1.2 大状态作业痛点

大状态作业下,基于 RocksDB 本地磁盘存算一体的状态管理主要会遇到以下四方面的问题:

  • 本地磁盘有可能会出现空间不足的情况,通常解决这类问题的方法就是扩容。在目前集群部署或是云原生部署的模式下,单独进行本地盘的扩容是不方便的,所以用户一般会增加并发度,也就是涉及到存储和计算绑在一起进行扩容,加剧了计算资源的浪费。
  • 作业正常状态访问时,本地磁盘 I/O 也会遇到一些瓶颈。这导致作业整体性能不足,同样需要进行扩并发操作。
  • 检查点的开销比较大。由于状态非常大,在检查点期间对于远端存储访问量呈现一个尖峰态势。
  • 在作业恢复的时候,需要从远端读取全量文件到本地,这个过程也十分缓慢。

上述前两点是影响用户成本的问题,而检查点的开销与恢复速度是 Flink 中影响易用性的关键问题。

1.3 存算分离的架构

对于以上问题,我们提出了存算分离的架构来解决。存算分离可以摆脱本地磁盘的限制,将远端存储(DFS)作为主存储,同时将空闲的本地磁盘作为一个 Cache 来进行使用。同时用户仍可以选择本地磁盘作为主存储,还用原来的模式来运行。这样做的显著的好处是,一方面由于磁盘空间和 I/O 性能不足的问题不再影响计算资源,另一方面是状态检查点与恢复在远端就可以直接完成,变得更加轻量级。从架构上完美解决了大状态作业面临的问题。

二.阿里云自研状态存储后端 Gemini 的存算分离实践

在进入存算分离架构探讨的最开始,我希望先从阿里云自研的企业级状态存储 Gemini 入手,探寻它在存算分离上的一些实践,主要分为以下三项:

2.1 多种文件系统分层管理

Gemini 能够把远端作为状态主存储的一部分。它首先将状态文件存储于本地磁盘,如果本地磁盘不足,则将文件移动到远端存储。本地磁盘中存留的是访问概率高的文件,远端存储的是不容易访问的文件。两部分共同构成了主存储,并在此基础上进行了冷热划分,保证了在给定资源条件下的高效服务。Gemini 的这种文件分层管理模式摆脱了本地磁盘空间的限制。理论上本地空间可以配置为零,以达到纯远端存储的效果。

2.2 状态懒加载

Gemini 能够支持远端文件存储,在作业恢复的场景之下,无需将数据从远端文件加载回本地就可以开启服务,使用户作业进入运行状态。这一功能称为状态懒加载。在实际恢复过程中,Gemini 仅需将元数据以及少量内存中的数据从远端加载回,就可以重建整个存储并启动。

虽然作业已经从远端文件启动了,但读取远端文件涉及到更长的 I/O 延迟,性能仍旧不理想,此时需要使用内存和本地磁盘进行加速。Gemini 会使用后台线程进行异步下载,将未下载的数据文件逐渐转移至本地磁盘。下载过程分为多种策略,比如按照 LSM-tree 层次的顺序,或者按照实际访问的顺序来下载。这些策略可以在不同场景进一步缩短从懒加载恢复到全速运行性能的时间。

2.3 Gemini 延迟剪裁

在改并发的场景中,比如将两个并发的状态数据合并成一个并发时,目前 RocksDB 是把这两份数据都下载完成之后再做一个合并,涉及到将多余的数据剪裁掉,重建数据文件,其速度是比较慢的。社区针对这个过程进行了很多的针对性优化,但仍然避免不了数据文件的下载。Gemini 只需要把这两部分数据的元数据进行加载,并且把它们合成一个特殊的 LSM-tree 结构,就可以启动服务,这一过程称为延迟剪裁。

重建后 LSM-tree 的层数相比正常情况下会比较多。比如针对图中的例子,有两个 L0 层,两个 L1 层和两个 L2 层。由于 Flink 有 KeyGroup 数据划分的机制存在,层数变多并不会对读链路长度造成影响。由于并未对数据进行实际的裁剪,会存在一些多余的数据,这些数据会在之后的整理 (Compaction) 过程逐步清理掉。延迟剪裁的过程无需对数据本身进行下载和实际合并操作,它可以极大地缩短状态恢复的时间。

2.4 Gemini 恢复效果

有了异步剪裁状态+状态懒加载,对于 Gemini 来说,恢复时间即作业从 INITIALIZING 到 RUNNING 的状态可以变得非常之短,相比于本地状态存储的方案是一个极大的提升。

我们针对 Gemini 与 RocksDB 的改并发时间进行了评测。评测的指标为从作业启动一直到恢复原有性能的时间,这包含了 Gemini 异步下载文件的时间。从上述实验结果中可以看到 Gemini 相比于RocksDB 在缩容、扩容的场景下都有明显的提升。

三.存算分离的进一步探索

Gemini 做存算分离相关的优化部分解决了前述大作业场景的问题。本地空间不足的问题可以通过远端空间来解决。针对检查点开销大的问题,因为已经有一部分文件远端存储上了,无需再次上传,这部分的开销也得以减少。针对作业恢复慢的问题,状态懒加载+延迟剪裁功能,使得作业能够快速的恢复运行状态。

这里还有一个功能是对 Memtable 的快照。Gemini 在做检查点的时候,是将 Memtable 的原样上传到远端存储上,不会影响 Memtable flush 的过程,也不会影响内部的 Compaction。它的效果和通用增量快照的 changelog 的效果是类似的,都会缓解检查点时的 CPU 开销和 DFS I/O 量的尖峰。

3.1 Gemini 存算分离的问题

Gemini 在存算分离方面做了不错的实践,在阿里内部与云上客户的大状态作业场景下均取得了不错的效果。但它仍存在着一些问题:

第一个问题,Gemini 还是把本地磁盘作为主存的一部分,状态文件是优先写到本地磁盘的,这并非最彻底的一个存算分离。这样会导致检查点时上传文件数量还是会比较多,持续时间较长,做不到非常轻量级的检查点。

第二个问题,是所有存算分离方案都会遇到的一个问题,就是与本地方案的性能差距。目前的方案中 Gemini 已经利用了本地磁盘,但本地磁盘的利用效率并不是最高的。如果更多的请求可以落到内存或者本地磁盘,对应的远端 I/O 的请求数降低,作业整体性能会有提升。另外,异步 I/O 是很多存储系统都会采用的优化。它使用提高 I/O 并行度的方式来解决提高作业的吞吐,是值得尝试的下一步优化方向。

针对这几个问题我们进行了简单的探索,首先是做了一个非常彻底的存算分离,直接写入远端存储并且把本地磁盘直接作为 Cache 来使用,在此基础上实践了不同形式的 Cache。第二方面,我们实现了一个简单的异步 I/O PoC,验证其在存算分离场景上的性能提升。

3.2 直接写入远端与本地磁盘 Cache 的探索

3.2.1 原始方案:基于文件的本地磁盘 Cache

直接使用远端存储作为主存的改动我们不作详述,在这里主要探讨 Cache 的形态与优化问题。最简单的架构是基于文件的 Cache 。如果远端的文件被访问到,它会被加载到本地磁盘 Cache。与此同时内存 Cache 仍然存在,并且仍旧采用 BlockCache 的形式。这个形式是非常简单高效的架构。但是内存 BlockCache 和本地磁盘的文件 Cache 有很大的一个数据重复,这相当于浪费了很多空间。另一方面,由于文件的粒度相对较粗,对于同一个文件的不同 block ,其访问的概率并不一样,所以会有一些冷的 block 维持在磁盘中,降低了本地磁盘的命中率。针对这两个问题,我们设计了全新的本地磁盘 Cache 的形态,对上述问题进行优化。

3.2.2 优化方案:基于 Block 的本地磁盘 Cache

我们提出将本地磁盘与内存结合起来,组成一个以 block 为粒度的混合式 Cache。它使用一个整体的 LRU 进行统一的管理,不同 block 只有介质上的不同。内存上相对冷的 block 会异步地刷到本地磁盘上,磁盘的 block 是按照顺序以追加写的形式来写在底层文件中。如果由于 LRU 策略淘汰了某些磁盘的 block,必然会映射到某个文件上形成空洞。为了维持 Cache 空间有效性,我们采取了空间回收来进行优化。空间回收的过程是一个空间和 CPU 开销的权衡。

不同层的文件如 L0 file 、L1 file 以及 L2 file,它们的生命周期是不一样的。对于 L0 file 来讲,它的生命周期比较短一些,但是热度相对高。对于 L2 file 来讲,文件本身更容易存活,但是热度是相对低的。根据这些不同的特点,我们可以采取不同的策略来进行空间回收。来自不同层文件 block 会被 Cache 在不同的底层文件中。针对不同的底层文件可以执行不同的空间回收阈值与频率,这样可以保证最大的空间回收效率。

另外我们针对 block 淘汰策略也提出了优化方案。最原始的 LRU 是根据命中频率来进行管理的,某个 block 一段时间内不命中则会被淘汰。这种策略并没有考虑到在缓存某一个block 的空间开销。也就是说可能为了缓存某个 block,却有更多的 block 没有办法进行缓存。在这里引入了一个新的评判体系叫做缓存效率,用一段时间内命中次数除以 block 大小,来更好的评判每一个缓存的 block 是否应该被缓存。这种评判方式的缺点是开销会比较大。最基本的 LRU 针对于查询都是 O(1) 的,但缓存效率的评分需要实现一个优先队列,其运行效率会有较大下降。所以在这里的思路还是在保持 LRU 主体管理的情况下,针对 block 的缓存效率异常的情况进行特殊化处理。

目前发现有两部分异常,第一部分是内存中的 data block 。它的命中率是内存中相对低的,但是它的占比能达到 50%。目前对于它的策略就是进行压缩,其代价是每次访问涉及到解压,但这个开销要比进行一个 I/O 的开销要小得多。第二部分是磁盘中的 filter block 。虽然它有命中,但它的大小是比较大的,缓存效率并不高。在这里实现了一个倾向于把磁盘中的 filter block 优先踢出的策略,使得相对上层的数据可以缓存进来。在测试作业场景中,这两条特殊规则与 LRU 相结合,相比于没有这两条规则的时候,整体 TPS 会上升 22%,效果比较显著。

但直接写入远端使系统出现了远端文件冷读问题,即文件第一次生成后的读取仍然需要涉及到远端 I/O。为了解决这个问题,我们在这里也做了一个小的优化,在本地磁盘上提供一个上传远端的队列,并且让其中的文件多缓存一段时间。这个时间并不会很长,大概是二三十秒的一个级别,在此期间队列文件的远端 I/O 会变为本地 I/O。这样的做法能够让远端冷读的问题大大的缓解。

到目前为止我们有两种存算分离的 Cache 方案,第一种是基于文件的本地磁盘 Cache 方案,它的优点是非常简单和有效,在磁盘充足的场景下有与本地方案类似的性能,因为本地磁盘可以缓存所有文件。第二种是混合式 block cache 的优化,在本地磁盘不足的情况下是一个非常好的方案,因为它提升了 Cache 的命中率。但是它也带来了比较大的管理开销。如果我们想要有一个通用的方案来适配所有场景,应该怎么做呢?

3.2.3 混合方案:自适应变化

将上述两种方案结合,我们设计了一个自适应变化的的混合方案。在磁盘充足的情况下使用的是基于文件的 Cache 方案,在磁盘不足的情况下,会把本地磁盘自动的和内存结合在一起组成混合式 block cache 方案。两种方案的结合会让它们两个的优点结合在一起,在所有的场景下都能够最大化的满足性能效率的需求。

3.2.4 混合方案:评测

我们针对上述提出的混合方案使用测试作业进行评测。可以看到在 TPS 上,新方案相比于文件为粒度的原始缓存方案有 80% 的提升。同时它也伴随着一些 CPU 的开销,用 CPU 效率(TPS/CPU)作为评判标准,新方案也有 40% 的提升。Cache 命中率的提升是 TPS 提升的一个主要来源。

3.3 异步 I/O 的探索

3.3.1 同步单条处理模式

第二项探索是对 Flink 进行的异步 I/O 改造与测试。如图展示了目前 Flink 的单线程处理模型,在 Task 线程上面,所有的数据是按顺序来进行处理的。对于每一条数据处理,会分为算子(operator)的 CPU 开销,状态(State)访问的 CPU 开销,以及状态访问所需的 I/O 时间,其中 I/O 是最大的一块开销。由于存算分离需要访问远端存储,其 I/O 延迟会比本地方案大,最终会导致整体 TPS 有明显下降。

3.3.2 批量处理+内部异步模式

我们对这一模式进行更改,使得 State 操作可以同时进行。在 Task 线程的角度来讲,State 被并行化之后整体的时间被缩小,所以 TPS 会有一个提升。同时,Task 线程需要预先攒批,这和 micro-batch 做的事情是非常类似的,同理也可以借用预聚合的功能,降低 state 访问的数目,TPS 得以进一步提升。

3.3.3 算子异步+批量处理模式

更进一步,在加上状态访问异步的基础之上,可以继续探索从算子的角度上进行异步化的过程。这意味着状态访问已经开始了异步执行后,让 Task 线程得以继续进行其他数据的 CPU 操作。但这样做有一个问题:状态访问 I/O 一般都是时间比较长的,虽然在 Task 线程闲的时候可以做一些其他的数据的处理工作,但是最终会一个速率不匹配的问题,瓶颈最终还会落到状态访问上,会退化到没有做此优化的情况。

经过权衡,我们认为仅采用攒批,再加上批内的状态访问使用异步 I/O 这种方式,是一个比较平衡的方案。

3.3.4 存算分离+批量化:评测

我们做了一个简单的支持批量异步的接口的状态后端,并在社区 Microbenchmark 上面做了一个简单的测试,目前仅涉及到 value get 的场景。从对比结果上可以看到,批量执行加上异步 I/O 是对存算分离场景有很大的提升。

四.批量化异步 I/O 存算分离适用场景

上述探索的批量化执行的存算分离状态访问有独特的应用场景。对于大状态作业来讲,存算分离在功能上解决了最开始所述的几个问题,在性能上,用批量接口的方式来弥补它的低的问题。

4.1 性能分析

此种方案的性能来源是 State 访问在批次内并行化,减少了状态访问的时间,提升了计算节点的 CPU 利用率。这种方案对于大状态作业性能提升是很有用的。

4.2 定性性能分析

在小状态作业的场景下,状态访问可以做到非常的快,将状态访问从 Task 线程抽离出来的提升量很小,且引入了线程之间交互的开销。所以在小状态的场景,这种批量异步状态访问的方案或许还不如原始本地状态管理方案。

随着状态大小逐渐增大,状态 I/O 开销逐渐增大并成为了瓶颈,异步 I/O 的执行当于摊薄了每个 I/O 所耗的时间。这导致了图中红色线的下降是较慢的,而本地状态管理(蓝色线)降低会比较快。在达到某个状态大小后,异步 I/O 的方案性能会显著的好。这种方案需要消耗 I/O 带宽,如果状态访问已经达到了 I/O 上限,异步 I/O 不能减少 I/O 的总时间,故此时它的斜率跟本地状态管理差不多。

如果状态很小的时候就达到 I/O 上限,并行化执行并不会产生效果,上图所示的便是这个场景。

总结一下,批量并异步执行状态访问在满足以下条件时会有优势:

  • 大状态作业场景且状态访问是作业的瓶颈
  • I/O 并没有达到瓶颈(未打满)
  • 业务对于攒批的延迟(亚秒到秒级别)可以接受

绝大部分存算分离场景下,由于 I/O 性能是存储集群提供,可以支撑比较大的 I/O 量且可以灵活伸缩,一般不会过早达到 I/O 瓶颈状态,异步 I/O 可以很好的优化存算分离场景。

结语

以上介绍了我们在存算分离方面做的一些探索。这些工作我们希望借着 Flink 2.0 的机会贡献给社区,一方面是支持纯远端的存算分离方案+混合式缓存的存储后端,另一方面是希望能够引入异步化 I/O 保证存算分离模式下的高性能数据处理。


Flink Forward Asia 2023

本届 Flink Forward Asia 更多精彩内容,可微信扫描图片二维码观看全部议题的视频回放及 FFA 2023 峰会资料!


更多内容

img


活动推荐

阿里云基于 Apache Flink 构建的企业级产品-实时计算 Flink 版现开启活动:
0 元试用 实时计算 Flink 版(5000CU*小时,3 个月内)
了解活动详情:https://free.aliyun.com/?pipCode=sc

image.png

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
5月前
|
存储 消息中间件 Java
Apache Flink 实践问题之原生TM UI日志问题如何解决
Apache Flink 实践问题之原生TM UI日志问题如何解决
53 1
|
2月前
|
SQL 运维 网络安全
【实践】基于Hologres+Flink搭建GitHub实时数据查询
本文介绍了如何利用Flink和Hologres构建GitHub公开事件数据的实时数仓,并对接BI工具实现数据实时分析。流程包括创建VPC、Hologres、OSS、Flink实例,配置Hologres内部表,通过Flink实时写入数据至Hologres,查询实时数据,以及清理资源等步骤。
|
19天前
|
存储 SQL 缓存
Flink 2.0 存算分离状态存储 — ForSt DB 
本文整理自阿里云技术专家兰兆千在Flink Forward Asia 2024上的分享,主要介绍Flink 2.0的存算分离架构、全新状态存储内核ForSt DB及工作进展与未来展望。Flink 2.0通过存算分离解决了本地磁盘瓶颈、检查点资源尖峰和作业恢复速度慢等问题,提升了云原生部署能力。ForSt DB作为嵌入式Key-value存储内核,支持远端读写、批量并发优化和快速检查点等功能。性能测试表明,ForSt在异步访问和本地缓存支持下表现卓越。未来,Flink将继续完善SQL Operator的异步优化,并引入更多流特性支持。
241 88
Flink 2.0 存算分离状态存储 — ForSt DB 
|
1天前
|
SQL 存储 Apache
基于 Flink 进行增量批计算的探索与实践
本文整理自阿里云高级技术专家、Apache Flink PMC朱翥老师在Flink Forward Asia 2024的分享,内容分为三部分:背景介绍、工作介绍和总结展望。首先介绍了增量计算的定义及其与批计算、流计算的区别,阐述了增量计算的优势及典型需求场景,并解释了为何选择Flink进行增量计算。其次,详细描述了当前的工作进展,包括增量计算流程、执行计划生成、控制消费数据量级及执行进度记录恢复等关键技术点。最后,展示了增量计算的简单示例、性能测评结果,并对未来工作进行了规划。
147 3
基于 Flink 进行增量批计算的探索与实践
|
4月前
|
SQL 存储 API
Flink实践:通过Flink SQL进行SFTP文件的读写操作
虽然 Apache Flink 与 SFTP 之间的直接交互存在一定的限制,但通过一些创造性的方法和技术,我们仍然可以有效地实现对 SFTP 文件的读写操作。这既展现了 Flink 在处理复杂数据场景中的强大能力,也体现了软件工程中常见的问题解决思路——即通过现有工具和一定的间接方法来克服技术障碍。通过这种方式,Flink SQL 成为了处理各种数据源,包括 SFTP 文件,在内的强大工具。
214 15
|
15天前
|
消息中间件 JSON 数据库
探索Flink动态CEP:杭州银行的实战案例
本文由杭州银行大数据工程师唐占峰、欧阳武林撰写,介绍Flink动态CEP的定义、应用场景、技术实现及使用方式。Flink动态CEP是基于Flink的复杂事件处理库,支持在不重启服务的情况下动态更新规则,适应快速变化的业务需求。文章详细阐述了其在反洗钱、反欺诈和实时营销等金融领域的应用,并展示了某金融机构的实际应用案例。通过动态CEP,用户可以实时调整规则,提高系统的灵活性和响应速度,降低维护成本。文中还提供了具体的代码示例和技术细节,帮助读者理解和使用Flink动态CEP。
330 2
探索Flink动态CEP:杭州银行的实战案例
|
5月前
|
消息中间件 分布式计算 Hadoop
Apache Flink 实践问题之Flume与Hadoop之间的物理墙问题如何解决
Apache Flink 实践问题之Flume与Hadoop之间的物理墙问题如何解决
66 3
|
5月前
|
消息中间件 运维 Kafka
Apache Flink 实践问题之达到网卡的最大速度如何解决
Apache Flink 实践问题之达到网卡的最大速度如何解决
60 2
|
29天前
|
流计算 开发者
【开发者评测】实时计算Flink场景实践和核心功能体验测评获奖名单公布!
【开发者评测】实时计算Flink场景实践和核心功能体验测评获奖名单公布!
|
2月前
|
运维 数据挖掘 网络安全
场景实践 | 基于Flink+Hologres搭建GitHub实时数据分析
基于Flink和Hologres构建的实时数仓方案在数据开发运维体验、成本与收益等方面均表现出色。同时,该产品还具有与其他产品联动组合的可能性,能够为企业提供更全面、更智能的数据处理和分析解决方案。

相关产品

  • 实时计算 Flink版