前言
终于忙完了四门专业课的期末,确实挺累啊。今天开始继续学习 Flink ,接着上次的内容。
今日摘录:
他觉得一个人奋斗更轻松自在。跟没有干劲的人在一起厮混,只会徒增压力。
-《解忧杂货店》
1、窗口
之前我们已经了解了 Flink 中基本的聚合操作。在流处理中,我们往往需要面对的是连续不断、无休无止的无界流,不可能等到所有所有数据都到齐了才开始处理。所以聚合计算其实只能针对当前已有的数据——之后再有数据到来,就需要继续叠加、再次输出结果。这样似乎很“实时”,但现实中大量数据一般会同时到来,需要并行处理,这样频繁地更新结果就会给系统带来很大负担了。更加高效的做法是,把无界流进行切分,每一段数据分别进行聚合,结果只输出一次。这就相当于将无界流的聚合转化为了有界数据集的聚合,这就是所谓的“窗口”(Window)聚合操作。窗口聚合其实是对实时性和处理效率的一个权衡。在实际应用中,我们往往更关心一段时间内数据的统计结果,比如在过去的 1 分钟内有多少用户点击了网页。在这种情况下,我们就可以定义一个窗口,收集最近一分钟内的所有用户点击数据,然后进行聚合统计,最终输出一个结果就可以了。在 Flink 中,提供了非常丰富的窗口操作,下面我们就来详细介绍。
1.1、窗口的概念
这里的窗口和我们之前 Hive 中学到的窗口的概念是不一样的。Hive的窗口函数主要用于离线数据的聚合,是基于字段范围的,而Flink的窗口则是基于时间更多地用于处理实时数据流。
在 Flink 中,窗口可以把流切割成有限大小的多个“存储桶”(bucket);每个数据都会分发到对应的桶中,当到达窗口结束时间时,就对每个桶中收集的数据进行计算处理。
Flink 中窗口并不是静态准备好的,而是动态创建的,只有这个窗口区间内的数据到达时,才会去创建对应的窗口。
1.2、窗口的分类
1.2.1、按照度量标准
1)时间窗口
以时间点来定义窗口的开始和结束,所以截取出的就是某一段时间的数据。到达结束时间时,窗口不再收集数据,触发计算输出结果,并将窗口关闭销毁。
2)计数窗口
基于元素个数来截取数据,元素达到固定的个数时,就触发计算并关闭窗口。
1.2.2、按照窗口分配数据的规则
1)滚动窗口
滚动窗口有固定的大小,是一种对数据进行“均匀切片”的划分方式。窗口之间没有重叠,也不会有间隔,是“首尾相接”的状态。如果我们把多个窗口的创建,看作一个窗口的运动,那就好像它在不停地向前“翻滚”一样。也正是因为滚动窗口是“无缝衔接”,所以每个数据都会被分配到一个窗口,而且只会属于一个窗口。
2)滑动窗口
与滚动窗口类似,滑动窗口的大小也是固定的。区别在于,窗口之间并不是首尾相接的,而是可以“错开”一定的位置。如果看作一个窗口的运动,那么就像是向前小步“滑动”一样。既然是向前滑动,那么每一步滑多远,就也是可以控制的。
滑动窗口的参数有两个:窗口大小(window size)和 “滑动步长”(window slide)。当滑动步长等于窗口大小时,这就像是一个滚动窗口;当滑动步长大于窗口大小时,会有数据的遗漏。所以一般情况下,我们会让滑动步长小于窗口大小,并尽量设置为整数倍的关系。
3)会话窗口
会话窗口顾名思义,是基于“会话”(session)来来对数据进行分组的。这里的会话类似Web 应用中 session 的概念,不过并不表示两端的通讯过程,而是借用会话超时失效的机制来描述窗口。简单来说,就是数据来了之后就开启一个会话窗口,如果接下来还有数据陆续到来,那么就一直保持会话;如果一段时间一直没收到数据,那就认为会话超时失效,窗口自动关闭。
与滑动窗口和滚动窗口不同,会话窗口只能基于时间来定义,而没有“会话计数窗口”的概念。
4)全局窗口
还有一类比较通用的窗口,就是“全局窗口”。这种窗口全局有效,会把相同 key 的所有数据都分配到同一个窗口中;说直白一点,就跟没分窗口一样。无界流的数据永无止尽,所以这种窗口也没有结束的时候,默认是不会做触发计算的。如果希望它能对数据进行计算处理,还需要自定义“触发器”(Trigger)。
1.3、API
1.3.1、 按键分区(Keyed)和非按键分区(Non-Keyed)
在定义窗口操作之前,首先需要确定,到底是基于按键分区(Keyed)的数据流 KeyedStream来开窗,还是直接在没有按键分区的 DataStream 上开窗。也就是说,在调用窗口算子之前,是否有 keyBy 操作。
1)按键分区窗口(Keyed Windows)
经过按键分区 keyBy 操作后,数据流会按照 key 被分为多条逻辑流(logical streams),这就是 KeyedStream。基于 KeyedStream 进行窗口操作时, 窗口计算会在多个并行子任务上同时执行。相同 key 的数据会被发送到同一个并行子任务,而窗口操作会基于每个 key 进行单独的处理。所以可以认为,每个 key 上都定义了一组窗口,各自独立地进行统计计算。在代码实现上,我们需要先对 DataStream 调用.keyBy()进行按键分区,然后再调用.window()定义窗口。
stream.keyBy(...).window(...)
2)非按键分区(Non-Keyed)
如果没有进行 keyBy,那么原始的 DataStream 就不会分成多条逻辑流。这时窗口逻辑只能在一个任务(task)上执行,就相当于并行度变成了 1。所以在实际应用中一般不推荐使用这种方式。
在代码中,直接基于 DataStream 调用.windowAll()定义窗口。
stream.windowAll(...)
这里需要注意的是,对于非按键分区的窗口操作,手动调大窗口算子的并行度也是无效的,windowAll 本身就是一个非并行的操作。
1.3.2、API 的调用
窗口操作主要有两个部分:窗口分配器(Window Assigners)和窗口函数(Window Functions)。
stream.keyBy(<key selector>) .window(<window assigner>) .aggregate(<window function>)
其中.window()方法需要传入一个窗口分配器,它指明了窗口的类型;而后面的.aggregate()方法传入一个窗口函数作为参数,它用来定义窗口具体的处理逻辑。窗口分配器有各种形式,而窗口函数的调用方法也不只.aggregate()一种。另外,在实际应用中,一般都需要并行执行任务,非按键分区很少用到,所以我们之后都以按键分区窗口为例;如果想要实现非按键分区窗口,只要前面不做 keyBy,后面调用.window()时直接换成.windowAll()就可以了。
1.4、窗口分配器
用于指定窗口的类型,也就是指定窗口的度量标准(基于时间/元素个数)和分配数据的规则(滚动/滑动/会话/全局)。
窗口分配器最通用的定义方式,就是调用.window()方法。这个方法需要传入一个WindowAssigner 作为参数,返回 WindowedStream。如果是非按键分区窗口,那么直接调用.windowAll()方法,同样传入一个 WindowAssigner,返回的是 AllWindowedStream。
除去需要自定义的全局窗口外,其他三种常用的类型 Flink中都给出了内置的分配器实现,我们可以方便地调用实现各种需求。
下面的例子我们都使用按键分区的窗口类型,这里给出 keyBy 后得到的数据流对象 KeyedStream:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); SingleOutputStreamOperator<WaterSensor> sensorDS = env .socketTextStream("localhost", 9999) .map(new WaterSensorFunction()); KeyedStream<WaterSensor, String> sensorKs = sensorDS.keyBy(sensor -> sensor.getId());
1.4.1、基于时间的窗口
注意:这里的时间有两种语义,一种是处理时间(对于API中 xxxProcessingTimeWindows),一种是事件时间(对于API中 xxxEventProcessingWindows),这涉及到时间语义的内容,之后学到时间语义再详细说。
这里只需要知道,TumblingProcessingTimeWindows和TumblingEventTimeWindows的主要区别在于时间基准和触发机制。TumblingProcessingTimeWindows基于处理时间触发计算,适用于实时性要求高的场景;而TumblingEventTimeWindows基于事件时间触发计算,对于处理延迟到达的数据更加灵活。
1)基于处理时间的滚动窗口
// 基于时间的滚动创建 10s WindowedStream<WaterSensor, String, TimeWindow> tumblingWindow = sensorKs.window(TumblingProcessingTimeWindows.of(Time.seconds(10)));
这里的 of 方法还可以指定第二个参数,可以传入两个 Time 类型的参数:size 和 offset。第一个参数当然还是窗口大小,第二个参数则表示窗口起始点的偏移量。
我们知道,不同国家分布在不同的时区。标准时间戳其实就是1970 年 1 月 1 日 0 时 0 分 0 秒 0 毫秒开始计算的一个毫秒数,而这个时间是以 UTC 时间,也就是 0 时区(伦敦时间)为标准的。我们所在的时区是东八区,也就是 UTC+8,跟 UTC 有 8小时的时差。我们定义 1 天滚动窗口时,如果用默认的起始点,那么得到就是伦敦时间每天 0点开启窗口,这时是北京时间早上 8 点。那怎样得到北京时间每天 0 点开启的滚动窗口呢?只要设置-8 小时的偏移量就可以了:
sensorKs.window(TumblingProcessingTimeWindows.of(Time.days(1), Time.hours(-8)))
2)基于处理时间的滑动窗口
// 基于时间的滑动窗口 窗口大小10s 滑动步长2s WindowedStream<WaterSensor, String, TimeWindow> slideWindow = sensorKs.window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(2)));
3)基于处理时间的会话窗口
注意:会话窗口只能基于时间(处理时间或事件时间),不可以基于计数,因为我们会话窗口它划分窗口的规则只能是基于时间间隔的。
// 基于时间的会话窗口 中间间隔多久没有数据来,就把前面的数据划分为一个窗口,这里指定5s没有数据来就把前面的数据划分为一个窗口,后面的数据是一个新的窗口 WindowedStream<WaterSensor, String, TimeWindow> sessionWindow = sensorKs.window(ProcessingTimeSessionWindows.withGap(Time.seconds(5)));
也可以动态指定间隔(每来一个数据都会修改它的间隔)
// 基于处理时间的会话窗口 动态间隔 sensorKs.window(ProcessingTimeSessionWindows.withDynamicGap(new SessionWindowTimeGapExtractor<WaterSensor>() { @Override public long extract(WaterSensor sensor) { // 取 WaterSenor 对象的 ts 属性作为间隔,单位 ms 这里*1000方便输入 return sensor.getTs() * 1000; } }));
比如我们连续很快输入:
s1,5,5 s2,6,6 s3,7,7 s8,8,8
它会再我们输出完 "s8,8,8" 后再过八秒没有数据来,就输出窗口(窗口中的值就是计算的结果,具体计算的逻辑取决于我们指定的窗口函数)
4)基于事件时间的滚动窗口
// 基于事件时间的滚动窗口 sensorKs.window(TumblingEventTimeWindows.of(Time.seconds(10)));
5)基于事件时间的滑动窗口
// 基于事件时间的滑动窗口 sensorKs.window(SlidingEventTimeWindows.of(Time.seconds(10),Time.seconds(2)));
6)基于事件时间的会话窗口
// 基于事件时间的会话窗口 sensorKs.window(EventTimeSessionWindows.withGap(Time.seconds(5)));
1.4.2、基于计数的窗口
滚动计数窗口和滑动计数窗口的方法是同一个,只不过一个只需要传入一个参数,一个需要传入两个参数。
1)滚动计数窗口
// 基于计数的滚动窗口 窗口长度=5个元素 sensorKs.countWindow(5);
2)滑动计数窗口
// 基于计数的滑动窗口 窗口长度=5个元素,窗口步长的=2个元素,也就是说两个窗口间隔两个元素 sensorKs.countWindow(5,2);
滑动计数窗口会在数据量每达到步长的时候计算一次,而数据量每达到窗口大小才会关闭窗口开启一个新的窗口。 只要经过一个步长,就一定有一个窗口关闭。
对于上面的滑动计数窗口,它每一个步长2就会滑动一次(并输出一次计算结果)。
3)全局窗口
全局窗口一般在自定义的时候才会用到。
我们可以看到,滚动计数窗口和滑动计数窗口的实现的底层就是全局窗口,只不过滚动计数窗口中指定了触发器,滑动计数窗口中指定了触发器和移除器:
1.5、窗口函数
定义了窗口分配器,我们只是知道了数据属于哪个窗口,可以将数据收集起来了;至于收集起来到底要怎么做,这就需要使用窗口函数来操作了(注意:是窗口函数而不是其他普通函数,因为经过划分窗口后的数据流是 WindowedStream 而不是 DataStream)。所以在窗口分配器之后,必须再接上一个定义窗口如何进行计算的操作,这就是所谓的“窗口函数”(window functions)。
经窗口分配器处理之后,数据可以分配到对应的窗口中,而数据流经过转换得到的数据类型是 WindowedStream。这个类型并不是 DataStream,所以并不能直接进行其他转换,而必须进一步调用窗口函数,对收集到的数据进行处理计算之后,才能最终再次得到 DataStream
1.5.1、增量聚合函数(reduce/aggregate)
1)规约函数(ReduceFunction)
public class ReduceDemo { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); SingleOutputStreamOperator<WaterSensor> sensorDS = env .socketTextStream("localhost", 9999) .map(new WaterSensorFunction()); KeyedStream<WaterSensor, String> sensorKs = sensorDS.keyBy(WaterSensor::getId); // todo 1. 指定窗口分配器:基于处理时间的滚动窗口 WindowedStream<WaterSensor, String, TimeWindow> tumblingWindow = sensorKs.window(TumblingProcessingTimeWindows.of(Time.seconds(10))); // todo 2. 指定窗口函数:增量聚合的规约函数 SingleOutputStreamOperator<WaterSensor> reduce = tumblingWindow.reduce(new ReduceFunction<WaterSensor>() { @Override public WaterSensor reduce(WaterSensor sensor1, WaterSensor sensor2) throws Exception { System.out.println("调用 reduce 函数,sensor1= " + sensor1 + ",sensor2= " + sensor2); return new WaterSensor(sensor1.getId(), sensor2.getTs(), sensor1.getVc() + sensor2.getVc()); } }); reduce.print(); env.execute(); } }
运行结果:
// 窗口1 调用 reduce 函数,sensor1= WaterSensor{id='s1', ts=1, vc=1},sensor2= WaterSensor{id='s1', ts=2, vc=2} WaterSensor{id='s1', ts=2, vc=3} WaterSensor{id='s3', ts=3, vc=3} // 窗口2 WaterSensor{id='s3', ts=1, vc=1} // 窗口3 调用 reduce 函数,sensor1= WaterSensor{id='s4', ts=1, vc=1},sensor2= WaterSensor{id='s4', ts=1, vc=5} WaterSensor{id='s4', ts=1, vc=6}
2)聚合函数(AggregateFunction)
ReduceFunction 可以解决大多数归约聚合的问题,但是这个接口有一个限制,就是聚合状态的类型、输出结果的类型都必须和输入数据类型一样(可以看上面函数接口 ReduceFunction 中的泛型方法的定义)。这就迫使我们必须在聚合前,先将数据转换(map)成预期结果类型;而在有些情况下,还需要对状态进行进一步处理才能得到输出结果,这时它们的类型可能不同,使用 ReduceFunction 就会非常麻烦。
AggregateFunction 可以看作是 ReduceFunction 的通用版本,这里有三种类型:输入类型(IN)、累加器类型(ACC,也就是我们中间结果的类型)和输出类型(OUT)。
接口中有四个方法:
⚫ createAccumulator():创建一个累加器,这就是为聚合创建了一个初始状态,每个聚合任务只会调用一次。
⚫ add():将输入的元素添加到累加器中。这就是基于聚合状态,对新来的数据进行进一步聚合的过程。方法传入两个参数:当前新到的数据 value,和当前的累加器 accumulator;返回一个新的累加器值,也就是对聚合状态进行更新。每条数据到来之后都会调用这个方法。
⚫ getResult():从累加器中提取聚合的输出结果。也就是说,我们可以定义多个状态,然后再基于这些聚合的状态计算出一个结果进行输出。
⚫ merge():合并两个累加器,并将合并后的状态作为一个累加器返回。这个方法只在需要合并窗口的场景下才会被调用;最常见的合并窗口(Merging Window)的场景就是会话窗口(Session Windows)。
public class AggregateDemo { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); SingleOutputStreamOperator<WaterSensor> sensorDS = env .socketTextStream("localhost", 9999) .map(new WaterSensorFunction()); KeyedStream<WaterSensor, String> sensorKs = sensorDS.keyBy(WaterSensor::getId); // todo 1. 指定窗口分配器:基于处理时间的滚动窗口 WindowedStream<WaterSensor, String, TimeWindow> tumblingWindow = sensorKs.window(TumblingProcessingTimeWindows.of(Time.seconds(10))); // todo 2. 指定窗口函数:增量聚合的规约函数 SingleOutputStreamOperator<String> aggregate = tumblingWindow.aggregate(new AggregateFunction<WaterSensor, Integer, String>() { // @Override public Integer createAccumulator() { // 我们要存的是对象,不初始化的话是null,所以需要初始化它的值 System.out.println("创建累加器(初始化累加器)"); return 0; } @Override public Integer add(WaterSensor sensor, Integer accumulator) { // 计算逻辑 System.out.println("调用add方法,sensor= "+sensor); return accumulator + sensor.getVc(); } @Override public String getResult(Integer accumulator) { // 获取最终结果输出时 窗口触发输出 System.out.println("调用 getResult 方法"); return accumulator.toString(); } @Override public Integer merge(Integer a, Integer b) { // 一般不用 只有会话窗口才会用到 System.out.println("调用 merge 方法"); return null; } }); aggregate.print(); env.execute(); } }
运行结果:
// 窗口1 创建累加器(初始化累加器) 调用add方法,sensor= WaterSensor{id='s1', ts=1, vc=1} 创建累加器(初始化累加器) 调用add方法,sensor= WaterSensor{id='s2', ts=1, vc=1} 调用 getResult 方法 //id='s1'的结果 1 调用 getResult 方法 //id='s2'的结果 1 // 窗口2 创建累加器(初始化累加器) 调用add方法,sensor= WaterSensor{id='s1', ts=1, vc=2} 调用add方法,sensor= WaterSensor{id='s1', ts=1, vc=1} 调用add方法,sensor= WaterSensor{id='s1', ts=1, vc=3} 调用 getResult 方法 6 // 窗口3 创建累加器(初始化累加器) 调用add方法,sensor= WaterSensor{id='s2', ts=2, vc=2} 调用add方法,sensor= WaterSensor{id='s2', ts=2, vc=2} 调用 getResult 方法 4 // 窗口4 创建累加器(初始化累加器) 调用add方法,sensor= WaterSensor{id='s2', ts=2, vc=1}
其实这里的增量聚合函数就是用流处理的思路来处理有界数据流,当相同 key 的数据进来时,把数据的状态不断进行更新。这就是 Flink 所谓的“有状态的流处理”,通过这种方式可以极大地提高程序运行的效率,所以在实际应用中最为常见。
1.5.2、全窗口函数(apply/process)
与增量聚合函数不同,全窗口函数需要先收集窗口中的数据,并在内部缓存起来,等到窗口要输出结果的时候再取出数据进行计算。很明显,这就是典型的批处理思路了——先攒数据,等一批都到齐了再正式启动处理流程。
但是把计算放到窗口关闭才去计算无疑是低效的,毕竟如果数据量比较大的时候,这种方式肯定没有增量聚合函数计算的快。那为什么还要使用这种方式呢?这是因为有些场景下,我们要做的计算必须基于全部的数据才有效,这时做增量聚合就没什么意义了;另外,输出的结果有可能要包含上下文中的一些信息(比如窗口的起始时间),这是增量聚合函数做不到的。所以,我们还需要有更丰富的窗口计算方式,这就可以用全窗口函数来实现。
在 Flink 中,全窗口函数也有两种:WindowFunction 和 ProcessWindowFunction。
1)全窗口函数(full window functions)
WindowFunction 字面上就是“窗口函数”,它其实是老版本的通用窗口函数接口。我们可以基于 WindowedStream 调用.apply()方法,传入一个 WindowFunction 的实现类。
注意:这种写法已经不推荐使用了,这里了解即可。
tumblingWindow.apply(new WindowFunction<WaterSensor, String, String, TimeWindow>() { /** * * @param key 分组的 key * @param window 窗口对象 * @param input 存的数据(迭代器保存着所有传进来的数据) * @param out 采集器 * @throws Exception */ @Override public void apply(String key, TimeWindow window, Iterable<WaterSensor> input, Collector<String> out) throws Exception { } });
这个类中可以获取到包含窗口所有数据的可迭代集合(Iterable),还可以拿到窗口(Window)本身的信息。WindowFunction 接口在源码中实现如下:
@Public public interface WindowFunction<IN, OUT, KEY, W extends Window> extends Function, Serializable { /** * Evaluates the window and outputs none or several elements. * * @param key The key for which this window is evaluated. * @param window The window that is being evaluated. * @param input The elements in the window being evaluated. * @param out A collector for emitting elements. * @throws Exception The function may throw exceptions to fail the program and trigger recovery. */ void apply(KEY key, W window, Iterable<IN> input, Collector<OUT> out) throws Exception; }
2)窗口处理函数(ProcessWindowFunction)
ProcessWindowFunction 是 Window API 中最底层的通用窗口函数接口。之所以说它“最底层”,是因为除了可以拿到窗口中的所有数据之外,ProcessWindowFunction 还可以获取到一个“上下文对象”(Context)。这个上下文对象非常强大,不仅能够获取窗口信息,还可以访问当前的时间和状态信息。这里的时间就包括了处理时间(processing time)和事件时间水位线(event time watermark)。这就使得 ProcessWindowFunction 更加灵活、功能更加丰富。事实上, ProcessWindowFunction 是 Flink 底层 API——处理函数(process function)中的一员。
当 然 , 这 些 好 处 是 以 牺 牲 性 能 和 资 源 为 代 价 的 。 作 为 一 个 全 窗 口 函 数 ,ProcessWindowFunction 同样需要将所有数据缓存下来、等到窗口触发计算时才使用。它其实就是一个增强版的 WindowFunction。
public class WindowProcessDemo { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); SingleOutputStreamOperator<WaterSensor> sensorDS = env .socketTextStream("localhost", 9999) .map(new WaterSensorFunction()); KeyedStream<WaterSensor, String> sensorKs = sensorDS.keyBy(WaterSensor::getId); // todo 1. 指定窗口分配器:基于处理时间的滚动窗口 WindowedStream<WaterSensor, String, TimeWindow> tumblingWindow = sensorKs.window(TumblingProcessingTimeWindows.of(Time.seconds(10))); // todo 2. 指定窗口函数:全窗口函数 SingleOutputStreamOperator<String> process = tumblingWindow.process(new ProcessWindowFunction<WaterSensor, String, String, TimeWindow>() { /** * * @param key 分组的 key * @param context 上下文 * @param elements 全窗口存的数据 * @param out 采集器 * @throws Exception */ @Override public void process(String key, Context context, Iterable<WaterSensor> elements, Collector<String> out) throws Exception { long startTs = context.window().getStart(); long endTs = context.window().getEnd(); SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); String start = sdf.format(new Date(startTs)); String end = sdf.format(new Date(endTs)); long size = elements.spliterator().estimateSize(); out.collect("key=" + key + " 的窗口[" + start + "," + end + "]包含" + size + "条数据===>" + elements.toString()); } }); process.print(); env.execute(); } }
运行结果:
key=s1 的窗口[2023-11-29 16:30:30,2023-11-29 16:30:40]包含1条数据===>[WaterSensor{id='s1', ts=1, vc=1}] key=s2 的窗口[2023-11-29 16:30:30,2023-11-29 16:30:40]包含2条数据===>[WaterSensor{id='s2', ts=1, vc=1}, WaterSensor{id='s2', ts=1, vc=1}]
这里,我们只用上下文对象获取了窗口的创建时间和关闭时间,其实它还有很多强大的功能,比如侧输出流等。
1.5.3、增量聚合函数结合全窗口函数
增量聚合函数处理计算会更高效。举一个最简单的例子,对一组数据求和。大量的数据连续不断到来,全窗口函数只是把它们收集缓存起来,并没有处理;到了窗口要关闭、输出结果的时候,再遍历所有数据依次叠加,得到最终结果。而如果我们采用增量聚合的方式,那么只需要保存一个当前和的状态,每个数据到来时就会做一次加法,更新状态;到了要输出结果的时候,只要将当前状态直接拿出来就可以了。增量聚合相当于把计算量“均摊”到了窗口收集数据的过程中,自然就会比全窗口聚合更加高效、输出更加实时。
而全窗口函数的优势在于提供了更多的信息,可以认为是更加“通用”的窗口操作。它只负责收集数据、提供上下文相关信息,把所有的原材料都准备好,至于拿来做什么我们完全可以任意发挥。这就使得窗口计算更加灵活,功能更加强大。
所以在实际应用中,我们往往希望兼具这两者的优点,把它们结合在一起使用。Flink 的Window API 就给我们实现了这样的用法。
我们之前在调用 WindowedStream 的.reduce()和.aggregate()方法时,只是简单地直接传入了一个 ReduceFunction 或 AggregateFunction 进行增量聚合。除此之外,其实还可以传入第二个参数:一个全窗口函数,可以是 WindowFunction 或者 ProcessWindowFunction。
public class AggregateAndProcessDemo { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); SingleOutputStreamOperator<WaterSensor> sensorDS = env .socketTextStream("localhost", 9999) .map(new WaterSensorFunction()); KeyedStream<WaterSensor, String> sensorKs = sensorDS.keyBy(WaterSensor::getId); // todo 1. 指定窗口分配器:基于处理时间的滚动窗口 WindowedStream<WaterSensor, String, TimeWindow> tumblingWindow = sensorKs.window(TumblingProcessingTimeWindows.of(Time.seconds(10))); // todo 2. 指定窗口函数:增量聚合的规约函数 SingleOutputStreamOperator<String> result = tumblingWindow.aggregate( new AggregateFunction<WaterSensor, Integer, String>() { @Override public Integer createAccumulator() { // 我们要存的是对象,不初始化的话是null,所以需要初始化它的值 System.out.println("创建累加器(初始化累加器)"); return 0; } @Override public Integer add(WaterSensor sensor, Integer accumulator) { // 计算逻辑 System.out.println("调用add方法,sensor= " + sensor); return accumulator + sensor.getVc(); } @Override public String getResult(Integer accumulator) { // 获取最终结果输出时 窗口触发输出 System.out.println("调用 getResult 方法"); return accumulator.toString(); } @Override public Integer merge(Integer a, Integer b) { // 一般不用 只有会话窗口才会用到 System.out.println("调用 merge 方法"); return null; } }, new ProcessWindowFunction<String, String, String, TimeWindow>() { // 输入类型=增量函数的输出类型 // 这里的 elements 只有一条数据,因为是从增量聚合函数直接传过来的,我们使用全窗口函数只是为了获得更多的上下文功能 @Override public void process(String key, Context context, Iterable<String> elements, Collector<String> out) throws Exception { long startTs = context.window().getStart(); long endTs = context.window().getEnd(); SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); String start = sdf.format(new Date(startTs)); String end = sdf.format(new Date(endTs)); long size = elements.spliterator().estimateSize(); out.collect("key=" + key + " 的窗口[" + start + "," + end + "]包含" + size + "条数据===>" + elements.toString()); } }); result.print(); env.execute(); } }
运行结果:
// 窗口1 调用add方法,sensor= WaterSensor{id='s1', ts=1, vc=1} 调用add方法,sensor= WaterSensor{id='s1', ts=1, vc=2} 调用 getResult 方法 //这里调用 getResult 方法并不会输出结果 因为结果被作为输入参数传给了全窗口函数 key=s1 的窗口[2023-11-29 16:48:00,2023-11-29 16:48:10]包含1条数据===>[3] 创建累加器(初始化累加器) 调用add方法,sensor= WaterSensor{id='s2', ts=2, vc=2} 调用 getResult 方法 key=s2 的窗口[2023-11-29 16:48:10,2023-11-29 16:48:20]包含1条数据===>[2] // 窗口2 创建累加器(初始化累加器) 调用add方法,sensor= WaterSensor{id='s1', ts=1, vc=1} 调用add方法,sensor= WaterSensor{id='s1', ts=1, vc=1} 创建累加器(初始化累加器) 调用add方法,sensor= WaterSensor{id='s2', ts=2, vc=2} 调用 getResult 方法 key=s1 的窗口[2023-11-29 16:48:20,2023-11-29 16:48:30]包含1条数据===>[2] 调用 getResult 方法 key=s2 的窗口[2023-11-29 16:48:20,2023-11-29 16:48:30]包含1条数据===>[2]
可以看到,我们结合了两者的优点:
- 增量聚合:来一条算一条,存储中间结果,不会把计算的压力攒到最后。
- 全窗口函数:可以通过上下文获取更多的功能。
当然,我们的增量聚合函数 reduce 也可以和 全窗口函数结合使用,同样是传入两个参数(一个需要实现函数接口 ReduceFunction ,另一个需要实现抽象类 ProcessWindowFunction (或者 WindowFunction 只不过这个现在不常用了))
1.6、其他 API
其实前面的窗口都有默认的触发器,比如滚动计数窗口中指定了触发器,滑动计数窗口中指定了触发器和移除器。这里理解就好。
1.6.1、触发器(Trigger)
触发器主要是用来控制窗口什么时候触发计算,也就是什么时候执行窗口函数,可以理解为计算得到结果的过程。
基于 WindowedStream 调用 tigger() 方法,就可以传入一个自定义的窗口触发器(Trigger)。
tumblingWindow.trigger(new Trigger<WaterSensor, TimeWindow>() { @Override public TriggerResult onElement(WaterSensor element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception { return null; } @Override public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception { return null; } @Override public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) throws Exception { return null; } @Override public void clear(TimeWindow window, TriggerContext ctx) throws Exception { } });
拿我们基于处理时间的滚动窗口来说,TumblingProcessingTimeWindows 有一个 getDefaultTrigger 方法来选择默认的触发器,这个方法会返回一个类 ProcessingTimeTrigger 的实例,这个类 ProcessingTimeTrigger 继承自抽象类 Tigger, 主要实现了这么几个方法:
- onElement():窗口中每到来一条数据,都会调用这个方法。
- onEventTime():当注册的事件时间定时器触发时,将调用这个方法。
- onProcessingTime ():当注册的处理时间定时器触发时,将调用这个方法。
- clear():当窗口关闭销毁时,调用这个方法。一般用来清除自定义的状态。
我们可以看上面 基于计数的滑动窗口的底层代码,我们可以发现,它有默认的触发器和移除器:
我们可以查看它的触发器的内部代码:
1.6.2、移除器(Evictor)
移除器主要用来自定义移除某些数据的逻辑。基于 WindowedStream 调用 evictor() 方法,就可以传入一个自定义的移除器。Evictor 是一个接口,不同的窗口类型有自己预先实现的移除器。
tumblingWindow.evictor(new Evictor<WaterSensor, TimeWindow>() { @Override public void evictBefore(Iterable<TimestampedValue<WaterSensor>> elements, int size, TimeWindow window, EvictorContext evictorContext) { } @Override public void evictAfter(Iterable<TimestampedValue<WaterSensor>> elements, int size, TimeWindow window, EvictorContext evictorContext) { } });
不管是触发器还是移除器,一般都不需要我们自定义,但要知道它的底层原理。
1.7、窗口原理分析
1.7.1、窗口的触发时机
思考:假如我们第一条数据到来的时间是 03分:05秒,时间窗口大小为10分钟,那么这条数据属于窗口 [0,10)还是[3,13)呢?(注意:窗口是左闭右开的!)
事实上,窗口的划分并不是以第一条数据来的时间作为初始时间(窗口的 start),上面的数据是属于窗口[0,10]的。我们通过上面小节中的运行结果也可以看到,时间都是整秒开始的。我们可以查看计算窗口起始时间的代码底层原理(下面是以 TumblingProcessingTimeWindows 为例):
我们可以看到, TumblingProcessingTimeWindows 的 assignWindows 方法的作用是确定如何将流中的元素分配到不同的窗口中。其中定义了窗口的起始时间(start)和结束时间(start + size):
1.7.2、窗口的生命周期
1)什么时候创建?
Flink 是事件驱动型的,它不会预先把窗口建立好等数据来,而是当属于本窗口范围的第一条数据来的时候才创建窗口。
比如我们的窗口大小为 10s,假如我们在 13s(第一条)、14s 来了两条数据 ,它是什么时候创建窗口的?
通过源码我们可以看到,属于该窗口的数据到来时,都会通过现 new 来创建一个窗口对象,并且放到一个单例集合中,表示这个窗口对象只有一个,第一条数据之后来的就不会覆盖之前的窗口了。
2)什么时候销毁?
当 时间进展 >= 窗口最大时间 (end - 1ms) + 允许迟到的时间(默认为0) 的时候,就会销毁窗口。