大数据-126 - Flink State 03篇 状态原理和原理剖析:状态存储 Part1

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
云原生大数据计算服务MaxCompute,500CU*H 100GB 3个月
云原生大数据计算服务 MaxCompute,5000CU*H 100GB 3个月
简介: 大数据-126 - Flink State 03篇 状态原理和原理剖析:状态存储 Part1

点一下关注吧!!!非常感谢!!持续更新!!!

目前已经更新到了:

Hadoop(已更完)

HDFS(已更完)

MapReduce(已更完)

Hive(已更完)

Flume(已更完)

Sqoop(已更完)

Zookeeper(已更完)

HBase(已更完)

Redis (已更完)

Kafka(已更完)

Spark(已更完)

Flink(正在更新!)

章节内容

上节我们完成了如下的内容:


Flink 广播状态

基本概念、代码案例、测试结果

状态存储

Flink的一个重要特性就是有状态计算(stateful processing),Flink提供了简单易用的API来存储和获取状态,但是我们还是要理解API背后的原理,才能更好的使用。


State存储方式

Flink为State提供了三种开箱即用的后端存储方式(state backend):


Memory State Backend

File System (FS)State Backend

RocksDB State Backend

MemoryStateBackend

MemoryStateBackend将工作状态数据保存在TaskManager的Java内存中。Key/Value状态和Window算子使用哈希表存储数值和触发器。进行快照时(CheckPointing),生成的快照数据将和 CheckPoint ACK消息一起发送给 JobManager,JobManager将收到的所有快照数据保存在Java内存中。

MemoryStateBackend现在被默认配置成异步的,这样避免阻塞主线程的pipline处理,MemoryStateBackend的状态存取的数据都非常快,但是不适合生产环境中使用。这是以为它有以下限制:


每个state的默认大小被限制为5MB(这个值可以通过MemoryStateBackend构造方法设置)

每个Task的所有State数据(一个Task可能包含一个Pipline中的多个的Operator)大小不能超过RPC系统的帧大小(akk.framesize 默认10MB)

JobManager收到的State数据总和不能超过JobManager内存

MemoryStateBackend适合的场景:


本地开发和调试

状态很小的作业

FsStateBackend

FsStateBackend 需要配置一个CheckPoint路径,例如:hdfs://xxxxxxx 或者 file:///xxxxx,我们一般都会配置HDFS的目录。

FsStateBackend将工作状态数据保存在TaskManager的Java内存中,进行快照时,再将快照数据写入上面的配置的路径,然后将写入的文件路径告知JobManager。JobManager中保存所有状态的元数据信息(在HA的模式下,元数据会写入CheckPoint目录)。

FsStateBackend 默认使用异步方式进行快照,防止阻塞主线程的Pipline处理,可以通过

FsStateBackend构造函数取消该模式:

new FsStateBackend(path, false)

FsStateBackend 适合的场景:

  • 大状态、长窗口、大键值(键或者值很大)状态的作业
  • 适合高可用方案

RocksDBStateBackend

RocksDBStateBackend 也需要配置一个CheckPoint路径,例如:hdfs://xxx 或者 file:///xxx,一般是 HDFS 路径。

RocksDB是一种可嵌入的可持久型的 key-value 存储引擎,提供 ACID 支持。由Facebook基于LevelDB开发,使用LSM存储引擎,是内存和磁盘的混合存储。

RocksDBStateBackend将工作状态保存在TaskManager的RocksDB数据库中,CheckPoint时,RocksDB中的所有数据会被传输到配置的文件目录,少量元数据信息保存在JobManager内存中(HA模式下,会保存在CheckPoint目录)。

RocksDBStateBackend使用异步方式进行快照,RocksDBStateBackend的限制:


由于RocksDB的JNI Bridge API是基于 byte[] 的,RocksDBStateBackend支持的每个Key或者每个Value的最大值不超过 2的31次方(2GB)

要注意的是,有merge操作的状态(例如:ListState),可能会在运行过程中超过2的31次时,导致程序失败。

RocksDBStateBackend适用于以下的场景:

超大状态、超长窗口(天)、大键值状态的作业

适合高可用模式

使用RocksDBStateBackend时,能够限制状态大小是TaskManager磁盘空间(相对于FsStateBackend状态大小限制与TaskManager内存)。这也导致RocksDBStateBackend的吞吐比其他两个要低一些,因为RocksDB的状态数据的读写都要经过反序列化/序列化。


RocksDBStateBackend时目前三者中唯一支持增量CheckPoint的。

三者吞吐量对比

KeyedState 和 Operator State

State分类

Operator State

(或 non-keyed state):

