Flink 专题 -2 Checkpoint、Savepoint 机制

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 在新的大数据计算框架flink中关于 checkpoint 和 savepoint 相关的设置、配置和注意事项 ,希望可以帮助到大家。

CheckPoint

1. checkpoint 保留策略

默认情况下,checkpoint 不会被保留,取消程序时即会删除他们,但是可以通过配置保留定期检查点,根据配置 当作业失败或者取消的时候 ,不会自动清除这些保留的检查点 。
java :

CheckpointConfig config = env.getCheckpointConfig();
config.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

ExternalizedCheckpointCleanup 可选项如下:

  • ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION: 取消作业时保留检查点。请注意,在这种情况下,您必须在取消后手动清理检查点状态。
  • ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION: 取消作业时删除检查点。只有在作业失败时,检查点状态才可用。

2. Checkpoint 配置

与SavePoint 类似 ,checkpoint 保留的是元数据文件和一些数据文件
默认情况下checkpoint 只保留 一份最新数据,如果需要进行checkpoint数据恢复 ,可以通过全局设置的方式设置该集群默认的checkpoint 保留数,以保证后期可以从checkpoint 点进行恢复 。 同时为了 及时保存checkpoint状态 还需要在服务级别设置 checkpoint 检查点的 备份速度 。
全局配置:
flink-conf.yaml

// 设置 checkpoint全局设置保存点  
state.checkpoints.dir: hdfs:///checkpoints/
// 设置checkpoint 默认保留 数量  
state.checkpoints.num-retained: 20

注意 如果将 checkpoint保存在hdfs 系统中 , 需要设置 hdfs 元数据信息 : fs.default-scheme:
服务级别设置:
java:

// 设置 checkpoint 保存目录  
env.setStateBackend(new RocksDBStateBackend("hdfs:///checkpoints-data/");
// 设置checkpoint 检查点间隔时间  
env.enableCheckpointing(5000);

提交任务之后 job 界面 和 hdfs 界面

  • 通过页面可以看出 checkpoint 备份方式是每5秒执行一次 ,保存当前所有task 状态元信息 和状态信息 。
  • hdfs 信息 保存 jobId 为 0171897fa809692093b4a9b223cb35e4 最新的 20次 checkpoint 信息
    image

image

image

3. Checkpoint 状态点恢复

因为 flink checkpoint 目录 分别对应的是 jobId , 每通过 flink run 方式 / 页面提交方式 都会重新生成 jobId ,那么如何通过checkpoint 恢复 失败任务或者重新执行保留时间点的 任务?

flink 提供了 在启动 之时 通过设置 -s 参数指定checkpoint 目录 , 让新的jobId 读取该checkpoint 元文件信息和状态信息 ,从而达到指定时间节点启动 job 。启动方式如下 :

./bin/flink -s /flink/checkpoints/0171897fa809692093b4a9b223cb35e4/chk-50/_metadata  -p  @Parallelism -c @Mainclass @jar  

Savepoint

Savepoint 介绍

Savepoint是通过Flink的检查点机制创建的流作业执行状态的一致图像。您可以使用Savepoints来停止和恢复,分叉或更新Flink作业。保存点由两部分组成:稳定存储(例如HDFS,S3,...)上的(通常是大的)二进制文件和(相对较小的)元数据文件的目录。稳定存储上的文件表示作业执行状态图像的净数据。Savepoint的元数据文件以(绝对路径)的形式包含(主要)指向作为Savepoint一部分的稳定存储上的所有文件的指针。

savepoint 和 checkpoint 区别

从概念上讲,Flink的Savepoints与Checkpoints的不同之处在于备份与传统数据库系统中的恢复日志不同。检查点的主要目的是在意外的作业失败时提供恢复机制。Checkpoint的生命周期由Flink管理,即Flink创建,拥有和发布Checkpoint - 无需用户交互。作为一种恢复和定期触发的方法,Checkpoint实现的两个主要设计目标是:i)being as lightweight to create (轻量级),ii)fast restore (快速恢复) 。针对这些目标的优化可以利用某些属性,例如,JobCode在执行尝试之间不会改变。

与此相反,Savepoints由用户创建,拥有和删除。他们的用例是planned (计划) 的,manual backup( 手动备份 ) 和 resume(恢复) 。例如,这可能是您的Flink版本的更新,更改您的Job graph ,更改 parallelism ,分配第二个作业,如红色/蓝色部署,等等。当然,Savepoints必须在终止工作后继续存在。从概念上讲,保存点的生成和恢复成本可能更高,并且更多地关注可移植性和对前面提到的作业更改的支持。

Assigning Operator IDs ( 分配 operator ids)

为了能够在将来升级你的程序在本节中描述。主要的必要更改是通过该uid(String)方法手动指定操作员ID 。这些ID用于确定每个运算符的状态。
java:

DataStream<String> stream = env.
  // Stateful source (e.g. Kafka) with ID
  .addSource(new StatefulSource())
  .uid("source-id") // ID for the source operator
  .shuffle()
  // Stateful mapper with ID
  .map(new StatefulMapper())
  .uid("mapper-id") // ID for the mapper
  // Stateless printing sink
  .print(); // Auto-generated ID

如果您未手动指定ID,则会自动生成这些ID。只要这些ID不变,您就可以从保存点自动恢复。生成的ID取决于程序的结构,并且对程序更改很敏感。因此,强烈建议手动分配这些ID。

