Flink(十一)【状态管理】(2)

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: Flink(十一)【状态管理】

Flink(十一)【状态管理】(1)https://developer.aliyun.com/article/1532238

2.3、Map 状态(MapState)

把一些键值对(key-value)作为状态整体保存起来,可以认为就是一组 key-value 映射的列表。对应的 MapState接口中,就会有 UK、UV 两个泛型,分别表示保存的 key和 value 的类型。同样,MapState 提供了操作映射状态的方法,与 Map 的使用非常类似。

  • UV get(UK key):传入一个 key 作为参数,查询对应的 value 值;
  • put(UK key, UV value):传入一个键值对,更新 key 对应的 value 值;
  • putAll(Map map):将传入的映射 map 中所有的键值对,全部添加到映射状态中;
  • remove(UK key):将指定 key 对应的键值对删除;
  • boolean contains(UK key):判断是否存在指定的 key,返回一个 boolean 值。另外,MapState 也提供了获取整个映射相关信息的方法:
  • Iterable> entries():获取映射状态中所有的键值对;
  • Iterable keys():获取映射状态中所有的键(key),返回一个可迭代 Iterable 类型;
  • Iterable values():获取映射状态中所有的值(value),返回一个可迭代 Iterable类型;
  • boolean isEmpty():判断映射是否为空,返回一个 boolean 值。

同样通过一个案例来了解-统计每种传感器每种水位出现的次数:

/**
 * 输出每种传感器每种水位值出现的次数
 */
public class KeyedMapState {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
 
        SingleOutputStreamOperator<WaterSensor> sensorDS = env
                .socketTextStream("localhost", 9999)
                .map(new WaterSensorFunction())
                // todo 指定 watermark 策略,我们直接使用实现好的
                .assignTimestampsAndWatermarks(WatermarkStrategy
                        // 指定watermark的生成: 泛型方法,需要指定数据类型,乱序的watermark 需要设置等待时间
                        .<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3))   // 等待3s
                        // 指定如何从数据中提取事件时间
                        .withTimestampAssigner((WaterSensor sensor, long recordTimestamp) -> {
                            return sensor.getTs() * 1000L; // 返回的时间戳单位是 ms
                        }));
 
        sensorDS.keyBy(WaterSensor::getId)
                // process方法的参数类型: KIO
                .process(new KeyedProcessFunction<String, WaterSensor, String>() {
                    // todo 1.定义状态
                    MapState<Integer,Integer> countState;
                    @Override
                    public void open(Configuration parameters) throws Exception {
                        super.open(parameters);
                        // todo 2. 在open方法初始化状态
                        // 状态描述器的两个参数:1.起个名字(不重复就行),2.存储的类型
                        countState = getRuntimeContext().getMapState(new MapStateDescriptor<Integer, Integer>("countState",Types.INT,Types.INT));
                    }
 
                    @Override
                    public void processElement(WaterSensor value, Context ctx, Collector<String> out) throws Exception {
                        // todo 1. 添加水位值
                        if (countState.contains(value.getVc())){
                            countState.put(value.getVc(),countState.get(value.getVc())+1);
                        }else {
                            countState.put(value.getVc(),1);
                        }
                        // todo 2. 输出每种水位出现的次数
                        for (Map.Entry<Integer, Integer> entry : countState.entries()) {
                            out.collect("传感器" + value.getId() + " 的水位值 " + entry.getKey() + " 出现了 " + entry.getValue() + "次");
                        }
                    }
 
                }).print();
 
        env.execute();
    }
}

用法和 Map 差不多,就是没有 getOrDefault 方法。

2.4、规约状态(ReducingState)

       类似于值状态(Value),不过需要对添加进来的所有数据进行归约,将归约聚合之后的值作为状态保存下来。ReducintState这个接口调用的方法类似于 ListState,只不过它保存的只是一个聚合值,所以调用.add()方法时,不是在状态列表里添加元素,而是直接把新数据和之前的状态进行归约,并用得到的结果更新状态。

       归约逻辑的定义,是在归约状态描述器(ReducingStateDescriptor)中,通过传入一个归约函数(ReduceFunction)来实现的。这里的归约函数,就是我们之前介绍 reduce 聚合算子时讲到的 ReduceFunction,所以状态类型跟输入的数据类型是一样的。

