Flink - CountAndProcessingTimeTrigger 基于 Count 和 Time 触发窗口

本文涉及的产品
实时计算 Flink 版,1000CU*H 3个月
日志服务 SLS,月写入数据量 50GB 1个月
简介: ​上一篇文章提到了CountTrigger && ProcessingTimeTriger,前者 CountTrigger 指定 count 数,当窗口内元素满足逻辑时进行一次触发,后者通过 TimeServer 注册窗口过期时间,到期后进行一次触发,本文自定义 Trigger 实现二者的合并即 Count 和 ProcessingTime 满足任意条件窗口都进行一次触发。...

 一.引言

上一篇文章提到了 CountTrigger && ProcessingTimeTriger,前者 CountTrigger 指定 count 数,当窗口内元素满足逻辑时进行一次触发,后者通过 TimeServer 注册窗口过期时间,到期后进行一次触发,本文自定义 Trigger 实现二者的合并即 Count 和 ProcessingTime 满足任意条件窗口都进行一次触发。

二.代码详解

1.CountAndProcessingTimeTrigger

整体代码如下,主要逻辑包含在 onElement 和 onProcessingTime,前者主要负责根据 count 触发,即实现 CountTrigger 的功能,后者则主要实现 ProcessingTime 的功能,需要预先定义两个 ReduceValue 分别记录 Count 和 Time,ReduceValue 详细用法可参考上文,下面分析主要方法。

class CountAndProcessingTimeTrigger(maxCount: Long, interval: Long) extends Trigger[String, TimeWindow] {
  // 条数计数器
  val countStateDesc = new ReducingStateDescriptor[Long]("count", new ReduceSum(), classOf[Long])
  // 时间计数器,保存下一次触发的时间
  val timeStateDesc = new ReducingStateDescriptor[Long]("interval", new ReduceMin(), classOf[Long])
  // 元素过来后执行的操作
  override def onElement(t: String, time: Long, window: TimeWindow, triggerContext: Trigger.TriggerContext): TriggerResult = {
    // 获取 count state 并累加数量
    val count = triggerContext.getPartitionedState(countStateDesc)
    val fireTimestamp = triggerContext.getPartitionedState(timeStateDesc)
    // 考虑count是否足够
    count.add(1L)
    if (count.get() >= maxCount) {
      val log = s"CountTrigger Triggered Count: ${count.get()}"
      println(formatString(log))
      count.clear()
      // 不等于默认窗口的触发时间
      if (fireTimestamp.get() != window.maxTimestamp()) {
        triggerContext.deleteProcessingTimeTimer(fireTimestamp.get())
      }
      fireTimestamp.clear()
      return TriggerResult.FIRE
    }
    // 添加窗口的下次触发时间
    val currentTimeStamp = triggerContext.getCurrentProcessingTime
    if (fireTimestamp.get() == null) {
      val nextFireTimeStamp = currentTimeStamp + interval
      triggerContext.registerProcessingTimeTimer(nextFireTimeStamp)
      fireTimestamp.add(nextFireTimeStamp)
    }
    TriggerResult.CONTINUE
  }
  override def onProcessingTime(time: Long, window: TimeWindow, triggerContext: Trigger.TriggerContext): TriggerResult = {
    // 获取 count state
    val count = triggerContext.getPartitionedState(countStateDesc)
    // 获取 Interval state
    val fireTimestamp = triggerContext.getPartitionedState(timeStateDesc)
    // time default trigger
    if (time == window.maxTimestamp()) {
      val log = s"Window Trigger By maxTimeStamp: $time FireTimestamp: ${fireTimestamp.get()}"
      println(formatString(log))
      count.clear()
      triggerContext.deleteProcessingTimeTimer(fireTimestamp.get())
      fireTimestamp.clear()
      fireTimestamp.add(triggerContext.getCurrentProcessingTime + interval)
      triggerContext.registerProcessingTimeTimer(fireTimestamp.get())
      return TriggerResult.FIRE
    } else if (fireTimestamp.get() != null && fireTimestamp.get().equals(time)) {
      val log = s"TimeTrigger Triggered At: ${fireTimestamp.get()}"
      println(formatString(log))
      count.clear()
      fireTimestamp.clear()
      fireTimestamp.add(triggerContext.getCurrentProcessingTime + interval)
      triggerContext.registerProcessingTimeTimer(fireTimestamp.get())
      return TriggerResult.FIRE
    }
    TriggerResult.CONTINUE
  }
  override def onEventTime(l: Long, w: TimeWindow, triggerContext: Trigger.TriggerContext): TriggerResult = {
    TriggerResult.CONTINUE
  }
  override def clear(w: TimeWindow, triggerContext: Trigger.TriggerContext): Unit = {
    // 获取 count state
    val count = triggerContext.getPartitionedState(countStateDesc)
    // 获取 Interval state
    val fireTimestamp = triggerContext.getPartitionedState(timeStateDesc)
    count.clear()
    fireTimestamp.clear()
  }
}

