Flink(十二)【容错机制】(2)https://developer.aliyun.com/article/1532255
3. 通用增量 checkpoint(changelog)
在 Flink 1.15 之前,只有 RocksDB 支持增量快照。 不同于产生一个包含所有数据的全量备份,增量快照只包含自上一次快照完成后被修改的记录,因此可以显著减少快照完成的耗时。
RocksDB 状态后端启用增量 checkpoint:
从 Flink 1.15 开始,不管是 hashmap 还是 rocksdb 状态后端都可以通过开启 changelog 实现通用的增量 checkpoint。
我们可以在 Flink 官网看到对 增量快照的解释:
执行过程:
1. 有状态的算子任务将状态更改写入变更日志:
这里的 Stateful Changelog 就是变更日志,它记录了一些操作,比如原本的检查点数据为 1,2,3 现在变为了 1,2,3,4 它就会记录 +4 ,代表增加了一个数据 4。
State Table 就是操作后的状态(但它不是 checkpoint)。
Stateful Changelog 会实时同步到检查点存储当中。
2. 状态物化:状态表定期保存,独立于检查点
状态表默认 10 分钟保存一次,可以在配置文件中指定。状态表不是存在检查点的,而是独立于检查点之外的其他地方。
3. 状态物化后,状态变更日志就可以被截断到相应的点
所谓截断就是清理历史的状态操作日志,用新的操作日志替换。
state.backend.changelog.enabled: true state.backend.changelog.storage: filesystem # 存储 changelog 数据 dstl.dfs.base-path: hdfs://hadoop102:8020/changelog execution.checkpointing.max-concurrent-checkpoints: 1 execution.savepoint.restore-mode: CLAIM
注意事项
目前为实验性功能,开启后可能会造成资源消耗巨大:
- HDFS 上保存的文件数过多
- 消耗更多的 IO 带宽用于上传变更日志
- 更多的 CPU 用于序列化状态更改
- TaskManager 使用更多内存来缓存状态更改
使用限制:
- checkpoint 的最大并发数必须为1
- 从 Flink 1.15 开始,只有文件系统的存储类型实现可用(memory 内存存储还在测试阶段)
- 不支持 NO_CLAIM 模式
使用方式:
1)配置文件指定:
state.backend.changelog.enabled: true state.backend.changelog.storage: filesystem # 存储 changelog 数据 dstl.dfs.base-path: hdfs://hadoop102:8080/changelog execution.checkpointing.max-concurrent-checkpoints: 1 executopn.savepoint-restore-mode: CLAIM
2)代码中设置
引入依赖(打包的时候是不需要打包进去的):
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-statebackend-changelog</artifactId> <version>${flink.version}</version> <scope>runtime</scope> </dependency>
开启 changelog:
// 开启 cheangelog 需要设置检查点的最大并发为 1 checkpointConfig.setMaxConcurrentCheckpoints(1); env.enableChangelogStateBackend(true);
4. 最终检查点
如果数据源是有界的,就可能出现部分 task 已经处理完所有数据变成finished的状态,不继续工作。从 Flink 1.14开始这些 finished 状态的 task,也可以继续执行检查点。自 1.15 起默认启用此功能,并且可以通过功能标志禁用它(一般我们肯定是不希望关掉的):
Configuration conf = new Configuration(); // 从 Flink1.15 开始默认启用(true) conf.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH,false); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
1.5、保存点(Savepoint)
除了检查点(checkpoint)外,Flink 还提供了另一个非常独特的镜像保存功能——保存点(Savepoint)。
从名称就可以看出,这也是一个存盘的备份,它的原理和算法与检查点完全相同,只是多了一些额外的元数据。
事实上,保存点就是通过检查点的机制来创建流式作业状态的一致性镜像(consistent image)的。保存点中的状态快照,是以算子 ID 和状态名称组织起来的,相当于一个键值对。从保存点启动应用程序时,Flink 会将保存点的状态数据重新分配给相应的算子任务。
1. 保存点的用途
保存点与检查点最大的区别,就是触发的时机(检查点就像CSDN草稿的自动保存,而保存点就像我们手动的保存草稿)。检查点是由 Flink 自动管理的,定期创建,发生故障之后自动读取进行恢复,这是一个“自动存盘”的功能;而保存点不会自动创建,必须由用户明确地手动触发保存操作,所以就是“手动存盘”。因此两者尽管原理一致,但用途就有所差别了:检查点主要用来做故障恢复,是容错机制的核心;保存点则更加灵活,可以用来做有计划的手动备份和恢复。保存点可以当作一个强大的运维工具来使用。我们可以在需要的时候创建一个保存点,然后停止应用,做一些处理调整之后再从保存点重启。它适用的具体场景有:
- 版本管理和归档存储:对重要的节点进行手动备份,设置为某一版本,归档(archive)存储应用程序的状态。
- 更新 Flink 版本:目前 Flink 的底层架构已经非常稳定,所以当 Flink 版本升级时,程序本身一般是兼容的。这时不需要重新执行所有的计算,只要创建一个保存点,停掉应用、升级 Flink 后,从保存点重启就可以继续处理了。
- 更新应用程序:我们不仅可以在应用程序不变的时候,更新 Flink 版本;还可以直接更新应用程序。前提是程序必须是兼容的,也就是说更改之后的程序,状态的拓扑结构和数据类型都是不变的,这样才能正常从之前的保存点去加载。这个功能非常有用。我们可以及时修复应用程序中的逻辑 bug,更新之后接着继续处理;也可以用于有不同业务逻辑的场景,比如 A/B 测试等等。
- 调整并行度:如果应用运行的过程中,发现需要的资源不足或已经有了大量剩余,也可以通过从保存点重启的方式,将应用程序的并行度增大或减小。
- 暂停应用程序:有时候我们不需要调整集群或者更新程序,只是单纯地希望把应用暂停、释放一些资源来处理更重要的应用程序。使用保存点就可以灵活实现应用的暂停和重启,可以对有限的集群资源做最好的优化配置。
需要注意的是,保存点能够在程序更改的时候依然兼容,前提是状态的拓扑结构(比如原先是 source —> map ——> sum——>sink 之后变成了 source —> map ——> process——>sink 那么愚笨sum中的状态肯定不在了,因为这条算子链的结构已经变了)和数据类型(比如原本sum中存的是 ValueState 类型 之后变成了 MapState,这种也恢复不了)不变。我们知道保存点中状态都是以算子 ID-状态名称这样的 key-value 组织起来的,算子ID 可以在代码中直接调用 SingleOutputStreamOperator 的.uid()方法来进行指定:
DataStream<String> stream = env .addSource(new StatefulSource()) .uid("source-id") .map(new StatefulMapper()) .uid("mapper-id") .print();
对于没有设置 ID 的算子,Flink 默认会自动进行设置,所以在重新启动应用后可能会导致ID 不同而无法兼容以前的状态。所以为了方便后续的维护,强烈建议在程序中为每一个算子手动指定 ID。
2. 使用保存点
保存点的使用非常简单,我们可以使用命令行工具来创建保存点,也可以从保存点恢复作业。
(1)创建保存点
要在命令行中为运行的作业创建一个保存点镜像,只需要执行:
bin/flink savepoint :jobId [:targetDirectory]
这里 jobId 需要填充要做镜像保存的作业 ID,目标路径 targetDirectory 可选,表示保存点存储的路径。
对于保存点的默认路径,可以通过配置文件 flink-conf.yaml 中的 state.savepoints.dir 项来设定:
state.savepoints.dir: hdfs:///flink/savepoints
当然对于单独的作业,我们也可以在程序代码中通过执行环境来设置:
env.setDefaultSavepointDir("hdfs:///flink/savepoints");
由于创建保存点一般都是希望更改环境之后重启,所以创建之后往往紧接着就是停掉作业的操作。除了对运行的作业创建保存点,我们也可以在停掉一个作业时直接创建保存点:
bin/flink stop --savepointPath [:targetDirectory] :jobId
(2)从保存点重启应用
我们已经知道,提交启动一个 Flink 作业,使用的命令是 flink run;现在要从保存点重启一个应用,其实本质是一样的:
bin/flink run -s :savepointPath [:runArgs]
这里只要增加一个-s 参数,指定保存点的路径就可以了,其他启动时的参数还是完全一样的,如果是基于 yarn 的运行模式还需要加上 -yid application-id。在使用 web UI 进行作业提交时,可以填入的参数除了入口类、并行度和运行参数,还有一个“Savepoint Path”,这就是从保存点启动应用的配置。
3. 使用保存点恢复状态以及切换状态后端
Flink1.17 版本还提供了使用保存点切换状态后端,比如我们原本是 rocksdb 状态后端,想改成 hashmap 状态后端。也就是使用 savepoint 恢复状态的时候,去更换状态后端。需要注意的是,不要在代码中指定状态后端了,通过配置文件来配置或者 -D 参数配置。
// yarn 模式 bin/flink run-application -d -t yarn-application -Dstate.backend=hashmap -c com.lyh.test.wc xxx.jar // 不使用 yarn 模式 bin/flink run-application -d -Dstate.backend=hashmap -c com.lyh.test.wc xxx.jar
关闭程序并指定状态保存点:
我们这里用的是 cancel ,但其实我们更加推荐用 stop。
这次我们再次重启应用,并指定保存点:
为了再次验证是否是从保存点恢复,我们在 netcat 中输入 a (之前已经统计过一次了,如果保存点成功导入,结果将会输出 (a,2))
修改状态后端(这里我没有用 yarn 模式):
bin/flink run-application -d -Dstate.backend=rocksdb -s hdfs://hadoop102:8020/sp/savepoint-7ca51f-5e1c4815e549 -C com.lyh.checkpoint.SavepointDemo.class ./jobs/FlinkStudy-1.0-SNAPSHOT.jar
如果是yarn模式直接加一个 -t yarn-application 就好了。
4. 使用 checkpoint 恢复状态
同样,也可以从 checkpoint 来进行状态的恢复,但是注意,使用 checkpoint恢复的话不能切换状态后端,但是恢复命令还是一样的,都是指定恢复路径来重启应用:
// 路径必须指定到 checkpoint 的 chk-id 目录 bin/flink run-application -d -Dstate.backend=rocksdb -s hdfs://hadoop102:8020/chk/22516b874d99d1478983a9a5b248c6bf/chk-175 -C com.lyh.checkpoint.SavepointDemo.class ./jobs/FlinkStudy-1.0-SNAPSHOT.jar
我们说不能切换状态后端,但是这里指定 -Dstate.backend=rocksdb 并不影响,因为作业本来现在状态本来就是rocksdb 的。
2、状态一致性
2.1、一致性的概念和级别
在分布式系统中,一致性(consistency)是一个非常重要的概念;在事务(transaction)中,一致性也是重要的一个特性。Flink 中一致性的概念,主要用在故障恢复的描述中,所以更加类似于事务中的表述。那到底什么是一致性呢?
简单来讲,一致性其实就是结果的正确性,一般从数据丢失、数据重复角度来评估。对于分布式系统而言,强调的是不同节点中相同数据的副本应该总是“一致的”,也就是从不同节点读取时总能得到相同的值;而对于事务而言,是要求提交更新操作后,能够读取到新的数据。对于 Flink 来说,多个节点并行处理不同的任务,我们要保证计算结果是正确的,就必须不漏掉任何一个数据,而且也不会重复处理同一个数据。流式计算本身就是一个一个来的,所以正常处理的过程中结果肯定是正确的;在发生故障、需要恢复状态进行回滚时就需要更多的保障机制了。我们通过检查点的保存来保证状态恢复后结果的正确,所以主要讨论的就是“状态的一致性”。
一般说来,状态一致性有三种级别:
⚫ 最多一次(At-Most-Once)
就是说数据最多只处理一次,不管之后故没故障,丢没丢掉,数据只来一遍。对于 Flink 而言,不开启 checkpoint 就是最多一次。
当任务发生故障时,最简单的做法就是直接重启,别的什么都不干;既不恢复丢失的状态,也不重放丢失的数据。每个数据在正常情况下会被处理一次,遇到故障时就会丢掉,所以就是“最多处理一次”。我们发现,如果数据可以直接被丢掉,那其实就是没有任何操作来保证结果的准确性;所以这种类型的保证也叫“没有保证”。尽管看起来比较糟糕,不过如果我们的主要诉求是“快”,而对近似正确的结果也能接受,那这也不失为一种很好的解决方案。
⚫ 至少一次(AT-LEAST-ONCE)
数据至少处理一次,甚至是多次,所以数据很可能重复处理。对于 Flink 而言,当第一个 Barrier 到达,而其他 Barrier 没有到达时,第一个 Barrier 后面的数据不会等待,而是直接越过 Barrier ,当出现故障需要恢复检查点的时候,会把一些 Barrier 之外的数据(也就是不该恢复的数据)重复处理,这就是至少一次。
在实际应用中,我们一般会希望至少不要丢掉数据。这种一致性级别就叫作“至少一次”(at-least-once),就是说是所有数据都不会丢,肯定被处理了;不过不能保证只处理一次,有些数据会被重复处理。
在有些场景下,重复处理数据是不影响结果的正确性的,这种操作具有“幂等性”。比如,如果我们统计电商网站的 UV,需要对每个用户的访问数据进行去重处理,所以即使同一个数据被处理多次,也不会影响最终的结果,这时使用 at-least-once 语义是完全没问题的。当然,如果重复数据对结果有影响,比如统计的是 PV,或者之前的统计词频 word count,使用at-least-once 语义就可能会导致结果的不一致了。为了保证达到 at-least-once 的状态一致性,我们需要在发生故障时能够重放数据。最常见的做法是,可以用持久化的事件日志系统,把所有的事件写入到持久化存储中。这时只要记录一个偏移量,当任务发生故障重启后,重置偏移量就可以重放检查点之后的数据了。Kafka 就是这种架构的一个典型实现。
⚫ 精确一次(EXACTLY-ONCE)
第一个 Barrier 到达后,Barrier 后面的数据必须老老实实等着,等到所有 Barrier 都对齐之后才进行持久化,持久化完其他数据才能继续处理。或者非 Barrier 对齐情况下,第一个 Barrier 到达后直接跳到输出缓冲区继续往下游传递,把第一个Barrier 和其他Barrier 之间的数据都进行标记。这两种都是精确一次。
最严格的一致性保证,就是所谓的“精确一次”(exactly-once,有时也译作“恰好一次”)。这也是最难实现的状态一致性语义。exactly-once 意味着所有数据不仅不会丢失,而且只被处理一次,不会重复处理。也就是说对于每一个数据,最终体现在状态和输出结果上,只能有一次统计。exactly-once 可以真正意义上保证结果的绝对正确,在发生故障恢复后,就好像从未发生过故障一样。很明显,要做的 exactly-once,首先必须能达到 at-least-once 的要求,就是数据不丢。所以同样需要有数据重放机制来保证这一点。另外,还需要有专门的设计保证每个数据只被处理一次。Flink 中使用的是一种轻量级快照机制——检查点(checkpoint)来保证 exactly-once 语义。
2.2 端到端的状态一致性
我们已经知道检查点可以保证 Flink 内部状态的一致性,而且可以做到精确一次(exactly-once)。那是不是说,只要开启了检查点,发生故障进行恢复,结果就不会有任何问题呢?
没那么简单。在实际应用中,一般要保证从用户的角度看来,最终消费的数据是正确的。而用户或者外部应用不会直接从 Flink 内部的状态读取数据,往往需要我们将处理结果写入外部存储中。这就要求我们不仅要考虑 Flink 内部数据的处理转换,还涉及从外部数据源读取,以及写入外部持久化系统,整个应用处理流程从头到尾都应该是正确的。
所以完整的流处理应用,应该包括了数据源、流处理器和外部存储系统三个部分。这个完整应用的一致性,就叫作“端到端(end-to-end)的状态一致性”,它取决于三个组件中最弱的那一环。一般来说,能否达到 at-least-once 一致性级别,主要看数据源能够重放数据;而能否达到 exactly-once 级别,流处理器内部、数据源、外部存储都要有相应的保证机制。
状态一致性实现难度:At-most-once < At-least-once < Exactly-once 。
Flink(十二)【容错机制】(4)https://developer.aliyun.com/article/1532262