Flink1.9 Sate Processor API 介绍和实例demo

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 功能介绍 Flink1.9 新添加的功能,其能够帮助用户直接访问Flink中存储的State,API能够帮助用户非常方便地读取、修改甚至重建整个State。这个功能的强大之处在于几个方面,第一个就是灵活地读取外部的数据,比如从一个数据库中读取自主地构建Savepoint,解决作业冷启动问题,这样就不用从N天前开始重跑整个数据 可以使用的场景 异步校验或者查看某个阶段的状态,一般而言,flink作业的最终结果都会持久化输出,但在面临问题的时候,如何确定哪一级出现问题,state processor api也提供了一种可能,去检验state中的数据是否与预期的一致。

功能介绍

Flink1.9 新添加的功能,其能够帮助用户直接访问Flink中存储的State,API能够帮助用户非常方便地读取、修改甚至重建整个State。这个功能的强大之处在于几个方面,第一个就是灵活地读取外部的数据,比如从一个数据库中读取自主地构建Savepoint,解决作业冷启动问题,这样就不用从N天前开始重跑整个数据

可以使用的场景

  • 异步校验或者查看某个阶段的状态,一般而言,flink作业的最终结果都会持久化输出,但在面临问题的时候,如何确定哪一级出现问题,state processor api也提供了一种可能,去检验state中的数据是否与预期的一致。
  • 脏数据订正,比如有一条脏数据污染了State,就可以用State Processor API对于状态进行修复和订正。
  • 状态迁移,当用户修改了作业逻辑,还想要复用原来作业中大部分的State,或者想要升级这个State的结构就可以用这个API来完成相应的工作。
  • 解决作业冷启动问题,这样就不用从N天前开始重跑整个数据。

一些限制点

  • window state暂时修改不了
  • 每个有状态的算子都必须手动指定uid
  • 无法通过读取savepoint 直接获取到metadata 信息(existing operator ids)

关联的知识点

State 分为: 1: Operator States 2: Keyed States
在读取state的时候需要根据对应的类型选择不同的读取方式

Operator States Keyed States
readListState readKeyedState
readUnionState
readBroadcastState

基于batch 热加载数据生成Savepoint 和 Savepoint state 修改

最后会给出对应的两个demo。
基本流程两者比较类似

  • 基于batch 热加载数据

    1: batch读取数据 --> Dataset (比如读取文本文件)
    2: 编写业务逻辑处理数据 --> 获取转换后的DataSet(处理文本生成一个Tuple2<key, num>
    3: 将数据结果转换为state --> KeyedStateBootstrapFunction
    4: 生成外部Savepoint(注意对uid的指定和StateBackend 类型的选择)
    • Savepoint state 修改
    1: 调用Savepoint.load 加载当前已经存在的Savepoint(注意StateBackend 必须和之前生成的任务一致)
    2: 调用 savepoint.readKeyedState 读取获取到的ExistingSavepoint,结果是一个DataSet数据集
    3:编写Batch 业务逻辑调整生成的DataSet(比如删除某个元素),其结果还算一个DataSet
    4: 自定义 KeyedStateBootstrapFunction 将数据结果转换为state
    5: 生成外部Savepoint(注意对uid的指定和StateBackend 类型的选择)

基于batch 重新构建stream样例

public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        //获取外部离线数据源
        DataSource<String> textSource =  env.readTextFile("D:\\sources\\data.txt");
        DataSet<Tuple2<String, Integer>> sourceDataSet = textSource.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {

            @Override
            public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
                String[] strArr = value.split(",");
                for (String str : strArr) {
                    Tuple2<String, Integer> worldTuple = new Tuple2<>(str, 1);
                    out.collect(worldTuple);
                }
            }
        });

        //计算出需要的历史状态
        DataSet<ReadAndModifyState.KeyedValueState> dataSet = sourceDataSet
                .groupBy(0)
                .reduceGroup(new GroupReduceFunction<Tuple2<String, Integer>, ReadAndModifyState.KeyedValueState>() {
            @Override
            public void reduce(Iterable<Tuple2<String, Integer>> values, Collector<ReadAndModifyState.KeyedValueState> out) throws Exception {

                Iterator iterator = values.iterator();
                Long countNum = 0L;
                String worldkey = null;
                while(iterator.hasNext()){
                    Tuple2<String, Integer> info = (Tuple2<String, Integer>) iterator.next();
                    if(worldkey == null){
                        worldkey = info.f0;
                    }
                    countNum++;
                }

                ReadAndModifyState.KeyedValueState keyedValueState = new ReadAndModifyState.KeyedValueState();
                keyedValueState.key = new Tuple1<>(worldkey);
                keyedValueState.countNum = countNum;

                out.collect(keyedValueState);
            }
        });

        //将历史状态转换为state 并转换为savepoint 写入hdfs上
        BootstrapTransformation<ReadAndModifyState.KeyedValueState> transformation = OperatorTransformation
                .bootstrapWith(dataSet)
                .keyBy(new KeySelector<ReadAndModifyState.KeyedValueState, Tuple1<String>>() {
                    @Override
                    public Tuple1<String> getKey(ReadAndModifyState.KeyedValueState value) throws Exception {
                        return value.key;
                    }
                })
                .transform(new ReadAndModifyState.KeyedValueStateBootstrapper());

        String uid = "keyby_summarize";
        String savePointPath = "hdfs://ns1/user/xc/savepoint-from-batch";
        StateBackend rocksDBBackEnd = new RocksDBStateBackend("hdfs://ns1/user/xc");
        Savepoint.create(rocksDBBackEnd, 128)
                .withOperator(uid, transformation)
                .write(savePointPath);


        env.execute("batch build save point");
        System.out.println("-------end------------");
    }

