Flink(十一)【状态管理】(3)

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: Flink(十一)【状态管理】

Flink(十一)【状态管理】(2)https://developer.aliyun.com/article/1532239

2.6、状态生存时间(TTL)

       在实际应用中,很多状态会随着时间的推移逐渐增长,如果不加以限制,最终就会导致存储空间的耗尽。一个优化的思路是直接在代码中调用.clear()方法去清除状态,但是有时候我们的逻辑要求不能直接清除。这时就需要配置一个状态的“生存时间”(time-to-live,TTL),当状态在内存中存在的时间超出这个值时,就将它清除。

       具体实现上,如果用一个进程不停地扫描所有状态看是否过期,显然会占用大量资源做无用功。状态的失效其实不需要立即删除,所以我们可以给状态附加一个属性,也就是状态的“失效时间”。状态创建的时候,设置 失效时间 = 当前时间 + TTL;之后如果有对状态的访问和修改,我们可以再对失效时间进行更新;当设置的清除条件被触发时(比如,状态被访问的时候,或者每隔一段时间扫描一次失效状态),就可以判断状态是否失效、从而进行清除了。

       配置状态的 TTL 时,需要创建一个 StateTtlConfig 配置对象,然后调用状态描述器的 .enableTimeToLive() 方法启动 TTL 功能。

StateTtlConfig ttlConfig = StateTtlConfig
     .newBuilder(Time.seconds(10))
     .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
     .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
     .build();
ValueStateDescriptor<String> stateDescriptor = new ValueStateDescriptor<>("mystate", String.class);
stateDescriptor.enableTimeToLive(ttlConfig);

这里用到了几个配置项:

  • .newBuilder()。状态 TTL 配置的构造器方法,必须调用,返回一个 Builder 之后再调用.build()方法就可以得到 StateTtlConfig 了。方法需要传入一个 Time 作为参数,这就是设定的状态生存时间。
  • .setUpdateType()。设置更新类型。更新类型指定了什么时候更新状态失效时间,这里的 OnCreateAndWrite表示只有创建状态和更改状态(写操作)时更新失效时间。另一种类型 OnReadAndWrite 则表示无论读写操作都会更新失效时间,也就是只要对状态进行了访问,就表明它是活跃的,从而延长生存时间。这个配置默认为 OnCreateAndWrite。
  • .setStateVisibility()。设置状态的可见性。所谓的“状态可见性”,是指因为清除操作并不是实时的,所以当状态过期之后还有可能基于存在,这时如果对它进行访问,能否正常读取到就是一个问题了。这里默认设置的是 NeverReturnExpired ,表示从不返回过期值,也就是只要过期就认为它已经被清除了,应用不能继续读取;这在处理会话或者隐私数据时比较重要。对应的另一种配置是 ReturnExpireDefNotCleanedUp,就是如果过期状态还存在,也返回它的值。
  • 除此之外,TTL 配置还可以设置在保存检查点(checkpoint)时触发清除操作,或者配置增量的清理(incremental cleanup),还可以针对 RocksDB 状态后端使用压缩过滤器(compaction filter)进行后台清理。

这里需要注意,目前的 TTL 设置只支持处理时间。另外,所有集合类型的状态(例如ListState、MapState)在设置 TTL 时,都是针对每一项(per-entry)元素的。也就是说,一个列表状态中的每一个元素,都会以自己的失效时间来进行清理,而不是整个列表一起清理。

注意:我们知道,我们上面创建状态时都是在 open 方法中进行初始化的,而 TTL 的配置也是通过 状态来调用的 ,所以 TTL 的定义也应该在 open 方法中进行。

案例-

public class KeyedStateTTL {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
 
