Flink - CountTrigger && ProcessingTimeTriger 详解

简介: Flink 针对 window 提供了多种自定义 trigger,其中常见的有 CountTrigger 和 ProcessingTimeTrigger,下面通过两个 demo 了解一下两个 Trigger 的内部实现原理与窗口触发的相关知识。

一.引言

Flink 针对 window 提供了多种自定义 trigger,其中常见的有 CountTrigger 和 ProcessingTimeTrigger,下面通过两个 demo 了解一下两个 Trigger 的内部实现原理与窗口触发的相关知识。

二.辅助知识

介绍上述两个 Trigger 之前,首先重新回顾下之前提高的 trigger 基础知识。

1.Trigger 内部方法

· onElement :元素到达后执行的操作

· onProcessingTime:到达规定处理时间窗口执行的操作

· onEventTime :到达规定事件时间窗口执行的操作

· clear : 清除相关 value 变量

2.Window Trigger 后的操作

· TriggerResult.CONTINUE :跳过,什么都不做

· TriggerResult.FIRE :触发窗口计算

· TriggerResult.PURGE : 清除窗口元素

· TriggerResult.FIRE_AND_PURGE : 触发窗口操作,随后清空窗口元素

3.ReducingStateValue

ReducingStateValue 是一个抽象的统计量,需要用户自己定义其返回类型和对应的 reduce 操作,这里 reduce 并不是减少而是合并的意思,可以理解为 spark 里的 reduce(_ + _) 操作,即针对给定的 object1 和 object2,合成一个单独的 object,定义该变量方法如下:

val reduceStateDesc = new ReducingStateDescriptor[T]($key, new ReduceFunction(), classOf[T])

image.gif

T 代表返回变量类型,key 为其 name 标识,ReduceFunction 需要继承 org.apache.flink.api.common.functions.ReduceFunction 实现 reduce(o1: T, o2: T): T 的方法,下面示例生成一个得到两数中最小的一个的 RecuceFunction:

class ReduceMin() extends ReduceFunction[Long] {
  override def reduce(t1: Long, t2: Long): Long = {
    math.min(1, t2)
  }
}

image.gif

4.Window 默认触发时机

dataStream.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(10)))

image.gif

使用 org.apache.flink.streaming.api.windowing.assigners 类下的窗口例如滚动窗口 TumblingProcessingTimeWindows 时,根据我们设置的时间,窗口的开始和结束时间其实是固定的,待参数确定后,窗口的 start - end 就确定好了,以上述 10s 滚动窗口为例,则窗口默认的开始结束时间为整点,按18:00 开始:

18:00:00 - 18:00:10 ,18:00:10 - 18:00:20 ... 18:59:50 - 19:00:00

image.gif

每当窗口到达规定结束时间时,都会默认调用 onProcessingTime 方法,这里不理解也没关系,等下看 demo 即可。

三.CountTrigger 详解

CountTrigger 按照 element 的个数进行触发,当元素数量达到 count_size 是,触发窗口计算逻辑,其内部统计 count 数量就用到了前面提到的 ReduceStateValue,为了在执行过程中添加运行日志,这里新增一个 SelfDefinedCountTrigger,代码与官方提供的 CountTrigger 完全一致,唯一差别是增加了日志打印。

1.SelfDefinedCountTrigger

trigger 内增加了触发的时间和触发的元素数量,主要逻辑都在 onElement 函数内,每来一个元素都会对 trigger 的 ReduceStateValue 累加值,这里采用 RecudeSum 函数,对两数求和,当数值达到 countSize 时进行窗口重触发 Fire 并清空 ReduceStateValue 开始新一轮计数;其余时间都返回 TriggerResult.CONTINUE。

class SelfDefinedCountTrigger(maxCount: Long) extends Trigger[String, TimeWindow] {
  // 条数计数器
  val countStateDesc = new ReducingStateDescriptor[Long]("count", new ReduceSum(), classOf[Long])
  // 元素过来后执行的操作
  override def onElement(t: String, l: Long, w: TimeWindow, triggerContext: Trigger.TriggerContext): TriggerResult = {
    // 获取 count state 并累加数量
    val count = triggerContext.getPartitionedState(countStateDesc)
    count.add(1L)
    // 满足数量触发要求
    if (count.get() >= maxCount) {
      // 首先清空计数器
      count.clear()
      val dateFormat = new SimpleDateFormat("yyyy-MM-dd:HH-mm-ss")
      val cla = Calendar.getInstance()
      cla.setTimeInMillis(System.currentTimeMillis())
      val date = dateFormat.format(cla.getTime)
      println(s"[$date] Window Trigger By Count = ${maxCount}")
      TriggerResult.FIRE
    } else {
      TriggerResult.CONTINUE
    }
  }
  override def onProcessingTime(l: Long, w: TimeWindow, triggerContext: Trigger.TriggerContext): TriggerResult = {
    TriggerResult.CONTINUE
  }
  override def onEventTime(l: Long, w: TimeWindow, triggerContext: Trigger.TriggerContext): TriggerResult = {
    TriggerResult.CONTINUE
  }
  override def clear(w: TimeWindow, triggerContext: Trigger.TriggerContext): Unit = {
    val count = triggerContext.getPartitionedState(countStateDesc)
    count.clear()
  }
}

