Apache Flink:Keyed Window与Non-Keyed Window

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
云原生大数据计算服务 MaxCompute,5000CU*H 100GB 3个月
云原生大数据计算服务MaxCompute,500CU*H 100GB 3个月
简介: 5万人关注的大数据成神之路,不来了解一下吗?5万人关注的大数据成神之路,真的不来了解一下吗?5万人关注的大数据成神之路,确定真的不来了解一下吗?Apache Flink中,Window操作在流式数据处理中是非常核心的一种抽象,它把一个无限流数据集分割成一个个有界的Window(或称为Bucket),然后就可以非常方便地定义作用于Window之上的各种计算操作。

5万人关注的大数据成神之路,不来了解一下吗?
5万人关注的大数据成神之路,真的不来了解一下吗?
5万人关注的大数据成神之路,确定真的不来了解一下吗?

Apache Flink中,Window操作在流式数据处理中是非常核心的一种抽象,它把一个无限流数据集分割成一个个有界的Window(或称为Bucket),然后就可以非常方便地定义作用于Window之上的各种计算操作。本文我们主要基于Apache Flink 1.4.0版本,说明Keyed Window与Non-Keyed Window的基本概念,然后分别对与其相关的WindowFunction与WindowAllFunction的类设计进行分析,最后通过编程实践来应用。

基本概念

Flink将Window分为两类,一类叫做Keyed Window,另一类叫做Non-Keyed Window。为了说明这两类Window的不同,我们看下Flink官网给出的,基于这两种类型的Window编写代码的结构说明。
基于Keyed Window进行编程,用户代码基本结构如下所示:

image

基于Non-Keyed Window进行编程,用户代码基本结构如下所示:

image

上面两种编程结构的区别在于:

从编程API上看,Keyed Window编程结构,可以直接对输入的stream按照Key进行操作,输入的stream中识别Key,即输入stream中的每个数据元素哪一部分是作为Key来关联这个数据元素的,这样就可以对stream中的数据元素基于Key进行相关计算操作,如keyBy,可以根据Key进行分组(相同的Key必然可以分到同一组中去)。如果输入的stream中没有Key,比如就是一条日志记录信息,那么无法对其进行keyBy操作。而对于Non-Keyed Window编程结构来说,无论输入的stream具有何种结构(比如是否具有Key),它都认为是无结构的,不能对其进行keyBy操作,而且如果使用Non-Keyed Window函数操作,就会对该stream进行分组(具体如何分组依赖于我们选择的WindowAssigner,它负责将stream中的每个数据元素指派到一个或多个Window中),指派到一个或多个Window中,然后后续应用到该stream上的计算都是对Window中的这些数据元素进行操作。

从计算上看,Keyed Window编程结构会将输入的stream转换成Keyed stream,逻辑上会对应多个Keyed stream,每个Keyed stream会独立进行计算,这就使得多个Task可以对Windowing操作进行并行处理,具有相同Key的数据元素会被发到同一个Task中进行处理。而对于Non-Keyed Window编程结构,Non-Keyed stream逻辑上将不能split成多个stream,所有的Windowing操作逻辑只能在一个Task中进行处理,也就是说计算并行度为1。

在实际编程过程中,我们可以看到DataStream的API也有对应的方法timeWindow()和timeWindowAll(),他们也分别对应着Keyed Window和Non-Keyed Window。

WindowFunction与AllWindowFunction

Flink中对输入stream进行Windowing操作后,将到达的数据元素指派到指定的Window中,或者基于EventTime/ProcessingTime,或者基于Count,或者混合EventTime/ProcessingTime/Count,来对数据元素进行分组。那么,在对分配的Window进行操作时,就需要使用Flink提供的函数(Function),而对于Window的操作,分别基于Keyed Window、Non-Keyed Window提供了WindowFunction、AllWindowFunction,通过实现特定的Window函数,能够访问Window相关的元数据,来满足实际应用需要。下面,我们从类设计的角度,来看下对应的继承层次结构:

  • Keyed Window对应的WindowFunction

Keyed Window对应的WindowFunction类图,如下所示:

image

