Flink 1.13,State Backend 优化及生产实践分享

本文涉及的产品
实时计算 Flink 版,1000CU*H 3个月
简介: Flink 1.13 版本,State Backend 模块就内存管控、访问延迟勘察等方面带来了相关优化与新特性。

本文由社区志愿者佳伟整理,唐云(茶干) 在 5 月 22 日北京站 Flink Meetup 分享的 《State Backend Flink – 1.13 优化及生产实践分享》。内容包括:

  1. 鸟瞰 Flink 1.13 state-backend 变化
  2. RocksDB state-backend 内存管控优化
  3. Flink state-backend 模块发展规划

GitHub 地址
https://github.com/apache/flink
欢迎大家给 Flink 点赞送 star~

一、鸟瞰 Flink 1.13 state-backend 变化

1. State 访问的性能监控

首先,Flink 1.13 中引入了 State 访问的性能监控,即 latency trackig state。

img

通过对每次访问前后的 System#nanoTime 求差,得到 state 访问延迟值(latency)。此功能不局限于 State Backend 的类型,自定义实现的 State Backend 也可以复用此功能。State 访问的性能监控会产生一定的性能影响,所以,默认每 100 次做一次取样 (sample)。上图即监控结果展示。

State 访问的性能监控开启后,对不同的 State Backend 性能损失影响不同:

  • 对于 RocksDB State Backend, 性能损失大概在 1% 左右;
  • 而对于 Heap State Backend, 性能损失最多可达 10%。

img

上图所示是三个相关的配置项,默认情况下此功能是关闭的,需通过指定参数 state.backend.latency-track.keyed-state-enabled=true 来手动开启。

2. 统一的 Savepoint 格式

img

Flink 1.13 之后,Savepoint 支持切换 State Backend,极大提升了系统应用性。创建 Savepoint 后,可修改作业拓扑中 State Backend 的类型,如从 RocksDB 切换成 Heap,或从 Heap 切换成 RocksDB,但切换仅限于 Savepoint。Checkpoint 所存储的文件格式与 State Backend 类型相关,而非通用格式,Checkpoint 目前暂不支持该功能。

3. 更清晰的 API

还有一个比较重要的改动,就是关于概念上的清晰化。Flink 1.13 中将状态和检查点两者区分开来。

在 Flink 中,State Backend 有两个功能:

  • 提供状态的访问、查询;
  • 如果开启了 Checkpoint,会周期向远程的 Durable storage 上传数据和返回元数据 (meta) 给 Job Manager (以下简称 JM)。

在之前的 Flink 版本中,以上两个功能是混在一起的,即把状态存储和检查点的创建概念笼统得混在一起,导致初学者对此部分感觉很混乱,很难理解。

img

目前,State Backend 的种类如上图所示,由于概念的混乱,导致之前的写法中,RocksDB State Backend 中是可以嵌入 Memory State Backend 或 Heap State Backend 的。实际上,RocksDB 里面嵌入的 State Backend,描述的是其内部 Checkpoint 数据传输方向。

对于 Memory State Backend,在原始构建下,未指定任何的 filepath。且在不开启 HA 的模式下,会将所有 Checkpoint 数据返回给 JM。当 Memory State Backend 指定 filepath,满足上传条件时,Checkpoint 数据直接上传到指定 filepath 下,数据内容不会返回给 JM。

对于 Fs State Backend,数据会直接上传到所定义的 filepath 下。

当然,大家线上用的最多的还是 RocksDB State Backend 搭配上一个远程 fs 地址,旧的写法对于使用 Flink 的用户来说,容易造成状态和检查点理解混乱。

img

Flink 1.13 中两个概念被拆开:

  1. 其中,State Backend 的概念变窄,只描述状态访问和存储;
  2. 另外一个概念是 Checkpoint storage,描述的是 Checkpoint 行为,如 Checkpoint 数据是发回给 JM 内存还是上传到远程。所以,相对应的配置项也被拆开 。

当前不仅需要指定 State Backend ,还需要指定 Checkpoint Storage。以下就是新老接口的对应关系:

