开发者学堂课程【Apache Flink 入门到实战 - Flink 开源社区出品 :Flink checkpoint(一)】学习笔记,与课程紧密联系,让用户快速学习知识。
课程地址:https://developer.aliyun.com/learning/course/632/detail/10044
Flink checkpoint(一)
内容简介:
一、Checkpoint 与 state 的关系
二、什么是 state
三、如何在 Flink 中使用 state
四、Checkpoint 的执行机制
五、回答部分问题
一、Checkpoint 与 state 的关系
Checkpoint 是一个动词,它最终如果顺利会产生一个 complete Checkpoint ,它是一个动词,也就是在下图中会看到红框里面一共有 Trigger 了569027,没有 Failed ,如下图:
State 其实就是 Checkpoint 所必备的主要的数据,下图中可以看到官方中 state 的 size 是9.17 KB ,是很小的,如下图:
二、什么是 state
(1)state 的含义
先看一段代码:
env.socketTextStream(
“
localhost
”
,9000)
//split up the lines in pairs (2-tuples)
.flatMap(new Tokenizer())
//group by the tuple field
“
0
”
and sum up tuple field
“
1
”
.keyBy(0).sum(1)
.print();
这是一段非常直白的 Wellcome 的代码,如果执行上述代码,它会去监控本地的9000端口的数据,然后本地启动 netcat ,如果输入 hello world ,执行程序会自动输出什么?
之前提到它是 Wellcome ,便会输出 hello 1 、 world 1 ,那么如果再次输入 hello world 执行程序会输出什么?
hello a 、 world a ,为什么第二次流式计算,
第一次 hello world 来的时候就已经把结果输入进去了,
为什么第二次 hello world 来的时候 Flink 知道它已经看到过一次 hello world ,这就是 state 的作用,
因为它把这个存储在 state 里面,它知道 hello 和 world 分别出现过一次, State 我们可以认为是流式计算中持久化的状态,可以看到数据输入进来,最后在每个地方会存储到 state backend 里面, state backend 就是存储
state 的,如下图:
(2)state的分类
①Keyed State
在 Flink 中如果对 state 进行分类,
有这样两类:一个是 Keyed State ,它只能应用于 KeyedStream 的函数与操作中,它是已经划分好的,换言之每个 Key 只能属于某一个 Keyed State ,刚才实例中提到的的 hello world 的中的 hello 在作业没有发生变化的情况下,只要它一直在正常的运行中,那么 hello 永远只会出现在某一个State 的并发上面,不会去其他地方,再来看这段
word count 的代码:
env.socketTextStream(
“
localhost
”
,9000)
//split up the lines in pairs (2-tuples)
.flatMap(new Tokenizer())
//group by the tuple field
“
0
”
and sum up tuple field
“
1
”
.keyBy(0).sum(1)
.print();
其中的 keyBy 便是之前提到的使用 KeyedStream ,要创建KeyedStream ,并要对 key 进行划分,不同的 task 上不会出现相同的 key ,其中 sum 是调用内置的 StreamGroupedReduce UD 进行一个累积的计算。
在进一步讲之前先看一下下图中 keyBy 左边有三个并发,右边也是三个并发,于是左边的词进来之后用 keyBy ,并会根据 key 进行自分发,如下图:
可以看到 hello world 中的 hello 永远只会去到右下方并发路task 上面去,这个就是 keyBy 语。
②Operator State
另外一个 state 是 Operator State ,这个不需要作用在 KeyStream ,换言之无需 keyBy 也能拿到它,它用每一个 operator state 都仅与一个 operator 的实例绑定, Operator State 中比较常见的是 source state ,例如记录当前 source 的 offset ,
再来看一段 word count 代码 :
env.fromElements(WordCountData.WORDS)
//split up the lines in pairs (2-tuples) containing: (word,1)
.flatMap(new Tokenizer())
//group by the tuple field
“
0
”
and sum up tuple field
“
1
”
.keyBy(0).sum(1)
.print();
可以看到 env 后面的 Function 换成了 fromElements ,这个源码里面是 Operator State ,
它会调用内置的 romElementsFunction ,以下这段代码就是源码里面的一部分,
如下:
public class FromElementsFunction implements SourceFunction, CheckpointedFunction {
private transient ListState checkpointedState;
public FromElementsFunction(TypeSerializer serializer,T...elments) throws I0Exception {
this(serializer,Arrays.asList(elements));
}
可以看到它用到了 ListState 和 checkpointState ,这个就存储了相关的 Operator State 信息。
下面这个图是整个 State 负类子类的关系,这个比较黄的词是平常中经常用到的 State ,那么有个问题来了 这些 State 是类的定义,它们哪些属于 Key State ?
哪些属于 Operator State ?
比如是像 ValueState 代表说是 key 所对应的value 是单值 value , MapState 的 key 所对应的是 value 是map, ListState 的 key 所对应的 value 是 list ,
如下图:
看看其中哪些是 key 的 state ?即 ValueState 、 MapState 、ListState ,说明一下AggregatingState 、 ReducingState、 FoldingState 其实都是对 ValueState 的一个封装,上面加了一个function,那么哪些是 Operator State ?
即 ListState BroadcastState 。
③Managed 和 Raw
除了 key 和 Operator 一个维度来区分 State 还有一个维度,称之为 Managed 和 Raw ,其中 Managed State 是由 Flink 管理的state ,刚才举例的所有 state 均是 managed ;
而 Raw State是Flink 仅提供 stream 可以进行存储数据,从它的角度来看这些可能只是一些 bytes , 比如下面的这段方法,就是相关的 Raw State的东西,
如下:
public interface StateSnapshotContext extends FunctionSnapshotContext {
/**
* Returns an output stream for keyed state
*/
KeyedStateCheckpointOutputStream getRawKeyedOperatorStateOutput() throws Exception;
/**
* Returns an output stream for operator state
*/
OperatorStateCheckpointOutputStream getRawOperatorStateOutput() throws Exception;
三、如何在 Flink 中使用 State
先来看这段 word count 代码:
env.fromElements(WordCountData.WORDS)
//split up the lines in pairs (2-tuples) containing: (word,1)
.flatMap(new Tokenizer())
//group by the tuple field
“
0
”
and sum up tuple field
“
1
”
.keyBy(0).sum(1)
.print();
之前说过 sum 里面调用了内置的 StreamGroupedReduce ,
再来看看这段源代码:
public class StreamGroupedReduce extends AbstractUdfStreamOperator>
implements OneInputStreamOperator {
private transient ValueState values;
@
Override
public void open() throws Exception {
Super.open():
ValueStateDescriptor stated = new valueStateDescriptor<>(STATE_NAME, serializer);
values = getRuntimeContext().getState(stateId);
}
@
Override
public void processElement(StreamRecord element) throws Exception {
IN value = element.getValue():
IN currentValue = values.value():
if (currentValue != null) {
IN reduce = userFunction.reduce(currentValue, value);
values.update(reduceed);
output.collect(element.replace(reduced));
} else {
value.update(value):
output.collect(element.replace(value));
}
}
可以看到里面有个 ValueState ,它就是 StreamGroupedReduce 使用 state 的地方,在 open 方法里面可以对它进行初始化;
通过 RuntimeContext 去访问 state ,如果你想使用它就在processElement 中使用,包括 value , update 则是对 state 进行更新;
另外一方面如果在 state 中使用 Operator State ,同样使用内置的 FromElementsFunction ,我们以源码的方式去展示一下,
如下:
public class FromElementsFunction implements SourceFunction, CheckpointedFunction {
private transient ListState checkpointedState
@
Override
public void initializeState(FunctionInitializationContext context) throws Exception {
Precondition.checkState(this.checkpointedState == null,
“
The
”
+ getClass().gesSimpleName() +
”
has already been initialized.
”
);
this.checkpointedState = context.getOperatorStateStore().getLiState(
new ListStateDescriptor<>(
“
from-elements-state
”
,IntSerializer,INSTANCE)
);
if (context.isRestored()) {
List retrievedStates = new ArrayList<>():
for (Integer entry :this.checkpointedState.get()){
retrievedStates.add(entry);
//given that the parallelism of the function is1,we can only have 1 state
Preconditions.checkArgument(retrievedStates,sze() == 1,getClass().getSimpleName() +
”
retrieved invalid state.
”
);
this.numElementsToSkip = retrievedStates.get(0);
}
}
@
Override
public void snapshotState(FunctionSnapshotContextcontext) throws Exception {
Preconditions.checkState(this.checkpointedState != null,
“
The
”
+ getClass().getSimpleName() +
“
has not been properly initialized.
”
);
this.checkpointedState.clear();
this.checkpointedState.add(this.numElementsEmitted);
}
要访问它对 Operator State 进行显示的初始化,即这个 list 里面需要去存储、需要去初始化,是需要显示声明的,换言之需要继承接口来显示的对它进行初始化和存储,checkpointedState就是这里使用的 Operator State ,可以在另外的方法里面,
比如在 process 里就可以对此进行读写、更新的,这个逻辑实际上就是每次在snapshot 时把内存里面的 numElementsEmitted 加载到 ListState里面,相当于就是说下次如果发生 failed ,会直接从 numElementsEmitted 进行恢复。