读取和修改样例

 public static void main(String[] args) throws Exception {
        ExecutionEnvironment bEnv = ExecutionEnvironment.getExecutionEnvironment();
        String savePointPath = "hdfs://ns1/user/xc/savepoint-61b8e1-bbee958b3087";
        StateBackend rocksDBBackEnd = new RocksDBStateBackend("hdfs://ns1/user/xc");

        ExistingSavepoint savepoint = Savepoint.load(bEnv, savePointPath, rocksDBBackEnd);

        //读取
        String uid = "keyby_summarize";
        DataSet<KeyedValueState> keyState = savepoint.readKeyedState(uid, new StateReaderFunc());

        //修改
        DataSet<KeyedValueState> dataSet = keyState.flatMap((FlatMapFunction<KeyedValueState, KeyedValueState>) (value, out) -> {
            value.countNum = value.countNum * 2;
            out.collect(value);
        }).returns(KeyedValueState.class);

        BootstrapTransformation<KeyedValueState> transformation = OperatorTransformation
                .bootstrapWith(dataSet)
                //注意keyby操作的key一定要和原来的相同
                .keyBy(new KeySelector<KeyedValueState, Tuple1<String>>() {
                    @Override
                    public Tuple1<String> getKey(KeyedValueState value) throws Exception {
                        return value.key;
                    }
                })
                .transform(new KeyedValueStateBootstrapper());

        Savepoint.create(rocksDBBackEnd, 128)
                .withOperator(uid, transformation)
                .write("hdfs://ns1/user/xc/savepoint-after-modify3");


        bEnv.execute("read the list state");
        System.out.println("-----end------------");
    }

    public static class StateReaderFunc extends KeyedStateReaderFunction<Tuple1<String>, KeyedValueState> {

        private static final long serialVersionUID = -3616180524951046897L;
        private transient ValueState<Long> state;

        @Override
        public void open(Configuration parameters) {
            ValueStateDescriptor currentCountDescriptor = new ValueStateDescriptor("currentCountState", Long.class);
            state = getRuntimeContext().getState(currentCountDescriptor);
        }

        @Override
        public void readKey(Tuple1<String> key, Context ctx, Collector<KeyedValueState> out) throws Exception {
            System.out.println(key.f0 +":" + state.value());

            KeyedValueState keyedValueState = new KeyedValueState();
            keyedValueState.key = new Tuple1<>(key.f0);
            keyedValueState.countNum = state.value();

            out.collect(keyedValueState);
        }
    }

    public static class KeyedValueState {
        Tuple1<String> key;
        Long countNum;
    }

    private static class KeyedValueStateBootstrapper extends KeyedStateBootstrapFunction<Tuple1<String>, KeyedValueState>{

        private static final long serialVersionUID = 1893716139133502118L;
        private ValueState<Long> currentCount = null;

        @Override
        public void open(Configuration parameters) throws Exception {
            ValueStateDescriptor currentCountDescriptor = new ValueStateDescriptor("currentCountState", Long.class, 0L);
            currentCount = getRuntimeContext().getState(currentCountDescriptor);
        }

        @Override
        public void processElement(KeyedValueState value, Context ctx) throws Exception {
            currentCount.update(value.countNum);
        }
    }
