flink 状态后端详解

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: flink 状态后端详解

StateBackend

Flink 中, 状态存储被叫做 StateBackend , 它具备两种能力:

1)在计算过程中提供访问 State 能力,开发者在编写业务逻辑中能够使用StateBackend 的接口读写数据。

2)能够将 State 持久化到外部存储,提供容错能力。

一个 State Backend 主要负责两件事:Local State Management(本地状态管理) 和 Remote State Checkpointing(远程状态备份)。

Local State Management(本地状态管理)

State Management 的主要任务是确保状态的更新和访问。类似于数据库系统对数据的管理,State Backends 的状态管理就是提供对 State 的访问或更新操作,从这一点上看,State Backends 与数据库很相似。Flink 提供的 State Backends 主要有两种形式的状态管理:

  • 直接将 State 以对象的形式存储到JVM的堆上面
  • 将 State 对象序列化后存储到 RocksDB 中(RocksDB会写到本地的磁盘上)

以上两种方式,第一种存储到JVM堆中,因为是在内存中读写,延迟会很低,但State的大小受限于内存的大小;第二种方式存储到State Backends上(本地磁盘上),读写较内存会慢一些,但不受内存大小的限制,同时因为state存储在磁盘上,可以减少应用程序对内存的占用。根据使用经验,对延迟不是特别敏感的应用,选择第二种方式较好,尤其是State比较大的情况下。

Remote State Checkpointing(远程状态备份)

Flink程序是分布式运行的,而State都是存储到各个节点上的,一旦TaskManager节点出现问题,就会导致State的丢失。State Backend 提供了 State Checkpointing 的功能,将 TaskManager 本地的 State 的备份到远程的存储介质上,可以是分布式的存储系统或者数据库。不同的 State Backends 备份的方式不同,会有效率高低的区别。

状态后端


Flink提供了三种类型的状态后端,分别是基于内存的状态后端(MemoryStateBackend、基于文件系统的状态后端(FsStateBackend)以及基于RockDB作为存储介质的RocksDB StateBackend。这三种类型的StateBackend都能够有效地存储Flink流式计算过程中产生的状态数据,在默认情况下Flink使用的是MemoryStateBackend,区别见下表。下面分别对每种状态后端的特点进行说明。

60a6bcefe26f4b118e50f46e4d0afd1d.png

1. MemoryStateBackend

MemoryStateBackend,运行时所需的 State 数据全部保存在 TaskManager JVM堆上内存中, KV 类型的 State、窗口算子的 State 使用 HashTable 来保存数据、触发器等。执行检查点的时候,会把 State 的快照数据保存到 JobManager 进程的内存中

MemoryStateBackend 可以使用异步的方式进行快照,(也可以同步),推荐异步,避免阻塞算子处理数据。

使用 MemoryStateBackend 时的注意点

默认情况下,每一个状态最大为 5 MB。可以通过 MemoryStateBackend 的构造函数增加最大大小。
状态大小受到 Akka 帧大小的限制,所以无论在配置中怎么配置状态大小,都不能大于 Akka 的帧大小。
状态的总大小不能超过 JobManager 的内存。

什么时候使用 MemoryStateBackend:

开发或调试时建议使用 MemoryStateBackend,因为这种场景的状态大小的是有限的。
MemoryStateBackend 非常适合状态比较小的用例和流处理程序。例如一次仅一条记录的函数(Map, FlatMap,或 Filter)或者 Kafka consumer。

2. FsStateBackend


FSStateBackend,运行时所需的 State 数据全部保存在 TaskManager 的内存中, 执行检查点的时候,会把 State 的快照数据保存到配置的文件系统中

可以是分布式或者本地文件系统,路径如:

HDFS 路径:“hdfs://namenode:40010/flink/checkpoints”

本地路径:“file:///data/flink/checkpoints”

当选择 FsStateBackend 时,正在处理的数据会保存在 TaskManager 的内存中。在 checkpoint 时,状态后端会将状态快照写入配置的文件系统目录和文件中,同时会在 JobManager 或者 Zookeeper(在高可用场景下)的内存中存储极少的元数据。

默认情况下,FsStateBackend 会配置提供异步快照,以避免在写状态 checkpoint 时阻塞数据流的处理。该特性可以通过在实例化 FsStateBackend 时将布尔标志设置为 false 来禁用,例如:

new FsStateBackend(path, false);

当前的状态仍然会先存在 TaskManager 中,所以状态的大小同样不能超过 TaskManager 的内存。

使用 FsStateBacken 时的注意点:

 State 数据首先被存在 TaskManager 的内存中。
2) State 大小不能超过 TM 内存。
3) TM 异步将 State 数据写入外部存储。