image-20210628232801623

当然,虽然旧接口目前仍然保存,但还是推荐大家使用新接口,向新方式迁移,从概念上也更清晰一些。

4. RocksDB partitioned Index & filter

下面要提的就是关于RocksDB的优化:

img

Flink 1.13 中对 RocksDB 增加了分区索引功能。如上图所示,RocksDB Block Cache 中存储的数据包含三部分:

  1. Data Block (真实数据)
  2. Index Block (每条数据的索引)
  3. Filter Block (对文件的 Bloom Filter)

可以通过方块大小明显看出块大小,Index 和 Filter 是明显大于 Data 的。以 256M SSD 文件为例,Index Block 大概是 0.5M,Filter Block 大概是 5M,Data Block ze 则默认是 4KB。当 Cache Block 是几百 MB 的时候,如果文件数特别多,Index 和 Filter 不断的替出换入,性能会非常差,尤其是在默认开启了内存管控后。比较明显的现象是,IO 特别频繁,性能始终上不去。

img

Flink 1.13 中,复用了 RocksDB 的 partitioned Index & filter 功能,简单来说就是对 RocksDB 的 partitioned Index 做了多级索引。也就是将内存中的最上层常驻,下层根据需要再 load 回来,这样就大大降低了数据 Swap 竞争。线上测试中,相对于内存比较小的场景中,性能提升 10 倍左右。所以,如果在内存管控下 Rocksdb 性能不如预期的话,这也能成为一个性能优化点。

目前共有两个参数可控制这个功能:

  1. state.backend.rocksdb.memory.partitioned-index-filters:true (默认 false)
  2. state.backend.rocksdb.block.metadata-blocksize (多级索引内存配置)

5. 默认行为变化

img

Flink 1.13 中,默认行为发生如上图所示的变化。

  • 不再支持 state.backend.async 配置项,所有的 Checkpoint 均是异步的 (同步 Checkpoint 场景很少,已去除);
  • state.backend.rocksdb.checkpoint.transfer.thread.num 默认值增大到 4 RocksDB 增量 Checkpoint 时,4 个线程多线程上传文件 RocksDB从增量 Checkpoint 恢复数据时,采用 4 个线程多线程下载。

当然,性能提升的同时,对 HDFS 底层压力更大些,如果升级后 HDFS 不稳定,可考虑是否与此处相关。

二、RocksDB state-backend 内存管控优化

Flink 1.10 开始做 state-backend 内存优化,在之后的每个版本中都有相关改进。

img

对 RocksDB State Backend 做内存管控的最基本原因在于 Flink state 与 RocksDB 的 Column Family (独立内存) 一一对应。

在 Flink 1.10 之前,如果声明两个 state,会各自享用自己的 Write Buffer 和 Cache 内存,Flink 并没有对一个 operator 中的 state 数量限制,理论上用户可以设置几千个、几万个 state,可能导致容器内存撑爆。另外,Flink 在 slot-sharing 机制下,一个 slot 内可以存在多个包含 keyed state 的 operator,也很难保证 state 个数不超。

img

多个 RocksDB 会有多个 Write Buffer Manager 。如上图所示,以单个 Write Buffer Manager 为例,它将自己的内存 reserve 到 Block Cache 中,根据自己的内存管控逻辑来实现记账,Block Cache 内有 LRU Handle,超出预算时,会被踢出。

image-20210628232903342

上图提到的 arena block ,是 Write Buffer 最小内存分配单元,默认是 Write buffer 默认配置的 1/8,内存默认为 8MB。但在极端情况下,磁盘上会出现小文件过多的现象,导致性能非常差。如当整体内存分配过小时,Write Buffer 所管控的内存数量也就会比较少,刚开始申请内存时,默认申请 8MB 内存,当已用内存达到总内存的 7/8 时,会被置为 Immutable (置为不可变),之后这部分数据被替出到磁盘上。

