如何将 Apache Airflow 用于机器学习工作流

本文涉及的产品
容器服务 Serverless 版 ACK Serverless,952元额度 多规格
容器服务 Serverless 版 ACK Serverless,317元额度 多规格
简介: Apache Airflow 是一个流行的平台,用于在 Python 中创建、调度和监控工作流。 它在 Github 上有超过 15,000 颗星,被 Twitter、Airbnb 和 Spotify 等公司的数据工程师使用。如果您使用的是 Apache Airflow,那么您的架构可能已经根据任务数量及其要求进行了演变。 在 Skillup.co 工作时,我们首先有几百个 DAG 来执行我们所有的数据工程任务,然后我们开始做机器学习。

使用执行器扩展 Apache Airflow

Apache Airflow 具有基于调度程序、工作节点、元数据数据库、Web 服务器和队列服务的多节点架构。

使用 Airflow 时的首要选择之一是执行器的类型。 执行程序与调度程序通信,以便在每个任务排队时为其分配资源。 执行器之间的差异归结为他们可用的资源。

Airflow 配置示例如下:

网络异常,图片无法展示
|


顺序执行器

默认执行器可以轻松地在本地测试 Airflow。 它在一台机器上按顺序运行任务,并使用 SQLite 存储任务的元数据。

本地执行者

本地执行器可以并行运行任务,并且需要像 PostgreSQL 这样支持并行的数据库。 虽然您可以在生产环境中运行本地执行器,但通常迁移到 Celery 执行器以提高可用性和可扩展性。


Celery 执行者

Celery 执行器需要设置 Redis 或 RabbitMQ 来将消息分发给工作人员。 Airflow 然后将任务分配给可以在一台或多台机器上运行的 Celery workers。 我们在 Skillup.co 使用的执行器,能够运行多达 256 个并发数据工程任务。

Kubernetes 执行器

Kubernetes 执行器为每个任务实例创建一个新的 pod。 它允许您根据任务要求动态扩展和缩减。


使用 operators 扩展 Apache Airflow

另一种扩展 Airflow 的方法是使用 operators 远程执行一些任务。 2018 年,Jessica Laughlin 认为我们都在错误地使用 Airflow,正确的方法是只使用 Kubernetes Operator。她认为应该有一个没有错误的 Operator 来执行任何任意任务,而不是越来越多的特定于功能的 Operators。

Kubernetes Operator

Kubernetes Operator 将在新 pod 中启动任务。当您有一组需要定期运行的任务时,我发现将 Kubernetes Operator 仅用于具有特定要求的任务是一个更好的主意。

我看到 Kubernetes Operator 的主要问题是您仍然需要了解 Kubernetes 配置系统并设置集群。例如,Dailymotion 在 Google Kubernetes Engine 上的集群中部署了 Airflow,并决定使用 KubernetesPodOperator 扩展 Airflow 以执行机器学习任务

在我们的案例中,我们是一个小型数据团队,几乎没有资源来设置 Kubernetes 集群。我们希望专注于构建机器学习模型,而不是管理基础设施。


机器学习任务与 ETL 任务有何不同?


在 Skillup.co,我们作为一个小团队必须在一年内构建和部署多个数据产品。 我们知道我们想使用开源库构建我们的模型,从经典机器学习模型到深度学习。 我们还在寻找一个机器学习平台来帮助我们对所有模型进行扩展和版本控制。

Airflow 可以很好地跟踪元数据数据库中的每个任务细节,但机器学习任务与 ETL 任务有不同的要求。 机器学习任务与数据、代码、环境、参数和指标相关联。 Airflow 不会收集和显示该信息。 而 Kubernetes 只在基础设施方面为您提供帮助。

在一个地方收集每次执行的所有相关信息有助于调试机器学习模型。 在下表中,您可以看到我们为更快地迭代机器学习模型而跟踪的信息。

网络异常,图片无法展示
|


我们如何选择用于扩展机器学习任务

您已经可以在 Google DataFlow、Amazon SageMaker 和 Databricks 等机器学习平台找到多个 Airflow operators。 这些 operators 的问题在于它们都有不同的规范,并且仅限于在这些平台上执行代码。

在我们开始在 Skillup.co 进行任何机器学习之前,我们将 Airflow 用于所有数据工程,这些数据工程主要由 Airflow BashOperator 调用的 Python CLI 组成。

然后我们决定使用基于开放标准的机器学习平台 Valohai 来帮助我们远程启动机器学习任务并获得自动版本控制。

有了混合解决方案,我们可以在 Airflow 安装中保留敏感数据,并将机器学习委托给 Valohai。

下图是使用 Airflow DAG 的机器学习工作流程。由于 Valohai 的原因,蓝色任务可以远程执行。

