Flink 四大基石之窗口(Window)使用详解

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 在流处理场景中,窗口(Window)用于将无限数据流切分成有限大小的“块”,以便进行计算。Flink 提供了多种窗口类型,如时间窗口(滚动、滑动、会话)和计数窗口,通过窗口大小、滑动步长和偏移量等属性控制数据切分。窗口函数包括增量聚合函数、全窗口函数和ProcessWindowFunction,支持灵活的数据处理。应用案例展示了如何使用窗口进行实时流量统计和电商销售分析。

一、为什么需要 Window

在流处理场景中,数据是连续不断实时到达的,如果不对数据进行切分,直接处理整个无限流数据是不现实的。窗口(Window)就是为了解决这个问题而存在的,它将无限的数据流按照一定的规则切分成有限大小的 “块”,从而可以对每个 “块” 内的数据进行计算处理。例如,在实时统计网站的访问量场景中,我们可能需要每 5 分钟统计一次访问量,这里的 5 分钟就是一个窗口,通过窗口将连续的访问数据切分成多个 5 分钟的片段进行统计。

二、Window 的控制属性

  1. 窗口大小(Window Size):决定了窗口的时间跨度或数据量大小。例如,时间窗口可以设置为 10 秒、1 分钟等,计数窗口可以设置为每 100 条数据为一个窗口。
  2. 滑动步长(Slide):窗口滑动的距离。如果滑动步长小于窗口大小,会出现窗口重叠的情况;如果滑动步长等于窗口大小,窗口之间是不重叠的;如果滑动步长大于窗口大小,则会出现数据遗漏的情况。例如,窗口大小为 10 分钟,滑动步长为 5 分钟,那么每 5 分钟就会生成一个新的窗口,且新窗口与之前的窗口有 5 分钟的数据重叠。
  3. 偏移量(Offset):窗口的起始位置偏移。通过设置偏移量,可以调整窗口的起始时间点,例如在时间窗口中,偏移量可以使窗口从非整点时间开始。

三、Flink 窗口应用代码结构

在 Flink 中,使用窗口进行流处理的基本代码结构如下:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Tuple2<String, Integer>> stream = env.socketTextStream("localhost", 9999)
  .map(new MapFunction<String, Tuple2<String, Integer>>() {
        @Override
        public Tuple2<String, Integer> map(String value) throws Exception {
            String[] parts = value.split(",");
            return new Tuple2<>(parts[0], Integer.parseInt(parts[1]));
        }
    });
stream.keyBy(0)
  .window(TumblingEventTimeWindows.of(Time.seconds(5)))
  .sum(1)
  .print();
env.execute("Window Example");

上述代码中,首先通过env.socketTextStream从 socket 获取数据,然后通过map函数将数据转换为Tuple2类型。接着使用keyBy对数据进行分组,window指定窗口类型为滚动事件时间窗口,大小为 5 秒,最后使用sum函数对窗口内的数据进行求和并打印。

四、Window 的生命周期

  1. 窗口创建:当第一条属于该窗口的数据到达时,窗口被创建。例如,在一个滚动时间窗口中,当第一条数据到达且其时间戳落在该窗口时间范围内时,窗口就会被创建。
  2. 窗口活跃:在窗口的生命周期内,不断有属于该窗口的数据流入,窗口处于活跃状态,持续收集数据。
  3. 窗口触发:当满足窗口的触发条件时,窗口会被触发。触发条件通常与窗口的类型和设置的属性有关,例如时间窗口在到达窗口结束时间时触发,计数窗口在数据量达到设定值时触发。
  4. 窗口销毁:窗口触发计算后,通常会被销毁,释放相关资源。但在某些情况下,如会话窗口,如果后续又有符合条件的数据进入,窗口可能会重新激活。

五、Window 的分类

  1. 时间窗口(Time Window)
  • 滚动时间窗口(Tumbling Time Windows):窗口大小固定,且不重叠。例如,每 10 分钟一个窗口,窗口之间没有数据重叠。
  • 滑动时间窗口(Sliding Time Windows):窗口大小固定,通过滑动步长来控制窗口的重叠情况。如窗口大小为 10 分钟,滑动步长为 5 分钟,则会有 5 分钟的数据重叠。
  • 会话时间窗口(Session Time Windows):窗口由一段时间内不活跃的间隙分隔开,即如果在一段时间内没有数据到达,那么之前的窗口结束,新的数据到达会开启新的窗口。会话窗口的大小不固定,取决于数据的活跃情况。
  1. 计数窗口(Count Window)
  • 滚动计数窗口(Tumbling Count Windows):当窗口内的数据量达到设定的数量时,窗口触发计算,且窗口之间不重叠。例如,每 100 条数据为一个窗口。
  • 滑动计数窗口(Sliding Count Windows):与滚动计数窗口类似,但通过滑动步长来控制窗口的重叠情况。例如,窗口大小为 100 条数据,滑动步长为 50 条数据,则每 50 条数据就会生成一个新窗口,且新窗口与之前的窗口有 50 条数据重叠。

