一.引言
Flink 针对 window 提供了多种自定义 trigger,其中常见的有 CountTrigger 和 ProcessingTimeTrigger,下面通过两个 demo 了解一下两个 Trigger 的内部实现原理与窗口触发的相关知识。
二.辅助知识
介绍上述两个 Trigger 之前,首先重新回顾下之前提高的 trigger 基础知识。
1.Trigger 内部方法
· onElement :元素到达后执行的操作
· onProcessingTime:到达规定处理时间窗口执行的操作
· onEventTime :到达规定事件时间窗口执行的操作
· clear : 清除相关 value 变量
2.Window Trigger 后的操作
· TriggerResult.CONTINUE :跳过,什么都不做
· TriggerResult.FIRE :触发窗口计算
· TriggerResult.PURGE : 清除窗口元素
· TriggerResult.FIRE_AND_PURGE : 触发窗口操作,随后清空窗口元素
3.ReducingStateValue
ReducingStateValue 是一个抽象的统计量,需要用户自己定义其返回类型和对应的 reduce 操作,这里 reduce 并不是减少而是合并的意思,可以理解为 spark 里的 reduce(_ + _) 操作,即针对给定的 object1 和 object2,合成一个单独的 object,定义该变量方法如下:
val reduceStateDesc = new ReducingStateDescriptor[T]($key, new ReduceFunction(), classOf[T])
T 代表返回变量类型,key 为其 name 标识,ReduceFunction 需要继承 org.apache.flink.api.common.functions.ReduceFunction 实现 reduce(o1: T, o2: T): T 的方法,下面示例生成一个得到两数中最小的一个的 RecuceFunction:
class ReduceMin() extends ReduceFunction[Long] { override def reduce(t1: Long, t2: Long): Long = { math.min(1, t2) } }
4.Window 默认触发时机
dataStream.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(10)))
使用 org.apache.flink.streaming.api.windowing.assigners 类下的窗口例如滚动窗口 TumblingProcessingTimeWindows 时,根据我们设置的时间,窗口的开始和结束时间其实是固定的,待参数确定后,窗口的 start - end 就确定好了,以上述 10s 滚动窗口为例,则窗口默认的开始结束时间为整点,按18:00 开始:
18:00:00 - 18:00:10 ,18:00:10 - 18:00:20 ... 18:59:50 - 19:00:00
每当窗口到达规定结束时间时,都会默认调用 onProcessingTime 方法,这里不理解也没关系,等下看 demo 即可。
三.CountTrigger 详解
CountTrigger 按照 element 的个数进行触发,当元素数量达到 count_size 是,触发窗口计算逻辑,其内部统计 count 数量就用到了前面提到的 ReduceStateValue,为了在执行过程中添加运行日志,这里新增一个 SelfDefinedCountTrigger,代码与官方提供的 CountTrigger 完全一致,唯一差别是增加了日志打印。
1.SelfDefinedCountTrigger
trigger 内增加了触发的时间和触发的元素数量,主要逻辑都在 onElement 函数内,每来一个元素都会对 trigger 的 ReduceStateValue 累加值,这里采用 RecudeSum 函数,对两数求和,当数值达到 countSize 时进行窗口重触发 Fire 并清空 ReduceStateValue 开始新一轮计数;其余时间都返回 TriggerResult.CONTINUE。
class SelfDefinedCountTrigger(maxCount: Long) extends Trigger[String, TimeWindow] { // 条数计数器 val countStateDesc = new ReducingStateDescriptor[Long]("count", new ReduceSum(), classOf[Long]) // 元素过来后执行的操作 override def onElement(t: String, l: Long, w: TimeWindow, triggerContext: Trigger.TriggerContext): TriggerResult = { // 获取 count state 并累加数量 val count = triggerContext.getPartitionedState(countStateDesc) count.add(1L) // 满足数量触发要求 if (count.get() >= maxCount) { // 首先清空计数器 count.clear() val dateFormat = new SimpleDateFormat("yyyy-MM-dd:HH-mm-ss") val cla = Calendar.getInstance() cla.setTimeInMillis(System.currentTimeMillis()) val date = dateFormat.format(cla.getTime) println(s"[$date] Window Trigger By Count = ${maxCount}") TriggerResult.FIRE } else { TriggerResult.CONTINUE } } override def onProcessingTime(l: Long, w: TimeWindow, triggerContext: Trigger.TriggerContext): TriggerResult = { TriggerResult.CONTINUE } override def onEventTime(l: Long, w: TimeWindow, triggerContext: Trigger.TriggerContext): TriggerResult = { TriggerResult.CONTINUE } override def clear(w: TimeWindow, triggerContext: Trigger.TriggerContext): Unit = { val count = triggerContext.getPartitionedState(countStateDesc) count.clear() } }
2.主函数
主函数逻辑为从0开始,每s生成30个数字并循环累加,window 采用 10s 聚合的滚动窗口,trigger 采用 count = 30 的 CountTrigger,理论上每s生成的30个元素恰好触发窗口执行逻辑。窗口处理逻辑逻辑也很简单,直接输出当前 window 内的元素个数,元素 min,max 与处理时间:
object CountTriggerDemo { val dateFormat = new SimpleDateFormat("yyyy-MM-dd:HH-mm-ss") // 每s生成一批数据 class SourceFromCollection extends RichSourceFunction[String] { private var isRunning = true var start = 0 override def run(ctx: SourceFunction.SourceContext[String]): Unit = { while (isRunning) { (start until (start + 100)).foreach(num => { ctx.collect(num.toString) if (num % 30 == 0) { TimeUnit.SECONDS.sleep(1) } }) start += 100 } } override def cancel(): Unit = { isRunning = false } } def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment env .addSource(new SourceFromCollection()) .setParallelism(1) .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(10))) .trigger(new SelfDefinedCountTrigger(30)) .process(new ProcessAllWindowFunction[String, String, TimeWindow] { override def process(context: Context, elements: Iterable[String], out: Collector[String]): Unit = { val cla = Calendar.getInstance() cla.setTimeInMillis(System.currentTimeMillis()) val date = dateFormat.format(cla.getTime) val info = elements.toArray.map(_.toInt) val min = info.min val max = info.max val output = s"==========[$date] Window Elem Num: ${elements.size} Min: $min -> Max $max==========" out.collect(output) } }).print() env.execute() } }
3.执行日志
由于启动时并非整数时间,所以第一个窗口只处理了 8s 数据 8:30:02 - 8:30:10,第一个窗口逻辑结束后,后面都是稳定的从 10s 的滑动窗口中按每 30 个元素一次进行触发,可以通过 Window Elem Num 看到 10s 内窗口数据的变化,10s x 30 = 300。
编辑
4.窗口默认触发机制
上面提高了窗口会在默认的 end 时间执行 onProcessingTime 方法,由于方法内只返回了 TriggerResult.CONTINUE 所以不明显,下面在 onProcessingTime 方法中增加日志验证一下:
override def onProcessingTime(l: Long, w: TimeWindow, triggerContext: Trigger.TriggerContext): TriggerResult = { println(s"Window Trigger At Default End Time: $l") TriggerResult.CONTINUE }
查看日志:
编辑
通过日志信息看到了窗口确实会在结束时间执行 onProcessingTime 方法,但是为什么不是整数呢
编辑
这里是因为窗口实际触发的 timeStamp 是 window.maxTimestamp() 变量对应的方法,而该方法定义如下:
public long maxTimestamp() { return this.end - 1L; }
源码在默认 end 对应的时间戳上做了减一的处理,实际窗口结束时间为 1648034659999 + 1 :
编辑
所以这里验证了两个问题,第一就是窗口会在默认结束时间调用 onProcessingTime 方法,其次就是窗口的结束时间和真实触发时间相差 1L, window.maxTimestamp + 1 = window.getEnd。
四.ProcessingTimeTrigger 详解
processingTimeTrigger 是 processTime 对应 Flink 程序 window 的默认 trigger,其根据窗口默认的 start ,end 时间进行触发,还是用 10s 的 TumblingProcessingTimeWindows 窗口加自定义 SelfDefinedProcessingTimeTrigger 进行 demo 展示。
1.SelfDefinedProcessingTimeTrigger
ProcesingTimeTrigger 主要方法为 onElement 和 onProcessingTime ,前者对窗口进行 timeServer 的注册,其过期时间为 window.maxTimestamp,上面也提到了,时间就是 window.getEnd() - 1,后者执行 Fire 触发窗口计算,为了获得更细致的时间信息,这里增加了 processingTime 和 window-start window-end 的相关日志。
class SelfDefinedProcessingTimeTrigger() extends Trigger[String, TimeWindow] { // 条数计数器 val countStateDesc = new ReducingStateDescriptor[Long]("count", new ReduceSum(), classOf[Long]) val dateFormat = new SimpleDateFormat("yyyy-MM-dd:HH-mm-ss") val cla = Calendar.getInstance() // 元素过来后执行的操作 override def onElement(t: String, l: Long, w: TimeWindow, triggerContext: Trigger.TriggerContext): TriggerResult = { triggerContext.registerProcessingTimeTimer(w.maxTimestamp) TriggerResult.CONTINUE } override def onProcessingTime(l: Long, w: TimeWindow, triggerContext: Trigger.TriggerContext): TriggerResult = { cla.setTimeInMillis(w.getStart) val start = dateFormat.format(cla.getTime) cla.setTimeInMillis(w.getEnd) val end = dateFormat.format(cla.getTime) println(s"start: $start end: $end processTime: $l maxTimeStamp: ${w.maxTimestamp()} windowStart: ${w.getStart} windowEnd: ${w.getEnd}") TriggerResult.FIRE } override def onEventTime(l: Long, w: TimeWindow, triggerContext: Trigger.TriggerContext): TriggerResult = { TriggerResult.CONTINUE } override def clear(w: TimeWindow, triggerContext: Trigger.TriggerContext): Unit = { triggerContext.deleteProcessingTimeTimer(w.maxTimestamp) } }
2.主函数
数据 Source 为自定义 Source,每s生成30个元素,以10s为周期生成滚动窗口,处理逻辑依然为输出窗口的元素个数,min 和 max。
object ProcessTimeTriggerDemo { val dateFormat = new SimpleDateFormat("yyyy-MM-dd:HH-mm-ss") // 每s生成一批数据 class SourceFromCollection extends RichSourceFunction[String] { private var isRunning = true var start = 0 override def run(ctx: SourceFunction.SourceContext[String]): Unit = { while (isRunning) { (start until (start + 100)).foreach(num => { ctx.collect(num.toString) if (num % 30 == 0) { TimeUnit.SECONDS.sleep(1) } }) start += 100 } } override def cancel(): Unit = { isRunning = false } } def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment env .addSource(new SourceFromCollection()) .setParallelism(1) .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(10))) .trigger(new SelfDefinedProcessingTimeTrigger()) .process(new ProcessAllWindowFunction[String, String, TimeWindow] { override def process(context: Context, elements: Iterable[String], out: Collector[String]): Unit = { val cla = Calendar.getInstance() cla.setTimeInMillis(System.currentTimeMillis()) val date = dateFormat.format(cla.getTime) val info = elements.toArray.map(_.toInt) val min = info.min val max = info.max val output = s"==========[$date] Window Elem Num: ${elements.size} Min: $min -> Max $max==========" out.collect(output) } }).print() env.execute() } }
3.执行日志
窗口每个10s执行一次触发,触发 elem 数量为 10x30 = 300,通过示例可以看到 window-start window-end 和其对应的 format 时间形式,以及再次验证 maxTimestamp = window.end - 1。
编辑
五.总结
通过微调官方 Trigger 并增加日志,可以看到最常见的 CountTrigger 和 ProcessingTimeTrigger 的执行逻辑并对加深窗口触发的逻辑,后续将结合 CountTrigger 和 ProcessTimeTrigger 实现自定义的 CountAndTimeTrigger,该 Trigger 结合了 Count 和 ProcessingTime 的触发条件,可以让窗口在满足条数或满足间隔的情况下都触发。