(3.3)基于事件时间的滑动窗口测试watermark机制
代码开发:
package com.aikfk.flink.datastream.watermark; import com.aikfk.flink.datastream.bean.WaterSensor; import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.functions.ReduceFunction; import org.apache.flink.streaming.api.datastream.KeyedStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.datastream.WindowedStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows; import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import java.time.Duration; /** * @author :caizhengjie * @description:基于事件事件滚动窗口测试watermark机制 * @date :2021/3/20 9:21 下午 */ public class EventTimeSliding { public static void main(String[] args) throws Exception { // 1.获取执行环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); // 2.读取端口数据并转换为JavaBean SingleOutputStreamOperator<WaterSensor> waterSensorDS = env.socketTextStream("bigdata-pro-m07", 9999) .map(new MapFunction<String, WaterSensor>() { @Override public WaterSensor map(String s) throws Exception { String[] split = s.split(","); return new WaterSensor(split[0],Long.parseLong(split[1]),Integer.parseInt(split[2])); } }); // 3.提取数据中的时间戳字段,生成watermark SingleOutputStreamOperator<WaterSensor> waterSensorSingleOutputStreamOperator = waterSensorDS .assignTimestampsAndWatermarks(WatermarkStrategy // 设置最大允许的延迟时间 .<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(2)) // 指定事时间件列 .withTimestampAssigner(new SerializableTimestampAssigner<WaterSensor>() { @Override public long 
extractTimestamp(WaterSensor element, long recordTimestamp) { return element.getTs() * 1000L; } })); // 4.按照id分组 KeyedStream<WaterSensor, String> keyedStream = waterSensorSingleOutputStreamOperator.keyBy(WaterSensor::getId); // 5.开窗 WindowedStream<WaterSensor, String, TimeWindow> window = keyedStream.window(SlidingEventTimeWindows.of(Time.seconds(6), Time.seconds(2))); // 6.计算总和 SingleOutputStreamOperator<WaterSensor> result = window.reduce(new ReduceFunction<WaterSensor>() { @Override public WaterSensor reduce(WaterSensor t1, WaterSensor t2) throws Exception { return new WaterSensor(t1.getId(),t1.getTs(),t1.getVc() + t2.getVc()); } }); // 7.打印 result.print(); // 8.执行任务 env.execute(); } }
测试数据:
ws_001,1577844001,1 ws_001,1577844008,1 ws_001,1577844012,1
运行结果:
WaterSensor{id='ws_001', ts=1577844001, vc=1} WaterSensor{id='ws_001', ts=1577844001, vc=1} WaterSensor{id='ws_001', ts=1577844001, vc=1} WaterSensor{id='ws_001', ts=1577844008, vc=1}
运行过程解释:
程序中设置的滑动窗口大小为6秒,步长为2秒。当输入数据的事件时间为1秒时,该数据同时属于[-4,2)、[-2,4)、[0,6)这三个窗口。当输入数据的事件时间为8秒时,wm为6秒 >= [0,6)这个窗口的最大边界值,此时前面三个窗口全部关闭并触发计算,所以直接输出三个结果。而8秒这条数据属于[4,10)、[6,12)、[8,14)这三个窗口,如果想再输出一个结果,则需要输入事件时间为12秒的数据,此时wm为10秒 >= [4,10)这个窗口的最大边界值,触发该窗口计算,得到一个结果。
(3.4)基于事件时间的会话窗口测试watermark机制
时间间隔(gap):指同一会话中相邻两条数据事件时间之间允许的最大间隔;当WaterMark >= 会话中最后一条数据的事件时间 + 时间间隔时,会话窗口关闭并触发计算。间隔是包含边界的,即两条数据的事件时间相差恰好等于间隔时间时,仍属于同一个会话。
代码开发:
package com.aikfk.flink.datastream.watermark; import com.aikfk.flink.datastream.bean.WaterSensor; import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.functions.ReduceFunction; import org.apache.flink.streaming.api.datastream.KeyedStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.datastream.WindowedStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows; import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import java.time.Duration; /** * @author :caizhengjie * @description:基于事件事件滚动窗口测试watermark机制 * @date :2021/3/20 9:21 下午 */ public class EventTimeSession { public static void main(String[] args) throws Exception { // 1.获取执行环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); // 2.读取端口数据并转换为JavaBean SingleOutputStreamOperator<WaterSensor> waterSensorDS = env.socketTextStream("bigdata-pro-m07", 9999) .map(new MapFunction<String, WaterSensor>() { @Override public WaterSensor map(String s) throws Exception { String[] split = s.split(","); return new WaterSensor(split[0],Long.parseLong(split[1]),Integer.parseInt(split[2])); } }); // 3.提取数据中的时间戳字段,生成watermark SingleOutputStreamOperator<WaterSensor> waterSensorSingleOutputStreamOperator = waterSensorDS .assignTimestampsAndWatermarks(WatermarkStrategy // 设置最大允许的延迟时间 .<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(2)) // 指定事时间件列 .withTimestampAssigner(new SerializableTimestampAssigner<WaterSensor>() { @Override public long 
extractTimestamp(WaterSensor element, long recordTimestamp) { return element.getTs() * 1000L; } })); // 4.按照id分组 KeyedStream<WaterSensor, String> keyedStream = waterSensorSingleOutputStreamOperator.keyBy(WaterSensor::getId); //5.开窗,时间间隔:指的是WaterMark跟数据本身的时间差值,包含间隔时间 WindowedStream<WaterSensor, String, TimeWindow> window = keyedStream.window(EventTimeSessionWindows.withGap(Time.seconds(5))); // 6.计算总和 SingleOutputStreamOperator<WaterSensor> result = window.reduce(new ReduceFunction<WaterSensor>() { @Override public WaterSensor reduce(WaterSensor t1, WaterSensor t2) throws Exception { return new WaterSensor(t1.getId(),t1.getTs(),t1.getVc() + t2.getVc()); } }); // 7.打印 result.print(); // 8.执行任务 env.execute(); } }
测试数据:
ws_001,1577844002,1 ws_001,1577844007,1 ws_001,1577844014,1
运行结果:
WaterSensor{id='ws_001', ts=1577844002, vc=2}
运行过程解释:
程序中设置的会话间隔(gap)为5秒,第一次输入的数据事件时间是2秒,第二次输入的数据事件时间是7秒,二者相差恰好5秒(包含间隔时间),因此属于同一个会话,此时不会触发窗口,因为只有当watermark >= 上一次的数据事件时间 + 时间间隔(5秒)时才会触发。当输入的数据事件时间为14秒时,wm为12秒 >= 7 + 5,所以触发窗口计算,前两条数据被合并,得到一个结果(vc=2)。
(4)自定义 WatermarkStrategy
有 2 种风格的 WaterMark 生产方式: periodic(周期性) and punctuated(间歇性).
都需要实现接口: WatermarkGenerator
(4.1)周期性
package com.aikfk.flink.datastream.watermark; import com.aikfk.flink.datastream.bean.WaterSensor; import org.apache.flink.api.common.eventtime.*; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.functions.ReduceFunction; import org.apache.flink.streaming.api.datastream.KeyedStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.datastream.WindowedStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; /** * @author :caizhengjie * @description:基于事件事件滚动窗口测试watermark机制 * @date :2021/3/20 9:21 下午 */ public class EventTimeTumblingCustomerPeriod { public static void main(String[] args) throws Exception { // 1.获取执行环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); // 2.读取端口数据并转换为JavaBean SingleOutputStreamOperator<WaterSensor> waterSensorDS = env.socketTextStream("bigdata-pro-m07", 9999) .map(new MapFunction<String, WaterSensor>() { @Override public WaterSensor map(String s) throws Exception { String[] split = s.split(","); return new WaterSensor(split[0],Long.parseLong(split[1]),Integer.parseInt(split[2])); } }); // 3.提取数据中的时间戳字段,生成watermark SingleOutputStreamOperator<WaterSensor> waterSensorSingleOutputStreamOperator = waterSensorDS .assignTimestampsAndWatermarks(new WatermarkStrategy<WaterSensor>() { @Override public WatermarkGenerator<WaterSensor> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) { return new MyPeriod(2000L); } }.withTimestampAssigner(new SerializableTimestampAssigner<WaterSensor>() { @Override public long extractTimestamp(WaterSensor element, long recordTimestamp) { return element.getTs() * 1000L; } })); // 4.按照id分组 
KeyedStream<WaterSensor, String> keyedStream = waterSensorSingleOutputStreamOperator.keyBy(WaterSensor::getId); // 5.开窗 WindowedStream<WaterSensor, String, TimeWindow> window = keyedStream.window(TumblingEventTimeWindows.of(Time.seconds(5))); // 6.计算总和 SingleOutputStreamOperator<WaterSensor> result = window.reduce(new ReduceFunction<WaterSensor>() { @Override public WaterSensor reduce(WaterSensor t1, WaterSensor t2) throws Exception { return new WaterSensor(t1.getId(),t1.getTs(),t1.getVc() + t2.getVc()); } }); // 7.打印 result.print(); // 8.执行任务 env.execute(); } /** * 自定义周期性的Watermark生成器 */ public static class MyPeriod implements WatermarkGenerator<WaterSensor> { private Long maxTs; // 允许的最大延迟时间 ms private Long maxDelay; public MyPeriod(Long maxDelay) { this.maxDelay = maxDelay; this.maxTs = Long.MIN_VALUE + maxDelay + 1; } // 每收到一个元素, 执行一次. 用来生产WaterMark中的时间戳 @Override public void onEvent(WaterSensor event, long eventTimestamp, WatermarkOutput output) { //有了新的元素找到最大的时间戳 System.out.println("取数据中最大的时间戳"); maxTs = Math.max(eventTimestamp, maxTs); } // 周期性的把WaterMark发射出去, 默认周期是200ms @Override public void onPeriodicEmit(WatermarkOutput output) { // 周期性的发射水印: 相当于Flink把自己的时钟调慢了一个最大延迟 System.out.println("生成WaterMark" + (maxTs - maxDelay)); output.emitWatermark(new Watermark(maxTs - maxDelay)); } } }
(4.2)间歇性
package com.aikfk.flink.datastream.watermark; import com.aikfk.flink.datastream.bean.WaterSensor; import org.apache.flink.api.common.eventtime.*; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.functions.ReduceFunction; import org.apache.flink.streaming.api.datastream.KeyedStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.datastream.WindowedStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; /** * @author :caizhengjie * @description:基于事件事件滚动窗口测试watermark机制 * @date :2021/3/20 9:21 下午 */ public class EventTimeTumblingCustomerPunt { public static void main(String[] args) throws Exception { // 1.获取执行环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); // 2.读取端口数据并转换为JavaBean SingleOutputStreamOperator<WaterSensor> waterSensorDS = env.socketTextStream("bigdata-pro-m07", 9999) .map(new MapFunction<String, WaterSensor>() { @Override public WaterSensor map(String s) throws Exception { String[] split = s.split(","); return new WaterSensor(split[0],Long.parseLong(split[1]),Integer.parseInt(split[2])); } }); // 3.提取数据中的时间戳字段 SingleOutputStreamOperator<WaterSensor> waterSensorSingleOutputStreamOperator = waterSensorDS .assignTimestampsAndWatermarks(new WatermarkStrategy<WaterSensor>() { @Override public WatermarkGenerator<WaterSensor> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) { return new MyPunt(2000L); } }.withTimestampAssigner(new SerializableTimestampAssigner<WaterSensor>() { @Override public long extractTimestamp(WaterSensor element, long recordTimestamp) { return element.getTs() * 1000L; } })); // 4.按照id分组 KeyedStream<WaterSensor, 
String> keyedStream = waterSensorSingleOutputStreamOperator.keyBy(WaterSensor::getId); // 5.开窗 WindowedStream<WaterSensor, String, TimeWindow> window = keyedStream.window(TumblingEventTimeWindows.of(Time.seconds(5))); // 6.计算总和 SingleOutputStreamOperator<WaterSensor> result = window.reduce(new ReduceFunction<WaterSensor>() { @Override public WaterSensor reduce(WaterSensor t1, WaterSensor t2) throws Exception { return new WaterSensor(t1.getId(),t1.getTs(),t1.getVc() + t2.getVc()); } }); // 7.打印 result.print(); // 8.执行任务 env.execute(); } /** * 自定义间歇性watermark * */ public static class MyPunt implements WatermarkGenerator<WaterSensor> { private Long maxTs; private Long maxDelay; public MyPunt(Long maxDelay) { this.maxDelay = maxDelay; this.maxTs = Long.MIN_VALUE + maxDelay + 1; } //当数据来的时候调用 @Override public void onEvent(WaterSensor event, long eventTimestamp, WatermarkOutput output) { System.out.println("取数据中最大的时间戳"); maxTs = Math.max(eventTimestamp, maxTs); output.emitWatermark(new Watermark(maxTs - maxDelay)); } //周期性调用 @Override public void onPeriodicEmit(WatermarkOutput output) { } } }
测试数据:
ws_001,1577844001,1 ws_001,1577844002,1 ws_001,1577844012,1
运行结果:
取数据中最大的时间戳 取数据中最大的时间戳 取数据中最大的时间戳 WaterSensor{id='ws_001', ts=1577844001, vc=2}
(5)多并行度下 WaterMark 的传递
WaterMark传递:
- 使用广播的方式传输的
- 某个并行度中Watermark值取决于前面所有并行度的最小WaterMark值
- 当WaterMark值没有增长的时候,不会向下游传递。注意:WaterMark的生成过程不变(仍会照常生成),只是不再发送给下游
总结: 多并行度的条件下, 向下游传递 WaterMark 的时候, 总是以最小的那个 WaterMark 为准! 木桶原理!