Flink 中极其重要的 Time 与 Window 详细解析(深度好文,建议收藏) (三)

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
公共DNS(含HTTPDNS解析),每月1000万次HTTP解析
云解析 DNS,旗舰版 1个月
简介: Flink 中极其重要的 Time 与 Window 详细解析

Window Fold


WindowedStream → DataStream:给窗口赋一个fold功能的函数,并返回一个fold后的结果。


import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
object StreamWindowFold {
  def main(args: Array[String]): Unit = {
    // 获取执行环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // 创建SocketSource
    val stream = env.socketTextStream("node01", 9999,'\n',3)
    // 对stream进行处理并按key聚合
    val streamKeyBy = stream.map(item => (item, 1)).keyBy(0)
    // 引入滚动窗口
    val streamWindow = streamKeyBy.timeWindow(Time.seconds(5))
    // 执行fold操作
    val streamFold = streamWindow.fold(100){
      (begin, item) =>
        begin + item._2
    }
    // 将聚合数据写入文件
    streamFold.print()
    // 执行程序
    env.execute("TumblingWindow")
  }
}


Aggregation on Window


WindowedStream → DataStream:对一个window内的所有元素做聚合操作。min和 minBy的区别是min返回的是最小值,而minBy返回的是包含最小值字段的元素(同样的原理适用于 max 和 maxBy)。


import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.api.scala._
object StreamWindowAggregation {
  def main(args: Array[String]): Unit = {
    // 获取执行环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // 创建SocketSource
    val stream = env.socketTextStream("node01", 9999)
    // 对stream进行处理并按key聚合
    val streamKeyBy = stream.map(item => (item.split(" ")(0), item.split(" ")(1))).keyBy(0)
    // 引入滚动窗口
    val streamWindow = streamKeyBy.timeWindow(Time.seconds(5))
    // 执行聚合操作
    val streamMax = streamWindow.max(1)
    // 将聚合数据写入文件
    streamMax.print()
    // 执行程序
    env.execute("TumblingWindow")
  }
}


EventTime与Window


EventTime的引入


  1. 与现实世界中的时间是不一致的,在flink中被划分为事件时间,提取时间,处理时间三种。


  1. 如果以EventTime为基准来定义时间窗口那将形成EventTimeWindow,要求消息本身就应该携带EventTime


  1. 如果以IngesingtTime为基准来定义时间窗口那将形成IngestingTimeWindow,以source的systemTime为准。


  1. 如果以ProcessingTime基准来定义时间窗口那将形成ProcessingTimeWindow,以operator的systemTime为准。


在Flink的流式处理中,绝大部分的业务都会使用eventTime,一般只在eventTime无法使用时,才会被迫使用ProcessingTime或者IngestionTime。


如果要使用EventTime,那么需要引入EventTime的时间属性,引入方式如下所示:


val env = StreamExecutionEnvironment.getExecutionEnvironment
// 从调用时刻开始给env创建的每一个stream追加时间特征
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)


Watermark


引入


我们知道,流处理从事件产生,到流经 source,再到 operator,中间是有一个过程和时间的,虽然大部分情况下,流到 operator 的数据都是按照事件产生的时间顺序来的,但是也不排除由于网络、背压等原因,导致乱序的产生,所谓乱序,就是指 Flink 接收到的事件的先后顺序不是严格按照事件的 Event Time 顺序排列的,所以 Flink 最初设计的时候,就考虑到了网络延迟,网络乱序等问题,所以提出了一个抽象概念:水印(WaterMark);


image.png


如上图所示,就出现一个问题,一旦出现乱序,如果只根据 EventTime 决定 Window 的运行,我们不能明确数据是否全部到位,但又不能无限期的等下去,此时必须要有个机制来保证一个特定的时间后,必须触发 Window 去进行计算了,这个特别的机制,就是 Watermark。


Watermark 是用于处理乱序事件的,而正确的处理乱序事件,通常用 Watermark 机制结合 Window 来实现。


数据流中的 Watermark 用于表示 timestamp 小于 Watermark 的数据,都已经到达了,因此,Window 的执行也是由 Watermark 触发的。


Watermark 可以理解成一个延迟触发机制,我们可以设置 Watermark 的延时时长 t,每次系统会校验已经到达的数据中最大的 maxEventTime,然后认定 EventTime 小于 maxEventTime - t 的所有数据都已经到达,如果有窗口的停止时间等于 maxEventTime – t,那么这个窗口被触发执行。


有序流的Watermarker如下图所示:(Watermark设置为0)

image.png


乱序流的Watermarker如下图所示:(Watermark设置为2)


image.png


当 Flink 接收到每一条数据时,都会产生一条 Watermark,这条 Watermark 就等于当前所有到达数据中的 maxEventTime - 延迟时长,也就是说,Watermark 是由数据携带的,一旦数据携带的 Watermark 比当前未触发的窗口的停止时间要晚,那么就会触发相应窗口的执行。由于 Watermark 是由数据携带的,因此,如果运行过程中无法获取新的数据,那么没有被触发的窗口将永远都不被触发。


