关于Flink框架窗口(window)函数最全解析

简介: 在真实的场景中数据流往往都是没有界限的,无休止的,就像是一个通道中水流持续不断地通过管道流向别处,这样显然是无法进行处理、计算的,那如何可以将没有界限的数据进行处理呢?我们可以将这些无界限的数据流进行切割、拆分,将其得到一个有界限的数据集合然后进行处理、计算就方便多了。Flink中窗口(Window)就是来处理无界限的数据流的,将无线的数据流切割成为有限流,然后将切割后的有限流数据分发到指定有限大小的桶中进行分析计算。

概述


在真实的场景中数据流往往都是没有界限的,无休止的,就像是一个通道中水流持续不断地通过管道流向别处,这样显然是无法进行处理、计算的,那如何可以将没有界限的数据进行处理呢?我们可以将这些无界限的数据流进行切割、拆分,将其得到一个有界限的数据集合然后进行处理、计算就方便多了。


Flink中窗口(Window)就是来处理无界限的数据流的,将无线的数据流切割成为有限流,然后将切割后的有限流数据分发到指定有限大小的桶中进行分析计算。

1.png


窗口类型


Flink中的窗口类型有两种:时间窗口(Time Window)、计数窗口(Count Window)。时间窗口中又包含了:滚动时间窗口(Tumbling Window)、滑动时间窗口(Sliding Window)、会话窗口(Session Window)。计数窗口包含了:滚动计数窗口和滑动计数窗口。


滚动窗口(Tumbling Windows)


以时间窗口为例(计数窗口类似),滚动窗口就是按照固定的时间间隔将数据进行切分。

特点就是时间比较对齐、窗口的长度都是固定的且没有重叠。

滚动窗口分配器将每个元素分配到一个指定窗口大小的窗口中,滚动窗口有一个固定的大小,并且不会出现重叠。


换句话说:如果制定了一个30分钟时间间隔的滚动窗口,然后就会将无界限的数据以30分钟为一个窗口期进行切割成有限的数据集合。

适用场景:做统计计算。做每个时间段的聚合计算。

1.png


滑动窗口(Sliding Windows)


以时间窗口为例(计数窗口类似),滑动窗口是固定窗口的另一种形式,滑动窗口由固定的窗口长度和滑动间隔组成。


窗口长度是固定的,窗口之间是可以重叠的。


说明:滑动窗口分配器将元素分配到固定长度的窗口中,与滚动窗口类似,窗口的大小由窗口大小参数来配置,另一个窗口滑动参数控制滑动窗口开始的频率。因此,滑动窗口如果滑动参数小于窗口大小的话,窗口是可以重叠的,在这种情况下元素会被分配到多个窗口中。


适用场景:(求某接口最近 5min 的失败率来决定是否要报警)对最近一个时间段内的统计。

2.png


会话窗口(Session Windows)


会话敞口只存在于时间窗口,计数窗口无会话窗口。

特点是时间无对齐

由一系列事件组合一个指定时间长度的 timeout 间隙组成,类似于 web 应用的session,也就是一段时间没有接收到新数据就会生成新的窗口。


当它在一个固定的时间周期内不再收到元素,即非活动间隔产生,那个这个窗口就会关闭。一个 session 窗口通过一个 session 间隔来配置,这个 session 间隔定义了非活跃周期的长度,当这个非活跃周期产生,那么当前的 session 将关闭并且后续的元素将被分配到新的 session 窗口中去

3.png



Window API使用


窗口分配器window()


在flink中可以用 .window() 来定义一个窗口,然后基于这个 window 去做一些聚合或者其它处理操作。注意 window () 方法必须在 keyBy 之后才能用。window() 方法接收的输入参数是一个 WindowAssigner WindowAssigner 负责将每条输入的数据分发到正确的 window 中。Flink 提供了通用的 WindowAssigner:滚动窗口(tumbling window)、滑动窗口(sliding window)、 会话窗口(session window)、全局窗口(global window)


代码如下:


public class WindowTest1_TimeWindow {
    public static void main(String[] args) throws Exception {
        //构建流处理环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //设施并行度为1
        env.setParallelism(1);
        DataStreamSource<String> inputStream = env.socketTextStream("localhost", 7777);
        DataStream<SensorReading> dataStream = inputStream.map(line -> {
            String[] fields = line.split(",");
            return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
        });
        //开窗测试 指定窗口分配器
        DataStream<Integer> resultStream = dataStream.keyBy("id")
                //设置一个15秒的一个滚动窗口
                .window(TumblingProcessingTimeWindows.of(Time.seconds(15)));
                //会话窗口
                //.window(ProcessingTimeSessionWindows.withGap(Time.seconds(15)))
                //滑动窗口
                //.window(SlidingProcessingTimeWindows.of(Time.seconds(20),Time.seconds(10)))
        env.execute();
    }
}

