1.State Vs Checkpoint
State:
维护/存储的是某一个Operator的运行的状态/历史值,是维护在内存中!
一般指一个具体的Operator的状态(operator的状态表示一些操作/算子在运行的过程中会产生的一些历史结果,如前面的maxBy底层会维护当前的最大值,也就是会维护一个keyedOperator,这个State里面存放就是maxBy这个Operator中的最大值)
State数据默认保存在Java的堆内存中/TaskManage节点的内存中
State可以被记录,在失败的情况下数据还可以恢复
Checkpoint:
某一时刻,Flink中所有的Operator的当前State的全局快照,一般存在磁盘上
表示了一个Flink Job在一个特定时刻的一份全局状态快照,即包含了所有Operator的状态
可以理解为Checkpoint是把State数据定时持久化存储了
比如FlinkKafkaConsumer算子中维护的Offset状态,当任务重新恢复的时候可以从Checkpoint中获取Checkpoint就是State的全局的分布式快照
- 注意:
Flink中的Checkpoint底层使用了Chandy-Lamport algorithm分布式快照算法可以保证数据的在分布式环境下的一致性!
https://zhuanlan.zhihu.com/p/53482103
Chandy-Lamport algorithm算法的作者也是ZK中Paxos 一致性算法的作者
https://www.cnblogs.com/shenguanpu/p/4048660.html
Flink中使用Chandy-Lamport algorithm分布式快照算法取得了成功,后续Spark的StructuredStreaming也借鉴了该算法
2.Checkpoint的执行流程
JobManager创建CheckpointCoordinator,并根据设置的Checkpoint时间间隔,向SourceOperator发送Barrier栅栏/其实就是告诉SourceOperator要将State进行快照/进行Checkpoint的命令!
SourceOperator接收到Barrier会暂停当前的工作,并异步调用API将当前的State状态数据保存到指定位置,一般为HDFS,并和CheckpointCoordinator确认已经完成Checkpoint操作,同时将Barrier发送给下游的TransformationOperator,同时恢复自己的工作!
下游的TransformationOperator接收到Barrier,同样也暂停当前的工作,并异步调用API将当前的State状态数据保存到指定位置,一般为HDFS,并和CheckpointCoordinator确认已经完成Checkpoint操作,同时将Barrier发送给下游,同时恢复自己的工作!
直到Barrier被发送给SinkOperator,SinkOperator同样也暂停当前的工作,并异步调用API将当前的State状态数据保存到指定位置,一般为HDFS,并和CheckpointCoordinator确认已经完成Checkpoint操作,同时恢复自己的工作!
CheckpointCoordinator接收到所有的Operator的确认消息,那么本次Checkpoint结束!如果有没有收到的,超时之后可以认为失败,Checkpoint失败可以让任务失败也可以不管,直接进行下一次Checkpoint!这些都可以通过配置参数来实现!
注意:
1.在往介质(如HDFS)中写入快照数据的时候是异步的(为了提高效率)
2.分布式快照执行时的数据一致性由Chandy-Lamport algorithm分布式快照算法保证!
3.State状态后端/State存储介质
注意:
前面学习了Checkpoint其实就是Flink中某一时刻,所有的Operator的State的全局快照,
那么快照应该要有一个地方进行存储,而这个存储的地方叫做状态后端
Flink中的State状态后端有很多种:
MemStateBackend[了解]
FsStateBackend–一般情况使用
RocksDBStateBackend—超大状态使用
三种状态存储介质的配置方式
全局配置
修改flink-conf.yaml #这里可以配置 #jobmanager(即MemoryStateBackend), #filesystem(即FsStateBackend), #rocksdb(即RocksDBStateBackend) state.backend: filesystem state.checkpoints.dir: hdfs://namenode:8020/flink/checkpoints
代码配置
/1.MemoryStateBackend--开发中不用 env.setStateBackend(new MemoryStateBackend) //2.FsStateBackend--开发中可以使用--适合一般状态--秒级/分钟级窗口... env.setStateBackend(new FsStateBackend("hdfs路径或测试时的本地路径")) //3.RocksDBStateBackend--开发中可以使用--适合超大状态--天级窗口... env.setStateBackend(new RocksDBStateBackend(filebackend, true)) 注意:RocksDBStateBackend还需要引入RocksDB依赖 <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-statebackend-rocksdb_2.11</artifactId> <version>1.7.2</version> </dependency>
总结:
后面的学习测试和开发都使用:
env.setStateBackend(new FsStateBackend(“hdfs路径或测试时的本地路径”))
特殊情况下的超大状态用:
env.setStateBackend(new RocksDBStateBackend(filebackend, true))