六、Windows Function 窗口函数

  1. 增量聚合函数(Incremental Aggregation Functions):在数据进入窗口时,对窗口内的数据进行增量计算。例如SumFunction、MinFunction、MaxFunction等,它们在每条数据到来时更新聚合结果,而不需要缓存整个窗口的数据。
  2. 全窗口函数(Full Window Functions):需要缓存整个窗口的数据,当窗口触发时,对窗口内的所有数据进行计算。例如WindowFunction,它可以访问窗口的元数据(如窗口的起始时间、结束时间等),并对窗口内的所有数据进行自定义的计算。
  3. ProcessWindowFunction:它是WindowFunction的扩展,不仅可以访问窗口的元数据和窗口内的所有数据,还可以访问上下文信息,如当前的时间戳、状态等,提供了更强大的功能。

七、应用案例

实时流量统计

假设有一个实时监控网站流量的需求,需要每 1 分钟统计一次每个 IP 的访问量。

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Tuple2<String, Integer>> stream = env.socketTextStream("localhost", 9999)
  .map(new MapFunction<String, Tuple2<String, Integer>>() {
        @Override
        public Tuple2<String, Integer> map(String value) throws Exception {
            String[] parts = value.split(",");
            return new Tuple2<>(parts[0], Integer.parseInt(parts[1]));
        }
    });
stream.keyBy(0)
  .window(TumblingProcessingTimeWindows.of(Time.minutes(1)))
  .sum(1)
  .print();
env.execute("Real - time Traffic Statistics");

上述代码通过keyBy按照 IP 地址分组,使用滚动处理时间窗口,每 1 分钟统计一次每个 IP 的访问量并打印。

电商实时销售分析

在电商场景中,需要实时统计每 10 分钟内每个商品类别的销售总额,并且可以根据滑动窗口实时查看最近 30 分钟内每 10 分钟的销售趋势。

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Tuple3<String, Double, Long>> stream = env.socketTextStream("localhost", 9999)
  .map(new MapFunction<String, Tuple3<String, Double, Long>>() {
        @Override
        public Tuple3<String, Double, Long> map(String value) throws Exception {
            String[] parts = value.split(",");
            return new Tuple3<>(parts[0], Double.parseDouble(parts[1]), Long.parseLong(parts[2]));
        }
    });
// 滚动窗口统计每10分钟销售总额
stream.keyBy(0)
  .window(TumblingEventTimeWindows.of(Time.minutes(10)))
  .aggregate(new AggregateFunction<Tuple3<String, Double, Long>, Tuple2<Double, Integer>, Tuple2<Double, Integer>>() {
        @Override
        public Tuple2<Double, Integer> createAccumulator() {
            return new Tuple2<>(0.0, 0);
        }
        @Override
        public Tuple2<Double, Integer> add(Tuple3<String, Double, Long> value, Tuple2<Double, Integer> accumulator) {
            return new Tuple2<>(accumulator.f0 + value.f1, accumulator.f1 + 1);
        }
        @Override
        public Tuple2<Double, Integer> getResult(Tuple2<Double, Integer> accumulator) {
            return accumulator;
        }
        @Override
        public Tuple2<Double, Integer> merge(Tuple2<Double, Integer> a, Tuple2<Double, Integer> b) {
            return new Tuple2<>(a.f0 + b.f0, a.f1 + b.f1);
        }
    })
  .print("Tumbling Window Sales");
// 滑动窗口查看最近30分钟内每10分钟销售趋势
stream.keyBy(0)
  .window(SlidingEventTimeWindows.of(Time.minutes(30), Time.minutes(10)))
  .aggregate(new AggregateFunction<Tuple3<String, Double, Long>, Tuple2<Double, Integer>, Tuple2<Double, Integer>>() {
        @Override
        public Tuple2<Double, Integer> createAccumulator() {
            return new Tuple2<>(0.0, 0);
        }
        @Override
        public Tuple2<Double, Integer> add(Tuple3<String, Double, Long> value, Tuple2<Double, Integer> accumulator) {
            return new Tuple2<>(accumulator.f0 + value.f1, accumulator.f1 + 1);
        }
        @Override
        public Tuple2<Double, Integer> getResult(Tuple2<Double, Integer> accumulator) {
            return accumulator;
        }
        @Override
        public Tuple2<Double, Integer> merge(Tuple2<Double, Integer> a, Tuple2<Double, Integer> b) {
            return new Tuple2<>(a.f0 + b.f0, a.f1 + b.f1);
        }
    })
  .print("Sliding Window Sales Trend");
