Flink DataSet API迁移到DataStream API实战

本文涉及的产品
实时计算 Flink 版,1000CU*H 3个月
简介: 本文介绍了作者的Flink项目从DataSet API迁移到DataStream API的背景、方法和遇到的问题以及解决方案。

背景

为什么要做迁移?

我们的项目从2019年第一个版本开始就采用Flink做批处理,而且业务场景一直是纯批处理的场景:从TiDB读取数据并计算之后存入Starrocks和ElasticSearch,没有实时的需求。当时Flink的版本批处理还是基于DataSet API来做,从1.12版本开始Flink逐步放弃DataSet API,推荐转向Table API/SQL或者DataStream API。目前我们在用的是Flink 1.16版本,仍然保留对DataSet API的支持,但是在最新的版本中已经没有了DataSet API,为了以后的版本升级,我们就需要把现在这些基于DataSet API的job逐步迁移到DataStream API。

为什么不迁移到Table API/SQL

官方文档推荐基于DataSet API的批处理job迁移到Table API/SQL,但是我们在做了一些调研后发现Table API/SQL并不适合于我们的场景,主要有以下几个问题:
在DataSet API下我们可以基于InputFormat实现非常灵活的并行数据读取,比如基于date=?来实现按日期的并发读取,但是在Table API/SQL下没法实现数据的并行读取,我们甚至实现了一套完整的基于JDBC的Catalog都没法达到我们的要求。
对于基于一部分数据读取关联数据的场景(实现类似于SQL中的join操作),我们在DataSet API下实现了丰富的MapFunction/MapPartitionFunction来实现并行批量的读取关联数据,但是在Table API/SQL下实现join要么先把底层的数据全部读进来之后再在Flink内部来实现join,这样会读取大量无用的数据对TiDB造成很大的负载,要么用点查来做的话不支持批量查询,效率太低。
在数据的处理过程中有非常复杂的业务逻辑基于DataSet API可以很方便的实现,但是这些没法转化成SQL的实现。
基于以上的问题迁移到DataStream API是我们唯一的选择。

迁移方案

https://nightlies.apache.org/flink/flink-docs-release-1.19/zh/docs/dev/datastream/dataset_migration/ 这篇文章是官方文档中的迁移指南,我们的迁移主要也是参考了这篇文章。但是在迁移的过程中还是会遇到各种各样的困惑,主要是以下的几个方面。

Runtime Mode

DataStream API支持三种Runtime Mode:Streaming、Batch、Auto。Streaming就是流处理,Batch就是批处理,Auto是在有界数据上采用批处理模式,无界数据上采用流处理的模式。关于批处理和流处理模式的具体区别可以看官方文档:https://nightlies.apache.org/flink/flink-docs-release-1.19/zh/docs/dev/datastream/execution_mode/

因为我们是纯批处理的场景,所以Runtime Mode选择的是Batch。但是我其实有一个疑问就是Streaming 模式和Batch模式下的Pipeline Execution Mode有什么区别?因为Pipeline Execution Mode也是一种类似于流式的处理方式。官方同学的回复是:“不一样,只是shuffle实现用了pipeline shuffle,批和流模式下算子的语义不完全相同比如streaming会有回撤流等等。比如 sum 算子,在流模式下每来一条数据都会往下输出一条,但是批模式下每个key才会输出一条”。

数据输入

前面提到了Runtime Mode选择Batch模式,但是选择Batch模式有一个前提是要求所有的输入都是有界的。StreamExecutionEnvironment有一个createInput方法可以接收我们原来在DataSet下使用的各种InputFormat,但是这个方法生成的输入是无界的,使用这个方法提供的输入那还是得采用Streaming模式。所以我们参考StreamExecutionEnvironment的addSource方法自己实现了一个方法把InputFormat转变成一个有界的输入,这样就可以在Batch模式下继续使用原来的InputFormat:

