Flink - CountAndProcessingTimeTrigger 基于 Count 和 Time 触发窗口

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: ​上一篇文章提到了CountTrigger && ProcessingTimeTriger,前者 CountTrigger 指定 count 数,当窗口内元素满足逻辑时进行一次触发,后者通过 TimeServer 注册窗口过期时间,到期后进行一次触发,本文自定义 Trigger 实现二者的合并即 Count 和 ProcessingTime 满足任意条件窗口都进行一次触发。...

 一.引言

上一篇文章提到了 CountTrigger && ProcessingTimeTriger,前者 CountTrigger 指定 count 数,当窗口内元素满足逻辑时进行一次触发,后者通过 TimeServer 注册窗口过期时间,到期后进行一次触发,本文自定义 Trigger 实现二者的合并即 Count 和 ProcessingTime 满足任意条件窗口都进行一次触发。

二.代码详解

1.CountAndProcessingTimeTrigger

整体代码如下,主要逻辑包含在 onElement 和 onProcessingTime,前者主要负责根据 count 触发,即实现 CountTrigger 的功能,后者则主要实现 ProcessingTime 的功能,需要预先定义两个 ReduceValue 分别记录 Count 和 Time,ReduceValue 详细用法可参考上文,下面分析主要方法。

class CountAndProcessingTimeTrigger(maxCount: Long, interval: Long) extends Trigger[String, TimeWindow] {
  // 条数计数器
  val countStateDesc = new ReducingStateDescriptor[Long]("count", new ReduceSum(), classOf[Long])
  // 时间计数器,保存下一次触发的时间
  val timeStateDesc = new ReducingStateDescriptor[Long]("interval", new ReduceMin(), classOf[Long])
  // 元素过来后执行的操作
  override def onElement(t: String, time: Long, window: TimeWindow, triggerContext: Trigger.TriggerContext): TriggerResult = {
    // 获取 count state 并累加数量
    val count = triggerContext.getPartitionedState(countStateDesc)
    val fireTimestamp = triggerContext.getPartitionedState(timeStateDesc)
    // 考虑count是否足够
    count.add(1L)
    if (count.get() >= maxCount) {
      val log = s"CountTrigger Triggered Count: ${count.get()}"
      println(formatString(log))
      count.clear()
      // 不等于默认窗口的触发时间
      if (fireTimestamp.get() != window.maxTimestamp()) {
        triggerContext.deleteProcessingTimeTimer(fireTimestamp.get())
      }
      fireTimestamp.clear()
      return TriggerResult.FIRE
    }
    // 添加窗口的下次触发时间
    val currentTimeStamp = triggerContext.getCurrentProcessingTime
    if (fireTimestamp.get() == null) {
      val nextFireTimeStamp = currentTimeStamp + interval
      triggerContext.registerProcessingTimeTimer(nextFireTimeStamp)
      fireTimestamp.add(nextFireTimeStamp)
    }
    TriggerResult.CONTINUE
  }
  override def onProcessingTime(time: Long, window: TimeWindow, triggerContext: Trigger.TriggerContext): TriggerResult = {
    // 获取 count state
    val count = triggerContext.getPartitionedState(countStateDesc)
    // 获取 Interval state
    val fireTimestamp = triggerContext.getPartitionedState(timeStateDesc)
    // time default trigger
    if (time == window.maxTimestamp()) {
      val log = s"Window Trigger By maxTimeStamp: $time FireTimestamp: ${fireTimestamp.get()}"
      println(formatString(log))
      count.clear()
      triggerContext.deleteProcessingTimeTimer(fireTimestamp.get())
      fireTimestamp.clear()
      fireTimestamp.add(triggerContext.getCurrentProcessingTime + interval)
      triggerContext.registerProcessingTimeTimer(fireTimestamp.get())
      return TriggerResult.FIRE
    } else if (fireTimestamp.get() != null && fireTimestamp.get().equals(time)) {
      val log = s"TimeTrigger Triggered At: ${fireTimestamp.get()}"
      println(formatString(log))
      count.clear()
      fireTimestamp.clear()
      fireTimestamp.add(triggerContext.getCurrentProcessingTime + interval)
      triggerContext.registerProcessingTimeTimer(fireTimestamp.get())
      return TriggerResult.FIRE
    }
    TriggerResult.CONTINUE
  }
  override def onEventTime(l: Long, w: TimeWindow, triggerContext: Trigger.TriggerContext): TriggerResult = {
    TriggerResult.CONTINUE
  }
  override def clear(w: TimeWindow, triggerContext: Trigger.TriggerContext): Unit = {
    // 获取 count state
    val count = triggerContext.getPartitionedState(countStateDesc)
    // 获取 Interval state
    val fireTimestamp = triggerContext.getPartitionedState(timeStateDesc)
    count.clear()
    fireTimestamp.clear()
  }
}

image.gif

2.onElement

每个元素到达时执行 count.add 进行计数,如果满足超过定义的 maxCount 则进行触发操作:

---- 达到 MaxCount

A.log - 打印 log 标识本次触发来源于 CountTrigger

B.count.clear - 清空数值重新累加 count 并触发

