Flink(十一)【状态管理】(4)

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: Flink(十一)【状态管理】

Flink(十一)【状态管理】(3)https://developer.aliyun.com/article/1532241

3.2、联合列表状态(UnionListState)

       与 ListState 类似,联合列表状态也会将状态表示为一个列表。它与常规列表状态的区别在于,算子并行度进行缩放调整时对于状态的分配方式不同。UnionListState 的重点就在于“联合”(union)。在并行度调整时,常规列表状态是轮询分配状态项,而联合列表状态的算子则会直接广播状态的完整列表。这样,并行度缩放之后的并行子任务就获取到了联合后完整的“大列表”,可以自行选择要使用的状态项和要丢弃的状态项。这种分配也叫作“联合重组”(union redistribution)。如果列表中状态项数量太多,为资源和效率考虑一般不建议使用联合重组的方式。

并行度 = 2
算子1    算子2
 1         2
 3         4
 5         6
并行度 2->3
//普通ListState:
算子1    算子2    算子3
 1        2        3
 4        5        6
//UnionListState:
算子1    算子2    算子3
 1        1        1        
 2        2        2
 3        3        3
 4        4        4
 5        5        5
 6        6        6

可以看到,当我们的分区进行重新调整时,ListState 会把数据先搜集在一起,再重新以轮询的方式进行数据的分配,而 UnionListState 同样会把数据先搜集在一起再把全量数据以广播的形式分配给每个并行算子

只需要修改上面的代码为:

@Override
        public void initializeState(FunctionInitializationContext context) throws Exception {
            // 3.1 从上下文初始化算子状态
            countState = context.getOperatorStateStore()
                    .getUnionListState(new ListStateDescriptor<Long>("union-state", Types.LONG));
            // 3.2 从算子状态中把数据拷贝到本地变量
            if (context.isRestored()) { // 如果初始化状态成功
                for (Long c : countState.get()) {
                    count += c;
                }
            }
        }

3.3、广播状态(BroadcastState)

       有时我们希望算子并行子任务都保持同一份“全局”状态,用来做统一的配置和规则设定。这时所有分区的所有数据都会访问到同一个状态,状态就像被“广播”到所有分区一样,这种特殊的算子状态,就叫作广播状态(BroadcastState)。

       因为广播状态在每个并行子任务上的实例都一样,所以在并行度调整的时候就比较简单,只要复制一份到新的并行任务就可以实现扩展;而对于并行度缩小的情况,可以将多余的并行子任务连同状态直接砍掉——因为状态都是复制出来的,并不会丢失。

       在底层,广播状态是以类似映射结构(map)的键值对(key-value)来保存的,必须基于一个“广播流”(BroadcastStream)来创建。

案例-水位超过阈值发送警告,阈值可以动态修改

public class OperatorBroadcastState {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);
 
        // 数据流
        SingleOutputStreamOperator<WaterSensor> sensorDS = env
                .socketTextStream("localhost", 9999)
                .map(new WaterSensorFunction());
 
        // 广播流 - 用来广播配置-动态修改水位报警阈值
        DataStreamSource<String> thresholdDS = env.socketTextStream("localhost", 8888);
 
        // TODO 1. 将 配置流 广播
        // 返回一个带有广播状态的广播流
        MapStateDescriptor<String, Integer> broadcastMapDescriptor = new MapStateDescriptor<>("broadcast-state", Types.STRING, Types.INT);
        BroadcastStream<String> configDS = thresholdDS.broadcast(broadcastMapDescriptor);
 
        // TODO 2. 把 数据流 和 配置流 connect
        BroadcastConnectedStream<WaterSensor, String> sensorCS = sensorDS.connect(configDS);
 
        // TODO 3. 调用process
        sensorCS.process(new BroadcastProcessFunction<WaterSensor, String, String>() {
            // 处理数据流
            @Override
            public void processElement(WaterSensor value, ReadOnlyContext ctx, Collector<String> out) throws Exception {
                // TODO 5. 通过上下文获取广播状态,取出状态中的值
                // 广播状态对数据流只能读
                ReadOnlyBroadcastState<String, Integer> broadcastState = ctx.getBroadcastState(broadcastMapDescriptor);
                Integer threshold = broadcastState.get("threshold");
                // 防止数据流已经来数据了,但广播流开始未指定阈值
                threshold = threshold==null?0:threshold;
                if (value.getVc()>threshold){
                    out.collect("传感器 "+value.getId()+" 的水位超过指定的阈值: "+threshold+" !!!");
                }
            }
 
            // 处理广播流
            @Override
            public void processBroadcastElement(String value, Context ctx, Collector<String> out) throws Exception {
                // TODO 4. 通过上下文获取广播状态, 往状态里面写数据
                BroadcastState<String, Integer> broadcastState = ctx.getBroadcastState(broadcastMapDescriptor);
                broadcastState.put("threshold",Integer.valueOf(value));
            }
        }).print();
 
        env.execute();
    }
}