网络异常,图片无法展示
|


感谢 Valohai 的开放 API,我们开发了开源的 airflow-valohai-plugin 来集成两个平台。去年,我们用它在生产中发布了四个机器学习模型。

Valohai Operator

Valohai 算子背后的想法类似于 Kubernetes 算子。优点是您不需要了解 Kubernetes,并且您还可以获得机器学习的自动版本控制。

Valohai 将根据您的要求、代码和数据负责启动和停止云实例。 Valohai operator 只需在 Docker 容器中执行命令,轮询是否完成并返回最终状态代码。

通过提供 Docker 镜像和代码存储库,您可以执行任何语言和库的代码。您还可以访问 AWS、Google 和 Azure 中的 50 多个云环境。

要在 Airflow 上创建任务,您只需要指定 Valohai 项目和要执行的步骤。如果需要,您还可以覆盖默认的云环境、输入和参数。

from airflow.operators.valohai import ValohaiSubmitExecutionOperator
train = ValohaiSubmitExecutionOperator(
  task_id='train_model',
  # Specify the project and step from Valohai
  project_name='tensorflow-example',
  step='Train model (MNIST)',
  # Override defaults
  environment='aws-eu-west-1-g3-4xlarge',
  inputs={
    'training-set-images': ValohaiSubmitExecutionOperator.get_output_uri(
      task=preprocess,
      name='mnist-train-images.gz'),
    'training-set-labels': ValohaiSubmitExecutionOperator.get_output_uri(
      task=preprocess,
      name='mnist-train-labels.gz'),
    'test-set-images': ValohaiSubmitExecutionOperator.get_output_uri(
      task=preprocess,
      name='mnist-test-images.gz'),
    'test-set-labels': ValohaiSubmitExecutionOperator.get_output_uri(
      task=preprocess,
      name='mnist-test-labels.gz'),
  },
  parameters={
    'dropout': 0.9,
    'learning_rate': 0.001,
    'max_steps': 300,
    'batch_size': 200,
  },
  # Associate a task to a previously created DAG
  dag=dag,
)
# Set dependencies between tasks
preprocess >> train
复制代码


上面从 Airflow 向 Valohai 提交执行的示例代码。

另一方面,您需要通过创建 valohai.yaml 在 Valohai 端进行一些轻型配置。 valohai.yaml 用作配置文件,用于设置默认值并验证机器环境、docker 镜像、运行命令、参数和每次执行的输入数据文件。

从一开始就有了机器版本控制,这有助于我们调试数据、代码和参数,从而实现预测,并更快地修复预测。就像您希望您的 Airflow 任务是幂等的以避免重新启动它们时产生副作用一样,您希望您的机器学习模型基于代码和数据的审计版本。如果您总是根据存储在数据湖中的文件训练模型,这很容易做到。下面您可以在 Valohai UI 中看到已解析的配置以执行。

Valohai 执行详细信息 UI 如下

网络异常,图片无法展示
|


Valohai 构建在两种智能选择之上,这两种选择使其易于与任何编程语言和库集成。

首先,选择适用于所有编程语言的 CLI 优先的接口。 CLI 是一种流行的接口,用于包装函数以在本地执行它们。 CLI 也是 Bash、Kubernetes 和 Valohai operators 的接口。

其次,从标准输出中收集执行指标,而不必为每种语言安装自定义库。 所有语言都有将 JSON 对象写入标准输出的工具。 Valohai 将自动解析该对象,例如,帮助您比较每个模型的准确性。

具有机器学习执行参数和准确性的 Valohai UI 如下所示

网络异常,图片无法展示
|


您还可以在 Valohai UI 中手动启动执行,而不会产生任何副作用。在 Airflow 中,清除任务的状态将触发下游任务。

最后但同样重要的是,新的 Valohai operator 让您可以轻松地将一个执行的输出作为下一个执行的输入传递。这帮助我们创建了数据在 S3 上自动进行版本控制的管道。此外,每个新的执行都在与 S3 存储桶相同的云提供商和区域上运行,这使得 Valohai 可以快速将其下载到 AWS EC2 实例上。


总结

Apache Airflow 是一个强大的工具,用于创建、调度和监控工作流,但它是为 ETL 任务而构建的。机器学习任务需要特定的资源,并且它们的执行细节应该是版本控制的。

如果您有资源来维护 Kubernetes 集群,您可以使用 KubernetesPodOperator 扩展机器学习任务。

如果您想专注于构建模型,您可以使用 ValohaiSubmitExecutionOperator为机器学习任务扩展 Airflow。这样,您还将获得每次执行的自动版本控制。

