FlinkSQL窗口操作

本文涉及的产品
实时计算 Flink 版,1000CU*H 3个月
简介: group windowsover windows滑动窗口滚动窗口会话窗口

有了时间属性,我们就可以配合窗⼝来完成各种业务的计算,Flink API/SQL 提供了丰富的窗口操作。


窗口分类


Group Windows


所谓Group Windows (分组窗口),就是把event按照时间或者计数 分成若干个组,然后在对每个组执行窗口函数,Group Window 从键控来说,分为键控window 和  非键控window,按照窗口事件分配逻辑又分为若干类型。



键控分window和非键控window


按照是否先根据指定的键(字段/属性)分组再基于时间/计数构建的Window,可以把Group Window分为【键控 Window】【⾮键控Window】


有时把键控窗⼝跟翻滚窗⼝/滑动窗⼝搞混淆,其实是事物的不同层⾯:


概念

定义


键控window

先根据指定的键(字段/属性)分组,再基于时间/计数构建的Window(双重分组)


⾮键控Window 不根据指定的键(字段/属性)分组,直接基于时间/计数构建的Window



注意:窗⼝也是⼀种分组


image.png

按照event分配逻辑分类



时间窗⼝:根据时间对数据流进⾏行行分组切⽚片



翻滚时间窗⼝:Tumbling Time Window

滑动时间窗⼝:Sliding Time Window

会话窗⼝:Session Window


计数窗⼝口:根据元素个数对数据流进⾏行行分组切⽚片


翻滚计数窗:Tumbling CountWindow

滑动计数窗:Sliding CountWindow


注意:时间窗⼝口[startend),左闭右开



滚动窗口



image.png


定义:将数据依据固定的窗⼝⻓度对⽆界数据流进⾏切⽚

特点:时间对⻬、窗⼝⻓度固定、event⽆重叠

适⽤场景:BI统计(计算各个事件段的指标)



滑动窗口


image.png



定义:是固定窗⼝的更⼴义的⼀种形式。滑动窗⼝由固定的窗⼝⻓度和滑动间隔组成

特点:窗⼝⻓度固定、event有重叠

适⽤场景: 监控场景,对最近⼀个时间段内的统计(求某接⼝最近5min的失败率来决定是否要报警



会话窗口


image.png


定义:类似于web应⽤ 的session,即⼀段时间没有接受到新数据就会⽣成新的窗⼝(固定gap/gap fun)


特点:时间⽆对⻬、event不重叠、没有固定开始和结束时间


使用场景: 线上用户行为分析



全局窗口


image.png



定义:有相同key的所有元素分配给相同的单个全局窗⼝

必须指定⾃定义触发器否则没有任何意义(不然不做计算)

注意:不要跟Non-keyed Window搞混,两个不同的角色




Over Windows



Over window 聚合是标准 SQL 中已有的(Over ⼦句),可以在查询的 SELECT ⼦句中定义。 Over window 聚 合,通过窗⼝聚合计算每个输入行在其相邻行范围内的聚合(如 sum等)


注意:


  1. 在流计算场景中,Over Window 不太常见

  1. Group window每个组聚合运算完输出⼀⾏最终结果,⽽在Over window每⼀⾏都会输出⼀⾏结果。




窗口使用



table api中使用窗口


Group Windows


编程模型


Tabletable=input .window([GroupWindoww].as("w")) // 定义⼀个group window并指定别名w .groupBy($("w")) // 按照窗⼝w对table进⾏分组 .select($("b").sum()); // select⼦句指定返回的列和聚合运算(⾮监控的)Tabletable=input .window([GroupWindoww].as("w")) // 定义⼀个group window并指定别名w .groupBy($("w"), $("a")) // 按照属性a和窗⼝w对table进⾏分组(这不就是键控window) .select($("a"), $("b").sum()); // select⼦句指定返回的列和聚合运算


另外,在select⼦句中,我们还可以返回Window的属性:start,end,rowtime



window属性

说明

获取方式

start

窗口开始时间戳

窗口别名.start()

end

窗口结束时间戳

窗口别名.end()

rowtime

窗口的时间戳

窗口别名.rowtime()


注意:

  • 基于时间的窗⼝是左闭右开的,例如从9点开始创建⼀个1⼩时的窗⼝,则start为09:00:00.000,end为 10:00:00.000,rowtime为09:59:59.999.
  • end的时间戳是不会被分组到这个窗口上的。



Tabletable=input .window([GroupWindoww].as("w")) // 定义⼀个group window并指定别名w .groupBy($("w"), $("a")) // 按照属性a和窗⼝w对table进⾏分组(这不就是键控window) .select($("a"), $("w").start(), $("w").end(), $("w").rowtime(), $("b").count());
//select⼦句返回字段a、窗⼝的开始时间戳、窗⼝的结束时间戳、窗⼝的时间戳,b字段的count


注意:我们到底取哪个时间戳是由业务决定的,⼀般是start。


GroupWindow参数定义了event划分到某个窗⼝的映射规则(基于时间或者计数),但是GroupWindow不是⼀个可 以实现的接⼝,Table API直接提供了⼀组预定义的具有特殊语意的Window类供我们直接使⽤(滚动窗⼝,滑动窗 ⼝等等)。


滚动窗口


滚动窗⼝通过Tumble类来定义,三个⽅法:

image.png


处理时间滚动窗口



packagecom.blink.sb.window;
importcom.blink.sb.beans.TempSensorData;
importorg.apache.flink.streaming.api.datastream.DataStream;
importorg.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
importorg.apache.flink.table.api.Table;
importorg.apache.flink.table.api.Tumble;
importorg.apache.flink.table.api.bridge.java.StreamTableEnvironment;
importstaticorg.apache.flink.table.api.Expressions.$;
importstaticorg.apache.flink.table.api.Expressions.lit;
/*** 基于处理时间的滚动窗口*/publicclassFlinkTableTumbleWinBasePtTime {
publicstaticvoidmain(String[] args) throwsException {
//1、获取Stream执行环境StreamExecutionEnvironmentenv=StreamExecutionEnvironment.getExecutionEnvironment();
//2、创建表执行环境StreamTableEnvironmenttEnv=StreamTableEnvironment.create(env);
env.setParallelism(1);
//3、读入数据DataStream<TempSensorData>tempSensorData=env.socketTextStream("localhost", 8888)
                .map(event-> {
String[] arr=event.split(",");
returnTempSensorData                            .builder()
                            .sensorID(arr[0])
                            .tp(Long.parseLong(arr[1]))
                            .temp(Integer.parseInt(arr[2]))
                            .build();
                });
//4、流转换为动态表Tabletable=tEnv.fromDataStream(tempSensorData,
$("sensorID"),
$("tp"),
$("temp"),
$("ptTime").proctime()//新增ptTime字段为proctime        );
//5、自定义窗口并计算Tableresult=table.window(Tumble                .over(lit(5).second())
                .on($("ptTime"))
                .as("w")
        )
                .groupBy($("sensorID"), $("w"))
                .select($("sensorID"), $("temp").avg());
result.execute().print();
    }
}