什么时候使用 FsStateBackend:

FsStateBackend 非常适合处理大状态,长窗口,或大键值状态的有状态流处理作业。
FsStateBackend 非常适合高可用方案。

MemoryStateBackend FSStateBackend 都依赖于HeapKeyedStateBackendHeapKeyedStateBackend 使用 State 存储数据。

3.RocksDBStateBackend

RocksDBStateBackend 跟内存型和文件型都不同 。

RocksDBStateBackend 使用嵌入式的本地数据库 RocksDB 将流计算数据状态存储在本地磁盘中,不会受限于 TaskManager 的内存大小,在执行检查点的时候,再将整个 RocksDB 中保存的 State 数据全量或者增量持久化到配置的文件系统中,在 JobManager 内存中会存储少量的检查点元数据。RocksDB 克服了 State 受内存限制的问题,同时又能够持久化到远端文件系统中,比较适合在生产中使用。

RocksDBStateBackend 的配置同样需要文件系统的 URL(类型,地址,路径)等来配置,如下所示:

hdfs://namenode:40010/flink/checkpoints
s3://flink/checkpoints

缺点:

RocksDBStateBackend 相比基于内存的 StateBackend,访问 State 的成本高很
多,可能导致数据流的吞吐量剧烈下降,甚至可能降低为原来的 1/10。

使用 RocksDBStateBackend 时的注意点:

RocksDB 的每个 key 和 value 的最大大小为 2^31 字节。这是因为 RocksDB 的 JNI API 是基于 byte[] 的。
我们需要在此强调,对于使用合并操作的有状态流处理应用程序,例如 ListState,随着时间的推移可能会累积超过 2^31 字节大小,这将会导致后续的任何检索的失败。
RocksDBStateBackend 也需要配置外部文件系统,集中保存 State 。

何时使用 RocksDBStateBackend:

RocksDBStateBackend 非常适合处理大状态,长窗口,或大键值状态的有状态流处理作业。
RocksDBStateBackend 非常适合高可用方案。
RocksDBStateBackend 是目前唯一支持有状态流处理应用程序增量检查点的状态后端。

在使用 RocksDB 时,状态大小只受限于磁盘可用空间的大小。这也使得 RocksDBStateBackend 成为管理超大状态的比较好的选择。使用 RocksDB 的权衡点在于所有状态的访问和检索都需要序列化(或反序列化)才能跨越 JNI 边界。与上面提到的堆上后端相比,这可能会影响应用程序的吞吐量。

状态后端的使用

rocksDB checkpoint常见配置:

 env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(1000);
        env.getCheckpointConfig().setCheckpointTimeout(60000);
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        env.getCheckpointConfig().enableUnalignedCheckpoints();
//         使用RocksDB 作为状态后端
        env.setStateBackend(new EmbeddedRocksDBStateBackend(true));    //true 表示增量checkpoint
        env.getCheckpointConfig().setCheckpointStorage("hdfs:///user/fycmb/flink/RocksDB/news");
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 3 * 1000));

Flink 状态持久化

首选,Flink 的状态最终都要持久化到第三方存储中,确保集群故障或者作业挂掉后能够恢复。RocksDBStateBackend 持久化策略有两种:

全量持久化策略 RocksFullSnapshotStrategy

增量持久化策略 RocksIncementalSnapshotStrategy

1、全量持久化策略每次将全量的 State 写入到状态存储中(HDFS)。内存型、文件型、RocksDB 类型的 StataBackend 都支持全量持久化策略。

60a6bcefe26f4b118e50f46e4d0afd1d.png

 
2
、增量持久化策略

增量持久化就是每次持久化增量的 State只有 RocksDBStateBackend 支持增量持久化。


Flink 增量式的检查点以 RocksDB 为基础RocksDB 是一个基于 LSM-Tree KV 存储。新的数据保存在内存中, 称为 memtable。如果 Key 相同,后到的数据将覆盖之前的数据,一旦 memtable 写满了,RocksDB 就会将数据压缩并写入磁盘。memtable 的数据持久化到磁盘后,就变成了不可变的 sstable