image.gif

2.onElement

每个元素到达时执行 count.add 进行计数,如果满足超过定义的 maxCount 则进行触发操作:

---- 达到 MaxCount

A.log - 打印 log 标识本次触发来源于 CountTrigger

B.count.clear - 清空数值重新累加 count 并触发

C.deleteProcessingTime - 清空 TimeServer 计数器,因为触发后不管 Count 还是 ProcessingTime 都要重新计数或计时

----- 未达到 MaxCount

A.currentTime - 通过 ctx 上下文获取当前 ProcessingTime

B.registerProcessingTimeTimer - 判断时间 value 是否有值,如果没有值则根据 current & interval 计算得到 ProcessingTime 对应的下次窗口触发时间

----- 都不满足

A.TriggerResult.CONTINUE - 不做触发,等待 TimeServer 到期

override def onElement(t: String, time: Long, window: TimeWindow, triggerContext: Trigger.TriggerContext): TriggerResult = {
    // 获取 count state 并累加数量
    val count = triggerContext.getPartitionedState(countStateDesc)
    val fireTimestamp = triggerContext.getPartitionedState(timeStateDesc)
    // 考虑count是否足够
    count.add(1L)
    if (count.get() >= maxCount) {
      val log = s"CountTrigger Triggered Count: ${count.get()}"
      println(formatString(log))
      count.clear()
      // 不等于默认窗口的触发时间
      if (fireTimestamp.get() != window.maxTimestamp()) {
        triggerContext.deleteProcessingTimeTimer(fireTimestamp.get())
      }
      fireTimestamp.clear()
      return TriggerResult.FIRE
    }
    // 添加窗口的下次触发时间
    val currentTimeStamp = triggerContext.getCurrentProcessingTime
    if (fireTimestamp.get() == null) {
      val nextFireTimeStamp = currentTimeStamp + interval
      triggerContext.registerProcessingTimeTimer(nextFireTimeStamp)
      fireTimestamp.add(nextFireTimeStamp)
    }
    TriggerResult.CONTINUE
  }

image.gif

3.onProcessingTime

到达规定处理时间窗口执行的操作,上文我们讲到了窗口会在两种时机调用 onProcessingTime 方法,一种是达到自己定义的 ProcessintTimeTimer,窗口会进行 Fire 触发,此时触发数据为窗口的部分数据,还有一种是到达 window.maxTimeStamp 即到达 window.getEnd - 1L,此时窗口 Fire 触发的数据为 windowAll 定义的时间范围内所有数据,例如定义 Time.seconds(10),前者触发部分时间数据,后者触发完整的 10s 窗口。

----- 到达窗口默认触发时间

A.window.maxTimestamp - 到达窗口默认时间,打印对应日志标识

B.count.clear - 清空计数状态

C.deleteProcessingTime - 清空原计数器,因为这里窗口触发后就要重新计数和计时

D.registerProcessingTIme - 基于当前 ProcessingTime + interval 注册下次时间

E.TriggerResult.FIRE - 进行全数据的窗口触发

----- 到达自定义 interval 间隔

A.日志标识 - 打印 TimeTriggered 标识本次触发来源自自定义 ProcessingTime

B.clear - 窗口触发后清空原有 count 状态

C.registerProcessingTIme - 基于当前 ProcessingTime + interval 注册下次时间

D.TriggerResult.FIRE - 触发窗口数据

----- 都不满足

A.TriggerResult.CONTINUE - 什么都不做

override def onProcessingTime(time: Long, window: TimeWindow, triggerContext: Trigger.TriggerContext): TriggerResult = {
    // 获取 count state
    val count = triggerContext.getPartitionedState(countStateDesc)
    // 获取 Interval state
    val fireTimestamp = triggerContext.getPartitionedState(timeStateDesc)
    // time default trigger
    if (time == window.maxTimestamp()) {
      val log = s"Window Trigger By maxTimeStamp: $time FireTimestamp: ${fireTimestamp.get()}"
      println(formatString(log))
      count.clear()
      triggerContext.deleteProcessingTimeTimer(fireTimestamp.get())
      fireTimestamp.clear()
      fireTimestamp.add(triggerContext.getCurrentProcessingTime + interval)
      triggerContext.registerProcessingTimeTimer(fireTimestamp.get())
      return TriggerResult.FIRE
    } else if (fireTimestamp.get() != null && fireTimestamp.get().equals(time)) {
      val log = s"TimeTrigger Triggered At: ${fireTimestamp.get()}"
      println(formatString(log))
      count.clear()
      fireTimestamp.clear()
      fireTimestamp.add(triggerContext.getCurrentProcessingTime + interval)
      triggerContext.registerProcessingTimeTimer(fireTimestamp.get())
      return TriggerResult.FIRE
    }
    TriggerResult.CONTINUE
  }