测试:

netcat(9999端口) |  console
-------------------------------------------------
s1,1,1        1> 传感器 s1 的水位超过指定的阈值: 0 !!!
5    // 8888端口修改阈值为5
s1,1,1
s1,1,4
s1,1,5
s1,1,6        1> 传感器 s1 的水位超过指定的阈值: 5 !!!

4、状态后端(State Backends)

       在 Flink 的状态管理机制中,很重要的一个功能就是对状态进行持久化(persistence)保存,这样就可以在发生故障后进行重启恢复。Flink 对状态进行持久化的方式,就是将当前所有分布式状态进行“快照”保存,写入一个“检查点”(checkpoint)或者保存点(savepoint)保存到外部存储系统中。具体的存储介质,一般是分布式文件系统(distributed file system)。

       状态后端主要负责管理本地状态的存储方式和位置

4.1、状态后端的分类

       状态后端是一个“开箱即用”的组件,可以在不改变应用程序逻辑的情况下独立配置。

Flink 中提供了两类不同的状态后端,一种是“哈希表状态后端”(HashMapStateBackend),另一种是“内嵌 RocksDB 状态后端”(EmbeddedRocksDBStateBackend)。如果没有特别配置,系统默认的状态后端是 HashMapStateBackend。

(1)哈希状态后端(HashMapStateBackend)

       这种方式会把状态存放在内存里。具体实现上,哈希表状态后端在内部会直接把状态当作对象(objects),保存在 Taskmanager 的 JVM 堆(heap)上。普通的状态,以及窗口中收集的数据和触发器(triggers),都会以键值对(key-value)的形式存储起来,所以底层是一个哈希表(HashMap),这种状态后端也因此得名。

       对于检查点的保存,一般是放在持久化的分布式文件系统(file system)中,也可以通过配置“检查点存储”(CheckpointStorage)来另外指定。HashMapStateBackend 是将本地状态全部放入内存的,这样可以获得最快的读写速度,使计算性能达到最佳;代价则是内存的占用。它适用于具有大状态、长窗口、大键值状态的作业,对所有高可用性设置也是有效的。

(2)内嵌 RocksDB 状态后端(RocksDB)

       RocksDB 是一种内嵌的 key-value 存储介质,可以把数据持久化到本地硬盘。配置EmbeddedRocksDBStateBackend 后,会将处理中的数据全部放入 RocksDB 数据库中,RocksDB默认存储在 TaskManager 的本地数据目录里

       与 HashMapStateBackend 直接在堆内存中存储对象不同,这种方式下状态主要是放在RocksDB 中的。数据被存储为序列化的字节数组(Byte Arrays),读写操作需要序列化/反序列化,因此状态的访问性能要差一些。另外,因为做了序列化,key 的比较也会按照字节进行,而不是直接调用.hashCode()和.equals()方法。

       对于检查点,同样会写入到远程的持久化文件系统中。EmbeddedRocksDBStateBackend 始终执行的是异步快照,也就是不会因为保存检查点而阻塞数据的处理;而且它还提供了增量式保存检查点的机制,这在很多情况下可以大大提升保存效率。

       由于它会把状态数据落盘,而且支持增量化的检查点,所以在状态非常大、窗口非常长、键/值状态很大的应用场景中是一个好选择,同样对所有高可用性设置有效。

4.2、如何选择正确的状态后端

       HashMap 和 RocksDB 两种状态后端最大的区别,就在于本地状态存放在哪里:前者是内存,后者是 RocksDB。在实际应用中,选择那种状态后端,主要是需要根据业务需求在处理性能和应用的扩展性上做一个选择。

       HashMapStateBackend 是内存计算,读写速度非常快;但是,状态的大小会受到集群可用内存的限制,如果应用的状态随着时间不停地增长,就会耗尽内存资源。

       而 RocksDB 是硬盘存储,所以可以根据可用的磁盘空间进行扩展,而且是唯一支持增量检查点的状态后端,所以它非常适合于超级海量状态的存储。不过由于每个状态的读写都需要做序列化/反序列化,而且可能需要直接从磁盘读取数据,这就会导致性能的降低,平均读写性能要比 HashMapStateBackend 慢一个数量级。

       我们可以发现,实际应用就是权衡利弊后的取舍。最理想的当然是处理速度快且内存不受限制可以处理海量状态,那就需要非常大的内存资源了,这会导致成本超出项目预算。比起花更多的钱,稍慢的处理速度或者稍小的处理规模,老板可能更容易接受一点。

