FlinkSQL窗口操作

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: group windowsover windows滑动窗口滚动窗口会话窗口

有了时间属性,我们就可以配合窗⼝来完成各种业务的计算,Flink API/SQL 提供了丰富的窗口操作。


窗口分类


Group Windows


所谓Group Windows (分组窗口),就是把event按照时间或者计数 分成若干个组,然后在对每个组执行窗口函数,Group Window 从键控来说,分为键控window 和  非键控window,按照窗口事件分配逻辑又分为若干类型。



键控分window和非键控window


按照是否先根据指定的键(字段/属性)分组再基于时间/计数构建的Window,可以把Group Window分为【键控 Window】【⾮键控Window】


有时把键控窗⼝跟翻滚窗⼝/滑动窗⼝搞混淆,其实是事物的不同层⾯:


概念

定义


键控window

先根据指定的键(字段/属性)分组,再基于时间/计数构建的Window(双重分组)


⾮键控Window 不根据指定的键(字段/属性)分组,直接基于时间/计数构建的Window



注意:窗⼝也是⼀种分组


image.png

按照event分配逻辑分类



时间窗⼝:根据时间对数据流进⾏行行分组切⽚片



翻滚时间窗⼝:Tumbling Time Window

滑动时间窗⼝:Sliding Time Window

会话窗⼝:Session Window


计数窗⼝口:根据元素个数对数据流进⾏行行分组切⽚片


翻滚计数窗:Tumbling CountWindow

滑动计数窗:Sliding CountWindow


注意:时间窗⼝口[startend),左闭右开



滚动窗口



image.png


定义:将数据依据固定的窗⼝⻓度对⽆界数据流进⾏切⽚

特点:时间对⻬、窗⼝⻓度固定、event⽆重叠

适⽤场景:BI统计(计算各个事件段的指标)



滑动窗口


image.png



定义:是固定窗⼝的更⼴义的⼀种形式。滑动窗⼝由固定的窗⼝⻓度和滑动间隔组成

特点:窗⼝⻓度固定、event有重叠

适⽤场景: 监控场景,对最近⼀个时间段内的统计(求某接⼝最近5min的失败率来决定是否要报警



会话窗口


image.png


定义:类似于web应⽤ 的session,即⼀段时间没有接受到新数据就会⽣成新的窗⼝(固定gap/gap fun)


特点:时间⽆对⻬、event不重叠、没有固定开始和结束时间


使用场景: 线上用户行为分析



全局窗口


image.png



定义:有相同key的所有元素分配给相同的单个全局窗⼝

必须指定⾃定义触发器否则没有任何意义(不然不做计算)

注意:不要跟Non-keyed Window搞混,两个不同的角色




Over Windows



Over window 聚合是标准 SQL 中已有的(Over ⼦句),可以在查询的 SELECT ⼦句中定义。 Over window 聚 合,通过窗⼝聚合计算每个输入行在其相邻行范围内的聚合(如 sum等)


注意:


  1. 在流计算场景中,Over Window 不太常见

  1. Group window每个组聚合运算完输出⼀⾏最终结果,⽽在Over window每⼀⾏都会输出⼀⾏结果。




窗口使用



table api中使用窗口


Group Windows


编程模型


Tabletable=input .window([GroupWindoww].as("w")) // 定义⼀个group window并指定别名w .groupBy($("w")) // 按照窗⼝w对table进⾏分组 .select($("b").sum()); // select⼦句指定返回的列和聚合运算(⾮监控的)Tabletable=input .window([GroupWindoww].as("w")) // 定义⼀个group window并指定别名w .groupBy($("w"), $("a")) // 按照属性a和窗⼝w对table进⾏分组(这不就是键控window) .select($("a"), $("b").sum()); // select⼦句指定返回的列和聚合运算


另外,在select⼦句中,我们还可以返回Window的属性:start,end,rowtime



window属性

说明

获取方式

start

窗口开始时间戳

窗口别名.start()

end

窗口结束时间戳

窗口别名.end()

rowtime

窗口的时间戳

窗口别名.rowtime()


注意:

  • 基于时间的窗⼝是左闭右开的,例如从9点开始创建⼀个1⼩时的窗⼝,则start为09:00:00.000,end为 10:00:00.000,rowtime为09:59:59.999.
  • end的时间戳是不会被分组到这个窗口上的。



Tabletable=input .window([GroupWindoww].as("w")) // 定义⼀个group window并指定别名w .groupBy($("w"), $("a")) // 按照属性a和窗⼝w对table进⾏分组(这不就是键控window) .select($("a"), $("w").start(), $("w").end(), $("w").rowtime(), $("b").count());
//select⼦句返回字段a、窗⼝的开始时间戳、窗⼝的结束时间戳、窗⼝的时间戳,b字段的count


注意:我们到底取哪个时间戳是由业务决定的,⼀般是start。


GroupWindow参数定义了event划分到某个窗⼝的映射规则(基于时间或者计数),但是GroupWindow不是⼀个可 以实现的接⼝,Table API直接提供了⼀组预定义的具有特殊语意的Window类供我们直接使⽤(滚动窗⼝,滑动窗 ⼝等等)。


滚动窗口


滚动窗⼝通过Tumble类来定义,三个⽅法:

image.png


处理时间滚动窗口



packagecom.blink.sb.window;
importcom.blink.sb.beans.TempSensorData;
importorg.apache.flink.streaming.api.datastream.DataStream;
importorg.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
importorg.apache.flink.table.api.Table;
importorg.apache.flink.table.api.Tumble;
importorg.apache.flink.table.api.bridge.java.StreamTableEnvironment;
importstaticorg.apache.flink.table.api.Expressions.$;
importstaticorg.apache.flink.table.api.Expressions.lit;
/*** 基于处理时间的滚动窗口*/publicclassFlinkTableTumbleWinBasePtTime {
publicstaticvoidmain(String[] args) throwsException {
//1、获取Stream执行环境StreamExecutionEnvironmentenv=StreamExecutionEnvironment.getExecutionEnvironment();
//2、创建表执行环境StreamTableEnvironmenttEnv=StreamTableEnvironment.create(env);
env.setParallelism(1);
//3、读入数据DataStream<TempSensorData>tempSensorData=env.socketTextStream("localhost", 8888)
                .map(event-> {
String[] arr=event.split(",");
returnTempSensorData                            .builder()
                            .sensorID(arr[0])
                            .tp(Long.parseLong(arr[1]))
                            .temp(Integer.parseInt(arr[2]))
                            .build();
                });
//4、流转换为动态表Tabletable=tEnv.fromDataStream(tempSensorData,
$("sensorID"),
$("tp"),
$("temp"),
$("ptTime").proctime()//新增ptTime字段为proctime        );
//5、自定义窗口并计算Tableresult=table.window(Tumble                .over(lit(5).second())
                .on($("ptTime"))
                .as("w")
        )
                .groupBy($("sensorID"), $("w"))
                .select($("sensorID"), $("temp").avg());
result.execute().print();
    }
}



