摘要:本文整理自阿里巴巴开发工程师 Apache Flink Committer 任庆盛,阿里巴巴技术专家 Apache Flink Contributor 罗根,在 Flink Forward Asia 2022 核心技术专场的分享。本篇内容主要分为三个部分:
- Flink Connector 社区动向
- Source 新功能
- SinkV2 API 与小文件合并
一、Flink Connector 社区动向
首先,介绍下 Flink Connector 在社区的新动向。随着 Flink 日益增长的生态规模,越来越多的开发者正在考虑将 Connector 贡献给 Flink 社区。
这对于旧 Flink 的管理模式,是非常大的挑战,也逐渐暴露出了许多问题。比如 Connector 和 Flink 版本的发布周期之前是相互绑定的,导致 Connector 的迭代速度非常的缓慢。
其次,Flink 的代码库随着 Flink 生态的日益扩大,而过于庞大。这对于 Connector 的维护者,以及 Flink 主仓库的维护者来讲,维护压力也越来越大的。
考虑到上述两点,旧的管理模式不利于生态的发展。近期Flink社区经过一系列讨论,整理并确定了新的 Connector 管理模式,其中包括新的代码管理模式。每个 Connector 将会拥有自己独立的仓库,代码将会和 Flink 仓库分离管理。
其次是新的版本管理模式,尽量保持与 Flink 的兼容性为原则,由 Connector 自行维护版本。
下面介绍一下新的代码管理模式。在新的管理模式之下,每个 Connector 使用自己独立的代码仓库,托管于 Apache Organization 之下,那大大降低了 Connector 之间,以及与仓库间的耦合程度。
在文档方面,每个 Connector 会在自己的仓库中,独立维护文档。Flink 仓库会通过脚本,将所有 Connector 的文档统一链接至Flink的官方文档。
在 Issue 管理方面,与 Flink 和其他 Apache 项目相同,Connector 将会继续使用 JAVA 来管理和追踪Issue。
目前,ElasticSearch Connector 已经从 Flink 的主仓库中移出,在独立的仓库下按照新的模式进行管理。大家也可以以此作为样例对 Connector 进行管理。这里有两点提示需要告知大家。
第一,关于代码归属权问题。虽然 Connector 有了自己的代码仓库,但代码依然属于 Apache 软件基金会。所以大家在贡献之前,需要确认代码使用的开源协议,以及相关的法务问题等。
第二,Connector 代码需要由 Apache Flink Committer 进行评审。无论是新的贡献,还是修改现有代码,都需要 Apache Flink Committer 评审之后合并代码。
基于新的管理流程,如何去贡献一个新的 Connector 呢?
- 第一步,我们要像贡献其他 Flink Feature 一样,先起草一份 FLIP。在 Flink Wiki 页面上,提交新的FLIP。
- 第二步,我们将新的 FLIP 提交到 Flink 的开发者社区,进行相关的讨论。收集贡献者,Committer 的相关意见。
- 第三步,社区投票。在广泛收集了大家的观点之后,我们可以在社区的邮件列表中,发起对这份 FLIP 的投票。在收到三个级以上的有效票数之后,表明这个 Connector 已经被社区接受。
- 第四步,联系一位 Committer,创建 Connector 自己独立的仓库。
- 第五步,在仓库准备好之后,我们就可以在仓库中提交 PR,然后找一位 Committer 为大家 review 代码。
第二个问题是,关于版本管理问题。Connector 不再需要按照Flink版本发布节奏,进行版本管理。
这样的好处是,Connector 可以自行进行更快速的版本迭代,但也容易造成版本混乱。为此,Flink 社区也对外部 Connector 的版本管理,进行了一定程度的规定。目前,Connector 的版本号主要分为两个部分。
在 Connector 的自身版本方面,我们可以看到自身版本,由三部分组成。分别是 Major 大版本、Minor 小版本、以及 Patch。为了给用户提供足够多的信息,了解与 Flink 主版本的兼容性。我们要求在版本号后面,加入所支持的Flink版本后缀。
如上图所示,Connector 3.1.2 版本支持 Flink 1.16。我们需要在版本号的最后,加上这个后缀。
在版本支持方面,社区要求 Connector 至少支持两个 Connector 大版本。相对较新的版本,需要持续获得新功能的支持。
在 Flink 的兼容性方面,我们要求至少要支持两个 Flink 的版本。相关细则,大家可以到社区的版本管理文档上,了解详细的 Connector 版本管理规则以及样例,相关网址如下:
https://cwiki.apache.org/confluence/display/FLINK/Externalized+Connector+development
二、Source 新功能
2.1 水印对齐
介绍完一些流程性的问题后,下面分享一些技术细节。首先,介绍下 Source 在 Flink 1.15 和 Flink 1.16 中的新功能。
在 1.15 版本为 Source 引入的水印对齐机制 Watermark alignment。
水印对齐的背景是,Watermark 作为处理事件时间的核心,很多算子依赖 Watermark 触发计算和状态清理。如幻灯片中的例子所示,假设在作业中存在一个窗口聚合逻辑,聚合算子从上游 Source 的两个并发消费数据。因此,聚合算子 Watermark 会取两个输入的 Watermark 最小值。
如果其中一个 Source 读取速度过快,与之对应的 Watermark 的前进速度也会过快。超前于较慢一侧输入的数据,就会被聚合算子存储在状态之中。由此可见,两个 Source 并发的读取速度差距越大,需要使用的状态也就越大。这对存储空间,以及状态恢复都会产生很大的挑战。
与刚刚的例子类似,不同 Source 之间,如 Kafka 和 FileSystem Source,它们的读取速度有差异,对下游的算子状态也会产生影响。比如 PPT 中所述的窗口 join 逻辑。为了解决这个问题,Flink 在 1.15 版本为 Source 引入了水印对齐机制。
如果某个并发或者某个 Source 的读取速度过快,水印对齐机制就会暂停这个 Source 的数据读取,等待较慢一侧追赶上来之后,再恢复读取。
如上图所示,展示了用户如何使用水印对齐机制。用户需要提供三个参数。
第一,水印对齐组。处于同一个水印对齐组的一个或多个 Source,将按照相同配置,进行相互对齐。
第二,最大允许的水印偏移。当某一并发的 Watermark 超过了最大允许的偏移量,将暂停读取。
第三,水印检查间隔。Source 会按照指定的间隔,定期检查具体情况。
接下来,我们再分析一下水印对齐的实现机制。我们的 Source 由两部分组成,一部分是,运行在 JobManager 上的 Source Coordinator。另一部分是,运行在 TaskManager 上的 Source Operator。
其中,Source Coordinator 属于协调者的角色。Coordinator 会周期性的按照 Operator 汇报的水印进度,检查当前允许的最大水位,并下发到各个 Operator 上。
如果某个 Source Operator 发现自己的水印超过了最大允许量,就会暂停读取,直到一个对齐检查周期。
为了实现不同 Source 间的水印对齐,我们在 JobManager 上引入了 CoordinatorStore,让 Source Coordinator 间互相交换信息。在水印对齐机制下,各个 Source Coordinator 通过 CoordinatorStore 协调最大的允许水位。
这里需要提醒大家的是,Flink 1.15 和 Flink 1.16 版本暂时不支持 Split 级别的水印对齐。目前,该级别已经排入 Flink 1.17 的开发计划之中。
目前,为了取得更好的水印对齐效果,建议大家在多个 Source 之间,或在并发数与 Split 数一致的 Source 上使用。比如 Kafka+FileSystem Source,Kafka Source 并发与 Partition 数一致等等。
2.2 维表与缓存
下面介绍下维表及其缓存机制的改进。在 Flink 1.16 上,我们提出并实现了 FLIP 221 为维表引入了新的接口,并设计了一个通用的维表缓存。
在接口方面,我们重新整理了维表使用接口,使抽象更加清晰。此外,新引入的维表缓存机制,为所有的维表提供了一致的缓存功能。开发者无需重复实现缓存相关的逻辑。
首先,介绍新的维表接口。在 Flink 1.16 版本之前,开发者需要基于 TableFunction,进行维表开发。但这个非常基础的接口,对于开发者的理解成本和实现难度较大。
因此,我们重新定义了 LookupFunction 接口。我们更清晰的定义了,如何对给定数据中的 Key,从外部系统中查询维表数据。与之相对应的,我们引入了 LookupFunctionProvider,在 Table/SQL API 中构建 LookupFunction。
在通用维表缓存机制方面,考虑到不同用户场景和外部系统维表的规模,我们定义了两种不同的缓存模式:
- 第一,部分缓存模式 Partial Caching。部分缓存是指维表会根据主流数据,按需在外部系统中查询数据,并将查询结果存储到缓存中。在这种模式下,维表可以自行管理缓存及其清理策略。部分缓存模式适用于规模较大的维表,无法一次性拉取并存储在内存当中。
- 第二,为了简化开发者的开发工作,我们提供了一种默认缓存实现 DefaultLookupCache。默认缓存以 Guava Cache 作为底层支撑,为开发者和用户提供了,基于缓存大小、写入时间、读取时间的清理机制。
当开发者想要使用部分缓存模式时,可使用预先定义好的 PartialCachingLookupProvider,提供定义外部系统查找方法的 LookupFunction 和缓存的实现,即可快速的构建起一个基于部分缓存模式的维表。
第二种缓存模式为,全部缓存模式 Full Caching。在全部缓存模式下,框架会从外部系统中,拉取全部数据,并使用户指定的触发器,进行全量数据的重载。从逻辑上看,外部系统中拉取全部数据的过程,实际上是对外部系统的一种扫描。
因此,我们也为开发者提供了 Scan 的复用能力。可以复用 Source 的实现,对维表进行全量数据的拉取。全部缓存模式适用于规模相对较小,可以将全部数据存储至内存当中的维表。
为了降低开发难度,我们也提供了两种常用的重载触发器。一个是,按照指定时间,周期性重载的触发器。另一个是,根据每天中的指定时间,进行重载的触发器。
我们预先定义好 FullCachingLookupProvider,开发者只需要提供需要复用的 Scan 能力、ScanRuntimeProvider、以及用户指定的全量数据重载触发器,即可快速使用全部缓存模式。
除了上述两个新功能之外,在 Flink 1.17 中,我们有哪些新功能值得期待呢?
- 第一,FLIP-217。我们在水印对齐部分中提到的 Split 级别的水印对齐。
- 第二,FLIP-208。我们可以根据数据内容,确定对有界流的结束位置。类似于旧 Kafka 里支持的,根据Kafka消息内容,将整个流进入停止状态。
- 第三,FLIP-238。它是基于新的 FLIP 27 Source API,所实现的数据生成器,以及在通用的 Source API 上实现的限流功能。
三、SinkV2 API 与小文件合并
3.1 SinkV2 介绍
接下来,介绍 SinkV2 API 及其相关的新动向。首先,我们来回顾一下 SinkV1。SinkV1 自 Flink1.12 版本引入,并在 Flink 1.14 版本进行扩展。SinkV1 在设计之初就是为了提供一套统一的,对 Sink 开发者友好的接口。
无论用户用的是 SQL,还是 DataStream API,无论作业是流作业还是批作业,基于 SinkV1 的 Sink 都可以通用。这些差异对 Sink 开发者也是透明的,使 Sink 开发者可以更专注于写出数据的逻辑。
与此同时,SinkV1 基于两阶段提交协议,实现了 Exactly Once 一致性语义。一个简单的 Sink,只需要 Sink Writer 就可以写出数据。对于两阶段提交的 Sink,Sink Writer 在写出数据的同时,会生成相应的 Commitable,即两阶段提交协议中需要提交的信息,Sink 下游会有 Committer 节点接收这些信息。
以流作业为例,Committer 在收到这些 Commitable 之后,完成当前 Checkpoint,所有数据确认持久化,再对这些 Commitable 进行提交。只有 Committer 提交之后,这些数据才对下游可见。而 Committer 的提交操作是幂等的,保证了这些持久化的 Commitable,在作业 failover 之后可以被重复提交,而不会出现重复的数据。
除此以外,SinkV1 还有 Global Committer,可以支持部分场景下的全局提交需求。
虽然 SinkV1 简化了 Sink 的开发,降低了 Sink 支持 Exactly Once 的开发复杂度。但随着 SinkV1 的各类开发,我们逐渐发现了一些问题。比如 SinkV1 无法支持 FileSink 中需要的小文件合并需求。
对于 Sink 来说,每一个 Commitable 对应着文件系统上的一个文件。随着 Sink 的并发增加,输出文件数量也会随之增加。出于效率和正确性的考虑,我们不能让多个 Commitable,映射到同一个文件的多个分片上。这可能导致大规模的作业,会产出大量的小文件。
现在的数据库系统,通常使用列存格式。如果文件切的过细,会导致读取性能下降。因此,需要将这些小文件,进行跨并发、跨 Checkpoint 周期的合并。但 SinkV1 是固定的拓扑结构,无法满足这样的需求。所以反映出 SinkV1 灵活度不足的弊端。
为了解决这个问题,也为了灵活的支持其他场景,社区提出了 FLIP191,扩展 Sink 接口,以支持小文件合并计划,并讨论了三种方案。经过长时间的讨论和多方面的考虑,社区认为基于 SinkV1 的小幅度改动,无法满足当前的需求。
最终,社区决定设计一套能够支持 SinkV1 全部功能迁移,成本很低,但又能灵活支持拓扑扩展的新 Sink API 接口,即 SinkV2。
SinkV2 根据 Sink 的需求,切分出了三个基本的接口。
首先,如果 Sink Writer 不需要保存状态,就可以使用最简单的 Sink 接口。Sink 接口只需要提供一个 Sink Writer 就可以。
如果 Sink Writer 需要保存状态,就需要使用 StatefulSink 接口。与 Sink 的区别在于,StatefulSink 使用的 StatefulSinkWriter 需要保存状态。所以它需要实现一个 snapshotState 方法。同时,StatefulSink 需要实现一个从状态重建 Writer 的方法。单独使用 Sink 或者 StatefulSink 接口时,Sink 都只支持 At Least Once 语义。
如果 Sink 需要支持 Exactly Once 语义,就需要追加实现 Two Phase Committing Sink 接口。Two Phase Committing Sink 与此前的 SinkV1 基本一致,由 Writer 和 Committer 配合使用,两阶段提交协议保证Exactly Once语义。
在此基础上,为了满足多样化的 Sink 需求。社区引入了三个新的拓扑扩展接口,能够分别支持 Sink 在三个不同阶段,对拓扑进行自定义扩展。
除此之外,社区还提供了 AsyncSink。开发者可以基于它,更简单的开发一个异步 Sink。但是 AsyncSink 只支持 At Least Once 语义,无法和 Two Phase Committing Sink 一起使用。
接下来,我们看一下运行时 SinkV2 的拓扑结构。如果只用 Sink,StatefulSink 或者 AsyncSink,运行时将只有一个 Sink Writer 节点。使用 Two Phase Committing Sink,作业拓扑将和 SinkV1 基本一致,也有 Writer 和 Committer,但没有 Global Committer。
SinkV2 新增的拓扑扩展接口,将在这个基础上对拓扑进行扩展。WithPreWriteTopology 可以在 Sink Writer 之前,进行拓扑扩展。比如进行一些数据的预处理。
WithPreCommitTopology 可以在 Sink Writer 和 Committer 之间,进行同步扩展。比如 FileSink 的小文件合并功能就是在这个阶段实现的。
WithPostCommitTopology 则是在 Committer 之后进行扩展。最典型的应用之一就是,SinkV1 提供的 Global Committer。这个阶段的拓扑通常需要和 Committer 一样,保证自己的幂等性。
在这几个接口中,WithPreWriteTopology 只要有 Sink Writer 就可以使用。因此,它可以在任何 Sink 上使用。另外两个接口,只能配合 TwoPhaseCommittingSink 一起使用。
实现了这几个扩展接口的 Sink,在 Flink 编译对应的作业时,会为没有设置并发度的算子,设置与上游相同的并发度。并且为所有算子设置一个,和这个 Sink 的 UID 相关的一个新的拼接的 UID。避免当一个作业使用多个 Sink 时,扩展的算子的 UID 发生冲突。
由于扩展的拓扑需要对 Committable 进行处理,SinkV1 中直接传递 Committable 的方式不能满足需求,SinkV2 对 Committable 进行了消息封装。
首先,在 Sink Writer 准备好要输出的数据,统计要发送的 Committable,并生成一个 CommittableSummary。其中,包含了接下来要发送的这批数据数量,所属的 Checkpoint ID、Writer ID 以及总的 Writer 数量信息。
然后,再将这些 Committable 封装成 CommittableWithLineage 发送下去。
Committable 会接收 CommittableSummary,并根据其中的信息,收集所有的 CommittableWithLineage 后,一起提交。
请注意,为了保证 Committer 的行为正确,PreCommitTopology 处理数据后,输出 Committable 时,需要和这个规则保持一致。
接下来,我们讲一讲异步 AsyncSink。异步 AsyncSink 的执行过程通常存在共性,而 Sink 开发者更需要关注的是,生成请求和实际提交请求部分。
为了简化 Sink 的开发和维护,尽可能让所有的异步 Sink 都能共享到执行层面的优化。社区在 Flink-171中,提出了 AsyncSink,并在 1.15 版本中发布。
AsyncSink 的基类是 AsyncSinkBase,开发者需要在构造函数中,传入一个 ElementConverter,用来将输入的数据转化为写请求。由于输入的数据类型通常和 Sink 的实现无关,ElementConverter 通常由用户提供。
除此之外,在构造方法中,还需要传入一些异步执行需要的异步参数。AsyncSink 使用的 Writer 是AsyncSinkWriter,实现了数据处理和触发异步提交的功能。
开发者需要实现其中的两个方法,一个是 submitRequestEntries,这个方法接收 ElementConverter 输出的写请求,并异步执行。这个方法的实现,必须是一个非阻塞的异步实现。
另一个需要实现的方法是 getSizelnBytes,这个方法用来评估写请求的大小,用来触发基于写请求大小来触发的 flush。
这里再强调一下,AsyncSink 只能支持 At Least Once,所以不能和 TwoPhaseCommittingSink 一起使用。
接下来,讲一讲关于 SinkV2 的开发和使用。SinkV2 继承了 SinkV1 的大部分设计和思想。因此,开发基于 SinkV2 的 Sink,也和基于 SinkV1 一样简单。新开发一个 Sink,只需要按照需求,选择适当的接口,并组合需要的拓扑扩展接口即可。
现有的基于 SinkV1 的 Sink,要迁移到 SinkV2 也非常简单。和 Sink 开发类似,迁移时需要选择适当的接口。因为接口中所设计的方法名基本和 SinkV1 是一致的,这里只需要简单修改即可。
需要注意的是,SinkV2 中并没有 GlobalCommitter,这类 Sink 在迁移的时候需要使用 WithPostCommitTopology。这里可以参考 SinkV1 Adapter。
在使用上,由于 SinkV2 的调用方法和 SinkV1 是一样。它们都是在 DataStream上调用 SinkTo 方法。因此 Sink 迁移之后,如果 Sink 本身的方法没有修改,用户就不需要修改代码,只需要重新编译即可。
3.2 FileSink 解决小文件问题
接下来,介绍下 FileSink 是如何基于 SinkV2 中的 PreCommittingTopology,来实现小文件合并的。
FileSink 在 PreCommittingTopology 中新增了两个节点,CompactCoordinator
和 Compactor。
Sink Writer 产出的 Committable,会输入给 CompactCoordinator。CompactCoordinator 为单点执行,会将 Committable 不区分来自于哪一个 Writer 的并发,不区分属于哪一个 Checkpoint,统一收集起来。
然后,根据用户配置的核定策略,比如达到一定的文件大小,或者达到一定时间之后,将涉及到的 Committable,组合成一个 CompactRequest 合并请求。将合并请求发送给下游的 Compactor。
Compactor 在收到请求之后,它的行为和 Committer 类似。先将 CompactRequest 保存起来,等当前 Checkpoint 完成后,再将保存的请求中对应的文件,进行合并。最终合成一个文件,并输出一个 Committable。
在输出时,Compactor 的行为则和 Sink Writer 相似,都是在 Checkpoint 前,进行输出。首先输出 CommittableSummary,然后输出 Committable 的封装消息。
经过这部分处理,FileSink 就从 Writer 和 Committer 的两阶段提交,变成了 Writer 和 Compactor,以及 Compactor 和 Committer 的两次两阶段提交。保证 Exactly Once 语义的前提下,实现跨并发、跨 Checkpoint 的文件合并,避免小文件碎片的问题。
这里再简单介绍一下,如何使用 FileSink 的小文件合并功能。启用小文件合并很简单,只要在构建 FileSink 时,调用 enableCompact 即可。
在调用时,需要提供两个东西。一个是 File Compact strategy,也就是如何将多个 Commit 组合成一个CompactRequest。这里通常会基于文件的大小,或需要合并的 Checkpoint 周期,进行合并。
另一个需要提供的是 File Compactor,就是如何执行这些文件的合并。目前可以直接使用的 File Compactor 有两类。
一个是 ConcatFileCompactor,可以直接拼接文件,适合明文的文件格式。另一个是 RecordWiseFileCompactor,将文件中的数据读取出来,再统一写入一个新的文件中,适合非明文无法直接处理的文件格式。在创建 RecordWiseFileCompactor 时,需要提供对应的读取数据的 reader。写出时,会使自动使用 Writer 的格式。
特别提醒一点,如果作业启用过小文件合并,后续在关闭时,不可以直接删除 enableCompact,而是需要显示调用 disable Compact。因为 Compact 中保存了 CompactRequest,即需要合并的文件的 Committable。这部分的状态管理比较复杂,如果直接把这些节点去掉,这些 Committable 就没有人接管,导致文件丢失,请务必注意!
3.3 后续规划
SinkV2 已经在 Flink 1.15 版本发布,目前位于 Flink-core 模块中的 Sink2 包中。随着 SinkV2 的发布,SinkV1 目前已被标为 deprecated。
在目前的规划中,SinkV1 在 Flink 2.0 之前,并不会移除。但未来 Sink 相关的新功能都会基于 SinkV2 开发。因此,我们建议新的Sink都基于 SinkV2 API 进行开发。与此同时,基于 SinkV1 的 Sink,建议尽快迁移到 SinkV2 上,SinkV2 也将继续完善。
无论是 SinkV1,还是 SinkV2,都不支持 Writer 和 Committer 的并发进行配置。SinkV2 引入了扩展拓扑的接口,使并发度设置的需求,变得更加迫切。因此,SinkV2 将首先进行扩展支持对并发度进行配置。
目前,SinkV2 的两阶段提交消息机制比较复杂,需要特别注意输出规范,使用起来会比较麻烦。因此,SinkV2 也会继续优化两阶段提交消息机制,为其中的算子提供工具类,基于更简单的输出、更规范的 Committable 消息。此外,我们计划基于 SinkV2 开发,较为通用的 Committable 合并机制。
活动推荐
时间:7 月 29 日下午 13:00-18:30
地点:北京朝阳区望京凯悦酒店
线下报名:http://hdxu.cn/DO7OG
扫下方图片直达线上直播间:
更多内容
活动推荐
阿里云基于 Apache Flink 构建的企业级产品-实时计算 Flink 版现开启活动:
0 元试用 实时计算 Flink 版(5000CU*小时,3 个月内)
了解活动详情:https://free.aliyun.com/?pipCode=sc