public ReducingStateDescriptor(
     String name, ReduceFunction<T> reduceFunction, Class<T> typeClass) 
{...}

这里的描述器有三个参数,其中第二个参数就是定义了归约聚合逻辑的 ReduceFunction,另外两个参数则是状态的名称和类型。

案例-计算每种传感器的水位和

/**
 * 检测每种传感器的水为值,如果连续两个水位差值超过10就输出结果
 */
public class KeyedReducingState {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
 
        SingleOutputStreamOperator<WaterSensor> sensorDS = env
                .socketTextStream("localhost", 9999)
                .map(new WaterSensorFunction())
                // todo 指定 watermark 策略,我们直接使用实现好的
                .assignTimestampsAndWatermarks(WatermarkStrategy
                        // 指定watermark的生成: 泛型方法,需要指定数据类型,乱序的watermark 需要设置等待时间
                        .<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3))   // 等待3s
                        // 指定如何从数据中提取事件时间
                        .withTimestampAssigner((WaterSensor sensor, long recordTimestamp) -> {
                            return sensor.getTs() * 1000L; // 返回的时间戳单位是 ms
                        }));
 
        sensorDS.keyBy(WaterSensor::getId)
                // process方法的参数类型: KIO
                .process(new KeyedProcessFunction<String, WaterSensor, String>() {
                    // todo 1.定义状态
                    ReducingState<Integer> sumState;
                    @Override
                    public void open(Configuration parameters) throws Exception {
                        super.open(parameters);
                        // todo 2. 在open方法初始化状态
                        // 状态描述器的两个参数:1.起个名字(不重复就行),2.存储的类型
                        sumState = getRuntimeContext().getReducingState(new ReducingStateDescriptor<Integer>("reduceState", new ReduceFunction<Integer>() {
                            @Override
                            public Integer reduce(Integer value1, Integer value2) throws Exception {
                                return value1+value2;
                            }
                        }, Types.INT));
                    }
 
                    @Override
                    public void processElement(WaterSensor value, Context ctx, Collector<String> out) throws Exception {
                        // todo 1. 添加水位值
                        sumState.add(value.getVc());
                        // todo 2. 输出每种水位出现的次数
                        out.collect("传感器"+value.getId()+"的水位和为 "+sumState.get());
                    }
 
                }).print();
 
        env.execute();
    }
}

2.5、聚合状态(AggregationState)

       与归约状态非常类似,聚合状态也是一个值,用来保存添加进来的所有数据的聚合结果。与 ReducingState 不同的是,它的聚合逻辑是由在描述器中传入一个更加一般化的聚合函数(AggregateFunction)来定义的;这也就是之前我们讲过的 AggregateFunction,里面通过一个累加器(Accumulator)来表示状态,所以聚合的状态类型可以和输入数据的类型不同,使用更加灵活

       同样地,AggregatingState 接口调用方法也与 ReducingState 相同,调用.add()方法添加元素时,会直接使用指定的 AggregateFunction 进行聚合并更新状态。

案例-统计每个传感器的水位平均值

/**
 * 统计每种传感器的平均水位值
 */
public class KeyedAggreatingState {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
 