image.gif

2.主函数

主函数逻辑为从0开始,每s生成30个数字并循环累加,window 采用 10s 聚合的滚动窗口,trigger 采用 count = 30 的 CountTrigger,理论上每s生成的30个元素恰好触发窗口执行逻辑。窗口处理逻辑逻辑也很简单,直接输出当前 window 内的元素个数,元素 min,max 与处理时间:

object CountTriggerDemo {
  val dateFormat = new SimpleDateFormat("yyyy-MM-dd:HH-mm-ss")
  // 每s生成一批数据
  class SourceFromCollection extends RichSourceFunction[String] {
    private var isRunning = true
    var start = 0
    override def run(ctx: SourceFunction.SourceContext[String]): Unit = {
      while (isRunning) {
        (start until (start + 100)).foreach(num => {
          ctx.collect(num.toString)
          if (num % 30 == 0) {
            TimeUnit.SECONDS.sleep(1)
          }
        })
        start += 100
      }
    }
    override def cancel(): Unit = {
      isRunning = false
    }
  }
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env
      .addSource(new SourceFromCollection())
      .setParallelism(1)
      .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(10)))
      .trigger(new SelfDefinedCountTrigger(30))
      .process(new ProcessAllWindowFunction[String, String, TimeWindow] {
        override def process(context: Context, elements: Iterable[String], out: Collector[String]): Unit = {
          val cla = Calendar.getInstance()
          cla.setTimeInMillis(System.currentTimeMillis())
          val date = dateFormat.format(cla.getTime)
          val info = elements.toArray.map(_.toInt)
          val min = info.min
          val max = info.max
          val output = s"==========[$date] Window Elem Num: ${elements.size} Min: $min -> Max $max=========="
          out.collect(output)
        }
      }).print()
    env.execute()
  }
}

image.gif

3.执行日志

由于启动时并非整数时间,所以第一个窗口只处理了 8s 数据 8:30:02 - 8:30:10,第一个窗口逻辑结束后,后面都是稳定的从 10s 的滑动窗口中按每 30 个元素一次进行触发,可以通过 Window Elem Num 看到 10s 内窗口数据的变化,10s x 30 = 300。

image.gif编辑

4.窗口默认触发机制

上面提高了窗口会在默认的 end 时间执行 onProcessingTime 方法,由于方法内只返回了 TriggerResult.CONTINUE 所以不明显,下面在 onProcessingTime 方法中增加日志验证一下:

override def onProcessingTime(l: Long, w: TimeWindow, triggerContext: Trigger.TriggerContext): TriggerResult = {
    println(s"Window Trigger At Default End Time: $l")
    TriggerResult.CONTINUE
  }

image.gif

查看日志:

image.gif编辑

通过日志信息看到了窗口确实会在结束时间执行 onProcessingTime 方法,但是为什么不是整数呢

image.gif编辑

这里是因为窗口实际触发的 timeStamp 是 window.maxTimestamp() 变量对应的方法,而该方法定义如下:

public long maxTimestamp() {
        return this.end - 1L;
    }

image.gif

源码在默认 end 对应的时间戳上做了减一的处理,实际窗口结束时间为 1648034659999 + 1 :

image.gif编辑

所以这里验证了两个问题,第一就是窗口会在默认结束时间调用 onProcessingTime 方法,其次就是窗口的结束时间和真实触发时间相差 1L, window.maxTimestamp + 1 = window.getEnd。

四.ProcessingTimeTrigger 详解

processingTimeTrigger 是 processTime 对应 Flink 程序 window 的默认 trigger,其根据窗口默认的 start ,end 时间进行触发,还是用 10s 的 TumblingProcessingTimeWindows 窗口加自定义 SelfDefinedProcessingTimeTrigger 进行 demo 展示。

1.SelfDefinedProcessingTimeTrigger