Flink 提供了更加简单的 .timeWindow 和 .countWindow 方法,用于定义时间窗口和计数窗口.


TimeWindow


TimeWindow 是将指定时间范围内的所有数据组成一个 window,一次对一个window 里面的所有数据进行计算。

时间间隔可以通过 Time.milliseconds(x),Time.seconds(x),Time.minutes(x)等其中的一个来指定。


CountWindow


CountWindow 根据窗口中相同 key 元素的数量来触发执行,执行时只计算元素数量达到窗口大小的 key 对应的结果。


CountWindow 的 window_size 指的是相同 Key 的元素的个数,不是输入的所有元素的总数。


创建不同类型的窗口


滚动时间窗口(tumbling time window)


.timeWindow(Time.seconds(15))

滑动时间窗口(sliding time window)


下面代码中的 sliding_size 设置为了 5s,也就是说,每 5s 就计算输出结果一次,每一次计算的 window 范围是 15s 内的所有元素。

.timeWindow(Time.seconds(15),Time.seconds(5))


会话窗口(session window)


.window(ProcessingTimeSessionWindows.withGap(Time.seconds(15)))

滚动计数窗口(tumbling count window)


默认的 CountWindow 是一个滚动窗口,只需要指定窗口大小即可,当元素数量达到窗口大小时,就会触发窗口的执行。

.countWindow(10)


滑动计数窗口(sliding count window)


下面代码中的 sliding_size 设置为了 2,也就是说,每收到两个相同 key 的数据就计算一次,每一次计算的 window 范围是 10 个元素。



.countWindow(10,2)

窗口函数


Flink中定义了要对窗口中收集的数据做的计算操作,主要可以分为两类:增量聚合函数、全窗口函数。


增量聚合函数:每条数据到来就进行计算,先保持着一个状态,聚合函数有ReduceFunction AggregateFunction。

1.png

案例代码:


public class WindowTest1_TimeWindow {
    public static void main(String[] args) throws Exception {
        //构建流处理环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //设施并行度为1
        env.setParallelism(1);
        DataStreamSource<String> inputStream = env.socketTextStream("localhost", 7777);
        DataStream<SensorReading> dataStream = inputStream.map(line -> {
            String[] fields = line.split(",");
            return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
        });
        //开窗测试 指定窗口分配器
        DataStream<Integer> resultStream = dataStream.keyBy("id")
                //对窗口进行聚合操作 增量窗口操作
                .timeWindow(Time.seconds(15))
                .aggregate(new AggregateFunction<SensorReading, Integer, Integer>() {
                    @Override
                    //创建累加器
                    public Integer createAccumulator() {
                        return 0;
                    }
                    @Override
                    public Integer add(SensorReading sensorReading, Integer accumulator) {
                        return accumulator+1;
                    }
                    @Override
                    public Integer getResult(Integer accumulator) {
                        return accumulator;
                    }
                    @Override
                    public Integer merge(Integer integer, Integer acc1) {
                        return null;
                    }
                });
        resultStream.print();
        env.execute();
    }
}

本地测试:结果输出成功。


1.png

全窗口函数:先把窗口所有数据收集起来,等到计算的时候会遍历所有数据。对应的函数:ProcessWindowFunction,WindowFunction

1.png


案例代码:


public class WindowTest1_TimeWindow {
    public static void main(String[] args) throws Exception {
        //构建流处理环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //设施并行度为1
        env.setParallelism(1);
        DataStreamSource<String> inputStream = env.socketTextStream("localhost", 7777);
        DataStream<SensorReading> dataStream = inputStream.map(line -> {
            String[] fields = line.split(",");
            return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
        });
          //全量窗口函数
        DataStream<Tuple3<String,Long,Integer>> resultStream = dataStream.keyBy("id")
                .timeWindow(Time.seconds(15))
                .apply(new WindowFunction<SensorReading, Tuple3<String,Long,Integer>, Tuple, TimeWindow>() {
                    @Override
                    public void apply(Tuple tuple, TimeWindow window, Iterable<SensorReading> input, Collector<Tuple3<String,Long,Integer>> out) throws Exception {
                        String id =tuple.getField(0);
                        Long windowEnd =window.getEnd();
                        Integer count = IteratorUtils.toList(input.iterator()).size();
                        out.collect(new Tuple3<>(id,windowEnd,count));
                    }
                });
        resultStream.print();
        env.execute();
    }
}