通常,如果我们想要自定义处理Window中数据元素的处理逻辑,或者访问Window对应的元数据,可以继承自ProcessWindowFunction类来实现。我们看一下ProcessWindowFunction对应的类声明:

image

对Keyed stream的Window进行操作,上面泛型对应4个类型参数:

IN表示进入到该ProcessWindowFunction的数据元素的类型,例如stream中上一个操作的输出是包含两个String类型的元组,则IN类型对应为(String, String);

OUT表示该ProcessWindowFunction处理后的输出数据元素的类型,例如输出一个String和一个Long的元组,则OUT类型对应为(String, Long);

KEY有一点不同,需要注意,它并不是面向应用编程用户使用的,而且该值不会提供有意义的业务应用含义,在Keyed Window中它是用来跟踪该Window的,一般应用开发中只需要将其作为输出的Key即可,后面我们会有对应的编程实践;

W类型表示该ProcessWindowFunction作用的Window的类型,例如TimeWindow、GlobalWindow。
下面,我们看一下继承自ProcessWindowFunction需要实现的方法,方法签名如下所示:

image

进入到该Window,对应着其中一个Keyed stream。属于某个Window的数据元素都在elements这个集合中,我们可以对这些数据元素进行处理。通过context可以访问Window对应的元数据信息,比如TimeWindow的开始时间(start)和结束时间(end)。out是一个Collector,负责收集处理后的数据元素并发送到stream下游进行处理。

  • Non-Keyed Window对应的AllWindowFunction

Non-Keyed Window对应的WindowFunction类图,如下所示:

image

类似地,如果我们想要自定义处理Window中数据元素的处理逻辑,或者访问Window对应的元数据,可以继承自ProcessAllWindowFunction类来实现。我们看一下ProcessAllWindowFunction对应的类声明:

image

可以同ProcessWindowFunction对比一下,发现ProcessAllWindowFunction的泛型参数中没有了用来跟踪Window的KEY,因为Non-Keyed Window只在一个Task中进行处理,其它的OUT和W与前面ProcessWindowFunction类相同,不再累述。
继承自ProcessAllWindowFunction,需要实现的方法,如下所示:

image

该ProcessAllWindowFunction作用于原始输入的stream,所有的数据元素经过Windowing后,都会经过该方法进行处理,在该方法具体处理逻辑与ProcessWindowFunction.process()类似。

编程实践

现在,我们模拟这样一个场景:某个App开发商需要从多个渠道(Channel)推广App,需要通过日志来分析对应的用户行为(安装、打开、浏览、点击、购买、关闭、卸载),我们假设要实时(近实时)统计分析每个时间段内(如每隔5秒)来自不同渠道的用户的行为。
首先,创建一个模拟生成数据的SourceFunction,实现代码如下所示:

image

有了该数据源,我们就可以基于该SimulatedEventSource来构建Flink Streaming应用程序了。下面,也分别面向Keyed Window和Non-Keyed Window来编程实践,并比较它们不同之处。

Keyed Window编程

我们基于Sliding Window(WindowAssigner)来在stream上生成Window,Window大小size=5s,silde=1s,即每个Window计算5s之内的数据元素,每个1s启动一个Window(查看提交该Flink程序的命令行中指定的各个参数值)。同时,基于上面自定义实现的SimulatedEventSource作为输入数据源,创建Flink stream,然后后续就可以对stream进行各种操作了。

处理stream数据,我们希望能够获取到每个Window对应的起始时间和结束时间,然后输出基于Window(起始时间+结束时间)、渠道(Channel)、行为类型进行分组统计的结果,最后将结果数据实时写入到指定Kafka topic中。

我们实现的Flink程序类为SlidingWindowAnalytics,代码如下所示:

image

首先,对输入stream进行一个map操作,处理输出 ((渠道, 行为类型), 计数)。
其次,基于该结果进行一个keyBy操作,指定Key为(渠道, 行为类型),得到了多个Keyed stream。

接着,对每个Keyed stream应用Sliding Window操作,设置Sliding Window的size和slide值。

