Apache Flink 零基础入门(七):Table API 编程

本文涉及的产品
实时计算 Flink 版,1000CU*H 3个月
简介: 本文主要包含三部分:第一部分,主要介绍什么是 Table API,从概念角度进行分析,让大家有一个感性的认识;第二部分,从代码的层面介绍怎么使用 Table API;第三部分,介绍 Table API 近期的动态。

作者:程鹤群(军长)

文章概述:本文主要包含三部分:第一部分,主要介绍什么是 Table API,从概念角度进行分析,让大家有一个感性的认识;第二部分,从代码的层面介绍怎么使用 Table API;第三部分,介绍 Table API 近期的动态。文章结构如下:

  • 什么是 Table API

    • Flink API 总览
    • Table API 的特性
  • Table API 编程

    • WordCount 示例
    • Table API 操作

      • 如何获取一个 Table
      • 如果输出一个 Table
      • 如果查询一个 Table
  • Table API 动态

一、什么是 Table API

为了更好地了解 Table API,我们先看下 Flink 都提供了哪些 API 供用户使用。

1.Flink API 总览

01 flink_api.png

如图,Flink 根据使用的便捷性和表达能力的强弱提供了 3 层 API,由上到下,表达能力逐渐增强,比如 processFunction,是最底层的 API,表达能力最强,我们可以用他来操作 state 和 timer 等复杂功能。Datastream API 相对于 processFunction 来说,又进行了进一步封装,提供了很多标准的语义算子给大家使用,比如我们常用的 window 算子(包括 Tumble, slide,session 等)。那么最上面的 SQL 和 Table API 使用最为便捷,具有自身的很多特点,重点归纳如下:

02 table_api_and_sql.png

第一,Table API & SQL 是一种声明式的 API。用户只需关心做什么,不用关心怎么做,比如图中的 WordCount 例子,只需要关心按什么维度聚合,做哪种类型的聚合,不需要关心底层的实现。

第二,高性能。Table API & SQL 底层会有优化器对 query 进行优化。举个例子,假如 WordCount 的例子里写了两个 count 操作,优化器会识别并避免重复的计算,计算的时候只保留一个 count 操作,输出的时候再把相同的值输出两遍即可,以达到更好的性能。

第三,流批统一。上图例子可以发现,API 并没有区分流和批,同一套 query 可以流批复用,对业务开发来说,避免开发两套代码。

第四,标准稳定。Table API & SQL 遵循 SQL 标准,不易变动。API 比较稳定的好处是不用考虑 API 兼容性问题。

第五,易理解。语义明确,所见即所得。

2.Table API 特性

上一小节介绍了 Table API 和 SQL 一些共有的特性,这个小节重点介绍下 Table API 自身的特性。主要可以归纳为以下两点:

03 tableapi_properties.png

第一,Table API 使得多声明的数据处理写起来比较容易。

怎么理解?比如我们有一个 Table(tab),并且需要执行一些过滤操作然后输出到结果表,对应的实现是:tab.where(“a < 10”).inertInto(“resultTable1”);此外,我们还需要做另外一些筛选,然后也对结果输出,即 tab.where(“a > 100”).insertInto(“resultTable2”)。你会发现,用 Table API 写起来会非常简洁方便,两行代码就把功能实现了。

第二,Table API 是 Flink 自身的一套 API,这使得我们更容易地去扩展标准的 SQL。当然,在扩展 SQL 的时候并不是随意的去扩展,需要考虑 API 的语义、原子性和正交性,并且当且仅当需要的时候才去添加。

对比 SQL,我们可以认为 Table API 是 SQL 的超集。SQL 有的操作,Table API 可以有,然而我们又可以从易用性和功能性地角度对 SQL 进行扩展和提升。

二、Table API编程

第一章介绍了 Table API 相关的概念。这一章我们来看下如何用 Table API 来编程。本章会先从一个 WordCount 的例子出发,让大家对 Table API 编程先有一个大概的认识,然后再具体介绍一下 Table API 的操作,比如,如何获取一个 Table,如何输出一个 Table,以及如何对 Table 执行查询操作。

