Apache Flink 是一个分布式流处理框架,可以通过多种方式处理延迟数据。这里有几个选项:
🍊水位线WaterMarker:Flink 使用水位线来跟踪流中的时间进度。水位线是由源发出的周期性时间戳,用于确定一条数据的延迟时间。您可以根据水位线指定数据的最大延迟。例如,您可以指定延迟超过 10 个水位线的数据应该被删除。
🍊窗口延迟数据:Flink 允许为预计在窗口内无序到达的数据指定最大延迟(延迟)。allowedLateness可以在定义窗口时使用参数设置此最大延迟。任何在最大延迟时间之后到达的数据都将被丢弃。
🍊侧输入流:Flink 还允许您使用侧输入来处理延迟数据。辅助输入是除了主输入流之外还使用的数据集。您可以使用辅助输入来存储键的最新已知值,然后在迟到的数据到达时使用该值更新主输入流。Output Mode:Flink 还允许您指定在使用带参数的窗口时应处理多晚的数据OutputMode。您可以选择仅输出延迟数据、仅输出更新数据或同时输出延迟数据和更新数据。
1、设置水位线延迟时间
水位线是事件时间的进展,它是我们整个应用的全局逻辑时钟。水位线生成之后,会随着数据在任务间流动,从而给每个任务指明当前的事件时间。所以从这个意义上讲,水位线是一个覆盖万物的存在,它并不只针对事件时间窗口有效。之前我们讲到触发器时曾提到过“定时器”,时间窗口的操作底层就是靠定时器来控制触发的。既然是底层机制,定时器自然就不可能是窗口的专利了;事实上它是 Flink 底层 API— —处理函数(process function)的重要部分。
所以水位线其实是所有事件时间定时器触发的判断标准。那么水位线的延迟,当然也就是全局时钟的滞后,相当于是上帝拨动了琴弦,所有人的表都变慢了。既然水位线这么重要,那一般情况就不应该把它的延迟设置得太大,否则流处理的实时性就会大大降低。因为水位线的延迟主要是用来对付分布式网络传输导致的数据乱序,而网络传输的乱序程度一般并不会很大,大多集中在几毫秒至几百毫秒。所以实际应用中,我们往往会给水位线设置一个“能够处理大多数乱序数据的小延迟”,视需求一般设在毫秒~秒级。当我们设置了水位线延迟时间后,所有定时器就都会按照延迟后的水位线来触发。如果一个数据所包含的时间戳,小于当前的水位线,那么它就是所谓的“迟到数据”。
~代码示例
// Define a window with a maximum lateness of 5 seconds Window<Tuple2<String, Integer>> window = Window.into(TumblingEventTimeWindows.of(Time.seconds(5))) .allowedLateness(Time.seconds(5));
2、允许窗口处理迟到数据
水位线延迟设置的比较小,那之后如果仍有数据迟到该怎么办?对于窗口计算而言,如果水位线已经到了窗口结束时间,默认窗口就会关闭,那么之后再来的数据就要被丢弃了。自然想到,Flink 的窗口也是可以设置延迟时间,允许继续处理迟到数据的。这种情况下,由于大部分乱序数据已经被水位线的延迟等到了,所以往往迟到的数据不会太多。这样,我们会在水位线到达窗口结束时间时,先快速地输出一个近似正确的计算结果;然后保持窗口继续等到延迟数据,每来一条数据,窗口就会再次计算,并将更新后的结果输出。这样就可以逐步修正计算结果,最终得到准确的统计值了。
类比班车的例子,我们可以这样理解:大多数人是在发车时刻前后到达的,所以我们只要把表调慢,稍微等一会儿,绝大部分人就都上车了,这个把表调慢的时间就是水位线的延迟;到点之后,班车就准时出发了,不过可能还有该来的人没赶上。于是我们就先慢慢往前开,这段时间内,如果迟到的人抓点紧还是可以追上的;如果有人追上来了,就停车开门让他上来,然后车继续向前开。当然我们的车不能一直慢慢开,需要有一个时间限制,这就是窗口的允许延迟时间。一旦超过了这个时间,班车就不再停留,开上高速疾驰而去了。所以我们将水位线的延迟和窗口的允许延迟数据结合起来,最后的效果就是先快速实时地输出一个近似的结果,而后再不断调整,最终得到正确的计算结果。回想流处理的发展过程,这不就是著名的 Lambda 架构吗?原先需要两套独立的系统来同时保证实时性和结果的最终正确性,如今 Flink 一套系统就全部搞定了。
~代码示例
// Define a watermark strategy that drops data more than 10 watermarks late BoundedOutOfOrdernessTimestampExtractor<Tuple2<String, Integer>> watermarkStrategy = //有界无序时间戳提取器,设置 watermark 延迟时间,10 秒钟 new BoundedOutOfOrdernessTimestampExtractor<Tuple2<String, Integer>>(Time.seconds(10)) { @Override public long extractTimestamp(Tuple2<String, Integer> element) { // extract the timestamp from the element return element.f0; } }; // Assign the watermark strategy to the input stream DataStream<Tuple2<String, Integer>> watermarkedStream = inputStream.assignTimestampsAndWatermarks(watermarkStrategy); // Process the watermarked stream watermarkedStream.process(new ProcessFunction<Tuple2<String, Integer>, String>() { @Override public void processElement(Tuple2<String, Integer> value, Context ctx, Collector<String> out) { // data with a timestamp more than 10 watermarks in the past will not be processed } });
3、将迟到数据放入窗口侧输出流
即使我们有了前面的双重保证,可窗口不能一直等下去,最后总要真正关闭。窗口一旦关闭,后续的数据就都要被丢弃了。那如果真的还有漏网之鱼又该怎么办呢?
用窗口的侧输出流来收集关窗以后的迟到数据。这种方式是最后“兜底”的方法,只能保证数据不丢失;因为窗口已经真正关闭,所以是无法基于之前窗口的结果直接做更新的。我们只能将之前的窗口计算结果保存下来,然后获取侧输出流中的迟到数据,判断数据所属的窗口,手动对结果进行合并更新。尽管有些烦琐,实时性也不够强,但能够保证最终结果一定是正确的。如果还用赶班车来类比,那就是车已经上高速开走了,这班车是肯定赶不上了。不过我们还留下了行进路线和联系方式,迟到的人如果想办法辗转到了目的地,还是可以和大部队会合的。最终,所有该到的人都会在目的地出现。所以总结起来,Flink 处理迟到数据,对于结果的正确性有三重保障:水位线的延迟,窗口允许迟到数据,以及将迟到数据放入窗口侧输出流。
~代码示例
// Define an OutputTag for late data OutputTag<Tuple2<String, Integer>> lateDataOutputTag = new OutputTag<Tuple2<String, Integer>>("late-data"){}; // Apply a window to the stream with an allowed lateness of 5 seconds DataStream<Tuple2<String, Integer>> windowedStream = inputStream.windowAll(TumblingEventTimeWindows.of(Time.seconds(5))) .allowedLateness(Time.seconds(5)) .sideOutputLateData(lateDataOutputTag); //Late data enters the measurement output stream // Process the main output stream windowedStream.apply((window, values, out) -> { // values in this stream will include only those that arrived within 5 seconds of the end of the window }); // Process the late data output stream DataStream<Tuple2<String, Integer>> lateDataStream = windowedStream.getSideOutput(lateDataOutputTag); lateDataStream.apply((window, values, out) -> { // values in this stream will include late data that arrived more than 5 seconds after the end of the window });
在上面简单示例中,我们将一个窗口应用于输入流,最大延迟为 5 秒。该sideOutputLateData()方法用于将任何迟到的数据输出到侧输出流,该侧输出流使用该getSideOutput()方法和lateDataOutputTagOutputTag 进行访问。
主输出流将仅包含在窗口结束后 5 秒内到达的那些值,而延迟数据输出流将包含在窗口结束后超过 5 秒到达的任何值。