窗口函数
Flink SQL支持基于无限大窗口的聚合(无需显式定义在SQL Query中添加任何的窗口)以及对一个特定的窗口的聚合。例如,需要统计在过去的1分钟内有多少用户点击了某个的网页,可以通过定义一个窗口来收集最近一分钟内的数据,并对这个窗口内的数据进行计算。
Flink SQL支持的窗口聚合主要是两种:window aggregate和over aggregate。两者最核心的区别是,over aggregate从语义上保障了对每个输入都有一个输出。因此over agregate常被用于ranking及moving average等场景。
本节接下来主要介绍window aggregate。Window aggregate支持两种时间类型做窗口:Event Time和Processing Time。每种类型下,又分别支持三种窗口类型:滚动窗口(TUMBLE)、滑动窗口(HOP)和会话窗口(SESSION)。
时间类型
Flink SQL支持两种时间类型。
- Processing Time:系统对事件进行处理的本地系统时间。
- Event Time:您提供的事件时间(通常是数据的最原始的创建时间),event time一定是您提供在Schema里的数据。
下图中是不同时间属性在实时计算流程中的位置。
从图的定义可以看出,ingestion time和 processing time是系统为流记录增加的时间属性,您并不能控制。EventTime则是流记录本身携带的时间属性。由于数据本身存在乱序以及网络抖动等其它原因,event time为t1(对应partition1)时刻的纪录,有可能会晚于t2(对应partition2)时刻的被Flink处理,即t2 > t1。
基于Processing Time的Aggregate:
processing time是系统产生的,不在您的原始数据中,您需要显式定义一个processing time列。
filedName as PROCTIME()
这个定义需要在source的DDL中显式指明,示例如下。
CREATE TABLE tt_stream (
a varchar,
b varchar,
c BIGINT,
d AS PROCTIME()
) with (
type = 'SLS',
topic = 'blink_tt2tt_test',
accessId = '06221748XXXX',
accessKey = 'a62cfe86-ba5a-4eeXXXXXXX7b'
);
CREATE TABLE rds_output (
id varchar,
c TIMESTAMP,
f TIMESTAMP,
cnt BIGINT
) with (
type = 'rds',
url = 'jdbc:mysql://XXXXXXXXX:3306/test',
tableName = 'datahub2rds',
userName = 'tXXXX',
password = '1XXXXX6'
);
INSERT INTO rds_output
SELECT a AS id,
SESSION_START(d, INTERVAL '1' SECOND) AS c,
SESSION_END(d, INTERVAL '1' SECOND) AS f,
COUNT(a) AS cnt
FROM tt_stream
GROUP BY SESSION(d, INTERVAL '1' SECOND), a
基于Event Time的Aggregate:
Event Time是您的原始数据,您不需要显式重新定义一个event time列, 但必须指定watermark的计算方法。这是因为您的数据往往是乱序的,如果不配置一个watermark来合理的delay您的数据,数据聚合的结果可能存在很大的偏差。
Watermark
Watermark是一种衡量Event Time进展的机制,它是数据本身的一个隐藏属性,watermark的定义是source表DDL定义的一部分。Flink提供了如下语法定义watermark。
WATERMARK [watermarkName] FOR <rowtime_field> AS withOffset(<rowtime_field>, offset)
watermarkName
标识watermark的名字,可选。<rowtime_field>
必须是表中已定义的一列(当前仅支持为Timestamp
类型),含义是基于该列生成 watermark,并且标识该列为Event Time列,可以在后续query中用来定义窗口。withOffset
是目前提供的watermark的生成策略,是根据<rowtime_field> - offset
生成watermark的值。withOffset的第一个参数必须是<rowtime_field>
。offset
单位为毫秒,含义为watermark值与event time值的偏移量。
通常一条记录中的某个字段就代表了该记录的发生时间。例如,表中有个rowtime字段,类型为Timestamp,其中某个字段为1501750584000(2017-08-03 08:56:24.000)
。如果您需要定义一个基于该rowtime列的watermark,watermark策略为偏移4秒,需要如下定义。
WATERMARK FOR rowtime AS withOffset(rowtime, 4000)
在这种情况下,这条数据的watermark时间为 1501750584000 - 4000 = 1501750580000(2017-08-03 08:56:20.000)
。这条数据的watermark时间含义即:timestamp小于1501750580000(2017-08-03 08:56:20.000)
的数据,都已经到达了。
注意:
- 在使用Event Time watermark时的rowtime必须是TIMESTAMP类型。当前支持毫秒级别的、在Unix时间戳里是13位的TIMESTAMP。如果是其他类型或是在Unix时间戳不是13位,建议使用计算列来做转换。
- Event Time和Processing Time的声明只能在源表上声明。
总结:
- WaterMark的含义是所有时间t’< t 的事件都已经发生。假如watermark t已经生效,那么后续eventTime小于t的记录都会被丢弃掉(目前Flink的处理是丢弃这些来的更晚的数据,后续支持用户配置让更晚的数据也能继续update)。
- 针对乱序的的流,WaterMark至关重要。即使一些事件延迟到达,也不至于过于影响window窗口的计算的正确性。
- 并行数据流中,当Operator有多个输入流时,Operator的event time以最小流event time为准。
示例
以下为一个使用event time聚合的示例。
CREATE TABLE tt_stream(
a varchar,
b varchar,
c timeStamp,
WATERMARK wk1 FOR c as withOffset(c, 1000)
) with (
type = 'SLS',
topic = 'blink_tt2tt_test',
accessId = '0622174XXXXXXTS',
accessKey = 'a62cfe86-bXXXXXXXb9fad2618e7b'
);
CREATE TABLE rds_output(
id varchar,
c TIMESTAMP,
f TIMESTAMP,
cnt BIGINT
) with (
type = 'rds',
url = 'jdbc:mysql://XXXXXXXX3306/test',
tableName = 'datahub2rds',
userName = 'XXXXXt',
password = '1XXXXX'
);
INSERT INTO rds_output
SELECT a AS id,
SESSION_START(c, INTERVAL '1' SECOND) AS c,
CAST(SESSION_END(c, INTERVAL '1' SECOND) AS TIMESTAMP) AS f,
COUNT(a) AS cnt
FROM tt_stream
GROUP BY SESSION(c, INTERVAL '1' SECOND), a