Flink DataSet API迁移到DataStream API实战

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 本文介绍了作者的Flink项目从DataSet API迁移到DataStream API的背景、方法和遇到的问题以及解决方案。

背景

为什么要做迁移?

我们的项目从2019年第一个版本开始就采用Flink做批处理,而且业务场景一直是纯批处理的场景:从TiDB读取数据并计算之后存入Starrocks和ElasticSearch,没有实时的需求。当时Flink的版本批处理还是基于DataSet API来做,从1.12版本开始Flink逐步放弃DataSet API,推荐转向Table API/SQL或者DataStream API。目前我们在用的是Flink 1.16版本,仍然保留对DataSet API的支持,但是在最新的版本中已经没有了DataSet API,为了以后的版本升级,我们就需要把现在这些基于DataSet API的job逐步迁移到DataStream API。

为什么不迁移到Table API/SQL

官方文档推荐基于DataSet API的批处理job迁移到Table API/SQL,但是我们在做了一些调研后发现Table API/SQL并不适合于我们的场景,主要有以下几个问题:
在DataSet API下我们可以基于InputFormat实现非常灵活的并行数据读取,比如基于date=?来实现按日期的并发读取,但是在Table API/SQL下没法实现数据的并行读取,我们甚至实现了一套完整的基于JDBC的Catalog都没法达到我们的要求。
对于基于一部分数据读取关联数据的场景(实现类似于SQL中的join操作),我们在DataSet API下实现了丰富的MapFunction/MapPartitionFunction来实现并行批量的读取关联数据,但是在Table API/SQL下实现join要么先把底层的数据全部读进来之后再在Flink内部来实现join,这样会读取大量无用的数据对TiDB造成很大的负载,要么用点查来做的话不支持批量查询,效率太低。
在数据的处理过程中有非常复杂的业务逻辑基于DataSet API可以很方便的实现,但是这些没法转化成SQL的实现。
基于以上的问题迁移到DataStream API是我们唯一的选择。

迁移方案

https://nightlies.apache.org/flink/flink-docs-release-1.19/zh/docs/dev/datastream/dataset_migration/ 这篇文章是官方文档中的迁移指南,我们的迁移主要也是参考了这篇文章。但是在迁移的过程中还是会遇到各种各样的困惑,主要是以下的几个方面。

Runtime Mode

DataStream API支持三种Runtime Mode:Streaming、Batch、Auto。Streaming就是流处理,Batch就是批处理,Auto是在有界数据上采用批处理模式,无界数据上采用流处理的模式。关于批处理和流处理模式的具体区别可以看官方文档:https://nightlies.apache.org/flink/flink-docs-release-1.19/zh/docs/dev/datastream/execution_mode/

因为我们是纯批处理的场景,所以Runtime Mode选择的是Batch。但是我其实有一个疑问就是Streaming 模式和Batch模式下的Pipeline Execution Mode有什么区别?因为Pipeline Execution Mode也是一种类似于流式的处理方式。官方同学的回复是:“不一样,只是shuffle实现用了pipeline shuffle,批和流模式下算子的语义不完全相同比如streaming会有回撤流等等。比如 sum 算子,在流模式下每来一条数据都会往下输出一条,但是批模式下每个key才会输出一条”。

数据输入

前面提到了Runtime Mode选择Batch模式,但是选择Batch模式有一个前提是要求所有的输入都是有界的。StreamExecutionEnvironment有一个createInput方法可以接收我们原来在DataSet下使用的各种InputFormat,但是这个方法生成的输入是无界的,使用这个方法提供的输入那还是得采用Streaming模式。所以我们参考StreamExecutionEnvironment的addSource方法自己实现了一个方法把InputFormat转变成一个有界的输入,这样就可以在Batch模式下继续使用原来的InputFormat:

public static <OUT> DataStreamSource<OUT> createStreamSourceFromInputFormat(
            StreamExecutionEnvironment environment, InputFormat<OUT, ?> inputFormat, String name) {

        if (StringUtils.isBlank(name)) {
            name = "Custom Input Format Source";
        }

        TypeInformation<OUT> inputFormatTypes = TypeExtractor.getInputFormatTypes(inputFormat);
        InputFormatSourceFunction<OUT> function = new InputFormatSourceFunction<>(inputFormat, inputFormatTypes);
        TypeInformation<OUT> resolvedTypeInfo = inputFormatTypes;

        if (resolvedTypeInfo == null && function instanceof ResultTypeQueryable) {
            resolvedTypeInfo = ((ResultTypeQueryable<OUT>) function).getProducedType();
        }
        if (resolvedTypeInfo == null) {
            try {
                resolvedTypeInfo =
                        TypeExtractor.createTypeInfo(SourceFunction.class, function.getClass(), 0, null, null);
            } catch (final InvalidTypesException e) {
                resolvedTypeInfo = (TypeInformation<OUT>) new MissingTypeInfo(name, e);
            }
        }

        boolean isParallel = function instanceof ParallelSourceFunction;

        if (environment.getConfig().isClosureCleanerEnabled()) {
            ClosureCleaner.clean(function, environment.getConfig().getClosureCleanerLevel(), true);
        }
        ClosureCleaner.ensureSerializable(function);

        final StreamSource<OUT, ?> sourceOperator = new StreamSource<>(function);
        return new DataStreamSource<>(
                environment, resolvedTypeInfo, sourceOperator, isParallel, name, Boundedness.BOUNDED);
    }

我们另外也开发了一个实现Source接口的TableSource类,但是本质上跟上面的方法+InputFormat差不多,而且还可以复用之前的InputFormat,所有就没有使用这个TableSource类。

数据输出

DataSet下的数据输出采用的是OutputFormat,DataStream有一个输出的方法writeUsingOutputFormat可以使用原来的OutputFormat,但是这个方法已经标记为废弃了,所以我们还是切换到了addSink上做输出:对TiDB/MySQL的输出就采用JdbcSink,对StarRocks和ElasticSearch的Sink我们也自己实现了一套,基于原来对应的OutputFormat修改起来很方便。

数据处理

其实原来的DataSet 这一套API非常好用,切换到DataStream之后即使有上面的迁移文档,有很多的数据操作实现起来还是没有比原来的API更复杂或者和原来的使用方式不一样。比如比较核心的一点是大部分的操作都需要开窗:先把数据使用keyBy分组然后使用迁移文档里的EndOfStreamWindows开窗再处理,这个需要适应一下。

另外一个比较大的问题是DataStream下没有了mapPartition方法,在DataSet API下我们经常在rebalance之后使用mapPartition进行处理,但是在DataStream下并没有对应的方法。迁移文档里提供了一种方式是先把数据使用AddSubtaskIDMapFunction做一次map之后再根据subtaskId做分组开窗处理,但是这个会改变原来DataStream下的数据类型,强行嵌套一层Tuple在外面,还是很不方便的。我们现在采用的方式是直接找一个分布相对均匀的字段做keyBy再开窗处理。好消息是官方透露在1.20的版本上会提供DataStream的mapPartition方法,还会有sortPatition等方法,具体可以看这个FLIP:https://cwiki.apache.org/confluence/display/FLINK/FLIP-380%3A+Support+Full+Partition+Processing+On+Non-keyed+DataStream

可行性

基于上面的这套方案我们已经迁移了10多个job到DataStream API,目前迁移后的job运行都正常,我们也会在后续的版本迭代中逐步的完成所有job的迁移。

参考