事件时间滚动窗口


packagecom.blink.sb.window;
importcom.blink.sb.beans.TempSensorData;
importorg.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
importorg.apache.flink.api.common.eventtime.WatermarkStrategy;
importorg.apache.flink.streaming.api.datastream.DataStream;
importorg.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
importorg.apache.flink.table.api.Table;
importorg.apache.flink.table.api.Tumble;
importorg.apache.flink.table.api.bridge.java.StreamTableEnvironment;
importjava.time.Duration;
importstaticorg.apache.flink.table.api.Expressions.$;
importstaticorg.apache.flink.table.api.Expressions.lit;
/*** 基于事件时间的滚动窗口*/publicclassFlinkTableTumbleWinBaseEventTime {
publicstaticvoidmain(String[] args) throwsException {
//1、获取Stream执行环境StreamExecutionEnvironmentenv=StreamExecutionEnvironment.getExecutionEnvironment();
//2、创建表执行环境StreamTableEnvironmenttEnv=StreamTableEnvironment.create(env);
env.setParallelism(1);
//3、读取数据并提取时间戳指定水印生成策略WatermarkStrategy<TempSensorData>watermarkStrategy=WatermarkStrategy                .<TempSensorData>forBoundedOutOfOrderness(Duration.ofSeconds(2))
                .withTimestampAssigner(newSerializableTimestampAssigner<TempSensorData>() {
@OverridepubliclongextractTimestamp(TempSensorDataelement, longrecordTimestamp) {
returnelement.getTp()*1000;
                    }
                });
DataStream<TempSensorData>tempSensorData=env.socketTextStream("localhost", 8888)
                .map(event-> {
String[] arr=event.split(",");
returnTempSensorData                            .builder()
                            .sensorID(arr[0])
                            .tp(Long.parseLong(arr[1]))
                            .temp(Integer.parseInt(arr[2]))
                            .build();
                }).assignTimestampsAndWatermarks(watermarkStrategy);
//4、流转换为动态表Tabletable=tEnv.fromDataStream(tempSensorData,
$("sensorID"),
$("tp"),
$("temp"),
$("evTime").rowtime()//新增evTime字段为rowtime        );
//5、自定义窗口并计算Tableresult=table.window(Tumble                .over(lit(5).second())
                .on($("evTime"))
                .as("w")
        )
                .groupBy($("sensorID"), $("w"))
                .select($("sensorID"), $("sensorID").count());
//6、打印result.execute().print();
    }
}