因为 sstable 是不可变的,Flink 对比前一个检查点创建和删除的 RocksDB sstable 文件就可以计算出状态有哪些发生改变。为了确保 sstable 是不可变的,Flink 会在 RocksDB 触发刷新操作强制将memtable 刷新到磁盘上 。Flink 执行检查点时,会将新的 sstable 持久化到HDFS 中,同时保留引用。这个过程中 Flink 并不会持久化本地所有的 sstable,因为本地的一部分历史 sstable 在之前的检查点中已经持久化到存储中了,只需增加对 sstable 文件的引用次数就可以。


RocksDB 会在后台合并 sstable 并删除其中重复的数据。然后在 RocksDB 删除原来的 sstable,替换成新合成的 sstable.新的 sstable 包含了被删除的sstable中的信息,通过合并历史的 sstable 会合并成一个新的 sstable,并删除这些历史sstable. 可以减少检查点的历史文件,避免大量小文件的产生。

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
9月前
|
存储 IDE Java
Flink---12、状态后端(HashMapStateBackend/RocksDB)、如何选择正确的状态后端
Flink---12、状态后端(HashMapStateBackend/RocksDB)、如何选择正确的状态后端
|
存储 Java API
Flink 状态清除的演进之路
对于流计算程序来说,肯定会用到状态(state),假如状态不自动清除,并且随着作业运行的时间越来越久,就会累积越多越多的状态,就会影响任务的性能,为了有效的控制状态的大小,Flink从1.6.0开始引入了状态的生存时间(TTL)功能,这样就可以实现自动清理状态,控制状态的大小.本文主要介绍一下Flink从1.6.0开始到1.9.1的状态清理不断的演进之路. Flink1.6.0状态清除 Apache Flink 的 1.6.0 版本引入了状态生存时间特性。它使流处理应用程序的开发人员能够配置算子的状态,使其在定义的生存时间超时后被清除。
|
BI Apache 流计算
Apache Flink 概念介绍:有状态流式处理引擎的基石(二)| 学习笔记
快速学习 Apache Flink 概念介绍:有状态流式处理引擎的基石。
208 0
Apache Flink 概念介绍:有状态流式处理引擎的基石(二)| 学习笔记
|
存储 缓存 安全
eBay:Flink的状态原理讲一下……
eBay:Flink的状态原理讲一下……
194 0
eBay:Flink的状态原理讲一下……
|
存储 流计算
|
存储 传感器 缓存
【Flink】(七)状态管理
【Flink】(七)状态管理
477 0
【Flink】(七)状态管理
|
SQL 消息中间件 存储
Flink 源码:广播流状态源码解析
Broadcast State 是 Operator State 的一种特殊类型。它的引入是为了支持这样的场景: 一个流的记录需要广播到所有下游任务,在这些用例中,它们用于在所有子任务中维护相同的状态。然后可以在处理第二个流的数据时访问这个广播状态,广播状态有自己的一些特性。 必须定义为一个 Map 结构。
Flink 源码:广播流状态源码解析
|
测试技术 API 数据库
Flink 通过 State Processor API 实现状态的读取和写入
在 1.9 版本之前,Flink 运行时的状态对于用户来说是一个黑盒,我们是无法访问状态数据的,从 Flink-1.9 版本开始,官方提供了 State Processor API 这让用户读取和更新状态成为了可能,我们可以通过 State Processor API 很方便的查看任务的状态,还可以在任务第一次启动的时候基于历史数据做状态冷启动。从此状态对于用户来说是透明的。下面就来看一下 State Processor API 的使用。
Flink 通过 State Processor API 实现状态的读取和写入
|
SQL 消息中间件 存储
字节跳动 Flink 状态查询实践与优化
字节跳动基础架构工程师马越在 FFA 2021 的演讲。
字节跳动 Flink 状态查询实践与优化
|
消息中间件 存储 SQL
【实时数仓篇】(04)利用 Flink 实现实时状态复用场景
【实时数仓篇】(04)利用 Flink 实现实时状态复用场景
336 0
【实时数仓篇】(04)利用 Flink 实现实时状态复用场景