上图中,我们设置的允许最大延迟到达时间为2s,所以时间戳为7s的事件对应的Watermark是5s,时间戳为12s的事件的Watermark是10s,如果我们的窗口1是1s5s,窗口2是6s10s,那么时间戳为7s的事件到达时的Watermarker恰好触发窗口1,时间戳为12s的事件到达时的Watermark恰好触发窗口2。


Flink对于迟到数据的处理


waterMark和Window机制解决了流式数据的乱序问题,对于因为延迟而顺序有误的数据,可以根据eventTime进行业务处理,于延迟的数据Flink也有自己的解决办法,主要的办法是给定一个允许延迟的时间,在该时间范围内仍可以接受处理延迟数据。


设置允许延迟的时间是通过 allowedLateness(lateness: Time) 设置


保存延迟数据则是通过 sideOutputLateData(outputTag: OutputTag[T]) 保存


获取延迟数据是通过 DataStream.getSideOutput(tag: OutputTag[X]) 获取


具体的用法如下:


allowedLateness(lateness: Time)


def allowedLateness(lateness: Time): WindowedStream[T, K, W] = {
  javaStream.allowedLateness(lateness)
  this
}


该方法传入一个Time值,设置允许数据迟到的时间,这个时间和 WaterMark 中的时间概念不同。再来回顾一下:


WaterMark=数据的事件时间-允许乱序时间值


随着新数据的到来,waterMark的值会更新为最新数据事件时间-允许乱序时间值,但是如果这时候来了一条历史数据,waterMark值则不会更新。总的来说,waterMark是为了能接收到尽可能多的乱序数据。


那这里的Time值,主要是为了等待迟到的数据,在一定时间范围内,如果属于该窗口的数据到来,仍会进行计算,后面会对计算方式仔细说明


注意:该方法只针对于基于event-time的窗口,如果是基于processing-time,并且指定了非零的time值则会抛出异常。


sideOutputLateData(outputTag: OutputTag[T])


def sideOutputLateData(outputTag: OutputTag[T]): WindowedStream[T, K, W] = {
  javaStream.sideOutputLateData(outputTag)
  this
}


该方法是将迟来的数据保存至给定的outputTag参数,而OutputTag则是用来标记延迟数据的一个对象。


DataStream.getSideOutput(tag: OutputTag[X])


通过window等操作返回的DataStream调用该方法,传入标记延迟数据的对象来获取延迟的数据。


对延迟数据的理解


延迟数据是指:


在当前窗口【假设窗口范围为10-15】已经计算之后,又来了一个属于该窗口的数据【假设事件时间为13】,这时候仍会触发 Window 操作,这种数据就称为延迟数据。


那么问题来了,延迟时间怎么计算呢?


假设窗口范围为10-15,延迟时间为2s,则只要 WaterMark<15+2,并且属于该窗口,就能触发 Window 操作。而如果来了一条数据使得 WaterMark>=15+2,10-15这个窗口就不能再触发 Window 操作,即使新来的数据的 Event Time 属于这个窗口时间内 。


Flink 关联 Hive 分区表


Flink 1.12 支持了 Hive 最新的分区作为时态表的功能,可以通过 SQL 的方式直接关联 Hive 分区表的最新分区,并且会自动监听最新的 Hive 分区,当监控到新的分区后,会自动地做维表数据的全量替换。通过这种方式,用户无需编写 DataStream 程序即可完成


Kafka 流实时关联最新的 Hive 分区实现数据打宽。


具体用法:


在 Sql Client 中注册 HiveCatalog:


vim conf/sql-client-defaults.yaml 
catalogs: 
  - name: hive_catalog 
    type: hive 
    hive-conf-dir: /disk0/soft/hive-conf/ #该目录需要包hive-site.xml文件


创建 Kafka 表


CREATE TABLE hive_catalog.flink_db.kfk_fact_bill_master_12 (  
    master Row<reportDate String, groupID int, shopID int, shopName String, action int, orderStatus int, orderKey String, actionTime bigint, areaName String, paidAmount double, foodAmount double, startTime String, person double, orderSubType int, checkoutTime String>,  
proctime as PROCTIME()  -- PROCTIME用来和Hive时态表关联  
) WITH (  
 'connector' = 'kafka',  
 'topic' = 'topic_name',  
 'format' = 'json',  
 'properties.bootstrap.servers' = 'host:9092',  
 'properties.group.id' = 'flinkTestGroup',  
 'scan.startup.mode' = 'timestamp',  
 'scan.startup.timestamp-millis' = '1607844694000'  
);


Flink 事实表与 Hive 最新分区数据关联


