字节跳动 Flink 状态查询实践与优化

简介: 字节跳动基础架构工程师马越在 FFA 2021 的演讲。

摘要:本文整理自字节跳动基础架构工程师,Apache Flink Contributor 马越在 Flink Forward Asia 2021 平台建设专场的演讲。主要内容包括:

  1. 背景
  2. State Processor API 介绍
  3. StateMeta Snapshot 机制
  4. State as Database
  5. 使用 Flink Batch SQL 查询任务状态
  6. 未来展望

点击查看直播回放 & 演讲PDF

一、背景

众所周知,Flink 中的 State 保存了算子计算过程的中间结果。当任务出现异常时,可以通过查询任务快照中的 State 获取有效线索。

但目前对于 Flink SQL 任务来说,当我们想要查询作业 State 时,通常会因为无法获知 State 的定义方式和具体类型等信息,而导致查询 State 的成本过高。

为了解决这个问题,字节跳动流式计算团队在内部提出了 State Query on Flink SQL 的解决方案——用户通过写 SQL 的方式就可以简单地查询 State。本文将主要介绍字节跳动在 Flink 状态查询这方面所进行的相关工作。

二、State Processor API 介绍

1

提到状态查询,我们自然会联想到 Flink 在 1.9 版本提出的特性 -- State Processor API。使用 State Processor API,我们可以将作业产生的 Savepoint 转换成 DataSet,然后使用 DataSet API 完成对 State 的查询、修改和初始化等操作。

2

下面简单介绍一下如何使用 State Processor API 来完成 State 的查询

  • 首先创建 ExistingSavepoint 用来表示一个 Savepoint。初始化 ExistingSavepoint 时需要提供 Savepoint 路径和 StateBackend 等信息;
  • 然后实现 ReaderFunction 用于重新注册所需要查询的 State 以及定义处理 State 的方式。查询状态的过程中会遍历所有的 Key 并按照我们定义的方式去操作 State;
  • 最后,调用 Savepoint.readKeyedState 并传入算子的 uid 和 ReaderFunction,就可以完成 State的查询。

3

接下来为大家简述一下 State 查询背后的原理

在 Savepoint 目录中包含两种文件,一种是状态数据文件,比如上图中的 opA-1-state ,这个文件里面保存着算子 A 在第一个 SubTask 状态的明细数据;还有一种元数据文件,对应上图中的 _metadata,元数据文件中保存了每个算子和状态文件的映射关系。

当我们在进行状态查询的时候。首先在 Client 端会根据 Savepoint 路径去解析 metadata 文件。通过算子 ID,可以获取需要查询的状态所对应的文件的句柄。当状态查询真正执行时,负责读取状态的 Task 会创建一个新的 StateBackend,然后将状态文件中的数据恢复到 Statebackend 中。等到状态恢复完成之后就会遍历全部的 Key 并把对应的状态交给 ReaderFunction 处理。

4

有些同学可能会问,既然社区已经提供了查询 State 的功能,我们为什么还要去做同样的工作呢?主要是因为我们在使用 State Processor API 的过程中发现一些问题

  1. 每次查询 State 我们都需要独立开发一个 Flink Batch 任务,对用户来说具有一定的开发成本;
  2. 实现 ReaderFunction 的时候需要比较清晰地了解任务状态的定义方式,包括 State 的名称、类型以及 State Descriptor 等信息,对用户来说使用门槛高较高;
  3. 使用 State Processor API 时,只能查询单个算子状态,无法同时查询多个算子的状态;
  4. 无法直接查询任务状态的元信息,比如查询任务使用了哪些状态,或者查询某个状态的类型。

5

总体来说,我们的目标有两个,一是降低用户的使用成本;二是增强状态查询的功能。我们希望用户在查询 State 时能用最简单的方式;同时也不需要知道任何信息。

此外,我们还希望用户能同时查询多个算子的 State ,也可以直接查询作业使用了哪些 State,每个 State 的类型是什么。

因此,我们提出了 State Query on Flink SQL 的解决方案。简单来说是把 State 当成数据库一样,让用户通过写 SQL 的方式就可以很简单地查询 State。

6

