Flink 三种时间窗口、窗口处理函数使用及案例

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: Flink 是处理无界数据流的强大工具,提供了丰富的窗口机制。本文介绍了三种时间窗口(滚动窗口、滑动窗口和会话窗口)及其使用方法,包括时间窗口的概念、窗口处理函数的使用和实际案例。通过这些机制,可以灵活地对数据流进行分析和计算,满足不同的业务需求。

Flink 在数据处理过程中越来越常见,它在流处理领域提供了丰富的窗口机制来处理无界数据流,我们聊下三种时间窗口,包括时间窗口的概念、窗口处理函数的使用以及实际案例。

一、Flink 中的时间概念

在 Flink 中,有三种时间概念:

  1. 事件时间(Event Time):是事件实际发生的时间,通常由事件中的时间戳表示。这是最符合实际情况的时间概念,但也需要处理数据乱序和延迟的情况。
  2. 处理时间(Processing Time):是指数据在 Flink 算子中被处理的时间。处理时间是最简单的时间概念,不需要考虑数据的乱序和延迟,但可能会导致结果不准确。
  3. 摄入时间(Ingestion Time):是数据进入 Flink 系统的时间。摄入时间介于事件时间和处理时间之间,它可以在一定程度上处理数据乱序,但也不能完全保证结果的准确性。

二、三种时间窗口

Flink 提供了三种主要的时间窗口:滚动窗口(Tumbling Windows)、滑动窗口(Sliding Windows)和会话窗口(Session Windows)。

1. 滚动窗口(Tumbling Windows)

滚动窗口是一种固定大小、不重叠的窗口。每个元素只属于一个窗口。

