代码仓库
会同步代码到 GitHub
https://github.com/turbo-duck/flink-demo
上节进度
上节完成了滑动窗口(SlideWindow)的两种写法:基于时间窗口的"时间驱动",以及基于计数窗口的"事件驱动"。
核心代码
/ 时间驱动 WindowedStream<Tuple2<String, Integer>, Tuple, TimeWindow> timeWindow = keyedStream.timeWindow(Time.seconds(10), Time.seconds(5)); timeWindow.sum(1).print(); timeWindow.apply(new MyTimeWindowFunction()).print(); // 事件驱动 WindowedStream<Tuple2<String, Integer>, String, GlobalWindow> countWindow = keyedStream .countWindow(3, 2); countWindow.sum(1).print(); countWindow.apply(new MyCountWindowFunction()).print(); env.execute();
会话窗口
Flink 会话窗口(Session Window)是一种根据数据活跃度动态划分窗口的机制。与滚动窗口(Tumbling Window)和滑动窗口(Sliding Window)不同,会话窗口没有固定的窗口长度和起止时间,而是由一个会话间隔(gap)来决定:对于同一个 key,如果超过 gap 指定的时长都没有新数据到达,当前会话窗口就会结束并触发计算;之后再有该 key 的数据到来时,会开启一个新的会话窗口。
会话窗口场景
用户行为分析:如电商网站用户会话,分析用户的购物行为。
IoT 数据处理:物联网设备的活动周期,如传感器的间歇性数据上传。
网络流量分析:根据流量数据的间隔时间来分析网络活动会话。
时间驱动
package icu.wzk.demo07; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.tuple.Tuple2; import icu.wzk.demo06.MyTimeWindowFunction; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.KeyedStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.datastream.WindowedStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.windowing.assigners.ProcessingTimeSessionWindows; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import java.text.SimpleDateFormat; import java.util.Random; /** * 会话窗口 * @author wzk * @date 14:10 2024/6/24 **/ public class SessionWindow { private static final Random RANDOM = new Random(); public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); DataStreamSource<String> dataStreamSource = env.socketTextStream("0.0.0.0", 9999); SingleOutputStreamOperator<Tuple2<String, Integer>> mapStream = dataStreamSource.map(new MapFunction<String, Tuple2<String, Integer>>() { @Override public Tuple2<String, Integer> map(String value) throws Exception { SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS"); long timeMillis = System.currentTimeMillis(); int random = RANDOM.nextInt(10); System.err.println("value : " + value + " random : " + random + " timestamp : " + timeMillis + "|" + format.format(timeMillis)); return new Tuple2<>(value, random); } }); KeyedStream<Tuple2<String, Integer>, String> keyedStream = mapStream .keyBy(new KeySelector<Tuple2<String, Integer>, String>() { @Override public String getKey(Tuple2<String, Integer> value) throws Exception { return 
value.f0; } }); // 如果连续10s内,没有数据进来,则会话窗口断开。 WindowedStream<Tuple2<String, Integer>, String, TimeWindow> window = keyedStream .window(ProcessingTimeSessionWindows.withGap(Time.seconds(10))); window.sum(1).print(); window.apply(new MyTimeWindowFunction()).print(); env.execute(); } }
事件驱动
package icu.wzk.demo07; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.tuple.Tuple2; import icu.wzk.demo06.MyTimeWindowFunction; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.KeyedStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.datastream.WindowedStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.windowing.assigners.ProcessingTimeSessionWindows; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import java.text.SimpleDateFormat; import java.util.Random; /** * 会话窗口 * @author wzk * @date 14:10 2024/6/24 **/ public class SessionWindow { private static final Random RANDOM = new Random(); public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); DataStreamSource<String> dataStreamSource = env.socketTextStream("0.0.0.0", 9999); SingleOutputStreamOperator<Tuple2<String, Integer>> mapStream = dataStreamSource.map(new MapFunction<String, Tuple2<String, Integer>>() { @Override public Tuple2<String, Integer> map(String value) throws Exception { SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS"); long timeMillis = System.currentTimeMillis(); int random = RANDOM.nextInt(10); System.err.println("value : " + value + " random : " + random + " timestamp : " + timeMillis + "|" + format.format(timeMillis)); return new Tuple2<>(value, random); } }); KeyedStream<Tuple2<String, Integer>, String> keyedStream = mapStream .keyBy(new KeySelector<Tuple2<String, Integer>, String>() { @Override public String getKey(Tuple2<String, Integer> value) throws Exception { return 
value.f0; } }); // 如果连续10s内,没有数据进来,则会话窗口断开。 WindowedStream<Tuple2<String, Integer>, String, TimeWindow> window = keyedStream .window(ProcessingTimeSessionWindows.withGap(Time.seconds(10))); window.sum(1).print(); window.apply(new MyCountWindowFunction()).print(); env.execute(); } }
MyTimeWindowFunction
package icu.wzk.demo06; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.functions.windowing.WindowFunction; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import org.apache.flink.util.Collector; import java.text.SimpleDateFormat; /** * 基于时间驱动 TimeWindow * @author wzk * @date 10:26 2024/6/22 **/ public class MyTimeWindowFunction implements WindowFunction<Tuple2<String,Integer>, String, String, TimeWindow> { @Override public void apply(String s, TimeWindow window, Iterable<Tuple2<String, Integer>> input, Collector<String> out) throws Exception { SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS"); int sum = 0; for(Tuple2<String,Integer> tuple2 : input){ sum +=tuple2.f1; } long start = window.getStart(); long end = window.getEnd(); out.collect("key:" + s + " value: " + sum + "| window_start :" + format.format(start) + " window_end :" + format.format(end) ); } }
MyCountWindowFunction
package icu.wzk.demo06; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.functions.windowing.WindowFunction; import org.apache.flink.streaming.api.windowing.windows.GlobalWindow; import org.apache.flink.util.Collector; import java.text.SimpleDateFormat; /** * 基于事件驱动 GlobalWindow * @author wzk * @date 10:27 2024/6/22 **/ public class MyCountWindowFunction implements WindowFunction<Tuple2<String, Integer>, String, String, GlobalWindow> { @Override public void apply(String s, GlobalWindow window, Iterable<Tuple2<String, Integer>> input, Collector<String> out) throws Exception { SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS"); int sum = 0; for (Tuple2<String, Integer> tuple2 : input){ sum += tuple2.f1; } // 无用的时间戳,默认值为: Long.MAX_VALUE,因为基于事件计数的情况下,不关心时间。 long maxTimestamp = window.maxTimestamp(); out.collect("key:" + s + " value: " + sum + "| maxTimeStamp :" + maxTimestamp + "," + format.format(maxTimestamp) ); } }