Flink DataSet API迁移到DataStream API实战

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 本文介绍了作者的Flink项目从DataSet API迁移到DataStream API的背景、方法和遇到的问题以及解决方案。

背景

为什么要做迁移?

我们的项目从2019年第一个版本开始就采用Flink做批处理,而且业务场景一直是纯批处理的场景:从TiDB读取数据并计算之后存入Starrocks和ElasticSearch,没有实时的需求。当时Flink的版本批处理还是基于DataSet API来做,从1.12版本开始Flink逐步放弃DataSet API,推荐转向Table API/SQL或者DataStream API。目前我们在用的是Flink 1.16版本,仍然保留对DataSet API的支持,但是在最新的版本中已经没有了DataSet API,为了以后的版本升级,我们就需要把现在这些基于DataSet API的job逐步迁移到DataStream API。

为什么不迁移到Table API/SQL

官方文档推荐基于DataSet API的批处理job迁移到Table API/SQL,但是我们在做了一些调研后发现Table API/SQL并不适合于我们的场景,主要有以下几个问题:
在DataSet API下我们可以基于InputFormat实现非常灵活的并行数据读取,比如基于date=?来实现按日期的并发读取,但是在Table API/SQL下没法实现数据的并行读取,我们甚至实现了一套完整的基于JDBC的Catalog都没法达到我们的要求。
对于基于一部分数据读取关联数据的场景(实现类似于SQL中的join操作),我们在DataSet API下实现了丰富的MapFunction/MapPartitionFunction来实现并行批量的读取关联数据,但是在Table API/SQL下实现join要么先把底层的数据全部读进来之后再在Flink内部来实现join,这样会读取大量无用的数据对TiDB造成很大的负载,要么用点查来做的话不支持批量查询,效率太低。
在数据的处理过程中有非常复杂的业务逻辑基于DataSet API可以很方便的实现,但是这些没法转化成SQL的实现。
基于以上的问题迁移到DataStream API是我们唯一的选择。

迁移方案

https://nightlies.apache.org/flink/flink-docs-release-1.19/zh/docs/dev/datastream/dataset_migration/ 这篇文章是官方文档中的迁移指南,我们的迁移主要也是参考了这篇文章。但是在迁移的过程中还是会遇到各种各样的困惑,主要是以下的几个方面。

Runtime Mode

DataStream API支持三种Runtime Mode:Streaming、Batch、Auto。Streaming就是流处理,Batch就是批处理,Auto是在有界数据上采用批处理模式,无界数据上采用流处理的模式。关于批处理和流处理模式的具体区别可以看官方文档:https://nightlies.apache.org/flink/flink-docs-release-1.19/zh/docs/dev/datastream/execution_mode/

因为我们是纯批处理的场景,所以Runtime Mode选择的是Batch。但是我其实有一个疑问就是Streaming 模式和Batch模式下的Pipeline Execution Mode有什么区别?因为Pipeline Execution Mode也是一种类似于流式的处理方式。官方同学的回复是:“不一样,只是shuffle实现用了pipeline shuffle,批和流模式下算子的语义不完全相同比如streaming会有回撤流等等。比如 sum 算子,在流模式下每来一条数据都会往下输出一条,但是批模式下每个key才会输出一条”。

数据输入

前面提到了Runtime Mode选择Batch模式,但是选择Batch模式有一个前提是要求所有的输入都是有界的。StreamExecutionEnvironment有一个createInput方法可以接收我们原来在DataSet下使用的各种InputFormat,但是这个方法生成的输入是无界的,使用这个方法提供的输入那还是得采用Streaming模式。所以我们参考StreamExecutionEnvironment的addSource方法自己实现了一个方法把InputFormat转变成一个有界的输入,这样就可以在Batch模式下继续使用原来的InputFormat:

public static <OUT> DataStreamSource<OUT> createStreamSourceFromInputFormat(
            StreamExecutionEnvironment environment, InputFormat<OUT, ?> inputFormat, String name) {

        if (StringUtils.isBlank(name)) {
            name = "Custom Input Format Source";
        }

        TypeInformation<OUT> inputFormatTypes = TypeExtractor.getInputFormatTypes(inputFormat);
        InputFormatSourceFunction<OUT> function = new InputFormatSourceFunction<>(inputFormat, inputFormatTypes);
        TypeInformation<OUT> resolvedTypeInfo = inputFormatTypes;

        if (resolvedTypeInfo == null && function instanceof ResultTypeQueryable) {
            resolvedTypeInfo = ((ResultTypeQueryable<OUT>) function).getProducedType();
        }
        if (resolvedTypeInfo == null) {
            try {
                resolvedTypeInfo =
                        TypeExtractor.createTypeInfo(SourceFunction.class, function.getClass(), 0, null, null);
            } catch (final InvalidTypesException e) {
                resolvedTypeInfo = (TypeInformation<OUT>) new MissingTypeInfo(name, e);
            }
        }

        boolean isParallel = function instanceof ParallelSourceFunction;

        if (environment.getConfig().isClosureCleanerEnabled()) {
            ClosureCleaner.clean(function, environment.getConfig().getClosureCleanerLevel(), true);
        }
        ClosureCleaner.ensureSerializable(function);

        final StreamSource<OUT, ?> sourceOperator = new StreamSource<>(function);
        return new DataStreamSource<>(
                environment, resolvedTypeInfo, sourceOperator, isParallel, name, Boundedness.BOUNDED);
    }

