(1)时间分类
在Flink的流式处理中,会涉及到时间的不同概念,如下图所示
事件时间EventTime: 事件真真正正发生产生的时间
摄入时间IngestionTime: 事件到达Flink的时间
处理时间ProcessingTime: 事件真正被处理/计算的时间
上面的三个时间,我们更关注事件时间EventTime
(2)Watermark详解
(2.1)Watermark图解
(2.2)什么是Watermark?
Watermark就是给数据再额外的加的一个时间列也就是Watermark是个时间戳!
(2.3)如何计算Watermark?
Watermark =当前窗口的最大的事件时间 - 最大允许的延迟时间或乱序时间
这样可以保证Watermark水位线会一直上升(变大),不会下降
(2.4)Watermark有什么用?
之前的窗口都是按照系统时间来触发计算的,如:[10:00:00~10:00:10) 的窗口,一但系统时间到了10:00:10就会触发计算,那么可能会导致延迟到达的数据丢失!那么现在有了Watermark,窗口就可以按照Watermark来触发计算!
也就是说Watermark是用来触发窗口计算的!
(2.5)Watermark如何出发窗口计算?
窗口计算的触发条件为:
窗口中有数据
Watermaker >= 窗口的结束时间
注意:
上面的触发公式进行如下变形:
Watermaker >= 窗口的结束时间 Watermaker = 当前窗口的最大的事件时间 - 最大允许的延迟时间或乱序时间 当前窗口的最大的事件时间 - 最大允许的延迟时间或乱序时间 >= 窗口的结束时间 当前窗口的最大的事件时间 >= 窗口的结束时间 + 最大允许的延迟时间或乱序时间
Watermark API:
https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/dev/event_timestamps_watermarks.html
(3)EventTime 和 WaterMark 的使用
Flink 内置了两个 WaterMark 生成器:
1.Monotonously Increasing Timestamps(时间戳单调增长:其实就是允许的延迟为 0)
WatermarkStrategy.<WaterSensor>forMonotonousTimestamps()
- Fixed Amount of Lateness(允许固定时间的延迟)
WatermarkStrategy.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(2))
(3.1)基于事件时间的滚动窗口测试watermark机制
代码开发:
package com.aikfk.flink.datastream.bean; /** * @author :caizhengjie * @description:TODO * @date :2021/3/20 9:19 下午 * 水位传感器:用于接收水位数据 * <p> * id:传感器编号 * ts:时间戳 * vc:水位 */ public class WaterSensor { private String id; private Long ts; private Integer vc; public WaterSensor(String id, Long ts, Integer vc) { this.id = id; this.ts = ts; this.vc = vc; } public String getId() { return id; } public void setId(String id) { this.id = id; } public Long getTs() { return ts; } public void setTs(Long ts) { this.ts = ts; } public Integer getVc() { return vc; } public void setVc(Integer vc) { this.vc = vc; } @Override public String toString() { return "WaterSensor{" + "id='" + id + '\'' + ", ts=" + ts + ", vc=" + vc + '}'; } }
package com.aikfk.flink.datastream.watermark; import com.aikfk.flink.datastream.bean.WaterSensor; import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.functions.ReduceFunction; import org.apache.flink.streaming.api.datastream.KeyedStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.datastream.WindowedStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import java.time.Duration; /** * @author :caizhengjie * @description:基于事件事件滚动窗口测试watermark机制 * @date :2021/3/20 9:21 下午 */ public class EventTimeTumbling { public static void main(String[] args) throws Exception { // 1.获取执行环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); // 2.读取端口数据并转换为JavaBean SingleOutputStreamOperator<WaterSensor> waterSensorDS = env.socketTextStream("bigdata-pro-m07", 9999) .map(new MapFunction<String, WaterSensor>() { @Override public WaterSensor map(String s) throws Exception { String[] split = s.split(","); return new WaterSensor(split[0],Long.parseLong(split[1]),Integer.parseInt(split[2])); } }); // 3.提取数据中的时间戳字段 SingleOutputStreamOperator<WaterSensor> waterSensorSingleOutputStreamOperator = waterSensorDS .assignTimestampsAndWatermarks(WatermarkStrategy // 设置最大允许的延迟时间 .<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(2)) // 指定事时间件列 .withTimestampAssigner(new SerializableTimestampAssigner<WaterSensor>() { @Override public long extractTimestamp(WaterSensor element, long recordTimestamp) { return element.getTs() * 1000L; } })); // 4.按照id分组 KeyedStream<WaterSensor, String> keyedStream = waterSensorSingleOutputStreamOperator.keyBy(WaterSensor::getId); // 5.开窗 WindowedStream<WaterSensor, String, TimeWindow> window = keyedStream.window(TumblingEventTimeWindows.of(Time.seconds(5))); // 6.计算总和 SingleOutputStreamOperator<WaterSensor> result = window.reduce(new ReduceFunction<WaterSensor>() { @Override public WaterSensor reduce(WaterSensor t1, WaterSensor t2) throws Exception { return new WaterSensor(t1.getId(),t1.getTs(),t1.getVc() + t2.getVc()); } }); // 7.打印 result.print(); // 8.执行任务 env.execute(); } }
测试非乱序数据:
ws_001,1577844001,1 ws_001,1577844002,1 ws_001,1577844003,1 ws_001,1577844005,1 ws_001,1577844006,1 ws_001,1577844009,1
行结果:
WaterSensor{id='ws_001', ts=1577844001, vc=3}
运行过程解释:
因为滚动窗口是基于事件时间0到5秒,左闭右开[0,5)。输入的数据事件时间1到3秒时,会落入窗口为[0,5),当输入的数据事件时间为t(比如是9秒),假设设置最大允许的延迟时间为2秒,即watermark为7秒,而wm >= 窗口最大边界值5秒,所以触发[0,5)的窗口,得到的结果为vc = 3
测试乱序数据:
ws_001,1577844001,1 ws_001,1577844002,1 ws_001,1577844003,1 ws_001,1577844005,1 ws_001,1577844001,1 ws_001,1577844002,1 ws_001,1577844009,1
运行结果:
WaterSensor{id='ws_001', ts=1577844001, vc=5}
运行过程解释:
因为滚动窗口是基于事件时间0到5秒,左闭右开[0,5)。输入的数据事件时间1到3秒时,会落入窗口为[0,5),后面来了第5秒的数据,落入的窗口为[5,10),再后面又来了1,2秒的数据,为迟到的数据,因为在来第5秒数据的时候,wm为3秒,它是小于窗口的边界值,所以[0,5)窗口没有关闭,因此来的1,2秒数据会落入到[0,5)窗口中。当输入的数据事件时间为t(比如是9秒),假设设置最大允许的延迟时间为2秒,即watermark为7秒,而wm >= 窗口最大边界值5秒,所以触发[0,5)的窗口,得到的结果为vc = 5.
(3.2)基于事件时间的滚动窗口测试允许迟到数据(allowedLateness)机制与侧输出流(sideOutput)
已经添加了 wartemark 之后, 仍有数据会迟到怎么办? Flink 的窗口, 也允许迟到数据.
当触发了窗口计算后, 会先计算当前的结果, 但是此时并不会关闭窗口.以后每来一条 迟到数据, 则触发一次这条数据所在窗口计算(增量计算).
那么什么时候会真正的关闭窗口呢? wartermark 超过了 窗口结束时间+等待时间
.window(TumblingEventTimeWindows.of(Time.seconds(5))) .allowedLateness(Time.seconds(3))
注意:允许迟到只能运用在 event time 上
允许迟到数据, 窗口也会真正的关闭, 如果还有迟到的数据怎么办? Flink 提供了一种叫做侧输出流的来处理关窗之后到达的数据.
代码开发:
package com.aikfk.flink.datastream.watermark; import com.aikfk.flink.datastream.bean.WaterSensor; import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.functions.ReduceFunction; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.KeyedStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.datastream.WindowedStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import org.apache.flink.util.OutputTag; import java.time.Duration; /** * @author :caizhengjie * @description:基于事件事件滚动窗口测试watermark机制 * @date :2021/3/20 9:21 下午 */ public class LateAndSideOutPut { public static void main(String[] args) throws Exception { // 1.获取执行环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); // 2.读取端口数据并转换为JavaBean SingleOutputStreamOperator<WaterSensor> waterSensorDS = env.socketTextStream("bigdata-pro-m07", 9999) .map(new MapFunction<String, WaterSensor>() { @Override public WaterSensor map(String s) throws Exception { String[] split = s.split(","); return new WaterSensor(split[0],Long.parseLong(split[1]),Integer.parseInt(split[2])); } }); // 3.提取数据中的时间戳字段 SingleOutputStreamOperator<WaterSensor> waterSensorSingleOutputStreamOperator = waterSensorDS .assignTimestampsAndWatermarks(WatermarkStrategy // 设置最大允许的延迟时间 .<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(2)) // 指定事时间件列 .withTimestampAssigner(new SerializableTimestampAssigner<WaterSensor>() { @Override public long extractTimestamp(WaterSensor element, long recordTimestamp) { return element.getTs() * 1000L; } })); // 4.按照id分组 KeyedStream<WaterSensor, String> keyedStream = waterSensorSingleOutputStreamOperator.keyBy(WaterSensor::getId); // 5.开窗,允许迟到数据,侧输出流 WindowedStream<WaterSensor, String, TimeWindow> window = keyedStream.window(TumblingEventTimeWindows.of(Time.seconds(5))) .allowedLateness(Time.seconds(2)) .sideOutputLateData(new OutputTag<WaterSensor>("Side") { }); // 6.计算总和 SingleOutputStreamOperator<WaterSensor> result = window.reduce(new ReduceFunction<WaterSensor>() { @Override public WaterSensor reduce(WaterSensor t1, WaterSensor t2) throws Exception { return new WaterSensor(t1.getId(),t1.getTs(),t1.getVc() + t2.getVc()); } }); DataStream<WaterSensor> sideOutput = result.getSideOutput(new OutputTag<WaterSensor>("Side") { }); // 7.打印 result.print(); sideOutput.print("Side"); // 8.执行任务 env.execute(); } }
测试数据:
ws_001,1577844001,1 ws_001,1577844002,1 ws_001,1577844003,1 ws_001,1577844008,1 ws_001,1577844001,1 ws_001,1577844002,1 ws_001,1577844003,1 ws_001,1577844009,1 ws_001,1577844001,1 ws_001,1577844002,1
运行结果:
WaterSensor{id='ws_001', ts=1577844001, vc=3} WaterSensor{id='ws_001', ts=1577844001, vc=4} WaterSensor{id='ws_001', ts=1577844001, vc=5} WaterSensor{id='ws_001', ts=1577844001, vc=6} Side> WaterSensor{id='ws_001', ts=1577844001, vc=1} Side> WaterSensor{id='ws_001', ts=1577844002, vc=1}
运行过程解释:
因为滚动窗口是基于事件时间0到5秒,左闭右开[0,5)。输入的数据事件时间1到3秒时,会落入窗口为[0,5),后面来了第8秒的数据,假设设置最大允许的延迟时间为2秒 ,此时的wm = 6秒大于窗口的最大边界值,触发窗口计算,所以输入第8秒的数据会得到vc=3,但是由于添加了允许迟到数据(allowedLateness)机制,设置允许迟到时间是2秒,因此窗口并没有关闭,而是持续到了wm = 7秒,后面来了1,2,3秒的迟到数据,还会落入到[0,5)窗口中,但是是来一条迟到数据则触发一次这条数据所在窗口计算(增量计算)。当输入的数据事件时间为t(比如是9秒),即watermark为7秒,而wm >= 窗口结束时间+等待时间,窗口关闭,后面再来的1,2秒迟到数据就不会落入到[0,5)窗口中,即通过侧输出流来处理关窗之后到达的数据。