Flink 状态管理
我们一直称 Flink 为运行在数据流上的有状态计算框架和处理引擎。在之前的章节中也已经多次提到了“状态”(state),不论是简单聚合、窗口聚合,还是处理函数的应用,都会有状态的身影出现。状态就如同事务处理时数据库中保存的信息一样,是用来辅助进行任务计算的数据。而在 Flink 这样的分布式系统中,我们不仅需要定义出状态在任务并行时的处理方式,还需要考虑如何持久化保存、以便发生故障时正确地恢复。这就需要一套完整的管理机制来处理所有的状态。
1、Flink 中的状态
在流处理中,数据是连续不断到来和处理的。每个任务进行计算处理时,可以基于当前数据直接转换得到输出结果;也可以依赖一些其他数据。这些由一个任务维护,并且用来计算输出结果的所有数据,就叫作这个任务的状态。
1.1、概述
在 Flink 中,算子任务可以分为有状态和无状态两种情况。
1.1.1、无状态算子任务
无状态的算子任务只需要观察每个独立事件,根据当前输入的数据直接转换输出结果。比如我们之前学的 map、flatMap、filter 等,计算时不依赖其它数据,就属于无状态的算子。
1.1.2、有状态算子任务
而有状态的算子任务则除了当前数据外,还需要一些其他数据来得到计算结果。这里的“其他数据”就是所谓的状态(State)。比如我们之前学的 聚合算子、窗口算子都属于有状态的算子。
比如我们之前窗口函数中学的增量聚合函数,每来一条数据它都会把处理后的结果保存到一个中间值,当窗口内再来一条数据就更新这个中间值,这个中间值就是所谓的状态。
再比如我们的全窗口函数,它会把来的所有数据都保存到一个中间值当中,直到窗口关闭时才会触发计算,同样,这个中间值就是所谓的状态。
此外还有我们的一些聚合算子比如 sum、min、max 等,它肯定是要把中间结果存储起来的,所以这都叫有状态算子。
1.2、状态的分类
1.2.1、托管状态(Managed State)和原始状态(Raw State)
Flink 的状态有两种:托管状态(Managed State)和原始状态(Raw State)。托管状态就是由 Flink 统一管理的,也就是管理状态的存储、访问、故障恢复和重组等一系列问题都由 Flink 实现,我们只要调接口就可以;而原始状态则是自定义的,相当于就是开辟了一块内存,需要我们自己管理,实现状态的序列化和故障恢复。
通常我们都是采用 Flink 托管状态来实现需求。只有在遇到托管状态无法实现的特殊需求时,我们才会考虑使用原始状态;一般情况下不推荐使用。
1.2.2、算子状态和按键分区状态
我们知道在 Flink 中,一个算子任务会按照并行度分为多个并行子任务执行,而不同的子任务会占据不同的任务槽(task slot)。由于不同的 slot 在计算资源上是物理隔离的,所以 Flink能管理的状态在并行任务间是无法共享的,每个状态只能针对当前子任务的实例有效。而很多有状态的操作(比如聚合、窗口)都是要先做 keyBy 进行按键分区的。按键分区之后,任务所进行的所有计算都应该只针对当前 key 有效,所以状态也应该按照 key 彼此隔离。在这种情况下,状态的访问方式又会有所不同。
基于这样的想法,我们又可以将托管状态分为两类:算子状态和按键分区状态。
(1)算子状态(Operator State)
状态作用范围限定为当前的算子任务的各个子任务,也就是只对当前并行子任务实例有效。这就意味着对于一个并行子任务,占据了一个“分区”,它所处理的所有数据都会访问到相同的状态,状态对于同一任务而言是共享的。
算子状态可以用在所有算子上,使用的时候其实就跟一个本地变量没什么区别——因为本地变量的作用域也是当前任务实例。在使用时,我们需要进一步实现 CheckpointedFunction 接口 。
新版本的 Flink 重构了 Source (也就是使用 fromSource 的写法),所以新版本则需要继承 SourceReaderBase 抽象类。
(2)按键分区状态
状态是根据输入流中定义的键(key)来维护和访问的,所以只能定义在按键分区流(KeyedStream)中,也就 keyBy 之后才可以使用。
按键分区状态应用非常广泛。之前讲到的聚合算子必须在 keyBy 之后才能使用,就是因为聚合的结果是以 Keyed State 的形式保存的。另外,也可以通过富函数类(Rich Function)来自定义 Keyed State,所以只要提供了富函数类接口的算子,也都可以使用 Keyed State。所以即使是 map、filter 这样无状态的基本转换算子,我们也可以通过富函数类给它们 “追加” Keyed State,或者实现 CheckpointedFunction 接口来定义 Operator State;从这个角度讲,Flink 中所有的算子都可以是有状态的,所以说 Flink 是“有状态的流处理”。
无论是 Keyed State 还是 Operator State,它们都是在本地实例上维护的,也就是说每个并行子任务维护着自己的状态,算子的子任务之间状态不共享。
2、按键分区状态(Keyed State)
在实际应用中,我们一般都需要将数据按照某个 key 进行分区,然后再进行计算处理;所以最为常见的状态类型就是 Keyed State。之前介绍到 keyBy 之后的聚合、窗口计算,算子所持有的状态,都是 Keyed State。
另外,我们还可以通过富函数类(Rich Function)对转换算子进行扩展、实现自定义功能,比如 RichMapFunction、RichFilterFunction。在富函数中,我们可以调用.getRuntimeContext()获取当前的运行时上下文(RuntimeContext),进而获取到访问状态的句柄;这种富函数中自定义的状态也是 Keyed State。
按键分区状态(Keyed State)顾名思义,是任务按照键(key)来访问和维护的状态。它的特点非常鲜明,就是以 key 为作用范围进行隔离。我们知道,在进行按键分区(keyBy)之后,具有相同键的所有数据,都会分配到同一个并行子任务中;所以如果当前任务定义了状态,Flink 就会在当前并行子任务实例中,为每个键值维护一个状态的实例。于是当前任务就会为分配来的所有数据,按照 key 维护和处理对应的状态。
因为一个并行子任务可能会处理多个 key 的数据,所以 Flink 需要对 Keyed State 进行一些特殊优化。在底层,Keyed State 类似于一个分布式的映射(map)数据结构,所有的状态会根据 key 保存成键值对(key-value)的形式。这样当一条数据到来时,任务就会自动将状态的访问范围限定为当前数据的 key,从 map 存储中读取出对应的状态值。所以具有相同 key 的所有数据都会到访问相同的状态,而不同 key 的状态之间是彼此隔离的。这种将状态绑定到 key 上的方式,相当于使得状态和流的逻辑分区一一对应了:不会有别的 key 的数据来访问当前状态;而当前状态对应 key 的数据也只会访问这一个状态,不会分发到其他分区去。这就保证了对状态的操作都是本地进行的,对数据流和状态的处理做到了分区一致性。
另外,在应用的并行度改变时,状态也需要随之进行重组。不同 key 对应的 Keyed State可以进一步组成所谓的键组(key groups),每一组都对应着一个并行子任务。键组是 Flink 重新分配 Keyed State 的单元,键组的数量就等于定义的最大并行度。当算子并行度发生改变时,Keyed State 就会按照当前的并行度重新平均分配,保证运行时各个子任务的负载相同。需要注意,使用 Keyed State 必须基于 KeyedStream。没有进行 keyBy 分区的 DataStream,即使转换算子实现了对应的富函数类,也不能通过运行时上下文访问 Keyed State。
需要注意:使用 Keyed State 必须基于 KeyedStream。
实际应用中,需要保存为状态的数据会有各种各样的类型,有时还需要复杂的集合类型,比如列表(List)和映射(Map)。对于这些常见的用法,Flink 的按键分区状态(Keyed State)提供了足够的支持。接下来我们就来了解一下 Keyed State 所支持的结构类型:
2.1、值状态(ValueState)
顾名思义,状态中只保存一个“值”(value)。ValueState本身是一个接口,源码中定义如下:
public interface ValueState<T> extends State { T value() throws IOException; void update(T value) throws IOException; }
这里的 T 是泛型,表示状态的数据内容可以是任何具体的数据类型。如果想要保存一个长整型值作为状态,那么类型就是 ValueState。
我们可以在代码中读写值状态,实现对于状态的访问和更新。
- T value():获取当前状态的值;
- update(T value):对状态进行更新,传入的参数 value 就是要覆写的状态值。
在具体使用时,为了让运行时上下文清楚到底是哪个状态,我们还需要创建一个“状态描述器”(StateDescriptor)来提供状态的基本信息。例如源码中,ValueState 的状态描述器构造方法如下:
public ValueStateDescriptor(String name, Class<T> typeClass) { super(name, typeClass, null); }
这里需要传入状态的名称和类型——这跟我们声明一个变量时做的事情完全一样。有了这个描述器,运行时环境就可以获取到状态的控制句柄(handler)了。
接下来演示一个案例:对连续两个水位值超过10的传感器输出报警信息
/** * 检测每种传感器的水为值,如果连续两个水位差值超过10就输出结果 */ public class KeyedValueState { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); SingleOutputStreamOperator<WaterSensor> sensorDS = env .socketTextStream("localhost", 9999) .map(new WaterSensorFunction()) // todo 指定 watermark 策略,我们直接使用实现好的 .assignTimestampsAndWatermarks(WatermarkStrategy // 指定watermark的生成: 泛型方法,需要指定数据类型,乱序的watermark 需要设置等待时间 .<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3)) // 等待3s // 指定如何从数据中提取事件时间 .withTimestampAssigner((WaterSensor sensor, long recordTimestamp) -> { return sensor.getTs() * 1000L; // 返回的时间戳单位是 ms })); sensorDS.keyBy(WaterSensor::getId) // process方法的参数类型: KIO .process(new KeyedProcessFunction<String, WaterSensor, String>() { // todo 1.定义状态 ValueState<Integer> lastVcState; // 初始化必须在生命周期中定义,因为初始化需要运行时环境,这里环境还没启动会初始化失败 @Override public void open(Configuration parameters) throws Exception { super.open(parameters); // todo 2. 在open方法初始化状态 // 状态描述器的两个参数:1.起个名字(不重复就行),2.存储的类型 lastVcState = getRuntimeContext().getState(new ValueStateDescriptor<Integer>("lastVcState", Types.INT)); } @Override public void processElement(WaterSensor value, Context ctx, Collector<String> out) throws Exception { // 要和上一条水位值进行比较 // todo 1. 取出上一条数据的水位值 // Integer的初始值为null 这里遇到第一条水位时需要注意判断null int last_value = lastVcState.value()==null?0:lastVcState.value(); // todo 2. 判断两条水位是否都超过 10 if (Math.abs(value.getVc()-last_value)>10) System.out.println("传感器"+value.getId()+"当前水位值="+value.getVc()+",与上一条水位值="+last_value+"相差超过10!!!"); // todo 3. 更新上一条水位值 lastVcState.update(value.getVc()); } }).print(); env.execute(); } }
代码解析:
- 我们说过,如果没有合适的函数能解决我们的需求,那就直接用 process 就行了。
- ValueState 应该在 open 方法中去初始化,因为直接在程序执行前去尝试调用 getRuntimeContext 获取执行环境是获取不到的。
- 要用 ValueState 而不是 int 也不是 hashMap
- 用 int 就不能区分不同的传感器了
- 用 hashMap 的效率要比 ValueState 低而且存在一些别的问题
- 注意初始值 Integer 的初始值为 null
2.2、列表状态(ListState)
将需要保存的数据,以列表(List)的形式组织起来。在 ListState接口中同样有一个类型参数 T,表示列表中数据的类型。ListState 也提供了一系列的方法来操作状态,使用方式与一般的 List 非常相似。
- Iterable get():获取当前的列表状态,返回的是一个可迭代类型 Iterable;
- update(List values):传入一个列表 values,直接对状态进行覆盖;
- add(T value):在状态列表中添加一个元素 value;
- addAll(List values):向列表中添加多个元素,以列表 values 形式传入。
类似地,ListState 的状态描述器就叫作 ListStateDescriptor,用法跟 ValueStateDescriptor完全一致。
同样我们演示一个案例:针对每种传感器输出最高的3个水位值:
/** * 获取每个水位器的 top3 的水位值 */ public class KeyedListState { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); SingleOutputStreamOperator<WaterSensor> sensorDS = env .socketTextStream("localhost", 9999) .map(new WaterSensorFunction()) // todo 指定 watermark 策略,我们直接使用实现好的 .assignTimestampsAndWatermarks(WatermarkStrategy // 指定watermark的生成: 泛型方法,需要指定数据类型,乱序的watermark 需要设置等待时间 .<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3)) // 等待3s // 指定如何从数据中提取事件时间 .withTimestampAssigner((WaterSensor sensor, long recordTimestamp) -> { return sensor.getTs() * 1000L; // 返回的时间戳单位是 ms })); sensorDS.keyBy(WaterSensor::getId) // process方法的参数类型: KIO .process(new KeyedProcessFunction<String, WaterSensor, String>() { // todo 1.定义状态 ListState<Integer> maxState; @Override public void open(Configuration parameters) throws Exception { super.open(parameters); // todo 2. 在open方法初始化状态 // 状态描述器的两个参数:1.起个名字(不重复就行),2.存储的类型 maxState = getRuntimeContext().getListState(new ListStateDescriptor<Integer>("maxState",Types.INT)); } @Override public void processElement(WaterSensor value, Context ctx, Collector<String> out) throws Exception { // todo 1. 添加水位值 // Integer的初始值为null 这里遇到第一条水位时需要注意判断null maxState.add(value.getVc()); // todo 2. 排序 List<Integer> list = new ArrayList<>(); for (int vc : maxState.get()) { list.add(vc); } list.sort((o1, o2) -> o2 - o1); if (list.size()>3) // 一超过3立即清理,防止数据量大的排序开销 list.remove(3); // todo 3. 输出top3 out.collect("传感器="+value.getId()+"最大的3个水为值为"+list.toString()); } }).print(); env.execute(); } }
代码解析:
- 这里同样不用普通的 List,因为它不能按 key 分组
- 这里用了一个简单的优化,if(list.size()>3) list.remove(3) 使得list永远只有3个值,减少了大数据场景下每次的遍历开销
- list.sort((o2,o1)->o2-o1) lambda基础知识
Flink(十一)【状态管理】(2)https://developer.aliyun.com/article/1532239