Flink 中极其重要的 Time 与 Window 详细解析(深度好文,建议收藏) (二)

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
全局流量管理 GTM,标准版 1个月
云解析 DNS,旗舰版 1个月
简介: Flink 中极其重要的 Time 与 Window 详细解析
  • sliding-time-window (有重叠数据)


//1.创建运行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
//2.定义数据流来源
val text = env.socketTextStream("localhost", 9999)
//3.转换数据格式,text->CarWc
case class CarWc(sensorId: Int, carCnt: Int)
val ds1: DataStream[CarWc] = text.map {
  line => {
    val tokens = line.split(",")
    CarWc(tokens(0).trim.toInt, tokens(1).trim.toInt)
  }
}
//4.执行统计操作,每个sensorId一个sliding窗口,窗口时间10秒,滑动时间5秒
//也就是说,每5秒钟统计一次,在这过去的10秒钟内,各个路口通过红绿灯汽车的数量。
val ds2: DataStream[CarWc] = ds1
  .keyBy("sensorId")
  .timeWindow(Time.seconds(10), Time.seconds(5))
  .sum("carCnt")
//5.显示统计结果
ds2.print()
//6.触发流计算
env.execute(this.getClass.getName)


CountWindow


CountWindow根据窗口中相同key元素的数量来触发执行,执行时只计算元素数量达到窗口大小的key对应的结果。


注意:CountWindow的window_size指的是相同Key的元素的个数,不是输入的所有元素的总数。


  • tumbling-count-window (无重叠数据)


//1.创建运行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
//2.定义数据流来源
val text = env.socketTextStream("localhost", 9999)
//3.转换数据格式,text->CarWc
case class CarWc(sensorId: Int, carCnt: Int)
val ds1: DataStream[CarWc] = text.map {
  (f) => {
    val tokens = f.split(",")
    CarWc(tokens(0).trim.toInt, tokens(1).trim.toInt)
  }
}
//4.执行统计操作,每个sensorId一个tumbling窗口,窗口的大小为5
//按照key进行收集,对应的key出现的次数达到5次作为一个结果
val ds2: DataStream[CarWc] = ds1
  .keyBy("sensorId")
  .countWindow(5)
  .sum("carCnt")
//5.显示统计结果
ds2.print()
//6.触发流计算
env.execute(this.getClass.getName)


  • sliding-count-window (有重叠数据)


同样也是窗口长度和滑动窗口的操作:窗口长度是5,滑动长度是3


//1.创建运行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
//2.定义数据流来源
val text = env.socketTextStream("localhost", 9999)
//3.转换数据格式,text->CarWc
case class CarWc(sensorId: Int, carCnt: Int)
val ds1: DataStream[CarWc] = text.map {
  (f) => {
    val tokens = f.split(",")
    CarWc(tokens(0).trim.toInt, tokens(1).trim.toInt)
  }
}
//4.执行统计操作,每个sensorId一个sliding窗口,窗口大小3条数据,窗口滑动为3条数据
//也就是说,每个路口分别统计,收到关于它的3条消息时统计在最近5条消息中,各自路口通过的汽车数量
val ds2: DataStream[CarWc] = ds1
  .keyBy("sensorId")
  .countWindow(5, 3)
  .sum("carCnt")
//5.显示统计结果
ds2.print()
//6.触发流计算
env.execute(this.getClass.getName)


  • Window 总结


  1. flink支持两种划分窗口的方式(time和count)


  • 如果根据时间划分窗口,那么它就是一个time-window


  • 如果根据数据划分窗口,那么它就是一个count-window


  1. flink支持窗口的两个重要属性(size和interval)


  • 如果size=interval,那么就会形成tumbling-window(无重叠数据)


  • 如果size>interval,那么就会形成sliding-window(有重叠数据)


  • 如果size<interval,那么这种窗口将会丢失数据。比如每5秒钟,统计过去3秒的通过路口汽车的数据,将会漏掉2秒钟的数据。


  1. 通过组合可以得出四种基本窗口


  • time-tumbling-window 无重叠数据的时间窗口,设置方式举例:timeWindow(Time.seconds(5))


  • time-sliding-window 有重叠数据的时间窗口,设置方式举例:timeWindow(Time.seconds(5), Time.seconds(3))


  • count-tumbling-window无重叠数据的数量窗口,设置方式举例:countWindow(5)


  • count-sliding-window 有重叠数据的数量窗口,设置方式举例:countWindow(5,3)