4.3、状态后端的配置

       在不做配置的时候,应用程序使用的默认状态后端是由集群配置文件 flink-conf.yaml 中指定的,配置的键名称为 state.backend。这个默认配置对集群上运行的所有作业都有效,我们可以通过更改配置值来改变默认的状态后端。另外,我们还可以在代码中为当前作业单独配置状态后端,这个配置会覆盖掉集群配置文件的默认值。

(1)修改默认的状态后端配置
//默认状态后端
state.backend: hashmap
// 或者修改为 rocksdb
state.backend: rocksdb
//存放检查点的文件路径
state.checkpoints.dir: hdfs://namenode:40010/flink/checkpoints
(2)为每个作业(Per-job/Application模式)单独配置状态后端

指定状态后端为 HashMapStateBackend:

env.setStateBackend(new HashMapStateBackend());

指定状态后端为 EmbeddedRocksDBStateBackend:

env.setStateBackend(new EmbeddedRocksDBStateBackend());

需要注意,如果想在 IDE 中使用 EmbeddedRocksDBStateBackend,需要为 Flink 项目添加依赖:

<dependency>
 <groupId>org.apache.flink</groupId>
 <artifactId>flink-statebackend-rocksdb_${scala.binary.version}</artifactId>
 <version>${flink.version}</version>
</dependency>

而由于 Flink 发行版中(lib/flink-dist-1.17.0.jar)默认就包含了 RocksDB,所以我们打包项目的时候不需要打包这个依赖。

(3)提交参数时指定
flink run-application -t yarn-application
-p    3
-Dstate.backend.type=rocksdb
-c 全类名
jar包

总结

       到今天10点,终于是把专业课的期末考完了,不由得要吐槽一下这试卷实在劣质,上午考的 Spark 考得一堆角落里没有的边角料,Spark 正二八经的核心重点几乎是没考,比如RDD 血缘关系、运行架构 ... 反倒是考了一堆细枝末节(Scala 的 for 花样循环守卫 ...),无语无语。还有一堆事情让人焦虑 ...

       最近确实是累,断更 7 天了,这不下午两点都没睡就又来学习了。宿舍实在是人间炼狱,来了自习室以为会好点,就这功夫,左边的哥们又开始抖腿了,前边坐着的情侣实在碍眼,来自习室的路上看到了一对对情侣,想想咱也读了次大学,混的连个对象都没有,这不能怪咱不争取,属实是这大环境不好,毕业都眼瞅着吃土了还哪有那心思呢。

       眼下每一次偷懒和懈怠都是罪恶的,愈发觉得时间的珍贵,现在努力学习保持单身才是对自己、对自己未来老婆孩子的负责哇。

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
7月前
|
存储 消息中间件 缓存
读Flink源码谈设计:有效管理内存之道
在最初接触到Flink时,是来自于业界里一些头部玩家的分享——大家会用其来处理海量数据。在这种场景下,`如何避免JVM GC带来StopTheWorld带来的副作用`这样的问题一直盘绕在我心头。直到用了Flink以后,阅读了相关的源码(以1.14.0为基准),终于有了一些答案。在这篇文章里也是会分享给大家。
589 1
|
消息中间件 存储 Kafka
Flink---11、状态管理(按键分区状态(值状态、列表状态、Map状态、归约状态、聚合状态)算子状态(列表状态、广播状态))
Flink---11、状态管理(按键分区状态(值状态、列表状态、Map状态、归约状态、聚合状态)算子状态(列表状态、广播状态))
|
7月前
|
存储 Cloud Native 数据处理
Flink 2.0 状态管理存算分离架构演进
本文整理自阿里云智能 Flink 存储引擎团队负责人梅源在 Flink Forward Asia 2023 的分享,梅源结合阿里内部的实践,分享了状态管理的演进和 Flink 2.0 存算分离架构的选型。
1242 1
Flink 2.0 状态管理存算分离架构演进
|
6月前
|
消息中间件 Kafka 流计算
Flink(十一)【状态管理】(3)
Flink(十一)【状态管理】
|
6月前
|
传感器 流计算
Flink(十一)【状态管理】(2)
Flink(十一)【状态管理】
|
6月前
|
存储 传感器 大数据
Flink(十一)【状态管理】(1)
Flink(十一)【状态管理】
|
7月前
|
消息中间件 SQL Java
实时计算 Flink版产品使用合集之管理内存webui上一直是百分百是什么导致的
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStreamAPI、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
7月前
|
Kubernetes Oracle 关系型数据库
实时计算 Flink版产品使用合集之web ui能否在线管理数据source和处理数据
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
7月前
|
存储 传感器 消息中间件
[尚硅谷 flink] 状态管理 笔记
[尚硅谷 flink] 状态管理 笔记
|
7月前
|
存储 Java API
Flink中的状态管理是什么?请解释其作用和常用方法。
Flink中的状态管理是什么?请解释其作用和常用方法。
87 0
下一篇
DataWorks