关于Flink框架窗口(window)函数最全解析

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
公共DNS(含HTTPDNS解析),每月1000万次HTTP解析
云解析 DNS,旗舰版 1个月
简介: 在真实的场景中数据流往往都是没有界限的,无休止的,就像是一个通道中水流持续不断地通过管道流向别处,这样显然是无法进行处理、计算的,那如何可以将没有界限的数据进行处理呢?我们可以将这些无界限的数据流进行切割、拆分,将其得到一个有界限的数据集合然后进行处理、计算就方便多了。Flink中窗口(Window)就是来处理无界限的数据流的,将无线的数据流切割成为有限流,然后将切割后的有限流数据分发到指定有限大小的桶中进行分析计算。

概述


在真实的场景中数据流往往都是没有界限的,无休止的,就像是一个通道中水流持续不断地通过管道流向别处,这样显然是无法进行处理、计算的,那如何可以将没有界限的数据进行处理呢?我们可以将这些无界限的数据流进行切割、拆分,将其得到一个有界限的数据集合然后进行处理、计算就方便多了。


Flink中窗口(Window)就是来处理无界限的数据流的,将无线的数据流切割成为有限流,然后将切割后的有限流数据分发到指定有限大小的桶中进行分析计算。

1.png


窗口类型


Flink中的窗口类型有两种:时间窗口(Time Window)、计数窗口(Count Window)。时间窗口中又包含了:滚动时间窗口(Tumbling Window)、滑动时间窗口(Sliding Window)、会话窗口(Session Window)。计数窗口包含了:滚动计数窗口和滑动计数窗口。


滚动窗口(Tumbling Windows)


以时间窗口为例(计数窗口类似),滚动窗口就是按照固定的时间间隔将数据进行切分。

特点就是时间比较对齐、窗口的长度都是固定的且没有重叠。

滚动窗口分配器将每个元素分配到一个指定窗口大小的窗口中,滚动窗口有一个固定的大小,并且不会出现重叠。


换句话说:如果制定了一个30分钟时间间隔的滚动窗口,然后就会将无界限的数据以30分钟为一个窗口期进行切割成有限的数据集合。

适用场景:做统计计算。做每个时间段的聚合计算。

1.png


滑动窗口(Sliding Windows)


以时间窗口为例(计数窗口类似),滑动窗口是固定窗口的另一种形式,滑动窗口由固定的窗口长度和滑动间隔组成。


窗口长度是固定的,窗口之间是可以重叠的。


说明:滑动窗口分配器将元素分配到固定长度的窗口中,与滚动窗口类似,窗口的大小由窗口大小参数来配置,另一个窗口滑动参数控制滑动窗口开始的频率。因此,滑动窗口如果滑动参数小于窗口大小的话,窗口是可以重叠的,在这种情况下元素会被分配到多个窗口中。


适用场景:(求某接口最近 5min 的失败率来决定是否要报警)对最近一个时间段内的统计。

2.png


会话窗口(Session Windows)


会话敞口只存在于时间窗口,计数窗口无会话窗口。

特点是时间无对齐

由一系列事件组合一个指定时间长度的 timeout 间隙组成,类似于 web 应用的session,也就是一段时间没有接收到新数据就会生成新的窗口。


当它在一个固定的时间周期内不再收到元素,即非活动间隔产生,那个这个窗口就会关闭。一个 session 窗口通过一个 session 间隔来配置,这个 session 间隔定义了非活跃周期的长度,当这个非活跃周期产生,那么当前的 session 将关闭并且后续的元素将被分配到新的 session 窗口中去

3.png



Window API使用


窗口分配器window()


在flink中可以用 .window() 来定义一个窗口,然后基于这个 window 去做一些聚合或者其它处理操作。注意 window () 方法必须在 keyBy 之后才能用。window() 方法接收的输入参数是一个 WindowAssigner WindowAssigner 负责将每条输入的数据分发到正确的 window 中。Flink 提供了通用的 WindowAssigner:滚动窗口(tumbling window)、滑动窗口(sliding window)、 会话窗口(session window)、全局窗口(global window)


代码如下:


