本文整理自蚂蚁集团实时计算组技术专家闵文俊在 FFA 2023 核心技术(一)中 的分享,内容关于蚂蚁流场景状态演进和优化的研究,主要分为以下三部分:
- 状态后端的演进
- 优化与适配
- 未来展望
一. 状态后端的演进
1. 蚂蚁状态后端的演进
在蚂蚁的生产环境中,最早有一套流计算系统内部的项目,代号为 Kepler。在 Kepler 系统实现了一套可插拔的状态后端存储系统。在我们生产环境中使用最广泛的是基于 Hbase 和 OceanBase 这种远程存储的状态后端,它的优势在于 checkpoint 非常的快,因为流任务在计算的过程中,它的数据已经写到远程存储中,并且它的 failover 也会比较快,因为在 failover 的过程中,它不涉及到数据的下载以及恢复的过程。它也不依赖于本地磁盘,对于云化环境会比较友好。
并且在分布式存储系统中,一般都提供了访问这些数据的 API,所以对于这些已经存储在状态后端里面的数据是可以通过这些 API 直接访问、查询出来,比如我们线上会通过 Hbase 的 API 直接将我们存储在这些系统中的流任务的状态的 offset 直接暴露出来,这就类似于 Flink 里面的 qureyable state 这种能力。
它的劣势也比较明显。首先,它的性能会比较差。因为对于这种分布式存储系统,它所能提供的访问延迟,基本都是在毫秒级,而本地存储一般能够到达微秒级,所以这个时延就比较高,它的整体的访问 TPS 就比较受限,一般不太适合于大状态的任务。第二,它的稳定性会不足,因为它会受限于第三方服务的稳定性影响,它整体的 SLA 就不如本地存储。
2. 基于 RocksDB 本地状态后端
在我们陆续从蚂蚁内部资源的流计算,逐步切到社区的 Flink 的过程中,状态后端也就自然切换到 Flink 自带的基于 RocksDB 的本地状态存储中。
RocksDB 状态存储也是现在使用的非常多的状态存储,它的优势主要是,它是基于 RocksDB 这种高性能的本地 KVstore,所以它的状态访问的延时非常低,它能满足时间敏感比较高的任务吞吐的需求。稳定性更好。
它的劣势在于,第一因为 RocksDB 本身是基于 LSMtree 这样的结构的,牺牲了一定的读的性能,来满足它的写的性能。但是当我们写入的前台流量比较大的时候,它的后台的 compaction 会去逐步占用更多的机器的 IO 和 CPU 资源,而影响到前台的写入以及访问的需求,表现的比较明显. 在线上的任务中会看观察到的现象,任务在高峰期的时候,会看到它的吞吐反而会有下降的表现。
第二,这种基于 RocksDB 的本地状态后端,它的 Rescale 和 Failover 也会比较慢。
第三,它的 Checkpoint 的代价会比较高,因为它涉及到本地的状态文件的上传. 为了更好的解决这些问题,蚂蚁内部自研了一套基于 RocksDB 的本地的 KV 存储去尝试优化以上的几个问题。
3. RocksDB compaction 过程
在介绍本地 KV 存储之前,我们先看一下 RocksDB 的 compaction 的过程。它在 compaction 过程中,会将某一层级校验出它所需要进行 compaction 的文件,然后去进行 compact。在 compact 的过程中,它会挑选出下一层级中与这一层的文件,将 key 存在 overlap 的文件进行Merge。在Merge的过程中,它的目的是去降低空间放大,以及通过文件的合并来减少后续查询中的文件的开销。但是在合并的过程中,因为我们本身 KV 的格式,并且一般来说 value 的是比较大的。但是在合并的过程中,其实这个 value 是不需要随着 KV 合并的过程而合并的。所以,基于这样的一个思想,提出了 KV 分离的 RocksDB 的增强的实现。
4. AntKV: 基于 RocksDB KV 分离的本地存储引擎
社区或者工业界有非常多的 KV 分离的思路,比较典型的是论文 whiskey 里面的一个 KV 分离的实践。我们的 KV 整体的思路也是接近于这种 KV 分离的思想,这个图里面显示的是一个 KV value 的写入的过程。
首先,它的 value 部分会被写入到 value log 里,同时会将 value log 所对应的文件 offset 替换成原始的 KV 数据里面 value 部分,然后再写到 LSM tree 里。在后续的 compaction 的过程中,只有 SST file 里面的数据才会参与 compaction,value log 这个数据不会参与 compaction。
注意这里面, 并不是所有的 KV 都参与 KV 分离的实现。因为从观察到的现象来看,流计算里也并不是所有的数据都满足 KV 分离的 pattern。因为从 KV 分离的 pattern 的主要受益场景来说, value 比较大的时候,通过减少 value 的 compaction 的 IO 操作来提升整体吞吐。如果 value 并不大,就会导致后续查询的时候,需要先通过 LSM 查到对应的 key 所对应的 value 的 address ,然后再跳转到 value log 里去查询这个 value 所对应的真实的数据,这样就增加了查询的开销。所以,为了解决这样的问题,在 KV 分离的时候,能够让它自适应的选择 KV 分离的策略,简单来说就是会通过 value 的阈值是否达到配置的大小,来选择它是否 KV 分离。在此之外,还支持了其他的一些功能,包括 value log 的空间回收 GC、TTL 和 check point 、Merge Op、AsyncRestore 还有 Table API 以及其他的一些优化。
这里主要讲一下它在 GC 上面的支持。因为在非 KV 分离的模式下,它的 compaction 的过程中,就已经会将各个 SST 文件中重复的 key 所对应的数据只保留最新的那一份,所以在 compaction 过程中,这个空间就已经被自然而然的回收。
但是在分离的模式下,在参与 compaction 过程中,它其实在 LSM tree 只有 key 和 value Address 的部分去参与 compaction。所以,当这个 value 不再被引用的时候,它就已经成为垃圾被放在 value log 里面,而且需要一个清理机制将其清理。
这里的思路有很多种,我们现在的方案里面采用的是一种被动回收的策略,就是在 compaction 的过程中,我们会去记录每一个 value log 所对应的 value 占用的垃圾比例。因为我们在 compaction 的过程中就可以知道哪些 key 被 compact 掉,所以我们知道这个 key 所对应的 value log 的文件,因此我们就能完全统计出这样的数据。当 value log 垃圾占比达到百分之百的时候,就可以安全的将这个 value log 直接清理掉。
但是还会存在另外一个问题, value log 里面有可能某一些 key 迟迟的没有参与 compaction 过程,就会造成 value log 残留下来,导致一定程度的空间放大。对于这个问题,我们主要是通过设置一个清理的阈值解决,比如当达到百分之九十的时候,我们就需要通过这些垃圾占比过高的文件去反向查找 SST 文件,然后将这些 SST 文件作为下一次 compaction 的候选,然后将它进行 compaction 。在这个过程中,它就能将这些 value log 的空间回收掉。
从以上来看,这套 AntKV 的存储系统的主要的优势在于,首先降低了 compaction 的读写放大,对于线上的效果来看,整体的 get put 的耗时更加稳定,也相比于之前更低。第二点是更高效的缓存,因为它降低的 LSM tree 本身有大小,所以它更多的 key 可以被缓存在 block cache 里面,以及自适应的 KV 分离, 适用多种场景。
它的主要劣势在于 scan 的效率会受影响,因为 scan 在 KV 分离模式中,可能会变成一个随机 IO,以及它的数据清理会延后,于是带来额外的磁盘的开销。
从 AntKV 这个角度来去测试整体的吞吐读写测试的话,相比于 RocksDB 来说,达到了两到四倍左右的提升,比较可观。
二.优化与适配
从引擎侧去和 AntKV 适配,我们主要实现了 AntKV 的 statebackend,Backend 实现了插件化改造,防止和 RocksDB 的 JNI 的命名冲突,并且在 checkpoint 阶段去对 AntKV 所产生的额外的 value log 的数据文件进行持久化管理。其他的优化包括 scan 的 prefetch 优化,为了解决在它 scan 产生随机 IO 的情况下如何提升它 scan 的性能的问题、以及基于 learned index 的这种 O(1) 的查询优化能够极大的提升它的 get 的时效性、以及在 Statebackend 里面内置支持 TTL 的功能。
1. No Unique Key Join 优化
以上是从 AntKV 的角度来看存储引擎。从引擎的角度来看,以上是一个非常典型的 No Unique Key Join 的一个 case。我们知道在 Flink 里面,在它 Join 的场景中,它会根据你 Join Input 流是否包含 Unique Key 以及 Join Key 是否包含 Unique Key 去选择合适的状态存储的方式。
以图为例,Input1 是一个没有 Unique Key 的流, Input2 是一个 Join key 包含了 Unique Key 的流。当 Input1 进来的时候,如果它去查询 Input2 的数据,它的查询的模式是以 Join Key 去查找这种模式。所以它的模式是比较符合于 AntKV 这样的优化,它是一个小 Key 大 Value 的 Pattern,它的查询过程也只需要 get 点查。而当我 Input2 的数据来的时候,它的存储模式是按照 Join Key 加 Record 拼接成一个大 Key,而它的 Value 是一个表示这个 Record 的行数,所以在整体存储的 Pattern 下,它看起来就会比较奇怪。然后它在查询的过程中,就需要按照 Join key 先去 seek 到某一个前缀,然后在 iterator 去遍历出这个 Pattern 下所有的 Record。首先,这个查询效率在非 KV 分离模式或者在 RocksDB 的场景中,它的效果就不好。这是因为,首先这种大 Key 在查找的过程中就涉及比较大的一个字符串的比较以及它的缓存效率会比较低。
那么有没有办法将这种 Pattern 和 KV 分离的优化匹配上去呢?我们就想办法去将存储格式改成一个比较合适的方式。如图的 Join Key 拼对应的一个 List Record ,就是一种设计方式。然后,我们在线上也采用了多种方式去测试这种场景。
方案如图中显示的,它使用的是一个 Value State,然后 Join Key 就对应到一个 List Record,表示这个 Join Key 下所有出现的 Record。方案二是 Map State ,它也是以 Join Key 作为前缀,再加上 Record 的 MD5 值拼接成一个 Key,然后它的 Value 是 Record 以及它出现的次数。方案三是使用的一个 List State 的 Value ,就是在我写入的过程中,有一条 Record,Append 写进去的这样的效果。从我们线上测试的效果来看,就是这三种方案,都看到它的 Join Key 的 Value Pattern 都是小 Key 带 Value 的这种模式。
所以,方案一、方案二、方案三的 TPS 就是优于原始方案的。而方案一的整体吞吐应该能达到原始方案的四倍以上,所以我们最终选择了方案一的实现方案。以上就是蚂蚁在状态后端上的演进。
2. 状态问题优化
2.1 小文件问题
小文件的问题在社区也是一个比较有名的问题,它主要在我们内部可能会导致两个问题。第一个问题是会导致我们线上出现一些 Task 持续 deploying,它没有办法恢复。第二个问题是小文件导致我们的文件数扩大持续不足,持续影响 DFS 的稳定性。
我们先来看一下为什么会有小文件的问题。小文件的问题可能主要和增量 Checkpoint 的方案有关,因为增量 Checkpoint 本身就是问题。
2.2 增量 Checkpoint 问题
增量 Checkpoint 的过程中要上传全量文件的优化方案,但是它在增量 Checkpoint 的过程中,会将每一个 Task 在 Checkpoint 之前,将 RocksDB 先 Flush 生成本次 Checkpoint 之间的新增的 SST 文件,然后再将这些本次和上次之间 Diff 出来的 SST 文件上传到远程,但是这些 SST 文件通常不会很大。在持续的过程中,这些 SST 文件就会导致产生很多的小文件。
3. 对状态恢复的影响
然后我们再来看为什么这种小文件会导致 Task deploy 的问题。首先,我们可以看到在 Checkpoint 的过程中,它是一个增量的过程,比如我们图中的 Subtask Manager 在做 Checkpoint 的过程中,它可能在 value logs 里有一个小于 20KB 的小文件,那么它在现有的机制里就会直接将这个数据传到 Job Manager 中,由 Job Manager 写入到 Metadata 文件里面,这也是一种优化机制,为了无需单独生成一个文件,将那些特别小的文件直接放到 Job Manager 的 MataData 。而在这个持续的过程中,它可能会产生比较多的小文件,并记录在 Matadata Data 里面。因为做 Checkpoint 是一个增量的过程,但是在恢复的过程中,是一个 FullRestore 的过程,就可能会存在提交 Task 到 Task Executor 上,部署的这个 Descriptor 包含了 ByteStream Statehandle 以及 File Stream Handle。这些 ByteStream Statehandle 在持续的增量、Checkpoint 的过程中,可能累加超过 RPC 限制的大小,最终导致任务在跑的过程中,因为挂一次而导致再也恢复不了。因为要传输的 State Handle 已经超过了 RPC 的限制。这个问题可能会和我们的这套 KV 分离的架构有一定的关系。因为在非 KV 分离的模式下,它的 SST 会持续的做 compaction。但是在 KV 分离模式下的 Value Log 的 Compaction 的频率会低于 SST 的 Compaction 的频率,因为本身的目的是为了减少 Value Log 的 Compaction 的频率。
为了解决这样的问题,我们参考了社区之前分享的一套基于 Segment 增量的 Checkpoint 方案。这套方案的主要思路就在于每一个 Task 在做 Checkpoint 过程中,并不会为每一个 SST 文件都生成一个远程的 DFS 文件,而是让每一个 Task 上 Checkpoint 过程中的要上传的文件,尽可能复用一个远端的文件,并且记录每一个 SST 文件。在这个文件中所占用的 Offset 和对应的 Length,去找到它所对应的 Segment。然后通过这种方式,就已经能够非常好的缓解小文件的问题。
我们上线之后发现小文件的问题虽然解决了,但是它还是会有空间放大的问题。如上面的任务里面,它从任务侧汇报上来,这个任务只是占用了 38 G 左右的状态大小。但实际上,从 DFS 测试, Checkpoint 目录所占用的空间可能达到了它原始大小的七百多倍左右,并且它的文件数也比较多。所以,这种 case 并没有达到我们所预期的效果。
4. 基于 Segment 增量 Checkpoint 方案
有一个问题在于,可能在某一些 DFS 的文件里面,它的文件持续在被引用。比如我们这个文件里面的第一个文件的四号 SST,它持续被后面的 checkpoint 引用,并没有被 GC 或者被合并成另外一个 SST 文件。这样的话就导致整个文件的大小无法释放,最终导致它的文件空间放大和文件数无法消减的问题。
现在要解决这个问题,我的思路非常简单,类似于我们之前在 AntKV 里面 Value Log 释放的思路,就是在存储空间和 IO 之间,尽可能寻找到一个平衡,比如在上传的时候检测这个 DFS 文件的有效比例,当文件有效比例低于多少的时候,我们就会将这个原始文件中的 Segment 作为本次 Checkpoint 中新增的 SST 上传到远程。这样的话,原始的 DFS 的文件就可以直接被清理掉。
5. 小文件合并存储空间放大问题
在上线之后,我们可以看到它的效果,实际存储量下降到了 Full State 的1.4倍左右,文件数也大大减少了。
6. 大任务 Union State restore 性能瓶颈
第二个问题是大任务的 Union State restore 的问题。 Union State restore 的过程其实是需要每一个 TM 去 Union State,它本质的意思就是每一个 TM 在做 Checkpoint 的过程中,每一个 sub task 上传自己的这部分状态,但是在恢复过程中,每一个 subtask 都能恢复到原始的状态。当我们以这种状态恢复的时候,就可以看到它是一种 All-to-all 模式的恢复。当我们的并发比较大的时候,比如我们现在这种是基于 File StateHandle 的模式恢复,它的 DFS 就会成为瓶颈。从我们线上的任务来看,比如有两千多左右的并发,它整个恢复时长可能会达到半个小时左右的长度,这个在业务上是无法容忍的。
我们也尝试另一种方式,比如去调低阈值,降低文件数。如果我们让它直接写到 Job Manager 的 Matter Data 里面,这样就只有一个文件去恢复,但是它的 Job Manager 也会成为瓶颈,它会频繁的在恢复的过程中去 OOM 。OOM 的原因也很简单,因为我们放在了它的 ByteStreamStateHandle ,那么在提交任务的时候,TDD 的大小会占用 Job Manager 的内存。
从我们提交的实际情况来看,它在恢复的过程中,这个 StreamStateHandle 可能会占到百万级别的大小,最终 JM 根本无法恢复。
解决这个问题的思路,第一点是在 JobManager 侧,我们只去创建一份原始的 Statehandle,就不用给每一个 Task 都复制一份,第二点是利用 Blob Server 去传输 TDD 中的大状态对象。
这个 Blob Server 在 Job Manager 里面本身就在用,所以我们也是利用了这样的一个机制去传输大对象。因为 Blob Server 本身也是在做这样的事情。
第三点是在同一个 TaskManager 上面,我们只去拉取一次 Union State 的状态,数据 Task 之间的初始化是共享的。经过以上的一系列的优化,最终它原来半小时的重启任务,现在可以没有压力地重启,而且 JM 内存也不再是它的瓶颈。
7. 任务 Rescale 问题
最后一个问题是任务 Rescale 的问题,社区也在持续优化这个问题。
我们在很早的时候就已经在尝试社区的 DeleteRange API 的优化。我们发现在 DeleteRange API 的模式下,InitialDB 的选择可以有更宽松的限制。
因为在早期的版本中,它的 Rescale 的过程就是每个 Task 先去下载属于我的 Handle 数据,下载回来之后,需要将数据导入进去,然后再将不属于我的 DB 数据删除掉。原来实现这种删除是通过遍历来删除,所以它需要一个高比例的 DB 作为它的 base,否则它的删除代价也是很高的。但是现在在 DeleteRange 这种 API 的模式下,从右图可以看到,在我们的测试中,它 Rescale 的耗时分布里面 DeleteRange 所占用的时间可以忽略不计,基本都是毫秒级别。所以在 DeleteRange 模式下,我们应该尽可能的选择一个 DB 作为它的 baseDB。
第二点是,我们发现在 Rescale 的过程中,它磁盘的开销可能会达到原始的两到三倍。因为我们跑的是一个云化环境,对于磁盘有比较高的限制,所以我们对于每一个 Worker 都有磁盘监控,当它磁盘超过某个值的时候,就可能会触发 Kill 的机制,让它释放掉,否则稳定性就无法得到保障。但是在 Rescale 的过程中,以上图为例,它在恢复的过程中,四扩到八个场景中,每一个原始的 Handle 比如1到10部分的状态会被1到5和5到10这两部分都下载。这样的话,状态在恢复的过程中它是 Double 的,并且可能由于 InitialDB 选择的并不合适导致我在 Open 时创建一个全新的 DB 将原始的数据全部导入进去,这样就可能会达到两到三倍的存储开销。
要解决这个问题,第一点就是刚刚我们说到的,DeleteRange InitialDB 的选择。第二个点是我们在恢复的过程中会去做磁盘的一定弹性,提供了磁盘一定的弹性能力,不让它在 Rescale 的过程中因为恢复的过程直接将一个 Worker 干掉。第三个是尝试使用 SST ingest API 去实现常量时间的 Rescale,对于这个功能,社区也在做这样的尝试。在我们内部的实现过程中,第一点最开始实现的也是类似于社区之前的方案,就是在恢复的过程中,将原始的 DB 去遍历,然后通过 SST writer 的 API 去写出图里的测试结果。可以看到,它的主要耗时还是在于原始的 DB 遍历和 Put 的开销上,所以通过 SST writer 这样的方式,还是不能解决问题。所以我们也在看社区现有的方案,以及我们内部的 AntKV 同学努力去将整体的耗时尽可能的降下来。
三.未来展望
未来展望主要分为两方面。
一方面是期望能够在现有的状态后端上,实现更快的 Failover和 Rescale 的能力。第二块是基于我们现在的 AntKV 的这套存储引擎的特性,去做出更多的算子级别的性能优化,包括 Join 场景的优化,以及在 Windows 上的场景也会去做这样的探索。
另一方面是降低本地存储的依赖,因为主流的还是在迁往云化环境,所以对于本地存储的依赖也将会成为一个问题。
其次是部分计算下沉存储引擎,我们现在也在探索 Paimon 数据湖产品,它也是基于 LSM tree 的湖存储。从图可以看到,它所支持的是比较灵活的去定义两个 SST 文件合并,它的 key 合并的 Merge engine,以及现在已经支持的 Aggregation 的 Merge engine,在生产业务中比较简单的聚合场景就已经可以通过这种模式玩转起来。
它的好处在于,因为有很多业务在 Flink 里面做聚合的时候,需要 Flink 状态去帮它存储全量的数据。但是我们不推荐它将 State作为一个持久化的存储。然而在数据湖里面,它完全可以通过这种方式,因为在计算存储端的 Merge 过程中,就已经被完成了。
另外一个好处在于,这种 Merge 的过程是大批量的去做的,它会比原始的通过 Flink State 的 Record 级别去 Get Put 的这种方式的性能会更好。但是它的代价也是会用一定的时延去换整体的吞吐,对于那些可以接受这样的时延开销的业务场景,我们已经在尝试使用这种方式去接入这种业务,来满足他们的业务需求。
Flink Forward Asia 2023
本届 Flink Forward Asia 更多精彩内容,可微信扫描图片二维码观看全部议题的视频回放及 FFA 2023 峰会资料!
更多内容
活动推荐
阿里云基于 Apache Flink 构建的企业级产品-实时计算 Flink 版现开启活动:
59 元试用 实时计算 Flink 版(3000CU*小时,3 个月内)
了解活动详情:https://free.aliyun.com/?pipCode=sc