(1) Keyed State
Keyed state is maintained and accessed based on the key defined on the input data stream.
Flink keeps one state instance per key value and partitions all records with the same key to the same operator task; that task maintains and processes the state for its keys. When the task processes a record, it automatically scopes state access to the key of the current record, so all records with the same key access the same state.
Keyed state is very much like a distributed key-value map data structure, and it can only be used on a KeyedStream (i.e., after a keyBy operation).
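As a minimal sketch of this scoping behavior (the full, runnable examples follow in section (3)), a stateful function on a KeyedStream declares its state once through a descriptor and then simply reads and writes it; Flink transparently resolves every access to the state instance of the current record's key. The class and state names here are illustrative assumptions, not part of the examples below.

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;

// Hypothetical function: counts how many records each key has seen so far
public class PerKeyCounter extends RichMapFunction<String, Long> {

    private ValueState<Long> countState;

    @Override
    public void open(Configuration parameters) throws Exception {
        // One logical declaration; Flink keeps a separate value per key behind it
        countState = getRuntimeContext().getState(
                new ValueStateDescriptor<Long>("per-key-count", Long.class));
    }

    @Override
    public Long map(String value) throws Exception {
        // value() and update() automatically refer to the state of the current key
        Long current = countState.value();
        long next = (current == null) ? 1L : current + 1L;
        countState.update(next);
        return next;
    }
}

Used as stream.keyBy(...).map(new PerKeyCounter()); calling getState from a non-keyed context fails at runtime, which is why keyed state is restricted to KeyedStream.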
(2) State types supported by keyed state
ValueState<T>:
  Holds a single value.
  There is one state value per key.
  Set the value with update(T); read it with T value().
ListState<T>:
  Holds a list of elements.
  Add elements: add(T), addAll(List<T>)
  Get the elements: Iterable<T> get()
  Overwrite all elements: update(List<T>)
ReducingState<T>:
  Holds a single value that represents the aggregation of all elements added to the state. It is similar to ListState, but when add(T) is called, ReducingState folds the new element into the current value using the ReduceFunction specified in the state descriptor.
AggregatingState<IN, OUT>:
  Holds a single value. Like ReducingState it aggregates the elements that are added, but the type of the aggregated result may differ from the type of the elements (a sketch follows this list).
MapState<UK, UV>:
  Holds a map of key-value pairs.
  Add entries: put(UK, UV) or putAll(Map<UK, UV>)
  Get a value by key: get(UK)
  Get all entries, keys, or values: entries(), keys(), and values()
  Check whether the map is empty: isEmpty()
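The difference between element type and result type in AggregatingState is easiest to see in a small sketch. The hypothetical flatMap function below keeps a running average per key: Long elements go into the state and a Double comes back out. The class name, the "avg-state" descriptor name, and the averaging logic are illustrative assumptions, not part of the examples in section (3).

import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.AggregatingState;
import org.apache.flink.api.common.state.AggregatingStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

// Hypothetical example: per-key running average, IN = Long, OUT = Double
public class AvgFunction extends RichFlatMapFunction<Tuple2<String, Long>, Tuple2<String, Double>> {

    private AggregatingState<Long, Double> avgState;

    @Override
    public void open(Configuration parameters) throws Exception {
        // The accumulator is a Tuple2 of (sum, count); the result is sum / count
        AggregatingStateDescriptor<Long, Tuple2<Long, Long>, Double> descriptor =
                new AggregatingStateDescriptor<>(
                        "avg-state",
                        new AggregateFunction<Long, Tuple2<Long, Long>, Double>() {
                            @Override
                            public Tuple2<Long, Long> createAccumulator() {
                                return Tuple2.of(0L, 0L);
                            }
                            @Override
                            public Tuple2<Long, Long> add(Long value, Tuple2<Long, Long> acc) {
                                return Tuple2.of(acc.f0 + value, acc.f1 + 1);
                            }
                            @Override
                            public Double getResult(Tuple2<Long, Long> acc) {
                                return acc.f1 == 0 ? 0.0 : (double) acc.f0 / acc.f1;
                            }
                            @Override
                            public Tuple2<Long, Long> merge(Tuple2<Long, Long> a, Tuple2<Long, Long> b) {
                                return Tuple2.of(a.f0 + b.f0, a.f1 + b.f1);
                            }
                        },
                        TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}));
        avgState = getRuntimeContext().getAggregatingState(descriptor);
    }

    @Override
    public void flatMap(Tuple2<String, Long> input, Collector<Tuple2<String, Double>> collector) throws Exception {
        avgState.add(input.f1);                                   // add a Long element
        collector.collect(Tuple2.of(input.f0, avgState.get()));   // read the Double result
    }
}

Applied as stream.keyBy(value -> value.f0).flatMap(new AvgFunction()), each key keeps its own running sum and count.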
Note:
  Every state type has a clear() method that clears the state for the current key.
  These state objects are only used for interacting with the state.
  The state does not have to be kept in memory; it may also live on disk or elsewhere, depending on the configured state backend (a configuration sketch follows this note).
  The value obtained from the state depends on the key of the current input element.
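As a minimal sketch of that last point about storage, the snippet below configures a RocksDB state backend, which keeps keyed state on local disk and checkpoints it to a durable filesystem. It assumes the Flink 1.12-era API that matches the examples below (RocksDBStateBackend is deprecated in newer releases in favor of EmbeddedRocksDBStateBackend), that the flink-statebackend-rocksdb dependency is on the classpath, and the HDFS checkpoint path is hypothetical.

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RocksDBBackendExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Keep keyed state in RocksDB on local disk and checkpoint it to HDFS (hypothetical path)
        env.setStateBackend(new RocksDBStateBackend("hdfs://bigdata-pro-m07:8020/flink/checkpoints"));

        // Any keyed, stateful operation in the job now stores its state through RocksDB
        env.fromElements(Tuple2.of("a", 1L), Tuple2.of("a", 2L), Tuple2.of("b", 3L))
                .keyBy(value -> value.f0)
                .sum(1)
                .print();

        env.execute("RocksDBBackendExample");
    }
}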
(3) Code examples
Hands-on example based on ValueState
package com.aikfk.flink.datastream.state;

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

/**
 * @author :caizhengjie
 * @description: count the number of elements seen per key with ValueState
 * @date :2021/3/31 4:04 PM
 */
public class KeyedStateCount {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Tuple2<String, Long>> dataStream = env.fromElements(
                Tuple2.of("a", 3L),
                Tuple2.of("a", 5L),
                Tuple2.of("b", 7L),
                Tuple2.of("c", 4L),
                Tuple2.of("c", 2L))
                .keyBy(value -> value.f0)
                .flatMap(new CountFunction());

        dataStream.print();

        env.execute("KeyedState");
    }

    static class CountFunction extends RichFlatMapFunction<Tuple2<String, Long>, Tuple2<String, Long>> {

        // Declare the ValueState that holds (key, count) for the current key
        private ValueState<Tuple2<String, Long>> keyCount;

        /**
         * Initialize the state from a state descriptor
         */
        @Override
        public void open(Configuration parameters) throws Exception {
            ValueStateDescriptor<Tuple2<String, Long>> descriptor =
                    new ValueStateDescriptor<>("keycount",
                            TypeInformation.of(new TypeHint<Tuple2<String, Long>>() {}));
            keyCount = getRuntimeContext().getState(descriptor);
        }

        @Override
        public void flatMap(Tuple2<String, Long> input,
                            Collector<Tuple2<String, Long>> collector) throws Exception {
            // Read the state; it is null the first time a key is seen
            Tuple2<String, Long> currentValue =
                    (keyCount.value() == null) ? new Tuple2<>("", 0L) : keyCount.value();

            // Increment the counter for this key
            currentValue.f0 = input.f0;
            currentValue.f1++;

            // Update the state and emit the current count
            keyCount.update(currentValue);
            collector.collect(keyCount.value());
        }
    }
}
Output (the number before ">" is the index of the parallel subtask that printed the record; the count for each key grows by one with every element of that key):
3> (b,1)
11> (a,1)
11> (a,2)
8> (c,1)
8> (c,2)
Hands-on example covering the keyed state types
package com.aikfk.flink.datastream.state;

import com.aikfk.flink.datastream.bean.WaterSensor;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.state.*;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

/**
 * @author :caizhengjie
 * @description: demonstrate ValueState, ListState, MapState, ReducingState and AggregatingState
 * @date :2021/4/1 2:09 PM
 */
public class KeyedStateTest {

    public static void main(String[] args) throws Exception {
        // 1. Get the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // 2. Read data from the socket port and convert each line to a JavaBean
        SingleOutputStreamOperator<WaterSensor> waterSensorDS = env.socketTextStream("bigdata-pro-m07", 9999)
                .map(new MapFunction<String, WaterSensor>() {
                    @Override
                    public WaterSensor map(String s) throws Exception {
                        String[] split = s.split(",");
                        return new WaterSensor(split[0], Long.parseLong(split[1]), Integer.parseInt(split[2]));
                    }
                });

        // 3. Key the stream by sensor id
        KeyedStream<WaterSensor, String> keyedStream = waterSensorDS.keyBy(WaterSensor::getId);

        // 4. Demonstrate how the keyed state types are used
        SingleOutputStreamOperator<WaterSensor> result = keyedStream.process(new MyStateProcessFunc());

        // 5. Print the result
        result.print();

        // 6. Execute the job
        env.execute();
    }

    public static class MyStateProcessFunc extends KeyedProcessFunction<String, WaterSensor, WaterSensor> {

        // a. Declare the states
        private ValueState<Long> valueState;
        private ListState<Long> listState;
        private MapState<String, Long> mapState;
        private ReducingState<WaterSensor> reducingState;
        private AggregatingState<WaterSensor, WaterSensor> aggregatingState;

        // b. Initialize the states from their descriptors
        @Override
        public void open(Configuration parameters) throws Exception {
            valueState = getRuntimeContext()
                    .getState(new ValueStateDescriptor<Long>("value-state", Long.class));
            listState = getRuntimeContext()
                    .getListState(new ListStateDescriptor<Long>("list-state", Long.class));
            mapState = getRuntimeContext()
                    .getMapState(new MapStateDescriptor<String, Long>("map-state", String.class, Long.class));
            // The ReduceFunction and AggregateFunction below simply keep the latest element;
            // they are placeholders so that this demo compiles and runs
            reducingState = getRuntimeContext()
                    .getReducingState(new ReducingStateDescriptor<WaterSensor>("reducing-state",
                            (value1, value2) -> value2,
                            WaterSensor.class));
            aggregatingState = getRuntimeContext()
                    .getAggregatingState(new AggregatingStateDescriptor<WaterSensor, WaterSensor, WaterSensor>(
                            "agg-state",
                            new AggregateFunction<WaterSensor, WaterSensor, WaterSensor>() {
                                @Override
                                public WaterSensor createAccumulator() {
                                    return new WaterSensor();
                                }
                                @Override
                                public WaterSensor add(WaterSensor value, WaterSensor accumulator) {
                                    return value;
                                }
                                @Override
                                public WaterSensor getResult(WaterSensor accumulator) {
                                    return accumulator;
                                }
                                @Override
                                public WaterSensor merge(WaterSensor a, WaterSensor b) {
                                    return a;
                                }
                            },
                            WaterSensor.class));
        }

        // c. Use the states
        @Override
        public void processElement(WaterSensor value, Context context,
                                   Collector<WaterSensor> collector) throws Exception {
            // c.1 ValueState
            Long value1 = valueState.value();
            valueState.update(122L);
            valueState.clear();

            // c.2 ListState
            Iterable<Long> longs = listState.get();
            listState.add(122L);
            listState.clear();
            listState.update(new ArrayList<>());

            // c.3 MapState
            Iterator<Map.Entry<String, Long>> iterator = mapState.iterator();
            Long aLong = mapState.get("");
            mapState.contains("");
            mapState.put("", 122L);
            mapState.putAll(new HashMap<>());
            mapState.remove("");
            mapState.clear();

            // c.4 ReducingState
            WaterSensor waterSensor1 = reducingState.get();
            reducingState.add(new WaterSensor());
            reducingState.clear();

            // c.5 AggregatingState
            aggregatingState.add(value);
            WaterSensor waterSensor2 = aggregatingState.get();
            aggregatingState.clear();

            // Forward the element so that result.print() produces output
            collector.collect(value);
        }
    }
}