Flink DataStream API 批处理能力演进之路

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 本文由阿里云 Flink 团队郭伟杰老师撰写,旨在向 Flink Batch 社区用户介绍 Flink DataStream API 批处理能力的演进之路。

摘要:本文由阿里云 Flink 团队郭伟杰老师撰写,旨在向 Flink Batch 社区用户介绍 Flink DataStream API 批处理能力的演进之路。内容主要分为以下三个部分:

  1. 批处理语义和性能优化
  2. Batch API 功能增强
  3. 总结

最近在和一个朋友闲聊时,他问了一个很有意思的问题:Flink 是如何在流处理引擎上支持批处理能力的?

鉴于 Flink 已经成为了流处理领域的事实标准,可能很多人都不知道,Flink 在诞生的第一天起就是支持批处理的。DataSet API 也是从那时起就被引入的,它被用来支持对有界数据的批处理操作。随着 Flink 社区逐步意识到基于 Pipeline 的架构非常适合流处理,因此发展出了 DataStream API[1],它是为无界的流式应用开发的,引入了状态,事件时间和窗口等特殊概念。

但随着对两套 API 本质的深入思考,Flink 社区逐渐发现:DataStream API 其实完全可以成为 DataSet API 的超集。

  • 概念上:有界数据集只是无限数据流的一种特例。
  • 语义上:DataStream API 覆盖了 DataSet API 的大部分,同时还有针对实时流计算的扩展,只有少数和分区有关的语义暂时没有支持。具体的差异见下图:

同时维护两套 API 也对社区造成了很大的困扰,并且用户开发作业前必须提前在两种 API 中作出选择。对于用户来说,离线和实时作业具有相同的处理逻辑是很常见的。如果只编写一次代码,就能达到分别开发流和批两个作业的相同效果,将会带来极大的便利。鉴于以上诸多原因,Flink 最终走上了基于 DataStream 的流批一体的发展道路。也正是如此,Flink 社区早在 1.12 版本就开始逐步弃用 DataSet API,将会在 Flink 2.0 中完全移除 DataSet 相关代码。同时,不断提升 DataStream 上的批处理能力,以 DataStream 为核心打造流批一体的 API。

流批一体是一个相对宽泛的概念,它包含 API,调度,Shuffle,容错等多个维度,本文主要关注于 API 及其底层算子执行上 DataStream 对批处理所做的工作,其他细节可以参考文章《Flink 执行引擎:流批一体的融合之路》[2]。下面我们将沿着批处理语义和性能优化以及 Batch API 功能增强两个大的方向回顾 Flink DataStream API 批处理能力演进之路。

1. 批处理语义和性能优化

DataStream API 虽然理论上可以覆盖绝大多数 DataSet API 上的语义和操作,但在一些细微之处还是存在一些差异。下面我们从几个方面详细介绍一些 Flink 社区在这方面所做的努力。

1.1 输出语义

为了最大化数据的实时性,DataStream 上算子的输出是增量式的。例如:KeyedStream.reduce,它会在每次到来一条新的数据时更新内部维护的状态,并向下游发送当前最新的聚合值。用数据库的术语来说,它产生了一个 Upsert 流作为输出: 如果一个键有 10 个输入元素,那么也会得到 10 条输出记录。

