Flink state best practice| 学习笔记

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 快速学习 Flink state best practice。

开发者学堂课程【Apache Flink 入门到实战 - Flink 开源社区出品 Flink state best practice】学习笔记,与课程紧密联系,让用户快速学习知识。

课程地址:https://developer.aliyun.com/learning/course/632/detail/10039


Flink state best practice

 

内容介绍:

一、State overview (State  概念回顾)

二、Which state backend to use?(StateBackend 的选择)

三、State usage best practice(一些使用 state 的心得)

四、Some tips of checkpoint(一些使用 checkpoint 的使用建议)

 

一、State 概念回顾

我们先回顾一下到底什么是 state,流式计算的数据往往是转瞬即逝, 当然,真实业务场景不时所有的数据都进来后就离开,没有任何东西留下来,那么留下来的东西其实就是称之为 state,中文可以翻译成状态。

在下图中,所有的原始数据进入用户代码之后再输出到下游,如果中间涉及到 state 的读写,这些状态会存储在本地的 state backend(可以对标成嵌入式本地 kv 存储)当中。

image.png

接下来我们会在四个维度来区分两种不同的 state:operator state 以及 keyed state。

1. 是否存在当前处理的 key(current key):operator state 是没有当前 key 的概念,而 keyed state 的数值总是与一个 current key 对应。

2. 存储对象是否 on heap: 目前 operator state backend 仅有一种 on-heap 的实现;而 keyed state backend 有 on-heap 和 off-heap(RocksDB)的多种实现。

3. 是否需要手动声明快照(snapshot)和恢复 (restore) 方法:operator state 需要手动实现 snapshot 和 restore 方法;而 keyed state 则由 backend 自行实现,对用户透明。

4. 数据大小:一般而言,我们认为 operator state 的数据规模是比较小的;认为 keyed state 规模是相对比较大的。需要注意的是,这是一个经验判断,不是一个绝对的判断区分标准。

 

二、StateBackend 的选择

1. state backend 的分类

下面这张图对目前广泛使用的三类 state backend 做了区分,其中绿色表示所创建的operator/keyed state backend 是 on-heap 的,黄色则表示是 off-heap 的。

 image.png

·OperatorState 没有 current key 的概念

·KeyedState 的数值总是与一个 current key 对应的

·OperatorState 只有堆内存一种实现

·KeyedState 有堆内存和 RocksDB 两种实现

·OperatorState 需要手动实现 snapshot 和 restore 方法

·KeyedState 由 backend 实现,对用户透明

·OperatorState 一般被认为是规模比较小的

·KeyedState 一般是相对规模较大的

2. StateBackend 的选择

一般而言,在生产中,我们会在 FsStateBackend 和 RocksDBStateBackend 间选择:

· FsStateBackend:性能更好;日常存储是在堆内存中,面临着 OOM 的风险,不支持增量 checkpoint。

· RocksDBStateBackend:无需担心 OOM 风险,是大部分时候的选择。

3.RocksDB 的 state 存储

State1

 State2

KeyGroup+Key+Namespace  

 

value

KeyGroup+Key+Namespace  

value

 (1,K1,Window(10,20))  

v1

 (1,K2,Window(10,20))  

v2

 (1,K3,Window(10,20))  

v3

 (1,K4,Window(10,25))  

v4

(1,K1+MK1,VoidNameSpace)

v5

(1,K2+MK2,VoidNameSpace)

v6

...

...

...

...

在 RocksDB 中,每个 state 独享一个 Column Family,而每个 Column family 使用各自独享的 write buffer 和 block cache,上图中的 window state 和 value state 实际上分属不同的 column family。

4.RocksDB 的相关参数

·Flink-1.8 开始支持 ConfigurableOptionsFactory

state.backend.rocksdb.files.open

最大打开文件数目,‘-1’意味着没有限制,默认值‘5000’

state.backend.rocksdb.thread.num

态建议后台 flush 和 compaction 的线程数. 默认值 ‘1‘, 建议调大

state.backend.rocksdb.writebuffer.count

每个 column family 的 write buffer 数目,默认值 ‘2‘,建议当调大

state.backend.rocksdb.writebuffer.number-to-merge

写之前的write buffer merge数目,默认值 ‘1‘,建议当调大

state.backend.rocksdb.writebuffer.size

每个 write buffer 的 size,默认值‘4MB‘,建议调大

5. 如何配置 RocksDB 的 options

代码示例:

Public class MyOptionsFactory implenents

ConfigurableOptionsFactory {

private static final long DEFAULT SIZE = 256 * 1024* 1024:

// 256 MB

private long blockCacheSize = DEFAULT_SIZE;

@Override

public DBOptions createDBOptionsDBOptions

currentOptions){

return currentOptions.setIncreaseParallelism(4)

.setUseFsync(false);

}

