Flink / Scala - 使用 CountWindow 实现按条数触发窗口

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: CountWindow 数量窗口分为滑动窗口与滚动窗口,类似于之前 TimeWindow 的滚动时间与滑动时间,这里滚动窗口不存在元素重复而滑动窗口存在元素重复的情况,下面 demo 场景为非重复场景,所以将采用滚动窗口。......

一.引言

CountWindow 数量窗口分为滑动窗口与滚动窗口,类似于之前 TimeWindow 的滚动时间与滑动时间,这里滚动窗口不存在元素重复而滑动窗口存在元素重复的情况,下面 demo 场景为非重复场景,所以将采用滚动窗口。

二.CountWindow 简介

image.gif编辑

这里最关键的一句话是: A Window that represents a count window. For each count window, we will assign a unique id. Thus this CountWindow can act as namespace part in state. We can attach data to each different CountWindow. 翻译的意思是:表示计数窗口的窗口。对于每个计数窗口,我们将分配一个唯一的 id。因此,此计数窗口可以作为状态中的命名空间部分。我们可以将数据附加到每个不同的 CountWindow。

image.gif编辑

countWindow 共分为两种初始化方式,其中只添加 size 为滚动窗口,即不重复元素的 CountWindow,带 slide 滑动参数则生成滑动窗口,不在本文讨论范围之内。上面提到了对于每个计数的窗口,我们分配一个唯一的 id,这个 id 可以近似看做是以 keyBy 为依据进行数据分流,下面示例将给出解答。

三.任务场景与实现 👍

通过自定义 Source 实现无限数据流,我们希望指定 count = N,使得每 N 个数据作为一个 batch 触发一次窗口并计算。

1.自定义 Source

case class info(num: Int, id: Int)

image.gif

自定义 Source 需要继承 RichSourceFunction,这里我们生成 info 类数据结构,其内部包含两个元素 num 与 id,num 代表其内部的数字信息,id 代表其 keyBy 的分组信息:

class SourceFromCollection extends RichSourceFunction[info] {
  private var isRunning = true
  var start = 0
  override def run(ctx: SourceFunction.SourceContext[info]): Unit = {
    val random = new Random() // 生成随机 id
    while (isRunning) {
      (start until (start + 100)).foreach(num => {
        ctx.collect(info(num, random.nextInt(4))) // 随机id范围 0,1,2,3
        if (num % 20 == 0) { // 每生产20个数据 sleep 1s
          TimeUnit.SECONDS.sleep(1)
        }
      })
      start += 100 // 数值累加
    }
  }
  override def cancel(): Unit = {
    isRunning = false
  }
}

image.gif

Source 函数每 s 生成 20 个数字和其对应的 randomId,最终输出 info(num, id) 类。

2.执行流程

主函数主要基于 Source 实现 ProcessWindowFunction,并制定 count=20,每20个元素触发一次窗口计算:

val env = StreamExecutionEnvironment.getExecutionEnvironment
    val dataStream = env.addSource(new SourceFromCollection) // 添加 Source
    dataStream
      .keyBy(info => info.id) // 按 random 生成的 id 分配到不同 CountWindow 
      .countWindow(20) // size = 20 的滚动窗口
      .process(new ProcessWindowFunction[info, String, Int, GlobalWindow] {
      override def process(key: Int, context: Context, elements: Iterable[info], out: Collector[String]): Unit = {
        // 输出 id + size + 元素
        val log = key + "\t" + elements.size + "\t" + elements.toArray.map(_.num).mkString("|")
        out.collect(log)
      }
    }).print()
    env.execute()

image.gif

其中 ProcessWindowFunction 共有四个参数:

image.gif编辑

IN: 输入类型,本例数据流为 DataStream[info],所以 classOf[In] 为 info

OUT: 输出类型,与 Collector 后类型一致,这里输出类型为 String

KEY: Key 即为 keyBy 的 id,这里 key 为 random.nextInt,所以为 int 类型

W: org.apache.flink.streaming.api.windowing.windows.window,这里主要分为两类,如果采用时间窗口即 TimeWindow,则对应类型为 TimeWindow,本例中采用 CountWindow,则对应类型为 GlobalWindow。

3.运行结果

批处理每一批 size = 20,分别输出 id + "\t" + size + "\t" + 批次数据,可以看到每一批触发20条数据,且 id 分别为 0,1,2,3,这里还是根据 process 的并行度来定,如果 process 的并行度设定为2,则很大概率 0,1,2,3 均分至两台 TaskManager 上,如果规定并行度为4,则分别分配到单台 TaskManager 上,也可以根据数据的吞吐,修改并行度与 keyBy 时 nextInt 的范围。

image.gif编辑

四.CountTrigger

1.自定义 CountTrigger

之前提到过 CountTrigger 也可以实现按 count 数目进行窗口触发,但是有一点不同是 CountTrigger 不会清除窗口内元素,所以多次执行逻辑会重复处理一批数据,具体实现逻辑解析可以参考: Flink - CountTrigger && ProcessingTimeTriger 详解