C.deleteProcessingTime - 清空 TimeServer 计数器,因为触发后不管 Count 还是 ProcessingTime 都要重新计数或计时

----- 未达到 MaxCount

A.currentTime - 通过 ctx 上下文获取当前 ProcessingTime

B.registerProcessingTimeTimer - 判断时间 value 是否有值,如果没有值则根据 current & interval 计算得到 ProcessingTime 对应的下次窗口触发时间

----- 都不满足

A.TriggerResult.CONTINUE - 不做触发,等待 TimeServer 到期

override def onElement(t: String, time: Long, window: TimeWindow, triggerContext: Trigger.TriggerContext): TriggerResult = {
    // 获取 count state 并累加数量
    val count = triggerContext.getPartitionedState(countStateDesc)
    val fireTimestamp = triggerContext.getPartitionedState(timeStateDesc)
    // 考虑count是否足够
    count.add(1L)
    if (count.get() >= maxCount) {
      val log = s"CountTrigger Triggered Count: ${count.get()}"
      println(formatString(log))
      count.clear()
      // 不等于默认窗口的触发时间
      if (fireTimestamp.get() != window.maxTimestamp()) {
        triggerContext.deleteProcessingTimeTimer(fireTimestamp.get())
      }
      fireTimestamp.clear()
      return TriggerResult.FIRE
    }
    // 添加窗口的下次触发时间
    val currentTimeStamp = triggerContext.getCurrentProcessingTime
    if (fireTimestamp.get() == null) {
      val nextFireTimeStamp = currentTimeStamp + interval
      triggerContext.registerProcessingTimeTimer(nextFireTimeStamp)
      fireTimestamp.add(nextFireTimeStamp)
    }
    TriggerResult.CONTINUE
  }

image.gif

3.onProcessingTime

到达规定处理时间窗口执行的操作,上文我们讲到了窗口会在两种时机调用 onProcessingTime 方法,一种是达到自己定义的 ProcessintTimeTimer,窗口会进行 Fire 触发,此时触发数据为窗口的部分数据,还有一种是到达 window.maxTimeStamp 即到达 window.getEnd - 1L,此时窗口 Fire 触发的数据为 windowAll 定义的时间范围内所有数据,例如定义 Time.seconds(10),前者触发部分时间数据,后者触发完整的 10s 窗口。

----- 到达窗口默认触发时间

A.window.maxTimestamp - 到达窗口默认时间,打印对应日志标识

B.count.clear - 清空计数状态

C.deleteProcessingTime - 清空原计数器,因为这里窗口触发后就要重新计数和计时

D.registerProcessingTIme - 基于当前 ProcessingTime + interval 注册下次时间

E.TriggerResult.FIRE - 进行全数据的窗口触发

----- 到达自定义 interval 间隔

A.日志标识 - 打印 TimeTriggered 标识本次触发来源自自定义 ProcessingTime

B.clear - 窗口触发后清空原有 count 状态

C.registerProcessingTIme - 基于当前 ProcessingTime + interval 注册下次时间

D.TriggerResult.FIRE - 触发窗口数据

----- 都不满足

A.TriggerResult.CONTINUE - 什么都不做

override def onProcessingTime(time: Long, window: TimeWindow, triggerContext: Trigger.TriggerContext): TriggerResult = {
    // 获取 count state
    val count = triggerContext.getPartitionedState(countStateDesc)
    // 获取 Interval state
    val fireTimestamp = triggerContext.getPartitionedState(timeStateDesc)
    // time default trigger
    if (time == window.maxTimestamp()) {
      val log = s"Window Trigger By maxTimeStamp: $time FireTimestamp: ${fireTimestamp.get()}"
      println(formatString(log))
      count.clear()
      triggerContext.deleteProcessingTimeTimer(fireTimestamp.get())
      fireTimestamp.clear()
      fireTimestamp.add(triggerContext.getCurrentProcessingTime + interval)
      triggerContext.registerProcessingTimeTimer(fireTimestamp.get())
      return TriggerResult.FIRE
    } else if (fireTimestamp.get() != null && fireTimestamp.get().equals(time)) {
      val log = s"TimeTrigger Triggered At: ${fireTimestamp.get()}"
      println(formatString(log))
      count.clear()
      fireTimestamp.clear()
      fireTimestamp.add(triggerContext.getCurrentProcessingTime + interval)
      triggerContext.registerProcessingTimeTimer(fireTimestamp.get())
      return TriggerResult.FIRE
    }
    TriggerResult.CONTINUE
  }

image.gif

4.onEventTime

因为是基于 Count 和 ProcessingTime,所以 onEventTime 返回 TriggerResult.CONTINUE

5.clear

清空 Count 和 FireTimestamp 对应的 ReduceValue

三.代码实践

1.主函数

上文 CountTrigger 和 ProcessingTimeTrigger 的 Soucre 都是固定的数据来源,每 s 发送30条数据,为了验证 CountAndProcessingTimeTrigger,这里采用 socket,自定义发送数据实现,本地nc -lk port 即可开启,processFunction 实现对 window 内数据的 min,max 统计和处理时间输出。

