Flink之状态编程KeyedState的使用

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 笔记

(1)KeyedState


键控状态是根据输入数据流中定义的键(key)来维护和访问的。


Flink 为每个键值维护一个状态实例,并将具有相同键的所有数据,都分区到同一个算 子任务中,这个任务会维护和处理这个 key 对应的状态。当任务处理一条数据时,它会自动 将状态的访问范围限定为当前数据的 key。因此,具有相同 key 的所有数据都会访问相同的 状态。


Keyed State 很类似于一个分布式的 key-value map 数据结构,只能用于 KeyedStream (keyBy 算子处理之后)。

15.png


(2)键控状态支持的数据类型


ValueState:


保存单个值.

每个有 key 有一个状态值.

设置使用update(T), 获取使用T value()

ListState:


保存元素列表.

添加元素: add(T) addAll(List< T >)

获取元素: Iterable< T > get()

覆盖所有元素: update(List< T >)

ReducingState:


存储单个值, 表示把所有元素的聚合结果添加到状态中. 与 ListState 类似, 但 是当使用 add(T)的时候 ReducingState 会使用指定的 ReduceFunction 进行聚合.


AggregatingState<IN, OUT>:


存储单个值. 与 ReducingState 类似, 都是进行聚合. 不同的是, AggregatingState 的聚合的结果和元素类型可以不一样.


MapState<UK, UV>:


存储键值对列表.

添加键值对: put(UK, UV) or putAll(Map<UK, UV>)

根据 key 获取值:get(UK)

获取所有: entries(), keys() and values()

检测是否为空: isEmpty()

注意:


所有的类型都有 clear(), 清空当前 key 的状态

这些状态对象仅用于用户与状态进行交互.

状态不是必须存储到内存,也可以存储在磁盘或者任意其他地方

从状态获取的值与输入元素的 key 相关


(3)代码实现


基于ValueState的开发实战

package com.aikfk.flink.datastream.state;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
/**
 * @author :caizhengjie
 * @description:TODO
 * @date :2021/3/31 4:04 下午
 */
public class KeyedStateCount {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<Tuple2<String,Long>> dataStream = env.fromElements(
                Tuple2.of("a", 3L),
                Tuple2.of("a", 5L),
                Tuple2.of("b", 7L),
                Tuple2.of("c", 4L),
                Tuple2.of("c", 2L))
                .keyBy(value -> value.f0)
                .flatMap(new CountFunction());
        dataStream.print();
        env.execute("KeyedState");
    }
    static class CountFunction extends RichFlatMapFunction<Tuple2<String,Long>,Tuple2<String,Long>>{
        // 定义状态ValueState
        private ValueState<Tuple2<String,Long>> keyCount;
        /**
         * 初始化
         * @param parameters
         * @throws Exception
         */
        @Override
        public void open(Configuration parameters) throws Exception {
            ValueStateDescriptor<Tuple2<String,Long>> descriptor =
                    new ValueStateDescriptor<Tuple2<String, Long>>("keycount",
                            TypeInformation.of(new TypeHint<Tuple2<String,Long>>() {}));
            keyCount = getRuntimeContext().getState(descriptor);
        }
        @Override
        public void flatMap(Tuple2<String, Long> input,
                            Collector<Tuple2<String, Long>> collector) throws Exception {
            // 使用状态
            Tuple2<String, Long> currentValue =
                    (keyCount.value() == null) ? new Tuple2<>("", 0L) : keyCount.value();
            // 累加数据
            currentValue.f0 = input.f0;
            currentValue.f1 ++;
            // 更新状态
            keyCount.update(currentValue);
            collector.collect(keyCount.value());
        }
    }
}

运行结果

3> (b,1)
11> (a,1)
11> (a,2)
8> (c,1)
8> (c,2)


基于keyedState的开发实战

package com.aikfk.flink.datastream.state;
import com.aikfk.flink.datastream.bean.WaterSensor;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.state.*;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
/**
 * @author :caizhengjie
 * @description:TODO
 * @date :2021/4/1 2:09 下午
 */