class SelfDefinedCountTrigger(maxCount: Long) extends Trigger[String, TimeWindow] {
  // 条数计数器
  val countStateDesc = new ReducingStateDescriptor[Long]("count", new ReduceSum(), classOf[Long])
  // 元素过来后执行的操作
  override def onElement(t: String, l: Long, w: TimeWindow, triggerContext: Trigger.TriggerContext): TriggerResult = {
    // 获取 count state 并累加数量
    val count = triggerContext.getPartitionedState(countStateDesc)
    count.add(1L)
    // 满足数量触发要求
    if (count.get() >= maxCount) {
      // 首先清空计数器
      count.clear()
      val dateFormat = new SimpleDateFormat("yyyy-MM-dd:HH-mm-ss")
      val cla = Calendar.getInstance()
      cla.setTimeInMillis(System.currentTimeMillis())
      val date = dateFormat.format(cla.getTime)
      println(s"[$date] Window Trigger By Count = ${maxCount}")
      TriggerResult.FIRE
    } else {
      TriggerResult.CONTINUE
    }
  }
  override def onProcessingTime(l: Long, w: TimeWindow, triggerContext: Trigger.TriggerContext): TriggerResult = {
    TriggerResult.CONTINUE
  }
  override def onEventTime(l: Long, w: TimeWindow, triggerContext: Trigger.TriggerContext): TriggerResult = {
    TriggerResult.CONTINUE
  }
  override def clear(w: TimeWindow, triggerContext: Trigger.TriggerContext): Unit = {
    val count = triggerContext.getPartitionedState(countStateDesc)
    count.clear()
  }
}

image.gif

2.运行结果

之前介绍 Window 触发 CountTrigger 时也实现了基于 Count 的窗口触发机制,但是存在一个问题,CountTrigger 每次达到 count 数量触发,但是不会清除窗口数据,即窗口数据累加同时多次触发窗口:

image.gif编辑

如上,窗口逻辑为计算批次数据的最大最小值,同一个颜色框内为 count=30 多次触发的结果,可以看到 min 一直为同一个数字,max 持续增大,这就是上面提到的问题: 使用 countTrigger 时会造成窗口数据重复触发,所以想要实现无重复 CountWindow 就得最上面的 countWindow 实现。当然上面的执行逻辑也并不是没有使用场景,例如电商平台统计一段时间内商品销售的情况就可以使用 CountTrigger,实时滚动大屏数据展示。

五.CountWindow 完整代码

下面附上完整代码:

package com.weibo.ug.push.flink.Main
import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.streaming.api.functions.source.{RichSourceFunction, SourceFunction}
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow
import org.apache.flink.util.Collector
import java.util.concurrent.TimeUnit
import com.weibo.ug.push.flink.Main.TestCountWindow.info
import scala.util.Random
object TestCountWindow {
  case class info(num: Int, id: Int)
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val dataStream = env.addSource(new SourceFromCollection)
    dataStream
      .keyBy(x => x.id)
      .countWindow(20)
      .process(new ProcessWindowFunction[info, String, Int, GlobalWindow] {
      override def process(key: Int, context: Context, elements: Iterable[info], out: Collector[String]): Unit = {
        val log = key + "\t" + elements.size + "\t" + elements.toArray.map(_.num).mkString("|")
        out.collect(log)
      }
    }).print()
    env.execute()
  }
}
class SourceFromCollection extends RichSourceFunction[info] {
  private var isRunning = true
  var start = 0
  override def run(ctx: SourceFunction.SourceContext[info]): Unit = {
    val random = new Random()
    while (isRunning) {
      (start until (start + 100)).foreach(num => {
        ctx.collect(info(num, random.nextInt(4)))
        if (num % 20 == 0) {
          TimeUnit.SECONDS.sleep(1)
        }
      })
      start += 100
    }
  }
  override def cancel(): Unit = {
    isRunning = false
  }
}

image.gif

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
目录
相关文章
|
3月前
|
SQL API 数据处理
实时计算 Flink版产品使用合集之如果一个窗口区间没有数据,若不会开窗就没法使用triggers赋默认值
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
1月前
|
SQL 网络安全 API
实时计算 Flink版产品使用问题之使用ProcessTime进行窗口计算,并且有4台机器的时间提前了2个小时,会导致什么情况
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
1月前
|
SQL Java 数据处理
实时计算 Flink版产品使用问题之使用MavenShadePlugin进行relocation并遇到只包含了Java代码而未包含Scala代码,该怎么办
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
1月前
|
消息中间件 Kafka 数据库
实时计算 Flink版产品使用问题之如何对CDC数据进行窗口分组和聚合操作
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
1月前
|
运维 关系型数据库 MySQL
实时计算 Flink版产品使用问题之在处理MySQL表新增数据记录时,没有正确触发变更事件,该如何解决
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
2月前
|
数据采集 关系型数据库 MySQL
实时计算 Flink版操作报错合集之源表的数据已经被手动删除,时间窗口内的数据仍存在,该怎么解决
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
113 1
|
3月前
|
SQL 资源调度 监控
实时计算 Flink版产品使用合集之Flink on YARN 下,任务代码中通过 JobListener 监听任务状态,onJobSubmitted 和 onJobExecuted 同时触发如何解决
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
62 3
|
3月前
|
BI 数据处理 Apache
[AIGC] 深入理解Flink中的窗口、水位线和定时器
[AIGC] 深入理解Flink中的窗口、水位线和定时器
|
3月前
|
BI API 流计算
[实时流基础 flink] 窗口
[实时流基础 flink] 窗口
|
3月前
|
消息中间件 Oracle 关系型数据库
实时计算 Flink版操作报错合集之在尝试触发checkpoint时遇到了报错如何解决
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
44 0