如果单个 arena block 内存占比过大,可能会出现临界 arena block 只写了几 KB,但触发了 Write Buffer 的内存行为管控,将 arena block 置为了 Immutable,之后的数据就会被刷出去,产生小文件,导致性能非常差。对于 LSM DB 来说,提前 flush,对读放大性能产生很大影响,Write Buffer 无法缓存更多读请求。

我们引入对 arena block 大小有强校验,当 arena block 大小不合适时,会打印 Warning 级别日志,认为当前需要对 arena block 大小作出相应调整。即需要降低 arena block 大小,从而解决数据提前被 flush 的问题,进而提升性能。

img

RocksDB Block Cache 为了提高并发性能,将 arena block 分成了若干个分片 (shards)。实质上是 Write Buffer Manager 在做 reserve 时,将 arena block 拆成了若干个 dummy entry,实际上只做了记账,会占据 block cache 的逻辑容量。目前 Flink 使用的 RocksDB 版本中,shards 默认是 1MB,可能会有 shards 的数据超过预算的风险。后来的 RocksDB 高版本中,将 1MB 调成了 256KB 来解决这个风险。由于 Flink 1.13 中没有对 RocksDB 版本升级,所以这个问题依然存在。此外,Flink 1.13 中,没有将 RocksDB Block Cache 内存管控设置成严格模式 (Strict Mode)。

image-20210628232922494

目前社区用的 RocksDB 的版本是 5.17.2,与 RocksDB 社区最新的 6.17+ 版本,相差大概一两千个 commit。社区在尝试升级 RocksDB 版本时,发现高版本有一些性能回退,即使尽力解决,也只是解决了其中一部分,在部分访问接口下,还是有大约不到 10% 的性能下降。所以,Flink 1.13 决定暂不升级 RocksDB 版本,社区预计会在 Flink 1.14 中做相应升级,引入 RocksDB 一些新的 future,借此弥补目前已知的 10% 性能回退的 Gap。

image-20210628232949884

综上各种问题,RocksDB 内存管控不完善,加上 Writer Buffer 对 Data Block 不严格的管控,在理论上还是存在一定小几率内存超用的。但就目前来看,整体还是比较稳定,超用的部分不会太多。如果想手动多分一部分内存给 RocksDB 来防止超用,预防在云原生的环境因 OOM 被 K8S kill,可手动将 JVM OverHead 内存调大,如上图所示。

之所以不调大 Task Off-Heap,是由于目前 Task Off-Heap 是和 Direct Memeory 混在一起的,即使调大整体,也并不一定会分给 RocksDB 来做 Buffer,所以我们推荐通过调整 JVM OverHead 来解决内存超用的问题。同理,如果 Flink 中用到其他相关库,遇到相似问题,也可以尝试将 JVM OverHead 调大来解决。如果想查明内存泄漏原因,也可以结合相应 jemalloc + jeprof 等分析工具排查解决。

三、 Flink state-backend 模块发展规划

以下为 state-backend 模块在 Flink1.14、1.15 中的发展规划:

img

要说明的是,目前只有 RocksDB 支持增量 Checkpoint。

img

对于 Changelog,在 Apache Kafka 和 Apache Pulsar 中都有这个概念。Changelog 的引入,是 Flink 作为流式计算系统,对传统消息中间件的借鉴。即在数据上传的同时,做一个 proxy,将数据定期写到外部的 log 里,每次做 Checkpoint 时不需要等数据上传,进而使 Checkpoint 的时间更加可控。

Flink 1.13 已经实现了 proxy 代理层,实际的逻辑层还没有实现,在 Flink 1.14 中会做具体实现,包括相关 log 清理逻辑。希望在 Flink 1.14 中对状态和检查点性能有更好的提升,尤其是目前二阶段提交依赖于 Checkpoint commit,Changelog State Backend 的引入,预计在 Flink 1.14 可以尽快解决相关痛点。

活动推荐

阿里云基于 Apache Flink 构建的企业级产品-实时计算Flink版现开启活动:
99元试用实时计算Flink版(包年包月、10CU)即有机会获得 Flink 独家定制T恤;另包3个月及以上还有85折优惠!
了解活动详情:https://www.aliyun.com/product/bigdata/sc

image.png