每个Operator State绑定一个并行的Operator实例,KafkaConnector是使用OperatorState的典型示例:每个并行的Kafka Consumer实例维护了每个Kafka Topic分区和该分区Offset的映射关系,并将这个映射关系保存为OperatorState。

在算子并行度改变时,OperatorState也会重新分配。


Keyed State

这种State只存在于KeyedStream上的函数和操作中,比如Keyed UDF(KeyedProcessFunction)Window State。可以把Keyed State想象成被分区的OperatorState。每个KeyedState在逻辑上可以看成与一个 <parallel-operator-instance, key> 绑定,由于一个key肯定只存在于一个Operator实例,所以我们可以简单的的认为一个 <operator, key>对应一个 KeyedState。

每个KeyedState在逻辑上还会被分配一个KeyGroup,分配方法如下:

MathUtils.murmurHash(key.hashCode()) % maxParallelism;

其中maxParallelism是Flink程序的最大并行度,这个值一般我们不会去手动设置,使用默认的值(128)就好,这里注意下,maxParallelism和我们运行程序时指定的算子并行度(parallelism)不同,parallelism不能大于maxParallelism,最多两者相等。


为什么会有 Key Group这个概念呢?

我们通常写程序,会给算子指定一个并行度,运行一段时间后,积累了一些State,这时候数据量大了,需要增加并行度。我们修改并行度后重新提交,那这些已经存在的State该如何分配到各个Operator呢?这就有了最大并行度和KeyGroup的概念。

上面的计算公式也说明了KeyGroup的个数最多是maxParallelism个。当并行度改变之后,我们在计算

这个Key被分配到的Operator:

keyGroupId * paralleism / maxParallelism;

可以看到,一个KeyGroupId会对应一个Operator,当并行度更改时,新的Operator会去拉取对应的KeyGroup的KeyedState,这样就把KeyedState尽量均匀的分配给所有的Operator了。

根据State数据是否被Flink托管,Flink又将State分类为:


Managed State:被Flink托管,保存为内部的哈希表或者RocksDB,CheckPoint时,Flink将State进行序列化编码。例如:ValueState ListState

Row State:Operator 自行管理的数据结构, Checkpoint时,它们只能以byte数据写入CheckPoint。

建议使用 Managed State,当使用 Managed State时,Flink会帮助我们更改并行度时重新分配State,优化内存。


使用ManageKeyedState

如何创建?

上面提到,KeyedState只能在KeyedStream上使用,可以通过Stream.keyBy创建KeyedStream,我们可以创建以下几种:


ValueState

ListState

ReducingState

AggregatingState<IN,OUT>

MapState<UK,UV>

FoldingState<T,ACC>

每种State都对应各种的描述符,通过描述符RuntimeContext中获取对应的State,而RuntimeContext只有RichFunction才能获取,所以想要使用KeyedState,用户编写的类必须继承RichFunction或者其他子类。


ValueState getState(ValueStateDescriptor)

ReducingState getReducingState(ReducingStateDescriptor)

ListState getListState(ListStateDescriptor)

AggregationState<IN,OUT> getAggregatingState(AggregatingStateDescriptor<IN,ACC,OUT>)

FoldingState<T,ACC> getFoldingState(FoldingStateDescriptor<T,ACC>)

MapState<UK,UV> getMapState(MapStateDescriptor<UK,UV>)

给KeyedState设置过期时间

在Flink 1.6.0 以后,还可以给KeyedState设置 TTL(Time-To-Live),当某一个Key的State数据过期时,会被StateBackend尽力删除。

官方给出了示例:

import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;

StateTtlConfig ttlConfig = StateTtlConfig
    .newBuilder(Time.seconds(1)) // 状态存活时间
    .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite) // TTL 何时被更新,这里配置的 state 创建和写入时
    .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
    .build();// 设置过期的 state 不被读取
ValueStateDescriptor<String> stateDescriptor = new ValueStateDescriptor<>("textstate", String.class);
stateDescriptor.enableTimeToLive(ttlConfig);

State的TTL何时被更新?

可以进行以下配置,默认只是key的state被modify(创建或者更新)的时候才更新TTL:


StateTtlConfig.UpdateType.OnCreateAndWrite:只在一个key的state创建和写入时更新TTL(默认)

StateTtlConfig.UpdateType.onReadAndWrite:读取state时仍然更新TTL

当State过期但是还未删除时,这个状态是否还可见?

可以进行以下配置,默认是不可见的:


StateTtlConfig.StateVisibility.NerverReturnExpired:不可见(默认)

StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp:可见

注意:


状态的最新访问时会和状态数据保存在一起,所以开启TTL特性增大State的大小,Heap State Backend会额外存储一个包括用户状态以及时间戳的Java8对象,RocksDB StateBackend会在每个状态值(list或者map的每个元素)序列化后增加8个字节。