事件时间滚动窗口


packagecom.blink.sb.window;
importcom.blink.sb.beans.TempSensorData;
importorg.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
importorg.apache.flink.api.common.eventtime.WatermarkStrategy;
importorg.apache.flink.streaming.api.datastream.DataStream;
importorg.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
importorg.apache.flink.table.api.Table;
importorg.apache.flink.table.api.Tumble;
importorg.apache.flink.table.api.bridge.java.StreamTableEnvironment;
importjava.time.Duration;
importstaticorg.apache.flink.table.api.Expressions.$;
importstaticorg.apache.flink.table.api.Expressions.lit;
/*** 基于事件时间的滚动窗口*/publicclassFlinkTableTumbleWinBaseEventTime {
publicstaticvoidmain(String[] args) throwsException {
//1、获取Stream执行环境StreamExecutionEnvironmentenv=StreamExecutionEnvironment.getExecutionEnvironment();
//2、创建表执行环境StreamTableEnvironmenttEnv=StreamTableEnvironment.create(env);
env.setParallelism(1);
//3、读取数据并提取时间戳指定水印生成策略WatermarkStrategy<TempSensorData>watermarkStrategy=WatermarkStrategy                .<TempSensorData>forBoundedOutOfOrderness(Duration.ofSeconds(2))
                .withTimestampAssigner(newSerializableTimestampAssigner<TempSensorData>() {
@OverridepubliclongextractTimestamp(TempSensorDataelement, longrecordTimestamp) {
returnelement.getTp()*1000;
                    }
                });
DataStream<TempSensorData>tempSensorData=env.socketTextStream("localhost", 8888)
                .map(event-> {
String[] arr=event.split(",");
returnTempSensorData                            .builder()
                            .sensorID(arr[0])
                            .tp(Long.parseLong(arr[1]))
                            .temp(Integer.parseInt(arr[2]))
                            .build();
                }).assignTimestampsAndWatermarks(watermarkStrategy);
//4、流转换为动态表Tabletable=tEnv.fromDataStream(tempSensorData,
$("sensorID"),
$("tp"),
$("temp"),
$("evTime").rowtime()//新增evTime字段为rowtime        );
//5、自定义窗口并计算Tableresult=table.window(Tumble                .over(lit(5).second())
                .on($("evTime"))
                .as("w")
        )
                .groupBy($("sensorID"), $("w"))
                .select($("sensorID"), $("sensorID").count());
//6、打印result.execute().print();
    }
}