Savepoint State

触发保存点时,会创建一个新的保存点目录,其中将存储数据和元数据。可以通过配置默认目标目录或使用触发器命令指定自定义目标目录来控制此目录的位置

保存Savepoint

$ bin/flink savepoint :jobId [:targetDirectory]
这将触发具有ID的作业的保存点:jobId,并返回创建的保存点的路径。您需要此路径来还原和部署保存点。

在yarn 集群中保存Savepoint

$ bin/flink savepoint :jobId [:targetDirectory] -yid :yarnAppId
这将触发具有ID :jobId和YARN应用程序ID 的作业的保存点:yarnAppId,并返回创建的保存点的路径。

使用 Savepoint 取消job

$ bin/flink cancel -s [:targetDirectory] :jobId
这将以原子方式触发具有ID的作业的保存点:jobid并取消作业。此外,您可以指定目标文件系统目录以存储保存点。该目录需要可由JobManager和TaskManager访问。

Resuming Savepoint

$ bin/flink run -s :savepointPath [:runArgs]
这将提交作业并指定要从中恢复的保存点。您可以指定保存点目录或_metadata文件的路径。

允许未恢复状态启动

$ bin/flink run -s :savepointPath -n [:runArgs]
默认情况下,resume操作将尝试将保存点的所有状态映射回要恢复的程序。如果删除了运算符,则可以通过--allowNonRestoredState(short -n:)选项跳过无法映射到新程序的状态:

全局配置

您可以通过state.savepoints.dir 配置文件设置默认savepoint 位置 。触发保存点时,此目录将用于存储保存点。您可以通过使用触发器命令指定自定义目标目录来覆盖默认值(请参阅:targetDirectory参数)。
flink-conf.yaml

# Default savepoint target directory
state.savepoints.dir: hdfs:///flink/savepoints

参考地址:

https://ci.apache.org/projects/flink/flink-docs-release-1.6/ops/state/state_backends.html
https://ci.apache.org/projects/flink/flink-docs-release-1.6/ops/state/checkpoints.html
https://ci.apache.org/projects/flink/flink-docs-release-1.6/ops/state/savepoints.html
http://ju.outofmemory.cn/entry/370841

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
目录
相关文章
|
3月前
|
存储 缓存 算法
分布式锁服务深度解析:以Apache Flink的Checkpointing机制为例
【10月更文挑战第7天】在分布式系统中,多个进程或节点可能需要同时访问和操作共享资源。为了确保数据的一致性和系统的稳定性,我们需要一种机制来协调这些进程或节点的访问,避免并发冲突和竞态条件。分布式锁服务正是为此而生的一种解决方案。它通过在网络环境中实现锁机制,确保同一时间只有一个进程或节点能够访问和操作共享资源。
113 3
|
5月前
|
容灾 流计算
美团 Flink 大作业部署问题之 Checkpoint 跨机房副本的制作能力如何实现
美团 Flink 大作业部署问题之 Checkpoint 跨机房副本的制作能力如何实现
|
5月前
|
存储 数据处理 Apache
超越传统数据库:揭秘Flink状态机制,让你的数据处理效率飞升!
【8月更文挑战第26天】Apache Flink 在流处理领域以其高效实时的数据处理能力脱颖而出,其核心特色之一便是状态管理机制。不同于传统数据库依靠持久化存储及 ACID 事务确保数据一致性和可靠性,Flink 利用内存中的状态管理和分布式数据流模型实现了低延迟处理。Flink 的状态分为键控状态与非键控状态,前者依据数据键值进行状态维护,适用于键值对数据处理;后者与算子实例关联,用于所有输入数据共享的状态场景。通过 checkpointing 机制,Flink 在保障状态一致性的同时,提供了更适合流处理场景的轻量级解决方案。
73 0
|
5月前
|
容灾 流计算
美团 Flink 大作业部署问题之Checkpoint 的 metadata 文件包含什么信息
美团 Flink 大作业部署问题之Checkpoint 的 metadata 文件包含什么信息
|
5月前
|
存储 调度 流计算
Flink 新一代流计算和容错问题之如何实现 Generalized Log-Based Incremental Checkpoint
Flink 新一代流计算和容错问题之如何实现 Generalized Log-Based Incremental Checkpoint
|
5月前
|
存储 缓存 数据处理
Flink 新一代流计算和容错问题之中间数据流动缓慢导致 Checkpoint 慢的问题要如何解决
Flink 新一代流计算和容错问题之中间数据流动缓慢导致 Checkpoint 慢的问题要如何解决
|
5月前
|
存储 分布式计算 算法
Flink四大基石——4.Checkpoint容错机制
Flink四大基石——4.Checkpoint容错机制
113 1
|
5月前
|
关系型数据库 MySQL 数据处理
实时计算 Flink版产品使用问题之mini-cluster模式下,怎么指定checkpoint的时间间隔
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
5月前
|
Oracle 关系型数据库 MySQL
实时计算 Flink版产品使用问题之如何从savepoint重新启动作业
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
5月前
|
存储 监控 Serverless
Serverless 应用的监控与调试问题之Flink对于Checkpoint Barrier流动缓慢的问题要如何解决
Serverless 应用的监控与调试问题之Flink对于Checkpoint Barrier流动缓慢的问题要如何解决