关于Flink框架窗口(window)函数最全解析

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 在真实的场景中数据流往往都是没有界限的,无休止的,就像是一个通道中水流持续不断地通过管道流向别处,这样显然是无法进行处理、计算的,那如何可以将没有界限的数据进行处理呢?我们可以将这些无界限的数据流进行切割、拆分,将其得到一个有界限的数据集合然后进行处理、计算就方便多了。Flink中窗口(Window)就是来处理无界限的数据流的,将无线的数据流切割成为有限流,然后将切割后的有限流数据分发到指定有限大小的桶中进行分析计算。

概述


在真实的场景中数据流往往都是没有界限的,无休止的,就像是一个通道中水流持续不断地通过管道流向别处,这样显然是无法进行处理、计算的,那如何可以将没有界限的数据进行处理呢?我们可以将这些无界限的数据流进行切割、拆分,将其得到一个有界限的数据集合然后进行处理、计算就方便多了。


Flink中窗口(Window)就是来处理无界限的数据流的,将无线的数据流切割成为有限流,然后将切割后的有限流数据分发到指定有限大小的桶中进行分析计算。

1.png


窗口类型


Flink中的窗口类型有两种:时间窗口(Time Window)、计数窗口(Count Window)。时间窗口中又包含了:滚动时间窗口(Tumbling Window)、滑动时间窗口(Sliding Window)、会话窗口(Session Window)。计数窗口包含了:滚动计数窗口和滑动计数窗口。


滚动窗口(Tumbling Windows)


以时间窗口为例(计数窗口类似),滚动窗口就是按照固定的时间间隔将数据进行切分。

特点就是时间比较对齐、窗口的长度都是固定的且没有重叠。

滚动窗口分配器将每个元素分配到一个指定窗口大小的窗口中,滚动窗口有一个固定的大小,并且不会出现重叠。


换句话说:如果制定了一个30分钟时间间隔的滚动窗口,然后就会将无界限的数据以30分钟为一个窗口期进行切割成有限的数据集合。

适用场景:做统计计算。做每个时间段的聚合计算。

1.png


滑动窗口(Sliding Windows)


以时间窗口为例(计数窗口类似),滑动窗口是固定窗口的另一种形式,滑动窗口由固定的窗口长度和滑动间隔组成。


窗口长度是固定的,窗口之间是可以重叠的。


说明:滑动窗口分配器将元素分配到固定长度的窗口中,与滚动窗口类似,窗口的大小由窗口大小参数来配置,另一个窗口滑动参数控制滑动窗口开始的频率。因此,滑动窗口如果滑动参数小于窗口大小的话,窗口是可以重叠的,在这种情况下元素会被分配到多个窗口中。


适用场景:(求某接口最近 5min 的失败率来决定是否要报警)对最近一个时间段内的统计。

2.png


会话窗口(Session Windows)


会话敞口只存在于时间窗口,计数窗口无会话窗口。

特点是时间无对齐

由一系列事件组合一个指定时间长度的 timeout 间隙组成,类似于 web 应用的session,也就是一段时间没有接收到新数据就会生成新的窗口。


当它在一个固定的时间周期内不再收到元素,即非活动间隔产生,那个这个窗口就会关闭。一个 session 窗口通过一个 session 间隔来配置,这个 session 间隔定义了非活跃周期的长度,当这个非活跃周期产生,那么当前的 session 将关闭并且后续的元素将被分配到新的 session 窗口中去

3.png



Window API使用


窗口分配器window()


在flink中可以用 .window() 来定义一个窗口,然后基于这个 window 去做一些聚合或者其它处理操作。注意 window () 方法必须在 keyBy 之后才能用。window() 方法接收的输入参数是一个 WindowAssigner WindowAssigner 负责将每条输入的数据分发到正确的 window 中。Flink 提供了通用的 WindowAssigner:滚动窗口(tumbling window)、滑动窗口(sliding window)、 会话窗口(session window)、全局窗口(global window)


代码如下:


public class WindowTest1_TimeWindow {
    public static void main(String[] args) throws Exception {
        //构建流处理环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //设施并行度为1
        env.setParallelism(1);
        DataStreamSource<String> inputStream = env.socketTextStream("localhost", 7777);
        DataStream<SensorReading> dataStream = inputStream.map(line -> {
            String[] fields = line.split(",");
            return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
        });
        //开窗测试 指定窗口分配器
        DataStream<Integer> resultStream = dataStream.keyBy("id")
                //设置一个15秒的一个滚动窗口
                .window(TumblingProcessingTimeWindows.of(Time.seconds(15)));
                //会话窗口
                //.window(ProcessingTimeSessionWindows.withGap(Time.seconds(15)))
                //滑动窗口
                //.window(SlidingProcessingTimeWindows.of(Time.seconds(20),Time.seconds(10)))
        env.execute();
    }
}

Flink 提供了更加简单的 .timeWindow 和 .countWindow 方法,用于定义时间窗口和计数窗口.


TimeWindow


