这里还有再说三个概念:
Watermarks:水印是关于事件时间的输入完整性的概念。如果到某一个时间的水印,应该是已经获取到了小于该时间的所有数据。在处理无界数据时,水印就作为处理进度的标准。
Triggers: 触发器是一种机制,用于声明窗口何时应该输出,触发器可灵活选择何时应发出输出。我们可以随着时间的推移不断改进结果,也可以处理那些比水印晚到达的数据,改进结果。
Accumulation: 累积模式指定在同一窗口中观察到的多个结果之间的关系。这些结果可能是完全脱节的,即随着时间的推移表示独立的增量,或者它们之间可能存在重叠。
四个新的问题:what?where?when?How?
计算什么? 希望通过数据计算的结果,和批处理类似,构建直方图,计算总和,训练机器学习等等。
在哪里计算?事件时间窗口可以回答这个问题,比如之前提到的(固定,滑动,会话),当然这个时间也可能是处理时间。
什么时候处理产生结果?通过水印和触发器来回答。可能有无限的变化,常见的模式是使用水印描述给定窗口的输入是否完整,触发器指定早期和后期结果。
结果如何相关? 通过累计模式来回答,丢弃不同的,累积产生的结果。
一、Streaming 101 Redux
详细介绍Streaming 101的一些概念,并提供一些例子。
What:transformations
计算的结果是什么?熟悉批处理的应该很熟悉这个。
举一个例子,计算由10个值组成的简单数据集的整数和。您可以想象为求一组人的分数和,或者是计费,监控等场景。
如果您了解Spark Streaming或Flink之类的东西,那么您应该相对容易地了解Dataflow代码正在做什么。
Dataflow Java SDK 模型:
- PCollections,表示可以执行并行转换的数据集(可能是大量的数据集)。
- PTransforms,将PCollections创建成新的PCollections。PTransforms可以执行逐元素变换,它们可以将多个元素聚合在一起,或者它们可以是多个PTransforms的组合。
图二 转换类型
我们从IO源中获取消息,以KV的形式转换,最后求出分数和。示例代码如下:
PCollection<String> raw = IO.read(...); PCollection<KV<String, Integer>> input = raw.apply(ParDo.of(new ParseFn()); PCollection<KV<String, Integer>> scores = input .apply(Sum.integersPerKey());
这个过程可以是在多个机器分布式执行的,分布的将不同时间情况的数据进行累加,输出得到最终的结果,我们不用关心分布式的问题,只要把所有的结果集转换累加即可。
图三 x为事件时间 y为处理时间
这里我们计算的是所有事件时间,没有进行窗口转换,因此输出矩形覆盖整个X轴,但是我们处理无界数据时,这就不够了,我们不能等到结束了再处理,因为永远不会结束。所有我们需要考虑在哪里计算呢?这就需要窗口。
Where:windowing
还记得我们之前提过的三种窗口,固定,滑动,会话。
图四 三种窗口
我们用刚才的例子,将其固定为两分钟的窗口。
PCollection<KV<String, Integer>> scores = input .apply(Window.into(FixedWindows.of(Duration.standardMinutes(2)))) .apply(Sum.integersPerKey());
Dataflow提供了一个统一的模型,可以在批处理和流式处理中同时工作,因为批处理实际上只是流的一个子集。
图五 窗口处理
和以前一样,输入的数据在累积,直到它们被完全处理,然后产生输出。在这种情况下,我们得到四个输出而不是一个输出:四个基于这个两分钟事件时间窗口中的单个输出。
现在我们可以通过更具体的水印,触发器和累计来解决更多的问题了。
二、Streaming 102
刚才的处理还是通用的批处理方式,延迟很大,但我们已经成功把每个窗口的输入都计算了,我们目前缺乏一种对无限数据处理方法,还要能保证其完整性。
When
Watermarks
水印是什么时候处理产生结果?其实也就是我们之前研究事件时间和处理时间的那张图。
上文图 事件时间 处理时间 水印
这条红色曲线就是水印,它随着处理时间的推移不断的去捕获事件时间。从概念上讲,我们将其视为从处理时间到事件时间的映射。水印可以有两种类型:
完美水印:这要求我们对的输入数据全部了解。也就没有了后期数据,所有的数据准时到达。
启发式水印:对于大部分分布式输入源,完整的了解输入数据是不可能的,这就需要启发式水印。启发式水印通过分区,分区排序等提供尽可能准确的估计。所以是有可能错误的,这就需要触发器在后期解决,这个一会会讲。
下面是两个使用了不同水印的流处理引擎:
图六 左完美 右启发
在这两种情况下,当水印通过窗口的末端时,窗口被实现。两次执行之间的主要区别在于右侧水印计算中使用的启发式算法未考虑9的值,这极大地改变了水印的形状。这些例子突出了水印的两个缺点:
太慢:如果因为网络等原因导致有数据未处理时,只能延迟输出结果。左图比较明显,迟到的9影响了整体的进度,这对于第二个窗口[12:02,12:04]尤为明显,从窗口中的第一个值开始到我们看到窗口的任何结果为止需要将近7分钟。而启发式水印要好一点只用了两分钟。
太快:当启发式水印错误地提前超过应有的水平时,水印之前的事件时间数据可能会在一段时间后到达,从而产生延迟数据。这就是右边示例中发生的情况:在观察到该窗口的所有输入数据之前,水印超过了第一个窗口的末尾,导致输出值不正确,正确的应该是14。这个缺点严格来说是启发式水印的问题, 他们的启发性意味着他们有时会出错。因此,如果您关心正确性,单靠它们来确定何时实现输出是不够的。
这时候我们就需要触发器。
triggers
触发器用于声明窗口何时应该输出。
触发的信号包括:水印进度,处理时间进度,计数,数据触发,重复,逻辑与AND,逻辑或OR,序列。
还是用上面的例子,我们增加一个触发器:
PCollection<KV<String, Integer>> scores = input .apply(Window.into(FixedWindows.of(Duration.standardMinutes(2))) .triggering(AtWatermark())) .apply(Sum.integersPerKey());
这里规定了触发的情况,我们可以考虑水印太快和太慢的情况。
太慢时,我们假设任何给定窗口都存在稳定的传入,我们可以周期性的触发。
太快时,可以在后期数据到达后去修正结果。如果后期数据不频繁,并不会影响性能。
最后我们可以综合考虑,协调早期,准时,晚期的情况:
PCollection<KV<String, Integer>> scores = input .apply(Window.into(FixedWindows.of(Duration.standardMinutes(2))) .triggering( AtWatermark() .withEarlyFirings(AtPeriod(Duration.standardMinutes(1))) .withLateFirings(AtCount(1)))) .apply(Sum.integersPerKey());
生成结果如下,这个版本有了明显的改进:
图七 增加早期晚期
对于[12:02,12:04]窗口太慢的情况,每分钟定时更新。延迟时间从七分钟减少到三分半。
对于[12:00,12:02]窗口太快的情况,当值9显示较晚时,我们立即将其合并到一个值为14的新的已更正窗格中。
但是这里有一个问题,窗口要保持多长时间呢?这里我们需要垃圾收集机制。
Garbage collection
在[启发式水印示例中,每个窗口的持久状态在示例的整个生命周期,这是必要的,这样我们才能够在他们到达时适当处理迟到的数据。但是,虽然能够保持所有持久状态直到时间结束是很棒的,但实际上,在处理无限数据源时,保持给定窗口的状态通常是不切实际的。无限, 我们最终会耗尽磁盘空间。
因此,任何真实的无序处理系统都需要提供一些方法来限制它正在处理的窗口的生命周期。
我们可以定义一个范围,当超出这个范围后,我们就丢弃无用的数据。
PCollection<KV<String, Integer>> scores = input .apply(Window.into(FixedWindows.of(Duration.standardMinutes(2))) .triggering( AtWatermark() .withEarlyFirings(AtPeriod(Duration.standardMinutes(1))) .withLateFirings(AtCount(1))) .withAllowedLateness(Duration.standardMinutes(1))) .apply(Sum.integersPerKey());
一旦水印通过窗口的延迟范围,该窗口就会关闭,这意味着窗口的所有状态都将被丢弃。
图八 垃圾收集
这里的6在允许迟到的范围内,可以被收集,而9不在这个范围,就被丢弃了。
有两点要注意:
如果您正在使用可获得完美水印的数据源的数据,就不需要处理延迟数据。
即使在使用启发式水印时,如果是将有限数量聚合,而且能保证一直可控,也不用考虑窗口的寿命问题。
现在时间的问题解决了,下面我们讨论如何累积数据。
How:Accumulation
有三种不同的累积模式:
丢弃:当下游的消费者进行累积计算时,直接相加所要的,就可以得到最终结果。
累积:比如未来的可以覆盖之前的,一直要保持最新状态,例如Hbase这种键值对的存储。
累积和撤回:和累积类似,但更复杂。比如重新分组的情况,可能不只是覆盖那么简单,需要先删掉之前的,再加入最新的;还有动态窗口的情况,新窗口会替换旧窗口,但数据要放在不同的位置。
比如上图中事件时间范围[12:02,12:04],下表显示了三种累积模式:
丢弃 | 累积 | 累积和收回 | |
窗格1:[7] | 7 | 7 | 7 |
第2页:[3,4] | 7 | 14 | 14,-7 |
第3页:[8] | 8 | 22 | 22,-14 |
观察到最后的价值 | 8 | 22 | 22 |
总和 | 22 | 51 | 22 |
丢弃:每个窗格仅包含在该特定窗格期间到达的值。因此,观察到的最终值并未完全捕获总和。但是,如果您要自己对所有独立窗格求和,那么您将得到22的正确答案。
累积:每个窗格结合了特定窗格期间到达的值,加上从先前的窗格中的所有值。因此,正确观察到的最终值可以捕获22的总和。
累积和撤回:每个窗格都包含新的累积模式值以及前一个窗格值的缩进。因此,观察到的最后一个(非回缩)值以及所有物化窗格的总和(包括撤回)都为您提供了22的正确答案。这就是撤回如此强大的原因。
图九 三种累积模式
随着丢弃,累积,累积和撤回的顺序,存储和计算成本在提高,因此累积模式的选择要在正确性,延迟和成本中做出选择。
When*/*Where: Processing-time windows
我们已经解决了所有四个问题,What,Where,When,How。但我们都是再事件时间的固定窗口。
所以我们还要讨论一下处理时间中的固定窗口和事件时间中的会话窗口。
先讨论处理时间中的固定窗口,处理时间窗口很重要,原因有两个:
- 对于某些用例,例如使用监控(例如,Web服务流量QPS),您希望在观察到的情况下分析传入的数据流,处理时窗口绝对是适当的方法。
- 对于事件发生的时间很重要的用例(例如,分析用户行为趋势,计费,评分等),处理时间窗口绝对是错误的方法,并且能够识别这些情况是至关重要的。
有两种方法可用于实现处理时窗口:
触发器:忽略事件时间(即,使用跨越所有事件时间的全局窗口)并使用触发器在处理时间轴上提供该窗口的快照。
入口时间:将入口时间指定为数据到达时的事件时间,并使用正常的事件时间窗口。这基本上就像Spark Streaming目前所做的那样。
处理时间窗口的一个重大缺点是,当输入的观察顺序发生变化时,窗口的内容会发生变化。为了以更具体的方式展示,我们将看看这三个用例:
这里我们将两种事件时间相同而处理时间不同的情况比较。
事件时间窗口
图10 事件时间窗口
四个窗口最终结果依然相同。
通过触发器处理时间窗口
使用全局事件时间窗口,在处理时间域定期触发,使用丢弃模式进行
图11 触发器处理时间窗口
- 由于我们通过事件时间窗格模拟处理时间窗口,因此在处理时间轴中描绘了“窗口”,这意味着它们的宽度是在Y轴而不是X轴上测量的。
- 由于处理时间窗口对遇到输入数据的顺序敏感,因此每个“窗口”的结果对于两个观察订单中的每一个都不同,即使事件本身在技术上在每个版本中同时发生。在左边我们得到12,21,18,而在右边我们得到7,36,4。
通过入口时间处理时间窗口
当元素到达时,它们的事件时间需要在入口时被覆盖。返回使用标准的固定事件时间窗口。由于入口时间提供了计算完美水印的能力,我们可以使用默认触发器,在这种情况下,当水印通过窗口末端时,它会隐式触发一次。由于每个窗口只有一个输出,因此累积模式无关紧要。
图12 入口时间处理时间窗口
- 与其他处理时间窗口示例一样,即使输入的值和事件时间保持不变,当输入的顺序发生变化时,我们也会得到不同的结果。
- 与其他示例不同,窗口在事件时域中再次描绘(因此沿X轴)。尽管如此,它们并不是真正的事件时间窗口; 我们只是简单地将处理时间映射到事件时间域,删除每个输入的原始记录,并用新的输入替换它,而不是表示管道首次观察数据的时间。
- 尽管如此,由于水印,触发器发射仍然与前一个处理时间示例完全相同。此外,产生的输出值与该示例相同,如预测的那样:左侧为12,21,18,右侧为7,36,4。
如果您关心事件实际发生的时间,您必须使用事件时间窗口,否则您的结果将毫无意义。
Where: session windows
动态的,数据驱动的窗口,称为会话。
会话是一种特殊类型的窗口,它捕获数据中的一段活动,它们在数据分析中特别有用。
- 会话是数据驱动窗口的一个示例:窗口的位置和大小是输入数据本身的直接结果,而不是基于某些预定义模式在时间内,如固定窗口和滑动窗口。
- 会话也是未对齐窗口的示例,即,不是均匀地跨数据应用的窗口,而是仅对数据的特定子集(例如,每个用户)。这与固定窗口和滑动窗口等对齐窗口形成对比,后者通常均匀地应用于数据。
图13 会话
我们来构建一个会话:
PCollection<KV<String, Integer>> scores = input .apply(Window.into(Sessions.withGapDuration(Duration.standardMinutes(1))) .triggering( AtWatermark() .withEarlyFirings(AtPeriod(Duration.standardMinutes(1))) .withLateFirings(AtCount(1))) .accumulatingAndRetractingFiredPanes()) .apply(Sum.integersPerKey());
我们得到结果如下:
图14 会话窗口
当遇到值为5的第一个记录时,它被放置在一个原始会话窗口中。
到达的第二个记录是7,它同样被放入它自己的原始会话窗口,因为它不与5的窗口重叠。
同时,水印已经过了第一个窗口的末尾,所以5的值在12:06之前被实现为准时结果。此后不久,第二个窗口也被实现为具有值7的推测结果,正如处理时间达到12:06那样。
我们接下来观察一系列记录,3,4和3,原始会话都重叠。结果,它们全部合并在一起,并且在12:07触发的早期触发时,发出值为10的单个窗口。
当8在此后不久到达时,它与具有值7的原始会话和具有值10的会话重叠。因此所有三个被合并在一起,形成具有值25的新组合会话。
当9到达时,将值为5的原始会话和值为25的会话加入到值为39的单个较大会话中。
这个非常强大的功能,Spark Streaming[2]已经做了实现。
简单回顾一下,我们讨论了事件时间与处理时间,窗口化,水印,触发器,累积。探索了What,When,Where,How四个问题。而最终,我们将平衡正确性,延迟和成本问题,得到最适合自己的实时流式处理方案。
参考资料
[1]
Google Cloud Dataflow: https://cloud.google.com/dataflow/
[2]
Spark Streaming: http://blog.cloudera.com/blog/2014/11/how-to-do-near-real-time-sessionization-with-spark-streaming-and-apache-hadoop/