在这个方案中,我们需要解决两个问题

  • 如何对用户屏蔽 State 的信息:参考 State Processor API 我们可以知道,查询 State 需要提供非常多的信息,比如 Savepoint 路径、 StateBacked 类型、算子 id 、State Descriptor 等等。通过 SQL 语句显然难以完整地表述这些复杂的信息,那么查询状态到底需要哪些内容,我们又如何对用户屏蔽 State 里复杂的细节呢?这是我们面对的第一个难点。
  • 如何用 SQL 表达 State:State 在 Flink 中的存储方式并不像 Database 一样,我们如何去用 SQL 来表达状态的查询过程呢?这是我们要解决的另一个难点。

三、StateMeta Snapshot 机制

7

首先我们来回答第一个问题,查询一个 State 需要哪些信息呢

可以参考上文中 State Processor API 的示例,当我们创建 ExistingSavepoint 和 ReaderFunction 的时候,我们需要提供的信息有 Savepoint 路径、Backend 类型、OperatorID、算子 key 的类型、State 名称以及 Serializer 等等,我们可以将这些统一称为状态的元信息。

对于 Flink SQL 任务来说,要清楚地了解这些信息,对用户来说门槛是非常高的。我们的想法是让用户只需要提供最简单的信息,即 Savepoint ID ,然后由 Flink 框架把其他的元信息都存在 Savepoint 中,这样就可以对用户屏蔽 State 那些复杂的细节,完成状态的查询。因此,我们引入了 StateMeta Snapshot 机制。

8

StateMeta Snapshot 简单来说就是把状态的元信息添加到 Savepoint Metadata 的过程,具体步骤如下:

  1. 首先在 State 注册的时候,Task 会把 operatorName\ID\KeySerializer\StateDescriptors 等元信息都保存在 Task 的内存中;
  1. 触发 Savepoint 时,Task 会在制作快照的同时,对状态的元信息也同样进行快照。快照完成之后将状态的元信息 (StateMeta) 和状态文件的句柄 (StateHandle) 一起上报给 JobManager;
  1. JobManager 在收到所有 Task 上报的 StateMeta 信息之后 ,将这些状态元信息进行合并,最后会把合并之后的状态元信息保存到 Savepoint 目录里名为 stateInfo 的文件中。

之后在状态查询时就只需解析 Savepoint 中的 stateInfo 文件,而不再需要用户通过代码去输入这些 State 的元信息。通过这样的方式可以很大程度地降低用户查询状态的成本。

四、State as Database

接下来我们来回答第二个问题,我们如何用 SQL 来表达 State。其实社区在设计 State Processor API 的时候就提出了一些解决思路,也就是 State As Database。

9

在传统的数据库中,通常用 Catalog、Database、Table 这个三个元素来表示一个 Table,其实我们也可以将用样的逻辑到映射到 Flink State 上。我们可以把 Flink 的 State 当作一种特殊的数据源,作业每次产生的 Savepoint 都当作一个独立 DB 。在这个 DB 中,我们将 State 元信息、State 的明细数据,都抽象成不同的 Table 暴露给用户,用户直接查询这些 Table 就可以获取任务的状态信息。

10

首先我们来看如何把 State 表示为 Table。我们都知道在 Flink 中,常用的 State 有两种类型,分别是 KeyedState 和 OperatorState

  • 对于 OperatorState 来说,它只有 Value 这一个属性,用来表示这个 State 具体的值。因此我们可以把 OperatorState 表示为只包含一个 Value 字段的表结构。
  • 对于 KeyedState 来说,每个 State 在不同的 Key 和 Namespace 下的值可能都不一样, 因此我们可以将 KeyedState 表示为一个包含 Key、Namespace、Value 这三个字段的表结构。

11

当我们抽象出了单个 State 之后,想要表示多个 State 就比较容易了。可以看到在上图的例子中,这个算子包含 3 个 State,分别是两个 KeyedState 和一个 OperatorState,我们只需要将这些 Table 简单的 union 起来,再通过 state_name 字段去区分不同的 State,就可以表示这个算子中所有的 State。

12

最后还有一个问题,我们如何知道一个任务到底用了哪些 State 或者这些 State 的具体类型呢

