Flink教程(11)- Flink高级API(Window)

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: Flink教程(11)- Flink高级API(Window)

01 引言

在前面的博客,我们已经对Flink批流一体API的使用有了一定的了解了,有兴趣的同学可以参阅下:

在前面的教程,我们知道Flink的四大基石十分重要,如下图,本文先讲解下Window

02 Window

流计算中一般在对流数据进行操作之前都会先进行开窗,即基于一个什么样的窗口上做这个计算,Flink提供了开箱即用的各种窗口,比如滑动窗口、滚动窗口、会话窗口以及非常灵活的自定义的窗口。

2.1 为什么需要Window?

在流处理应用中,数据是连续不断的,有时我们需要做一些聚合类的处理,例如:在过去的1分钟内有多少用户点击了我们的网页。

在这种情况下,我们必须定义一个窗口(window),用来收集最近1分钟内的数据,并对这个窗口内的数据进行计算

2.2 Window分类

2.2.1 按照time和count分类

  • 时间窗口(time-window) :根据时间划分窗口,如:每xx分钟统计最近xx分钟的数据
  • 数量窗口(count-window):根据数量划分窗口,如:每xx个数据统计最近xx个数据

2.2.2 按照slide和size分类

窗口有两个重要的属性,窗口大小size和滑动间隔slide,根据它们的大小关系可分为:

  • 滚动窗口(tumbling-window): size=slide,比如: 每隔10s统计最近10s的数据
  • 滑动窗口(sliding-window):size>slide,比如:每隔5s统计最近10s的数据

注意:当size<slide的时候,如每隔15s统计最近10s的数据,那么中间5s的数据会丢失,所有开发中不用。

2.2.3 总结

按照上面窗口的分类方式进行组合,可以得出如下的窗口:

分类 使用频率
基于时间的滚动窗口:tumbling-time-window 用的较多
基于时间的滑动窗口:sliding-time-window 用的较多
基于数量的滚动窗口:tumbling-count-window 用的较少
基于数量的滑动窗口:sliding-count-window 用的较少

注意:Flink还支持一个特殊的窗口,即 Session会话窗口,需要设置一个会话超时时间,如30s:则表示30s内没有数据到来,则触发上个窗口的计算。

2.3 Window API

2.3.1 window和windowAll

何时使用:

  • 使用keyby的流,应该使用window方法
  • 未使用keyby的流,应该调用windowAll方法

2.3.2 WindowAssigner

window/windowAll 方法接收的输入是一个 WindowAssignerWindowAssigner负责将每条输入的数据分发到正确的 window中,Flink提供了很多各种场景用的WindowAssigner

如果需要自己定制数据分发策略,则可以实现一个 class,继承自WindowAssigner

2.3.3 evictor

evictor 主要用于做一些数据的自定义操作,可以在执行用户代码之前,也可以在执行用户代码之后,更详细的描述可以参考org.apache.flink.streaming.api.windowing.evictors.EvictorevicBeforeevicAfter两个方法。

Flink 提供了如下三种通用的 evictor:

  • CountEvictor 保留指定数量的元素
  • TimeEvictor设定一个阈值 interval,删除所有不再 max_ts - interval范围内的元
    素,其中 max_ts 是窗口内时间戳的最大值。
  • DeltaEvictor通过执行用户给定的 DeltaFunction 以及预设的theshold,判断是否删除一个元素。

2.3.4 trigger

trigger 用来判断一个窗口是否需要被触发,每个 WindowAssigner 都自带一个默认的trigger,如果默认的trigger不能满足你的需求,则可以自定义一个类,继承自Trigger 即可,我们详细描述下 Trigger 的接口以及含义:

  • onElement() :每次往 window 增加一个元素的时候都会触发
  • onEventTime() :当 event-time timer 被触发的时候会调用
  • onProcessingTime() :当 processing-time timer被触发的时候会调用
  • onMerge() :对两个 riggerstate 进行merge 操作
  • clear()window销毁的时候被调用

