【Flink-API】之复习窗口Window

本文涉及的产品
实时计算 Flink 版,1000CU*H 3个月
简介: 【Flink-API】之复习窗口Window

一、Flink时间


1.1 概念

20200922165717319.png


1.EventTime 时间创建的时间,时间戳描述。

2.Ingestion Time 数据进入到Flink的时间

3.Processing Time 是每一个执行操作的算子的本地系统时间,与机器相关,默认的时间属性就是Processing Time


二、Window简介


2.1 Streaming


这是一种无界的数据流,不断增长的数据流。


2.2 window


window 是将一个无线的stream逻辑上拆分成有限大小的 bucket 筒,进行操作。


三、Window类型


3.1 CountWindow-固定条数窗口


按照指定的数据条数生成一个window,与时间无关


3.1.1 CountWindow.java

public class CountWindow {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> lines = env.socketTextStream("192.168.52.200", 8888);
        //把传进来的数据String换成int类型
        SingleOutputStreamOperator<Integer> nums = lines.map(new MapFunction<String, Integer>() {
            @Override
            public Integer map(String value) throws Exception {
                return Integer.parseInt(value);
            }
        });
        //不分组 整体是一个组
        //输入5条后计算
        AllWindowedStream<Integer, GlobalWindow> window = nums.countWindowAll(5);
        //窗口中聚合
        SingleOutputStreamOperator<Integer> sumed = window.sum(0);
        sumed.print();
        env.execute();
    }
}

程序执行结果

20200923084659302.png


3.1.2 CountWindowGroup.java

public class CountWindowGroup {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //spark,3
        //hadoop,2
        DataStream<String> lines = env.socketTextStream("192.168.52.200", 8888);
        //分组后在进行划分
        SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = lines.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String value) throws Exception {
                String[] fields = value.split(",");
                String word = fields[0];
                Integer count = Integer.parseInt(fields[1]);
                return Tuple2.of(word, count);
            }
        });
        //1.先分组
        KeyedStream<Tuple2<String, Integer>, Tuple> keyed = wordAndOne.keyBy(0);
        //2.划分窗口
        WindowedStream<Tuple2<String, Integer>, Tuple, GlobalWindow> window = keyed.countWindow(5);
        //3.所有分组达到了条数才会执行 【5条数据全部拿到了,可以进行各种计算】
        SingleOutputStreamOperator<Tuple2<String, Integer>> summed = window.sum(1);
        summed.print();
        env.execute();
    }
}

执行结果下:

20200923085625162.png

3.2 Tumbing Window-滚动窗口

20200923083227889.png


特点:特订的步长,比如5S滑动一次。适合做BI统计等等。


3.2.1 TumblingWindowAll.java

public class TumblingWindowAll {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> lines = env.socketTextStream("192.168.52.200", 8888);
        //把传进来的数据String换成int类型
        SingleOutputStreamOperator<Integer> nums = lines.map(new MapFunction<String, Integer>() {
            @Override
            public Integer map(String value) throws Exception {
                return Integer.parseInt(value);
            }
        });
        //不分组 5s 中聚合一次
        AllWindowedStream<Integer, TimeWindow> window = nums.timeWindowAll(Time.seconds(5));
        //窗口中聚合
        SingleOutputStreamOperator<Integer> sumed = window.sum(0);
        sumed.print();
        env.execute();
    }
}

3.2.2 TumblingWindowGroup.java

public class TumblingWindowGroup {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //spark,3
        //hadoop,2
        DataStream<String> lines = env.socketTextStream("192.168.52.200", 8888);
        //分组后在进行划分
        SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = lines.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String value) throws Exception {
                String[] fields = value.split(",");
                String word = fields[0];
                Integer count = Integer.parseInt(fields[1]);
                return Tuple2.of(word, count);
            }
        });
        //先分组
        KeyedStream<Tuple2<String, Integer>, Tuple> keyed = wordAndOne.keyBy(0);
        //划分窗口
        WindowedStream<Tuple2<String, Integer>, Tuple, TimeWindow> window = keyed.timeWindow(Time.seconds(5));
        //聚合
        SingleOutputStreamOperator<Tuple2<String, Integer>> summed = window.sum(1);
        summed.print();
        env.execute();
    }
}

3.3 Sliging Window-滑动窗口

20200923083542491.png


滑动窗口的大小可以与步长不等大小。窗口固定长度,有重叠。时间对齐。

特点:算趋势。


3.3.1 SlidingWindow.java

public class SlidingWindow {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> lines = env.socketTextStream("192.168.52.200", 8888);
        //把传进来的数据String换成int类型
        SingleOutputStreamOperator<Integer> nums = lines.map(new MapFunction<String, Integer>() {
            @Override
            public Integer map(String value) throws Exception {
                return Integer.parseInt(value);
            }
        });
        //不分组 整体是一个组
        //窗口的长度为10s 5s中滑动一次
        AllWindowedStream<Integer, TimeWindow> window = nums.timeWindowAll(Time.seconds(10), Time.seconds(5));
        //窗口中聚合
        SingleOutputStreamOperator<Integer> sumed = window.sum(0);
        sumed.print();
        env.execute();
    }
}

