从旧系统迁移到新系统总是一个痛苦的过程,这篇文章介绍了 Pinterest 怎样定义自动化的迁移层帮助用户从旧系统迁移到新的工作流系统。原文:Spinner: The Mass Migration to Pinterest’s New Workflow Platform[1]
我们在上一篇文章中讨论了如何做出决定并采取行动,将旧系统 Pinball[2]迁移到 j 基于 Apache Airflow 的新系统 Spinner 上。提醒一下,我们从 Airflow 1.10-stable 版本创建了自定义分支,并且从主干上 cherry-pick 了部分功能。
本文将介绍我们如何设计和完成迁移,明确需求,并与工程师团队协调,无缝的将 3000 多个工作流迁移到 Airflow。我们将深入探讨所做的权衡,但在此之前,我们先介绍一下学到的经验教训。
按照我们的标准,一个成功的迁移过程的关键是:
- 理解并填补之前的内部工作流系统与 Airflow 之间的差距,确定特性差异、安全差异和用户习惯使用的术语。
- 提供以低成本方式同时大规模迁移工作流的工具,并提供验证方法。
- 与用户进行清晰持续的沟通,并提供相关材料,如 wiki、培训课程、积极的用户支持和公告。
- 启用无状态 UI 分区调度器,支持 kubernetes 集成,以提供定制化、可伸缩的解决方案。
- 提供清晰的 CI/CD 流水线,帮助系统一致、可靠、高效的维护多个基础架构分支。
- 严格测试,在预发环境里进行单元测试和集成测试,以阻止破坏性变更,并在部署时采取谨慎的方法。
- 维护运行状况检查和综合指标,并在负载增加时对告警进行微调。
最需要避免的问题包括:
- 确保迁移之前和之后的调度一致。由于定义调度器的方式不同(旧系统并非完全基于 cron),因此旧系统和 Spinner 系统的调度间隔并不总是一致的。因此,要防止操作被忽略或者被过度操作。
- 为每个任务分配内存等资源,以防止任务启动失败。
- Kubernetes pod 的初始化成本是非预期的,pod 启动延时确实有不小的成本,必须被团队内所有用例所接受。
- Kubernetes pod 内冗余的 sidecar 会增加延时与网络问题,并会增加工作流的调度延时。
- 在用户教育和支持方面的投资可能会很高。
- 为维护旧的 DSL 和新的 Airflow DSL 而增加的混合解决方案的成本开销并不低。
接下来看看我们是怎么解决这些挑战的。
方法与需求
我们将平台的迁移需求定义为:
- 用户只需做出最少的代码更改
- 迁移时不需要中断生产工作流的执行
- 设置遗留系统的下线日期并完成系统下线
考虑到这些需求,我们可以有两种方式进行迁移:
- 请求工作流的所有者在 Arflow DSL 中重写旧工作流,并在这个转换过程中提供支持
- 平台提供直接进行 DSL 转换的工具
使用方法 1,可以减少我们和用户的技术债务,平台不必维护额外的基础设施,但由于所有定制的用户逻辑和依赖关系已经被添加到遗留的 Pinball 任务中,因此存在一些挑战。不过即使没有这些挑战,我们也没有让用户接受这个提议,因为每个团队都得花费大量工程时间来做迁移,而这些都是成本。最后,由于需要依赖用户来完成工作,可能会推迟遗留系统的下线时间,因此这种方式并不可行。
因此,我们采用的方法更接近方法 2:我们在 Airflow 调度器中构建了一个迁移层,动态解析 DAG 文件,将遗留工作流系统中的工作流定义转换为 Airflow 的 DAG。这意味着不需要用户修改代码,为用户提供了透明的迁移体验。遗留工作流中的每个作业都被转换为包装操作器类型,该类型是专门为支持工作流迁移用例而实现的。在执行期间,操作器启动新的 k8s pod,使用遗留系统的镜像启动遗留作业的实际逻辑。通过这种方式,我们可以为迁移后的任务模拟遗留系统的执行环境。
迁移层
重申一下,这个项目的目标是用最少的用户工作量尽量透明的推动工作流迁移。下图展示了这一过程的端到端体验,后续我们将更深入研究各个组件。
Pinterest 迁移调度器/组件以及原生调度器/组件。
左侧的组件是 Pinterest 迁移调度程序,这是基于原生 Airflow 调度器构建的,并利用了之前编写的多分区调度器。
PinterestDagBag
当调度器在迁移模式下启动时,使用定制的 DagBag 类,命名为 PinterestDagBag,其负责从迁移元数据文件(而不是 python 的 DAG 文件)解析 DAG。为了更好理解,我们需要介绍之前的 Pinball[2]系统是如何工作的。
Pinball 有令牌(Token)的概念:当 Pinball 工作流做好运行准备,工作流解析器将把工作流定义转换成令牌,存储所有必需的运行时信息。PinterestDagBag 从遗留系统的工作流解析器中检索工作流定义(也就是令牌),遗留系统托管在名为令牌获取器(Token Fetcher)的容器中。然后将传统的工作流定义转换为原生 Airflow DAG 和运行中的任务(例如操作器或感应器)。
完成这种抽象和转换(不需要 DAG 文件)的方法实际上非常简单,一个 DAG 文件本质上只是一个或多个 DAG 的标识符。对于原生 Airflow 来说,DAG 文件恰好携带了工作流定义,但完全有可能以一种不直接包含工作流定义,而是指向定义所在源代码的方式来组成 DAG 文件。我们编写了一个“dag 文件”,表示遗留工作流定义的托管位置,并确保定制的 PinterestDagBag 模块能够从中解析出 DAG 对象。迁移元数据文件的示例如下:
{ “cluster_name”: “core001”, “workflow_name”: “test_workflow”, “migration_date”: “2020–01–01 00:00:00” }
调度器能够发现和处理的元数据是在工作流迁移启动时生成的(我们将在后面详细描述),每个迁移元数据文件都表示如何在 Token Fetcher 容器的帮助下获取遗留工作流的定义,下一节将讨论这部分内容。
Token Fetcher
一旦迁移调度器发现元数据,Token Fetcher 容器就开始发挥作用,运行遗留系统的解析器,并与迁移调度器一起工作,其开放 API 可以用来检索遗留工作流规范以及解析作业。遗留工作流中的每个作业都被解析成一个作业令牌数据结构,其中包含规范,以及最重要的作业执行命令,如下所示:
python data_job_runner.py — job_full_class=reporter.examples.ExampleSparkJob — executor=prod_011 — job_args=”end_date=2021–12–30"
通过 Toke Fetcher 容器,PinterestDagBag 模块可以调用相应的 API,根据迁移元数据文件检索工作流规范和作业令牌。
解析迁移文件的组件之间的交互。
PinboardOnK8sOperator
在深入研究这个特殊的操作器之前,我们回顾一下 Pinboard 是什么。在上一篇文章里,我们介绍过 Pinboard 是 Pinterest 的 python 代码单一存储库,在之前的系统中,所有工作流和作业都定义在这个存储库里。
一旦我们获得了来自 Token 的数据,就使用定制的 PinboardOnK8sOperator 操作器来包装遗留作业的 Token 抽象。每个作业令牌被转换为该操作器的一个实例,存储从检索到的令牌中获取的执行命令。在执行期间,启动一个 k8s pod,加载 pinboard 来执行命令,以模拟遗留系统的作业执行环境。这也可以防止对 Airflow 执行环境造成任何干扰。
Airflow 的序列化 DAG 特性被用来序列化迁移后的 DAG 和任务,有助于减少 DAG 的解析开销。PinterestDagBag 只在工作流的序列化 DAG 不存在时才调用 Token Fetcher 来检索作业 Token 并进行转换。同样,当迁移的 DAG 的要被调度执行时,DagFileProcessor 再次调用 Token Fetcher 来检索最新的作业 Token 并刷新序列化的 DAG。这个序列化 DAG 也用在 UI 渲染中,所以不需要在 Web 服务器上启动 Token Fetcher 容器。此外,由于执行 PinboardOnK8sOperator 所需的属性都是可序列化的,所以在执行迁移任务时也使用了序列化 DAG 特性。
迁移工具
为了简化工作流迁移的过程,我们构建了一个 UI 工具,让用户可以将现有的工作流迁移到 Airflow。只需几次单击,就可以在旧系统上停止调度工作流,并将其调度到新的 Spinner 集群上。一旦工作流被迁移,迁移元数据文件将被上传到 s3,并且可以被迁移调度器发现。该工具还支持迁移回遗留系统、发布高级迁移报告并提供管理员角色,以帮助用户管理迁移记录。
我们还将迁移 API 公开给其他系统的下游服务,帮助这些系统用可编程的方式构建工作流。
显示不同的迁移阶段的 UI。
通过这个工具,迁移工作流只需要几分钟时间,而不需要用户花上几个小时重写代码,减轻了用户负担。用户只需要登录 UI,选择工作流,将其调度到 Spinner 中运行,验证输出是否有效(这需要手动操作),最后通过关闭迁移记录来结束迁移工作。这个工具对于平台和用户来说是非常重要,如果没有它,我们就不可能在一年内完成迁移的目标。
动态 DAG
我们的迁移工作需要但是 Airflow 不支持的一个主要功能是动态 DAG。动态 DAG 可以根据调度器处理的不同动态生成不同的 DAG 布局。例如,如果 DAG 布局是基于某些外部服务或数据的状态生成的,那么就会和从 DAG 文件加载的布局有所不同,并且和解析 DAG 文件的时间相关。我们期望当新的 DAG 被调度执行时,就能够确定 DAG 布局。计算出来的布局将会被保存下来,并且 DAG 的执行将会与保存的布局保持一致。worker 可以基于保存的布局加载任务,而不需要再次进行 DAG 解析,而再次解析可能会生成一个不同的 DAG 布局。
Airflow 原生并不支持这个功能,其潜在问题就在于当 worker 试图执行 DAG 任务时,从 DAG 文件检索到的布局和 DAG 执行是创建的布局是不一致的。在这种情况下,worker 无法从 DAG 获得特定的任务。
我们基于 Airflow 构建了这一功能,设计了一种名为 DynamicDAG 的新型 DAG,并公开了 compute_layout 方法。任务实例化逻辑封装在 compute_layout 方法中,而不是在 DAG 文件的最外层定义任务。这个方法只在创建新的 DAG 运行时被调用生成 DAG 布局,布局快照将被保存并绑定到这个 DAG 运行中,所以当需要获得某个特定 DAG 运行时的任务时,系统能够从保存的 DAG 布局中检索,而不是从 DAG 文件中加载。下面的代码片段展示了如何使用 DynamicDAG 接口构建动态 DAG。
dag = DynamicDAG( dag_id=”dynamic_test”, compute_layout=compute_layout, skip_early_layout_compute=True,…. ) def compute_layout(dynamic_dag: DynamicDAG, execution_date: datetime = None, dagrun_conf: dict = None) -> None: “”” Compute layout for dynamic DAG “”” # Use random int rand_int = random.randint(1, 3) for i in range(rand_int): python_task_1 = dynamic_dag.add_task_into_dynamic_dag(operator_class=PythonOperator, task_id=f’python_task_{i}’, python_callable=python_callable, op_kwargs={‘task_id’: f’python_task_{i}’, ‘execution_date’: execution_date}) python_task_2 = dynamic_dag.add_task_into_dynamic_dag(operator_class=PythonOperator, task_id=f’python_task_{i}_v2', python_callable=python_callable, op_kwargs={‘task_id’: f’python_task_{i}’, ‘execution_date’: execution_date}) python_task_1 >> python_task_2
请注意,虽然我们是为了帮助迁移而创建了这个类,但它也适用于本地工作流,我们可以将这个类用于业务逻辑。
我们修改了 Airflow 中的主要组件,如调度器、Web 服务器和执行器,以支持我们的版本特性,下面的流程图显示了有/没有 DAG 版本特性的调度器处理逻辑的差异。在新的设计中,调度器模块有两个主要的变化:
- DAG 布局将在 DAG 运行时创建期间被重新生成并被序列化,所以一个新创建的 DAG 运行时将总是被绑定到一个特定的 DAG 布局。
- 当调度器处理特定版本的 DAG 布局时(例如验证 DAG 完整性或调度任务实例),系统将加载并反序列化 DAG 布局。通过这种方式,确保总是能够调度和执行正确版本的 DAG。
普通 DAG 和动态 DAG 的比较。
Kubernetes 对迁移的支持
概述
基础设施层利用内部 Kubernetes 集群,从而提供“无限”的可伸缩性、与其他任务的隔离、易于维护和升级,以及改进的安全性。
Spinner 和 Pinterest Kubernetes 集群如何通过 DB 交互。
在上图中,可以看到迁移的任务用例经历了两个迭代。迁移的任务用例有一个 Airflow 的工作 pod,然后启动一个迁移的任务 pod 来加载调用和运行命令所需的环境。这种 pod over pod 场景增加了额外的 2-4 分钟的启动时间,可能会给用户作业带来沉重的成本。稍后,我们将介绍增强的迁移任务用例,可以在原来的工作 pod 中运行迁移后的 pod 逻辑,从而节省了启动第二个 pod 的成本。执行增强的迁移任务工作 pod 时,生命周期如下所示。
迁移任务工作 Pod
迁移任务工作 pod 运行逻辑和完成执行
Airflow 工作容器启动 Airflow 任务命令,生成迁移后的任务命令,该命令将被发送到 pinboard 容器,而 pinboard 容器只是一个可以调用旧 DSL 逻辑并返回输出状态的容器。Airflow 工作容器只是监视 pinboard 容器的活跃度,等待其退出并返回状态。从用户的角度来看,当 UI 试图获取活动任务日志时,它是一个独立进程,使用 kubernetes API 从主机提取日志。Airflow 工作容器轮询查询状态,直到任务完成。
Pod 构建
最后我们介绍一下工作 Pod 的生成方式。
构建 Pod 执行 K8S 操作的端到端流程。
下面会更详细的解释帮助容器生成规范 yaml 的不同组件及其各自的任务操作。在 Kubernetes 执行器中,当一个任务计划运行时,会生成 Airflow 工作 Pod 规范。因为是迁移任务,也会生成 pinboard 容器规范,并将其合并到迁移的 Airflow 工作 Pod 规范中。最终,该规范将提交给 kubernetes 集群,以启动一个带有工作容器和 pinboard 容器的 Airflow 工作 Pod。
从序列图中,你可能还会注意到一些资源分配步骤。在 kubernetes 环境中,我们需要预定义 Pod 的资源。因此,我们还利用一些历史数据以及可以直接从 UI 更新为配置的托管数据来帮助我们为每个迁移的任务进行智能资源分配。我们在内部创建了一个流程来跟踪任务 Pod 的资源使用情况,以便更好理解他们的行为,并最大限度的节约资源。
部署
a关于系统如何启动新构建的决策树
如前一节所述,在执行迁移期间,将启动单独的 k8s pod/容器,使用遗留系统的镜像(即 pinboard 镜像)运行实际业务逻辑,从而确保任务的行为在迁移后保持不变。因此,我们构建了专用的 CI/CD 流水线来生成、验证和发布镜像。
迁移后的 pinboard 镜像的部署生命周期遵循以下步骤:
- 触发 Jenkins 作业,基于最新的提交构建 pinboard 镜像。我们的 Teletraan[3]管理工具会按预定节奏触发,或者手动触发。
- 发布工件,另一个 Jenkins 作业会查看是否发布了任何变更,如果有变更,就运行 DAG 单元测试并发布预发镜像。
- 然后,定期执行的验证工作流将预发镜像发布到金丝雀环境中,并执行一组触发作业,这些作业触发其他监测工作流来验证预发镜像,并查询使用 ExternalTaskSensors 的预发镜像的状态。这些金丝雀工作流是为了测试旧系统中常见的作业类型,以覆盖最常用的操作器。金丝雀工作流套件还包括用户贡献的工作流,以保护他们的关键工作流不受可能破坏其流水线的问题镜像的影响。一旦所有的感应器任务接收到成功状态,就发布生产镜像,供 Web 服务器、调度器和 worker 在生产中使用。如果在金丝雀验证测试期间出现故障,工作流团队将得到自动通知,并需要手动检查问题以纠正并重新部署。
金丝雀工作流启动许多工作流,进行检查,然后启动新的构建
这个部署流水线还允许发布热修复版本,以保护所有用户在几个小时后不受完整部署的影响,而只是发布一个特定的提交。单一存储库有时具有复杂的依赖,可能导致许多意想不到的任务失败。金丝雀验证流水线允许我们在任何变更影响到生产环境之前捕获潜在的问题。
DAG 文件同步
正如迁移工具一节中提到的,Spinner 自动迁移工具生成一个将发布到 s3 的迁移元数据文件,该文件是调度器的标识符,用于查找迁移后的工作流和作业令牌。同步服务在 Airflow Web 服务器和调度器上运行,同步主机上的迁移元数据文件与来自 s3 的 DAG 文件,并且也基于调度器层和分区号。正如在上一篇文章中提到的,有多个调度器同时服务于迁移工作流和原生工作流,但是一个调度器只能其中一种 DAG,不能同时处理两者。任何新的迁移元数据文件都会在 8 秒内同步到调度器,然后由 PinterestDagBag 模块处理。下面是我们现有的迁移调度程序分区。
可用的不同迁移集群
指标
在工作流迁移项目开始时,系统上运行的大多数工作流都是从遗留系统迁移过来的。为了度量系统的健康程度,需要更重视这些工作流。正如在上一篇文章中提到的,我们的系统级 SLO 是跨多个集群承载的所有调度器的聚合加权平均值。因此,迁移后的调度器具有更高的权重,因为包含更多和更高层次的工作流。SLO 是通过每 15 分钟为每个调度程序运行一个预定的 DAG 来测量的,这个 DAG 会发出统计信息。如果任何指标丢失了一个点,加权平均值将下降,我们测量该指标的总体正常运行时间不会低于 98%。任何指标丢失一个数据点,都会通知工作流团队,除非速率低于阈值(通常意味着更大的问题),并不会通知所有用户。
跟踪系统运行状况 SLO 的度量图
结束语
我们与其他想要探索如何增强主要组件来定制业务需求的 Airflow 爱好者分享我们的发现,我们采用基本的 Airflow 系统,加入自定义修改,支持其与我们的迁移层协同工作,协调客户工作流。
希望这篇关于如何从旧系统迁移到 Pinterest Airflow 系统的文章对你有所帮助。
References:[1] Spinner: The Mass Migration to Pinterest’s New Workflow Platform: https://medium.com/pinterest-engineering/spinner-the-mass-migration-to-pinterests-new-workflow-platform-997d9243f56a
[2] Pinball: Building workflow management: https://medium.com/@Pinterest_Engineering/pinball-building-workflow-management-88a044c9b9e3
[3] Teletraan: https://github.com/pinterest/teletraan