image.gif

4.onEventTime

因为是基于 Count 和 ProcessingTime,所以 onEventTime 返回 TriggerResult.CONTINUE

5.clear

清空 Count 和 FireTimestamp 对应的 ReduceValue

三.代码实践

1.主函数

上文 CountTrigger 和 ProcessingTimeTrigger 的 Soucre 都是固定的数据来源,每 s 发送30条数据,为了验证 CountAndProcessingTimeTrigger,这里采用 socket,自定义发送数据实现,本地nc -lk port 即可开启,processFunction 实现对 window 内数据的 min,max 统计和处理时间输出。

object CountAndProcessTimeTriggerDemo {
  val dateFormat = new SimpleDateFormat("yyyy-MM-dd:HH-mm-ss")
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env
      .socketTextStream("localhost", 9999)
      .setParallelism(1)
      .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(10)))
      .trigger(new CountAndProcessingTimeTrigger(10, 5000))
      .process(new ProcessAllWindowFunction[String, String, TimeWindow] {
        override def process(context: Context, elements: Iterable[String], out: Collector[String]): Unit = {
          val cla = Calendar.getInstance()
          cla.setTimeInMillis(System.currentTimeMillis())
          val date = dateFormat.format(cla.getTime)
          val info = elements.toArray.map(_.toInt)
          val min = info.min
          val max = info.max
          val output = s"==========[$date] Window Elem Num: ${elements.size} Min: $min -> Max $max=========="
          out.collect(output)
        }
      }).print()
    env.execute()
  }
}

image.gif

2.数据验证

上述 CountAndProcessintTimeTrigger 设定为 count = 10,interval = 5s

image.gif编辑

颜色 触发方式 输入 流程
CountTrigger 1-10 满足count=10,触发CountTrigger
DeaultTrigger 默认触发,全窗口数据为1-10
绿 ProcessingTimeTrigger 11,12 输入11,12 触发 ProcessingTimeTrigger
DeaultTrigger 13 结束前输入13,全窗口数据为 11-13
ProcessingTimeTrigger 14 输入14,等待 ProcessingTimeTrigger 触发
DeaultTrigger 默认触发,此时全窗口数据为 14

这里解释下为什么 interval 是 5s,但是 ProcessingTimeTrigger 输出日志的时间为 16 和 27,这是因为手动输入到 Socket 会有延迟,如果是机器默认发送数据,日志会修正为 15 和 25 触发 ProcessingTimeTrigger。

四.更多

CountTrigger 的逻辑比较简单,ProcessingTimeTrigger 这里只是一种定义方法,即窗口结束就重新设定过期时间,也可以跨窗口定义或者不设置整数时间,有兴趣可以自定义尝试。针对 CountTrigger、ProcessingTrigger 还有本文自定义的 CountAndProcessingTimeTrigger 我们都发现窗口触发时只调用了 FIRE,没有调用 FIRE_AND_PURGE 做 clear 清除操作,那窗口数据没有清除会不会越攒越多撑爆存储呢,其实也不会,WindowOperator 函数内置了默认的 onProcessingTime 方法,其内部会判断并调用 clearAllState 方法进行数据清空:

image.gif编辑

WindowOperator 作为所有窗口处理逻辑的入口,如果我们的 Trigger 返回 TirggerResult.FIRE,窗口会在到达 CleanupTime 时执行 clearAllState 方法清空当前 window 所有状态;如果返回 TriggerResult.FIRE_AND_PURGE,windowOperator 则会调用 Trigger @override 的 clear 方法,例如 ProcessingTimeTrigger 会将窗口的计时器清除,而本例如果返回 FIRE_AND_PURGE 则会同时清空 count 和 fireTimestamp 对应的两个 ReduceValue 的值:

image.gif编辑


