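I. Time in Flink
1.1 Concepts
1. Event Time: the time at which the event was created, carried in the record as a timestamp.
2. Ingestion Time: the time at which the record enters Flink.
3. Processing Time: the local system clock of the machine running each operator, so it depends on the machine. Processing Time was the default time characteristic in Flink versions before 1.12; since 1.12 the default is Event Time.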
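II. Window Overview
2.1 Streaming
A stream is unbounded: the data keeps arriving and the stream keeps growing.
2.2 Window
A window logically splits an infinite stream into finite-size "buckets" so that computations can be run on each bucket.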
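III. Window Types
3.1 CountWindow: fixed-count window
A window is produced each time a specified number of records has arrived, independent of time. After keyBy, the count is tracked separately for each key.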
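To make the count semantics concrete, here is a minimal plain-Java sketch (no Flink dependency) of what countWindowAll(5).sum(0) computes over a finite list. The class and method names (CountWindowSketch, countWindowSums) are hypothetical. It assumes the purge-after-fire behavior of count windows: once a window fires its contents are discarded, and a trailing group smaller than the count produces no output.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical plain-Java model of countWindowAll(n).sum(0); not Flink code.
final class CountWindowSketch {
    // Sums every n consecutive elements. After a window fires its contents are
    // purged; a trailing group with fewer than n elements emits nothing.
    static List<Integer> countWindowSums(List<Integer> input, int n) {
        List<Integer> results = new ArrayList<>();
        int sum = 0;
        int count = 0;
        for (int value : input) {
            sum += value;
            count++;
            if (count == n) {
                results.add(sum); // window is full: fire
                sum = 0;          // purge the window state
                count = 0;
            }
        }
        return results;
    }

    public static void main(String[] args) {
        // Seven inputs with n = 5: only the first five form a complete window.
        System.out.println(countWindowSums(List.of(1, 2, 3, 4, 5, 6, 7), 5)); // [15]
    }
}
```

The values 6 and 7 stay buffered until three more records arrive, which matches what you observe when typing numbers into the socket one by one.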
3.1.1 CountWindow.java
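import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.AllWindowedStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;

public class CountWindow {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> lines = env.socketTextStream("192.168.52.200", 8888);
        // Convert each incoming String line to an int
        SingleOutputStreamOperator<Integer> nums = lines.map(new MapFunction<String, Integer>() {
            @Override
            public Integer map(String value) throws Exception {
                return Integer.parseInt(value);
            }
        });
        // No keyBy: the whole stream is a single group
        // The window fires after every 5 records
        AllWindowedStream<Integer, GlobalWindow> window = nums.countWindowAll(5);
        // Aggregate inside the window
        SingleOutputStreamOperator<Integer> summed = window.sum(0);
        summed.print();
        env.execute();
    }
}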
[Figure: program output]
3.1.2 CountWindowGroup.java
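import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;

public class CountWindowGroup {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Input: one "word,count" record per line, e.g.
        // spark,3
        // hadoop,2
        DataStream<String> lines = env.socketTextStream("192.168.52.200", 8888);
        // Parse into (word, count) so the stream can be keyed before windowing
        SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndCount = lines.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String value) throws Exception {
                String[] fields = value.split(",");
                String word = fields[0];
                Integer count = Integer.parseInt(fields[1]);
                return Tuple2.of(word, count);
            }
        });
        // 1. Key by word (a KeySelector instead of the positional keyBy(0), deprecated in newer versions)
        KeyedStream<Tuple2<String, Integer>, String> keyed = wordAndCount.keyBy(t -> t.f0);
        // 2. Assign count windows per key
        WindowedStream<Tuple2<String, Integer>, String, GlobalWindow> window = keyed.countWindow(5);
        // 3. Each key's window fires independently as soon as that key has received 5 records
        SingleOutputStreamOperator<Tuple2<String, Integer>> summed = window.sum(1);
        summed.print();
        env.execute();
    }
}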
[Figure: program output]
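The essential property of countWindow on a keyed stream is that every key keeps its own counter, so one word firing does not wait for the others. Below is a minimal plain-Java sketch of that per-key behavior, assuming the same "word,count" input lines. The names KeyedCountWindowSketch and keyedCountWindowSums are hypothetical, and the string output only imitates the (word,sum) format that Tuple2 prints.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical plain-Java model of keyBy(word).countWindow(n).sum(1); not Flink code.
final class KeyedCountWindowSketch {
    // Each word keeps its own running sum and count; a word's window fires as
    // soon as THAT word has received n records, regardless of other words.
    static List<String> keyedCountWindowSums(List<String> lines, int n) {
        Map<String, Integer> sums = new HashMap<>();
        Map<String, Integer> counts = new HashMap<>();
        List<String> results = new ArrayList<>();
        for (String line : lines) {
            String[] fields = line.split(",");
            String word = fields[0];
            int sum = sums.getOrDefault(word, 0) + Integer.parseInt(fields[1]);
            int count = counts.getOrDefault(word, 0) + 1;
            if (count == n) {
                results.add("(" + word + "," + sum + ")"); // fire this key's window
                sums.remove(word);                         // purge only this key
                counts.remove(word);
            } else {
                sums.put(word, sum);
                counts.put(word, count);
            }
        }
        return results;
    }

    public static void main(String[] args) {
        List<String> input = List.of("spark,3", "hadoop,2", "spark,1", "spark,2");
        // n = 3: spark fires with 3 + 1 + 2 = 6; hadoop has only one record so far.
        System.out.println(keyedCountWindowSums(input, 3)); // [(spark,6)]
    }
}
```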
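3.2 Tumbling Window
Characteristics: fixed size and no overlap; the window length equals the step, e.g. a new window every 5 s. Well suited to BI-style statistics such as per-interval totals.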
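Because tumbling windows are fixed-size and non-overlapping, each timestamp belongs to exactly one window, found by rounding the timestamp down to a multiple of the window size. The sketch below assumes non-negative millisecond timestamps and a zero offset (windows aligned to the epoch); it illustrates the arithmetic rather than reproducing Flink's source, and the names TumblingWindowSketch and assign are hypothetical.

```java
// Hypothetical sketch of tumbling-window assignment for non-negative
// millisecond timestamps with zero offset (windows aligned to the epoch).
final class TumblingWindowSketch {
    // Returns {start, end} of the window containing the timestamp; end is exclusive.
    static long[] assign(long timestampMillis, long sizeMillis) {
        long start = timestampMillis - (timestampMillis % sizeMillis);
        return new long[] {start, start + sizeMillis};
    }

    public static void main(String[] args) {
        long[] w = assign(12_345L, 5_000L);
        System.out.println("[" + w[0] + ", " + w[1] + ")"); // [10000, 15000)
    }
}
```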
3.2.1 TumblingWindowAll.java
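import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.AllWindowedStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

public class TumblingWindowAll {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> lines = env.socketTextStream("192.168.52.200", 8888);
        // Convert each incoming String line to an int
        SingleOutputStreamOperator<Integer> nums = lines.map(new MapFunction<String, Integer>() {
            @Override
            public Integer map(String value) throws Exception {
                return Integer.parseInt(value);
            }
        });
        // No keyBy: aggregate once every 5 s of processing time.
        // Explicit assigner; the older timeWindowAll(Time.seconds(5)) is deprecated since Flink 1.12.
        AllWindowedStream<Integer, TimeWindow> window =
                nums.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(5)));
        // Aggregate inside the window
        SingleOutputStreamOperator<Integer> summed = window.sum(0);
        summed.print();
        env.execute();
    }
}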
3.2.2 TumblingWindowGroup.java
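import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

public class TumblingWindowGroup {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Input: one "word,count" record per line, e.g.
        // spark,3
        // hadoop,2
        DataStream<String> lines = env.socketTextStream("192.168.52.200", 8888);
        // Parse into (word, count) so the stream can be keyed before windowing
        SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndCount = lines.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String value) throws Exception {
                String[] fields = value.split(",");
                String word = fields[0];
                Integer count = Integer.parseInt(fields[1]);
                return Tuple2.of(word, count);
            }
        });
        // Key by word first
        KeyedStream<Tuple2<String, Integer>, String> keyed = wordAndCount.keyBy(t -> t.f0);
        // Then assign 5 s processing-time tumbling windows per key
        WindowedStream<Tuple2<String, Integer>, String, TimeWindow> window =
                keyed.window(TumblingProcessingTimeWindows.of(Time.seconds(5)));
        // Aggregate the counts
        SingleOutputStreamOperator<Tuple2<String, Integer>> summed = window.sum(1);
        summed.print();
        env.execute();
    }
}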
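3.3 Sliding Window
The window size can differ from the slide (step). Windows have a fixed length, overlap whenever the slide is smaller than the size, and are aligned to clock time.
Characteristics: good for computing trends.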
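With overlapping windows a single record belongs to several windows (size / slide of them when the size is a multiple of the slide). The sketch below enumerates those windows, assuming non-negative millisecond timestamps and zero offset; it follows the same arithmetic idea as a sliding window assigner but is not Flink's source, and SlidingWindowSketch/assign are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of sliding-window assignment (non-negative timestamps,
// zero offset). Not Flink's source, just the same arithmetic idea.
final class SlidingWindowSketch {
    // Returns every [start, end) window of length size, advancing by slide,
    // that contains the timestamp, latest window first.
    static List<long[]> assign(long ts, long size, long slide) {
        List<long[]> windows = new ArrayList<>();
        long lastStart = ts - (ts % slide); // latest window start <= ts
        for (long start = lastStart; start > ts - size; start -= slide) {
            windows.add(new long[] {start, start + size});
        }
        return windows;
    }

    public static void main(String[] args) {
        // size 10 s, slide 5 s: the record at 12 s falls into 2 windows
        for (long[] w : assign(12_000L, 10_000L, 5_000L)) {
            System.out.println("[" + w[0] + ", " + w[1] + ")");
        }
        // prints [10000, 20000) then [5000, 15000)
    }
}
```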
3.3.1 SlidingWindow.java
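import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.AllWindowedStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

public class SlidingWindow {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> lines = env.socketTextStream("192.168.52.200", 8888);
        // Convert each incoming String line to an int
        SingleOutputStreamOperator<Integer> nums = lines.map(new MapFunction<String, Integer>() {
            @Override
            public Integer map(String value) throws Exception {
                return Integer.parseInt(value);
            }
        });
        // No keyBy: the whole stream is a single group
        // Window length 10 s, sliding every 5 s (processing time)
        AllWindowedStream<Integer, TimeWindow> window =
                nums.windowAll(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)));
        // Aggregate inside the window
        SingleOutputStreamOperator<Integer> summed = window.sum(0);
        summed.print();
        env.execute();
    }
}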
[Figure: program output]
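3.4 Session Window
Records are grouped by activity: a session window closes once no new record arrives within a specified gap (the session gap), so session windows have no fixed length.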
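One way to picture the session gap: each record opens a window [ts, ts + gap), and windows that touch or overlap are merged into one session. The plain-Java sketch below applies that idea to timestamps that are assumed to be already sorted; SessionWindowSketch and sessions are hypothetical names, and boundary handling in Flink's own window merging may differ in detail.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of session-window grouping by an inactivity gap.
// Assumes the timestamps are already sorted; not Flink's merging code.
final class SessionWindowSketch {
    // Each element opens [ts, ts + gap); an element whose window touches or
    // overlaps the current session extends it, otherwise it starts a new one.
    static List<long[]> sessions(List<Long> sortedTs, long gap) {
        List<long[]> result = new ArrayList<>();
        long[] current = null;
        for (long ts : sortedTs) {
            if (current != null && ts <= current[1]) {
                current[1] = ts + gap;               // still active: extend the session
            } else {
                current = new long[] {ts, ts + gap}; // gap exceeded: new session
                result.add(current);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // gap 5 s: the jump from 2 s to 9 s exceeds the gap, so two sessions
        for (long[] s : sessions(List.of(1_000L, 2_000L, 9_000L, 9_500L), 5_000L)) {
            System.out.println("[" + s[0] + ", " + s[1] + ")");
        }
        // prints [1000, 7000) then [9000, 14500)
    }
}
```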
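IV. Watermark
1. The watermark is Flink's mechanism for delaying window triggering, so that out-of-order records still have a chance to arrive before a window is computed.
2. Once the watermark is greater than or equal to a window's end boundary, that window is triggered.
3. watermark = the largest event timestamp seen so far - the allowed delay (maximum out-of-orderness)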
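The three rules can be traced by hand with a small model: track the maximum event time, subtract the delay, and compare the result with the window end. This is a minimal plain-Java sketch under those simplified rules, not Flink's implementation; WatermarkSketch, onEvent and shouldFire are hypothetical names, and it computes a watermark per event whereas Flink typically emits watermarks periodically.

```java
// Hypothetical sketch of a bounded-out-of-orderness watermark and the
// simplified window-firing rule from the text; not Flink's implementation.
final class WatermarkSketch {
    private final long delayMillis;             // allowed out-of-orderness
    private long maxTimestamp = Long.MIN_VALUE; // largest event time seen so far

    WatermarkSketch(long delayMillis) {
        this.delayMillis = delayMillis;
    }

    // Records an event and returns the current watermark: max event time - delay.
    long onEvent(long eventTimestamp) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
        return maxTimestamp - delayMillis;
    }

    // A window [start, end) fires once the watermark reaches its end boundary.
    static boolean shouldFire(long watermark, long windowEnd) {
        return watermark >= windowEnd;
    }

    public static void main(String[] args) {
        WatermarkSketch wm = new WatermarkSketch(2_000L); // 2 s delay
        long windowEnd = 5_000L;                          // window [0, 5000)
        for (long ts : new long[] {1_000L, 4_000L, 6_000L, 3_000L, 7_000L}) {
            long watermark = wm.onEvent(ts);
            System.out.println("event " + ts + " -> watermark " + watermark
                    + ", fire=" + shouldFire(watermark, windowEnd));
        }
        // Only the last event (7000) pushes the watermark to 5000 and fires the window;
        // the late event 3000 does not move the watermark backwards.
    }
}
```

In Flink itself both sides are shifted by one millisecond: the bounded-out-of-orderness generator emits max - delay - 1, and an event-time window fires when the watermark reaches end - 1 (the window's maxTimestamp). The two offsets cancel, so the firing moment matches the simplified rule above.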