Flink Table API 编程| 学习笔记

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 快速学习 Flink Table API 编程。

开发者学堂课程【Apache Flink 入门到实战 - Flink 开源社区出品 Flink Table API 编程】学习笔记,与课程紧密联系,让用户快速学习知识。

课程地址:https://developer.aliyun.com/learning/course/632/detail/10033


Flink Table API 编程

 

目录

一、什么是 Table API

二、Table API 编程

三、Table API 动态

 

一、 什么是 Table API

(1) Flink API 总览

由上到下,表达能力增强,可以用来操作 Timer 等复杂的操作。

Date Stream 提供一些 Window。SQL 和 Table API 使用起来是最便捷的,它自身有很多特点。

1. 声明式-用户只关心做什么,不用关心怎么做

2.高性能-支持查询优化,可以获取更好的执行性能

只需要描述做一些聚合的操作,不需要关心底层怎么实现。Table 和底层有一个优化器,例如:写了一个聚合操作,写了两个 Count,多写了一个 Count,查询优化器在优化的时候就会发现重复的聚合操作,在最终执行的时候,只会用到一个聚合操作。输出的时候把相同的一个值输出两次即可,程序的性能会更好。 

流批统-相同的统计逻辑,既可以流模式运行,也可以批模式运行,方便开发

4.标准稳定-语义遵循 SL 标准,不易变动。不用考虑 API 兼容问题。

5.易理解一语义明确,所见即所得。很清晰知道背后做 Count 操作。

Table API:

tab.groupBy("word")

.select("word, count(1)as count")

SQL:

SELECT word, COUNT(")AS cnt

FROM MyTable

GROUP BY word

(2) Table API 的特性

1.Table API 使得多声明的数据处理写起来比较容易

比如做一些过滤的操作,把 a<10 的数据筛选出来,然后写到一个外部表里,a>10 进行过滤,插入到另外一张表。这种数据处理很简单,用了两行就可以把逻辑写出来。

2.Table 使得扩展标准 SQL 更为容易(当直仅当需要的时候)

扩展 SQL 并不是随意扩展,需要考虑标准 SQL 的语义,不能乱写,慎重采取,当且仅当需要的时候。

总的来说,Table API 和 SQL 比较,Table API 是 SQL 的一个抄写。SQL 里面的操作 Table API 里面也有,他是 Flink 自身的一套 API,所以可以有一些易用性和功能性的提升。

 

二、 Table API 编程

(1) WordCount示例

https: //github.com/hequn8128/TableApiDemo

impart org. apache. flink. api. commontypeinfo. Types;

import org. apache. flink. api. Java. ExecutionEnvironnent;

inport oro. apache. funk. table. ap1. Table:

import org. apache. flink. table. api. java.

BatchTablcEnvironnent;

import org. apache. flink. tabledescriptors. FileSystem;

inport org. apache. flink, table. descriptor.oldcsv;

Amport oro. apache. flink, table. descriptors. Schema;

import org. apache. flink. types. Row;

publit class JavaBatchwordCount

public static veid main(String [ args) throws Exception

ExecutionEnvironment env= Executio. getExecutionEnviromm

BatchTableenvironment tEnv= BatchTableEn. createenv

String path= JavaBatchkordCount. class. getClassLoader(). getResource(

tEny. connect(new Filesystem() path(path))

1thFormat( Oldcsv() field( fieldName "word", Types. STRING).

withschema(new Schema() ficld( fieldName: "word", Types. STRING))

registerTableSource( name: "fileSource"):

Table result tEnv. scan( strings: "fileSource")

groupBy( $ "word")

select("word, count(1) as count");

tEnv. toDataSet(result, Row. class).print()

第十五,十六行是对 environment 的初始化,一开始会返回一个执行的环节,拿到执行环节后,再用 Batch Tab environment,都是 java 版本。

一开始初学者可能会调入坑中,目前的 environment 很多。

batch

a. java

org.apache flink.api.java. ExecutionEnvironment

b. scala

org.apache. flink.table.api.java. Batch TableEnvironment

stream

a.java

org.apache.flink.api.scala. ExecutionEnvironment

orgapache. flink.table.api.scala. BatchTableE

b.scala

org. apache. flink. streaming. api.environment. StreamExecutionEnvironment

org.apache. flink.table.api.java. Stream

org. apache. flink. streaming. api.scala .StreamExecutionEnvironment

org.apache.flink.table.api.scala.StreamTable Environment

大致概括了一下,进行分类,每一种都有自己特有的 Environment。使用过程中,注意不要写错。而且对于流和批来说,一定区分开。

可以用一个 Environment,用户操作多个容易搞错。

Table Environment 使用优化

https: //mal. google. com/mail/u/o/?tab searchAabel%3Aflink-dev+table+ environment/ FMfcgxvz MBlhTWVjxlzCnVZLvvDkmph

https: //cwiki apache  org/confluence/d

.32%3A+Restructure+flink-table+ for+ future+ contributions.

拿到 environment 后,注册 keep source。首先指定一个文件的路径,指定格式,指定文件对应的 schema。简单的例子,只有单词一列,类型是 STRING 类型。定义后,把 source 注册到 environment 里面去。

18~22行把 source 注册好了,可以通过 environment 的 schema 的 file source 来拿到 Table,拿到 Table 对象之后,可以执行 Table 的一些操作。

把 Table 输出,输出成 Dateset。到这里就是完整的例子。

(2)Table API 操作

1. how to get a Table?

可以认为 Table 是从 Table environment 里面 scan 出来的,scan 里的 Table 又是注册进去的。注册 Table 方式有三种。

Table myTable tableEnvironment.scan(MyTable")

a.Table descriptor

tEnv

.connect(

new FileSystem()

.path(path))

.with Format(

new OldCsv()

field("word", Types. STRING)

lineDelimiter("\n"))

.withSchema(

new Schema(

.field("word", Types.STRING)

register TableSource("source Table)

指定格式和 schema,注册到 environment 里,如果需要自定义的 source,可以根据 Table source 的接口写自己的自定义的 Table source,然后可以用 Table environment 的 Table source 把自己的 Table source 注册到 environment 里,然后 scan 出来。

b. User defined table source

Tablesource csvSource new Csv TableSource(

path,

new StringU("word"}

new Typelnformation [KTypes.STRING))

tEnv.registerTableSource("source Table2", csvSource

c. Register a DataStream

Datastreamsstring stream

/register the DataStream as table "myTable3"with

fields"word

tableEmv

registerDataStream("my Table3", stream, "word"):

命名 my Table3,对应的只有一列 word。

所以这三种方式都可以注册,就可以拿到 Table。也可以用这三种方式输出 Table。

2. how to emit a Table

当拿到结果表的时候,result Table 是 Table 的类型。执行 insertInto,跟 SQL 很相似,所以他的 API 都是根据标准的 SQL 来定义的。insertInto 到一个目标表里去。以下三种方式和注册 Table 大致是一样的。

resultTable. insertInto( "TargetTable");

a.Table descriptor tEnv

.connect(

new FileSystem()

.path(path))

.with Format(

new OldCsv()

field("word", Types. STRING)

.lineDelimiter("\n"))

.withSchema(

new Schema()

field("word", Types.STRING))

.registerTableSink("targetTable");

b.User defined table sink

TableSink csvSink new CsvTableSink(

path,

new StringU("word")

new Typelnformation[](Types. STRING);

tEnv.registerTable Sink("sinkTable2", csvSink);

c. emit to a DataStream

emit the result table to a Datastream

DataStream<Tuple2<Boolean, Row>> stream

tableEnv

toRetractStream(result Table, Row.class),

Boolean可以表明是一个ant消息或者是delete消息。

3. how to query Table 

Table

-select

-as

-filter

-where

-groupBy

-distinct

-join(inner, left, right, full)

- joinLateral(inner, left)

-minus

-minusAll

-union

-unionAll

-window(OverWindow)

-window(GroupWindow)

-dropColumns

-map/flatmap

Table 是有很多操作,这些操作跟标准 SQL 定义是一样的。除此之外,做 group By 的时候生成 GroupdTable,它只会有 select 的操作又会转成 Table,然后在调用这些操作。

对于 over WindowTable,只会有 select 的操作,又会转成 Table。Group Window Table 然后做 group By 得到 Window GroupTable,然后得到 select 操作,最后转成 Table。

可以大致看下 Flink 代码。

Flink 里面可以找到 Flink 的类,这个类里面定义了很多方法。比如 Group by 一些字段,返回 GroupTable,可以看到 Group Table 里面只有 select 的操作。select 之后返回的是一个 Table。

一个 Table 之外为什么还需要很多T able?

拥有隐身 Table,保证 API 操作的便利性,如果只有一个 Table,Groupby 返回的还是一个 Table,再继续 groupby,再定义一个 groupBy,就没办法回去了。更方便用户写出正确的 API。

API 操作可以分类。

a. 和 SQL 对齐的操作

SQL 操作比较熟悉,下次课会讲。

b. 提升  Table API 易用性的操作

Columns Operation-易用性

假设有一张100列的表我们需要去掉一列需要怎么操作?

Operators

exemples

AddColumns

 Table orders= tableEnv.scan("Orders");

 Table result =orders. addColumns ("concat(c, 'sunny') as desc");

AddOrReplaceColumns

 Table orders =tableEnv.scan("Orders");

 Table result= orders. addOrReplaceColumns("concat(c, 'sunny'") as desc");

DropColumns

 Table orders =tableEnv.scan("Orders");

 Table result= orders. dropColumns("b, c");

RenameColumns

 Table orders =tableEnv.scan("Orders");

 Table result =orders. renameColumns("b as b2, c as c2");

只用一个新的 API,比如 dropcolumns。scan 出 order 一个表,把里面 b, c 两列去掉,就不需要把其他的列选出来了。

对于 AddColumns 来说,可以添加一个新的列,比如叫 desc,它有一个要求,不能跟之前原有的列重合。原先order里面也有一个 desc 的列,就会错误。

如果业务上先加的列覆盖原有的列,用 AddOrReplaceColumns 操作,如果 orders 表里面也有这个列的话,会自动覆盖。

RenameColumns 希望给某些列重命名。把 b 叫做 b2,c 叫做 c2,所以 Table 上 Columns Operation 提升易用性。

假设有一张100列的表,我们需要选择第20第80列,需要怎么操作?

SYNTAX

DESC

withColumns(...)

 select the specified columns

 

withoutColumns(...)

deselect the columns specified

一个是选择,另一个是反选,就比如刚才选择20-80列可以用 withColumns。

假设一个圆表里面又 abcde,selectwithColumns(2 to4),可以得到bcd三列。如果用 withoutColumns,得到 ae。

Syntax

The proposed column operation syntax is as follows:

columnOperation:

withColumns(columnExprs)/ withoutColumns(columnExprs)

columnExprs

columnExpr.[ columnExpr]"

columnExpr:

columnRef |columnindex to columnIndex |columnName to columnName

columnRef:

columnName(The field name that exists in the table) columnindex(a positive

integer starting at 1)

Example: withColumns(a, b, 2 to 10, w to 2)

可以是 name 也可以是 name 范围。

总结

API

Example

Columns Operation

 table.dropColumns('a,b)

 

Columns Function

table.select(withColumns('a, 1 to 10)

 

Row-based operation

Map operation-易用性

Method signature

方法签名

del map(scalarFunction: Expression): Table

Usage

用法

val res lab

.map(fun().as(a, b, 'c)

.select('a,)

Benefit

好处

table.select((), udf20), udf3()....)

VS

table.map(udt())

class MyMap extends ScalarFunction

var param: String=

def evaluser defined inputs)row={

val result new Row(3)

//Business processing based on data and parameters

//根据数据和参数进行业务处理

result

override def get ResultType(signature: Array[Class_]):

TypelnformationL]={

Types. ROW(Types.5TRING, Types.INT, Types.LONG)

eval 接受一些参数,并输入,进行业务逻辑的处理,再把最终的结果返回出来。

getResult Type 的方法根据结果指定类型。row 里面有三列,三个类型。

拿到一个 tab,map 里面可以看方法签名,传入一个 scala Function。返回三列,名字 abc,可以做接下来的操作,比如 select。如果列很多的时候,每一列都要进行 udf,并且返回一个结果的时候,如果不用 map,用 scala,需要把每一列写出来。现在直接点 map 和 udf。

需要说明,map 和 Date map 差不多,输入一条输出一条,类似映射的关系。是不是有其他的算子,输入一条,输出多条。

Method signature

方法签名

def flatMap( tableFunction: Expression): Table

Usage

用法

val res tab

.flatMap(fun('e,' f).as(name,'age)

.select('name,'age)

Benefit

好处

table.joinLateral(udtf) VS table. flatMap(udtf())

case class User(name: String, age: Int)

class MyFlatMap extends TableFunction[User)

def eval([user defined inputs]): Unit=

for(..)

collect(User(name, age))

))

传入的是一个 Table Function,实现的例子里继承的是 Table Function,跟刚才的 scala Function 有点类似。但是这里没有 gapresulttap,因为这里返回值的类型是 u  的类型,Flink 可以自动分析出它的类型,名字也可以分析出来,这样就不用定义 gapresulrta p。

定义要 Table Function,点 flatmap 传入一个 Table Function,可以直接 select 出来。

Aggregate operation-易用性

Method signature

方法签名

def aggregate(aggregate Function): Expression: AggregatedTable

class AggregatedTable(table: Table, groupKeys Seq[Expression], aggFunction: Expression)

Usage

用法

val res= tab

.groupBy('a)

.aggregate(agg('e,' f) as ('a, 'b, 'c))

.select('a,'c)

Benefit

好处

table.select(agg1(), agg20), agg30)...)

VS

table.aggregate(agg())

class CountAccumulator extends JTuple1(Long)

f0 OL //count

class CountAgg extends AggregateFunction[JLong, CountAccumulator]

def accumulate(acc: CountAccumulator): Unit={

acc.f0+=1L

override def getValue(acc: CountAccumulator): JLong={

acc.fo

INPUT(Row) OUTPUT(Row)

. retract()/merge()

N(>=0)

}

方法签名接受一个 aggregate Function,举了一个 Count 例子,首先需要定义 Accumulator,累加器。一开始初始化为0,定义好之后定义 Count 聚合函数。

有两个方法,一个是 accumulator 怎么计算,对 Count 来说加一,最终 getvalue 把结果返回。

用法上tab点group by,aggregate把Function传进来,重命名结果,最后进行select操作。

aggregate Function可以接受多行输出一行。

有没有操作输入多行同时输出多行呢?

通常 groupby t 之后,求 topn,就是这种聚合操作。这个操作提供了新的 operation,类似于 flat aggregate。

c.增强 Table API 功能性的操作

FlatAggregate operation功能性

Method signature

方法签名

def flatAggregate(tableAggregateFounction Expression): FlatAggregateTable

class FlatAggregateTable(table: Table, groupKey: SeqlExpression], tableAggFun: Expression)

Usage

用法

val res =tab

groupBy('a)

flatAggregate( flatAggFunc('e,' f) as ('a, 'b, c))

.select(a, c)

Benefit

好处

功能性提升。如果不增加这种 Operation,其他没有这种语义,没办法输入多行聚合输出多行,所以是一种功能性填充。

增了一种 agg能输出多行

class TopNAcc{

var data: MapView[JInt, JLong] =_/ (rank-> value),

class TopN(n: Int) extends TableAggregateFunction[(, Long),

TopNAccum]

def accumulate(acc: TopNAcc, [user defined inputs])

def emitValue(acc: TopNAcc, out: Collector[(Int, Long)): Unit={

INPUT(Row) OUTPUT(Row)

..retract/merge

它的参数需要 tableAggregateFounction,与前面有很大的相似之处,先定义一个 topNacc,然后里面有一个accumulator 处理方法,不断地来,不断进行操作。

区别 emitvalue 的方法,可以拿到 collector,之后多次输出结果。

Aggregate VS TableAggregate

Aggregate 做一个 max 的操作,然后用 TableAggregate 做 Top2 的操作。这里有个输入表,对 price 求最大值,或者求 top2。

看一下 max 操作,第一步 creatAccumulator,创建累加器,第二步在累加器上进行 Accumulate 操作,比如数据依次输入到 agg 里。

第三步 getValue,得到最后结果 

总的区别就是getValue和emitvalue。

总结

 

 Single Row Input

单行输入

 Multiple Row Input

多行输入

 Single Row Output

单行输出

 ScalarFunction

 (select/map)

 AggregateFunction

 (select/aggregate)

 Multiple Row output

多行输出

 TableFunction

 oinLateral/flatmap)

 TableAggregateFunction

 (flatAggregate)


三、 Table API 动态

3.1 Flip29

https: //issues. apache. org/iira/browse/flink-10972

3.2 Python Table API

https: //issues apache.org/iira/browse/flink-12308

3.3 Interactive Programming(交互式编程)

https: //issues. apache org/iira/browse/flink-11199

交互式编程提供一个 catch 的算子,Table 点 catch 的操作,把之前操作保存。可以便捷操作 API,提高程序性能。 

3.4 Iterative processing(迭代计算)

https://issues.apache.org/iira/browse/FLINK-11199

在机器学习上用的较多。

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
1月前
|
Java API
掌握Java 8 Stream API的艺术:详解流式编程(三)
掌握Java 8 Stream API的艺术:详解流式编程
16 2
|
2月前
|
算法 Linux API
【Linux系统编程】Linux下删除文件的 API方式以及文件删除机制差异
【Linux系统编程】Linux下删除文件的 API方式以及文件删除机制差异
43 0
|
22小时前
|
SQL Java 关系型数据库
Flink DataSet API迁移到DataStream API实战
本文介绍了作者的Flink项目从DataSet API迁移到DataStream API的背景、方法和遇到的问题以及解决方案。
109 3
|
2天前
|
Linux API
Linux系统编程之文件编程常用API回顾和文件编程一般步骤
Linux系统编程之文件编程常用API回顾和文件编程一般步骤
Linux系统编程之文件编程常用API回顾和文件编程一般步骤
|
15天前
|
存储 算法 API
Flink DataStream API 批处理能力演进之路
本文由阿里云 Flink 团队郭伟杰老师撰写,旨在向 Flink Batch 社区用户介绍 Flink DataStream API 批处理能力的演进之路。
261 1
Flink DataStream API 批处理能力演进之路
|
18天前
|
存储 SQL Java
Java8 Stream API 详解:流式编程进行数据处理
Java8 Stream API 详解:流式编程进行数据处理
|
19天前
|
传感器 存储 缓存
[尚硅谷flink学习笔记] 实战案例TopN 问题
这段内容是关于如何使用Apache Flink解决实时统计水位传感器数据中,在一定时间窗口内出现次数最多的水位问题,即&quot;Top N&quot;问题。首先,介绍了一个使用滑动窗口的简单实现,通过收集传感器数据,按照水位计数,然后排序并输出前两名。接着,提出了全窗口和优化方案,其中优化包括按键分区(按水位vc分组)、开窗操作(增量聚合计算count)和过程函数处理(聚合并排序输出Top N结果)。最后,给出了一个使用`KeyedProcessFunction`进行优化的示例代码,通过按键by窗口结束时间,确保每个窗口的所有数据到达后再进行处理,提高了效率。
|
19天前
|
Java 大数据 API
[AIGC] Flink入门教程:理解DataStream API(Java版)
[AIGC] Flink入门教程:理解DataStream API(Java版)
|
1月前
|
存储 Java BI
掌握Java 8 Stream API的艺术:详解流式编程(二)
掌握Java 8 Stream API的艺术:详解流式编程
28 1
|
1月前
|
存储 Java 关系型数据库
掌握Java 8 Stream API的艺术:详解流式编程(一)
掌握Java 8 Stream API的艺术:详解流式编程
51 1