如何将 Apache Airflow 用于机器学习工作流

简介: Apache Airflow 是一个流行的平台,用于在 Python 中创建、调度和监控工作流。 它在 Github 上有超过 15,000 颗星,被 Twitter、Airbnb 和 Spotify 等公司的数据工程师使用。如果您使用的是 Apache Airflow,那么您的架构可能已经根据任务数量及其要求进行了演变。 在 Skillup.co 工作时,我们首先有几百个 DAG 来执行我们所有的数据工程任务,然后我们开始做机器学习。

使用执行器扩展 Apache Airflow

Apache Airflow 具有基于调度程序、工作节点、元数据数据库、Web 服务器和队列服务的多节点架构。

使用 Airflow 时的首要选择之一是执行器的类型。 执行程序与调度程序通信,以便在每个任务排队时为其分配资源。 执行器之间的差异归结为他们可用的资源。

Airflow 配置示例如下:

网络异常,图片无法展示
|


顺序执行器

默认执行器可以轻松地在本地测试 Airflow。 它在一台机器上按顺序运行任务,并使用 SQLite 存储任务的元数据。

本地执行者

本地执行器可以并行运行任务,并且需要像 PostgreSQL 这样支持并行的数据库。 虽然您可以在生产环境中运行本地执行器,但通常迁移到 Celery 执行器以提高可用性和可扩展性。


Celery 执行者

Celery 执行器需要设置 Redis 或 RabbitMQ 来将消息分发给工作人员。 Airflow 然后将任务分配给可以在一台或多台机器上运行的 Celery workers。 我们在 Skillup.co 使用的执行器,能够运行多达 256 个并发数据工程任务。

Kubernetes 执行器

Kubernetes 执行器为每个任务实例创建一个新的 pod。 它允许您根据任务要求动态扩展和缩减。


使用 operators 扩展 Apache Airflow

另一种扩展 Airflow 的方法是使用 operators 远程执行一些任务。 2018 年,Jessica Laughlin 认为我们都在错误地使用 Airflow,正确的方法是只使用 Kubernetes Operator。她认为应该有一个没有错误的 Operator 来执行任何任意任务,而不是越来越多的特定于功能的 Operators。

Kubernetes Operator

Kubernetes Operator 将在新 pod 中启动任务。当您有一组需要定期运行的任务时,我发现将 Kubernetes Operator 仅用于具有特定要求的任务是一个更好的主意。

我看到 Kubernetes Operator 的主要问题是您仍然需要了解 Kubernetes 配置系统并设置集群。例如,Dailymotion 在 Google Kubernetes Engine 上的集群中部署了 Airflow,并决定使用 KubernetesPodOperator 扩展 Airflow 以执行机器学习任务

在我们的案例中,我们是一个小型数据团队,几乎没有资源来设置 Kubernetes 集群。我们希望专注于构建机器学习模型,而不是管理基础设施。


机器学习任务与 ETL 任务有何不同?


在 Skillup.co,我们作为一个小团队必须在一年内构建和部署多个数据产品。 我们知道我们想使用开源库构建我们的模型,从经典机器学习模型到深度学习。 我们还在寻找一个机器学习平台来帮助我们对所有模型进行扩展和版本控制。

Airflow 可以很好地跟踪元数据数据库中的每个任务细节,但机器学习任务与 ETL 任务有不同的要求。 机器学习任务与数据、代码、环境、参数和指标相关联。 Airflow 不会收集和显示该信息。 而 Kubernetes 只在基础设施方面为您提供帮助。

在一个地方收集每次执行的所有相关信息有助于调试机器学习模型。 在下表中,您可以看到我们为更快地迭代机器学习模型而跟踪的信息。

网络异常,图片无法展示
|


我们如何选择用于扩展机器学习任务

您已经可以在 Google DataFlow、Amazon SageMaker 和 Databricks 等机器学习平台找到多个 Airflow operators。 这些 operators 的问题在于它们都有不同的规范,并且仅限于在这些平台上执行代码。

在我们开始在 Skillup.co 进行任何机器学习之前,我们将 Airflow 用于所有数据工程,这些数据工程主要由 Airflow BashOperator 调用的 Python CLI 组成。

然后我们决定使用基于开放标准的机器学习平台 Valohai 来帮助我们远程启动机器学习任务并获得自动版本控制。

有了混合解决方案,我们可以在 Airflow 安装中保留敏感数据,并将机器学习委托给 Valohai。

下图是使用 Airflow DAG 的机器学习工作流程。由于 Valohai 的原因,蓝色任务可以远程执行。

网络异常,图片无法展示
|


感谢 Valohai 的开放 API,我们开发了开源的 airflow-valohai-plugin 来集成两个平台。去年,我们用它在生产中发布了四个机器学习模型。

Valohai Operator