计数滚动窗口
packagecom.blink.sb.window;
importcom.blink.sb.beans.TempSensorData;
importorg.apache.flink.streaming.api.datastream.DataStream;
importorg.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
importorg.apache.flink.table.api.Table;
importorg.apache.flink.table.api.Tumble;
importorg.apache.flink.table.api.bridge.java.StreamTableEnvironment;
importorg.apache.flink.types.Row;
importstaticorg.apache.flink.table.api.Expressions.*;
/*** 基于计数的滚动窗口*/publicclassFlinkTableTumbleWinBaseCount {
publicstaticvoidmain(String[] args) throwsException {
//1、获取Stream执行环境StreamExecutionEnvironmentenv=StreamExecutionEnvironment.getExecutionEnvironment();
//2、创建表执行环境StreamTableEnvironmenttEnv=StreamTableEnvironment.create(env);
env.setParallelism(1);
//3、读入数据DataStream<TempSensorData>tempSensorData=env.socketTextStream("localhost", 8888)
                .map(event-> {
String[] arr=event.split(",");
returnTempSensorData                            .builder()
                            .sensorID(arr[0])
                            .tp(Long.parseLong(arr[1]))
                            .temp(Integer.parseInt(arr[2]))
                            .build();
                });
//4、流转换为动态表Tabletable=tEnv.fromDataStream(tempSensorData,
$("sensorID"),
$("tp"),
$("temp"),
$("ptTime").proctime()//新增ptTime字段为proctime        );
//5、自定义窗口并计算Tableresult=table.window(Tumble                .over(rowInterval(5L))
                .on($("ptTime"))//基于计数的滚动窗口只能按照proctime属性字段排序                .as("w")
        )
                .groupBy($("sensorID"), $("w"))
                .select($("sensorID"), $("sensorID").count());
//6、转换为流并打印tEnv.toDataStream(result, Row.class).print();
//7、执行env.execute("FlinkTableTumbleWinBaseEvTime");
    }
}

滑动窗口


滑动窗⼝通过Slide类来定义,关键⽅法如下:

方法

描述

over

定义窗口的长度为事件或者计数间隔

every

定义滑动步长(时间步长或计数步长)。滑动步长和窗口长度必须是同类型的

on

指定时间属性字段用于分组(事件窗口)或者排序(计数窗口)。批处理时该字段可以是任意Long 或者 Timestamp类型的字段,流处理时必须是事先指定的时间属性字段

as

为窗⼝分配别名。 别名⽤于在 groupBy⼦句中引⽤窗⼝,并可以在select() ⼦句中选择窗⼝属性, 例如 start,end, rowtime


事件滑动窗口


packagecom.blink.sb.window;
importcom.blink.sb.beans.TempSensorData;
importorg.apache.flink.streaming.api.datastream.DataStream;
importorg.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
importorg.apache.flink.table.api.Slide;
importorg.apache.flink.table.api.Table;
importorg.apache.flink.table.api.bridge.java.StreamTableEnvironment;
importorg.apache.flink.types.Row;
importstaticorg.apache.flink.table.api.Expressions.$;
importstaticorg.apache.flink.table.api.Expressions.lit;
/*** 基于时间的滑动窗口*/publicclassFlinkTableSlideWinBasePtTime {
publicstaticvoidmain(String[] args) throwsException {
//1、获取Stream执行环境StreamExecutionEnvironmentenv=StreamExecutionEnvironment.getExecutionEnvironment();
//2、创建表执行环境StreamTableEnvironmenttEnv=StreamTableEnvironment.create(env);
env.setParallelism(1);
//3、读入数据DataStream<TempSensorData>tempSensorData=env.socketTextStream("localhost", 8888)
                .map(event-> {
String[] arr=event.split(",");
returnTempSensorData                            .builder()
                            .sensorID(arr[0])
                            .tp(Long.parseLong(arr[1]))
                            .temp(Integer.parseInt(arr[2]))
                            .build();
                });
//4、流转换为动态表Tabletable=tEnv.fromDataStream(tempSensorData,
$("sensorID"),
$("tp"),
$("temp"),
$("ptTime").proctime()//新增ptTime字段为proctime        );
//5、自定义窗口并计算Tableresult=table.window(Slide                .over(lit(8).second())
                .every(lit(2).second())
                .on($("ptTime"))
                .as("w")
        )
                .groupBy($("sensorID"), $("w"))
                .select($("sensorID"), $("sensorID").count());
//6、转换为流并打印tEnv.toDataStream(result, Row.class).print();
//7、执行env.execute("FlinkTableTumbleWinBaseEvTime");
    }
}



计数滑动窗口


packagecom.blink.sb.window;
importcom.blink.sb.beans.TempSensorData;
importorg.apache.flink.streaming.api.datastream.DataStream;
importorg.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
importorg.apache.flink.table.api.Slide;
importorg.apache.flink.table.api.Table;
importorg.apache.flink.table.api.bridge.java.StreamTableEnvironment;
importorg.apache.flink.types.Row;
importstaticorg.apache.flink.table.api.Expressions.*;
/*** 基于计数的滑动窗口*/publicclassFlinkTableSlideWinBaseCount {
publicstaticvoidmain(String[] args) throwsException {
//1、获取Stream执行环境StreamExecutionEnvironmentenv=StreamExecutionEnvironment.getExecutionEnvironment();
//2、创建表执行环境StreamTableEnvironmenttEnv=StreamTableEnvironment.create(env);
env.setParallelism(1);
//3、读入数据DataStream<TempSensorData>tempSensorData=env.socketTextStream("localhost", 8888)
                .map(event-> {
String[] arr=event.split(",");
returnTempSensorData                            .builder()
                            .sensorID(arr[0])
                            .tp(Long.parseLong(arr[1]))
                            .temp(Integer.parseInt(arr[2]))
                            .build();
                });
//4、流转换为动态表Tabletable=tEnv.fromDataStream(tempSensorData,
$("sensorID"),
$("tp"),
$("temp"),
$("ptTime").proctime()//新增ptTime字段为proctime        );
//5、自定义窗口并计算Tableresult=table.window(Slide                .over(rowInterval(8L))
                .every(rowInterval(2L))
                .on($("ptTime"))
                .as("w")
        )
                .groupBy($("sensorID"), $("w"))
                .select($("sensorID"), $("sensorID").count());
//6、转换为流并打印tEnv.toDataStream(result, Row.class).print();
//7、执行env.execute("FlinkTableTumbleWinBaseEvTime");
    }
}

