开发者社区 > 大数据与机器学习 > 实时计算 Flink > 正文

大佬们好,Flink CDC flink中如何组合count窗口和time窗口呢?比如说数据达到1

大佬们好,Flink CDC flink中如何组合count窗口和time窗口呢?比如说数据达到100条或者达到10秒钟,哪个先到就触发,这种怎么实现呢?

展开
收起
雪哥哥 2022-12-05 07:54:27 1407 0
1 条回答
写回答
取消 提交回答
  • 1.CountAndProcessingTimeTrigger

    整体代码如下,主要逻辑包含在 onElement 和 onProcessingTime,前者主要负责根据 count 触发,即实现 CountTrigger 的功能,后者则主要实现 ProcessingTime 的功能,需要预先定义两个 ReduceValue 分别记录 Count 和 Time,ReduceValue 详细用法可参考上文,下面分析主要方法。

    class CountAndProcessingTimeTrigger(maxCount: Long, interval: Long) extends Trigger[String, TimeWindow] {
      // 条数计数器
      val countStateDesc = new ReducingStateDescriptor[Long]("count", new ReduceSum(), classOf[Long])
      // 时间计数器,保存下一次触发的时间
      val timeStateDesc = new ReducingStateDescriptor[Long]("interval", new ReduceMin(), classOf[Long])
      // 元素过来后执行的操作
      override def onElement(t: String, time: Long, window: TimeWindow, triggerContext: Trigger.TriggerContext): TriggerResult = {
        // 获取 count state 并累加数量
        val count = triggerContext.getPartitionedState(countStateDesc)
        val fireTimestamp = triggerContext.getPartitionedState(timeStateDesc)
        // 考虑count是否足够
        count.add(1L)
        if (count.get() >= maxCount) {
          val log = s"CountTrigger Triggered Count: ${count.get()}"
          println(formatString(log))
          count.clear()
          // 不等于默认窗口的触发时间
          if (fireTimestamp.get() != window.maxTimestamp()) {
            triggerContext.deleteProcessingTimeTimer(fireTimestamp.get())
          }
          fireTimestamp.clear()
          return TriggerResult.FIRE
        }
        // 添加窗口的下次触发时间
        val currentTimeStamp = triggerContext.getCurrentProcessingTime
        if (fireTimestamp.get() == null) {
          val nextFireTimeStamp = currentTimeStamp + interval
          triggerContext.registerProcessingTimeTimer(nextFireTimeStamp)
          fireTimestamp.add(nextFireTimeStamp)
        }
        TriggerResult.CONTINUE
      }
      override def onProcessingTime(time: Long, window: TimeWindow, triggerContext: Trigger.TriggerContext): TriggerResult = {
        // 获取 count state
        val count = triggerContext.getPartitionedState(countStateDesc)
        // 获取 Interval state
        val fireTimestamp = triggerContext.getPartitionedState(timeStateDesc)
        // time default trigger
        if (time == window.maxTimestamp()) {
          val log = s"Window Trigger By maxTimeStamp: $time FireTimestamp: ${fireTimestamp.get()}"
          println(formatString(log))
          count.clear()
          triggerContext.deleteProcessingTimeTimer(fireTimestamp.get())
          fireTimestamp.clear()
          fireTimestamp.add(triggerContext.getCurrentProcessingTime + interval)
          triggerContext.registerProcessingTimeTimer(fireTimestamp.get())
          return TriggerResult.FIRE
        } else if (fireTimestamp.get() != null && fireTimestamp.get().equals(time)) {
          val log = s"TimeTrigger Triggered At: ${fireTimestamp.get()}"
          println(formatString(log))
          count.clear()
          fireTimestamp.clear()
          fireTimestamp.add(triggerContext.getCurrentProcessingTime + interval)
          triggerContext.registerProcessingTimeTimer(fireTimestamp.get())
          return TriggerResult.FIRE
        }
        TriggerResult.CONTINUE
      }
      override def onEventTime(l: Long, w: TimeWindow, triggerContext: Trigger.TriggerContext): TriggerResult = {
        TriggerResult.CONTINUE
      }
      override def clear(w: TimeWindow, triggerContext: Trigger.TriggerContext): Unit = {
        // 获取 count state
        val count = triggerContext.getPartitionedState(countStateDesc)
        // 获取 Interval state
        val fireTimestamp = triggerContext.getPartitionedState(timeStateDesc)
        count.clear()
        fireTimestamp.clear()
    
    

    } }

    2022-12-09 08:25:34
    赞同 展开评论 打赏

实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。

相关产品

  • 实时计算 Flink版
  • 热门讨论

    热门文章

    相关电子书

    更多
    Flink CDC Meetup PPT - 龚中强 立即下载
    Flink CDC Meetup PPT - 王赫 立即下载
    Flink CDC Meetup PPT - 覃立辉 立即下载