public static <OUT> DataStreamSource<OUT> createStreamSourceFromInputFormat(
            StreamExecutionEnvironment environment, InputFormat<OUT, ?> inputFormat, String name) {

        if (StringUtils.isBlank(name)) {
            name = "Custom Input Format Source";
        }

        TypeInformation<OUT> inputFormatTypes = TypeExtractor.getInputFormatTypes(inputFormat);
        InputFormatSourceFunction<OUT> function = new InputFormatSourceFunction<>(inputFormat, inputFormatTypes);
        TypeInformation<OUT> resolvedTypeInfo = inputFormatTypes;

        if (resolvedTypeInfo == null && function instanceof ResultTypeQueryable) {
            resolvedTypeInfo = ((ResultTypeQueryable<OUT>) function).getProducedType();
        }
        if (resolvedTypeInfo == null) {
            try {
                resolvedTypeInfo =
                        TypeExtractor.createTypeInfo(SourceFunction.class, function.getClass(), 0, null, null);
            } catch (final InvalidTypesException e) {
                resolvedTypeInfo = (TypeInformation<OUT>) new MissingTypeInfo(name, e);
            }
        }

        boolean isParallel = function instanceof ParallelSourceFunction;

        if (environment.getConfig().isClosureCleanerEnabled()) {
            ClosureCleaner.clean(function, environment.getConfig().getClosureCleanerLevel(), true);
        }
        ClosureCleaner.ensureSerializable(function);

        final StreamSource<OUT, ?> sourceOperator = new StreamSource<>(function);
        return new DataStreamSource<>(
                environment, resolvedTypeInfo, sourceOperator, isParallel, name, Boundedness.BOUNDED);
    }

我们另外也开发了一个实现Source接口的TableSource类,但是本质上跟上面的方法+InputFormat差不多,而且还可以复用之前的InputFormat,所有就没有使用这个TableSource类。

数据输出

DataSet下的数据输出采用的是OutputFormat,DataStream有一个输出的方法writeUsingOutputFormat可以使用原来的OutputFormat,但是这个方法已经标记为废弃了,所以我们还是切换到了addSink上做输出:对TiDB/MySQL的输出就采用JdbcSink,对StarRocks和ElasticSearch的Sink我们也自己实现了一套,基于原来对应的OutputFormat修改起来很方便。

数据处理

其实原来的DataSet 这一套API非常好用,切换到DataStream之后即使有上面的迁移文档,有很多的数据操作实现起来还是没有比原来的API更复杂或者和原来的使用方式不一样。比如比较核心的一点是大部分的操作都需要开窗:先把数据使用keyBy分组然后使用迁移文档里的EndOfStreamWindows开窗再处理,这个需要适应一下。

另外一个比较大的问题是DataStream下没有了mapPartition方法,在DataSet API下我们经常在rebalance之后使用mapPartition进行处理,但是在DataStream下并没有对应的方法。迁移文档里提供了一种方式是先把数据使用AddSubtaskIDMapFunction做一次map之后再根据subtaskId做分组开窗处理,但是这个会改变原来DataStream下的数据类型,强行嵌套一层Tuple在外面,还是很不方便的。我们现在采用的方式是直接找一个分布相对均匀的字段做keyBy再开窗处理。好消息是官方透露在1.20的版本上会提供DataStream的mapPartition方法,还会有sortPatition等方法,具体可以看这个FLIP:https://cwiki.apache.org/confluence/display/FLINK/FLIP-380%3A+Support+Full+Partition+Processing+On+Non-keyed+DataStream

可行性

基于上面的这套方案我们已经迁移了10多个job到DataStream API,目前迁移后的job运行都正常,我们也会在后续的版本迭代中逐步的完成所有job的迁移。

参考

除了上面提到的一些文档,还有一些与批处理相关的FLIP可以参考:
https://cwiki.apache.org/confluence/display/FLINK/FLIP-134%3A+Batch+execution+for+the+DataStream+API#FLIP134:BatchexecutionfortheDataStreamAPI-Motivation
https://cwiki.apache.org/confluence/display/FLINK/FLIP-140%3A+Introduce+batch-style+execution+for+bounded+keyed+streams#FLIP140:Introducebatchstyleexecutionforboundedkeyedstreams-Motivation
https://cwiki.apache.org/confluence/display/FLINK/FLIP-327%3A+Support+switching+from+batch+to+stream+mode+to+improve+throughput+when+processing+backlog+data#FLIP327:Supportswitchingfrombatchtostreammodetoimprovethroughputwhenprocessingbacklogdata-Motivation