@Override

Public ColumnFamilyOptions createColunnoptions

(ColunnFamilyoptions currentoptions){

return currentOptions.setTableFormatConfig(

new BlockBasedTableConfig()

.setBlockCacheSize(blockCacheSize)

.setBlockSize(128 * 1024)); // 128 KB

}

@Override

public OptionsFactory configure(Configuration configuration){

this.blockCacheSize =

configuration.getLong("my.custom.rocksdb.block.cache.size", DEFAULT_SIZE);

return this;

}

 

三、State best practice:一些使用 state 的心得

1.Operator state 使用建议

(1)慎重使用长 list

下图展示的是目前 task 端 operator state 在执行完 checkpoint 返回给 job master 端的 StateMetaInfo 的代码片段。

image.png 

代码示例:

class StateMetaInfo implements Serializable {

private static final long serialVersionUID =.3593817615858941166L;

private final long[] offsets;

private final Mode distributionMode;

public StateMetaInfo( long[] offsets,Mode distributionMode){

this.offsets = Preconditions.checkNotNull(offsets);

this.distributionMode = Preconditions.

checkNotNull(distributionMode)

由于 operator state 没有 key group 的概念,所以为了实现改并发恢复的功能,需要对 operator state 中的每一个序列化后的元素存储一个位置偏移 offset,也就是构成了上图红框中的 offset 数组。

如果 operator state 中的 list 长度达到一定规模时,这个 offset 数组就可能会有几十 MB 的规模,这个数组会返回给 job master,当 operator 的并发数目很大时,很容易触发 job master 的内存超用问题。

我们遇到过用户把 operator state 当做黑名单存储,黑名单规模很大,导致一旦开始执行 checkpoint,job master 就会因为收到 task 发来的“巨大”的 offset 数组,而内存不断增长直到超用无法正常响应。

(2)正确使用 UnionListState

union list state 目前被广泛使用在 kafka connector 中,不过可能用户日常开发中较少遇到,他的语义是从检查点恢复之后每个并发 task 内拿到的是原先所有operator 上的 state,如下图所示:

image.png

l restore 后,每个 subTask 均恢复了与之前所有并发的 state。

l 目前 Flink 内部的使用都是为了获取之前的全局信息,在下一次 snapshot 时,仅使用其中一部分做 snapshot。

l 切勿在下一次 snapshot 时进行全局 snapshot

2.Keyed state 使用建议

(1)如何正确清空当前的 state

state.clear() 实际上只能清理当前 key 对应的 value 值,如果想要清空整个 state,需要借助于 applyToAllKeys 方法,具体代码片段如下:

image.png

(2) RocksDB 中考虑 value 值很大的极限场景

l 受限于 JNI bridge API 的限制,单个 value 只支持 2^31 bytes 大小

l 考虑使用 MapState 来替代 ListState 或者 ValueState

因为RocksDB 的 map state 并不是将整个 map 作为 value 进行存储,而是将 map 中的一个条目作为键值对进行存储。

(3) 如何知道当前 RocksDB 的运行情况

l RocksDB 的日志可以观察到一些 compaction 信息,默认存储位置在 flink-io 目录下,需要登录到 taskmanager 里面才能找到。

l 考虑打开 RocksDB 的 native metrics

https://ci.apache.org/projects/flink/flink-docs-master/ops/config.html#rocksdb-native-metrics

4)配置了 state TTL,可能你的存储空间并没有减少

l 默认情况下,只有在下次读访问时才会触发清理那条过期数据如果那条数据之后不再访问,则也不会被清理

StateTtlconfig ttlConfig = StateTtlconfig

newBuilder(Time.days(7))

.cleanupFullSnapshot( )

// Full snapshot 时候会清理 snapshot 的内容

.build(o); 

StateTtlConfig ttlConfig = StateTtlConfig

.newBuilder (Time.days(7))

// check 10 keys for every state acces

.cleanupIncrementally(10,false)

.build();

//Heap state backend 的持续清理

StateTtlConfig ttlConfig = stateTtlConfig

.newBuilder(Time.days(7))

.cleanupInRocksdbCompactFilter().build();

//RocksDB state backend 的持续清理

3. RawState(timer) 使用建议

(1)Timer state 太大怎么办

l 考虑存储到 RocksDB 中

state.backend.rocksdb.timer-service.factory:ROCKSDB

l Trade off

n 存储到 Heap 中,面临 OOM 风险,Checkpoin

的同步阶段耗时大

存储到 RocksDB 中,影响 timer 的读写性能

4.RocksDBState 使用建议

(1)不要创建过多的 state

l 每个 state 一个 column family ,独占 write buffer,过多的 state 会导致占据过多的 write buffer

l 根本上还是 RocksDB StateBackend 的 native 内存无法直接管理

 

四、一些使用 checkpoint 的使用建议

1. Checkpoint 间隔不要太短

虽然理论上 Flink 支持很短的 checkpoint 间隔,但是在实际生产中,过短的间隔对于底层分布式文件系统而言,会带来很大的压力。另一方面,由于检查点的语义,所以实际上 Flink 作业处理 record 与执行 checkpoint 存在互斥锁,过于频繁的 checkpoint,可能会影响整体的性能。当然,这个建议的出发点是底层分布式文件系统的压力考虑。

2.合理设置超时时间

默认的超时时间是 10min,如果 state 规模大,则需要合理配置。最坏情况是分布式地创建速度大于单点(job master 端)的删除速度,导致整体存储集群可用空间压力较大。建议当检查点频繁因为超时而失败时,增大超时时间。

3.FsStateBackend 可以考虑文件压缩

对于刷出去的文件可以考虑使用压缩来减少 Checkpoint 体积

ExecutionConfig executionConfig = new ExecutionConfig();

executionconfig.setUsesnapshotcompression(true) ;

目前只支持这一种压缩方式,当然压缩会面临一定的  CPU 开销,但这个 CPU 开销发生在异步阶段,对于整个作业的处理性能影响不大。

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
7月前
|
SQL Java API
flink问题之state过期设置如何解决
Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎。本合集提供有关Apache Flink相关技术、使用技巧和最佳实践的资源。
438 0
|
7月前
|
SQL 消息中间件 分布式数据库
Flink问题之State 0点清除如何解决
Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎。本合集提供有关Apache Flink相关技术、使用技巧和最佳实践的资源。
140 0
|
2月前
|
SQL 消息中间件 分布式计算
大数据-124 - Flink State 01篇 状态原理和原理剖析:状态类型 执行分析
大数据-124 - Flink State 01篇 状态原理和原理剖析:状态类型 执行分析
79 5
|
2月前
|
存储 SQL 分布式计算
大数据-127 - Flink State 04篇 状态原理和原理剖析:状态存储 Part2
大数据-127 - Flink State 04篇 状态原理和原理剖析:状态存储 Part2
21 0
|
2月前
|
存储 消息中间件 大数据
大数据-126 - Flink State 03篇 状态原理和原理剖析:状态存储 Part1
大数据-126 - Flink State 03篇 状态原理和原理剖析:状态存储 Part1
70 0
|
2月前
|
存储 SQL 分布式计算
大数据-125 - Flink State 02篇 状态原理和原理剖析:广播状态
大数据-125 - Flink State 02篇 状态原理和原理剖析:广播状态
47 0
|
4月前
|
消息中间件 应用服务中间件 API
Flink四大基石——3.State
Flink四大基石——3.State
65 1
|
4月前
|
SQL 流计算
Flink SQL 在快手实践问题之由于meta信息变化导致的state向前兼容问题如何解决
Flink SQL 在快手实践问题之由于meta信息变化导致的state向前兼容问题如何解决
51 1
|
7月前
|
传感器 存储 缓存
[尚硅谷flink学习笔记] 实战案例TopN 问题
这段内容是关于如何使用Apache Flink解决实时统计水位传感器数据中,在一定时间窗口内出现次数最多的水位问题,即"Top N"问题。首先,介绍了一个使用滑动窗口的简单实现,通过收集传感器数据,按照水位计数,然后排序并输出前两名。接着,提出了全窗口和优化方案,其中优化包括按键分区(按水位vc分组)、开窗操作(增量聚合计算count)和过程函数处理(聚合并排序输出Top N结果)。最后,给出了一个使用`KeyedProcessFunction`进行优化的示例代码,通过按键by窗口结束时间,确保每个窗口的所有数据到达后再进行处理,提高了效率。
195 1
|
7月前
|
分布式计算 资源调度 Hadoop
Hadoop学习笔记(HDP)-Part.18 安装Flink
01 关于HDP 02 核心组件原理 03 资源规划 04 基础环境配置 05 Yum源配置 06 安装OracleJDK 07 安装MySQL 08 部署Ambari集群 09 安装OpenLDAP 10 创建集群 11 安装Kerberos 12 安装HDFS 13 安装Ranger 14 安装YARN+MR 15 安装HIVE 16 安装HBase 17 安装Spark2 18 安装Flink 19 安装Kafka 20 安装Flume
254 2
Hadoop学习笔记(HDP)-Part.18 安装Flink