注意:所有类型的计数窗⼝只能是按照proctime排序,也就是计数窗⼝不能解决乱序


会话窗口


通过Session类定义会话窗⼝,⽅法如下:


image.png

注意:没有基于计数的会话窗⼝


packagecom.blink.sb.window;
importcom.blink.sb.beans.TempSensorData;
importorg.apache.flink.streaming.api.datastream.DataStream;
importorg.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
importorg.apache.flink.table.api.Session;
importorg.apache.flink.table.api.Table;
importorg.apache.flink.table.api.bridge.java.StreamTableEnvironment;
importorg.apache.flink.types.Row;
importstaticorg.apache.flink.table.api.Expressions.$;
importstaticorg.apache.flink.table.api.Expressions.lit;
/*** 基于时间的Session窗口*/publicclassFlinkTableSessionWinBasePtTime {
publicstaticvoidmain(String[] args) throwsException {
//1、获取Stream执行环境StreamExecutionEnvironmentenv=StreamExecutionEnvironment.getExecutionEnvironment();
//2、创建表执行环境StreamTableEnvironmenttEnv=StreamTableEnvironment.create(env);
env.setParallelism(1);
//3、读入数据DataStream<TempSensorData>tempSensorData=env.socketTextStream("localhost", 8888)
                .map(event-> {
String[] arr=event.split(",");
returnTempSensorData                            .builder()
                            .sensorID(arr[0])
                            .tp(Long.parseLong(arr[1]))
                            .temp(Integer.parseInt(arr[2]))
                            .build();
                });
//4、流转换为动态表Tabletable=tEnv.fromDataStream(tempSensorData,
$("sensorID"),
$("tp"),
$("temp"),
$("ptTime").proctime()//新增ptTime字段为proctime        );
//5、自定义窗口并计算Tableresult=table.window(Session                .withGap(lit(5).second())
                .on($("ptTime"))
                .as("w")
        )
                .groupBy($("sensorID"), $("w"))
                .select($("sensorID"), $("sensorID").count());
//6、转换为流并打印tEnv.toDataStream(result, Row.class).print();
//7、执行env.execute("FlinkTableTumbleWinBaseEvTime");
    }
}

注意:基数event Time的窗⼝需要先指定时间属性和⽔印⽣成策略



Over windows


编程模型


Tabletable=input .window([OverWindoww].as("w")) // 定义⼀个Over Window并指定别名为w .select($("a"), $("b").sum().over($("w")), $("c").min().over($("w")));
//在Over Window执⾏聚合运算(返回a、b在窗⼝上求sum、c在窗⼝上求min)


OverWindow 定义了计算聚合的⾏范围。

Table API 提供 Over类来配置 over 窗⼝的属性。 可以在事件时间或处理 时间以及时间周期或⾏数的范围上定义窗⼝。



Over类的⽅法如下:

方法

是否必须

描述


partitionBy

可选

根据⼀个或多个属性分区。 每个分区单独排序,聚合函数分别应⽤于每个分区。注 意:在流式环境中,只有当窗⼝包含 partition by ⼦句时,窗⼝聚合函数才能并⾏计 算。 如果没有 partitionBy(...),则流由单个⾮并⾏任务处理


orderBy

定义每个分区中事件的顺序,即定义聚合函数应⽤于事件的顺序;对于流式查询,必须 是声明的事件时间或处理时间属性。 ⽬前,仅⽀持单个字段排序


preceding

可选

定义包含在窗⼝中并位于当前⾏之前的⾏的间隔(指定下限,往前多⻓时间或者多少 ⾏开窗)。 间隔可以指定为时间间隔或⾏计数间隔。有界窗⼝由间隔的⼤⼩指定,例 如,时间间隔为 10.minutes,⾏计数间隔为 10.rows。⽆界窗⼝使⽤常量指定, UNBOUNDED_RANGE ⽤于时间间隔或 UNBOUNDED_ROW ⽤于⾏计数间隔。 ⽆界 窗⼝从分区的第⼀⾏开始。如果省略preceding⼦句,则 UNBOUNDED_RANGE 和 CURRENT_RANGE 将⽤作窗⼝的默认preceding 和following的默认值。