而对于实时性往往没有这么强要求的批作业来说,这些中间的增量输出会极大地增加下游算子的计算负担。由于批作业的算子不需要感知数据的 Changelog, 其更期望的是一种 All-or-Nothing 式的输出语义,即仅仅在每个 Key 最后一条数据到来后,才向下游发送数据。因此,我们需要在批模式下对一些 API(例如:KeyedStream#reduce, sum,min,max,minBy,maxBy) 的行为做出改变,使其仅在输入结束时输出最终结果。

下表描述了 Sum 操作在流和批两种模式下的输入输出情况:

(假设它们具有相同的 Key,4 为该 Key 的最后一条数据)

输入 流模式输出 批模式输出
1 1
2 3
3 6
4 10 10

1.2 状态访问和更新算法

对于有状态算子,DataSet 算子在迭代数据时直接在内存中维护最新的状态值。在 DataStream API 中,状态的访问和更新则是通过与 StateBackend 交互所进行的。实现流批一体的统一架构,就意味着 DataStream API 在流和批模式下要尽可能复用相同的算子实现。但是与 RocksDB 等 StateBackend 交互会带来不小的 IO 开销,站在 Flink 开发者的视角上,该如何解决这个问题呢?让我们更深入地思考一下他们之间的本质差异。

流模式下 DataStream API 上的聚合算法其实可以类比为基于 Hash 的聚合,StateBackend 在这里扮演着哈希表的角色。下图展示了在流模式下一个 KeyedOperator 的输入数据和状态存储的关系:

(绿色部分表示新数据到来后状态存储的更新)

我们可以看到:在流模式下,状态存储必须维持一个哈希表,为每个 Key 存储一条 Item。值得注意的是,该状态并不是完全存储在内存中的,达到一定阈值后需要溢写到磁盘。由于批作业是没有 Checkpoint 的,并且其 Shuffle 的中间数据直接写入到了磁盘中,发生 Failover 后直接从上一个 Stage 的数据重新计算状态即可,因此并不需要对状态进行持久化存储,理论上状态完全可以放在内存中。

接下来要考虑的是内存是否有 OOM 风险:对于单个 Key 来说,其状态不会非常大。由于批作业的数据是有界的,如果我们能对 key 进行分组,就可以在同一时间只追踪单一 Key 的状态。沿着这个思路,我们可以把基于 Hash 的状态访问算法变为基于排序的。因此,Flink 在批执行模式下会对 KeyOperator 的所有输入数据按 Key 进行排序,并且在该模式下使用一种特殊的 StateBackend,它在内存中追踪当前 Key 所对应的状态,当 Key 发生切换时清除上一个 Key 的状态值。

批执行模式下,一个 KeyedOperator 的输入数据和状态存储的关系如下图所示:

需要注意的是,这种方式引入了额外的数据排序开销,当状态访问的频率比较低,状态的数据量比较小时,对性能会有负面影响。但是考虑到绝大多数的批处理作业规模都比较大,其中的有状态算子往往需要 per-record 的访问和更新状态。比如对常见的 Join、Group Agg 等,往往存在很多重复 Key 的数据,该优化带来的收益通常比排序带来的开销要大的多。

1.3 EventTime 和 Watermark

实时数据流中事件可能是乱序的,即时间戳为 T 的事件可能出现在时间戳为 T+1 的事件之后。此外,系统无法确定是否将来还有时间戳为 t < T 的元素到来。因此,Flink 的流处理模式是建立在事件的顺序无法得到保证的前提下的。为了消除这种无序性带来的影响,Flink 引入了一种名为 Watermark 的标记。一个时间戳为 T 的 Watermark 到来,表示不会再收到或者可以直接忽略任何 t < T 的数据。

但在批执行模式下,数据是有界的,我们明确知道每一条数据的时间,因此可以认为不存在无法预知的迟到数据。发送中间的 Watermark 是没有意义的, 反而只会增加网络传输的压力和下游处理这些 Watermark 的复杂度。由于定时器和窗口的闭合都需要 Watermark 来触发,因此我们可以只在输入结束时发送 MAX_WATERMARK,或者在每个 Key 结束时发送 MAX_WATERMARK。这样既不会引入太多开销,又可以统一流批算子对于 EventTime 的处理。

2. Batch API 功能增强

需要注意的是,DataStream API 和 DataSet API 所支持的操作并非完全一一对应。Flink 社区有一个官方迁移文档来专门讲解如何从 DataSet 作业迁移到 DataStream 作业[3] (下文简称文档)。在该文档中,根据迁移所带来的代码改动和执行效率的差异,把 DataSet API 分成了四大类:

  1. 在 DataStream 有等价的 API,只需要很少的方法名改动就可以完成迁移。
  2. 通过 DataStream 上不等价的其他 API 可以实现同样的行为,迁移虽然需要进行代码改动,但是执行效率和 DataSet 相同。
  3. 通过 DataStream 上不等价的其他 API 可以实现同样的行为,迁移不仅需要进行代码改动,而且执行效率可能会存在一些差异。
  4. 目前 DataStream 没有支持,且没有简单的 Workaround 的 API。

按照目前 DataStream 上对这些操作的支持情况,我们又可以把它们进一步分为下面两大类:

2.1 完美支持或者可以通过 Workaround 支持

上述四类中,第1和第2类都属于可以无痛迁移的,第3类可以通过 Workaround 来实现,但是在执行效率上有比较大的差异。因此,我们主要关注于第三和第四类。

第三类主要有两种操作:全量 Partition 处理以及笛卡尔积。DataStream 上可以通过 Window 机制来支持这类需求,但是其中主要存在以下两个问题:

(1)需要明确知道输入在什么时候结束,在拿到全量数据后才能进行处理。

Flink 目前内置的窗口一般都是随着时间推进到某个具体的点,或者输入数据的量达到某个具体的值来触发的。并没有一种能够感知输入是否结束的窗口实现。文档通过自定义的 WindowAssigner 和 Window Trigger 实现了一种仅在输入结束时才触发计算的窗口。

随着用户作业的迁移,我们看到这种需求其实广泛存在,因此 Flink 社区在 FLIP-331[4] 中提出了EndOfStreamWindow 的概念,并会在 Flink 1.20 版本中进行支持,你可以通过如下方式来使用:

input.window(GlobalWindows.createWithEndOfStreamTrigger())
                .apply(
                        new WindowFunction<T, R, KEY, GlobalWindow>() {
   
                            @Override
                            public void apply(
                                    KEY key,
                                    GlobalWindow window,
                                    Iterable<T> input,
                                    Collector<R> out)
                                    throws Exception {
   
                                // do something with the iterable input, It has all the input data.
                            }
                        },
                        resultType);

(2)Non-Keyed Stream 上不支持窗口操作

Flink 中的窗口是基于 State 来实现的,而不同 Key 的 State 是不属于同一个命名空间的,因此窗口只有在能明确定义 Key 的流上才有意义。文档中引入如下函数来给数据附加上当前分区(并行度)的信息,然后以该字段作为数据的 Key。

public static class AddSubtaskIDMapFunction<T> extends RichMapFunction<T, Tuple2<String, T>> {
   
    @Override
    public Tuple2<String, T> map(T value) {
   
        return Tuple2.of(String.valueOf(getRuntimeContext().getIndexOfThisSubtask()), value);
    }
}

这种方式虽然可以产生正确结果,但也引入了per-record的额外开销。为了优化这个问题,Flink 社区在 FLIP-380[5] 中引入了对 Non-Keyed 全量分区处理的原生支持。下面一一介绍这几个 API 的使用方式和注意事项:

2.1.1Map Partition

该 API 用来对一个分区的数据做全量处理,并在获取所有数据后进行输出。

假如我们需要计算每个分区内数据的条数,并输出给下游算子。可以使用如下方式来实现:

inputStream.fullWindowPartition()
                .mapPartition(
         new MapPartitionFunction<Record, Long>() {
   
                             @Override
                    public void mapPartition(
                            Iterable<Record> values, Collector<Long> out)
                            throws Exception {
   
                        long counter = 0;
                        for (T value : values) {
   
                            counter++;
                        }
                        out.collect(counter));
                    }
          })

