带你理解并使用flink中的Time、Window(窗口)、Windows Function(窗口函数)

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: flink中,streaming流式计算被设计为用于处理无限数据集的数据处理引擎,其中无限数据集是指一种源源不断有数据过来的数据集,window (窗口)将无界数据流切割成为有界数据流进行处理的方式。实现方式是将流分发到有限大小的桶(bucket)中进行分析。flink 中的streaming定义了多种流式处理的时间,Event Time(事件时间)、Ingestion Time(接收时间)、Processing Time(处理时间)。

提醒:本文的示例代码基于flink1.13,在讲window的使用时也会说明flink版本一些api的弃用情况。


一、Time的简介

flink 中的streaming定义了多种流式处理的时间,Event Time(事件时间)、Ingestion Time(接收时间)、Processing Time(处理时间)。


Event Time:指事件产生的时间,比如业务数据库一条数据产生的时间、一条日志数据产生的时间等。


Ingestion Time:指flink接收数据的时间。


Processing Time:指数据被flink算子处理的时间。


在真实的业务代码开发中,我们常使用Event TIme、Processing Time。


二、Window的概念


flink中,streaming流式计算被设计为用于处理无限数据集的数据处理引擎,其中无限数据集是指一种源源不断有数据过来的数据集,window (窗口)将无界数据流切割成为有界数据流进行处理的方式。实现方式是将流分发到有限大小的桶(bucket)中进行分析。


01.png


三、Window的类型


1、分类关系