相关实践学习
基于Hologres+Flink搭建GitHub实时数据大屏
通过使用Flink、Hologres构建实时数仓,并通过Hologres对接BI分析工具(以DataV为例),实现海量数据实时分析.
实时计算 Flink 实战课程
如何使用实时计算 Flink 搞定数据处理难题?实时计算 Flink 极客训练营产品、技术专家齐上阵,从开源 Flink功能介绍到实时计算 Flink 优势详解,现场实操,5天即可上手! 欢迎开通实时计算 Flink 版: https://cn.aliyun.com/product/bigdata/sc Flink Forward Asia 介绍: Flink Forward 是由 Apache 官方授权,Apache Flink Community China 支持的会议,通过参会不仅可以了解到 Flink 社区的最新动态和发展计划,还可以了解到国内外一线大厂围绕 Flink 生态的生产实践经验,是 Flink 开发者和使用者不可错过的盛会。 去年经过品牌升级后的 Flink Forward Asia 吸引了超过2000人线下参与,一举成为国内最大的 Apache 顶级项目会议。结合2020年的特殊情况,Flink Forward Asia 2020 将在12月26日以线上峰会的形式与大家见面。
目录
相关文章
|
消息中间件 分布式计算 大数据
大数据-121 - Flink Time Watermark 详解 附带示例详解
大数据-121 - Flink Time Watermark 详解 附带示例详解
227 0
|
SQL 消息中间件 分布式计算
大数据-120 - Flink Window 窗口机制-滑动时间窗口、会话窗口-基于时间驱动&基于事件驱动
大数据-120 - Flink Window 窗口机制-滑动时间窗口、会话窗口-基于时间驱动&基于事件驱动
292 0
|
SQL 分布式计算 大数据
大数据-119 - Flink Window总览 窗口机制-滚动时间窗口-基于时间驱动&基于事件驱动
大数据-119 - Flink Window总览 窗口机制-滚动时间窗口-基于时间驱动&基于事件驱动
406 0
|
SQL 存储 Unix
Flink SQL 在快手实践问题之设置 Window Offset 以调整窗口划分如何解决
Flink SQL 在快手实践问题之设置 Window Offset 以调整窗口划分如何解决
216 2
|
10月前
|
缓存 监控 数据处理
Flink 四大基石之窗口(Window)使用详解
在流处理场景中,窗口(Window)用于将无限数据流切分成有限大小的“块”,以便进行计算。Flink 提供了多种窗口类型,如时间窗口(滚动、滑动、会话)和计数窗口,通过窗口大小、滑动步长和偏移量等属性控制数据切分。窗口函数包括增量聚合函数、全窗口函数和ProcessWindowFunction,支持灵活的数据处理。应用案例展示了如何使用窗口进行实时流量统计和电商销售分析。
2010 28
|
10月前
|
传感器 监控 数据挖掘
Flink 四大基石之 Time (时间语义) 的使用详解
Flink 中的时间分为三类:Event Time(事件发生时间)、Ingestion Time(数据进入系统时间)和 Processing Time(数据处理时间)。Event Time 通过嵌入事件中的时间戳准确反映数据顺序,支持复杂窗口操作。Watermark 机制用于处理 Event Time,确保数据完整性并触发窗口计算。Flink 还提供了多种迟到数据处理方式,如默认丢弃、侧输出流和允许延迟处理,以应对不同场景需求。掌握这些时间语义对编写高效、准确的 Flink 应用至关重要。
617 21
|
11月前
|
数据处理 数据安全/隐私保护 流计算
Flink 三种时间窗口、窗口处理函数使用及案例
Flink 是处理无界数据流的强大工具,提供了丰富的窗口机制。本文介绍了三种时间窗口(滚动窗口、滑动窗口和会话窗口)及其使用方法,包括时间窗口的概念、窗口处理函数的使用和实际案例。通过这些机制,可以灵活地对数据流进行分析和计算,满足不同的业务需求。
1298 27
|
消息中间件 NoSQL Java
Flink-06 Flink Java 3分钟上手 滚动窗口 时间驱动 Kafka TumblingWindow TimeWindowFunction TumblingProcessing
Flink-06 Flink Java 3分钟上手 滚动窗口 时间驱动 Kafka TumblingWindow TimeWindowFunction TumblingProcessing
222 0
|
分布式计算 Java 大数据
大数据-122 - Flink Time Watermark Java代码测试实现Tumbling Window
大数据-122 - Flink Time Watermark Java代码测试实现Tumbling Window
138 0
|
SQL 流计算
Flink SQL 在快手实践问题之CUMULATE窗口的划分逻辑如何解决
Flink SQL 在快手实践问题之CUMULATE窗口的划分逻辑如何解决
292 2