一.引言
Flink 流处理用于处理源源不断的数据,之前介绍过 processFunction,该方法会对单个元素进行处理,除此之外,还有一种批量数据处理的方法就是 TimeWindow 以及 TimeWindowAll,Flink 时间窗口可以看作是对无线数据流设置的有限数据集,即流处理框架下的批处理。窗口下又分为 CountWindow 和 TimeWindow,之前介绍窗口 Trigger 已经介绍过,有兴趣的同学可以回看。本文主要介绍 TimeWindow 且示例均采用 ProcessingTime。
二.TimeWindow 简介
Flink 的窗口采用左闭右开,其根据定义的时间范围自定义生成窗口范围,常用的有:
Tumbling Window - 滚动窗口
Sliding Window - 滑动窗口
Session Window - 会话窗口
1.Tumbling Window - 滚动窗口
滚动窗口下各个窗口之间不重叠,且窗口的时长固定,根据 ProcessingTime 或者 EventTime 可以分别创建对应的 TumblingProcessingTimeWindows 与 TumblingEventTimeWindows,窗口的长度可以使用 org.apache.flink.streaming.api.windowing.time.Time 设定 seconds、minutes、hours 、days。
编辑
根据到来的元素,窗口划定相同时间范围进行元素圈定并生成窗口,由于时间是连续的,所以滚动窗口的窗口前后重合且不会丢失元素。
Tips:
滚动窗口的时间根据 Time 的设定自动生成范围,例如设置 10s 的滚动窗口:
TumblingProcessingTimeWindows.of(Time.seconds(10)
以 18:00 开始为例,Flink 会自动生成如下左闭右开的时间窗口,窗口的开始和结束分别对应 window.getStart 和 window.getEnd。
18:00:00 - 18:00:10 ,18:00:10 - 18:00:20 ... 18:59:50 - 19:00:00
2. Sliding Window - 滑动窗口
滑动窗口以步长 Slide 不断向前滑动,然后生成 Size 大小的窗口。Slide 决定窗口的生成速度,Slide 较大时每个窗口的范围很大,窗口数量很少,反之 Slide 较小时则会生成的窗口数量会很多。Flink 可以用过如下代码设置滑动窗口:
SlidingProcessingTimeWindows.of(Time.minutes(10), Time.seconds(5)
Slide > Size:
当 Slide > Size 时,滑动窗口会有一部分元素不在 Size 内,从而导致元素丢失。
编辑
Slide < Size:
当 Slide < Size 时,两个滑动窗口 Size 窗口内可能包含同一元素,从而导致元素重复。
编辑
Slide = Size:
当 Silde = Size 时,滚动窗口变为滑动窗口。
编辑
3.Session Window - 会话窗口
上面两种为常见的窗口模式,还有一种窗口使用较少即 Session Window,该模式下两个窗口之间有一个间隙,称为 Session Gap。当一个窗口 Session Gap 时间内没有收到数据,则窗口关闭。可以通过如下代码设置 Session Window:
ProcessingTimeSessionWindows.withGap(Time.minutes(10))
也可以调用 .withDynamicGap 和 SessionWindowTimeGapExtractor 设置动态的 Session Gap。
编辑
可以看到由于 Session Window 受数据源的连续性影响,窗口的大小、窗口的起止时间都是不确定的。
三.TimeWindow 与 TimeWindowAll
不论是 TimwWindow 还是 TimeWindowAll ,二者都使用上述的 Window 模式,唯一不同的是二者的使用场景。
1.TimeWindow
TimeWindow 适用于 keyedStream,数据在 keyBy 分流后,window 将不同的 key 分开并生成多个 window,所以 TimeWindows 是并行处理的。
数据流每s生成一批 Data 类数据,并累加对应类中的数值 num,数据的并行度为5:
dataStream.keyBy(data => { data.num.toString.slice(0, 1) }).window(TumblingProcessingTimeWindows.of(Time.seconds(5))) .process(new ProcessWindowFunction[Data, String, String, TimeWindow]{ override def process(key: String, context: Context, elements: Iterable[Data], out: Collector[String]): Unit = { val log = key + "\t" + elements.toArray.mkString(",") val taskId = getRuntimeContext.getIndexOfThisSubtask out.collect(taskId + "\t" + log) } }).print()
红框:
其中第一列红框为 TaskId,由于设置并行度为5,所以 TaskId 的值分别为 0,1,2,3,4,这也说明了 TimeWindow 是并行执行,将不同的 key 划分至不同 window
绿框:
第二列绿框为 TimeWindow 归拢元素对应的 key,我们根据 num 数字的第一位 keyBy,所以其值为 0,1,2,3,4,5,6,7,8,9
蓝框:
第三列蓝框为 key 对应的数据,可以看到对应数据的第一位均与对应 window 的 key 完全一致,所以这里可以看做是将数据 GroupBy 并汇总至不同 window
编辑
Tips:
TimeWindow 支持并行操作,默认并行度与 SourceStream 一致,也可以手动 setParallelism 设置:
dataStream.keyBy(data => { data.num.toString.slice(0, 1) }).window(TumblingProcessingTimeWindows.of(Time.seconds(5))) .process(new ProcessWindowFunction[Data, String, String, TimeWindow]{ override def process(key: String, context: Context, elements: Iterable[Data], out: Collector[String]): Unit = { val log = key + "\t" + elements.toArray.mkString(",") val taskId = getRuntimeContext.getIndexOfThisSubtask out.collect(taskId + "\t" + log) } }).setParallelism(2).print()
修改并行度为2后,可以看到打印的 TaskId 只有0,1,不再受 SourceStream 并行度为5的影响。
编辑
除此之外,因为 TimeWindow 需要根据 key 划分,所以需要数据流为 keyedStream,DataStream 或者 SingleOutputStreamOperator 不支持使用 TimeWindow,只能使用 TimeWindowAll。
2.TimeWindowAll
TimeWindowAll 是把所有的数据进行聚合,所以并行度只能为1。这里处理函数有不同,TImeWindow 使用 ProcessWindowFunction,TimeWindowAll 则使用 ProcessAllWindowFunction。
dataStream.keyBy(data => { data.num.toString.slice(0, 1) }).windowAll(TumblingProcessingTimeWindows.of(Time.seconds(5))) .process(new ProcessAllWindowFunction[Data, String, TimeWindow] { override def process(context: Context, elements: Iterable[Data], out: Collector[String]): Unit = { val log = elements.toArray.map(_.num).mkString(",") val taskId = getRuntimeContext.getIndexOfThisSubtask out.collect(taskId + "\t" + log) } }).print()
WindowAll 的数据都汇聚在同一个 Task 上,所以数据量相比之前并行的 TimeWindow 会大很多。
编辑
Tips:
TimeWindowAll 的并行度只能为1,因为 window 汇聚一段时间内的所有数据到一个 task 处理,如果像 TimeWindow 上面示例修改并行度会得到如下报错:
编辑
3.使用场景
A.常见使用
TimeWindow 和 TimeWindowAll 都适用于对流式数据转化做一定时间范围内的批处理,主要区别在两者的并行度,前者为 Parallel Operator 后者为 Non Parallel Operator,所以 TimeWindow 的适用范围更广,适合一些需要对数据分批分 key处理且数据量较大需要并行处理的场景;而 TimeWindowAll 汇聚一段时间内的所有数据,适合需要汇总所有数据或者数据量不大的任务,这样可以减少并发,例如任务内需要涉及到数据网络 IO,如果并行度过高则容易导致网络服务过载。
B.转换
TimeWindow 的并行度变成 1 则变为 TimeWindowAll;如果 TimeWindowAll 的数据实在很大,可以先通过一层 TimeWindow 做分区的汇总,随后将数据回收至 TimeWindowAll 做总的汇总,有点类似 Spark 的 groupByKey 和 reduceByKey。
四.总结
Flink TimeWindow 以及 TimeWindowAll 的基本介绍大致就这些,除了 window 的使用外,还涉及到 window triger 即 window 的触发方式,有需要的同学可以查看:Flink - Scala/Java trigger 简介与使用。