然后,因为我们想要获取到Window对应的起始时间和结束时间,所以需要对Windowing后的stream进行一个ProcessWindowFunction操作,这个是我们自定义实现的,在其中获取到Window起始时间和结束时间,并对Windowing的数据进行分组统计(groupBy),然后输出带有Window起始时间和结束时间,以及渠道、行为类型、统计计数这些信息,对应的实现类为MyReduceWindowFunction,代码如下所示:

image

上面对应于ProcessWindowFunction的泛型参数的值,分别为:IN=((String, String), Long)、OUT=((String, String, String, String), Long)、KEY=Tuple、W=TimeWindow,这样可以对照方法process()中的各个参数的类型来理解。上述代码中,elements中可能存在多个相同的Key的值,但是具有同一个Key的数据元素一定会在同一个Window中(即elements),我们需要对elements进行一个groupBy的内存计算操作,再对每个group中的数据进行汇总计数,输出为((Window开始时间, Window结束时间, 渠道, 行为类型), 累加计数值)。这样,即可有调用stream上的process方法,将该MyReduceWindowFunction实现的示例作为参数值传进去即可。
最后,通过map操作将结果格式化,输出保存到Kafka中。

运行上面我们实现的Flink程序,执行如下命令:

image

提交运行后,可以通过Flink Web Dashboard查看Job运行状态。可以在Kafka中查看最终结果数据,对应的输出数据示例如下所示:

image

通过结果可以看到,采用Sliding Window来指派Window,随着时间流逝各个Window之间存在重叠的现象,这正是我们最初想要的结果。

  • Non-Keyed Window编程

这里,我们基于Tumbling Window(WindowAssigner)来在stream上生成Non-Keyed Window。Tumbling Window也被称为固定时间窗口(Fixed Time Window),各个Window的时间长度相同,Window之间没有重叠。

我们想要达到的目标和前面类似,也希望获取到每个Window对应的起始时间和结束时间,所以需要实现一个ProcessWindowAllFunction,但因为是Non-Keyed Window,只有一个Task来负责对所有输入stream中的数据元素指派Window,这在编程实现中并没有感觉到有太大的差异。实现的Flink程序为TumblingWindowAllAnalytics,代码如下所示:

object TumblingWindowAllAnalytics {
  var MAX_LAGGED_TIME = 5000L
 
  def checkParams(params: ParameterTool) = {
    if (params.getNumberOfParameters < 5) {
      println("Missing parameters!\n"
        + "Usage: Windowing "
        + "--window-result-topic <windowed_result_topic> "
        + "--bootstrap.servers <kafka_brokers> "
        + "--zookeeper.connect <zk_quorum> "
        + "--window-all-lagged-millis <window_all_lagged_millis> "
        + "--window-all-size-millis <window_all_size_millis>")
      System.exit(-1)
    }
  }
 
  def main(args: Array[String]): Unit = {
    val params = ParameterTool.fromArgs(args)
    checkParams(params)
    MAX_LAGGED_TIME = params.getLong("window-all-lagged-millis", MAX_LAGGED_TIME)
    val windowAllSizeMillis = params.getRequired("window-all-size-millis").toLong
 
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
    val stream: DataStream[(String, String)] = env.addSource(new SimulatedEventSource)
 
    // create a Kafka producer for Kafka 0.9.x
    val kafkaProducer = new FlinkKafkaProducer09(
      params.getRequired("window-result-topic"),
      new SimpleStringSchema, params.getProperties
    )
 
    stream
      .map(t => {
        val channel = t._1
        val eventFields = t._2.split("\t")
        val ts = eventFields(0).toLong
        val behaviorType = eventFields(3)
        (ts, channel, behaviorType)
      })
      .assignTimestampsAndWatermarks(new TimestampExtractor(MAX_LAGGED_TIME))
      .map(t => (t._2, t._3))
      .timeWindowAll(Time.milliseconds(windowAllSizeMillis))
      .process(new MyReduceWindowAllFunction())
      .map(t => {
        val key = t._1
        val count = t._2
        val windowStartTime = key._1
        val windowEndTime = key._2
        val channel = key._3
        val behaviorType = key._4
        Seq(windowStartTime, windowEndTime,
          channel, behaviorType, count).mkString("\t")
      })
      .addSink(kafkaProducer)
 
    env.execute(getClass.getSimpleName)
  }
 
