Flink(十一)【状态管理】(2)https://developer.aliyun.com/article/1532239
2.6、状态生存时间(TTL)
在实际应用中,很多状态会随着时间的推移逐渐增长,如果不加以限制,最终就会导致存储空间的耗尽。一个优化的思路是直接在代码中调用.clear()方法去清除状态,但是有时候我们的逻辑要求不能直接清除。这时就需要配置一个状态的“生存时间”(time-to-live,TTL),当状态在内存中存在的时间超出这个值时,就将它清除。
具体实现上,如果用一个进程不停地扫描所有状态看是否过期,显然会占用大量资源做无用功。状态的失效其实不需要立即删除,所以我们可以给状态附加一个属性,也就是状态的“失效时间”。状态创建的时候,设置 失效时间 = 当前时间 + TTL;之后如果有对状态的访问和修改,我们可以再对失效时间进行更新;当设置的清除条件被触发时(比如,状态被访问的时候,或者每隔一段时间扫描一次失效状态),就可以判断状态是否失效、从而进行清除了。
配置状态的 TTL 时,需要创建一个 StateTtlConfig 配置对象,然后调用状态描述器的 .enableTimeToLive() 方法启动 TTL 功能。
StateTtlConfig ttlConfig = StateTtlConfig .newBuilder(Time.seconds(10)) .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite) .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired) .build(); ValueStateDescriptor<String> stateDescriptor = new ValueStateDescriptor<>("mystate", String.class); stateDescriptor.enableTimeToLive(ttlConfig);
这里用到了几个配置项:
- .newBuilder()。状态 TTL 配置的构造器方法,必须调用,返回一个 Builder 之后再调用.build()方法就可以得到 StateTtlConfig 了。方法需要传入一个 Time 作为参数,这就是设定的状态生存时间。
- .setUpdateType()。设置更新类型。更新类型指定了什么时候更新状态失效时间,这里的 OnCreateAndWrite表示只有创建状态和更改状态(写操作)时更新失效时间。另一种类型 OnReadAndWrite 则表示无论读写操作都会更新失效时间,也就是只要对状态进行了访问,就表明它是活跃的,从而延长生存时间。这个配置默认为 OnCreateAndWrite。
- .setStateVisibility()。设置状态的可见性。所谓的“状态可见性”,是指因为清除操作并不是实时的,所以当状态过期之后还有可能基于存在,这时如果对它进行访问,能否正常读取到就是一个问题了。这里默认设置的是 NeverReturnExpired ,表示从不返回过期值,也就是只要过期就认为它已经被清除了,应用不能继续读取;这在处理会话或者隐私数据时比较重要。对应的另一种配置是 ReturnExpireDefNotCleanedUp,就是如果过期状态还存在,也返回它的值。
- 除此之外,TTL 配置还可以设置在保存检查点(checkpoint)时触发清除操作,或者配置增量的清理(incremental cleanup),还可以针对 RocksDB 状态后端使用压缩过滤器(compaction filter)进行后台清理。
这里需要注意,目前的 TTL 设置只支持处理时间。另外,所有集合类型的状态(例如ListState、MapState)在设置 TTL 时,都是针对每一项(per-entry)元素的。也就是说,一个列表状态中的每一个元素,都会以自己的失效时间来进行清理,而不是整个列表一起清理。
注意:我们知道,我们上面创建状态时都是在 open 方法中进行初始化的,而 TTL 的配置也是通过 状态来调用的 ,所以 TTL 的定义也应该在 open 方法中进行。
案例-
public class KeyedStateTTL { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); SingleOutputStreamOperator<WaterSensor> sensorDS = env .socketTextStream("localhost", 9999) .map(new WaterSensorFunction()) // todo 指定 watermark 策略,我们直接使用实现好的 .assignTimestampsAndWatermarks(WatermarkStrategy // 指定watermark的生成: 泛型方法,需要指定数据类型,乱序的watermark 需要设置等待时间 .<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3)) // 等待3s // 指定如何从数据中提取事件时间 .withTimestampAssigner((WaterSensor sensor, long recordTimestamp) -> { return sensor.getTs() * 1000L; // 返回的时间戳单位是 ms })); sensorDS.keyBy(WaterSensor::getId) // process方法的参数类型: KIO .process(new KeyedProcessFunction<String, WaterSensor, String>() { ValueState<Integer> lastVcState; // 初始化必须在生命周期中定义,因为初始化需要运行时环境,这里环境还没启动会初始化失败 @Override public void open(Configuration parameters) throws Exception { super.open(parameters); // todo 1. 创建 StateTtlConfig StateTtlConfig stateTtlConfig = StateTtlConfig .newBuilder(Time.seconds(5)) // 过期时间 // 设置只有在创建或写的时候才会刷新失效时间(往后推5s) .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite) // .setUpdateType(StateTtlConfig.UpdateType.OnReadAndWrite) .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired) .build(); // todo 2. 状态描述器启用 TTL ValueStateDescriptor<Integer> stateDescriptor = new ValueStateDescriptor<>("lastVcState", Types.INT); stateDescriptor.enableTimeToLive(stateTtlConfig); this.lastVcState = getRuntimeContext().getState(stateDescriptor); } @Override public void processElement(WaterSensor value, Context ctx, Collector<String> out) throws Exception { // 获取状态值-读 Integer last_vc = lastVcState.value(); out.collect("key="+value.getId()+",状态值= "+last_vc); // 更新状态值-写 lastVcState.update(value.getVc()); } }).print(); env.execute(); } }
运行效果:
s1,1,1 => null s1,1,2 => 1 s1,1,1 => 2 // 等待5s后 s1,1,1 => null
3、算子状态(Operator State)
算子状态(Operator State)就是一个算子并行实例上定义的状态,作用范围被限定为当前算子任务。算子状态跟数据的 key 无关,所以不同 key 的数据只要被分发到同一个并行子任务,就会访问到同一个 Operator State。
算子状态的实际应用场景不如 Keyed State 多,一般用在 Source 或 Sink 等与外部系统连接的算子上(所以一般都不用我们去写,因为 Flink 已经帮我们把各种连接器写好了),或者完全没有 key 定义的场景。比如 Flink 的 Kafka 连接器中,就用到了算子状态。在我们给 Source 算子设置并行度后,Kafka 消费者的每一个并行实例,都会为对应的主题(topic)分区维护一个偏移量, 作为算子状态保存起来。这在保证 Flink 应用“精确一次”(exactly-once)状态一致性时非常有用。
当算子的并行度发生变化时,算子状态也支持在并行的算子任务实例之间做重组分配。根据状态的类型不同,重组分配的方案也会不同。
算子状态也支持不同的结构类型,主要有三种:ListState、UnionListState 和 BroadcastState。可以看到,算子状态中并没有什么 Map、Value啊,这是因为算子状态是我们一个子任务里大家共享一起使用的。
3.1、列表状态(ListState)
与 Keyed State 中的 ListState 一样,将状态表示为一组数据的列表。
与 Keyed State 中的列表状态的区别是:在算子状态的上下文中,不会按键(key)分别处理状态,所以每一个并行子任务上只会保留一个“列表”(list),也就是当前并行子任务上所有状态项的集合。列表中的状态项就是可以重新分配的最细粒度,彼此之间完全独立。
当算子并行度进行缩放调整时,算子的列表状态中的所有元素项会被统一收集起来,相当于把多个分区的列表合并成了一个“大列表”,然后再均匀地分配给所有并行任务。这种“均匀分配”的具体方法就是“轮询”(round-robin),与之前介绍的 rebanlance 数据传输方式类似,是通过逐一“发牌”的方式将状态项平均分配的。这种方式也叫作“平均分割重组”(even-split redistribution)。
算子状态中不会存在“键组”(key group)这样的结构,所以为了方便重组分配,就把它直接定义成了“列表”(list)。这也就解释了,为什么算子状态中没有最简单的值状态(ValueState)。
案例-在 map 算子中计算数据的个数:
public class OperatorListState { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(2); env.socketTextStream("localhost",9999) .map(new MyCountMapFunction()) .print(); env.execute(); } // TODO 1.实现 CheckpointedFunction 接口 public static class MyCountMapFunction implements MapFunction<String,Long>, CheckpointedFunction { private Long count = 0L; private ListState<Long> countState; @Override public Long map(String value) throws Exception { return ++count; //++count 不可以是count++ } /** * TODO 2. 我们的本地变量 count 要持久化到算子状态,这里需要对算子状态做快照 * @param context * @throws Exception */ @Override public void snapshotState(FunctionSnapshotContext context) throws Exception { // 2.1 清空算子状态 countState.clear(); // 2.2 将本地变量添加到算子状态中 countState.add(count); } /** * TODO 3. 初始化本地变量,当我们的任务失败要恢复状态时,flink的checkpoint机制会从状态中把数据添加到本地变量,每个子任务调用一次 * @param context * @throws Exception */ @Override public void initializeState(FunctionInitializationContext context) throws Exception { // 3.1 从上下文初始化算子状态 countState = context.getOperatorStateStore() .getListState(new ListStateDescriptor<Long>("countState", Types.LONG)); // 3.2 从算子状态中把数据拷贝到本地变量 if (context.isRestored()) { // 如果初始化状态成功 for (Long c : countState.get()) { count += c; } } } } }
这里我们设置并行度为 2 ,我们选择 Socket 作为 Source 算子时,Source 并行度只能为 1 ,但是这里我们的 Map 算子并行度为 2 ,运行结果:
输入 输出 a 1> 1 b 2> 1 c 1> 2 d 2> 2 e 1> 3 f 2> 3
可以看到,数据被均匀分到两个算子中去了。
Flink(十一)【状态管理】(4)https://developer.aliyun.com/article/1532243