3. 支持的数据类型
整体来看,DataStream中支持的数据类型,Table中也是都支持的,只不过在进行转换时需要注意一些细节。(1)原子类型 在Flink中,基础数据类型(Integer、Double、String)和通用数据类型(也就是不可再拆分的数据类型)统一称作“原子类型”。原子类型的DataStream,转换之后就成了只有一列的Table,列字段(field)的数据类型可以由原子类型推断出。另外,还可以在fromDataStream()方法里增加参数,用来重新命名列字段。
StreamTableEnvironment tableEnv = ...; DataStream<Long> stream = ...; // 将数据流转换成动态表,动态表只有一个字段,重命名为myLong Table table = tableEnv.fromDataStream(stream, $("myLong"));
(2)Tuple类型 当原子类型不做重命名时,默认的字段名就是“f0”,容易想到,这其实就是将原子类型看作了一元组Tuple1的处理结果。Table支持Flink中定义的元组类型Tuple,对应在表中字段名默认就是元组中元素的属性名f0、f1、f2...。所有字段都可以被重新排序,也可以提取其中的一部分字段。字段还可以通过调用表达式的as()方法来进行重命名。
StreamTableEnvironment tableEnv = ...; DataStream<Tuple2<Long, Integer>> stream = ...; // 将数据流转换成只包含f1字段的表 Table table = tableEnv.fromDataStream(stream, $("f1")); // 将数据流转换成包含f0和f1字段的表,在表中f0和f1位置交换 Table table = tableEnv.fromDataStream(stream, $("f1"), $("f0")); // 将f1字段命名为myInt,f0命名为myLong Table table = tableEnv.fromDataStream(stream, $("f1").as("myInt"), $("f0").as("myLong"));
(3)POJO 类型 Flink也支持多种数据类型组合成的“复合类型”,最典型的就是简单Java对象(POJO 类型)。由于POJO中已经定义好了可读性强的字段名,这种类型的数据流转换成Table就显得无比顺畅了。将POJO类型的DataStream转换成Table,如果不指定字段名称,就会直接使用原始 POJO 类型中的字段名称。POJO中的字段同样可以被重新排序、提却和重命名。
StreamTableEnvironment tableEnv = ...; DataStream<Event> stream = ...; Table table = tableEnv.fromDataStream(stream); Table table = tableEnv.fromDataStream(stream, $("user")); Table table = tableEnv.fromDataStream(stream, $("user").as("myUser"), $("url").as("myUrl"));
(4)Row类型 Flink中还定义了一个在关系型表中更加通用的数据类型——行(Row),它是Table中数据的基本组织形式。Row类型也是一种复合类型,它的长度固定,而且无法直接推断出每个字段的类型,所以在使用时必须指明具体的类型信息;我们在创建Table时调用的CREATE语句就会将所有的字段名称和类型指定,这在Flink中被称为表的“模式结构”(Schema)。
4. 综合应用示例
现在,我们可以将介绍过的所有API整合起来,写出一段完整的代码。同样还是用户的一组点击事件,我们可以查询出某个用户(例如Alice)点击的url列表,也可以统计出每个用户累计的点击次数,这可以用两句SQL来分别实现。具体代码如下:
public class TableToStreamExample { public static void main(String[] args) throws Exception { // 获取流环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); // 读取数据源 SingleOutputStreamOperator<Event> eventStream = env .fromElements( new Event("Alice", "./home", 1000L), new Event("Bob", "./cart", 1000L), new Event("Alice", "./prod?id=1", 5 * 1000L), new Event("Cary", "./home", 60 * 1000L), new Event("Bob", "./prod?id=3", 90 * 1000L), new Event("Alice", "./prod?id=7", 105 * 1000L) ); // 获取表环境 StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); // 将数据流转换成表 tableEnv.createTemporaryView("EventTable", eventStream); // 查询Alice的访问url列表 Table aliceVisitTable = tableEnv.sqlQuery("SELECT url, user FROM EventTable WHERE user = 'Alice'"); // 统计每个用户的点击次数 Table urlCountTable = tableEnv.sqlQuery("SELECT user, COUNT(url) FROM EventTable GROUP BY user"); // 将表转换成数据流,在控制台打印输出 tableEnv.toDataStream(aliceVisitTable).print("alice visit"); tableEnv.toChangelogStream(urlCountTable).print("count"); // 执行程序 env.execute(); } }
三、流处理中的表 我们可以将关系型表/SQL与流处理做一个对比,如表所示。
可以看到,其实关系型表和SQL,主要就是针对批处理设计的,这和流处理有着天生的隔阂。接下来我们就来深入探讨一下流处理中表的概念。
3.1 动态表和持续查询
流处理面对的数据是连续不断的,这导致了流处理中的“表”跟我们熟悉的关系型数据库中的表完全不同;而基于表执行的查询操作,也就有了新的含义。
1. 动态表(Dynamic Tables)
当流中有新数据到来,初始的表中会插入一行;而基于这个表定义的SQL查询,就应该在之前的基础上更新结果。这样得到的表就会不断地动态变化,被称为“动态表”(Dynamic Tables)。动态表是Flink在Table API和SQL中的核心概念,它为流数据处理提供了表和SQL支持。我们所熟悉的表一般用来做批处理,面向的是固定的数据集,可以认为是“静态表”;而动态表则完全不同,它里面的数据会随时间变化。
2. 持续查询(Continuous Query)
动态表可以像静态的批处理表一样进行查询操作。由于数据在不断变化,因此基于它定义的SQL查询也不可能执行一次就得到最终结果。这样一来,我们对动态表的查询也就永远不会停止,一直在随着新数据的到来而继续执行。这样的查询就被称作“持续查询”(Continuous Query)。对动态表定义的查询操作,都是持续查询;而持续查询的结果也会是一个动态表。由于每次数据到来都会触发查询操作,因此可以认为一次查询面对的数据集,就是当前输入动态表中收到的所有数据。这相当于是对输入动态表做了一个“快照”(snapshot),当作有限数据集进行批处理;流式数据的到来会触发连续不断的快照查询,像动画一样连贯起来,就构成了“持续查询”。
持续查询的步骤如下:
(1)流(stream)被转换为动态表(dynamic table); (2)对动态表进行持续查询(continuous query),生成新的动态表; (3)生成的动态表被转换成流。
这样,只要API将流和动态表的转换封装起来,我们就可以直接在数据流上执行SQL查询,用处理表的方式来做流处理了。
3.2 将流转换成动态表
为了能够使用SQL来做流处理,我们必须先把流(stream)转换成动态表。当然,之前在讲解基本API时,已经介绍过代码中的DataStream和Table如何转换;现在我们则要抛开具体的数据类型,从原理上理解流和动态表的转换过程。如果把流看作一张表,那么流中每个数据的到来,都应该看作是对表的一次插入(Insert)操作,会在表的末尾添加一行数据。因为流是连续不断的,而且之前的输出结果无法改变、只能在后面追加;所以我们其实是通过一个只有插入操作(insert-only)的更新日志(changelog)流,来构建一个表。例如,当用户点击事件到来时,就对应着动态表中的一次插入(Insert)操作,每条数据就是表中的一行;随着插入更多的点击事件,得到的动态表将不断增长。
3.3 用SQL持续查询
1. 更新(Update)查询
我们在代码中定义了一个SQL查询。
Table urlCountTable = tableEnv.sqlQuery("SELECT user, COUNT(url) as cnt FROM EventTable GROUP BY user");
当原始动态表不停地插入新的数据时,查询得到的urlCountTable会持续地进行更改。由于count数量可能会叠加增长,因此这里的更改操作可以是简单的插入(Insert),也可以是对之前数据的更新(Update)。这种持续查询被称为更新查询(Update Query),更新查询得到的结果表如果想要转换成DataStream,必须调用toChangelogStream()方法。
2. 追加(Append)查询
上面的例子中,查询过程用到了分组聚合,结果表中就会产生更新操作。如果我们执行一个简单的条件查询,结果表中就会像原始表EventTable一样,只有插入(Insert)操作了。
Table aliceVisitTable = tableEnv.sqlQuery("SELECT url, user FROM EventTable WHERE user = 'Cary'");
这样的持续查询,就被称为追加查询(Append Query),它定义的结果表的更新日志(changelog)流中只有INSERT操作。追加查询得到的结果表,转换成DataStream调用方法没有限制,可以直接用toDataStream(),也可以像更新查询一样调用toChangelogStream()。
由于窗口的统计结果是一次性写入结果表的,所以结果表的更新日志流中只会包含插入INSERT操作,而没有更新UPDATE操作。所以这里的持续查询,依然是一个追加(Append)查询。结果表result如果转换成DataStream,可以直接调用toDataStream()方法。
3.4 将动态表转换为流
与关系型数据库中的表一样,动态表也可以通过插入(Insert)、更新(Update)和删除(Delete)操作,进行持续的更改。将动态表转换为流或将其写入外部系统时,就需要对这些更改操作进行编码,通过发送编码消息的方式告诉外部系统要执行的操作。在Flink中,Table API和SQL支持三种编码方式:
仅追加(Append-only)流
仅通过插入(Insert)更改来修改的动态表,可以直接转换为“仅追加”流。这个流中发出的数据,其实就是动态表中新增的每一行。
撤回(Retract)流
撤回流是包含两类消息的流,添加(add)消息和撤回(retract)消息。具体的编码规则是:INSERT插入操作编码为add消息;DELETE删除操作编码为retract消息;而UPDATE更新操作则编码为被更改行的retract消息,和更新后行(新行)的add消息。这样,我们可以通过编码后的消息指明所有的增删改操作,一个动态表就可以转换为撤回流了。
更新插入(Upsert)流
更新插入流中只包含两种类型的消息:更新插入(upsert)消息和删除(delete)消息。所谓的“upsert”其实是“update”和“insert”的合成词,所以对于更新插入流来说,INSERT插入操作和UPDATE更新操作,统一被编码为upsert消息;而DELETE删除操作则被编码为delete消息。
需要注意的是,在代码里将动态表转换为DataStream时,只支持仅追加(append-only)和撤回(retract)流,我们调用toChangelogStream()得到的其实就是撤回流。而连接到外部系统时,则可以支持不同的编码方法,这取决于外部系统本身的特性。
四、时间属性和窗口
基于时间的操作(比如时间窗口),需要定义相关的时间语义和时间数据来源的信息。在Table API和SQL中,会给表单独提供一个逻辑上的时间字段,专门用来在表处理程序中指示时间。所以所谓的时间属性(time attributes),其实就是每个表模式结构(schema)的一部分。它可以在创建表的DDL里直接定义为一个字段,也可以在DataStream转换成表时定义。一旦定义了时间属性,它就可以作为一个普通字段引用,并且可以在基于时间的操作中使用。时间属性的数据类型为TIMESTAMP,它的行为类似于常规时间戳,可以直接访问并且进行计算。按照时间语义的不同,可以把时间属性的定义分成事件时间(event time)和处理时间(processing time)两种情况。
4.1 事件时间
事件时间属性可以在创建表DDL中定义,也可以在数据流和表的转换中定义。
1. 在创建表的DDL中定义
在创建表的DDL(CREATE TABLE语句)中,可以增加一个字段,通过WATERMARK语句来定义事件时间属性。具体定义方式如下:
CREATE TABLE EventTable( user STRING, url STRING, ts TIMESTAMP(3), WATERMARK FOR ts AS ts - INTERVAL '5' SECOND ) WITH ( ... );
这里我们把ts字段定义为事件时间属性,而且基于ts设置了5秒的水位线延迟。
2. 在数据流转换为表时定义
事件时间属性也可以在将DataStream 转换为表的时候来定义。我们调用fromDataStream()方法创建表时,可以追加参数来定义表中的字段结构;这时可以给某个字段加上.rowtime() 后缀,就表示将当前字段指定为事件时间属性。这个字段可以是数据中本不存在、额外追加上去的“逻辑字段”,也可以是本身固有的字段。不论那种方式,时间属性字段中保存的都是事件的时间戳(TIMESTAMP类型)。需要注意的是,这种方式只负责指定时间属性,而时间戳的提取和水位线的生成应该之前就在DataStream上定义好了。在代码中的定义方式如下:
// 方法一: // 流中数据类型为二元组Tuple2,包含两个字段;需要自定义提取时间戳并生成水位线 DataStream<Tuple2<String, String>> stream = inputStream.assignTimestampsAndWatermarks(...); // 声明一个额外的逻辑字段作为事件时间属性 Table table = tEnv.fromDataStream(stream, $("user"), $("url"), $("ts").rowtime()); // 方法二: // 流中数据类型为三元组Tuple3,最后一个字段就是事件时间戳 DataStream<Tuple3<String, String, Long>> stream = inputStream.assignTimestampsAndWatermarks(...); // 不再声明额外字段,直接用最后一个字段作为事件时间属性 Table table = tEnv.fromDataStream(stream, $("user"), $("url"), $("ts").rowtime());
4.2 处理时间
在定义处理时间属性时,必须要额外声明一个字段,专门用来保存当前的处理时间。类似地,处理时间属性的定义也有两种方式:创建表DDL中定义,或者在数据流转换成表时定义。
1. 在创建表的DDL中定义
在创建表的DDL(CREATE TABLE语句)中,可以增加一个额外的字段,通过调用系统内置的PROCTIME()函数来指定当前的处理时间属性。
CREATE TABLE EventTable( user STRING, url STRING, ts AS PROCTIME() ) WITH ( ... );
2. 在数据流转换为表时定义
处理时间属性同样可以在将DataStream 转换为表的时候来定义。我们调用fromDataStream()方法创建表时,可以用.proctime()后缀来指定处理时间属性字段。由于处理时间是系统时间,原始数据中并没有这个字段,所以处理时间属性一定不能定义在一个已有字段上,只能定义在表结构所有字段的最后,作为额外的逻辑字段出现。代码中定义处理时间属性的方法如下:
DataStream<Tuple2<String, String>> stream = ...; // 声明一个额外的字段作为处理时间属性字段 Table table = tEnv.fromDataStream(stream, $("user"), $("url"), $("ts").proctime());
4.3 窗口(Window)
有了时间属性,接下来就可以定义窗口进行计算了。在DataStream API中提供了对不同类型的窗口进行定义和处理的接口,而在Table API和SQL中,类似的功能也都可以实现。
1. 分组窗口(Group Window,老版本)
在Flink 1.12之前的版本中,Table API和SQL提供了一组“分组窗口”(Group Window)函数,常用的时间窗口如滚动窗口、滑动窗口、会话窗口都有对应的实现;具体在SQL中就是调用TUMBLE()、HOP()、SESSION(),传入时间属性字段、窗口大小等参数就可以了。以滚动窗口为例:TUMBLE(ts, INTERVAL '1' HOUR) 这里的ts是定义好的时间属性字段,窗口大小用“时间间隔”INTERVAL来定义。在进行窗口计算时,分组窗口是将窗口本身当作一个字段对数据进行分组的,可以对组内的数据进行聚合。基本使用方式如下:
Table result = tableEnv.sqlQuery( "SELECT " + "user, " + "TUMBLE_END(ts, INTERVAL '1' HOUR) as endT, " + "COUNT(url) AS cnt " + "FROM EventTable " + "GROUP BY " + // 使用窗口和用户名进行分组 "user, " + "TUMBLE(ts, INTERVAL '1' HOUR)" // 定义1小时滚动窗口 );
分组窗口的功能比较有限,只支持窗口聚合,所以目前已经处于弃用(deprecated)的状态。
2. 窗口表值函数(Windowing TVFs,新版本)
从1.13版本开始,Flink开始使用窗口表值函数(Windowing table-valued functions, Windowing TVFs)来定义窗口。窗口表值函数是Flink定义的多态表函数(PTF),可以将表进行扩展后返回。表函数(table function)可以看作是返回一个表的函数,关于这部分内容,我们会在11.6节进行介绍。目前Flink提供了以下几个窗口TVF:
滚动窗口(Tumbling Windows) 滑动窗口(Hop Windows,跳跃窗口) 累积窗口(Cumulate Windows) 会话窗口(Session Windows,目前尚未完全支持)
窗口表值函数性能得到了优化,拥有更强大的功能,可以完全替代传统的分组窗口函数。目前窗口TVF的功能还不完善,会话窗口和很多高级功能还不支持,不过正在快速地更新完善。可以预见在未来的版本中,窗口TVF将越来越强大,将会是窗口处理的唯一入口。在SQL中的声明方式,与以前的分组窗口是类似的,直接调用TUMBLE()、HOP()、CUMULATE()就可以实现滚动、滑动和累积窗口,不过传入的参数会有所不同。下面我们就分别对这几种窗口TVF进行介绍。
(1)滚动窗口(TUMBLE)
滚动窗口在SQL中的概念与DataStream API中的定义完全一样,是长度固定、时间对齐、无重叠的窗口,一般用于周期性的统计计算。在SQL中通过调用TUMBLE()函数就可以声明一个滚动窗口,只有一个核心参数就是窗口大小(size)。在SQL中不考虑计数窗口,所以滚动窗口就是滚动时间窗口,参数中还需要将当前的时间属性字段传入;另外,窗口TVF本质上是表函数,可以对表进行扩展,所以还应该把当前查询的表作为参数整体传入。具体声明如下:
TUMBLE(TABLE EventTable, DESCRIPTOR(ts), INTERVAL '1' HOUR)