* TimeWindow(计时窗口),按照一定时间生成 Window(比如:每5秒

* CountWindow(计数窗口),按照指定的数据量生成一个 Window,与时间无关(比如:每20个元素)。

* TumblingWindow(滚动窗口)

* Sliding Window(滑动窗口)

* Session Window(会话窗口)


它们之间的关系图


02.png




2、滚动窗口(Tumbling Windows)

将数据依据固定的窗口长度对数据进行切片,它的特点是时间对齐,窗口长度固定,没有重叠。例如:如果你指定了一个 5分钟大小的滚动窗口,窗口的创建如下图所示:


03.png


适合做每个时间段的聚合计算,例如:统计每5分钟内用户的热搜词



3、滑动窗口(Sliding Windows)

滑动窗口是固定窗口的更广义的一种形式,滑动窗口由固定的窗口长度和滑动间隔组成,特点为时间对齐,窗口长度固定,可以有重叠。例如:你有 10 分钟的窗口和 5 分钟的滑动,那么每个窗口中 5 分钟的窗口里包含着上个 10 分钟产生的数据,如下图所示:


04.png


适用于对最近一个时间段内的统计,例如:求某接口最近 5min 的失败率来决定是否要报警这种场景。



4、会话窗口(Session Windows)

由一系列事件组合一个指定时间长度的 timeout 间隙组成,类似于 web 应用的session,也就是一段时间没有接收到新数据就会生成新的窗口。他的特点为:窗口大小是由数据本身决定,它没有固定的开始和结束时间。会话窗口根据Session gap间隙切分不同的窗口,当一个窗口在大于Session gap间隙的时间内没有接收到新数据时,窗口将关闭。例如:设置的时间gap是5秒,那么,当相邻的记录相差>=5秒时,则触发窗口。


05.png



适用于每个用户在一个独立的session中平均页面访问时长,前后两个session的间隔时间为15分钟这种场景。


四、windows 的使用


window算子api的使用分有key的、无key的,它们api分别的写法如下:


Keyed Windows

06.png



Non-Keyed Windows

07.png


我们下面按照上面的window分类关系去讲解api的使用,且举得例子都是Keyed Windows的,可以类比使用对应api理解Non-Keyed Windows。


1、Time Window

在time Window中,我们经常会在flink的老版本中使用timeWindow,如下图:

08.png



09.png


输入一个Time.seconds(n)是滚动窗口,输入两个是滑动窗口。特别注意,这里默认使用的Time是Processing Time。


在flink1.13中方法已经过时,能用但不建议使用。请使用原生window进行使用,如下图:

10.png

在window的参数中指定使用的窗口类型、时间类型,这个示例可以看出使用的是滚动窗口,时间类型为Processing Time


2、Count Window


Count Window中我们常在flink中使用

countWindow(高版本中没有过时),传入一个参数就是滚动窗口,如果传入两个参数就是滑动窗口,如下图:

11.png


12.png


3、自定义Window

如下图Keyed Windows、Non-Keyed Windows两种使用自定义的接口, 可以自己定义trigger(触发器)、evictor(移除器)、allowedLateness(允许窗口延时)、sideOutputLateData(侧输出流)等,自定义window。

13.png


4、示例


4.1 滚动窗口示例


需求:每隔5s时间,统计最近5s出现的单词


代码:

```

import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}

import org.apache.flink.streaming.api.windowing.time.Time

import org.apache.flink.api.scala._

import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows


/**

 * todo: 基于计时的滚动窗口应用--Tumbling Windows

 *

 * 滚动窗口,每隔5s时间,统计最近5s出现的单词

 */

object TestTimeWindowByTumbling {


 def main(args: Array[String]): Unit = {

   //todo:1、获取流式处理的环境

   val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment


   //todo:2、获取数据源

   val socketTextStream: DataStream[String] = env.socketTextStream("companynode01",19999)


   //todo: 3、对数据进行操作处理

   socketTextStream.flatMap(x=>x.split(" "))

     .map(x=>(x, 1))

     .keyBy(_._1)

     // .timeWindow(Time.seconds(5)) //过时

     // 滚动窗口

     .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))

     .sum(1)

     .print()


   //todo: 4、启动

   env.execute("TestTimeWindowByTumbling")


 }

}

```

4.2 滑动窗口示例


需求:每隔5s时间,统计最近10s出现的单词


代码:

```

import org.apache.flink.api.scala._

import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}

import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows

import org.apache.flink.streaming.api.windowing.time.Time


/**

 * todo: 基于计时的滚动窗口应用--Sliding Windows

 *

 * 滑动窗口,每隔5s时间,统计最近10s出现的单词

 */

object TestTimeWindowBySliding {


 def main(args: Array[String]): Unit = {

   //todo:1、获取流式处理的环境

   val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment


   //todo:2、获取数据源

   val socketTextStream: DataStream[String] = env.socketTextStream("companynode01",19999)


   //todo: 3、对数据进行操作处理

   socketTextStream.flatMap(x=>x.split(" "))

                   .map(x=>(x, 1))

                   .keyBy(_._1)

//                    .timeWindow(Time.seconds(15),Time.seconds(5)) //过时

                   //滑动窗口

                   .window(SlidingProcessingTimeWindows.of(Time.seconds(10),Time.seconds(5)))

                   .sum(1)

                   .print()


   //todo: 4、启动

   env.execute("TestTimeWindowBySliding")


 }

}

```



五、window Function(窗口函数)


1、分类

窗口函数定义了要对窗口中收集的数据做的计算操作。主要可以分为两类:

* 增量聚合函数(incremental aggregation functions),它在每条数据到来就进行计算,保持一个简单的状态。典型的增量聚合函数有ReduceFunction、AggregateFunction等。

* 全量窗口函数(full window functions),它先把窗口所有数据收集起来,等到计算的时候会遍历所有数据。典型的全量窗口聚合函数有apply、process。


2、增量聚合统计


窗口当中每加入一条数据,就进行一次统计。常用的增量聚合算子有reduce(reduceFunction)、aggregate(aggregateFunction)sum()、min()、max()等。


14.png


示例:


需求:通过接收socket当中输入的单词,统计每5秒钟单词的累计数量


使用reduce的代码

```

import org.apache.flink.streaming.api.windowing.time.Time

import org.apache.flink.streaming.api.windowing.assigners.{TumblingEventTimeWindows, TumblingProcessingTimeWindows}

//import org.apache.flink.api.java.functions.KeySelector

import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}

import org.apache.flink.api.scala._



/**

* 增量聚合函数

* 通过接收socket当中输入的数据,统计每5秒钟数据的累计的值

* 基于 `reduce` 函数的计时窗口数据增量聚合

*/

object TestReduceOfTimeWindow {


 def main(args: Array[String]): Unit = {

   val env:StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment


   val socketTextStream: DataStream[String] = env.socketTextStream("companynode01", 19999)


   socketTextStream.flatMap(x => x.split(" "))

     .map(x=>(x, 1))

//      .keyBy(0)

     .keyBy(_._1)

     .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))

     .reduce((c1, c2) => (c1._1, c1._2+c2._2))

     .print()


   env.execute()

 }


}

```


使用aggregate的代码


```

import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}

import org.apache.flink.api.scala._

import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows

import org.apache.flink.streaming.api.windowing.time.Time

import org.apache.flink.api.common.functions.AggregateFunction


/**

* 增量聚合函数

* 基于`aggregate`函数的计时窗口数据增量聚合

*/

object TestAggregateOfTimeWindow {


 def main(args: Array[String]): Unit = {


   val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment


   val socketTextStream: DataStream[String] = env.socketTextStream("companynode01", 9999)


   socketTextStream.flatMap(x => x.split(" "))

     .map(x => (x, 1))

     .keyBy(_._1)

     .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))

     .aggregate(new MyAggregateFunction)

     .print()


   env.execute("TestAggregateOfTimeWindow")

 }

}


//自定义AggregateFunction函数

class MyAggregateFunction extends AggregateFunction[(String, Int), (String, Int), (String, Int)] {


 var initAccumulator: (String, Int) = ("", 0)


 //累加值的初始化操作

 override def createAccumulator(): (String, Int) = {

   initAccumulator

 }


 //累加元素

 override def add(in: (String, Int), acc: (String, Int)): (String, Int) = {

   (in._1, acc._2 + in._2)

 }


 // 聚合的结果

 override def getResult(acc: (String, Int)): (String, Int) = {

   acc

 }


 //分布式累加

 override def merge(acc: (String, Int), acc1: (String, Int)): (String, Int) = {

   (acc._1, acc._2 + acc1._2)

 }

}

```


3、全量聚合统计


等到窗口截止,或者窗口内的数据全部到齐,然后再进行统计。


常用的增量聚合算子有apply(windowFunction)、process(processWindowFunction)其中processWindowFunction 比 windowFunction 提供了更多的上下文信息。


示例:


需求:通过接收socket当中输入的数值,统计5秒钟输入数值的平均值。


使用apply函数实现的代码:

```

import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}

import org.apache.flink.api.scala._

import org.apache.flink.streaming.api.scala.function.WindowFunction

import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows

import org.apache.flink.streaming.api.windowing.time.Time

import org.apache.flink.streaming.api.windowing.windows.TimeWindow

import org.apache.flink.util.Collector


object TestApplyOfTimeWindow {


 def main(args: Array[String]): Unit = {

   val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment


   //2、获取数据源,例如输入如下数据源

   /**

    * 1

    * 2

    * 3

    * 4

    * 5

    * 6

    */

   val socketStream: DataStream[String] = env.socketTextStream("companynode01", 19999)


   socketStream.flatMap(x => x.split(" "))

     .map(x => ("countAvg", x.toInt))

//      .keyBy(0)

     //keyBy中的key是个虚拟key,不会输出

     .keyBy(_._1)

     .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))

     .apply(new MyApplyWindowFunction)

     .print()



   env.execute("TestApplyOfTimeWindow")

 }


}


class MyApplyWindowFunction extends WindowFunction[(String, Int), Double, String, TimeWindow]{

 override def apply(key: String, window: TimeWindow, input: Iterable[(String, Int)], out: Collector[Double]): Unit = {

   //记次数

   var totalNum = 0

   //记累加结果

   var countNum = 0


   for(elem <- input) {

     totalNum += 1

     countNum += elem._2

   }


   out.collect(countNum/totalNum)


 }


}

```


使用process函数实现的代码:

```

import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}

import org.apache.flink.api.scala._

import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction

import org.apache.flink.streaming.api.windowing.assigners.{TumblingEventTimeWindows, TumblingProcessingTimeWindows}

import org.apache.flink.streaming.api.windowing.time.Time

import org.apache.flink.streaming.api.windowing.windows.TimeWindow

import org.apache.flink.util.Collector


/**

* 基于 `process` 函数的计时窗口数据全量聚合

*/

object TestProcessOfTimeWindow {


 def main(args: Array[String]): Unit = {

   val env:StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment


   val socketStream: DataStream[String] = env.socketTextStream("companynode01", 19999)


   socketStream.flatMap(x => x.split(" "))

     .map(x => ("countAvg", x.toInt))

     .keyBy(x => x._1)

     //是使用的processingTime,接口已经过时

//      .timeWindow(Time.seconds(5))

     .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))

     .process(new MyProcessWindowFunction)

     .print()

   env.execute("TestProcessOfTimeWindow")

 }


}


class MyProcessWindowFunction extends ProcessWindowFunction[(String, Int), Double, String, TimeWindow] {

 override def process(key: String, context: Context,

                      elements: Iterable[(String, Int)],

                      out: Collector[Double]): Unit = {


   //计次数

   var totalNum = 0

   //计累加次数

   var countNum = 0


   for (elem <- elements) {

     totalNum += 1

     countNum += elem._2

   }


   //计算平均值, totalNum.asInstanceOf[Double]是将totalNum强转成Double类型

   out.collect(countNum/totalNum.asInstanceOf[Double])


 }

}

```

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
目录
相关文章
|
2月前
|
SQL 消息中间件 分布式计算
大数据-120 - Flink Window 窗口机制-滑动时间窗口、会话窗口-基于时间驱动&基于事件驱动
大数据-120 - Flink Window 窗口机制-滑动时间窗口、会话窗口-基于时间驱动&基于事件驱动
110 0
|
2月前
|
SQL 分布式计算 大数据
大数据-119 - Flink Window总览 窗口机制-滚动时间窗口-基于时间驱动&基于事件驱动
大数据-119 - Flink Window总览 窗口机制-滚动时间窗口-基于时间驱动&基于事件驱动
113 0
|
6天前
|
数据处理 数据安全/隐私保护 流计算
Flink 三种时间窗口、窗口处理函数使用及案例
Flink 是处理无界数据流的强大工具,提供了丰富的窗口机制。本文介绍了三种时间窗口(滚动窗口、滑动窗口和会话窗口)及其使用方法,包括时间窗口的概念、窗口处理函数的使用和实际案例。通过这些机制,可以灵活地对数据流进行分析和计算,满足不同的业务需求。
65 27
|
2月前
|
分布式计算 Java 大数据
大数据-122 - Flink Time Watermark Java代码测试实现Tumbling Window
大数据-122 - Flink Time Watermark Java代码测试实现Tumbling Window
44 0
|
2月前
|
SQL 消息中间件 分布式计算
大数据-115 - Flink DataStream Transformation 多个函数方法 FlatMap Window Aggregations Reduce
大数据-115 - Flink DataStream Transformation 多个函数方法 FlatMap Window Aggregations Reduce
40 0
|
1月前
|
网络安全 Windows
Windows server 2012R2系统安装远程桌面服务后无法多用户同时登录是什么原因?
【11月更文挑战第15天】本文介绍了在Windows Server 2012 R2中遇到的多用户无法同时登录远程桌面的问题及其解决方法,包括许可模式限制、组策略配置问题、远程桌面服务配置错误以及网络和防火墙问题四个方面的原因分析及对应的解决方案。
|
1月前
|
监控 安全 网络安全
使用EventLog Analyzer日志分析工具监测 Windows Server 安全威胁
Windows服务器面临多重威胁,包括勒索软件、DoS攻击、内部威胁、恶意软件感染、网络钓鱼、暴力破解、漏洞利用、Web应用攻击及配置错误等。这些威胁严重威胁服务器安全与业务连续性。EventLog Analyzer通过日志管理和威胁分析,有效检测并应对上述威胁,提升服务器安全性,确保服务稳定运行。
|
1月前
|
监控 安全 网络安全
Windows Server管理:配置与管理技巧
Windows Server管理:配置与管理技巧
86 3
|
1月前
|
存储 安全 网络安全
Windows Server 本地安全策略
由于广泛使用及历史上存在的漏洞,Windows服务器成为黑客和恶意行为者的主要攻击目标。这些系统通常存储敏感数据并支持关键服务,因此组织需优先缓解风险,保障业务的完整性和连续性。常见的威胁包括勒索软件、拒绝服务攻击、内部威胁、恶意软件感染等。本地安全策略是Windows操作系统中用于管理计算机本地安全性设置的工具,主要包括用户账户策略、安全选项、安全设置等。实施强大的安全措施,如定期补丁更新、网络分段、入侵检测系统、数据加密等,对于加固Windows服务器至关重要。
|
2月前
|
边缘计算 安全 网络安全

热门文章

最新文章