每传入一条数据,有状态的算子任务都会 读取和更新状态 。由于有效的状态访问对于处 理数据的低延迟至关重要,因此每个并行任务(子任务)都会在本地维护其状态,以确保快速 的状态访问。
状态的存储、访问以及维护,由一个 可插入 的组件决定,这个组件就叫做 状态后端 (state backend)
状态后端主要负责两件事:
本地的状态管理
将检查点(checkpoint)状态写入远程存储
(1)状态后端的分类
状态后端作为一个可插入的组件, 没有固定的配置, 我们可以根据需要选择一个合适 的状态后端。
Flink 提供了 3 中状态后端:
MemoryStateBackend
内存级别的状态后端
构造方法
env.setStateBackend(new MemoryStateBackend( "file://"+ baseCheckpointPath, null).configure(conf, classLoader))
数据存储
State 数据存储在TaskManager 内存中
Checkpoint 数据数据存储在jobManager 内存
容量限制
单词State maxStateSize默认为5M
maxStateSize <= akka.framesize默认10M
总大小不能超过JobMananger的内存
默认后端状态管理器
推荐场景:
本地测试
状态比较少的作业
不推荐生产环境中使用
特点:快速, 低延迟, 但不稳定
FsStateBackend
构造方法
env.setStateBackend(new FsStateBackend(tmpPath))
数据存储:
状态数据:TaskManager 内存
Checkpoint:外部文件系统(本地或HDFS)
容量限制:
单个TaskManager上State总量不能超过TM内存
总数据大小不超过文件系统容量
推荐场景:
常规状态作业
窗口时间比较长,如分钟级别窗口聚合,Join等
需要开启HA的作业
可在生产环境中使用
特点:拥有内存级别的本地访问速度, 和更好的容错保证
RocksDBStateBackend
将所有的状态序列化之后, 存入本地的 RocksDB 数据库中.(一种 NoSql 数 据库, KV 形式存储)
创建方法
env.setStateBackend(new RocksDBStateBackend("file://"+ basecheckpointPath).configure(conf,classLoader))
数据存储
State: TaskManager 中的KV数据库(实际使用内存+磁盘)
Checkpoint:外部文件系统(本地或HDFS)
容量限制:
单TaskManager 上 State总量不超过其内存+磁盘大小
单 Key最大容量2G
总大小不超过配置的文件系统容量
推荐场景:
超大状态作业
需要开启HA的作业
对状态读写性能要求不高的作业
生产环境可用
(2)配置状态后端
全局配置状态后端
在 flink-conf.yaml 文件中设置默认的全局后端
在代码中配置状态后端
可以在代码中单独为这个 Job 设置状态后端.
env.setStateBackend(new MemoryStateBackend()); env.setStateBackend(new FsStateBackend("hdfs://hadoop102:8020/flink/checkpoints/fs"));
如何要使用 RocksDBBackend, 需要先引入依赖:
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-statebackend-rocksdb_${scala.binary.version}</artifactId> <version>${flink.version}</version> <scope>provided</scope> </dependency>
env.setStateBackend(newRocksDBStateBackend("hdfs://hadoop102:8020/flink/checkpoints/rocksdb"));