following

可选

定义包含在窗⼝中并位于当前⾏之后的⾏的间隔(指定上限,往后多⻓时间或者多少 ⾏开窗)。 间隔必须以与preceding间隔(时间或⾏数)相同的单位。注意:流处理环 境下,⽬前不⽀持往后开窗。我们保持默认值就好了


as

必选

为窗⼝分配别名。 别名⽤于在 groupBy⼦句中引⽤窗⼝。




注意:⽬前,同⼀个 select() 调⽤中的所有聚合函数必须在同⼀个窗⼝中计算


实例伪代码:

// ⽆界Event-time排序开窗 (假设event-time属性是"rowtime").window(Over.partitionBy($("a")).orderBy($("rowtime")).preceding(UNBOUNDED_RANGE).as("w"));// ⽆界Processing-time排序开窗 (假设processing-time属性是"proctime").window(Over.partitionBy($("a")).orderBy("proctime").preceding(UNBOUNDED_RANGE).as("w")
);
// ⽆界Event-time排序Row-count开窗 (假设event-time属性是"rowtime").window(Over.partitionBy($("a")).orderBy($("rowtime")).preceding(UNBOUNDED_ROW).as("w")
);
// ⽆界Processing-time排序Row-count开窗 (假设processing-time属性是"proctime").window(Over.partitionBy($("a")).orderBy($("proctime")).preceding(UNBOUNDED_ROW).as("w"));
// 有界Event-time排序向前1分钟开窗 (假设event-time属性是"rowtime").window(Over.partitionBy($("a")).orderBy($("rowtime")).preceding(lit(1).minutes()).as("w"));// 有界Processing-time排序向前1分钟开窗 (假设processing-time属性是"proctime").window(Over.partitionBy($("a")).orderBy($("proctime")).preceding(lit(1).minutes()).as(
"w"));
// 有界Event-time排序向前10⾏开窗 (假设event-time属性是"rowtime").window(Over.partitionBy($("a")).orderBy($("rowtime")).preceding(rowInterval(10)).as("w"));// 有界Processing-time排序向前10⾏开窗 (假设processing-time属性是"proctime").window(Over.partitionBy($("a")).orderBy($("proctime")).preceding(rowInterval(10)).as("w"));


第一行到当前行开窗

packagecom.blink.sb.window;
importcom.blink.sb.beans.TempSensorData;
importorg.apache.flink.streaming.api.datastream.DataStream;
importorg.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
importorg.apache.flink.table.api.Over;
importorg.apache.flink.table.api.Table;
importorg.apache.flink.table.api.bridge.java.StreamTableEnvironment;
importstaticorg.apache.flink.table.api.Expressions.*;
/*** 基于处理时间开窗1:第一行到当前行开窗*/publicclassFlinkTableOverWinBasePtTime1 {
publicstaticvoidmain(String[] args) throwsException {
//1、获取Stream执行环境StreamExecutionEnvironmentenv=StreamExecutionEnvironment.getExecutionEnvironment();
//2、创建表执行环境StreamTableEnvironmenttEnv=StreamTableEnvironment.create(env);
env.setParallelism(1);
//3、读入数据DataStream<TempSensorData>tempSensorData=env.socketTextStream("localhost", 8888)
                .map(event-> {
String[] arr=event.split(",");
returnTempSensorData                            .builder()
                            .sensorID(arr[0])
                            .tp(Long.parseLong(arr[1]))
                            .temp(Integer.parseInt(arr[2]))
                            .build();
                });
//4、流转换为动态表Tabletable=tEnv.fromDataStream(tempSensorData,
$("sensorID"),
$("tp"),
$("temp"),
$("ptTime").proctime()//新增ptTime字段为proctime        );
//5、定义无界Over窗口(往前无界)Tableresult=table.window(Over                .partitionBy($("sensorID"))
                .orderBy($("ptTime"))
                .preceding(UNBOUNDED_RANGE)//可以省略不写                .as("w")
        ).select(
$("sensorID"),
$("temp").max().over($("w"))
        );
result.execute().print();
    }
}


指定⾏到当前⾏开窗

