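I. Introduction
The previous article covered CountTrigger and ProcessingTimeTrigger. CountTrigger fires once the number of elements in the window reaches a configured count. ProcessingTimeTrigger registers a timer for the window's end time with the timer service and fires when that timer goes off. This article builds a custom Trigger that combines the two: the window fires whenever either condition is met, that is, when the count is reached or when the processing-time interval has elapsed.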
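II. Code Walkthrough
1. CountAndProcessingTimeTrigger
The complete code is shown below. The main logic lives in onElement and onProcessingTime. onElement handles count-based firing (the CountTrigger part), and onProcessingTime handles time-based firing (the ProcessingTimeTrigger part). Two ReducingStates are declared up front, one for the element count and one for the next fire time; see the previous article for how ReducingState works. The main methods are analyzed one by one afterwards.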
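```scala
import org.apache.flink.api.common.functions.ReduceFunction
import org.apache.flink.api.common.state.ReducingStateDescriptor
import org.apache.flink.streaming.api.windowing.triggers.{Trigger, TriggerResult}
import org.apache.flink.streaming.api.windowing.windows.TimeWindow

// Sum reducer for the count state (assumed to match the one from the previous article)
class ReduceSum extends ReduceFunction[java.lang.Long] {
  override def reduce(a: java.lang.Long, b: java.lang.Long): java.lang.Long =
    java.lang.Long.valueOf(a.longValue() + b.longValue())
}

// Min reducer for the time state: keeps the earliest registered fire time
class ReduceMin extends ReduceFunction[java.lang.Long] {
  override def reduce(a: java.lang.Long, b: java.lang.Long): java.lang.Long =
    java.lang.Long.valueOf(math.min(a.longValue(), b.longValue()))
}

class CountAndProcessingTimeTrigger(maxCount: Long, interval: Long) extends Trigger[String, TimeWindow] {
  // Element counter. java.lang.Long is used so that an empty state reads as null
  val countStateDesc = new ReducingStateDescriptor[java.lang.Long]("count", new ReduceSum(), classOf[java.lang.Long])
  // Time state: the next fire time of our own processing-time timer
  val timeStateDesc = new ReducingStateDescriptor[java.lang.Long]("interval", new ReduceMin(), classOf[java.lang.Long])

  // Stand-in for the formatString helper from the previous article (assumption: it only prefixes the time)
  private def formatString(msg: String): String = s"[${System.currentTimeMillis()}] $msg"

  // Called for every element that enters the window
  override def onElement(t: String, time: Long, window: TimeWindow, triggerContext: Trigger.TriggerContext): TriggerResult = {
    val count = triggerContext.getPartitionedState(countStateDesc)
    val fireTimestamp = triggerContext.getPartitionedState(timeStateDesc)
    // Count the element and check whether maxCount has been reached
    count.add(1L)
    if (count.get().longValue() >= maxCount) {
      println(formatString(s"CountTrigger Triggered Count: ${count.get()}"))
      count.clear()
      // Delete our pending timer, but never the window's own end-of-window timer
      val ts = fireTimestamp.get()
      if (ts != null && ts.longValue() != window.maxTimestamp()) {
        triggerContext.deleteProcessingTimeTimer(ts.longValue())
      }
      fireTimestamp.clear()
      return TriggerResult.FIRE
    }
    // No timer pending yet: register the next fire time as now + interval
    if (fireTimestamp.get() == null) {
      val nextFireTimestamp = triggerContext.getCurrentProcessingTime + interval
      triggerContext.registerProcessingTimeTimer(nextFireTimestamp)
      fireTimestamp.add(nextFireTimestamp)
    }
    TriggerResult.CONTINUE
  }

  override def onProcessingTime(time: Long, window: TimeWindow, triggerContext: Trigger.TriggerContext): TriggerResult = {
    val count = triggerContext.getPartitionedState(countStateDesc)
    val fireTimestamp = triggerContext.getPartitionedState(timeStateDesc)
    val ts = fireTimestamp.get()
    if (time == window.maxTimestamp()) {
      // Window end reached: fire with the complete window contents
      println(formatString(s"Window Trigger By maxTimeStamp: $time FireTimestamp: $ts"))
      count.clear()
      if (ts != null) triggerContext.deleteProcessingTimeTimer(ts.longValue())
      fireTimestamp.clear()
      val next = triggerContext.getCurrentProcessingTime + interval
      fireTimestamp.add(next)
      triggerContext.registerProcessingTimeTimer(next)
      TriggerResult.FIRE
    } else if (ts != null && ts.longValue() == time) {
      // Our own interval timer: fire, reset the count and re-arm the timer
      println(formatString(s"TimeTrigger Triggered At: $ts"))
      count.clear()
      fireTimestamp.clear()
      val next = triggerContext.getCurrentProcessingTime + interval
      fireTimestamp.add(next)
      triggerContext.registerProcessingTimeTimer(next)
      TriggerResult.FIRE
    } else {
      TriggerResult.CONTINUE
    }
  }

  // Event time is not used by this trigger
  override def onEventTime(l: Long, w: TimeWindow, triggerContext: Trigger.TriggerContext): TriggerResult =
    TriggerResult.CONTINUE

  override def clear(w: TimeWindow, triggerContext: Trigger.TriggerContext): Unit = {
    val count = triggerContext.getPartitionedState(countStateDesc)
    val fireTimestamp = triggerContext.getPartitionedState(timeStateDesc)
    // Also delete a still-pending interval timer so it does not outlive the window
    val ts = fireTimestamp.get()
    if (ts != null) triggerContext.deleteProcessingTimeTimer(ts.longValue())
    count.clear()
    fireTimestamp.clear()
  }
}
```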
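Both pieces of trigger state are ReducingStates: each holds a single value and merges every add() into it with its reduce function. The pure-Scala sketch below only imitates that behavior so it can run without Flink. ReducingCell is a hypothetical stand-in, not a Flink class, and None plays the role of the null an empty Flink state returns. It shows why a sum reducer works as a counter and a min reducer keeps the earliest fire time.

```scala
// Hypothetical model of a Flink ReducingState: at most one value,
// every add() is folded into it with the reduce function.
final class ReducingCell[T](reduce: (T, T) => T) {
  private var value: Option[T] = None
  def add(v: T): Unit = value = Some(value.fold(v)(reduce(_, v)))
  def get: Option[T] = value          // None stands for Flink's null
  def clear(): Unit = value = None
}

object ReducingCellDemo {
  def main(args: Array[String]): Unit = {
    val count  = new ReducingCell[Long](_ + _)                      // like ReduceSum
    val fireTs = new ReducingCell[Long]((a, b) => math.min(a, b))   // like ReduceMin
    (1 to 3).foreach(_ => count.add(1L))
    fireTs.add(15000L)
    fireTs.add(12000L)
    println(count.get)
    println(fireTs.get)
    count.clear()
    println(count.get)
  }
}
```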
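2. onElement
For every incoming element, count.add increments the counter. What happens next depends on whether maxCount has been reached:
----- maxCount reached
A. log - print a log line marking this firing as coming from the CountTrigger
B. count.clear - reset the counter so counting starts over, then fire
C. deleteProcessingTimeTimer - delete the pending processing-time timer (unless it coincides with window.maxTimestamp(), the window's own end-of-window timer) and clear fireTimestamp, because after any firing both the count and the timer start over
----- maxCount not reached
A. currentTime - get the current processing time from the trigger context
B. registerProcessingTimeTimer - if the time state is still empty, compute the next fire time as current time + interval, register a timer for it and store it in the state
----- No firing in this call
A. TriggerResult.CONTINUE - do not fire; wait for the timer to go off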
```scala
override def onElement(t: String, time: Long, window: TimeWindow, triggerContext: Trigger.TriggerContext): TriggerResult = {
  val count = triggerContext.getPartitionedState(countStateDesc)
  val fireTimestamp = triggerContext.getPartitionedState(timeStateDesc)
  // Count the element and check whether maxCount has been reached
  count.add(1L)
  if (count.get().longValue() >= maxCount) {
    println(formatString(s"CountTrigger Triggered Count: ${count.get()}"))
    count.clear()
    // Delete our pending timer, but never the window's own end-of-window timer
    val ts = fireTimestamp.get()
    if (ts != null && ts.longValue() != window.maxTimestamp()) {
      triggerContext.deleteProcessingTimeTimer(ts.longValue())
    }
    fireTimestamp.clear()
    return TriggerResult.FIRE
  }
  // No timer pending yet: register the next fire time as now + interval
  if (fireTimestamp.get() == null) {
    val nextFireTimestamp = triggerContext.getCurrentProcessingTime + interval
    triggerContext.registerProcessingTimeTimer(nextFireTimestamp)
    fireTimestamp.add(nextFireTimestamp)
  }
  TriggerResult.CONTINUE
}
```
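To follow the branching without a Flink cluster, the decision logic of onElement can be modeled as a pure function. This is a simplified sketch, not Flink code: TriggerState, Register and Delete are hypothetical names, timer calls are returned as a list of actions instead of being executed, and None stands for an empty fireTimestamp state.

```scala
object OnElementModel {
  sealed trait Action
  final case class Register(ts: Long) extends Action
  final case class Delete(ts: Long) extends Action
  final case class TriggerState(count: Long, fireTs: Option[Long])

  /** One element arrives at processing time `now`: returns (fire?, new state, timer actions). */
  def onElement(s: TriggerState, now: Long, windowMaxTs: Long,
                maxCount: Long, interval: Long): (Boolean, TriggerState, List[Action]) = {
    val count = s.count + 1
    if (count >= maxCount) {
      // Count reached: drop our pending timer (never the window-end timer) and fire
      val deletes: List[Action] = s.fireTs.filter(_ != windowMaxTs).map(Delete(_)).toList
      (true, TriggerState(0L, None), deletes)
    } else if (s.fireTs.isEmpty) {
      // First element since the last firing: arm the interval timer
      val next = now + interval
      (false, TriggerState(count, Some(next)), List(Register(next)))
    } else {
      // A timer is already pending: just keep counting
      (false, s.copy(count = count), Nil)
    }
  }

  def main(args: Array[String]): Unit = {
    // Hypothetical values: maxCount = 3, interval = 5000 ms, window end at 9999
    var state = TriggerState(0L, None)
    for (now <- List(1000L, 2000L, 3000L, 4000L)) {
      val (fire, next, actions) = onElement(state, now, 9999L, 3L, 5000L)
      println(s"t=$now fire=$fire state=$next actions=$actions")
      state = next
    }
  }
}
```

With maxCount = 3, the first element arms a timer at 6000, the third element fires by count and deletes that timer, and the fourth element arms a fresh one.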
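3. onProcessingTime
onProcessingTime runs when a registered processing-time timer goes off. As explained in the previous article, it is called at two kinds of moments. The first is when our own processing-time timer fires: the window FIREs with only the data that has arrived so far. The second is when window.maxTimestamp() is reached, i.e. window.getEnd - 1: the window FIREs with all the data in the time range defined for windowAll. For example, with Time.seconds(10), the first kind emits part of the window's data, while the second emits the complete 10 s window. Note that this trigger never registers a timer at window.maxTimestamp() itself: WindowOperator registers that timer as the window's cleanup timer, and because it goes through the same timer service, the trigger's onProcessingTime is called for it as well.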
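For a tumbling window, the start, end and maxTimestamp of the window an element lands in follow from simple arithmetic. The sketch below mirrors the formula in Flink's TimeWindow.getWindowStartWithOffset, simplified to non-negative timestamps; WindowBounds, Window and assign are hypothetical names, not Flink API.

```scala
object WindowBounds {
  final case class Window(start: Long, end: Long) {
    // Last millisecond that still belongs to the window: getEnd - 1
    def maxTimestamp: Long = end - 1
  }

  // Tumbling-window assignment (assumes ts >= 0 and 0 <= offset < size)
  def assign(ts: Long, size: Long, offset: Long = 0L): Window = {
    val start = ts - (ts - offset + size) % size
    Window(start, start + size)
  }

  def main(args: Array[String]): Unit = {
    val w = assign(ts = 23456L, size = 10000L)   // a 10 s window, as with Time.seconds(10)
    println(s"window [${w.start}, ${w.end}) maxTimestamp = ${w.maxTimestamp}")
  }
}
```

An element processed at 23456 ms falls into [20000, 30000), so the end-of-window firing happens at 29999.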
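----- Window end reached (default firing time)
A. window.maxTimestamp - the window's default firing time has been reached; print the corresponding log line
B. count.clear - clear the count state
C. deleteProcessingTimeTimer - delete the pending custom timer, because counting and timing start over after the window fires
D. registerProcessingTimeTimer - register the next fire time as current processing time + interval
E. TriggerResult.FIRE - fire the window with its complete data
----- Custom interval reached
A. log - print TimeTrigger Triggered to mark this firing as coming from the custom processing-time timer
B. count.clear - clear the count state after the window fires
C. registerProcessingTimeTimer - register the next fire time as current processing time + interval
D. TriggerResult.FIRE - fire the window with the data received so far
----- Neither
A. TriggerResult.CONTINUE - do nothing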
```scala
override def onProcessingTime(time: Long, window: TimeWindow, triggerContext: Trigger.TriggerContext): TriggerResult = {
  val count = triggerContext.getPartitionedState(countStateDesc)
  val fireTimestamp = triggerContext.getPartitionedState(timeStateDesc)
  val ts = fireTimestamp.get()
  if (time == window.maxTimestamp()) {
    // Window end reached: fire with the complete window contents
    println(formatString(s"Window Trigger By maxTimeStamp: $time FireTimestamp: $ts"))
    count.clear()
    if (ts != null) triggerContext.deleteProcessingTimeTimer(ts.longValue())
    fireTimestamp.clear()
    val next = triggerContext.getCurrentProcessingTime + interval
    fireTimestamp.add(next)
    triggerContext.registerProcessingTimeTimer(next)
    TriggerResult.FIRE
  } else if (ts != null && ts.longValue() == time) {
    // Our own interval timer: fire, reset the count and re-arm the timer
    println(formatString(s"TimeTrigger Triggered At: $ts"))
    count.clear()
    fireTimestamp.clear()
    val next = triggerContext.getCurrentProcessingTime + interval
    fireTimestamp.add(next)
    triggerContext.registerProcessingTimeTimer(next)
    TriggerResult.FIRE
  } else {
    TriggerResult.CONTINUE
  }
}
```
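The two firing branches of onProcessingTime can be modeled the same way. Again this is a hypothetical pure-Scala sketch, not Flink code: the parameter now stands for triggerContext.getCurrentProcessingTime, the result labels are made up for readability, and timer calls are returned as actions.

```scala
object OnProcessingTimeModel {
  sealed trait Action
  final case class Register(ts: Long) extends Action
  final case class Delete(ts: Long) extends Action
  final case class TriggerState(count: Long, fireTs: Option[Long])

  /** A timer for `time` goes off: returns (result label, new state, timer actions). */
  def onProcessingTime(time: Long, s: TriggerState, windowMaxTs: Long,
                       now: Long, interval: Long): (String, TriggerState, List[Action]) = {
    val next = now + interval
    if (time == windowMaxTs) {
      // Window end: delete the pending custom timer, re-arm, fire with all window data
      val deletes: List[Action] = s.fireTs.map(Delete(_)).toList
      ("FIRE (window end)", TriggerState(0L, Some(next)), deletes :+ Register(next))
    } else if (s.fireTs.contains(time)) {
      // Our interval timer: reset the count, re-arm, fire with the data so far
      ("FIRE (interval)", TriggerState(0L, Some(next)), List(Register(next)))
    } else {
      ("CONTINUE", s, Nil)
    }
  }

  def main(args: Array[String]): Unit = {
    // Hypothetical values: window end 9999, interval 5000 ms
    println(onProcessingTime(6000L, TriggerState(2L, Some(6000L)), 9999L, 6000L, 5000L))
    println(onProcessingTime(9999L, TriggerState(1L, Some(11000L)), 9999L, 9999L, 5000L))
  }
}
```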
4. onEventTime
The trigger is based only on count and processing time, so onEventTime simply returns TriggerResult.CONTINUE.
5. clear
Clears the two ReducingStates that hold the count and fireTimestamp.
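III. Code in Practice
1. Main Function
In the previous article, the sources for CountTrigger and ProcessingTimeTrigger were fixed generators emitting 30 records per second. To exercise CountAndProcessingTimeTrigger, this demo reads from a socket instead, so the data can be typed in by hand; start a local server with nc -lk 9999 (the port must match the one passed to socketTextStream). The ProcessAllWindowFunction computes the min and max of the elements in the window and prints them together with the processing time of the firing.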
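```scala
import java.text.SimpleDateFormat
import java.util.Calendar

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.ProcessAllWindowFunction
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

object CountAndProcessTimeTriggerDemo {
  val dateFormat = new SimpleDateFormat("yyyy-MM-dd:HH-mm-ss")

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env
      .socketTextStream("localhost", 9999)                   // start the server first: nc -lk 9999
      .setParallelism(1)
      .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(10)))
      .trigger(new CountAndProcessingTimeTrigger(10, 5000))  // maxCount = 10, interval = 5000 ms
      .process(new ProcessAllWindowFunction[String, String, TimeWindow] {
        override def process(context: Context, elements: Iterable[String], out: Collector[String]): Unit = {
          // Processing time at which this firing happens
          val cla = Calendar.getInstance()
          cla.setTimeInMillis(System.currentTimeMillis())
          val date = dateFormat.format(cla.getTime)
          // Every input line is expected to be an integer
          val info = elements.toArray.map(_.trim.toInt)
          val output = s"==========[$date] Window Elem Num: ${elements.size} Min: ${info.min} -> Max ${info.max}=========="
          out.collect(output)
        }
      })
      .print()
    env.execute()
  }
}
```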
2. Verifying the Data
The CountAndProcessingTimeTrigger above is configured with count = 10 and interval = 5 s. In the output, the firings are color-coded as follows:
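| Color | Trigger | Input | Behavior |
|---|---|---|---|
| Blue | CountTrigger | 1-10 | count = 10 is reached, so CountTrigger fires |
| Red | DefaultTrigger | (none) | Window-end firing; the full window contains 1-10 |
| Green | ProcessingTimeTrigger | 11, 12 | Entering 11 and 12 leads to a ProcessingTimeTrigger firing |
| Yellow | DefaultTrigger | 13 | 13 is entered before the window ends; the full window contains 11-13 |
| Gray | ProcessingTimeTrigger | 14 | 14 is entered; wait for the ProcessingTimeTrigger to fire |
| White | DefaultTrigger | (none) | Window-end firing; the full window now contains 14 |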
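Why do the ProcessingTimeTrigger log lines appear at seconds 16 and 27 even though the interval is 5 s? The interval timer is set to the arrival time of the first element after the previous firing plus interval, and typing into the socket by hand introduces a delay, so the first element of each window arrived a little after the window started. If the data were sent automatically without that delay (the first element arriving right at seconds 10 and 20), the ProcessingTimeTrigger would fire at seconds 15 and 25.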
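The timing argument can be checked with a small single-window simulation of the count-or-interval rule. This is a hypothetical pure-Scala model rather than Flink code: it assumes timers fire exactly on time, Fire, run and fireDueTimers are made-up names, and the arrival times in main are invented values chosen to mirror the green and yellow rows of the table, not measurements.

```scala
object TriggerSimulation {
  final case class Fire(atMs: Long, reason: String, elements: List[Int])

  /** Simulates one processing-time window [start, start + size) with the
    * count-or-interval rule. Simplified model: timers fire exactly on time,
    * and arrivals must be sorted by time. */
  def run(start: Long, size: Long, maxCount: Int, interval: Long,
          arrivals: List[(Long, Int)]): List[Fire] = {
    val maxTs = start + size - 1
    val fires = List.newBuilder[Fire]
    var buffer = Vector.empty[Int]   // FIRE never purges, so the buffer only grows
    var count = 0
    var timer: Option[Long] = None   // pending interval timer

    // Fire every interval timer due by `now` that precedes the window end
    def fireDueTimers(now: Long): Unit =
      while (timer.exists(t => t <= now && t < maxTs)) {
        val t = timer.get
        fires += Fire(t, "interval", buffer.toList)
        count = 0
        timer = Some(t + interval)   // re-armed at (fire time + interval)
      }

    for ((at, v) <- arrivals if at <= maxTs) {
      fireDueTimers(at)
      buffer :+= v
      count += 1
      if (count >= maxCount) {
        fires += Fire(at, "count", buffer.toList)
        count = 0
        timer = None                 // pending timer deleted after a count firing
      } else if (timer.isEmpty) {
        timer = Some(at + interval)  // first element since the last firing arms the timer
      }
    }
    fireDueTimers(maxTs)
    fires += Fire(maxTs, "window end", buffer.toList)
    fires.result()
  }

  def main(args: Array[String]): Unit = {
    // Invented arrivals: 11 and 12 typed at 11 s and 12 s, 13 typed at 17 s, window [10 s, 20 s)
    run(10000L, 10000L, 10, 5000L, List((11000L, 11), (12000L, 12), (17000L, 13))).foreach(println)
  }
}
```

Moving the first arrival from 11 s to exactly 10 s moves the interval firing from 16 s to 15 s, which is the effect described above.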
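IV. More
The CountTrigger logic is fairly simple. The ProcessingTimeTrigger part shown here is only one possible design, in which the expiry time is reset whenever the window ends; you could also let the timer span windows or use intervals that are not whole seconds, so feel free to experiment. Note that CountTrigger, ProcessingTimeTrigger and the custom CountAndProcessingTimeTrigger all return only FIRE and never FIRE_AND_PURGE, so the window contents are not purged when the window fires (which is why the window-end firing in the table emits 1-10 again). Will the window data then keep piling up until storage is exhausted? No: WindowOperator has its own onProcessingTime method which, after calling the trigger, checks whether the timer is the window's cleanup time and, if so, calls clearAllState to clear the window's data.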
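WindowOperator is the entry point for all window processing logic. When a processing-time timer fires, it first calls the trigger's onProcessingTime; if the result is TriggerResult.FIRE, it emits the current window contents and keeps them. If the result is TriggerResult.FIRE_AND_PURGE, it additionally clears the window contents right after emitting them, but PURGE does not touch the trigger's own state. Independently of the trigger result, when the timer is the window's cleanup time (window.maxTimestamp() for processing-time windows), WindowOperator calls clearAllState, which clears the window contents and calls the trigger's overridden clear method. That clear method is where trigger state is released: Flink's ProcessingTimeTrigger deletes its window timer there, and the trigger in this article clears the two ReducingStates holding count and fireTimestamp.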
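The order of checks described above can be summarized in a simplified model. The sketch below is a hypothetical pure-Scala illustration, not Flink's WindowOperator: the names TriggerResult, WindowState and onTimer are made up, and triggerStateCleared only marks the point where Flink would call Trigger.clear().

```scala
object WindowOperatorModel {
  sealed abstract class TriggerResult(val fire: Boolean, val purge: Boolean)
  case object Continue extends TriggerResult(false, false)
  case object Fire extends TriggerResult(true, false)
  case object FireAndPurge extends TriggerResult(true, true)

  final class WindowState {
    var contents: List[Int] = Nil        // buffered window elements
    var triggerStateCleared = false      // marks where Trigger.clear() would run
  }

  /** Handles one processing-time timer; returns what would be emitted downstream. */
  def onTimer(w: WindowState, result: TriggerResult, time: Long, cleanupTime: Long): Option[List[Int]] = {
    // 1. FIRE emits the current contents (only if there are any)
    val emitted = if (result.fire && w.contents.nonEmpty) Some(w.contents) else None
    // 2. PURGE drops the window contents only, not the trigger state
    if (result.purge) w.contents = Nil
    // 3. At cleanup time everything is cleared, whatever the trigger returned
    if (time == cleanupTime) {
      w.contents = Nil
      w.triggerStateCleared = true
    }
    emitted
  }

  def main(args: Array[String]): Unit = {
    val w = new WindowState
    w.contents = List(1, 2)
    println(onTimer(w, Fire, 5000L, 9999L))   // intermediate firing keeps the data
    w.contents = w.contents :+ 3
    println(onTimer(w, Fire, 9999L, 9999L))   // window end: emit everything, then clear
    println(s"${w.contents} ${w.triggerStateCleared}")
  }
}
```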