ProcesingTimeTrigger 主要方法为 onElement 和 onProcessingTime ,前者对窗口进行 timeServer 的注册,其过期时间为 window.maxTimestamp,上面也提到了,时间就是 window.getEnd() - 1,后者执行 Fire 触发窗口计算,为了获得更细致的时间信息,这里增加了 processingTime 和 window-start window-end 的相关日志。

class SelfDefinedProcessingTimeTrigger() extends Trigger[String, TimeWindow] {
  // 条数计数器
  val countStateDesc = new ReducingStateDescriptor[Long]("count", new ReduceSum(), classOf[Long])
  val dateFormat = new SimpleDateFormat("yyyy-MM-dd:HH-mm-ss")
  val cla = Calendar.getInstance()
  // 元素过来后执行的操作
  override def onElement(t: String, l: Long, w: TimeWindow, triggerContext: Trigger.TriggerContext): TriggerResult = {
    triggerContext.registerProcessingTimeTimer(w.maxTimestamp)
    TriggerResult.CONTINUE
  }
  override def onProcessingTime(l: Long, w: TimeWindow, triggerContext: Trigger.TriggerContext): TriggerResult = {
    cla.setTimeInMillis(w.getStart)
    val start = dateFormat.format(cla.getTime)
    cla.setTimeInMillis(w.getEnd)
    val end = dateFormat.format(cla.getTime)
    println(s"start: $start end: $end processTime: $l maxTimeStamp: ${w.maxTimestamp()} windowStart: ${w.getStart} windowEnd: ${w.getEnd}")
    TriggerResult.FIRE
  }
  override def onEventTime(l: Long, w: TimeWindow, triggerContext: Trigger.TriggerContext): TriggerResult = {
    TriggerResult.CONTINUE
  }
  override def clear(w: TimeWindow, triggerContext: Trigger.TriggerContext): Unit = {
    triggerContext.deleteProcessingTimeTimer(w.maxTimestamp)
  }
}

image.gif

2.主函数

数据 Source 为自定义 Source,每s生成30个元素,以10s为周期生成滚动窗口,处理逻辑依然为输出窗口的元素个数,min 和 max。

object ProcessTimeTriggerDemo {
  val dateFormat = new SimpleDateFormat("yyyy-MM-dd:HH-mm-ss")
  // 每s生成一批数据
  class SourceFromCollection extends RichSourceFunction[String] {
    private var isRunning = true
    var start = 0
    override def run(ctx: SourceFunction.SourceContext[String]): Unit = {
      while (isRunning) {
        (start until (start + 100)).foreach(num => {
          ctx.collect(num.toString)
          if (num % 30 == 0) {
            TimeUnit.SECONDS.sleep(1)
          }
        })
        start += 100
      }
    }
    override def cancel(): Unit = {
      isRunning = false
    }
  }
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env
      .addSource(new SourceFromCollection())
      .setParallelism(1)
      .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(10)))
      .trigger(new SelfDefinedProcessingTimeTrigger())
      .process(new ProcessAllWindowFunction[String, String, TimeWindow] {
        override def process(context: Context, elements: Iterable[String], out: Collector[String]): Unit = {
          val cla = Calendar.getInstance()
          cla.setTimeInMillis(System.currentTimeMillis())
          val date = dateFormat.format(cla.getTime)
          val info = elements.toArray.map(_.toInt)
          val min = info.min
          val max = info.max
          val output = s"==========[$date] Window Elem Num: ${elements.size} Min: $min -> Max $max=========="
          out.collect(output)
        }
      }).print()
    env.execute()
  }
}

image.gif

3.执行日志

窗口每个10s执行一次触发,触发 elem 数量为 10x30 = 300,通过示例可以看到 window-start window-end 和其对应的 format 时间形式,以及再次验证 maxTimestamp = window.end - 1。

image.gif编辑

五.总结

通过微调官方 Trigger 并增加日志,可以看到最常见的 CountTrigger 和 ProcessingTimeTrigger 的执行逻辑并对加深窗口触发的逻辑,后续将结合 CountTrigger 和 ProcessTimeTrigger 实现自定义的 CountAndTimeTrigger,该 Trigger 结合了 Count 和 ProcessingTime 的触发条件,可以让窗口在满足条数或满足间隔的情况下都触发。