计数滚动窗口
packagecom.blink.sb.window;
importcom.blink.sb.beans.TempSensorData;
importorg.apache.flink.streaming.api.datastream.DataStream;
importorg.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
importorg.apache.flink.table.api.Table;
importorg.apache.flink.table.api.Tumble;
importorg.apache.flink.table.api.bridge.java.StreamTableEnvironment;
importorg.apache.flink.types.Row;
importstaticorg.apache.flink.table.api.Expressions.*;
/*** 基于计数的滚动窗口*/publicclassFlinkTableTumbleWinBaseCount {
publicstaticvoidmain(String[] args) throwsException {
//1、获取Stream执行环境StreamExecutionEnvironmentenv=StreamExecutionEnvironment.getExecutionEnvironment();
//2、创建表执行环境StreamTableEnvironmenttEnv=StreamTableEnvironment.create(env);
env.setParallelism(1);
//3、读入数据DataStream<TempSensorData>tempSensorData=env.socketTextStream("localhost", 8888)
                .map(event-> {
String[] arr=event.split(",");
returnTempSensorData                            .builder()
                            .sensorID(arr[0])
                            .tp(Long.parseLong(arr[1]))
                            .temp(Integer.parseInt(arr[2]))
                            .build();
                });
//4、流转换为动态表Tabletable=tEnv.fromDataStream(tempSensorData,
$("sensorID"),
$("tp"),
$("temp"),
$("ptTime").proctime()//新增ptTime字段为proctime        );
//5、自定义窗口并计算Tableresult=table.window(Tumble                .over(rowInterval(5L))
                .on($("ptTime"))//基于计数的滚动窗口只能按照proctime属性字段排序                .as("w")
        )
                .groupBy($("sensorID"), $("w"))
                .select($("sensorID"), $("sensorID").count());
//6、转换为流并打印tEnv.toDataStream(result, Row.class).print();
//7、执行env.execute("FlinkTableTumbleWinBaseEvTime");
    }
}

滑动窗口


滑动窗⼝通过Slide类来定义,关键⽅法如下:

方法

描述

over

定义窗口的长度为事件或者计数间隔

every

定义滑动步长(时间步长或计数步长)。滑动步长和窗口长度必须是同类型的

on

指定时间属性字段用于分组(事件窗口)或者排序(计数窗口)。批处理时该字段可以是任意Long 或者 Timestamp类型的字段,流处理时必须是事先指定的时间属性字段

as

为窗⼝分配别名。 别名⽤于在 groupBy⼦句中引⽤窗⼝,并可以在select() ⼦句中选择窗⼝属性, 例如 start,end, rowtime


事件滑动窗口


packagecom.blink.sb.window;
importcom.blink.sb.beans.TempSensorData;
importorg.apache.flink.streaming.api.datastream.DataStream;
importorg.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
importorg.apache.flink.table.api.Slide;
importorg.apache.flink.table.api.Table;
importorg.apache.flink.table.api.bridge.java.StreamTableEnvironment;
importorg.apache.flink.types.Row;
importstaticorg.apache.flink.table.api.Expressions.$;
importstaticorg.apache.flink.table.api.Expressions.lit;
/*** 基于时间的滑动窗口*/publicclassFlinkTableSlideWinBasePtTime {
publicstaticvoidmain(String[] args) throwsException {
//1、获取Stream执行环境StreamExecutionEnvironmentenv=StreamExecutionEnvironment.getExecutionEnvironment();
//2、创建表执行环境StreamTableEnvironmenttEnv=StreamTableEnvironment.create(env);
env.setParallelism(1);
//3、读入数据DataStream<TempSensorData>tempSensorData=env.socketTextStream("localhost", 8888)
                .map(event-> {
String[] arr=event.split(",");
returnTempSensorData                            .builder()
                            .sensorID(arr[0])
                            .tp(Long.parseLong(arr[1]))
                            .temp(Integer.parseInt(arr[2]))
                            .build();
                });
//4、流转换为动态表Tabletable=tEnv.fromDataStream(tempSensorData,
$("sensorID"),
$("tp"),
$("temp"),
$("ptTime").proctime()//新增ptTime字段为proctime        );
//5、自定义窗口并计算Tableresult=table.window(Slide                .over(lit(8).second())
                .every(lit(2).second())
                .on($("ptTime"))
                .as("w")
        )
                .groupBy($("sensorID"), $("w"))
                .select($("sensorID"), $("sensorID").count());
//6、转换为流并打印tEnv.toDataStream(result, Row.class).print();
//7、执行env.execute("FlinkTableTumbleWinBaseEvTime");
    }
}



计数滑动窗口


packagecom.blink.sb.window;
importcom.blink.sb.beans.TempSensorData;
importorg.apache.flink.streaming.api.datastream.DataStream;
importorg.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
importorg.apache.flink.table.api.Slide;
importorg.apache.flink.table.api.Table;
importorg.apache.flink.table.api.bridge.java.StreamTableEnvironment;
importorg.apache.flink.types.Row;
importstaticorg.apache.flink.table.api.Expressions.*;
/*** 基于计数的滑动窗口*/publicclassFlinkTableSlideWinBaseCount {
publicstaticvoidmain(String[] args) throwsException {
//1、获取Stream执行环境StreamExecutionEnvironmentenv=StreamExecutionEnvironment.getExecutionEnvironment();
//2、创建表执行环境StreamTableEnvironmenttEnv=StreamTableEnvironment.create(env);
env.setParallelism(1);
//3、读入数据DataStream<TempSensorData>tempSensorData=env.socketTextStream("localhost", 8888)
                .map(event-> {
String[] arr=event.split(",");
returnTempSensorData                            .builder()
                            .sensorID(arr[0])
                            .tp(Long.parseLong(arr[1]))
                            .temp(Integer.parseInt(arr[2]))
                            .build();
                });
//4、流转换为动态表Tabletable=tEnv.fromDataStream(tempSensorData,
$("sensorID"),
$("tp"),
$("temp"),
$("ptTime").proctime()//新增ptTime字段为proctime        );
//5、自定义窗口并计算Tableresult=table.window(Slide                .over(rowInterval(8L))
                .every(rowInterval(2L))
                .on($("ptTime"))
                .as("w")
        )
                .groupBy($("sensorID"), $("w"))
                .select($("sensorID"), $("sensorID").count());
//6、转换为流并打印tEnv.toDataStream(result, Row.class).print();
//7、执行env.execute("FlinkTableTumbleWinBaseEvTime");
    }
}