        SingleOutputStreamOperator<WaterSensor> sensorDS = env
                .socketTextStream("localhost", 9999)
                .map(new WaterSensorFunction())
                // todo 指定 watermark 策略,我们直接使用实现好的
                .assignTimestampsAndWatermarks(WatermarkStrategy
                        // 指定watermark的生成: 泛型方法,需要指定数据类型,乱序的watermark 需要设置等待时间
                        .<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3))   // 等待3s
                        // 指定如何从数据中提取事件时间
                        .withTimestampAssigner((WaterSensor sensor, long recordTimestamp) -> {
                            return sensor.getTs() * 1000L; // 返回的时间戳单位是 ms
                        }));
 
        sensorDS.keyBy(WaterSensor::getId)
                // process方法的参数类型: KIO
                .process(new KeyedProcessFunction<String, WaterSensor, String>() {
                    // todo 1.定义状态
                    AggregatingState<Integer,Double> avgState;
                    @Override
                    public void open(Configuration parameters) throws Exception {
                        super.open(parameters);
                        // todo 2. 在open方法初始化状态
                        // 状态描述器的两个参数:1.起个名字(不重复就行),2.存储的类型
                        avgState = getRuntimeContext().getAggregatingState(new AggregatingStateDescriptor<Integer, Tuple2<Integer,Integer>, Double>("avgState", new AggregateFunction<Integer, Tuple2<Integer, Integer>, Double>() {
                            @Override
                            public Tuple2<Integer, Integer> createAccumulator() {
                                return Tuple2.of(0,0);
                            }
 
                            @Override
                            public Tuple2<Integer, Integer> add(Integer value, Tuple2<Integer, Integer> accumulator) {
                                return Tuple2.of(accumulator.f0+value,accumulator.f1+1);
                            }
 
                            @Override
                            public Double getResult(Tuple2<Integer, Integer> accumulator) {
                                return accumulator.f0*1D/accumulator.f1;    // 两个相除要变成double要在分子*1D
                            }
 
                            @Override
                            public Tuple2<Integer, Integer> merge(Tuple2<Integer, Integer> a, Tuple2<Integer, Integer> b) {
                                // 只有在会话窗口才需要写 merge 方法
                                //return Tuple2.of(a.f0+b.f0,a.f1+b.f1);
                                return null;
                            }},Types.TUPLE(Types.INT,Types.INT)));
                    }
 
                    @Override
                    public void processElement(WaterSensor value, Context ctx, Collector<String> out) throws Exception {
                        // todo 1. 添加水位值
                        avgState.add(value.getVc());
                        // todo 2. 输出每种水位出现的次数
                        out.collect("传感器"+value.getId()+"的平均水位为 "+avgState.get());
                    }
 
                }).print();
 
        env.execute();
    }
}

代码解析:

  • 这里我们用了累加器,我们把每个水位值转为(水为值,出现次数)的形式,最后统一对统一水为值进行平均值计算
  • merge 方法只有在会话窗口中才需要去自定义
  • int 除以 int 需要在分子上 *1D 才能使结果为 double

Flink(十一)【状态管理】(3)https://developer.aliyun.com/article/1532241

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
7月前
|
存储 消息中间件 缓存
读Flink源码谈设计:有效管理内存之道
在最初接触到Flink时,是来自于业界里一些头部玩家的分享——大家会用其来处理海量数据。在这种场景下,`如何避免JVM GC带来StopTheWorld带来的副作用`这样的问题一直盘绕在我心头。直到用了Flink以后,阅读了相关的源码(以1.14.0为基准),终于有了一些答案。在这篇文章里也是会分享给大家。
589 1
|
消息中间件 存储 Kafka
Flink---11、状态管理(按键分区状态(值状态、列表状态、Map状态、归约状态、聚合状态)算子状态(列表状态、广播状态))
Flink---11、状态管理(按键分区状态(值状态、列表状态、Map状态、归约状态、聚合状态)算子状态(列表状态、广播状态))
|
6月前
|
存储 分布式计算 IDE
Flink(十一)【状态管理】(4)
Flink(十一)【状态管理】
|
7月前
|
存储 Cloud Native 数据处理
Flink 2.0 状态管理存算分离架构演进
本文整理自阿里云智能 Flink 存储引擎团队负责人梅源在 Flink Forward Asia 2023 的分享,梅源结合阿里内部的实践,分享了状态管理的演进和 Flink 2.0 存算分离架构的选型。
1242 1
Flink 2.0 状态管理存算分离架构演进
|
6月前
|
消息中间件 Kafka 流计算
Flink(十一)【状态管理】(3)
Flink(十一)【状态管理】
|
6月前
|
存储 传感器 大数据
Flink(十一)【状态管理】(1)
Flink(十一)【状态管理】
|
7月前
|
消息中间件 SQL Java
实时计算 Flink版产品使用合集之管理内存webui上一直是百分百是什么导致的
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStreamAPI、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
7月前
|
Kubernetes Oracle 关系型数据库
实时计算 Flink版产品使用合集之web ui能否在线管理数据source和处理数据
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
7月前
|
存储 传感器 消息中间件
[尚硅谷 flink] 状态管理 笔记
[尚硅谷 flink] 状态管理 笔记
|
7月前
|
存储 Java API
Flink中的状态管理是什么?请解释其作用和常用方法。
Flink中的状态管理是什么?请解释其作用和常用方法。
87 0
下一篇
DataWorks