StateBackend
在 Flink 中, 状态存储被叫做 StateBackend , 它具备两种能力:
(1)在计算过程中提供访问 State 能力,开发者在编写业务逻辑中能够使用StateBackend 的接口读写数据。
(2)能够将 State 持久化到外部存储,提供容错能力。
一个 State Backend 主要负责两件事:Local State Management(本地状态管理) 和 Remote State Checkpointing(远程状态备份)。
Local State Management(本地状态管理)
State Management 的主要任务是确保状态的更新和访问。类似于数据库系统对数据的管理,State Backends 的状态管理就是提供对 State 的访问或更新操作,从这一点上看,State Backends 与数据库很相似。Flink 提供的 State Backends 主要有两种形式的状态管理:
- 直接将 State 以对象的形式存储到JVM的堆上面
- 将 State 对象序列化后存储到 RocksDB 中(RocksDB会写到本地的磁盘上)
以上两种方式,第一种存储到JVM堆中,因为是在内存中读写,延迟会很低,但State的大小受限于内存的大小;第二种方式存储到State Backends上(本地磁盘上),读写较内存会慢一些,但不受内存大小的限制,同时因为state存储在磁盘上,可以减少应用程序对内存的占用。根据使用经验,对延迟不是特别敏感的应用,选择第二种方式较好,尤其是State比较大的情况下。
Remote State Checkpointing(远程状态备份)
Flink程序是分布式运行的,而State都是存储到各个节点上的,一旦TaskManager节点出现问题,就会导致State的丢失。State Backend 提供了 State Checkpointing 的功能,将 TaskManager 本地的 State 的备份到远程的存储介质上,可以是分布式的存储系统或者数据库。不同的 State Backends 备份的方式不同,会有效率高低的区别。
状态后端
Flink提供了三种类型的状态后端,分别是基于内存的状态后端(MemoryStateBackend、基于文件系统的状态后端(FsStateBackend)以及基于RockDB作为存储介质的RocksDB StateBackend。这三种类型的StateBackend都能够有效地存储Flink流式计算过程中产生的状态数据,在默认情况下Flink使用的是MemoryStateBackend,区别见下表。下面分别对每种状态后端的特点进行说明。
1. MemoryStateBackend
MemoryStateBackend,运行时所需的 State 数据全部保存在 TaskManager JVM堆上内存中, KV 类型的 State、窗口算子的 State 使用 HashTable 来保存数据、触发器等。执行检查点的时候,会把 State 的快照数据保存到 JobManager 进程的内存中。
MemoryStateBackend 可以使用异步的方式进行快照,(也可以同步),推荐异步,避免阻塞算子处理数据。
使用 MemoryStateBackend 时的注意点
默认情况下,每一个状态最大为 5 MB。可以通过 MemoryStateBackend 的构造函数增加最大大小。 状态大小受到 Akka 帧大小的限制,所以无论在配置中怎么配置状态大小,都不能大于 Akka 的帧大小。 状态的总大小不能超过 JobManager 的内存。
什么时候使用 MemoryStateBackend:
开发或调试时建议使用 MemoryStateBackend,因为这种场景的状态大小的是有限的。 MemoryStateBackend 非常适合状态比较小的用例和流处理程序。例如一次仅一条记录的函数(Map, FlatMap,或 Filter)或者 Kafka consumer。
2. FsStateBackend
FSStateBackend,运行时所需的 State 数据全部保存在 TaskManager 的内存中, 执行检查点的时候,会把 State 的快照数据保存到配置的文件系统中。
可以是分布式或者本地文件系统,路径如:
HDFS 路径:“hdfs://namenode:40010/flink/checkpoints”
本地路径:“file:///data/flink/checkpoints”。
当选择 FsStateBackend 时,正在处理的数据会保存在 TaskManager 的内存中。在 checkpoint 时,状态后端会将状态快照写入配置的文件系统目录和文件中,同时会在 JobManager 或者 Zookeeper(在高可用场景下)的内存中存储极少的元数据。
默认情况下,FsStateBackend 会配置提供异步快照,以避免在写状态 checkpoint 时阻塞数据流的处理。该特性可以通过在实例化 FsStateBackend 时将布尔标志设置为 false 来禁用,例如:
new FsStateBackend(path, false);
当前的状态仍然会先存在 TaskManager 中,所以状态的大小同样不能超过 TaskManager 的内存。
使用 FsStateBacken 时的注意点:
State 数据首先被存在 TaskManager 的内存中。 2) State 大小不能超过 TM 内存。 3) TM 异步将 State 数据写入外部存储。
什么时候使用 FsStateBackend:
FsStateBackend 非常适合处理大状态,长窗口,或大键值状态的有状态流处理作业。 FsStateBackend 非常适合高可用方案。
MemoryStateBackend 和 FSStateBackend 都依赖于HeapKeyedStateBackend,HeapKeyedStateBackend 使用 State 存储数据。
3.RocksDBStateBackend
RocksDBStateBackend 跟内存型和文件型都不同 。
RocksDBStateBackend 使用嵌入式的本地数据库 RocksDB 将流计算数据状态存储在本地磁盘中,不会受限于 TaskManager 的内存大小,在执行检查点的时候,再将整个 RocksDB 中保存的 State 数据全量或者增量持久化到配置的文件系统中,在 JobManager 内存中会存储少量的检查点元数据。RocksDB 克服了 State 受内存限制的问题,同时又能够持久化到远端文件系统中,比较适合在生产中使用。
RocksDBStateBackend 的配置同样需要文件系统的 URL(类型,地址,路径)等来配置,如下所示:
hdfs://namenode:40010/flink/checkpoints s3://flink/checkpoints
缺点:
RocksDBStateBackend 相比基于内存的 StateBackend,访问 State 的成本高很 多,可能导致数据流的吞吐量剧烈下降,甚至可能降低为原来的 1/10。
使用 RocksDBStateBackend 时的注意点:
RocksDB 的每个 key 和 value 的最大大小为 2^31 字节。这是因为 RocksDB 的 JNI API 是基于 byte[] 的。 我们需要在此强调,对于使用合并操作的有状态流处理应用程序,例如 ListState,随着时间的推移可能会累积超过 2^31 字节大小,这将会导致后续的任何检索的失败。 RocksDBStateBackend 也需要配置外部文件系统,集中保存 State 。
何时使用 RocksDBStateBackend:
RocksDBStateBackend 非常适合处理大状态,长窗口,或大键值状态的有状态流处理作业。 RocksDBStateBackend 非常适合高可用方案。 RocksDBStateBackend 是目前唯一支持有状态流处理应用程序增量检查点的状态后端。
在使用 RocksDB 时,状态大小只受限于磁盘可用空间的大小。这也使得 RocksDBStateBackend 成为管理超大状态的比较好的选择。使用 RocksDB 的权衡点在于所有状态的访问和检索都需要序列化(或反序列化)才能跨越 JNI 边界。与上面提到的堆上后端相比,这可能会影响应用程序的吞吐量。
状态后端的使用
rocksDB checkpoint常见配置:
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); env.getCheckpointConfig().setMinPauseBetweenCheckpoints(1000); env.getCheckpointConfig().setCheckpointTimeout(60000); env.getCheckpointConfig().setMaxConcurrentCheckpoints(1); env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); env.getCheckpointConfig().enableUnalignedCheckpoints(); // 使用RocksDB 作为状态后端 env.setStateBackend(new EmbeddedRocksDBStateBackend(true)); //true 表示增量checkpoint env.getCheckpointConfig().setCheckpointStorage("hdfs:///user/fycmb/flink/RocksDB/news"); env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 3 * 1000));
Flink 状态持久化
首选,Flink 的状态最终都要持久化到第三方存储中,确保集群故障或者作业挂掉后能够恢复。RocksDBStateBackend 持久化策略有两种:
全量持久化策略 RocksFullSnapshotStrategy
增量持久化策略 RocksIncementalSnapshotStrategy
1、全量持久化策略每次将全量的 State 写入到状态存储中(HDFS)。内存型、文件型、RocksDB 类型的 StataBackend 都支持全量持久化策略。
2、增量持久化策略
增量持久化就是每次持久化增量的 State,只有 RocksDBStateBackend 支持增量持久化。
Flink 增量式的检查点以 RocksDB 为基础, RocksDB 是一个基于 LSM-Tree 的KV 存储。新的数据保存在内存中, 称为 memtable。如果 Key 相同,后到的数据将覆盖之前的数据,一旦 memtable 写满了,RocksDB 就会将数据压缩并写入磁盘。memtable 的数据持久化到磁盘后,就变成了不可变的 sstable。
因为 sstable 是不可变的,Flink 对比前一个检查点创建和删除的 RocksDB sstable 文件就可以计算出状态有哪些发生改变。为了确保 sstable 是不可变的,Flink 会在 RocksDB 触发刷新操作,强制将memtable 刷新到磁盘上 。在Flink 执行检查点时,会将新的 sstable 持久化到HDFS 中,同时保留引用。这个过程中 Flink 并不会持久化本地所有的 sstable,因为本地的一部分历史 sstable 在之前的检查点中已经持久化到存储中了,只需增加对 sstable 文件的引用次数就可以。
RocksDB 会在后台合并 sstable 并删除其中重复的数据。然后在 RocksDB 删除原来的 sstable,替换成新合成的 sstable.。新的 sstable 包含了被删除的sstable中的信息,通过合并历史的 sstable 会合并成一个新的 sstable,并删除这些历史sstable. 可以减少检查点的历史文件,避免大量小文件的产生。