注意:所有类型的计数窗⼝只能是按照proctime排序,也就是计数窗⼝不能解决乱序


会话窗口


通过Session类定义会话窗⼝,⽅法如下:


image.png

注意:没有基于计数的会话窗⼝


packagecom.blink.sb.window;
importcom.blink.sb.beans.TempSensorData;
importorg.apache.flink.streaming.api.datastream.DataStream;
importorg.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
importorg.apache.flink.table.api.Session;
importorg.apache.flink.table.api.Table;
importorg.apache.flink.table.api.bridge.java.StreamTableEnvironment;
importorg.apache.flink.types.Row;
importstaticorg.apache.flink.table.api.Expressions.$;
importstaticorg.apache.flink.table.api.Expressions.lit;
/*** 基于时间的Session窗口*/publicclassFlinkTableSessionWinBasePtTime {
publicstaticvoidmain(String[] args) throwsException {
//1、获取Stream执行环境StreamExecutionEnvironmentenv=StreamExecutionEnvironment.getExecutionEnvironment();
//2、创建表执行环境StreamTableEnvironmenttEnv=StreamTableEnvironment.create(env);
env.setParallelism(1);
//3、读入数据DataStream<TempSensorData>tempSensorData=env.socketTextStream("localhost", 8888)
                .map(event-> {
String[] arr=event.split(",");
returnTempSensorData                            .builder()
                            .sensorID(arr[0])
                            .tp(Long.parseLong(arr[1]))
                            .temp(Integer.parseInt(arr[2]))
                            .build();
                });
//4、流转换为动态表Tabletable=tEnv.fromDataStream(tempSensorData,
$("sensorID"),
$("tp"),
$("temp"),
$("ptTime").proctime()//新增ptTime字段为proctime        );
//5、自定义窗口并计算Tableresult=table.window(Session                .withGap(lit(5).second())
                .on($("ptTime"))
                .as("w")
        )
                .groupBy($("sensorID"), $("w"))
                .select($("sensorID"), $("sensorID").count());
//6、转换为流并打印tEnv.toDataStream(result, Row.class).print();
//7、执行env.execute("FlinkTableTumbleWinBaseEvTime");
    }
}

注意:基数event Time的窗⼝需要先指定时间属性和⽔印⽣成策略



Over windows


编程模型


Tabletable=input .window([OverWindoww].as("w")) // 定义⼀个Over Window并指定别名为w .select($("a"), $("b").sum().over($("w")), $("c").min().over($("w")));
//在Over Window执⾏聚合运算(返回a、b在窗⼝上求sum、c在窗⼝上求min)


OverWindow 定义了计算聚合的⾏范围。

Table API 提供 Over类来配置 over 窗⼝的属性。 可以在事件时间或处理 时间以及时间周期或⾏数的范围上定义窗⼝。



Over类的⽅法如下:

方法

是否必须

描述


partitionBy

可选

根据⼀个或多个属性分区。 每个分区单独排序,聚合函数分别应⽤于每个分区。注 意:在流式环境中,只有当窗⼝包含 partition by ⼦句时,窗⼝聚合函数才能并⾏计 算。 如果没有 partitionBy(...),则流由单个⾮并⾏任务处理


orderBy

定义每个分区中事件的顺序,即定义聚合函数应⽤于事件的顺序;对于流式查询,必须 是声明的事件时间或处理时间属性。 ⽬前,仅⽀持单个字段排序


preceding

可选

定义包含在窗⼝中并位于当前⾏之前的⾏的间隔(指定下限,往前多⻓时间或者多少 ⾏开窗)。 间隔可以指定为时间间隔或⾏计数间隔。有界窗⼝由间隔的⼤⼩指定,例 如,时间间隔为 10.minutes,⾏计数间隔为 10.rows。⽆界窗⼝使⽤常量指定, UNBOUNDED_RANGE ⽤于时间间隔或 UNBOUNDED_ROW ⽤于⾏计数间隔。 ⽆界 窗⼝从分区的第⼀⾏开始。如果省略preceding⼦句,则 UNBOUNDED_RANGE 和 CURRENT_RANGE 将⽤作窗⼝的默认preceding 和following的默认值。


following

可选

定义包含在窗⼝中并位于当前⾏之后的⾏的间隔(指定上限,往后多⻓时间或者多少 ⾏开窗)。 间隔必须以与preceding间隔(时间或⾏数)相同的单位。注意:流处理环 境下,⽬前不⽀持往后开窗。我们保持默认值就好了


as

必选

为窗⼝分配别名。 别名⽤于在 groupBy⼦句中引⽤窗⼝。




注意:⽬前,同⼀个 select() 调⽤中的所有聚合函数必须在同⼀个窗⼝中计算


实例伪代码:

// ⽆界Event-time排序开窗 (假设event-time属性是"rowtime").window(Over.partitionBy($("a")).orderBy($("rowtime")).preceding(UNBOUNDED_RANGE).as("w"));// ⽆界Processing-time排序开窗 (假设processing-time属性是"proctime").window(Over.partitionBy($("a")).orderBy("proctime").preceding(UNBOUNDED_RANGE).as("w")
);
// ⽆界Event-time排序Row-count开窗 (假设event-time属性是"rowtime").window(Over.partitionBy($("a")).orderBy($("rowtime")).preceding(UNBOUNDED_ROW).as("w")
);
// ⽆界Processing-time排序Row-count开窗 (假设processing-time属性是"proctime").window(Over.partitionBy($("a")).orderBy($("proctime")).preceding(UNBOUNDED_ROW).as("w"));
// 有界Event-time排序向前1分钟开窗 (假设event-time属性是"rowtime").window(Over.partitionBy($("a")).orderBy($("rowtime")).preceding(lit(1).minutes()).as("w"));// 有界Processing-time排序向前1分钟开窗 (假设processing-time属性是"proctime").window(Over.partitionBy($("a")).orderBy($("proctime")).preceding(lit(1).minutes()).as(
"w"));
// 有界Event-time排序向前10⾏开窗 (假设event-time属性是"rowtime").window(Over.partitionBy($("a")).orderBy($("rowtime")).preceding(rowInterval(10)).as("w"));// 有界Processing-time排序向前10⾏开窗 (假设processing-time属性是"proctime").window(Over.partitionBy($("a")).orderBy($("proctime")).preceding(rowInterval(10)).as("w"));


第一行到当前行开窗

packagecom.blink.sb.window;
importcom.blink.sb.beans.TempSensorData;
importorg.apache.flink.streaming.api.datastream.DataStream;
importorg.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
importorg.apache.flink.table.api.Over;
importorg.apache.flink.table.api.Table;
importorg.apache.flink.table.api.bridge.java.StreamTableEnvironment;
importstaticorg.apache.flink.table.api.Expressions.*;
/*** 基于处理时间开窗1:第一行到当前行开窗*/publicclassFlinkTableOverWinBasePtTime1 {
publicstaticvoidmain(String[] args) throwsException {
//1、获取Stream执行环境StreamExecutionEnvironmentenv=StreamExecutionEnvironment.getExecutionEnvironment();
//2、创建表执行环境StreamTableEnvironmenttEnv=StreamTableEnvironment.create(env);
env.setParallelism(1);
//3、读入数据DataStream<TempSensorData>tempSensorData=env.socketTextStream("localhost", 8888)
                .map(event-> {
String[] arr=event.split(",");
returnTempSensorData                            .builder()
                            .sensorID(arr[0])
                            .tp(Long.parseLong(arr[1]))
                            .temp(Integer.parseInt(arr[2]))
                            .build();
                });
//4、流转换为动态表Tabletable=tEnv.fromDataStream(tempSensorData,
$("sensorID"),
$("tp"),
$("temp"),
$("ptTime").proctime()//新增ptTime字段为proctime        );
//5、定义无界Over窗口(往前无界)Tableresult=table.window(Over                .partitionBy($("sensorID"))
                .orderBy($("ptTime"))
                .preceding(UNBOUNDED_RANGE)//可以省略不写                .as("w")
        ).select(
$("sensorID"),
$("temp").max().over($("w"))
        );
result.execute().print();
    }
}


指定⾏到当前⾏开窗

packagecom.blink.sb.window;
importcom.blink.sb.beans.TempSensorData;
importorg.apache.flink.streaming.api.datastream.DataStream;
importorg.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
importorg.apache.flink.table.api.Over;
importorg.apache.flink.table.api.Table;
importorg.apache.flink.table.api.bridge.java.StreamTableEnvironment;
importorg.apache.flink.types.Row;
importstaticorg.apache.flink.table.api.Expressions.*;
/*** 基于处理时间开窗2:指定行到当前行开窗*/publicclassFlinkTableOverWinBasePtTime2 {
publicstaticvoidmain(String[] args) throwsException {
//1、获取Stream执行环境StreamExecutionEnvironmentenv=StreamExecutionEnvironment.getExecutionEnvironment();
//2、创建表执行环境StreamTableEnvironmenttEnv=StreamTableEnvironment.create(env);
env.setParallelism(1);
//3、读入数据DataStream<TempSensorData>tempSensorData=env.socketTextStream("localhost", 8888)
                .map(event-> {
String[] arr=event.split(",");
returnTempSensorData                            .builder()
                            .sensorID(arr[0])
                            .tp(Long.parseLong(arr[1]))
                            .temp(Integer.parseInt(arr[2]))
                            .build();
                });
//4、流转换为动态表Tabletable=tEnv.fromDataStream(tempSensorData,
$("sensorID"),
$("tp"),
$("temp"),
$("ptTime").proctime()//新增ptTime字段为proctime        );
//5、定义无界窗口(往前无界)Tableresult=table.window(Over                .partitionBy($("sensorID"))
                .orderBy($("ptTime"))
                .preceding(rowInterval(2L))
                .as("w")
        ).select(
$("sensorID"),
$("temp").max().over($("w"))
        );
//6、转换为流并打印tEnv.toDataStream(result, Row.class).print();
//7、执行env.execute("FlinkTableOverWinBasePtTime1");
    }
}


flink sql 中使用窗口


Group windows


