一、时间定义
如图所示,在事件发生之后,生成的数据被收集起来,首先进入分布式消息队列,然后被 Flink 系统中的 Source 算子读取消费,进而向下游的转换算子(窗口算子)传递,最终由窗口算子进行计算处理。
有两个非常重要的时间点:一个是数据产生的时间,我们把它叫作“事件时间”(Event Time);另一个是数据真正被处理的时刻,叫作“处理时间”(Processing Time)。我们所定义的窗口操作,到底是以那种时间作为衡量标准,就是所谓的“时间语义”(Notions of Time)。由于分布式系统中网络传输的延迟和时钟漂移,处理时间相对事件发生的时间会有所滞后。
- 处理时间(Processing Time): 是指执行处理操作的机器的系统时间。
- 事件时间(Event Time): 指每个事件在对应的设备上发生的时间,也就是数据生成的时间。
二、水位线(Watermark)
1、概念
在实际应用中,一般会采用事件时间语义。而水位线,就是基于事件时间提出的概念。
在事件时间语义下,我们不依赖系统时间,而是基于数据自带的时间戳去定义了一个时钟,用来表示当前时间的进展。于是每个并行子任务都会有一个自己的逻辑时钟,它的前进是靠数据的时间戳来驱动的。
在 Flink 中,这种用来衡量事件时间(Event Time)进展的标记,就被称作“水位线”(Watermark)。
水位线可以看作一条特殊的数据记录,它是插入到数据流中的一个标记点,主要内容就是一个时间戳,用来指示当前的事件时间。而它插入流中的位置,就应该是在某个数据到来之后;这样就可以从这个数据中提取时间戳,作为当前水位线的时间戳了。
如图所示,每个事件产生的数据,都包含了一个时间戳,我们直接用一个整数表示。当产生于2 秒的数据到来之后,当前的事件时间就是 2 秒;在后面插入一个时间戳也为 2 秒的水位线,随着数据一起向下游流动。而当 5 秒产生的数据到来之后,同样在后面插入一个水位线,时间戳也为 5,当前的时钟就推进到了 5 秒。这样,如果出现下游有多个并行子任务的情形,我们只要将水位线广播出去,就可以通知到所有下游任务当前的时间进度了。
1.有序流中水位线
在理想状态下,数据应该按照它们生成的先后顺序、排好队进入流中;如图 所示。所以这时的水位线,其实就是有序流中的一个周期性出现的时间标记。
2.乱序流中水位线
道在分布式系统中,数据在节点间传输,会因为网络传输延迟的不确定性,导致顺序发生改变,这就是所谓的“乱序数据”。
如图所示,第一个水位线时间戳为 7,它表示当前事件时间是 7 秒,7 秒之前的数据都已经到齐,之后再也不会有了;同样,第二个、第三个水位线时间戳分别为 12 和 20,表示11 秒、20 秒之前的数据都已经到齐,如果有对应的窗口就可以直接关闭了,统计的结果一定是正确的。这里由于水位线是周期性生成的,所以插入的位置不一定是在时间戳最大的数据后面。
另外需要注意的是,这里一个窗口所收集的数据,并不是之前所有已经到达的数据。因为数据属于哪个窗口,是由数据本身的时间戳决定的,一个窗口只会收集真正属于它的那些数据。也就是说,上图中尽管水位线 W(20)之前有时间戳为 22 的数据到来,10~20 秒的窗口中也不会收集这个数据,进行计算依然可以得到正确的结果。
2、水位线特征
现在我们可以知道,水位线就代表了当前的事件时间时钟,而且可以在数据的时间戳基础上加一些延迟来保证不丢数据,这一点对于乱序流的正确处理非常重要。
水位线是 Flink 流处理中保证结果正确性的核心机制,它往往会跟窗口一起配合,完成对乱序数据的正确处理。
- 水位线是插入到数据流中的一个标记,可以认为是一个特殊的数据。
- 水位线主要的内容是一个时间戳,用来表示当前事件时间的进展。
- 水位线是基于数据的时间戳生成的。
- 水位线的时间戳必须单调递增,以确保任务的事件时间时钟一直向前推进。
- 水位线可以通过设置延迟,来保证正确处理乱序数据。
- 一个水位线 Watermark(t),表示在当前流中事件时间已经达到了时间戳 t, 这代表 t 之前的所有数据都到齐了,之后流中不会出现时间戳 t’ ≤ t 的数据。
3、生成水位线
所以 Flink 中的水位线,其实是流处理中对低延迟和结果正确性的一个权衡机制,而且把控制的权力交给了程序员,我们可以在代码中定义水位线的生成策略。
3.1 水位线生成策略(Watermark Strategies)
1. 水位线配置API(assignTimestampsAndWatermarks)
在 Flink 的 DataStream API 中 , 有 一 个 单 独 用 于 生 成 水 位 线 的 方法:.assignTimestampsAndWatermarks()
,它主要用来为流中的数据分配时间戳,并生成水位线来指示事件时间:
public SingleOutputStreamOperator<T> assignTimestampsAndWatermarks( WatermarkStrategy<T> watermarkStrategy )
直接用 DataStream 调用该方法即可,与普通的 transform 方法完全一样。
stream06.assignTimestampsAndWatermarks(new CustomWatermarkStrategy());
2. 水位线生成策略(WatermarkGenerator)
.assignTimestampsAndWatermarks()
方法需要传入一个WatermarkStrategy
作为参数,这就是所谓的水位线生成策略
。
WatermarkStrategy
中包含了一个时间戳分配器TimestampAssigner
和一个水位线生成器WatermarkGenerator
。
public interface WatermarkStrategy<T> extends TimestampAssignerSupplier<T>,WatermarkGeneratorSupplier<T>{ @Override TimestampAssigner<T> createTimestampAssigner(TimestampAssignerSupplier.Context context); @Override WatermarkGenerator<T> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context); }
TimestampAssigner
:主要负责从流中数据元素的某个字段中提取时间戳,并分配给元素。时间戳的分配是生成水位线的基础。WatermarkGenerator
:主要负责按照既定的方式,基于时间戳生成水位线。在WatermarkGenerator 接口中,主要又有两个方法:onEvent()和 onPeriodicEmit()。onEvent
:每个事件(数据)到来都会调用的方法,它的参数有当前事件、时间戳,以及允许发出水位线的一个WatermarkOutput,可以基于事件做各种操作.onPeriodicEmit
:周期性调用的方法,可以由 WatermarkOutput 发出水位线。周期时间为处理时间,可以调用环境配置的.setAutoWatermarkInterval()方法来设置,默认为200ms。
env.getConfig().setAutoWatermarkInterval(60 * 1000L);
3.2 Flink 内置水位线生成器
WatermarkStrategy 这个接口是一个生成水位线策略的抽象,让我们可以灵活地实现自己的需求;Flink提供了内置的水位线生成器(WatermarkGenerator),不仅开箱即用简化了编程,而且也为我们自定义水位线策略提供了模板。
1.有序流
对于有序流,主要特点就是时间戳单调增长(Monotonously Increasing Timestamps),所以永远不会出现迟到数据的问题。这是周期性生成水位线的最简单的场景,直接调用WatermarkStrategy.forMonotonousTimestamps()
方法就可以实现。
简单来说,就是直接拿当前最大的时间戳作为水位线就可以了。
时间戳和水位线的单位,必须都是毫秒。
stream06.assignTimestampsAndWatermarks( WatermarkStrategy // 有序流(时间戳单调递增) .<Event>forMonotonousTimestamps() // 时间戳抽取逻辑 .withTimestampAssigner(new SerializableTimestampAssigner<Event>() { @Override public long extractTimestamp(Event element, long recordTimestamp) { return element.timstamp; } }) );
上面代码中我们调用.withTimestampAssigner()
方法,将数据中的 timestamp
字段提取出来,作为时间戳分配给数据元素;然后用内置的有序流水位线生成器构造出了生成策略。
2.乱序流
由于乱序流中需要等待迟到数据到齐,所以必须设置一个固定量的延迟时间(Fixed Amount of Lateness)。
这时生成水位线的时间戳,就是当前数据流中最大的时间戳减去延迟的结果,相当于把表调慢,当前时钟会滞后于数据的最大时间戳。调用 WatermarkStrategy. forBoundedOutOfOrderness()
方法就可以实现。这个方法需要传入一个maxOutOfOrderness
参数,表示“最大乱序程度”,它表示数据流中乱序数据时间戳的最大差值;
// 插入水位线逻辑 stream06.assignTimestampsAndWatermarks( WatermarkStrategy // 乱序流(针对乱序流插入水位线,延迟时间设置为5s) .<Event>forBoundedOutOfOrderness(Duration.ofSeconds(5)) // 时间戳抽取逻辑 .withTimestampAssigner(new SerializableTimestampAssigner<Event>() { @Override public long extractTimestamp(Event element, long recordTimestamp) { return element.timstamp; } }) );
事实上,有序流的水位线生成器本质上和乱序流是一样的,相当于延迟设为 0 的乱序流水位线生成器,两者完全等同:
WatermarkStrategy.forMonotonousTimestamps() WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(0))
乱序流中生成的水位线真正的时间戳,其实是 当前最大时间戳 – 延迟时间 – 1。
3.3 自定义水位线策略
1. 周期性水位线生成器(Periodic Generator)
周期性生成器一般是通过 onEvent()观察判断输入的事件,而在 onPeriodicEmit()里发出水位线。
import com.lydms.flink.domain.Event; import org.apache.flink.api.common.eventtime.*; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; // 自定义水位线的产生 public class CustomWatermarkTest { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); env .addSource(new ClickSource()) .assignTimestampsAndWatermarks(new CustomWatermarkStrategy()) .print(); env.execute(); } public static class CustomWatermarkStrategy implements WatermarkStrategy<Event> { @Override public TimestampAssigner<Event> createTimestampAssigner(TimestampAssignerSupplier.Context context) { return new SerializableTimestampAssigner<Event>() { @Override public long extractTimestamp(Event element, long recordTimestamp) { return element.timestamp; // 告诉程序数据源里的时间戳是哪一个字段 } }; } @Override public WatermarkGenerator<Event> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) { return new CustomPeriodicGenerator(); } } public static class CustomPeriodicGenerator implements WatermarkGenerator<Event> { private Long delayTime = 5000L; // 延迟时间 private Long maxTs = Long.MIN_VALUE + delayTime + 1L; // 观察到的最大时间戳 @Override public void onEvent(Event event, long eventTimestamp, WatermarkOutput output) { // 每来一条数据就调用一次 maxTs = Math.max(event.timestamp, maxTs); // 更新最大时间戳 } @Override public void onPeriodicEmit(WatermarkOutput output) { // 发射水位线,默认 200ms 调用一次 output.emitWatermark(new Watermark(maxTs - delayTime - 1L)); } } }
2. 断点式水位线生成器(Punctuated Generator)
断点式生成器会不停地检测 onEvent()中的事件,当发现带有水位线信息的特殊事件时,就立即发出水位线。一般来说,断点式生成器不会通过 onPeriodicEmit()发出水位线。
public class CustomPunctuatedGenerator implements WatermarkGenerator<Event> { @Override public void onEvent(Event r, long eventTimestamp, WatermarkOutput output) { // 只有在遇到特定的 itemId 时,才发出水位线 if (r.user.equals("Mary")) { output.emitWatermark(new Watermark(r.timestamp - 1)); } } @Override public void onPeriodicEmit(WatermarkOutput output) { // 不需要做任何事情,因为我们在 onEvent 方法中发射了水位线 } }
我们在 onEvent()中判断当前事件的 user 字段,只有遇到“Mary”这个特殊的值时,才调用output.emitWatermark()
发出水位线。这个过程是完全依靠事件来触发的,所以水位线的生成一定在某个数据到来之后。
4、水位线的传递
如图所示,当前任务的上游,有四个并行子任务,所以会接收到来自四个分区的水位线;而下游有三个并行子任务,所以会向三个分区发出水位线。具体过程如下:
- 上游并行子任务发来不同的水位线,当前任务会为每一个分区设置一个“分区水位线” (Partition Watermark),这是一个分区时钟;而当前任务自己的时钟,就是所有分区时钟里最小的那个。
- 当有一个新的水位线(第一分区的 4)从上游传来时,当前任务会首先更新对应的分区时钟;然后再次判断所有分区时钟中的最小值,如果比之前大,说明事件时间有了进展,当前任务的时钟也就可以更新了。这里要注意,更新后的任务时钟,并不一定是新来的那个分区水位线,比如这里改变的是第一分区的时钟,但最小的分区时钟是第三分区的 3,于是当前任务时钟就推进到了 3。当时钟有进展时,当前任务就会将自己的时钟以水位线的形式,广播给下游所有子任务。
- 再次收到新的水位线(第二分区的 7)后,执行同样的处理流程。首先将第二个分区时钟更新为 7,然后比较所有分区时钟;发现最小值没有变化,那么当前任务的时钟也不变,也不会向下游任务发出水位线。
- 同样道理,当又一次收到新的水位线(第三分区的 6)之后,第三个分区时钟更新为6,同时所有分区时钟最小值变成了第一分区的 4,所以当前任务的时钟推进到 4,并发出时间戳为 4 的水位线,广播到下游各个分区任务。
水位线在上下游任务之间的传递,非常巧妙地避免了分布式系统中没有统一时钟的问题,每个任务都以“处理完之前所有数据”为标准来确定自己的时钟,就可以保证窗口处理的结果总是正确的。
三、窗口(Window)
1、概念
把无界流进行切分,每一段数据分别进行聚合,结果只输出一次。这就相当于将无界流的聚合转化为了有界数据集的聚合,这就是所谓的“窗口”(Window)聚合操作。窗口聚合其实是对实时性和处理效率的一个权衡。
Flink 是一种流式计算引擎,主要是来处理无界数据流的,数据源源不断、无穷无尽。窗口就是用来处理无界流的核心。
Flink 中窗口并不是静态准备好的,而是动态创建——当有落在这个窗口区间范围的数据达到时,才创建对应的窗口。另外,这里我们认为到达窗口结束时间时,窗口就触发计算并关闭,事实上“触发计算”和“窗口关闭”两个行为也可以分开。
- 第一个数据时间戳为 2,判断之后创建第一个窗口[0, 10),并将 2 秒数据保存进去;
- 后续数据依次到来,时间戳均在 [0, 10)范围内,所以全部保存进第一个窗口;
- 11 秒数据到来,判断它不属于[0, 10)窗口,所以创建第二个窗口[10, 20),并将 11秒的数据保存进去。由于水位线设置延迟时间为 2 秒,所以现在的时钟是 9 秒,第一个窗口也没有到关闭时间;
- 之后又有 9 秒数据到来,同样进入[0, 10)窗口中;
- 12 秒数据到来,判断属于[10, 20)窗口,保存进去。这时产生的水位线推进到了 10秒,所以 [0, 10)窗口应该关闭了。第一个窗口收集到了所有的 7 个数据,进行处理计算后输出结果,并将窗口关闭销毁;
- 同样的,之后的数据依次进入第二个窗口,遇到 20 秒的数据时会创建第三个窗口[20, 30)并将数据保存进去;遇到 22 秒数据时,水位线达到了 20 秒,第二个窗口触发计算,输出结果并关闭。
2、窗口分类
2.1 驱动类型分类
- 时间窗口(Time Window)
- 计数窗口(Count Window)
窗口本身是截取有界数据的一种方式,所以窗口一个非常重要的信息其实就是“怎样截取数据”。换句话说,就是以什么标准来开始和结束数据的截取,我们把它叫作窗口的“驱动类型”。
时间窗口(Time Window):按照时间段去截取数据。
计数窗口(Count Window):按照固定的个数,来截取一段数据集。
时间窗口(Time Window)
时间窗口以时间点来定义窗口的开始(start)和结束(end),所以截取出的就是某一时间段的数据。到达结束时间时,窗口不再收集数据,触发计算输出结果,并将窗口关闭销毁。所以可以说基本思路就是“定点发车”。
窗口中的数据,最大允许的时间戳就是 end - 1,这也就代表了我们定义的窗口时间范围都是左闭右开的区间[start,end)。
Flink 中有一个专门的类来表示时间窗口,名称就叫作 TimeWindow。这个类只有两个私有属性:start 和 end,表示窗口的开始和结束的时间戳,单位为毫秒。
public class TimeWindow extends Window { private final long start; private final long end; public TimeWindow(long start, long end) { this.start = start; this.end = end; } // 获取开始时间戳 public long getStart() { return start; } // 获取结束时间戳 public long getEnd() { return end; } }
计数窗口(Count Window)
计数窗口基于元素的个数来截取数据,到达固定的个数时就触发计算并关闭窗口。
计数窗口相比时间窗口就更加简单,我们只需指定窗口大小,就可以把数据分配到对应的窗口中了。在 Flink 内部也并没有对应的类来表示计数窗口,底层是通过“全局窗口”(Global Window)来实现的。
2.2 窗口分配数据规则
- 滚动窗口(Tumbling Window)
- 滑动窗口(Sliding Window)
- 会话窗口(Session Window)
- 全局窗口(Global Window)
滚动窗口(Tumbling Window)
- 滚动窗口有固定的大小,是一种对数据进行“均匀切片”的划分方式。
- 窗口之间没有重叠,也不会有间隔,是“首尾相接”的状态。
- 每个数据都会被分配到一个窗口,而且只会属于一个窗口。
- 需要的参数只有一个,就是窗口的大小(window size)。
- 滚动窗口可以基于时间定义,也可以基于数据个数定义;
如图所示,小圆点表示流中的数据,我们对数据按照 userId 做了分区。当固定了窗口大小之后,所有分区的窗口划分都是一致的;窗口没有重叠,每个数据只属于一个窗口。
滑动窗口(Sliding Window)
与滚动窗口类似,滑动窗口的大小也是固定的。区别在于,窗口之间并不是首尾相接的,而是可以“错开”一定的位置。如果看作一个窗口的运动,那么就像是向前小步“滑动”一样。
- 参数有两个:窗口大小(window size)之外,还有一个“滑动步长”(window slide)。
- 滚动窗口也可以看作是一种特殊的滑动窗口——窗口大小等于滑动步长(size = slide)。
我们可以看到,当滑动步长小于窗口大小时,滑动窗口就会出现重叠,这时数据也可能会被同时分配到多个窗口中。而具体的个数,就由窗口大小和滑动步长的比值(size/slide)来决定。如图所示,滑动步长刚好是窗口大小的一半,那么每个数据都会被分配到 2 个窗口里。
会话窗口(Session Window)
是基于“会话”(session)来来对数据进行分组的,借用会话超时失效的机制来描述窗口。
- 如果接下来还有数据陆续到来,那么就一直保持会话;
- 如果一段时间一直没收到数据,那就认为会话超时失效,窗口自动关闭。
- 参数就是这段时间的长度(size),它表示会话的超时时间。
与前两种窗口不同,会话窗口的长度不固定,起始和结束时间也是不确定的,各个分区之间窗口没有任何关联。如图 6-19 所示,会话窗口之间一定是不会重叠的,而且会留有至少为 size 的间隔(session gap)。
全局窗口(Global Window)
全局窗口全局有效,会把相同 key 的所有数据都分配到同一个窗口中;说直白一点,就跟没分窗口一样。
可以看到,全局窗口没有结束的时间点,所以一般在希望做更加灵活的窗口处理时自定义使用。Flink 中的计数窗口(Count Window),底层就是用全局窗口实现的。
3、API概述
3.1 按键分区(Keyed)和非按键分区(Non-Keyed)
在定义窗口操作之前,首先需要确定,到底是基于按键分区(Keyed)的数据流 KeyedStream来开窗,还是直接在没有按键分区的 DataStream 上开窗。也就是说,在调用窗口算子之前,是否有 keyBy 操作。
按键分区窗口(Keyed Windows)
经过按键分区 keyBy 操作后,数据流会按照 key 被分为多条逻辑流(logical streams),这就是 KeyedStream。基于 KeyedStream 进行窗口操作时, 窗口计算会在多个并行子任务上同时执行。相同 key 的数据会被发送到同一个并行子任务,而窗口操作会基于每个 key 进行单独的处理。所以可以认为,每个 key 上都定义了一组窗口,各自独立地进行统计计算。
stream.keyBy(...) .window(...)
非按键分区(Non-Keyed Windows)
如果没有进行 keyBy,那么原始的 DataStream 就不会分成多条逻辑流。这时窗口逻辑只能在一个任务(task)上执行,就相当于并行度变成了 1。所以在实际应用中一般不推荐使用这种方式。
在代码中,直接基于 DataStream 调用.windowAll()定义窗口。
stream.windowAll(...)
3.2 代码中窗口 API 的调用
窗口操作主要有两个部分:窗口分配器(Window Assigners)和窗口函数(Window Functions)。
stream.keyBy(<key selector>) .window(<window assigner>) .aggregate(<window function>)
.window()
方法需要传入一个窗口分配器,它指明了窗口的类型;.aggregate()
方法传入一个窗口函数作为参数,它用来定义窗口具体的处理逻辑。
4、窗口分配器((Window Assigners)
定义窗口分配器(Window Assigners)是构建窗口算子的第一步,它的作用就是定义数据应该被“分配”到哪个窗口。
4.1 时间窗口
时间窗口是最常用的窗口类型,又可以细分为滚动、滑动和会话三种。
1-1 滚动处理时间窗口
窗口分配器由类 TumblingProcessingTimeWindows 提供,需要调用它的静态方法.of()。
这里.of()方法需要传入一个 Time 类型的参数 size,表示滚动窗口的大小,我们这里创建了一个长度为 5 秒的滚动窗口。
定义 1 天的窗口,默认就从 0 点开始;如果定义 1 小时的窗口,默认就从整点开始。
stream.keyBy(...) .window(TumblingProcessingTimeWindows.of(Time.seconds(5))) .aggregate(...)
可以增加起始时间偏移量,来确定什么时间开始任务的执行。
// 早上8点开始执行(增加偏移量) .window(TumblingProcessingTimeWindows.of(Time.days(1), Time.hours(-8)))
1-2. 滑动处理时间窗口
窗口分配器由类 SlidingProcessingTimeWindows 提供,同样需要调用它的静态方法.of()。
stream.keyBy(...) .window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5))) .aggregate(...)
这里.of()方法需要传入两个 Time 类型的参数:size 和 slide,前者表示滑动窗口的大小,后者表示滑动窗口的滑动步长。我们这里创建了一个长度为 10 秒、滑动步长为 5 秒的滑动窗口。
滑动窗口同样可以追加第三个参数,用于指定窗口起始点的偏移量,用法与滚动窗口完全一致。
1-3. 会话时间窗口
窗口分配器由类ProcessingTimeSessionWindows
提供,需要调用它的静态方法.withGap()
或者.withDynamicGap()
。
这里.withGap()方法需要传入一个 Time 类型的参数 size,表示会话的超时时间,也就是最小间隔 session gap。
stream.keyBy(...) .window(ProcessingTimeSessionWindows.withGap(Time.seconds(10))) .aggregate(...)
案例:
.window( ProcessingTimeSessionWindows.withDynamicGap(new SessionWindowTimeGapExtractor<Tuple2<String, Long>>() { @Override public long extract(Tuple2<String, Long> element) { // 提取 session gap 值返回, 单位毫秒 return element.f0.length() * 1000; } }))
2-1. 滚动事件时间窗口
窗口分配器由类 TumblingEventTimeWindows 提供,用法与滚动处理事件窗口完全一致。
stream.keyBy(...) .window(TumblingEventTimeWindows.of(Time.seconds(5))) .aggregate(...)
2-2 滑动事件时间窗口
窗口分配器由类 SlidingEventTimeWindows 提供,用法与滑动处理事件窗口完全一致。
stream.keyBy(...) .window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5))) .aggregate(...)
2-3 事件时间会话窗口
窗口分配器由类 EventTimeSessionWindows 提供,用法与处理事件会话窗口完全一致。
stream.keyBy(...) .window(EventTimeSessionWindows.withGap(Time.seconds(10))) .aggregate(...)
4.2 计数窗口
计数窗口概念非常简单,本身底层是基于全局窗口(Global Window)实现的。Flink 为我们提供了非常方便的接口:直接调用.countWindow()方法。
根据分配规则的不同,又可以分为滚动计数窗口和滑动计数窗口两类。
1. 滚动计数窗口
滚动计数窗口只需要传入一个长整型的参数 size,表示窗口的大小。
定义一个长度为 10 的滚动计数窗口,当窗口中元素数量达到 10 的时候,就会触发计算执行并关闭窗口。
stream.keyBy(...) .countWindow(10)
2. 滑动计数窗口
与滚动计数窗口类似,不过需要在.countWindow()调用时传入两个参数:size 和 slide,前者表示窗口大小,后者表示滑动步长。
stream.keyBy(...) .countWindow(10,3)
定义一个长度为 10、滑动步长为 3 的滑动计数窗口。每个窗口统计 10 个数据,每隔 3 个数据就统计输出一次结果。
4.3 全局窗口
全局窗口是计数窗口的底层实现,一般在需要自定义窗口时使用。它的定义同样是直接调用.window(),分配器由 GlobalWindows 类提供。
stream.keyBy(...) .window(GlobalWindows.create());
需要注意使用全局窗口,必须自行定义触发器才能实现窗口计算,否则起不到任何作用。
5、窗口函数(Window Functions)
经窗口分配器处理之后,数据可以分配到对应的窗口中,而数据流经过转换得到的数据类型是 WindowedStream
。这个类型并不是 DataStream
,所以并不能直接进行其他转换,而必须进一步调用窗口函数,对收集到的数据进行处理计算之后,才能最终再次得到 DataStream
,如图所示。
处理的方式可以分为两类:增量聚合函数和全窗口函数。
5.1 增量聚合函数(incremental aggregation functions)
为了提高实时性,可以像DataStream
简单聚合一样,每来一条数据就立即进行计算,中间只要保持一个简单的聚合状态就可以了。区别在于不立即输出结果,而是等到窗口结束时间,拿出之前聚合的状态直接输出。
典型的增量聚合函数有两个:ReduceFunction 和 AggregateFunction。
5.1.1 归约函数(ReduceFunction)
最基本的聚合方式就是归约(reduce)。