相关实践学习
基于Hologres+Flink搭建GitHub实时数据大屏
通过使用Flink、Hologres构建实时数仓,并通过Hologres对接BI分析工具(以DataV为例),实现海量数据实时分析.
实时计算 Flink 实战课程
如何使用实时计算 Flink 搞定数据处理难题?实时计算 Flink 极客训练营产品、技术专家齐上阵,从开源 Flink功能介绍到实时计算 Flink 优势详解,现场实操,5天即可上手! 欢迎开通实时计算 Flink 版: https://cn.aliyun.com/product/bigdata/sc Flink Forward Asia 介绍: Flink Forward 是由 Apache 官方授权,Apache Flink Community China 支持的会议,通过参会不仅可以了解到 Flink 社区的最新动态和发展计划,还可以了解到国内外一线大厂围绕 Flink 生态的生产实践经验,是 Flink 开发者和使用者不可错过的盛会。 去年经过品牌升级后的 Flink Forward Asia 吸引了超过2000人线下参与,一举成为国内最大的 Apache 顶级项目会议。结合2020年的特殊情况,Flink Forward Asia 2020 将在12月26日以线上峰会的形式与大家见面。
目录
相关文章
|
消息中间件 Kafka 流计算
Flink读取Kafka报Error sending fetch request
实时计算Flink读取消息队列Kafka,flink日志中出现Error sending fetch request (sessionId=1510763375, epoch=12890978) to node 103: {}. org.apache.flink.kafka.shaded.org.apache.kafka.common.errors.DisconnectException: null
13189 3
Flink读取Kafka报Error sending fetch request
|
8月前
|
存储 消息中间件 OLAP
基于 Flink+Paimon+Hologres 搭建淘天集团湖仓一体数据链路
本文整理自淘天集团高级数据开发工程师朱奥在Flink Forward Asia 2024的分享,围绕实时数仓优化展开。内容涵盖项目背景、核心策略、解决方案、项目价值及未来计划五部分。通过引入Paimon和Hologres技术,解决当前流批存储不统一、实时数据可见性差等痛点,实现流批一体存储与高效近实时数据加工。项目显著提升了数据时效性和开发运维效率,降低了使用门槛与成本,并规划未来在集团内推广湖仓一体架构,探索更多技术创新场景。
1625 3
基于 Flink+Paimon+Hologres 搭建淘天集团湖仓一体数据链路
|
10月前
|
消息中间件 关系型数据库 MySQL
基于 Flink CDC YAML 的 MySQL 到 Kafka 流式数据集成
基于 Flink CDC YAML 的 MySQL 到 Kafka 流式数据集成
1109 0
|
缓存 监控 数据处理
Flink 四大基石之窗口(Window)使用详解
在流处理场景中,窗口(Window)用于将无限数据流切分成有限大小的“块”,以便进行计算。Flink 提供了多种窗口类型,如时间窗口(滚动、滑动、会话)和计数窗口,通过窗口大小、滑动步长和偏移量等属性控制数据切分。窗口函数包括增量聚合函数、全窗口函数和ProcessWindowFunction,支持灵活的数据处理。应用案例展示了如何使用窗口进行实时流量统计和电商销售分析。
2255 28
|
消息中间件 负载均衡 算法
聊聊 RocketMQ中 Topic,Queue,Consumer,Consumer Group的关系
本文详细解析了RocketMQ中Topic、Queue、Consumer及Consumer Group之间的关系。文中通过图表展示了Topic可包含多个Queue,Queue分布在不同Broker上;Consumer组内多个消费者共享消息;并深入探讨了集群消费与广播消费模式下Queue与Consumer的关系,以及Rebalancing机制在实例增减时如何确保负载均衡。理解这些关系有助于更好地掌握RocketMQ的工作原理,提升系统运维效率。
3179 2
|
存储 SQL API
Flink教程(23)- Flink高级特性(Streaming File Sink)
Flink教程(23)- Flink高级特性(Streaming File Sink)
1120 0
|
IDE Java 应用服务中间件
Java“ClassNotFoundException”解决
Java中的“ClassNotFoundException”表示JVM找不到指定的类。解决方法包括:确保类路径正确、检查依赖是否完整、确认类名无误、清理和重新构建项目等。
3075 0
|
SQL 人工智能 自然语言处理
DataWorks Copilot:大模型时代数据开发的新范式
阿里云DataWorks是一站式数据开发治理平台,支持多种大数据引擎,助力企业构建数据仓库、湖仓一体架构。DataWorks现推出Copilot,致力于打造智能SQL助手和AI Agent,通过生成SQL、优化SQL、提供查询帮助、注释生成、错误修正等功能,帮助数据开发工程师和数据分析师提升SQL 开发和分析的效率和体验。目前,DataWorks Copilot正开放邀测,欢迎大家体验。
21381 7
|
存储 缓存 监控
Flink内存管理机制及其参数调优
Flink内存管理机制及其参数调优
|
监控 调度 流计算
【Flink】Flink的并行度了解吗?Flink的并行度设置是怎样的?
【4月更文挑战第18天】【Flink】Flink的并行度了解吗?Flink的并行度设置是怎样的?