Flink(十一)【状态管理】(1)

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: Flink(十一)【状态管理】

Flink 状态管理

       我们一直称 Flink 为运行在数据流上的有状态计算框架和处理引擎。在之前的章节中也已经多次提到了“状态”(state),不论是简单聚合、窗口聚合,还是处理函数的应用,都会有状态的身影出现。状态就如同事务处理时数据库中保存的信息一样,是用来辅助进行任务计算的数据。而在 Flink 这样的分布式系统中,我们不仅需要定义出状态在任务并行时的处理方式,还需要考虑如何持久化保存、以便发生故障时正确地恢复。这就需要一套完整的管理机制来处理所有的状态。

1、Flink 中的状态

       在流处理中,数据是连续不断到来和处理的。每个任务进行计算处理时,可以基于当前数据直接转换得到输出结果;也可以依赖一些其他数据。这些由一个任务维护,并且用来计算输出结果的所有数据,就叫作这个任务的状态。

1.1、概述

       在 Flink 中,算子任务可以分为有状态无状态两种情况。

1.1.1、无状态算子任务

       无状态的算子任务只需要观察每个独立事件,根据当前输入的数据直接转换输出结果。比如我们之前学的 map、flatMap、filter 等,计算时不依赖其它数据,就属于无状态的算子

1.1.2、有状态算子任务

       而有状态的算子任务则除了当前数据外,还需要一些其他数据来得到计算结果。这里的“其他数据”就是所谓的状态(State)。比如我们之前学的 聚合算子、窗口算子都属于有状态的算子。

       比如我们之前窗口函数中学的增量聚合函数,每来一条数据它都会把处理后的结果保存到一个中间值,当窗口内再来一条数据就更新这个中间值,这个中间值就是所谓的状态。

       再比如我们的全窗口函数,它会把来的所有数据都保存到一个中间值当中,直到窗口关闭时才会触发计算,同样,这个中间值就是所谓的状态。

       此外还有我们的一些聚合算子比如 sum、min、max 等,它肯定是要把中间结果存储起来的,所以这都叫有状态算子。

1.2、状态的分类

1.2.1、托管状态(Managed State)和原始状态(Raw State)

       Flink 的状态有两种:托管状态(Managed State)原始状态(Raw State)。托管状态就是由 Flink 统一管理的,也就是管理状态的存储、访问、故障恢复和重组等一系列问题都由 Flink 实现,我们只要调接口就可以;而原始状态则是自定义的,相当于就是开辟了一块内存,需要我们自己管理,实现状态的序列化和故障恢复。

       通常我们都是采用 Flink 托管状态来实现需求。只有在遇到托管状态无法实现的特殊需求时,我们才会考虑使用原始状态;一般情况下不推荐使用。

1.2.2、算子状态和按键分区状态

       我们知道在 Flink 中,一个算子任务会按照并行度分为多个并行子任务执行,而不同的子任务会占据不同的任务槽(task slot)。由于不同的 slot 在计算资源上是物理隔离的,所以 Flink能管理的状态在并行任务间是无法共享的,每个状态只能针对当前子任务的实例有效。而很多有状态的操作(比如聚合、窗口)都是要先做 keyBy 进行按键分区的。按键分区之后,任务所进行的所有计算都应该只针对当前 key 有效,所以状态也应该按照 key 彼此隔离。在这种情况下,状态的访问方式又会有所不同。

       基于这样的想法,我们又可以将托管状态分为两类:算子状态按键分区状态

(1)算子状态(Operator State)

       状态作用范围限定为当前的算子任务的各个子任务,也就是只对当前并行子任务实例有效。这就意味着对于一个并行子任务,占据了一个“分区”,它所处理的所有数据都会访问到相同的状态,状态对于同一任务而言是共享的。

       算子状态可以用在所有算子上,使用的时候其实就跟一个本地变量没什么区别——因为本地变量的作用域也是当前任务实例。在使用时,我们需要进一步实现 CheckpointedFunction 接口 。

       新版本的 Flink 重构了 Source (也就是使用 fromSource 的写法),所以新版本则需要继承 SourceReaderBase 抽象类。

(2)按键分区状态