它与 map 的主要区别如下:

MapPartition Map
计算触发时机 所有输入结束后触发一次 每条输入数据都会触发一次
输入数据类型 包含所有数据的 Iterable 对象 每条数据自身

值得注意的是:MapPartition 虽然给调用者提供了一个基于全量数据的 Iterable 对象,但它并不会把全量数据都加载到内存。该 API 的底层实现充分利用了 Flink 执行引擎的反压机制,在对 Iterable 对象进行迭代时只会按需把数据加载到内存。

2.1.2Reduce/Aggregate Partition

该 API 主要用于对分区内的数据做全量聚合,分别需要传入 ReduceFunction 和 AggregateFunction。ReduceFunction 描述了两条输入数据如何合并产生同样类型的输出数据,而 AggregateFunction 是更通用的 ReduceFunction, 它通过引入一个中间的 Accumulator, 支持产生不同类型的输出。

下面的例子展示了如何在一个双字段的 Tuple 数据流上对其第二个字段做全量聚合

inputStream.fullWindowPartition()
       .reduce(new ReduceFunction<Tuple2<String, Integer>>() {
   
                        @Override
                        public Tuple2<String, Integer> reduce(
                              Tuple2<String, Integer> value1,
                              Tuple2<String, Integer> value2) throws Exception {
   
                          return Tuple2.of(value1.f0, value1.f1 + value2.f1);
                                    }
                                })

