Flink1.9 Sate Processor API 介绍和实例demo

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 功能介绍 Flink1.9 新添加的功能,其能够帮助用户直接访问Flink中存储的State,API能够帮助用户非常方便地读取、修改甚至重建整个State。这个功能的强大之处在于几个方面,第一个就是灵活地读取外部的数据,比如从一个数据库中读取自主地构建Savepoint,解决作业冷启动问题,这样就不用从N天前开始重跑整个数据 可以使用的场景 异步校验或者查看某个阶段的状态,一般而言,flink作业的最终结果都会持久化输出,但在面临问题的时候,如何确定哪一级出现问题,state processor api也提供了一种可能,去检验state中的数据是否与预期的一致。

功能介绍

Flink1.9 新添加的功能,其能够帮助用户直接访问Flink中存储的State,API能够帮助用户非常方便地读取、修改甚至重建整个State。这个功能的强大之处在于几个方面,第一个就是灵活地读取外部的数据,比如从一个数据库中读取自主地构建Savepoint,解决作业冷启动问题,这样就不用从N天前开始重跑整个数据

可以使用的场景

  • 异步校验或者查看某个阶段的状态,一般而言,flink作业的最终结果都会持久化输出,但在面临问题的时候,如何确定哪一级出现问题,state processor api也提供了一种可能,去检验state中的数据是否与预期的一致。
  • 脏数据订正,比如有一条脏数据污染了State,就可以用State Processor API对于状态进行修复和订正。
  • 状态迁移,当用户修改了作业逻辑,还想要复用原来作业中大部分的State,或者想要升级这个State的结构就可以用这个API来完成相应的工作。
  • 解决作业冷启动问题,这样就不用从N天前开始重跑整个数据。

一些限制点

  • window state暂时修改不了
  • 每个有状态的算子都必须手动指定uid
  • 无法通过读取savepoint 直接获取到metadata 信息(existing operator ids)

关联的知识点

State 分为: 1: Operator States 2: Keyed States
在读取state的时候需要根据对应的类型选择不同的读取方式

Operator States Keyed States
readListState readKeyedState
readUnionState
readBroadcastState

基于batch 热加载数据生成Savepoint 和 Savepoint state 修改

最后会给出对应的两个demo。
基本流程两者比较类似

  • 基于batch 热加载数据

    1: batch读取数据 --> Dataset (比如读取文本文件)
    2: 编写业务逻辑处理数据 --> 获取转换后的DataSet(处理文本生成一个Tuple2<key, num>
    3: 将数据结果转换为state --> KeyedStateBootstrapFunction
    4: 生成外部Savepoint(注意对uid的指定和StateBackend 类型的选择)
    • Savepoint state 修改
    1: 调用Savepoint.load 加载当前已经存在的Savepoint(注意StateBackend 必须和之前生成的任务一致)
    2: 调用 savepoint.readKeyedState 读取获取到的ExistingSavepoint,结果是一个DataSet数据集
    3:编写Batch 业务逻辑调整生成的DataSet(比如删除某个元素),其结果还算一个DataSet
    4: 自定义 KeyedStateBootstrapFunction 将数据结果转换为state
    5: 生成外部Savepoint(注意对uid的指定和StateBackend 类型的选择)

基于batch 重新构建stream样例

public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        //获取外部离线数据源
        DataSource<String> textSource =  env.readTextFile("D:\\sources\\data.txt");
        DataSet<Tuple2<String, Integer>> sourceDataSet = textSource.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {

            @Override
            public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
                String[] strArr = value.split(",");
                for (String str : strArr) {
                    Tuple2<String, Integer> worldTuple = new Tuple2<>(str, 1);
                    out.collect(worldTuple);
                }
            }
        });

        //计算出需要的历史状态
        DataSet<ReadAndModifyState.KeyedValueState> dataSet = sourceDataSet
                .groupBy(0)
                .reduceGroup(new GroupReduceFunction<Tuple2<String, Integer>, ReadAndModifyState.KeyedValueState>() {
            @Override
            public void reduce(Iterable<Tuple2<String, Integer>> values, Collector<ReadAndModifyState.KeyedValueState> out) throws Exception {

                Iterator iterator = values.iterator();
                Long countNum = 0L;
                String worldkey = null;
                while(iterator.hasNext()){
                    Tuple2<String, Integer> info = (Tuple2<String, Integer>) iterator.next();
                    if(worldkey == null){
                        worldkey = info.f0;
                    }
                    countNum++;
                }

                ReadAndModifyState.KeyedValueState keyedValueState = new ReadAndModifyState.KeyedValueState();
                keyedValueState.key = new Tuple1<>(worldkey);
                keyedValueState.countNum = countNum;

                out.collect(keyedValueState);
            }
        });

        //将历史状态转换为state 并转换为savepoint 写入hdfs上
        BootstrapTransformation<ReadAndModifyState.KeyedValueState> transformation = OperatorTransformation
                .bootstrapWith(dataSet)
                .keyBy(new KeySelector<ReadAndModifyState.KeyedValueState, Tuple1<String>>() {
                    @Override
                    public Tuple1<String> getKey(ReadAndModifyState.KeyedValueState value) throws Exception {
                        return value.key;
                    }
                })
                .transform(new ReadAndModifyState.KeyedValueStateBootstrapper());

        String uid = "keyby_summarize";
        String savePointPath = "hdfs://ns1/user/xc/savepoint-from-batch";
        StateBackend rocksDBBackEnd = new RocksDBStateBackend("hdfs://ns1/user/xc");
        Savepoint.create(rocksDBBackEnd, 128)
                .withOperator(uid, transformation)
                .write(savePointPath);


        env.execute("batch build save point");
        System.out.println("-------end------------");
    }

