2.6 Window函数
- 增量计算函数
增量计算指的是窗口保存一份中间数据,每流入一个新元素,新元素都会与中间数据合并,生成新的中间数据,在保存到窗口中。如ReduceFunction、AggregateFunction、FoldFnction。
- 全量计算函数
全量计算函数指的是先缓存该窗口的所有元素,等到触发条件后对窗口内的所有元素执行计算。如ProcessWindowFunction。
三、Watermark
3.1 DataStream Watermark生成
- Source Function中生成Watermark
SourceFunction 可以直接为数据元素分配时间戳,同时也会向下游发送Watermark。需要注意的是:如果一个timestamp 分配器被使用的话,由源提供的任何Timestramp和Watermark都会被重写。
为了通过SourceFunction直接为一个元素分配一个时间戳,SourceFunction需要调用SourceContext中的collectWithTimestamp(T element, long timestamp)方法。为了生成Watermark,源需要调用emitWatermark(Watermark)方法。
- DataStreamApi 中生成Watermark
DataStreamApi中使用的TimestampAssigner接口定义了时间戳的提取行为,其有两个不同接口AssignerWithPeriodicWatermarks和AssingerWithPunctuatedWatermarks,分别代表了不同的Watermark生成策略。
1)AssignerWithPeriodicWatermarks是周期性生成Watermark策略的顶层抽象接口,该接口的实现类周期性地生成watermark,而不会针对每一个事件都生成。
2)AssignerWithPunctuatedWatermarks对每一个事件都会尝试进行Watermark的生成,但是如果生成的Watermark是null或者Watermark小于之前的Watermark,则该Watermark不会发往下游,因为发往下游也不会有任何效果,不会触发任何窗口的执行。
3.2 FlinkSQL Watermark生成
其Watermark的生成主要是在TableSource中完成的,其定义了3类Watermark生成策略。
- 周期性Watermark策略
PeriodicWatermarkAssigner周期性(一定时间间隔或者达到一定的记录条数)地产生一个Watermark。生产使用时,一定注意时间和数据量,结合时间和积累条数两个维度继续周期性产生Watermark,否则在极端情况下会有很大延迟。
1)AscendingTimestamps:递增Watermark,作用在Flink SQL中的Rowtime属性上,Watermark = 当前收到的数据元素的最大时间戳 -1,此处减1的目的是确保有最大时间戳的时间不会被当做迟到的数据丢弃。
2)BoundedOutOfOrderTimestamps:固定延迟Watermark,作用在Flink SQL的Rowtime属性上,Watermark = 当前收到的数据元素的最大时间戳-固定延迟。
- 每个事件Watermark策略
PuntuatedWatermarkAssigner,数据流中每一个递增的EventTime都会产生一个Watermark。在生产中,除非实时性非常高的场景下才会选择Puntuated的方式进行Watermark生成。
- 无为策略
PreserveWatermark,可以使用DataStream Api和Table & SQL混合编程,所以FlinkSQL中不设定Watermark策略。
3.3 多流的Watermark
Flink内部实现每一个边上只能有一个递增的Watermark,当出现多流携带EventTime汇聚到一起(GroupBy或Union)时,Flink会选择所有流入的EventTime中最小的一个向下游流出,从而保证Watermark的单调递增和数据的完整性。
Flink作业一般是并行执行的,作业包含多个Task,每个Task运行一个或一组算子(operator chain) 实例,Task在生成Watermark的时候是相互独立的,也就是说在作业中存在多个并行的Watermark。多流输入会被分解成多个双流输入,对于多个双流Watermark处理,无论哪一个流的Watermark进入算子,都需要跟另一个流的当前算子进行比较,选择较小的的Watermark,即Min(input1 Watermark,input2Watermark),与算子当前的Watermark比较,如果大于当前算子的Watermark,则更新算子的Watermark为新的Watermark,并发送下游。
AbstractStreamOperator.java
3.4 时间服务
- 定时器服务器
定时器服务器在Flink中叫作TimeService,窗口算子(WindowOperator)中使用了InternalTimerService来管理定时器(Timer),其初始化是在WindowOperator#open()内实现的。
InternaleTimerService有几个元素比较重要:名称、命名空间类型N(及其序列化器)、键类型K(及其序列化器)和Triggerable对象(支持延时计算的算子,继承了Triggerable接口来实现回调)
一个算子中可以有多个InternalTimeService,通过名称进行区分:
1)WindowOperator:名称为 "window-timers"
2)KeyedProcessOperator:名称为"user-timers"
3)CepOperator:名称为"watermark-callbacks"
InternalTimerService接口实现类是InternalTimerServiceImpl,Timer的实现类是InternalTimer。InternalTimeServiceImpl使用了两个TimerHeapInternalTimer的优先队列(HeapPriorityQueueSet,该优先队列是Flink自己实现的),分别用于维护事件时间和处理时间的Timer。
InternalTimeServiceManager是Task级别提供的InternalService集中管理器。其使用Map保存了当前所有的InterTimerService,Map的key是InternalTimerService的名字。
- 定时器
定时器在Flink中叫作Timer。窗口的触发器与定时器是紧密联系的。
Flink的定时器使用InternalTimer接口定义行为。
Timer到底是如何触发然后回调用户逻辑的呢?
在InternalTimerServiceImpl中寻找答案,对于事件时间,会根据Watermark的时间,从事件时间的定时器队列中找到比给定时间小的所有定时器 ,触发该Timer所在的算子,然后由算子去调用UDF中的onTime()方法。处理时间也是类似的逻辑,区别在于,处理时间是从处理时间Timer优先队列中找到Timer。处理时间依赖于当前系统是,所以使用的周期性调度。
- 优先队列
Flink在优先级队列中使用了KeyGroup,是按照KeyGroup去重的,并不是按照全局的Key去重。
Flink自己实现了优先级队列来管理Timer:
1)基于堆内存的优先级队列HeapPriorityQueueSet:基于Java堆内存的优先级队列,其实现思路与Java的PriorityQueue类型,使用了二叉树。
2)基于RocksDB的优先级队列:分为Cache+RocksDB量级,Cache中保存了前N个元素,其余的保存在RocksDB中。写入的时候采用Write-through策略,即写入Cache的同时要更新RocksDB中的数据,可能需要访问磁盘。