01 引言
在前面的博客,我们已经对Flink
批流一体API
的使用有了一定的了解了,有兴趣的同学可以参阅下:
- 《Flink教程(01)- Flink知识图谱》
- 《Flink教程(02)- Flink入门》
- 《Flink教程(03)- Flink环境搭建》
- 《Flink教程(04)- Flink入门案例》
- 《Flink教程(05)- Flink原理简单分析》
- 《Flink教程(06)- Flink批流一体API(Source示例)》
- 《Flink教程(07)- Flink批流一体API(Transformation示例)》
- 《Flink教程(08)- Flink批流一体API(Sink示例)》
- 《Flink教程(09)- Flink批流一体API(Connectors示例)》
- 《Flink教程(10)- Flink批流一体API(其它)》
在前面的教程,我们知道Flink
的四大基石十分重要,如下图,本文先讲解下Window
:
02 Window
流计算中一般在对流数据进行操作之前都会先进行开窗,即基于一个什么样的窗口上做这个计算,Flink提供了开箱即用的各种窗口,比如滑动窗口、滚动窗口、会话窗口以及非常灵活的自定义的窗口。
2.1 为什么需要Window?
在流处理应用中,数据是连续不断的,有时我们需要做一些聚合类的处理,例如:在过去的1分钟内有多少用户点击了我们的网页。
在这种情况下,我们必须定义一个窗口(window
),用来收集最近1分钟内的数据,并对这个窗口内的数据进行计算。
2.2 Window分类
2.2.1 按照time和count分类
- 时间窗口(time-window) :根据时间划分窗口,如:每xx分钟统计最近xx分钟的数据
- 数量窗口(count-window):根据数量划分窗口,如:每xx个数据统计最近xx个数据
2.2.2 按照slide和size分类
窗口有两个重要的属性,窗口大小size
和滑动间隔slide
,根据它们的大小关系可分为:
- 滚动窗口(tumbling-window):
size=slide
,比如: 每隔10s统计最近10s的数据 - 滑动窗口(sliding-window):
size>slide
,比如:每隔5s统计最近10s的数据
注意:当size<slide
的时候,如每隔15s统计最近10s的数据,那么中间5s的数据会丢失,所有开发中不用。
2.2.3 总结
按照上面窗口的分类方式进行组合,可以得出如下的窗口:
分类 | 使用频率 |
基于时间的滚动窗口:tumbling-time-window |
用的较多 |
基于时间的滑动窗口:sliding-time-window |
用的较多 |
基于数量的滚动窗口:tumbling-count-window |
用的较少 |
基于数量的滑动窗口:sliding-count-window |
用的较少 |
注意:Flink
还支持一个特殊的窗口,即 Session
会话窗口,需要设置一个会话超时时间,如30s:则表示30s内没有数据到来,则触发上个窗口的计算。
2.3 Window API
2.3.1 window和windowAll
何时使用:
- 使用
keyby
的流,应该使用window
方法 - 未使用
keyby
的流,应该调用windowAll
方法
2.3.2 WindowAssigner
window/windowAll
方法接收的输入是一个 WindowAssigner
, WindowAssigner
负责将每条输入的数据分发到正确的 window
中,Flink
提供了很多各种场景用的WindowAssigner
:
如果需要自己定制数据分发策略,则可以实现一个 class
,继承自WindowAssigner
。
2.3.3 evictor
evictor
主要用于做一些数据的自定义操作,可以在执行用户代码之前,也可以在执行用户代码之后,更详细的描述可以参考org.apache.flink.streaming.api.windowing.evictors.Evictor
的 evicBefore
和 evicAfter
两个方法。
Flink 提供了如下三种通用的 evictor:
CountEvictor
保留指定数量的元素TimeEvictor
设定一个阈值interval
,删除所有不再max_ts - interval
范围内的元
素,其中max_ts
是窗口内时间戳的最大值。DeltaEvictor
通过执行用户给定的DeltaFunction
以及预设的theshold
,判断是否删除一个元素。
2.3.4 trigger
trigger 用来判断一个窗口是否需要被触发,每个 WindowAssigner
都自带一个默认的trigger
,如果默认的trigger
不能满足你的需求,则可以自定义一个类,继承自Trigger
即可,我们详细描述下 Trigger
的接口以及含义:
- onElement() :每次往
window
增加一个元素的时候都会触发 - onEventTime() :当
event-time timer
被触发的时候会调用 - onProcessingTime() :当
processing-time timer
被触发的时候会调用 - onMerge() :对两个
rigger
的state
进行merge
操作 - clear() :
window
销毁的时候被调用
上面的接口中前三个会返回一个 TriggerResult
, TriggerResult
有如下几种可能的选
择:
CONTINUE
不做任何事情;FIRE
触发window
;PURGE
清空整个window
的元素并销毁窗口;FIRE_AND_PURGE
触发窗口,然后销毁窗口。
2.3.5 API调用示例
source.keyBy(0) .window(TumblingProcessingTimeWindows.of(Time.seconds(5))); 或 source.keyBy(0). timeWindow(Time.seconds(5))
03 Window案例演示
3.1 基于时间的滚动和滑动窗口
需求1:基于时间的滚动窗口 – 每5秒钟统计一次,最近5秒钟内,各个路口通过红绿灯汽车的数量
需求2:基于时间的滑动窗口 --每5秒钟统计一次,最近10秒钟内,各个路口通过红绿灯汽车的数量
模拟数据如下(信号灯编号和通过该信号灯的车的数量):
9,3 9,2 9,7 4,9 2,6 1,5 2,3 5,7 5,4
代码实现:
/** * TimeWindow * * @author : YangLinWei * @createTime: 2022/3/7 6:35 下午 */ public class TimeWindow { public static void main(String[] args) throws Exception { //1.env StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //2.Source DataStreamSource<String> socketDS = env.socketTextStream("node1", 9999); //3.Transformation //将9,3转为CartInfo(9,3) SingleOutputStreamOperator<CartInfo> cartInfoDS = socketDS.map(new MapFunction<String, CartInfo>() { @Override public CartInfo map(String value) throws Exception { String[] arr = value.split(","); return new CartInfo(arr[0], Integer.parseInt(arr[1])); } }); //分组 //KeyedStream<CartInfo, Tuple> keyedDS = cartInfoDS.keyBy("sensorId"); // * 需求1:每5秒钟统计一次,最近5秒钟内,各个路口/信号灯通过红绿灯汽车的数量--基于时间的滚动窗口 //timeWindow(Time size窗口大小, Time slide滑动间隔) SingleOutputStreamOperator<CartInfo> result1 = cartInfoDS .keyBy(CartInfo::getSensorId) //.timeWindow(Time.seconds(5))//当size==slide,可以只写一个 //.timeWindow(Time.seconds(5), Time.seconds(5)) .window(TumblingProcessingTimeWindows.of(Time.seconds(5))) .sum("count"); // * 需求2:每5秒钟统计一次,最近10秒钟内,各个路口/信号灯通过红绿灯汽车的数量--基于时间的滑动窗口 SingleOutputStreamOperator<CartInfo> result2 = cartInfoDS .keyBy(CartInfo::getSensorId) //.timeWindow(Time.seconds(10), Time.seconds(5)) .window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5))) .sum("count"); //4.Sink /* 1,5 2,5 3,5 4,5 */ //result1.print(); result2.print(); //5.execute env.execute(); } @Data @AllArgsConstructor @NoArgsConstructor public static class CartInfo { private String sensorId;//信号灯id private Integer count;//通过该信号灯的车的数量 } }
3.2 基于数量的滚动和滑动窗口
需求1:基于数量的滚动窗口:统计在最近5条消息中,各自路口通过的汽车数量,相同的key每出现5次进行统计
需求2:基于数量的滑动窗口:统计在最近5条消息中,各自路口通过的汽车数量,相同的key每出现3次进行统计
示例代码如下:
/** * CountWindow * * @author : YangLinWei * @createTime: 2022/3/7 6:40 下午 */ public class CountWindow { public static void main(String[] args) throws Exception { //1.env StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //2.Source DataStreamSource<String> socketDS = env.socketTextStream("node1", 9999); //3.Transformation //将9,3转为CartInfo(9,3) SingleOutputStreamOperator<CartInfo> cartInfoDS = socketDS.map(new MapFunction<String, CartInfo>() { @Override public CartInfo map(String value) throws Exception { String[] arr = value.split(","); return new CartInfo(arr[0], Integer.parseInt(arr[1])); } }); //分组 //KeyedStream<CartInfo, Tuple> keyedDS = cartInfoDS.keyBy("sensorId"); // * 需求1:统计在最近5条消息中,各自路口通过的汽车数量,相同的key每出现5次进行统计--基于数量的滚动窗口 //countWindow(long size, long slide) SingleOutputStreamOperator<CartInfo> result1 = cartInfoDS .keyBy(CartInfo::getSensorId) //.countWindow(5L, 5L) .countWindow(5L) .sum("count"); // * 需求2:统计在最近5条消息中,各自路口通过的汽车数量,相同的key每出现3次进行统计--基于数量的滑动窗口 //countWindow(long size, long slide) SingleOutputStreamOperator<CartInfo> result2 = cartInfoDS .keyBy(CartInfo::getSensorId) .countWindow(5L, 3L) .sum("count"); //4.Sink //result1.print(); /* 1,1 1,1 1,1 1,1 2,1 1,1 */ result2.print(); /* 1,1 1,1 2,1 1,1 2,1 3,1 4,1 */ //5.execute env.execute(); } @Data @AllArgsConstructor @NoArgsConstructor public static class CartInfo { private String sensorId;//信号灯id private Integer count;//通过该信号灯的车的数量 } }
3.3 会话窗口
需求:设置会话超时时间为10s,10s内没有数据到来,则触发上个窗口的计算
示例代码如下:
/** * SessionWindow * * @author : YangLinWei * @createTime: 2022/3/7 6:42 下午 */ public class SessionWindow { public static void main(String[] args) throws Exception { //1.env StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //2.Source DataStreamSource<String> socketDS = env.socketTextStream("node1", 9999); //3.Transformation //将9,3转为CartInfo(9,3) SingleOutputStreamOperator<CartInfo> cartInfoDS = socketDS.map(new MapFunction<String, CartInfo>() { @Override public CartInfo map(String value) throws Exception { String[] arr = value.split(","); return new CartInfo(arr[0], Integer.parseInt(arr[1])); } }); //需求:设置会话超时时间为10s,10s内没有数据到来,则触发上个窗口的计算(前提是上一个窗口得有数据!) SingleOutputStreamOperator<CartInfo> result = cartInfoDS.keyBy(CartInfo::getSensorId) .window(ProcessingTimeSessionWindows.withGap(Time.seconds(10))) .sum("count"); //4.Sink result.print(); //5.execute env.execute(); } @Data @AllArgsConstructor @NoArgsConstructor public static class CartInfo { private String sensorId;//信号灯id private Integer count;//通过该信号灯的车的数量 } }
04 文末
本文主要讲解Flink
高级API
之Window
,谢谢大家的阅读,本文完!