01 引言
在前面的博客,我们已经对Flink
批流一体API
的使用有了一定的了解了,有兴趣的同学可以参阅下:
- 《Flink教程(01)- Flink知识图谱》
- 《Flink教程(02)- Flink入门》
- 《Flink教程(03)- Flink环境搭建》
- 《Flink教程(04)- Flink入门案例》
- 《Flink教程(05)- Flink原理简单分析》
- 《Flink教程(06)- Flink批流一体API(Source示例)》
- 《Flink教程(07)- Flink批流一体API(Transformation示例)》
- 《Flink教程(08)- Flink批流一体API(Sink示例)》
- 《Flink教程(09)- Flink批流一体API(Connectors示例)》
- 《Flink教程(10)- Flink批流一体API(其它)》
- 《Flink教程(11)- Flink高级API(Window)》
在前面的教程,我们已经学习了Flink
的四大基石里面的Window
了,如下图,本文讲解下Time
:
02 Time
在Flink
的流式处理中,会涉及到时间的不同概念,如下图所示:
可以看到Time分为如下几类:
- 事件时间(EventTime): 事件真真正正发生产生的时间
- 摄入时间(IngestionTime): 事件到达Flink的时间
- 处理时间(ProcessingTime): 事件真正被处理/计算的时间
毋庸置疑,EventTime
事件时间是最为重要的,因为只要事件时间一产生就不会变化,事件时间更能反映事件的本质!
为何事件时间这么重要?
举个例子:比如在地下车库点外卖,下单的时候是11:59分,但是由于地下车库没信号,程序一直在重试提交,当走出地下车库时,已经12:05分了,这个时候,如果要统计12之前的订单金额,那么这笔交易是否应被统计?当然应该统计,因为该数据的真真正正的产生时间为11点59分,这就是时间时间了。
事件时间反映了事件的时间,因为数据可能因为网络延迟等原因,所以需要一种机制来解决一定程度上的数据乱序或延迟到底的问题!也就是我们接下来要学习的Watermaker
水印机制/水位线机制。
03 Watermaker水印机制/水位线机制
3.1 Watermaker定义
Watermaker:就是给数据再额外的加的一个时间列,也就是Watermaker
是个时间戳!
Watermaker计算公式(这样可以保证Watermaker
水位线会一直上升(变大),不会下降):
Watermaker = 数据的事件时间(当前窗口的最大的事件时间) - 最大允许的延迟时间或乱序时间
3.2 Watermaker的作用
之前的窗口都是按照系统时间来触发计算的,如: [10:00:00 ~ 10:00:10) 的窗口,
一但系统时间到了10:00:10就会触发计算,那么可能会导致延迟到达的数据丢失!
那么现在有了Watermaker
,窗口就可以按照Watermaker
来触发计算! 也就是说Watermaker
是用来触发窗口计算的!
3.3 Watermaker如何触发窗口计算
窗口计算的触发条件为:
- 窗口中有数据
Watermaker
>= 窗口的结束时间
因为前面说到,Watermaker = 当前窗口的最大的事件时间 - 最大允许的延迟时间或乱序时间,也就是说只要不断有数据来,就可以保证Watermaker
水位线是会一直上升/变大的,不会下降/减小的所以最终一定是会触发窗口计算的。
3.4 图解Watermaker
触发公式:
Watermaker
>= 窗口的结束时间Watermaker
= 当前窗口的最大的事件时间 - 最大允许的延迟时间或乱序时间- 当前窗口的最大的事件时间 - 最大允许的延迟时间或乱序时间 >= 窗口的结束时间
- 当前窗口的最大的事件时间 >= 窗口的结束时间 + 最大允许的延迟时间或乱序时间
如上图所示的窗口时间为:[10:00:00~10:10:00]
,CBDA
数据依次到达窗口。
情形一:假如没有Watermaker机制:B数据迟到了(最少迟到了2分钟),那么B数据就丢失了。
情形二:有了Watermaker机制,并设置最大允许的延迟时间或乱序时间为5分钟,那么:
- C数据到达时,Watermaker=max(10:11:00)-5=10:06:00 < 窗口结束时间10:10:00 – 不触发条件
- B数据到达时,Watermaker=max(10:11:00,10:09:00) -5=10:06:00 < 窗口结束时间10:10:00 – 不触发条件
- D数据到达时,Watermaker=max(10:11:00,10:09:00,10:15:00) -5=10:10:00=窗口结束时间10:10:00 – 满足触发条件,这时候窗口才触发计算,B数据不会丢失
注意:Watermaker
机制可以在一定程度上解决数据乱序后延迟到达问题,但是更严重的还是无法解决如果A数据到达窗口已经计算完毕,所以A数据还是会丢失。如果要让A数据不丢失,可以将最大允许的延迟时间或乱序时间再设置大一点,或使用后续学习的Allowed Lateness
侧道输出机制。
04 案例演示
4.1 Watermaker案例演示
需求:有订单数据,格式为: (订单ID
,用户ID
,时间戳/事件时间,订单金额),要求每隔5s
,计算5
秒内,每个用户的订单总金额,并添加Watermaker
来解决一定程度上的数据延迟和数据乱序问题。
核心API:DataStream.assignTimestampsAndWatermarks(...)
定期生成 | 根据特殊记录生成 |
显示时间驱动 | 数据驱动 |
每隔一段时间调用生成方法 | 每一次分配TimeStamp 都会调用生成方法 |
实现AssignerWithPeriodicWathermarks |
实现AssignerWithPunctuatedWatermarks |
注意:一般我们都是直接使用Flink提供好的BoundedOutOfOrdernessTimestampExtractor
实现方式1:https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/event_timestamps_watermarks.html
/** * @author : YangLinWei * @createTime: 2022/3/7 11:07 下午 * <p> * 模拟实时订单数据,格式为: (订单ID,用户ID,订单金额,时间戳/事件时间) * 要求每隔5s,计算5秒内(基于时间的滚动窗口),每个用户的订单总金额 * 并添加Watermaker来解决一定程度上的数据延迟和数据乱序问题。 */ public class WatermakerDemo01 { public static void main(String[] args) throws Exception { //1.env StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //2.Source //模拟实时订单数据(数据有延迟和乱序) DataStream<Order> orderDS = env.addSource(new SourceFunction<Order>() { private boolean flag = true; @Override public void run(SourceContext<Order> ctx) throws Exception { Random random = new Random(); while (flag) { String orderId = UUID.randomUUID().toString(); int userId = random.nextInt(3); int money = random.nextInt(100); //模拟数据延迟和乱序! long eventTime = System.currentTimeMillis() - random.nextInt(5) * 1000; ctx.collect(new Order(orderId, userId, money, eventTime)); TimeUnit.SECONDS.sleep(1); } } @Override public void cancel() { flag = false; } }); //3.Transformation //-告诉Flink要基于事件时间来计算! //env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);//新版本默认就是EventTime //-告诉Flnk数据中的哪一列是事件时间,因为Watermaker = 当前最大的事件时间 - 最大允许的延迟时间或乱序时间 /*DataStream<Order> watermakerDS = orderDS.assignTimestampsAndWatermarks( new BoundedOutOfOrdernessTimestampExtractor<Order>(Time.seconds(3)) {//最大允许的延迟时间或乱序时间 @Override public long extractTimestamp(Order element) { return element.eventTime; //指定事件时间是哪一列,Flink底层会自动计算: //Watermaker = 当前最大的事件时间 - 最大允许的延迟时间或乱序时间 } });*/ DataStream<Order> watermakerDS = orderDS .assignTimestampsAndWatermarks( WatermarkStrategy.<Order>forBoundedOutOfOrderness(Duration.ofSeconds(3)) .withTimestampAssigner((event, timestamp) -> event.getEventTime()) ); //代码走到这里,就已经被添加上Watermaker了!接下来就可以进行窗口计算了 //要求每隔5s,计算5秒内(基于时间的滚动窗口),每个用户的订单总金额 DataStream<Order> result = watermakerDS .keyBy(Order::getUserId) //.timeWindow(Time.seconds(5), Time.seconds(5)) .window(TumblingEventTimeWindows.of(Time.seconds(5))) .sum("money"); //4.Sink result.print(); //5.execute env.execute(); } @Data @AllArgsConstructor @NoArgsConstructor public static class Order { private String orderId; private Integer userId; private Integer money; private Long eventTime; } }
运行结果:
实现方式2:
/** * Check * * @author : YangLinWei * @createTime: 2022/3/7 11:15 下午 */ public class WatermakerDemo02 { public static void main(String[] args) throws Exception { FastDateFormat df = FastDateFormat.getInstance("HH:mm:ss"); //1.env StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //2.Source //模拟实时订单数据(数据有延迟和乱序) DataStreamSource<Order> orderDS = env.addSource(new SourceFunction<Order>() { private boolean flag = true; @Override public void run(SourceContext<Order> ctx) throws Exception { Random random = new Random(); while (flag) { String orderId = UUID.randomUUID().toString(); int userId = random.nextInt(3); int money = random.nextInt(100); //模拟数据延迟和乱序! long eventTime = System.currentTimeMillis() - random.nextInt(5) * 1000; System.out.println("发送的数据为: " + userId + " : " + df.format(eventTime)); ctx.collect(new Order(orderId, userId, money, eventTime)); TimeUnit.SECONDS.sleep(1); } } @Override public void cancel() { flag = false; } }); //3.Transformation /*DataStream<Order> watermakerDS = orderDS .assignTimestampsAndWatermarks( WatermarkStrategy.<Order>forBoundedOutOfOrderness(Duration.ofSeconds(3)) .withTimestampAssigner((event, timestamp) -> event.getEventTime()) );*/ //开发中直接使用上面的即可 //学习测试时可以自己实现 DataStream<Order> watermakerDS = orderDS .assignTimestampsAndWatermarks( new WatermarkStrategy<Order>() { @Override public WatermarkGenerator<Order> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) { return new WatermarkGenerator<Order>() { private int userId = 0; private long eventTime = 0L; private final long outOfOrdernessMillis = 3000; private long maxTimestamp = Long.MIN_VALUE + outOfOrdernessMillis + 1; @Override public void onEvent(Order event, long eventTimestamp, WatermarkOutput output) { userId = event.userId; eventTime = event.eventTime; maxTimestamp = Math.max(maxTimestamp, eventTimestamp); } @Override public void onPeriodicEmit(WatermarkOutput output) { //Watermaker = 当前最大事件时间 - 最大允许的延迟时间或乱序时间 Watermark watermark = new Watermark(maxTimestamp - outOfOrdernessMillis - 1); System.out.println("key:" + userId + ",系统时间:" + df.format(System.currentTimeMillis()) + ",事件时间:" + df.format(eventTime) + ",水印时间:" + df.format(watermark.getTimestamp())); output.emitWatermark(watermark); } }; } }.withTimestampAssigner((event, timestamp) -> event.getEventTime()) ); //代码走到这里,就已经被添加上Watermaker了!接下来就可以进行窗口计算了 //要求每隔5s,计算5秒内(基于时间的滚动窗口),每个用户的订单总金额 /* DataStream<Order> result = watermakerDS .keyBy(Order::getUserId) //.timeWindow(Time.seconds(5), Time.seconds(5)) .window(TumblingEventTimeWindows.of(Time.seconds(5))) .sum("money");*/ //开发中使用上面的代码进行业务计算即可 //学习测试时可以使用下面的代码对数据进行更详细的输出,如输出窗口触发时各个窗口中的数据的事件时间,Watermaker时间 DataStream<String> result = watermakerDS .keyBy(Order::getUserId) .window(TumblingEventTimeWindows.of(Time.seconds(5))) //把apply中的函数应用在窗口中的数据上 //WindowFunction<IN, OUT, KEY, W extends Window> .apply(new WindowFunction<Order, String, Integer, TimeWindow>() { @Override public void apply(Integer key, TimeWindow window, Iterable<Order> input, Collector<String> out) throws Exception { //准备一个集合用来存放属于该窗口的数据的事件时间 List<String> eventTimeList = new ArrayList<>(); for (Order order : input) { Long eventTime = order.eventTime; eventTimeList.add(df.format(eventTime)); } String outStr = String.format("key:%s,窗口开始结束:[%s~%s),属于该窗口的事件时间:%s", key.toString(), df.format(window.getStart()), df.format(window.getEnd()), eventTimeList); out.collect(outStr); } }); //4.Sink result.print(); //5.execute env.execute(); } @Data @AllArgsConstructor @NoArgsConstructor public static class Order { private String orderId; private Integer userId; private Integer money; private Long eventTime; } }
运行结果:
4.2 Watermaker案例演示
需求:有订单数据,格式为: (订单ID
,用户ID
,时间戳/事件时间,订单金额)
要求每隔5s
,计算5
秒内,每个用户的订单总金额并添加Watermaker
来解决一定程度上的数据延迟和数据乱序问题。并使用OutputTag+allowedLateness
解决数据丢失问题。
API:
示例代码:
/** * allowedLateness * * @author : YangLinWei * @createTime: 2022/3/7 11:18 下午 */ public class WatermakerDemo03 { public static void main(String[] args) throws Exception { //1.env StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //2.Source //模拟实时订单数据(数据有延迟和乱序) DataStreamSource<Order> orderDS = env.addSource(new SourceFunction<Order>() { private boolean flag = true; @Override public void run(SourceContext<Order> ctx) throws Exception { Random random = new Random(); while (flag) { String orderId = UUID.randomUUID().toString(); int userId = random.nextInt(3); int money = random.nextInt(100); //模拟数据延迟和乱序! long eventTime = System.currentTimeMillis() - random.nextInt(10) * 1000; ctx.collect(new Order(orderId, userId, money, eventTime)); //TimeUnit.SECONDS.sleep(1); } } @Override public void cancel() { flag = false; } }); //3.Transformation DataStream<Order> watermakerDS = orderDS .assignTimestampsAndWatermarks( WatermarkStrategy.<Order>forBoundedOutOfOrderness(Duration.ofSeconds(3)) .withTimestampAssigner((event, timestamp) -> event.getEventTime()) ); //代码走到这里,就已经被添加上Watermaker了!接下来就可以进行窗口计算了 //要求每隔5s,计算5秒内(基于时间的滚动窗口),每个用户的订单总金额 OutputTag<Order> outputTag = new OutputTag<>("Seriouslylate", TypeInformation.of(Order.class)); SingleOutputStreamOperator<Order> result = watermakerDS .keyBy(Order::getUserId) //.timeWindow(Time.seconds(5), Time.seconds(5)) .window(TumblingEventTimeWindows.of(Time.seconds(5))) .allowedLateness(Time.seconds(5)) .sideOutputLateData(outputTag) .sum("money"); DataStream<Order> result2 = result.getSideOutput(outputTag); //4.Sink result.print("正常的数据和迟到不严重的数据"); result2.print("迟到严重的数据"); //5.execute env.execute(); } @Data @AllArgsConstructor @NoArgsConstructor public static class Order { private String orderId; private Integer userId; private Integer money; private Long eventTime; } }
运行结果:
05 文末
本文主要讲解了Time
和Watermaker
的原理与用法,谢谢大家的阅读,本文完!