Flink 三种时间窗口、窗口处理函数使用及案例

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: Flink 是处理无界数据流的强大工具,提供了丰富的窗口机制。本文介绍了三种时间窗口(滚动窗口、滑动窗口和会话窗口)及其使用方法,包括时间窗口的概念、窗口处理函数的使用和实际案例。通过这些机制,可以灵活地对数据流进行分析和计算,满足不同的业务需求。

Flink 在数据处理过程中越来越常见,它在流处理领域提供了丰富的窗口机制来处理无界数据流,我们聊下三种时间窗口,包括时间窗口的概念、窗口处理函数的使用以及实际案例。

一、Flink 中的时间概念

在 Flink 中,有三种时间概念:

  1. 事件时间(Event Time):是事件实际发生的时间,通常由事件中的时间戳表示。这是最符合实际情况的时间概念,但也需要处理数据乱序和延迟的情况。
  2. 处理时间(Processing Time):是指数据在 Flink 算子中被处理的时间。处理时间是最简单的时间概念,不需要考虑数据的乱序和延迟,但可能会导致结果不准确。
  3. 摄入时间(Ingestion Time):是数据进入 Flink 系统的时间。摄入时间介于事件时间和处理时间之间,它可以在一定程度上处理数据乱序,但也不能完全保证结果的准确性。

二、三种时间窗口

Flink 提供了三种主要的时间窗口:滚动窗口(Tumbling Windows)、滑动窗口(Sliding Windows)和会话窗口(Session Windows)。

1. 滚动窗口(Tumbling Windows)

滚动窗口是一种固定大小、不重叠的窗口。每个元素只属于一个窗口。