1.WordCount举例

这是一个完整的,用 java 编写的 batch 版本的 WordCount 例子,此外,还有 scala 和 streaming 版本的 WordCount,都统一上传到了 github 上(https://github.com/hequn8128/TableApiDemo),大家可以下载下来尝试运行或者修改。

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.BatchTableEnvironment;
import org.apache.flink.table.descriptors.FileSystem;
import org.apache.flink.table.descriptors.OldCsv;
import org.apache.flink.table.descriptors.Schema;
import org.apache.flink.types.Row;

public class JavaBatchWordCount {   // line:10

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        BatchTableEnvironment tEnv = BatchTableEnvironment.create(env);

        String path = JavaBatchWordCount.class.getClassLoader().getResource("words.txt").getPath();
        tEnv.connect(new FileSystem().path(path))
            .withFormat(new OldCsv().field("word", Types.STRING).lineDelimiter("\n"))
            .withSchema(new Schema().field("word", Types.STRING))
            .registerTableSource("fileSource");  // line:20

        Table result = tEnv.scan("fileSource")
            .groupBy("word")
            .select("word, count(1) as count");

        tEnv.toDataSet(result, Row.class).print();
    }
}

我们具体看下这个 WordCount 的例子。首先,第13、14行,是对 environment 的一些初始化,先通过 ExecutionEnvironment 的 getExecutionEnvironment 方法拿到执行环境,然后再通过 BatchTableEnvironment 的 create 拿到对应的 Table 环境,拿到环境后,我们可以注册 TableSource、TableSink 或执行一些其他操作。

这里需要注意的是,ExecutionEnvironment 跟 BatchTableEnvironment 都是对应 Java 的版本,对于 scala 程序,这里需要是一个对应 scala 版本的 environment。这也是初学者一开始可能会遇到的问题,因为 environent 有很多且容易混淆。为了让大家更好区分这些 environment,下面对 environment 进行了一些归纳。

004 table_environment.png

这里从 batch/stream,还有 Java/scala,对 environment 进行了分类,对于这些 environment 使用时需要特别注意,不要 import 错了。environment 的问题,社区已经进行了一些讨论,如上图下方的链接,这里不再具体展开。

我们再回到刚刚的 WordCount 的例子,拿到 environment 后,需要做的第二件事情是注册对应的TableSource。

tEnv.connect(new FileSystem().path(path))
    .withFormat(new OldCsv().field("word", Types.STRING).lineDelimiter("\n"))
    .withSchema(new Schema().field("word", Types.STRING))
    .registerTableSource("fileSource");

使用起来也非常方便,首先,因为我们要读一个文件,需要指定读取文件的路径,指定了之后,我们需要再描述文件内容的格式,比如他是 csv 的文件并且行分割符是什么。还有就是指定这个文件对应的 Schema 是什么,比如只有一列单词,并且类型是 String。最后,我们需要把 TableSource 注册到 environment 里去。

Table result = tEnv.scan("fileSource")
    .groupBy("word")
    .select("word, count(1) as count");

tEnv.toDataSet(result, Row.class).print();

通过 scan 刚才注册好的 TableSource,我们可以拿到一个 Table 对象,并执行相应的一些操作,比如 GroupBy,count。最后,可以把 Table 按 DataSet 的方式进行输出。

以上便是一个 Table API 的 WordCount 完整例子。涉及 Table 的获取,Table 的操作,以及 Table 的输出。接下来会具体介绍如何获取 Table、输出 Table 和执行 Table 操作。

2.如何获取一个Table

获取 Table 大体可以分为两步,第一步,注册对应的 TableSource;第二步,调用 Table environement 的 scan 方法获取 Table 对象。注册 Table Source 又有3种方法:通过 Table descriptor 来注册,通过自定义 source 来注册,或者通过 DataStream 来注册。具体的注册方式如下图所示:

04 get_table.png

3.如何输出一个Table