Window Reduce


WindowedStream → DataStream:给window赋一个reduce功能的函数,并返回一个聚合的结果。


import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
object StreamWindowReduce {
  def main(args: Array[String]): Unit = {
    // 获取执行环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // 创建SocketSource
    val stream = env.socketTextStream("node01", 9999)
    // 对stream进行处理并按key聚合
    val streamKeyBy = stream.map(item => (item, 1)).keyBy(0)
    // 引入时间窗口
    val streamWindow = streamKeyBy.timeWindow(Time.seconds(5))
    // 执行聚合操作
    val streamReduce = streamWindow.reduce(
      (item1, item2) => (item1._1, item1._2 + item2._2)
    )
    // 将聚合数据写入文件
    streamReduce.print()
    // 执行程序
    env.execute("TumblingWindow")
  }
}


Window Apply


apply方法可以进行一些自定义处理,通过匿名内部类的方法来实现。当有一些复杂计算时使用。


用法


  1. 实现一个 WindowFunction 类


  1. 指定该类的泛型为 [输入数据类型, 输出数据类型, keyBy中使用分组字段的类型, 窗口类型]


示例:使用apply方法来实现单词统计


步骤:


  1. 获取流处理运行环境


  1. 构建socket流数据源,并指定IP地址和端口号


  1. 对接收到的数据转换成单词元组


  1. 使用 keyBy 进行分流(分组)


  1. 使用 timeWinodw 指定窗口的长度(每3秒计算一次)


  1. 实现一个WindowFunction匿名内部类


  • apply方法中实现聚合计算


  • 使用Collector.collect收集数据


核心代码如下:


//1. 获取流处理运行环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    //2. 构建socket流数据源,并指定IP地址和端口号
    val textDataStream = env.socketTextStream("node01", 9999).flatMap(_.split(" "))
    //3. 对接收到的数据转换成单词元组
    val wordDataStream = textDataStream.map(_->1)
    //4. 使用 keyBy 进行分流(分组)
    val groupedDataStream: KeyedStream[(String, Int), String] = wordDataStream.keyBy(_._1)
    //5. 使用 timeWinodw 指定窗口的长度(每3秒计算一次)
    val windowDataStream: WindowedStream[(String, Int), String, TimeWindow] = groupedDataStream.timeWindow(Time.seconds(3))
    //6. 实现一个WindowFunction匿名内部类
    val reduceDatStream: DataStream[(String, Int)] = windowDataStream.apply(new RichWindowFunction[(String, Int), (String, Int), String, TimeWindow] {
      //在apply方法中实现数据的聚合
      override def apply(key: String, window: TimeWindow, input: Iterable[(String, Int)], out: Collector[(String, Int)]): Unit = {
        println("hello world")
        val tuple = input.reduce((t1, t2) => {
          (t1._1, t1._2 + t2._2)
        })
        //将要返回的数据收集起来,发送回去
        out.collect(tuple)
      }
    })
    reduceDatStream.print()
    env.execute()