本地运行测试:结果输出成果

1.png


如何在winodws操作系统下使用nc命令进行代码测试:在Windows操作系统中怎样使用nc命令


其他API


触发器:.trigger() 定义 window 什么时候关闭,触发计算并输出结果

移除器:.evitor() 定义移除某些数据的逻辑

.allowedLateness() 允许处理迟到的数据

.sideOutputLateData() 将迟到的数据放入侧输出流

.getSideOutput()获取侧输出流


案例代码:


public class WindowTest2_CountWindow {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<String> inputStream = env.socketTextStream("localhost", 7777);
        DataStream<SensorReading> dataStream = inputStream.map(line -> {
            String[] field = line.split(",");
            return new SensorReading(field[0], new Long(field[1]), new Double(field[2]));
        });
        // 开计数窗口测试
        DataStream<Double> resultStream = dataStream.keyBy("id")
                .countWindow(10, 2)
                .aggregate(new MyAvgTemp());
        //其他可选API 对迟到数据的处理方式
        OutputTag<SensorReading> outputTag = new OutputTag<SensorReading>("iate") {};
        SingleOutputStreamOperator<SensorReading> sumStream = dataStream.keyBy("id")
                .timeWindow(Time.seconds(15))
                //.trigger()
                //.evictor()
                .allowedLateness(Time.minutes(1))
                //输出到测流
                .sideOutputLateData(outputTag)
                .sum("temperature");
       sumStream.getSideOutput(outputTag).print("late");
        resultStream.print();
        env.execute();
    }
    public static class MyAvgTemp implements AggregateFunction<SensorReading, Tuple2<Double,Integer>,Double>{
        @Override
        public Tuple2<Double, Integer> createAccumulator() {
            return new Tuple2<>(0.0,0);
        }
        @Override
        public Tuple2<Double, Integer> add(SensorReading sensorReading, Tuple2<Double, Integer> accumulator) {
            return new Tuple2<>(accumulator.f0+sensorReading.getTemperature(),accumulator.f1+1);
        }
        @Override
        public Double getResult(Tuple2<Double, Integer> accumulator) {
            return accumulator.f0 / accumulator.f1;
        }
        @Override
        public Tuple2<Double, Integer> merge(Tuple2<Double, Integer> a, Tuple2<Double, Integer> b) {
            return new Tuple2<>(a.f0+b.f0,a.f1+b.f1);
        }
    }
}

1.png

此图来源于网络,window API 总览



注意点:


在 flink中我们定义一个window,必须在 keyBy操作之后。

窗口函数之后一定要有聚合操作。