我们另外也开发了一个实现Source接口的TableSource类,但是本质上跟上面的方法+InputFormat差不多,而且还可以复用之前的InputFormat,所有就没有使用这个TableSource类。

数据输出

DataSet下的数据输出采用的是OutputFormat,DataStream有一个输出的方法writeUsingOutputFormat可以使用原来的OutputFormat,但是这个方法已经标记为废弃了,所以我们还是切换到了addSink上做输出:对TiDB/MySQL的输出就采用JdbcSink,对StarRocks和ElasticSearch的Sink我们也自己实现了一套,基于原来对应的OutputFormat修改起来很方便。

数据处理

其实原来的DataSet 这一套API非常好用,切换到DataStream之后即使有上面的迁移文档,有很多的数据操作实现起来还是没有比原来的API更复杂或者和原来的使用方式不一样。比如比较核心的一点是大部分的操作都需要开窗:先把数据使用keyBy分组然后使用迁移文档里的EndOfStreamWindows开窗再处理,这个需要适应一下。

另外一个比较大的问题是DataStream下没有了mapPartition方法,在DataSet API下我们经常在rebalance之后使用mapPartition进行处理,但是在DataStream下并没有对应的方法。迁移文档里提供了一种方式是先把数据使用AddSubtaskIDMapFunction做一次map之后再根据subtaskId做分组开窗处理,但是这个会改变原来DataStream下的数据类型,强行嵌套一层Tuple在外面,还是很不方便的。我们现在采用的方式是直接找一个分布相对均匀的字段做keyBy再开窗处理。好消息是官方透露在1.20的版本上会提供DataStream的mapPartition方法,还会有sortPatition等方法,具体可以看这个FLIP:https://cwiki.apache.org/confluence/display/FLINK/FLIP-380%3A+Support+Full+Partition+Processing+On+Non-keyed+DataStream

可行性

基于上面的这套方案我们已经迁移了10多个job到DataStream API,目前迁移后的job运行都正常,我们也会在后续的版本迭代中逐步的完成所有job的迁移。

参考