2.1.3 Sort Partition

另一种比较重要的操作就是排序,对分区内数据进行排序的需求在批处理中是广泛存在的。理论上,我们可以通过 MapPartition 来轻松实现全内存的排序,但是在大规模 Batch 作业中,把数据全部加载到内存中往往是不现实的。sortPartition API 支持外部排序,在数据量到达一定阈值后会溢写磁盘,因此无需担心内存的 OOM 问题。

下面是一个对分区内数据做全量升序排列的示例代码:

DataStreamSource<Tuple2<String, Integer>> source = xxxxx
 // 按照 tuple 的第一个字段进行排序
 SingleOutputStreamOperator<Tuple2<String, Integer>> sortedPartition =
                source.fullWindowPartition().sortPartition(1, Order.ASCENDING);

注意:排序算子会使用 Flink Managed Memory。内存的大小会影响排序的效率,过小的内存会导致数据频繁地写入和读出磁盘。如果你的一些排序操作相对较重(数据 Record 比较大,数据量比较多),建议调大“execution.sort-partition.memory”值来提升性能。

2.2 目前还不支持

上述第四类代表目前 Flink DataStream API 还没有支持的操作。主要有两种: RangePartition 和 GroupCombine.

其中 GroupCombine 会把数据分成多个批次,对每个批次的数据进行合并。它并不是用户的业务需求,是引擎为了提高执行效率而对用户的需求,因此Flink 社区暂时没有计划支持该操作。而 RangePartition 基于现有的 DataStream API 可以实现,但是相对复杂(需要用户实现复杂的采样算法),笔者所在的团队已经对此在做 PoC 实现了,未来会在合适的时机贡献回社区。

3. 总结

本文回顾了 Flink 在批处理能力上从 DataSet API 到流批一体的 DataStream API 的演进,并从批处理语义&性能优化以及 Batch API 功能增强两大方面分别展示了 Flink 社区是如何思考和提升 DataStream 批处理能力的,相信随着社区的不断努力,Flink Batch 会越来越好。Flink DataStream API 的流批一体能力也将在数据处理领域扮演越来越重要的角色。

[1] https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=158866741#FLIP131:ConsolidatetheuserfacingDataflowSDKs/APIs(anddeprecatetheDataSetAPI)-WhydoesFlinkhavethreeAPIs

[2] https://developer.aliyun.com/article/783112

[3] https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/dev/datastream/dataset_migration/

[4] https://cwiki.apache.org/confluence/display/FLINK/FLIP-331%3A+Support+EndOfStreamTrigger+and+isOutputOnlyAfterEndOfStream+operator+attribute+to+optimize+task+deployment

[5] https://cwiki.apache.org/confluence/display/FLINK/FLIP-380%3A+Support+Full+Partition+Processing+On+Non-keyed+DataStream