相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
3月前
|
JSON 搜索推荐 API
深入了解亚马逊商品详情API:功能、作用与实例
亚马逊商品详情API接口由官方提供,允许开发者通过程序调用获取商品详细信息,如标题、价格等,适用于电商数据分析、搜索及个性化推荐等场景。接口名称包括ItemLookup、GetMatchingProductForId等,支持HTTP POST/GET请求,需提供商品ID、API密钥及其他可选参数。返回数据格式通常为JSON或XML,涵盖商品详情、分类、品牌、价格、图片URL及用户评价等。该接口对数据收集、实时推荐、营销活动及数据分析至关重要,有助于提升电商平台的数据处理能力、用户体验及商家运营效率。使用时需注册亚马逊开发者账号并申请API访问权限,获取API密钥后按文档构建请求并处理响应数据。
|
4月前
|
监控 Serverless 数据库
美团 Flink 大作业部署问题之端云联调并将流量恢复到云端实例如何结束
美团 Flink 大作业部署问题之端云联调并将流量恢复到云端实例如何结束
|
5月前
|
SQL 分布式计算 测试技术
概述Flink API中的4个层次
【7月更文挑战第14天】Flink的API分为4个层次:核心底层API(如ProcessFunction)、DataStream/DataSet API、Table API和SQL。
|
4月前
|
缓存 JavaScript 前端开发
微信 JS-SDK Demo “分享信息设置” API 及数字签名生成方法(NodeJS版本)
微信 JS-SDK Demo “分享信息设置” API 及数字签名生成方法(NodeJS版本)更新时间(2020-10-29)
|
6月前
|
人工智能 运维 Serverless
函数计算产品使用问题之启动的实例是否有调用api接口停止功能
函数计算产品作为一种事件驱动的全托管计算服务,让用户能够专注于业务逻辑的编写,而无需关心底层服务器的管理与运维。你可以有效地利用函数计算产品来支撑各类应用场景,从简单的数据处理到复杂的业务逻辑,实现快速、高效、低成本的云上部署与运维。以下是一些关于使用函数计算产品的合集和要点,帮助你更好地理解和应用这一服务。
|
6月前
|
SQL 关系型数据库 API
实时计算 Flink版产品使用问题之如何使用stream api
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
5月前
|
canal 监控 关系型数据库
实时计算 Flink版产品使用问题之如何在实例里配置监控哪些库,哪些表,包括黑名单,白名单
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
6月前
|
Kubernetes Oracle 关系型数据库
实时计算 Flink版操作报错合集之用dinky在k8s上提交作业,会报错:Caused by: org.apache.flink.table.api.ValidationException:,是什么原因
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
279 0
|
6月前
|
关系型数据库 MySQL 数据库
实时计算 Flink版操作报错合集之下载了mysql的cdc的demo,在本地调试时,报错:找不到这个包,该怎么办
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
172 0
|
6月前
|
SQL 存储 API
Flink(十五)【Flink SQL Connector、savepoint、CateLog、Table API】(5)
Flink(十五)【Flink SQL Connector、savepoint、CateLog、Table API】