上面的接口中前三个会返回一个 TriggerResultTriggerResult有如下几种可能的选

择:

  • CONTINUE 不做任何事情;
  • FIRE 触发window
  • PURGE 清空整个 window 的元素并销毁窗口;
  • FIRE_AND_PURGE 触发窗口,然后销毁窗口。

2.3.5 API调用示例

source.keyBy(0)
.window(TumblingProcessingTimeWindows.of(Time.seconds(5)));
source.keyBy(0).
timeWindow(Time.seconds(5))

03 Window案例演示

3.1 基于时间的滚动和滑动窗口

需求1:基于时间的滚动窗口 – 每5秒钟统计一次,最近5秒钟内,各个路口通过红绿灯汽车的数量

需求2:基于时间的滑动窗口 --每5秒钟统计一次,最近10秒钟内,各个路口通过红绿灯汽车的数量

模拟数据如下(信号灯编号和通过该信号灯的车的数量):

9,3
9,2
9,7
4,9
2,6
1,5
2,3
5,7
5,4

代码实现:

/**
 * TimeWindow
 *
 * @author : YangLinWei
 * @createTime: 2022/3/7 6:35 下午
 */
public class TimeWindow {
    public static void main(String[] args) throws Exception {
        //1.env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //2.Source
        DataStreamSource<String> socketDS = env.socketTextStream("node1", 9999);
        //3.Transformation
        //将9,3转为CartInfo(9,3)
        SingleOutputStreamOperator<CartInfo> cartInfoDS = socketDS.map(new MapFunction<String, CartInfo>() {
            @Override
            public CartInfo map(String value) throws Exception {
                String[] arr = value.split(",");
                return new CartInfo(arr[0], Integer.parseInt(arr[1]));
            }
        });
        //分组
        //KeyedStream<CartInfo, Tuple> keyedDS = cartInfoDS.keyBy("sensorId");
        // * 需求1:每5秒钟统计一次,最近5秒钟内,各个路口/信号灯通过红绿灯汽车的数量--基于时间的滚动窗口
        //timeWindow(Time size窗口大小, Time slide滑动间隔)
        SingleOutputStreamOperator<CartInfo> result1 = cartInfoDS
                .keyBy(CartInfo::getSensorId)
                //.timeWindow(Time.seconds(5))//当size==slide,可以只写一个
                //.timeWindow(Time.seconds(5), Time.seconds(5))
                .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
                .sum("count");
        // * 需求2:每5秒钟统计一次,最近10秒钟内,各个路口/信号灯通过红绿灯汽车的数量--基于时间的滑动窗口
        SingleOutputStreamOperator<CartInfo> result2 = cartInfoDS
                .keyBy(CartInfo::getSensorId)
                //.timeWindow(Time.seconds(10), Time.seconds(5))
                .window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))
                .sum("count");
        //4.Sink
        /*
        1,5
        2,5
        3,5
        4,5
        */
        //result1.print();
        result2.print();
        //5.execute
        env.execute();
    }
    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public static class CartInfo {
        private String sensorId;//信号灯id
        private Integer count;//通过该信号灯的车的数量
    }
}

3.2 基于数量的滚动和滑动窗口

需求1:基于数量的滚动窗口:统计在最近5条消息中,各自路口通过的汽车数量,相同的key每出现5次进行统计

需求2:基于数量的滑动窗口:统计在最近5条消息中,各自路口通过的汽车数量,相同的key每出现3次进行统计

示例代码如下:

/**
 * CountWindow
 *
 * @author : YangLinWei
 * @createTime: 2022/3/7 6:40 下午
 */