相关实践学习
通过Ingress进行灰度发布
本场景您将运行一个简单的应用,部署一个新的应用用于新的发布,并通过Ingress能力实现灰度发布。
容器应用与集群管理
欢迎来到《容器应用与集群管理》课程,本课程是“云原生容器Clouder认证“系列中的第二阶段。课程将向您介绍与容器集群相关的概念和技术,这些概念和技术可以帮助您了解阿里云容器服务ACK/ACK Serverless的使用。同时,本课程也会向您介绍可以采取的工具、方法和可操作步骤,以帮助您了解如何基于容器服务ACK Serverless构建和管理企业级应用。 学习完本课程后,您将能够: 掌握容器集群、容器编排的基本概念 掌握Kubernetes的基础概念及核心思想 掌握阿里云容器服务ACK/ACK Serverless概念及使用方法 基于容器服务ACK Serverless搭建和管理企业级网站应用
相关文章
|
6月前
|
存储 机器学习/深度学习 Apache
如何将Apache Hudi应用于机器学习
如何将Apache Hudi应用于机器学习
59 0
|
9天前
|
消息中间件 监控 数据可视化
Apache Airflow 开源最顶级的分布式工作流平台
Apache Airflow 是一个用于创作、调度和监控工作流的平台,通过将工作流定义为代码,实现更好的可维护性和协作性。Airflow 使用有向无环图(DAG)定义任务,支持动态生成、扩展和优雅的管道设计。其丰富的命令行工具和用户界面使得任务管理和监控更加便捷。适用于静态和缓慢变化的工作流,常用于数据处理。
Apache Airflow 开源最顶级的分布式工作流平台
|
3月前
|
分布式计算 Serverless 数据处理
EMR Serverless Spark 实践教程 | 通过 Apache Airflow 使用 Livy Operator 提交任务
Apache Airflow 是一个强大的工作流程自动化和调度工具,它允许开发者编排、计划和监控数据管道的执行。EMR Serverless Spark 为处理大规模数据处理任务提供了一个无服务器计算环境。本文为您介绍如何通过 Apache Airflow 的 Livy Operator 实现自动化地向 EMR Serverless Spark 提交任务,以实现任务调度和执行的自动化,帮助您更有效地管理数据处理任务。
203 0
|
5月前
|
监控 数据处理 调度
使用Apache Airflow进行工作流编排:技术详解与实践
【6月更文挑战第5天】Apache Airflow是开源的工作流编排平台,用Python定义复杂数据处理管道,提供直观DAGs、强大调度、丰富插件、易扩展性和实时监控。本文深入介绍Airflow基本概念、特性,阐述安装配置、工作流定义、调度监控的步骤,并通过实践案例展示如何构建数据获取、处理到存储的工作流。Airflow简化了复杂数据任务管理,适应不断发展的数据技术需求。
1098 3
|
6月前
|
存储 机器学习/深度学习 Apache
Apache Hudi与机器学习特征存储
Apache Hudi与机器学习特征存储
82 0
|
6月前
|
存储 机器学习/深度学习 分布式计算
Apache Hudi在Hopsworks机器学习的应用
Apache Hudi在Hopsworks机器学习的应用
125 0
|
3月前
|
存储 消息中间件 Java
Apache Flink 实践问题之原生TM UI日志问题如何解决
Apache Flink 实践问题之原生TM UI日志问题如何解决
45 1
|
28天前
|
SQL Java API
Apache Flink 2.0-preview released
Apache Flink 社区正积极筹备 Flink 2.0 的发布,这是自 Flink 1.0 发布以来的首个重大更新。Flink 2.0 将引入多项激动人心的功能和改进,包括存算分离状态管理、物化表、批作业自适应执行等,同时也包含了一些不兼容的变更。目前提供的预览版旨在让用户提前尝试新功能并收集反馈,但不建议在生产环境中使用。
575 13
Apache Flink 2.0-preview released
|
1月前
|
存储 缓存 算法
分布式锁服务深度解析:以Apache Flink的Checkpointing机制为例
【10月更文挑战第7天】在分布式系统中,多个进程或节点可能需要同时访问和操作共享资源。为了确保数据的一致性和系统的稳定性,我们需要一种机制来协调这些进程或节点的访问,避免并发冲突和竞态条件。分布式锁服务正是为此而生的一种解决方案。它通过在网络环境中实现锁机制,确保同一时间只有一个进程或节点能够访问和操作共享资源。
63 3
|
2月前
|
SQL 消息中间件 关系型数据库
Apache Doris Flink Connector 24.0.0 版本正式发布
该版本新增了对 Flink 1.20 的支持,并支持通过 Arrow Flight SQL 高速读取 Doris 中数据。

推荐镜像

更多