Flink之状态编程OperatorState的使用

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 笔记

(1)OperatorState


Operator State 可以用在所有算子上,每个算子子任务或者说每个算子实例共享一个 状态,流入这个算子子任务的数据可以访问和更新这个状态。



25.png

注意: 算子子任务之间的状态不能互相访问


Operator State 的实际应用场景不如 Keyed State 多,它经常 被用在 Source 或 Sink 等算子上 ,用来保存流入数据的偏移量或对输出数据做缓存,以保证 Flink 应用的 ExactlyOnce 语义。


Operator State只和并行的算子实例绑定,和数据元素中的key无关,每个算子实例中持所有数据元素中的一部分状态数据。也即是说整个0perator只对应一个state,但是一个operator可能含有很多key,从而对应很多个keystate。Operator State支持当算子实例并行度发生变化时自动重新分配状态数据。


Flink 为算子状态提供三种基本数据结构:


列表状态(List state)

将状态表示为一组数据的列表


联合列表状态(Union list state)

也将状态表示为数据的列表。它与常规列表状态的区别在于,在发生故障时,或者从保 存点(savepoint)启动应用程序时如何恢复。

一种是均匀分配(List state),另外一种是将所有 State 合并为全量 State 再分发给每个 实例(Union list state)。


广播状态(Broadcast state)

是一种特殊的算子状态. 如果一个算子有多项任务,而它的每项任务状态又都相同,那 么这种特殊情况最适合应用广播状态。


Operator State 定义:


单Operator具有一个状态,不区分Key

State需要支持重新分布

不常用,主要用于Source和Sink节点,像KafkaConsumer中,维护Offset,Topic等信息;

实例: BufferSink

三种状态类型:


ListState

UnionListState

BroadcastState

两种定义方式:


实现CheckpointedFunction接口定义

实现ListCheckpointed接口定义


(2)OperatorState代码开发


package com.aikfk.flink.datastream.state;
import org.apache.flink.api.common.state.*;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import java.util.ArrayList;
import java.util.List;
/**
 * @author :caizhengjie
 * @description:TODO
 * @date :2021/3/31 4:04 下午
 */
public class OperatorState {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSink<Tuple2<String,Long>> dataStream = env.fromElements(
                Tuple2.of("a", 3L),
                Tuple2.of("a", 5L),
                Tuple2.of("b", 7L),
                Tuple2.of("c", 4L),
                Tuple2.of("c", 2L))
                .keyBy(value -> value.f0)
                .addSink(new BufferingSink());
        env.execute("KeyedState");
    }
    static class BufferingSink implements SinkFunction<Tuple2<String,Long>>, CheckpointedFunction {
        private ListState<Tuple2<String,Long>> listState;
        private List<Tuple2<String,Long>> bufferedElements = new ArrayList<>();
        @Override
        public void initializeState(FunctionInitializationContext context) throws Exception {
            ListStateDescriptor<Tuple2<String, Long>> descriptor =
                    new ListStateDescriptor<Tuple2<String, Long>>("bufferedSinkState",
                            TypeInformation.of(new TypeHint<Tuple2<String,Long>>() {}));
            listState = context.getOperatorStateStore().getListState(descriptor);
            if (context.isRestored()){
                for (Tuple2<String, Long> element : listState.get()){
                    bufferedElements.add(element);
                }
            }
        }
        @Override
        public void snapshotState(FunctionSnapshotContext context) throws Exception {
            for (Tuple2<String, Long> element : bufferedElements){
                listState.add(element);
            }
        }
        @Override
        public void invoke(Tuple2<String,Long> value, Context context) throws Exception {
            bufferedElements.add(value);
            System.out.println("invoke>>> " + value);
            for (Tuple2<String,Long> element : bufferedElements){
                System.out.println(Thread.currentThread().getId() + " >> " + element.f0 + " : " + element.f1);
            }
        }
    }
}

运行结果:

invoke>>> (c,4)
invoke>>> (a,3)
95 >> c : 4
invoke>>> (b,7)
99 >> a : 3
89 >> b : 7
invoke>>> (c,2)
95 >> c : 4
invoke>>> (a,5)
99 >> a : 3
95 >> c : 2
99 >> a : 5


(3)OperatorState使用


当 Flinkcheckpoint从恢复,或者从 savepoint中重启的时候,就回涉及到状态的重新分配,尤其是当并行度发生改变的时候即 operator改变并行度的时候(Rescale)会触发状态的 Redistribute,即 Operator State里的数据会重新分配到Operato的task实例。

26.png


相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
目录
打赏
0
0
0
0
12
分享
相关文章
Flink窗口与状态编程开发(一)
Flink窗口与状态编程开发(一)
flink-sql(table api 编程)
table api 基本使用 tableEnvironment 和 streamTableEnvironment 注册表,临时表,持久表 Table api 和 table sql 混用 table api 和 datastream 混用 table api 的输入和输出(kafka) kafka的高级特性option
flink-sql(table api 编程)
Flink SQL 核心概念剖析与编程案例实战
本文使用了 Docker 镜像快速安装一些基础组件,zk 和 kafka,并通过案例的方式,剖析了 SQL 的概念与详细的使用方式

热门文章

最新文章