packagecom.blink.sb.window;
importcom.blink.sb.beans.TempSensorData;
importorg.apache.flink.streaming.api.datastream.DataStream;
importorg.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
importorg.apache.flink.table.api.Over;
importorg.apache.flink.table.api.Table;
importorg.apache.flink.table.api.bridge.java.StreamTableEnvironment;
importorg.apache.flink.types.Row;
importstaticorg.apache.flink.table.api.Expressions.*;
/*** 基于处理时间开窗2:指定行到当前行开窗*/publicclassFlinkTableOverWinBasePtTime2 {
publicstaticvoidmain(String[] args) throwsException {
//1、获取Stream执行环境StreamExecutionEnvironmentenv=StreamExecutionEnvironment.getExecutionEnvironment();
//2、创建表执行环境StreamTableEnvironmenttEnv=StreamTableEnvironment.create(env);
env.setParallelism(1);
//3、读入数据DataStream<TempSensorData>tempSensorData=env.socketTextStream("localhost", 8888)
                .map(event-> {
String[] arr=event.split(",");
returnTempSensorData                            .builder()
                            .sensorID(arr[0])
                            .tp(Long.parseLong(arr[1]))
                            .temp(Integer.parseInt(arr[2]))
                            .build();
                });
//4、流转换为动态表Tabletable=tEnv.fromDataStream(tempSensorData,
$("sensorID"),
$("tp"),
$("temp"),
$("ptTime").proctime()//新增ptTime字段为proctime        );
//5、定义无界窗口(往前无界)Tableresult=table.window(Over                .partitionBy($("sensorID"))
                .orderBy($("ptTime"))
                .preceding(rowInterval(2L))
                .as("w")
        ).select(
$("sensorID"),
$("temp").max().over($("w"))
        );
//6、转换为流并打印tEnv.toDataStream(result, Row.class).print();
//7、执行env.execute("FlinkTableOverWinBasePtTime1");
    }
}


flink sql 中使用窗口


Group windows


Flink SQL中通过Group Windows函数来定义分组窗⼝



分组窗口函数

描述


TUMBLE(time_attr, interval)

定义滚动窗⼝。翻转窗⼝可以在事件时间(流 + 批处理)或处理时间(流)上定义。


HOP(time_attr, interval, interval)

定义滑动窗⼝,名字⽐较有意思。可以在事件时间(流 + 批处理)或处理时间(流) 上定义滑动窗⼝。注意:第⼀个interval是滑动步⻓,第⼆个interval才是窗⼝⼤⼩


SESSION(time_attr, interval)

定义会话窗⼝。会话窗⼝可以在事件时间(流 + 批处理)或处理时间(流)上⼯作。


特别注意:FlinkSQL只⽀持基于时间的分组窗⼝,对于批处理time_attr必须是TIMESTAMP类型的


另外还有⼀些辅助函数,可以在select⼦句中⽤来查询 Group Window 的开始和结束时间戳,以及时间属性 (start,end,rowtime,proctime)。


辅助函数

说明

TUMBLE_START(time_attr, interval) HOP_START(time_attr, interval, interval) SESSION_START(time_attr, interval)

返回窗⼝的开始时间戳(start),即窗⼝的下边界时间戳

TUMBLE_END(time_attr, interval) HOP_END(time_attr, interval, interval) SESSION_END(time_attr, interval)

返回窗⼝的结束时间戳(end),即窗⼝的上边界之外紧挨着的不包含在 本窗⼝的时间戳

注意:结束时间戳【不能】⽤作【后续】基于时间的操作中的⾏时间属 性,例如group window或者over window的聚合操作

TUMBLE_ROWTIME(time_attr, interval) HOP_ROWTIME(time_attr, interval, interval) SESSION_ROWTIME(time_attr, interval)

返回窗⼝的上边界时间戳,以rowtime形式返回 注意:结果是rowtime时间属性,【可以】⽤做后续基于时间的操作,例 如group window 或者 over window的聚合操作等

TUMBLE_PROCTIME(time_attr, interval) HOP_PROCTIME(time_attr, interval, interval) SESSION_PROCTIME(time_attr, interval)

返回 proctime 时间属性

注意: 可以用作后续基于时间的操作,例如group window或者over window的聚合操作等


注意:必须使⽤与 GROUP BY ⼦句中的窗⼝函数完全相同的参数来调⽤辅助函数。



CREATETABLEOrders (
userBIGINT,
productSTRING,
amountINT,
order_timeTIMESTAMP(3),
WATERMARKFORorder_timeASorder_time-INTERVAL'1'MINUTE) WITH (...);
SELECTuser,
TUMBLE_START(order_time, INTERVAL'1'DAY) ASwStart,//参数必须跟下⾯GROUP BY⼦句中的窗⼝函数参数⼀致SUM(amount) FROMOrdersGROUPBYTUMBLE(order_time, INTERVAL'1'DAY),
user


滚动窗口


packagecom.blink.sb.window;
importcom.blink.sb.beans.TempSensorData;
importorg.apache.flink.streaming.api.datastream.DataStream;
importorg.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
importorg.apache.flink.table.api.Table;
importorg.apache.flink.table.api.bridge.java.StreamTableEnvironment;
importstaticorg.apache.flink.table.api.Expressions.*;
/*** FlinkSQL基于处理时间的滚动窗口*/publicclassFlinkSQLTumbleWinBasePtTime {
publicstaticvoidmain(String[] args) throwsException {
//1、获取Stream执行环境StreamExecutionEnvironmentenv=StreamExecutionEnvironment.getExecutionEnvironment();
//2、创建表执行环境StreamTableEnvironmenttEnv=StreamTableEnvironment.create(env);
env.setParallelism(1);
//3、读入数据DataStream<TempSensorData>tempSensorData=env.socketTextStream("localhost", 8888)
                .map(event-> {
String[] arr=event.split(",");
returnTempSensorData                            .builder()
                            .sensorID(arr[0])
                            .tp(Long.parseLong(arr[1]))
                            .temp(Integer.parseInt(arr[2]))
                            .build();
                });
//4、流转换为动态表Tabletable=tEnv.fromDataStream(tempSensorData,
$("sensorID"),
$("tp"),
$("temp"),
$("ptTime").proctime()//新增ptTime字段为proctime        );
//        Table result = table.window(Tumble//                .over(rowInterval(5L))//                .on($("ptTime"))//基于计数的滚动窗口只能按照proctime属性字段排序//                .as("w")//        )//                .groupBy($("sensorID"), $("w"))//                .select($("sensorID"), $("sensorID").count());//5、基于SQL的滚动窗口Tableresult=tEnv.sqlQuery("select "+"sensorID,"+"max(temp),"+"TUMBLE_START(ptTime, INTERVAL '5' second) as winstart "+"from "+table+" GROUP BY sensorID,TUMBLE(ptTime, INTERVAL '5' second)");
//6、指定并打印result.execute().print();
    }
}



