流式系统:第九章到第十章

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
云解析 DNS,旗舰版 1个月
云解析DNS,个人版 1个月
简介: 流式系统:第九章到第十章

第五章:一次性和副作用

我们现在从讨论编程模型和 API 转向实现它们的系统。模型和 API 允许用户描述他们想要计算的内容。在规模上准确地运行计算需要一个系统——通常是一个分布式系统。

在本章中,我们将重点介绍一个实现系统如何正确实现 Beam 模型以产生准确结果。流处理系统经常谈论一次性处理;也就是确保每个记录只被处理一次。我们将解释我们的意思,并介绍如何实现它。

作为一个激励性的例子,本章重点介绍了 Google Cloud Dataflow 用于有效地保证记录的一次性处理的技术。在本章末尾,我们还将介绍一些其他流行的流处理系统用于保证一次性处理的技术。

为什么一次性很重要

许多用户来说,数据处理管道中出现丢失记录或数据丢失的风险是不可接受的。即便如此,历史上许多通用流处理系统并没有对记录处理提供任何保证——所有处理都只是“尽力而为”。其他系统提供至少一次的保证,确保记录至少被处理一次,但记录可能会重复(从而导致不准确的聚合);实际上,许多这样的至少一次系统在内存中执行聚合,因此当机器崩溃时它们的聚合仍然可能会丢失。这些系统用于低延迟的、推测性的结果,但通常无法保证这些结果的真实性。

正如第一章所指出的,这导致了一个被称为Lambda 架构的策略——运行一个流处理系统以获得快速但不准确的结果。稍后(通常是在一天结束后),批处理系统运行以得到正确的答案。这只有在数据流是可重放的情况下才有效;然而,足够多的数据源都满足这一条件,这种策略被证明是可行的。尽管如此,许多尝试过这种策略的人都遇到了许多 Lambda 架构的问题:

不准确性

用户往往低估了故障的影响。他们经常假设只有少量记录会丢失或重复(通常是基于他们进行的实验),当有一天出现 10%(甚至更多!)的记录丢失或重复时,他们会感到震惊。在某种意义上,这样的系统只提供了“一半”的保证——没有完整的保证,一切皆有可能。

不一致性

用于每日计算的批处理系统通常具有与流处理系统不同的数据语义。让这两个管道产生可比较的结果的过程比最初想象的更加困难。

复杂性

根据定义,Lambda 要求您编写和维护两个不同的代码库。您还必须运行和维护两个复杂的分布式系统,每个系统都有不同的故障模式。除了最简单的管道之外,这很快就变得不堪重负。

不可预测性

在许多用例中,最终用户将看到与每日结果有不确定差异的流处理结果,这种差异可能会随机变化。在这些情况下,用户将停止信任流处理数据,而等待每日批处理结果,从而破坏了首次获得低延迟结果的价值。

延迟

一些业务用例需要低延迟的正确结果,而 Lambda 架构设计上并不提供这种功能。

幸运的是,许多 Beam 运行程序可以做得更好。在本章中,我们将解释一次流处理如何帮助用户依靠准确的结果并避免数据丢失的风险,同时依赖于单一的代码库和 API。由于一系列可能影响管道输出的问题经常被错误地与一次性保证混淆在一起,我们首先解释了在 Beam 和数据处理的上下文中,当我们提到“一次性”时,确切指的是哪些问题在范围内,哪些不在范围内。

准确性与完整性

每当 Beam 管道处理一个记录时,我们希望确保记录永远不会丢失或重复。然而,流水线的特性是有时记录会在时间窗口的聚合已经被处理后出现。Beam SDK 允许用户配置系统应该等待延迟数据的时间;任何(且仅有)晚于截止日期到达的记录都会被丢弃。这个特性有助于完整性,而不是准确性:所有及时到达的记录都会被准确处理一次,而这些延迟的记录则会被明确丢弃。

尽管延迟记录通常是在流式系统的背景下讨论的,但值得注意的是批处理管道也存在类似的完整性问题。例如,一个常见的批处理范例是在凌晨 2 点运行前一天所有数据的作业。然而,如果昨天的一些数据直到凌晨 2 点后才被收集,它就不会被批处理作业处理!因此,批处理管道也提供准确但不总是完整的结果。

副作用

Beam 和 Dataflow 的一个特点是用户可以注入自定义代码,作为他们的管道图的一部分执行。Dataflow 保证该代码仅对每个记录运行一次,¹无论是通过流式处理还是批处理运行器。它可能会多次运行给定的记录通过用户转换,甚至可能同时在多个工作器上运行相同的记录;这是为了保证至少一次的处理在工作器故障的情况下。这些调用中只有一个可以“获胜”并在管道中产生输出。

因此,不幂等的副作用不能保证只执行一次;如果您编写的代码对外部服务具有副作用,例如联系外部服务,这些效果可能会对给定记录执行多次。这种情况通常是不可避免的,因为没有办法在 Dataflow 的处理与外部服务的副作用之间进行原子提交。管道最终需要将结果发送到外部世界,这样的调用可能不是幂等的。正如你将在本章后面看到的,这样的输出通常能够添加一个额外的阶段来将调用重构为幂等操作。

问题定义

所以,我们给出了一些我们讨论的例子。那么我们所说的一次性处理是什么意思呢?为了激励这一点,让我们从一个简单的流水线开始,²如示例 5-1 所示。

示例 5-1. 一个简单的流水线
Pipeline p = Pipeline.create(options);
// Calculate 1-minute counts of events per user.
PCollection<..> perUserCounts = 
      p.apply(ReadFromUnboundedSource.read())
       .apply(new KeyByUser())
       .Window.<..>into(FixedWindows.of(Duration.standardMinutes(1)))
       .apply(Count.perKey());
// Process these per-user counts, and write the output somewhere.
perUserCounts.apply(new ProcessPerUserCountsAndWriteToSink());
// Add up all these per-user counts to get 1-minute counts of all events.
perUserCounts.apply(Values.<..>create())
             .apply(Count.globally())
             .apply(new ProcessGlobalCountAndWriteToSink());
p.run();

这个管道计算了两种不同的窗口聚合。第一个计算了每个用户在一分钟内来自多少事件,第二个计算了每分钟总共有多少事件。这两个聚合都写入了未指定的流式输出。

请记住,Dataflow 并行在许多不同的工作器上执行管道。在每个GroupByKeyCount操作在底层使用GroupByKey),所有具有相同键的记录都在同一台机器上进行shuffle处理。Dataflow 工作器使用远程过程调用(RPC)在它们之间进行数据洗牌,确保给定键的记录都最终在同一台机器上。

图 5-1 显示了 Dataflow 为示例 5-1 中的管道创建的洗牌。³Count.perKey将每个用户的所有数据洗牌到给定的工作器,而Count.globally将所有这些部分计数洗牌到一个单一的工作器以计算全局总和。

图 5-1. 管道中的洗牌

为了 Dataflow 准确处理数据,这个 shuffle 过程必须确保每个记录只被洗牌一次。正如你将在下一刻看到的,shuffle 的分布式特性使得这成为一个具有挑战性的问题。

这个管道还可以从外部世界读取和写入数据,因此 Dataflow 必须确保这种交互不会引入任何不准确性。Dataflow 一直支持这项任务,即 Apache Spark 和 Apache Flink 所称的“端到端精确一次”,只要在技术上可行的情况下,对于数据源和数据汇。

本章的重点将放在三件事情上:

洗牌

Dataflow 如何保证每条记录只被洗牌一次。

数据源

Dataflow 如何保证每个源记录只被处理一次。

数据汇

Dataflow 如何保证每个数据汇产生准确的输出。

确保洗牌中的精确一次

正如刚才解释的,Dataflow 的流式洗牌使用 RPC。现在,每当有两台机器通过 RPC 进行通信时,都应该认真考虑数据完整性。首先,RPC 可能因为很多原因而失败。网络可能中断,RPC 可能在完成之前超时,或者接收服务器可能决定失败调用。为了保证记录在洗牌过程中不会丢失,Dataflow 采用了“上游备份”。这意味着发送方将重试 RPC,直到收到接收确认。Dataflow 还确保即使发送方崩溃,它也会继续重试这些 RPC。这保证了每条记录至少被传递一次。

现在的问题是,这些重试可能会产生重复。大多数 RPC 框架,包括 Dataflow 使用的框架,都会为发送方提供成功或失败的状态。在分布式系统中,你需要意识到 RPC 有时可能会在看似失败的情况下成功。这有很多原因:与 RPC 超时的竞争条件,服务器的积极确认尽管 RPC 成功但传输失败,等等。发送方真正可以信任的唯一状态是成功的状态。

返回失败状态的 RPC 通常表示调用可能成功也可能失败。尽管特定的错误代码可以传达明确的失败,但许多常见的 RPC 失败,如超过截止日期,都是模棱两可的。在流式洗牌的情况下,重试一个真正成功的 RPC 意味着将记录传递两次!Dataflow 需要一种方法来检测和删除这些重复。

在高层次上,这个任务的算法非常简单(见图 5-2):每个发送的消息都带有一个唯一标识符。每个接收者都存储了已经被看到和处理的所有标识符的目录。每次接收到一条记录时,它的标识符都会在这个目录中查找。如果找到了,记录就会被丢弃为重复。因为 Dataflow 是建立在可扩展的键/值存储之上的,所以这个存储被用来保存去重目录。

图 5-2。在洗牌中检测重复

解决确定性问题

然而,在现实世界中使这种策略生效需要非常小心。一个立即显现的问题是,Beam 模型允许用户代码产生非确定性输出。这意味着ParDo可能会对相同的输入记录执行两次(由于重试),但每次重试可能会产生不同的输出。期望的行为是,只有一个输出会提交到管道中;然而,涉及的非确定性使得很难保证这两个输出具有相同的确定性 ID。更棘手的是,ParDo可以输出多条记录,因此每次重试可能会产生不同数量的输出!

那么,为什么我们不要求所有用户处理都是确定性的呢?我们的经验是,在实践中,许多管道需要非确定性转换。而且很多时候,管道作者并没有意识到他们编写的代码是非确定性的。例如,考虑一个在 Cloud Bigtable 中查找补充数据以丰富其输入数据的转换。这是一个非确定性的任务,因为外部值可能会在转换的重试之间发生变化。任何依赖当前时间的代码也是不确定的。我们还看到需要依赖随机数生成器的转换。即使用户代码是纯确定的,任何允许延迟数据的事件时间聚合也可能具有非确定性的输入。

Dataflow 通过使用检查点来使非确定性处理有效地变为确定性来解决这个问题。每个转换的输出与其唯一 ID 一起被检查点到稳定存储中,然后再传递到下一个阶段之前。⁵在洗牌传递中的任何重试都只是重放已经被检查点的输出 - 用户的非确定性代码不会在重试时再次运行。换句话说,用户的代码可能会运行多次,但只有其中一个运行可以“获胜”。此外,Dataflow 使用一致的存储,可以防止重复写入稳定存储。

性能

为了实现精确一次的洗牌传递,每个接收器键中都存储了记录 ID 的目录。对于到达的每个记录,Dataflow 查找已经看到的 ID 目录,以确定这个记录是否是重复的。从一步到另一步的每个输出都被检查点到存储中,以确保生成的记录 ID 是稳定的。

然而,除非实施得当,否则这个过程会通过增加大量的读写来显著降低客户的管道性能。因此,为了使 Dataflow 用户的精确一次处理可行,必须减少 I/O,特别是通过阻止每个记录上的 I/O。

Dataflow 通过两种关键技术实现了这一目标:图优化Bloom 过滤器

图优化

在执行管道之前,Dataflow 服务对管道图运行一系列优化。其中一种优化是融合,在这种优化中,服务将许多逻辑步骤融合成单个执行阶段。图 5-3 显示了一些简单的示例。

图 5-3. 示例优化:融合

所有融合的步骤都作为一个内部单元运行,因此不需要为它们中的每一个存储精确一次数据。在许多情况下,融合将整个图减少到几个物理步骤,大大减少了所需的数据传输量(并节省了状态使用)。

Dataflow 还通过在将数据发送到主要分组操作之前在本地执行部分组合来优化关联和交换的Combine操作(例如CountSum),如图 5-4 所示。这种方法可以大大减少传递的消息数量,因此也减少了读写的数量。

图 5-4. 示例优化:组合器提升

布隆过滤器

上述优化是改进精确一次性性能的通用技术。对于严格旨在改进精确一次处理的优化,我们转向Bloom 过滤器

在一个健康的管道中,大多数到达的记录都不是重复的。我们可以利用这一点通过布隆过滤器大大提高性能,布隆过滤器是一种紧凑的数据结构,可以快速进行成员检查。布隆过滤器有一个非常有趣的特性:它们可以返回误报,但永远不会返回假阴性。如果过滤器说“是的,元素在集合中”,我们知道该元素可能在集合中(可以计算概率)。然而,如果过滤器说一个元素在集合中,那么它肯定不在。这个功能非常适合当前的任务。

Dataflow 中的实现方式如下:每个工作节点都保留了它所见过的每个 ID 的布隆过滤器。每当出现新的记录 ID 时,它会在过滤器中查找。如果过滤器返回 false,则该记录不是重复的,工作节点可以跳过更昂贵的稳定存储查找。只有当布隆过滤器返回 true 时,它才需要进行第二次查找,但只要过滤器的误报率低,这一步就很少需要。

然而,随着时间的推移,布隆过滤器往往会填满,这样做的话,误报率会增加。此外,每当工作节点重新启动时,我们还需要通过扫描状态中存储的 ID 目录来构建这个布隆过滤器。有帮助的是,Dataflow 为每条记录附加了一个系统时间戳。因此,服务不是创建一个单一的布隆过滤器,而是为每个 10 分钟范围创建一个单独的布隆过滤器。当记录到达时,Dataflow 根据系统时间戳查询适当的过滤器。这一步防止了布隆过滤器饱和,因为随着时间的推移,过滤器会被垃圾回收,并且它也限制了需要在启动时扫描的数据量。

图 5-5 说明了这个过程:记录到达系统并根据它们的到达时间被分配到一个布隆过滤器。第一个过滤器中的记录都不是重复的,它们的所有目录查找都被过滤了。记录r1被传递了第二次,因此需要进行目录查找以验证它是否确实是重复的;对于记录r4r6也是如此。记录r8不是重复的;然而,由于它的布隆过滤器中出现了误报,生成了一个目录查找(这将确定r8不是重复的,应该被处理)。

图 5-5. 一次性布隆过滤器

垃圾回收

每个 Dataflow 工作节点都持久存储了它所见过的唯一记录 ID 的目录。由于 Dataflow 的状态和一致性模型是按键的,实际上每个键都存储了传递到该键的记录的目录。我们不能永远存储这些标识符,否则所有可用的存储空间最终都会被填满。为了避免这个问题,您需要对已确认的记录 ID 进行垃圾回收。

实现这一目标的一种策略是,发送方为了跟踪仍在传输中的最早序列号(对应于未确认的记录传递),为每条记录标记一个严格递增的序列号。目录中具有较早序列号的任何标识符都可以进行垃圾回收,因为所有较早的记录都已经被确认。

然而,有一个更好的选择。如前所述,Dataflow 已经为每个记录标记了一个系统时间戳,用于分桶一次性布隆过滤器。因此,Dataflow 不是使用序列号来垃圾回收一次性目录,而是基于这些系统时间戳计算垃圾回收水印(这是第三章讨论的处理时间水印)。这种方法的一个好处是,因为这个水印是基于在给定阶段等待的物理时间量(不像数据水印是基于自定义事件时间),它提供了对管道的哪些部分是慢的直觉。这些元数据是 Dataflow WebUI 中显示的系统滞后度指标的基础。

如果一个记录到达时带有旧的时间戳,而我们已经对这个时间点的标识符进行了垃圾回收,会发生什么?这可能是由于我们称之为网络残留的影响,其中一个旧消息在网络中停留了无限期,然后突然出现。垃圾回收触发的低水位不会提前,直到记录交付被确认,因此我们知道这个记录已经被成功处理。这样的网络残留显然是重复的,会被忽略。

在数据源中精确执行一次

Beam 提供了一个用于将数据读入 Dataflow 管道的源 API。⁹ 如果处理失败并且需要确保每个数据源产生的唯一记录被精确执行一次,Dataflow 可能会重试从源读取数据。

对于大多数数据源,Dataflow 会在后台处理这个过程;这些数据源是确定性的。例如,考虑一个从文件中读取数据的数据源。文件中的记录总是以确定性顺序和确定性字节位置出现,无论文件被读取多少次。¹⁰ 文件名和字节位置唯一标识每个记录,因此服务可以自动生成每个记录的唯一 ID。另一个提供类似确定性保证的数据源是 Apache Kafka;每个 Kafka 主题被分成一个静态的分区集,分区中的记录总是有确定性顺序的。这样的确定性数据源将在 Dataflow 中无重复地工作。

然而,并非所有的数据源都是如此简单。例如,Dataflow 管道的一个常见数据源是 Google Cloud Pub/Sub。Pub/Sub 是一个不确定性的数据源:多个订阅者可以从 Pub/Sub 主题中拉取消息,但哪些订阅者接收到给定的消息是不可预测的。如果处理失败,Pub/Sub 将重新传递消息,但消息可能会被传递给与最初处理它们的不同工作器,并且顺序也可能不同。这种不确定性行为意味着 Dataflow 需要帮助来检测重复,因为服务无法确定地分配在重试时稳定的记录 ID。(我们将在本章后面更详细地研究 Pub/Sub 的一个案例。)

因为 Dataflow 无法自动分配记录 ID,不确定性数据源需要通知系统记录 ID 应该是什么。Beam 的源 API 提供了UnboundedReader.getCurrentRecordId¹¹方法。如果一个数据源为每个记录提供唯一的 ID,并通知 Dataflow 它需要去重,¹²具有相同 ID 的记录将被过滤掉。

在汇聚中精确执行一次

在某个时候,每个管道都需要向外部输出数据,而汇聚是简单地执行这一操作的转换。请记住,向外部传递数据是一种副作用,我们已经提到 Dataflow 不能保证副作用的精确执行一次。那么,汇聚如何保证输出只被传递一次呢?

最简单的答案是 Beam SDK 提供了一些内置的汇聚。这些汇聚经过精心设计,以确保它们不会产生重复,即使执行多次。在可能的情况下,鼓励管道作者使用其中一个内置的汇聚。

然而,有时内置功能是不够的,你需要编写自己的功能。最好的方法是确保你的副作用操作是幂等的,因此在重播时是稳健的。然而,通常副作用DoFn的某些组件是不确定的,因此在重播时可能会发生变化。例如,在窗口聚合中,窗口中的记录集也可能是不确定的!

具体来说,窗口可能尝试使用元素e0e1e2触发,但工作器在提交窗口处理之前崩溃(但在这些元素作为副作用发送之前没有崩溃)。当工作器重新启动时,窗口将再次触发,但现在会出现一个延迟元素e3。因为这个元素在窗口提交之前出现,所以它不被视为延迟数据,所以DoFn会再次调用元素e0e1e2e3。然后这些元素被发送到副作用操作。在这里幂等性是无法帮助的,因为每次发送的是不同的逻辑记录集。

还有其他引入不确定性的方式。解决这种风险的标准方法是依赖于 Dataflow 目前保证只有一个DoFn的输出版本可以通过洗牌边界。¹³

利用这一保证的一种简单方法是通过内置的Reshuffle转换。示例 5-2 中提出的模式确保副作用操作始终接收到一个确定性的记录以输出。

示例 5-2。重排示例
c.apply(Window.<..>into(FixedWindows.of(Duration.standardMinutes(1))))
 .apply(GroupByKey.<..>.create())
 .apply(new PrepareOutputData())
 .apply(Reshuffle.<..>of())
 .apply(WriteToSideEffect());

前面的管道将接收端分为两个步骤:PrepareOutputDataWriteToSideEffect。如果我们简单地依次运行,整个过程可能会在故障时重播,PrepareOutputData可能会产生不同的结果,并且两者都将被写入为副作用。当我们在两者之间添加Reshuffle时,Dataflow 保证这种情况不会发生。

当然,Dataflow 可能仍然多次运行WriteToSideEffect操作。这些副作用本身仍然需要是幂等的,否则接收端将收到重复的数据。例如,设置或覆盖数据存储中的值的操作是幂等的,即使运行多次,也会生成正确的输出。向列表追加的操作不是幂等的;如果操作运行多次,每次都会追加相同的值。

虽然Reshuffle提供了一种简单的方法来实现对DoFn的稳定输入,但GroupByKey同样有效。然而,目前有一个提案,可以消除添加GroupByKey以实现对DoFn的稳定输入的需要。相反,用户可以使用特殊注解@RequiresStableInput注解WriteToSideEffect,系统将确保该转换的输入稳定。

用例

为了说明这一点,让我们来看一些内置的源和接收端,看看它们如何实现上述模式。

示例来源:Cloud Pub/Sub

Cloud Pub/Sub 是一个完全托管的、可扩展的、可靠的、低延迟的系统,用于将消息从发布者传递给订阅者。发布者在命名主题上发布数据,订阅者创建命名订阅以从这些主题中拉取数据。可以为单个主题创建多个订阅,这种情况下,每个订阅从创建订阅时刻起都会接收到主题上发布的所有数据的完整副本。Pub/Sub 保证记录将继续传递直到被确认;但是,一条记录可能会被传递多次。

Pub/Sub 旨在用于分布式使用,因此许多发布过程可以发布到同一个主题,许多订阅过程可以从同一个订阅中拉取。在记录被拉取后,订阅者必须在一定时间内确认它,否则该拉取将过期,Pub/Sub 将重新将该记录传递给另一个订阅过程。

尽管这些特性使 Pub/Sub 具有高度可扩展性,但这也使它成为 Dataflow 等系统的一个具有挑战性的数据源。不可能知道哪个记录会被传递给哪个工作器,以及以什么顺序。更重要的是,在发生故障的情况下,重新传递可能会以不同的顺序将记录发送到不同的工作器!

Pub/Sub 为每条消息提供一个稳定的消息 ID,并且在重新传递时该 ID 将保持不变。Dataflow Pub/Sub 源将默认使用此 ID 来从 Pub/Sub 中删除重复项。(记录根据 ID 的哈希进行洗牌,因此重复的传递总是在同一个工作器上处理。)然而,在某些情况下,这还不够。用户的发布过程可能会重试发布,并因此将重复项引入 Pub/Sub。从该服务的角度来看,这些是唯一的记录,因此它们将获得唯一的记录 ID。Dataflow 的 Pub/Sub 源允许用户提供自己的记录 ID 作为自定义属性。只要发布者在重试时发送相同的 ID,Dataflow 就能够检测到这些重复项。

Beam(因此 Dataflow)为 Pub/Sub 提供了一个参考源实现。但是,请记住,这不是Dataflow 使用的,而是仅由非 Dataflow 运行器(如 Apache Spark,Apache Flink 和 DirectRunner)使用的实现。出于各种原因,Dataflow 在内部处理 Pub/Sub,并且不使用公共 Pub/Sub 源。

示例接收器:文件

流式运行器可以使用 Beam 的文件接收器(TextIOAvroIO和任何实现FileBasedSink的其他接收器)来持续将记录输出到文件。示例 5-3 提供了一个示例用例。

示例 5-3。窗口化文件写入
c.apply(Window.<..>into(FixedWindows.of(Duration.standardMinutes(1))))
 .apply(TextIO.writeStrings().to(new MyNamePolicy()).withWindowedWrites());

示例 5-3 中的片段每分钟写入 10 个新文件,其中包含该窗口的数据。MyNamePolicy是一个用户编写的函数,根据分片和窗口确定输出文件名。您还可以使用触发器,在这种情况下,每个触发器窗格将作为一个新文件输出。

这个过程是使用示例 5-3 中的模式的变体实现的。文件被写入临时位置,这些临时文件名通过GroupByKey发送到后续的转换。在GroupByKey之后是一个最终转换,它会将临时文件原子地移动到它们的最终位置。示例 5-4 中的伪代码提供了 Beam 中一致的流式文件接收器的实现草图。(有关更多详细信息,请参见 Beam 代码库中的FileBasedSinkWriteFiles。)

示例 5-4。文件接收器
c
  // Tag each record with a random shard id.
  .apply("AttachShard", WithKeys.of(new RandomShardingKey(getNumShards())))
  // Group all records with the same shard.
  .apply("GroupByShard", GroupByKey.<..>())
  // For each window, write per-shard elements to a temporary file. This is the 
  // non-deterministic side effect. If this DoFn is executed multiple times, it will
  // simply write multiple temporary files; only one of these will pass on through 
  // to the Finalize stage.
  .apply("WriteTempFile", ParDo.of(new DoFn<..> {
    @ProcessElement
     public void processElement(ProcessContext c, BoundedWindow window) {
       // Write the contents of c.element() to a temporary file.
       // User-provided name policy used to generate a final filename.
      c.output(new FileResult()).
    }
  }))
  // Group the list of files onto a singleton key.
  .apply("AttachSingletonKey", WithKeys.<..>of((Void)null))
  .apply("FinalizeGroupByKey", GroupByKey.<..>create())
  // Finalize the files by atomically renaming them. This operation is idempotent. 
  // Once this DoFn has executed once for a given FileResult, the temporary file 
  // is gone, so any further executions will have no effect. 
  .apply("Finalize", ParDo.of(new DoFn<..>, Void> {
    @ProcessElement
     public void processElement(ProcessContext c)  {
       for (FileResult result : c.element()) { 
         rename(result.getTemporaryFileName(), result.getFinalFilename());
       }
}}));

您可以看到WriteTempFile中的非幂等工作是如何完成的。在GroupByKey完成后,Finalize步骤将始终看到相同的捆绑包进行重试。因为文件重命名是幂等的,¹⁴这给了我们一个恰好一次的接收器。

示例接收器:Google BigQuery

Google BigQuery 是一个完全托管的云原生数据仓库。Beam 提供了 BigQuery 接收器,BigQuery 提供了支持极低延迟插入的流式插入 API。这个流式插入 API 允许您为每个记录标记插入一个唯一的 ID,并且 BigQuery 将尝试使用相同的 ID 过滤重复的插入。¹⁵为了使用这个功能,BigQuery 接收器必须为每条记录生成统计上唯一的 ID。它通过使用java.util.UUID包来实现这一点,该包生成统计上唯一的 128 位 ID。

生成随机的通用唯一标识符(UUID)是一个非确定性操作,因此我们必须在插入到 BigQuery 之前添加Reshuffle。这样做后,Dataflow 的任何重试都将始终使用相同的被洗牌的 UUID。对 BigQuery 的重复尝试插入将始终具有相同的插入 ID,因此 BigQuery 能够对其进行过滤。示例 5-5 中显示的伪代码说明了 BigQuery 接收器的实现方式。