为了解决这个问题,我们定义了一种特殊表 -- StateMeta ,用来表示一个 Flink 任务中所有 State 的元信息。StateMeta 中包含一个任务中每个 State 的名称、State 所在的算子 ID 、算子名称 、Key 的类型和 Value 的类型等等,这样用户直接查询 StateMeta 这个表就能获取任务中所有状态的元信息。

五、使用 Flink Batch SQL 查询任务状态

13

以上就是状态查询方案的整体介绍。那我们到底如何去查询一个 State 呢,我们以一个 Word Count 任务为例来说明

首先,我们需要创建一个 Flink SQL 任务并启动。通过 web-ui 可以看到这个任务中包含三个算子,分别是 Source,Aggregate 还有 Sink。然后,我们可以触发 Savepoint,当 Savepoint 制作成功之后获取对应的 SavepointID。我们可以通过 SavepointID 去完成作业状态的查询。

14

假如我们现在对 Flink SQL 任务中状态的使用一无所知,那么首先我们需要查询的就是这个 Flink 任务中包含哪些 State 以及这些 State 的类型。我们可以从 StateMeta 表获取这些信息。如上图中场景一所示,通过查询 StateMeta 表,可以看到这个任务包含一个 ListState 和一个 ValueState,分别存在于 Source 算子和 Aggregate 算子中。

此外,有些对 Flink 比较了解的同学知道,KafkaSource 中的 State 是用于记录当前消费的 Offset 信息。如场景二所示,我们可以通过查询 Source 算子的状态,获取到任务中消费 Kafka Topic 的 Partition 和 Offset 信息。

还有一种比较常见的场景,比如下游的业务同学发现某个 key(比如 key_662)的结果异常。我们在定位问题的时候可以直接去查询作业中 aggregate 算子中的状态,同时去指定 key 等于 key_662 作为查询条件。如上图场景三所示,通过查询的结果可以看到,当 key 为 662 时对应的聚合结果是 11290。用户使用这样的方式就可以比较方便地验证状态是否正确。

六、未来展望

15

未来,我们计划进一步丰富 State 的功能,目前我们支持了使用 SQL 查询 State 的功能 ,其实社区还提供了 State 修改和初始化的能力。在一些场景下,这些能力也比较重要。比如,我们已知状态中的部分 key 计算错误,希望将状态中这部分的数据进行修正;或者任务逻辑发生变更以后和之前的状态不能完全兼容, 这个时候我们希望可以通过状态修改和初始化的能力去生成一个新的 Savepoint。同样,在使用方式上我们也希望用户能直接使用 SQL 中 insert 和 update 语法来完成状态的修改和初始化操作。

其次,我们会进一步加强 State 的可用性。我们使用 DAG 编辑的方案解决了作业拓扑发生变化时产生的状态不兼容问题,但是当 Flink SQL 任务修改字段时 State Serializer 可能会变化,同样导致状态无法兼容。针对这种情况我们设计了完整的 Flink SQL State Schema Evolution 方案,可以极大的增强 Flink SQL 任务发生变化之后状态的恢复能力,目前方案正在落地中。此外,我们还提供了完善的状态恢复事前检查能力,能够做到在任务上线之前就检查出状态是否兼容并告知用户,避免状态不兼容引起的作业启动失败对线上造成影响。

点击查看直播回放 & 演讲PDF


img

2022第四届 实时计算FLINK挑战赛

49万奖金等你来拿!

延续 “鼓励师计划”,赢取丰厚礼品!

点击进入赛事官网了解报名参赛

更多 Flink 相关技术问题,可扫码加入社区钉钉交流群
第一时间获取最新技术文章和社区动态,请关注公众号~

O1CN01tmtpiy1iazJYZdixL_!!6000000004430-2-tps-899-548.png"

活动推荐

阿里云基于 Apache Flink 构建的企业级产品-实时计算Flink版现开启活动:
99 元试用 实时计算Flink版(包年包月、10CU)即有机会获得 Flink 独家定制卫衣;另包 3 个月及以上还有 85 折优惠!
了解活动详情:https://www.aliyun.com/product/bigdata/sc

