简介
本章主要介绍鲁棒的处理乱序数据的核心概念,这些概念的运用使流处理系统超越批处理系统的关键所在。
本章我们从流计算系统的底层机制深入来探讨一下watermark。学习这些机制有助于我们更好理解和使用watermark。我们将讨论watermark如何生成,传播和影响输出结果的时间戳。我们还将解释,watermark如何保证结果的正确性。
本文由《Streaming System》一书第三章的提炼翻译而来,译者才疏学浅,如有错误,欢迎指正。转载请注明出处,侵权必究。
定义
对任何一个持续输入和输出数据的管道来说,我们希望知道如何判断事件时间窗口的结束。窗口结束之后,不会再有这个窗口的数据到来。
判断事件事件窗口结束的第一种方式是,按照处理事件来判断。但是真实世界中,处理事件一定比事件时间晚,并且由于各种原因导致的数据乱序问题,会导致数据进入错误的窗口,这种方式行不通。
另一种方式,是根据管道处理数据的速率来判断数据是否到齐。但这种方式及其不靠谱,输入数据量变化,网络抖动,逻辑错误等太多行为都会影响到数据处理速率。这个指标完全不能判断某个窗口的数据是否到齐。
我们需要一种更鲁棒和合理的方式,来判断无界数据流的窗口结束时间。在此,我们做一个假设:假设无界数据流中的数据都有一个事件时间(即事件发生的事件)。基于这个假设,我们研究一下事件时间在计算管道(pipeline)中的假设。在分布式系统中,这个管道可能在多个计算节点(agent)上执行,多个agents同时消费源头数据,并不能保证数据的有序,其事件时间的分布如下:
上图表示了:流计算中,在eventtime上等待处理和处理完成的消息的分布。新消息进入流计算系统时,是待处理状态(in-fligt),被处理完成后,变成处理完成(completed)状态。这个分布图的关键点,是待处理消息最左边的那条边界线(即红色和蓝色部分中间的边界线),这条线代表了管道中待处理数据的最早时间戳。我们用这条线来定义watermark:
Watermark是单调递增的,管道中最早的待处理数据的事件时间戳。
Watermark两个基本属性:
- 完整性(completeness):单调递增,表示如果watermark经过了事件点T,那么T之前的数据已经到齐,T之前的窗口可以关闭了。
- 可视性(vilibility):如果管道中消息由于某个原因被堵住了,则这个管道的watermark也就停止了。需要找到阻止watermark更新的消息才能处理问题。
创建数据源的watermark
watermark是怎么产生的呢?从之前内容了解到,每条数据都有事件时间。watermark有两类:完美式(perfect)watermark和启发式(heuristic)watermark。我们看以下例子来了解两类watermark的区别:
在这个窗口求和例子中,左边是完美型watermark,右边是启发式watermark。从这个例子里可以看出,完美型watermark能100%保证没有late event(晚到数据),而启发式watermark允许有late event(晚到数据)出现。watermark创建后,在pipeline下游会一直存在。至于创建完美型还是启发式watermark,完全跟数据源有关。接下来通过一些例子来说明。
创建完美式watermark
完美型watermark能严格保证窗口不需要处理晚到数据(late event),也就是当watermark通过某个eventtime时,不会再有这个event time之前的数据。真实世界的分布式系统数据源是无法保证的。一些可以使用perfect watermark的例子:
- 入口时间戳(Ingress Timestamping) : 将数据进入系统的时间(即处理时间)当作数据的事件事件。2016年前几乎所有的流计算系统都是这样做的,这种方式非常简单,能保证数据单调递增,但是坏处是处理时间与数据真正的事件事件没有关系,数据真正的事件时间在计算过程中被抛弃了。
按时间排序的静态日志集(Static sets of time-ordered logs):按时间排序的日志,且大小固定的数据源(比如某个kafka topic中,有固定数量的partition,且每个partition中数据的event time严格单调递增)。这种场景中,只需要知道有几个partition,和每个partition中待处理数据中eventtime最早的事件即可得出watermark。
所以使用perfect watermark的关键是保证数据源数据在event time上单调递增
创建启发式watermark
与完美型watermark相反,启发式watermark假设event time在watermark之前的数据已经到齐。当然,使用启发式watermark的管道难免会遇到late event。但是只要方式得当,是可以得到一个比较合理的启发式watermark的。系统需要提供一种方式来处理late event来保证正确性。使用启发式watermark的例子如下:
- 按事件排序的动态日志集:一系列动态结构化日志文件(比如:每个日志文件内部的数据在event time上单调递增,但是文件之间时间没有关系)。所有文件的数据进入Kafka后,在event time上就不能保证单调递增了。在这种场景下,可以通过跟踪最早的待处理数据的event time,数据增长率,网络拓扑,可用带宽等信息,来得到一个相比之下非常精确的watermark。
创建启发式watermark时,没有一个统一的方式,需要根据数据源类型,数据分布等信息 case by case的看。一旦watermark被创建,其就会在pipeline中被传递下去,且类型不会变。这样整个计算管道数据完整性的问题就在数据源头被解决了。
Watermark传播
生产任务的pipeline中通常有多个stage,在源头产生的watermark会在pipeline的多个stage间传递。了解watermark如何在一个pipeline的多个stage间进行传递,可以更好的了解watermark对整个pipeline的影响,以及对pipeline结果延时的影响。我们在pipeline的各stage的边界上对watermark做如下定义:
- 输入watermark(An input watermark):捕捉上游各阶段数据处理进度。对源头算子,input watermark是个特殊的function,对进入的数据产生watermark。对非源头算子,input watermark是上游stage中,所有shard/partition/instance产生的最小的watermark
输出watermark(An output watermark):捕捉本stage的数据进度,实质上指本stage中,所有input watermark的最小值,和本stage中所有非late event的数据的event time。比如,该stage中,被缓存起来等待做聚合的数据等。
通过每个stage的input watermark和output watermark可以计算出该stage在event time上产生的延时(event time lantency = output watermark-input watermark).比如,一个10s的聚合窗口,会产生一个>=10s的延时。管道中,每个stage根据具体的操作会对event time做相应延时。
每个stage内的操作并不是线性递增的。概念上,每个stage的操作都可以被分为几个组件(components),每个组件都会影响pipeline的输出watermark。每个组件的特性与具体的实现方式和包含的算子相关。理论上,这类算子会缓存数据,直到触发某个计算。比如缓存一部分数据并将其存入状态(state)中,直到触发聚合计算,并将计算结果写入下游stage。
上图为流计算系统中包含待处理数据缓存功能组件的stage示意图。每个component都有watermark。而整个stage的watermark就是所有buffer的watermark的最小值。在该例子中,watermark可以是以下项的最小值:
- 每个source的watermark(Per-source watermark) - 每个发送数据的stage.
- 每个外部数据源的watermark(Per-external input watermark) - pipeline之外的数据源
- 每个状态组件的watermark(Per-state component watermark) - 每种需要写入的state类型
- 每个输出buffer的watermark(Per-output buffer watermark) - 每个接收stage
这种精度的watermark能够更好的描述系统内部状态。能够更简单的跟踪数据在系统各个buffer中的流转状态,有助于排查数据堵塞问题。
理解watermark的传播
我们用一个例子来更好理解输入和输入watermark的关系以及他们如何影响watermark在计算pipeline中的传播。例子:通过计算用户玩游戏的时间来评估用户对游戏的参与度。假设游戏有pc端和手机端这两个数据源。就需要写两个pipeline来分别对每个数据源计算用户游戏得分。虽然计算逻辑相同,但由于数据源不同,会导致watermark非常不同。
计算用户游戏session时长的伪代码如下(session window):
PCollection<Double> mobileSessions = IO.read(new MobileInputSource())
.apply(Window.into(Sessions.withGapDuration(Duration.standardMinutes(1))
.triggering(AtWatermark())
.discardingFiredPanes())
.apply(CalculateWindowLength());
PCollection<Double> consoleSessions = IO.read(new ConsoleInputSource())
.apply(Window.into(Sessions.withGapDuration(Duration.standardMinutes(1))
.triggering(AtWatermark())
.discardingFiredPanes())
.apply(CalculateWindowLength());
该计算中,先按user进行分区,再对每个user开个中断时长为1min的session window,然后计算这个session的长度,即得到用户玩游戏的时长。两个pipeline的输出如下:
上图表示每个用户的session时长。
接着,我们求一下游戏用户的平均游戏时长,首先每个用户的session时长,再将两个流的数据合并成一个流,求固定窗口内的session时长,求手机端和PC端用户时长的伪代码如下:
PCollection<Double> mobileSessions = IO.read(new MobileInputSource())
.apply(Window.into(Sessions.withGapDuration(Duration.standardMinutes(1))
.triggering(AtWatermark())
.discardingFiredPanes())
.apply(CalculateWindowLength());
PCollection<Double> consoleSessions = IO.read(new ConsoleInputSource())
.apply(Window.into(Sessions.withGapDuration(Duration.standardMinutes(1))
.triggering(AtWatermark())
.discardingFiredPanes())
.apply(CalculateWindowLength());
PCollection<Float> averageSessionLengths = PCollectionList
.of(mobileSessions).and(consoleSessions)
.apply(Flatten.pCollections())
.apply(Window.into(FixedWindows.of(Duration.standardMinutes(2)))
.triggering(AtWatermark())
.apply(Mean.globally());
两个重要的点:
- 输出watermark一定比输入watermark早。
- 这个例子中,求平均时长的stage的watermark是两个输入中watermark较早的那个。
pipeline中,下游各stage的watermark一定比上游小(早)。这个例子中,pipeline的逻辑修改起来非常简单,通过研究这个pipeline,我们再看研究watermark的另一个问题:输出时间戳(output timestamp)
watermark传播和输出时间戳
这个例子中,第二个求平均时长的stage的输出结果中的每条输出数据都带了个时间戳。由上文所述,watermark是单调递增的,不允许回退,那么窗口的输出数据的时间戳有以下几种赋值方式:
- 窗口结束时间:将窗口结束时间作为这个窗口输出数据的时间戳。这种方式系统的性能最好。
- 窗口中第一个非迟到数据的时间戳:用窗口中第一个非late event的数据的时间戳,作为窗口所有输出数据的时间戳是一种非常保守的方式,并会对系统性能有一定影响
- 用窗口中某个元素的时间戳:在某种用例中,比如一个查询流join一个点击流,有时希望用查询流的时间戳做watermark,有时又希望用点击流的时间戳做watermark。
接下来我们用个例子来说明输出时间戳在整个pipeline中的作用,用窗口中第一个非late event数据的时间戳作为窗口输出数据的时间戳的伪代码如下:
PCollection<Double> mobileSessions = IO.read(new MobileInputSource())
.apply(Window.into(Sessions.withGapDuration(Duration.standardMinutes(1))
.triggering(AtWatermark())
.withTimestampCombiner(EARLIEST)
.discardingFiredPanes())
.apply(CalculateWindowLength());
PCollection<Double> consoleSessions = IO.read(new ConsoleInputSource())
.apply(Window.into(Sessions.withGapDuration(Duration.standardMinutes(1))
.triggering(AtWatermark())
.withTimestampCombiner(EARLIEST)
.discardingFiredPanes())
.apply(CalculateWindowLength());
PCollection<Float> averageSessionLengths = PCollectionList
.of(mobileSessions).and(consoleSessions)
.apply(Flatten.pCollections())
.apply(Window.into(FixedWindows.of(Duration.standardMinutes(2)))
.triggering(AtWatermark())
.apply(Mean.globally());
这个图中,与上个图相比,watermark被hold住了,因为上个图片中的时间戳选的是窗口结束,但是这个图片中,选第一个非late event的数据的时间戳。同时可以明显看出,第二个stage的watermark也被hold住了。
两种时间戳选择方式的语义区别如下:
两点值得注意:
- watermark delay:第二种方式watermark明显比第一中方式慢,因为只有窗口结束,watermark才能向前推进
- 语义差别:watermark被定为第一个非late event数据的时间戳,在本例子中,同一个session可能落在不同的固定窗口中。这两种方式没有谁对谁错,但是在用之前,需要明确知道内部原理才能正确使用watermark
窗口重叠的复杂case
如何处理重叠窗口的输出时间戳非常重要和复杂。如果将输出数据时间戳设为窗口中最早非late event数据的时间戳,会导致下游窗口有很大延时。假设一个有两个stage的pipeline,一个元素通过三个连续的滑窗。理想的语义是:
- 第一个窗口在第一个stage结束,并输出到下游
- 第一个窗口在第二个stage结束,输出到下游
- 一段时间之后,第二个窗口在第一个stage结束,然后循环往复。。。
但真正的语义是:
- 第一个窗口在第一个stage结束,并输出到下游
- 第一个窗口在第二个stage无法输出,因为其输入watermark被第二个和第三个窗口拖住了,因为用了窗口中最早数据的时间戳作为窗口输出时间戳
- 第二个窗口在第一个stage结束,并输出到下游
- 第一个窗口和第二个窗口在第二个stage仍然不能输出,被上游第三个窗口拖住
- 第三个窗口在第一个stage结束,并输出到下游
- 第一/二/三个窗口在第二个stage同时输出
这种方式虽然结果是对的,但会大大增加不必要的延时。
百分比Watermark
我们可以分析event time的分布来得到窗口更好的触发时间。也就是说,在某个时间点,我已经处理了某个event time之前百分之多少的数据,示意图如下:
这样做的好处是,可以排除部分长尾数据,使watermark更快的向前推进,从而减小整个pipeline的延时。
看个例子,在2分钟固定窗口中,可以根据输入数据百分比画几条线:
该图展示了33%,66%和100% watermark。33%和66%的watermark使窗口更早输出,当然后果是更多的数据成了late event。比如[12:00,12:02)的窗口中,33%的watermark,只有4个元素会被计算,其他元素都是late event,但是窗口在处理时间的12:06就会输出。66%的watermark有7个元素,在12:07输出,而100% watermark有10个元素,窗口要在12:08才会输出。百分比watermark为我们提供了一种平衡输出延时和结果正确性的方式。
处理时间watermark
event time上的watermark解决了处理的输入与当前时间差别的问题,但不能解决这个延时到底是由于系统问题还是数据问题导致的。由此我们引入process time上的watermark。
process time上的watermark的定义与event time watermark定义完全相同,使用最早的还未完成的算子的时间作为process time上的watermark,其与当前时间的延时,有几种原因:从一个stage到另一个stage的数据发送被堵住了,访问state的IO被堵住了,处理过程中有异常。
上图表示了一个pipeline中,event time上的延时一直在增加,但是并不知道为啥增加。
![图片 1.png | center | 747x252]
结合event time和process time的延时,我们可以推断出由于某种原因导致系统无法及时处理数据。比如网络问题或者系统异常等等。process time上的延时就表明了由于某些系统异常导致了数据无法被及时处理,需要引起管理员的注意。
这个例子中,process time延时正常,但是event time上延时很高,说明系统是正常的,可能是由于系统中由于一些原因缓存了一些数据才导致了输出延时增长。
上图表示固定窗口的event time和process time的延时曲线。process time上延时一直是正常的,event time上延时会随着窗口缓存数据而增加,一旦窗口被触发输出,event time上的延时又变小。
因此process time上的watermark是区分数据延时或系统延时的非常好的工具。同时,在系统实现层面,process time上的watermark通常在系统垃圾回收会临时状态清理过程中使用。
案例解析
上文详述了watermark的理论知识,接下来我们看看现在主流的流计算系统是如何在正确性和延时之间做权衡,来实现这些理论的。
Google Cloud Dataflow 中的 Watermark
Dataflow每个stage,都将输入的数据按照key的范围分片,每个物理worker只负责一个key分区。当pipeline中又GroupByKey操作时,数据必须按照key被分发到相应的worker上进行计算,其逻辑示意图如下:
上图表明,在每个stage,数据都会被按key进行shuffle。其数据物理分区示意图如下:
每个worker中都负责两个stage的计算,每个stage计算完成后,数据都需要按key被分发到相应的worker进行下一阶段的计算。
Dataflow记录每个stage中每个组件的的每个key分区的watermark,watermark的聚合操作,会分析所有key分区的watermark的最小值,保证:
- 所有分区都有watermark。如果有某个分区没有watermark,整个pipeline的watermark都不回前进
- 保证分区的watermark单调递增。
Dataflow中有个中心化的聚合代理(agent),来处理watermark的聚合。agent是分布式的,可以更有效的做聚合操作。从正确行的角度来看,只有这个聚合agent才知道真正正确的watermark。
这种分布式watermark聚合的架构中,如何保证watermark的正确性是一个非常大的挑战。首先,必须保证watermark不能被提前触发,否则会造成很多的late event。每个worker都会被分配一段key空间,这个worker需要负责这段key的数据的state的更新/清理工作。在更新某个worker进程的watermark的时候,必须保证这个worker还在负责这段key的state的管理。否则,watermark更新协议就必须负责校验state的归属情况。
Apache Flink中的watermark
Flink是一个开源的流处理框架,能够支持分布式,高性能,高可用的流计算应用,且保证数据正确性。flink的计算管道示意图如下:
这个管道中有两个数据源,每个数据源都会产生一个watermark检查点,会与数据一起被发送到下游。比如,对数据源A来说,输出53这个watermark就意味这,不会再输出53之前的数据。下游的keyby算子会收到53这个watermark与其对应的数据。从operator的视角来看,他会不断的接到数据及watermark,并不断输出数据和watermark。
分布式发送watermark的方式,与DataFlow中心化管理watermark的方式相比,有几个优势:
- 减少watermark传输的时延
- 没有watermark聚合agent这种单点问题
- 可扩展性更强
当然DataFlow中心化管理watermark的方式也有一些优点: - Single source of “truth”:在一些debug场景中,如果能从一个地方拿到所有数据的watermark处理信息,会非常方便分析问题。而分布式watermark架构中,debug就非常麻烦
- 创建源头watermark:在某些场景中,需要其他系统的一些信息来产生watermark,使用中心化方式处理watermark就很方便了