env.execute("E - commerce Real - time Sales Analysis");

上述代码中,通过keyBy按照商品类别分组,分别使用滚动事件时间窗口和滑动事件时间窗口进行销售总额统计和销售趋势分析。

通过以上对 Flink 窗口的详细介绍,相信你对 Flink 窗口的使用有了更深入的理解,可以在实际的流处理项目中灵活运用窗口功能来满足各种业务需求。

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
目录
打赏
0
29
28
1
168
分享
相关文章
Flink 四大基石之 Checkpoint 使用详解
Flink 的 Checkpoint 机制通过定期插入 Barrier 将数据流切分并进行快照,确保故障时能从最近的 Checkpoint 恢复,保障数据一致性。Checkpoint 分为精确一次和至少一次两种语义,前者确保每个数据仅处理一次,后者允许重复处理但不会丢失数据。此外,Flink 提供多种重启策略,如固定延迟、失败率和无重启策略,以应对不同场景。SavePoint 是手动触发的 Checkpoint,用于作业升级和迁移。Checkpoint 执行流程包括 Barrier 注入、算子状态快照、Barrier 对齐和完成 Checkpoint。
82 20
Flink 四大基石之 Time (时间语义) 的使用详解
Flink 中的时间分为三类:Event Time(事件发生时间)、Ingestion Time(数据进入系统时间)和 Processing Time(数据处理时间)。Event Time 通过嵌入事件中的时间戳准确反映数据顺序,支持复杂窗口操作。Watermark 机制用于处理 Event Time,确保数据完整性并触发窗口计算。Flink 还提供了多种迟到数据处理方式,如默认丢弃、侧输出流和允许延迟处理,以应对不同场景需求。掌握这些时间语义对编写高效、准确的 Flink 应用至关重要。
74 21
大数据-120 - Flink Window 窗口机制-滑动时间窗口、会话窗口-基于时间驱动&基于事件驱动
大数据-120 - Flink Window 窗口机制-滑动时间窗口、会话窗口-基于时间驱动&基于事件驱动
125 0
大数据-119 - Flink Window总览 窗口机制-滚动时间窗口-基于时间驱动&基于事件驱动
大数据-119 - Flink Window总览 窗口机制-滚动时间窗口-基于时间驱动&基于事件驱动
183 0
Flink 三种时间窗口、窗口处理函数使用及案例
Flink 是处理无界数据流的强大工具,提供了丰富的窗口机制。本文介绍了三种时间窗口(滚动窗口、滑动窗口和会话窗口)及其使用方法,包括时间窗口的概念、窗口处理函数的使用和实际案例。通过这些机制,可以灵活地对数据流进行分析和计算,满足不同的业务需求。
192 27
大数据-122 - Flink Time Watermark Java代码测试实现Tumbling Window
大数据-122 - Flink Time Watermark Java代码测试实现Tumbling Window
54 0
大数据-115 - Flink DataStream Transformation 多个函数方法 FlatMap Window Aggregations Reduce
大数据-115 - Flink DataStream Transformation 多个函数方法 FlatMap Window Aggregations Reduce
51 0
数仓系列 | Flink 窗口的应用与实现
本文根据 Apache Flink 系列直播整理而成,由 Apache Flink Contributor、OPPO 大数据平台研发负责人张俊老师分享。主要内容如下: 1. 整体思路与学习路径 2. 应用场景与编程模型 3. 工作流程与实现机制
数仓系列 | Flink 窗口的应用与实现
数仓系列 | Flink 窗口的应用与实现
本文根据 Apache Flink 系列直播整理而成,由 Apache Flink Contributor、OPPO 大数据平台研发负责人张俊老师分享。主要内容如下: 1. 整体思路与学习路径 2. 应用场景与编程模型 3. 工作流程与实现机制
阿里云实时计算Flink版测评报告
该测评报告详细介绍了阿里云实时计算Flink版在用户行为分析与标签画像中的应用实践,展示了其毫秒级的数据处理能力和高效的开发流程。报告还全面评测了该服务在稳定性、性能、开发运维及安全性方面的卓越表现,并对比自建Flink集群的优势。最后,报告评估了其成本效益,强调了其灵活扩展性和高投资回报率,适合各类实时数据处理需求。

相关产品

  • 实时计算 Flink版
  • AI助理

    你好,我是AI助理

    可以解答问题、推荐解决方案等