滑动窗口


packagecom.blink.sb.window;
importcom.blink.sb.beans.TempSensorData;
importorg.apache.flink.streaming.api.datastream.DataStream;
importorg.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
importorg.apache.flink.table.api.DataTypes;
importorg.apache.flink.table.api.Schema;
importorg.apache.flink.table.api.Table;
importorg.apache.flink.table.api.bridge.java.StreamTableEnvironment;
importstaticorg.apache.flink.table.api.Expressions.$;
/*** FlinkSQL基于处理时间的滑动窗口*/publicclassFlinkSQLHOPWinBasePtTime {
publicstaticvoidmain(String[] args) throwsException {
//1、获取Stream执行环境StreamExecutionEnvironmentenv=StreamExecutionEnvironment.getExecutionEnvironment();
//2、创建表执行环境StreamTableEnvironmenttEnv=StreamTableEnvironment.create(env);
env.setParallelism(1);
//3、读入数据DataStream<TempSensorData>tempSensorData=env.socketTextStream("localhost", 8888)
                .map(event-> {
String[] arr=event.split(",");
returnTempSensorData                            .builder()
                            .sensorID(arr[0])
                            .tp(Long.parseLong(arr[1]))
                            .temp(Integer.parseInt(arr[2]))
                            .build();
                });
//4、流转换为动态表Tabletable=tEnv.fromDataStream(tempSensorData,
$("sensorID"),
$("tp"),
$("temp"),
$("ptTime").proctime()//新增ptTime字段为proctime        );
//5、基于SQL的滑动窗口(滑动步长在前)Tableresult=tEnv.sqlQuery("select "+"sensorID,"+"count(sensorID),"+"HOP_START(ptTime, INTERVAL '2' second,INTERVAL '4' second) as winstart "+"from "+table+" GROUP BY sensorID,HOP(ptTime, INTERVAL '2' second,INTERVAL '4' second)");
//6、指定并打印result.execute().print();
    }
}


会话窗口


packagecom.blink.sb.window;
importcom.blink.sb.beans.TempSensorData;
importorg.apache.flink.streaming.api.datastream.DataStream;
importorg.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
importorg.apache.flink.table.api.Table;
importorg.apache.flink.table.api.bridge.java.StreamTableEnvironment;
importstaticorg.apache.flink.table.api.Expressions.$;
/*** FlinkSQL基于处理时间的会话窗口*/publicclassFlinkSQLSessionWinBasePtTime {
publicstaticvoidmain(String[] args) throwsException {
//1、获取Stream执行环境StreamExecutionEnvironmentenv=StreamExecutionEnvironment.getExecutionEnvironment();
//2、创建表执行环境StreamTableEnvironmenttEnv=StreamTableEnvironment.create(env);
env.setParallelism(1);
//3、读入数据DataStream<TempSensorData>tempSensorData=env.socketTextStream("localhost", 8888)
                .map(event-> {
String[] arr=event.split(",");
returnTempSensorData                            .builder()
                            .sensorID(arr[0])
                            .tp(Long.parseLong(arr[1]))
                            .temp(Integer.parseInt(arr[2]))
                            .build();
                });
//4、流转换为动态表Tabletable=tEnv.fromDataStream(tempSensorData,
$("sensorID"),
$("tp"),
$("temp"),
$("ptTime").proctime()//新增ptTime字段为proctime        );
//5、基于SQL的滑动窗口(滑动步长在前)Tableresult=tEnv.sqlQuery("select "+"sensorID,"+"count(sensorID),"+"SESSION_START(ptTime, INTERVAL '5' second) as winstart "+"from "+table+" GROUP BY sensorID,SESSION(ptTime, INTERVAL '5' second)");
//6、指定并打印result.execute().print();
    }
}



Over windows


在Flink SQL中Over窗⼝⽤法⽐较简单,这⾥不展开了,可参考如下链接:


https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/table/sql/queries/over-agg/


举例:为每个订单计算在当前订单前⼀⼩时内同⼀产品的所有订单⾦额的总和。