代码示例

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
public class TumblingWindowExample {
    public static void main(String[] args) throws Exception {
        // 创建流执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 创建一个简单的 DataStream
        DataStream<Integer> inputStream = env.fromElements(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
        // 应用滚动窗口,窗口大小为 3 秒
        DataStream<Integer> resultStream = inputStream
               .timeWindowAll(Time.seconds(3))
               .sum(0);
        // 打印结果
        resultStream.print();
        // 执行流程序
        env.execute();
    }
}

在上述代码中,我们创建了一个简单的整数数据流,并应用了滚动窗口,窗口大小为 3 秒。最后,我们对每个窗口中的元素求和并打印结果。

2. 滑动窗口(Sliding Windows)

滑动窗口是一种固定大小、可以重叠的窗口。每个元素可以属于多个窗口。

代码示例

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
public class SlidingWindowExample {
    public static void main(String[] args) throws Exception {
        // 创建流执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 创建一个简单的 DataStream
        DataStream<Integer> inputStream = env.fromElements(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
        // 应用滑动窗口,窗口大小为 5 秒,滑动步长为 2 秒
        DataStream<Integer> resultStream = inputStream
               .timeWindowAll(Time.seconds(5), Time.seconds(2))
               .sum(0);
        // 打印结果
        resultStream.print();
        // 执行流程序
        env.execute();
    }
}

在上述代码中,我们创建了一个整数数据流,并应用了滑动窗口,窗口大小为 5 秒,滑动步长为 2 秒。这意味着每 2 秒就会有一个新的窗口生成,并且每个窗口包含最近 5 秒内的数据。最后,我们对每个窗口中的元素求和并打印结果。

3. 会话窗口(Session Windows)

会话窗口是一种根据活动间隙划分的窗口。当一段时间内没有数据到达时,会话窗口会关闭。

代码示例

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
import org.apache.flink.streaming.api.windowing.windows.SessionWindow;
public class SessionWindowExample {
    public static void main(String[] args) throws Exception {
        // 创建流执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 创建一个简单的 DataStream
        DataStream<Integer> inputStream = env.fromElements(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
        // 应用会话窗口,间隙时间为 3 秒
        DataStream<Integer> resultStream = inputStream
               .windowAll(SessionWindows.withGap(Time.seconds(3)))
               .sum(0);
        // 打印结果
        resultStream.print();
        // 执行流程序
        env.execute();
    }
}

在上述代码中,我们创建了一个整数数据流,并应用了会话窗口,间隙时间为 3 秒。这意味着当连续数据之间的时间间隔超过 3 秒时,一个新的会话窗口会开始。最后,我们对每个窗口中的元素求和并打印结果。

三、窗口处理函数

Flink 提供了多种窗口处理函数,用于对窗口中的数据进行计算。以下是一些常见的窗口处理函数:

1. 增量聚合函数(Incremental Aggregation Functions)

增量聚合函数可以在窗口中逐个处理元素,并在处理过程中维护一个中间结果。常见的增量聚合函数有 sum、min、max 和 count 等。

代码示例

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
public class IncrementalAggregationExample {
    public static void main(String[] args) throws Exception {
        // 创建流执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 创建一个简单的 DataStream
        DataStream<Integer> inputStream = env.fromElements(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
        // 应用滚动窗口,窗口大小为 3 秒,并使用增量聚合函数求和
        DataStream<Integer> resultStream = inputStream
               .timeWindowAll(Time.seconds(3))
               .sum(0);
        // 打印结果
        resultStream.print();
        // 执行流程序
        env.execute();
    }
}

在上述代码中,我们使用了 sum 作为增量聚合函数,对每个滚动窗口中的元素求和。

2. 全窗口函数(Full Window Functions)

全窗口函数在窗口关闭时对窗口中的所有元素进行计算。常见的全窗口函数有 reduce、aggregate 和 apply 等。

代码示例

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import java.util.Arrays;
public class FullWindowFunctionExample {
    public static void main(String[] args) throws Exception {
        // 创建流执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 创建一个简单的 DataStream
        DataStream<Integer> inputStream = env.fromElements(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
        // 应用滚动窗口,窗口大小为 3 秒,并使用全窗口函数求平均值
        DataStream<Double> resultStream = inputStream
               .timeWindowAll(TumblingProcessingTimeWindows.of(Time.seconds(3)))
               .apply(new AverageWindowFunction());
        // 打印结果
        resultStream.print();
        // 执行流程序
        env.execute();
    }
    // 自定义全窗口函数,用于求平均值
    public static class AverageWindowFunction implements org.apache.flink.streaming.api.functions.windowing.AllWindowFunction<Integer, Double, TimeWindow> {
        @Override
        public void apply(TimeWindow window, Iterable<Integer> values, Collector<Double> out) throws Exception {
            int sum = 0;
            int count = 0;
            for (Integer value : values) {
                sum += value;
                count++;
            }
            out.collect((double) sum / count);
        }
    }
}

在上述代码中,我们自定义了一个全窗口函数 AverageWindowFunction,用于在滚动窗口关闭时计算窗口中元素的平均值。

四、案例分析

假设我们有一个电商网站的用户行为数据流,其中每个事件包含用户 ID、商品 ID 和事件时间戳。我们想要分析用户在一段时间内的购买行为,例如计算每个用户在每小时内的购买总额。

代码示例

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
public class EcommerceAnalyticsExample {
    public static void main(String[] args) throws Exception {
        // 创建流执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 创建一个模拟的用户行为数据流
        DataStream<UserAction> inputStream = env.fromElements(
                new UserAction("user1", "product1", 1000L),
                new UserAction("user1", "product2", 1005L),
                new UserAction("user2", "product3", 1010L),
                new UserAction("user1", "product4", 1020L),
                new UserAction("user2", "product5", 1030L),
                new UserAction("user1", "product6", 1040L),
                new UserAction("user2", "product7", 1050L),
                new UserAction("user1", "product8", 1060L),
                new UserAction("user2", "product9", 1070L),
                new UserAction("user1", "product10", 1080L)
        );
        // 应用滚动窗口,窗口大小为 1 小时,并按用户 ID 分组,计算每个用户在每小时内的购买总额
        DataStream<Tuple2<String, Double>> resultStream = inputStream
               .keyBy(UserAction::getUserId)
               .timeWindow(Time.hours(1))
               .sum("price")
               .map(userActionPriceSum -> new Tuple2<>(userActionPriceSum.f0, userActionPriceSum.f1));
        // 打印结果
        resultStream.print();
        // 执行流程序
        env.execute();
    }
    // 自定义用户行为类
    public static class UserAction {
        private String userId;
        private String productId;
        private double price;
        private long eventTimestamp;
        public UserAction(String userId, String productId, long eventTimestamp) {
            this.userId = userId;
            this.productId = productId;
            this.price = 10.0; // 假设每个商品的价格为 10 元
            this.eventTimestamp = eventTimestamp;
        }
        public String getUserId() {
            return userId;
        }
        public String getProductId() {
            return productId;
        }
        public double getPrice() {
            return price;
        }
        public long getEventTimestamp() {
            return eventTimestamp;
        }
    }
}

在上述代码中,我们首先创建了一个模拟的用户行为数据流,其中每个事件包含用户 ID、商品 ID 和事件时间戳。然后,我们应用滚动窗口,窗口大小为 1 小时,并按用户 ID 分组。最后,我们对每个窗口中的元素求和,计算每个用户在每小时内的购买总额,并打印结果。

五、总结

Flink 的窗口机制是处理无界数据流的强大工具。通过三种时间窗口(滚动窗口、滑动窗口和会话窗口)和丰富的窗口处理函数,我们可以灵活地对数据流进行各种分析和计算。在实际应用中,我们需要根据具体的业务需求选择合适的时间窗口和窗口处理函数,以获得准确的结果。同时,我们还需要考虑数据的乱序和延迟等问题,合理地设置时间戳提取器和水印生成器,以确保流处理的准确性和可靠性。

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
目录
相关文章
|
1月前
|
缓存 监控 数据处理
Flink 四大基石之窗口(Window)使用详解
在流处理场景中,窗口(Window)用于将无限数据流切分成有限大小的“块”,以便进行计算。Flink 提供了多种窗口类型,如时间窗口(滚动、滑动、会话)和计数窗口,通过窗口大小、滑动步长和偏移量等属性控制数据切分。窗口函数包括增量聚合函数、全窗口函数和ProcessWindowFunction,支持灵活的数据处理。应用案例展示了如何使用窗口进行实时流量统计和电商销售分析。
219 28
|
4月前
|
分布式计算 监控 大数据
大数据-131 - Flink CEP 案例:检测交易活跃用户、超时未交付
大数据-131 - Flink CEP 案例:检测交易活跃用户、超时未交付
122 0
|
4月前
|
SQL 消息中间件 分布式计算
大数据-120 - Flink Window 窗口机制-滑动时间窗口、会话窗口-基于时间驱动&基于事件驱动
大数据-120 - Flink Window 窗口机制-滑动时间窗口、会话窗口-基于时间驱动&基于事件驱动
132 0
|
4月前
|
SQL 分布式计算 大数据
大数据-119 - Flink Window总览 窗口机制-滚动时间窗口-基于时间驱动&基于事件驱动
大数据-119 - Flink Window总览 窗口机制-滚动时间窗口-基于时间驱动&基于事件驱动
234 0
|
4月前
|
消息中间件 关系型数据库 MySQL
大数据-117 - Flink DataStream Sink 案例:写出到MySQL、写出到Kafka
大数据-117 - Flink DataStream Sink 案例:写出到MySQL、写出到Kafka
380 0
|
2月前
|
消息中间件 JSON 数据库
探索Flink动态CEP:杭州银行的实战案例
本文由杭州银行大数据工程师唐占峰、欧阳武林撰写,介绍Flink动态CEP的定义、应用场景、技术实现及使用方式。Flink动态CEP是基于Flink的复杂事件处理库,支持在不重启服务的情况下动态更新规则,适应快速变化的业务需求。文章详细阐述了其在反洗钱、反欺诈和实时营销等金融领域的应用,并展示了某金融机构的实际应用案例。通过动态CEP,用户可以实时调整规则,提高系统的灵活性和响应速度,降低维护成本。文中还提供了具体的代码示例和技术细节,帮助读者理解和使用Flink动态CEP。
542 2
探索Flink动态CEP:杭州银行的实战案例
|
6月前
|
SQL 存储 Unix
Flink SQL 在快手实践问题之设置 Window Offset 以调整窗口划分如何解决
Flink SQL 在快手实践问题之设置 Window Offset 以调整窗口划分如何解决
94 2
|
4月前
|
SQL 大数据 API
大数据-132 - Flink SQL 基本介绍 与 HelloWorld案例
大数据-132 - Flink SQL 基本介绍 与 HelloWorld案例
90 0
|
4月前
|
SQL 消息中间件 分布式计算
大数据-130 - Flink CEP 详解 - CEP开发流程 与 案例实践:恶意登录检测实现
大数据-130 - Flink CEP 详解 - CEP开发流程 与 案例实践:恶意登录检测实现
137 0
|
4月前
|
消息中间件 NoSQL Kafka
大数据-116 - Flink DataStream Sink 原理、概念、常见Sink类型 配置与使用 附带案例1:消费Kafka写到Redis
大数据-116 - Flink DataStream Sink 原理、概念、常见Sink类型 配置与使用 附带案例1:消费Kafka写到Redis
297 0

相关产品

  • 实时计算 Flink版