01一年前的流式基础设施
回顾一年前,OpenAI 的流式基础设施主要围绕 Kafka 及其生产者和消费者服务构建。Kafka 被广泛用于数据摄入、异步处理和服务间通信。随着 ChatGPT 的上线,Kafka 需求迅速增长,已成为支撑众多关键业务的核心基础设施之一。
我们面临的主要挑战之一是确保 Kafka 基础设施的可用性和可靠性。我们的 Kafka 基础设施构建在云上,曾一度拥有数十个 Kafka 集群。在云环境中,集群可能崩溃、区域可能失效、网络光缆也可能被切断。因此,单个 Kafka 集群可能成为依赖它的使用场景的单点故障。我们确实经历过单个 Kafka 集群故障对业务造成严重影响的实例。
为应对这一挑战,我们引入了“高可用组”(high availability group)的概念。一个高可用组将跨区域的多个物理 Kafka 集群组合在一起,以提供高可用性。这样,当某个集群故障时,我们可以绕过它,为生产者和消费者服务提供 HA 保障。例如,典型的 HA 组配置包括一个 West US 集群、一个 Central 集群和一个 East US 集群。
然而,HA 组中多集群的引入也为生产者和消费者服务带来了不小的复杂性,因为它们必须理解 Kafka 基础设施的底层拓扑。为解决这一问题,我们构建了生产者和消费者代理(proxy)进程,将基础设施细节对用户隐藏,所有复杂性都封装在代理之后。该代理为 Kafka 的生产和消费提供了一个简单且一致的接口。
例如,当 East US 集群开始故障时,生产者和消费者端的代理会将流量绕过故障集群。同样,我们也可以向 HA 组中添加一个新集群(例如 South US),而这一切对生产者和消费者服务都是透明的。
关于 Kafka 基础设施设置的更多细节,请参考我们团队在本次会议上关于 Kafka 迁移以及 OpenAI 如何简化 Kafka 消费的演讲。
02为何需要流处理?
随着 Kafka 使用量的增加,我们自然开始思考:流处理(stream processing)或 Apache Flink 能带来什么?
场景一:数据飞轮(Data Flywheel)
从高层次看,数据飞轮是一个自强化系统,其中数据生成、模型改进和产品使用不断相互促进,以推动性能和价值的提升。我们发现,更快地将产品使用数据反馈给大模型,实际上能带来有意义的差异。流处理技术可以通过提供一个可扩展的框架,在 OpenAI 的规模上近乎实时地处理和转换数据,从而帮助实现数据飞轮的目标。
场景二:实验数据处理与摄入
在当今的 AI 开发中,快速实验和迭代对模型开发至关重要。能够快速处理、关联并可视化实验结果,对于加速模型开发非常重要。在流处理出现之前,工程师和研究人员有时会为处理大量实验数据而构建自定义的临时系统。这些系统通常涉及复杂的关联或状态管理,并且由于在大规模运行系统时的挑战,也容易出现数据新鲜度问题。这正是 Apache Flink 等流处理技术可以大放异彩的地方——它为预处理实验数据提供了一个稳健且可扩展的基础。
除此之外,还有其他业务使用场景,我们稍后会详细介绍。
03构建流处理平台的挑战
接下来,我将谈谈我们在 OpenAI 构建流处理平台时遇到的一些挑战。
挑战一:Python
虽然大多数开源流处理技术都是基于 JVM 的,但在 AI 领域,Python 是事实上的标准语言。在 OpenAI,许多业务处理逻辑和服务都是用 Python 编写的,几乎没有 Java 支持。尽管 Apache Flink 提供了 Python 支持,但总体而言,其开发和采用相对较新,与 Java 版本相比也还不够成熟。
挑战二:云厂商的限制
我们经常发现,云厂商宣传的 Kubernetes 集群最大规模往往过于乐观——在实际生产中,受限于控制平面的性能瓶颈,我们很难稳定运行接近该上限的集群。此外,在实践中,由于某些区域的物理限制,我们很难从这些区域获得足够的容量。为了满足运行流处理工作负载的容量需求,我们从一开始就不得不在多个 Kubernetes 集群之上构建我们的平台。而且,正如之前提到的,在云环境中,集群和区域都可能失效,因此我们的平台也必须能够跨区域可靠运行。
挑战三:高可用 Kafka 集群带来的复杂性
最后但同样重要的是,我们之前提到的 HA Kafka 集群设置,也为运行 Apache Flink 等框架带来了挑战。在 Kafka HA 组设置中读取一个主题(Topic),实际上会转化为并行地从多个物理 Kafka 集群进行多次消费,如果实现不当,反而可能导致可用性降低。
04平台架构概览
在设计流处理平台时,我们始终牢记上述挑战。以下是我们的整体架构概览。
首先,我们决定使用 PyFlink 作为主要的流处理框架,并与 Flink 社区合作,持续改进 PyFlink。这使我们的所有用户都能利用 Apache Flink 提供的流处理技术,同时还能复用所有现有的 Python 库来构建他们的流处理管道。事实证明,使用 Python 也帮助提高了我们用户的开发速度和生产力。
其次,在每个 Flink Kubernetes 集群内部,我们使用开源的 Flink Kubernetes Operator 来管理 Flink 作业。我们在跨区域的活跃 Flink 集群之上构建了一个控制平面(control plane)抽象层。这使我们能够通过单一的控制平面集中管理所有 Flink 作业。
最后,我们还将 Flink 与 OpenAI 的 Kafka 生态系统进行了深度集成,以确保 Flink 能够与我们上面讨论的 Kafka HA 设置可靠地协同工作。
05平台架构细节
从宏观角度看,用户和其他平台(例如机器学习平台)通过控制平面抽象层与流处理平台交互。这里的控制平面旨在为管理所有流处理管道提供一个统一的入口。
为了让 Flink 对我们的工程师更易用,我们将其与现有的服务脚手架、测试、构建和部署基础设施进行了深度集成,使用户可以遵循与微服务开发相同的工作流程。控制平面将负责跨不同区域的多个 Kubernetes 集群协调作业管理。
在每个 Kubernetes 集群内部,我们使用开源的 Flink Kubernetes Operator 来编排 Flink 作业。该 Operator 为 Kubernetes 集群内的管道提供生命周期管理。我们将每个 Flink 作业作为 Flink Deployment 自定义资源运行。Flink 部署通过 Kubernetes 命名空间在不同团队和组织之间进行隔离。我们为每个命名空间运行一个专用的 Flink Kubernetes Operator。
虽然 Flink Kubernetes Operator 处理了 Flink 的大部分管道生命周期管理,但为了满足 OpenAI 的特定需求,我们还设置了一个跨集群的看门狗(watchdog)服务,用于监控 Flink 作业所依赖的 OpenAI 特定配置变更。例如,看门狗服务会定期检查每个 Flink 作业的主题的 Kafka 拓扑。如果我们发现有新的物理集群被添加或移除,看门狗将触发 Flink 作业的重启,以便它能获取最新的 Kafka 拓扑变更,从而避免数据丢失或延迟。
对于有状态的管道,我们使用本地 RocksDB 来存储状态,并为每个命名空间设置 Azure Blob Storage 账户,并为该账户启用异地复制(geo-replication)。在主区域发生故障时,我们可以初步故障转移到辅助区域。目前,平台为所有团队管理 Azure Blob Storage 账户,但我们也允许用户选择提供自己的 Blob 存储账户。
在构建过程中,我们遇到了一个需要注意的问题:目前开源的 Apache Flink 实际上并不支持 Azure Workload Identity 身份验证,而这是 Azure 推荐的用于安全访问存储账户的方式。为了解决这个问题,我们内部将 hadoop-azure 库升级到了 3.4.1,以启用 Azure Workload Identity 身份验证。我们也计划将此贡献回社区。
06深入 PyFlink
现在,让我们深入探讨几个关键话题。
首先,我们来看看 Python。开源的 PyFlink 提供了 DataStream API 和 Table/SQL API。在 OpenAI 内部,我们将 PyFlink 与我们的单体仓库(monorepo)系统集成,使用户可以像开发常规 Python 项目一样,复用所有现有的 Python 库。
PyFlink 使用了大部分 Flink JVM 栈,并在 Flink SDK 和运行时中增加了对运行 Python 函数的支持。在 SDK 侧,它基本上使用 Py4J 将新的 Python DataStream 和 Table/SQL API 映射到 Java 版本。在运行时侧,Python 函数被映射到 Java 图中的自定义 Python 算子。该 Python 算子由运行用户 Python 逻辑的 Python Worker 以及与 Python Worker 通信的自定义 Java 算子组成,后者负责处理检查点(checkpointing)、水印(watermarking)以及与 Python Worker 的数据和状态交换。
PyFlink 目前支持两种不同的执行模式来运行 Python 用户自定义函数:进程模式(process mode)和线程模式(thread mode)。默认模式是进程模式。在进程模式下,用户的 Python 函数作为单独的进程运行,并使用 Apache Beam 的可移植性框架与 JVM 算子通信。它具有良好的资源隔离性,总体上也更成熟。然而,其局限性在于 IPC 开销,因为它们使用 gRPC 在 JVM 进程和 Python 进程之间进行通信。这会带来序列化和反序列化的开销。此外,这也需要更多的调优参数来适应不同类型的工作负载,例如批处理大小(batch size)和批处理超时(batch timeout)。
PyFlink 也支持线程模式。在线程模式下,用户的 Python 函数在与 JVM 线程相同的进程中运行。它带来了吞吐量、延迟的提升以及更短的检查点时间。然而,其局限性在于目前仅支持 CPython 和应用模式(application mode),总体上不如进程模式成熟。我们实际上与社区委员会合作,修复了线程模式中的几个问题,包括日志记录以及 JVM 中的共享对象加载。
到目前为止,我们已经在 OpenAI 将 PyFlink 投入生产。然而,我们也观察到了一些挑战,首先是效率问题。基本上,正如我们所见,所有的 Python 函数(用户逻辑)都在 Python 中运行,并且在进程模式下 IPC 期间会产生额外的序列化和反序列化成本。因此,对于大规模作业,我们也支持用户用 Java 实现他们的处理函数。PyFlink 实际上支持从 Python DataStream API 调用它们,因此我们可以支持用 Python 编排流处理逻辑,但实际代码将在 JVM 中运行。
此外,异步 I/O(async I/O)和流式关联(streaming join)在 Python 的 DataStream API 中尚未得到支持。我们计划与社区合作,增加这些支持。最后,PyFlink 目前还不支持 Python 3.12,我们也在内部和社区中努力增加这一支持。
后续精彩内容将于下周继续更新,敬请关注公众号!
来源 | Apache Flink公众号