相关实践学习
基于Hologres+Flink搭建GitHub实时数据大屏
通过使用Flink、Hologres构建实时数仓,并通过Hologres对接BI分析工具(以DataV为例),实现海量数据实时分析.
实时计算 Flink 实战课程
如何使用实时计算 Flink 搞定数据处理难题?实时计算 Flink 极客训练营产品、技术专家齐上阵,从开源 Flink功能介绍到实时计算 Flink 优势详解,现场实操,5天即可上手! 欢迎开通实时计算 Flink 版: https://cn.aliyun.com/product/bigdata/sc Flink Forward Asia 介绍: Flink Forward 是由 Apache 官方授权,Apache Flink Community China 支持的会议,通过参会不仅可以了解到 Flink 社区的最新动态和发展计划,还可以了解到国内外一线大厂围绕 Flink 生态的生产实践经验,是 Flink 开发者和使用者不可错过的盛会。 去年经过品牌升级后的 Flink Forward Asia 吸引了超过2000人线下参与,一举成为国内最大的 Apache 顶级项目会议。结合2020年的特殊情况,Flink Forward Asia 2020 将在12月26日以线上峰会的形式与大家见面。
相关文章
|
SQL 数据挖掘 测试技术
南大通用GBase8s数据库:LISTAGG函数的解析
南大通用GBase8s数据库:LISTAGG函数的解析
|
存储 Java
深入探讨了Java集合框架中的HashSet和TreeSet,解析了两者在元素存储上的无序与有序特性。
【10月更文挑战第16天】本文深入探讨了Java集合框架中的HashSet和TreeSet,解析了两者在元素存储上的无序与有序特性。HashSet基于哈希表实现,添加元素时根据哈希值分布,遍历时顺序不可预测;而TreeSet利用红黑树结构,按自然顺序或自定义顺序存储元素,确保遍历时有序输出。文章还提供了示例代码,帮助读者更好地理解这两种集合类型的使用场景和内部机制。
217 3
|
人工智能 API 开发者
HarmonyOS Next~鸿蒙应用框架开发实战:Ability Kit与Accessibility Kit深度解析
本书深入解析HarmonyOS应用框架开发,聚焦Ability Kit与Accessibility Kit两大核心组件。Ability Kit通过FA/PA双引擎架构实现跨设备协同,支持分布式能力开发;Accessibility Kit提供无障碍服务构建方案,优化用户体验。内容涵盖设计理念、实践案例、调试优化及未来演进方向,助力开发者打造高效、包容的分布式应用,体现HarmonyOS生态价值。
810 27
|
人工智能 自然语言处理 搜索推荐
ViDoRAG:开源多模态文档检索框架,多智能体推理+图文理解精准解析文档
ViDoRAG 是阿里巴巴通义实验室联合中国科学技术大学和上海交通大学推出的视觉文档检索增强生成框架,基于多智能体协作和动态迭代推理,显著提升复杂视觉文档的检索和生成效率。
979 8
ViDoRAG:开源多模态文档检索框架,多智能体推理+图文理解精准解析文档
|
机器学习/深度学习 人工智能 Java
Java机器学习实战:基于DJL框架的手写数字识别全解析
在人工智能蓬勃发展的今天,Python凭借丰富的生态库(如TensorFlow、PyTorch)成为AI开发的首选语言。但Java作为企业级应用的基石,其在生产环境部署、性能优化和工程化方面的优势不容忽视。DJL(Deep Java Library)的出现完美填补了Java在深度学习领域的空白,它提供了一套统一的API,允许开发者无缝对接主流深度学习框架,将AI模型高效部署到Java生态中。本文将通过手写数字识别的完整流程,深入解析DJL框架的核心机制与应用实践。
839 3
|
缓存 监控 数据处理
Flink 四大基石之窗口(Window)使用详解
在流处理场景中,窗口(Window)用于将无限数据流切分成有限大小的“块”,以便进行计算。Flink 提供了多种窗口类型,如时间窗口(滚动、滑动、会话)和计数窗口,通过窗口大小、滑动步长和偏移量等属性控制数据切分。窗口函数包括增量聚合函数、全窗口函数和ProcessWindowFunction,支持灵活的数据处理。应用案例展示了如何使用窗口进行实时流量统计和电商销售分析。
2432 28
|
设计模式 XML Java
【23种设计模式·全精解析 | 自定义Spring框架篇】Spring核心源码分析+自定义Spring的IOC功能,依赖注入功能
本文详细介绍了Spring框架的核心功能,并通过手写自定义Spring框架的方式,深入理解了Spring的IOC(控制反转)和DI(依赖注入)功能,并且学会实际运用设计模式到真实开发中。
【23种设计模式·全精解析 | 自定义Spring框架篇】Spring核心源码分析+自定义Spring的IOC功能,依赖注入功能
|
数据处理 数据安全/隐私保护 流计算
Flink 三种时间窗口、窗口处理函数使用及案例
Flink 是处理无界数据流的强大工具,提供了丰富的窗口机制。本文介绍了三种时间窗口(滚动窗口、滑动窗口和会话窗口)及其使用方法,包括时间窗口的概念、窗口处理函数的使用和实际案例。通过这些机制,可以灵活地对数据流进行分析和计算,满足不同的业务需求。
1831 27
|
存储 物联网 大数据
探索阿里云 Flink 物化表:原理、优势与应用场景全解析
阿里云Flink的物化表是流批一体化平台中的关键特性,支持低延迟实时更新、灵活查询性能、无缝流批处理和高容错性。它广泛应用于电商、物联网和金融等领域,助力企业高效处理实时数据,提升业务决策能力。实践案例表明,物化表显著提高了交易欺诈损失率的控制和信贷审批效率,推动企业在数字化转型中取得竞争优势。
582 16
|
C语言 开发者
【C语言】断言函数 -《深入解析C语言调试利器 !》
断言(assert)是一种调试工具,用于在程序运行时检查某些条件是否成立。如果条件不成立,断言会触发错误,并通常会终止程序的执行。断言有助于在开发和测试阶段捕捉逻辑错误。
449 5

热门文章

最新文章

推荐镜像

更多
  • DNS
  • 下一篇
    开通oss服务