TimeWindow 是将指定时间范围内的所有数据组成一个 window,一次对一个window 里面的所有数据进行计算。

时间间隔可以通过 Time.milliseconds(x),Time.seconds(x),Time.minutes(x)等其中的一个来指定。


CountWindow


CountWindow 根据窗口中相同 key 元素的数量来触发执行,执行时只计算元素数量达到窗口大小的 key 对应的结果。


CountWindow 的 window_size 指的是相同 Key 的元素的个数,不是输入的所有元素的总数。


创建不同类型的窗口


滚动时间窗口(tumbling time window)


.timeWindow(Time.seconds(15))

滑动时间窗口(sliding time window)


下面代码中的 sliding_size 设置为了 5s,也就是说,每 5s 就计算输出结果一次,每一次计算的 window 范围是 15s 内的所有元素。

.timeWindow(Time.seconds(15),Time.seconds(5))


会话窗口(session window)


.window(ProcessingTimeSessionWindows.withGap(Time.seconds(15)))

滚动计数窗口(tumbling count window)


默认的 CountWindow 是一个滚动窗口,只需要指定窗口大小即可,当元素数量达到窗口大小时,就会触发窗口的执行。

.countWindow(10)


滑动计数窗口(sliding count window)


下面代码中的 sliding_size 设置为了 2,也就是说,每收到两个相同 key 的数据就计算一次,每一次计算的 window 范围是 10 个元素。



.countWindow(10,2)

窗口函数


Flink中定义了要对窗口中收集的数据做的计算操作,主要可以分为两类:增量聚合函数、全窗口函数。


增量聚合函数:每条数据到来就进行计算,先保持着一个状态,聚合函数有ReduceFunction AggregateFunction。

1.png

案例代码:


public class WindowTest1_TimeWindow {
    public static void main(String[] args) throws Exception {
        //构建流处理环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //设施并行度为1
        env.setParallelism(1);
        DataStreamSource<String> inputStream = env.socketTextStream("localhost", 7777);
        DataStream<SensorReading> dataStream = inputStream.map(line -> {
            String[] fields = line.split(",");
            return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
        });
        //开窗测试 指定窗口分配器
        DataStream<Integer> resultStream = dataStream.keyBy("id")
                //对窗口进行聚合操作 增量窗口操作
                .timeWindow(Time.seconds(15))
                .aggregate(new AggregateFunction<SensorReading, Integer, Integer>() {
                    @Override
                    //创建累加器
                    public Integer createAccumulator() {
                        return 0;
                    }
                    @Override
                    public Integer add(SensorReading sensorReading, Integer accumulator) {
                        return accumulator+1;
                    }
                    @Override
                    public Integer getResult(Integer accumulator) {
                        return accumulator;
                    }
                    @Override
                    public Integer merge(Integer integer, Integer acc1) {
                        return null;
                    }
                });
        resultStream.print();
        env.execute();
    }
}

本地测试:结果输出成功。


1.png

全窗口函数:先把窗口所有数据收集起来,等到计算的时候会遍历所有数据。对应的函数:ProcessWindowFunction,WindowFunction

1.png


案例代码:


public class WindowTest1_TimeWindow {
    public static void main(String[] args) throws Exception {
        //构建流处理环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //设施并行度为1
        env.setParallelism(1);
        DataStreamSource<String> inputStream = env.socketTextStream("localhost", 7777);
        DataStream<SensorReading> dataStream = inputStream.map(line -> {
            String[] fields = line.split(",");
            return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
        });
          //全量窗口函数
        DataStream<Tuple3<String,Long,Integer>> resultStream = dataStream.keyBy("id")
                .timeWindow(Time.seconds(15))
                .apply(new WindowFunction<SensorReading, Tuple3<String,Long,Integer>, Tuple, TimeWindow>() {
                    @Override
                    public void apply(Tuple tuple, TimeWindow window, Iterable<SensorReading> input, Collector<Tuple3<String,Long,Integer>> out) throws Exception {
                        String id =tuple.getField(0);
                        Long windowEnd =window.getEnd();
                        Integer count = IteratorUtils.toList(input.iterator()).size();
                        out.collect(new Tuple3<>(id,windowEnd,count));
                    }
                });
        resultStream.print();
        env.execute();
    }
}

本地运行测试:结果输出成果

1.png


如何在winodws操作系统下使用nc命令进行代码测试:在Windows操作系统中怎样使用nc命令


其他API


触发器:.trigger() 定义 window 什么时候关闭,触发计算并输出结果

移除器:.evitor() 定义移除某些数据的逻辑

.allowedLateness() 允许处理迟到的数据

.sideOutputLateData() 将迟到的数据放入侧输出流

.getSideOutput()获取侧输出流


案例代码:


public class WindowTest2_CountWindow {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<String> inputStream = env.socketTextStream("localhost", 7777);
        DataStream<SensorReading> dataStream = inputStream.map(line -> {
            String[] field = line.split(",");
            return new SensorReading(field[0], new Long(field[1]), new Double(field[2]));
        });
        // 开计数窗口测试
        DataStream<Double> resultStream = dataStream.keyBy("id")
                .countWindow(10, 2)
                .aggregate(new MyAvgTemp());
        //其他可选API 对迟到数据的处理方式
        OutputTag<SensorReading> outputTag = new OutputTag<SensorReading>("iate") {};
        SingleOutputStreamOperator<SensorReading> sumStream = dataStream.keyBy("id")
                .timeWindow(Time.seconds(15))
                //.trigger()
                //.evictor()
                .allowedLateness(Time.minutes(1))
                //输出到测流
                .sideOutputLateData(outputTag)
                .sum("temperature");
       sumStream.getSideOutput(outputTag).print("late");
        resultStream.print();
        env.execute();
    }
    public static class MyAvgTemp implements AggregateFunction<SensorReading, Tuple2<Double,Integer>,Double>{
        @Override
        public Tuple2<Double, Integer> createAccumulator() {
            return new Tuple2<>(0.0,0);
        }
        @Override
        public Tuple2<Double, Integer> add(SensorReading sensorReading, Tuple2<Double, Integer> accumulator) {
            return new Tuple2<>(accumulator.f0+sensorReading.getTemperature(),accumulator.f1+1);
        }
        @Override
        public Double getResult(Tuple2<Double, Integer> accumulator) {
            return accumulator.f0 / accumulator.f1;
        }
        @Override
        public Tuple2<Double, Integer> merge(Tuple2<Double, Integer> a, Tuple2<Double, Integer> b) {
            return new Tuple2<>(a.f0+b.f0,a.f1+b.f1);
        }
    }
}

1.png

此图来源于网络,window API 总览



注意点:


在 flink中我们定义一个window,必须在 keyBy操作之后。

窗口函数之后一定要有聚合操作。


相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
7天前
|
自然语言处理 监控 数据挖掘
【Flink】Flink中的窗口分析
【4月更文挑战第19天】【Flink】Flink中的窗口分析
|
1月前
|
算法 Linux C++
【Linux系统编程】深入解析Linux中read函数的错误场景
【Linux系统编程】深入解析Linux中read函数的错误场景
205 0
|
1月前
|
存储 并行计算 前端开发
【C++ 函数 基础教程 第五篇】C++深度解析:函数包裹与异步计算的艺术(二)
【C++ 函数 基础教程 第五篇】C++深度解析:函数包裹与异步计算的艺术
39 1
|
1月前
|
数据安全/隐私保护 C++ 容器
【C++ 函数 基础教程 第五篇】C++深度解析:函数包裹与异步计算的艺术(一)
【C++ 函数 基础教程 第五篇】C++深度解析:函数包裹与异步计算的艺术
47 0
|
1月前
|
存储 监控 Linux
【Linux IO多路复用 】 Linux下select函数全解析:驾驭I-O复用的高效之道
【Linux IO多路复用 】 Linux下select函数全解析:驾驭I-O复用的高效之道
54 0
|
3天前
|
BI API 流计算
[实时流基础 flink] 窗口
[实时流基础 flink] 窗口
|
13天前
|
运维 监控 Java
面经:Storm实时计算框架原理与应用场景
【4月更文挑战第11天】本文是关于Apache Storm实时流处理框架的面试攻略和核心原理解析。文章分享了面试常见主题,包括Storm的架构与核心概念(如Spout、Bolt、Topology、Tuple和Ack机制),编程模型与API,部署与运维,以及应用场景与最佳实践。通过代码示例展示了如何构建一个简单的WordCountTopology,强调理解和运用Storm的关键知识点对于面试和实际工作的重要性。
27 4
面经:Storm实时计算框架原理与应用场景
|
14天前
|
SQL API 数据库
Python中的SQLAlchemy框架:深度解析与实战应用
【4月更文挑战第13天】在Python的众多ORM(对象关系映射)框架中,SQLAlchemy以其功能强大、灵活性和易扩展性脱颖而出,成为许多开发者首选的数据库操作工具。本文将深入探讨SQLAlchemy的核心概念、功能特点以及实战应用,帮助读者更好地理解和使用这一框架。
|
16天前
|
机器学习/深度学习 分布式计算 BI
Flink实时流处理框架原理与应用:面试经验与必备知识点解析
【4月更文挑战第9天】本文详尽探讨了Flink实时流处理框架的原理,包括运行时架构、数据流模型、状态管理和容错机制、资源调度与优化以及与外部系统的集成。此外,还介绍了Flink在实时数据管道、分析、数仓与BI、机器学习等领域的应用实践。同时,文章提供了面试经验与常见问题解析,如Flink与其他系统的对比、实际项目挑战及解决方案,并展望了Flink的未来发展趋势。附带Java DataStream API代码样例,为学习和面试准备提供了实用素材。
39 0
|
30天前
|
C语言
字符函数和字符串函数解析及模拟实现
字符函数和字符串函数解析及模拟实现
48 0

推荐镜像

更多