Flink DataSet API迁移到DataStream API实战

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 本文介绍了作者的Flink项目从DataSet API迁移到DataStream API的背景、方法和遇到的问题以及解决方案。

背景

为什么要做迁移?

我们的项目从2019年第一个版本开始就采用Flink做批处理,而且业务场景一直是纯批处理的场景:从TiDB读取数据并计算之后存入Starrocks和ElasticSearch,没有实时的需求。当时Flink的版本批处理还是基于DataSet API来做,从1.12版本开始Flink逐步放弃DataSet API,推荐转向Table API/SQL或者DataStream API。目前我们在用的是Flink 1.16版本,仍然保留对DataSet API的支持,但是在最新的版本中已经没有了DataSet API,为了以后的版本升级,我们就需要把现在这些基于DataSet API的job逐步迁移到DataStream API。

为什么不迁移到Table API/SQL

官方文档推荐基于DataSet API的批处理job迁移到Table API/SQL,但是我们在做了一些调研后发现Table API/SQL并不适合于我们的场景,主要有以下几个问题:
在DataSet API下我们可以基于InputFormat实现非常灵活的并行数据读取,比如基于date=?来实现按日期的并发读取,但是在Table API/SQL下没法实现数据的并行读取,我们甚至实现了一套完整的基于JDBC的Catalog都没法达到我们的要求。
对于基于一部分数据读取关联数据的场景(实现类似于SQL中的join操作),我们在DataSet API下实现了丰富的MapFunction/MapPartitionFunction来实现并行批量的读取关联数据,但是在Table API/SQL下实现join要么先把底层的数据全部读进来之后再在Flink内部来实现join,这样会读取大量无用的数据对TiDB造成很大的负载,要么用点查来做的话不支持批量查询,效率太低。
在数据的处理过程中有非常复杂的业务逻辑基于DataSet API可以很方便的实现,但是这些没法转化成SQL的实现。
基于以上的问题迁移到DataStream API是我们唯一的选择。

迁移方案

https://nightlies.apache.org/flink/flink-docs-release-1.19/zh/docs/dev/datastream/dataset_migration/ 这篇文章是官方文档中的迁移指南,我们的迁移主要也是参考了这篇文章。但是在迁移的过程中还是会遇到各种各样的困惑,主要是以下的几个方面。

Runtime Mode

DataStream API支持三种Runtime Mode:Streaming、Batch、Auto。Streaming就是流处理,Batch就是批处理,Auto是在有界数据上采用批处理模式,无界数据上采用流处理的模式。关于批处理和流处理模式的具体区别可以看官方文档:https://nightlies.apache.org/flink/flink-docs-release-1.19/zh/docs/dev/datastream/execution_mode/

因为我们是纯批处理的场景,所以Runtime Mode选择的是Batch。但是我其实有一个疑问就是Streaming 模式和Batch模式下的Pipeline Execution Mode有什么区别?因为Pipeline Execution Mode也是一种类似于流式的处理方式。官方同学的回复是:“不一样,只是shuffle实现用了pipeline shuffle,批和流模式下算子的语义不完全相同比如streaming会有回撤流等等。比如 sum 算子,在流模式下每来一条数据都会往下输出一条,但是批模式下每个key才会输出一条”。

数据输入

前面提到了Runtime Mode选择Batch模式,但是选择Batch模式有一个前提是要求所有的输入都是有界的。StreamExecutionEnvironment有一个createInput方法可以接收我们原来在DataSet下使用的各种InputFormat,但是这个方法生成的输入是无界的,使用这个方法提供的输入那还是得采用Streaming模式。所以我们参考StreamExecutionEnvironment的addSource方法自己实现了一个方法把InputFormat转变成一个有界的输入,这样就可以在Batch模式下继续使用原来的InputFormat:

public static <OUT> DataStreamSource<OUT> createStreamSourceFromInputFormat(
            StreamExecutionEnvironment environment, InputFormat<OUT, ?> inputFormat, String name) {

        if (StringUtils.isBlank(name)) {
            name = "Custom Input Format Source";
        }

        TypeInformation<OUT> inputFormatTypes = TypeExtractor.getInputFormatTypes(inputFormat);
        InputFormatSourceFunction<OUT> function = new InputFormatSourceFunction<>(inputFormat, inputFormatTypes);
        TypeInformation<OUT> resolvedTypeInfo = inputFormatTypes;

        if (resolvedTypeInfo == null && function instanceof ResultTypeQueryable) {
            resolvedTypeInfo = ((ResultTypeQueryable<OUT>) function).getProducedType();
        }
        if (resolvedTypeInfo == null) {
            try {
                resolvedTypeInfo =
                        TypeExtractor.createTypeInfo(SourceFunction.class, function.getClass(), 0, null, null);
            } catch (final InvalidTypesException e) {
                resolvedTypeInfo = (TypeInformation<OUT>) new MissingTypeInfo(name, e);
            }
        }

        boolean isParallel = function instanceof ParallelSourceFunction;

        if (environment.getConfig().isClosureCleanerEnabled()) {
            ClosureCleaner.clean(function, environment.getConfig().getClosureCleanerLevel(), true);
        }
        ClosureCleaner.ensureSerializable(function);

        final StreamSource<OUT, ?> sourceOperator = new StreamSource<>(function);
        return new DataStreamSource<>(
                environment, resolvedTypeInfo, sourceOperator, isParallel, name, Boundedness.BOUNDED);
    }

我们另外也开发了一个实现Source接口的TableSource类,但是本质上跟上面的方法+InputFormat差不多,而且还可以复用之前的InputFormat,所有就没有使用这个TableSource类。

数据输出

DataSet下的数据输出采用的是OutputFormat,DataStream有一个输出的方法writeUsingOutputFormat可以使用原来的OutputFormat,但是这个方法已经标记为废弃了,所以我们还是切换到了addSink上做输出:对TiDB/MySQL的输出就采用JdbcSink,对StarRocks和ElasticSearch的Sink我们也自己实现了一套,基于原来对应的OutputFormat修改起来很方便。

数据处理

其实原来的DataSet 这一套API非常好用,切换到DataStream之后即使有上面的迁移文档,有很多的数据操作实现起来还是没有比原来的API更复杂或者和原来的使用方式不一样。比如比较核心的一点是大部分的操作都需要开窗:先把数据使用keyBy分组然后使用迁移文档里的EndOfStreamWindows开窗再处理,这个需要适应一下。

另外一个比较大的问题是DataStream下没有了mapPartition方法,在DataSet API下我们经常在rebalance之后使用mapPartition进行处理,但是在DataStream下并没有对应的方法。迁移文档里提供了一种方式是先把数据使用AddSubtaskIDMapFunction做一次map之后再根据subtaskId做分组开窗处理,但是这个会改变原来DataStream下的数据类型,强行嵌套一层Tuple在外面,还是很不方便的。我们现在采用的方式是直接找一个分布相对均匀的字段做keyBy再开窗处理。好消息是官方透露在1.20的版本上会提供DataStream的mapPartition方法,还会有sortPatition等方法,具体可以看这个FLIP:https://cwiki.apache.org/confluence/display/FLINK/FLIP-380%3A+Support+Full+Partition+Processing+On+Non-keyed+DataStream

可行性

基于上面的这套方案我们已经迁移了10多个job到DataStream API,目前迁移后的job运行都正常,我们也会在后续的版本迭代中逐步的完成所有job的迁移。

参考