状态是根据输入流中定义的键(key)来维护和访问的,所以只能定义在按键分区流(KeyedStream)中,也就 keyBy 之后才可以使用。

       按键分区状态应用非常广泛。之前讲到的聚合算子必须在 keyBy 之后才能使用,就是因为聚合的结果是以 Keyed State 的形式保存的。另外,也可以通过富函数类(Rich Function)来自定义 Keyed State,所以只要提供了富函数类接口的算子,也都可以使用 Keyed State。所以即使是 map、filter 这样无状态的基本转换算子,我们也可以通过富函数类给它们 “追加” Keyed State,或者实现 CheckpointedFunction 接口来定义 Operator State;从这个角度讲,Flink 中所有的算子都可以是有状态的,所以说 Flink 是“有状态的流处理”。

       无论是 Keyed State 还是 Operator State,它们都是在本地实例上维护的,也就是说每个并行子任务维护着自己的状态,算子的子任务之间状态不共享

2、按键分区状态(Keyed State)

       在实际应用中,我们一般都需要将数据按照某个 key 进行分区,然后再进行计算处理;所以最为常见的状态类型就是 Keyed State。之前介绍到 keyBy 之后的聚合、窗口计算,算子所持有的状态,都是 Keyed State。

       另外,我们还可以通过富函数类(Rich Function)对转换算子进行扩展、实现自定义功能,比如 RichMapFunction、RichFilterFunction。在富函数中,我们可以调用.getRuntimeContext()获取当前的运行时上下文(RuntimeContext),进而获取到访问状态的句柄;这种富函数中自定义的状态也是 Keyed State。

       按键分区状态(Keyed State)顾名思义,是任务按照键(key)来访问和维护的状态。它的特点非常鲜明,就是以 key 为作用范围进行隔离。我们知道,在进行按键分区(keyBy)之后,具有相同键的所有数据,都会分配到同一个并行子任务中;所以如果当前任务定义了状态,Flink 就会在当前并行子任务实例中,为每个键值维护一个状态的实例。于是当前任务就会为分配来的所有数据,按照 key 维护和处理对应的状态。

因为一个并行子任务可能会处理多个 key 的数据,所以 Flink 需要对 Keyed State 进行一些特殊优化。在底层,Keyed State 类似于一个分布式的映射(map)数据结构,所有的状态会根据 key 保存成键值对(key-value)的形式。这样当一条数据到来时,任务就会自动将状态的访问范围限定为当前数据的 key,从 map 存储中读取出对应的状态值。所以具有相同 key 的所有数据都会到访问相同的状态,而不同 key 的状态之间是彼此隔离的。这种将状态绑定到 key 上的方式,相当于使得状态和流的逻辑分区一一对应了:不会有别的 key 的数据来访问当前状态;而当前状态对应 key 的数据也只会访问这一个状态,不会分发到其他分区去。这就保证了对状态的操作都是本地进行的,对数据流和状态的处理做到了分区一致性。

       另外,在应用的并行度改变时,状态也需要随之进行重组。不同 key 对应的 Keyed State可以进一步组成所谓的键组(key groups),每一组都对应着一个并行子任务。键组是 Flink 重新分配 Keyed State 的单元,键组的数量就等于定义的最大并行度。当算子并行度发生改变时,Keyed State 就会按照当前的并行度重新平均分配,保证运行时各个子任务的负载相同。需要注意,使用 Keyed State 必须基于 KeyedStream。没有进行 keyBy 分区的 DataStream,即使转换算子实现了对应的富函数类,也不能通过运行时上下文访问 Keyed State。

       需要注意:使用 Keyed State 必须基于 KeyedStream

实际应用中,需要保存为状态的数据会有各种各样的类型,有时还需要复杂的集合类型,比如列表(List)和映射(Map)。对于这些常见的用法,Flink 的按键分区状态(Keyed State)提供了足够的支持。接下来我们就来了解一下 Keyed State 所支持的结构类型:

2.1、值状态(ValueState)

顾名思义,状态中只保存一个“值”(value)。ValueState本身是一个接口,源码中定义如下:

public interface ValueState<T> extends State {
    T value() throws IOException;
    void update(T value) throws IOException;
}

这里的 T 是泛型,表示状态的数据内容可以是任何具体的数据类型。如果想要保存一个长整型值作为状态,那么类型就是 ValueState。

我们可以在代码中读写值状态,实现对于状态的访问和更新。

  • T value():获取当前状态的值;
  • update(T value):对状态进行更新,传入的参数 value 就是要覆写的状态值。

在具体使用时,为了让运行时上下文清楚到底是哪个状态,我们还需要创建一个“状态描述器”(StateDescriptor)来提供状态的基本信息。例如源码中,ValueState 的状态描述器构造方法如下:

public ValueStateDescriptor(String name, Class<T> typeClass) {
     super(name, typeClass, null);
}

这里需要传入状态的名称和类型——这跟我们声明一个变量时做的事情完全一样。有了这个描述器,运行时环境就可以获取到状态的控制句柄(handler)了。

接下来演示一个案例:对连续两个水位值超过10的传感器输出报警信息

/**
 * 检测每种传感器的水为值,如果连续两个水位差值超过10就输出结果
 */
public class KeyedValueState {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
 
        SingleOutputStreamOperator<WaterSensor> sensorDS = env
                .socketTextStream("localhost", 9999)
                .map(new WaterSensorFunction())
                // todo 指定 watermark 策略,我们直接使用实现好的
                .assignTimestampsAndWatermarks(WatermarkStrategy
                        // 指定watermark的生成: 泛型方法,需要指定数据类型,乱序的watermark 需要设置等待时间
                        .<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3))   // 等待3s
                        // 指定如何从数据中提取事件时间
                        .withTimestampAssigner((WaterSensor sensor, long recordTimestamp) -> {
                            return sensor.getTs() * 1000L; // 返回的时间戳单位是 ms
                        }));
 
        sensorDS.keyBy(WaterSensor::getId)
                // process方法的参数类型: KIO
                .process(new KeyedProcessFunction<String, WaterSensor, String>() {
                    // todo 1.定义状态
                    ValueState<Integer> lastVcState;    // 初始化必须在生命周期中定义,因为初始化需要运行时环境,这里环境还没启动会初始化失败
 
                    @Override
                    public void open(Configuration parameters) throws Exception {
                        super.open(parameters);
                        // todo 2. 在open方法初始化状态
                        // 状态描述器的两个参数:1.起个名字(不重复就行),2.存储的类型
                        lastVcState = getRuntimeContext().getState(new ValueStateDescriptor<Integer>("lastVcState", Types.INT));
                    }
 
                    @Override
                    public void processElement(WaterSensor value, Context ctx, Collector<String> out) throws Exception {
                        // 要和上一条水位值进行比较
                        // todo 1. 取出上一条数据的水位值
                        // Integer的初始值为null 这里遇到第一条水位时需要注意判断null
                        int last_value = lastVcState.value()==null?0:lastVcState.value();
                        // todo 2. 判断两条水位是否都超过 10
                        if (Math.abs(value.getVc()-last_value)>10)
                            System.out.println("传感器"+value.getId()+"当前水位值="+value.getVc()+",与上一条水位值="+last_value+"相差超过10!!!");
                        // todo 3. 更新上一条水位值
                        lastVcState.update(value.getVc());
                    }
                }).print();
 
        env.execute();
    }
}

代码解析:

  • 我们说过,如果没有合适的函数能解决我们的需求,那就直接用 process 就行了。
  • ValueState 应该在 open 方法中去初始化,因为直接在程序执行前去尝试调用 getRuntimeContext 获取执行环境是获取不到的。
  • 要用 ValueState 而不是 int 也不是 hashMap
  • 用 int 就不能区分不同的传感器了
  • 用 hashMap 的效率要比 ValueState 低而且存在一些别的问题
  • 注意初始值 Integer 的初始值为 null

2.2、列表状态(ListState)

将需要保存的数据,以列表(List)的形式组织起来。在 ListState接口中同样有一个类型参数 T,表示列表中数据的类型。ListState 也提供了一系列的方法来操作状态,使用方式与一般的 List 非常相似。

  • Iterable get():获取当前的列表状态,返回的是一个可迭代类型 Iterable;
  • update(List values):传入一个列表 values,直接对状态进行覆盖;
  • add(T value):在状态列表中添加一个元素 value;
  • addAll(List values):向列表中添加多个元素,以列表 values 形式传入。

类似地,ListState 的状态描述器就叫作 ListStateDescriptor,用法跟 ValueStateDescriptor完全一致。

同样我们演示一个案例:针对每种传感器输出最高的3个水位值:

 

/**
 * 获取每个水位器的 top3 的水位值
 */
public class KeyedListState {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
 