执行结果

20200923091307889.png

3.4 Session Window-会话窗口

202009230836165.png


按照指定的时间间隔划分一个窗口。


四、WaterMark


1.它是window延迟触发的机制

2.watermark >= 上一个窗口的结束边界就会触发窗口执行

3.watermark = 数据锁携带的时间【窗口的最大时间】- 延迟执行的时间

相关实践学习
基于Hologres+Flink搭建GitHub实时数据大屏
通过使用Flink、Hologres构建实时数仓,并通过Hologres对接BI分析工具(以DataV为例),实现海量数据实时分析.
实时计算 Flink 实战课程
如何使用实时计算 Flink 搞定数据处理难题?实时计算 Flink 极客训练营产品、技术专家齐上阵,从开源 Flink功能介绍到实时计算 Flink 优势详解,现场实操,5天即可上手! 欢迎开通实时计算 Flink 版: https://cn.aliyun.com/product/bigdata/sc Flink Forward Asia 介绍: Flink Forward 是由 Apache 官方授权,Apache Flink Community China 支持的会议,通过参会不仅可以了解到 Flink 社区的最新动态和发展计划,还可以了解到国内外一线大厂围绕 Flink 生态的生产实践经验,是 Flink 开发者和使用者不可错过的盛会。 去年经过品牌升级后的 Flink Forward Asia 吸引了超过2000人线下参与,一举成为国内最大的 Apache 顶级项目会议。结合2020年的特殊情况,Flink Forward Asia 2020 将在12月26日以线上峰会的形式与大家见面。
目录
相关文章
|
SQL 消息中间件 分布式计算
大数据-120 - Flink Window 窗口机制-滑动时间窗口、会话窗口-基于时间驱动&基于事件驱动
大数据-120 - Flink Window 窗口机制-滑动时间窗口、会话窗口-基于时间驱动&基于事件驱动
293 0
|
SQL 分布式计算 大数据
大数据-119 - Flink Window总览 窗口机制-滚动时间窗口-基于时间驱动&基于事件驱动
大数据-119 - Flink Window总览 窗口机制-滚动时间窗口-基于时间驱动&基于事件驱动
408 0
|
SQL 存储 Unix
Flink SQL 在快手实践问题之设置 Window Offset 以调整窗口划分如何解决
Flink SQL 在快手实践问题之设置 Window Offset 以调整窗口划分如何解决
219 2
|
10月前
|
缓存 监控 数据处理
Flink 四大基石之窗口(Window)使用详解
在流处理场景中,窗口(Window)用于将无限数据流切分成有限大小的“块”,以便进行计算。Flink 提供了多种窗口类型,如时间窗口(滚动、滑动、会话)和计数窗口,通过窗口大小、滑动步长和偏移量等属性控制数据切分。窗口函数包括增量聚合函数、全窗口函数和ProcessWindowFunction,支持灵活的数据处理。应用案例展示了如何使用窗口进行实时流量统计和电商销售分析。
2020 28
|
11月前
|
数据处理 数据安全/隐私保护 流计算
Flink 三种时间窗口、窗口处理函数使用及案例
Flink 是处理无界数据流的强大工具,提供了丰富的窗口机制。本文介绍了三种时间窗口(滚动窗口、滑动窗口和会话窗口)及其使用方法,包括时间窗口的概念、窗口处理函数的使用和实际案例。通过这些机制,可以灵活地对数据流进行分析和计算,满足不同的业务需求。
1321 27
|
消息中间件 NoSQL Java
Flink-06 Flink Java 3分钟上手 滚动窗口 时间驱动 Kafka TumblingWindow TimeWindowFunction TumblingProcessing
Flink-06 Flink Java 3分钟上手 滚动窗口 时间驱动 Kafka TumblingWindow TimeWindowFunction TumblingProcessing
226 0
|
分布式计算 Java 大数据
大数据-122 - Flink Time Watermark Java代码测试实现Tumbling Window
大数据-122 - Flink Time Watermark Java代码测试实现Tumbling Window
140 0
|
SQL 消息中间件 分布式计算
大数据-115 - Flink DataStream Transformation 多个函数方法 FlatMap Window Aggregations Reduce
大数据-115 - Flink DataStream Transformation 多个函数方法 FlatMap Window Aggregations Reduce
174 0
|
SQL 安全 流计算
Flink SQL 在快手实践问题之Group Window Aggregate 中的数据倾斜问题如何解决
Flink SQL 在快手实践问题之Group Window Aggregate 中的数据倾斜问题如何解决
236 1
|
数据处理 调度 双11
Flink四大基石——1.window
Flink四大基石——1.window
145 0
下一篇
oss云网关配置