点一下关注吧!!!非常感谢!!持续更新!!!
目前已经更新到了:
Hadoop(已更完)
HDFS(已更完)
MapReduce(已更完)
Hive(已更完)
Flume(已更完)
Sqoop(已更完)
Zookeeper(已更完)
HBase(已更完)
Redis (已更完)
Kafka(已更完)
Spark(已更完)
Flink(正在更新!)
章节内容
上节我们完成了如下的内容:
Flink DataSet
Flink DataSet 转换操作
Flink DataSet 输出
容错机制、对比、发展方向
Flink Window 背景
Flink认为Batch是Streaming的一个特例,因此Flink底层引擎是一个流式引擎,去上面实现了流处理和批处理,而Window就是从Streaming到Batch的桥梁。
通俗讲,Window是用来对一个无限的流的设置一个有限的集合,从而有界数据集上进行操作的一种机制,流上的集合由Window来划定范围,比如“计算过去10分钟”或者“最后50个元素的和”。
Window可以由时间(TimeWindow)比如30秒或者数据,(CountWindow)比如100个元素驱动。
DataStreamAPI提供了Time和Count的Window。
Flink Window 总览
基本概念
Window 是Flink处理无限流的核心,Windows将流拆分为有限大小“桶”,我们可以在其上应用计算。
Flink 认为Batch是Streaming的一个特例,所以Flink底层引擎是一个流式引擎,在上面实现了流处理和批处理。
而Window窗口是从Streaming到Batch的一个桥梁。
Flink提供了非常完善的窗口机制
在流处理中,数据是连续不断的,因此我们不可能等到所有等到所有数据都到了再开始处理。
当然我们可以每来一个消息就处理一次,但是有时候我们需要做一些聚合操作,例如:在过去一分钟内有多少用户点击了我们的网页
在这种情况下,我们必须定义一个窗口,用来收集最近的一分钟内的数据,并对这个窗口的内数据进行计算
窗口可以基于时间驱动、也可以基于事件驱动
同样基于不同事件驱动的可以分为:翻滚窗口(TumblingWindow 无重叠)、滑动窗口(Sliding Window 有重叠)、会话窗口(SessionWindow 活动间隙)、全局窗口
Flink要操作窗口,先要将StreamSource转换成WindowedStream
转换步骤
获取流数据源
获取窗口
操作窗口数据
输出窗口数据
滚动时间窗口
类型特点
将数据依据固定的窗口长度对数据进行切分:
时间对齐
窗口长度固定,没有重叠
Flink 的滚动时间窗口(Tumbling Window)是一种常见的基于时间的窗口机制,可以通过事件驱动进行计算。滚动窗口的特点是时间窗口是固定长度的,窗口之间没有重叠,每个事件只能进入一个窗口。
在 Flink 中,滚动时间窗口可以基于事件时间(Event Time)或者处理时间(Processing Time)来定义。为了基于事件时间驱动,可以使用 EventTimeSessionWindows 或者 TumblingEventTimeWindows 来进行定义。
关键点
事件时间和水印 (Watermark): 通过 assignTimestampsAndWatermarks 来指定事件时间,并使用水印确保窗口计算不会遗漏延迟的事件。
窗口定义: 使用 TumblingEventTimeWindows.of(Time.seconds(x)) 定义滚动窗口。窗口长度为 x 秒。
触发器: 采用 EventTimeTrigger 触发计算,确保窗口是基于事件时间的。
基于时间驱动
场景:我们需要统计每一分钟用户购买商品的总数,需要将用户的行为事件按每一分钟进行切分,这种切分被叫做 翻滚时间窗口(Tumbling Time Window)。
启动的主类:
package icu.wzk; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.tuple.Tuple; import org.apache.flink.api.java.tuple.Tuple1; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.KeyedStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.datastream.WindowedStream; import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import java.text.SimpleDateFormat; import java.util.Random; public class TumblingWindow { public static void main(String[] args) { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); DataStreamSource<String> dataStreamSource = env.getJavaEnv().socketTextStream("localhost", 9999); SingleOutputStreamOperator<Tuple2<String, Integer>> mapStream = dataStreamSource .map(new MapFunction<String, Tuple2<String, Integer>>() { @Override public Tuple2<String, Integer> map(String value) throws Exception { SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); long timeMillis = System.currentTimeMillis(); int random = new Random().nextInt(10); System.out.println("value: " + value + ", random: " + random + ", time: " + format.format(timeMillis)); return Tuple2.of(value, random); } }); KeyedStream<Tuple2<String, Integer>, Tuple> keyedStream = mapStream .keyBy(new KeySelector<Tuple2<String, Integer>, Tuple>() { @Override public Tuple getKey(Tuple2<String, Integer> value) throws Exception { return Tuple1.of(value.f0); } }); // 基于时间驱动 每隔 10秒 划分一个窗口 WindowedStream<Tuple2<String, Integer>, Tuple, TimeWindow> timeWindow = keyedStream .timeWindow(Time.seconds(10)); timeWindow.apply(new MyTimeWindowFunction()).print(); env.execute("TumblingWindow"); } }
我们实现一个 MyTimeWindowFunction,滚动时间窗口:
package icu.wzk; import org.apache.flink.api.java.tuple.Tuple; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.functions.windowing.WindowFunction; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import org.apache.flink.util.Collector; import java.text.SimpleDateFormat; public class MyTimeWindowFunction implements WindowFunction<Tuple2<String, Integer>, String, Tuple, TimeWindow> { /** * 场景:我们需要统计每一分钟用户购买商品的总数,需要将用户的行为事件按每一分钟进行切分,这种切分被叫做 翻滚时间窗口(Tumbling Time Window) * @author wzk * @date 16:58 2024/7/26 **/ @Override public void apply(Tuple tuple, TimeWindow window, Iterable<Tuple2<String, Integer>> input, Collector<String> out) throws Exception { SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); int sum = 0; for (Tuple2<String, Integer> tuple2 : input) { sum += tuple2.f1; } out.collect("key: " + tuple.getField(0) + ", value: " + sum + ", window start: " + format.format(window.getStart()) + ", window end: " + format.format(window.getEnd())); } }
基于事件驱动
场景:当我们想要每100个用户的购买行为作为驱动,那么每当窗口中填满了100个“相同”元素,就会对窗口进行计算。
编写一个启动类:
package icu.wzk; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.tuple.Tuple; import org.apache.flink.api.java.tuple.Tuple1; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.KeyedStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.datastream.WindowedStream; import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.api.windowing.windows.GlobalWindow; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import java.text.SimpleDateFormat; import java.util.Random; public class TumblingWindow { public static void main(String[] args) { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); DataStreamSource<String> dataStreamSource = env.getJavaEnv().socketTextStream("localhost", 9999); SingleOutputStreamOperator<Tuple2<String, Integer>> mapStream = dataStreamSource .map(new MapFunction<String, Tuple2<String, Integer>>() { @Override public Tuple2<String, Integer> map(String value) throws Exception { SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); long timeMillis = System.currentTimeMillis(); int random = new Random().nextInt(10); System.out.println("value: " + value + ", random: " + random + ", time: " + format.format(timeMillis)); return Tuple2.of(value, random); } }); KeyedStream<Tuple2<String, Integer>, Tuple> keyedStream = mapStream .keyBy(new KeySelector<Tuple2<String, Integer>, Tuple>() { @Override public Tuple getKey(Tuple2<String, Integer> value) throws Exception { return Tuple1.of(value.f0); } }); // 基于时间驱动 每隔 10秒 划分一个窗口 WindowedStream<Tuple2<String, Integer>, Tuple, GlobalWindow> globalWindow = keyedStream .countWindow(3); globalWindow.apply(new MyCountWindowFuntion()); env.execute("TumblingWindow"); } }
编写一个事件驱动的类:MyCountWindowFuntion
package icu.wzk; import org.apache.flink.api.java.tuple.Tuple; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.functions.windowing.WindowFunction; import org.apache.flink.streaming.api.windowing.windows.GlobalWindow; import org.apache.flink.util.Collector; import java.text.SimpleDateFormat; public class MyCountWindowFuntion implements WindowFunction<Tuple2<String, Integer>, String, Tuple, GlobalWindow> { /** * 场景:当我们想要每100个用户的购买行为作为驱动,那么每当窗口中填满了100个“相同”元素,就会对窗口进行计算。 * @author wzk * @date 17:11 2024/7/26 **/ @Override public void apply(Tuple tuple, GlobalWindow window, Iterable<Tuple2<String, Integer>> input, Collector<String> out) throws Exception { SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); int sum = 0; for (Tuple2<String, Integer> tuple2 : input) { sum += tuple2.f1; } // 无用的时间戳:默认值是:Long.MAX_VALUE,在事件驱动下,基于计数的情况,不关心时间 long maxTimestamp = window.maxTimestamp(); out.collect("key:" + tuple.getField(0) + ", value: " + sum + ", maxTimestamp :" + maxTimestamp + "," + format.format(maxTimestamp)); } }