示例 5-5。BigQuery 接收器
// Apply a unique identifier to each record
c
 .apply(new DoFn<> {
  @ProcessElement
  public void processElement(ProcessContext context) {
   String uniqueId = UUID.randomUUID().toString();
   context.output(KV.of(ThreadLocalRandom.current().nextInt(0, 50),
                                     new RecordWithId(context.element(), uniqueId)));
 }
})
// Reshuffle the data so that the applied identifiers are stable and will not change.
.apply(Reshuffle.<Integer, RecordWithId>of())
// Stream records into BigQuery with unique ids for deduplication.
.apply(ParDo.of(new DoFn<..> {
   @ProcessElement
   public void processElement(ProcessContext context) {
     insertIntoBigQuery(context.element().record(), context.element.id());
   }
 });

再次,我们将接收器分成一个非幂等步骤(生成随机数),然后是一个幂等步骤。

其他系统

现在我们已经详细解释了 Dataflow 的恰好一次,让我们将其与其他流行的流式系统的简要概述进行对比。每个系统以不同的方式实现恰好一次保证,并因此做出不同的权衡。

Apache Spark Streaming

Spark Streaming 使用微批处理架构进行连续数据处理。用户在逻辑上处理一个流对象;然而,在底层,Spark 将这个流表示为连续的一系列 RDD。¹⁶ 每个 RDD 都作为一个批次进行处理,Spark 依赖批处理的精确一次性特性来确保正确性;正如之前提到的,正确的批处理洗牌技术已经有一段时间了。这种方法可能会导致输出的延迟增加,特别是对于深层管道和高输入量,通常需要仔细调整才能实现所需的延迟。

Spark 假设操作都是幂等的,并且可能重放操作链直到当前图中的点。提供了一个检查点原语,可以导致一个 RDD 被实体化,从而保证该 RDD 之前的历史不会被重放。这个检查点功能是为了性能原因而设计的(例如,防止重放昂贵的操作);然而,您也可以使用它来实现非幂等的副作用。

Apache Flink

Apache Flink 还为流式管道提供了精确一次处理,但是它的方式与 Dataflow 或 Spark 不同。Flink 流式管道定期计算一致的快照,每个快照代表整个管道在一致时间点的状态。Flink 快照是逐步计算的,因此在计算快照时无需停止所有处理。这使得记录可以在系统中继续流动,同时进行快照,缓解了 Spark Streaming 方法的一些延迟问题。

Flink 通过向从源流出的数据流插入特殊编号的快照标记来实现这些快照。当每个算子接收到快照标记时,它执行特定的算法,使其将状态复制到外部位置,并将快照标记传播到下游算子。在所有算子执行完这个快照算法后,完整的快照就可用了。任何工作器故障都将导致整个管道从最后一个完整快照中回滚其状态。在途消息不需要包含在快照中。Flink 中的所有消息传递都是通过有序的基于 TCP 的通道完成的。任何连接故障都可以通过从最后一个良好序列号恢复连接来处理;¹⁷ 与 Dataflow 不同,Flink 任务是静态分配给工作器的,因此可以假定连接将从相同的发送方恢复,并重放相同的有效载荷。

由于 Flink 可能随时回滚到先前的快照,尚未在快照中的任何状态修改都必须被视为临时的。将数据发送到 Flink 管道外部世界的接收器必须等到快照完成,然后只发送包含在该快照中的数据。Flink 提供了一个 notifySnapshotComplete 回调,允许接收器在每个快照完成时得知,并发送数据。尽管这会影响 Flink 管道的输出延迟,¹⁸ 但这种延迟只在接收器处引入。实际上,这使得 Flink 在深层管道中的端到端延迟比 Spark 更低,因为 Spark 在管道的每个阶段都引入了批处理延迟。

Flink 的分布式快照是处理流式管道一致性的一种优雅方式;然而,对管道做出了一些假设。假设故障是罕见的,¹⁹ 因为故障的影响(回滚到先前的快照)是重大的。为了保持低延迟输出,还假设快照可以快速完成。尚不清楚这是否会在非常大的集群中引起问题,那里的故障率可能会增加,完成快照所需的时间也会增加。

实现也简化了,因为假设任务静态分配给工作程序(至少在单个快照时期内)。这个假设允许 Flink 在工作程序之间提供简单的一次性传输,因为它知道如果连接失败,相同的数据可以按顺序从同一个工作程序中拉取。相比之下,Dataflow 不断地在工作程序之间进行负载平衡(并且工作程序的集合不断增长和缩减),因此 Dataflow 无法做出这个假设。这迫使 Dataflow 实现一个更复杂的传输层,以提供一次性处理。

总结

总之,曾经被认为与低延迟结果不兼容的一次性数据处理是完全可能的——Dataflow 在不牺牲延迟的情况下高效地实现了这一点。这为流处理提供了更丰富的用途。

尽管本章重点介绍了 Dataflow 特定的技术,其他流处理系统也提供了一次性保证。Apache Spark Streaming 将流式管道作为一系列小批处理作业运行,依赖于 Spark 批处理运行器中的一次性保证。Apache Flink 使用 Chandy Lamport 分布式快照的变体来获得运行一致状态,并可以使用这些快照来确保一次性处理。我们鼓励您也了解这些其他系统,以便广泛了解不同的流处理系统的工作方式!

¹ 实际上,我们所知道的没有一个系统能够保证至少一次(或更好),包括所有其他 Beam 运行器。

² Dataflow 还提供了准确的批处理运行器;然而,在这个上下文中,我们专注于流式运行器。

³ Dataflow 优化器将许多步骤组合在一起,并仅在需要时添加洗牌。

⁴ 批处理管道也需要防范洗牌中的重复项。但是,在批处理中解决这个问题要容易得多,这就是为什么历史批处理系统会这样做而流式系统不会这样做的原因。使用微批处理架构的流式运行时,比如 Spark Streaming,将重复项检测委托给批处理洗牌器。

⁵ 我们非常小心确保这种检查点是高效的;例如,与底层键/值存储的特性密切相关的模式和访问模式优化。

⁶ 这不是用于窗口化的自定义用户提供的时间戳。相反,这是由发送工作程序分配的确定性处理时间时间戳。

⁷ 需要小心确保这个算法能够运行。每个发送者必须保证系统生成的时间戳严格递增,并且这个保证必须在工作重新启动时保持不变。

⁸ 从理论上讲,我们可以通过在一个桶中的时间戳达到阈值时才懒惰地构建 Bloom 过滤器来完全摒弃启动扫描。

⁹ 在撰写本文时,Apache Beam 提供了一个名为SplittableDoFn的新的、更灵活的 API。

¹⁰ 我们假设在我们读取文件时没有人恶意修改文件中的字节。

¹¹ 再次注意,SplittableDoFn API具有不同的方法。

¹² 使用requiresDedupping覆盖。

¹³ 请注意,这些确定性边界可能在某个时候在 Beam 模型中变得更加明确。其他 Beam 运行器在处理非确定性用户代码的能力上有所不同。

¹⁴ 只要在源文件不再存在时正确处理故障。

¹⁵ 由于服务的全局性质,BigQuery 不能保证所有重复项都被移除。用户可以定期对他们的表运行查询,以移除流式插入 API 没有捕捉到的任何重复项。有关更多信息,请参阅 BigQuery 文档。

¹⁶ 弹性分布式数据集;Spark 对分布式数据集的抽象,类似于 Beam 中的 PCollection。

¹⁷ 这些序列号是针对每个连接的,与快照时期编号无关。

¹⁸ 仅适用于非幂等的接收器。完全幂等的接收器不需要等待快照完成。

¹⁹ 具体来说,Flink 假设工作器故障的平均时间小于快照时间;否则,管道将无法取得进展。

第二部分:流和表

第六章:流与表

你已经到达了书中讨论流和表的部分。如果你还记得,在第一章中,我们简要讨论了数据的两个重要但正交的维度:基数构成。到目前为止,我们严格关注基数方面(有界与无界),并且忽略了构成方面(流与表)。这使我们能够了解无界数据集引入的挑战,而不用太担心真正驱动事物运作方式的底层细节。我们现在将扩展我们的视野,看看构成的增加维度给混合带来了什么。

虽然有点牵强,但一种思考这种方法转变的方式是将经典力学与量子力学的关系进行比较。你知道在物理课上他们教你一堆经典力学的东西,比如牛顿理论之类的,然后在你觉得你更或多少掌握了之后,他们告诉你那都是废话,经典物理只给你部分图景,实际上还有这个叫做量子力学的东西,它真正解释了更低层次的事物运作方式,但一开始让事情变得复杂并不合理,所以…哦等等…我们之间还没有完全协调好一切,所以只是眯着眼睛相信我们,总有一天一切都会有意义?嗯,这很像那个,只是你的大脑会疼得少一些,因为物理学比数据处理难得多,你也不必眯着眼睛假装一切都有意义,因为实际上最后一切都会美好地结合在一起,这真的很酷。

所以,舞台已经适当地设置好了,这一章的重点有两个:

  • 试图描述 Beam 模型(就像我们在书中描述的那样)与“流和表”理论(由Martin KleppmannJay Kreps等人普及,但实质上起源于数据库世界)之间的关系。事实证明,流和表理论很好地描述了 Beam 模型的底层概念。此外,当考虑如何将健壮的流处理概念清晰地集成到 SQL 中时,对它们之间的关系有一个清晰的理解尤为重要(这是我们在第八章中考虑的内容)。
  • 为了纯粹的乐趣而向你轰炸糟糕的物理学类比。写一本书是一项艰苦的工作;你必须在这里找到一点点小乐趣来继续前行。

流与表的基础知识或者说:流与表的相对论特殊理论

流和表的基本概念源自数据库世界。熟悉 SQL 的人可能熟悉表及其核心属性,大致总结为:表包含数据的行和列,每行都由某种键唯一标识,可以是显式的也可以是隐式的。

如果你回想一下大学数据库系统课程,你可能会记得大多数数据库的数据结构是追加日志。当事务被应用到数据库中的表时,这些事务被记录在日志中,日志的内容然后被串行应用到表中以实现这些更新。在流和表的命名法中,该日志实际上就是流。

从这个角度来看,我们现在明白了如何从流创建表:表只是应用于流中找到的更新事务日志的结果。但是我们如何从表创建流呢?本质上是相反的:流是表的更改日志。通常用于表到流转换的激励示例是物化视图。SQL 中的物化视图允许您在表上指定查询,然后数据库系统将其本身作为另一个一流表来实现。这个物化视图本质上是该查询的缓存版本,数据库系统确保它始终保持最新,因为源表的内容随时间演变。也许并不奇怪,物化视图是通过原始表的更改日志实现的;每当源表更改时,该更改都会被记录。然后数据库在物化视图查询的上下文中评估该更改,并将任何结果更改应用于目标物化视图表。

将这两点结合起来,并运用另一个值得怀疑的物理学类比,我们就得到了可以称之为流和表相对论的特殊理论:

流→表

随时间对更新流的聚合产生一个表。

表→流

随时间观察表的变化产生一个流。

这是一对非常强大的概念,它们对流处理世界的精心应用是 Apache Kafka 巨大成功的一个重要原因,这个生态系统是围绕这些基本原则构建的。然而,这些陈述本身并不够一般化,无法将流和表与 Beam 模型中的所有概念联系起来。为此,我们必须深入一点。

朝着流和表相对论的一般理论

如果我们想要将流/表理论与我们对 Beam 模型的所有了解联系起来,我们需要解决一些问题,具体来说:

  • 批处理如何融入其中?
  • 流与有界和无界数据集的关系是什么?
  • 四个“什么”、“哪里”、“何时”、“如何”问题如何映射到流/表世界?

当我们试图这样做时,对流和表有正确的心态将会有所帮助。除了理解它们之间的关系,如前面的定义所捕捉的那样,独立于彼此定义它们也是有启发性的。以下是一个简单的看待它的方式,将强调我们未来分析的一些内容:

  • 表是静态数据。
    这并不是说表在任何方面都是静态的;几乎所有有用的表在某种程度上都在不断变化。但在任何给定时间,表的快照提供了数据集的某种整体图片。在这方面,表充当数据随时间累积和观察的概念休息地。因此,静态数据。
  • 流是运动中的数据。
    表捕捉了在特定时间点数据集的整体视图,而流捕捉了数据随时间的演变。Julian Hyde 喜欢说流就像表的导数,而表就像流的积分,这对于数学思维的人来说是一个很好的思考方式。不管怎样,流的重要特征是它们捕捉了表内数据的固有运动,因为它改变了。因此,数据在运动。

尽管表和流密切相关,但重要的是要记住它们并不完全相同,即使有许多情况下,一个可能完全源自另一个。差异微妙但重要,我们将会看到。

批处理与流和表

现在我们已经做好了准备,让我们开始解决一些问题。首先,我们解决第一个问题,关于批处理。最后,我们会发现,关于流与有界和无界数据的关系的解决方案将自然而然地从第一个问题的答案中得出。这是巧合的一次得分。

MapReduce 的流和表分析

为了保持我们的分析相对简单,但又坚实具体,让我们看看传统MapReduce作业如何适应流/表世界。正如其名称所暗示的,MapReduce 作业表面上由两个阶段组成:Map 和 Reduce。然而,为了我们的目的,更深入地看待它并将其视为六个阶段是有用的:

MapRead

这个阶段消耗输入数据并对其进行一些预处理,使其成为映射的标准键值形式。

Map

这个阶段重复(和/或并行)从预处理输入中消耗一个键值对³,并输出零个或多个键值对。

MapWrite

这个阶段将具有相同键的 Map 阶段输出值组合在一起,并将这些键值对列表组写入(临时)持久存储。这样,MapWrite 阶段本质上是一个按键分组和检查点操作。

ReduceRead

这个阶段消耗保存的洗牌数据,并将它们转换成标准的键值对列表形式以便进行减少。

Reduce

这个阶段重复(和/或并行)消耗一个键及其关联的值记录列表,并输出零个或多个记录,所有这些记录都可以选择保持与相同键相关联。

ReduceWrite

这个阶段将 Reduce 阶段的输出写入输出数据存储。

请注意,MapWrite 和 ReduceRead 阶段有时被称为 Shuffle 阶段的一部分,但对于我们的目的,最好将它们视为独立的阶段。也值得注意的是,MapRead 和 ReduceWrite 阶段的功能如今更常被称为源和汇。然而,撇开这些,现在让我们看看这与流和表的关系。

Map 作为流/表

因为我们从静态⁴数据集开始并结束,所以很明显我们从一个表开始并以一个表结束。但在中间我们有什么?天真地,人们可能会认为中间都是表;毕竟,批处理(概念上)被认为是消耗和产生表。如果你把批处理作业看作是执行经典 SQL 查询的粗略类比,那感觉相对自然。但让我们更仔细地看看一步步发生了什么。

首先,MapRead 消耗一个表并产生某物。接下来,Map 阶段消耗了这个东西,所以如果我们想要了解它的性质,一个好的起点就是 Map 阶段的 API,它在 Java 中看起来像这样:

void map(KI key, VI value, Emit<KO, VO> emitter);

对于输入表中的每个键值对,map 调用将被重复调用。如果你觉得这听起来像输入表被消耗为记录流,那么你是对的。我们稍后会更仔细地看一下表是如何转换为流的,但现在,可以说 MapRead 阶段正在迭代输入表中的静态数据,并将它们以流的形式放入运动中,然后被 Map 阶段消耗。

接下来,Map 阶段消耗了这个流,然后做了什么?因为映射操作是逐元素转换,它并没有做任何会停止移动元素并使其休息的事情。它可能通过过滤一些元素或将一些元素分解成多个元素来改变流的有效基数,但在 Map 阶段结束后,这些元素仍然相互独立。因此,可以说 Map 阶段既消耗流,又产生流。

在 Map 阶段完成后,我们进入 MapWrite 阶段。正如我之前所指出的,MapWrite 通过键分组记录,然后以该格式将它们写入持久存储。实际上,在这一点上,写入的持久部分实际上并不是严格必要的,只要某个地方有持久性(即,如果上游输入被保存,并且在失败的情况下可以从中重新计算中间结果,类似于 Spark 对 Resilient Distributed Datasets(RDDs)采取的方法)。重要的是记录被组合到某种数据存储中,无论是在内存中,磁盘上,还是其他位置。这很重要,因为由于这个分组操作,以前在流中一个接一个地飞过的记录现在被带到由它们的键所指示的位置,从而允许每个键组积累,就像它们的同类兄弟姐妹到达一样。请注意,这与之前提供的流到表转换的定义有多么相似:随着时间的推移,对更新流的聚合产生了一个表。通过根据它们的键对记录进行分组,MapWrite 阶段使这些数据得到休息,从而将流转换回表。⁵酷!

现在我们已经完成了 MapReduce 的一半,所以,使用图 6-1,让我们回顾一下到目前为止我们所看到的内容。

我们已经通过三个操作从表转换为流,然后再转换回来。MapRead 将表转换为流,然后 Map(通过用户的代码)将其转换为新流,然后 MapWrite 将其转换回表。我们将发现 MapReduce 中的接下来的三个操作看起来非常相似,所以我会更快地通过它们,但我仍然想在途中指出一个重要的细节。

图 6-1。 MapReduce 中的映射阶段。表中的数据被转换为流,然后再转换回去。

将流/表减少

在 MapWrite 阶段之后,ReduceRead 本身相对不那么有趣。它基本上与 MapRead 相同,只是读取的值是值的单例列表,而不是单个值,因为 MapWrite 存储的数据是键/值列表对。但它仍然只是在表的快照上进行迭代,将其转换为流。这里没有什么新东西。

即使在这种情况下,Reduce 听起来可能很有趣,但实际上它只是一个有点特别的 Map 阶段,它恰好接收每个键的值列表,而不是单个值。因此,它仍然只是将单个(复合)记录映射为零个或多个新记录。这里也没有什么特别新的东西。

ReduceWrite 是一个有点值得注意的阶段。我们已经知道这个阶段必须将流转换为表,因为 Reduce 产生了一个流,最终输出是一个表。但是这是如何发生的呢?如果我告诉你,这是由于将前一阶段的输出键分组到持久存储中的直接结果,就像我们在 MapWrite 中看到的那样,你可能会相信我,直到你记得我之前指出的 Reduce 阶段的键关联是一个可选特性。启用了该特性,ReduceWrite 基本上 与 MapWrite 相同。⁶但是如果禁用了该特性,并且 Reduce 的输出没有关联的键,那么到底发生了什么来使这些数据得到休息呢?

要理解正在发生的事情,重新思考 SQL 表的语义是有用的。虽然经常建议,但并不严格要求 SQL 表具有唯一标识每行的主键。在无键表的情况下,插入的每一行都被视为新的、独立的行(即使其中的数据与表中的一个或多个现有行的数据相同),就像有一个隐式的 AUTO_INCREMENT 字段被用作键一样(顺便说一句,在大多数实现中,实际上就是这样的,即使在这种情况下,“键”可能只是一些从未公开或预期用作逻辑标识符的物理块位置)。这种隐式的唯一键分配正是在没有键数据的 ReduceWrite 中发生的。从概念上讲,仍然发生着按键分组操作;这就是将数据置于静止状态的原因。但是由于缺少用户提供的键,ReduceWrite 将每个记录都视为具有新的、以前从未见过的键,并有效地将每个记录与自身分组,再次导致数据处于静止状态。

看一下图 6-2,它显示了从流/表的角度看整个管道。你可以看到这是一个 TABLE → STREAM → STREAM → TABLE → STREAM → STREAM → TABLE 的序列。即使我们处理的是有界数据,即使我们正在进行传统意义上的批处理,实际上它只是在表面下进行流和表处理。

图 6-2。从流和表的角度看 MapReduce 中的 Map 和 Reduce 阶段

与批处理的调和

那么,这对我们的前两个问题有什么影响呢?

  1. Q:批处理如何适应流/表理论?A:非常好。基本模式如下:
  1. 表被完整地读取成为流。
  2. 流被处理成新的流,直到遇到分组操作。
  3. 分组将流转换为表。
  4. 步骤 a 到 c 重复,直到管道中没有阶段为止。
  1. Q: 流如何与有界/无界数据相关联?
    A: 从 MapReduce 示例中可以看出,流只是数据的运动形式,无论它们是有界的还是无界的。

从这个角度来看,很容易看出流/表理论与有界数据的批处理并不矛盾。事实上,它进一步支持了我一直在强调的观点,即批处理和流处理并没有那么不同:归根结底,它一直都是流和表。

有了这个,我们已经在通向流和表的一般理论的道路上了。但是为了清晰地总结,我们最后需要重新讨论流/表上下文中的四个什么/哪里/何时/如何问题,看看它们如何相关。

什么哪里何时如何在流和表的世界中

在本节中,我们将看看这四个问题中的每一个,看看它们如何与流和表相关。我们还将回答可能从上一节中挥之不去的任何问题,其中一个重要的问题是:如果分组是将数据置于静止状态的原因,那么“取消分组”的逆过程究竟是什么?稍后再说。但现在,让我们来看看转换。

什么:转换

在第三章中,我们了解到转换告诉我们管道正在计算的是什么;也就是说,它是在构建模型、计算总和、过滤垃圾邮件等。我们在前面的 MapReduce 示例中看到,六个阶段中的四个回答了什么问题:

  • Map 和 Reduce 都对输入流中的每个键/值或键/值列表对应用了管道作者的逐元素转换,分别产生了一个新的、转换后的流。
  • MapWrite 和 ReduceWrite 都根据上一阶段分配的键对输出进行分组(在可选的 Reduce 情况下可能是隐式的),这样做可以将输入流转换为输出表。

从这个角度来看,你可以看到从流/表理论的角度来看,基本上有两种what转换类型:

非分组

这些操作(正如我们在 Map 和 Reduce 中看到的)只是接受一系列记录,并在另一侧生成一系列新的转换记录。非分组转换的示例包括过滤器(例如,删除垃圾邮件消息)、扩展器(即,将较大的复合记录拆分为其组成部分)和变换器(例如,除以 100),等等。

分组

这些操作(正如我们在 MapWrite 和 ReduceWrite 中看到的)接受一系列记录,并以某种方式将它们组合在一起,从而将流转换为表。分组转换的示例包括连接、聚合、列表/集合累积、变更日志应用、直方图创建、机器学习模型训练等。

为了更好地了解所有这些是如何联系在一起的,让我们看一下图 2-2 的更新版本,我们首次开始研究转换。为了避免你跳回去看我们在谈论什么,示例 6-1 包含了我们正在使用的代码片段。

示例 6-1。求和管道
PCollection<String> raw = IO.read(...);
PCollection<KV<Team, Integer>> input = raw.apply(new ParseFn());
PCollection<KV<Team, Integer>> totals =
  input.apply(Sum.integersPerKey());

这个管道只是简单地读取输入数据,解析单个团队成员的分数,然后对每个团队的分数进行求和。它的事件时间/处理时间可视化看起来像图 6-3 中呈现的图表。

<assets/stsy_0603.mp4>

图 6-3。经典批处理的事件时间/处理时间视图

图 6-4 描述了随着时间推移,从流和表的角度呈现的管道的更顶层视图。

<assets/stsy_0604.mp4>

图 6-4。经典批处理的流和表视图

在这种可视化的流和表版本中,时间的流逝通过在处理时间维度(y 轴)向下滚动图形区域来体现。以这种方式呈现事物的好处在于,它非常清楚地指出了非分组和分组操作之间的差异。与我们以前的图表不同,在那些图表中,我省略了管道中除了Sum.integersByKey之外的所有初始转换操作,但在这里,我也包括了初始的解析操作,因为解析操作的非分组方面与求和的分组方面形成了鲜明对比。从这个角度来看,很容易看出两者之间的区别。非分组操作对流中的元素运动没有任何影响,因此在另一侧产生另一个流。相反,分组操作将流中的所有元素汇聚在一起,将它们相加得到最终的总和。因为这个示例是在有界数据上运行的批处理引擎上运行的,最终结果只有在输入结束后才会被发出。正如我们在第二章中指出的那样,这个示例对有界数据是足够的,但在无界数据的情况下太过限制,因为理论上输入永远不会结束。但它真的不够吗?

从图表的新流/表部分来看,如果我们所做的只是计算总和作为我们的最终结果(而不在管道中的下游实际上以任何其他方式转换这些总和),那么我们用分组操作创建的表中就有我们的答案,随着新数据的到来而不断演变。为什么我们不直接从那里读取我们的结果呢?

这正是那些支持流处理器作为数据库的人所要表达的观点⁸(主要是 Kafka 和 Flink 团队):在管道中进行分组操作时,实际上创建了一个包含该阶段输出值的表。如果这些输出值恰好是管道正在计算的最终结果,那么如果可以直接从该表中读取它们,就不需要在其他地方重新生成它们。除了在时间演变过程中提供快速和便捷的结果访问外,这种方法通过不需要在管道中添加额外的接收阶段来节省计算资源,通过消除冗余数据存储来节省磁盘空间,并且消除了构建前述接收阶段的任何工程工作的需要。⁹ 唯一的主要注意事项是,您需要小心确保只有数据处理管道有能力对表进行修改。如果表中的值可以在管道之外由外部修改而发生变化,那么关于一致性保证的所有赌注都将失效。

行业中有许多人一直在推荐这种方法,并且它正在被广泛应用于各种场景中。我们已经看到 Google 内部的 MillWheel 客户通过直接从基于 Bigtable 的状态表中提供数据来做同样的事情,而且我们正在为从 Google 内部使用的 C+±based Apache Beam 等效版本(Google Flume)中的管道外部访问状态添加一流支持;希望这些概念将来某一天能够真正地传递到 Apache Beam。

现在,如果从状态表中读取值是很好的,如果其中的值是您的最终结果。但是,如果您在管道下游有更多的处理要执行(例如,想象一下我们的管道实际上正在计算得分最高的团队),我们仍然需要一种更好的方式来处理无界数据,允许我们以更增量的方式将表转换回流。为此,我们将希望通过剩下的三个问题的旅程,从窗口化开始,扩展到触发,最后将其与累积结合起来。

在哪里:窗口化

正如我们从第三章所知,窗口化告诉我们在事件时间中分组发生的位置。结合我们之前的经验,我们也可以推断它必须在流到表转换中起到作用,因为分组是驱动表创建的原因。窗口化有两个方面与流/表理论相互作用:

窗口分配

这实际上意味着将记录放入一个或多个窗口中。

窗口合并

这就是使动态的、数据驱动类型的窗口(例如会话)成为可能的逻辑。

窗口分配的效果非常直接。当记录在概念上放置到窗口中时,窗口的定义基本上与该记录的用户分配的键结合起来,以在分组时创建一个隐式的复合键。¹⁰ 简单。

为了完整起见,让我们再次从第三章的原始窗口化示例中看一看,但从流和表的角度来看。如果你还记得,代码片段看起来有点像示例 6-2(这次没有省略解析)。

示例 6-2。求和管道
PCollection<String> raw = IO.read(...);
PCollection<KV<Team, Integer>> input = raw.apply(new ParseFn());
PCollection<KV<Team, Integer>> totals = input
  .apply(Window.into(FixedWindows.of(TWO_MINUTES)))
  .apply(Sum.integersPerKey());

原始可视化效果如图 6-5 所示。

<assets/stsy_0605.mp4>

图 6-5。批处理引擎上窗口化求和的事件时间/处理时间视图

现在,图 6-6 显示了流和表的版本。

<assets/stsy_0606.mp4>

图 6-6。批处理引擎上窗口化求和的流和表视图

正如你可能期望的那样,这看起来与图 6-4 非常相似,但表中有四个分组(对应数据占据的四个窗口),而不是只有一个。但与以前一样,我们必须等到有界输入结束后才能发出结果。我们将在下一节讨论如何处理无界数据,但首先让我们简要谈一下合并窗口。

窗口合并

接下来讨论合并,我们会发现窗口合并的影响比窗口分配更加复杂,但是当你考虑到需要发生的逻辑操作时,它仍然是直接的。当将流分组到可以合并的窗口时,该分组操作必须考虑到所有可能合并在一起的窗口。通常,这仅限于数据都具有相同键的窗口(因为我们已经确定窗口化修改了分组不仅仅是按键,还有键和窗口)。因此,系统实际上并不将键/窗口对视为一个平面复合键,而是将其视为分层键,用户分配的键是根,窗口是该根的子组件。当实际上将数据分组在一起时,系统首先按分层复合键的根(用户分配的键)进行分组。在按键分组后,系统可以继续在该键内按窗口进行分组(使用分层复合键的子组件)。按窗口进行分组就是窗口合并发生的地方。

从流和表的角度来看,有趣的是窗口合并如何改变最终应用于表的突变;也就是说,它如何修改了随时间指示表内容的更改日志。对于非合并窗口,每个新分组的元素都会导致对表的单个突变(将该元素添加到元素的键+窗口的组中)。对于合并窗口,分组新元素的操作可能导致一个或多个现有窗口与新窗口合并。因此,合并操作必须检查当前键的所有现有窗口,找出哪些窗口可以与新窗口合并,然后原子地删除旧未合并窗口并插入新合并窗口到表中。这就是为什么支持合并窗口的系统通常将原子性/并行性的单位定义为键,而不是键+窗口。否则,要提供正确性保证所需的强一致性将是不可能的(或者至少更加昂贵)。当你开始以这种细节水平来看待它时,你就会明白为什么让系统来处理窗口合并的麻烦事是多么美妙。要更近距离地了解窗口合并语义,我建议你参考“数据流模型”的 2.2.2 节。

归根结底,窗口化实际上只是对分组语义的轻微改变,这意味着它对流到表转换的语义也是轻微的改变。对于窗口分配,就像在分组时将窗口合并到隐式复合键中一样简单。当涉及窗口合并时,这个复合键更像是一个分层键,允许系统处理按键分组,找出该键内的窗口合并,然后原子地应用所有必要的突变到相应的表中。抽象层次的叠加真是太好了!

尽管如此,我们实际上还没有解决将表转换为流的问题,特别是在无界数据的情况下以更增量的方式进行。为此,我们需要重新审视触发器。

何时:触发器

我们在第三章学到,我们使用触发器来决定窗口的内容何时被实现(水印为某些类型的触发器提供了输入完整性的有用信号)。在数据被分组到窗口中之后,我们使用触发器来决定何时将这些数据发送到下游。在流/表术语中,我们了解到分组意味着流到表的转换。从那里,我们可以很容易地看到触发器是分组的补充;换句话说,这是我们之前所探索的“取消分组”操作。触发器是驱动表到流转换的东西。

在流/表术语中,触发器是应用于表的特殊程序,允许对表中的数据在响应相关事件时进行实现。以这种方式陈述,它们实际上听起来非常类似于经典数据库触发器。事实上,这里选择的名称并非巧合;它们本质上是相同的东西。当您指定触发器时,实际上是在随着时间的推移为状态表中的每一行编写代码。当触发器触发时,它会获取当前静止在表中的相应数据,并将它们置于运动中,产生一个新的流。

让我们回到我们的例子。我们将从第二章的简单的每记录触发器开始,该触发器在每次到达新记录时都会发出新的结果。该示例的代码和事件时间/处理时间可视化如示例 6-3 所示。图 6-7 呈现了结果。

示例 6-3. 每条记录重复触发
PCollection<String>> raw = IO.read(...);
PCollection<KV<Team, Integer>> input = raw.apply(new ParseFn());  
PCollection<KV<Team, Integer>> totals = input
  .apply(Window.into(FixedWindows.of(TWO_MINUTES))
               .triggering(Repeatedly(AfterCount(1))));
  .apply(Sum.integersPerKey());

<assets/stsy_0607.mp4>

图 6-7. 批处理引擎上窗口求和的流和表视图

与以前一样,每次遇到新记录时都会实现新的结果。以流和表类型的视图呈现,该图将类似于图 6-8。

<assets/stsy_0608.mp4>

图 6-8. 流引擎上每条记录触发的窗口求和的流和表视图

使用每条记录触发器的一个有趣的副作用是它在某种程度上掩盖了数据被静止的效果,因为它们随后立即被触发器重新置于运动中。即便如此,从分组中产生的聚合物件仍然静止在表中,而未分组的值流则从中流走。

为了更好地了解静止/运动关系,让我们跳过我们的触发示例,转到第二章的基本水印完整性流示例,该示例在完成时简单地发出结果(由于水印通过窗口末端)。该示例的代码和事件时间/处理时间可视化如示例 6-4 所示(请注意,我这里只显示了启发式水印版本,以便简洁和比较),图 6-9 说明了结果。

示例 6-4. 水印完整性触发器
PCollection<String> raw = IO.read(...);
PCollection<KV<Team, Integer>> input = raw.apply(new ParseFn());
PCollection<KV<Team, Integer>> totals = input
  .apply(Window.into(FixedWindows.of(TWO_MINUTES))
               .triggering(AfterWatermark()))
  .apply(Sum.integersPerKey());

<assets/stsy_0609.mp4>

图 6-9. 流引擎上带有启发式水印的窗口求和的事件时间/处理时间视图

由于在示例 6-4 中指定的触发器声明窗口应在水印通过它们时实现,系统能够在管道的无界输入变得越来越完整时以渐进的方式发出结果。在图 6-10 中的流和表版本中,它看起来就像您所期望的那样。

<assets/stsy_0610.mp4>

图 6-10. 带有启发式水印的窗口求和的流和表视图

在这个版本中,您可以非常清楚地看到触发器对状态表的取消分组效果。随着水印通过每个窗口的末尾,它将该窗口的结果从表中取出,并将其与表中的所有其他值分开,向下游传送。当然,我们仍然有之前的迟到数据问题,我们可以再次使用示例 6-5 中显示的更全面的触发器来解决。

示例 6-5。通过早期/准时/迟 API 进行早期、准时和迟触发
PCollection<String> raw = IO.read(...);
PCollection<KV<Team, Integer>> input = raw.apply(new ParseFn());
PCollection<KV<Team, Integer>> totals = input
  .apply(Window.into(FixedWindows.of(TWO_MINUTES))
               .triggering(
                 AfterWatermark()
                   .withEarlyFirings(AlignedDelay(ONE_MINUTE))
                   .withLateFirings(AfterCount(1))))
  .apply(Sum.integersPerKey());

事件时间/处理时间图看起来像图 6-11。

<assets/stsy_0611.mp4>

图 6-11。事件时间/处理时间视图的窗口求和在具有早期/准时/迟触发器的流引擎上

而流和表版本看起来像图 6-12 所示。

<assets/stsy_0612.mp4>

图 6-12。具有早期/准时/迟触发器的流引擎上窗口求和的流和表视图

这个版本更清楚地显示了触发器的取消分组效果,根据示例 6-6 中指定的触发器,将表的各个独立部分呈现为流的不断变化视图。

到目前为止,我们谈到的所有具体触发器的语义(事件时间、处理时间、计数、早期/准时/迟等复合触发器等)都符合我们从流/表视角看到的预期,因此不值得进一步讨论。然而,我们还没有花太多时间讨论触发器在经典批处理场景中的样子。现在我们了解了批处理管道的底层流/表拓扑结构是什么样子,这值得简要提及。

归根结底,在经典批处理场景中实际上只有一种类型的触发器:当输入完成时触发。对于我们之前看过的 MapReduce 作业的初始 MapRead 阶段,该触发器在概念上会立即为输入表中的所有数据触发,因为批处理作业的输入被假定为从一开始就是完整的。¹¹因此,该输入源表将被转换为单个元素的流,之后 Map 阶段可以开始处理它们。

在管道中间的表到流转换中,例如我们示例中的 ReduceRead 阶段,使用相同类型的触发器。然而,在这种情况下,触发器实际上必须等待表中的所有数据完成(即更常见地称为所有数据被写入洗牌),就像我们示例中的批处理管道在图 6-4 和 6-6 中等待输入结束之前发出最终结果一样。

鉴于经典批处理实际上总是使用输入数据完成触发器,您可能会问在批处理场景中作者指定的任何自定义触发器可能意味着什么。答案实际上是:这取决于情况。有两个值得讨论的方面:

触发器的保证(或缺乏保证)

大多数现有的批处理系统都是根据这种锁步读取-处理-分组-写入-重复的顺序进行设计的。在这种情况下,很难提供任何更精细的触发能力,因为它们可能会在管道的最终洗牌阶段才会表现出任何变化。然而,这并不意味着用户指定的触发器不会被尊重;触发器的语义是可以在适当的时候采用更低的共同分母。

例如,AfterWatermark触发器意味着在水印通过窗口结束时触发。 它不保证水印在触发时距离窗口结束有多远。 同样,AfterCount(N)触发器只保证在触发之前已处理至少 N个元素;N很可能是输入集中的所有元素。

请注意,触发器名称的巧妙措辞并不仅仅是为了适应经典的批处理系统,而是模型本身的一个非常必要的部分,考虑到触发的自然异步性和不确定性。 即使在经过精心调整的低延迟真正流式处理系统中,基本上不可能保证AfterWatermark触发器会在水印恰好任何给定窗口的结束时触发,除非在极端有限的情况下(例如,单台机器处理管道的所有数据,并且负载相对较小)。 即使您可以保证,真的有什么意义吗? 触发器提供了一种控制数据从表到流的流动的手段,仅此而已。

批处理和流处理的融合

根据我们在本文中学到的知识,应该清楚批处理和流处理系统之间的主要语义区别是触发表的增量能力。 但即使这也不是真正的语义区别,而更多的是延迟/吞吐量的权衡(因为批处理系统通常以更高的吞吐量换取更高的结果延迟)。

这可以追溯到我在“批处理和流处理效率差异”中所说的一些内容:今天批处理和流处理系统之间实际上没有太大的区别,除了效率差异(有利于批处理)和处理无界数据的自然能力(有利于流处理)。 我当时认为,这种效率差异很大程度上来自于更大的捆绑大小(明确地在延迟和吞吐量之间进行折衷)和更有效的洗牌实现(即,流→表→流转换)。 从这个角度来看,应该可以提供一个系统,它可以无缝地整合两者的优点:既可以自然地处理无界数据,又可以通过透明地调整捆绑大小、洗牌实现和其他实现细节来平衡延迟、吞吐量和成本之间的紧张关系,以满足广泛的用例。

这正是 Apache Beam 在 API 级别已经做到的。¹² 这里提出的论点是,在执行引擎级别也有统一的空间。 在这样的世界中,批处理和流处理将不再存在,我们将能够永远告别批处理流处理作为独立的概念。 我们将只有结合了两者最佳思想的通用数据处理系统,以提供特定用例的最佳体验。 某一天。

在这一点上,我们可以在触发部分插入叉子。 它完成了。 在我们全面了解 Beam 模型和流和表理论之间关系的过程中,我们只有一个更简短的停留:累积

如何:累积

在第二章中,我们了解到三种累积模式(丢弃、累积、累积和撤销¹³)告诉我们结果的细化如何与窗口在其生命周期内多次触发相关。 幸运的是,在这里与流和表的关系非常直接:

  • 丢弃模式要求系统在触发时要么丢弃窗口的先前值,要么保留先前值的副本并在下次窗口触发时计算增量¹⁴。(这种模式最好被称为增量模式。)
  • 累积模式不需要额外的工作;在触发时表中窗口的当前值就会被发出。(这种模式最好被称为值模式。)
  • 累积和撤回模式需要保留窗口中所有先前触发的(但尚未撤回)值的副本。在合并窗口(如会话)的情况下,先前值的列表可能会变得非常大,但对于干净地撤销先前触发的效果是至关重要的,因为新值不能简单地用于覆盖先前的值。(这种模式最好被称为值和撤回模式。)

流和表的可视化对累积模式的语义几乎没有额外的洞察力,因此我们不会在这里进行调查。

Beam 模型中流和表的整体视图

在解决了这四个问题之后,我们现在可以对 Beam 模型流水线中的流和表进行整体视图。让我们以我们的运行示例(团队得分计算流水线)为例,看看它在流和表级别的结构是什么样子。流水线的完整代码可能类似于示例 6-6(重复示例 6-4)。

示例 6-6。我们完整的分数解析流水线
PCollection<String> raw = IO.read(...);
PCollection<KV<Team, Integer>> input = raw.apply(new ParseFn());
PCollection<KV<Team, Integer>> totals = input
  .apply(Window.into(FixedWindows.of(TWO_MINUTES))
               .triggering(
                 AfterWatermark()
                   .withEarlyFirings(AlignedDelay(ONE_MINUTE))
                   .withLateFirings(AfterCount(1))))
  .apply(Sum.integersPerKey());

将其分解为由中间的PCollection类型分隔的阶段(我使用了更语义化的“类型”名称,如TeamUser Score,而不是真实类型,以便清楚地说明每个阶段发生了什么),你会得到类似于图 6-13 所示的东西。

图 6-13。团队得分总和流水线的逻辑阶段,带有中间的 PCollection 类型

当您实际运行此流水线时,它首先通过优化器,其工作是将此逻辑执行计划转换为经过优化的物理执行计划。每个执行引擎都是不同的,因此实际的物理执行计划将在运行程序之间有所不同。但是一个可信的计划可能看起来像图 6-14。

图 6-14。团队得分总和流水线的理论物理阶段,带有中间的 PCollection 类型

这里有很多事情要做,所以让我们逐一讨论。图 6-13 和 6-14 之间有三个主要区别,我们将讨论:

逻辑与物理操作

作为构建物理执行计划的一部分,底层引擎必须将用户提供的逻辑操作转换为引擎支持的一系列原始操作。在某些情况下,这些物理等价物看起来基本相同(例如Parse),而在其他情况下,它们则非常不同。

物理阶段和融合

在流水线中,将每个逻辑阶段作为完全独立的物理阶段执行通常是低效的(伴随着每个阶段之间的序列化、网络通信和反序列化开销)。因此,优化器通常会尝试将尽可能多的物理操作融合成单个物理阶段。

键、值、窗口和分区

为了更清楚地说明每个物理操作正在做什么,我已经注释了中间的PCollection,并注明了每个点的键、值、窗口和数据分区的类型。

现在让我们详细地走一遍每个逻辑操作,看看它在物理计划中是如何转换的,以及它们如何与流和表相关联:

ReadFromSource

除了与紧随其后的物理操作融合在一起(Parse),ReadFromSource的翻译中并没有太多有趣的事情发生。就我们目前数据的特征而言,因为读取基本上是消耗原始输入字节,我们基本上有没有键、没有窗口和没有(或随机的)分区的原始字符串。原始数据源可以是表(例如 Cassandra 表)或流(例如 RabbitMQ)或类似两者的东西(例如处于日志压缩模式的 Kafka)。但无论如何,从输入源读取的最终结果是一个流。

Parse

逻辑Parse操作也以相对直接的方式转换为物理版本。Parse从原始字符串中提取键(团队 ID)和值(用户得分)。这是一个非分组操作,因此它消耗的流仍然是另一侧的流。

Window+Trigger

这个逻辑操作分布在许多不同的物理操作中。首先是窗口分配,其中每个元素被分配到一组窗口中。这立即发生在AssignWindows操作中,这是一个非分组操作,只是用窗口注释流中的每个元素,使其属于的窗口,产生另一个流。

第二个是窗口合并,我们在本章前面学到的,作为分组操作的一部分发生。因此,它被沉入管道后面的GroupMergeAndCombine操作中。我们在下一个逻辑Sum操作时讨论该操作。

最后,有触发。触发发生在分组之后,是我们将由分组创建的表转换回流的方式。因此,它被沉入自己的操作中,跟随GroupMergeAndCombine

Sum

求和实际上是一个复合操作,由几个部分组成:分区和聚合。分区是一个非分组操作,以这样的方式重定向流中的元素,使得具有相同键的元素最终进入同一台物理机。分区的另一个词是洗牌,尽管这个术语有点过载,因为“Shuffle”在 MapReduce 意义上通常用来表示分区分组(排序,无论如何)。无论如何,分区在物理上改变了流,使其可以分组,但实际上并没有做任何事情来使数据真正停下来。因此,它是一个非分组操作,产生另一个流。

分区后是分组。分组本身是一个复合操作。首先是按键分组(由先前的按键分区操作启用)。接下来是窗口合并和按窗口分组,正如我们之前描述的那样。最后,因为求和在 Beam 中是作为CombineFn实现的(本质上是一个增量聚合操作),所以有组合,即当单个元素到达时将它们相加。具体细节对我们来说并不是非常重要的。重要的是,由于这显然是一个分组操作,我们的流链现在最终停留在一个包含随着时间演变的团队总分的表中。

WriteToSink

最后,我们有写操作,它接收触发产生的流(你可能还记得它被沉入GroupMergeAndCombine操作下面),并将其写入到我们的输出数据汇中。数据本身可以是表或流。如果是表,WriteToSink将需要执行某种分组操作来将数据写入表中。如果是流,就不需要分组(尽管可能仍然需要分区;例如,当写入类似 Kafka 的东西时)。

这里的重点不是物理计划中正在进行的一切的精确细节,而是 Beam 模型与流和表世界的整体关系。我们看到了三种类型的操作:非分组(例如,解析),分组(例如,GroupMergeAndCombine),和取消分组(例如,触发)。非分组操作总是消耗流并在另一侧产生流。分组操作总是消耗流并产生表。取消分组操作消耗表并产生流。这些见解以及我们一路学到的一切足以让我们制定关于 Beam 模型与流和表关系的更一般理论。

流和表相对论的一般理论

在调查了流处理、批处理、四个什么/哪里/何时/如何问题以及整个 Beam 模型与流和表理论的关系之后,现在让我们试图阐明流和表相对论的更一般定义。

流和表相对论的一般理论:

  • 数据处理管道(批处理和流处理)包括对这些表和流进行的操作
  • 静止的数据,作为数据积累和随时间观察的容器。
  • 运动中的数据,并编码了随时间演变的表的离散视图。
  • 操作作用于流或表并产生新的流或表。它们被分类如下:
  • 流 → 流:非分组(逐元素)操作
    对流应用非分组操作会改变流中的数据,同时保持它们在运动中,产生一个可能具有不同基数的新流。
  • 流 → 表:分组操作在流中对数据进行分组会使这些数据静止下来,产生一个随时间演变的
  • 窗口化将事件时间维度纳入这样的分组中。
  • 合并窗口会随时间动态组合,使它们能够根据观察到的数据重新塑造自己,并决定键保持原子性/并行化的单位,窗口作为该键内分组的子组件。
  • 表 → 流:取消分组(触发)操作在表中触发数据会将它们取消分组并投入运动,产生一个捕获表随时间演变的
  • 水印提供了相对于事件时间的输入完整性概念,这是一个有用的参考点,特别是在触发事件时间戳数据时,特别是从无界流中分组的数据。
  • 触发器的累积模式决定了流的性质,决定它是否包含增量或值,以及是否提供先前增量/值的撤销。
  • 表 → 表:(无)
    没有操作可以消耗表并产生表,因为数据不可能在不被投入运动的情况下从静止到静止。因此,对表的所有修改都是通过转换为流,然后再转换回来。

我喜欢这些规则的原因是它们很有道理。它们给人一种非常自然和直观的感觉,因此使人更容易理解数据如何通过一系列操作流动(或不流动)。它们规范了这样一个事实:在任何给定时间,数据存在于两种状态中的一种(流或表格),并且它们提供了简单的规则来推理这些状态之间的转换。它们通过展示窗口化只是每个人都本能理解的分组的轻微修改来揭示窗口化的神秘。它们强调了为什么分组操作通常是流处理中的一个难点(因为它们将流中的数据转化为表格),但也非常清楚地表明了需要哪些操作来解决这个问题(触发器;即非分组操作)。它们强调了在概念上批处理和流处理实际上是多么统一。

当我开始写这一章时,我并不完全确定最终会得到什么,但最终的结果比我想象的要令人满意得多。在接下来的章节中,我们将再次使用这种流和表格相对论的理论来指导我们的分析。每一次应用都会带来清晰和洞察力,否则这些洞察力将会更难获得。流和表格是最好的。

总结

在这一章中,我们首先建立了流和表格理论的基础。我们首先相对地定义了流和表格:

流 → 表格

随着时间的推移,对更新流的聚合会产生一个表格。

表格 → 流

随着时间的推移观察表格的变化会产生一个流。

接下来我们独立定义它们:

  • 表格是数据静止的。
  • 流是数据在运动中。

然后,我们从流和表格的角度评估了经典的 MapReduce 批处理计算模型,并得出结论,以下四个步骤描述了从这个角度进行的批处理:

  1. 表格被完整地读取以成为流。
  2. 流被处理成新的流,直到遇到分组操作。
  3. 分组将流转换为表格。
  4. 步骤 1 到 3 重复,直到管道中的操作用尽。

通过这种分析,我们能够看到流在批处理中和流处理中同样重要,以及数据是流的想法与所讨论的数据是有界还是无界是无关的。

接下来,我们花了很多时间考虑流和表格之间的关系,以及 Beam 模型提供的强大的、无序的流处理语义,最终得出了我们在前一节中列举的流和表格相对论的一般理论。除了流和表格的基本定义之外,该理论的关键见解是数据处理管道中有四(实际上只有三)种操作类型:

流 → 流

非分组(逐元素)操作

流 → 表格

分组操作

表格 → 流

非分组(触发)操作

表格 → 表格

(不存在)

通过这种方式对操作进行分类,可以轻松地理解数据如何随着时间在给定的管道中流动(或停留)。

最后,也许最重要的是,我们学到了这一点:当你从流和表格的角度看问题时,批处理和流处理在概念上实际上是一样的。有界或无界都无所谓。从头到尾都是流和表格。

¹ 如果你不是为了计算机科学而上大学,但你已经读到了这本书的这一部分,你很可能是 1)我的父母,2)受虐狂,或者 3)非常聪明(就记录而言,我并不意味着这些群体必然是互相排斥的;如果你能理解这一点,妈妈和爸爸,就自己想想吧!)。

