如何生成水位线
上一节中我们讲到,水位线是用来保证窗口处理结果的正确性的,如果不能正确处理所有乱序数据,可以尝试调大延迟的时间。那在实际应用中,到底应该怎样生成水位线呢?本节我们就来讨论这个问题。
1.生成水位线的总体原则
我们知道,完美的水位线是“绝对正确”的,也就是一个水位线一旦出现,就表示这个时间之前的数据已经全部到齐、之后再也不会出现了。而完美的东西总是可望不可即,我们只能尽量去保证水位线的正确。如果对结果正确性要求很高、想要让窗口收集到所有数据,我们该怎么做呢?
我们知道,完美的水位线是“绝对正确”的,也就是一个水位线一旦出现,就表示这个时间之前的数据已经全部到齐、之后再也不会出现了。而完美的东西总是可望不可即,我们只能尽量去保证水位线的正确。如果对结果正确性要求很高、想要让窗口收集到所有数据,我们该怎么做呢?
更多的情况下,我们或许没那么大把握。毕竟未来是没有人能说得准的,我们怎么能确信未来不会出现一个超级迟到数据呢?所以另一种做法是,可以单独创建一个 Flink 作业来监控事件流,建立概率分布或者机器学习模型,学习事件的迟到规律。得到分布规律之后,就可以选择置信区间来确定延迟,作为水位线的生成策略了。例如,如果得到数据的迟到时间服从μ=1,σ=1 的正态分布,那么设置水位线延迟为 3 秒,就可以保证至少 97.7%的数据可以正确处理。
如果我们希望计算结果能更加准确,那可以将水位线的延迟设置得更高一些,等待的时间越长,自然也就越不容易漏掉数据。不过这样做的代价是处理的实时性降低了,我们可能为极少数的迟到数据增加了很多不必要的延迟。
如果我们希望处理得更快、实时性更强,那么可以将水位线延迟设得低一些。这种情况下,可能很多迟到数据会在水位线之后才到达,就会导致窗口遗漏数据,计算结果不准确。对于这些 “漏网之鱼”,Flink 另外提供了窗口处理迟到数据的方法,我们会在后面介绍。当然,如果我们对准确性完全不考虑、一味地追求处理速度,可以直接使用处理时间语义,这在理论上可以得到最低的延迟。
所以 Flink 中的水位线,其实是流处理中对低延迟和结果正确性的一个权衡机制,而且把控制的权力交给了程序员,我们可以在代码中定义水位线的生成策略。接下来我们就具体了解一下水位线在代码中的使用。
2.水位线生成策略(Watermark Strategies)
在 Flink 的 DataStream API 中 , 有 一 个 单 独 用 于 生 成 水 位 线 的 方法:.assignTimestampsAndWatermarks(),它主要用来为流中的数据分配时间戳,并生成水位线来指示事件时间:
public SingleOutputStreamOperator<T> assignTimestampsAndWatermarks( WatermarkStrategy<T> watermarkStrategy)
具体使用时,直接用 DataStream 调用该方法即可,与普通的 transform 方法完全一样。
DataStream<Event> stream = env.addSource(new ClickSource()); DataStream<Event> withTimestampsAndWatermarks = stream.assignTimestampsAndWatermarks(<watermark strategy>);
这里读者可能有疑惑:不是说数据里已经有时间戳了吗,为什么这里还要“分配”呢?这是因为原始的时间戳只是写入日志数据的一个字段,如果不提取出来并明确把它分配给数据,Flink 是无法知道数据真正产生的时间的。当然,有些时候数据源本身就提供了时间戳信息,比如读取 Kafka 时,我们就可以从 Kafka 数据中直接获取时间戳,而不需要单独提取字段分配了。
.assignTimestampsAndWatermarks()方法需要传入一个 WatermarkStrategy 作为参数,这就是 所 谓 的 “ 水 位 线 生 成 策 略 ” 。 WatermarkStrategy 中 包 含 了 一 个 “ 时 间 戳 分 配器”TimestampAssigner 和一个“水位线生成器”WatermarkGenerator。
public interface WatermarkStrategy<T> extends TimestampAssignerSupplier<T>,WatermarkGeneratorSupplier<T>{ @Override TimestampAssigner<T> createTimestampAssigner(TimestampAssignerSupplier.Context context); @Override WatermarkGenerator<T> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context); }
TimestampAssigner:主要负责从流中数据元素的某个字段中提取时间戳,并分配给元素。时间戳的分配是生成水位线的基础。
WatermarkGenerator:主要负责按照既定的方式,基于时间戳生成水位线。在WatermarkGenerator 接口中,主要又有两个方法:onEvent()和 onPeriodicEmit()。
onEvent:每个事件(数据)到来都会调用的方法,它的参数有当前事件、时间戳,以及允许发出水位线的一个 WatermarkOutput,可以基于事件做各种操作
onPeriodicEmit:周期性调用的方法,可以由 WatermarkOutput 发出水位线。周期时间为处理时间,可以调用环境配置的.setAutoWatermarkInterval()方法来设置,默认为200ms。
env.getConfig().setAutoWatermarkInterval(60 * 1000L);
3.Flink 内置水位线生成器
WatermarkStrategy 这个接口是一个生成水位线策略的抽象,让我们可以灵活地实现自己的需求;但看起来有些复杂,如果想要自己实现应该还是比较麻烦的。好在 Flink 充分考虑到了我们的痛苦,提供了内置的水位线生成器(WatermarkGenerator),不仅开箱即用简化了编程,而且也为我们自定义水位线策略提供了模板。
这两个生成器可以通过调用 WatermarkStrategy 的静态辅助方法来创建。它们都是周期性生成水位线的,分别对应着处理有序流和乱序流的场景。
(1)有序流
对于有序流,主要特点就是时间戳单调增长(Monotonously Increasing Timestamps),所以永远不会出现迟到数据的问题。这是周期性生成水位线的最简单的场景,直接调用WatermarkStrategy.forMonotonousTimestamps()方法就可以实现。简单来说,就是直接拿当前最大的时间戳作为水位线就可以了。
stream.assignTimestampsAndWatermarks( WatermarkStrategy.<Event>forMonotonousTimestamps() .withTimestampAssigner(new SerializableTimestampAssigner<Event>() { @Override public long extractTimestamp(Event element, long recordTimestamp) { return element.timestamp; } }) );
上面代码中我们调用.withTimestampAssigner()方法,将数据中的 timestamp 字段提取出来,作为时间戳分配给数据元素;然后用内置的有序流水位线生成器构造出了生成策略。这样,提取出的数据时间戳,就是我们处理计算的事件时间。
这里需要注意的是,时间戳和水位线的单位,必须都是毫秒。
(2)乱序流
由于乱序流中需要等待迟到数据到齐,所以必须设置一个固定量的延迟时间(Fixed Amount of Lateness)。这时生成水位线的时间戳,就是当前数据流中最大的时间戳减去延迟的结果,相当于把表调慢,当前时钟会滞后于数据的最大时间戳。调用 WatermarkStrategy. forBoundedOutOfOrderness()方法就可以实现。这个方法需要传入一个 maxOutOfOrderness 参数,表示“最大乱序程度”,它表示数据流中乱序数据时间戳的最大差值;如果我们能确定乱序程度,那么设置对应时间长度的延迟,就可以等到所有的乱序数据了。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); env.addSource(new ClickSource()) // 插入水位线的逻辑 .assignTimestampsAndWatermarks( // 针对乱序流插入水位线,延迟时间设置为 5s WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(5)) .withTimestampAssigner(new SerializableTimestampAssigner<Event>() { @Override public long extractTimestamp(Event element, long recordTimestamp) { return element.timestamp; } }) ) .print(); env.execute();
上面代码中,我们同样提取了 timestamp 字段作为时间戳,并且以 5 秒的延迟时间创建了处理乱序流的水位线生成器。
事实上,有序流的水位线生成器本质上和乱序流是一样的,相当于延迟设为 0 的乱序流水位线生成器,两者完全等同:
WatermarkStrategy.forMonotonousTimestamps() WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(0))
这里需要注意的是,乱序流中生成的水位线真正的时间戳,其实是 当前最大时间戳 – 延迟时间 – 1,这里的单位是毫秒。为什么要减 1 毫秒呢?我们可以回想一下水位线的特点:时间戳为 t 的水位线,表示时间戳≤t 的数据全部到齐,不会再来了。如果考虑有序流,也就是延迟时间为 0 的情况,那么时间戳为 7 秒的数据到来时,之后其实是还有可能继续来 7 秒的数据的;所以生成的水位线不是 7 秒,而是 6 秒 999 毫秒,7 秒的数据还可以继续来。这一点可以在 BoundedOutOfOrdernessWatermarks 的源码中明显地看到:
public void onPeriodicEmit(WatermarkOutput output) { output.emitWatermark(new Watermark(maxTimestamp - outOfOrdernessMillis - 1)); }
4.自定义水位线策略
一般来说,Flink 内置的水位线生成器就可以满足应用需求了。不过有时我们的业务逻辑可能非常复杂,这时对水位线生成的逻辑也有更高的要求,我们就必须自定义实现水位线策略WatermarkStrategy 了。
在 WatermarkStrategy 中,时间戳分配器 TimestampAssigner 都是大同小异的,指定字段提取时间戳就可以了;而不同策略的关键就在于 WatermarkGenerator 的实现。整体说来,Flink有两种不同的生成水位线的方式:一种是周期性的(Periodic),另一种是断点式的(Punctuated)。
还记得 WatermarkGenerator 接口中的两个方法吗?——onEvent()和 onPeriodicEmit(),前者是在每个事件到来时调用,而后者由框架周期性调用。周期性调用的方法中发出水位线,自然就是周期性生成水位线;而在事件触发的方法中发出水位线,自然就是断点式生成了。两种方式的不同就集中体现在这两个方法的实现上。
(1)周期性水位线生成器(Periodic Generator)
周期性生成器一般是通过 onEvent()观察判断输入的事件,而在 onPeriodicEmit()里发出水位线。
import org.apache.flink.api.common.eventtime.*; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; public class Test1 { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); env.addSource(new ClickSource()) .assignTimestampsAndWatermarks(new CustomWatermarkStrategy()) .print(); env.execute(); } public static class CustomWatermarkStrategy implements WatermarkStrategy<Event> { @Override public TimestampAssigner<Event> createTimestampAssigner(TimestampAssignerSupplier.Context context) { return new SerializableTimestampAssigner<Event>() { @Override public long extractTimestamp(Event element, long recordTimestamp) { return element.timestamp; // 告诉程序数据源里的时间戳是哪一个字段 } }; } @Override public WatermarkGenerator<Event> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) { return new CustomPeriodicGenerator(); } } public static class CustomPeriodicGenerator implements WatermarkGenerator<Event> { private Long delayTime = 5000L; // 延迟时间 private Long maxTs = Long.MIN_VALUE + delayTime + 1L; // 观察到的最大时间戳 @Override public void onEvent(Event event, long eventTimestamp, WatermarkOutput output) { // 每来一条数据就调用一次 maxTs = Math.max(event.timestamp, maxTs); // 更新最大时间戳 } @Override public void onPeriodicEmit(WatermarkOutput output) { // 发射水位线,默认 200ms 调用一次 output.emitWatermark(new Watermark(maxTs - delayTime - 1L)); } } }
我们在 onPeriodicEmit()里调用 output.emitWatermark(),就可以发出水位线了;这个方法由系统框架周期性地调用,默认 200ms 一次。所以水位线的时间戳是依赖当前已有数据的最大时间戳的(这里的实现与内置生成器类似,也是减去延迟时间再减 1),但具体什么时候生成与数据无关。
(2)断点式水位线生成器(Punctuated Generator)
断点式生成器会不停地检测 onEvent()中的事件,当发现带有水位线信息的特殊事件时,就立即发出水位线。一般来说,断点式生成器不会通过 onPeriodicEmit()发出水位线。
public class CustomPunctuatedGenerator implements WatermarkGenerator<Event> { @Override public void onEvent(Event r, long eventTimestamp, WatermarkOutput output) { // 只有在遇到特定的 itemId 时,才发出水位线 if (r.user.equals("Mary")) { output.emitWatermark(new Watermark(r.timestamp - 1)); } } @Override public void onPeriodicEmit(WatermarkOutput output) { // 不需要做任何事情,因为我们在 onEvent 方法中发射了水位线 } }
我们在 onEvent()中判断当前事件的 user 字段,只有遇到“Mary”这个特殊的值时,才调用output.emitWatermark()发出水位线。这个过程是完全依靠事件来触发的,所以水位线的生成一定在某个数据到来之后。
5. 在自定义数据源中发送水位线
我们也可以在自定义的数据源中抽取事件时间,然后发送水位线。这里要注意的是,在自定义数据源中发送了水位线以后,就不能再在程序中使用 assignTimestampsAndWatermarks 方法 来 生 成 水 位 线 了 。 在 自 定 义 数 据 源 中 生 成 水 位 线 和 在 程 序 中 使 用assignTimestampsAndWatermarks 方法生成水位线二者只能取其一。示例程序如下:
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.streaming.api.watermark.Watermark; import java.util.Calendar; import java.util.Random; public class EmitWatermarkInSourceFunction { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); env.addSource(new ClickSourceWithWatermark()).print(); env.execute(); } // 泛型是数据源中的类型 public static class ClickSourceWithWatermark implements SourceFunction<Event> { private boolean running = true; @Override public void run(SourceContext<Event> sourceContext) throws Exception { Random random = new Random(); String[] userArr = {"Mary", "Bob", "Alice"}; String[] urlArr = {"./home", "./cart", "./prod?id=1"}; while (running) { long currTs = Calendar.getInstance().getTimeInMillis(); // 毫秒时间戳 String username = userArr[random.nextInt(userArr.length)]; String url = urlArr[random.nextInt(urlArr.length)]; Event event = new Event(username, url, currTs); // 使用 collectWithTimestamp 方法将数据发送出去,并指明数据中的时间戳的字段 sourceContext.collectWithTimestamp(event, event.timestamp); // 发送水位线 sourceContext.emitWatermark(new Watermark(event.timestamp - 1L)); Thread.sleep(1000L); } } @Override public void cancel() { running = false; } } }
在自定义水位线中生成水位线相比 assignTimestampsAndWatermarks 方法更加灵活,可以任意的产生周期性的、非周期性的水位线,以及水位线的大小也完全由我们自定义。所以非常适合用来编写 Flink 的测试程序,测试 Flink 的各种各样的特性。
水位线的传递
我们知道水位线是数据流中插入的一个标记,用来表示事件时间的进展,它会随着数据一起在任务间传递。如果只是直通式(forward)的传输,那很简单,数据和水位线都是按照本身的顺序依次传递、依次处理的;一旦水位线到达了算子任务, 那么这个任务就会将它内部的时钟设为这个水位线的时间戳。
在这里,“任务的时钟”其实仍然是各自为政的,并没有统一的时钟。实际应用中往往上下游都有多个并行子任务,为了统一推进事件时间的进展,我们要求上游任务处理完水位线、时钟改变之后,要把当前的水位线再次发出,广播给所有的下游子任务。这样,后续任务就不需要依赖原始数据中的时间戳(经过转化处理后,数据可能已经改变了),也可以知道当前事件时间了。
可是还有另外一个问题,那就是在“重分区”(redistributing)的传输模式下,一个任务有可能会收到来自不同分区上游子任务的数据。而不同分区的子任务时钟并不同步,所以同一时刻发给下游任务的水位线可能并不相同。这时下游任务又该听谁的呢?
这就要回到水位线定义的本质了:它表示的是“当前时间之前的数据,都已经到齐了”。这是一种保证,告诉下游任务“只要你接到这个水位线,就代表之后我不会再给你发更早的数据了,你可以放心做统计计算而不会遗漏数据”。所以如果一个任务收到了来自上游并行任务的不同的水位线,说明上游各个分区处理得有快有慢,进度各不相同比如上游有两个并行子任务都发来了水位线,一个是 5 秒,一个是 7 秒;这代表第一个并行任务已经处理完 5 秒之前的所有数据,而第二个并行任务处理到了 7 秒。那这时自己的时钟怎么确定呢?当然也要以“这之前的数据全部到齐”为标准。如果我们以较大的水位线 7 秒作为当前时间,那就表示“7 秒前的数据都已经处理完”,这显然不是事实——第一个上游分区才处理到 5 秒,5~7 秒的数据还会不停地发来;而如果以最小的水位线 5 秒作为当前时钟就不会有这个问题了,因为确实所有上游分区都已经处理完,不会再发 5 秒前的数据了。这让我们想到“木桶原理”:所有的上游并行任务就像围成木桶的一块块木板,它们中最短的那一块,决定了我们桶中的水位。
我们可以用一个具体的例子,将水位线在任务间传递的过程完整梳理一遍。如图所示,当前任务的上游,有四个并行子任务,所以会接收到来自四个分区的水位线;而下游有三个并行子任务,所以会向三个分区发出水位线。具体过程如下:
(1)上游并行子任务发来不同的水位线,当前任务会为每一个分区设置一个“分区水位线” (Partition Watermark),这是一个分区时钟;而当前任务自己的时钟,就是所有分区时钟里最小的那个。
(2)当有一个新的水位线(第一分区的 4)从上游传来时,当前任务会首先更新对应的分区时钟;然后再次判断所有分区时钟中的最小值,如果比之前大,说明事件时间有了进展,当前任务的时钟也就可以更新了。这里要注意,更新后的任务时钟,并不一定是新来的那个分区水位线,比如这里改变的是第一分区的时钟,但最小的分区时钟是第三分区的 3,于是当前任务时钟就推进到了 3。当时钟有进展时,当前任务就会将自己的时钟以水位线的形式,广播给下游所有子任务。
(3)再次收到新的水位线(第二分区的 7)后,执行同样的处理流程。首先将第二个分区时钟更新为 7,然后比较所有分区时钟;发现最小值没有变化,那么当前任务的时钟也不变,也不会向下游任务发出水位线。
(4)同样道理,当又一次收到新的水位线(第三分区的 6)之后,第三个分区时钟更新为6,同时所有分区时钟最小值变成了第一分区的 4,所以当前任务的时钟推进到 4,并发出时间戳为 4 的水位线,广播到下游各个分区任务。
水位线在上下游任务之间的传递,非常巧妙地避免了分布式系统中没有统一时钟的问题,每个任务都以“处理完之前所有数据”为标准来确定自己的时钟,就可以保证窗口处理的结果总是正确的。对于有多条流合并之后进行处理的场景,水位线传递的规则是类似的。关于 Flink中的多流转换,我们会在后续章节中介绍。
水位线的总结
水位线在事件时间的世界里面,承担了时钟的角色。也就是说在事件时间的流中,水位线是唯一的时间尺度。如果想要知道现在几点,就要看水位线的大小。后面讲到的窗口的闭合,以及定时器的触发都要通过判断水位线的大小来决定是否触发。
水位线是一种特殊的事件,由程序员通过编程插入的数据流里面,然后跟随数据流向下游流动。
水位线的默认计算公式:水位线 = 观察到的最大事件时间 – 最大延迟时间 – 1 毫秒。
所以这里涉及到一个问题,就是不同的算子看到的水位线的大小可能是不一样的。因为下游的算子可能并未接收到来自上游算子的水位线,导致下游算子的时钟要落后于上游算子的时钟。比如 map->reduce 这样的操作,如果在 map 中编写了非常耗时间的代码,将会阻塞水位线的向下传播,因为水位线也是数据流中的一个事件,位于水位线前面的数据如果没有处理完毕,那么水位线不可能弯道超车绕过前面的数据向下游传播,也就是说会被前面的数据阻塞。这样就会影响到下游算子的聚合计算,因为下游算子中无论由窗口聚合还是定时器的操作,都需要水位线才能触发执行。这也就告诉了我们,在编写 Flink 程序时,一定要谨慎的编写每一个算子的计算逻辑,尽量避免大量计算或者是大量的 IO 操作,这样才不会阻塞水位线的向下传递。
在数据流开始之前,Flink 会插入一个大小是负无穷大(在 Java 中是-Long.MAX_VALUE)的水位线,而在数据流结束时,Flink 会插入一个正无穷大(Long.MAX_VALUE)的水位线,保证所有的窗口闭合以及所有的定时器都被触发。
对于离线数据集,Flink 也会将其作为流读入,也就是一条数据一条数据的读取。在这种情况下,Flink 对于离线数据集,只会插入两次水位线,也就是在最开始处插入负无穷大的水位线,在结束位置插入一个正无穷大的水位线。因为只需要插入两次水位线,就可以保证计算的正确,无需在数据流的中间插入水位线了。
水位线的重要性在于它的逻辑时钟特性,而逻辑时钟这个概念可以说是分布式系统里面最为重要的概念之一了,理解透彻了对理解各种分布式系统非常有帮助。具体可以参考 LeslieLamport 的论文。
尚硅谷yyds
学习资料来自于尚硅谷:https://www.bilibili.com/video/BV133411s7Sa?p=1