相关实践学习
基于Hologres+Flink搭建GitHub实时数据大屏
通过使用Flink、Hologres构建实时数仓,并通过Hologres对接BI分析工具(以DataV为例),实现海量数据实时分析.
实时计算 Flink 实战课程
如何使用实时计算 Flink 搞定数据处理难题?实时计算 Flink 极客训练营产品、技术专家齐上阵,从开源 Flink功能介绍到实时计算 Flink 优势详解,现场实操,5天即可上手! 欢迎开通实时计算 Flink 版: https://cn.aliyun.com/product/bigdata/sc Flink Forward Asia 介绍: Flink Forward 是由 Apache 官方授权,Apache Flink Community China 支持的会议,通过参会不仅可以了解到 Flink 社区的最新动态和发展计划,还可以了解到国内外一线大厂围绕 Flink 生态的生产实践经验,是 Flink 开发者和使用者不可错过的盛会。 去年经过品牌升级后的 Flink Forward Asia 吸引了超过2000人线下参与,一举成为国内最大的 Apache 顶级项目会议。结合2020年的特殊情况,Flink Forward Asia 2020 将在12月26日以线上峰会的形式与大家见面。
相关文章
|
8月前
|
存储 监控 数据挖掘
京东物流基于Flink & StarRocks的湖仓建设实践
本文整理自京东物流高级数据开发工程师梁宝彬在Flink Forward Asia 2024的分享,聚焦实时湖仓的探索与建设、应用实践、问题思考及未来展望。内容涵盖京东物流通过Flink和Paimon等技术构建实时湖仓体系的过程,解决复杂业务场景下的数据分析挑战,如多维OLAP分析、大屏监控等。同时,文章详细介绍了基于StarRocks的湖仓一体方案,优化存储成本并提升查询效率,以及存算分离的应用实践。最后,对未来数据服务的发展方向进行了展望,计划推广长周期数据存储服务和原生数据湖建设,进一步提升数据分析能力。
752 1
京东物流基于Flink & StarRocks的湖仓建设实践
|
9月前
|
SQL 算法 调度
Flink批处理自适应执行计划优化
本文整理自阿里集团高级开发工程师孙夏在Flink Forward Asia 2024的分享,聚焦Flink自适应逻辑执行计划与Join算子优化。内容涵盖自适应批处理调度器、动态逻辑执行计划、自适应Broadcast Hash Join及Join倾斜优化等技术细节,并展望未来改进方向,如支持更多场景和智能优化策略。文章还介绍了Flink UI调整及性能优化措施,为批处理任务提供更高效、灵活的解决方案。
358 0
Flink批处理自适应执行计划优化
|
6月前
|
资源调度 Kubernetes 流计算
Flink在B站的大规模云原生实践
本文基于哔哩哔哩资深开发工程师丁国涛在Flink Forward Asia 2024云原生专场的分享,围绕Flink On K8S的实践展开。内容涵盖五个部分:背景介绍、功能及稳定性优化、性能优化、运维优化和未来展望。文章详细分析了从YARN迁移到K8S的优势与挑战,包括资源池统一、环境一致性改进及隔离性提升,并针对镜像优化、Pod异常处理、启动速度优化等问题提出解决方案。此外,还探讨了多机房容灾、负载均衡及潮汐混部等未来发展方向,为Flink云原生化提供了全面的技术参考。
347 9
Flink在B站的大规模云原生实践
|
7月前
|
SQL 存储 NoSQL
Flink x Paimon 在抖音集团生活服务的落地实践
本文整理自抖音集团数据工程师陆魏与流式计算工程冯向宇在Flink Forward Asia 2024的分享,聚焦抖音生活服务业务中的实时数仓技术演变及Paimon湖仓实践。文章分为三部分:背景及现状、Paimon湖仓实践与技术优化。通过引入Paimon,解决了传统实时数仓开发效率低、资源浪费、稳定性差等问题,显著提升了开发运维效率、节省资源并增强了任务稳定性。同时,文中详细探讨了Paimon在维表实践、宽表建设、标签变更检测等场景的应用,并介绍了其核心技术优化与未来规划。
650 10
Flink x Paimon 在抖音集团生活服务的落地实践
|
7月前
|
SQL 关系型数据库 MySQL
Flink CDC 3.4 发布, 优化高频 DDL 处理,支持 Batch 模式,新增 Iceberg 支持
Apache Flink CDC 3.4.0 版本正式发布!经过4个月的开发,此版本强化了对高频表结构变更的支持,新增 batch 执行模式和 Apache Iceberg Sink 连接器,可将数据库数据全增量实时写入 Iceberg 数据湖。51位贡献者完成了259次代码提交,优化了 MySQL、MongoDB 等连接器,并修复多个缺陷。未来 3.5 版本将聚焦脏数据处理、数据限流等能力及 AI 生态对接。欢迎下载体验并提出反馈!
1199 1
Flink CDC 3.4 发布, 优化高频 DDL 处理,支持 Batch 模式,新增 Iceberg 支持
|
7月前
|
资源调度 Kubernetes 调度
网易游戏 Flink 云原生实践
本文分享了网易游戏在Flink实时计算领域的资源管理与架构演进经验,从Yarn到K8s云原生,再到混合云的实践历程。文章详细解析了各阶段的技术挑战与解决方案,包括资源隔离、弹性伸缩、自动扩缩容及服务混部等关键能力的实现。通过混合云架构,网易游戏显著提升了资源利用率,降低了30%机器成本,小作业计算成本下降40%,并为未来性能优化、流批一体及智能运维奠定了基础。
402 9
网易游戏 Flink 云原生实践
|
9月前
|
存储 运维 监控
阿里妈妈基于 Flink+Paimon 的 Lakehouse 应用实践
本文总结了阿里妈妈数据技术专家陈亮在Flink Forward Asia 2024大会上的分享,围绕广告业务背景、架构设计及湖仓方案演进展开。内容涵盖广告生态运作、实时数仓挑战与优化,以及基于Paimon的湖仓方案优势。通过分层设计与技术优化,实现业务交付周期缩短30%以上,资源开销降低40%,并大幅提升系统稳定性和运营效率。文章还介绍了阿里云实时计算Flink版的免费试用活动,助力企业探索实时计算与湖仓一体化解决方案。
979 3
阿里妈妈基于 Flink+Paimon 的 Lakehouse 应用实践
|
9月前
|
存储 SQL Java
Flink CDC + Hologres高性能数据同步优化实践
本文整理自阿里云高级技术专家胡一博老师在Flink Forward Asia 2024数据集成(二)专场的分享,主要内容包括:1. Hologres介绍:实时数据仓库,支持毫秒级写入和高QPS查询;2. 写入优化:通过改进缓冲队列、连接池和COPY模式提高吞吐量和降低延迟;3. 消费优化:优化离线场景和分区表的消费逻辑,提升性能和资源利用率;4. 未来展望:进一步简化用户操作,支持更多DDL操作及全增量消费。Hologres 3.0全新升级为一体化实时湖仓平台,提供多项新功能并降低使用成本。
678 1
Flink CDC + Hologres高性能数据同步优化实践
|
9月前
|
存储 运维 BI
万字长文带你深入广告场景Paimon+Flink全链路探索与实践
本文将结合实时、离线数据研发痛点和当下Paimon的特性,以实例呈现低门槛、低成本、分钟级延迟的流批一体化方案,点击文章阅读详细内容~
|
4月前
|
存储 分布式计算 数据处理
「48小时极速反馈」阿里云实时计算Flink广招天下英雄
阿里云实时计算Flink团队,全球领先的流计算引擎缔造者,支撑双11万亿级数据处理,推动Apache Flink技术发展。现招募Flink执行引擎、存储引擎、数据通道、平台管控及产品经理人才,地点覆盖北京、杭州、上海。技术深度参与开源核心,打造企业级实时计算解决方案,助力全球企业实现毫秒洞察。
482 0
「48小时极速反馈」阿里云实时计算Flink广招天下英雄

相关产品

  • 实时计算 Flink版