  class TimestampExtractor(val maxLaggedTime: Long)
    extends AssignerWithPeriodicWatermarks[(Long, String, String)] with Serializable {
 
    var currentWatermarkTs = 0L
 
    override def getCurrentWatermark: Watermark = {
      if(currentWatermarkTs <= 0) {
        new Watermark(Long.MinValue)
      } else {
        new Watermark(currentWatermarkTs - maxLaggedTime)
      }
    }
 
    override def extractTimestamp(element: (Long, String, String),
                                  previousElementTimestamp: Long): Long = {
      val ts = element._1
      Math.max(ts, currentWatermarkTs)
    }
  }
}

上面代码中,我们在输入stream开始处理时,调用DataStream的assignTimestampsAndWatermarks方法为stream中的每个数据元素指派时间戳,周期性地生成WaterMark来控制stream的处理进度(Progress),用来提取时间戳和生成WaterMark的实现参考实现类TimestampExtractor。有关WaterMark相关的内容,可以参考后面的参考链接中给出的介绍。

另外,我们实现了Flink的ProcessWindowAllFunction抽象类,对应实现类为MyReduceWindowAllFunction,用来处理每个Window中的数据,获取对应的Window的起始时间和结束时间,实现代码如下所示:

class MyReduceWindowAllFunction
  extends ProcessAllWindowFunction[(String, String), ((String, String, String, String), Long), TimeWindow] {
 
  override def process(context: Context,
                       elements: Iterable[(String, String)],
                       collector: Collector[((String, String, String, String), Long)]): Unit = {
    val startTs = context.window.getStart
    val endTs = context.window.getEnd
    val elems = elements.map(t => {
      ((t._1, t._2), 1L)
    })
    for(group <- elems.groupBy(_._1)) {
      val myKey = group._1
      val myValue = group._2
      var count = 0L
      for(elem <- myValue) {
        count += elem._2
      }
      val channel = myKey._1
      val behaviorType = myKey._2
      val outputKey = (formatTs(startTs), formatTs(endTs), channel, behaviorType)
      collector.collect((outputKey, count))
    }
  }
 
  private def formatTs(ts: Long) = {
    val df = new SimpleDateFormat("yyyyMMddHHmmss")
    df.format(new Date(ts))
  }
}

与Keyed Window实现中的ProcessWindowFunction相比,这里没有了对应的泛型参数KEY,因为这种情况下只有一个Task处理stream输入的所有数据元素,ProcessAllWindowFunction的实现类对所有未进行groupBy(也无法进行,因为数据元素的Key未知)操作得到的Window中的数据元素进行处理,处理逻辑和前面基本相同。

提交Flink程序TumblingWindowAllAnalytics,执行如下命令行:

参考链接

