Window Fold
WindowedStream → DataStream:给窗口赋一个fold功能的函数,并返回一个fold后的结果。
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment import org.apache.flink.api.scala._ import org.apache.flink.streaming.api.windowing.time.Time object StreamWindowFold { def main(args: Array[String]): Unit = { // 获取执行环境 val env = StreamExecutionEnvironment.getExecutionEnvironment // 创建SocketSource val stream = env.socketTextStream("node01", 9999,'\n',3) // 对stream进行处理并按key聚合 val streamKeyBy = stream.map(item => (item, 1)).keyBy(0) // 引入滚动窗口 val streamWindow = streamKeyBy.timeWindow(Time.seconds(5)) // 执行fold操作 val streamFold = streamWindow.fold(100){ (begin, item) => begin + item._2 } // 将聚合数据写入文件 streamFold.print() // 执行程序 env.execute("TumblingWindow") } }
Aggregation on Window
WindowedStream → DataStream:对一个window内的所有元素做聚合操作。min和 minBy的区别是min返回的是最小值,而minBy返回的是包含最小值字段的元素(同样的原理适用于 max 和 maxBy)。
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment import org.apache.flink.streaming.api.windowing.time.Time import org.apache.flink.api.scala._ object StreamWindowAggregation { def main(args: Array[String]): Unit = { // 获取执行环境 val env = StreamExecutionEnvironment.getExecutionEnvironment // 创建SocketSource val stream = env.socketTextStream("node01", 9999) // 对stream进行处理并按key聚合 val streamKeyBy = stream.map(item => (item.split(" ")(0), item.split(" ")(1))).keyBy(0) // 引入滚动窗口 val streamWindow = streamKeyBy.timeWindow(Time.seconds(5)) // 执行聚合操作 val streamMax = streamWindow.max(1) // 将聚合数据写入文件 streamMax.print() // 执行程序 env.execute("TumblingWindow") } }
EventTime与Window
EventTime的引入
- 与现实世界中的时间是不一致的,在flink中被划分为事件时间,提取时间,处理时间三种。
- 如果以EventTime为基准来定义时间窗口那将形成EventTimeWindow,要求消息本身就应该携带EventTime
- 如果以IngesingtTime为基准来定义时间窗口那将形成IngestingTimeWindow,以source的systemTime为准。
- 如果以ProcessingTime基准来定义时间窗口那将形成ProcessingTimeWindow,以operator的systemTime为准。
在Flink的流式处理中,绝大部分的业务都会使用eventTime,一般只在eventTime无法使用时,才会被迫使用ProcessingTime或者IngestionTime。
如果要使用EventTime,那么需要引入EventTime的时间属性,引入方式如下所示:
val env = StreamExecutionEnvironment.getExecutionEnvironment // 从调用时刻开始给env创建的每一个stream追加时间特征 env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
Watermark
引入
我们知道,流处理从事件产生,到流经 source,再到 operator,中间是有一个过程和时间的,虽然大部分情况下,流到 operator 的数据都是按照事件产生的时间顺序来的,但是也不排除由于网络、背压等原因,导致乱序的产生,所谓乱序,就是指 Flink 接收到的事件的先后顺序不是严格按照事件的 Event Time 顺序排列的,所以 Flink 最初设计的时候,就考虑到了网络延迟,网络乱序等问题,所以提出了一个抽象概念:水印(WaterMark);
如上图所示,就出现一个问题,一旦出现乱序,如果只根据 EventTime 决定 Window 的运行,我们不能明确数据是否全部到位,但又不能无限期的等下去,此时必须要有个机制来保证一个特定的时间后,必须触发 Window 去进行计算了,这个特别的机制,就是 Watermark。
Watermark 是用于处理乱序事件的,而正确的处理乱序事件,通常用 Watermark 机制结合 Window 来实现。
数据流中的 Watermark 用于表示 timestamp 小于 Watermark 的数据,都已经到达了,因此,Window 的执行也是由 Watermark 触发的。
Watermark 可以理解成一个延迟触发机制,我们可以设置 Watermark 的延时时长 t,每次系统会校验已经到达的数据中最大的 maxEventTime,然后认定 EventTime 小于 maxEventTime - t 的所有数据都已经到达,如果有窗口的停止时间等于 maxEventTime – t,那么这个窗口被触发执行。
有序流的Watermarker如下图所示:(Watermark设置为0)
乱序流的Watermarker如下图所示:(Watermark设置为2)
当 Flink 接收到每一条数据时,都会产生一条 Watermark,这条 Watermark 就等于当前所有到达数据中的 maxEventTime - 延迟时长,也就是说,Watermark 是由数据携带的,一旦数据携带的 Watermark 比当前未触发的窗口的停止时间要晚,那么就会触发相应窗口的执行。由于 Watermark 是由数据携带的,因此,如果运行过程中无法获取新的数据,那么没有被触发的窗口将永远都不被触发。
上图中,我们设置的允许最大延迟到达时间为2s,所以时间戳为7s的事件对应的Watermark是5s,时间戳为12s的事件的Watermark是10s,如果我们的窗口1是1s5s,窗口2是6s10s,那么时间戳为7s的事件到达时的Watermarker恰好触发窗口1,时间戳为12s的事件到达时的Watermark恰好触发窗口2。
Flink对于迟到数据的处理
waterMark和Window机制解决了流式数据的乱序问题,对于因为延迟而顺序有误的数据,可以根据eventTime进行业务处理,于延迟的数据Flink也有自己的解决办法,主要的办法是给定一个允许延迟的时间,在该时间范围内仍可以接受处理延迟数据。
设置允许延迟的时间是通过 allowedLateness(lateness: Time) 设置
保存延迟数据则是通过 sideOutputLateData(outputTag: OutputTag[T]) 保存
获取延迟数据是通过 DataStream.getSideOutput(tag: OutputTag[X]) 获取
具体的用法如下:
allowedLateness(lateness: Time)
def allowedLateness(lateness: Time): WindowedStream[T, K, W] = { javaStream.allowedLateness(lateness) this }
该方法传入一个Time值,设置允许数据迟到的时间,这个时间和 WaterMark 中的时间概念不同。再来回顾一下:
WaterMark=数据的事件时间-允许乱序时间值
随着新数据的到来,waterMark的值会更新为最新数据事件时间-允许乱序时间值,但是如果这时候来了一条历史数据,waterMark值则不会更新。总的来说,waterMark是为了能接收到尽可能多的乱序数据。
那这里的Time值,主要是为了等待迟到的数据,在一定时间范围内,如果属于该窗口的数据到来,仍会进行计算,后面会对计算方式仔细说明
注意:该方法只针对于基于event-time的窗口,如果是基于processing-time,并且指定了非零的time值则会抛出异常。
sideOutputLateData(outputTag: OutputTag[T])
def sideOutputLateData(outputTag: OutputTag[T]): WindowedStream[T, K, W] = { javaStream.sideOutputLateData(outputTag) this }
该方法是将迟来的数据保存至给定的outputTag参数,而OutputTag则是用来标记延迟数据的一个对象。
DataStream.getSideOutput(tag: OutputTag[X])
通过window等操作返回的DataStream调用该方法,传入标记延迟数据的对象来获取延迟的数据。
对延迟数据的理解
延迟数据是指:
在当前窗口【假设窗口范围为10-15】已经计算之后,又来了一个属于该窗口的数据【假设事件时间为13】,这时候仍会触发 Window 操作,这种数据就称为延迟数据。
那么问题来了,延迟时间怎么计算呢?
假设窗口范围为10-15,延迟时间为2s,则只要 WaterMark<15+2,并且属于该窗口,就能触发 Window 操作。而如果来了一条数据使得 WaterMark>=15+2,10-15这个窗口就不能再触发 Window 操作,即使新来的数据的 Event Time 属于这个窗口时间内 。
Flink 关联 Hive 分区表
Flink 1.12 支持了 Hive 最新的分区作为时态表的功能,可以通过 SQL 的方式直接关联 Hive 分区表的最新分区,并且会自动监听最新的 Hive 分区,当监控到新的分区后,会自动地做维表数据的全量替换。通过这种方式,用户无需编写 DataStream 程序即可完成
Kafka 流实时关联最新的 Hive 分区实现数据打宽。
具体用法:
在 Sql Client 中注册 HiveCatalog:
vim conf/sql-client-defaults.yaml catalogs: - name: hive_catalog type: hive hive-conf-dir: /disk0/soft/hive-conf/ #该目录需要包hive-site.xml文件
创建 Kafka 表
CREATE TABLE hive_catalog.flink_db.kfk_fact_bill_master_12 ( master Row<reportDate String, groupID int, shopID int, shopName String, action int, orderStatus int, orderKey String, actionTime bigint, areaName String, paidAmount double, foodAmount double, startTime String, person double, orderSubType int, checkoutTime String>, proctime as PROCTIME() -- PROCTIME用来和Hive时态表关联 ) WITH ( 'connector' = 'kafka', 'topic' = 'topic_name', 'format' = 'json', 'properties.bootstrap.servers' = 'host:9092', 'properties.group.id' = 'flinkTestGroup', 'scan.startup.mode' = 'timestamp', 'scan.startup.timestamp-millis' = '1607844694000' );
Flink 事实表与 Hive 最新分区数据关联
dim_extend_shop_info 是 Hive 中已存在的表,所以我们用 table hint 动态地开启维表参数。
CREATE VIEW IF NOT EXISTS hive_catalog.flink_db.view_fact_bill_master as SELECT * FROM (select t1.*, t2.group_id, t2.shop_id, t2.group_name, t2.shop_name, t2.brand_id, ROW_NUMBER() OVER (PARTITION BY groupID, shopID, orderKey ORDER BY actionTime desc) rn from hive_catalog.flink_db.kfk_fact_bill_master_12 t1 JOIN hive_catalog.flink_db.dim_extend_shop_info /*+ OPTIONS('streaming-source.enable'='true', 'streaming-source.partition.include' = 'latest', 'streaming-source.monitor-interval' = '1 h', 'streaming-source.partition-order' = 'partition-name') */ FOR SYSTEM_TIME AS OF t1.proctime AS t2 --时态表 ON t1.groupID = t2.group_id and t1.shopID = t2.shop_id where groupID in (202042)) t where t.rn = 1
参数解释:
- streaming-source.enable 开启流式读取 Hive 数据。
- streaming-source.partition.include有以下两个值:
- latest 属性: 只读取最新分区数据。
b.all: 读取全量分区数据 ,默认值为 all,表示读所有分区,latest 只能用在 temporal join 中,用于读取最新分区作为维表,不能直接读取最新分区数据。
- streaming-source.monitor-interval 监听新分区生成的时间、不宜过短 、最短是1 个小时,因为目前的实现是每个 task 都会查询 metastore,高频的查可能会对metastore 产生过大的压力。需要注意的是,1.12.1 放开了这个限制,但仍建议按照实际业务不要配个太短的 interval。
- streaming-source.partition-order分区策略,主要有以下 3 种,其中最为推荐的是partition-name:
- partition-name 使用默认分区名称顺序加载最新分区
b.create-time 使用分区文件创建时间顺序
c.partition-time 使用分区时间顺序