除了上面提到的一些文档,还有一些与批处理相关的FLIP可以参考:
https://cwiki.apache.org/confluence/display/FLINK/FLIP-134%3A+Batch+execution+for+the+DataStream+API#FLIP134:BatchexecutionfortheDataStreamAPI-Motivation
https://cwiki.apache.org/confluence/display/FLINK/FLIP-140%3A+Introduce+batch-style+execution+for+bounded+keyed+streams#FLIP140:Introducebatchstyleexecutionforboundedkeyedstreams-Motivation
https://cwiki.apache.org/confluence/display/FLINK/FLIP-327%3A+Support+switching+from+batch+to+stream+mode+to+improve+throughput+when+processing+backlog+data#FLIP327:Supportswitchingfrombatchtostreammodetoimprovethroughputwhenprocessingbacklogdata-Motivation

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
目录
相关文章
|
4天前
|
JSON 数据管理 关系型数据库
【Dataphin V3.9】颠覆你的数据管理体验!API数据源接入与集成优化,如何让企业轻松驾驭海量异构数据,实现数据价值最大化?全面解析、实战案例、专业指导,带你解锁数据整合新技能!
【8月更文挑战第15天】随着大数据技术的发展,企业对数据处理的需求不断增长。Dataphin V3.9 版本提供更灵活的数据源接入和高效 API 集成能力,支持 MySQL、Oracle、Hive 等多种数据源,增强 RESTful 和 SOAP API 支持,简化外部数据服务集成。例如,可轻松从 RESTful API 获取销售数据并存储分析。此外,Dataphin V3.9 还提供数据同步工具和丰富的数据治理功能,确保数据质量和一致性,助力企业最大化数据价值。
15 1
|
27天前
|
前端开发 API 数据库
告别繁琐,拥抱简洁!Python RESTful API 设计实战,让 API 调用如丝般顺滑!
【7月更文挑战第23天】在Python的Flask框架下构建RESTful API,为在线商店管理商品、订单及用户信息。以商品管理为例,设计简洁API端点,如GET `/products`获取商品列表,POST `/products`添加商品,PUT和DELETE则分别用于更新和删除商品。使用SQLAlchemy ORM与SQLite数据库交互,确保数据一致性。实战中还应加入数据验证、错误处理和权限控制,使API既高效又安全,便于前端或其他服务无缝对接。
47 9
|
24天前
|
API
OpenAI用不了?丝滑迁移通义API!
OpenAI用不了?丝滑迁移通义API!
30 1
|
1月前
|
JavaScript 应用服务中间件 API
Node.js搭建REST API实战:从基础到部署
【7月更文挑战第18天】通过以上步骤,你可以将你的Node.js REST API从开发环境顺利迁移到生产环境,并利用各种工具和技术来确保应用的稳定性、安全性和可扩展性。
|
1月前
|
前端开发 API 开发者
Python Web开发者必看!AJAX、Fetch API实战技巧,让前后端交互如丝般顺滑!
【7月更文挑战第13天】在Web开发中,AJAX和Fetch API是实现页面无刷新数据交换的关键。在Flask博客系统中,通过创建获取评论的GET路由,我们可以展示使用AJAX和Fetch API的前端实现。AJAX通过XMLHttpRequest发送请求,处理响应并在成功时更新DOM。Fetch API则使用Promise简化异步操作,代码更现代。这两个工具都能实现不刷新页面查看评论,Fetch API的语法更简洁,错误处理更直观。掌握这些技巧能提升Python Web项目的用户体验和开发效率。
44 7
|
1月前
|
安全 Java API
Nest.js 实战 (三):使用 Swagger 优雅地生成 API 文档
这篇文章介绍了Swagger,它是一组开源工具,围绕OpenAPI规范帮助设计、构建、记录和使用RESTAPI。文章主要讨论了Swagger的主要工具,包括SwaggerEditor、SwaggerUI、SwaggerCodegen等。然后介绍了如何在Nest框架中集成Swagger,展示了安装依赖、定义DTO和控制器等步骤,以及如何使用Swagger装饰器。文章最后总结说,集成Swagger文档可以自动生成和维护API文档,规范API标准化和一致性,但会增加开发者工作量,需要保持注释和装饰器的准确性。
Nest.js 实战 (三):使用 Swagger 优雅地生成 API 文档
|
1月前
|
SQL 分布式计算 测试技术
概述Flink API中的4个层次
【7月更文挑战第14天】Flink的API分为4个层次:核心底层API(如ProcessFunction)、DataStream/DataSet API、Table API和SQL。
|
17天前
|
JavaScript 前端开发 中间件
打造卓越后端:构建高效API的最佳实践与实战代码示例——解锁高性能Web服务的秘密
【8月更文挑战第2天】构建高效后端API:最佳实践与代码示例
30 0
|
25天前
|
API 开发者 Python
淘宝商品详情API接口开发实战
在电商领域,获取淘宝商品详情是关键需求。需先注册淘宝开放平台账号并创建应用,获取AppKey与AppSecret;随后申请商品服务API权限。利用Python,通过AppKey和AppSecret获取Access Token,进而调用商品详情API,需替换示例代码中的`your_app_key`, `your_app_secret`, `your_access_token`, 和`item_id`。注意遵守平台限制,处理可能的错误及合理规划调用策略以避免违规。[示例代码](https://)展示了从获取Access Token到调用商品详情API的全过程。

热门文章

最新文章

相关产品

  • 实时计算 Flink版