        SingleOutputStreamOperator<WaterSensor> sensorDS = env
                .socketTextStream("localhost", 9999)
                .map(new WaterSensorFunction())
                // todo 指定 watermark 策略,我们直接使用实现好的
                .assignTimestampsAndWatermarks(WatermarkStrategy
                        // 指定watermark的生成: 泛型方法,需要指定数据类型,乱序的watermark 需要设置等待时间
                        .<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3))   // 等待3s
                        // 指定如何从数据中提取事件时间
                        .withTimestampAssigner((WaterSensor sensor, long recordTimestamp) -> {
                            return sensor.getTs() * 1000L; // 返回的时间戳单位是 ms
                        }));
 
        sensorDS.keyBy(WaterSensor::getId)
                // process方法的参数类型: KIO
                .process(new KeyedProcessFunction<String, WaterSensor, String>() {
 
                    ValueState<Integer> lastVcState;    // 初始化必须在生命周期中定义,因为初始化需要运行时环境,这里环境还没启动会初始化失败
 
                    @Override
                    public void open(Configuration parameters) throws Exception {
                        super.open(parameters);
 
                        // todo 1. 创建 StateTtlConfig
                        StateTtlConfig stateTtlConfig = StateTtlConfig
                                .newBuilder(Time.seconds(5))    // 过期时间
                                // 设置只有在创建或写的时候才会刷新失效时间(往后推5s)
                                .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
//                                .setUpdateType(StateTtlConfig.UpdateType.OnReadAndWrite)
                                .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
                                .build();
 
                        // todo 2. 状态描述器启用 TTL
                        ValueStateDescriptor<Integer> stateDescriptor = new ValueStateDescriptor<>("lastVcState", Types.INT);
                        stateDescriptor.enableTimeToLive(stateTtlConfig);
 
                        this.lastVcState = getRuntimeContext().getState(stateDescriptor);
                    }
 
                    @Override
                    public void processElement(WaterSensor value, Context ctx, Collector<String> out) throws Exception {
                        // 获取状态值-读
                        Integer last_vc = lastVcState.value();
                        out.collect("key="+value.getId()+",状态值= "+last_vc);
 
                        // 更新状态值-写
                        lastVcState.update(value.getVc());
                    }
                }).print();
 
        env.execute();
    }
}

运行效果:

s1,1,1        =>    null
s1,1,2        =>    1
s1,1,1        =>    2
// 等待5s后
s1,1,1        =>    null

3、算子状态(Operator State)

       算子状态(Operator State)就是一个算子并行实例上定义的状态,作用范围被限定为当前算子任务。算子状态跟数据的 key 无关,所以不同 key 的数据只要被分发到同一个并行子任务,就会访问到同一个 Operator State。

       算子状态的实际应用场景不如 Keyed State 多,一般用在 Source 或 Sink 等与外部系统连接的算子上(所以一般都不用我们去写,因为 Flink 已经帮我们把各种连接器写好了),或者完全没有 key 定义的场景。比如 Flink 的 Kafka 连接器中,就用到了算子状态。在我们给 Source 算子设置并行度后,Kafka 消费者的每一个并行实例,都会为对应的主题(topic)分区维护一个偏移量, 作为算子状态保存起来。这在保证 Flink 应用“精确一次”(exactly-once)状态一致性时非常有用。

       当算子的并行度发生变化时,算子状态也支持在并行的算子任务实例之间做重组分配。根据状态的类型不同,重组分配的方案也会不同。

       算子状态也支持不同的结构类型,主要有三种:ListState、UnionListState 和 BroadcastState。可以看到,算子状态中并没有什么 Map、Value啊,这是因为算子状态是我们一个子任务里大家共享一起使用的。

3.1、列表状态(ListState)

       与 Keyed State 中的 ListState 一样,将状态表示为一组数据的列表。

       与 Keyed State 中的列表状态的区别是:在算子状态的上下文中,不会按键(key)分别处理状态,所以每一个并行子任务上只会保留一个“列表”(list),也就是当前并行子任务上所有状态项的集合。列表中的状态项就是可以重新分配的最细粒度,彼此之间完全独立。

       当算子并行度进行缩放调整时,算子的列表状态中的所有元素项会被统一收集起来,相当于把多个分区的列表合并成了一个“大列表”,然后再均匀地分配给所有并行任务。这种“均匀分配”的具体方法就是“轮询”(round-robin),与之前介绍的 rebanlance 数据传输方式类似,是通过逐一“发牌”的方式将状态项平均分配的。这种方式也叫作“平均分割重组”(even-split redistribution)。

       算子状态中不会存在“键组”(key group)这样的结构,所以为了方便重组分配,就把它直接定义成了“列表”(list)。这也就解释了,为什么算子状态中没有最简单的值状态(ValueState)。

案例-在 map 算子中计算数据的个数:

public class OperatorListState {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);
 
        env.socketTextStream("localhost",9999)
                .map(new MyCountMapFunction())
                .print();
 
        env.execute();
    }
    // TODO 1.实现 CheckpointedFunction 接口
    public static class MyCountMapFunction implements MapFunction<String,Long>, CheckpointedFunction {
 
        private Long count = 0L;
        private ListState<Long> countState;
 
 
        @Override
        public Long map(String value) throws Exception {
            return ++count; //++count 不可以是count++
        }
 
        /**
         * TODO 2. 我们的本地变量 count 要持久化到算子状态,这里需要对算子状态做快照
         * @param context
         * @throws Exception
         */
        @Override
        public void snapshotState(FunctionSnapshotContext context) throws Exception {
            // 2.1 清空算子状态
            countState.clear();
            // 2.2 将本地变量添加到算子状态中
            countState.add(count);
        }
 
        /**
         * TODO 3. 初始化本地变量,当我们的任务失败要恢复状态时,flink的checkpoint机制会从状态中把数据添加到本地变量,每个子任务调用一次
         * @param context
         * @throws Exception
         */
        @Override
        public void initializeState(FunctionInitializationContext context) throws Exception {
            // 3.1 从上下文初始化算子状态
            countState = context.getOperatorStateStore()
                    .getListState(new ListStateDescriptor<Long>("countState", Types.LONG));
            // 3.2 从算子状态中把数据拷贝到本地变量
            if (context.isRestored()) { // 如果初始化状态成功
                for (Long c : countState.get()) {
                    count += c;
                }
            }
        }
    }
}

这里我们设置并行度为 2 ,我们选择 Socket 作为 Source 算子时,Source 并行度只能为 1 ,但是这里我们的 Map 算子并行度为 2 ,运行结果:

输入    输出
a       1> 1
b       2> 1
c       1> 2
d       2> 2
e       1> 3
f       2> 3

可以看到,数据被均匀分到两个算子中去了。

Flink(十一)【状态管理】(4)https://developer.aliyun.com/article/1532243

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
7月前
|
存储 消息中间件 缓存
读Flink源码谈设计:有效管理内存之道
在最初接触到Flink时,是来自于业界里一些头部玩家的分享——大家会用其来处理海量数据。在这种场景下,`如何避免JVM GC带来StopTheWorld带来的副作用`这样的问题一直盘绕在我心头。直到用了Flink以后,阅读了相关的源码(以1.14.0为基准),终于有了一些答案。在这篇文章里也是会分享给大家。
584 1
|
消息中间件 存储 Kafka
Flink---11、状态管理(按键分区状态(值状态、列表状态、Map状态、归约状态、聚合状态)算子状态(列表状态、广播状态))
Flink---11、状态管理(按键分区状态(值状态、列表状态、Map状态、归约状态、聚合状态)算子状态(列表状态、广播状态))
|
6月前
|
存储 分布式计算 IDE
Flink(十一)【状态管理】(4)
Flink(十一)【状态管理】
|
7月前
|
存储 Cloud Native 数据处理
Flink 2.0 状态管理存算分离架构演进
本文整理自阿里云智能 Flink 存储引擎团队负责人梅源在 Flink Forward Asia 2023 的分享,梅源结合阿里内部的实践,分享了状态管理的演进和 Flink 2.0 存算分离架构的选型。
1201 1
Flink 2.0 状态管理存算分离架构演进
|
6月前
|
传感器 流计算
Flink(十一)【状态管理】(2)
Flink(十一)【状态管理】
|
6月前
|
存储 传感器 大数据
Flink(十一)【状态管理】(1)
Flink(十一)【状态管理】
|
7月前
|
消息中间件 SQL Java
实时计算 Flink版产品使用合集之管理内存webui上一直是百分百是什么导致的
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStreamAPI、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
7月前
|
Kubernetes Oracle 关系型数据库
实时计算 Flink版产品使用合集之web ui能否在线管理数据source和处理数据
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
7月前
|
存储 传感器 消息中间件
[尚硅谷 flink] 状态管理 笔记
[尚硅谷 flink] 状态管理 笔记
|
7月前
|
存储 Java API
Flink中的状态管理是什么?请解释其作用和常用方法。
Flink中的状态管理是什么?请解释其作用和常用方法。
80 0