Valohai 算子背后的想法类似于 Kubernetes 算子。优点是您不需要了解 Kubernetes,并且您还可以获得机器学习的自动版本控制。

Valohai 将根据您的要求、代码和数据负责启动和停止云实例。 Valohai operator 只需在 Docker 容器中执行命令,轮询是否完成并返回最终状态代码。

通过提供 Docker 镜像和代码存储库,您可以执行任何语言和库的代码。您还可以访问 AWS、Google 和 Azure 中的 50 多个云环境。

要在 Airflow 上创建任务,您只需要指定 Valohai 项目和要执行的步骤。如果需要,您还可以覆盖默认的云环境、输入和参数。

from airflow.operators.valohai import ValohaiSubmitExecutionOperator
train = ValohaiSubmitExecutionOperator(
  task_id='train_model',
  # Specify the project and step from Valohai
  project_name='tensorflow-example',
  step='Train model (MNIST)',
  # Override defaults
  environment='aws-eu-west-1-g3-4xlarge',
  inputs={
    'training-set-images': ValohaiSubmitExecutionOperator.get_output_uri(
      task=preprocess,
      name='mnist-train-images.gz'),
    'training-set-labels': ValohaiSubmitExecutionOperator.get_output_uri(
      task=preprocess,
      name='mnist-train-labels.gz'),
    'test-set-images': ValohaiSubmitExecutionOperator.get_output_uri(
      task=preprocess,
      name='mnist-test-images.gz'),
    'test-set-labels': ValohaiSubmitExecutionOperator.get_output_uri(
      task=preprocess,
      name='mnist-test-labels.gz'),
  },
  parameters={
    'dropout': 0.9,
    'learning_rate': 0.001,
    'max_steps': 300,
    'batch_size': 200,
  },
  # Associate a task to a previously created DAG
  dag=dag,
)
# Set dependencies between tasks
preprocess >> train
复制代码


上面从 Airflow 向 Valohai 提交执行的示例代码。

另一方面,您需要通过创建 valohai.yaml 在 Valohai 端进行一些轻型配置。 valohai.yaml 用作配置文件,用于设置默认值并验证机器环境、docker 镜像、运行命令、参数和每次执行的输入数据文件。

从一开始就有了机器版本控制,这有助于我们调试数据、代码和参数,从而实现预测,并更快地修复预测。就像您希望您的 Airflow 任务是幂等的以避免重新启动它们时产生副作用一样,您希望您的机器学习模型基于代码和数据的审计版本。如果您总是根据存储在数据湖中的文件训练模型,这很容易做到。下面您可以在 Valohai UI 中看到已解析的配置以执行。

Valohai 执行详细信息 UI 如下

网络异常,图片无法展示
|


Valohai 构建在两种智能选择之上,这两种选择使其易于与任何编程语言和库集成。

首先,选择适用于所有编程语言的 CLI 优先的接口。 CLI 是一种流行的接口,用于包装函数以在本地执行它们。 CLI 也是 Bash、Kubernetes 和 Valohai operators 的接口。

其次,从标准输出中收集执行指标,而不必为每种语言安装自定义库。 所有语言都有将 JSON 对象写入标准输出的工具。 Valohai 将自动解析该对象,例如,帮助您比较每个模型的准确性。

具有机器学习执行参数和准确性的 Valohai UI 如下所示

网络异常,图片无法展示
|


您还可以在 Valohai UI 中手动启动执行,而不会产生任何副作用。在 Airflow 中,清除任务的状态将触发下游任务。

最后但同样重要的是,新的 Valohai operator 让您可以轻松地将一个执行的输出作为下一个执行的输入传递。这帮助我们创建了数据在 S3 上自动进行版本控制的管道。此外,每个新的执行都在与 S3 存储桶相同的云提供商和区域上运行,这使得 Valohai 可以快速将其下载到 AWS EC2 实例上。


总结

Apache Airflow 是一个强大的工具,用于创建、调度和监控工作流,但它是为 ETL 任务而构建的。机器学习任务需要特定的资源,并且它们的执行细节应该是版本控制的。

如果您有资源来维护 Kubernetes 集群,您可以使用 KubernetesPodOperator 扩展机器学习任务。

如果您想专注于构建模型,您可以使用 ValohaiSubmitExecutionOperator为机器学习任务扩展 Airflow。这样,您还将获得每次执行的自动版本控制。