相关实践学习
基于Hologres+Flink搭建GitHub实时数据大屏
通过使用Flink、Hologres构建实时数仓,并通过Hologres对接BI分析工具(以DataV为例),实现海量数据实时分析.
实时计算 Flink 实战课程
如何使用实时计算 Flink 搞定数据处理难题?实时计算 Flink 极客训练营产品、技术专家齐上阵,从开源 Flink功能介绍到实时计算 Flink 优势详解,现场实操,5天即可上手! 欢迎开通实时计算 Flink 版: https://cn.aliyun.com/product/bigdata/sc Flink Forward Asia 介绍: Flink Forward 是由 Apache 官方授权,Apache Flink Community China 支持的会议,通过参会不仅可以了解到 Flink 社区的最新动态和发展计划,还可以了解到国内外一线大厂围绕 Flink 生态的生产实践经验,是 Flink 开发者和使用者不可错过的盛会。 去年经过品牌升级后的 Flink Forward Asia 吸引了超过2000人线下参与,一举成为国内最大的 Apache 顶级项目会议。结合2020年的特殊情况,Flink Forward Asia 2020 将在12月26日以线上峰会的形式与大家见面。
目录
相关文章
|
7天前
|
供应链 监控 安全
1688商品详情API接口实战指南:合规获取数据,驱动B2B业务增长
1688商品详情API(alibaba.product.get)是合规获取B2B商品数据的核心工具,支持全维度信息调用,助力企业实现智能选品、供应链优化与市场洞察,推动数字化转型。
|
7天前
|
缓存 监控 供应链
亚马逊 MWS API 实战:商品详情精准获取与跨境电商数据整合方案
本文详细解析亚马逊MWS API接口的技术实现,重点解决跨境商品数据获取中的核心问题。文章首先介绍MWS接口体系的特点,包括多站点数据获取、AWS签名认证等关键环节,并对比普通电商接口的差异。随后深入拆解API调用全流程,提供签名工具类、多站点客户端等可复用代码。针对跨境业务场景,文章还给出数据整合工具实现方案,支持缓存、批量处理等功能。最后通过实战示例展示多站点商品对比和批量选品分析的应用,并附常见问题解决方案。该技术方案可直接应用于跨境选品、价格监控等业务场景,帮助开发者高效获取亚马逊商品数据。
|
12天前
|
数据采集 JSON API
微店商品列表API接口开发指南:从零到实战
微店商品列表API(vdian.shop.item.list.get)用于获取店铺商品数据,支持分页、签名认证,返回JSON格式。适用于商品同步、竞品分析、多平台展示及数据清洗。提供Python请求示例,便于快速接入。
|
12天前
|
存储 监控 前端开发
淘宝商品详情 API 实战:5 大策略提升店铺转化率(附签名优化代码 + 避坑指南)
本文深入解析淘宝商品详情API的核心字段与实战应用,分享如何通过动态定价、库存预警、差评控制等5大策略提升电商转化率。结合300+店铺实战经验,提供优化代码与避坑指南,助力开发者与运营者实现数据驱动的精细化运营。
|
12天前
|
JSON 数据挖掘 API
微店商品详情API接口开发指南:从零到实战
微店商品详情API(micro.item_get)用于获取商品名称、价格、库存等信息,支持HTTP GET/POST请求,返回JSON格式数据,适用于电商开发、店铺管理与数据分析。提供Python请求示例,便于快速集成调用,适用于多店铺管理、跨平台展示及价格监控等场景。
|
12天前
|
算法 API 数据安全/隐私保护
电商 API 双平台实战:淘宝 item.get + 京东 item_detail 对接指南(附可复用代码 + 问题排查)
本文详细解析了淘宝和京东双平台API对接的核心流程,涵盖资质申请、凭证获取、签名生成、高频接口调用及常见问题解决方案,助力开发者高效实现商品数据同步与管理。
监控 安全 API
79 0
JSON 数据挖掘 API
75 0
存储 人工智能 安全
121 4
|
18天前
|
人工智能 数据可视化 测试技术
AI 时代 API 自动化测试实战:Postman 断言的核心技巧与实战应用
AI 时代 API 自动化测试实战:Postman 断言的核心技巧与实战应用
259 11

相关产品

  • 实时计算 Flink版