Flink checkpoint(一)| 学习笔记

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 快速学习 Flink checkpoint。

开发者学堂课程【Apache Flink 入门到实战 - Flink 开源社区出品 Flink checkpoint(一)】学习笔记,与课程紧密联系,让用户快速学习知识。

课程地址:https://developer.aliyun.com/learning/course/632/detail/10044


Flink checkpoint(一)

 

内容简介:

一、Checkpoint 与 state 的关系

二、什么是 state

三、如何在 Flink 中使用 state

四、Checkpoint 的执行机制

五、回答部分问题

 

一、Checkpoint 与 state 的关系

Checkpoint 是一个动词,它最终如果顺利会产生一个 complete Checkpoint ,它是一个动词,也就是在下图中会看到红框里面一共有 Trigger 了569027,没有 Failed ,如下图:

image.png

State 其实就是 Checkpoint 所必备的主要的数据,下图中可以看到官方中 state 的 size 是9.17 KB ,是很小的,如下图:

image.png

 

二、什么是 state

(1)state 的含义

先看一段代码:

env.socketTextStream(localhost,9000)

//split up the lines in pairs (2-tuples)

.flatMap(new Tokenizer())

//group by the tuple field 0 and sum up tuple field 1

.keyBy(0).sum(1)

.print();

这是一段非常直白的 Wellcome 的代码,如果执行上述代码,它会去监控本地的9000端口的数据,然后本地启动 netcat ,如果输入 hello world ,执行程序会自动输出什么?

之前提到它是 Wellcome ,便会输出 hello 1 、 world 1 ,那么如果再次输入 hello world  执行程序会输出什么?

hello a 、 world a ,为什么第二次流式计算,

第一次 hello world 来的时候就已经把结果输入进去了,

为什么第二次 hello world 来的时候 Flink 知道它已经看到过一次 hello world ,这就是 state 的作用, 

因为它把这个存储在 state 里面,它知道 hello 和 world 分别出现过一次, State 我们可以认为是流式计算中持久化的状态,可以看到数据输入进来,最后在每个地方会存储到 state backend 里面,  state backend 就是存储

state 的,如下图:

image.png

(2)state的分类

①Keyed State

在 Flink 中如果对 state 进行分类,

有这样两类:一个是 Keyed State ,它只能应用于 KeyedStream 的函数与操作中,它是已经划分好的,换言之每个 Key 只能属于某一个 Keyed State ,刚才实例中提到的的 hello world 的中的 hello 在作业没有发生变化的情况下,只要它一直在正常的运行中,那么 hello 永远只会出现在某一个State 的并发上面,不会去其他地方,再来看这段

word count 的代码:

env.socketTextStream(localhost,9000)

//split up the lines in pairs (2-tuples)

.flatMap(new Tokenizer())

//group by the tuple field 0 and sum up tuple field 1

.keyBy(0).sum(1)

.print();

其中的 keyBy 便是之前提到的使用 KeyedStream ,要创建KeyedStream ,并要对 key 进行划分,不同的 task 上不会出现相同的 key ,其中 sum 是调用内置的 StreamGroupedReduce UD 进行一个累积的计算。

在进一步讲之前先看一下下图中 keyBy  左边有三个并发,右边也是三个并发,于是左边的词进来之后用 keyBy ,并会根据 key 进行自分发,如下图:

image.png

可以看到 hello world  中的 hello 永远只会去到右下方并发路task 上面去,这个就是 keyBy 语。

②Operator State

另外一个 state 是 Operator State ,这个不需要作用在 KeyStream ,换言之无需 keyBy 也能拿到它,它用每一个 operator state 都仅与一个 operator 的实例绑定, Operator State 中比较常见的是 source state ,例如记录当前 source 的 offset ,

再来看一段 word count 代码 :

env.fromElements(WordCountData.WORDS)

//split up the lines in pairs (2-tuples) containing: (word,1)

.flatMap(new Tokenizer())

//group by the tuple field 0 and sum up tuple field 1

.keyBy(0).sum(1)

.print();

可以看到 env 后面的 Function 换成了 fromElements ,这个源码里面是 Operator State ,

它会调用内置的 romElementsFunction ,以下这段代码就是源码里面的一部分,