Flink SQL中通过Group Windows函数来定义分组窗⼝



分组窗口函数

描述


TUMBLE(time_attr, interval)

定义滚动窗⼝。翻转窗⼝可以在事件时间(流 + 批处理)或处理时间(流)上定义。


HOP(time_attr, interval, interval)

定义滑动窗⼝,名字⽐较有意思。可以在事件时间(流 + 批处理)或处理时间(流) 上定义滑动窗⼝。注意:第⼀个interval是滑动步⻓,第⼆个interval才是窗⼝⼤⼩


SESSION(time_attr, interval)

定义会话窗⼝。会话窗⼝可以在事件时间(流 + 批处理)或处理时间(流)上⼯作。


特别注意:FlinkSQL只⽀持基于时间的分组窗⼝,对于批处理time_attr必须是TIMESTAMP类型的


另外还有⼀些辅助函数,可以在select⼦句中⽤来查询 Group Window 的开始和结束时间戳,以及时间属性 (start,end,rowtime,proctime)。


辅助函数

说明

TUMBLE_START(time_attr, interval) HOP_START(time_attr, interval, interval) SESSION_START(time_attr, interval)

返回窗⼝的开始时间戳(start),即窗⼝的下边界时间戳

TUMBLE_END(time_attr, interval) HOP_END(time_attr, interval, interval) SESSION_END(time_attr, interval)

返回窗⼝的结束时间戳(end),即窗⼝的上边界之外紧挨着的不包含在 本窗⼝的时间戳

注意:结束时间戳【不能】⽤作【后续】基于时间的操作中的⾏时间属 性,例如group window或者over window的聚合操作

TUMBLE_ROWTIME(time_attr, interval) HOP_ROWTIME(time_attr, interval, interval) SESSION_ROWTIME(time_attr, interval)

返回窗⼝的上边界时间戳,以rowtime形式返回 注意:结果是rowtime时间属性,【可以】⽤做后续基于时间的操作,例 如group window 或者 over window的聚合操作等

TUMBLE_PROCTIME(time_attr, interval) HOP_PROCTIME(time_attr, interval, interval) SESSION_PROCTIME(time_attr, interval)

返回 proctime 时间属性

注意: 可以用作后续基于时间的操作,例如group window或者over window的聚合操作等


注意:必须使⽤与 GROUP BY ⼦句中的窗⼝函数完全相同的参数来调⽤辅助函数。



CREATETABLEOrders (
userBIGINT,
productSTRING,
amountINT,
order_timeTIMESTAMP(3),
WATERMARKFORorder_timeASorder_time-INTERVAL'1'MINUTE) WITH (...);
SELECTuser,
TUMBLE_START(order_time, INTERVAL'1'DAY) ASwStart,//参数必须跟下⾯GROUP BY⼦句中的窗⼝函数参数⼀致SUM(amount) FROMOrdersGROUPBYTUMBLE(order_time, INTERVAL'1'DAY),
user


滚动窗口


packagecom.blink.sb.window;
importcom.blink.sb.beans.TempSensorData;
importorg.apache.flink.streaming.api.datastream.DataStream;
importorg.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
importorg.apache.flink.table.api.Table;
importorg.apache.flink.table.api.bridge.java.StreamTableEnvironment;
importstaticorg.apache.flink.table.api.Expressions.*;
/*** FlinkSQL基于处理时间的滚动窗口*/publicclassFlinkSQLTumbleWinBasePtTime {
publicstaticvoidmain(String[] args) throwsException {
//1、获取Stream执行环境StreamExecutionEnvironmentenv=StreamExecutionEnvironment.getExecutionEnvironment();
//2、创建表执行环境StreamTableEnvironmenttEnv=StreamTableEnvironment.create(env);
env.setParallelism(1);
//3、读入数据DataStream<TempSensorData>tempSensorData=env.socketTextStream("localhost", 8888)
                .map(event-> {
String[] arr=event.split(",");
returnTempSensorData                            .builder()
                            .sensorID(arr[0])
                            .tp(Long.parseLong(arr[1]))
                            .temp(Integer.parseInt(arr[2]))
                            .build();
                });
//4、流转换为动态表Tabletable=tEnv.fromDataStream(tempSensorData,
$("sensorID"),
$("tp"),
$("temp"),
$("ptTime").proctime()//新增ptTime字段为proctime        );
//        Table result = table.window(Tumble//                .over(rowInterval(5L))//                .on($("ptTime"))//基于计数的滚动窗口只能按照proctime属性字段排序//                .as("w")//        )//                .groupBy($("sensorID"), $("w"))//                .select($("sensorID"), $("sensorID").count());//5、基于SQL的滚动窗口Tableresult=tEnv.sqlQuery("select "+"sensorID,"+"max(temp),"+"TUMBLE_START(ptTime, INTERVAL '5' second) as winstart "+"from "+table+" GROUP BY sensorID,TUMBLE(ptTime, INTERVAL '5' second)");
//6、指定并打印result.execute().print();
    }
}



滑动窗口


