Flink(十一)【状态管理】(1)https://developer.aliyun.com/article/1532238
2.3、Map 状态(MapState)
把一些键值对(key-value)作为状态整体保存起来,可以认为就是一组 key-value 映射的列表。对应的 MapState接口中,就会有 UK、UV 两个泛型,分别表示保存的 key和 value 的类型。同样,MapState 提供了操作映射状态的方法,与 Map 的使用非常类似。
- UV get(UK key):传入一个 key 作为参数,查询对应的 value 值;
- put(UK key, UV value):传入一个键值对,更新 key 对应的 value 值;
- putAll(Map map):将传入的映射 map 中所有的键值对,全部添加到映射状态中;
- remove(UK key):将指定 key 对应的键值对删除;
- boolean contains(UK key):判断是否存在指定的 key,返回一个 boolean 值。另外,MapState 也提供了获取整个映射相关信息的方法:
- Iterable> entries():获取映射状态中所有的键值对;
- Iterable keys():获取映射状态中所有的键(key),返回一个可迭代 Iterable 类型;
- Iterable values():获取映射状态中所有的值(value),返回一个可迭代 Iterable类型;
- boolean isEmpty():判断映射是否为空,返回一个 boolean 值。
同样通过一个案例来了解-统计每种传感器每种水位出现的次数:
/** * 输出每种传感器每种水位值出现的次数 */ public class KeyedMapState { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); SingleOutputStreamOperator<WaterSensor> sensorDS = env .socketTextStream("localhost", 9999) .map(new WaterSensorFunction()) // todo 指定 watermark 策略,我们直接使用实现好的 .assignTimestampsAndWatermarks(WatermarkStrategy // 指定watermark的生成: 泛型方法,需要指定数据类型,乱序的watermark 需要设置等待时间 .<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3)) // 等待3s // 指定如何从数据中提取事件时间 .withTimestampAssigner((WaterSensor sensor, long recordTimestamp) -> { return sensor.getTs() * 1000L; // 返回的时间戳单位是 ms })); sensorDS.keyBy(WaterSensor::getId) // process方法的参数类型: KIO .process(new KeyedProcessFunction<String, WaterSensor, String>() { // todo 1.定义状态 MapState<Integer,Integer> countState; @Override public void open(Configuration parameters) throws Exception { super.open(parameters); // todo 2. 在open方法初始化状态 // 状态描述器的两个参数:1.起个名字(不重复就行),2.存储的类型 countState = getRuntimeContext().getMapState(new MapStateDescriptor<Integer, Integer>("countState",Types.INT,Types.INT)); } @Override public void processElement(WaterSensor value, Context ctx, Collector<String> out) throws Exception { // todo 1. 添加水位值 if (countState.contains(value.getVc())){ countState.put(value.getVc(),countState.get(value.getVc())+1); }else { countState.put(value.getVc(),1); } // todo 2. 输出每种水位出现的次数 for (Map.Entry<Integer, Integer> entry : countState.entries()) { out.collect("传感器" + value.getId() + " 的水位值 " + entry.getKey() + " 出现了 " + entry.getValue() + "次"); } } }).print(); env.execute(); } }
用法和 Map 差不多,就是没有 getOrDefault 方法。
2.4、规约状态(ReducingState)
类似于值状态(Value),不过需要对添加进来的所有数据进行归约,将归约聚合之后的值作为状态保存下来。ReducintState这个接口调用的方法类似于 ListState,只不过它保存的只是一个聚合值,所以调用.add()方法时,不是在状态列表里添加元素,而是直接把新数据和之前的状态进行归约,并用得到的结果更新状态。
归约逻辑的定义,是在归约状态描述器(ReducingStateDescriptor)中,通过传入一个归约函数(ReduceFunction)来实现的。这里的归约函数,就是我们之前介绍 reduce 聚合算子时讲到的 ReduceFunction,所以状态类型跟输入的数据类型是一样的。
public ReducingStateDescriptor( String name, ReduceFunction<T> reduceFunction, Class<T> typeClass) {...}
这里的描述器有三个参数,其中第二个参数就是定义了归约聚合逻辑的 ReduceFunction,另外两个参数则是状态的名称和类型。
案例-计算每种传感器的水位和
/** * 检测每种传感器的水为值,如果连续两个水位差值超过10就输出结果 */ public class KeyedReducingState { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); SingleOutputStreamOperator<WaterSensor> sensorDS = env .socketTextStream("localhost", 9999) .map(new WaterSensorFunction()) // todo 指定 watermark 策略,我们直接使用实现好的 .assignTimestampsAndWatermarks(WatermarkStrategy // 指定watermark的生成: 泛型方法,需要指定数据类型,乱序的watermark 需要设置等待时间 .<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3)) // 等待3s // 指定如何从数据中提取事件时间 .withTimestampAssigner((WaterSensor sensor, long recordTimestamp) -> { return sensor.getTs() * 1000L; // 返回的时间戳单位是 ms })); sensorDS.keyBy(WaterSensor::getId) // process方法的参数类型: KIO .process(new KeyedProcessFunction<String, WaterSensor, String>() { // todo 1.定义状态 ReducingState<Integer> sumState; @Override public void open(Configuration parameters) throws Exception { super.open(parameters); // todo 2. 在open方法初始化状态 // 状态描述器的两个参数:1.起个名字(不重复就行),2.存储的类型 sumState = getRuntimeContext().getReducingState(new ReducingStateDescriptor<Integer>("reduceState", new ReduceFunction<Integer>() { @Override public Integer reduce(Integer value1, Integer value2) throws Exception { return value1+value2; } }, Types.INT)); } @Override public void processElement(WaterSensor value, Context ctx, Collector<String> out) throws Exception { // todo 1. 添加水位值 sumState.add(value.getVc()); // todo 2. 输出每种水位出现的次数 out.collect("传感器"+value.getId()+"的水位和为 "+sumState.get()); } }).print(); env.execute(); } }
2.5、聚合状态(AggregationState)
与归约状态非常类似,聚合状态也是一个值,用来保存添加进来的所有数据的聚合结果。与 ReducingState 不同的是,它的聚合逻辑是由在描述器中传入一个更加一般化的聚合函数(AggregateFunction)来定义的;这也就是之前我们讲过的 AggregateFunction,里面通过一个累加器(Accumulator)来表示状态,所以聚合的状态类型可以和输入数据的类型不同,使用更加灵活。
同样地,AggregatingState 接口调用方法也与 ReducingState 相同,调用.add()方法添加元素时,会直接使用指定的 AggregateFunction 进行聚合并更新状态。
案例-统计每个传感器的水位平均值
/** * 统计每种传感器的平均水位值 */ public class KeyedAggreatingState { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); SingleOutputStreamOperator<WaterSensor> sensorDS = env .socketTextStream("localhost", 9999) .map(new WaterSensorFunction()) // todo 指定 watermark 策略,我们直接使用实现好的 .assignTimestampsAndWatermarks(WatermarkStrategy // 指定watermark的生成: 泛型方法,需要指定数据类型,乱序的watermark 需要设置等待时间 .<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3)) // 等待3s // 指定如何从数据中提取事件时间 .withTimestampAssigner((WaterSensor sensor, long recordTimestamp) -> { return sensor.getTs() * 1000L; // 返回的时间戳单位是 ms })); sensorDS.keyBy(WaterSensor::getId) // process方法的参数类型: KIO .process(new KeyedProcessFunction<String, WaterSensor, String>() { // todo 1.定义状态 AggregatingState<Integer,Double> avgState; @Override public void open(Configuration parameters) throws Exception { super.open(parameters); // todo 2. 在open方法初始化状态 // 状态描述器的两个参数:1.起个名字(不重复就行),2.存储的类型 avgState = getRuntimeContext().getAggregatingState(new AggregatingStateDescriptor<Integer, Tuple2<Integer,Integer>, Double>("avgState", new AggregateFunction<Integer, Tuple2<Integer, Integer>, Double>() { @Override public Tuple2<Integer, Integer> createAccumulator() { return Tuple2.of(0,0); } @Override public Tuple2<Integer, Integer> add(Integer value, Tuple2<Integer, Integer> accumulator) { return Tuple2.of(accumulator.f0+value,accumulator.f1+1); } @Override public Double getResult(Tuple2<Integer, Integer> accumulator) { return accumulator.f0*1D/accumulator.f1; // 两个相除要变成double要在分子*1D } @Override public Tuple2<Integer, Integer> merge(Tuple2<Integer, Integer> a, Tuple2<Integer, Integer> b) { // 只有在会话窗口才需要写 merge 方法 //return Tuple2.of(a.f0+b.f0,a.f1+b.f1); return null; }},Types.TUPLE(Types.INT,Types.INT))); } @Override public void processElement(WaterSensor value, Context ctx, Collector<String> out) throws Exception { // todo 1. 添加水位值 avgState.add(value.getVc()); // todo 2. 输出每种水位出现的次数 out.collect("传感器"+value.getId()+"的平均水位为 "+avgState.get()); } }).print(); env.execute(); } }
代码解析:
- 这里我们用了累加器,我们把每个水位值转为(水为值,出现次数)的形式,最后统一对统一水为值进行平均值计算
- merge 方法只有在会话窗口中才需要去自定义
- int 除以 int 需要在分子上 *1D 才能使结果为 double
Flink(十一)【状态管理】(3)https://developer.aliyun.com/article/1532241