一、为什么需要 Window
在流处理场景中,数据是连续不断实时到达的,如果不对数据进行切分,直接处理整个无限流数据是不现实的。窗口(Window)就是为了解决这个问题而存在的,它将无限的数据流按照一定的规则切分成有限大小的 “块”,从而可以对每个 “块” 内的数据进行计算处理。例如,在实时统计网站的访问量场景中,我们可能需要每 5 分钟统计一次访问量,这里的 5 分钟就是一个窗口,通过窗口将连续的访问数据切分成多个 5 分钟的片段进行统计。
二、Window 的控制属性
- 窗口大小(Window Size):决定了窗口的时间跨度或数据量大小。例如,时间窗口可以设置为 10 秒、1 分钟等,计数窗口可以设置为每 100 条数据为一个窗口。
- 滑动步长(Slide):窗口滑动的距离。如果滑动步长小于窗口大小,会出现窗口重叠的情况;如果滑动步长等于窗口大小,窗口之间是不重叠的;如果滑动步长大于窗口大小,则会出现数据遗漏的情况。例如,窗口大小为 10 分钟,滑动步长为 5 分钟,那么每 5 分钟就会生成一个新的窗口,且新窗口与之前的窗口有 5 分钟的数据重叠。
- 偏移量(Offset):窗口的起始位置偏移。通过设置偏移量,可以调整窗口的起始时间点,例如在时间窗口中,偏移量可以使窗口从非整点时间开始。
三、Flink 窗口应用代码结构
在 Flink 中,使用窗口进行流处理的基本代码结构如下:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStream<Tuple2<String, Integer>> stream = env.socketTextStream("localhost", 9999) .map(new MapFunction<String, Tuple2<String, Integer>>() { @Override public Tuple2<String, Integer> map(String value) throws Exception { String[] parts = value.split(","); return new Tuple2<>(parts[0], Integer.parseInt(parts[1])); } }); stream.keyBy(0) .window(TumblingEventTimeWindows.of(Time.seconds(5))) .sum(1) .print(); env.execute("Window Example");
上述代码中,首先通过env.socketTextStream从 socket 获取数据,然后通过map函数将数据转换为Tuple2类型。接着使用keyBy对数据进行分组,window指定窗口类型为滚动事件时间窗口,大小为 5 秒,最后使用sum函数对窗口内的数据进行求和并打印。
四、Window 的生命周期
- 窗口创建:当第一条属于该窗口的数据到达时,窗口被创建。例如,在一个滚动时间窗口中,当第一条数据到达且其时间戳落在该窗口时间范围内时,窗口就会被创建。
- 窗口活跃:在窗口的生命周期内,不断有属于该窗口的数据流入,窗口处于活跃状态,持续收集数据。
- 窗口触发:当满足窗口的触发条件时,窗口会被触发。触发条件通常与窗口的类型和设置的属性有关,例如时间窗口在到达窗口结束时间时触发,计数窗口在数据量达到设定值时触发。
- 窗口销毁:窗口触发计算后,通常会被销毁,释放相关资源。但在某些情况下,如会话窗口,如果后续又有符合条件的数据进入,窗口可能会重新激活。
五、Window 的分类
- 时间窗口(Time Window)
- 滚动时间窗口(Tumbling Time Windows):窗口大小固定,且不重叠。例如,每 10 分钟一个窗口,窗口之间没有数据重叠。
- 滑动时间窗口(Sliding Time Windows):窗口大小固定,通过滑动步长来控制窗口的重叠情况。如窗口大小为 10 分钟,滑动步长为 5 分钟,则会有 5 分钟的数据重叠。
- 会话时间窗口(Session Time Windows):窗口由一段时间内不活跃的间隙分隔开,即如果在一段时间内没有数据到达,那么之前的窗口结束,新的数据到达会开启新的窗口。会话窗口的大小不固定,取决于数据的活跃情况。
- 计数窗口(Count Window)
- 滚动计数窗口(Tumbling Count Windows):当窗口内的数据量达到设定的数量时,窗口触发计算,且窗口之间不重叠。例如,每 100 条数据为一个窗口。
- 滑动计数窗口(Sliding Count Windows):与滚动计数窗口类似,但通过滑动步长来控制窗口的重叠情况。例如,窗口大小为 100 条数据,滑动步长为 50 条数据,则每 50 条数据就会生成一个新窗口,且新窗口与之前的窗口有 50 条数据重叠。
六、Windows Function 窗口函数
- 增量聚合函数(Incremental Aggregation Functions):在数据进入窗口时,对窗口内的数据进行增量计算。例如SumFunction、MinFunction、MaxFunction等,它们在每条数据到来时更新聚合结果,而不需要缓存整个窗口的数据。
- 全窗口函数(Full Window Functions):需要缓存整个窗口的数据,当窗口触发时,对窗口内的所有数据进行计算。例如WindowFunction,它可以访问窗口的元数据(如窗口的起始时间、结束时间等),并对窗口内的所有数据进行自定义的计算。
- ProcessWindowFunction:它是WindowFunction的扩展,不仅可以访问窗口的元数据和窗口内的所有数据,还可以访问上下文信息,如当前的时间戳、状态等,提供了更强大的功能。
七、应用案例
实时流量统计
假设有一个实时监控网站流量的需求,需要每 1 分钟统计一次每个 IP 的访问量。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStream<Tuple2<String, Integer>> stream = env.socketTextStream("localhost", 9999) .map(new MapFunction<String, Tuple2<String, Integer>>() { @Override public Tuple2<String, Integer> map(String value) throws Exception { String[] parts = value.split(","); return new Tuple2<>(parts[0], Integer.parseInt(parts[1])); } }); stream.keyBy(0) .window(TumblingProcessingTimeWindows.of(Time.minutes(1))) .sum(1) .print(); env.execute("Real - time Traffic Statistics");
上述代码通过keyBy按照 IP 地址分组,使用滚动处理时间窗口,每 1 分钟统计一次每个 IP 的访问量并打印。
电商实时销售分析
在电商场景中,需要实时统计每 10 分钟内每个商品类别的销售总额,并且可以根据滑动窗口实时查看最近 30 分钟内每 10 分钟的销售趋势。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStream<Tuple3<String, Double, Long>> stream = env.socketTextStream("localhost", 9999) .map(new MapFunction<String, Tuple3<String, Double, Long>>() { @Override public Tuple3<String, Double, Long> map(String value) throws Exception { String[] parts = value.split(","); return new Tuple3<>(parts[0], Double.parseDouble(parts[1]), Long.parseLong(parts[2])); } }); // 滚动窗口统计每10分钟销售总额 stream.keyBy(0) .window(TumblingEventTimeWindows.of(Time.minutes(10))) .aggregate(new AggregateFunction<Tuple3<String, Double, Long>, Tuple2<Double, Integer>, Tuple2<Double, Integer>>() { @Override public Tuple2<Double, Integer> createAccumulator() { return new Tuple2<>(0.0, 0); } @Override public Tuple2<Double, Integer> add(Tuple3<String, Double, Long> value, Tuple2<Double, Integer> accumulator) { return new Tuple2<>(accumulator.f0 + value.f1, accumulator.f1 + 1); } @Override public Tuple2<Double, Integer> getResult(Tuple2<Double, Integer> accumulator) { return accumulator; } @Override public Tuple2<Double, Integer> merge(Tuple2<Double, Integer> a, Tuple2<Double, Integer> b) { return new Tuple2<>(a.f0 + b.f0, a.f1 + b.f1); } }) .print("Tumbling Window Sales"); // 滑动窗口查看最近30分钟内每10分钟销售趋势 stream.keyBy(0) .window(SlidingEventTimeWindows.of(Time.minutes(30), Time.minutes(10))) .aggregate(new AggregateFunction<Tuple3<String, Double, Long>, Tuple2<Double, Integer>, Tuple2<Double, Integer>>() { @Override public Tuple2<Double, Integer> createAccumulator() { return new Tuple2<>(0.0, 0); } @Override public Tuple2<Double, Integer> add(Tuple3<String, Double, Long> value, Tuple2<Double, Integer> accumulator) { return new Tuple2<>(accumulator.f0 + value.f1, accumulator.f1 + 1); } @Override public Tuple2<Double, Integer> getResult(Tuple2<Double, Integer> accumulator) { return accumulator; } @Override public Tuple2<Double, Integer> merge(Tuple2<Double, Integer> a, Tuple2<Double, Integer> b) { return new Tuple2<>(a.f0 + b.f0, a.f1 + b.f1); } }) .print("Sliding Window Sales Trend"); env.execute("E - commerce Real - time Sales Analysis");
上述代码中,通过keyBy按照商品类别分组,分别使用滚动事件时间窗口和滑动事件时间窗口进行销售总额统计和销售趋势分析。
通过以上对 Flink 窗口的详细介绍,相信你对 Flink 窗口的使用有了更深入的理解,可以在实际的流处理项目中灵活运用窗口功能来满足各种业务需求。