public class WindowTest1_TimeWindow {
    public static void main(String[] args) throws Exception {
        //构建流处理环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //设施并行度为1
        env.setParallelism(1);
        DataStreamSource<String> inputStream = env.socketTextStream("localhost", 7777);
        DataStream<SensorReading> dataStream = inputStream.map(line -> {
            String[] fields = line.split(",");
            return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
        });
        //开窗测试 指定窗口分配器
        DataStream<Integer> resultStream = dataStream.keyBy("id")
                //设置一个15秒的一个滚动窗口
                .window(TumblingProcessingTimeWindows.of(Time.seconds(15)));
                //会话窗口
                //.window(ProcessingTimeSessionWindows.withGap(Time.seconds(15)))
                //滑动窗口
                //.window(SlidingProcessingTimeWindows.of(Time.seconds(20),Time.seconds(10)))
        env.execute();
    }
}

Flink 提供了更加简单的 .timeWindow 和 .countWindow 方法,用于定义时间窗口和计数窗口.


TimeWindow


TimeWindow 是将指定时间范围内的所有数据组成一个 window,一次对一个window 里面的所有数据进行计算。

时间间隔可以通过 Time.milliseconds(x),Time.seconds(x),Time.minutes(x)等其中的一个来指定。


CountWindow


CountWindow 根据窗口中相同 key 元素的数量来触发执行,执行时只计算元素数量达到窗口大小的 key 对应的结果。


CountWindow 的 window_size 指的是相同 Key 的元素的个数,不是输入的所有元素的总数。


创建不同类型的窗口


滚动时间窗口(tumbling time window)


.timeWindow(Time.seconds(15))

滑动时间窗口(sliding time window)


下面代码中的 sliding_size 设置为了 5s,也就是说,每 5s 就计算输出结果一次,每一次计算的 window 范围是 15s 内的所有元素。

.timeWindow(Time.seconds(15),Time.seconds(5))


会话窗口(session window)


.window(ProcessingTimeSessionWindows.withGap(Time.seconds(15)))

滚动计数窗口(tumbling count window)


默认的 CountWindow 是一个滚动窗口,只需要指定窗口大小即可,当元素数量达到窗口大小时,就会触发窗口的执行。

.countWindow(10)


滑动计数窗口(sliding count window)


下面代码中的 sliding_size 设置为了 2,也就是说,每收到两个相同 key 的数据就计算一次,每一次计算的 window 范围是 10 个元素。



.countWindow(10,2)

窗口函数


Flink中定义了要对窗口中收集的数据做的计算操作,主要可以分为两类:增量聚合函数、全窗口函数。


增量聚合函数:每条数据到来就进行计算,先保持着一个状态,聚合函数有ReduceFunction AggregateFunction。

1.png

案例代码:


public class WindowTest1_TimeWindow {
    public static void main(String[] args) throws Exception {
        //构建流处理环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //设施并行度为1
        env.setParallelism(1);
        DataStreamSource<String> inputStream = env.socketTextStream("localhost", 7777);
        DataStream<SensorReading> dataStream = inputStream.map(line -> {
            String[] fields = line.split(",");
            return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
        });
        //开窗测试 指定窗口分配器
        DataStream<Integer> resultStream = dataStream.keyBy("id")
                //对窗口进行聚合操作 增量窗口操作
                .timeWindow(Time.seconds(15))
                .aggregate(new AggregateFunction<SensorReading, Integer, Integer>() {
                    @Override
                    //创建累加器
                    public Integer createAccumulator() {
                        return 0;
                    }
                    @Override
                    public Integer add(SensorReading sensorReading, Integer accumulator) {
                        return accumulator+1;
                    }
                    @Override
                    public Integer getResult(Integer accumulator) {
                        return accumulator;
                    }
                    @Override
                    public Integer merge(Integer integer, Integer acc1) {
                        return null;
                    }
                });
        resultStream.print();
        env.execute();
    }
}

本地测试:结果输出成功。


1.png

全窗口函数:先把窗口所有数据收集起来,等到计算的时候会遍历所有数据。对应的函数:ProcessWindowFunction,WindowFunction

1.png


案例代码:


public class WindowTest1_TimeWindow {
    public static void main(String[] args) throws Exception {
        //构建流处理环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //设施并行度为1
        env.setParallelism(1);
        DataStreamSource<String> inputStream = env.socketTextStream("localhost", 7777);
        DataStream<SensorReading> dataStream = inputStream.map(line -> {
            String[] fields = line.split(",");
            return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
        });
          //全量窗口函数
        DataStream<Tuple3<String,Long,Integer>> resultStream = dataStream.keyBy("id")
                .timeWindow(Time.seconds(15))
                .apply(new WindowFunction<SensorReading, Tuple3<String,Long,Integer>, Tuple, TimeWindow>() {
                    @Override
                    public void apply(Tuple tuple, TimeWindow window, Iterable<SensorReading> input, Collector<Tuple3<String,Long,Integer>> out) throws Exception {
                        String id =tuple.getField(0);
                        Long windowEnd =window.getEnd();
                        Integer count = IteratorUtils.toList(input.iterator()).size();
                        out.collect(new Tuple3<>(id,windowEnd,count));
                    }
                });
        resultStream.print();
        env.execute();
    }
}

本地运行测试:结果输出成果

1.png


如何在winodws操作系统下使用nc命令进行代码测试:在Windows操作系统中怎样使用nc命令


其他API


触发器:.trigger() 定义 window 什么时候关闭,触发计算并输出结果

移除器:.evitor() 定义移除某些数据的逻辑

.allowedLateness() 允许处理迟到的数据

.sideOutputLateData() 将迟到的数据放入侧输出流

.getSideOutput()获取侧输出流


案例代码:


public class WindowTest2_CountWindow {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<String> inputStream = env.socketTextStream("localhost", 7777);
        DataStream<SensorReading> dataStream = inputStream.map(line -> {
            String[] field = line.split(",");
            return new SensorReading(field[0], new Long(field[1]), new Double(field[2]));
        });
        // 开计数窗口测试
        DataStream<Double> resultStream = dataStream.keyBy("id")
                .countWindow(10, 2)
                .aggregate(new MyAvgTemp());
        //其他可选API 对迟到数据的处理方式
        OutputTag<SensorReading> outputTag = new OutputTag<SensorReading>("iate") {};
        SingleOutputStreamOperator<SensorReading> sumStream = dataStream.keyBy("id")
                .timeWindow(Time.seconds(15))
                //.trigger()
                //.evictor()
                .allowedLateness(Time.minutes(1))
                //输出到测流
                .sideOutputLateData(outputTag)
                .sum("temperature");
       sumStream.getSideOutput(outputTag).print("late");
        resultStream.print();
        env.execute();
    }
    public static class MyAvgTemp implements AggregateFunction<SensorReading, Tuple2<Double,Integer>,Double>{
        @Override
        public Tuple2<Double, Integer> createAccumulator() {
            return new Tuple2<>(0.0,0);
        }
        @Override
        public Tuple2<Double, Integer> add(SensorReading sensorReading, Tuple2<Double, Integer> accumulator) {
            return new Tuple2<>(accumulator.f0+sensorReading.getTemperature(),accumulator.f1+1);
        }
        @Override
        public Double getResult(Tuple2<Double, Integer> accumulator) {
            return accumulator.f0 / accumulator.f1;
        }
        @Override
        public Tuple2<Double, Integer> merge(Tuple2<Double, Integer> a, Tuple2<Double, Integer> b) {
            return new Tuple2<>(a.f0+b.f0,a.f1+b.f1);
        }
    }
}

1.png

此图来源于网络,window API 总览



注意点:


在 flink中我们定义一个window,必须在 keyBy操作之后。

窗口函数之后一定要有聚合操作。