对应输出 Table,我们也有类似的3种方法:Table descriptor, 自定义 Table sink 以及输出成一个 DataStream。如下图所示:

05 emit_table.png

4.如何操作一个Table

4.1 Table 操作总览

第2、3节介绍了如何获取和输出一个 Table,本节主要介绍如何对 Table 进行操作。Table 上有很多操作,比如一些 projection 操作 select、filter、where;聚合操作,如 groupBy、flatAggrgate;还有join操作,等等。我们以一个具体的例子来介绍下 Table 上各操作的转换流程。

06 api_on_table.png

如上图,当我们拿到一个 Table 后,调用 groupBy 会返回一个 GroupedTable。GroupedTable 里只有 select 方法,对 GroupedTable 调用 select 方法会返回一个 Table。拿到这个 Table 后,我们可以再调用 Table 上的方法。图中其他 Table,如 OverWindowedTable 也是类似的流程。值得注意的是,引入各个类型的 Table 是为了保证 API 的合法性和便利性,比如 groupBy 之后只有 select 操作是有意义的,在编辑器上可以直接点出来。

前面我们提到,可以将 Table API 看成是 SQL 的超集,因此我们也可以对 Table 里的操作按此进行分类,大致分为三类,如下图所示:

07 table_method_classification.png

第一类,是跟 SQL 对齐的一些操作,比如 select, filter, join 等。第二类,是一些提升 Table API 易用性的操作。第三类,是增强 Table API 功能的一些操作。第一类操作由于和 SQL 类似,比较容易理解,其次,也可以查看官方的文档,了解具体的方法,所以这里不再展开介绍。下面的章节会重点介绍后两类操作,这些操作也是 Table API 独有的。

4.2 提升易用性相关操作

介绍易用性之前,我们先来看一个问题。假设我们有一张很大的表,里面有一百列,此时需要去掉一列,那么SQL怎么写?我们需要 select 剩下的 99 列!显然这会给用户带来不小的代价。为了解决这个问题,我们在Table上引入了一个 dropColumns 方法。利用 dropColumns 方法,我们便可以只写去掉的列。与此对应,还引入了 addColumns, addOrReplaceColumns 和 renameColumns 方法,如下图所示:

08 dropColumns.png

解决了刚才的问题后,我们再看下面另一个问题:假设还是一张100列的表,我们需要选第20到第80列,那么我们如何操作呢?为了解决这个问题,我们又引入了 withColumns 和 withoutColumns 方法。对于刚才的问题,我们可以简单地写成 table.select(“withColumns(20 to 80)”)。

09 withColumns.png

4.3 增强功能相关操作

该小节会介绍下 TableAggregateFunction 的功能和用法。在引入 TableAggregateFunction 之前,Flink 里有三种自定义函数:ScalarFunction,TableFunction 和 AggregateFunction。我们可以从输入和输出的维度对这些自定义函数进行分类。如下图所示,ScalarFunction 是输入一行,输出一行;TableFunction 是输入一行,输出多行;AggregateFunction 是输入多行输出一行。为了让语义更加完整,Table API 新加了 TableAggregateFunction,它可以接收和输出多行。TableAggregateFunction 添加后,Table API 的功能可以得到很大的扩展,某种程度上可以用它来实现自定义 operator。比如,我们可以用 TableAggregateFunction 来实现 TopN。

10 table_aggregate_function.png

TableAggregateFunction 使用也很简单,方法签名和用法如下图所示:

11 flatAggregate.png

用法上,我们只需要调用 table.flatAggregate(),然后传入一个 TableAggregateFunction 实例即可。用户可以继承 TableAggregateFunction 来实现自定义的函数。继承的时候,需要先定义一个 Accumulator,用来存取状态,此外自定义的 TableAggregateFunction 需要实现 accumulate 和 emitValue 方法。accumulate 方法用来处理输入的数据,而 emitValue 方法负责根据 accumulator 里的状态输出结果。

三、Table API 动态

最后介绍下 Table API 近期的动态:

1.Flip-29

主要是 Table API 功能和易用性的增强。比如刚刚介绍的 columns 相关操作,还有 TableAggregateFunction。

社区对应的 jira 是:https://issues.apache.org/jira/browse/FLINK-10972

2.Python Table API

希望在 Table API 上增加 python 语言的支持。这个应该是 Python 用户的福音。

社区对应的 jira 是:https://issues.apache.org/jira/browse/FLINK-12308

3.Interactive Programming(交互式编程)

即 Table 上会提供一个 cache 算子,执行 cache 操作可以缓存 table 的结果,并在这个结果上做其他操作。社区对应 jira 是:https://issues.apache.org/jira/browse/FLINK-11199

4.Iterative Processing(迭代计算)

Table 上会支持一个 iterator 的算子,该算子可以用来执行迭代计算。比如迭代 100 次,或者指定一个收敛的条件,在机器学习领域使用比较广泛。社区对应 jira 是:https://issues.apache.org/jira/browse/FLINK-11199


▼ Apache Flink 社区推荐 ▼

Apache Flink 及大数据领域顶级盛会 Flink Forward Asia 2019 重磅开启,目前正在征集议题,限量早鸟票优惠ing。了解 Flink Forward Asia 2019 的更多信息,请查看:

https://developer.aliyun.com/special/ffa2019

首届 Apache Flink 极客挑战赛重磅开启,聚焦机器学习与性能优化两大热门领域,40万奖金等你拿,加入挑战请点击:

https://tianchi.aliyun.com/markets/tianchi/flink2019

