简述
Prefect 是一种新的工作流管理系统,专为现代基础设施而设计,由开源的 Prefect Core 工作流引擎提供支持。 用户只需将任务组织成流程,Prefect 负责其余的工作,可让您非常容易使用数据工作流并添加重试、日志记录、动态映射、缓存、失败通知等语义。
架构
Prefect 独特的混合执行模型使您的代码和数据完全私密,同时可以充分利用我们的托管编排服务。
当您注册工作流时,您的代码会安全地存储在您的基础设施中。 工作流元数据被发送到 Prefect Cloud 以进行调度和编排。
Prefect Agent在用户的部署架构内运行,负责启动和监控工作流运行。 Agent向 Prefect Cloud API 发送请求以更新工作流运行元数据。
Prefect Cloud 的实时更新的用户界面(UI)允许您实时监控工作流执行并利用 Prefect Cloud 广泛的集成功能。
通过这种方式,将最大限度地减少您的负面工程负担,并使您专注于真正想要编写的代码。
调度和管理工作流元数据(Prefect Cloud平台/Prefect Server)
Prefect Cloud平台
Prefect Cloud 是一个完全托管的、可用于生产的 Prefect Core 后端服务。 如果您使用过 Prefect Core 的后端服务,Prefect Cloud 是替代品,它提供了一些增强功能,包括:
- 支持权限和授权
- 当大规模工作流时,性能更强
- 支持Agent监控
- 运行时加密安全
- 支持团队管理
- 支持服务等级协议
Prefect Server
除了 Prefect Cloud 平台,Prefect 还包括一个用于编排和管理工作流的开源后端服务,主要由 Prefect Server 和 Prefect UI 组成。此本地服务将工作流元数据存储在 Postgres 数据库中并暴露 GraphQL API进行访问。
在第一次运行本地后端服务之前,请先运行 prefect backend server
来配置 Prefect 进行本地编排。 请注意,本地后端服务需要运行 Docker 和 Docker Compose。(或者你的笔记本有Docker Desktop,你仍然可以运行本地后端服务;同时,Prefect Server 还有一个实验性的 helm chart用于向 Kubernetes 进行更大规模的部署。)
要启动本地后端服务、UI 和所有必需的基础架构,请运行:prefect server start
当所有组件都运行后,您可以通过访问http://localhost:8080
查看 UI。
请注意,从本地后端服务执行工作流需要至少运行一个Prefect Agent :
prefect agent local start
最后,要向本地后端服务注册任何工作流,请调用 flow.register()
。
Prefect Server架构
Prefect Server 由几个不同的服务组成:
- UI:用户界面,提供用于转换和查询元数据的可视化仪表板。
- Apollo:这个服务是交互的主要端点。
- PostgreSQL:用于存储元数据的数据库持久层
- Hasura:这个服务位于 Postgres 之上的 GraphQL API,用于查询元数据。
- GraphQL:这个服务的业务逻辑是暴露 GraphQL 转换
- Towel:运行负责服务维护的实用程序。
- 调度程序:调度并创建新的工作流运行。
- Zombie Killer:如果任务无法心跳,则将运行的任务标记为失败。
- Lazarus:重新调度运行在一段时间内保持异常状态的工作流。
注:用户和代理只需要访问 Apollo 端点,所有其他服务驻留在防火墙后面。
Prefect Server 与 Prefect Cloud 应该选择哪个?
Prefect Server 和 Prefect Cloud 有许多相似之处, 事实上, Prefect Server 的一些服务在 Prefect Cloud 中的运行几乎没有变化!尽管如此,仍有一些差异(如下所述)值得在您的决定中加以考虑。因为 Prefect Cloud 建立在 Prefect Server 的代码库之上,所以我们将完全专注于Prefect Cloud( Prefect Server之外 )的额外优点。
支持授权和权限
Prefect Cloud 支持用户作为一等概念,它允许:
- 通过
Auth0
获得对 UI 的许可访问 - 可定制的访问控制
- 经过身份验证的 GraphQL API,只能通过特殊的授权令牌访问
- 完整的团队管理体验
API网络可访问性和自定义部署
由于可以从任何可以访问 api.prefect.io
的位置访问 Prefect Cloud 的 API。因此,定制您的工作流和部署工作流要容易得多,而无需维护网络端点。从使用您最喜欢的 CI/CD 工具注册流到在不同的集群和机器上运行多个Agent,您只需要确保的是您的工具和服务具有恰当许可的云授权令牌。
支持企业特性
除了用户角色和授权之外,Prefect Cloud 还附带许多业务部署通常需要的附加功能:
- 用于配置特殊警报的 SLA 服务
- 单点登录 (SSO) 集成
- 代理监控
- 用于通知的特殊类型的 Cloud Hooks
- 所有租户活动的审计跟踪
- 高可用性
- 企业支持
- 运行时加密保护
规模和性能
Prefect Cloud 的设计考虑了规模和性能。 根据您日常运行的工作量,您可能会注意到Prefect Server工作流运行和Prefect Cloud工作流运行之间存在明显差异。 特别是我们观察到在使用Prefect Cloud运行时,相对于典型的Prefect Server部署,并发运行约 10-20 个任务时,可能会快 3 倍。 随着规模的扩大,这种差异会被放大:一旦您同时运行约 50-100 个任务,速度差异可能高达 6 倍。 最终,这些差异归结为 API 响应能力,因此 UI 的性能也直接受到影响。
客户支持
Prefect Cloud 客户可以获得多种支持,从专门的咨询时间到共享的 Slack 渠道。 Prefect Server 不提供高级支持。
本地工作流编排
对于新手来说,Prefect 的 Python风格的API 会应该感到熟悉。 将函数标记为任务并相互调用以建立工作流。
from prefect import task, Flow, Parameter @task(log_stdout=True) def say_hello(name): print("Hello, {}!".format(name)) with Flow("My First Flow") as flow: name = Parameter('name') say_hello(name) flow.run(name='world') # "Hello, world!" flow.run(name='Marvin') # "Hello, Marvin!" 复制代码
Agent-启动和监控工作流
Agent负责启动和监控工作流运行。Prefect 支持多种不同的代理类型,用于在不同平台上进行部署。
- 本地:本地代理作为本地进程执行工作流。
- Docker:Docker 代理在 Docker 容器中执行工作流。
- Kubernetes:Kubernetes 代理将工作流作为 Kubernetes 作业执行。
- AWS ECS:ECS 代理将工作流运行作为 AWS ECS 任务执行。
主要特点
细节问题
Prefect 是一个工作流引擎,这意味着用户需要有绝对的信心 。1) 它可以工作,2) 它可以很好地工作。 出于这个原因,Prefect 的设计以强大的数据工程理念为后盾,我们将其代码保持在高标准。
Prefect 已经拥有比任何其他工作流引擎(包括整个 Airflow 平台)更多的单元测试和更大的测试覆盖率。 文档是最重要的:每个模块、类和函数不仅有一个文档描述,文档描述本身也经过了一致性测试。 类型注释用于捕获错误。
Prefect甚至对变量和函数的名称进行了用户测试,以确保它们清晰。
用户可以相信,这种关注标准甚至可以扩展到他们自己可能永远不会查看的代码库部分。
任务即函数
简单来说,Prefect 任务就是何时运行有特殊规则的函数:它们可以选择接受输入,执行一些工作,并可以选择返回输出。
任务可以直接处理数据,或编排外部系统,或调用其他环境甚至语言——对任务可以做什么几乎没有限制。
此外,每个任务在运行之前都会收到有关其上游依赖项的元数据,即使它没有收到任何明确的数据输入,也可以根据工作流的状态改变其行为。
因为 Prefect 是一个逆向工程框架,它与每个任务运行的代码无关。 没有限制输入和输出。
工作流即容器
工作流是任务的容器。 工作流表示任务之间的依赖结构,但不执行任何逻辑。
模块化
Prefect 的每个组件都采用模块化设计,可以轻松定制或替换从执行引擎到日志记录、数据序列化和存储,再到状态处理本身的任何内容。 作为一种逆向工程工具,Prefect 旨在支持正向工程,而不是取代它。
通过状态进行通信
Prefect使用State
作为一个正式概念来随时反映工作流的行为。 任务和工作流都会产生States
。
海量并发Prefect的工作流可以在任意时间、出于任何原因、以任何并发量进行运行。 按计划运行工作流只是一种特殊情况。
支持幂等(非必需)
我们将幂等性称为工作流管理系统的“救星”。当任务保证业务幂等时,工程挑战将变得非常容易。然而,构建幂等任务极其困难,因此我们在没有任何幂等假设的情况下设计了 Prefect。用户应该更喜欢幂等任务,因为它们天生就是健壮的,但是 Prefect 不强制要求任务幂等。
自动化框架
一个像样的自动化框架应该具有三个关键组件:
- 工作流定义
- 工作流引擎
- 工作流状态
Prefect Core满足上述三条,而且每个组件的设计都来源于用户实践研究和应用经验。
工作流定义
从多方面看,定义工作流程是最容易的部分。这是一个描述所有逆向工程的机会:任务如何相互依赖,它们应该在什么情况下运行,以及对基础设施有什么要求。
许多工作流框架都设计成在配置文件或详细数据结构中定义工作流。**在设计Prefect时,我们进行的关于用户实践的所有研究中,没有一个人说他想要更明确的显式工作流定义。也没有一次听到要求更多的YAML配置。**从未有人自愿重写其代码来适配工作流框架的API。
因此,Prefect将这种用于工作流定义的方法视为失败的设计。虽然Prefect能为每个工作流都构建一个完全自省和可定制化的DAG定义,但如果用户不希望与之打交道,就永远不需要它。对应的,Python就是API。用户定义Python普通函数并像在任何脚本中一样调用它们,Prefect框架负责收集task依赖,生成工作流结构。
工作流引擎
一旦定义了工作流,我们就需要执行它。这是逆向工程的核心。
Prefect 的引擎是一个健壮的pipeline,它嵌入了工作流执行的逻辑。它本质上是一个规则系统,用于决定任务是否应该运行;运行时会发生什么;以及停止时该怎么办。
仅仅按顺序“启动”每个任务是不够的;任务可能会因为成功、失败、跳过、暂停甚至崩溃而停止运行!这些结果中的每一个都可能需要来自引擎本身或下游任务做出不同响应。引擎的作用是检查工作流定义的依赖,并保证每个任务都遵循分配给它的规则。
工作流状态
可以说,Prefect 的主要创新不是简化的工作流定义系统或健壮的工作流引擎,而是工作流状态的丰富抽象。
大多数工作流框架的标准是默认工作流会成功。这意味着如果任务停止而没有崩溃,则认为它是“成功的”;否则才是“失败”。这是一种极其有限的世界观。
如果你想跳过一个任务怎么办?如果你想暂停它怎么办?如果整个系统出现故障,您能否将其恢复原样?你能保证一个任务只会运行一次吗,即使在不相交环境中的多个系统试图运行它?
通过为每一个任务和工作流专门设计状态的抽象概念,使得Prefect能在工作流执行前、执行中、执行后的任何时刻描述它。
注意:
Prefect Core 根据 Apache 软件许可2.0版本 获得许可。 请注意,Prefect Core 包括用于运行 Prefect Server 和 Prefect UI 的实用程序,它们本身是根据 Prefect 社区许可证获得许可的。
示例
import aircraftlib as aclib from prefect import task, Flow @task def extract_reference_data(): print("fetching reference data...") return aclib.fetch_reference_data() @task def extract_live_data(): # Get the live aircraft vector data around Dulles airport dulles_airport_position = aclib.Position(lat=38.9519444444, long=-77.4480555556) area_surrounding_dulles = aclib.bounding_box(dulles_airport_position, radius_km=200) print("fetching live aircraft data...") raw_aircraft_data = aclib.fetch_live_aircraft_data(area=area_surrounding_dulles) return raw_aircraft_data @task def transform(raw_aircraft_data, ref_data): print("cleaning & transform aircraft data...") live_aircraft_data = [] for raw_vector in raw_aircraft_data: vector = aclib.clean_vector(raw_vector) if vector: aclib.add_airline_info(vector, ref_data.airlines) live_aircraft_data.append(vector) return live_aircraft_data @task def load_reference_data(ref_data): print("saving reference data...") db = aclib.Database() db.update_reference_data(ref_data) @task def load_live_data(live_aircraft_data): print("saving live aircraft data...") db = aclib.Database() db.add_live_aircraft_data(live_aircraft_data) def main(): with Flow("etl") as flow: reference_data = extract_reference_data() live_data = extract_live_data() transformed_live_data = transform(live_data, reference_data) load_reference_data(reference_data) load_live_data(transformed_live_data) # 启动工作流 flow.run() if __name__ == "__main__": main()