(1)OperatorState
Operator State 可以用在所有算子上,每个算子子任务或者说每个算子实例共享一个 状态,流入这个算子子任务的数据可以访问和更新这个状态。
注意: 算子子任务之间的状态不能互相访问
Operator State 的实际应用场景不如 Keyed State 多,它经常 被用在 Source 或 Sink 等算子上 ,用来保存流入数据的偏移量或对输出数据做缓存,以保证 Flink 应用的 ExactlyOnce 语义。
Operator State只和并行的算子实例绑定,和数据元素中的key无关,每个算子实例中持所有数据元素中的一部分状态数据。也即是说整个0perator只对应一个state,但是一个operator可能含有很多key,从而对应很多个keystate。Operator State支持当算子实例并行度发生变化时自动重新分配状态数据。
Flink 为算子状态提供三种基本数据结构:
列表状态(List state)
将状态表示为一组数据的列表
联合列表状态(Union list state)
也将状态表示为数据的列表。它与常规列表状态的区别在于,在发生故障时,或者从保 存点(savepoint)启动应用程序时如何恢复。
一种是均匀分配(List state),另外一种是将所有 State 合并为全量 State 再分发给每个 实例(Union list state)。
广播状态(Broadcast state)
是一种特殊的算子状态. 如果一个算子有多项任务,而它的每项任务状态又都相同,那 么这种特殊情况最适合应用广播状态。
Operator State 定义:
单Operator具有一个状态,不区分Key
State需要支持重新分布
不常用,主要用于Source和Sink节点,像KafkaConsumer中,维护Offset,Topic等信息;
实例: BufferSink
三种状态类型:
ListState
UnionListState
BroadcastState
两种定义方式:
实现CheckpointedFunction接口定义
实现ListCheckpointed接口定义
(2)OperatorState代码开发
package com.aikfk.flink.datastream.state; import org.apache.flink.api.common.state.*; import org.apache.flink.api.common.typeinfo.TypeHint; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.runtime.state.FunctionInitializationContext; import org.apache.flink.runtime.state.FunctionSnapshotContext; import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; import org.apache.flink.streaming.api.datastream.DataStreamSink; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.sink.SinkFunction; import java.util.ArrayList; import java.util.List; /** * @author :caizhengjie * @description:TODO * @date :2021/3/31 4:04 下午 */ public class OperatorState { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStreamSink<Tuple2<String,Long>> dataStream = env.fromElements( Tuple2.of("a", 3L), Tuple2.of("a", 5L), Tuple2.of("b", 7L), Tuple2.of("c", 4L), Tuple2.of("c", 2L)) .keyBy(value -> value.f0) .addSink(new BufferingSink()); env.execute("KeyedState"); } static class BufferingSink implements SinkFunction<Tuple2<String,Long>>, CheckpointedFunction { private ListState<Tuple2<String,Long>> listState; private List<Tuple2<String,Long>> bufferedElements = new ArrayList<>(); @Override public void initializeState(FunctionInitializationContext context) throws Exception { ListStateDescriptor<Tuple2<String, Long>> descriptor = new ListStateDescriptor<Tuple2<String, Long>>("bufferedSinkState", TypeInformation.of(new TypeHint<Tuple2<String,Long>>() {})); listState = context.getOperatorStateStore().getListState(descriptor); if (context.isRestored()){ for (Tuple2<String, Long> element : listState.get()){ bufferedElements.add(element); } } } @Override public void snapshotState(FunctionSnapshotContext context) throws Exception { for (Tuple2<String, Long> element : bufferedElements){ listState.add(element); } } @Override public void invoke(Tuple2<String,Long> value, Context context) throws Exception { bufferedElements.add(value); System.out.println("invoke>>> " + value); for (Tuple2<String,Long> element : bufferedElements){ System.out.println(Thread.currentThread().getId() + " >> " + element.f0 + " : " + element.f1); } } } }
运行结果:
invoke>>> (c,4) invoke>>> (a,3) 95 >> c : 4 invoke>>> (b,7) 99 >> a : 3 89 >> b : 7 invoke>>> (c,2) 95 >> c : 4 invoke>>> (a,5) 99 >> a : 3 95 >> c : 2 99 >> a : 5
(3)OperatorState使用
当 Flinkcheckpoint从恢复,或者从 savepoint中重启的时候,就回涉及到状态的重新分配,尤其是当并行度发生改变的时候即 operator改变并行度的时候(Rescale)会触发状态的 Redistribute,即 Operator State里的数据会重新分配到Operato的task实例。