暂时只支持基于 Processing Time的TTL。

尝试从CheckPoint/SavePoint进行恢复时,TTL的状态(是否开启)必须和之前保存一致,否则会遇到:StateMigrationException。

TTL的配置并不会保存在CheckPoint/SavePoint中,仅对当前的Job有效。

当前开启TTL的MapState仅在用户序列化支持NULL的情况下,才支持用户值为NULL,如果用户值序列化器不支持NULL,可以用NullableSerializer包装一层。

使用ManageOperatorState

(这里以及后续放到下一篇:大数据-127 Flink)


目录
相关文章
|
17天前
|
分布式计算 大数据 Apache
ClickHouse与大数据生态集成:Spark & Flink 实战
【10月更文挑战第26天】在当今这个数据爆炸的时代,能够高效地处理和分析海量数据成为了企业和组织提升竞争力的关键。作为一款高性能的列式数据库系统,ClickHouse 在大数据分析领域展现出了卓越的能力。然而,为了充分利用ClickHouse的优势,将其与现有的大数据处理框架(如Apache Spark和Apache Flink)进行集成变得尤为重要。本文将从我个人的角度出发,探讨如何通过这些技术的结合,实现对大规模数据的实时处理和分析。
52 2
ClickHouse与大数据生态集成:Spark & Flink 实战
|
18天前
|
分布式计算 大数据 OLAP
AnalyticDB与大数据生态集成:Spark & Flink
【10月更文挑战第25天】在大数据时代,实时数据处理和分析变得越来越重要。AnalyticDB(ADB)是阿里云推出的一款完全托管的实时数据仓库服务,支持PB级数据的实时分析。为了充分发挥AnalyticDB的潜力,将其与大数据处理工具如Apache Spark和Apache Flink集成是非常必要的。本文将从我个人的角度出发,分享如何将AnalyticDB与Spark和Flink集成,构建端到端的大数据处理流水线,实现数据的实时分析和处理。
48 1
zdl
|
4天前
|
消息中间件 运维 大数据
大数据实时计算产品的对比测评:实时计算Flink版 VS 自建Flink集群
本文介绍了实时计算Flink版与自建Flink集群的对比,涵盖部署成本、性能表现、易用性和企业级能力等方面。实时计算Flink版作为全托管服务,显著降低了运维成本,提供了强大的集成能力和弹性扩展,特别适合中小型团队和业务波动大的场景。文中还提出了改进建议,并探讨了与其他产品的联动可能性。总结指出,实时计算Flink版在简化运维、降低成本和提升易用性方面表现出色,是大数据实时计算的优选方案。
zdl
23 0
|
1月前
|
存储 分布式计算 druid
大数据-155 Apache Druid 架构与原理详解 数据存储 索引服务 压缩机制
大数据-155 Apache Druid 架构与原理详解 数据存储 索引服务 压缩机制
53 3
|
1月前
|
消息中间件 分布式计算 druid
大数据-154 Apache Druid 架构与原理详解 基础架构、架构演进
大数据-154 Apache Druid 架构与原理详解 基础架构、架构演进
38 2
|
1月前
|
分布式计算 监控 大数据
大数据-148 Apache Kudu 从 Flink 下沉数据到 Kudu
大数据-148 Apache Kudu 从 Flink 下沉数据到 Kudu
57 1
|
1月前
|
消息中间件 分布式计算 Kafka
大数据平台的毕业设计02:Spark与实时计算
大数据平台的毕业设计02:Spark与实时计算
|
1月前
|
SQL 运维 大数据
大数据实时计算产品的对比测评
在使用多种Flink实时计算产品后,我发现Flink凭借其流批一体的优势,在实时数据处理领域表现出色。它不仅支持复杂的窗口机制与事件时间处理,还具备高效的数据吞吐能力和精准的状态管理,确保数据处理既快又准。此外,Flink提供了多样化的编程接口和运维工具,简化了开发流程,但在界面友好度上还有提升空间。针对企业级应用,Flink展现了高可用性和安全性,不过价格因素可能影响小型企业的采纳决策。未来可进一步优化文档和自动化调优工具,以提升用户体验。
116 0
|
1月前
|
SQL 消息中间件 分布式计算
大数据-141 - ClickHouse 集群 副本和分片 Zk 的配置 Replicated MergeTree原理详解(一)
大数据-141 - ClickHouse 集群 副本和分片 Zk 的配置 Replicated MergeTree原理详解(一)
56 0
|
1月前
|
SQL 大数据
大数据-141 - ClickHouse 集群 副本和分片 Zk 的配置 Replicated MergeTree原理详解(二)
大数据-141 - ClickHouse 集群 副本和分片 Zk 的配置 Replicated MergeTree原理详解(二)
65 0