代码示例

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
public class TumblingWindowExample {
    public static void main(String[] args) throws Exception {
        // 创建流执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 创建一个简单的 DataStream
        DataStream<Integer> inputStream = env.fromElements(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
        // 应用滚动窗口,窗口大小为 3 秒
        DataStream<Integer> resultStream = inputStream
               .timeWindowAll(Time.seconds(3))
               .sum(0);
        // 打印结果
        resultStream.print();
        // 执行流程序
        env.execute();
    }
}

在上述代码中,我们创建了一个简单的整数数据流,并应用了滚动窗口,窗口大小为 3 秒。最后,我们对每个窗口中的元素求和并打印结果。

2. 滑动窗口(Sliding Windows)

滑动窗口是一种固定大小、可以重叠的窗口。每个元素可以属于多个窗口。

代码示例

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
public class SlidingWindowExample {
    public static void main(String[] args) throws Exception {
        // 创建流执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 创建一个简单的 DataStream
        DataStream<Integer> inputStream = env.fromElements(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
        // 应用滑动窗口,窗口大小为 5 秒,滑动步长为 2 秒
        DataStream<Integer> resultStream = inputStream
               .timeWindowAll(Time.seconds(5), Time.seconds(2))
               .sum(0);
        // 打印结果
        resultStream.print();
        // 执行流程序
        env.execute();
    }
}

在上述代码中,我们创建了一个整数数据流,并应用了滑动窗口,窗口大小为 5 秒,滑动步长为 2 秒。这意味着每 2 秒就会有一个新的窗口生成,并且每个窗口包含最近 5 秒内的数据。最后,我们对每个窗口中的元素求和并打印结果。

3. 会话窗口(Session Windows)

会话窗口是一种根据活动间隙划分的窗口。当一段时间内没有数据到达时,会话窗口会关闭。

代码示例

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
import org.apache.flink.streaming.api.windowing.windows.SessionWindow;
public class SessionWindowExample {
    public static void main(String[] args) throws Exception {
        // 创建流执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 创建一个简单的 DataStream
        DataStream<Integer> inputStream = env.fromElements(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
        // 应用会话窗口,间隙时间为 3 秒
        DataStream<Integer> resultStream = inputStream
               .windowAll(SessionWindows.withGap(Time.seconds(3)))
               .sum(0);
        // 打印结果
        resultStream.print();
        // 执行流程序
        env.execute();
    }
}

在上述代码中,我们创建了一个整数数据流,并应用了会话窗口,间隙时间为 3 秒。这意味着当连续数据之间的时间间隔超过 3 秒时,一个新的会话窗口会开始。最后,我们对每个窗口中的元素求和并打印结果。

三、窗口处理函数

Flink 提供了多种窗口处理函数,用于对窗口中的数据进行计算。以下是一些常见的窗口处理函数:

1. 增量聚合函数(Incremental Aggregation Functions)

增量聚合函数可以在窗口中逐个处理元素,并在处理过程中维护一个中间结果。常见的增量聚合函数有 sum、min、max 和 count 等。

代码示例

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
public class IncrementalAggregationExample {
    public static void main(String[] args) throws Exception {
        // 创建流执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 创建一个简单的 DataStream
        DataStream<Integer> inputStream = env.fromElements(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
        // 应用滚动窗口,窗口大小为 3 秒,并使用增量聚合函数求和
        DataStream<Integer> resultStream = inputStream
               .timeWindowAll(Time.seconds(3))
               .sum(0);
        // 打印结果
        resultStream.print();
        // 执行流程序
        env.execute();
    }
}

在上述代码中,我们使用了 sum 作为增量聚合函数,对每个滚动窗口中的元素求和。

2. 全窗口函数(Full Window Functions)

全窗口函数在窗口关闭时对窗口中的所有元素进行计算。常见的全窗口函数有 reduce、aggregate 和 apply 等。

代码示例

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import java.util.Arrays;
public class FullWindowFunctionExample {
    public static void main(String[] args) throws Exception {
        // 创建流执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 创建一个简单的 DataStream
        DataStream<Integer> inputStream = env.fromElements(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
        // 应用滚动窗口,窗口大小为 3 秒,并使用全窗口函数求平均值
        DataStream<Double> resultStream = inputStream
               .timeWindowAll(TumblingProcessingTimeWindows.of(Time.seconds(3)))
               .apply(new AverageWindowFunction());
        // 打印结果
        resultStream.print();
        // 执行流程序
        env.execute();
    }
    // 自定义全窗口函数,用于求平均值
    public static class AverageWindowFunction implements org.apache.flink.streaming.api.functions.windowing.AllWindowFunction<Integer, Double, TimeWindow> {
        @Override
        public void apply(TimeWindow window, Iterable<Integer> values, Collector<Double> out) throws Exception {
            int sum = 0;
            int count = 0;
            for (Integer value : values) {
                sum += value;
                count++;
            }
            out.collect((double) sum / count);
        }
    }
}

在上述代码中,我们自定义了一个全窗口函数 AverageWindowFunction,用于在滚动窗口关闭时计算窗口中元素的平均值。

四、案例分析

假设我们有一个电商网站的用户行为数据流,其中每个事件包含用户 ID、商品 ID 和事件时间戳。我们想要分析用户在一段时间内的购买行为,例如计算每个用户在每小时内的购买总额。

代码示例

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
public class EcommerceAnalyticsExample {
    public static void main(String[] args) throws Exception {
        // 创建流执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 创建一个模拟的用户行为数据流
        DataStream<UserAction> inputStream = env.fromElements(
                new UserAction("user1", "product1", 1000L),
                new UserAction("user1", "product2", 1005L),
                new UserAction("user2", "product3", 1010L),
                new UserAction("user1", "product4", 1020L),
                new UserAction("user2", "product5", 1030L),
                new UserAction("user1", "product6", 1040L),
                new UserAction("user2", "product7", 1050L),
                new UserAction("user1", "product8", 1060L),
                new UserAction("user2", "product9", 1070L),
                new UserAction("user1", "product10", 1080L)
        );
        // 应用滚动窗口,窗口大小为 1 小时,并按用户 ID 分组,计算每个用户在每小时内的购买总额
        DataStream<Tuple2<String, Double>> resultStream = inputStream
               .keyBy(UserAction::getUserId)
               .timeWindow(Time.hours(1))
               .sum("price")
               .map(userActionPriceSum -> new Tuple2<>(userActionPriceSum.f0, userActionPriceSum.f1));
        // 打印结果
        resultStream.print();
        // 执行流程序
        env.execute();
    }
    // 自定义用户行为类
    public static class UserAction {
        private String userId;
        private String productId;
        private double price;
        private long eventTimestamp;
        public UserAction(String userId, String productId, long eventTimestamp) {
            this.userId = userId;
            this.productId = productId;
            this.price = 10.0; // 假设每个商品的价格为 10 元
            this.eventTimestamp = eventTimestamp;
        }
        public String getUserId() {
            return userId;
        }
        public String getProductId() {
            return productId;
        }
        public double getPrice() {
            return price;
        }
        public long getEventTimestamp() {
            return eventTimestamp;
        }
    }
}

在上述代码中,我们首先创建了一个模拟的用户行为数据流,其中每个事件包含用户 ID、商品 ID 和事件时间戳。然后,我们应用滚动窗口,窗口大小为 1 小时,并按用户 ID 分组。最后,我们对每个窗口中的元素求和,计算每个用户在每小时内的购买总额,并打印结果。

五、总结

Flink 的窗口机制是处理无界数据流的强大工具。通过三种时间窗口(滚动窗口、滑动窗口和会话窗口)和丰富的窗口处理函数,我们可以灵活地对数据流进行各种分析和计算。在实际应用中,我们需要根据具体的业务需求选择合适的时间窗口和窗口处理函数,以获得准确的结果。同时,我们还需要考虑数据的乱序和延迟等问题,合理地设置时间戳提取器和水印生成器,以确保流处理的准确性和可靠性。

目录
相关文章
|
7天前
|
存储 运维 安全
云上金融量化策略回测方案与最佳实践
2024年11月29日,阿里云在上海举办金融量化策略回测Workshop,汇聚多位行业专家,围绕量化投资的最佳实践、数据隐私安全、量化策略回测方案等议题进行深入探讨。活动特别设计了动手实践环节,帮助参会者亲身体验阿里云产品功能,涵盖EHPC量化回测和Argo Workflows量化回测两大主题,旨在提升量化投研效率与安全性。
云上金融量化策略回测方案与最佳实践
|
9天前
|
人工智能 自然语言处理 前端开发
从0开始打造一款APP:前端+搭建本机服务,定制暖冬卫衣先到先得
通义灵码携手科技博主@玺哥超carry 打造全网第一个完整的、面向普通人的自然语言编程教程。完全使用 AI,再配合简单易懂的方法,只要你会打字,就能真正做出一个完整的应用。
8503 20
|
13天前
|
Cloud Native Apache 流计算
资料合集|Flink Forward Asia 2024 上海站
Apache Flink 年度技术盛会聚焦“回顾过去,展望未来”,涵盖流式湖仓、流批一体、Data+AI 等八大核心议题,近百家厂商参与,深入探讨前沿技术发展。小松鼠为大家整理了 FFA 2024 演讲 PPT ,可在线阅读和下载。
4569 11
资料合集|Flink Forward Asia 2024 上海站
|
13天前
|
自然语言处理 数据可视化 API
Qwen系列模型+GraphRAG/LightRAG/Kotaemon从0开始构建中医方剂大模型知识图谱问答
本文详细记录了作者在短时间内尝试构建中医药知识图谱的过程,涵盖了GraphRAG、LightRAG和Kotaemon三种图RAG架构的对比与应用。通过实际操作,作者不仅展示了如何利用这些工具构建知识图谱,还指出了每种工具的优势和局限性。尽管初步构建的知识图谱在数据处理、实体识别和关系抽取等方面存在不足,但为后续的优化和改进提供了宝贵的经验和方向。此外,文章强调了知识图谱构建不仅仅是技术问题,还需要深入整合领域知识和满足用户需求,体现了跨学科合作的重要性。
|
21天前
|
人工智能 自动驾驶 大数据
预告 | 阿里云邀您参加2024中国生成式AI大会上海站,马上报名
大会以“智能跃进 创造无限”为主题,设置主会场峰会、分会场研讨会及展览区,聚焦大模型、AI Infra等热点议题。阿里云智算集群产品解决方案负责人丛培岩将出席并发表《高性能智算集群设计思考与实践》主题演讲。观众报名现已开放。
|
9天前
|
人工智能 容器
三句话开发一个刮刮乐小游戏!暖ta一整个冬天!
本文介绍了如何利用千问开发一款情侣刮刮乐小游戏,通过三步简单指令实现从单个功能到整体框架,再到多端优化的过程,旨在为生活增添乐趣,促进情感交流。在线体验地址已提供,鼓励读者动手尝试,探索编程与AI结合的无限可能。
三句话开发一个刮刮乐小游戏!暖ta一整个冬天!
|
1月前
|
存储 人工智能 弹性计算
阿里云弹性计算_加速计算专场精华概览 | 2024云栖大会回顾
2024年9月19-21日,2024云栖大会在杭州云栖小镇举行,阿里云智能集团资深技术专家、异构计算产品技术负责人王超等多位产品、技术专家,共同带来了题为《AI Infra的前沿技术与应用实践》的专场session。本次专场重点介绍了阿里云AI Infra 产品架构与技术能力,及用户如何使用阿里云灵骏产品进行AI大模型开发、训练和应用。围绕当下大模型训练和推理的技术难点,专家们分享了如何在阿里云上实现稳定、高效、经济的大模型训练,并通过多个客户案例展示了云上大模型训练的显著优势。
104589 10
|
8天前
|
消息中间件 人工智能 运维
12月更文特别场——寻找用云高手,分享云&AI实践
我们寻找你,用云高手,欢迎分享你的真知灼见!
731 45
|
6天前
|
弹性计算 运维 监控
阿里云云服务诊断工具:合作伙伴架构师的深度洞察与优化建议
作为阿里云的合作伙伴架构师,我深入体验了其云服务诊断工具,该工具通过实时监控与历史趋势分析,自动化检查并提供详细的诊断报告,极大提升了运维效率和系统稳定性,特别在处理ECS实例资源不可用等问题时表现突出。此外,它支持预防性维护,帮助识别潜在问题,减少业务中断。尽管如此,仍建议增强诊断效能、扩大云产品覆盖范围、提供自定义诊断选项、加强教育与培训资源、集成第三方工具,以进一步提升用户体验。
640 243
|
3天前
|
弹性计算 运维 监控
云服务测评 | 基于云服务诊断全方位监管云产品
本文介绍了阿里云的云服务诊断功能,包括健康状态和诊断两大核心功能。作者通过个人账号体验了该服务,指出其在监控云资源状态和快速排查异常方面的优势,同时也提出了一些改进建议,如增加告警配置入口和扩大诊断范围等。