如下:

public class FromElementsFunction implements SourceFunction, CheckpointedFunction { 

private transient ListState checkpointedState;

public FromElementsFunction(TypeSerializer serializer,T...elments) throws I0Exception {

this(serializer,Arrays.asList(elements));

}

可以看到它用到了 ListState 和 checkpointState ,这个就存储了相关的 Operator State 信息。

下面这个图是整个 State 负类子类的关系,这个比较黄的词是平常中经常用到的 State ,那么有个问题来了 这些 State 是类的定义,它们哪些属于 Key State ?

哪些属于 Operator State ?

比如是像 ValueState 代表说是 key 所对应的value 是单值 value , MapState 的 key 所对应的是 value 是map, ListState 的 key 所对应的 value 是 list ,

如下图:

image.png

看看其中哪些是 key 的 state ?即 ValueState 、 MapState 、ListState ,说明一下AggregatingState 、 ReducingState、 FoldingState 其实都是对 ValueState 的一个封装,上面加了一个function,那么哪些是 Operator State ?

即 ListState BroadcastState 。

③Managed 和 Raw

除了 key 和 Operator 一个维度来区分 State 还有一个维度,称之为 Managed 和 Raw  ,其中 Managed State 是由 Flink 管理的state ,刚才举例的所有 state 均是 managed ;

而 Raw State是Flink 仅提供 stream 可以进行存储数据,从它的角度来看这些可能只是一些 bytes , 比如下面的这段方法,就是相关的 Raw State的东西,

如下:

public interface StateSnapshotContext extends FunctionSnapshotContext {

/**

* Returns an output stream for keyed state

*/

KeyedStateCheckpointOutputStream getRawKeyedOperatorStateOutput() throws Exception;

/**

* Returns an output stream for operator state

*/

OperatorStateCheckpointOutputStream getRawOperatorStateOutput() throws Exception;


三、如何在 Flink 中使用 State

先来看这段 word count 代码:

env.fromElements(WordCountData.WORDS)

//split up the lines in pairs (2-tuples) containing: (word,1)

.flatMap(new Tokenizer())

//group by the tuple field 0 and sum up tuple field1

.keyBy(0).sum(1)

.print();

之前sum 里面调用了内置的 StreamGroupedReduce

再来看看这段源码:

public class StreamGroupedReduce extends AbstractUdfStreamOperator>

implements OneInputStreamOperator {

private transient ValueState values;

@Override

public void open() throws Exception {

Super.open():

ValueStateDescriptor stated = new valueStateDescriptor<>(STATE_NAME, serializer);

values = getRuntimeContext().getState(stateId);

}

@Override

public void processElement(StreamRecord element) throws Exception {

IN value = element.getValue():

IN currentValue = values.value():

if (currentValue != null) {

IN reduce = userFunction.reduce(currentValue, value);

values.update(reduceed);

output.collect(element.replace(reduced));

} else {

value.update(value):

output.collect(element.replace(value));

}

}

可以看到里面有个 ValueState ,它就是 StreamGroupedReduce 使用 state 的地方 open 方法里面可以对它进行初始化

通过 RuntimeContext 去访问 state如果你想使用它就在processElement 中使用包括 valueupdate 则是对 state 进行更新

另外一方面如果在 state 中使用 Operator State同样使用内置的 FromElementsFunction我们以源码的方式去展示一下

如下:

public class FromElementsFunction implements SourceFunction, CheckpointedFunction {

private transient ListState checkpointedState

@Override

public void initializeState(FunctionInitializationContext context) throws Exception {

Precondition.checkState(this.checkpointedState == null,

The + getClass().gesSimpleName() + has already been initialized.);

this.checkpointedState = context.getOperatorStateStore().getLiState(

new ListStateDescriptor<>(from-elements-state,IntSerializer,INSTANCE)

);

if (context.isRestored()) {

List retrievedStates = new ArrayList<>():

for (Integer entry :this.checkpointedState.get()){

retrievedStates.add(entry);

//given that the parallelism of the function is1,we can only have 1 state

Preconditions.checkArgument(retrievedStates,sze() == 1,getClass().getSimpleName() +retrieved invalid state.);

this.numElementsToSkip = retrievedStates.get(0);

}

}

@Override

public void snapshotState(FunctionSnapshotContextcontext) throws Exception {

Preconditions.checkState(this.checkpointedState != null,

The + getClass().getSimpleName() + has not been properly initialized.);

this.checkpointedState.clear();

this.checkpointedState.add(this.numElementsEmitted);

}

要访问它对 Operator State 进行显示的初始化,即这个 list 里面需要去存储需要去初始化是需要显示声明的换言之需要继承接口来显示的对进行初始化和存储checkpointedState就是这里使用的 Operator State可以在另外的方法里面

比如在 process 里就可以对此进行读写更新的这个逻辑实际上就是每次在snapshot 时把内存里面的 numElementsEmitted 加载到 ListState里面相当于就是说下次如果发生 failed会直接从 numElementsEmitted 进行恢复

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
19天前
|
传感器 存储 缓存
[尚硅谷flink学习笔记] 实战案例TopN 问题
这段内容是关于如何使用Apache Flink解决实时统计水位传感器数据中,在一定时间窗口内出现次数最多的水位问题,即&quot;Top N&quot;问题。首先,介绍了一个使用滑动窗口的简单实现,通过收集传感器数据,按照水位计数,然后排序并输出前两名。接着,提出了全窗口和优化方案,其中优化包括按键分区(按水位vc分组)、开窗操作(增量聚合计算count)和过程函数处理(聚合并排序输出Top N结果)。最后,给出了一个使用`KeyedProcessFunction`进行优化的示例代码,通过按键by窗口结束时间,确保每个窗口的所有数据到达后再进行处理,提高了效率。
|
23天前
|
存储 分布式数据库 Apache
【Flink】Flink的Checkpoint 存在哪里?
【4月更文挑战第19天】【Flink】Flink的Checkpoint 存在哪里?
|
2月前
|
JSON Java API
Flink CDC 2.0 支持全量故障恢复,可以从 checkpoint 点恢复。
【2月更文挑战第17天】Flink CDC 2.0 支持全量故障恢复,可以从 checkpoint 点恢复。
54 3
|
3月前
|
SQL JSON Java
Flink数据问题之checkpoint数据删除失败如何解决
Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎。本合集提供有关Apache Flink相关技术、使用技巧和最佳实践的资源。
|
3月前
|
消息中间件 Java Kafka
Flink背压问题之checkpoint超时如何解决
Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎。本合集提供有关Apache Flink相关技术、使用技巧和最佳实践的资源。
|
3月前
|
消息中间件 存储 机器人
Flink执行问题之执行checkpoint失败如何解决
Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎。本合集提供有关Apache Flink相关技术、使用技巧和最佳实践的资源。
|
3月前
|
存储 SQL canal
Flink CDC数据同步问题之同步数据到checkpoint失败如何解决
Flink CDC数据同步是指利用Flink CDC实现不同数据源之间的实时数据同步任务;本合集旨在提供Flink CDC数据同步的操作指南、性能优化建议和常见问题处理,助力用户高效实施数据同步。
|
4月前
|
存储 数据处理 数据库
Flink CDC里不开启checkPoint有什么影响吗?
【1月更文挑战第23天】【1月更文挑战第112篇】Flink CDC里不开启checkPoint有什么影响吗?
45 6
|
5月前
|
存储 流计算
在Flink CDC中,Checkpoint的清理策略通常有两种设置方式
在Flink CDC中,Checkpoint的清理策略通常有两种设置方式
82 5
|
5月前
|
分布式计算 资源调度 Hadoop
Hadoop学习笔记(HDP)-Part.18 安装Flink
01 关于HDP 02 核心组件原理 03 资源规划 04 基础环境配置 05 Yum源配置 06 安装OracleJDK 07 安装MySQL 08 部署Ambari集群 09 安装OpenLDAP 10 创建集群 11 安装Kerberos 12 安装HDFS 13 安装Ranger 14 安装YARN+MR 15 安装HIVE 16 安装HBase 17 安装Spark2 18 安装Flink 19 安装Kafka 20 安装Flume
122 2
Hadoop学习笔记(HDP)-Part.18 安装Flink