相关实践学习
基于Hologres+Flink搭建GitHub实时数据大屏
通过使用Flink、Hologres构建实时数仓,并通过Hologres对接BI分析工具(以DataV为例),实现海量数据实时分析.
实时计算 Flink 实战课程
如何使用实时计算 Flink 搞定数据处理难题?实时计算 Flink 极客训练营产品、技术专家齐上阵,从开源 Flink功能介绍到实时计算 Flink 优势详解,现场实操,5天即可上手! 欢迎开通实时计算 Flink 版: https://cn.aliyun.com/product/bigdata/sc Flink Forward Asia 介绍: Flink Forward 是由 Apache 官方授权,Apache Flink Community China 支持的会议,通过参会不仅可以了解到 Flink 社区的最新动态和发展计划,还可以了解到国内外一线大厂围绕 Flink 生态的生产实践经验,是 Flink 开发者和使用者不可错过的盛会。 去年经过品牌升级后的 Flink Forward Asia 吸引了超过2000人线下参与,一举成为国内最大的 Apache 顶级项目会议。结合2020年的特殊情况,Flink Forward Asia 2020 将在12月26日以线上峰会的形式与大家见面。
相关文章
|
1月前
|
人工智能 数据处理 API
阿里云、Ververica、Confluent 与 LinkedIn 携手推进流式创新,共筑基于 Apache Flink Agents 的智能体 AI 未来
Apache Flink Agents 是由阿里云、Ververica、Confluent 与 LinkedIn 联合推出的开源子项目,旨在基于 Flink 构建可扩展、事件驱动的生产级 AI 智能体框架,实现数据与智能的实时融合。
321 6
阿里云、Ververica、Confluent 与 LinkedIn 携手推进流式创新,共筑基于 Apache Flink Agents 的智能体 AI 未来
|
存储 Cloud Native 数据处理
从嵌入式状态管理到云原生架构:Apache Flink 的演进与下一代增量计算范式
本文整理自阿里云资深技术专家、Apache Flink PMC 成员梅源在 Flink Forward Asia 新加坡 2025上的分享,深入解析 Flink 状态管理系统的发展历程,从核心设计到 Flink 2.0 存算分离架构,并展望未来基于流批一体的通用增量计算方向。
284 0
从嵌入式状态管理到云原生架构:Apache Flink 的演进与下一代增量计算范式
|
2月前
|
人工智能 运维 Java
Flink Agents:基于Apache Flink的事件驱动AI智能体框架
本文基于Apache Flink PMC成员宋辛童在Community Over Code Asia 2025的演讲,深入解析Flink Agents项目的技术背景、架构设计与应用场景。该项目聚焦事件驱动型AI智能体,结合Flink的实时处理能力,推动AI在工业场景中的工程化落地,涵盖智能运维、直播分析等典型应用,展现其在AI发展第四层次——智能体AI中的重要意义。
1094 27
Flink Agents:基于Apache Flink的事件驱动AI智能体框架
|
3月前
|
消息中间件 存储 Kafka
Apache Flink错误处理实战手册:2年生产环境调试经验总结
本文由 Ververica 客户成功经理 Naci Simsek 撰写,基于其在多个行业 Flink 项目中的实战经验,总结了 Apache Flink 生产环境中常见的三大典型问题及其解决方案。内容涵盖 Kafka 连接器迁移导致的状态管理问题、任务槽负载不均问题以及 Kryo 序列化引发的性能陷阱,旨在帮助企业开发者避免常见误区,提升实时流处理系统的稳定性与性能。
352 0
Apache Flink错误处理实战手册:2年生产环境调试经验总结
|
SQL 消息中间件 API
Flink关系型API的公共部分
关系型程序的公共部分 下面的代码段展示了Table&SQL API所编写流式程序的程序模式: val env = StreamExecutionEnvironment.getExecutionEnvironment //创建TableEnvironment对象 val tableEnv = TableEnvironment.
2832 0
|
3月前
|
存储 分布式计算 数据处理
「48小时极速反馈」阿里云实时计算Flink广招天下英雄
阿里云实时计算Flink团队,全球领先的流计算引擎缔造者,支撑双11万亿级数据处理,推动Apache Flink技术发展。现招募Flink执行引擎、存储引擎、数据通道、平台管控及产品经理人才,地点覆盖北京、杭州、上海。技术深度参与开源核心,打造企业级实时计算解决方案,助力全球企业实现毫秒洞察。
472 0
「48小时极速反馈」阿里云实时计算Flink广招天下英雄
|
运维 数据处理 数据安全/隐私保护
阿里云实时计算Flink版测评报告
该测评报告详细介绍了阿里云实时计算Flink版在用户行为分析与标签画像中的应用实践,展示了其毫秒级的数据处理能力和高效的开发流程。报告还全面评测了该服务在稳定性、性能、开发运维及安全性方面的卓越表现,并对比自建Flink集群的优势。最后,报告评估了其成本效益,强调了其灵活扩展性和高投资回报率,适合各类实时数据处理需求。
|
存储 分布式计算 流计算
实时计算 Flash – 兼容 Flink 的新一代向量化流计算引擎
本文介绍了阿里云开源大数据团队在实时计算领域的最新成果——向量化流计算引擎Flash。文章主要内容包括:Apache Flink 成为业界流计算标准、Flash 核心技术解读、性能测试数据以及在阿里巴巴集团的落地效果。Flash 是一款完全兼容 Apache Flink 的新一代流计算引擎,通过向量化技术和 C++ 实现,大幅提升了性能和成本效益。
3563 73
实时计算 Flash – 兼容 Flink 的新一代向量化流计算引擎
zdl
|
消息中间件 运维 大数据
大数据实时计算产品的对比测评:实时计算Flink版 VS 自建Flink集群
本文介绍了实时计算Flink版与自建Flink集群的对比,涵盖部署成本、性能表现、易用性和企业级能力等方面。实时计算Flink版作为全托管服务,显著降低了运维成本,提供了强大的集成能力和弹性扩展,特别适合中小型团队和业务波动大的场景。文中还提出了改进建议,并探讨了与其他产品的联动可能性。总结指出,实时计算Flink版在简化运维、降低成本和提升易用性方面表现出色,是大数据实时计算的优选方案。
zdl
525 56

相关产品

  • 实时计算 Flink版
  • 推荐镜像

    更多
    下一篇
    oss云网关配置