2 请注意,在某些情况下,表本身可以接受时间作为查询参数,允许您向过去查看表的快照。

3 请注意,对于单个 mapper 观察到的两个连续记录的键,没有任何保证,因为尚未进行键分组。这里的键实际上只是为了让带键数据集以一种自然的方式被消费,如果输入数据没有明显的键,它们实际上都将共享一个全局的空键。

4 将批处理作业的输入称为“静态”可能有点过分。实际上,被消费的数据集在处理过程中可能会不断变化;也就是说,如果你直接从 HBase/Bigtable 表中读取在时间戳范围内的数据,这些数据并不保证是不可变的。但在大多数情况下,建议的方法是确保你以某种方式处理了输入数据的静态快照,任何偏离这一假设的情况都是自己的风险。

5 请注意,按键对流进行分组与简单地按键对流进行分区是有重要区别的,后者确保具有相同键的所有记录最终由同一台机器处理,但并不会使记录停止。它们仍然保持运动,因此继续作为流进行。分组操作更像是按键分区后写入适当分区的组,这是使它们停止并将流转换为表的原因。

6 一个巨大的区别,至少从实现的角度来看,是 ReduceWrite 知道键已经被 MapWrite 分组在一起,进一步知道 Reduce 无法改变键,因此它可以简单地累积减少值生成的输出,以便将它们分组在一起,这比 MapWrite 阶段所需的完整洗牌实现要简单得多。

7 另一种看待这个问题的方式是,有两种类型的表:可更新的和可追加的;这是 Flink 团队为他们的 Table API 所构建的方式。但即使这是捕捉到两种情况的观察语义的一个很好的直观方式,我认为它掩盖了实际发生的导致流变成表的基本本质;也就是分组。

8 尽管从这个例子中我们可以清楚地看到,这不仅仅是一个流处理的问题;如果批处理系统的状态表是全局可读的,你也可以得到相同的效果。

9 如果你的存储系统中还没有适合的接收器,这将特别痛苦;构建能够保证一致性的适当接收器是一个令人惊讶地微妙和困难的任务。

10 这也意味着,如果你将一个值放入多个窗口——例如滑动窗口——这个值在概念上必须被复制成多个独立的记录,每个窗口一个。即便如此,在某些情况下,底层系统可以智能地处理某些类型的重叠窗口,从而优化掉实际复制值的需要。例如,Spark 就为滑动窗口做到了这一点。

11 请注意,批处理管道中事物工作的这种高层概念视图掩盖了有效触发整个数据表的复杂性,特别是当该表足够大以至于需要多台机器来处理时。Beam 最近添加的 SplittableDoFn API 提供了一些关于涉及的机制的见解。

12 是的,如果你将批处理和流处理混合在一起,你就会得到 Beam,这也是这个名字最初的由来。真的。

13 这就是为什么你应该始终使用牛津逗号。

¹⁴请注意,在合并窗口的情况下,除了合并两个窗口的当前值以得到合并后的当前值之外,还需要合并这两个窗口的先前值,以便在触发时间后进行合并增量的计算。

第七章:持久状态的实际性

人们为什么写书?当你排除了创造的乐趣、对语法和标点的某种喜爱,也许偶尔的自恋,你基本上只剩下了捕捉本来是短暂的想法,以便将来可以重新访问。在非常高的层面上,我刚刚激发并解释了数据处理管道中的持久状态。

持久状态,确切地说,就是我们在第六章中讨论过的表,额外的要求是这些表要稳固地存储在相对不易丢失的介质上。存储在本地磁盘上是可以的,只要你不问你的网站可靠性工程师。存储在一组复制的磁盘上更好。存储在不同物理位置的一组复制的磁盘上更好。存储在内存中绝对不算数。存储在多台机器上的复制内存,配备 UPS 电源备份和现场发电机,也许可以算数。你明白了。

