1. Flink四大基石
Flink之所以能这么流行,离不开它最重要的四个基石:Checkpoint、State、Time、Window。
◼ Checkpoint
这是Flink最重要的一个特性。
Flink基于Chandy-Lamport算法实现了一个分布式的一致性的快照,从而提供了一致性的语义。
Chandy-Lamport算法实际上在1985年的时候已经被提出来,但并没有被很广泛的应用,而Flink则把这个算法发扬光大了。
Spark最近在实现Continue streaming,Continue streaming的目的是为了降低处理的延时,其也需要提供这种一致性的语义,最终也采用了Chandy-Lamport这个算法,说明Chandy-Lamport算法在业界得到了一定的肯定。
https://zhuanlan.zhihu.com/p/53482103
◼ State
提供了一致性的语义之后,Flink为了让用户在编程时能够更轻松、更容易地去管理状态,还提供了一套非常简单明了的State API,包括ValueState、ListState、MapState,BroadcastState。
◼ Time
除此之外,Flink还实现了Watermark的机制,能够支持基于事件的时间的处理,能够容忍迟到/乱序的数据。
◼ Window
另外流计算中一般在对流数据进行操作之前都会先进行开窗,即基于一个什么样的窗口上
做这个计算。Flink提供了开箱即用的各种窗口,比如滑动窗口、滚动窗口、会话窗口以及
非常灵活的自定义的窗口。
2. Flink-Window操作
2.1 为什么需要Window
在流处理应用中,数据是连续不断的,有时我们需要做一些聚合类的处理,例如:在过去的1分钟内有多少用户点击了我们的网页。
在这种情况下,我们必须定义一个窗口(window),用来收集最近1分钟内的数据,并对这个窗口内的数据进行计算。
2.2 Window的分类
2.2.1 按照time和count分类
time-window:时间窗口:根据时间划分窗口,如:每xx分钟统计最近xx分钟的数据count-window:数量窗口:根据数量划分窗口,如:每xx个数据统计最近xx个数据
2.2.2 按照slide和size分类
窗口有两个重要的属性: 窗口大小size和滑动间隔slide,根据它们的大小关系可分为:tumbling-window: 滚动窗口: size=slide,如:每隔10s统计最近10s的数据
sliding-window:滑动窗口:size>slide,如:每隔5s统计最近10s的数据
注意:当size<slide的时候,如每隔15s统计最近10s的数据,那么中间5s的数据会丢失,所有开发中不用
2.2.3 总结
按照上面窗口的分类方式进行组合,可以得出如下的窗口:
1.基于时间的滚动窗口tumbling-time-window–用的较多
2.基于时间的滑动窗口sliding-time-window–用的较多
3.基于数量的滚动窗口tumbling-count-window–用的较少
4.基于数量的滑动窗口sliding-count-window–用的较少
注意:Flink还支持一个特殊的窗口:Session会话窗口,需要设置一个会话超时时间,如30s,则表示30s内
没有数据到来,则触发上个窗口的计算
2.3 Window的API2.3.1 window和windowAll
使用keyby的流,应该使用window方法
未使用keyby的流,应该调用windowAll方法
2.3.2 WindowAssigner
window/windowAll 方法接收的输入是一个 WindowAssigner, WindowAssigner 负责将每条输入的数据分发到正确的 window 中,
Flink提供了很多各种场景用的WindowAssigner:
如果需要自己定制数据分发策略,则可以实现一个 class,继承自 WindowAssigner。
2.3.3 evictor–了解
evictor 主要用于做一些数据的自定义操作,可以在执行用户代码之前,也可以在执行用户代码之后,更详细的描述可以参考org.apache.flink.streaming.api.windowing.evictors.Evictor的 evicBefore 和 evicAfter两个方法。
Flink 提供了如下三种通用的 evictor:
CountEvictor 保留指定数量的元素
TimeEvictor 设定一个阈值 interval,删除所有不再 max_ts - interval 范围内的元
素,其中 max_ts 是窗口内时间戳的最大值。
DeltaEvictor 通过执行用户给定的 DeltaFunction 以及预设的 theshold,判断是否删
除一个元素。
2.3.4 trigger–了解
trigger 用来判断一个窗口是否需要被触发,每个 WindowAssigner 都自带一个默认的trigger,如果默认的 trigger 不能满足你的需求,则可以自定义一个类,继承自Trigger 即可,我们详细描述下 Trigger 的接口以及含义:
onElement() 每次往 window 增加一个元素的时候都会触发
onEventTime() 当 event-time timer 被触发的时候会调用
onProcessingTime() 当 processing-time timer 被触发的时候会调用
onMerge() 对两个 `rigger 的 state 进行 merge 操作
clear() window 销毁的时候被调用
上面的接口中前三个会返回一个 TriggerResult, TriggerResult 有如下几种可能的选
择:
CONTINUE 不做任何事情
FIRE 触发 window
PURGE 清空整个 window 的元素并销毁窗口
FIRE_AND_PURGE 触发窗口,然后销毁窗口
2.3.5 API调用示例
source.keyBy(0).window(TumblingProcessingTimeWindows.of(Time.seconds(5)));
或
source.keyBy(0)…timeWindow(Time.seconds(5))
2.4 案例演示-基于时间的滚动和滑动窗口
2.4.1 需求
nc -lk 9999
有如下数据表示:
信号灯编号和通过该信号灯的车的数量
9,3
9,2
9,7
4,9
2,6
1,5
2,3
5,7
5,4
需求1:每5秒钟统计一次,最近5秒钟内,各个路口通过红绿灯汽车的数量–基于时间的滚动窗口
需求2:每5秒钟统计一次,最近10秒钟内,各个路口通过红绿灯汽车的数量–基于时间的滑动窗口
2.4.2 代码实现
package cn.oldlu.window; import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows; import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows; import org.apache.flink.streaming.api.windowing.time.Time; /** * Author oldlu * Desc * nc -lk 9999 * 有如下数据表示: * 信号灯编号和通过该信号灯的车的数量 9,3 9,2 9,7 4,9 2,6 1,5 2,3 5,7 5,4 * 需求1:每5秒钟统计一次,最近5秒钟内,各个路口通过红绿灯汽车的数量--基于时间的滚动窗口 * 需求2:每5秒钟统计一次,最近10秒钟内,各个路口通过红绿灯汽车的数量--基于时间的滑动窗口 */ public class WindowDemo01_TimeWindow { public static void main(String[] args) throws Exception { //1.env StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //2.Source DataStreamSource<String> socketDS = env.socketTextStream("node1", 9999); //3.Transformation //将9,3转为CartInfo(9,3) SingleOutputStreamOperator<CartInfo> cartInfoDS = socketDS.map(new MapFunction<String, CartInfo>() { @Override public CartInfo map(String value) throws Exception { String[] arr = value.split(","); return new CartInfo(arr[0], Integer.parseInt(arr[1])); } }); //分组 //KeyedStream<CartInfo, Tuple> keyedDS = cartInfoDS.keyBy("sensorId"); // * 需求1:每5秒钟统计一次,最近5秒钟内,各个路口/信号灯通过红绿灯汽车的数量--基于时间的滚动窗口 //timeWindow(Time size窗口大小, Time slide滑动间隔) SingleOutputStreamOperator<CartInfo> result1 = cartInfoDS .keyBy(CartInfo::getSensorId) //.timeWindow(Time.seconds(5))//当size==slide,可以只写一个 //.timeWindow(Time.seconds(5), Time.seconds(5)) .window(TumblingProcessingTimeWindows.of(Time.seconds(5))) .sum("count"); // * 需求2:每5秒钟统计一次,最近10秒钟内,各个路口/信号灯通过红绿灯汽车的数量--基于时间的滑动窗口 SingleOutputStreamOperator<CartInfo> result2 = cartInfoDS .keyBy(CartInfo::getSensorId) //.timeWindow(Time.seconds(10), Time.seconds(5)) .window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5))) .sum("count"); //4.Sink /* 1,5 2,5 3,5 4,5 */ //result1.print(); result2.print(); //5.execute env.execute(); } @Data @AllArgsConstructor @NoArgsConstructor public static class CartInfo { private String sensorId;//信号灯id private Integer count;//通过该信号灯的车的数量 } }
2.5 案例演示-基于数量的滚动和滑动窗口
2.5.1 需求
需求1:统计在最近5条消息中,各自路口通过的汽车数量,相同的key每出现5次进行统计–基于数量的
滚动窗口
需求2:统计在最近5条消息中,各自路口通过的汽车数量,相同的key每出现3次进行统计–基于数量
的滑动窗口
2.5.2 代码实现
package cn.oldlu.window; import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; /** * Author oldlu * Desc * nc -lk 9999 * 有如下数据表示: * 信号灯编号和通过该信号灯的车的数量 9,3 9,2 9,7 4,9 2,6 1,5 2,3 5,7 5,4 * 需求1:统计在最近5条消息中,各自路口通过的汽车数量,相同的key每出现5次进行统计--基于数量的滚动窗口 * 需求2:统计在最近5条消息中,各自路口通过的汽车数量,相同的key每出现3次进行统计--基于数量的滑动窗口 */ public class WindowDemo02_CountWindow { public static void main(String[] args) throws Exception { //1.env StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //2.Source DataStreamSource<String> socketDS = env.socketTextStream("node1", 9999); //3.Transformation //将9,3转为CartInfo(9,3) SingleOutputStreamOperator<CartInfo> cartInfoDS = socketDS.map(new MapFunction<String, CartInfo>() { @Override public CartInfo map(String value) throws Exception { String[] arr = value.split(","); return new CartInfo(arr[0], Integer.parseInt(arr[1])); } }); //分组 //KeyedStream<CartInfo, Tuple> keyedDS = cartInfoDS.keyBy("sensorId"); // * 需求1:统计在最近5条消息中,各自路口通过的汽车数量,相同的key每出现5次进行统计--基于数量的滚动窗口 //countWindow(long size, long slide) SingleOutputStreamOperator<CartInfo> result1 = cartInfoDS .keyBy(CartInfo::getSensorId) //.countWindow(5L, 5L) .countWindow( 5L) .sum("count"); // * 需求2:统计在最近5条消息中,各自路口通过的汽车数量,相同的key每出现3次进行统计--基于数量的滑动窗口 //countWindow(long size, long slide) SingleOutputStreamOperator<CartInfo> result2 = cartInfoDS .keyBy(CartInfo::getSensorId) .countWindow(5L, 3L) .sum("count"); //4.Sink //result1.print(); /* 1,1 1,1 1,1 1,1 2,1 1,1 */ result2.print(); /* 1,1 1,1 2,1 1,1 2,1 3,1 4,1 */ //5.execute env.execute(); } @Data @AllArgsConstructor @NoArgsConstructor public static class CartInfo { private String sensorId;//信号灯id private Integer count;//通过该信号灯的车的数量 } }
2.6 案例演示-会话窗口
2.6.1 需求
设置会话超时时间为10s,10s内没有数据到来,则触发上个窗口的计算
2.6.2 代码实现
package cn.oldlu.window; import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.windowing.assigners.ProcessingTimeSessionWindows; import org.apache.flink.streaming.api.windowing.time.Time; /** * Author oldlu * Desc * nc -lk 9999 * 有如下数据表示: * 信号灯编号和通过该信号灯的车的数量 9,3 9,2 9,7 4,9 2,6 1,5 2,3 5,7 5,4 * 需求:设置会话超时时间为10s,10s内没有数据到来,则触发上个窗口的计算(前提是上一个窗口得有数据!) */ public class WindowDemo03_SessionWindow { public static void main(String[] args) throws Exception { //1.env StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //2.Source DataStreamSource<String> socketDS = env.socketTextStream("node1", 9999); //3.Transformation //将9,3转为CartInfo(9,3) SingleOutputStreamOperator<CartInfo> cartInfoDS = socketDS.map(new MapFunction<String, CartInfo>() { @Override public CartInfo map(String value) throws Exception { String[] arr = value.split(","); return new CartInfo(arr[0], Integer.parseInt(arr[1])); } }); //需求:设置会话超时时间为10s,10s内没有数据到来,则触发上个窗口的计算(前提是上一个窗口得有数据!) SingleOutputStreamOperator<CartInfo> result = cartInfoDS.keyBy(CartInfo::getSensorId) .window(ProcessingTimeSessionWindows.withGap(Time.seconds(10))) .sum("count"); //4.Sink result.print(); //5.execute env.execute(); } @Data @AllArgsConstructor @NoArgsConstructor public static class CartInfo { private String sensorId;//信号灯id private Integer count;//通过该信号灯的车的数量 } }