packagecom.blink.sb.window;
importcom.blink.sb.beans.TempSensorData;
importorg.apache.flink.streaming.api.datastream.DataStream;
importorg.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
importorg.apache.flink.table.api.DataTypes;
importorg.apache.flink.table.api.Schema;
importorg.apache.flink.table.api.Table;
importorg.apache.flink.table.api.bridge.java.StreamTableEnvironment;
importstaticorg.apache.flink.table.api.Expressions.$;
/*** FlinkSQL基于处理时间的滑动窗口*/publicclassFlinkSQLHOPWinBasePtTime {
publicstaticvoidmain(String[] args) throwsException {
//1、获取Stream执行环境StreamExecutionEnvironmentenv=StreamExecutionEnvironment.getExecutionEnvironment();
//2、创建表执行环境StreamTableEnvironmenttEnv=StreamTableEnvironment.create(env);
env.setParallelism(1);
//3、读入数据DataStream<TempSensorData>tempSensorData=env.socketTextStream("localhost", 8888)
                .map(event-> {
String[] arr=event.split(",");
returnTempSensorData                            .builder()
                            .sensorID(arr[0])
                            .tp(Long.parseLong(arr[1]))
                            .temp(Integer.parseInt(arr[2]))
                            .build();
                });
//4、流转换为动态表Tabletable=tEnv.fromDataStream(tempSensorData,
$("sensorID"),
$("tp"),
$("temp"),
$("ptTime").proctime()//新增ptTime字段为proctime        );
//5、基于SQL的滑动窗口(滑动步长在前)Tableresult=tEnv.sqlQuery("select "+"sensorID,"+"count(sensorID),"+"HOP_START(ptTime, INTERVAL '2' second,INTERVAL '4' second) as winstart "+"from "+table+" GROUP BY sensorID,HOP(ptTime, INTERVAL '2' second,INTERVAL '4' second)");
//6、指定并打印result.execute().print();
    }
}


会话窗口


packagecom.blink.sb.window;
importcom.blink.sb.beans.TempSensorData;
importorg.apache.flink.streaming.api.datastream.DataStream;
importorg.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
importorg.apache.flink.table.api.Table;
importorg.apache.flink.table.api.bridge.java.StreamTableEnvironment;
importstaticorg.apache.flink.table.api.Expressions.$;
/*** FlinkSQL基于处理时间的会话窗口*/publicclassFlinkSQLSessionWinBasePtTime {
publicstaticvoidmain(String[] args) throwsException {
//1、获取Stream执行环境StreamExecutionEnvironmentenv=StreamExecutionEnvironment.getExecutionEnvironment();
//2、创建表执行环境StreamTableEnvironmenttEnv=StreamTableEnvironment.create(env);
env.setParallelism(1);
//3、读入数据DataStream<TempSensorData>tempSensorData=env.socketTextStream("localhost", 8888)
                .map(event-> {
String[] arr=event.split(",");
returnTempSensorData                            .builder()
                            .sensorID(arr[0])
                            .tp(Long.parseLong(arr[1]))
                            .temp(Integer.parseInt(arr[2]))
                            .build();
                });
//4、流转换为动态表Tabletable=tEnv.fromDataStream(tempSensorData,
$("sensorID"),
$("tp"),
$("temp"),
$("ptTime").proctime()//新增ptTime字段为proctime        );
//5、基于SQL的滑动窗口(滑动步长在前)Tableresult=tEnv.sqlQuery("select "+"sensorID,"+"count(sensorID),"+"SESSION_START(ptTime, INTERVAL '5' second) as winstart "+"from "+table+" GROUP BY sensorID,SESSION(ptTime, INTERVAL '5' second)");
//6、指定并打印result.execute().print();
    }
}



Over windows


在Flink SQL中Over窗⼝⽤法⽐较简单,这⾥不展开了,可参考如下链接:


https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/table/sql/queries/over-agg/


举例:为每个订单计算在当前订单前⼀⼩时内同⼀产品的所有订单⾦额的总和。



SELECTorder_id, order_time, amount,
SUM(amount) OVER (
PARTITIONBYproductORDERBYorder_timeRANGEBETWEENINTERVAL'1'HOURPRECEDINGANDCURRENTROW ) ASone_hour_prod_amount_sumFROMOrders



可以在select⼦句之外通过WINDOW关键字自定义窗口并给定别名便于引用:


SELECTorder_id, order_time, amount,
SUM(amount) OVERwASsum_amount,
AVG(amount) OVERwASavg_amountFROMOrdersWINDOWwAS (
PARTITIONBYproductORDERBYorder_timeRANGEBETWEENINTERVAL'1'HOURPRECEDINGANDCURRENTROW)