在本章中,我们的目标是做以下事情:

  • 激发管道内持久状态的需求
  • 看看管道内经常出现的两种隐式状态形式
  • 考虑一个现实世界的用例(广告转化归因),它本身不适合隐式状态,用它来激发一般显式持久状态管理的显著特点
  • 探索一个具体的状态 API 的实例,就像在 Apache Beam 中找到的那样

动机

首先,让我们更准确地激发持久状态。我们从第六章知道,分组是给我们提供表的东西。而我在本章开头提出的核心观点是正确的:持久化这些表的目的是捕获其中包含的本来是短暂的数据。但为什么这是必要的呢?

失败的必然性

这个问题的答案在处理无界输入数据的情况下最清楚,所以我们从那里开始。主要问题是处理无界数据的管道实际上是打算永远运行的。但永远运行是一个更具挑战性的服务级别目标,远远超出了这些管道通常执行的环境所能实现的。长时间运行的管道将不可避免地因为机器故障、计划维护、代码更改以及偶尔的配置错误命令而中断整个生产管道集群。为了确保它们可以在这些情况发生时恢复到中断之前的状态,长时间运行的管道需要某种持久的记忆来记录它们中断之前的位置。这就是持久状态的作用。

让我们在无界数据之外再扩展一下这个想法。这只在无界情况下才相关吗?批处理管道使用持久状态吗,为什么或为什么不?与我们遇到的几乎每一个批处理与流处理的问题一样,答案与批处理和流处理系统本身的性质无关(也许这并不奇怪,鉴于我们在第六章学到的东西),而更多地与它们历史上用于处理的数据集类型有关。

有界数据集本质上是有限大小的。因此,处理有界数据的系统(历史上是批处理系统)已经针对这种情况进行了调整。它们通常假设在失败时可以重新处理输入的全部内容。换句话说,如果处理管道的某个部分失败,如果输入数据仍然可用,我们可以简单地重新启动处理管道的适当部分,让它再次读取相同的输入。这被称为重新处理输入

他们可能还会假设失败不太频繁,因此会尽量少地进行持久化,接受在失败时重新计算的额外成本。对于特别昂贵的多阶段管道,可能会有某种每阶段全局检查点的方式,以更有效地恢复执行(通常作为洗牌的一部分),但这并不是严格要求,可能在许多系统中都不存在。

另一方面,无界数据集必须假定具有无限大小。因此,处理无界数据的系统(历史上的流处理系统)已经建立起来。它们从不假设所有数据都可用于重新处理,只假设其中的某个已知子集可用。为了提供至少一次或精确一次的语义,任何不再可用于重新处理的数据必须在持久检查点中得到考虑。如果最多一次是您的目标,您就不需要检查点。

归根结底,持久状态并不是批处理或流处理特有的。状态在这两种情况下都是有用的。只是在处理无界数据时,它变得至关重要,因此您会发现流处理系统通常提供更复杂的持久状态支持。

正确性和效率

考虑到失败的不可避免性和应对失败的需要,持久状态可以被视为提供两个东西:

  • 在处理暂时输入时,提供正确性的基础。在处理有界数据时,通常可以安全地假设输入会永远存在;¹对于无界数据,这种假设通常不符合现实。持久状态允许您保留必要的中间信息,以便在不可避免的情况发生时继续处理,即使您的输入源已经移动并且忘记了之前提供给您的记录。
  • 一种最小化重复工作和持久化数据的方式,作为应对失败的一部分。无论您的输入是暂时的,当您的管道遇到机器故障时,任何未在某个地方进行检查点的失败机器上的工作都必须重新进行。根据管道的性质和其输入,这在两个方面可能是昂贵的:重新处理期间执行的工作量以及存储以支持重新处理的输入数据量。
    最小化重复工作相对比较简单。通过在管道内部进行部分进度的检查点(计算的中间结果以及检查点时间内的当前输入位置),可以大大减少失败发生时重复工作的量,因为检查点之前的操作都不需要从持久输入中重新播放。最常见的是,这涉及到静态数据(即表),这就是为什么我们通常在表和分组的上下文中提到持久状态。但是也有流的持久形式(例如 Kafka 及其相关产品)可以起到这样的作用。
    最小化持久化数据量是一个更大的讨论,这将占据本章的相当大一部分。至少目前可以说,对于许多真实用例,与其记住管道中任何给定阶段的所有原始输入,通常实际上记住一些部分的中间形式更为实际,这些中间形式占用的空间比所有原始输入要少(例如,在计算平均值时,总和和值的计数比贡献到总和和计数的完整值列表更紧凑)。检查点这些中间数据不仅可以大大减少您需要在管道中任何给定点记住的数据量,而且还可以相应地减少从失败中恢复所需的重新处理量。
    此外,通过智能地对那些不再需要的持久状态进行垃圾回收(即已知已被管道完全处理的记录的状态),即使输入在技术上是无限的,也可以随着时间的推移将存储在给定管道的持久状态中的数据保持在可管理的大小,这样处理无界数据的管道就可以继续有效地运行,同时仍然提供强一致性保证,但不需要完全回忆管道的原始输入。

归根结底,持久状态实际上只是在数据处理管道中提供正确性和高效的容错的手段。在这两个方面所需的支持程度取决于管道输入的性质和正在执行的操作。无界输入往往需要比有界输入更多的正确性支持。计算昂贵的操作往往需要比计算廉价的操作更多的效率支持。

隐式状态

现在让我们开始谈论持久状态的实际情况。在大多数情况下,这基本上归结为在始终持久化一切(对一致性有利,对效率不利)和从不持久化任何东西(对一致性不利,对效率有利)之间找到合适的平衡。我们将从始终持久化一切的极端端点开始,并朝着另一个方向前进,看看如何在不损害一致性的情况下权衡实现复杂性以换取效率(因为通过从不持久化任何东西来牺牲一致性是一种简单的解决方案,对于一致性无关紧要的情况来说,但在其他情况下是不可选的)。与以前一样,我们使用 Apache Beam API 来具体地落实我们的讨论,但我们讨论的概念适用于今天存在的大多数系统。

此外,由于在原始输入中几乎没有可以减少大小的方法,除了可能压缩数据,我们的讨论重点是围绕在管道内进行分组操作时创建的中间状态表中数据的持久化方式。将多个记录聚合到某种复合形式中的固有性质将为我们提供机会,在实现复杂性的代价下获得效率上的收益。

原始分组

我们探索的第一步是在持续保持一切的极端端点,即在管道内进行最直接的分组实现:对输入进行原始分组。在这种情况下,分组操作通常类似于列表追加:每当新元素到达组时,它都会被追加到该组已见元素的列表中。

在 Beam 中,当您将GroupByKey转换应用于PCollection时,您将获得的正是这种状态。代表该PCollection的流在运动中被按键分组,以产生一个包含来自流的记录的静态表,²以相同键的值的列表分组在一起。这显示在GroupByKeyPTransform签名中,它声明输入为K/V对的PCollection,输出为K/Iterable<V>对的集合:

class GroupByKey<K, V> extends PTransform<
    PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>>>

每当表中的键+窗口触发时,它将为该键+窗口发出一个新的窗格,值为我们在前面签名中看到的Iterable<V>

让我们在示例 7-1 中看一个示例。我们将从示例 6-5 中的求和流水线(具有固定窗口和早期/准时/延迟触发)转换为使用原始分组而不是增量组合(我们稍后在本章中讨论)。我们首先对解析的用户/分数键值对应用GroupByKey转换。GroupByKey操作执行原始分组,产生一个具有用户和分数组的PCollection键值对。然后,我们通过使用一个简单的MapElements lambda 将每个可迭代的Integer相加,将Integer的所有值相加起来,将Iterable<Integer>转换为IntStream<Integer>并在其上调用sum

示例 7-1。通过早期/准时/延迟 API 进行早期、准时和延迟触发
PCollection<String> raw = IO.read(...);
PCollection<KV<Team, Integer>> input = raw.apply(new ParseFn());
PCollection<KV<Team, Integer>> groupedScores = input
  .apply(Window.into(FixedWindows.of(TWO_MINUTES))
               .triggering(
                 AfterWatermark()
                   .withEarlyFirings(AlignedDelay(ONE_MINUTE))
                   .withLateFirings(AfterCount(1))))
  .apply(GroupBy.<String, Integer>create());
PCollection<KV<Team, Integer>> totals = input
  .apply(MapElements.via((KV<String, Iterable<Integer>> kv) ->
    StreamSupport.intStream(
      kv.getValue().spliterator(), false).sum()));

观察这个流水线的运行,我们会看到类似于图 7-1 所示的情况。

<资产/stsy_0701.mp4>

图 7-1。通过窗口化和早期/准时/延迟触发的原始输入进行求和。原始输入被分组在一起,并通过 GroupByKey 转换存储在表中。在被触发后,MapElements lambda 将单个窗格内的原始输入相加,得出每个团队的得分。

与图 6-10 进行比较(该图使用了增量组合,稍后讨论),很明显可以看出这是更糟糕的。首先,我们存储了更多的数据:不再是每个窗口一个整数,而是现在存储了该窗口的所有输入。其次,如果我们有多个触发触发,我们会重复努力,重新对已经添加到以前触发触发的输入进行求和。最后,如果分组操作是我们将状态检查点到持久存储的地方,那么在机器故障时,我们必须重新计算表的任何重新触发的总和。这是大量重复的数据和计算。更好的做法是增量计算和检查点实际的总和,这是增量组合的一个例子。

增量组合

我们在将实现复杂性交换为效率的旅程中的第一步是增量组合。这个概念通过CombineFn类在 Beam API 中体现出来。简而言之,增量组合是一种自动状态,建立在用户定义的可结合和可交换的组合操作符之上(如果你不确定我所说的这两个术语是什么意思,我马上会更准确地定义它们)。虽然这对接下来的讨论并不是严格必要的,但 CombineFn API 的重要部分看起来像示例 7-2。

示例 7-2。来自 Apache Beam 的简化 CombineFn API
class CombineFn<InputT, AccumT, OutputT> {
    // Returns an accumulator representing the empty value.
    AccumT createAccumulator();
    // Adds the given input value into the given accumulator
    AccumT addInput(AccumT accumulator, InputT input);
    // Merges the given accumulators into a new, combined accumulator
    AccumT mergeAccumulators(Iterable<AccumT> accumulators);
    // Returns the output value for the given accumulator
    OutputT extractOutput(AccumT accumulator);
}

CombineFn接受类型为InputT的输入,可以将其组合成称为累加器的部分聚合,类型为AccumT。这些累加器本身也可以组合成新的累加器。最后,累加器可以转换为类型为OutputT的输出值。对于像平均值这样的东西,输入可能是整数,累加器可能是整数对(即Pair<输入总和,输入计数>),输出是表示组合输入的平均值的单个浮点值。

但是,这种结构给我们带来了什么?从概念上讲,增量组合的基本思想是,许多类型的聚合(求和、平均值等)表现出以下特性:

  • 增量聚合具有一个中间形式,它捕获了组合一组 N 个输入的部分进展,比这些输入本身的完整列表更紧凑(即CombineFn中的AccumT类型)。如前所述,对于平均值来说,这是一个总和/计数对。基本求和甚至更简单,它的累加器是一个单一的数字。直方图的累加器相对复杂,由桶组成,每个桶包含在某个特定范围内看到的值的计数。然而,在这三种情况下,表示 N 个元素聚合的累加器所占用的空间仍然明显小于原始 N 个元素本身所占用的空间,特别是当 N 的大小增长时。
  • 增量聚合对两个维度的排序都是漠不关心的:
  • 单个元素,意味着:
    COMBINE(a, b) == COMBINE(b, a)
  • 元素的分组,意味着:
    COMBINE(COMBINE(a, b), c) == COMBINE(a, COMBINE(b, c))
  • 这些属性分别被称为可交换性结合性。在一起,它们有效地意味着我们可以自由地以任意顺序和任意分组组合元素和部分聚合。这使我们能够通过两种方式优化聚合:
    增量化
    因为个别输入的顺序并不重要,我们不需要提前缓冲所有的输入,然后按照某种严格的顺序处理它们(例如,按事件时间顺序;注意,这仍然独立于按事件时间将元素洗牌到适当的事件时间窗口中进行聚合);我们可以在它们到达时逐个组合它们。这不仅极大地减少了必须缓冲的数据量(由于我们操作的第一个属性,即中间形式是部分聚合的更紧凑表示,而不是原始输入本身),而且还可以更均匀地分散计算负载的负担(与在缓冲完整输入集之后一次性聚合输入的负担相比)。
    并行化
    因为部分输入子组的组合顺序并不重要,我们可以任意分配这些子组的计算。更具体地说,我们可以将这些子组的计算分散到多台机器上。这种优化是 MapReduce 的Combiners(Beam 的CombineFn的起源)的核心。
    MapReduce 的 Combiner 优化对解决热键问题至关重要,其中对输入流进行某种分组计算的数据量太大,无法由单个物理机器合理处理。一个典型的例子是将高容量的分析数据(例如,流量到一个热门网站的网页浏览量)按相对较少的维度(例如,按浏览器系列:Chrome,Firefox,Safari 等)进行分解。对于流量特别高的网站,即使该机器专门用于计算统计数据,也很难在单台机器上计算任何单个网页浏览器系列的统计数据;流量太大,无法跟上。但是,通过类似求和这样的结合和交换操作,可以将初始聚合分布到多台机器上,每台机器计算一个部分聚合。然后,这些机器生成的部分聚合集合(其大小现在比原始输入小几个数量级)可以在单台机器上进一步组合在一起,得到最终的聚合结果。
    顺便说一句,这种并行化的能力还带来了一个额外的好处:聚合操作自然与合并窗口兼容。当两个窗口合并时,它们的值也必须以某种方式合并。对于原始分组来说,这意味着将两个完整的缓冲值列表合并在一起,其成本为 O(N)。但是对于CombineFn来说,这只是两个部分聚合的简单组合,通常是 O(1)的操作。

为了完整起见,再考虑一下示例 6-5,如示例 7-3 所示,它使用增量组合实现了一个求和管道。

示例 7-3。通过增量组合进行分组和求和,就像示例 6-5 中那样
PCollection<String> raw = IO.read(...);
PCollection<KV<Team, Integer>> input = raw.apply(new ParseFn());
PCollection<KV<Team, Integer>> totals = input
  .apply(Window.into(FixedWindows.of(TWO_MINUTES))
               .triggering(
                 AfterWatermark()
                   .withEarlyFirings(AlignedDelay(ONE_MINUTE))
                   .withLateFirings(AfterCount(1))))
  .apply(Sum.integersPerKey());

执行时,我们得到了我们在图 6-10 中看到的结果(在这里显示为图 7-2)。与图 7-1 相比,这显然是一个很大的改进,存储的数据量和执行的计算量都大大提高了效率。

<资产/stsy_0702.mp4>

图 7-2。通过增量组合进行分组和求和。在这个版本中,增量和被计算并存储在表中,而不是输入的列表,这些列表必须在以后独立地进行求和。

通过为分组操作提供更紧凑的中间表示,并放宽对排序的要求(在元素和子组级别),Beam 的CombineFn在实现复杂性方面进行了一定程度的折衷,以换取效率的提高。这样做,它为热键问题提供了一个清晰的解决方案,并且与合并窗口的概念相互配合。

然而,一个缺点是,您的分组操作必须适合相对受限的结构。这对于求和、平均值等来说都很好,但在许多真实世界的用例中,需要更一般的方法,这种方法允许对复杂性和效率的权衡进行精确控制。接下来我们将看看这种一般方法包括什么。

广义状态

尽管我们迄今为止看到的两种隐式方法都各有其优点,但它们在某一方面都存在不足:灵活性。原始分组方法要求您在处理整个组之前始终缓冲原始输入到分组操作,因此在途中无法部分处理一些数据;要么全部处理,要么不处理。增量组合方法专门允许部分处理,但限制了所处理的处理必须是可交换和可结合的,并且是逐个记录到达时发生的。

如果我们想要支持更广义的流式持久状态方法,我们需要更灵活的东西。具体来说,我们需要在三个方面灵活:

  • 数据结构的灵活性;也就是说,我们写入和读取数据的能力,以最适合和最有效的方式进行结构化。原始分组基本上提供了一个可附加的列表,而增量组合基本上提供了一个始终以其全部写入和读取的单个值。但是,我们可能希望以其他无数种方式来结构化我们的持久数据,每种方式都具有不同类型的访问模式和相关成本:映射、树、图、集合等等。支持各种持久数据类型对于效率至关重要。
    Beam 通过允许单个DoFn声明多个特定类型的状态字段来支持数据类型的灵活性。通过这种方式,逻辑上独立的状态片段(例如访问和印象)可以分别存储,并且语义上不同类型的状态(例如映射和列表)可以以符合其访问模式类型的方式进行访问。
  • 写入和读取的灵活性;也就是说,能够根据需要调整在任何给定时间写入或读取的数据量和类型,以实现最佳效率。归根结底,这意味着能够在任何给定时间点精确地写入和读取必要数量的数据:不多,也不少(并且尽可能并行)。
    这与前面的观点相辅相成,因为专用数据类型允许专注于特定类型的访问模式(例如,可以使用类似 Bloom 过滤器的东西来极大地减少在某些情况下读取的数据量)。但它不仅限于此;例如,允许多个大型读取并行分派(例如,通过 futures)。
    在 Beam 中,通过特定数据类型的 API 实现了灵活的粒度写入和读取,这些 API 提供了细粒度的访问能力,结合了异步 I/O 机制,可以将写入和读取批量处理以提高效率。
  • 处理调度的灵活性;也就是说,能够将特定类型的处理发生的时间与我们关心的两种时间域中的时间进展绑定在一起:事件时间的完整性和处理时间。触发器在这里提供了一定程度的灵活性,完整性触发器提供了一种将处理绑定到窗口结束时通过水印的方式,而重复更新触发器提供了一种将处理绑定到处理时间域中定期进展的方式。但对于某些用例(例如,某些类型的连接,对于这些连接,您不一定关心整个窗口的输入完整性,只关心连接中特定记录的事件时间之前的输入完整性),触发器的灵活性不够。因此,我们需要一个更通用的解决方案。
    在 Beam 中,通过定时器提供了灵活的处理调度。定时器是一种特殊类型的状态,它将支持的时间域(事件时间或处理时间)中的特定时间点与在达到该时间点时要调用的方法绑定。通过这种方式,特定的处理可以延迟到未来更合适的时间。

这三个特征之间的共同点是灵活性。一些特定的用例子集通过原始分组或增量组合的相对不灵活的方法得到了很好的服务。但是,当处理超出它们相对狭窄的专业领域时,这些选项通常表现不佳。当发生这种情况时,您需要全面通用状态 API 的强大灵活性,以便您可以最佳地定制持久状态的利用。

换个角度来看,原始分组和增量组合是相对高级的抽象,可以简洁地表达具有(至少在组合器的情况下)一些良好属性的管道。但有时,您需要降低级别以获得所需的行为或性能。这就是通用状态让您能够做到的。

案例研究:转化归因

为了看到这一点,现在让我们来看一个既不受原始分组也不受增量组合良好服务的用例:转化归因。这是广告界广泛使用的一种技术,用于提供有关广告效果的具体反馈。尽管相对容易理解,但它的一些多样化要求并不完全适合我们迄今考虑的两种类型的隐式状态。

想象一下,您有一个分析管道,监视网站的流量以及将流量引导到该网站的广告印象。目标是将显示给用户的特定广告归因于网站本身的某个目标的实现(通常可能远远超出初始广告着陆页面的许多步骤),例如注册邮件列表或购买物品。

图 7-3 显示了一个网站访问、目标和广告展示的示例集合,其中一个归因转化以红色突出显示。在无界、无序的数据流中建立转化归因需要跟踪到目前为止所见的展示、访问和目标。这就是持久状态的作用所在。

图 7-3. 示例转化归因

在这个图表中,用户在网站上浏览各种页面的过程被表示为一个图形。展示是向用户展示并被点击的广告,导致用户访问网站上的页面。访问代表在网站上查看的单个页面。目标是被确定为用户的期望目的地的特定访问页面(例如,完成购买或注册邮件列表)。转化归因的目标是识别导致用户在网站上实现某个目标的广告展示。在这个图中,有一个这样的转化以红色突出显示。请注意,事件可能以无序方式到达,因此图表中有事件时间轴和水印参考点,指示认为输入正确的时间。

构建一个强大的大规模归因管道需要投入大量精力,但有一些方面值得明确指出。我们尝试构建的任何这样的管道必须做到以下几点:

处理无序数据

由于网站流量和广告展示数据来自分别作为分布式收集服务的系统,这些数据可能以极其无序的方式到达。因此,我们的管道必须对这种无序性具有弹性。

处理大量数据

我们不仅必须假设这个管道将处理大量独立用户的数据,而且根据给定广告活动的规模和给定网站的受欢迎程度,我们可能需要存储大量的展示和/或流量数据,以便我们尝试建立归因的证据。例如,为了让我们能够建立跨越多个月活动的归因,存储 90 天的访问、展示和目标树⁵数据对于每个用户来说并不罕见。

防范垃圾邮件

考虑到涉及到金钱,正确性至关重要。我们不仅必须确保访问和展示被准确计算一次(通过简单使用支持有效一次处理的执行引擎,我们基本上可以得到这一点),而且还必须保护我们的广告商免受试图不公平收费的垃圾邮件攻击。例如,同一用户连续多次点击同一广告将作为多个展示到达,但只要这些点击在一定时间内发生(例如在同一天内),它们只能被归因一次。换句话说,即使系统保证我们会看到每个单独的展示一次,我们还必须在技术上不同但根据我们的业务逻辑应该被解释为重复的展示之间执行一些手动去重。

优化性能

最重要的是,由于这个管道的潜在规模,我们必须始终关注优化管道的性能。由于写入持久存储的固有成本,持久状态往往会成为这种管道的性能瓶颈。因此,我们之前讨论的灵活性特征对于确保我们的设计尽可能高效至关重要。

使用 Apache Beam 进行转化归因

现在我们理解了我们要解决的基本问题,并且心中有一些重要的要求,让我们使用 Beam 的 State 和 Timers API 来构建一个基本的转化归因转换。我们将像在 Beam 中编写任何其他DoFn一样编写这个,但我们将利用状态和计时器扩展,允许我们编写和读取持久状态和计时器字段。那些想要在真实代码中跟随的人可以在GitHub上找到完整的实现。

请注意,与 Beam 中的所有分组操作一样,State API 的使用范围限定为当前的键和窗口,窗口的生命周期由指定的允许延迟参数决定;在这个例子中,我们将在一个全局窗口内操作。并行性是按键线性化的,就像大多数DoFns一样。还要注意,为了简单起见,我们将省略手动回收超出我们 90 天视野范围的访问和印象,这对于保持持久状态不断增长是必要的。

首先,让我们为访问、印象、访问/印象联合(用于连接)和已完成的归因定义一些 POJO 类,如示例 7-4 所示。

示例 7-4。Visit、Impression、VisitOrImpression 和 Attribution 对象的 POJO 定义
@DefaultCoder(AvroCoder.class)
class Visit {
    @Nullable private String url;
    @Nullable private Instant timestamp;
    // The referring URL. Recall that we’ve constrained the problem in this
    // example to assume every page on our website has exactly one possible
    // referring URL, to allow us to solve the problem for simple trees
    // rather than more general DAGs.
    @Nullable private String referer;
    @Nullable private boolean isGoal;
    @SuppressWarnings("unused")
    public Visit() {
    }
    public Visit(String url, Instant timestamp, String referer,
                 boolean isGoal) {
  this.url = url;
  this.timestamp = timestamp;
  this.referer = referer;
  this.isGoal = isGoal;
    }
    public String url() { return url; }
    public Instant timestamp() { return timestamp; }
    public String referer() { return referer; }
    public boolean isGoal() { return isGoal; }
    @Override
    public String toString() {
        return String.format("{ %s %s from:%s%s }", url, timestamp, referer,
                             isGoal ? " isGoal" : "");
    }
}
@DefaultCoder(AvroCoder.class)
class Impression {
    @Nullable private Long id;
    @Nullable private String sourceUrl;
    @Nullable private String targetUrl;
    @Nullable private Instant timestamp;
    public static String sourceAndTarget(String source, String target) { 
        return source + ":" + target;
    }
    @SuppressWarnings("unused")
    public Impression() {
    }
    public Impression(Long id, String sourceUrl, String targetUrl,
                      Instant timestamp) {
        this.id = id;
  this.sourceUrl = sourceUrl;
  this.targetUrl = targetUrl;
  this.timestamp = timestamp;
    }
    public Long id() { return id; }
    public String sourceUrl() { return sourceUrl; }
    public String targetUrl() { return targetUrl; }
    public String sourceAndTarget() {
        return sourceAndTarget(sourceUrl, targetUrl);
    }
    public Instant timestamp() { return timestamp; }
    @Override
    public String toString() {
  return String.format("{ %s source:%s target:%s %s }",
                             id, sourceUrl, targetUrl, timestamp);
    }
}
@DefaultCoder(AvroCoder.class)
class VisitOrImpression {
    @Nullable private Visit visit;
    @Nullable private Impression impression;
    @SuppressWarnings("unused")
    public VisitOrImpression() {
    }
    public VisitOrImpression(Visit visit, Impression impression) {
  this.visit = visit;
  this.impression = impression;
    }
    public Visit visit() { return visit; }
    public Impression impression() { return impression; }
}
@DefaultCoder(AvroCoder.class)
class Attribution {
    @Nullable private Impression impression;
    @Nullable private List<Visit> trail;
    @Nullable private Visit goal;
    @SuppressWarnings("unused")
    public Attribution() {
    }
    public Attribution(Impression impression, List<Visit> trail, Visit goal) {
  this.impression = impression;
  this.trail = trail;
  this.goal = goal;
    }
    public Impression impression() { return impression; }
    public List<Visit> trail() { return trail; }
    public Visit goal() { return goal; }
    @Override
    public String toString() {
  StringBuilder builder = new StringBuilder();
  builder.append("imp=" + impression.id() + " " + impression.sourceUrl());
  for (Visit visit : trail) {
      builder.append(" → " + visit.url());
  }
  builder.append(" → " + goal.url());
  return builder.toString();
    }
}

接下来,我们定义一个 Beam DoFn来消耗一个扁平化的VisitImpression集合,以用户为键。反过来,它将产生一个Attribution集合。它的签名看起来像示例 7-5。

示例 7-5。用于我们的转化归因转换的 DoFn 签名
class AttributionFn extends DoFn<KV<String, VisitOrImpression>, Attribution>