image.png

相关实践学习
基于Hologres+Flink搭建GitHub实时数据大屏
通过使用Flink、Hologres构建实时数仓,并通过Hologres对接BI分析工具(以DataV为例),实现海量数据实时分析.
实时计算 Flink 实战课程
如何使用实时计算 Flink 搞定数据处理难题?实时计算 Flink 极客训练营产品、技术专家齐上阵,从开源 Flink功能介绍到实时计算 Flink 优势详解,现场实操,5天即可上手! 欢迎开通实时计算 Flink 版: https://cn.aliyun.com/product/bigdata/sc Flink Forward Asia 介绍: Flink Forward 是由 Apache 官方授权,Apache Flink Community China 支持的会议,通过参会不仅可以了解到 Flink 社区的最新动态和发展计划,还可以了解到国内外一线大厂围绕 Flink 生态的生产实践经验,是 Flink 开发者和使用者不可错过的盛会。 去年经过品牌升级后的 Flink Forward Asia 吸引了超过2000人线下参与,一举成为国内最大的 Apache 顶级项目会议。结合2020年的特殊情况,Flink Forward Asia 2020 将在12月26日以线上峰会的形式与大家见面。
相关文章
|
存储 监控 数据挖掘
京东物流基于Flink & StarRocks的湖仓建设实践
本文整理自京东物流高级数据开发工程师梁宝彬在Flink Forward Asia 2024的分享,聚焦实时湖仓的探索与建设、应用实践、问题思考及未来展望。内容涵盖京东物流通过Flink和Paimon等技术构建实时湖仓体系的过程,解决复杂业务场景下的数据分析挑战,如多维OLAP分析、大屏监控等。同时,文章详细介绍了基于StarRocks的湖仓一体方案,优化存储成本并提升查询效率,以及存算分离的应用实践。最后,对未来数据服务的发展方向进行了展望,计划推广长周期数据存储服务和原生数据湖建设,进一步提升数据分析能力。
1147 1
京东物流基于Flink & StarRocks的湖仓建设实践
|
SQL 算法 调度
Flink批处理自适应执行计划优化
本文整理自阿里集团高级开发工程师孙夏在Flink Forward Asia 2024的分享,聚焦Flink自适应逻辑执行计划与Join算子优化。内容涵盖自适应批处理调度器、动态逻辑执行计划、自适应Broadcast Hash Join及Join倾斜优化等技术细节,并展望未来改进方向,如支持更多场景和智能优化策略。文章还介绍了Flink UI调整及性能优化措施,为批处理任务提供更高效、灵活的解决方案。
602 0
Flink批处理自适应执行计划优化
|
10月前
|
资源调度 Kubernetes 流计算
Flink在B站的大规模云原生实践
本文基于哔哩哔哩资深开发工程师丁国涛在Flink Forward Asia 2024云原生专场的分享,围绕Flink On K8S的实践展开。内容涵盖五个部分:背景介绍、功能及稳定性优化、性能优化、运维优化和未来展望。文章详细分析了从YARN迁移到K8S的优势与挑战,包括资源池统一、环境一致性改进及隔离性提升,并针对镜像优化、Pod异常处理、启动速度优化等问题提出解决方案。此外,还探讨了多机房容灾、负载均衡及潮汐混部等未来发展方向,为Flink云原生化提供了全面的技术参考。
580 9
Flink在B站的大规模云原生实践
|
11月前
|
SQL 存储 NoSQL
Flink x Paimon 在抖音集团生活服务的落地实践
本文整理自抖音集团数据工程师陆魏与流式计算工程冯向宇在Flink Forward Asia 2024的分享,聚焦抖音生活服务业务中的实时数仓技术演变及Paimon湖仓实践。文章分为三部分:背景及现状、Paimon湖仓实践与技术优化。通过引入Paimon,解决了传统实时数仓开发效率低、资源浪费、稳定性差等问题,显著提升了开发运维效率、节省资源并增强了任务稳定性。同时,文中详细探讨了Paimon在维表实践、宽表建设、标签变更检测等场景的应用,并介绍了其核心技术优化与未来规划。
1118 10
Flink x Paimon 在抖音集团生活服务的落地实践
|
11月前
|
SQL 关系型数据库 MySQL
Flink CDC 3.4 发布, 优化高频 DDL 处理,支持 Batch 模式,新增 Iceberg 支持
Apache Flink CDC 3.4.0 版本正式发布!经过4个月的开发,此版本强化了对高频表结构变更的支持,新增 batch 执行模式和 Apache Iceberg Sink 连接器,可将数据库数据全增量实时写入 Iceberg 数据湖。51位贡献者完成了259次代码提交,优化了 MySQL、MongoDB 等连接器,并修复多个缺陷。未来 3.5 版本将聚焦脏数据处理、数据限流等能力及 AI 生态对接。欢迎下载体验并提出反馈!
1766 1
Flink CDC 3.4 发布, 优化高频 DDL 处理,支持 Batch 模式,新增 Iceberg 支持
|
11月前
|
资源调度 Kubernetes 调度
网易游戏 Flink 云原生实践
本文分享了网易游戏在Flink实时计算领域的资源管理与架构演进经验,从Yarn到K8s云原生,再到混合云的实践历程。文章详细解析了各阶段的技术挑战与解决方案,包括资源隔离、弹性伸缩、自动扩缩容及服务混部等关键能力的实现。通过混合云架构,网易游戏显著提升了资源利用率,降低了30%机器成本,小作业计算成本下降40%,并为未来性能优化、流批一体及智能运维奠定了基础。
656 9
网易游戏 Flink 云原生实践
|
存储 运维 监控
阿里妈妈基于 Flink+Paimon 的 Lakehouse 应用实践
本文总结了阿里妈妈数据技术专家陈亮在Flink Forward Asia 2024大会上的分享,围绕广告业务背景、架构设计及湖仓方案演进展开。内容涵盖广告生态运作、实时数仓挑战与优化,以及基于Paimon的湖仓方案优势。通过分层设计与技术优化,实现业务交付周期缩短30%以上,资源开销降低40%,并大幅提升系统稳定性和运营效率。文章还介绍了阿里云实时计算Flink版的免费试用活动,助力企业探索实时计算与湖仓一体化解决方案。
1329 3
阿里妈妈基于 Flink+Paimon 的 Lakehouse 应用实践
|
存储 SQL Java
Flink CDC + Hologres高性能数据同步优化实践
本文整理自阿里云高级技术专家胡一博老师在Flink Forward Asia 2024数据集成(二)专场的分享,主要内容包括:1. Hologres介绍:实时数据仓库,支持毫秒级写入和高QPS查询;2. 写入优化:通过改进缓冲队列、连接池和COPY模式提高吞吐量和降低延迟;3. 消费优化:优化离线场景和分区表的消费逻辑,提升性能和资源利用率;4. 未来展望:进一步简化用户操作,支持更多DDL操作及全增量消费。Hologres 3.0全新升级为一体化实时湖仓平台,提供多项新功能并降低使用成本。
895 1
Flink CDC + Hologres高性能数据同步优化实践
|
SQL 存储 调度
基于 Flink 进行增量批计算的探索与实践
基于 Flink 进行增量批计算的探索与实践
351 1
基于 Flink 进行增量批计算的探索与实践
|
SQL 存储 HIVE
鹰角基于 Flink + Paimon + Trino 构建湖仓一体化平台实践项目
本文整理自鹰角网络大数据开发工程师朱正军在Flink Forward Asia 2024上的分享,主要涵盖四个方面:鹰角数据平台架构、数据湖选型、湖仓一体建设及未来展望。文章详细介绍了鹰角如何构建基于Paimon的数据湖,解决了Hudi入湖的痛点,并通过Trino引擎和Ranger权限管理实现高效的数据查询与管控。此外,还探讨了湖仓一体平台的落地效果及未来技术发展方向,包括Trino与Paimon的集成增强、StarRocks的应用以及Paimon全面替换Hive的计划。
1748 1
鹰角基于 Flink + Paimon + Trino 构建湖仓一体化平台实践项目

相关产品

  • 实时计算 Flink版
  • 下一篇
    开通oss服务