public class CountWindow {
    public static void main(String[] args) throws Exception {
        //1.env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //2.Source
        DataStreamSource<String> socketDS = env.socketTextStream("node1", 9999);
        //3.Transformation
        //将9,3转为CartInfo(9,3)
        SingleOutputStreamOperator<CartInfo> cartInfoDS = socketDS.map(new MapFunction<String, CartInfo>() {
            @Override
            public CartInfo map(String value) throws Exception {
                String[] arr = value.split(",");
                return new CartInfo(arr[0], Integer.parseInt(arr[1]));
            }
        });
        //分组
        //KeyedStream<CartInfo, Tuple> keyedDS = cartInfoDS.keyBy("sensorId");
        // * 需求1:统计在最近5条消息中,各自路口通过的汽车数量,相同的key每出现5次进行统计--基于数量的滚动窗口
        //countWindow(long size, long slide)
        SingleOutputStreamOperator<CartInfo> result1 = cartInfoDS
                .keyBy(CartInfo::getSensorId)
                //.countWindow(5L, 5L)
                .countWindow(5L)
                .sum("count");
        // * 需求2:统计在最近5条消息中,各自路口通过的汽车数量,相同的key每出现3次进行统计--基于数量的滑动窗口
        //countWindow(long size, long slide)
        SingleOutputStreamOperator<CartInfo> result2 = cartInfoDS
                .keyBy(CartInfo::getSensorId)
                .countWindow(5L, 3L)
                .sum("count");
        //4.Sink
        //result1.print();
        /*
        1,1
        1,1
        1,1
        1,1
        2,1
        1,1
         */
        result2.print();
        /*
        1,1
        1,1
        2,1
        1,1
        2,1
        3,1
        4,1
         */
        //5.execute
        env.execute();
    }
    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public static class CartInfo {
        private String sensorId;//信号灯id
        private Integer count;//通过该信号灯的车的数量
    }
}

3.3 会话窗口

需求:设置会话超时时间为10s,10s内没有数据到来,则触发上个窗口的计算

示例代码如下:

/**
 * SessionWindow
 *
 * @author : YangLinWei
 * @createTime: 2022/3/7 6:42 下午
 */
public class SessionWindow {
    public static void main(String[] args) throws Exception {
        //1.env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //2.Source
        DataStreamSource<String> socketDS = env.socketTextStream("node1", 9999);
        //3.Transformation
        //将9,3转为CartInfo(9,3)
        SingleOutputStreamOperator<CartInfo> cartInfoDS = socketDS.map(new MapFunction<String, CartInfo>() {
            @Override
            public CartInfo map(String value) throws Exception {
                String[] arr = value.split(",");
                return new CartInfo(arr[0], Integer.parseInt(arr[1]));
            }
        });
        //需求:设置会话超时时间为10s,10s内没有数据到来,则触发上个窗口的计算(前提是上一个窗口得有数据!)
        SingleOutputStreamOperator<CartInfo> result = cartInfoDS.keyBy(CartInfo::getSensorId)
                .window(ProcessingTimeSessionWindows.withGap(Time.seconds(10)))
                .sum("count");
        //4.Sink
        result.print();
        //5.execute
        env.execute();
    }
    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public static class CartInfo {
        private String sensorId;//信号灯id
        private Integer count;//通过该信号灯的车的数量
    }
}

04 文末