DoFn中,我们需要实现以下逻辑:

  1. 将所有访问存储在一个以它们的 URL 为键的映射中,这样我们可以在追踪访问路径时轻松查找它们。
  2. 将所有印象存储在一个以它们所引用的 URL 为键的映射中,这样我们可以识别引发通往目标的印象。
  3. 每当我们看到一个恰好是目标的访问时,为目标的时间戳设置一个事件时间计时器。与此计时器关联的是一个执行目标归因的方法。这将确保只有在导致目标的输入完成后才进行归因。
  4. 因为 Beam 缺乏对动态计时器集的支持(当前所有计时器必须在管道定义时声明,尽管每个单独的计时器可以在运行时的不同时间点设置和重置),我们还需要跟踪我们仍然需要归因的所有目标的时间戳。这将允许我们为所有待处理目标的最小时间戳设置一个单一的归因计时器。在我们归因最早时间戳的目标之后,我们再次使用下一个最早目标的时间戳设置计时器。

现在让我们逐步实现。首先,我们需要在DoFn中声明所有状态和计时器字段的规范。对于状态,规范规定了字段本身的数据结构类型(例如,映射或列表)以及其中包含的数据类型和它们关联的编码器;对于计时器,它规定了关联的时间域。然后,每个规范都被分配一个唯一的 ID 字符串(通过@StateID/@TimerId注释),这将允许我们动态地将这些规范与后续的参数和方法关联起来。对于我们的用例,我们将定义(在示例 7-6 中)以下内容:

  • 两个用于访问和印象的MapState规范
  • 一个用于目标的SetState规范
  • 一个用于跟踪最小待处理目标时间戳的ValueState规范
  • 一个用于延迟归因逻辑的Timer规范
示例 7-6。状态字段规范
class AttributionFn extends DoFn<KV<String, VisitOrImpression>, Attribution> {
    @StateId("visits")
    private final StateSpec<MapState<String, Visit>> visitsSpec =
  StateSpecs.map(StringUtf8Coder.of(), AvroCoder.of(Visit.class));
    // Impressions are keyed by both sourceUrl (i.e., the query) and targetUrl
    // (i.e., the click), since a single query can result in multiple impressions.
    // The source and target are encoded together into a single string by the
    // Impression.sourceAndTarget method.
    @StateId("impressions")
    private final StateSpec<MapState<String, Impression>> impSpec =
  StateSpecs.map(StringUtf8Coder.of(), AvroCoder.of(Impression.class));
    @StateId("goals")
    private final StateSpec<SetState<Visit>> goalsSpec =
  StateSpecs.set(AvroCoder.of(Visit.class));
    @StateId("minGoal")
    private final StateSpec<ValueState<Instant>> minGoalSpec =
  StateSpecs.value(InstantCoder.of());
    @TimerId("attribution")
    private final TimerSpec timerSpec =
  TimerSpecs.timer(TimeDomain.EVENT_TIME);
... continued in Example 7-7 below ...

接下来,我们实现我们的核心@ProcessElement方法。这是每次新记录到达时都会运行的处理逻辑。正如前面所述,我们需要将访问和展示记录到持久状态中,并跟踪目标并管理将我们的归因逻辑绑定到事件时间完整性进展的定时器,由水印跟踪。对状态和定时器的访问是通过传递给我们的@ProcessElement方法的参数提供的,Beam 运行时使用@StateId@TimerId注解指示适当的参数调用我们的方法。逻辑本身相对简单,如示例 7-7 所示。

示例 7-7。@ProcessElement 实现
... continued from Example 7-6 above ...
@ProcessElement
public void processElement(
        @Element KV<String, VisitOrImpression> kv,
  @StateId("visits") MapState<String, Visit> visitsState,
  @StateId("impressions") MapState<String, Impression> impressionsState,
  @StateId("goals") SetState<Visit> goalsState,
  @StateId("minGoal") ValueState<Instant> minGoalState,
  @TimerId("attribution") Timer attributionTimer) {
    Visit visit = kv.getValue().visit();
    Impression impression = kv.getValue().impression();
    if (visit != null) {
  if (!visit.isGoal()) {
      LOG.info("Adding visit: {}", visit);
      visitsState.put(visit.url(), visit);
  } else {
      LOG.info("Adding goal (if absent): {}", visit);
      goalsState.addIfAbsent(visit);
      Instant minTimestamp = minGoalState.read();
      if (minTimestamp == null || visit.timestamp().isBefore(minTimestamp)) {
                LOG.info("Setting timer from {} to {}",
                         Utils.formatTime(minTimestamp),
                         Utils.formatTime(visit.timestamp()));
                attributionTimer.set(visit.timestamp());
    minGoalState.write(visit.timestamp());
      }
      LOG.info("Done with goal");
  }
    }
    if (impression != null) {
        // Dedup logical impression duplicates with the same source and target URL.
  // In this case, first one to arrive (in processing time) wins. A more
  // robust approach might be to pick the first one in event time, but that
        // would require an extra read before commit, so the processing-time
        // approach may be slightly more performant.
        LOG.info("Adding impression (if absent): {} → {}",
                 impression.sourceAndTarget(), impression);
  impressionsState.putIfAbsent(impression.sourceAndTarget(), impression);
    }
}
... continued in Example 7-8 below ...

请注意,这与我们对通用状态 API 中的三个期望功能的联系:

数据结构的灵活性

我们有地图、集合、值和定时器。它们使我们能够以对我们的算法有效的方式高效地操作我们的状态。

写入和读取粒度的灵活性

我们的@ProcessElement方法会为我们处理的每一个访问和展示调用一次。因此,我们需要尽可能地提高其效率。我们利用了进行细粒度的盲目写入,只针对我们需要的特定字段。我们在@ProcessElement方法中只在遇到新目标的罕见情况下从状态中读取。当我们这样做时,我们只读取一个整数值,而不触及(可能要大得多的)地图和列表。

处理调度的灵活性

由于定时器的存在,我们能够延迟我们复杂的目标归因逻辑(下面定义)直到我们确信已经收到了所有必要的输入数据,最大限度地减少重复工作并最大化效率。

在定义了核心处理逻辑后,让我们现在看看我们的最后一段代码,即目标归因方法。这个方法被注解为@TimerId,以标识它为在相应的归因定时器触发时执行的代码。这里的逻辑比@ProcessElement方法复杂得多:

  1. 首先,我们需要加载我们的访问和展示地图的全部内容,以及我们的目标集。我们需要地图来逆向穿越我们将要构建的归因路径,我们需要目标来知道我们正在归因的目标是由于当前定时器触发的结果,以及我们想要在未来安排归因的下一个待定目标(如果有的话)。
  2. 在加载了我们的状态之后,我们在一个循环中逐个处理这个定时器的目标:
  • 检查是否有任何展示将用户引荐到路径中的当前访问(从目标开始)。如果是,我们已经完成了这个目标的归因,可以跳出循环并发出归因路径。
  • 接下来检查是否有任何访问是当前访问的引荐者。如果是,我们在我们的路径中找到了一个反向指针,所以我们遍历它并重新开始循环。
  • 如果找不到匹配的展示或访问,我们有一个是有机达成的目标,没有相关的展示。在这种情况下,我们只需跳出循环,继续下一个目标,如果有的话。
  1. 在我们用于归因的目标列表用尽后,我们为列表中的下一个待定目标设置一个定时器(如果有的话),并重置相应的ValueState以跟踪最小的待定目标时间戳。

为了简洁起见,我们首先看一下核心目标归因逻辑,如示例 7-8 所示,它大致对应于前面列表中的第 2 点。

示例 7-8。目标归因逻辑
... continued from Example 7-7 above ...
private Impression attributeGoal(Visit goal,
         Map<String, Visit> visits,
         Map<String, Impression> impressions,
         List<Visit> trail) {
    Impression impression = null;
    Visit visit = goal;
    while (true) {
        String sourceAndTarget = Impression.sourceAndTarget(
            visit.referer(), visit.url());
        LOG.info("attributeGoal: visit={} sourceAndTarget={}",
                 visit, sourceAndTarget);
  if (impressions.containsKey(sourceAndTarget)) {
      LOG.info("attributeGoal: impression={}", impression);
      // Walked entire path back to impression. Return success.
      return impressions.get(sourceAndTarget);
  } else if (visits.containsKey(visit.referer())) {
      // Found another visit in the path, continue searching.
      visit = visits.get(visit.referer());
      trail.add(0, visit);
  } else {
      LOG.info("attributeGoal: not found");
      // Referer not found, trail has gone cold. Return failure.
      return null;
  }
    }
}
... continued in Example 7-9 below ...

代码的其余部分(省略了一些简单的辅助方法),处理初始化和获取状态,调用归因逻辑,并处理清理以安排任何剩余的待定目标归因尝试,看起来像示例 7-9。

示例 7-9。目标归因的整体@TimerId 处理逻辑
... continued from Example 7-8 above ...
@OnTimer("attribution")
public void attributeGoal(
        @Timestamp Instant timestamp,
  @StateId("visits") MapState<String, Visit> visitsState,
  @StateId("impressions") MapState<String, Impression> impressionsState,
  @StateId("goals") SetState<Visit> goalsState,
  @StateId("minGoal") ValueState<Instant> minGoalState,
  @TimerId("attribution") Timer attributionTimer,
  OutputReceiver<Attribution> output) {
    LOG.info("Processing timer: {}", Utils.formatTime(timestamp));
    // Batch state reads together via futures.
    ReadableState<Iterable<Map.Entry<String, Visit> > > visitsFuture
        = visitsState.entries().readLater();
    ReadableState<Iterable<Map.Entry<String, Impression> > > impressionsFuture
        = impressionsState.entries().readLater();
    ReadableState<Iterable<Visit>> goalsFuture = goalsState.readLater();
    // Accessed the fetched state.
    Map<String, Visit> visits = buildMap(visitsFuture.read());
    Map<String, Impression> impressions = buildMap(impressionsFuture.read());
    Iterable<Visit> goals = goalsFuture.read();
    // Find the matching goal
    Visit goal = findGoal(timestamp, goals);
    // Attribute the goal
    List<Visit> trail = new ArrayList<>();
    Impression impression = attributeGoal(goal, visits, impressions, trail);
    if (impression != null) {
  output.output(new Attribution(impression, trail, goal));
  impressions.remove(impression.sourceAndTarget());
    }
    goalsState.remove(goal);
    // Set the next timer, if any.
    Instant minGoal = minTimestamp(goals, goal);
    if (minGoal != null) {
  LOG.info("Setting new timer at {}", Utils.formatTime(minGoal));
  minGoalState.write(minGoal);
  attributionTimer.set(minGoal);
    } else {
  minGoalState.clear();
    }
}

这个代码块与通用状态 API 的三个期望功能非常相似,与@ProcessElement方法有一个显著的区别:

写入和读取粒度的灵活性

我们能够进行一次单一的粗粒度读取,加载所有地图和集合中的数据。这通常比单独加载每个字段或者更糟糕的是逐个加载每个字段元素要高效得多。这也显示了能够遍历从细粒度到粗粒度的访问粒度的重要性。

就是这样!我们实现了一个基本的转化归因流水线,以一种足够高效的方式在可观的规模上运行,并且使用了合理数量的资源。而且,最重要的是,它在面对无序数据时能够正常运行。如果您查看单元测试中使用的数据集,您会发现即使在这个小规模上也存在许多挑战:

  • 跟踪和归因于共享 URL 集合中的多个不同的转化。
  • 数据无序到达,特别是在处理时间上,目标到达(在访问和导致它们的印象之前),以及其他较早发生的目标。
  • 生成多个不同印象到不同目标 URL 的源 URL。
  • 物理上不同的印象(例如,对同一广告的多次点击)必须被去重为单个逻辑印象。
示例 7-10。用于验证转化归因逻辑的示例数据集
private static TestStream<KV<String, VisitOrImpression>> createStream() {
    // Impressions and visits, in event-time order, for two (logical) attributable
    // impressions and one unattributable impression.
    Impression signupImpression = new Impression(
  123L, "http://search.com?q=xyz",
  "http://xyz.com/", Utils.parseTime("12:01:00"));
    Visit signupVisit = new Visit(
  "http://xyz.com/", Utils.parseTime("12:01:10"),
  "http://search.com?q=xyz", false/*isGoal*/);
    Visit signupGoal = new Visit(
  "http://xyz.com/join-mailing-list", Utils.parseTime("12:01:30"),
  "http://xyz.com/", true/*isGoal*/);
    Impression shoppingImpression = new Impression(
  456L, "http://search.com?q=thing",
  "http://xyz.com/thing", Utils.parseTime("12:02:00"));
    Impression shoppingImpressionDup = new Impression(
  789L, "http://search.com?q=thing",
  "http://xyz.com/thing", Utils.parseTime("12:02:10"));
    Visit shoppingVisit1 = new Visit(
  "http://xyz.com/thing", Utils.parseTime("12:02:30"),
  "http://search.com?q=thing", false/*isGoal*/);
    Visit shoppingVisit2 = new Visit(
  "http://xyz.com/thing/add-to-cart", Utils.parseTime("12:03:00"),
  "http://xyz.com/thing", false/*isGoal*/);
    Visit shoppingVisit3 = new Visit(
  "http://xyz.com/thing/purchase", Utils.parseTime("12:03:20"),
  "http://xyz.com/thing/add-to-cart", false/*isGoal*/);
    Visit shoppingGoal = new Visit(
  "http://xyz.com/thing/receipt", Utils.parseTime("12:03:45"),
  "http://xyz.com/thing/purchase", true/*isGoal*/);
    Impression unattributedImpression = new Impression(
  000L, "http://search.com?q=thing",
  "http://xyz.com/other-thing", Utils.parseTime("12:04:00"));
    Visit unattributedVisit = new Visit(
  "http://xyz.com/other-thing", Utils.parseTime("12:04:20"),
  "http://search.com?q=other thing", false/*isGoal*/);
    // Create a stream of visits and impressions, with data arriving out of order.
    return TestStream.create(
  KvCoder.of(StringUtf8Coder.of(), AvroCoder.of(VisitOrImpression.class)))
  .advanceWatermarkTo(Utils.parseTime("12:00:00"))
  .addElements(visitOrImpression(shoppingVisit2, null))
  .addElements(visitOrImpression(shoppingGoal, null))
  .addElements(visitOrImpression(shoppingVisit3, null))
  .addElements(visitOrImpression(signupGoal, null))
  .advanceWatermarkTo(Utils.parseTime("12:00:30"))
  .addElements(visitOrImpression(null, signupImpression))
  .advanceWatermarkTo(Utils.parseTime("12:01:00"))
  .addElements(visitOrImpression(null, shoppingImpression))
  .addElements(visitOrImpression(signupVisit, null))
  .advanceWatermarkTo(Utils.parseTime("12:01:30"))
  .addElements(visitOrImpression(null, shoppingImpressionDup))
  .addElements(visitOrImpression(shoppingVisit1, null))
  .advanceWatermarkTo(Utils.parseTime("12:03:45"))
  .addElements(visitOrImpression(null, unattributedImpression))
  .advanceWatermarkTo(Utils.parseTime("12:04:00"))
  .addElements(visitOrImpression(unattributedVisit, null))
  .advanceWatermarkToInfinity();
}

还要记住,我们在这里处理的是相对受限的转化归因版本。一个完整的实现将有额外的挑战要处理(例如,垃圾收集,访问 DAG 而不是树)。无论如何,这个流水线提供了一个很好的对比,与原始分组和增量组合通常提供的不够灵活的方法相比。通过牺牲一定的实现复杂性,我们能够找到必要的效率平衡,而不会在正确性上妥协。此外,这个流水线突出了流处理更加命令式的方法,状态和定时器提供了这种方法(想想 C 或 Java),这是对窗口和触发器提供的更加功能性方法的一个很好的补充(想想 Haskell)。

总结

在本章中,我们仔细研究了为什么持久状态很重要,得出结论,它为长期运行的管道提供了正确性和效率的基础。然后,我们看了数据处理系统中遇到的两种最常见的隐式状态类型:原始分组和增量组合。我们了解到原始分组是简单直接的,但潜在地低效,而增量组合大大提高了对可交换和可结合操作的效率。最后,我们看了一个相对复杂但非常实际的用例(并通过 Apache Beam Java 实现),并用它来突出通用状态抽象中需要的重要特征:

  • 数据结构的灵活性,允许使用针对特定用例定制的数据类型。
  • 写入和读取粒度的灵活性,允许在任何时候写入和读取的数据量都可以根据用例进行调整,最小化或最大化 I/O。
  • 处理时间调度的灵活性,允许将某些处理部分延迟到更合适的时间点,例如当输入被认为在特定事件时间点上完整时。

¹ 某种定义下的“永远”,通常至少是“直到我们成功完成批处理管道的执行并且不再需要输入”。

² 请记住,Beam 目前不直接暴露这些状态表;您必须将它们触发回到流中,以观察它们的内容作为新的 PCollection。

³ 或者,正如我的同事肯·诺尔斯指出的,如果你把定义看作是集合之间的可交换性,那么三参数版本的可交换性实际上也足以暗示结合性:COMBINE(a, b, c) == COMBINE(a, c, b) == COMBINE(b, a, c) == COMBINE(b, c, a) == COMBINE(c, a, b) == COMBINE(c, b, a)。数学很有趣。

⁴ 而且,定时器是实现我们在第二章讨论的大部分完整性和重复更新触发器的基础特性,以及基于允许迟到的垃圾回收。

⁵ 由于网络浏览的特性,我们将要分析的访问路径是由 HTTP 引用字段链接的 URL 树。实际上,它们最终会成为有向图,但为了简单起见,我们假设我们网站上的每个页面都有来自该网站上确切一个其他引用页面的入站链接,从而产生一个更简单的树结构。泛化到图是树结构实现的自然扩展,这进一步强调了所提出的观点。

第八章:流 SQL

让我们谈谈 SQL。在本章中,我们将从中间某个地方开始,然后回到过去一点,以建立额外的背景,最后再回到未来,用一个漂亮的蝴蝶结来总结一切。想象一下,如果昆汀·塔伦蒂诺拥有计算机科学学位,并且非常兴奋地向世界讲述流 SQL 的精髓,所以他提出要和我一起幽灵写作这一章;有点像那样。当然没有暴力。

什么是流 SQL?

我认为这个问题在我们行业中已经困扰了几十年。公平地说,数据库界可能已经理解了答案的 99%。但我还没有看到一个真正有力和全面的流 SQL 定义,它包括了强大的流语义的全部广度。这就是我们将在这里尝试提出的,尽管假设我们现在已经走了 99.1%的路,这可能是傲慢的。一步一步。

不管怎样,我想提前指出,我们在本章中讨论的大部分内容在写作时仍然是纯粹假设的。本章和接下来的一章(涵盖流连接)都描述了流 SQL 可能的理想愿景。一些部分已经在 Apache Calcite、Apache Flink 和 Apache Beam 等系统中实现。许多其他部分在任何地方都没有实现。在这个过程中,我会尽量指出一些已经以具体形式存在的东西,但考虑到这是一个不断变化的目标,你最好的选择就是简单地查阅你感兴趣的特定系统的文档。

值得一提的是,这里提出的流 SQL 的愿景是 Calcite、Flink 和 Beam 社区之间合作讨论的结果。Calcite 的首席开发人员 Julian Hyde 长期以来一直提出了他对流 SQL 可能的样子的愿景。2016 年,Flink 社区的成员将 Calcite SQL 支持集成到 Flink 本身,并开始向 Calcite SQL 方言添加流特定功能,如窗口构造。然后,在 2017 年,所有三个社区开始了一场讨论,试图就 Calcite SQL 中用于强大流处理的语言扩展和语义达成一致。本章试图将该讨论中的想法概括为一个清晰而连贯的叙述,关于将流概念整合到 SQL 中,无论是 Calcite 还是其他方言。

关系代数

谈到 SQL 的流式处理意味着什么,重要的是要记住 SQL 的理论基础:关系代数。关系代数简单地描述了由命名、类型化元组组成的数据之间的关系的数学方式。在关系代数的核心是关系本身,它是这些元组的集合。在经典的数据库术语中,关系类似于表,无论是物理数据库表,SQL 查询的结果,视图(实体化或其他),等等;它是包含命名和类型化数据列的行的集合。

关系代数的一个更为关键的方面是其封闭性质:将关系代数中的任何运算符应用于任何有效的关系¹,总是产生另一个关系。换句话说,关系是关系代数的通用货币,所有运算符都将其作为输入并将其作为输出。

从历史上看,许多支持 SQL 中的流式处理的尝试都未能满足封闭性质。它们将流与经典关系分开处理,提供新的运算符来在两者之间转换,并限制可以应用于其中一个或另一个的操作。这显著提高了任何此类流式 SQL 系统的采用门槛:潜在用户必须学习新的运算符,并理解它们适用的地方,以及它们不适用的地方,并且同样需要重新学习在这个新世界中任何旧运算符的适用规则。更糟糕的是,这些系统中大多数仍然无法提供我们想要的完整流语义套件,比如对强大的无序处理和强大的时间连接支持(我们将在第九章中介绍后者)的支持。因此,我认为基本上不可能指出任何现有的流式 SQL 实现已经实现了真正广泛的采用。这些流式 SQL 系统的额外认知负担和受限能力确保它们仍然是一个小众企业。

为了真正将流式 SQL 引入前沿,我们需要一种方法,使流式处理在关系代数本身内成为一等公民,以便标准的关系代数可以自然地适用于流式和非流式用例。这并不是说流和表应该被视为完全相同的东西;它们绝对不是一样的,认识到这一点可以清晰地理解和掌握流/表关系的力量,我们很快就会看到。但是核心代数应该干净自然地适用于两个世界,只有在绝对必要的情况下才需要在标准关系代数之外进行最小的扩展。

时变关系

简而言之,我在本章开头提到的要点是:将流式处理自然地整合到 SQL 中的关键是扩展关系代数的核心数据对象,以表示一组数据随着时间的推移而不是在特定时间点的数据集。更简洁地说,我们需要的不是“特定时间点”的关系,而是“时变关系”。

但是时变关系是什么?让我们首先从经典关系代数的角度来定义它们,之后我们还将考虑它们与流和表理论的关系。

从关系代数的角度来看,时变关系实际上只是经典关系随时间的演变。要理解我的意思,想象一个由用户事件组成的原始数据集。随着用户生成新事件,数据集会不断增长和演变。如果你在特定时间观察这个集合,那就是一个经典关系。但是如果你观察这个集合随着时间的整体演变,那就是一个时变关系。

换句话说,如果经典关系就像是由 x 轴上具有命名和类型的列和 y 轴上的记录行组成的二维表,那么时变关系就像是具有 x 和 y 轴的三维表,但是还有一个额外的 z 轴,用来捕捉随时间变化的二维表的不同版本。随着关系的变化,关系的新快照被添加到 z 维度中。

让我们来看一个例子。想象一下我们的原始数据集是用户和分数;例如,来自手机游戏的每个用户的分数,就像本书中大部分其他例子一样。假设我们的例子数据集最终在特定时间点观察时看起来像这样,比如 12:07:

*12:07> SELECT * FROM UserScores;*
-------------------------
| Name  | Score | Time  |
-------------------------
| Julie | 7     | 12:01 |
| Frank | 3     | 12:03 |
| Julie | 1     | 12:03 |
| Julie | 4     | 12:07 |
-------------------------

换句话说,它记录了随时间到达的四个分数:12:01 时朱莉的 7 分,12:03 时弗兰克的 3 分和朱莉的第二个分数 1 分,最后 12:07 时朱莉的第三个分数 4 分(请注意,这里的“时间”列包含表示系统内记录的到达时间的处理时间戳;我们稍后会介绍事件时间戳)。假设这是该关系曾经到达的唯一数据,那么无论我们在 12:07 之后何时观察它,它看起来都像前面的表格。但如果我们在 12:01 观察关系,它会看起来像下面这样,因为那时只有朱莉的第一个分数到达了:

*12:01> SELECT * FROM UserScores;*
-------------------------
| Name  | Score | Time  |
-------------------------
| Julie | 7     | 12:01 |
-------------------------

如果我们在 12:03 再次观察它,弗兰克的分数和朱莉的第二个分数也会到达,所以关系会发展成这样:

*12:03> SELECT * FROM UserScores;*
-------------------------
| Name  | Score | Time  |
-------------------------
| Julie | 7     | 12:01 |
| Frank | 3     | 12:03 |
| Julie | 1     | 12:03 |
-------------------------

通过这个例子,我们可以开始对这个数据集的时间变化关系有所了解:它将捕捉关系随时间的整体演变。因此,如果我们在 12:07 或之后观察时间变化关系(或 TVR),它将看起来像下面这样(请注意使用假设的TVR关键字来表示我们希望查询返回完整的时间变化关系,而不是经典关系的标准时点快照):

*12:07> SELECT TVR * FROM UserScores;*
---------------------------------------------------------
|       [-inf, 12:01)       |       [12:01, 12:03)      |
| ------------------------- | ------------------------- |
| | Name  | Score | Time  | | | Name  | Score | Time  | |
| ------------------------- | ------------------------- |
| |       |       |       | | | Julie | 7     | 12:01 | |
| |       |       |       | | |       |       |       | |
| |       |       |       | | |       |       |       | |
| |       |       |       | | |       |       |       | |
| ------------------------- | ------------------------- |
---------------------------------------------------------
|       [12:03, 12:07)      |       [12:07, now)        |
| ------------------------- | ------------------------- |
| | Name  | Score | Time  | | | Name  | Score | Time  | |
| ------------------------- | ------------------------- |
| | Julie | 7     | 12:01 | | | Julie | 7     | 12:01 | |
| | Frank | 3     | 12:03 | | | Frank | 3     | 12:03 | |
| | Julie | 1     | 12:03 | | | Julie | 1     | 12:03 | |
| |       |       |       | | | Julie | 4     | 12:07 | |
| ------------------------- | ------------------------- |
---------------------------------------------------------

由于印刷/数字页面仍然受限于二维,我已经将第三维压缩成了二维关系网格。但你可以看到,时间变化关系本质上是一系列经典关系(从左到右,从上到下排序),每个关系捕捉了特定时间范围内关系的完整状态(根据定义,所有这些时间范围都是连续的)。

定义时间变化关系的重要之处在于,它们实际上只是一系列经典关系,每个关系在其自己的不相交(但相邻)时间范围内独立存在,每个时间范围捕捉了关系在其中关系未发生变化的时间段。这一点很重要,因为这意味着将关系运算符应用于时间变化关系等同于分别将该运算符应用于相应序列中的每个经典关系。再进一步,将关系运算符分别应用于与时间间隔相关联的一系列关系的结果,将始终产生具有相同时间间隔的相应关系序列。换句话说,结果是相应的时间变化关系。这个定义给我们带来了两个非常重要的属性:

  • 从经典关系代数中的完整运算符集在应用于时间变化关系时仍然有效,而且继续表现得正如你所期望的那样。
  • 当应用于时间变化关系时,关系代数的闭包性仍然保持完整。

或者更简洁地说,当应用于时间变化关系时,所有经典关系代数的规则仍然保持不变。这是非常重要的,因为这意味着我们用时间变化关系替代经典关系并没有以任何方式改变游戏的参数。一切继续按照经典关系领域的方式运作,只是在一系列经典关系而不是单个关系上。回到我们的例子,考虑一下我们原始数据集上的另外两个时间变化关系,都是在 12:07 之后观察到的。首先是使用WHERE子句的简单过滤关系:

*12:07> SELECT TVR * FROM UserScores WHERE Name = "Julie";*
---------------------------------------------------------
|       [-inf, 12:01)       |       [12:01, 12:03)      |
| ------------------------- | ------------------------- |
| | Name  | Score | Time  | | | Name  | Score | Time  | |
| ------------------------- | ------------------------- |
| |       |       |       | | | Julie | 7     | 12:01 | |
| |       |       |       | | |       |       |       | |
| |       |       |       | | |       |       |       | |
| ------------------------- | ------------------------- |
---------------------------------------------------------
|       [12:03, 12:07)      |       [12:07, now)        |
| ------------------------- | ------------------------- |
| | Name  | Score | Time  | | | Name  | Score | Time  | |
| ------------------------- | ------------------------- |
| | Julie | 7     | 12:01 | | | Julie | 7     | 12:01 | |
| | Julie | 1     | 12:03 | | | Julie | 1     | 12:03 | |
| |       |       |       | | | Julie | 4     | 12:07 | |
| ------------------------- | ------------------------- |
---------------------------------------------------------

正如你所期望的那样,这个关系看起来很像前面的关系,但弗兰克的分数被过滤掉了。尽管时间变化关系捕捉了记录数据集随时间演变所需的额外维度时间,但查询的行为与你对 SQL 的理解一样。

对于更复杂一些的情况,让我们考虑一个分组关系,我们将所有每个用户的得分相加,以生成每个用户的总体得分:

*12:07> SELECT TVR Name, SUM(Score) as Total, MAX(Time) as Time 
       FROM UserScores GROUP BY Name;*
---------------------------------------------------------
|       [-inf, 12:01)       |       [12:01, 12:03)      |
| ------------------------- | ------------------------- |
| | Name  | Total | Time  | | | Name  | Total | Time  | |
| ------------------------- | ------------------------- |
| |       |       |       | | | Julie | 7     | 12:01 | |
| |       |       |       | | |       |       |       | |
| ------------------------- | ------------------------- |
---------------------------------------------------------
|       [12:03, 12:07)      |       [12:07, now)        |
| ------------------------- | ------------------------- |
| | Name  | Total | Time  | | | Name  | Total | Time  | |
| ------------------------- | ------------------------- |
| | Julie | 8     | 12:03 | | | Julie | 12    | 12:07 | |
| | Frank | 3     | 12:03 | | | Frank | 3     | 12:03 | |
| ------------------------- | ------------------------- |
---------------------------------------------------------

再次,这个查询的时间变化版本的行为与你所期望的完全一样,序列中的每个经典关系简单地包含了每个用户得分的总和。而且,无论我们选择多么复杂的查询,结果总是与独立应用该查询到输入时间变化关系组成的相应经典关系相同。我无法强调这一点有多重要!

好吧,这一切都很好,但时间变化关系本身更多的是一个理论构想,而不是数据的实际、物理表现;很容易看出,它们可能会变得非常庞大和难以控制,对于频繁变化的大型数据集来说。为了了解它们如何实际与现实世界的流处理联系起来,让我们现在探讨时间变化关系与流和表理论之间的关系。

流和表

对于这个比较,让我们再次考虑一下我们之前看过的分组时间变化关系:

*12:07> SELECT TVR Name, SUM(Score) as Total, MAX(Time) as Time
       FROM UserScores GROUP BY Name;*
---------------------------------------------------------
|       [-inf, 12:01)       |       [12:01, 12:03)      |
| ------------------------- | ------------------------- |
| | Name  | Total | Time  | | | Name  | Total | Time  | |
| ------------------------- | ------------------------- |
| |       |       |       | | | Julie | 7     | 12:01 | |
| |       |       |       | | |       |       |       | |
| ------------------------- | ------------------------- |
---------------------------------------------------------
|       [12:03, 12:07)      |       [12:07, now)        |
| ------------------------- | ------------------------- |
| | Name  | Total | Time  | | | Name  | Total | Time  | |
| ------------------------- | ------------------------- |
| | Julie | 8     | 12:03 | | | Julie | 12    | 12:07 | |
| | Frank | 3     | 12:03 | | | Frank | 3     | 12:03 | |
| ------------------------- | ------------------------- |
---------------------------------------------------------

我们知道这个序列捕捉了关系随时间的完整历史。鉴于我们在第六章对表和流的理解,理解时间变化关系与流和表理论的关系并不太困难。

表非常简单:因为时间变化关系本质上是经典关系的序列(每个捕捉了特定时间点上的关系快照),而经典关系类似于表,将时间变化关系观察为表,简单地产生了观察时间点的关系快照。

例如,如果我们在 12:01 观察之前的分组时间变化关系作为表,我们将得到以下结果(注意使用另一个假设关键字TABLE,明确表示我们希望查询返回一个表):

*12:01> SELECT TABLE Name, SUM(Score) as Total, MAX(Time) as Time*
 *FROM UserScores GROUP BY Name;*
-------------------------
| Name  | Total | Time  |
-------------------------
| Julie | 7     | 12:01 |
-------------------------

在 12:07 观察到的结果将是预期的:

*12:07> SELECT TABLE Name, SUM(Score) as Total, MAX(Time) as Time*
 *FROM UserScores GROUP BY Name;*
-------------------------
| Name  | Total | Time  |
-------------------------
| Julie | 12    | 12:07 |
| Frank | 3     | 12:03 |
-------------------------

这里特别有趣的是,实际上 SQL 中已经支持了时间变化关系的想法。SQL 2011 标准提供了“时间表”,它存储了表随时间的版本历史(实质上是时间变化关系),以及一个AS OF SYSTEM TIME结构,允许您明确地查询和接收在您指定的任何时间点的时间表/时间变化关系的快照。例如,即使我们在 12:07 执行了我们的查询,我们仍然可以看到关系在 12:03 时是什么样子的:

*12:07> SELECT TABLE Name, SUM(Score) as Total, MAX(Time) as Time*
 *FROM UserScores GROUP BY Name AS OF SYSTEM TIME ‘12:03’;*
-------------------------
| Name  | Total | Time  |
-------------------------
| Julie | 8     | 12:03 |
| Frank | 3     | 12:03 |
-------------------------

因此,SQL 中已经有一些关于时间变化关系的先例。但我岔开了。这里的主要观点是,表在特定时间点捕捉时间变化关系的快照。大多数真实世界的表实现简单地跟踪我们观察到的实时时间;其他保留一些额外的历史信息,这在极限情况下等同于捕捉关系在时间上的整个历史的完整保真时间变化关系。

流畅有些不同。我们在第六章学到,它们也捕捉了表随时间的演变。但它们与我们迄今为止所看到的时间变化关系有些不同。它们不是在每次变化时整体捕捉整个关系的快照,而是捕捉导致这些快照的变化序列在时间变化关系中。这里微妙的差异在一个例子中变得更加明显。

作为一个提醒,再次回想一下我们的基准例子TVR查询:

*12:07> SELECT TVR Name, SUM(Score) as Total, MAX(Time) as Time
       FROM UserScores GROUP BY Name;*
---------------------------------------------------------
|       [-inf, 12:01)       |       [12:01, 12:03)      |
| ------------------------- | ------------------------- |
| | Name  | Total | Time  | | | Name  | Total | Time  | |
| ------------------------- | ------------------------- |
| |       |       |       | | | Julie | 7     | 12:01 | |
| |       |       |       | | |       |       |       | |
| ------------------------- | ------------------------- |
---------------------------------------------------------
|       [12:03, 12:07)      |       [12:07, now)        |
| ------------------------- | ------------------------- |
| | Name  | Total | Time  | | | Name  | Total | Time  | |
| ------------------------- | ------------------------- |
| | Julie | 8     | 12:03 | | | Julie | 12    | 12:07 | |
| | Frank | 3     | 12:03 | | | Frank | 3     | 12:03 | |
| ------------------------- | ------------------------- |
---------------------------------------------------------

现在,让我们观察我们的时变关系作为一个流在几个不同的时间点存在的情况。在每一步,我们将比较 TVR 在那个时间点的原始表格呈现与流到那个时间点的演变。为了看到我们的时变关系的流呈现是什么样子,我们需要引入两个新的假设关键词:

  • 一个STREAM关键词,类似于我已经介绍的TABLE关键词,表示我们希望查询返回一个事件流,捕捉时变关系随时间的演变。你可以把这看作是在时间上对关系应用每条记录触发器。
  • 一个特殊的Sys.Undo³列,可以从STREAM查询中引用,用于识别撤销的行。稍后会详细介绍。

因此,从 12:01 开始,我们将有以下情况:

*12:01> SELECT STREAM Name,* 
*12:01> SELECT TABLE Name,                          SUM(Score) as Total,*
 *SUM(Score) as Total,                      MAX(Time) as Time,*
 *MAX(Time) as Time                         Sys.Undo as Undo*
 *FROM UserScores GROUP BY Name;            FROM UserScores GROUP BY Name;*
-------------------------                 --------------------------------
| Name  | Total | Time  |                 | Name  | Total | Time  | Undo |
-------------------------                 --------------------------------
| Julie | 7     | 12:01 |                 | Julie | 7     | 12:01 |      |
-------------------------                 ........ [12:01, 12:01] ........

表格和流的呈现在这一点上几乎是相同的。除了Undo列(在下一个例子中会更详细地讨论),只有一个区别:表格版本在 12:01 时是完整的(通过下方的虚线表示关系的底部结束),而流版本仍然是不完整的,通过最后的省略号一样的一行点表示,标记了关系的开放尾部(未来可能会有额外的数据)以及迄今为止观察到的处理时间范围。实际上,如果在真实实现中执行,STREAM查询将无限期地等待额外的数据到达。因此,如果等到 12:03,STREAM查询将出现三行新数据。与 12:03 的新TABLE呈现进行比较:

*12:01> SELECT STREAM Name,* 
*12:03> SELECT TABLE Name,                          SUM(Score) as Total,*
 *SUM(Score) as Total,                      MAX(Time) as Time,*
 *MAX(Time) as Time                         Sys.Undo as Undo*
 *FROM UserScores GROUP BY Name;            FROM UserScores GROUP BY Name;*
-------------------------                 --------------------------------
| Name  | Total | Time  |                 | Name  | Total | Time  | Undo |
-------------------------                 --------------------------------
| Julie | 8     | 12:03 |                 | Julie | 7     | 12:01 |      |
| Frank | 3     | 12:03 |                 | Frank | 3     | 12:03 |      |
-------------------------                 | Julie | 7     | 12:03 | undo |
                                          | Julie | 8     | 12:03 |      |
                                          ........ [12:01, 12:03] ........

这里有一个值得讨论的有趣观点:为什么在流中有行新数据(Frank 的 3 和 Julie 的撤销-7 和 8),而我们原始数据集中只包含行(Frank 的 3 和 Julie 的 1)?答案在于我们观察到的是原始输入的聚合变化流;特别是,在 12:01 到 12:03 的时间段内,流需要捕捉关于 Julie 的聚合分数变化的两个重要信息:

  • 先前报告的总数为 7 是错误的。
  • 新的总数是 8。

这就是特殊的Sys.Undo列允许我们做的事情:区分普通行和以前报告的值的撤销行。⁴

STREAM查询的一个特别好的特性是,你可以开始看到所有这些与经典的在线事务处理(OLTP)表有关:这个查询的STREAM呈现基本上捕捉了一系列INSERTDELETE操作,你可以用它来在 OLTP 世界中随时间实现这个关系(实际上,当你考虑一下,OLTP 表本身本质上就是通过INSERTUPDATEDELETE的流来随时间变化的时变关系)。

现在,如果我们不关心流中的撤销,也完全可以不要求它们。在这种情况下,我们的STREAM查询将如下所示:

*12:01> SELECT STREAM Name,*
 *SUM(Score) as Total,*
 *MAX(Time) as Time*
 *FROM UserScores GROUP BY Name;*
------------------------- 
| Name  | Total | Time  |
------------------------- 
| Julie | 7     | 12:01 | 
| Frank | 3     | 12:03 |
| Julie | 8     | 12:03 |
.... [12:01, 12:03] .....

但显然了解完整的流是有价值的,所以我们将回到在我们的最后一个例子中包括Sys.Undo列。说到这一点,如果我们再等四分钟到 12:07,我们将在STREAM查询中看到另外两行新数据,而TABLE查询将继续像以前一样演变。

*12:01> SELECT STREAM Name,* 
*12:07> SELECT TABLE Name,                          SUM(Score) as Total,*
 *SUM(Score) as Total,                      MAX(Time) as Time,*
 *MAX(Time) as Time                         Sys.Undo as Undo*
 *FROM UserScores GROUP BY Name;            FROM UserScores GROUP BY Name;*
-------------------------                 --------------------------------
| Name  | Total | Time  |                 | Name  | Total | Time  | Undo |
-------------------------                 --------------------------------
| Julie | 12    | 12:07 |                 | Julie | 7     | 12:01 |      |
| Frank | 3     | 12:03 |                 | Frank | 3     | 12:03 |      |
-------------------------                 | Julie | 7     | 12:03 | undo |
                                          | Julie | 8     | 12:03 |      |
                                          | Julie | 8     | 12:07 | undo |
                                          | Julie | 12    | 12:07 |      |
                                          ........ [12:01, 12:07] ........

到目前为止,很明显STREAM版本的时变关系与表格版本非常不同:表格捕捉了特定时间点的整个关系快照,而流捕捉了关系随时间的个别变化的视图。有趣的是,这意味着STREAM呈现与我们原始的基于表格的 TVR 呈现有更多的共同点:

*12:07> SELECT TVR Name, SUM(Score) as Total, MAX(Time) as Time
       FROM UserScores GROUP BY Name;*
---------------------------------------------------------
|       -inf, 12:01)       |       [12:01, 12:03)      |
| ------------------------- | ------------------------- |
| | Name  | Total | Time  | | | Name  | Total | Time  | |
| ------------------------- | ------------------------- |
| |       |       |       | | | Julie | 7     | 12:01 | |
| |       |       |       | | |       |       |       | |
| ------------------------- | ------------------------- |
---------------------------------------------------------
|       [12:03, 12:07)      |       [12:07, now)        |
| ------------------------- | ------------------------- |
| | Name  | Total | Time  | | | Name  | Total | Time  | |
| ------------------------- | ------------------------- |
| | Julie | 8     | 12:03 | | | Julie | 12    | 12:07 | |
| | Frank | 3     | 12:03 | | | Frank | 3     | 12:03 | |
| ------------------------- | ------------------------- |
---------------------------------------------------------

事实上,可以说STREAM查询只是提供了与相应基于表的TVR查询中存在的整个数据历史的另一种呈现方式。STREAM呈现的价值在于它的简洁性:它仅捕捉了TVR中每个时间点关系快照之间的变化增量。序列表TVR呈现的价值在于它提供的清晰度:它以突出其与经典关系的自然关系的格式捕捉了关系随时间的演变,并在此过程中提供了对于在流媒体环境中的关系语义的简单和清晰定义,以及流媒体带来的额外时间维度。

STREAM和基于表的TVR呈现之间相似之处的另一个重要方面是它们在总体上编码的数据实际上是等价的。这涉及到流/表对偶的核心,即其支持者长期以来一直宣扬的观点:流和表⁶实际上只是同一概念的两个不同物理表现形式,取决于上下文,就像波和粒子对于光一样⁷:完整的时变关系同时是表和流;表和流只是同一概念的不同物理表现形式。

现在,重要的是要记住,只有在两个版本编码相同信息时,即当你有全保真度的表或流时,流/表的这种对偶才是真实的。然而,在许多情况下,全保真度是不切实际的。正如我之前所暗示的,无论是以流的形式还是表的形式编码时变关系的完整历史对于大型数据源来说都可能非常昂贵。流/表的表现形式通常在某种程度上是有损的。表通常只编码 TVR 的最新版本;支持时间或版本访问的表通常将编码的历史压缩到特定的时间点快照,并/或者清理比某个阈值更老的版本。同样,流通常只编码 TVR 演变的有限时间段,通常是最近历史的一部分。像 Kafka 这样的持久流可以编码 TVR 的全部内容,但这相对不常见,通常会通过垃圾回收过程丢弃一些阈值之前的数据。

这里的主要观点是流和表绝对是彼此的对偶,每种都是编码时变关系的有效方式。但在实践中,流/表的物理表现形式通常在某种程度上是有损的。这些部分保真度的流和表在总编码信息减少的情况下换取了一些好处,通常是减少资源成本。这些类型的权衡是重要的,因为它们通常是我们能够构建能够处理真正大规模数据源的管道的原因。但它们也使事情变得复杂,并需要更深入的理解才能正确使用。我们将在后面更详细地讨论这个话题,当我们涉及 SQL 语言扩展时。但在我们尝试推理 SQL 扩展之前,了解当今常见的 SQL 和非 SQL 数据处理方法中存在的偏见会很有用。

回顾:流和表的偏见

在许多方面,将强大的流支持添加到 SQL 中实质上是一种尝试将 Beam 模型的wherewhenhow语义与经典 SQL 模型的what语义合并的过程。但要做到干净利落地,并且保持对经典 SQL 的外观和感觉,需要理解这两种模型之间的关系。因此,就像我们在第六章中探讨了 Beam 模型与流和表理论的关系一样,现在我们将使用流和表理论作为比较的基础框架,探讨 Beam 模型与经典 SQL 模型的关系。通过这样做,我们将发现每个模型中存在的固有偏见,这将为我们提供一些洞察,以便以一种干净、自然的方式最好地将这两种模型结合起来。

Beam 模型:一种流偏向的方法

让我们从 Beam 模型开始,基于第六章的讨论。首先,我想讨论 Beam 模型相对于流和表的固有流偏向。

如果回想一下图 6-11 和 6-12,它们展示了我们在整本书中一直使用的一个示例——分数求和管道的两种不同视图:图 6-11 是逻辑的 Beam 模型视图,图 6-12 是物理的流和表导向视图。比较这两者有助于突出 Beam 模型与流和表的关系。但是通过将一个叠加在另一个上面,就像我在图 8-1 中所做的那样,我们可以看到关系的另一个有趣方面:Beam 模型固有的流偏向。

图 8-1. Beam 模型方法中的流偏向

在这张图中,我画了虚线连接逻辑视图中的变换与物理视图中对应的组件。以这种方式观察时显而易见的是,所有逻辑变换都由连接,即使涉及分组的操作(我们从第六章知道这会导致某处创建表)。在 Beam 术语中,这些变换是PTransforms,它们总是应用于PCollections以产生新的PCollections。这里的重要观点是,在 Beam 中,PCollections始终是。因此,Beam 模型是一种固有的流偏向数据处理方法:流是 Beam 管道中的通用货币(即使是批处理管道),而表始终被特别对待,要么在管道边缘抽象在源和汇处,要么在管道中的某个地方被隐藏在分组和触发操作之下。因为 Beam 以流为单位运行,任何涉及表的地方(源、汇以及任何中间分组/取消分组),都需要进行某种转换以隐藏底层表。Beam 中的这些转换看起来像这样:

  • 消费表的通常会硬编码表的触发方式;用户无法指定他们想要消费的表的自定义触发方式。源可能被编写为触发对表的每次新更新作为记录,它可能批量组合更新,或者在某个时间点上提供表中数据的单个有界快照。这实际上取决于对于给定源来说什么是实际可行的,以及源的作者试图解决的用例是什么。
  • 表的通常会硬编码它们输入流的方式。有时,这是以一种给用户一定控制的方式来完成的;例如,通过简单地按用户分配的键进行分组。在其他情况下,分组可能是隐式定义的;例如,通过在写入没有自然键的输入数据时在分片输出源上分组到一个随机物理分区号。与源一样,这实际上取决于给定汇的实际可行性,以及汇的作者试图解决的用例是什么。
  • 对于分组/取消分组操作,与源和汇点相反,Beam 为用户提供了完全灵活的方式将数据分组到表中,并将其取消分组为流。这是有意设计的。分组操作的灵活性是必要的,因为数据分组的方式是定义管道的算法的关键组成部分。取消分组的灵活性很重要,以便应用程序可以以适合手头用例的方式塑造生成的流。⁸

然而,这里有一个问题。从图 8-1 中可以看出,Beam 模型本质上偏向于流。因此,虽然可以直接将分组操作清晰地应用于流(这是 Beam 的GroupByKey操作),但该模型从不提供可以直接应用触发器的一等表对象。因此,触发器必须在其他地方应用。基本上有两个选择:

触发器的预声明

在管道中的某个点之前指定触发器的位置应用于它们实际应用的表。在这种情况下,您基本上是预先指定了在管道中遇到分组操作后稍后希望看到的行为。以这种方式声明时,触发器是向前传播的。

触发器声明后

在管道中的某个点指定触发器的位置之后,它们被应用的表。在这种情况下,您正在指定在声明触发器的地方希望看到的行为。以这种方式声明时,触发器是向后传播的。

因为触发器的后声明允许您在实际观察它的地方指定所需的行为,所以这更直观。不幸的是,Beam 目前(2.x 及更早版本)使用的是触发器的预声明(类似于窗口也是预声明的)。尽管 Beam 提供了许多应对表格隐藏的方法,但我们仍然面临一个事实,那就是必须在观察表格之前触发它们,即使该表格的内容确实是您想要消耗的最终数据。这是 Beam 模型目前存在的一个缺点,可以通过摆脱流中心模型,转向将流和表格视为一等实体的模型来解决。现在让我们来看一下 Beam 模型的概念对应:经典 SQL。## SQL 模型:以表为中心的方法与 Beam 模型的流为中心的方法相反,SQL 历来采用以表为中心的方法:查询应用于表,并且总是产生新的表。这类似于我们在第六章中看到的 MapReduce 的批处理模型,但是考虑到 Beam 模型的一个类似的具体示例会很有用。考虑以下非规范化的 SQL 表:

UserScores (user, team, score, timestamp)

它包含用户得分,每个得分都带有相应用户和他们所在团队的 ID。没有主键,因此可以假设这是一个仅追加的表,每行都隐式地由其唯一的物理偏移标识。如果我们想要从这个表中计算团队得分,我们可以使用类似于以下内容的查询:

SELECT team, SUM(score) as total    FROM UserScores    GROUP BY team;

当由查询引擎执行时,优化器可能会将此查询大致分解为三个步骤:

  1. 扫描输入表(即触发其快照)
  2. 将该表中的字段投影到团队和得分
  3. 按团队分组并求和得分

如果我们使用类似于图 8-1 的图表来查看这一点,它会看起来像图 8-2。SCAN操作将输入表触发为一个有界流,其中包含查询执行时表的内容的快照。该流被SELECT操作消耗,将四列输入行投影到两列输出行。作为一个非分组操作,它产生另一个流。最后,团队和用户得分的这个两列流进入GROUP BY,按团队分组成一个表,相同团队的得分被SUM在一起,产生了我们的输出表,团队及其对应的团队得分总数。

图 8-2。简单 SQL 查询中的表倾向

这是一个相对简单的例子,自然会以一个表结束,因此它实际上并不足以突出经典 SQL 中的表倾向。但是,我们可以通过简单地将这个查询的主要部分(投影和分组)拆分成两个单独的查询来找出更多的证据:

SELECT team, score
    INTO TeamAndScore
    FROM UserScores;
    SELECT team, SUM(score) as total
    INTO TeamTotals
    FROM TeamAndScore
    GROUP BY team;

在这些查询中,我们首先将UserScores表投影到我们关心的两列中,将结果存储在一个临时的TeamAndScore表中。然后我们按团队对该表进行分组,同时对得分进行求和。在将事物拆分成两个查询的管道后,我们的图表看起来像图 8-3 所示。

图 8-3。将查询分成两部分以揭示表倾向的更多证据

如果经典 SQL 将流作为一流对象暴露出来,你会期望第一个查询TeamAndScore的结果是一个流,因为SELECT操作消耗一个流并产生一个流。但是因为 SQL 的通用货币是表,它必须首先将投影流转换为表。并且因为用户没有指定任何显式的键来分组,它必须简单地按其标识(即附加语义,通常通过按每行的物理存储偏移量进行分组)分组键。

因为TeamAndScore现在是一个表,第二个查询必须在前面添加一个额外的SCAN操作,将表扫描回流,以便GROUP BY再次将其分组成表,这次是按团队分组,并将它们的个人得分总和在一起。因此,我们看到了两次隐式转换(从流到表,然后再次转回来),这是由于中间表的显式实现而插入的。

话虽如此,SQL 中的表并不总是显式的;隐式表也是存在的。例如,如果我们在带有GROUP BY语句的查询末尾添加一个HAVING子句,以过滤出得分低于某个阈值的团队,那么图表将会改变,看起来会像图 8-4 所示的样子。

图 8-4。最终带有 HAVING 子句的表倾向

通过添加HAVING子句,原来的用户可见的TeamTotals表现在是一个隐式的中间表。为了根据HAVING子句中的规则过滤表的结果,必须将该表触发为一个流,然后可以对该流进行过滤,然后该流必须隐式地重新分组成一个表,以产生新的输出表LargeTeamTotals

这里的重要观点是经典 SQL 中明显的表倾向。流总是隐式的,因此对于任何实现的流,都需要从/到表的转换。这种转换的规则可以大致分类如下:

输入表(即 Beam 模型术语中的源)

这些总是在特定时间点¹⁰(通常是查询执行时间)完全隐式触发,以产生一个包含该时间点表快照的有界流。这与经典的批处理得到的结果是相同的;例如,我们在第六章中看到的 MapReduce 案例。

输出表(即 Beam 模型术语中的接收器)

这些表要么是查询中最终分组操作创建的表的直接表现,要么是应用于查询的终端流的隐式分组(按行的某个唯一标识符)的结果,对于不以分组操作结束的查询(例如前面示例中的投影查询,或者GROUP BY后跟一个HAVING子句)。与输入一样,这与经典批处理中的行为相匹配。

分组/非分组操作

与 Beam 不同,这些操作只在一个维度上提供完全的灵活性:分组。而经典 SQL 查询提供了一整套分组操作(GROUP BYJOINCUBE等),它们只提供了一种隐式的非分组操作:在所有贡献数据都被合并后触发中间表的整体(再次强调,这与 MapReduce 中作为洗牌操作的隐式触发完全相同)。因此,SQL 在通过分组塑造算法方面提供了很大的灵活性,但在塑造查询执行过程中存在的隐式流方面基本上没有灵活性。

物化视图