相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
8天前
|
存储 缓存 算法
分布式锁服务深度解析:以Apache Flink的Checkpointing机制为例
【10月更文挑战第7天】在分布式系统中,多个进程或节点可能需要同时访问和操作共享资源。为了确保数据的一致性和系统的稳定性,我们需要一种机制来协调这些进程或节点的访问,避免并发冲突和竞态条件。分布式锁服务正是为此而生的一种解决方案。它通过在网络环境中实现锁机制,确保同一时间只有一个进程或节点能够访问和操作共享资源。
25 3
|
8天前
|
XML JSON API
ServiceStack:不仅仅是一个高性能Web API和微服务框架,更是一站式解决方案——深入解析其多协议支持及简便开发流程,带您体验前所未有的.NET开发效率革命
【10月更文挑战第9天】ServiceStack 是一个高性能的 Web API 和微服务框架,支持 JSON、XML、CSV 等多种数据格式。它简化了 .NET 应用的开发流程,提供了直观的 RESTful 服务构建方式。ServiceStack 支持高并发请求和复杂业务逻辑,安装简单,通过 NuGet 包管理器即可快速集成。示例代码展示了如何创建一个返回当前日期的简单服务,包括定义请求和响应 DTO、实现服务逻辑、配置路由和宿主。ServiceStack 还支持 WebSocket、SignalR 等实时通信协议,具备自动验证、自动过滤器等丰富功能,适合快速搭建高性能、可扩展的服务端应用。
46 3
|
13天前
|
SQL 消息中间件 分布式计算
大数据-120 - Flink Window 窗口机制-滑动时间窗口、会话窗口-基于时间驱动&基于事件驱动
大数据-120 - Flink Window 窗口机制-滑动时间窗口、会话窗口-基于时间驱动&基于事件驱动
34 0
|
13天前
|
SQL 分布式计算 大数据
大数据-119 - Flink Window总览 窗口机制-滚动时间窗口-基于时间驱动&基于事件驱动
大数据-119 - Flink Window总览 窗口机制-滚动时间窗口-基于时间驱动&基于事件驱动
26 0
|
13天前
|
存储 分布式计算 API
大数据-107 Flink 基本概述 适用场景 框架特点 核心组成 生态发展 处理模型 组件架构
大数据-107 Flink 基本概述 适用场景 框架特点 核心组成 生态发展 处理模型 组件架构
43 0
|
1天前
|
存储 Java
深入探讨了Java集合框架中的HashSet和TreeSet,解析了两者在元素存储上的无序与有序特性。
【10月更文挑战第16天】本文深入探讨了Java集合框架中的HashSet和TreeSet,解析了两者在元素存储上的无序与有序特性。HashSet基于哈希表实现,添加元素时根据哈希值分布,遍历时顺序不可预测;而TreeSet利用红黑树结构,按自然顺序或自定义顺序存储元素,确保遍历时有序输出。文章还提供了示例代码,帮助读者更好地理解这两种集合类型的使用场景和内部机制。
9 3
|
14天前
|
人工智能 缓存 Java
深入解析Spring AI框架:在Java应用中实现智能化交互的关键
【10月更文挑战第12天】Spring AI 是 Spring 框架家族的新成员,旨在满足 Java 应用程序对人工智能集成的需求。它支持自然语言处理、图像识别等多种 AI 技术,并提供与云服务(如 OpenAI、Azure Cognitive Services)及本地模型的无缝集成。通过简单的配置和编码,开发者可轻松实现 AI 功能,同时应对模型切换、数据安全及性能优化等挑战。
|
13天前
|
存储
atoi函数解析以及自定义类型经典练习题
atoi函数解析以及自定义类型经典练习题
16 0
|
5天前
|
分布式计算 Java 应用服务中间件
NettyIO框架的深度技术解析与实战
【10月更文挑战第13天】Netty是一个异步事件驱动的网络应用程序框架,由JBOSS提供,现已成为Github上的独立项目。
15 0
|
12天前
|
数据挖掘 物联网 数据处理
深入探讨Apache Flink:实时数据流处理的强大框架
在数据驱动时代,企业需高效处理实时数据流。Apache Flink作为开源流处理框架,以其高性能和灵活性成为首选平台。本文详细介绍Flink的核心特性和应用场景,包括实时流处理、强大的状态管理、灵活的窗口机制及批处理兼容性。无论在实时数据分析、金融服务、物联网还是广告技术领域,Flink均展现出巨大潜力,是企业实时数据处理的理想选择。随着大数据需求增长,Flink将继续在数据处理领域发挥重要作用。
44 0

推荐镜像

更多