【Flink-API】之复习窗口Window

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 【Flink-API】之复习窗口Window

一、Flink时间


1.1 概念

20200922165717319.png


1.EventTime 时间创建的时间,时间戳描述。

2.Ingestion Time 数据进入到Flink的时间

3.Processing Time 是每一个执行操作的算子的本地系统时间,与机器相关,默认的时间属性就是Processing Time


二、Window简介


2.1 Streaming


这是一种无界的数据流,不断增长的数据流。


2.2 window


window 是将一个无线的stream逻辑上拆分成有限大小的 bucket 筒,进行操作。


三、Window类型


3.1 CountWindow-固定条数窗口


按照指定的数据条数生成一个window,与时间无关


3.1.1 CountWindow.java

public class CountWindow {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> lines = env.socketTextStream("192.168.52.200", 8888);
        //把传进来的数据String换成int类型
        SingleOutputStreamOperator<Integer> nums = lines.map(new MapFunction<String, Integer>() {
            @Override
            public Integer map(String value) throws Exception {
                return Integer.parseInt(value);
            }
        });
        //不分组 整体是一个组
        //输入5条后计算
        AllWindowedStream<Integer, GlobalWindow> window = nums.countWindowAll(5);
        //窗口中聚合
        SingleOutputStreamOperator<Integer> sumed = window.sum(0);
        sumed.print();
        env.execute();
    }
}

程序执行结果

20200923084659302.png


3.1.2 CountWindowGroup.java

public class CountWindowGroup {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //spark,3
        //hadoop,2
        DataStream<String> lines = env.socketTextStream("192.168.52.200", 8888);
        //分组后在进行划分
        SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = lines.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String value) throws Exception {
                String[] fields = value.split(",");
                String word = fields[0];
                Integer count = Integer.parseInt(fields[1]);
                return Tuple2.of(word, count);
            }
        });
        //1.先分组
        KeyedStream<Tuple2<String, Integer>, Tuple> keyed = wordAndOne.keyBy(0);
        //2.划分窗口
        WindowedStream<Tuple2<String, Integer>, Tuple, GlobalWindow> window = keyed.countWindow(5);
        //3.所有分组达到了条数才会执行 【5条数据全部拿到了,可以进行各种计算】
        SingleOutputStreamOperator<Tuple2<String, Integer>> summed = window.sum(1);
        summed.print();
        env.execute();
    }
}

执行结果下:

20200923085625162.png

3.2 Tumbing Window-滚动窗口

20200923083227889.png


特点:特订的步长,比如5S滑动一次。适合做BI统计等等。


3.2.1 TumblingWindowAll.java

public class TumblingWindowAll {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> lines = env.socketTextStream("192.168.52.200", 8888);
        //把传进来的数据String换成int类型
        SingleOutputStreamOperator<Integer> nums = lines.map(new MapFunction<String, Integer>() {
            @Override
            public Integer map(String value) throws Exception {
                return Integer.parseInt(value);
            }
        });
        //不分组 5s 中聚合一次
        AllWindowedStream<Integer, TimeWindow> window = nums.timeWindowAll(Time.seconds(5));
        //窗口中聚合
        SingleOutputStreamOperator<Integer> sumed = window.sum(0);
        sumed.print();
        env.execute();
    }
}

3.2.2 TumblingWindowGroup.java

public class TumblingWindowGroup {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //spark,3
        //hadoop,2
        DataStream<String> lines = env.socketTextStream("192.168.52.200", 8888);
        //分组后在进行划分
        SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = lines.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String value) throws Exception {
                String[] fields = value.split(",");
                String word = fields[0];
                Integer count = Integer.parseInt(fields[1]);
                return Tuple2.of(word, count);
            }
        });
        //先分组
        KeyedStream<Tuple2<String, Integer>, Tuple> keyed = wordAndOne.keyBy(0);
        //划分窗口
        WindowedStream<Tuple2<String, Integer>, Tuple, TimeWindow> window = keyed.timeWindow(Time.seconds(5));
        //聚合
        SingleOutputStreamOperator<Tuple2<String, Integer>> summed = window.sum(1);
        summed.print();
        env.execute();
    }
}

3.3 Sliging Window-滑动窗口

20200923083542491.png


滑动窗口的大小可以与步长不等大小。窗口固定长度,有重叠。时间对齐。

特点:算趋势。


3.3.1 SlidingWindow.java

public class SlidingWindow {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> lines = env.socketTextStream("192.168.52.200", 8888);
        //把传进来的数据String换成int类型
        SingleOutputStreamOperator<Integer> nums = lines.map(new MapFunction<String, Integer>() {
            @Override
            public Integer map(String value) throws Exception {
                return Integer.parseInt(value);
            }
        });
        //不分组 整体是一个组
        //窗口的长度为10s 5s中滑动一次
        AllWindowedStream<Integer, TimeWindow> window = nums.timeWindowAll(Time.seconds(10), Time.seconds(5));
        //窗口中聚合
        SingleOutputStreamOperator<Integer> sumed = window.sum(0);
        sumed.print();
        env.execute();
    }
}

执行结果

20200923091307889.png

3.4 Session Window-会话窗口

202009230836165.png


按照指定的时间间隔划分一个窗口。


四、WaterMark


1.它是window延迟触发的机制

2.watermark >= 上一个窗口的结束边界就会触发窗口执行

3.watermark = 数据锁携带的时间【窗口的最大时间】- 延迟执行的时间

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
目录
相关文章
|
5天前
|
自然语言处理 监控 数据挖掘
【Flink】Flink中的窗口分析
【4月更文挑战第19天】【Flink】Flink中的窗口分析
|
5天前
|
流计算 Windows
Flink窗口与状态编程开发(一)
Flink窗口与状态编程开发(一)
|
5天前
|
流计算
Flink窗口——window
Flink窗口——window
24 0
|
7月前
|
消息中间件 程序员 API
Flink中时间和窗口
Flink中时间和窗口
102 0
|
4天前
|
SQL API 数据处理
实时计算 Flink版产品使用合集之如果一个窗口区间没有数据,若不会开窗就没法使用triggers赋默认值
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
15 1
|
7月前
|
程序员 API 数据安全/隐私保护
Flink--8、时间语义、水位线(事件和窗口、水位线和窗口的工作原理、生产水位线、水位线的传递、迟到数据的处理)
Flink--8、时间语义、水位线(事件和窗口、水位线和窗口的工作原理、生产水位线、水位线的传递、迟到数据的处理)
|
3天前
|
SQL 关系型数据库 MySQL
实时计算 Flink版产品使用合集之如果窗口中没有数据,但是想要在UDAF中输出一个默认值,该怎么操作
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
15 0
Qt 窗口常用位置API函数 & 绘图原理 & 双缓冲机制 总结
Qt 窗口常用位置API函数 & 绘图原理 & 双缓冲机制 总结
|
5天前
|
BI API 流计算
[实时流基础 flink] 窗口
[实时流基础 flink] 窗口
|
5天前
|
BI 数据处理 Apache
[AIGC] 深入理解Flink中的窗口、水位线和定时器
[AIGC] 深入理解Flink中的窗口、水位线和定时器

热门文章

最新文章