packagecom.blink.sb.window;
importcom.blink.sb.beans.TempSensorData;
importorg.apache.flink.streaming.api.datastream.DataStream;
importorg.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
importorg.apache.flink.table.api.Table;
importorg.apache.flink.table.api.bridge.java.StreamTableEnvironment;
importstaticorg.apache.flink.table.api.Expressions.$;
publicclassFlinkSQLOverWinBasePtTime {
publicstaticvoidmain(String[] args) throwsException {
//1、获取Stream执行环境StreamExecutionEnvironmentenv=StreamExecutionEnvironment.getExecutionEnvironment();
//2、创建表执行环境StreamTableEnvironmenttEnv=StreamTableEnvironment.create(env);
env.setParallelism(1);
//3、读入数据DataStream<TempSensorData>tempSensorData=env.socketTextStream("localhost", 8888)
                .map(event-> {
String[] arr=event.split(",");
returnTempSensorData                            .builder()
                            .sensorID(arr[0])
                            .tp(Long.parseLong(arr[1]))
                            .temp(Integer.parseInt(arr[2]))
                            .build();
                });
//4、流转换为动态表Tabletable=tEnv.fromDataStream(tempSensorData,
$("sensorID"),
$("tp"),
$("temp"),
$("ptTime").proctime()//新增ptTime字段为proctime        );
//5、定义无界Over窗口(往前无界)//Table API写法//        Table result = table.window(Over//                .partitionBy($("sensorID"))//                .orderBy($("ptTime"))//                .preceding(UNBOUNDED_RANGE)//可以省略不写//                .as("w")//        ).select(//                $("sensorID"),//                $("temp").max().over($("w"))//        );//一次可以执行多个聚合,但是目前只支持在相同的over窗口上Tableresult=tEnv.sqlQuery("select sensorID,"+"max(temp) OVER(PARTITION BY sensorID ORDER BY ptTime) as max_temp, "+"sum(temp) OVER(PARTITION BY sensorID ORDER BY ptTime) as sum_temp "+"from "+table);
//6、指定并打印result.execute().print();
    }
}
















































































相关实践学习
基于Hologres+Flink搭建GitHub实时数据大屏
通过使用Flink、Hologres构建实时数仓,并通过Hologres对接BI分析工具(以DataV为例),实现海量数据实时分析.
实时计算 Flink 实战课程
如何使用实时计算 Flink 搞定数据处理难题?实时计算 Flink 极客训练营产品、技术专家齐上阵,从开源 Flink功能介绍到实时计算 Flink 优势详解,现场实操,5天即可上手! 欢迎开通实时计算 Flink 版: https://cn.aliyun.com/product/bigdata/sc Flink Forward Asia 介绍: Flink Forward 是由 Apache 官方授权,Apache Flink Community China 支持的会议,通过参会不仅可以了解到 Flink 社区的最新动态和发展计划,还可以了解到国内外一线大厂围绕 Flink 生态的生产实践经验,是 Flink 开发者和使用者不可错过的盛会。 去年经过品牌升级后的 Flink Forward Asia 吸引了超过2000人线下参与,一举成为国内最大的 Apache 顶级项目会议。结合2020年的特殊情况,Flink Forward Asia 2020 将在12月26日以线上峰会的形式与大家见面。
相关文章
|
自然语言处理 监控 数据挖掘
【Flink】Flink中的窗口分析
【4月更文挑战第19天】【Flink】Flink中的窗口分析
|
SQL 双11 流计算
Flink SQL 功能解密系列 —— 流计算“撤回(Retraction)”案例分析
通俗讲retract就是传统数据里面的更新操作,也就是说retract是流式计算场景下对数据更新的处理方式。
|
流计算 Windows
Flink窗口与状态编程开发(一)
Flink窗口与状态编程开发(一)
|
SQL 消息中间件 存储
Flink报错问题之Flink报错:Table sink 'a' doesn't support consuming update and delete changes which is produced by node如何解决
Flink报错通常是指在使用Apache Flink进行实时数据处理时遇到的错误和异常情况;本合集致力于收集Flink运行中的报错信息和解决策略,以便开发者及时排查和修复问题,优化Flink作业的稳定性。
|
SQL 存储 API
Flink实践:通过Flink SQL进行SFTP文件的读写操作
虽然 Apache Flink 与 SFTP 之间的直接交互存在一定的限制,但通过一些创造性的方法和技术,我们仍然可以有效地实现对 SFTP 文件的读写操作。这既展现了 Flink 在处理复杂数据场景中的强大能力,也体现了软件工程中常见的问题解决思路——即通过现有工具和一定的间接方法来克服技术障碍。通过这种方式,Flink SQL 成为了处理各种数据源,包括 SFTP 文件,在内的强大工具。
375 15
|
7月前
|
人工智能 算法 大数据
数据的“潘多拉魔盒”:大数据伦理的深度思考
数据的“潘多拉魔盒”:大数据伦理的深度思考
384 25
|
大数据 分布式数据库 Hbase
Hbase学习三:Hbase常用命令总结
Hbase学习三:Hbase常用命令总结
2752 0
|
SQL 存储 缓存
最佳实践|如何写出简单高效的 Flink SQL?
通过几个经典案例介绍 Flink SQL 的最佳实践:如何写出简单高效的 Flink SQL,哪些 SQL 是 BAD SQL。帮助大家更好地的认识 Flink SQL。
46016 0
最佳实践|如何写出简单高效的 Flink SQL?
|
关系型数据库 MySQL 对象存储
FlinkSQL写入对象存储S3报错"Use persist() to create a persistent"
FlinkSQL写入对象存储S3报错"java.util.concurrent.ExecutionException: java.lang.UnsupportedOperationException: S3RecoverableFsDataOutputStream cannot sync state to S3. Use persist() to create a persistent recoverable intermediate point."
490 1
|
存储 缓存 NoSQL
SpringBoot3集成Redis
Redis典型的应用场景就是数据缓存能力,用来解决业务中最容易出现的查询性能问题,提升系统的响应效率;其次就是分布式锁机制,用来解决分布式系统中多线程并发处理资源的安全问题;
855 0