除了上面提到的一些文档,还有一些与批处理相关的FLIP可以参考:
https://cwiki.apache.org/confluence/display/FLINK/FLIP-134%3A+Batch+execution+for+the+DataStream+API#FLIP134:BatchexecutionfortheDataStreamAPI-Motivation
https://cwiki.apache.org/confluence/display/FLINK/FLIP-140%3A+Introduce+batch-style+execution+for+bounded+keyed+streams#FLIP140:Introducebatchstyleexecutionforboundedkeyedstreams-Motivation
https://cwiki.apache.org/confluence/display/FLINK/FLIP-327%3A+Support+switching+from+batch+to+stream+mode+to+improve+throughput+when+processing+backlog+data#FLIP327:Supportswitchingfrombatchtostreammodetoimprovethroughputwhenprocessingbacklogdata-Motivation

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
目录
相关文章
|
3月前
|
分布式计算 大数据 Apache
ClickHouse与大数据生态集成:Spark & Flink 实战
【10月更文挑战第26天】在当今这个数据爆炸的时代,能够高效地处理和分析海量数据成为了企业和组织提升竞争力的关键。作为一款高性能的列式数据库系统,ClickHouse 在大数据分析领域展现出了卓越的能力。然而,为了充分利用ClickHouse的优势,将其与现有的大数据处理框架(如Apache Spark和Apache Flink)进行集成变得尤为重要。本文将从我个人的角度出发,探讨如何通过这些技术的结合,实现对大规模数据的实时处理和分析。
231 2
ClickHouse与大数据生态集成:Spark & Flink 实战
|
1月前
|
程序员 API 开发者
实战阿里qwen2.5-coder 32B,如何配置Cline的Ollama API接口。
阿里Qwen2.5大模型开源免费,适合编程应用。在Ollama平台下载时,推荐选择带有“cline”字样的Qwen2.5-Coder版本,仅需额外下载适配文件,无需重复下载模型文件。Ollama环境永久免费,配置简单,效果出色,适合开发者使用。
3039 77
|
10天前
|
SQL 人工智能 关系型数据库
Flink CDC YAML:面向数据集成的 API 设计
本文整理自阿里云智能集团 Flink PMC Member & Committer 徐榜江(雪尽)在 FFA 2024 分论坛的分享,涵盖四大主题:Flink CDC、YAML API、Transform + AI 和 Community。文章详细介绍了 Flink CDC 的发展历程及其优势,特别是 YAML API 的设计与实现,以及如何通过 Transform 和 AI 模型集成提升数据处理能力。最后,分享了社区动态和未来规划,欢迎更多开发者加入开源社区,共同推动 Flink CDC 的发展。
312 12
Flink CDC YAML:面向数据集成的 API 设计
|
1月前
|
供应链 搜索推荐 API
深度解析1688 API对电商的影响与实战应用
在全球电子商务迅猛发展的背景下,1688作为知名的B2B电商平台,为中小企业提供商品批发、分销、供应链管理等一站式服务,并通过开放的API接口,为开发者和电商企业提供数据资源和功能支持。本文将深入解析1688 API的功能(如商品搜索、详情、订单管理等)、应用场景(如商品展示、搜索优化、交易管理和用户行为分析)、收益分析(如流量增长、销售提升、库存优化和成本降低)及实际案例,帮助电商从业者提升运营效率和商业收益。
181 20
|
30天前
|
XML API 开发者
探究获取亚马逊畅销榜API接口及实战应用
亚马逊MWS(商城网络服务)提供了一系列API接口,帮助开发者获取平台数据,其中畅销榜API尤为关键。通过注册开发者账号、创建应用并申请权限,可使用HTTP POST请求获取商品的销售排名、价格等信息。Python代码示例展示了如何构建和发送请求,并处理返回的XML或JSON数据。注意遵守亚马逊的频率限制、数据准确性和合规性要求,以确保安全合法地利用这些数据支持电商业务决策。
63 1
|
1月前
|
JSON 监控 API
获取1688商品SKU信息API接口及实战应用
在电商蓬勃发展的今天,数据成为宝贵的财富。1688作为国内知名批发采购平台,提供商品SKU信息API接口,可获取库存、价格、规格等关键数据,助力电商运营、市场分析和价格监控。本文介绍如何注册1688开放平台账号、创建应用并获取AppKey/AppSecret,申请API权限,使用Python实现接口调用,处理响应数据,并注意请求频率限制和错误处理。通过该接口,可为电商运营和数据分析提供有力支持。
70 2
|
2月前
|
消息中间件 JSON 数据库
探索Flink动态CEP:杭州银行的实战案例
本文由杭州银行大数据工程师唐占峰、欧阳武林撰写,介绍Flink动态CEP的定义、应用场景、技术实现及使用方式。Flink动态CEP是基于Flink的复杂事件处理库,支持在不重启服务的情况下动态更新规则,适应快速变化的业务需求。文章详细阐述了其在反洗钱、反欺诈和实时营销等金融领域的应用,并展示了某金融机构的实际应用案例。通过动态CEP,用户可以实时调整规则,提高系统的灵活性和响应速度,降低维护成本。文中还提供了具体的代码示例和技术细节,帮助读者理解和使用Flink动态CEP。
522 2
探索Flink动态CEP:杭州银行的实战案例
|
1月前
|
安全 程序员 API
API对于程序员的多元用法:从基础到实战
API(应用程序编程接口)是现代软件开发中不可或缺的工具,充当不同系统间沟通的桥梁。通过API,程序员可以轻松获取外部数据、扩展应用功能、实现微服务间的通信等,极大提升开发效率和应用功能性。常见的API类型包括Web API、本地API和第三方API。使用API,开发者能快速集成复杂功能(如支付、物流跟踪),并确保数据安全与管理。掌握API的开发、维护及安全管理技巧,对构建高效、稳定的应用至关重要。随着数字化进程加速,API的重要性将不断提升。
40 1
|
2月前
|
安全 API 数据安全/隐私保护
速卖通AliExpress商品详情API接口深度解析与实战应用
速卖通(AliExpress)作为全球化电商的重要平台,提供了丰富的商品资源和便捷的购物体验。为了提升用户体验和优化商品管理,速卖通开放了API接口,其中商品详情API尤为关键。本文介绍如何获取API密钥、调用商品详情API接口,并处理API响应数据,帮助开发者和商家高效利用这些工具。通过合理规划API调用策略和确保合法合规使用,开发者可以更好地获取商品信息,优化管理和营销策略。
|
3月前
|
JSON BI API
商城上货API接口的实战案例
在商城上货过程中,API接口扮演着至关重要的角色。以下是对商城上货API接口的实战分析,涵盖其主要功能、类型、安全性以及实战案例等方面。

相关产品

  • 实时计算 Flink版