相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
3月前
|
存储 缓存 算法
分布式锁服务深度解析:以Apache Flink的Checkpointing机制为例
【10月更文挑战第7天】在分布式系统中,多个进程或节点可能需要同时访问和操作共享资源。为了确保数据的一致性和系统的稳定性,我们需要一种机制来协调这些进程或节点的访问,避免并发冲突和竞态条件。分布式锁服务正是为此而生的一种解决方案。它通过在网络环境中实现锁机制,确保同一时间只有一个进程或节点能够访问和操作共享资源。
105 3
|
3月前
|
消息中间件 分布式计算 大数据
大数据-121 - Flink Time Watermark 详解 附带示例详解
大数据-121 - Flink Time Watermark 详解 附带示例详解
89 0
|
3月前
|
SQL 消息中间件 分布式计算
大数据-120 - Flink Window 窗口机制-滑动时间窗口、会话窗口-基于时间驱动&基于事件驱动
大数据-120 - Flink Window 窗口机制-滑动时间窗口、会话窗口-基于时间驱动&基于事件驱动
111 0
|
3月前
|
SQL 分布式计算 大数据
大数据-119 - Flink Window总览 窗口机制-滚动时间窗口-基于时间驱动&基于事件驱动
大数据-119 - Flink Window总览 窗口机制-滚动时间窗口-基于时间驱动&基于事件驱动
123 0
|
2天前
|
存储 物联网 大数据
探索阿里云 Flink 物化表:原理、优势与应用场景全解析
阿里云Flink的物化表是流批一体化平台中的关键特性,支持低延迟实时更新、灵活查询性能、无缝流批处理和高容错性。它广泛应用于电商、物联网和金融等领域,助力企业高效处理实时数据,提升业务决策能力。实践案例表明,物化表显著提高了交易欺诈损失率的控制和信贷审批效率,推动企业在数字化转型中取得竞争优势。
27 14
|
3月前
|
分布式计算 Java 大数据
大数据-122 - Flink Time Watermark Java代码测试实现Tumbling Window
大数据-122 - Flink Time Watermark Java代码测试实现Tumbling Window
46 0
|
2月前
|
监控 Java 应用服务中间件
高级java面试---spring.factories文件的解析源码API机制
【11月更文挑战第20天】Spring Boot是一个用于快速构建基于Spring框架的应用程序的开源框架。它通过自动配置、起步依赖和内嵌服务器等特性,极大地简化了Spring应用的开发和部署过程。本文将深入探讨Spring Boot的背景历史、业务场景、功能点以及底层原理,并通过Java代码手写模拟Spring Boot的启动过程,特别是spring.factories文件的解析源码API机制。
86 2
|
3月前
|
缓存 Java 程序员
Map - LinkedHashSet&Map源码解析
Map - LinkedHashSet&Map源码解析
87 0
|
9天前
|
存储 设计模式 算法
【23种设计模式·全精解析 | 行为型模式篇】11种行为型模式的结构概述、案例实现、优缺点、扩展对比、使用场景、源码解析
行为型模式用于描述程序在运行时复杂的流程控制,即描述多个类或对象之间怎样相互协作共同完成单个对象都无法单独完成的任务,它涉及算法与对象间职责的分配。行为型模式分为类行为模式和对象行为模式,前者采用继承机制来在类间分派行为,后者采用组合或聚合在对象间分配行为。由于组合关系或聚合关系比继承关系耦合度低,满足“合成复用原则”,所以对象行为模式比类行为模式具有更大的灵活性。 行为型模式分为: • 模板方法模式 • 策略模式 • 命令模式 • 职责链模式 • 状态模式 • 观察者模式 • 中介者模式 • 迭代器模式 • 访问者模式 • 备忘录模式 • 解释器模式
【23种设计模式·全精解析 | 行为型模式篇】11种行为型模式的结构概述、案例实现、优缺点、扩展对比、使用场景、源码解析
|
9天前
|
设计模式 存储 安全
【23种设计模式·全精解析 | 创建型模式篇】5种创建型模式的结构概述、实现、优缺点、扩展、使用场景、源码解析
结构型模式描述如何将类或对象按某种布局组成更大的结构。它分为类结构型模式和对象结构型模式,前者采用继承机制来组织接口和类,后者釆用组合或聚合来组合对象。由于组合关系或聚合关系比继承关系耦合度低,满足“合成复用原则”,所以对象结构型模式比类结构型模式具有更大的灵活性。 结构型模式分为以下 7 种: • 代理模式 • 适配器模式 • 装饰者模式 • 桥接模式 • 外观模式 • 组合模式 • 享元模式
【23种设计模式·全精解析 | 创建型模式篇】5种创建型模式的结构概述、实现、优缺点、扩展、使用场景、源码解析

热门文章

最新文章

推荐镜像

更多