现代数据工程和自动化任务管理的领域中,选择合适的工具对于提升开发效率、确保流程稳定性和促进团队协作至关重要。在这个背景下,Argo Workflows 和 Apache Airflow 作为两大最流行的分布式任务调度系统,各自凭借着独特的设计理念和强大的功能集,在业界引起了广泛的关注与应用。本文旨在深入探讨这两者的核心特性、应用场景以及它们之间的区别,旨在为寻求高效分布式任务调度解决方案的数据工程师、IT 决策者及开发者们提供一个全面而客观的比较指南。
Apache Airflow 简介
Apache Airflow 于 2015 年开源,它是一个著名的开源自动化和工作流管理平台,用于编写、调度和监控工作流。Airflow 允许使用 Python 语言将工作流编写为 DAG,它的设计原则,如代码优先配置、动态工作流和强大的可视化界面,使得 Airflow 在数据工程师和数据科学家之间非常受欢迎。它是 Apache 顶级项目。
它的主要特性有:
- Pythonic DSL:Airflow 允许用户将工作流定义为 Python 代码,从而帮助开发人员灵活、轻松地创建和管理工作流。
- DAG 支持:Airflow 使用有向无环图(DAG)来表示工作流中的任务及其依赖关系。这种结构使得复杂的任务调度变得直观且易于理解。
- 丰富三方库:Airflow 社区提供了大量的 Operator 和其他组件,覆盖了从数据处理到机器学习等多种场景,这些都极大地扩展了其功能范围。
- 良好的用户界面:Airflow 拥有一个直观的 Web 界面,让用户能够方便地监控 DAG 的状态、触发任务执行以及查看日志等。
- 平台集成:Airflow 可以很容易地与各种不同的系统和服务进行集成,比如数据库、云服务提供商等,这得益于它高度可配置的设计。
- 可扩展性:无论是增加更多的 Worker 节点以处理更多任务还是通过编写自定义 Operator 来满足特定需求,Airflow 都非常灵活,允许根据具体应用场景调整配置或扩展功能。
Argo Workflows 简介
Argo Workflows 于 2018 年开源,专为 Kubernetes 编排并行 Job 而设计。它引入了一种声明式的方式来定义和执行多步骤的工作流,这些工作流可以包含复杂的依赖关系、并行执行、条件分支等特性。通过 YAML 文件来定义工作流模板,使得工作流易于编写、维护和重复使用。它是云原生计算基金会(CNCF)下的毕业项目。
它的主要特性有:
- 云原生:Argo Workflows 是完全开源的,专为 Kubernetes 而设计,可以无缝与 Kubernetes 生态集成并利用其功能。
- 轻量化:Argo Workflows 被设计成轻量级解决方案,仅依赖于 Kubernetes 作为运行时环境,不需要额外的组件或服务来操作。这意味着用户可以在不增加系统复杂性的情况下快速启动和管理任务。
- 可扩展性:该工具提供了良好的水平扩展能力,能够根据需求自动调整资源使用情况。通过 Kubernetes 的强大调度能力和弹性伸缩特性,Argo Workflows 能轻松处理从小规模到大规模的各种工作负载。
- 并发性:Argo Workflows 支持高并发执行多个任务的能力,这对于需要同时运行大量独立或相关作业的应用场景尤为重要。它允许定义并行步骤,从而显著提高工作效率。
- DAG 支持:支持复杂的 DAG 工作流,可以创建具有复杂依赖关系的工作流。
- 用户界面:用于可视化地监控和管理你的工作流。这个图形化界面让非技术人员也能方便地查看工作流的状态,并进行简单的管理和调试操作。
Airflow vs Argo Workflows 共同点
DAG 支持:均支持复杂的 DAG 任务编排逻辑,无论多么复杂的依赖关系均能被编排的井然有序。
社区支持:均有非常庞大的社区支持,Argo Workflows 是 CNCF 毕业项目,Airflow 是 Apache 顶级项目。
任务调度:均能控制任务的定时触发、自动化 Retry 等工作编排需要的各项基础能力。
用户界面:均有非常出色的 UI 控制,能够非常好的观测到任务的运行情况。
Airflow vs Argo Workflows 区别
1、架构和性能
a) Airflow架构
Airflow 的系统架构如上所示,可以单独部署,或者部署在 Kuberntes 集群中,如下所示:
airflow % kubectl get pod -n airflow NAME READY STATUS RESTARTS AGE airflow-postgresql-0 1/1 Running 0 18h airflow-scheduler-54ff57bcb6-tgtqs 2/2 Running 0 5h54m airflow-statsd-769b757665-7pqjr 1/1 Running 0 18h airflow-triggerer-0 2/2 Running 0 5h53m airflow-webserver-75b749479b-97llm 1/1 Running 0 5h54m
核心系统组件包括 Scheduler、Web Server、Exector、Metadata DB 等。
Airflow 任务调度主要涉及三个关键的部分,包括 scheduler、metadata DB 以及 Exector,任务调度流程如下:
1. DAG 定义与解析:用户编写 Python 脚本定义 DAG,包含任务(Operators)及依赖关系(如 >> 或 set_upstream)
2. 生成任务实例:Scheduler 周期性扫描 DAGs 目录(默认 30 秒),解析 DAG 文件并更新元数据库(PostgreSQL)中的元数据,根据 schedule_interval(如 @daily)生成 DAG Run,每个任务实例(Task Instance)关联唯一的 execution_date
3. 调度决策与触发:Scheduler 检查任务的上游依赖是否完成,并验证时间条件(如周期结束触发,例如 2023-01-01 的任务在 2023-01-02 00:00 触发),满足条件的任务实例状态从 None 转为 SCHEDULED,进入执行队列。
4. 任务执行:Executor(如 KubernetesExector、CeleryExecutor)从队列中拉取任务并执行,状态流转为 RUNNING,结束后更新为 SUCCESS/FAILED。
b) Argo Workflows架构
Argo Workflows 直接部署在 Kubernetes 集群之上。
argo % kubectl get pod -n argo NAME READY STATUS RESTARTS AGE argo-server-75d76d9996-sqx9z 1/1 Running 0 34s workflow-controller-798c4b99bb-vl8rx 1/1 Running 0 34s
Argo Workflows 调度流程:
1. DAG 文件定义:通过 YAML 或者 Python 定义,Step 或者 DAG 类型的文件,描述各个任务之间的依赖关系。
2. 任务生成:workflow-controller 根据 workflow 产生的事件驱动,读取 etcd 中的 workflow crd 信息,检查 DAG 依赖关系,生成新的任务 Pod。
3. 任务调度:Kubernetes 调度器直接调度 Pod。
4. 任务执行:exector 中执行任务,包括三个 container,initContainer 初始化信息,主要收集该任务的前置依赖,例如前序任务传递到该任务中的文件等,main container 执行任务,waitcontaienr 观测 maincontainer 任务执行状态,并收集输出结果。
c) 总结
部署架构:Airflow 组件较多,运维较为复杂,但是可以部署在本地或者云服务器等非容器化架构。Argo Workflows 可直接部署在 Kubernetes,组件架构较为简单,易于使用。
调度性能:Airflow 调度依赖于 Scheduler 对 DAG File 的定时轮巡和 Metadata DB 的频繁交互,有性能瓶颈,超过 300 DAG file 的时候调度速度便会明显变慢。Argo Workflows 依据事件驱动和 ETCD 存储,可并发调度数千工作流、数万子任务,并发调度性能较好。
2、语言与多用户
Airflow 原生支持 Python 语言,可以让任何了解 Python 语言的开发人员快速提交和运行工作流。
Argo Workflow 原生支持 YAML,可以让熟悉了解 YAML 规则的开发人员编写 DAG 工作流。同时也支持 Python 语言编排任务,被数据科学家们广泛使用。
下面是一个 Diamond Workflow 分别使用 Airflow 和 Argo Workflows 实现例子:
# The following workflow executes a diamond workflow # # A # / \ # B C # \ / # D
a)Airflow Diamond Workflow
from datetime import datetime, timedelta from airflow import DAG from airflow.operators.dummy_operator import DummyOperator from airflow.operators.python_operator import PythonOperator # 定义默认参数 default_args = { 'owner': 'airflow', 'start_date': datetime(2025, 5, 1), 'retries': 1, 'retry_delay': timedelta(minutes=5), } # 创建DAG对象 dag = DAG( 'dag_diamond', default_args=default_args, description='A simple diamond-shaped workflow', schedule_interval=timedelta(days=1), # 可根据需要调整调度间隔 ) # 定义一个简单的打印函数 def print_message(message): print(f"Message: {message}") # 创建任务A task_a = PythonOperator( task_id='A', python_callable=print_message, op_kwargs={'message': 'A'}, dag=dag, ) # 创建任务B task_b = PythonOperator( task_id='B', python_callable=print_message, op_kwargs={'message': 'B'}, dag=dag, ) # 创建任务C task_c = PythonOperator( task_id='C', python_callable=print_message, op_kwargs={'message': 'C'}, dag=dag, ) # 创建任务D task_d = PythonOperator( task_id='D', python_callable=print_message, op_kwargs={'message': 'D'}, dag=dag, ) # 设置任务依赖关系 task_a >> [task_b, task_c] # A完成后同时执行B和C [task_b, task_c] >> task_d # B和C都完成后才执行D
b)Argo Diamond Workflow
# 导入相关包 from hera.workflows import DAG, Workflow, script from hera.shared import global_config import urllib3 urllib3.disable_warnings() # 配置访问地址和token global_config.host = "https://{argoserverip}:2746" global_config.token = "abcdefgxxxxxx" # 填入之前获取的token global_config.verify_ssl = "" # 装饰器函数script是 Hera 实现近乎原生的 Python 函数编排的关键功能。 # 它允许您在 Hera 上下文管理器(例如Workflow或Steps上下文)下调用该函数, # 该函数在任何 Hera 上下文之外仍将正常运行,这意味着您可以在给定函数上编写单元测试。 # 该示例是打印输入的信息。 () def echo(message: str): print(message) # 构建workflow,Workflow是 Argo 中的主要资源, # 也是 Hera 的关键类,负责保存模板、设置入口点和运行模板。 with Workflow( generate_name="dag-diamond-", entrypoint="diamond", ) as w: with DAG(name="diamond"): A = echo(name="A", arguments={"message": "A"}) # 构建template B = echo(name="B", arguments={"message": "B"}) C = echo(name="C", arguments={"message": "C"}) D = echo(name="D", arguments={"message": "D"}) A >> [B, C] >> D # 构建依赖关系,B、C任务依赖A,D依赖B和C # 创建workflow w.create()
YAML:
apiVersion argoproj.io/v1alpha1 kind Workflow metadata generateName dag-diamond- spec entrypoint diamond templates container command echo '{{inputs.parameters.message}}' image alpine3.7 inputs parameters name message name echo dag tasks arguments parameters name message value A name A template echo arguments parameters name message value B depends A name B template echo arguments parameters name message value C depends A name C template echo arguments parameters name message value D depends B && C name D template echo name diamond
c) 总结
语言:Airflow 仅支持 Python 语言编写工作流,由于其开源第一天就支持 Python,其 Python 语言支持更为成熟。Argo Workflows 既支持 YAML 编排,也能够支持 Python 语言构建工作流,其 Python 语言的支持在逐步完善,并且有生产大规模使用案例。
多用户场景:Airflow 的多用户场景支持不够完善,通常使用在数据科学家场景。Argo Workflows 得益于 YAML 和 Python 双语支持,和 Kubernetes 的 RBAC、命名空间等隔离能力,在多用户支持场景有天然优势。能够让不同的团队(数据科学家、MLOps/Devops 工程师),不同的用户共享集群,并有精细化的权限控制和隔离能力。
3、大数据和 AI
在离线任务编排领域,大数据 AI 生态是必须要考虑的问题。Argo Workflows 和 Airflow 在大数据和 AI 都有非常好的生态:
Airflow 有丰富的 Operator 能够让用户在工作流中编排便捷的结合大数据/Al 的框架来处理任务,包括 SparkKubernetesOperator、HiveOperator、SQLExecuteQueryOperator、PrestoToMySqlOperator,以及和云厂商结合的 EmrServerlessSparkStartJobRunOperator、GenieOperator 等。
Argo Workflows 有 Resource Template 可以和大数据和 AI 场景下各种框架兼容,以及多种的 Plugin 扩展,包括 Spark、Volcano、Pytorch、Ray Plugin 等,可以在 YAML 或者 Python Code 中嵌入大数据和 AI 任务。
a) Spark with Airflow
其中 spark-k8s.yam 里面内容是 SparkApplications:
from datetime import timedelta import os from airflow import DAG from airflow.providers.cncf.kubernetes.operators.spark_kubernetes import SparkKubernetesOperator from airflow.utils.dates import days_ago from airflow.models import Variable default_args = { 'owner': 'Howdy', 'retries': 3, 'retry_delay': timedelta(minutes=5) } dag = DAG( 'spark_on_k8s_airflow', start_date=days_ago(1), catchup=False, schedule_interval=timedelta(days=1), template_searchpath='/opt/airflow/dags/repo/dags/spark8s/' ) spark_k8s_task = SparkKubernetesOperator( task_id='n-spark-on-k8s-airflow', trigger_rule="all_success", depends_on_past=False, retries=0, application_file='spark-k8s.yaml', namespace="spark-apps", kubernetes_conn_id="kubernetes_default", do_xcom_push=True, dag=dag ) spark_k8s_task
本例为通过 Airflow 提交一个 Spark Pi 的 Job。
b) Spark with Argo Workflows
apiVersion: argoproj.io/v1alpha1 kind: Workflow metadata: generateName: spark-pi-demo- spec: entrypoint: spark-pi templates: - name: spark-pi plugin: spark: # SparkApplication definition (Spark Operator must be installed in advance) apiVersion: "sparkoperator.k8s.io/v1beta2" kind: SparkApplication metadata: name: spark-pi-demo namespace: argo spec: type: Scala mode: cluster image: "gcr.io/spark-operator/spark:v3.3.1" mainClass: org.apache.spark.examples.SparkPi mainApplicationFile: "local:///opt/spark/examples/jars/spark-examples_2.12-3.3.1.jar" sparkVersion: "3.3.1" restartPolicy: type: Never driver: cores: 1 memory: "2g" serviceAccount: spark-sa labels: version: 3.3.1 executor: cores: 2 instances: 2 memory: "4g" labels: version: 3.3.1 arguments: - "1000"
Argo Workflows 可以通过 ResourceTemplate 或者 Plugin 的方式创建提交 Spark Job。本例为通过 Argo Workflows YAML 编排一个 Spark Pi 的 Job。
c) Pytorch with Airflow
import datetime import pendulum from airflow.utils.state import DagRunState, TaskInstanceState from airflow.utils.types import DagRunType from airflow.models.dag import DAG from airflow.decorators import task DATA_INTERVAL_START = pendulum.datetime(2021, 9, 13, tz="UTC") DATA_INTERVAL_END = DATA_INTERVAL_START + datetime.timedelta(days=1) from torchx.schedulers.ids import make_unique @task(task_id=f'hello_torchx') def run_torchx(message): """This is a function that will run within the DAG execution""" from torchx.runner import get_runner with get_runner() as runner: # Run the utils.sh component on the local_cwd scheduler. app_id = runner.run_component( "utils.sh", ["echo", message], scheduler="local_cwd", ) # Wait for the the job to complete status = runner.wait(app_id, wait_interval=1) # Raise_for_status will raise an exception if the job didn't succeed status.raise_for_status() # Finally we can print all of the log lines from the TorchX job so it # will show up in the workflow logs. for line in runner.log_lines(app_id, "sh", k=0): print(line, end="") with DAG( dag_id=make_unique('example_python_operator'), schedule_interval=None, start_date=DATA_INTERVAL_START, catchup=False, tags=['example'], ) as dag: run_job = run_torchx("Hello, TorchX!") dagrun = dag.create_dagrun( state=DagRunState.RUNNING, execution_date=DATA_INTERVAL_START, data_interval=(DATA_INTERVAL_START, DATA_INTERVAL_END), start_date=DATA_INTERVAL_END, run_type=DagRunType.MANUAL, ) ti = dagrun.get_task_instance(task_id="hello_torchx") ti.task = dag.get_task(task_id="hello_torchx") ti.run(ignore_ti_state=True) assert ti.state == TaskInstanceState.SUCCESS
在 Airflow 中使用 TorchX,可以进行管道编排并在远程 GPU 集群上运行 PyTorch 应用程序(即分布式训练)。
d) Pytorch with Argo Workflow
apiVersion: argoproj.io/v1alpha1 kind: Workflow metadata: generateName: pytorch-deepspeed-demo- spec: entrypoint: main arguments: parameters: - name: checkpoint-dir value: /root/deepspeed_data - name: instance-spec value: ecs.gn7i-c8g1.2xlarge - name: image value: mirrors-ssl.aliyuncs.com/kubeflow/pytorch-deepspeed-demo:latest templates: - name: main steps: - - name: run-pytorch-job template: pytorch-job-template arguments: parameters: - name: checkpoint-dir value: "{{workflow.parameters.checkpoint-dir}}" - - name: check-pytorch-job-status template: check-status arguments: parameters: - name: job-name value: "{{steps.run-pytorch-job.outputs.parameters.job-name}}" - name: pytorch-job-template inputs: parameters: - name: checkpoint-dir resource: action: create successCondition: status.replicaStatuses.Worker.succeeded = 1 failureCondition: status.replicaStatuses.Worker.failed > 0 manifest: | apiVersion: "kubeflow.org/v1" kind: PyTorchJob metadata: generateName: pytorch-deepspeed-demo- spec: pytorchReplicaSpecs: Master: replicas: 1 restartPolicy: OnFailure template: metadata: name: deepspeed-master spec: containers: - name: pytorch image: "{{workflow.parameters.image}}" command: - torchrun - /train_bert_ds.py - --checkpoint_dir - {{inputs.parameters.checkpoint-dir}} resources: limits: nvidia.com/gpu: 1 Worker: replicas: 1 restartPolicy: OnFailure template: metadata: name: deepspeed-worker spec: containers: - name: pytorch image: "{{workflow.parameters.image}}" command: - torchrun - /train_bert_ds.py - --checkpoint_dir - {{inputs.parameters.checkpoint-dir}} resources: limits: nvidia.com/gpu: 1 outputs: parameters: - name: job-name valueFrom: jsonPath: '{.metadata.name}' - name: check-status inputs: parameters: - name: job-name script: image: mirrors-ssl.aliyuncs.com/bitnami/kubectl:latest command: [bash] source: | #!/bin/bash set -ex JOB_NAME="{{inputs.parameters.job-name}}" STATUS=$(kubectl get pytorchjob $JOB_NAME -o jsonpath='{.status.conditions[?(@.type=="Succeeded")].status}') if [ "$STATUS" = "True" ]; then echo "PyTorch Job succeeded." exit 0 else echo "PyTorch Job failed or is still running." exit 1 fi
本例为通过 Argo Workflows 的 Resouece Template 编排一个 Pytorch 分布式训练任务并检查其运行状态的示例。
e) 总结
大数据:Airflow 在大数据场景下有众多的 Operator,生态非常丰富,并且擅长和云厂商的服务进行结合。Argo Workflow 擅长在 Kubernetes 生态下和 Kubeflow/SparkOperator 结合进行任务编排。
机器学习 Pipeline: Argo Workflows 可以通过 Plugin、ResourceTemplate 等方式编排 Pytorch、Volcano 等 Job 类型。Airflow 更擅长直接编写 Python 类型的 AI 作业任务。
两者在大数据 AI 场景中均具备独特优势,实际选型需结合技术栈与业务需求。
技术选型建议
总的来讲,Airflow 和 Argo Workflows 在都在批量数据处理、机器学习 Pipeline、自动化等场景有非常好的能力与应用,对于企业的决策者来说,选择哪种调度引擎需要根据团队的技术栈、服务的场景、使用的用户多少来决定。 尤其的,在以下三种场景,建议选择 Argo Workflows:
场景 |
选型建议 |
优势 |
容器化、Kubernetes 环境 |
Argo Workflows |
Cloud Native,良好的扩展性、 可观测性、并发性。 |
多用户场景(数据处理+ MLOps+Devops) |
Argo Workflows |
YAML/Python 多语言支持, RBAC、Namespace 级别权限控制 |
大规模高性能计算、 并行计算 |
Argo Workflows |
事件驱动,可并行运行数千工作流, 数万子任务 Pod,调度性能高。 |
Airflow vs Argo Workflow 核心概念对照
Airflow 和 Argo Workflows 都是通用的任务编排引擎,其核心能力基本一致,有时候当选择了其中一个框架后,随着业务增长或者系统架构升级,比如资源调度从 Hadoop Yarn 升级到 Kubernetes 生态、或者需要更高性能的大规模调度,这时候原来的调度任务引擎不能满足要求,就需要从 Airflow 迁移到 Argo Workflows。下表即是一个 Airflow DAG 定义和 Argo Workflow DAG 核心概念的对比,可以帮助理清楚这两种任务编排引擎的任务定义关系,便于快速迁移。
核心特性 |
Airflow |
Argo Workflows |
Bash命令 |
BashOperator |
Command:[sh, -c] |
Python命令 |
PythonOperator |
Command:[python] |
任务定义 |
Task |
Template |
依赖关系 |
DAG |
DAG |
调度策略 |
Schedule Time |
CronWorkflows |
任务参数 |
Params |
Inputs Parameters |
任务变量 |
Variable |
ENV |
任务中间结果传递 |
Xcom |
Artifacts |
事件感知 |
Sensors |
Eventbinding |
回填 |
Airflow Backfill |
Argo Backfill |
扩展生态 |
SparkOperator |
SparkPlugin |
Serverless Argo Workflows
为了进一步降低用户在大规模任务编排场景下运维成本、增强调度效率,阿里云容器服务推出了 Serverless 的离线工作流平台:
采用全托管 Kubernetes 控制面(复用阿里云容器服务 ACK Pro【1】 控制面)与 Argo Workflows 控制面,并针对单体大工作流、大规模子任务 Pod 并行调度运行做了针对性的性能优化,同时使用阿里云容器服务 ACS【2】 提供的 Serverless Pod 运行工作流。通过控制面性能调优和 ACS 的 Serverless 极致弹性算力,工作流规模对比开源自建提升 10 倍,单体大工作流可以支持数万子任务 Pod,集群整体可以支持数千工作流和数万子任务的并行运行。
目前,阿里云 Serverless Argo Workflows 集群已经在自动驾驶仿真计算、科学计算、金融量化、机器学习、数据处理、持续集成领域广泛应用。如果您需要使用 Argo Workflows 调度大规模工作流,欢迎加入钉钉群号一同交流:35688562
参考
Argo Workflows:
https://github.com/argoproj/argo-workflows
Airflow:
https://github.com/apache/airflow
Argo Workflows Docs:
https://argo-workflows.readthedocs.io/en/latest/
Airflow Docs:
https://airflow.apache.org/docs/apache-airflow/stable/index.html
Airflow Operators:
https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/operators.html
Argo Workflow Plugins:
https://argo-workflows.readthedocs.io/en/latest/plugin-directory/
Pytorch with Airflow:https://pytorch.org/torchx/main/pipelines/airflow.html
Apache Spark Operators:
https://airflow.apache.org/docs/apache-airflow-providers-apache-spark/stable/operators.html
Apache Airflow vs Argo Workflow in Autonomous driving simulation:
https://towardsdev.com/apache-airflow-vs-argo-workflow-feat-dag-kubernetes-4944a2d7097d
Argo Python SDK Hera:
https://hera-workflows.readthedocs.io/en/latest/
Argo Workflows Volcano Plugin:
https://github.com/iflytek/argo-volcano-executor-plugin
Argo Workflows Pytorch Plugin:
https://github.com/shuangkun/argo-workflows-pytorch-plugin
Argo Workflows Ray Plugin:
https://github.com/argoproj-labs/argo-workflows-ray-plugin
阿里云 Serverless Argo Workflows:https://www.alibabacloud.com/help/zh/ack/distributed-cloud-container-platform-for-kubernetes/user-guide/overview-12
创建工作流集群:
【1】阿里云容器服务ACK Pro
https://help.aliyun.com/zh/ack/ack-managed-and-ack-dedicated/product-overview/what-is-ack
【2】阿里云容器服务ACS
https://help.aliyun.com/zh/cs/product-overview/product-introduction