public class KeyedStateTest {
    public static void main(String[] args) throws Exception {
        // 1.获取执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // 2.读取端口数据并转换为JavaBean
        SingleOutputStreamOperator<WaterSensor> waterSensorDS = env.socketTextStream("bigdata-pro-m07", 9999)
                .map(new MapFunction<String, WaterSensor>() {
                    @Override
                    public WaterSensor map(String s) throws Exception {
                        String[] split = s.split(",");
                        return new WaterSensor(split[0],Long.parseLong(split[1]),Integer.parseInt(split[2]));
                    }
                });
        // 3.按照id分组
        KeyedStream<WaterSensor, String> keyedStream = waterSensorDS.keyBy(WaterSensor::getId);
        // 4.演示状态的使用
        SingleOutputStreamOperator<WaterSensor> result = keyedStream.process(new MyStateProcessFunc());
        // 5.打印
        result.print();
        // 6.执行任务
        env.execute();
    }
    public static class MyStateProcessFunc extends KeyedProcessFunction<String,WaterSensor,WaterSensor> {
        // a.定义状态
        private ValueState<Long> valueState;
        private ListState<Long> listState;
        private MapState<String,Long> mapState;
        private ReducingState<WaterSensor> reducingState;
        private AggregatingState<WaterSensor,WaterSensor> aggregatingState;
        // b.初始化
        @Override
        public void open(Configuration parameters) throws Exception {
            valueState = getRuntimeContext()
                    .getState(new ValueStateDescriptor<Long>("value-state",Long.class));
            listState = getRuntimeContext()
                    .getListState(new ListStateDescriptor<Long>("list-state", Long.class));
            mapState = getRuntimeContext()
                    .getMapState(new MapStateDescriptor<String, Long>("map-state", String.class,Long.class));
//            reducingState = getRuntimeContext()
//                    .getReducingState(new ReducingStateDescriptor<WaterSensor>())
//            aggregatingState = getRuntimeContext()
//                    .getAggregatingState(new AggregatingStateDescriptor
//                            <WaterSensor, Object, WaterSensor>());
        }
        // c.状态的使用
        @Override
        public void processElement(WaterSensor value,
                                   Context context, Collector<WaterSensor> collector) throws Exception {
            // c.1 Value状态
            Long value1 = valueState.value();
            valueState.update(122L);
            valueState.clear();
            // c.2 List状态
            Iterable<Long> longs = listState.get();
            listState.add(122L);
            listState.clear();
            listState.update(new ArrayList<>());
            // c.3 Map状态
            Iterator<Map.Entry<String,Long>> iterator = mapState.iterator();
            Long aLong = mapState.get("");
            mapState.contains("");
            mapState.put("",122L);
            mapState.putAll(new HashMap<>());
            mapState.remove("");
            mapState.clear();
            //c.4 Reduce状态
            WaterSensor waterSensor1 = reducingState.get();
            reducingState.add(new WaterSensor());
            reducingState.clear();
            //c.5 Agg状态
            aggregatingState.add(value);
            WaterSensor waterSensor2 = aggregatingState.get();
            aggregatingState.clear();
        }
    }
}
相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
目录
打赏
0
0
0
0
12
分享
相关文章
Flink窗口与状态编程开发(一)
Flink窗口与状态编程开发(一)
flink-sql(table api 编程)
table api 基本使用 tableEnvironment 和 streamTableEnvironment 注册表,临时表,持久表 Table api 和 table sql 混用 table api 和 datastream 混用 table api 的输入和输出(kafka) kafka的高级特性option
flink-sql(table api 编程)
Flink SQL 核心概念剖析与编程案例实战
本文使用了 Docker 镜像快速安装一些基础组件,zk 和 kafka,并通过案例的方式,剖析了 SQL 的概念与详细的使用方式

热门文章

最新文章