object CountAndProcessTimeTriggerDemo {
  val dateFormat = new SimpleDateFormat("yyyy-MM-dd:HH-mm-ss")
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env
      .socketTextStream("localhost", 9999)
      .setParallelism(1)
      .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(10)))
      .trigger(new CountAndProcessingTimeTrigger(10, 5000))
      .process(new ProcessAllWindowFunction[String, String, TimeWindow] {
        override def process(context: Context, elements: Iterable[String], out: Collector[String]): Unit = {
          val cla = Calendar.getInstance()
          cla.setTimeInMillis(System.currentTimeMillis())
          val date = dateFormat.format(cla.getTime)
          val info = elements.toArray.map(_.toInt)
          val min = info.min
          val max = info.max
          val output = s"==========[$date] Window Elem Num: ${elements.size} Min: $min -> Max $max=========="
          out.collect(output)
        }
      }).print()
    env.execute()
  }
}

image.gif

2.数据验证

上述 CountAndProcessintTimeTrigger 设定为 count = 10,interval = 5s

image.gif编辑

颜色 触发方式 输入 流程
CountTrigger 1-10 满足count=10,触发CountTrigger
DeaultTrigger 默认触发,全窗口数据为1-10
绿 ProcessingTimeTrigger 11,12 输入11,12 触发 ProcessingTimeTrigger
DeaultTrigger 13 结束前输入13,全窗口数据为 11-13
ProcessingTimeTrigger 14 输入14,等待 ProcessingTimeTrigger 触发
DeaultTrigger 默认触发,此时全窗口数据为 14

这里解释下为什么 interval 是 5s,但是 ProcessingTimeTrigger 输出日志的时间为 16 和 27,这是因为手动输入到 Socket 会有延迟,如果是机器默认发送数据,日志会修正为 15 和 25 触发 ProcessingTimeTrigger。

四.更多

CountTrigger 的逻辑比较简单,ProcessingTimeTrigger 这里只是一种定义方法,即窗口结束就重新设定过期时间,也可以跨窗口定义或者不设置整数时间,有兴趣可以自定义尝试。针对 CountTrigger、ProcessingTrigger 还有本文自定义的 CountAndProcessingTimeTrigger 我们都发现窗口触发时只调用了 FIRE,没有调用 FIRE_AND_PURGE 做 clear 清除操作,那窗口数据没有清除会不会越攒越多撑爆存储呢,其实也不会,WindowOperator 函数内置了默认的 onProcessingTime 方法,其内部会判断并调用 clearAllState 方法进行数据清空:

image.gif编辑

WindowOperator 作为所有窗口处理逻辑的入口,如果我们的 Trigger 返回 TirggerResult.FIRE,窗口会在到达 CleanupTime 时执行 clearAllState 方法清空当前 window 所有状态;如果返回 TriggerResult.FIRE_AND_PURGE,windowOperator 则会调用 Trigger @override 的 clear 方法,例如 ProcessingTimeTrigger 会将窗口的计时器清除,而本例如果返回 FIRE_AND_PURGE 则会同时清空 count 和 fireTimestamp 对应的两个 ReduceValue 的值:

image.gif编辑


相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
目录
相关文章
|
1月前
|
自然语言处理 监控 数据挖掘
【Flink】Flink中的窗口分析
【4月更文挑战第19天】【Flink】Flink中的窗口分析
|
1月前
|
流计算 Windows
Flink窗口与状态编程开发(一)
Flink窗口与状态编程开发(一)
|
1月前
|
SQL API 数据处理
实时计算 Flink版产品使用合集之如果一个窗口区间没有数据,若不会开窗就没法使用triggers赋默认值
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
7天前
|
数据采集 关系型数据库 MySQL
实时计算 Flink版操作报错合集之源表的数据已经被手动删除,时间窗口内的数据仍存在,该怎么解决
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
1月前
|
SQL 资源调度 监控
实时计算 Flink版产品使用合集之Flink on YARN 下,任务代码中通过 JobListener 监听任务状态,onJobSubmitted 和 onJobExecuted 同时触发如何解决
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
38 3
|
1月前
|
消息中间件 Kubernetes Java
实时计算 Flink版操作报错合集之写入 Kafka 报错 "Failed to send data to Kafka: Failed to allocate memory within the configured max blocking time 60000 ms",该怎么解决
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
57 0
|
1月前
|
消息中间件 关系型数据库 MySQL
实时计算 Flink版操作报错合集之使用 Event Time Temporal Join 关联多个 HBase 后,Kafka 数据的某个字段变为 null 是什么原因导致的
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
41 0
|
1月前
|
SQL 消息中间件 Kafka
实时计算 Flink版操作报错合集之使用 Event Time Temporal Join 关联多个 HBase 后,Kafka 数据的某个字段变为 null 是什么原因导致的
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
40 0
|
1月前
|
消息中间件 Oracle 关系型数据库
实时计算 Flink版操作报错合集之在尝试触发checkpoint时遇到了报错如何解决
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
28 0
|
1月前
|
Oracle 关系型数据库 MySQL
实时计算 Flink版操作报错合集之在尝试触发checkpoint时遇到了报错如何解决
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
30 0