        SingleOutputStreamOperator<WaterSensor> sensorDS = env
                .socketTextStream("localhost", 9999)
                .map(new WaterSensorFunction())
                // todo 指定 watermark 策略,我们直接使用实现好的
                .assignTimestampsAndWatermarks(WatermarkStrategy
                        // 指定watermark的生成: 泛型方法,需要指定数据类型,乱序的watermark 需要设置等待时间
                        .<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3))   // 等待3s
                        // 指定如何从数据中提取事件时间
                        .withTimestampAssigner((WaterSensor sensor, long recordTimestamp) -> {
                            return sensor.getTs() * 1000L; // 返回的时间戳单位是 ms
                        }));
 
        sensorDS.keyBy(WaterSensor::getId)
                // process方法的参数类型: KIO
                .process(new KeyedProcessFunction<String, WaterSensor, String>() {
                    // todo 1.定义状态
                    ListState<Integer> maxState;
                    @Override
                    public void open(Configuration parameters) throws Exception {
                        super.open(parameters);
                        // todo 2. 在open方法初始化状态
                        // 状态描述器的两个参数:1.起个名字(不重复就行),2.存储的类型
                        maxState = getRuntimeContext().getListState(new ListStateDescriptor<Integer>("maxState",Types.INT));
                    }
 
                    @Override
                    public void processElement(WaterSensor value, Context ctx, Collector<String> out) throws Exception {
                        // todo 1. 添加水位值
                        // Integer的初始值为null 这里遇到第一条水位时需要注意判断null
                        maxState.add(value.getVc());
                        // todo 2. 排序
                        List<Integer> list = new ArrayList<>();
                        for (int vc : maxState.get()) {
                            list.add(vc);
                        }
                        list.sort((o1, o2) -> o2 - o1);
                        if (list.size()>3)  // 一超过3立即清理,防止数据量大的排序开销
                            list.remove(3);
                        // todo 3. 输出top3
                        out.collect("传感器="+value.getId()+"最大的3个水为值为"+list.toString());
                        
                    }
 
                }).print();
 
        env.execute();
    }
}

代码解析:

  • 这里同样不用普通的 List,因为它不能按 key 分组
  • 这里用了一个简单的优化,if(list.size()>3) list.remove(3) 使得list永远只有3个值,减少了大数据场景下每次的遍历开销
  • list.sort((o2,o1)->o2-o1) lambda基础知识

Flink(十一)【状态管理】(2)https://developer.aliyun.com/article/1532239

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
7月前
|
存储 消息中间件 缓存
读Flink源码谈设计:有效管理内存之道
在最初接触到Flink时,是来自于业界里一些头部玩家的分享——大家会用其来处理海量数据。在这种场景下,`如何避免JVM GC带来StopTheWorld带来的副作用`这样的问题一直盘绕在我心头。直到用了Flink以后,阅读了相关的源码(以1.14.0为基准),终于有了一些答案。在这篇文章里也是会分享给大家。
589 1
|
消息中间件 存储 Kafka
Flink---11、状态管理(按键分区状态(值状态、列表状态、Map状态、归约状态、聚合状态)算子状态(列表状态、广播状态))
Flink---11、状态管理(按键分区状态(值状态、列表状态、Map状态、归约状态、聚合状态)算子状态(列表状态、广播状态))
|
6月前
|
存储 分布式计算 IDE
Flink(十一)【状态管理】(4)
Flink(十一)【状态管理】
|
7月前
|
存储 Cloud Native 数据处理
Flink 2.0 状态管理存算分离架构演进
本文整理自阿里云智能 Flink 存储引擎团队负责人梅源在 Flink Forward Asia 2023 的分享,梅源结合阿里内部的实践,分享了状态管理的演进和 Flink 2.0 存算分离架构的选型。
1235 1
Flink 2.0 状态管理存算分离架构演进
|
6月前
|
消息中间件 Kafka 流计算
Flink(十一)【状态管理】(3)
Flink(十一)【状态管理】
|
6月前
|
传感器 流计算
Flink(十一)【状态管理】(2)
Flink(十一)【状态管理】
|
7月前
|
消息中间件 SQL Java
实时计算 Flink版产品使用合集之管理内存webui上一直是百分百是什么导致的
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStreamAPI、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
7月前
|
Kubernetes Oracle 关系型数据库
实时计算 Flink版产品使用合集之web ui能否在线管理数据source和处理数据
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
7月前
|
存储 传感器 消息中间件
[尚硅谷 flink] 状态管理 笔记
[尚硅谷 flink] 状态管理 笔记
|
7月前
|
存储 Java API
Flink中的状态管理是什么?请解释其作用和常用方法。
Flink中的状态管理是什么?请解释其作用和常用方法。
84 0