除了上面提到的一些文档,还有一些与批处理相关的FLIP可以参考:
https://cwiki.apache.org/confluence/display/FLINK/FLIP-134%3A+Batch+execution+for+the+DataStream+API#FLIP134:BatchexecutionfortheDataStreamAPI-Motivation
https://cwiki.apache.org/confluence/display/FLINK/FLIP-140%3A+Introduce+batch-style+execution+for+bounded+keyed+streams#FLIP140:Introducebatchstyleexecutionforboundedkeyedstreams-Motivation
https://cwiki.apache.org/confluence/display/FLINK/FLIP-327%3A+Support+switching+from+batch+to+stream+mode+to+improve+throughput+when+processing+backlog+data#FLIP327:Supportswitchingfrombatchtostreammodetoimprovethroughputwhenprocessingbacklogdata-Motivation

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
目录
相关文章
|
16天前
|
分布式计算 大数据 Apache
ClickHouse与大数据生态集成:Spark & Flink 实战
【10月更文挑战第26天】在当今这个数据爆炸的时代,能够高效地处理和分析海量数据成为了企业和组织提升竞争力的关键。作为一款高性能的列式数据库系统,ClickHouse 在大数据分析领域展现出了卓越的能力。然而,为了充分利用ClickHouse的优势,将其与现有的大数据处理框架(如Apache Spark和Apache Flink)进行集成变得尤为重要。本文将从我个人的角度出发,探讨如何通过这些技术的结合,实现对大规模数据的实时处理和分析。
48 2
ClickHouse与大数据生态集成:Spark & Flink 实战
|
1月前
|
SQL 大数据 API
大数据-118 - Flink DataSet 基本介绍 核心特性 创建、转换、输出等
大数据-118 - Flink DataSet 基本介绍 核心特性 创建、转换、输出等
53 0
|
1月前
|
消息中间件 关系型数据库 MySQL
大数据-117 - Flink DataStream Sink 案例:写出到MySQL、写出到Kafka
大数据-117 - Flink DataStream Sink 案例:写出到MySQL、写出到Kafka
131 0
|
9天前
|
JSON BI API
商城上货API接口的实战案例
在商城上货过程中,API接口扮演着至关重要的角色。以下是对商城上货API接口的实战分析,涵盖其主要功能、类型、安全性以及实战案例等方面。
|
6天前
|
XML 数据可视化 API
商品详情数据实战案例,API接口系列
淘宝商品详情数据在电商领域具有广泛的应用价值,而淘宝商品详情API接口则为开发者提供了获取这些数据的重要途径。通过合理利用这些接口和数据,可以提升业务效率、优化用户体验,为电商行业的发展注入新的活力。
|
11天前
|
前端开发 API 开发者
Python Web开发者必看!AJAX、Fetch API实战技巧,让前后端交互如丝般顺滑!
在Web开发中,前后端的高效交互是提升用户体验的关键。本文通过一个基于Flask框架的博客系统实战案例,详细介绍了如何使用AJAX和Fetch API实现不刷新页面查看评论的功能。从后端路由设置到前端请求处理,全面展示了这两种技术的应用技巧,帮助Python Web开发者提升项目质量和开发效率。
26 1
|
16天前
|
存储 JSON API
淘宝API接口实战:高效获取商品标题、分类及店铺名称
在淘宝API接口实战中,通过以下步骤高效获取商品标题、分类及店铺名称:1. 准备工作:了解淘宝开放平台文档,注册开发者账号,选择开发语言和工具。2. 获取API访问权限:申请相应权限,提供应用场景说明。3. 调用API接口:构建HTTP请求,提供必要参数。4. 解析响应数据:提取JSON数据中的所需信息。5. 数据处理和存储:进一步处理并存储数据。6. 注意事项:遵守使用规范,注意调用频率和数据安全。示例代码使用Python调用淘宝API。
|
1月前
|
Java Shell 流计算
Flink-02 Flink Java 3分钟上手 Stream SingleOutputStreamOpe ExecutionEnvironment DataSet FlatMapFunction
Flink-02 Flink Java 3分钟上手 Stream SingleOutputStreamOpe ExecutionEnvironment DataSet FlatMapFunction
23 1
Flink-02 Flink Java 3分钟上手 Stream SingleOutputStreamOpe ExecutionEnvironment DataSet FlatMapFunction
|
1月前
|
前端开发 API
Context API 实战应用
【10月更文挑战第8天】在 React 应用开发中,状态管理至关重要。本文介绍了 `Context API` 的基础概念、基本用法,以及常见问题和易错点的解决方法。通过代码示例,详细讲解了如何在组件间高效共享状态,优化性能,处理嵌套 Context 和副作用。
17 1
|
1月前
|
存储 Java 数据处理
Flink-01 介绍Flink Java 3分钟上手 HelloWorld 和 Stream ExecutionEnvironment DataSet FlatMapFunction
Flink-01 介绍Flink Java 3分钟上手 HelloWorld 和 Stream ExecutionEnvironment DataSet FlatMapFunction
35 1

相关产品

  • 实时计算 Flink版