读取和修改样例

 public static void main(String[] args) throws Exception {
        ExecutionEnvironment bEnv = ExecutionEnvironment.getExecutionEnvironment();
        String savePointPath = "hdfs://ns1/user/xc/savepoint-61b8e1-bbee958b3087";
        StateBackend rocksDBBackEnd = new RocksDBStateBackend("hdfs://ns1/user/xc");

        ExistingSavepoint savepoint = Savepoint.load(bEnv, savePointPath, rocksDBBackEnd);

        //读取
        String uid = "keyby_summarize";
        DataSet<KeyedValueState> keyState = savepoint.readKeyedState(uid, new StateReaderFunc());

        //修改
        DataSet<KeyedValueState> dataSet = keyState.flatMap((FlatMapFunction<KeyedValueState, KeyedValueState>) (value, out) -> {
            value.countNum = value.countNum * 2;
            out.collect(value);
        }).returns(KeyedValueState.class);

        BootstrapTransformation<KeyedValueState> transformation = OperatorTransformation
                .bootstrapWith(dataSet)
                //注意keyby操作的key一定要和原来的相同
                .keyBy(new KeySelector<KeyedValueState, Tuple1<String>>() {
                    @Override
                    public Tuple1<String> getKey(KeyedValueState value) throws Exception {
                        return value.key;
                    }
                })
                .transform(new KeyedValueStateBootstrapper());

        Savepoint.create(rocksDBBackEnd, 128)
                .withOperator(uid, transformation)
                .write("hdfs://ns1/user/xc/savepoint-after-modify3");


        bEnv.execute("read the list state");
        System.out.println("-----end------------");
    }

    public static class StateReaderFunc extends KeyedStateReaderFunction<Tuple1<String>, KeyedValueState> {

        private static final long serialVersionUID = -3616180524951046897L;
        private transient ValueState<Long> state;

        @Override
        public void open(Configuration parameters) {
            ValueStateDescriptor currentCountDescriptor = new ValueStateDescriptor("currentCountState", Long.class);
            state = getRuntimeContext().getState(currentCountDescriptor);
        }

        @Override
        public void readKey(Tuple1<String> key, Context ctx, Collector<KeyedValueState> out) throws Exception {
            System.out.println(key.f0 +":" + state.value());

            KeyedValueState keyedValueState = new KeyedValueState();
            keyedValueState.key = new Tuple1<>(key.f0);
            keyedValueState.countNum = state.value();

            out.collect(keyedValueState);
        }
    }

    public static class KeyedValueState {
        Tuple1<String> key;
        Long countNum;
    }

    private static class KeyedValueStateBootstrapper extends KeyedStateBootstrapFunction<Tuple1<String>, KeyedValueState>{

        private static final long serialVersionUID = 1893716139133502118L;
        private ValueState<Long> currentCount = null;

        @Override
        public void open(Configuration parameters) throws Exception {
            ValueStateDescriptor currentCountDescriptor = new ValueStateDescriptor("currentCountState", Long.class, 0L);
            currentCount = getRuntimeContext().getState(currentCountDescriptor);
        }

        @Override
        public void processElement(KeyedValueState value, Context ctx) throws Exception {
            currentCount.update(value.countNum);
        }
    }