《大数据成神之路》
https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/operators/windows.html
https://www.oreilly.com/ideas/the-world-beyond-batch-streaming-101
https://www.oreilly.com/ideas/the-world-beyond-batch-streaming-102

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
目录
相关文章
|
3月前
|
存储 人工智能 大数据
The Past, Present and Future of Apache Flink
本文整理自阿里云开源大数据负责人王峰(莫问)在 Flink Forward Asia 2024 上海站主论坛开场的分享,今年正值 Flink 开源项目诞生的第 10 周年,借此时机,王峰回顾了 Flink 在过去 10 年的发展历程以及 Flink社区当前最新的技术成果,最后展望下一个十年 Flink 路向何方。
417 33
The Past, Present and Future of Apache Flink
|
5月前
|
SQL Java API
Apache Flink 2.0-preview released
Apache Flink 社区正积极筹备 Flink 2.0 的发布,这是自 Flink 1.0 发布以来的首个重大更新。Flink 2.0 将引入多项激动人心的功能和改进,包括存算分离状态管理、物化表、批作业自适应执行等,同时也包含了一些不兼容的变更。目前提供的预览版旨在让用户提前尝试新功能并收集反馈,但不建议在生产环境中使用。
1071 13
Apache Flink 2.0-preview released
|
5月前
|
存储 缓存 算法
分布式锁服务深度解析:以Apache Flink的Checkpointing机制为例
【10月更文挑战第7天】在分布式系统中,多个进程或节点可能需要同时访问和操作共享资源。为了确保数据的一致性和系统的稳定性,我们需要一种机制来协调这些进程或节点的访问,避免并发冲突和竞态条件。分布式锁服务正是为此而生的一种解决方案。它通过在网络环境中实现锁机制,确保同一时间只有一个进程或节点能够访问和操作共享资源。
184 3
|
2月前
|
缓存 监控 数据处理
Flink 四大基石之窗口(Window)使用详解
在流处理场景中,窗口(Window)用于将无限数据流切分成有限大小的“块”,以便进行计算。Flink 提供了多种窗口类型,如时间窗口(滚动、滑动、会话)和计数窗口,通过窗口大小、滑动步长和偏移量等属性控制数据切分。窗口函数包括增量聚合函数、全窗口函数和ProcessWindowFunction,支持灵活的数据处理。应用案例展示了如何使用窗口进行实时流量统计和电商销售分析。
297 28
|
3天前
|
存储 大数据 数据处理
您有一份 Apache Flink 社区年度报告请查收~
您有一份 Apache Flink 社区年度报告请查收~
|
3月前
|
存储 SQL 人工智能
Apache Flink 2.0:Streaming into the Future
本文整理自阿里云智能高级技术专家宋辛童、资深技术专家梅源和高级技术专家李麟在 Flink Forward Asia 2024 主会场的分享。三位专家详细介绍了 Flink 2.0 的四大技术方向:Streaming、Stream-Batch Unification、Streaming Lakehouse 和 AI。主要内容包括 Flink 2.0 的存算分离云原生化、流批一体的 Materialized Table、Flink 与 Paimon 的深度集成,以及 Flink 在 AI 领域的应用。
666 13
Apache Flink 2.0:Streaming into the Future
|
5月前
|
数据挖掘 物联网 数据处理
深入探讨Apache Flink:实时数据流处理的强大框架
在数据驱动时代,企业需高效处理实时数据流。Apache Flink作为开源流处理框架,以其高性能和灵活性成为首选平台。本文详细介绍Flink的核心特性和应用场景,包括实时流处理、强大的状态管理、灵活的窗口机制及批处理兼容性。无论在实时数据分析、金融服务、物联网还是广告技术领域,Flink均展现出巨大潜力,是企业实时数据处理的理想选择。随着大数据需求增长,Flink将继续在数据处理领域发挥重要作用。
434 0
|
3天前
|
消息中间件 关系型数据库 MySQL
Flink CDC 在阿里云实时计算Flink版的云上实践
Flink CDC 在阿里云实时计算Flink版的云上实践
|
6月前
|
运维 数据处理 数据安全/隐私保护
阿里云实时计算Flink版测评报告
该测评报告详细介绍了阿里云实时计算Flink版在用户行为分析与标签画像中的应用实践,展示了其毫秒级的数据处理能力和高效的开发流程。报告还全面评测了该服务在稳定性、性能、开发运维及安全性方面的卓越表现,并对比自建Flink集群的优势。最后,报告评估了其成本效益,强调了其灵活扩展性和高投资回报率,适合各类实时数据处理需求。
|
4月前
|
存储 分布式计算 流计算
实时计算 Flash – 兼容 Flink 的新一代向量化流计算引擎
本文介绍了阿里云开源大数据团队在实时计算领域的最新成果——向量化流计算引擎Flash。文章主要内容包括:Apache Flink 成为业界流计算标准、Flash 核心技术解读、性能测试数据以及在阿里巴巴集团的落地效果。Flash 是一款完全兼容 Apache Flink 的新一代流计算引擎,通过向量化技术和 C++ 实现,大幅提升了性能和成本效益。
1845 73
实时计算 Flash – 兼容 Flink 的新一代向量化流计算引擎

推荐镜像

更多