Spinner 是 Pinterest 基于 Airflow 构建的工作流引擎,这篇文章介绍了 Spinner 的构建背景和设计原则。原文:Spinner: Pinterest’s Workflow Platform[1]
Pinterest 在迁移到 Airflow 之前的工作流规模
自成立以来,Pinterest 始终将数据定义为自己的哲学。作为一家数据驱动公司,所有数据都被存储起来以供将来使用。因此每天大概会有 600TB 的新数据,总数据量已经超过了 500PB。在这种规模下,大数据工具在帮助公司收集有意义的见解方面发挥着关键作用,这就是工作流团队的用武之地。我们支撑了超过 4000 个工作流,平均每天执行 10,000 个流程和 38,000 个作业。
背景
早在 2013 年,Pinterest 就创建了一个名为 Pinball[2]的内部调度器框架。当时这是一个适合公司需求的解决方案,但随着需求的增加,它无法进一步扩展,以便于服务内外部其他产品和服务。下列限制日益显著:
- 性能:
— 调度/作业启动时延(从计划开始到实际开始的时间)高于预期值。
- 可扩展性:
— 系统的组件是有状态的,增加了水平扩展的复杂性。
— 中心化的元数据存储服务成为单点故障源。
- 可维护性:
— 基础设施升级需要关闭所有工作进程,需要设置备用主机承载任务以及其他操作。
— 支持负载分区的多个(10)集群会增加升级和监控的额外开销。
— 工作负载的增长增加了有状态主节点的负担,导致延迟或瓶颈问题。
- 隔离性:
— 因为 Pinterest 的 Python 代码都在一个 mono-repo 里,因此无法隔离用户代码。
— 导入有问题的代码会因为复杂的依赖问题破坏很多工作流。
- 功能:
— 缺少 ACL 和审计日志。
— 缺少执行统计的可视化。
— 还缺少其他主要功能,比如服务器认证、快速导航等等。
- 文档:
— 由于是内部项目,缺少文档和示例,只能查找类似工作流的代码。
— 有一个分享解决方案的社区,持续改进产品,为用户提供更多支持。
- 测试:
— 用户很难部署自己的开发集群进行端到端测试。
由于上述痛点,我们很明显需要进行根本性的改变。这给我们创造了一个机会,可以在开始另一个内部项目之前,探索是否有解决方案可以解决当前系统的问题。
为什么是 Airflow?
2019 年,我们对 Spotify 的 Luigi[3]、LinkedIn 的 Azkaban[4]以及其他选项做了一些初步分析,最终我们选择了 Apache 的 Airflow[5],原因如下:
- 目标对齐: 用户需要的功能要么已经内置在 Airflow, 要么可以添加到插件中。
- 生产级: 被许多组织广泛采用,有积极的社区支持,有大量的讨论,并有很好的文档供用户查看。
- DSL: 基于 python,与我们之前的系统一致,降低用户采用的成本。
- 代码: Airflow 是模块化的,可以方便的使用独立组件连接定制系统。
- 可伸缩性: 采用无状态组件,可以重启恢复,UI 从中心数据库中获取数据,并允许通过插件操作 kubernetes 基础设施和可分区调度器。
- 声誉: 整个社区似乎对 Airflow 提供的能力非常满意。
基准测试
为了判断 Airflow 是否能够满足需求,我们基于不同的参数做了一系列性能、负载和压力测试,这些参数包括:DAG 的数量,每个 DAG 中任务的数量,并发执行的任务以及任务实例,不同的并发设置,数据库记录的数量。这些测试的目标是测量模拟某些生产负载的性能,确定需要多大的规模,估计需要的集群范围,并确定我们采用此框架需要解决的瓶颈和限制。
在执行测试场景之前,需要对源代码进行修改,以建立有效的概念证明(POC)集群。首先,我们修改了 Airflow 的版本,我们基于 v1.10-stable 拉了一个分支,并从主分支中 cherry pick 了一些修改,以支持 SerializedDAG[6]模型。其次,由于 Pinterest 的 kubernetes (k8s)环境的设置不同,我们创建了一个修改后的 k8s 执行器,以便将任务提交到内部 k8s 集群。最后,我们还构建了一个修改后的调度器来进行分区,并且添加了额外的 opentsdb[7]统计状态数据收集来记录准确的测量值。下面是我们的一个性能测试示例。
所有节点都运行在 AWS 上,包括两个 UI 节点,一个调度节点(c5.2xlarge),一个 mysql 主机(1 个主/1 个备,c5d.4xlarge),带有 300 Mhz CPU 和 300 MB 内存工作节点的定制 k8s 调度器。
在进行测试后,我们得到了以下信息:
- UI 节点性能良好,延迟与不同视图上的 db 记录数量有关
- Mysql 不是我们之前假设的瓶颈,它是一个独立的中心化节点,通过调度器、Web 服务器和工作节点连接
- 调度器在维护超过 1000 个 DAG 后,性能会下降
- 性能测试期间检测到的瓶颈是在单个调度器处于极端负载下时 kubernetes 的提交和结果解析
- 定制的 kubernetes 执行器在系统处于高负载时构建了一个任务队列,从而导致了延迟
- 为了达到调度延迟时间的目标,必须将 DAG 的数量保持在 1000 以下,以确保单个调度程序达到可以接受的性能。因此,需要部署多个调度器。
性能测试的结果让我们非常清楚的了解到系统在不同设置下是如何执行的,以及需要在哪里努力消除瓶颈。
与之前的 Pinball 系统进行比较,我们发现:
- 在类似的负载下,Airflow POC 集群相对 Pinball 集群的调度延迟更小(50 秒 vs 180 秒)
- 如果将总负载合并到一个集群中,该系统可以容纳 3000 个 DAG,每个 DAG 有 25 个任务,每个任务的延迟约为 10 分钟,比 Pinball 要好 5 倍
- 95%用户的 UI 页面加载时间<1s,相当于 250 个并发页面访问负载,并且当分页时,性能受延迟数量的影响最小,而 Pinball 集群的延迟随着 DAG 的增加而增加
在性能测试的基础上,如果我们可以优化 kubernetes 执行器的性能和稳定性,提供生产级的可伸缩性,同时扩展多调度器方案以解决分片/分区负载性能问题,我们觉得完全可以用一个集群承载所有负载。
站在用户的角度来看,我们可以自定义设置,并利用开箱即用的 Airflow 能力来满足功能要求,大多数负载运行在单一集群中(出于安全考虑,我们必须为 pii 和 sox 工作流提供单独的集群),以便工作流数据可以在同一个集群中搜索。
Pinterest 工作流系统
Spinner 架构介绍
上图展示了端到端工作流系统,每个组件将在相关部分中进行详细解释。示意图中所示的外部客户端(如 EasyFlow、Galaxy、FlowHub 和 Monarch)与 Spinner 进行交互,本文不做详细介绍。
序列化 DAG(Serialized DAG)
序列化的 DAG 模型很重要,有两个主要原因:
- 性能:一个集群中有数千个 DAG,在数据库中缓存 DAG 模型比每次调用都处理 DAG 文件可以获得更好的性能。
- 迁移:需要将数千个工作流从遗留的 Pinball 系统迁移到 Airflow 系统。对于不同的 DSL,需要将工作流模型存储到数据库中,以支持所有的 UI 特性,如代码视图、呈现视图等,否则这些视图就需要依赖我们没有的 DAG 文件。迁移后的工作流将在后面进行更详细的讨论。
Webs 服务器
无状态 Web 服务器是用户查看工作流状态和历史记录的入口。我们在集群上启用了 DAG 级别的访问控制(DLAC),并强制每个 DAG 必须通过至少一个角色授权(如下所示),这样只有对工作流有权限的用户才能访问。spinnre-users 是默认角色,所有工作流都设置有这一角色。这个默认角色拥有对每个 DAG 的读访问权限,这为平台维护人员提供了读取日志、查看历史记录等能力。此外,平台团队提供给用户创建额外角色的能力,从而可以将特定工作流分配给他们,并阻止其他用户查看或操作这些工作流。创建角色的过程在当前需要手动操作,我们希望将创建并向用户分配角色的过程自动化。
dag = DAG( dag_id='example_dag', access_control={SPINNER_USERS: {'can_dag_read','can_dag_edit'}}, ... )
当调度器解析 DAG,例如覆盖 DagModel 时,权限将从 DAG 文件同步到 DB。用户还可以通过刷新 UI 中的 DAG 来强制更新。当然,如果 Web 服务进程重启,会重新构建 webapp,并在启动时同步所有的工作流权限。我们在所有这些时间点都添加了同步,而不是只在 DagModel 更新时同步。
此外,通过开箱即用的事件日志,我们将用户对工作流的操作添加到审计日志中,从而对工作流发生的任何事情都拥有完整的跟踪历史。
最后,为了可伸缩性,我们设置了多个 Web 服务节点托管 UI 服务,每个 Web 服务节点有 4 个线程来处理请求。我们通过滚动部署方式部署系统,从而避免用户停机。
多分区调度器
我们的目标是让 Spinner 集群有一致的接口来查看所有的 DAG,以减少维护、额外的开销以及在旧系统中遇到的多集群的混乱问题。用单一调度器管理所有 DAG 并不可行,即使垂直扩展更多资源,也会遇到瓶颈,因此增加 DAG 解析的并行性并不是一个长期解决方案。我们需要建立多调度器方案,每个调度器查看不同的 DAG 分区。我们考虑过 AIP-15[8],但其执行的时间框架与我们的需求不符,所以我们构建了一个内部解决方案,以支持对工作流分级的需求。
新旧系统的调度器流分区的比较。
由于当前的调度器是有状态的,并且不允许同时运行更多实例,因此为了实现这一方案,我们修改了调度器。与此同时,还需要防止不同的调度器干扰到同一个 DAG/任务。我们实现了基于规则的分区器,从 DAG 位置检索出层和分区号,将给定的 DAG 标记为特定分区,并将每个分区分配给一个调度器来管理。通过这种方式,每个调度器只解析、分发和维护它所负责的 DAG 的状态,调度器节点不会发生冲突。通过多调度器方案,我们满足了平台的需求,我们将更高优先级的工作流放在具有更严格负载限制的专用调度器分区中,从而为高优先级工作流提供更高的 SLA。
每个调度器配置指向特定分区的 DAG 文件夹路径,但 UI 加载了所有 DAG 路径,因此仍将呈现所有 DAG,从而为用户提供一致的入口体验。UI 将显示任务实例的分区标记,并确保无论在任务上执行什么操作,都有正确的调度器负责执行。
Spinner 工作流和 Pinterest 分层
在深入研究存储库结构之前,我们需要讨论一下 Pinterest 特别的分层结构。工作流和系统基于重要性进行标记,Tier1 最高,Tier3 最低。在我们的数据组织中,给予 tier1 工作流更高的优先级和更多的资源,这将体现为存储库结构中的 DAG 路径。我们修改了 DAG 的定义,要求传入层字段,如下所示:
dag = DAG( dag_id='example_dag', tier=Tier.TIER1, ... )
我们为用户创建了一个新的单一存储库来添加工作流,从而得以在如下方面获得控制权:
- 我们需要在调度器、Web 服务器和 kubernetes pods 之间同步工作流,所以有单独的地方来同步可以让这个过程更简单、更可控
- 我们需要建立基于层的路径,以便 tier1 工作流的用户可以将工作流放在 tier1 的基本路径中,tier2 的放在 tier2 的路径,以此类推
- 作为平台的维护者,我们需要为代码审查设置规则
- 我们要求所有代码在提交时通过一个快速的单元测试套件,以确保代码安全、代码质量和代码控制
“dags”文件夹包含用户创建的 DAG,下面的子文件夹由工作流团队按照命名约定维护:
{cluster_type}_tier_{tier_index}_{partition_index}
下面是这个结构的一个例子。
- 集群类型将是 spinner、pii、test 或 sox。
- 分层(Tier)索引如上所述:1、2 或 3。
- 分区(Partition)索引从 0 开始,并根据需要增加。这个值表示当某个 cluster_type 的某个层中的 DAG 数量超过定义的阈值时,将创建一个新分区,并需要将新 DAG 签入到新的分区路径中。
dags/ spinner_tier_1_0/ team1/ my_dag.py spinner_tier_2_0/ spinner_tier_3_0/ pii_tier_1_0/
这一设置有助于增强我们的多分区调度器组件,这些 DAG 文件中的公共模块将被放在 plugins 目录中,在所有集群/层/分区之间共享。
持续集成和持续部署
任何工作流平台的很大一部分都是处理用户和系统级代码的部署,在 Pinterest,我们使用 Teletraan[9]作为部署系统。
Spinner 的设置涉及到三个不同的代码库:
- Spinner Repo:这是来自 v1.10-stable 的 Airflow 源代码,包含了 cherry-pick 的子集以及许多内部插件。
- Spinner workflow:上面提到的原生 Airflow 工作流代码库。
- Pinboard:这是 Pinterest 现有的单一存储库,承载了所有跨系统和平台的 python 代码,包含大量自定义函数和实现,以及所有遗留的工作流和作业。
示意图显示了不同的 ci/cd 管道。
如上图所示,基础架构代码和用户代码有独立的存储库。在构建镜像过程中,有基础设施代码的构建(pinairflow),有 Pinterest 单一代码库的构建(pinboard)。有一个单独的流程负责为 DAG (spinner workflows)拉入代码,并确保同步并执行了最新的代码。
该图详细概述了每个组件及其部署周期,但关于基础设施(UI/Scheduler/K8s worker)的 CI/CD,有几点需要注意:
- 基础架构代码变更触发将触发 Jenkins 作业,以生成新的构建并将新镜像推送到仓库
- 另一个 Jenkins 作业将通知 Teletraan 刷新节点上的镜像和容器
- Kubernetes pods 用相同的镜像拉起容器并执行任务
- 正在进行本地测试的用户能够加载最新的镜像来测试他们的 DAG
关于 DAG 部署 CI/CD,有几点需要注意:
- 用户代码的变更会触发 Jenkins 任务,将所有主机上的 DAG 同步到 s3
- 运行在 Web 服务器和调度器上的服务进程,以非常短的时间间隔(30 秒)从 s3 同步文件更改。
- 当 Worker pod 启动最新的 DAG 并执行任务时,也会从 s3 同步
Pinterest Kubernetes 执行器
在 Pinterest,我们有一个内部团队致力于提供 Kubernetes 服务。因此,必须重新构建开箱即用的 k8s 执行器才能与我们的系统一起工作。我们重写了所有与 kubernetes 交互的 API,重写了 kubernetes 执行器、kubernetes 调度器等。我们保留了开源执行器的基本业务逻辑,同时适合于我们的特定环境,我们在后端部署了这一执行器插件。
这个执行器使我们的集群具有完全的运行时隔离,并可以根据负载需求进行伸缩。每个任务在自己的 pod 中运行,有安全的隔离设置,所有 pod 也与 Pinterest 中的其他服务共享 k8s 节点。pod 在任务执行后立即被移除,以释放资源。这是对我们之前的系统的一个巨大改进,在之前我们只能提供独占的主机和固定数量的工作槽,从而很难扩展。
Pinterest kubernetes 集群和 Spinner 集群交叉通信。
kubernetes 执行器会构建一个 pinterest watcher 拉取 pod 状态,这是 pinterest k8s 环境特定的东西。我们用标签隔离不同调度器启动的 pod,因此需要重新构建这一状态拉起机制。
然后,对于任何任务实例,dag_id、task_id、execution_date 和 try_number 的组合将创建唯一的 pod 名。可以对 pod 进行定制,以设置额外的环境变量、申请更多资源、安装额外的包等,以满足不同的用例场景,如下所示。
customized_task1 = PythonOperator( task_id='customized_task1', dag=dag, python_callable=print_context, executor_config={ "KubernetesExecutor": { 'request_memory': '350Mi', 'request_cpu': '200m', 'limit_memory': '1200Mi', 'limit_cpu': '500m', 'requirements': 'example_requirements.txt', }})
我们还在 pod 启动逻辑中构建了一个“服务准备就绪检查”脚本,该脚本将在 pod 真正开始运行任务逻辑之前执行。这个准备就绪检查脚本将确保 s3 通信已经建立,mysql 通信被接受,可以完成 knox 检索,等等。因为每次都动态拉起 pod,需要确保这些需求满足才能正确运行任务。
我们还优化了实时任务的日志提取机制。pod 中的每个任务在完成后会将日志推送到 s3,但在运行时,日志会同步到 Pinterest Elastic Search (ES)。当用户转到 UI 来提取任务日志时,日志将在写入标准路径时从 pod 中提取出来,如果有任何问题,那么将退回到从 ES 读取。这对于 pod 的 readiness 日志同样有效。如果没有故障发生,则从 UI 上显示的任务日志中清除 readiness 日志。
Pinterest Kubernetes 执行器端到端工作流。
从上图中,可以看到任务如何从创建到执行再到完成,以及执行器如何处理这些任务。主要组件是 K8s 执行器、K8s 调度器和 K8s 作业监视器,每个组件都用蓝色突出显示。每个阶段引出下一个阶段。
用户体验
自定义操作器和感应器(Custom Operators and Sensors)
我们的每一个决定都围绕用户做出,目标是用最少的工作量为用户提供最大的利益。我们提供了超过 30 个定制操作器。如下图所示,我们有很多自定义逻辑,被包装在一个操作器或感应器中。我们在内部有一个叫做作业提交服务(Job Submission Service, JSS)的系统,它将流量路由到一个内部的 yarn 集群,该集群执行 hadoop、spark、sparksql、hive、pyspark 等作业。因此,所有的提交操作都需要重写,才能在我们的环境中工作。最重要的是,需要将一些常见操作封装起来,从而提供与传统工作流系统相同的功能,以激励用户采用 Airflow。
尽管新旧系统之间的 DSL 有很大的不同,但其目标是尽可能减轻用户负担,以便他们只需要输入执行所需的参数。
我们必须克服的最大障碍之一是为 pinboard 代码提供支持,如前所述,这里有很多用户自定义逻辑,他们不希望将代码复制到位于不同存储库中的新工作流中。由于我们的基础设施不强制打包子模块,因此需要打包整个代码库并将其添加到 python 路径中,这意味着需要将其打包到构建的映像中。出于不同的原因,我们最初想要禁止用户导入旧代码库,但这似乎是用户的痛点,所以我们将其添加到镜像中,以减少用户编写本地工作流的阻碍。
支持 Pinterest 工作流的自定义操作器。
测试
用户打开自定义测试容器后显示的提示符。
在旧系统中测试工作流不是很友好,这是我们希望通过 Spinner 解决的问题。我们开发了脚本,帮助用户在开发主机上启动容器,运行 Airflow 调度器和 Web 服务进程,并模仿生产环境。在此有一些细微差别,比如使用本地执行器而不是 Kubernetes 执行器(需要从开发主机获得一些配置),但体验非常相似。它可以还设置本地 mysql 主机,可以从容器中连接,并且可以从本地工作空间(我们使用 Pycharm)中同步文件到远程开发主机,以同步更改并测试最新逻辑。文件可以被 dev 调度器发现,用户将在 dev UI 上查看标记。这个脚本由我们在用户编写工作流的同一个存储库中维护,以方便用户熟悉。它极大提高了用户的开发体验和开发速度。
单元测试
显示用户代码提交、触发单元测试以及如何将代码推送到生产主机的流程。
Pinterest 使用 Phabricator[10]和 Arcanist[11]进行代码审查和推送。当用户进行代码变更,在变更代码进入生产之前,必须由自己的团队(可能还有平台团队)对其进行评审,然后每一次差异评审都会启动一个 jenkins 工作,这个 jenkins 工作会启动我们编写的超过 25 个自动化单元测试。
这些测试包括验证 DAG 的某些属性,如访问角色设置、层、有效的开始日期、通知设置等,还包括我们不想设置的某些属性,比如 catchup 属性等。还会验证某些属性是否在可接受的范围内,例如 start_date、executor_config 里的内存和 cpu 配置以及并发设置。我们在系统方面也有限制,希望在两方面都能有所加强。我们还有测试 Pinterest 特定逻辑的单元测试,例如,为了防止顶层调用 Hive Metastore(HMS)的所有 DAG 文件(这会影响 DAG 解析时间,对 HMS 服务造成问题),我们通过对常见函数打补丁的方式捕获试图这么做的用户。
当用户提交更改时,我们并不会做代码审查,因此这些验证测试有助于获得更高程度的信心。虽然不能涵盖所有的问题,但这些测试确实涵盖了大多数问题,并帮助我们了解了常见的问题,以便可以添加更多单元测试,从而成为更好的门禁。此外,用户还可以利用这个测试流水线为自己的逻辑添加单元测试。
监控
用于监视生产系统组件运行状况的统计信息仪表板。
我们广泛使用名为 Statsboard 的内部工具,这是一个度量可视化和告警框架。如上所示,对每个组件都有足够的统计数据,例如:
- 调度器
- Web 服务器
- 执行器
- API
- DAG 同步
- Mysql
- 功能及其他
Overall 选项卡是所有集群中所有组件的聚合统计视图。我们可以查看每个集群、每个组件或更高级别的统计数据。通过这种方式,如果我们看到某些系统指标似乎有关联,就可以控制统计数据的粒度。不同的调度器有加权平均,以表示它们的健康检查的重要性(即更高的层以及负载更高的调度有更多的权重)。每个调度器的健康检查实际上是运行在调度器上的一个工作流,如果成功就会发出统计信息,这一统计以 15 分钟为间隔发生,如果成功率低于某个阈值,任何缺失的数据点都将向团队发出告警。我们还对数据进行了延时检查,以防出现重复。
收集的许多统计数据都是现成的,但我们在自己的组件中添加了一些额外的统计数据,以便对系统健康状况提供更精确的可见性,我们希望对服务实现预期式管理而不是反应式管理。
最后
在这篇文章中,我们想与同行分享我们的痛点,我们对不同产品的调研,我们如何确定怎样处理需求,怎样测试帮助我们在增加投资之前更有信心,怎样改进帮助我们达成目标。我们还讨论了如何帮助用户并改善他们的体验。
下一篇文章我们将详细讨论如果将遗留工作流迁移到新的 Spinner 平台上,在这个平台上,我们将承担支撑 3000 个工作流的迁移,并与各自团队的领导进行协调。我们构建了自动化工具、自定义 API 和一个转换层来处理这个问题。
希望这篇关于我们如何在 Pinterest 改进和部署 Airflow 的文章对你有所帮助。
References:[1] Spinner: Pinterest’s Workflow Platform: https://medium.com/pinterest-engineering/spinner-pinterests-workflow-platform-c5bbe190ba5
[2] Pinball: Building workflow management: https://medium.com/@Pinterest_Engineering/pinball-building-workflow-management-88a044c9b9e3
[3] Luigi: https://github.com/spotify/luigi
[4] Azkaban: https://github.com/azkaban/azkaban
[5] Airflow: https://github.com/apache/airflow
[6] https://github.com/apache/airflow/blob/main/airflow/serialization/serialized_objects.py#L897
[7] opntsdb: http://opentsdb.net/
[8] AIP-15: https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=103092651
[9] Teletraan: https://github.com/pinterest/teletraan
[10] Phabricator: https://www.phacility.com/phabricator/
[11] Arcanist: https://secure.phabricator.com/book/phabricator/article/arcanist/