相关实践学习
基于Hologres+Flink搭建GitHub实时数据大屏
通过使用Flink、Hologres构建实时数仓,并通过Hologres对接BI分析工具(以DataV为例),实现海量数据实时分析.
实时计算 Flink 实战课程
如何使用实时计算 Flink 搞定数据处理难题?实时计算 Flink 极客训练营产品、技术专家齐上阵,从开源 Flink功能介绍到实时计算 Flink 优势详解,现场实操,5天即可上手! 欢迎开通实时计算 Flink 版: https://cn.aliyun.com/product/bigdata/sc Flink Forward Asia 介绍: Flink Forward 是由 Apache 官方授权,Apache Flink Community China 支持的会议,通过参会不仅可以了解到 Flink 社区的最新动态和发展计划,还可以了解到国内外一线大厂围绕 Flink 生态的生产实践经验,是 Flink 开发者和使用者不可错过的盛会。 去年经过品牌升级后的 Flink Forward Asia 吸引了超过2000人线下参与,一举成为国内最大的 Apache 顶级项目会议。结合2020年的特殊情况,Flink Forward Asia 2020 将在12月26日以线上峰会的形式与大家见面。
相关文章
|
7月前
|
SQL 人工智能 关系型数据库
Flink CDC YAML:面向数据集成的 API 设计
本文整理自阿里云智能集团 Flink PMC Member & Committer 徐榜江(雪尽)在 FFA 2024 分论坛的分享,涵盖四大主题:Flink CDC、YAML API、Transform + AI 和 Community。文章详细介绍了 Flink CDC 的发展历程及其优势,特别是 YAML API 的设计与实现,以及如何通过 Transform 和 AI 模型集成提升数据处理能力。最后,分享了社区动态和未来规划,欢迎更多开发者加入开源社区,共同推动 Flink CDC 的发展。
626 12
Flink CDC YAML:面向数据集成的 API 设计
|
6月前
|
SQL 人工智能 关系型数据库
Flink CDC YAML:面向数据集成的 API 设计
Flink CDC YAML:面向数据集成的 API 设计
170 5
|
12月前
|
JSON 搜索推荐 API
深入了解亚马逊商品详情API:功能、作用与实例
亚马逊商品详情API接口由官方提供,允许开发者通过程序调用获取商品详细信息,如标题、价格等,适用于电商数据分析、搜索及个性化推荐等场景。接口名称包括ItemLookup、GetMatchingProductForId等,支持HTTP POST/GET请求,需提供商品ID、API密钥及其他可选参数。返回数据格式通常为JSON或XML,涵盖商品详情、分类、品牌、价格、图片URL及用户评价等。该接口对数据收集、实时推荐、营销活动及数据分析至关重要,有助于提升电商平台的数据处理能力、用户体验及商家运营效率。使用时需注册亚马逊开发者账号并申请API访问权限,获取API密钥后按文档构建请求并处理响应数据。
|
SQL 分布式计算 测试技术
概述Flink API中的4个层次
【7月更文挑战第14天】Flink的API分为4个层次:核心底层API(如ProcessFunction)、DataStream/DataSet API、Table API和SQL。
|
监控 Serverless 数据库
美团 Flink 大作业部署问题之端云联调并将流量恢复到云端实例如何结束
美团 Flink 大作业部署问题之端云联调并将流量恢复到云端实例如何结束
|
人工智能 运维 Serverless
函数计算产品使用问题之启动的实例是否有调用api接口停止功能
函数计算产品作为一种事件驱动的全托管计算服务,让用户能够专注于业务逻辑的编写,而无需关心底层服务器的管理与运维。你可以有效地利用函数计算产品来支撑各类应用场景,从简单的数据处理到复杂的业务逻辑,实现快速、高效、低成本的云上部署与运维。以下是一些关于使用函数计算产品的合集和要点,帮助你更好地理解和应用这一服务。
|
缓存 JavaScript 前端开发
微信 JS-SDK Demo “分享信息设置” API 及数字签名生成方法(NodeJS版本)
微信 JS-SDK Demo “分享信息设置” API 及数字签名生成方法(NodeJS版本)更新时间(2020-10-29)
|
canal 监控 关系型数据库
实时计算 Flink版产品使用问题之如何在实例里配置监控哪些库,哪些表,包括黑名单,白名单
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
Kubernetes Oracle 关系型数据库
实时计算 Flink版操作报错合集之用dinky在k8s上提交作业,会报错:Caused by: org.apache.flink.table.api.ValidationException:,是什么原因
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
440 0
|
关系型数据库 MySQL 数据库
实时计算 Flink版操作报错合集之下载了mysql的cdc的demo,在本地调试时,报错:找不到这个包,该怎么办
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
229 0