欢迎大家加入 Flink Batch 交流钉钉群。本群旨在为 Flink Batch 爱好者提供一个交流技术和传递资讯的平台,在这里:

  • 你可以掌握Flink Batch前沿的资讯,可以与 Flink 开发者及 Committer 面对面交流
  • Flink Batch 的问题集中解决,各位开发者及 Committer 及时解决你的 Blocker

“Flink Batch 交流群”群的钉钉群号: 34817520,也可以扫码加入

更多内容


img


活动推荐

阿里云基于 Apache Flink 构建的企业级产品-实时计算 Flink 版现开启活动:
新用户复制点击下方链接或者扫描二维码即可0元免费试用 Flink + Paimon
实时计算 Flink 版(3000CU*小时,3 个月内)
了解活动详情:https://free.aliyun.com/?pipCode=sc

0CA9E977-9C4C-4444-94B3-F01C0B8C891B.png

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
3月前
|
消息中间件 关系型数据库 MySQL
大数据-117 - Flink DataStream Sink 案例:写出到MySQL、写出到Kafka
大数据-117 - Flink DataStream Sink 案例:写出到MySQL、写出到Kafka
221 0
|
3月前
|
消息中间件 NoSQL Kafka
大数据-116 - Flink DataStream Sink 原理、概念、常见Sink类型 配置与使用 附带案例1:消费Kafka写到Redis
大数据-116 - Flink DataStream Sink 原理、概念、常见Sink类型 配置与使用 附带案例1:消费Kafka写到Redis
202 0
|
3月前
|
SQL 消息中间件 分布式计算
大数据-115 - Flink DataStream Transformation 多个函数方法 FlatMap Window Aggregations Reduce
大数据-115 - Flink DataStream Transformation 多个函数方法 FlatMap Window Aggregations Reduce
44 0
|
5月前
|
Java 关系型数据库 MySQL
实时计算 Flink版操作报错合集之在使用批处理模式中使用flat_aggregate函数时报错,该如何解决
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
5月前
|
监控 搜索推荐 数据挖掘
Flink流处理与批处理大揭秘:实时与离线,一文让你彻底解锁!
【8月更文挑战第24天】Apache Flink 是一款开源框架,擅长流处理与批处理。流处理专攻实时数据流,支持无限数据流及事件驱动应用,实现数据的连续输入与实时处理。批处理则聚焦于静态数据集,进行一次性处理。两者差异体现在处理方式与应用场景:流处理适合实时性要求高的场景(例如实时监控),而批处理更适用于离线数据分析任务(如数据挖掘)。通过提供的示例代码,读者可以直观理解两种模式的不同之处及其实际应用。
339 0
|
6月前
|
SQL 关系型数据库 MySQL
实时计算 Flink版产品使用问题之怎么使用DataStream生成结果表
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
6月前
|
SQL 分布式计算 测试技术
概述Flink API中的4个层次
【7月更文挑战第14天】Flink的API分为4个层次:核心底层API(如ProcessFunction)、DataStream/DataSet API、Table API和SQL。
|
7月前
|
SQL 关系型数据库 API
实时计算 Flink版产品使用问题之如何使用stream api
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
7月前
|
文字识别 小程序 算法
视觉智能开放平台产品使用合集之通过小程序接入视觉智能开放平台API能力,不是上海地域的OSS链接,该怎么办
视觉智能开放平台是指提供一系列基于视觉识别技术的API和服务的平台,这些服务通常包括图像识别、人脸识别、物体检测、文字识别、场景理解等。企业或开发者可以通过调用这些API,快速将视觉智能功能集成到自己的应用或服务中,而无需从零开始研发相关算法和技术。以下是一些常见的视觉智能开放平台产品及其应用场景的概览。
|
7月前
|
Kubernetes Oracle 关系型数据库
实时计算 Flink版操作报错合集之用dinky在k8s上提交作业,会报错:Caused by: org.apache.flink.table.api.ValidationException:,是什么原因
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
287 0

相关产品

  • 实时计算 Flink版