相关实践学习
深入解析Docker容器化技术
Docker是一个开源的应用容器引擎,让开发者可以打包他们的应用以及依赖包到一个可移植的容器中,然后发布到任何流行的Linux机器上,也可以实现虚拟化,容器是完全使用沙箱机制,相互之间不会有任何接口。Docker是世界领先的软件容器平台。开发人员利用Docker可以消除协作编码时“在我的机器上可正常工作”的问题。运维人员利用Docker可以在隔离容器中并行运行和管理应用,获得更好的计算密度。企业利用Docker可以构建敏捷的软件交付管道,以更快的速度、更高的安全性和可靠的信誉为Linux和Windows Server应用发布新功能。 在本套课程中,我们将全面的讲解Docker技术栈,从环境安装到容器、镜像操作以及生产环境如何部署开发的微服务应用。本课程由黑马程序员提供。     相关的阿里云产品:容器服务 ACK 容器服务 Kubernetes 版(简称 ACK)提供高性能可伸缩的容器应用管理能力,支持企业级容器化应用的全生命周期管理。整合阿里云虚拟化、存储、网络和安全能力,打造云端最佳容器化应用运行环境。 了解产品详情: https://www.aliyun.com/product/kubernetes
相关文章
|
人工智能
一键生成视频!用 PAI-EAS 部署 AI 视频生成模型 SVD 工作流(清晰的实例)
用 PAI-EAS 部署 AI 视频生成模型 SVD 工作流(清晰的实例)
554 2
|
存储 机器学习/深度学习 Apache
如何将Apache Hudi应用于机器学习
如何将Apache Hudi应用于机器学习
256 0
|
机器学习/深度学习 数据采集 人工智能
容器化机器学习流水线:构建可复用的AI工作流
本文介绍了如何构建容器化的机器学习流水线,以提高AI模型开发和部署的效率与可重复性。首先,我们探讨了机器学习流水线的概念及其优势,包括自动化任务、确保一致性、简化协作和实现CI/CD。接着,详细说明了使用Kubeflow Pipelines在Kubernetes上构建流水线的步骤,涵盖安装、定义流水线、构建组件镜像及上传运行。容器化流水线不仅提升了环境一致性和可移植性,还通过资源隔离和扩展性支持更大规模的数据处理。
|
机器学习/深度学习 数据采集 数据处理
Scikit-learn Pipeline完全指南:高效构建机器学习工作流
Scikit-learn管道是构建高效、鲁棒、可复用的机器学习工作流程的利器。通过掌握管道的使用,我们可以轻松地完成从数据预处理到模型训练、评估和部署的全流程,极大地提高工作效率。
457 2
Scikit-learn Pipeline完全指南:高效构建机器学习工作流
|
消息中间件 监控 数据可视化
Apache Airflow 开源最顶级的分布式工作流平台
Apache Airflow 是一个用于创作、调度和监控工作流的平台,通过将工作流定义为代码,实现更好的可维护性和协作性。Airflow 使用有向无环图(DAG)定义任务,支持动态生成、扩展和优雅的管道设计。其丰富的命令行工具和用户界面使得任务管理和监控更加便捷。适用于静态和缓慢变化的工作流,常用于数据处理。
Apache Airflow 开源最顶级的分布式工作流平台
|
人工智能 编解码 对象存储
一键生成视频,用 PAI-EAS 部署 AI 视频生成模型 SVD 工作流
本教程将带领大家免费领取阿里云PAI-EAS的免费试用资源,并且带领大家在 ComfyUI 环境下使用 SVD的模型,根据任何图片生成一个小短视频。
|
机器学习/深度学习 数据采集 分布式计算
构建一个高效的机器学习工作流:技术实践与优化策略
【8月更文挑战第12天】构建一个高效的机器学习工作流是一个复杂而持续的过程,需要综合考虑数据、模型、算法、平台等多个方面。通过遵循上述步骤和优化策略,可以显著提高机器学习项目的开发效率和质量,为业务带来更大的价值。未来,随着技术的不断进步和应用场景的不断拓展,我们有理由相信机器学习工作流将变得更加高效、智能和灵活。
|
监控 数据处理 调度
使用Apache Airflow进行工作流编排:技术详解与实践
【6月更文挑战第5天】Apache Airflow是开源的工作流编排平台,用Python定义复杂数据处理管道,提供直观DAGs、强大调度、丰富插件、易扩展性和实时监控。本文深入介绍Airflow基本概念、特性,阐述安装配置、工作流定义、调度监控的步骤,并通过实践案例展示如何构建数据获取、处理到存储的工作流。Airflow简化了复杂数据任务管理,适应不断发展的数据技术需求。
2887 3
|
分布式计算 Serverless 数据处理
EMR Serverless Spark 实践教程 | 通过 Apache Airflow 使用 Livy Operator 提交任务
Apache Airflow 是一个强大的工作流程自动化和调度工具,它允许开发者编排、计划和监控数据管道的执行。EMR Serverless Spark 为处理大规模数据处理任务提供了一个无服务器计算环境。本文为您介绍如何通过 Apache Airflow 的 Livy Operator 实现自动化地向 EMR Serverless Spark 提交任务,以实现任务调度和执行的自动化,帮助您更有效地管理数据处理任务。
725 0
|
存储 机器学习/深度学习 Apache
Apache Hudi与机器学习特征存储
Apache Hudi与机器学习特征存储
301 0

推荐镜像

更多