dim_extend_shop_info 是 Hive 中已存在的表,所以我们用 table hint 动态地开启维表参数。


CREATE VIEW IF NOT EXISTS hive_catalog.flink_db.view_fact_bill_master as  
SELECT * FROM  
 (select t1.*, t2.group_id, t2.shop_id, t2.group_name, t2.shop_name, t2.brand_id,   
     ROW_NUMBER() OVER (PARTITION BY groupID, shopID, orderKey ORDER BY actionTime desc) rn  
    from hive_catalog.flink_db.kfk_fact_bill_master_12 t1  
       JOIN hive_catalog.flink_db.dim_extend_shop_info   
  /*+ OPTIONS('streaming-source.enable'='true',  
     'streaming-source.partition.include' = 'latest',  
     'streaming-source.monitor-interval' = '1 h',
     'streaming-source.partition-order' = 'partition-name') */
    FOR SYSTEM_TIME AS OF t1.proctime AS t2 --时态表  
    ON t1.groupID = t2.group_id and t1.shopID = t2.shop_id  
    where groupID in (202042)) t  where t.rn = 1


参数解释:


  • streaming-source.enable 开启流式读取 Hive 数据。


  • streaming-source.partition.include有以下两个值:


  1. latest 属性: 只读取最新分区数据。


 b.all: 读取全量分区数据 ,默认值为 all,表示读所有分区,latest 只能用在 temporal join 中,用于读取最新分区作为维表,不能直接读取最新分区数据。


  • streaming-source.monitor-interval 监听新分区生成的时间、不宜过短 、最短是1 个小时,因为目前的实现是每个 task 都会查询 metastore,高频的查可能会对metastore 产生过大的压力。需要注意的是,1.12.1 放开了这个限制,但仍建议按照实际业务不要配个太短的 interval。


  • streaming-source.partition-order分区策略,主要有以下 3 种,其中最为推荐的是partition-name:


  1. partition-name 使用默认分区名称顺序加载最新分区


 b.create-time 使用分区文件创建时间顺序


 c.partition-time 使用分区时间顺序

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
2月前
|
存储 缓存 算法
分布式锁服务深度解析:以Apache Flink的Checkpointing机制为例
【10月更文挑战第7天】在分布式系统中,多个进程或节点可能需要同时访问和操作共享资源。为了确保数据的一致性和系统的稳定性,我们需要一种机制来协调这些进程或节点的访问,避免并发冲突和竞态条件。分布式锁服务正是为此而生的一种解决方案。它通过在网络环境中实现锁机制,确保同一时间只有一个进程或节点能够访问和操作共享资源。
73 3
|
2月前
|
SQL 消息中间件 分布式计算
大数据-120 - Flink Window 窗口机制-滑动时间窗口、会话窗口-基于时间驱动&基于事件驱动
大数据-120 - Flink Window 窗口机制-滑动时间窗口、会话窗口-基于时间驱动&基于事件驱动
96 0
|
2月前
|
SQL 分布式计算 大数据
大数据-119 - Flink Window总览 窗口机制-滚动时间窗口-基于时间驱动&基于事件驱动
大数据-119 - Flink Window总览 窗口机制-滚动时间窗口-基于时间驱动&基于事件驱动
72 0
|
2月前
|
分布式计算 Java 大数据
大数据-122 - Flink Time Watermark Java代码测试实现Tumbling Window
大数据-122 - Flink Time Watermark Java代码测试实现Tumbling Window
35 0
|
2月前
|
SQL 消息中间件 分布式计算
大数据-115 - Flink DataStream Transformation 多个函数方法 FlatMap Window Aggregations Reduce
大数据-115 - Flink DataStream Transformation 多个函数方法 FlatMap Window Aggregations Reduce
37 0
|
4月前
|
数据处理 调度 双11
Flink四大基石——1.window
Flink四大基石——1.window
52 0
|
13天前
|
监控 Java 应用服务中间件
高级java面试---spring.factories文件的解析源码API机制
【11月更文挑战第20天】Spring Boot是一个用于快速构建基于Spring框架的应用程序的开源框架。它通过自动配置、起步依赖和内嵌服务器等特性,极大地简化了Spring应用的开发和部署过程。本文将深入探讨Spring Boot的背景历史、业务场景、功能点以及底层原理,并通过Java代码手写模拟Spring Boot的启动过程,特别是spring.factories文件的解析源码API机制。
43 2
|
2月前
|
缓存 Java 程序员
Map - LinkedHashSet&Map源码解析
Map - LinkedHashSet&Map源码解析
70 0
|
2月前
|
算法 Java 容器
Map - HashSet & HashMap 源码解析
Map - HashSet & HashMap 源码解析
57 0
|
2月前
|
存储 Java C++
Collection-PriorityQueue源码解析
Collection-PriorityQueue源码解析
62 0

推荐镜像

更多