考虑到经典 SQL 查询与经典批处理的相似性,可能会诱使人们认为 SQL 固有的表偏见只是 SQL 不以任何方式支持流处理的产物。但这样做将忽视一个事实,即数据库已经支持了一种特定类型的流处理很长一段时间:物化视图。物化视图是作为表物理材料化并随着时间由数据库保持更新的视图,源表发生变化时也会相应更新。请注意,这听起来与我们对时变关系的定义非常相似。物化视图的迷人之处在于,它为 SQL 增加了一种非常有用的流处理形式,而不会显著改变它的操作方式,包括其固有的表偏见。

例如,让我们考虑一下图 8-4 中的查询。我们可以将这些查询改为CREATE MATERIALIZED VIEW¹¹语句:

CREATE MATERIALIZED VIEW TeamAndScoreView AS
    SELECT team, score
    FROM UserScores;
    CREATE MATERIALIZED VIEW LargeTeamTotalsView AS
    SELECT team, SUM(score) as total
    FROM TeamAndScoreView
    GROUP BY team
    HAVING SUM(score) > 100;

通过这样做,我们将它们转换为连续的、持续的查询,以流式方式持续处理UserScores表的更新。即使如此,物化视图的物理执行图与一次性查询的执行图几乎完全相同;在查询执行过程中,流并没有被显式地转换为显式的一流对象来支持这种流式物化视图的概念。物理执行计划中唯一值得注意的变化是替换了不同的触发器:SCAN-AND-STREAM而不是SCAN,如图 8-5 所示。

图 8-5。物化视图中的表偏差

这个SCAN-AND-STREAM触发器是什么?SCAN-AND-STREAM开始时像SCAN触发器一样,将表的全部内容在某个时间点发射到流中。但它不会在此停止并声明流已完成(即有界),而是继续触发对输入表的所有后续修改,产生一个捕获表随时间演变的无界流。在一般情况下,这些修改不仅包括新值的INSERT,还包括先前值的DELETE和现有值的UPDATE(实际上被视为同时的DELETE/INSERT对,或者在 Flink 中称为undo/redo值)。

此外,如果我们考虑物化视图的表/流转换规则,唯一的真正区别是使用的触发器:

  • 输入表通过SCAN-AND-STREAM触发器隐式触发,而不是SCAN触发器。其他一切都与经典批处理查询相同。
  • 输出表与经典批处理查询处理方式相同。
  • 分组/取消分组操作与经典批处理查询相同,唯一的区别是使用SCAN-AND-STREAM触发器而不是SNAPSHOT触发器进行隐式取消分组操作。

通过这个例子,很明显可以看出 SQL 固有的表偏向不仅仅是 SQL 被限制在批处理中的产物:¹²物化视图使 SQL 能够执行一种特定类型的流处理,而不需要进行任何重大的方法变更,包括对表的固有偏向。经典 SQL 只是一个偏向表的模型,无论你是用它进行批处理还是流处理。

展望未来:朝着强大的流 SQL

我们现在已经看过了时变关系,表和流提供不同的时变关系呈现方式,以及 Beam 和 SQL 模型在流和表理论方面的固有偏见。那么这一切对我们意味着什么?也许更重要的是,我们需要在 SQL 中做出哪些改变或添加以支持强大的流处理?令人惊讶的答案是:如果我们有好的默认值,就不需要太多。

我们知道,关键的概念变化是用时变关系替换经典的瞬时关系。我们之前看到,这是一个非常无缝的替换,适用于已经存在的关系代数的全部范围,这要归功于保持关系代数的关键闭包性质。但我们也看到,直接处理时变关系通常是不切实际的;我们需要能够以我们两种更常见的物理表现形式:表和流进行操作。这就是一些简单的带有良好默认值的扩展发挥作用的地方。

我们还需要一些工具来稳健地推理时间,特别是事件时间。这就是时间戳、窗口和触发器等东西发挥作用的地方。但同样,明智的默认选择将是重要的,以最小化这些扩展在实践中的必要性。

很棒的是,我们实际上不需要比这更多。所以现在让我们最终花一些时间详细研究这两类扩展:流/表选择时间操作符

流和表选择

当我们通过时变关系示例工作时,我们已经遇到了与流和表选择相关的两个关键扩展。它们是我们在SELECT关键字之后放置的TABLESTREAM关键字,以指示我们对给定时变关系的期望物理视图。

*12:07> SELECT TABLE Name,                 12:01> SELECT STREAM Name*
 *SUM(Score) as Total,                      SUM(Score) as Total,*                      
 *MAX(Time)                                 MAX(Time)* 
*FROM UserScores                           FROM UserScores*
 *GROUP BY Name;                            GROUP BY Name;*
-------------------------                 -------------------------
| Name  | Total | Time  |                 | Name  | Total | Time  |
-------------------------                 -------------------------
| Julie | 12    | 12:07 |                 | Julie | 7     | 12:01 |
| Frank | 3     | 12:03 |                 | Frank | 3     | 12:03 |
-------------------------                 | Julie | 8     | 12:03 |
                                          | Julie | 12    | 12:07 |
                                          ..... [12:01, 12:07] ....

这些扩展相对直接,需要时易于使用。但是,关于流和表选择的真正重要的事情是选择好的默认值,以便在没有明确提供时使用。这样的默认值应该尊重 SQL 的经典、偏向表的行为,这是每个人都习惯的,同时在包括流的世界中也能直观地操作。它们也应该容易记住。这里的目标是帮助系统保持自然的感觉,同时大大减少我们必须使用显式扩展的频率。满足所有这些要求的默认值的好选择是:

  • 如果所有的输入都是,输出是*TABLE*。
  • 如果任何输入都是,输出是*STREAM*。

这里另外需要指出的是,这些时变关系的物理呈现只有在你想以某种方式使 TVR 物化时才是真正必要的,无论是直接查看它还是将其写入某个输出表或流。鉴于 SQL 系统在全保真度时变关系方面的运行,中间结果(例如WITH ASSELECT INTO语句)可以保持为系统自然处理的全保真度 TVR,无需将它们呈现为其他更有限的具体表现形式。

这就是流和表选择的全部内容。除了直接处理流和表的能力之外,如果我们想要在 SQL 中支持强大的、无序的流处理,我们还需要一些更好的工具来推理时间。现在让我们更详细地看看这些工具包含了什么。

时间操作符

强大的、无序处理的基础是事件时间戳:这个小的元数据片段捕获了事件发生的时间,而不是观察到它的时间。在 SQL 世界中,事件时间通常只是给定 TVR 的另一列数据,它在源数据中是本地存在的。在这个意义上,将记录的事件时间实现在记录本身中的想法是 SQL 自然地处理的,通过将时间戳放在一个常规列中。

在我们继续之前,让我们看一个例子。为了帮助将所有这些 SQL 的东西与我们之前在书中探讨过的概念联系起来,我们重新使用我们运行示例,将团队各成员的九个分数相加,得出团队的总分。如果你回忆一下,当这些分数在 X=事件时间/Y=处理时间轴上绘制时,看起来像图 8-6。

图 8-6. 我们运行示例中的数据点

如果我们把这些数据想象成一个经典的 SQL 表,它们可能看起来像这样,按事件时间排序(图 8-6 中从左到右):

*12:10> SELECT TABLE *, Sys.MTime as ProcTime
       FROM UserScores ORDER BY EventTime;*
------------------------------------------------
| Name  | Team  | Score | EventTime | ProcTime |
------------------------------------------------
| Julie | TeamX |     5 |  12:00:26 | 12:05:19 |
| Frank | TeamX |     9 |  12:01:26 | 12:08:19 |
| Ed    | TeamX |     7 |  12:02:26 | 12:05:39 |
| Julie | TeamX |     8 |  12:03:06 | 12:07:06 |
| Amy   | TeamX |     3 |  12:03:39 | 12:06:13 |
| Fred  | TeamX |     4 |  12:04:19 | 12:06:39 |
| Naomi | TeamX |     3 |  12:06:39 | 12:07:19 |
| Becky | TeamX |     8 |  12:07:26 | 12:08:39 |
| Naomi | TeamX |     1 |  12:07:46 | 12:09:00 |
------------------------------------------------

如果你回忆一下,我们在第二章的时候就看到了这张表,那时我第一次介绍了这个数据集。这个渲染提供了比我们通常展示的更多关于数据的细节,明确地突出了这九个分数属于七个不同用户,每个用户都是同一个团队的成员。在我们开始深入示例之前,SQL 提供了一个很好的、简洁的方式来看到数据的完整布局。

这种数据视图的另一个好处是,它完全捕获了每条记录的事件时间和处理时间。你可以想象事件时间列只是原始数据的另一部分,而处理时间列是系统提供的东西(在这种情况下,使用一个假设的Sys.MTime列记录给定行的处理时间修改时间戳;也就是说,记录本身进入系统的时间)。

SQL 的有趣之处在于它可以很容易地以不同的方式查看数据。例如,如果我们希望以处理时间顺序查看数据(图 8-6 中从下到上),我们可以简单地更新ORDER BY子句:

*12:10> SELECT TABLE *, Sys.MTime as ProcTime*
 *FROM UserScores ORDER BY ProcTime;*
-----------------------------------------------
| Name  | Team  | Score | EventTime | ProcTime |
-----------------------------------------------
| Julie | TeamX |     5 |  12:00:26 | 12:05:19 |
| Ed    | TeamX |     7 |  12:02:26 | 12:05:39 |
| Amy   | TeamX |     3 |  12:03:39 | 12:06:13 |
| Fred  | TeamX |     4 |  12:04:19 | 12:06:39 |
| Julie | TeamX |     8 |  12:03:06 | 12:07:06 |
| Naomi | TeamX |     3 |  12:06:39 | 12:07:19 |
| Frank | TeamX |     9 |  12:01:26 | 12:08:19 |
| Becky | TeamX |     8 |  12:07:26 | 12:08:39 |
| Naomi | TeamX |     1 |  12:07:46 | 12:09:00 |
------------------------------------------------

正如我们之前学到的,这些数据的表格渲染实际上是对完整底层 TVR 的部分保真视图。如果我们改为查询完整的面向表的TVR(但为了简洁起见,只查询三个最重要的列),它会扩展到像这样:

*12:10> SELECT TVR Score, EventTime, Sys.MTime as ProcTime
       FROM UserScores ORDER BY ProcTime;*
-----------------------------------------------------------------------
|         [-inf, 12:05:19)         |       [12:05:19, 12:05:39)       | 
| -------------------------------- | -------------------------------- | 
| | Score | EventTime | ProcTime | | | Score | EventTime | ProcTime | |
| -------------------------------- | -------------------------------- |
| -------------------------------- | |     5 |  12:00:26 | 12:05:19 | |
|                                  | -------------------------------- |
|                                  |                                  |
-----------------------------------------------------------------------
|       [12:05:39, 12:06:13)       |       [12:06:13, 12:06:39)       | 
| -------------------------------- | -------------------------------- | 
| | Score | EventTime | ProcTime | | | Score | EventTime | ProcTime | |
| -------------------------------- | -------------------------------- |
| |     5 |  12:00:26 | 12:05:19 | | |     5 |  12:00:26 | 12:05:19 | |
| |     7 |  12:02:26 | 12:05:39 | | |     7 |  12:02:26 | 12:05:39 | |
| -------------------------------- | |     3 |  12:03:39 | 12:06:13 | |
|                                  | -------------------------------- |
-----------------------------------------------------------------------
|       [12:06:39, 12:07:06)       |       [12:07:06, 12:07:19)       |
| -------------------------------- | -------------------------------- |
| | Score | EventTime | ProcTime | | | Score | EventTime | ProcTime | |
| -------------------------------- | -------------------------------- |
| |     5 |  12:00:26 | 12:05:19 | | |     5 |  12:00:26 | 12:05:19 | |
| |     7 |  12:02:26 | 12:05:39 | | |     7 |  12:02:26 | 12:05:39 | |
| |     3 |  12:03:39 | 12:06:13 | | |     3 |  12:03:39 | 12:06:13 | |
| |     4 |  12:04:19 | 12:06:39 | | |     4 |  12:04:19 | 12:06:39 | |
| -------------------------------- | |     8 |  12:03:06 | 12:07:06 | |
|                                  | -------------------------------- |
-----------------------------------------------------------------------
|       [12:07:19, 12:08:19)       |       [12:08:19, 12:08:39)       | 
| -------------------------------- | -------------------------------- | 
| | Score | EventTime | ProcTime | | | Score | EventTime | ProcTime | |
| -------------------------------- | -------------------------------- |
| |     5 |  12:00:26 | 12:05:19 | | |     5 |  12:00:26 | 12:05:19 | |
| |     7 |  12:02:26 | 12:05:39 | | |     7 |  12:02:26 | 12:05:39 | |
| |     3 |  12:03:39 | 12:06:13 | | |     3 |  12:03:39 | 12:06:13 | |
| |     4 |  12:04:19 | 12:06:39 | | |     4 |  12:04:19 | 12:06:39 | |
| |     8 |  12:03:06 | 12:07:06 | | |     8 |  12:03:06 | 12:07:06 | |
| |     3 |  12:06:39 | 12:07:19 | | |     3 |  12:06:39 | 12:07:19 | |
| -------------------------------- | |     9 |  12:01:26 | 12:08:19 | |
|                                  | -------------------------------- |
|                                  |                                  |
-----------------------------------------------------------------------
|       [12:08:39, 12:09:00)       |         [12:09:00, now)          |
| -------------------------------- | -------------------------------- |
| | Score | EventTime | ProcTime | | | Score | EventTime | ProcTime | |
| -------------------------------- | -------------------------------- |
| |     5 |  12:00:26 | 12:05:19 | | |     5 |  12:00:26 | 12:05:19 | |
| |     7 |  12:02:26 | 12:05:39 | | |     7 |  12:02:26 | 12:05:39 | |
| |     3 |  12:03:39 | 12:06:13 | | |     3 |  12:03:39 | 12:06:13 | |
| |     4 |  12:04:19 | 12:06:39 | | |     4 |  12:04:19 | 12:06:39 | |
| |     8 |  12:03:06 | 12:07:06 | | |     8 |  12:03:06 | 12:07:06 | |
| |     3 |  12:06:39 | 12:07:19 | | |     3 |  12:06:39 | 12:07:19 | |
| |     9 |  12:01:26 | 12:08:19 | | |     9 |  12:01:26 | 12:08:19 | |
| |     8 |  12:07:26 | 12:08:39 | | |     8 |  12:07:26 | 12:08:39 | |
| -------------------------------- | |     1 |  12:07:46 | 12:09:00 | |
|                                  | -------------------------------- |
-----------------------------------------------------------------------

这是很多数据。另外,STREAM版本在这种情况下会更紧凑地呈现;由于关系中没有显式的分组,它看起来与之前的点时间TABLE呈现基本相同,另外还有一个尾部描述了迄今为止流中捕获的处理时间范围,以及系统仍在等待流中的更多数据(假设我们将流视为无界;我们很快将看到流的有界版本):

*12:00> SELECT STREAM Score, EventTime, Sys.MTime as ProcTime FROM UserScores;*
--------------------------------
| Score | EventTime | ProcTime |
--------------------------------
|     5 |  12:00:26 | 12:05:19 |
|     7 |  12:02:26 | 12:05:39 |
|     3 |  12:03:39 | 12:06:13 |
|     4 |  12:04:19 | 12:06:39 |
|     8 |  12:03:06 | 12:07:06 |
|     3 |  12:06:39 | 12:07:19 |
|     9 |  12:01:26 | 12:08:19 |
|     8 |  12:07:26 | 12:08:39 |
|     1 |  12:07:46 | 12:09:00 |
........ [12:00, 12:10] ........

但这只是查看原始输入记录,没有任何形式的转换。当我们开始改变关系时,更有趣的是。在过去的探索中,我们总是从经典的批处理开始,对整个数据集的分数进行求和,所以让我们在这里也这样做。第一个示例管道(之前作为示例 6-1 提供)在 Beam 中看起来像示例 8-1。

示例 8-1. 求和管道
PCollection<String> raw = IO.read(...);
PCollection<KV<Team, Integer>> input = raw.apply(new ParseFn());
PCollection<KV<Team, Integer>> totals =
  input.apply(Sum.integersPerKey());

在世界的流和表视图中呈现,该流水线的执行看起来像图 8-7。

<assets/stsy_0807.mp4>

图 8-7. 经典批处理的流和表视图

鉴于我们已经将数据放入了适当的模式中,我们不会在 SQL 中进行任何解析;相反,我们专注于解析转换之后的所有流水线中的一切。因为我们采用的是传统的批处理模型,在处理完所有输入数据之后才会检索单个答案,所以求和关系的TABLESTREAM视图看起来基本相同(请记住,对于这些初始的批处理样例,我们处理的是数据集的有界版本;因此,这个STREAM查询实际上以一行短横线和一个END-OF-STREAM标记终止):

*12:10> SELECT TABLE SUM(Score) as Total, MAX(EventTime),*
 *MAX(Sys.MTime) as "MAX(ProcTime)" FROM UserScores GROUP BY Team;*
------------------------------------------
| Total | MAX(EventTime) | MAX(ProcTime) |
------------------------------------------
|    48 |       12:07:46 |      12:09:00 |
------------------------------------------
*12:00> SELECT STREAM SUM(Score) as Total, MAX(EventTime),*
 *MAX(Sys.MTime) as "MAX(ProcTime)" FROM UserScores GROUP BY Team;*
------------------------------------------
| Total | MAX(EventTime) | MAX(ProcTime) |
------------------------------------------
|    48 |       12:07:46 |      12:09:00 |
------ [12:00, 12:10] END-OF-STREAM ------

更有趣的是当我们开始将窗口加入到混合中时。这将让我们有机会更仔细地查看需要添加到 SQL 中以支持鲁棒流处理的时间操作。

何处:窗口

正如我们在第六章中学到的,窗口是对按键分组的修改,其中窗口成为分层键的次要部分。与经典的程序化批处理一样,你可以通过简单地将时间作为GROUP BY参数的一部分,很容易地在现有的 SQL 中将数据窗口化。或者,如果所涉及的系统提供了,你可以使用内置的窗口操作。我们马上看一下两者的 SQL 示例,但首先,让我们重新访问第三章中的程序化版本。回想一下例子 6-2,窗口化的 Beam 流水线看起来就像例子 8-2 中所示的那样。

例子 8-2. 求和流水线
PCollection<String> raw = IO.read(...);
PCollection<KV<Team, Integer>> input = raw.apply(new ParseFn());
PCollection<KV<Team, Integer>> totals = input
  .apply(Window.into(FixedWindows.of(TWO_MINUTES)))
  .apply(Sum.integersPerKey());

而该流水线的执行(从图 6-5 中的流和表呈现),看起来像图 8-8 中呈现的图表。

<assets/stsy_0808.mp4>

图 8-8. 批处理引擎上窗口求和的流和表视图

正如我们之前看到的,从图 8-7 到 8-8 的唯一实质性变化是由SUM操作创建的表现现在被分成了固定的两分钟时间窗口,最终产生了四个窗口化的答案,而不是之前的单个全局总和。

在 SQL 中做同样的事情,我们有两个选择:通过在GROUP BY语句中包含窗口的某个唯一特征(例如结束时间戳)来隐式地进行窗口操作,或者使用内置的窗口操作。让我们来看看两者。

首先是临时窗口。在这种情况下,我们在 SQL 语句中自己执行计算窗口的数学运算:

*12:10> SELECT TABLE SUM(Score) as Total,* 
 *"" || EventTime / INTERVAL '2' MINUTES || ", " ||* 
 *(EventTime / INTERVAL '2' MINUTES) + INTERVAL '2' MINUTES ||*
 *")" as Window,* 
 *MAX(Sys.MTime) as "MAX(ProcTime)"*
 *FROM UserScores*
 *GROUP BY Team, EventTime / INTERVAL '2' MINUTES;*
------------------------------------------------
| Total | Window               | MAX(ProcTime) |
------------------------------------------------
| 14    | [12:00:00, 12:02:00) | 12:08:19      |
| 18    | [12:02:00, 12:04:00) | 12:07:06      |
| 4     | [12:04:00, 12:06:00) | 12:06:39      |
| 12    | [12:06:00, 12:08:00) | 12:09:00      |
------------------------------------------------

我们也可以使用显式的窗口语句来实现相同的结果,比如 Apache Calcite 支持的那些。

*12:10> SELECT TABLE SUM(Score) as Total,*
 *TUMBLE(EventTime, INTERVAL '2' MINUTES) as Window,*
 *MAX(Sys.MTime) as 'MAX(ProcTime)'* 
 *FROM UserScores*
 *GROUP BY Team, TUMBLE(EventTime, INTERVAL '2' MINUTES);*
------------------------------------------------
| Total | Window               | MAX(ProcTime) |
------------------------------------------------
| 14    | [12:00:00, 12:02:00) | 12:08:19      |
| 18    | [12:02:00, 12:04:00) | 12:07:06      |
| 4     | [12:04:00, 12:06:00) | 12:06:39      |
| 12    | [12:06:00, 12:08:00) | 12:09:00      |
------------------------------------------------

这就引出了一个问题:如果我们可以使用现有的 SQL 构造隐式地进行窗口操作,为什么还要支持显式的窗口构造呢?有两个原因,这个例子中只有第一个原因是明显的(我们将在本章后面看到另一个原因):

  1. 窗口化为你处理窗口计算数学。当你直接指定基本参数如宽度和滑动时,要保持一致地正确得到结果要容易得多,而不是自己计算窗口数学。¹⁴
  2. 窗口允许简洁地表达更复杂、动态的分组,比如会话。尽管 SQL 在技术上能够表达定义会话窗口的另一个元素时间间隔内的每个元素的关系,但相应的表达式是一团乱麻的分析函数、自连接和数组展开,普通人不可能合理地自己构造出来。

这两个都是支持 SQL 中提供一流的窗口构造的有力论据,除了已经存在的临时窗口功能。

到目前为止,当我们将数据作为表格消耗时,我们已经从经典的批处理/经典关系的角度看到了窗口的样子。但是,如果我们想将数据作为流来消耗,我们就回到了 Beam 模型中的第三个问题:在处理时间中,我们何时实现输出?

何时:触发器

与以前一样,这个问题的答案是触发器和水印。然而,在 SQL 的上下文中,有一个强有力的论点支持使用不同的默认值,而不是我们在第三章中引入的 Beam 模型的默认值:与其默认使用单个水印触发器,不如从物化视图中获取灵感,并在每个元素上触发。换句话说,每当有新的输入到达时,我们就会产生相应的新输出。

SQL 风格的默认值:每条记录触发器

使用每条记录触发器作为默认值有两个强有力的好处:

简单性

每条记录更新的语义易于理解;物化视图多年来一直以这种方式运作。

忠实度

与变更数据捕获系统一样,每条记录触发产生了给定时变关系的完整保真度流呈现;在转换过程中没有丢失任何信息。

缺点主要是成本:触发器总是应用于分组操作之后,而分组的性质通常提供了减少通过系统流动的数据的基数的机会,从而相应地减少了下游处理这些聚合结果的成本。即便如此,在成本不是禁锢的用例中,清晰和简单的好处可以说超过了默认使用非完整保真度触发器的认知复杂性。

因此,对于我们首次尝试将团队得分作为流来消耗的情况,让我们看看使用每条记录触发器会是什么样子。Beam 本身没有精确的每条记录触发器,因此,如示例 8-3 所示,我们使用重复的AfterCount(1)触发器,每当有新记录到达时就会立即触发。

