Code repository
The code is synced to GitHub:
https://github.com/turbo-duck/flink-demo
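Previous progress
The previous section switched the data source from a Socket to Kafka and also completed the event-driven (count-based) tumbling window.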
Core code
Every three records that share the same key trigger one computation.
WindowedStream<Tuple2<String, Integer>, String, GlobalWindow> countWindow =
        keyedStream.countWindow(3);
countWindow.apply(new MyCountWindowFunction()).print();
The result of running it is shown in the figure below.
Sliding window
What is a sliding window
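A sliding window in Flink is a window type used in stream processing applications to split a continuous data stream into finite chunks for processing.
Compared with a tumbling window, a sliding window is more flexible: its windows may overlap in time, which allows a finer-grained analysis of the stream.
A sliding window moves across the data stream at a fixed interval (the slide step) and produces a series of windows.
Because these windows can overlap, a single record may be assigned to more than one window. Each window is computed and aggregated independently.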
|-----|-----|-----|-----|-----|-----|-----|-----|-----|-----|-----|-----|
0     5     10    15    20    25    30    35    40    45    50    55    60   (time axis, minutes)

Window 1: minute 0 to minute 10
Window 2: minute 5 to minute 15
Window 3: minute 10 to minute 20
Window 4: minute 15 to minute 25
Window 5: minute 20 to minute 30
...
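To make the overlap in the diagram concrete, here is a minimal plain-Java sketch (no Flink dependency) that lists every window a given timestamp falls into. The class name `SlidingWindowAssigner` and the method `assignWindows` are hypothetical names chosen for illustration. The arithmetic assumes a zero offset and half-open windows `[start, end)`, and it is a simplification rather than Flink's own assigner code.

```java
import java.util.ArrayList;
import java.util.List;

public class SlidingWindowAssigner {

    /**
     * Returns the [start, end) bounds of every sliding window that contains
     * the given timestamp. Assumes a zero offset and size >= slide.
     */
    public static List<long[]> assignWindows(long timestamp, long size, long slide) {
        List<long[]> windows = new ArrayList<>();
        // Start of the latest window that still contains the timestamp.
        long lastStart = timestamp - Math.floorMod(timestamp, slide);
        // Walk backwards one slide at a time while the window still covers the timestamp.
        for (long start = lastStart; start > timestamp - size; start -= slide) {
            windows.add(new long[] {start, start + size});
        }
        return windows;
    }

    public static void main(String[] args) {
        // Minute 12 with a 10-minute window sliding every 5 minutes.
        for (long[] w : assignWindows(12, 10, 5)) {
            System.out.println("[" + w[0] + ", " + w[1] + ")");
        }
        // prints [10, 20) and then [5, 15)
    }
}
```

With a window size of 10 and a slide of 5, each record lands in size / slide = 2 windows, which matches windows 2 and 3 in the diagram for a record at minute 12.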
Time-driven
StartApp
package icu.wzk.demo07;

// MyTimeWindowFunction lives in the demo06 package (see its listing), so it is imported here.
import icu.wzk.demo06.MyTimeWindowFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

import java.text.SimpleDateFormat;
import java.util.Random;

/**
 * Sliding window (SlidingWindow)
 * Fixed window length; windows may overlap
 * Time-driven or event-driven
 * @author wzk
 * @date 10:51 2024/6/22
 **/
public class SlidingWindow {

    private static final Random RANDOM = new Random();

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<String> dataStreamSource = env.socketTextStream("0.0.0.0", 9999);

        // Pair every input line with a random value in [0, 10) and log the arrival time.
        SingleOutputStreamOperator<Tuple2<String, Integer>> mapStream = dataStreamSource
                .map(new MapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public Tuple2<String, Integer> map(String value) throws Exception {
                        SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
                        long timeMillis = System.currentTimeMillis();
                        int random = RANDOM.nextInt(10);
                        System.err.println("value : " + value + " random : " + random
                                + " timestamp : " + timeMillis + "|" + format.format(timeMillis));
                        return new Tuple2<>(value, random);
                    }
                });

        // Key the stream by the input string.
        KeyedStream<Tuple2<String, Integer>, String> keyedStream = mapStream
                .keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
                    @Override
                    public String getKey(Tuple2<String, Integer> value) throws Exception {
                        return value.f0;
                    }
                });

        // ==================== Time-driven ============================
        // Every 5 s, compute over the data of the last 10 s.
        // The key type is String because keyBy uses a KeySelector<..., String>.
        // Note: timeWindow(...) is deprecated since Flink 1.12; newer versions use
        // window(SlidingProcessingTimeWindows.of(...)) instead.
        WindowedStream<Tuple2<String, Integer>, String, TimeWindow> timeWindow =
                keyedStream.timeWindow(Time.seconds(10), Time.seconds(5));
        timeWindow.sum(1).print();
        timeWindow.apply(new MyTimeWindowFunction()).print();

        env.execute();
    }
}
MyTimeWindowFunction
package icu.wzk.demo06;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.text.SimpleDateFormat;

/**
 * Time-driven window function (TimeWindow)
 * @author wzk
 * @date 10:26 2024/6/22
 **/
public class MyTimeWindowFunction
        implements WindowFunction<Tuple2<String, Integer>, String, String, TimeWindow> {

    @Override
    public void apply(String s, TimeWindow window, Iterable<Tuple2<String, Integer>> input,
                      Collector<String> out) throws Exception {
        SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");

        // Sum the values of all records in this window for the current key.
        int sum = 0;
        for (Tuple2<String, Integer> tuple2 : input) {
            sum += tuple2.f1;
        }

        long start = window.getStart();
        long end = window.getEnd();
        out.collect("key:" + s + " value: " + sum
                + "| window_start :" + format.format(start)
                + " window_end :" + format.format(end));
    }
}
Event-driven
StartApp
package icu.wzk.demo07;

// MyCountWindowFunction lives in the demo06 package (see its listing), so it is imported here.
import icu.wzk.demo06.MyCountWindowFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;

import java.text.SimpleDateFormat;
import java.util.Random;

/**
 * Sliding window (SlidingWindow)
 * Fixed window length; windows may overlap
 * Time-driven or event-driven
 * @author wzk
 * @date 10:51 2024/6/22
 **/
public class SlidingWindow {

    private static final Random RANDOM = new Random();

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<String> dataStreamSource = env.socketTextStream("0.0.0.0", 9999);

        // Pair every input line with a random value in [0, 10) and log the arrival time.
        SingleOutputStreamOperator<Tuple2<String, Integer>> mapStream = dataStreamSource
                .map(new MapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public Tuple2<String, Integer> map(String value) throws Exception {
                        SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
                        long timeMillis = System.currentTimeMillis();
                        int random = RANDOM.nextInt(10);
                        System.err.println("value : " + value + " random : " + random
                                + " timestamp : " + timeMillis + "|" + format.format(timeMillis));
                        return new Tuple2<>(value, random);
                    }
                });

        // Key the stream by the input string.
        KeyedStream<Tuple2<String, Integer>, String> keyedStream = mapStream
                .keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
                    @Override
                    public String getKey(Tuple2<String, Integer> value) throws Exception {
                        return value.f0;
                    }
                });

        // =================== Event-driven =============================
        // Trigger a computation every 2 records per key. The window size is 3,
        // so each firing covers at most the 3 most recent records of that key.
        WindowedStream<Tuple2<String, Integer>, String, GlobalWindow> countWindow =
                keyedStream.countWindow(3, 2);
        countWindow.sum(1).print();
        countWindow.apply(new MyCountWindowFunction()).print();

        env.execute();
    }
}
MyCountWindowFunction
package icu.wzk.demo06;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
import org.apache.flink.util.Collector;

import java.text.SimpleDateFormat;

/**
 * Event-driven (count-based) window function (GlobalWindow)
 * @author wzk
 * @date 10:27 2024/6/22
 **/
public class MyCountWindowFunction
        implements WindowFunction<Tuple2<String, Integer>, String, String, GlobalWindow> {

    @Override
    public void apply(String s, GlobalWindow window, Iterable<Tuple2<String, Integer>> input,
                      Collector<String> out) throws Exception {
        SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");

        // Sum the values of all records in this window for the current key.
        int sum = 0;
        for (Tuple2<String, Integer> tuple2 : input) {
            sum += tuple2.f1;
        }

        // This timestamp carries no information: it defaults to Long.MAX_VALUE,
        // because a count-based window does not care about time.
        long maxTimestamp = window.maxTimestamp();
        out.collect("key:" + s + " value: " + sum
                + "| maxTimeStamp :" + maxTimestamp + "," + format.format(maxTimestamp));
    }
}
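To see what `countWindow(3, 2)` does for a single key, the following plain-Java sketch (no Flink dependency) simulates the behaviour described in the comment of the event-driven StartApp: fire every 2 records, and evaluate at most the 3 most recent records. The class `CountSlidingWindowSketch` and the method `simulate` are hypothetical names used only for illustration. The sketch assumes a fire-every-`slide`, keep-last-`size` model, which is my understanding of how Flink combines a count trigger with a count evictor on a global window, so treat it as an approximation rather than Flink's implementation.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

public class CountSlidingWindowSketch {

    /**
     * Feeds the values of one key in order and returns the window contents
     * seen at each firing. Fires after every `slide` records and keeps only
     * the newest `size` records before evaluating.
     */
    public static List<List<Integer>> simulate(int[] values, int size, int slide) {
        List<List<Integer>> firings = new ArrayList<>();
        Deque<Integer> buffer = new ArrayDeque<>();
        int sinceLastFire = 0;
        for (int v : values) {
            buffer.addLast(v);
            sinceLastFire++;
            if (sinceLastFire == slide) {
                // Evict the oldest records so that at most `size` remain.
                while (buffer.size() > size) {
                    buffer.removeFirst();
                }
                firings.add(new ArrayList<>(buffer));
                sinceLastFire = 0;
            }
        }
        return firings;
    }

    public static void main(String[] args) {
        // Six records for one key, window size 3, slide 2.
        System.out.println(simulate(new int[] {1, 2, 3, 4, 5, 6}, 3, 2));
        // prints [[1, 2], [2, 3, 4], [4, 5, 6]]
    }
}
```

Note that the first firing contains only 2 records, because the window fires on the slide count before it has collected a full 3 records.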