有了时间属性,我们就可以配合窗⼝来完成各种业务的计算,Flink API/SQL 提供了丰富的窗口操作。
窗口分类
Group Windows
所谓Group Windows (分组窗口),就是把event按照时间或者计数 分成若干个组,然后在对每个组执行窗口函数,Group Window 从键控来说,分为键控window 和 非键控window,按照窗口事件分配逻辑又分为若干类型。
键控分window和非键控window
按照是否先根据指定的键(字段/属性)分组再基于时间/计数构建的Window,可以把Group Window分为【键控 Window】【⾮键控Window】
有时把键控窗⼝跟翻滚窗⼝/滑动窗⼝搞混淆,其实是事物的不同层⾯:
概念 |
定义 |
|
键控window |
先根据指定的键(字段/属性)分组,再基于时间/计数构建的Window(双重分组) |
|
⾮键控Window | 不根据指定的键(字段/属性)分组,直接基于时间/计数构建的Window |
注意:窗⼝也是⼀种分组
按照event分配逻辑分类
时间窗⼝:根据时间对数据流进⾏行行分组切⽚片
翻滚时间窗⼝:Tumbling Time Window
滑动时间窗⼝:Sliding Time Window
会话窗⼝:Session Window
计数窗⼝口:根据元素个数对数据流进⾏行行分组切⽚片
翻滚计数窗:Tumbling CountWindow
滑动计数窗:Sliding CountWindow
注意:时间窗⼝口[start,end),左闭右开
滚动窗口
定义:将数据依据固定的窗⼝⻓度对⽆界数据流进⾏切⽚
特点:时间对⻬、窗⼝⻓度固定、event⽆重叠
适⽤场景:BI统计(计算各个事件段的指标)
滑动窗口
定义:是固定窗⼝的更⼴义的⼀种形式。滑动窗⼝由固定的窗⼝⻓度和滑动间隔组成
特点:窗⼝⻓度固定、event有重叠
适⽤场景: 监控场景,对最近⼀个时间段内的统计(求某接⼝最近5min的失败率来决定是否要报警
会话窗口
定义:类似于web应⽤ 的session,即⼀段时间没有接受到新数据就会⽣成新的窗⼝(固定gap/gap fun)
特点:时间⽆对⻬、event不重叠、没有固定开始和结束时间
使用场景: 线上用户行为分析
全局窗口
定义:有相同key的所有元素分配给相同的单个全局窗⼝
必须指定⾃定义触发器否则没有任何意义(不然不做计算)
注意:不要跟Non-keyed Window搞混,两个不同的角色
Over Windows
Over window 聚合是标准 SQL 中已有的(Over ⼦句),可以在查询的 SELECT ⼦句中定义。 Over window 聚 合,通过窗⼝聚合计算每个输入行在其相邻行范围内的聚合(如 sum等)
注意:
- 在流计算场景中,Over Window 不太常见
- Group window每个组聚合运算完输出⼀⾏最终结果,⽽在Over window每⼀⾏都会输出⼀⾏结果。
窗口使用
table api中使用窗口
Group Windows
编程模型
Tabletable=input .window([GroupWindoww].as("w")) // 定义⼀个group window并指定别名w .groupBy($("w")) // 按照窗⼝w对table进⾏分组 .select($("b").sum()); // select⼦句指定返回的列和聚合运算(⾮监控的)Tabletable=input .window([GroupWindoww].as("w")) // 定义⼀个group window并指定别名w .groupBy($("w"), $("a")) // 按照属性a和窗⼝w对table进⾏分组(这不就是键控window) .select($("a"), $("b").sum()); // select⼦句指定返回的列和聚合运算
另外,在select⼦句中,我们还可以返回Window的属性:start,end,rowtime
window属性 |
说明 |
获取方式 |
start |
窗口开始时间戳 |
窗口别名.start() |
end |
窗口结束时间戳 |
窗口别名.end() |
rowtime |
窗口的时间戳 |
窗口别名.rowtime() |
注意:
- 基于时间的窗⼝是左闭右开的,例如从9点开始创建⼀个1⼩时的窗⼝,则start为09:00:00.000,end为 10:00:00.000,rowtime为09:59:59.999.
- end的时间戳是不会被分组到这个窗口上的。
Tabletable=input .window([GroupWindoww].as("w")) // 定义⼀个group window并指定别名w .groupBy($("w"), $("a")) // 按照属性a和窗⼝w对table进⾏分组(这不就是键控window) .select($("a"), $("w").start(), $("w").end(), $("w").rowtime(), $("b").count()); //select⼦句返回字段a、窗⼝的开始时间戳、窗⼝的结束时间戳、窗⼝的时间戳,b字段的count
注意:我们到底取哪个时间戳是由业务决定的,⼀般是start。
GroupWindow参数定义了event划分到某个窗⼝的映射规则(基于时间或者计数),但是GroupWindow不是⼀个可 以实现的接⼝,Table API直接提供了⼀组预定义的具有特殊语意的Window类供我们直接使⽤(滚动窗⼝,滑动窗 ⼝等等)。
滚动窗口
滚动窗⼝通过Tumble类来定义,三个⽅法:
处理时间滚动窗口
packagecom.blink.sb.window; importcom.blink.sb.beans.TempSensorData; importorg.apache.flink.streaming.api.datastream.DataStream; importorg.apache.flink.streaming.api.environment.StreamExecutionEnvironment; importorg.apache.flink.table.api.Table; importorg.apache.flink.table.api.Tumble; importorg.apache.flink.table.api.bridge.java.StreamTableEnvironment; importstaticorg.apache.flink.table.api.Expressions.$; importstaticorg.apache.flink.table.api.Expressions.lit; /*** 基于处理时间的滚动窗口*/publicclassFlinkTableTumbleWinBasePtTime { publicstaticvoidmain(String[] args) throwsException { //1、获取Stream执行环境StreamExecutionEnvironmentenv=StreamExecutionEnvironment.getExecutionEnvironment(); //2、创建表执行环境StreamTableEnvironmenttEnv=StreamTableEnvironment.create(env); env.setParallelism(1); //3、读入数据DataStream<TempSensorData>tempSensorData=env.socketTextStream("localhost", 8888) .map(event-> { String[] arr=event.split(","); returnTempSensorData .builder() .sensorID(arr[0]) .tp(Long.parseLong(arr[1])) .temp(Integer.parseInt(arr[2])) .build(); }); //4、流转换为动态表Tabletable=tEnv.fromDataStream(tempSensorData, $("sensorID"), $("tp"), $("temp"), $("ptTime").proctime()//新增ptTime字段为proctime ); //5、自定义窗口并计算Tableresult=table.window(Tumble .over(lit(5).second()) .on($("ptTime")) .as("w") ) .groupBy($("sensorID"), $("w")) .select($("sensorID"), $("temp").avg()); result.execute().print(); } }
事件时间滚动窗口
packagecom.blink.sb.window; importcom.blink.sb.beans.TempSensorData; importorg.apache.flink.api.common.eventtime.SerializableTimestampAssigner; importorg.apache.flink.api.common.eventtime.WatermarkStrategy; importorg.apache.flink.streaming.api.datastream.DataStream; importorg.apache.flink.streaming.api.environment.StreamExecutionEnvironment; importorg.apache.flink.table.api.Table; importorg.apache.flink.table.api.Tumble; importorg.apache.flink.table.api.bridge.java.StreamTableEnvironment; importjava.time.Duration; importstaticorg.apache.flink.table.api.Expressions.$; importstaticorg.apache.flink.table.api.Expressions.lit; /*** 基于事件时间的滚动窗口*/publicclassFlinkTableTumbleWinBaseEventTime { publicstaticvoidmain(String[] args) throwsException { //1、获取Stream执行环境StreamExecutionEnvironmentenv=StreamExecutionEnvironment.getExecutionEnvironment(); //2、创建表执行环境StreamTableEnvironmenttEnv=StreamTableEnvironment.create(env); env.setParallelism(1); //3、读取数据并提取时间戳指定水印生成策略WatermarkStrategy<TempSensorData>watermarkStrategy=WatermarkStrategy .<TempSensorData>forBoundedOutOfOrderness(Duration.ofSeconds(2)) .withTimestampAssigner(newSerializableTimestampAssigner<TempSensorData>() { publiclongextractTimestamp(TempSensorDataelement, longrecordTimestamp) { returnelement.getTp()*1000; } }); DataStream<TempSensorData>tempSensorData=env.socketTextStream("localhost", 8888) .map(event-> { String[] arr=event.split(","); returnTempSensorData .builder() .sensorID(arr[0]) .tp(Long.parseLong(arr[1])) .temp(Integer.parseInt(arr[2])) .build(); }).assignTimestampsAndWatermarks(watermarkStrategy); //4、流转换为动态表Tabletable=tEnv.fromDataStream(tempSensorData, $("sensorID"), $("tp"), $("temp"), $("evTime").rowtime()//新增evTime字段为rowtime ); //5、自定义窗口并计算Tableresult=table.window(Tumble .over(lit(5).second()) .on($("evTime")) .as("w") ) .groupBy($("sensorID"), $("w")) .select($("sensorID"), $("sensorID").count()); //6、打印result.execute().print(); } }
计数滚动窗口
packagecom.blink.sb.window; importcom.blink.sb.beans.TempSensorData; importorg.apache.flink.streaming.api.datastream.DataStream; importorg.apache.flink.streaming.api.environment.StreamExecutionEnvironment; importorg.apache.flink.table.api.Table; importorg.apache.flink.table.api.Tumble; importorg.apache.flink.table.api.bridge.java.StreamTableEnvironment; importorg.apache.flink.types.Row; importstaticorg.apache.flink.table.api.Expressions.*; /*** 基于计数的滚动窗口*/publicclassFlinkTableTumbleWinBaseCount { publicstaticvoidmain(String[] args) throwsException { //1、获取Stream执行环境StreamExecutionEnvironmentenv=StreamExecutionEnvironment.getExecutionEnvironment(); //2、创建表执行环境StreamTableEnvironmenttEnv=StreamTableEnvironment.create(env); env.setParallelism(1); //3、读入数据DataStream<TempSensorData>tempSensorData=env.socketTextStream("localhost", 8888) .map(event-> { String[] arr=event.split(","); returnTempSensorData .builder() .sensorID(arr[0]) .tp(Long.parseLong(arr[1])) .temp(Integer.parseInt(arr[2])) .build(); }); //4、流转换为动态表Tabletable=tEnv.fromDataStream(tempSensorData, $("sensorID"), $("tp"), $("temp"), $("ptTime").proctime()//新增ptTime字段为proctime ); //5、自定义窗口并计算Tableresult=table.window(Tumble .over(rowInterval(5L)) .on($("ptTime"))//基于计数的滚动窗口只能按照proctime属性字段排序 .as("w") ) .groupBy($("sensorID"), $("w")) .select($("sensorID"), $("sensorID").count()); //6、转换为流并打印tEnv.toDataStream(result, Row.class).print(); //7、执行env.execute("FlinkTableTumbleWinBaseEvTime"); } }
滑动窗口
滑动窗⼝通过Slide类来定义,关键⽅法如下:
方法 |
描述 |
over |
定义窗口的长度为事件或者计数间隔 |
every |
定义滑动步长(时间步长或计数步长)。滑动步长和窗口长度必须是同类型的 |
on |
指定时间属性字段用于分组(事件窗口)或者排序(计数窗口)。批处理时该字段可以是任意Long 或者 Timestamp类型的字段,流处理时必须是事先指定的时间属性字段 |
as |
为窗⼝分配别名。 别名⽤于在 groupBy⼦句中引⽤窗⼝,并可以在select() ⼦句中选择窗⼝属性, 例如 start,end, rowtime |
事件滑动窗口
packagecom.blink.sb.window; importcom.blink.sb.beans.TempSensorData; importorg.apache.flink.streaming.api.datastream.DataStream; importorg.apache.flink.streaming.api.environment.StreamExecutionEnvironment; importorg.apache.flink.table.api.Slide; importorg.apache.flink.table.api.Table; importorg.apache.flink.table.api.bridge.java.StreamTableEnvironment; importorg.apache.flink.types.Row; importstaticorg.apache.flink.table.api.Expressions.$; importstaticorg.apache.flink.table.api.Expressions.lit; /*** 基于时间的滑动窗口*/publicclassFlinkTableSlideWinBasePtTime { publicstaticvoidmain(String[] args) throwsException { //1、获取Stream执行环境StreamExecutionEnvironmentenv=StreamExecutionEnvironment.getExecutionEnvironment(); //2、创建表执行环境StreamTableEnvironmenttEnv=StreamTableEnvironment.create(env); env.setParallelism(1); //3、读入数据DataStream<TempSensorData>tempSensorData=env.socketTextStream("localhost", 8888) .map(event-> { String[] arr=event.split(","); returnTempSensorData .builder() .sensorID(arr[0]) .tp(Long.parseLong(arr[1])) .temp(Integer.parseInt(arr[2])) .build(); }); //4、流转换为动态表Tabletable=tEnv.fromDataStream(tempSensorData, $("sensorID"), $("tp"), $("temp"), $("ptTime").proctime()//新增ptTime字段为proctime ); //5、自定义窗口并计算Tableresult=table.window(Slide .over(lit(8).second()) .every(lit(2).second()) .on($("ptTime")) .as("w") ) .groupBy($("sensorID"), $("w")) .select($("sensorID"), $("sensorID").count()); //6、转换为流并打印tEnv.toDataStream(result, Row.class).print(); //7、执行env.execute("FlinkTableTumbleWinBaseEvTime"); } }
计数滑动窗口
packagecom.blink.sb.window; importcom.blink.sb.beans.TempSensorData; importorg.apache.flink.streaming.api.datastream.DataStream; importorg.apache.flink.streaming.api.environment.StreamExecutionEnvironment; importorg.apache.flink.table.api.Slide; importorg.apache.flink.table.api.Table; importorg.apache.flink.table.api.bridge.java.StreamTableEnvironment; importorg.apache.flink.types.Row; importstaticorg.apache.flink.table.api.Expressions.*; /*** 基于计数的滑动窗口*/publicclassFlinkTableSlideWinBaseCount { publicstaticvoidmain(String[] args) throwsException { //1、获取Stream执行环境StreamExecutionEnvironmentenv=StreamExecutionEnvironment.getExecutionEnvironment(); //2、创建表执行环境StreamTableEnvironmenttEnv=StreamTableEnvironment.create(env); env.setParallelism(1); //3、读入数据DataStream<TempSensorData>tempSensorData=env.socketTextStream("localhost", 8888) .map(event-> { String[] arr=event.split(","); returnTempSensorData .builder() .sensorID(arr[0]) .tp(Long.parseLong(arr[1])) .temp(Integer.parseInt(arr[2])) .build(); }); //4、流转换为动态表Tabletable=tEnv.fromDataStream(tempSensorData, $("sensorID"), $("tp"), $("temp"), $("ptTime").proctime()//新增ptTime字段为proctime ); //5、自定义窗口并计算Tableresult=table.window(Slide .over(rowInterval(8L)) .every(rowInterval(2L)) .on($("ptTime")) .as("w") ) .groupBy($("sensorID"), $("w")) .select($("sensorID"), $("sensorID").count()); //6、转换为流并打印tEnv.toDataStream(result, Row.class).print(); //7、执行env.execute("FlinkTableTumbleWinBaseEvTime"); } }
注意:所有类型的计数窗⼝只能是按照proctime排序,也就是计数窗⼝不能解决乱序
会话窗口
通过Session类定义会话窗⼝,⽅法如下:
注意:没有基于计数的会话窗⼝
packagecom.blink.sb.window; importcom.blink.sb.beans.TempSensorData; importorg.apache.flink.streaming.api.datastream.DataStream; importorg.apache.flink.streaming.api.environment.StreamExecutionEnvironment; importorg.apache.flink.table.api.Session; importorg.apache.flink.table.api.Table; importorg.apache.flink.table.api.bridge.java.StreamTableEnvironment; importorg.apache.flink.types.Row; importstaticorg.apache.flink.table.api.Expressions.$; importstaticorg.apache.flink.table.api.Expressions.lit; /*** 基于时间的Session窗口*/publicclassFlinkTableSessionWinBasePtTime { publicstaticvoidmain(String[] args) throwsException { //1、获取Stream执行环境StreamExecutionEnvironmentenv=StreamExecutionEnvironment.getExecutionEnvironment(); //2、创建表执行环境StreamTableEnvironmenttEnv=StreamTableEnvironment.create(env); env.setParallelism(1); //3、读入数据DataStream<TempSensorData>tempSensorData=env.socketTextStream("localhost", 8888) .map(event-> { String[] arr=event.split(","); returnTempSensorData .builder() .sensorID(arr[0]) .tp(Long.parseLong(arr[1])) .temp(Integer.parseInt(arr[2])) .build(); }); //4、流转换为动态表Tabletable=tEnv.fromDataStream(tempSensorData, $("sensorID"), $("tp"), $("temp"), $("ptTime").proctime()//新增ptTime字段为proctime ); //5、自定义窗口并计算Tableresult=table.window(Session .withGap(lit(5).second()) .on($("ptTime")) .as("w") ) .groupBy($("sensorID"), $("w")) .select($("sensorID"), $("sensorID").count()); //6、转换为流并打印tEnv.toDataStream(result, Row.class).print(); //7、执行env.execute("FlinkTableTumbleWinBaseEvTime"); } }
注意:基数event Time的窗⼝需要先指定时间属性和⽔印⽣成策略
Over windows
编程模型
Tabletable=input .window([OverWindoww].as("w")) // 定义⼀个Over Window并指定别名为w .select($("a"), $("b").sum().over($("w")), $("c").min().over($("w"))); //在Over Window执⾏聚合运算(返回a、b在窗⼝上求sum、c在窗⼝上求min)
OverWindow 定义了计算聚合的⾏范围。
Table API 提供 Over类来配置 over 窗⼝的属性。 可以在事件时间或处理 时间以及时间周期或⾏数的范围上定义窗⼝。
Over类的⽅法如下:
方法 |
是否必须 |
描述 |
|
partitionBy |
可选 |
根据⼀个或多个属性分区。 每个分区单独排序,聚合函数分别应⽤于每个分区。注 意:在流式环境中,只有当窗⼝包含 partition by ⼦句时,窗⼝聚合函数才能并⾏计 算。 如果没有 partitionBy(...),则流由单个⾮并⾏任务处理 |
|
orderBy |
是 |
定义每个分区中事件的顺序,即定义聚合函数应⽤于事件的顺序;对于流式查询,必须 是声明的事件时间或处理时间属性。 ⽬前,仅⽀持单个字段排序。 |
|
preceding |
可选 |
定义包含在窗⼝中并位于当前⾏之前的⾏的间隔(指定下限,往前多⻓时间或者多少 ⾏开窗)。 间隔可以指定为时间间隔或⾏计数间隔。有界窗⼝由间隔的⼤⼩指定,例 如,时间间隔为 10.minutes,⾏计数间隔为 10.rows。⽆界窗⼝使⽤常量指定, UNBOUNDED_RANGE ⽤于时间间隔或 UNBOUNDED_ROW ⽤于⾏计数间隔。 ⽆界 窗⼝从分区的第⼀⾏开始。如果省略preceding⼦句,则 UNBOUNDED_RANGE 和 CURRENT_RANGE 将⽤作窗⼝的默认preceding 和following的默认值。 |
|
following |
可选 |
定义包含在窗⼝中并位于当前⾏之后的⾏的间隔(指定上限,往后多⻓时间或者多少 ⾏开窗)。 间隔必须以与preceding间隔(时间或⾏数)相同的单位。注意:流处理环 境下,⽬前不⽀持往后开窗。我们保持默认值就好了 |
|
as |
必选 |
为窗⼝分配别名。 别名⽤于在 groupBy⼦句中引⽤窗⼝。 |
注意:⽬前,同⼀个 select() 调⽤中的所有聚合函数必须在同⼀个窗⼝中计算
实例伪代码:
// ⽆界Event-time排序开窗 (假设event-time属性是"rowtime").window(Over.partitionBy($("a")).orderBy($("rowtime")).preceding(UNBOUNDED_RANGE).as("w"));// ⽆界Processing-time排序开窗 (假设processing-time属性是"proctime").window(Over.partitionBy($("a")).orderBy("proctime").preceding(UNBOUNDED_RANGE).as("w") ); // ⽆界Event-time排序Row-count开窗 (假设event-time属性是"rowtime").window(Over.partitionBy($("a")).orderBy($("rowtime")).preceding(UNBOUNDED_ROW).as("w") ); // ⽆界Processing-time排序Row-count开窗 (假设processing-time属性是"proctime").window(Over.partitionBy($("a")).orderBy($("proctime")).preceding(UNBOUNDED_ROW).as("w")); // 有界Event-time排序向前1分钟开窗 (假设event-time属性是"rowtime").window(Over.partitionBy($("a")).orderBy($("rowtime")).preceding(lit(1).minutes()).as("w"));// 有界Processing-time排序向前1分钟开窗 (假设processing-time属性是"proctime").window(Over.partitionBy($("a")).orderBy($("proctime")).preceding(lit(1).minutes()).as( "w")); // 有界Event-time排序向前10⾏开窗 (假设event-time属性是"rowtime").window(Over.partitionBy($("a")).orderBy($("rowtime")).preceding(rowInterval(10)).as("w"));// 有界Processing-time排序向前10⾏开窗 (假设processing-time属性是"proctime").window(Over.partitionBy($("a")).orderBy($("proctime")).preceding(rowInterval(10)).as("w"));
第一行到当前行开窗
packagecom.blink.sb.window; importcom.blink.sb.beans.TempSensorData; importorg.apache.flink.streaming.api.datastream.DataStream; importorg.apache.flink.streaming.api.environment.StreamExecutionEnvironment; importorg.apache.flink.table.api.Over; importorg.apache.flink.table.api.Table; importorg.apache.flink.table.api.bridge.java.StreamTableEnvironment; importstaticorg.apache.flink.table.api.Expressions.*; /*** 基于处理时间开窗1:第一行到当前行开窗*/publicclassFlinkTableOverWinBasePtTime1 { publicstaticvoidmain(String[] args) throwsException { //1、获取Stream执行环境StreamExecutionEnvironmentenv=StreamExecutionEnvironment.getExecutionEnvironment(); //2、创建表执行环境StreamTableEnvironmenttEnv=StreamTableEnvironment.create(env); env.setParallelism(1); //3、读入数据DataStream<TempSensorData>tempSensorData=env.socketTextStream("localhost", 8888) .map(event-> { String[] arr=event.split(","); returnTempSensorData .builder() .sensorID(arr[0]) .tp(Long.parseLong(arr[1])) .temp(Integer.parseInt(arr[2])) .build(); }); //4、流转换为动态表Tabletable=tEnv.fromDataStream(tempSensorData, $("sensorID"), $("tp"), $("temp"), $("ptTime").proctime()//新增ptTime字段为proctime ); //5、定义无界Over窗口(往前无界)Tableresult=table.window(Over .partitionBy($("sensorID")) .orderBy($("ptTime")) .preceding(UNBOUNDED_RANGE)//可以省略不写 .as("w") ).select( $("sensorID"), $("temp").max().over($("w")) ); result.execute().print(); } }
指定⾏到当前⾏开窗
packagecom.blink.sb.window; importcom.blink.sb.beans.TempSensorData; importorg.apache.flink.streaming.api.datastream.DataStream; importorg.apache.flink.streaming.api.environment.StreamExecutionEnvironment; importorg.apache.flink.table.api.Over; importorg.apache.flink.table.api.Table; importorg.apache.flink.table.api.bridge.java.StreamTableEnvironment; importorg.apache.flink.types.Row; importstaticorg.apache.flink.table.api.Expressions.*; /*** 基于处理时间开窗2:指定行到当前行开窗*/publicclassFlinkTableOverWinBasePtTime2 { publicstaticvoidmain(String[] args) throwsException { //1、获取Stream执行环境StreamExecutionEnvironmentenv=StreamExecutionEnvironment.getExecutionEnvironment(); //2、创建表执行环境StreamTableEnvironmenttEnv=StreamTableEnvironment.create(env); env.setParallelism(1); //3、读入数据DataStream<TempSensorData>tempSensorData=env.socketTextStream("localhost", 8888) .map(event-> { String[] arr=event.split(","); returnTempSensorData .builder() .sensorID(arr[0]) .tp(Long.parseLong(arr[1])) .temp(Integer.parseInt(arr[2])) .build(); }); //4、流转换为动态表Tabletable=tEnv.fromDataStream(tempSensorData, $("sensorID"), $("tp"), $("temp"), $("ptTime").proctime()//新增ptTime字段为proctime ); //5、定义无界窗口(往前无界)Tableresult=table.window(Over .partitionBy($("sensorID")) .orderBy($("ptTime")) .preceding(rowInterval(2L)) .as("w") ).select( $("sensorID"), $("temp").max().over($("w")) ); //6、转换为流并打印tEnv.toDataStream(result, Row.class).print(); //7、执行env.execute("FlinkTableOverWinBasePtTime1"); } }
flink sql 中使用窗口
Group windows
Flink SQL中通过Group Windows函数来定义分组窗⼝
分组窗口函数 |
描述 |
|
TUMBLE(time_attr, interval) |
定义滚动窗⼝。翻转窗⼝可以在事件时间(流 + 批处理)或处理时间(流)上定义。 |
|
HOP(time_attr, interval, interval) |
定义滑动窗⼝,名字⽐较有意思。可以在事件时间(流 + 批处理)或处理时间(流) 上定义滑动窗⼝。注意:第⼀个interval是滑动步⻓,第⼆个interval才是窗⼝⼤⼩ |
|
SESSION(time_attr, interval) |
定义会话窗⼝。会话窗⼝可以在事件时间(流 + 批处理)或处理时间(流)上⼯作。 |
特别注意:FlinkSQL只⽀持基于时间的分组窗⼝,对于批处理time_attr必须是TIMESTAMP类型的
另外还有⼀些辅助函数,可以在select⼦句中⽤来查询 Group Window 的开始和结束时间戳,以及时间属性 (start,end,rowtime,proctime)。
辅助函数 |
说明 |
|
TUMBLE_START(time_attr, interval) HOP_START(time_attr, interval, interval) SESSION_START(time_attr, interval) |
返回窗⼝的开始时间戳(start),即窗⼝的下边界时间戳 |
|
TUMBLE_END(time_attr, interval) HOP_END(time_attr, interval, interval) SESSION_END(time_attr, interval) |
返回窗⼝的结束时间戳(end),即窗⼝的上边界之外紧挨着的不包含在 本窗⼝的时间戳 注意:结束时间戳【不能】⽤作【后续】基于时间的操作中的⾏时间属 性,例如group window或者over window的聚合操作 |
|
TUMBLE_ROWTIME(time_attr, interval) HOP_ROWTIME(time_attr, interval, interval) SESSION_ROWTIME(time_attr, interval) |
返回窗⼝的上边界时间戳,以rowtime形式返回 注意:结果是rowtime时间属性,【可以】⽤做后续基于时间的操作,例 如group window 或者 over window的聚合操作等 |
|
TUMBLE_PROCTIME(time_attr, interval) HOP_PROCTIME(time_attr, interval, interval) SESSION_PROCTIME(time_attr, interval) |
返回 proctime 时间属性 注意: 可以用作后续基于时间的操作,例如group window或者over window的聚合操作等 |
注意:必须使⽤与 GROUP BY ⼦句中的窗⼝函数完全相同的参数来调⽤辅助函数。
CREATETABLEOrders ( userBIGINT, productSTRING, amountINT, order_timeTIMESTAMP(3), WATERMARKFORorder_timeASorder_time-INTERVAL'1'MINUTE) WITH (...); SELECTuser, TUMBLE_START(order_time, INTERVAL'1'DAY) ASwStart,//参数必须跟下⾯GROUP BY⼦句中的窗⼝函数参数⼀致SUM(amount) FROMOrdersGROUPBYTUMBLE(order_time, INTERVAL'1'DAY), user
滚动窗口
packagecom.blink.sb.window; importcom.blink.sb.beans.TempSensorData; importorg.apache.flink.streaming.api.datastream.DataStream; importorg.apache.flink.streaming.api.environment.StreamExecutionEnvironment; importorg.apache.flink.table.api.Table; importorg.apache.flink.table.api.bridge.java.StreamTableEnvironment; importstaticorg.apache.flink.table.api.Expressions.*; /*** FlinkSQL基于处理时间的滚动窗口*/publicclassFlinkSQLTumbleWinBasePtTime { publicstaticvoidmain(String[] args) throwsException { //1、获取Stream执行环境StreamExecutionEnvironmentenv=StreamExecutionEnvironment.getExecutionEnvironment(); //2、创建表执行环境StreamTableEnvironmenttEnv=StreamTableEnvironment.create(env); env.setParallelism(1); //3、读入数据DataStream<TempSensorData>tempSensorData=env.socketTextStream("localhost", 8888) .map(event-> { String[] arr=event.split(","); returnTempSensorData .builder() .sensorID(arr[0]) .tp(Long.parseLong(arr[1])) .temp(Integer.parseInt(arr[2])) .build(); }); //4、流转换为动态表Tabletable=tEnv.fromDataStream(tempSensorData, $("sensorID"), $("tp"), $("temp"), $("ptTime").proctime()//新增ptTime字段为proctime ); // Table result = table.window(Tumble// .over(rowInterval(5L))// .on($("ptTime"))//基于计数的滚动窗口只能按照proctime属性字段排序// .as("w")// )// .groupBy($("sensorID"), $("w"))// .select($("sensorID"), $("sensorID").count());//5、基于SQL的滚动窗口Tableresult=tEnv.sqlQuery("select "+"sensorID,"+"max(temp),"+"TUMBLE_START(ptTime, INTERVAL '5' second) as winstart "+"from "+table+" GROUP BY sensorID,TUMBLE(ptTime, INTERVAL '5' second)"); //6、指定并打印result.execute().print(); } }
滑动窗口
packagecom.blink.sb.window; importcom.blink.sb.beans.TempSensorData; importorg.apache.flink.streaming.api.datastream.DataStream; importorg.apache.flink.streaming.api.environment.StreamExecutionEnvironment; importorg.apache.flink.table.api.DataTypes; importorg.apache.flink.table.api.Schema; importorg.apache.flink.table.api.Table; importorg.apache.flink.table.api.bridge.java.StreamTableEnvironment; importstaticorg.apache.flink.table.api.Expressions.$; /*** FlinkSQL基于处理时间的滑动窗口*/publicclassFlinkSQLHOPWinBasePtTime { publicstaticvoidmain(String[] args) throwsException { //1、获取Stream执行环境StreamExecutionEnvironmentenv=StreamExecutionEnvironment.getExecutionEnvironment(); //2、创建表执行环境StreamTableEnvironmenttEnv=StreamTableEnvironment.create(env); env.setParallelism(1); //3、读入数据DataStream<TempSensorData>tempSensorData=env.socketTextStream("localhost", 8888) .map(event-> { String[] arr=event.split(","); returnTempSensorData .builder() .sensorID(arr[0]) .tp(Long.parseLong(arr[1])) .temp(Integer.parseInt(arr[2])) .build(); }); //4、流转换为动态表Tabletable=tEnv.fromDataStream(tempSensorData, $("sensorID"), $("tp"), $("temp"), $("ptTime").proctime()//新增ptTime字段为proctime ); //5、基于SQL的滑动窗口(滑动步长在前)Tableresult=tEnv.sqlQuery("select "+"sensorID,"+"count(sensorID),"+"HOP_START(ptTime, INTERVAL '2' second,INTERVAL '4' second) as winstart "+"from "+table+" GROUP BY sensorID,HOP(ptTime, INTERVAL '2' second,INTERVAL '4' second)"); //6、指定并打印result.execute().print(); } }
会话窗口
packagecom.blink.sb.window; importcom.blink.sb.beans.TempSensorData; importorg.apache.flink.streaming.api.datastream.DataStream; importorg.apache.flink.streaming.api.environment.StreamExecutionEnvironment; importorg.apache.flink.table.api.Table; importorg.apache.flink.table.api.bridge.java.StreamTableEnvironment; importstaticorg.apache.flink.table.api.Expressions.$; /*** FlinkSQL基于处理时间的会话窗口*/publicclassFlinkSQLSessionWinBasePtTime { publicstaticvoidmain(String[] args) throwsException { //1、获取Stream执行环境StreamExecutionEnvironmentenv=StreamExecutionEnvironment.getExecutionEnvironment(); //2、创建表执行环境StreamTableEnvironmenttEnv=StreamTableEnvironment.create(env); env.setParallelism(1); //3、读入数据DataStream<TempSensorData>tempSensorData=env.socketTextStream("localhost", 8888) .map(event-> { String[] arr=event.split(","); returnTempSensorData .builder() .sensorID(arr[0]) .tp(Long.parseLong(arr[1])) .temp(Integer.parseInt(arr[2])) .build(); }); //4、流转换为动态表Tabletable=tEnv.fromDataStream(tempSensorData, $("sensorID"), $("tp"), $("temp"), $("ptTime").proctime()//新增ptTime字段为proctime ); //5、基于SQL的滑动窗口(滑动步长在前)Tableresult=tEnv.sqlQuery("select "+"sensorID,"+"count(sensorID),"+"SESSION_START(ptTime, INTERVAL '5' second) as winstart "+"from "+table+" GROUP BY sensorID,SESSION(ptTime, INTERVAL '5' second)"); //6、指定并打印result.execute().print(); } }
Over windows
在Flink SQL中Over窗⼝⽤法⽐较简单,这⾥不展开了,可参考如下链接:
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/table/sql/queries/over-agg/
举例:为每个订单计算在当前订单前⼀⼩时内同⼀产品的所有订单⾦额的总和。
SELECTorder_id, order_time, amount, SUM(amount) OVER ( PARTITIONBYproductORDERBYorder_timeRANGEBETWEENINTERVAL'1'HOURPRECEDINGANDCURRENTROW ) ASone_hour_prod_amount_sumFROMOrders
可以在select⼦句之外通过WINDOW关键字自定义窗口并给定别名便于引用:
SELECTorder_id, order_time, amount, SUM(amount) OVERwASsum_amount, AVG(amount) OVERwASavg_amountFROMOrdersWINDOWwAS ( PARTITIONBYproductORDERBYorder_timeRANGEBETWEENINTERVAL'1'HOURPRECEDINGANDCURRENTROW)
packagecom.blink.sb.window; importcom.blink.sb.beans.TempSensorData; importorg.apache.flink.streaming.api.datastream.DataStream; importorg.apache.flink.streaming.api.environment.StreamExecutionEnvironment; importorg.apache.flink.table.api.Table; importorg.apache.flink.table.api.bridge.java.StreamTableEnvironment; importstaticorg.apache.flink.table.api.Expressions.$; publicclassFlinkSQLOverWinBasePtTime { publicstaticvoidmain(String[] args) throwsException { //1、获取Stream执行环境StreamExecutionEnvironmentenv=StreamExecutionEnvironment.getExecutionEnvironment(); //2、创建表执行环境StreamTableEnvironmenttEnv=StreamTableEnvironment.create(env); env.setParallelism(1); //3、读入数据DataStream<TempSensorData>tempSensorData=env.socketTextStream("localhost", 8888) .map(event-> { String[] arr=event.split(","); returnTempSensorData .builder() .sensorID(arr[0]) .tp(Long.parseLong(arr[1])) .temp(Integer.parseInt(arr[2])) .build(); }); //4、流转换为动态表Tabletable=tEnv.fromDataStream(tempSensorData, $("sensorID"), $("tp"), $("temp"), $("ptTime").proctime()//新增ptTime字段为proctime ); //5、定义无界Over窗口(往前无界)//Table API写法// Table result = table.window(Over// .partitionBy($("sensorID"))// .orderBy($("ptTime"))// .preceding(UNBOUNDED_RANGE)//可以省略不写// .as("w")// ).select(// $("sensorID"),// $("temp").max().over($("w"))// );//一次可以执行多个聚合,但是目前只支持在相同的over窗口上Tableresult=tEnv.sqlQuery("select sensorID,"+"max(temp) OVER(PARTITION BY sensorID ORDER BY ptTime) as max_temp, "+"sum(temp) OVER(PARTITION BY sensorID ORDER BY ptTime) as sum_temp "+"from "+table); //6、指定并打印result.execute().print(); } }