示例 8-3。每条记录触发器
PCollection<String> raw = IO.read(...);
PCollection<KV<Team, Integer>> input = raw.apply(new ParseFn());
PCollection<KV<Team, Integer>> totals = input
  .apply(Window.into(FixedWindows.of(TWO_MINUTES))
               .triggering(Repeatedly(AfterCount(1)))
  .apply(Sum.integersPerKey());

然后,这个管道的流和表格呈现将看起来像图 8-9 中所示的样子。

<assets/stsy_0809.mp4>

图 8-9。流和表格视图的窗口求和在具有每条记录触发的流引擎上

使用每条记录触发器的一个有趣的副作用是,它在某种程度上掩盖了数据被静止的效果,因为触发器立即将其重新激活。即使如此,来自分组的聚合物件仍然静止在表中,而未分组的值流则从中流走。

回到 SQL,我们现在可以看到将相应的时间-值关系呈现为流的效果会是什么样子。它(不出所料)看起来很像图 8-9 中动画中的值流:

*12:00> SELECT STREAM SUM(Score) as Total,* 
 *TUMBLE(EventTime, INTERVAL '2' MINUTES) as Window,*
 *MAX(Sys.MTime) as 'MAX(ProcTime)''* 
 *FROM UserScores*
 *GROUP BY Team, TUMBLE(EventTime, INTERVAL '2' MINUTES);*
------------------------------------------------
| Total | Window               | MAX(ProcTime) |
------------------------------------------------
| 5     | [12:00:00, 12:02:00) | 12:05:19      |
| 7     | [12:02:00, 12:04:00) | 12:05:39      |
| 10    | [12:02:00, 12:04:00) | 12:06:13      |
| 4     | [12:04:00, 12:06:00) | 12:06:39      |
| 18    | [12:02:00, 12:04:00) | 12:07:06      |
| 3     | [12:06:00, 12:08:00) | 12:07:19      |
| 14    | [12:00:00, 12:02:00) | 12:08:19      |
| 11    | [12:06:00, 12:08:00) | 12:08:39      |
| 12    | [12:06:00, 12:08:00) | 12:09:00      |
................ [12:00, 12:10] ................

但即使对于这个简单的用例来说,它也是相当啰嗦的。如果我们要构建一个处理大规模移动应用程序数据的管道,我们可能不希望为每个上游用户分数的下游更新付出成本。这就是自定义触发器发挥作用的地方。

水印触发器

如果我们将 Beam 管道切换为使用水印触发器,例如,我们可以在 TVR 的流版本中每个窗口获得一个输出,如示例 8-4 所示,并如图 8-10 所示。

示例 8-4。水印触发器
PCollection<String> raw = IO.read(...);
PCollection<KV<Team, Integer>> input = raw.apply(new ParseFn());
PCollection<KV<Team, Integer>> totals = input
  .apply(Window.into(FixedWindows.of(TWO_MINUTES))
               .triggering(AfterWatermark())
  .apply(Sum.integersPerKey());

<assets/stsy_0810.mp4>

图 8-10。带水印触发的窗口求和

要在 SQL 中获得相同的效果,我们需要语言支持来指定自定义触发器。类似于EMIT *<when>*语句,比如EMIT WHEN WATERMARK PAST *<column>*。这将向系统发出信号,即聚合创建的表应该在输入水印超过指定列中的时间戳值时触发一次流,这在这种情况下恰好是窗口的结束时间。

让我们看一下这个关系呈现为流。从理解触发器触发发生的时间的角度来看,停止依赖于原始输入的MTime值,并且捕获流中的行发出的当前时间戳也是很方便的:

*12:00> SELECT STREAM SUM(Score) as Total,*
 *TUMBLE(EventTime, INTERVAL '2' MINUTES) as Window,*
 *CURRENT_TIMESTAMP as EmitTime*
 *FROM UserScores*
 *GROUP BY Team, TUMBLE(EventTime, INTERVAL '2' MINUTES)*
 *EMIT WHEN WATERMARK PAST WINDOW_END(Window);*
-------------------------------------------
| Total | Window               | EmitTime |
-------------------------------------------
| 5     | [12:00:00, 12:02:00) | 12:06:00 |
| 18    | [12:02:00, 12:04:00) | 12:07:30 |
| 4     | [12:04:00, 12:06:00) | 12:07:41 |
| 12    | [12:06:00, 12:08:00) | 12:09:22 |
............. [12:00, 12:10] ..............

这里的主要缺点是由于启发式水印的使用而导致的延迟数据问题,正如我们在前几章中遇到的那样。考虑到延迟数据,一个更好的选择可能是在每次出现延迟记录时立即输出更新,使用支持重复延迟触发的水印触发器的变体,如示例 8-5 和图 8-11 所示。

示例 8-5。带有延迟触发的水印触发器
PCollection<String> raw = IO.read(...);
PCollection<KV<Team, Integer>> input = raw.apply(new ParseFn());
PCollection<KV<Team, Integer>> totals = input
  .apply(Window.into(FixedWindows.of(TWO_MINUTES))
               .triggering(AfterWatermark()
                   .withLateFirings(AfterCount(1))))
  .apply(Sum.integersPerKey());

<assets/stsy_0811.mp4>

图 8-11。带有准时/延迟触发的窗口求和

我们可以通过允许指定两个触发器来在 SQL 中做同样的事情:

  • 一个水印触发器给我们一个初始值:WHEN WATERMARK PAST *<column>*,窗口的结束时间被用作时间戳*<column>*
  • 用于延迟数据的重复延迟触发器:AND THEN AFTER *<duration>*,其中*<duration>*为 0,以给出每条记录的语义。

现在我们每个窗口可以获得多行,还可以有另外两个系统列可用:每行/窗格相对于水印的时间(Sys.EmitTiming),以及每个窗口的窗格/行的索引(Sys.EmitIndex,用于标识给定行/窗口的修订序列):

*12:00> SELECT STREAM SUM(Score) as Total,*
 *TUMBLE(EventTime, INTERVAL '2' MINUTES) as Window,*
 *CURRENT_TIMESTAMP as EmitTime,*
 *Sys.EmitTiming, Sys.EmitIndex* 
 *FROM UserScores*
 *GROUP BY Team, TUMBLE(EventTime, INTERVAL '2' MINUTES)*
 *EMIT WHEN WATERMARK PAST WINDOW_END(Window)*
 *AND THEN AFTER 0 SECONDS;*
----------------------------------------------------------------------------
| Total | Window               | EmitTime | Sys.EmitTiming | Sys.EmitIndex |
----------------------------------------------------------------------------
| 5     | [12:00:00, 12:02:00) | 12:06:00 | on-time        | 0             |
| 18    | [12:02:00, 12:04:00) | 12:07:30 | on-time        | 0             |
| 4     | [12:04:00, 12:06:00) | 12:07:41 | on-time        | 0             |
| 14    | [12:00:00, 12:02:00) | 12:08:19 | late           | 1             |
| 12    | [12:06:00, 12:08:00) | 12:09:22 | on-time        | 0             |
.............................. [12:00, 12:10] ..............................

使用这个触发器,对于每个窗格,我们能够得到一个准时的答案,这很可能是正确的,这要归功于我们的启发式水印。对于任何延迟到达的数据,我们可以得到一行的更新版本,修正我们之前的结果。

重复延迟触发器

你可能想要的另一个主要时间触发器用例是重复延迟更新;也就是说,在任何新数据到达后的一分钟(在处理时间上)触发窗口。请注意,这与在微批处理系统中触发对齐边界是不同的。正如示例 8-6 所示,通过相对于窗口/行的最近新记录到达的延迟触发,有助于更均匀地分散触发负载,而不像突发的对齐触发那样。它也不需要任何水印支持。图 8-12 呈现了结果。

示例 8-6。重复触发,延迟一分钟
PCollection<String> raw = IO.read(...);
PCollection<KV<Team, Integer>> input = raw.apply(new ParseFn());
PCollection<KV<Team, Integer>> totals = input
  .apply(Window.into(FixedWindows.of(TWO_MINUTES))
               .triggering(Repeatedly(UnalignedDelay(ONE_MINUTE)))
  .apply(Sum.integersPerKey());

<assets/stsy_0812.mp4>

图 8-12。带有重复一分钟延迟触发的窗口求和

使用这样的触发器的效果与我们最初开始的每条记录触发非常相似,但由于触发中引入了额外的延迟,稍微减少了一些冗余,这使得系统能够省略产生的某些行。调整延迟可以让我们调节生成的数据量,从而平衡成本和及时性的张力,以适应使用情况。

作为 SQL 流呈现,它可能看起来像这样:

*12:00> SELECT STREAM SUM(Score) as Total,*
 *TUMBLE(EventTime, INTERVAL '2' MINUTES) as Window,*
 *CURRENT_TIMESTAMP as EmitTime,*
 *Sys.EmitTiming, SysEmitIndex*
 *FROM UserScores*
 *GROUP BY Team, TUMBLE(EventTime, INTERVAL '2' MINUTES)*
 *EMIT AFTER 1 MINUTE;*
----------------------------------------------------------------------------
| Total | Window               | EmitTime | Sys.EmitTiming | Sys.EmitIndex |
----------------------------------------------------------------------------
| 5     | [12:00:00, 12:02:00) | 12:06:19 | n/a            | 0             |
| 10    | [12:02:00, 12:04:00) | 12:06:39 | n/a            | 0             |
| 4     | [12:04:00, 12:06:00) | 12:07:39 | n/a            | 0             |
| 18    | [12:02:00, 12:04:00) | 12:08:06 | n/a            | 1             |
| 3     | [12:06:00, 12:08:00) | 12:08:19 | n/a            | 0             |
| 14    | [12:00:00, 12:02:00) | 12:09:19 | n/a            | 1             |
| 12    | [12:06:00, 12:08:00) | 12:09:22 | n/a            | 1             |
.............................. [12:00, 12:10] ..............................
数据驱动触发器

在进入梁模型的最后一个问题之前,值得简要讨论“数据驱动触发器”的概念。由于 SQL 中处理类型的动态方式,似乎数据驱动触发器会是提议的EMIT *<when>*子句的一个非常自然的补充。例如,如果我们想在总分超过 10 时触发我们的总和,类似EMIT WHEN Score > 10的东西会非常自然地工作吗?

嗯,是的和不。是的,这样的构造会非常自然。但是当你考虑这样一个构造实际上会发生什么时,你基本上会在每条记录上触发,然后执行Score > 10谓词来决定触发的行是否应该向下游传播。你可能还记得,这听起来很像HAVING子句的情况。实际上,你可以通过简单地在查询的末尾添加HAVING Score > 10来获得完全相同的效果。在这一点上,它引出了一个问题:值得添加显式的数据驱动触发器吗?可能不值得。即便如此,看到使用标准 SQL 和精心选择的默认值如何轻松地获得所需的数据驱动触发器效果仍然令人鼓舞。

如何:累积

到目前为止,在本节中,我们一直忽略了我在本章开头介绍的Sys.Undo列。因此,我们默认使用累积模式来回答窗口/行的细化如何相互关联的问题。换句话说,每当我们观察到聚合行的多个修订时,后续的修订都建立在前面的修订之上,将新的输入与旧的输入累积在一起。我选择这种方法是因为它与早期章节中使用的方法相匹配,并且相对于表世界中的工作方式,这是一个相对简单的转换。

也就是说,累积模式有一些主要缺点。实际上,正如我们在第二章中讨论的那样,对于具有两个或更多分组操作序列的任何查询/管道来说,它对于过度计数是明显错误的。在允许包含多个序列分组操作的查询的系统中,允许对行的多个修订进行消耗的唯一明智的方法是默认情况下以累积和撤销模式运行。否则,由于对单行的多个修订的盲目合并,会出现一个给定输入记录在单个聚合中被多次包含的问题。

因此,当我们考虑将累积模式语义纳入 SQL 世界时,最符合我们提供直观和自然体验目标的选项是系统在底层默认使用撤销。正如我之前介绍Sys.Undo列时所指出的,如果你不关心撤销(就像直到现在本节中的示例一样),你不需要要求它们。但是如果你要求它们,它们应该在那里。

在 SQL 世界中的撤销

为了说明我的意思,让我们看另一个例子。为了适当地激发问题,让我们看一个相对不切实际的用例,即构建会话窗口并将它们逐步写入到 HBase 等键值存储中。在这种情况下,我们将从聚合中产生增量会话,但在许多情况下,给定的会话只是一个或多个先前会话的演变。在这种情况下,您真的希望删除先前的会话,并用新的会话替换它们。但是你该怎么做呢?判断给定的会话是否替换了另一个会话的唯一方法是将它们进行比较,看看新会话是否与旧会话重叠。但这意味着在管道的另一个部分中复制一些会话构建逻辑。更重要的是,这意味着您不再具有幂等输出,因此如果要保持端到端的一次性语义,就需要跳过一系列额外的步骤。更好的方法是,管道直接告诉您哪些会话被删除,哪些会话被替换。这就是撤销给您的东西。

要看到这个示例的效果(以及 SQL 中的效果),让我们修改我们的示例管道,计算具有一分钟间隔的会话窗口。为了简单和清晰起见,我们回到使用默认的每条记录触发。请注意,我还将处理时间内的一些数据点移动到这些会话示例中,以使图表更清晰;事件时间戳保持不变。更新后的数据集如下(用黄色突出显示了移动的处理时间戳):

*12:00> SELECT STREAM Score, EventTime, Sys.MTime as ProcTime* 
 *FROM UserScoresForSessions;*
--------------------------------
| Score | EventTime | ProcTime |
--------------------------------
|     5 |  12:00:26 | 12:05:19 |
|     7 |  12:02:26 | 12:05:39 |
|     3 |  12:03:39 | 12:06:13 |
|     4 |  12:04:19 | 12:06:46 |  # Originally 12:06:39
|     3 |  12:06:39 | 12:07:19 |
|     8 |  12:03:06 | 12:07:33 |  # Originally 12:07:06
|     8 |  12:07:26 | 12:08:13 |  # Originally 12:08:39
|     9 |  12:01:26 | 12:08:19 |
|     1 |  12:07:46 | 12:09:00 |
........ [12:00, 12:10] ........

首先,让我们看一下没有撤销的管道。在清楚了为什么该管道对于将增量会话写入键/值存储的用例是有问题之后,我们将看一下带有撤销的版本。

不撤销管道的 Beam 代码看起来像示例 8-7。图 8-13 显示了结果。

示例 8-7。具有每条记录触发和累积但没有撤销的会话窗口
PCollection<String> raw = IO.read(...);
PCollection<KV<Team, Integer>> input = raw.apply(new ParseFn());
PCollection<KV<Team, Integer>> totals = input
  .apply(Window.into(Sessions.withGapDuration(ONE_MINUTE))
               .triggering(Repeatedly(AfterCount(1))
               .accumulatingFiredPanes())
  .apply(Sum.integersPerKey());

<assets/stsy_0813.mp4>

图 8-13。使用累积但没有撤销的会话窗口总结

最后,在 SQL 中呈现的输出流将如下所示:

*12:00> SELECT STREAM SUM(Score) as Total,*
 *SESSION(EventTime, INTERVAL '1' MINUTE) as Window,*
 *CURRENT_TIMESTAMP as EmitTime*
 *FROM UserScoresForSessions*
 *GROUP BY Team, SESSION(EventTime, INTERVAL '1' MINUTE);*
-------------------------------------------
| Total | Window               | EmitTime |
-------------------------------------------
| 5     | [12:00:26, 12:01:26) | 12:05:19 |
| 7     | [12:02:26, 12:03:26) | 12:05:39 |
| 3     | [12:03:39, 12:04:39) | 12:06:13 |
| 7     | [12:03:39, 12:05:19) | 12:06:46 |
| 3     | [12:06:39, 12:07:39) | 12:07:19 |
| 22    | [12:02:26, 12:05:19) | 12:07:33 |
| 11    | [12:06:39, 12:08:26) | 12:08:13 |
| 36    | [12:00:26, 12:05:19) | 12:08:19 |
| 12    | [12:06:39, 12:08:46) | 12:09:00 |
............. [12:00, 12:10] ..............

在这里要注意的重要事情(在动画和 SQL 渲染中)是增量会话流的样子。从我们的整体观点来看,很容易在动画中直观地识别出哪些后续会话取代了之前的会话。但是想象一下,逐个接收这个流中的元素(就像在 SQL 列表中一样),并需要以一种最终使 HBase 表只包含两个最终会话(值为 36 和 12)的方式将它们写入 HBase。你会怎么做呢?嗯,你需要进行一系列的读取-修改-写入操作,读取一个键的所有现有会话,将它们与新会话进行比较,确定哪些会话重叠,删除过时的会话,最后为新会话发出写入操作——所有这些都需要额外的成本,并且会丧失幂等性,最终导致无法提供端到端的、一次性的语义。这是不切实际的。

然后,将这个与启用撤销的相同管道进行对比,就像示例 8-8 和图 8-14 中所示的那样。

示例 8-8。具有每条记录触发、累积和撤销的会话窗口
PCollection<String> raw = IO.read(...);
PCollection<KV<Team, Integer>> input = raw.apply(new ParseFn());
PCollection<KV<Team, Integer>> totals = input
  .apply(Window.into(Sessions.withGapDuration(ONE_MINUTE))
               .triggering(Repeatedly(AfterCount(1))
               .accumulatingAndRetractingFiredPanes())
  .apply(Sum.integersPerKey());

<assets/stsy_0814.mp4>

图 8-14。使用累积和撤销的会话窗口总结

最后,在 SQL 形式上。对于 SQL 版本,我们假设系统默认情况下正在使用撤销,并且每当我们请求特殊的Sys.Undo列时,单独的撤销行就会在流中实现。正如我最初描述的那样,该列的价值在于它允许我们区分撤销行(在Sys.Undo列中标记为“撤销”)和正常行(在这里Sys.Undo列中未标记,以便更清晰地对比,尽管它们也可以被标记为“重做”):

*12:00> SELECT STREAM SUM(Score) as Total,*
 *SESSION(EventTime, INTERVAL '1' MINUTE) as Window,*
 *CURRENT_TIMESTAMP as EmitTime,*
 *Sys.Undo as Undo*
 *FROM UserScoresForSessions*
 *GROUP BY Team, SESSION(EventTime, INTERVAL '1' MINUTE);*
--------------------------------------------------
| Total | Window               | EmitTime | Undo |
--------------------------------------------------
| 5     | [12:00:26, 12:01:26) | 12:05:19 |      |
| 7     | [12:02:26, 12:03:26) | 12:05:39 |      |
| 3     | [12:03:39, 12:04:39) | 12:06:13 |      |
| 3     | [12:03:39, 12:04:39) | 12:06:46 | undo |
| 7     | [12:03:39, 12:05:19) | 12:06:46 |      |
| 3     | [12:06:39, 12:07:39) | 12:07:19 |      |
| 7     | [12:02:26, 12:03:26) | 12:07:33 | undo |
| 7     | [12:03:39, 12:05:19) | 12:07:33 | undo |
| 22    | [12:02:26, 12:05:19) | 12:07:33 |      |
| 3     | [12:06:39, 12:07:39) | 12:08:13 | undo |
| 11    | [12:06:39, 12:08:26) | 12:08:13 |      |
| 5     | [12:00:26, 12:01:26) | 12:08:19 | undo |
| 22    | [12:02:26, 12:05:19) | 12:08:19 | undo |
| 36    | [12:00:26, 12:05:19) | 12:08:19 |      |
| 11    | [12:06:39, 12:08:26) | 12:09:00 | undo |
| 12    | [12:06:39, 12:08:46) | 12:09:00 |      |
................. [12:00, 12:10] .................

包括撤销在内,会话流不再仅包括新会话,还包括已被替换的旧会话的撤销。有了这个流,随着时间的推移,逐步构建 HBase 中的会话集变得微不足道:您只需在新会话到达时写入新会话(未标记为“重做”行),并在它们被撤销时删除旧会话(“撤销”行)。好得多!

丢弃模式,或者缺乏丢弃模式

通过这个例子,我们展示了如何简单而自然地将撤销纳入 SQL 中,以提供累积模式累积和撤销模式语义。但是丢弃模式呢?

对于特定用例,例如通过单个分组操作部分聚合高容量输入数据,然后将其写入支持聚合的存储系统(例如类似数据库的系统),丢弃模式可以作为节省资源的选项非常有价值。但在那些相对狭窄的用例之外,丢弃模式是令人困惑和容易出错的。因此,将其直接纳入 SQL 可能并不值得。需要它的系统可以在 SQL 语言本身之外提供它作为一个选项。那些不需要的系统可以简单地提供更自然的默认值累积和撤销模式,并在不需要时忽略撤销。

总结

这是一个漫长而迷人的旅程。我们在本章中涵盖了大量信息,让我们花点时间来反思一下。

首先,我们推断出流处理和非流处理数据处理之间的关键区别是时间的增加维度。我们观察到关系(关系代数的基础数据对象,它本身是 SQL 的基础)本身随时间演变,并从中推导出了TVR的概念,它将关系的演变捕捉为经典快照关系的序列。从这个定义中,我们能够看到关系代数的闭包性质在 TVR 的世界中保持完整,这意味着整套关系运算符(因此也是 SQL 构造)在我们从瞬时快照关系的世界转移到流兼容的 TVR 的世界时继续像预期的那样运行。

其次,我们探讨了 Beam 模型和经典 SQL 模型中固有的偏见,得出结论 Beam 具有面向流的方法,而 SQL 采用面向表的方法。

最后,我们看了一下需要对 SQL 进行语言扩展以支持健壮流处理的假设性语言扩展,以及一些精心选择的默认值,这些默认值可以大大减少对这些扩展的需求:

表/流选择

鉴于任何时变关系都可以以两种不同的方式呈现(表或流),我们需要在实现查询结果时选择所需的呈现方式。我们引入了TABLESTREAMTVR关键字,以提供一种明确选择所需呈现方式的方式。

更好的是不需要明确指定选择,这就是好的默认值的作用。如果所有输入都是表,那么输出为表是一个很好的默认值;这给了您所习惯的经典关系查询行为。相反,如果任何输入是流,则输出为流是一个合理的默认值。

窗口化

虽然你可以使用现有的 SQL 构造声明一些类型的简单窗口,但是具有显式窗口化运算符仍然具有价值:

  • 窗口化运算符封装了窗口计算数学。
  • 窗口化允许简洁地表达复杂的、动态的分组,比如会话。

因此,添加用于分组的简单窗口化构造可以帮助使查询更少出错,同时还提供了(例如会话)在现有的声明性 SQL 中难以表达的功能。

水印

这不仅仅是 SQL 的扩展,而是一个系统级特性。如果所涉及的系统在内部集成了水印,它们可以与触发器一起使用,以在相信该行的输入已经完成后生成包含单个、权威版本的流。这对于那些不可能为结果轮询物化视图表的用例至关重要,而是必须直接将管道的输出作为流进行消耗。示例包括通知和异常检测。

触发器

触发器定义了从 TVR 创建的流的形状。如果未指定,默认应该是每条记录触发,这提供了与物化视图相匹配的直接和自然的语义。除了默认值,基本上有两种主要类型的有用触发器:

  • 水印触发器,用于在相信该窗口的输入已经完成时,为每个窗口产生单个输出。
  • 重复延迟触发器,用于提供周期性更新。

这两者的组合也可能很有用,特别是在启发式水印的情况下,以提供我们之前看到的早期/准时/迟的模式。

特殊系统列

当将 TVR 作为流进行消耗时,有一些有趣的元数据可能会很有用,而且最容易暴露为系统级列。我们看了四个:

Sys.MTime

给定行在 TVR 中上次修改的处理时间。

Sys.EmitTiming

行发出相对于水印的时间(早、准时、迟)。

Sys.EmitIndex

该行的发出版本的从零开始的索引。¹⁹

Sys.Undo

该行是正常行还是撤销(undo)。默认情况下,系统应该在内部使用撤销,这在可能存在一系列多个分组操作的任何时候是必要的。如果在将 TVR 呈现为流时未投影Sys.Undo列,那么只会返回正常行,这提供了在累积累积和撤销模式之间切换的简单方法。

使用 SQL 进行流处理并不需要很困难。事实上,SQL 中的流处理已经相当普遍,以物化视图的形式存在。真正重要的部分实际上归结为捕获数据集/关系随时间的演变(通过时变关系),提供在物理表或流表示之间进行选择的手段,以及提供关于时间的推理工具(窗口化、水印和触发器),这些我们在本书中一直在讨论的。而且,至关重要的是,你需要很好的默认值,以最小化这些扩展在实践中需要被使用的频率。

¹ 这里我所说的“有效关系”简单地是指对于给定操作符的应用是良好形式的关系。例如,对于 SQL 查询SELECT x FROM y,一个有效的关系 y 将是任何包含名为 x 的属性/列的关系。任何不包含这样命名属性的关系将是无效的,并且在实际数据库系统的情况下,将产生查询执行错误。

² 对 Julian Hyde 的这个名称和概念的简洁表达表示非常感谢。

³ 请注意,这里使用的Sys.Undo名称是在Apache Flink 的撤销/重做命名法的基础上进行的,我认为这是捕捉撤销和非撤销行的想法的一种非常简洁的方式。

⁴ 现在,在这个例子中,很容易发现新值 8 应该替换旧值 7,因为映射是 1:1。但当我们谈论会话时,我们将在稍后看到一个更复杂的例子,没有撤销作为指导,处理起来会更加困难。

⁵ 而且,这是一个需要记住的关键点。有一些系统主张将流和表视为相同,声称我们可以简单地将流视为永不结束的表。这种说法在某种程度上是准确的,因为真正的基础原语是时变关系,所有关系操作都可以等同地应用于任何时变关系,无论实际的物理表现形式是流还是表。但这种方法混淆了表和流为给定的时变关系提供的两种非常不同的视图类型。假装两个非常不同的东西是相同的,表面上看起来很简单,但这不是通向理解、清晰和正确的道路。

⁶ 这里指的是随时间变化的表;也就是我们一直在看的基于表的 TVR。

⁷ 这是朱利安·海德的礼貌。

⁸ 尽管各个项目中有许多正在进行的工作,试图简化触发/取消分组语义的规范。在 Flink 和 Beam 社区内部独立提出的最具说服力的建议是,触发器应该简单地在管道的输出处指定,并自动在整个管道中传播。通过这种方式,只需描述实际创建物化输出的流的期望形状;所有其他流的形状将从那里隐式地派生出来。

⁹ 当然,单个 SQL 查询的表达能力远远超过单个 MapReduce,因为它具有更少限制的操作和组合选项。

¹⁰ 请注意,我们在这里是在概念上讨论;当然,在实际执行中可以应用许多优化;例如,通过索引查找特定行而不是扫描整个表。

¹¹ 有多次提到这些查询的“MATERIALIZED”方面只是一种优化:从语义上讲,这些查询可以很容易地用通用的CREATE VIEW语句替换,这种情况下,数据库可能会在每次引用时重新生成整个视图。这是真的。我在这里使用MATERIALIZED变体的原因是,物化视图的语义是根据变化流增量更新视图表,这表明了它们背后的流式特性。也就是说,你可以根据发生的变化增量处理变化,也可以不时地重新处理整个输入数据集。这两种方式都是处理不断变化的数据表的有效方式。

¹² 虽然可以说 SQL 的表偏向可能是 SQL 在批处理中的根源。

¹³ 对于某些用例,捕获和使用给定记录的当前处理时间作为其未来事件时间可能是有用的(例如,当直接将事件记录到 TVR 中时,入口时间就是该记录的自然事件时间)。

¹⁴ 数学很容易出错。

15 默认情况下,使用撤回就足够了,而不仅仅是因为系统只需要选择使用撤回。有特定的用例;例如,具有单个分组操作的查询,其结果正在写入支持按键更新的外部存储系统,系统可以检测到不需要撤回并将其禁用作为优化。

16 请注意,仅仅在SELECT语句中简单添加新列就导致查询中出现新行有点奇怪。一个很好的替代方法是在不需要时通过WHERE子句过滤掉Sys.Undo行。

17 请注意,这种琐事只适用于最终一致性足够的情况。如果您需要始终在任何给定时间具有全局一致的视图,则必须 1)确保在其发出时间写入/删除(通过墓碑)每个会话,并且 2)仅从 HBase 表中的时间戳读取,该时间戳小于管道的输出水印(以使读取与会话合并时发生的多个独立写入/删除同步)。或者更好的是,直接从状态表中提供会话,而不是中间人。

18 明确地说,它们并非都是假设的。Calcite 支持本章描述的窗口构造。

19 请注意,在像会话这样的合并窗口的情况下,“索引”的定义变得复杂。一个合理的方法是取所有先前合并在一起的会话的最大值,并递增一。


相关文章
|
1月前
|
存储 编解码 算法
音视频编程ffmepg中的关键术语与概念:深度解析与实践(二)
音视频编程ffmepg中的关键术语与概念:深度解析与实践
114 0
|
1月前
|
存储 编解码 网络协议
音视频编程ffmepg中的关键术语与概念:深度解析与实践(一)
音视频编程ffmepg中的关键术语与概念:深度解析与实践
95 0
|
1月前
流式系统:第五章到第八章
流式系统:第五章到第八章
29 0
|
1月前
|
测试技术 数据处理 Apache
流式系统:前言到第四章
流式系统:前言到第四章
89 0
|
1月前
构建安全可靠的系统:第六章到第十章
构建安全可靠的系统:第六章到第十章
184 0
|
1月前
|
存储 传感器 数据挖掘
什么是流计算?请简要解释其概念和特点。
什么是流计算?请简要解释其概念和特点。
64 0
|
1月前
|
机器学习/深度学习 运维 算法
流计算中的流式机器学习是什么?请解释其作用和常用算法。
流计算中的流式机器学习是什么?请解释其作用和常用算法。
68 0
|
存储 数据建模 开发者
数仓建模理论与规范(二)| 学习笔记
快速学习数仓建模理论与规范。
207 1
数仓建模理论与规范(二)| 学习笔记
|
存储 传感器 算法
第八章 实验平台
第八章 实验平台
169 0
第八章 实验平台
|
Dubbo NoSQL Java
架构:第八章:查询的资料
架构:第八章:查询的资料