SELECTorder_id, order_time, amount,
SUM(amount) OVER (
PARTITIONBYproductORDERBYorder_timeRANGEBETWEENINTERVAL'1'HOURPRECEDINGANDCURRENTROW ) ASone_hour_prod_amount_sumFROMOrders



可以在select⼦句之外通过WINDOW关键字自定义窗口并给定别名便于引用:


SELECTorder_id, order_time, amount,
SUM(amount) OVERwASsum_amount,
AVG(amount) OVERwASavg_amountFROMOrdersWINDOWwAS (
PARTITIONBYproductORDERBYorder_timeRANGEBETWEENINTERVAL'1'HOURPRECEDINGANDCURRENTROW)


packagecom.blink.sb.window;
importcom.blink.sb.beans.TempSensorData;
importorg.apache.flink.streaming.api.datastream.DataStream;
importorg.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
importorg.apache.flink.table.api.Table;
importorg.apache.flink.table.api.bridge.java.StreamTableEnvironment;
importstaticorg.apache.flink.table.api.Expressions.$;
publicclassFlinkSQLOverWinBasePtTime {
publicstaticvoidmain(String[] args) throwsException {
//1、获取Stream执行环境StreamExecutionEnvironmentenv=StreamExecutionEnvironment.getExecutionEnvironment();
//2、创建表执行环境StreamTableEnvironmenttEnv=StreamTableEnvironment.create(env);
env.setParallelism(1);
//3、读入数据DataStream<TempSensorData>tempSensorData=env.socketTextStream("localhost", 8888)
                .map(event-> {
String[] arr=event.split(",");
returnTempSensorData                            .builder()
                            .sensorID(arr[0])
                            .tp(Long.parseLong(arr[1]))
                            .temp(Integer.parseInt(arr[2]))
                            .build();
                });
//4、流转换为动态表Tabletable=tEnv.fromDataStream(tempSensorData,
$("sensorID"),
$("tp"),
$("temp"),
$("ptTime").proctime()//新增ptTime字段为proctime        );
//5、定义无界Over窗口(往前无界)//Table API写法//        Table result = table.window(Over//                .partitionBy($("sensorID"))//                .orderBy($("ptTime"))//                .preceding(UNBOUNDED_RANGE)//可以省略不写//                .as("w")//        ).select(//                $("sensorID"),//                $("temp").max().over($("w"))//        );//一次可以执行多个聚合,但是目前只支持在相同的over窗口上Tableresult=tEnv.sqlQuery("select sensorID,"+"max(temp) OVER(PARTITION BY sensorID ORDER BY ptTime) as max_temp, "+"sum(temp) OVER(PARTITION BY sensorID ORDER BY ptTime) as sum_temp "+"from "+table);
//6、指定并打印result.execute().print();
    }
}
















































































相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
7月前
|
流计算
Flink窗口——window
Flink窗口——window
43 0
|
消息中间件 程序员 API
Flink中时间和窗口
Flink中时间和窗口
176 0
|
缓存 API 流计算
Flink--7、窗口(窗口的概念、分类、API、分配器、窗口函数)、触发器、移除器
Flink--7、窗口(窗口的概念、分类、API、分配器、窗口函数)、触发器、移除器
|
SQL 存储 监控
FlinkSQL窗口新特性(Window TVF)
理解Window TVF Window TVF使用
FlinkSQL窗口新特性(Window TVF)
|
2月前
|
SQL 消息中间件 分布式计算
大数据-120 - Flink Window 窗口机制-滑动时间窗口、会话窗口-基于时间驱动&基于事件驱动
大数据-120 - Flink Window 窗口机制-滑动时间窗口、会话窗口-基于时间驱动&基于事件驱动
110 0
|
6天前
|
数据处理 数据安全/隐私保护 流计算
Flink 三种时间窗口、窗口处理函数使用及案例
Flink 是处理无界数据流的强大工具,提供了丰富的窗口机制。本文介绍了三种时间窗口(滚动窗口、滑动窗口和会话窗口)及其使用方法,包括时间窗口的概念、窗口处理函数的使用和实际案例。通过这些机制,可以灵活地对数据流进行分析和计算,满足不同的业务需求。
61 27
|
7月前
|
SQL API 数据处理
实时计算 Flink版产品使用合集之如果一个窗口区间没有数据,若不会开窗就没法使用triggers赋默认值
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
4月前
|
SQL 流计算
Flink SQL 在快手实践问题之Window TVF改进窗口聚合功能如何解决
Flink SQL 在快手实践问题之Window TVF改进窗口聚合功能如何解决
35 1
|
6月前
|
数据采集 关系型数据库 MySQL
实时计算 Flink版操作报错合集之源表的数据已经被手动删除,时间窗口内的数据仍存在,该怎么解决
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
140 1
|
7月前
|
SQL 自然语言处理 机器人
Flink sql滚动窗口怎么操作能实现stream里的allowlateness?
【1月更文挑战第3天】【1月更文挑战第12篇】Flink sql滚动窗口怎么操作能实现stream里的allowlateness?
97 1