本文主要讲解Flink高级APIWindow,谢谢大家的阅读,本文完!

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
目录
相关文章
|
10天前
|
API
车牌号归属地查询免费API接口教程
本接口用于根据车牌号查询社会车辆的归属地,不支持军车、使馆等特殊车牌。请求地址为 `https://cn.apihz.cn/api/other/chepai.php`,支持 POST 和 GET 请求。请求参数包括 `id`、`key` 和 `words`,返回数据包含车牌归属地信息。示例请求:`https://cn.apihz.cn/api/other/chepai.php?id=88888888&key=88888888&words=川B1234`。
46 21
|
1月前
|
SQL 消息中间件 分布式计算
大数据-120 - Flink Window 窗口机制-滑动时间窗口、会话窗口-基于时间驱动&基于事件驱动
大数据-120 - Flink Window 窗口机制-滑动时间窗口、会话窗口-基于时间驱动&基于事件驱动
95 0
|
9天前
|
API
获取网页重定向地址免费API接口教程
该API用于获取网页重定向跳转后的最终地址。请求地址为`https://cn.apihz.cn/api/wangzhan/tiaozhuan.php`,支持POST或GET方式。请求参数包括`id`、`key`和`url`,返回数据包含状态码`code`和最终URL`url`。示例返回:`{&quot;code&quot;:200,&quot;url&quot;:&quot;https://www.baidu.com/&quot;}`。
51 29
|
13天前
|
API
将秒数转换为时间免费API接口教程
该API用于将指定秒数转换为年、日、时、分、秒。支持指定转换类型。请求地址为 `https://cn.apihz.cn/api/time/stime.php`,需提供ID、密钥、类型和秒数参数。返回结果包含转换后的年、日、时、分、秒等信息。示例请求:`https://cn.apihz.cn/api/time/stime.php?id=88888888&key=88888888&type=1&s=123456`。更多详情见 [文档](https://www.apihz.cn/api/timestime.html)。
将秒数转换为时间免费API接口教程
|
5天前
|
API
天气预报1天-中国气象局-地址查询版免费API接口教程
此接口提供中国气象局官方的当日天气信息,支持POST和GET请求,需提供用户ID、KEY、省份及具体地点。返回数据包括状态码、消息、天气详情等。示例中使用的ID与KEY为公共测试用,建议使用个人ID与KEY以享受更高调用频次。
|
10天前
|
网络协议 API
检测指定TCP端口开放状态免费API接口教程
该API用于检测目标主机指定TCP端口是否开放,适用于检测连通状态等场景。支持指定大陆、美国、香港等检测节点。请求地址为 `https://cn.apihz.cn/api/wangzhan/port.php`,支持POST和GET请求方式。请求参数包括 `id`、`key`、`type`、`host` 和 `port`。返回参数包含检测结果和状态码。示例请求:`https://cn.apihz.cn/api/wangzhan/port.php?id=88888888&key=88888888&type=1&host=49.234.56.78&port=80`。
|
8天前
|
API 数据安全/隐私保护
抖音视频,图集无水印直链解析免费API接口教程
该接口用于解析抖音视频和图集的无水印直链地址。请求地址为 `https://cn.apihz.cn/api/fun/douyin.php`,支持POST或GET请求。请求参数包括用户ID、用户KEY和视频或图集地址。返回参数包括状态码、信息提示、作者昵称、标题、视频地址、封面、图集和类型。示例请求和返回数据详见文档。
|
13天前
|
API
图片压缩+格式转换免费API接口教程
这是一个免费的图片压缩和格式转换API接口,支持GET和POST请求。请求地址为 `https://cn.apihz.cn/api/img/yasuo.php`,需提供 `id`、`key`、`img` 等参数。返回数据包含处理后的图片URL和其他相关信息。更多详情请参考:https://www.apihz.cn/api/imgyasuo.html
|
13天前
|
API
天气预报-腾讯天气-7天-IP查询版免费API接口教程
根据IP地址自动查询该IP归属地7天天气预报的腾讯天气API。请求地址为`https://cn.apihz.cn/api/tianqi/tengxunip.php`,支持GET和POST请求。需提供ID、Key和IP地址作为参数。返回数据包含天气预报信息。
|
12天前
|
前端开发 JavaScript API
取网页纯文本内容免费API接口教程
该API用于获取指定网页的纯文本内容,去除HTML标签、CSS和JS等元素。支持POST和GET请求,需提供ID、Key、URL等参数。请求示例:https://cn.apihz.cn/api/wangzhan/getyuan.php?id=88888888&key=88888888&url=www.apihz.cn&dy=1。返回纯文本数据。

热门文章

最新文章

下一篇
无影云桌面