Airflow vs Argo Workflows:分布式任务调度系统的“华山论剑”

本文涉及的产品
Serverless 应用引擎免费试用套餐包,4320000 CU,有效期3个月
容器镜像服务 ACR,镜像仓库100个 不限时长
注册配置 MSE Nacos/ZooKeeper,118元/月
简介: 本文对比了Apache Airflow与Argo Workflows两大分布式任务调度系统。两者均支持复杂的DAG任务编排、社区支持及任务调度功能,且具备优秀的用户界面。Airflow以Python为核心语言,适合数据科学家使用,拥有丰富的Operator库和云服务集成能力;而Argo Workflows基于Kubernetes设计,支持YAML和Python双语定义工作流,具备轻量化、高性能并发调度的优势,并通过Kubernetes的RBAC机制实现多用户隔离。在大数据和AI场景中,Airflow擅长结合云厂商服务,Argo则更适配Kubernetes生态下的深度集成。

1.gif


现代数据工程和自动化任务管理的领域中,选择合适的工具对于提升开发效率、确保流程稳定性和促进团队协作至关重要。在这个背景下,Argo Workflows 和 Apache Airflow 作为两大最流行的分布式任务调度系统,各自凭借着独特的设计理念和强大的功能集,在业界引起了广泛的关注与应用。本文旨在深入探讨这两者的核心特性、应用场景以及它们之间的区别,旨在为寻求高效分布式任务调度解决方案的数据工程师、IT 决策者及开发者们提供一个全面而客观的比较指南。


Apache Airflow 简介



Apache Airflow 于 2015 年开源,它是一个著名的开源自动化和工作流管理平台,用于编写、调度和监控工作流。Airflow 允许使用 Python 语言将工作流编写为 DAG,它的设计原则,如代码优先配置、动态工作流和强大的可视化界面,使得 Airflow 在数据工程师和数据科学家之间非常受欢迎。它是 Apache 顶级项目。


它的主要特性有:


  • Pythonic DSL:Airflow 允许用户将工作流定义为 Python 代码,从而帮助开发人员灵活、轻松地创建和管理工作流。
  • DAG 支持:Airflow 使用有向无环图(DAG)来表示工作流中的任务及其依赖关系。这种结构使得复杂的任务调度变得直观且易于理解。
  • 丰富三方库:Airflow 社区提供了大量的 Operator 和其他组件,覆盖了从数据处理到机器学习等多种场景,这些都极大地扩展了其功能范围。
  • 良好的用户界面:Airflow 拥有一个直观的 Web 界面,让用户能够方便地监控 DAG 的状态、触发任务执行以及查看日志等。
  • 平台集成:Airflow 可以很容易地与各种不同的系统和服务进行集成,比如数据库、云服务提供商等,这得益于它高度可配置的设计。
  • 可扩展性:无论是增加更多的 Worker 节点以处理更多任务还是通过编写自定义 Operator 来满足特定需求,Airflow 都非常灵活,允许根据具体应用场景调整配置或扩展功能。


Argo Workflows 简介



Argo Workflows 于 2018 年开源,专为 Kubernetes 编排并行 Job 而设计。它引入了一种声明式的方式来定义和执行多步骤的工作流,这些工作流可以包含复杂的依赖关系、并行执行、条件分支等特性。通过 YAML 文件来定义工作流模板,使得工作流易于编写、维护和重复使用。它是云原生计算基金会(CNCF)下的毕业项目。


它的主要特性有:


  • 云原生:Argo Workflows 是完全开源的,专为 Kubernetes 而设计,可以无缝与 Kubernetes 生态集成并利用其功能。
  • 轻量化:Argo Workflows 被设计成轻量级解决方案,仅依赖于 Kubernetes 作为运行时环境,不需要额外的组件或服务来操作。这意味着用户可以在不增加系统复杂性的情况下快速启动和管理任务。
  • 可扩展性:该工具提供了良好的水平扩展能力,能够根据需求自动调整资源使用情况。通过 Kubernetes 的强大调度能力和弹性伸缩特性,Argo Workflows 能轻松处理从小规模到大规模的各种工作负载。
  • 并发性:Argo Workflows 支持高并发执行多个任务的能力,这对于需要同时运行大量独立或相关作业的应用场景尤为重要。它允许定义并行步骤,从而显著提高工作效率。
  • DAG 支持:支持复杂的 DAG 工作流,可以创建具有复杂依赖关系的工作流。
  • 用户界面:用于可视化地监控和管理你的工作流。这个图形化界面让非技术人员也能方便地查看工作流的状态,并进行简单的管理和调试操作。


Airflow vs Argo Workflows 共同点


DAG 支持:均支持复杂的 DAG 任务编排逻辑,无论多么复杂的依赖关系均能被编排的井然有序。

社区支持:均有非常庞大的社区支持,Argo Workflows 是 CNCF 毕业项目,Airflow 是 Apache 顶级项目。

任务调度:均能控制任务的定时触发、自动化 Retry 等工作编排需要的各项基础能力。

用户界面:均有非常出色的 UI 控制,能够非常好的观测到任务的运行情况。


Airflow vs Argo Workflows 区别


1、架构和性能


a) Airflow架构



Airflow 的系统架构如上所示,可以单独部署,或者部署在 Kuberntes 集群中,如下所示:


airflow % kubectl get pod -n airflow
NAME                                 READY   STATUS    RESTARTS   AGE
airflow-postgresql-0                 1/1     Running   0          18h
airflow-scheduler-54ff57bcb6-tgtqs   2/2     Running   0          5h54m
airflow-statsd-769b757665-7pqjr      1/1     Running   0          18h
airflow-triggerer-0                  2/2     Running   0          5h53m
airflow-webserver-75b749479b-97llm   1/1     Running   0          5h54m


核心系统组件包括 Scheduler、Web Server、Exector、Metadata DB 等。


Airflow 任务调度主要涉及三个关键的部分,包括 scheduler、metadata DB 以及 Exector,任务调度流程如下:


1. DAG 定义与解析:用户编写 Python 脚本定义 DAG,包含任务(Operators)及依赖关系(如 >> 或 set_upstream)


2. 生成任务实例:Scheduler 周期性扫描 DAGs 目录(默认 30 秒),解析 DAG 文件并更新元数据库(PostgreSQL)中的元数据,根据 schedule_interval(如 @daily)生成 DAG Run,每个任务实例(Task Instance)关联唯一的 execution_date


3. 调度决策与触发:Scheduler 检查任务的上游依赖是否完成,并验证时间条件(如周期结束触发,例如 2023-01-01 的任务在 2023-01-02 00:00 触发),满足条件的任务实例状态从 None 转为 SCHEDULED,进入执行队列。


4. 任务执行:Executor(如 KubernetesExector、CeleryExecutor)从队列中拉取任务并执行,状态流转为 RUNNING,结束后更新为 SUCCESS/FAILED。


b) Argo Workflows架构



Argo Workflows 直接部署在 Kubernetes 集群之上。


argo % kubectl get pod -n argo
NAME                                       READY   STATUS    RESTARTS   AGE
argo-server-75d76d9996-sqx9z           1/1     Running   0          34s
workflow-controller-798c4b99bb-vl8rx   1/1     Running   0          34s


Argo Workflows 调度流程:


1. DAG 文件定义:通过 YAML 或者 Python 定义,Step 或者 DAG 类型的文件,描述各个任务之间的依赖关系。

 

2. 任务生成:workflow-controller 根据 workflow 产生的事件驱动,读取 etcd 中的 workflow crd 信息,检查 DAG 依赖关系,生成新的任务 Pod。


3. 任务调度:Kubernetes 调度器直接调度 Pod。


4. 任务执行:exector 中执行任务,包括三个 container,initContainer 初始化信息,主要收集该任务的前置依赖,例如前序任务传递到该任务中的文件等,main container 执行任务,waitcontaienr 观测 maincontainer 任务执行状态,并收集输出结果。


c) 总结


部署架构:Airflow 组件较多,运维较为复杂,但是可以部署在本地或者云服务器等非容器化架构。Argo Workflows 可直接部署在 Kubernetes,组件架构较为简单,易于使用。


调度性能:Airflow 调度依赖于 Scheduler 对 DAG File 的定时轮巡和 Metadata DB 的频繁交互,有性能瓶颈,超过 300 DAG file 的时候调度速度便会明显变慢。Argo Workflows 依据事件驱动和 ETCD 存储,可并发调度数千工作流、数万子任务,并发调度性能较好。


2、语言与多用户


Airflow 原生支持 Python 语言,可以让任何了解 Python 语言的开发人员快速提交和运行工作流。


Argo Workflow 原生支持 YAML,可以让熟悉了解 YAML 规则的开发人员编写 DAG 工作流。同时也支持 Python 语言编排任务,被数据科学家们广泛使用。


下面是一个 Diamond Workflow 分别使用 Airflow 和 Argo Workflows 实现例子:


# The following workflow executes a diamond workflow
# 
#   A
#  / \
# B   C
#  \ /
#   D


a)Airflow Diamond Workflow


from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator

# 定义默认参数
default_args = {
    'owner': 'airflow',
    'start_date': datetime(2025, 5, 1),
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

# 创建DAG对象
dag = DAG(
    'dag_diamond',
    default_args=default_args,
    description='A simple diamond-shaped workflow',
    schedule_interval=timedelta(days=1),  # 可根据需要调整调度间隔
)

# 定义一个简单的打印函数
def print_message(message):
    print(f"Message: {message}")

# 创建任务A
task_a = PythonOperator(
    task_id='A',
    python_callable=print_message,
    op_kwargs={'message': 'A'},
    dag=dag,
)

# 创建任务B
task_b = PythonOperator(
    task_id='B',
    python_callable=print_message,
    op_kwargs={'message': 'B'},
    dag=dag,
)

# 创建任务C
task_c = PythonOperator(
    task_id='C',
    python_callable=print_message,
    op_kwargs={'message': 'C'},
    dag=dag,
)

# 创建任务D
task_d = PythonOperator(
    task_id='D',
    python_callable=print_message,
    op_kwargs={'message': 'D'},
    dag=dag,
)

# 设置任务依赖关系
task_a >> [task_b, task_c]  # A完成后同时执行B和C
[task_b, task_c] >> task_d  # B和C都完成后才执行D


b)Argo Diamond Workflow


# 导入相关包
from hera.workflows import DAG, Workflow, script
from hera.shared import global_config
import urllib3

urllib3.disable_warnings()

# 配置访问地址和token
global_config.host = "https://{argoserverip}:2746"
global_config.token = "abcdefgxxxxxx" # 填入之前获取的token
global_config.verify_ssl = ""

# 装饰器函数script是 Hera 实现近乎原生的 Python 函数编排的关键功能。
# 它允许您在 Hera 上下文管理器(例如Workflow或Steps上下文)下调用该函数,
# 该函数在任何 Hera 上下文之外仍将正常运行,这意味着您可以在给定函数上编写单元测试。
# 该示例是打印输入的信息。
@script()
def echo(message: str):
    print(message)

# 构建workflow,Workflow是 Argo 中的主要资源,
# 也是 Hera 的关键类,负责保存模板、设置入口点和运行模板。
with Workflow(
    generate_name="dag-diamond-",
    entrypoint="diamond",
) as w:
    with DAG(name="diamond"):
        A = echo(name="A", arguments={"message": "A"}) # 构建template
        B = echo(name="B", arguments={"message": "B"})
        C = echo(name="C", arguments={"message": "C"})
        D = echo(name="D", arguments={"message": "D"})
        A >> [B, C] >> D # 构建依赖关系,B、C任务依赖A,D依赖B和C
# 创建workflow
w.create()


YAML:

apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: dag-diamond-
spec:
  entrypoint: diamond
  templates:
  - container:
      command:
      - echo
      - '{{inputs.parameters.message}}'
      image: alpine:3.7
    inputs:
      parameters:
      - name: message
    name: echo
  - dag:
      tasks:
      - arguments:
          parameters:
          - name: message
            value: A
        name: A
        template: echo
      - arguments:
          parameters:
          - name: message
            value: B
        depends: A
        name: B
        template: echo
      - arguments:
          parameters:
          - name: message
            value: C
        depends: A
        name: C
        template: echo
      - arguments:
          parameters:
          - name: message
            value: D
        depends: B && C
        name: D
        template: echo
    name: diamond
c) 总结


语言:Airflow 仅支持 Python 语言编写工作流,由于其开源第一天就支持 Python,其 Python 语言支持更为成熟。Argo Workflows 既支持 YAML 编排,也能够支持 Python 语言构建工作流,其 Python 语言的支持在逐步完善,并且有生产大规模使用案例。


多用户场景:Airflow 的多用户场景支持不够完善,通常使用在数据科学家场景。Argo Workflows 得益于 YAML 和 Python 双语支持,和 Kubernetes 的 RBAC、命名空间等隔离能力,在多用户支持场景有天然优势。能够让不同的团队(数据科学家、MLOps/Devops 工程师),不同的用户共享集群,并有精细化的权限控制和隔离能力。


3、大数据和 AI


在离线任务编排领域,大数据 AI 生态是必须要考虑的问题。Argo Workflows 和 Airflow 在大数据和 AI 都有非常好的生态:


Airflow 有丰富的 Operator 能够让用户在工作流中编排便捷的结合大数据/Al 的框架来处理任务,包括 SparkKubernetesOperator、HiveOperator、SQLExecuteQueryOperator、PrestoToMySqlOperator,以及和云厂商结合的 EmrServerlessSparkStartJobRunOperator、GenieOperator 等。


Argo Workflows 有 Resource Template 可以和大数据和 AI 场景下各种框架兼容,以及多种的 Plugin 扩展,包括 Spark、Volcano、Pytorch、Ray Plugin 等,可以在 YAML 或者 Python Code 中嵌入大数据和 AI 任务。


a) Spark with Airflow


其中 spark-k8s.yam 里面内容是 SparkApplications:


from datetime import timedelta
import os
from airflow import DAG
from airflow.providers.cncf.kubernetes.operators.spark_kubernetes import SparkKubernetesOperator
from airflow.utils.dates import days_ago
from airflow.models import Variable

default_args = {
    'owner': 'Howdy',
    'retries': 3,
    'retry_delay': timedelta(minutes=5)
}

dag = DAG(
    'spark_on_k8s_airflow',
    start_date=days_ago(1),
    catchup=False,
    schedule_interval=timedelta(days=1),
    template_searchpath='/opt/airflow/dags/repo/dags/spark8s/'
)

spark_k8s_task = SparkKubernetesOperator(
    task_id='n-spark-on-k8s-airflow',
    trigger_rule="all_success",
    depends_on_past=False,
    retries=0,
    application_file='spark-k8s.yaml',
    namespace="spark-apps",
    kubernetes_conn_id="kubernetes_default",
    do_xcom_push=True,
    dag=dag
)

spark_k8s_task


本例为通过 Airflow 提交一个 Spark Pi 的 Job。


b) Spark with Argo Workflows


apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: spark-pi-demo-
spec:
  entrypoint: spark-pi
  templates:
    - name: spark-pi
      plugin:
        spark:
          # SparkApplication definition (Spark Operator must be installed in advance)
          apiVersion: "sparkoperator.k8s.io/v1beta2"
          kind: SparkApplication
          metadata:
            name: spark-pi-demo
            namespace: argo
          spec:
            type: Scala
            mode: cluster
            image: "gcr.io/spark-operator/spark:v3.3.1"
            mainClass: org.apache.spark.examples.SparkPi
            mainApplicationFile: "local:///opt/spark/examples/jars/spark-examples_2.12-3.3.1.jar"
            sparkVersion: "3.3.1"
            restartPolicy:
              type: Never
            driver:
              cores: 1
              memory: "2g"
              serviceAccount: spark-sa
              labels:
                version: 3.3.1
            executor:
              cores: 2
              instances: 2
              memory: "4g"
              labels:
                version: 3.3.1
            arguments:
              - "1000" 


Argo Workflows 可以通过 ResourceTemplate 或者 Plugin 的方式创建提交 Spark Job。本例为通过 Argo Workflows YAML 编排一个 Spark Pi 的 Job。


c) Pytorch with Airflow


import datetime
import pendulum

from airflow.utils.state import DagRunState, TaskInstanceState
from airflow.utils.types import DagRunType
from airflow.models.dag import DAG
from airflow.decorators import task


DATA_INTERVAL_START = pendulum.datetime(2021, 9, 13, tz="UTC")
DATA_INTERVAL_END = DATA_INTERVAL_START + datetime.timedelta(days=1)
from torchx.schedulers.ids import make_unique

@task(task_id=f'hello_torchx')
def run_torchx(message):
    """This is a function that will run within the DAG execution"""
    from torchx.runner import get_runner
    with get_runner() as runner:
        # Run the utils.sh component on the local_cwd scheduler.
        app_id = runner.run_component(
            "utils.sh",
            ["echo", message],
            scheduler="local_cwd",
        )

        # Wait for the the job to complete
        status = runner.wait(app_id, wait_interval=1)

        # Raise_for_status will raise an exception if the job didn't succeed
        status.raise_for_status()

        # Finally we can print all of the log lines from the TorchX job so it
        # will show up in the workflow logs.
        for line in runner.log_lines(app_id, "sh", k=0):
            print(line, end="")

with DAG(
    dag_id=make_unique('example_python_operator'),
    schedule_interval=None,
    start_date=DATA_INTERVAL_START,
    catchup=False,
    tags=['example'],
) as dag:
    run_job = run_torchx("Hello, TorchX!")

dagrun = dag.create_dagrun(
    state=DagRunState.RUNNING,
    execution_date=DATA_INTERVAL_START,
    data_interval=(DATA_INTERVAL_START, DATA_INTERVAL_END),
    start_date=DATA_INTERVAL_END,
    run_type=DagRunType.MANUAL,
)
ti = dagrun.get_task_instance(task_id="hello_torchx")
ti.task = dag.get_task(task_id="hello_torchx")
ti.run(ignore_ti_state=True)
assert ti.state == TaskInstanceState.SUCCESS      


在 Airflow 中使用 TorchX,可以进行管道编排并在远程 GPU 集群上运行 PyTorch 应用程序(即分布式训练)。


d) Pytorch with Argo Workflow


apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: pytorch-deepspeed-demo-
spec:
  entrypoint: main
  arguments:
    parameters:
    - name: checkpoint-dir
      value: /root/deepspeed_data
    - name: instance-spec
      value: ecs.gn7i-c8g1.2xlarge
    - name: image
      value: mirrors-ssl.aliyuncs.com/kubeflow/pytorch-deepspeed-demo:latest

  templates:
  - name: main
    steps:
    - - name: run-pytorch-job
        template: pytorch-job-template
        arguments:
          parameters:
          - name: checkpoint-dir
            value: "{{workflow.parameters.checkpoint-dir}}"
    - - name: check-pytorch-job-status
        template: check-status
        arguments:
          parameters:
          - name: job-name
            value: "{{steps.run-pytorch-job.outputs.parameters.job-name}}"

  - name: pytorch-job-template
    inputs:
      parameters:
      - name: checkpoint-dir
    resource:
      action: create
      successCondition: status.replicaStatuses.Worker.succeeded = 1
      failureCondition: status.replicaStatuses.Worker.failed > 0
      manifest: |
        apiVersion: "kubeflow.org/v1"
        kind: PyTorchJob
        metadata:
          generateName: pytorch-deepspeed-demo-
        spec:
          pytorchReplicaSpecs:
            Master:
              replicas: 1
              restartPolicy: OnFailure
              template:
                metadata:
                  name: deepspeed-master
                spec:
                  containers:
                    - name: pytorch
                      image: "{{workflow.parameters.image}}"
                      command:
                        - torchrun
                        - /train_bert_ds.py
                        - --checkpoint_dir
                        - {{inputs.parameters.checkpoint-dir}}
                      resources:
                        limits:
                          nvidia.com/gpu: 1
            Worker:
              replicas: 1
              restartPolicy: OnFailure
              template:
                metadata:
                  name: deepspeed-worker
                spec:
                  containers:
                    - name: pytorch
                      image: "{{workflow.parameters.image}}"
                      command:
                        - torchrun
                        - /train_bert_ds.py
                        - --checkpoint_dir
                        - {{inputs.parameters.checkpoint-dir}}
                      resources:
                        limits:
                          nvidia.com/gpu: 1
    outputs:
      parameters:
      - name: job-name
        valueFrom:
          jsonPath: '{.metadata.name}'

  - name: check-status
    inputs:
      parameters:
      - name: job-name
    script:
      image: mirrors-ssl.aliyuncs.com/bitnami/kubectl:latest
      command: [bash]
      source: |
        #!/bin/bash
        set -ex

        JOB_NAME="{{inputs.parameters.job-name}}"

        STATUS=$(kubectl get pytorchjob $JOB_NAME -o jsonpath='{.status.conditions[?(@.type=="Succeeded")].status}')

        if [ "$STATUS" = "True" ]; then
          echo "PyTorch Job succeeded."
          exit 0
        else
          echo "PyTorch Job failed or is still running."
          exit 1
        fi


本例为通过 Argo Workflows 的 Resouece Template 编排一个 Pytorch 分布式训练任务并检查其运行状态的示例。


e) 总结


大数据:Airflow 在大数据场景下有众多的 Operator,生态非常丰富,并且擅长和云厂商的服务进行结合。Argo Workflow 擅长在 Kubernetes 生态下和 Kubeflow/SparkOperator 结合进行任务编排。


机器学习 Pipeline: Argo Workflows 可以通过 Plugin、ResourceTemplate 等方式编排 Pytorch、Volcano 等 Job 类型。Airflow 更擅长直接编写 Python 类型的 AI 作业任务。


两者在大数据 AI 场景中均具备独特优势,实际选型需结合技术栈与业务需求。


技术选型建议


总的来讲,Airflow 和 Argo Workflows 在都在批量数据处理、机器学习 Pipeline、自动化等场景有非常好的能力与应用,对于企业的决策者来说,选择哪种调度引擎需要根据团队的技术栈、服务的场景、使用的用户多少来决定。 尤其的,在以下三种场景,建议选择 Argo Workflows:


场景

选型建议

优势

容器化、Kubernetes 环境

Argo Workflows

Cloud Native,良好的扩展性、

可观测性、并发性。

多用户场景(数据处理+ MLOps+Devops)

Argo Workflows

YAML/Python 多语言支持,

RBAC、Namespace 级别权限控制

大规模高性能计算、

并行计算

Argo Workflows

事件驱动,可并行运行数千工作流,

数万子任务 Pod,调度性能高。


Airflow vs Argo Workflow 核心概念对照


Airflow 和 Argo Workflows 都是通用的任务编排引擎,其核心能力基本一致,有时候当选择了其中一个框架后,随着业务增长或者系统架构升级,比如资源调度从 Hadoop Yarn 升级到 Kubernetes 生态、或者需要更高性能的大规模调度,这时候原来的调度任务引擎不能满足要求,就需要从 Airflow 迁移到 Argo Workflows。下表即是一个 Airflow DAG 定义和 Argo Workflow DAG 核心概念的对比,可以帮助理清楚这两种任务编排引擎的任务定义关系,便于快速迁移。


核心特性

Airflow

Argo Workflows

Bash命令

BashOperator

Command:[sh, -c]

Python命令

PythonOperator

Command:[python]

任务定义

Task

Template

依赖关系

DAG

DAG

调度策略

Schedule Time

CronWorkflows

任务参数

Params

Inputs Parameters

任务变量

Variable

ENV

任务中间结果传递

Xcom

Artifacts

事件感知

Sensors

Eventbinding

回填

Airflow Backfill

Argo Backfill

扩展生态

SparkOperator

SparkPlugin


Serverless Argo Workflows


为了进一步降低用户在大规模任务编排场景下运维成本、增强调度效率,阿里云容器服务推出了 Serverless 的离线工作流平台:


采用全托管 Kubernetes 控制面(复用阿里云容器服务 ACK Pro【1】 控制面)与 Argo Workflows 控制面,并针对单体大工作流、大规模子任务 Pod 并行调度运行做了针对性的性能优化,同时使用阿里云容器服务 ACS【2】 提供的 Serverless Pod 运行工作流。通过控制面性能调优和 ACS 的 Serverless 极致弹性算力,工作流规模对比开源自建提升 10 倍,单体大工作流可以支持数万子任务 Pod,集群整体可以支持数千工作流和数万子任务的并行运行。



目前,阿里云 Serverless Argo Workflows 集群已经在自动驾驶仿真计算、科学计算、金融量化、机器学习、数据处理、持续集成领域广泛应用。如果您需要使用 Argo Workflows 调度大规模工作流,欢迎加入钉钉群号一同交流:35688562


参考


Argo Workflows:

https://github.com/argoproj/argo-workflows


Airflow:

https://github.com/apache/airflow


Argo Workflows Docs:

https://argo-workflows.readthedocs.io/en/latest/


Airflow Docs:

https://airflow.apache.org/docs/apache-airflow/stable/index.html


Airflow Operators:

https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/operators.html


Argo Workflow Plugins:

https://argo-workflows.readthedocs.io/en/latest/plugin-directory/


Pytorch with Airflow:https://pytorch.org/torchx/main/pipelines/airflow.html


Apache Spark Operators:

https://airflow.apache.org/docs/apache-airflow-providers-apache-spark/stable/operators.html


Apache Airflow vs Argo Workflow in Autonomous driving simulation:

https://towardsdev.com/apache-airflow-vs-argo-workflow-feat-dag-kubernetes-4944a2d7097d


Argo Python SDK Hera:

https://hera-workflows.readthedocs.io/en/latest/


Argo Workflows Volcano Plugin:

https://github.com/iflytek/argo-volcano-executor-plugin


Argo Workflows Pytorch Plugin:

https://github.com/shuangkun/argo-workflows-pytorch-plugin


Argo Workflows Ray Plugin:

https://github.com/argoproj-labs/argo-workflows-ray-plugin


阿里云 Serverless Argo Workflows:https://www.alibabacloud.com/help/zh/ack/distributed-cloud-container-platform-for-kubernetes/user-guide/overview-12


创建工作流集群:

https://www.alibabacloud.com/help/zh/ack/distributed-cloud-container-platform-for-kubernetes/user-guide/create-a-workflow-cluster


【1】阿里云容器服务ACK Pro

https://help.aliyun.com/zh/ack/ack-managed-and-ack-dedicated/product-overview/what-is-ack


【2】阿里云容器服务ACS

https://help.aliyun.com/zh/cs/product-overview/product-introduction

相关文章
|
5月前
|
存储 运维 安全
盘古分布式存储系统的稳定性实践
本文介绍了阿里云飞天盘古分布式存储系统的稳定性实践。盘古作为阿里云的核心组件,支撑了阿里巴巴集团的众多业务,确保数据高可靠性、系统高可用性和安全生产运维是其关键目标。文章详细探讨了数据不丢不错、系统高可用性的实现方法,以及通过故障演练、自动化发布和健康检查等手段保障生产安全。总结指出,稳定性是一项系统工程,需要持续迭代演进,盘古经过十年以上的线上锤炼,积累了丰富的实践经验。
292 7
|
5月前
|
存储 分布式计算 Hadoop
基于Java的Hadoop文件处理系统:高效分布式数据解析与存储
本文介绍了如何借鉴Hadoop的设计思想,使用Java实现其核心功能MapReduce,解决海量数据处理问题。通过类比图书馆管理系统,详细解释了Hadoop的两大组件:HDFS(分布式文件系统)和MapReduce(分布式计算模型)。具体实现了单词统计任务,并扩展支持CSV和JSON格式的数据解析。为了提升性能,引入了Combiner减少中间数据传输,以及自定义Partitioner解决数据倾斜问题。最后总结了Hadoop在大数据处理中的重要性,鼓励Java开发者学习Hadoop以拓展技术边界。
178 7
|
7月前
|
消息中间件 监控 数据可视化
Apache Airflow 开源最顶级的分布式工作流平台
Apache Airflow 是一个用于创作、调度和监控工作流的平台,通过将工作流定义为代码,实现更好的可维护性和协作性。Airflow 使用有向无环图(DAG)定义任务,支持动态生成、扩展和优雅的管道设计。其丰富的命令行工具和用户界面使得任务管理和监控更加便捷。适用于静态和缓慢变化的工作流,常用于数据处理。
Apache Airflow 开源最顶级的分布式工作流平台
|
7月前
|
存储 运维 负载均衡
构建高可用性GraphRAG系统:分布式部署与容错机制
【10月更文挑战第28天】作为一名数据科学家和系统架构师,我在构建和维护大规模分布式系统方面有着丰富的经验。最近,我负责了一个基于GraphRAG(Graph Retrieval-Augmented Generation)模型的项目,该模型用于构建一个高可用性的问答系统。在这个过程中,我深刻体会到分布式部署和容错机制的重要性。本文将详细介绍如何在生产环境中构建一个高可用性的GraphRAG系统,包括分布式部署方案、负载均衡、故障检测与恢复机制等方面的内容。
394 4
构建高可用性GraphRAG系统:分布式部署与容错机制
|
6月前
|
机器学习/深度学习 存储 运维
分布式机器学习系统:设计原理、优化策略与实践经验
本文详细探讨了分布式机器学习系统的发展现状与挑战,重点分析了数据并行、模型并行等核心训练范式,以及参数服务器、优化器等关键组件的设计与实现。文章还深入讨论了混合精度训练、梯度累积、ZeRO优化器等高级特性,旨在提供一套全面的技术解决方案,以应对超大规模模型训练中的计算、存储及通信挑战。
325 4
|
7月前
|
机器学习/深度学习 人工智能 分布式计算
【AI系统】分布式通信与 NVLink
进入大模型时代后,AI的核心转向大模型发展,训练这类模型需克服大量GPU资源及长时间的需求。面对单个GPU内存限制,跨多个GPU的分布式训练成为必要,这涉及到分布式通信和NVLink技术的应用。分布式通信允许多个节点协作完成任务,而NVLink则是一种高速、低延迟的通信技术,用于连接GPU或GPU与其它设备,以实现高性能计算。随着大模型的参数、数据规模扩大及算力需求增长,分布式并行策略,如数据并行和模型并行,变得至关重要。这些策略通过将模型或数据分割在多个GPU上处理,提高了训练效率。此外,NVLink和NVSwitch技术的持续演进,为GPU间的高效通信提供了更强的支持,推动了大模型训练的快
182 0
|
8月前
|
消息中间件 中间件 数据库
NServiceBus:打造企业级服务总线的利器——深度解析这一面向消息中间件如何革新分布式应用开发与提升系统可靠性
【10月更文挑战第9天】NServiceBus 是一个面向消息的中间件,专为构建分布式应用程序设计,特别适用于企业级服务总线(ESB)。它通过消息队列实现服务间的解耦,提高系统的可扩展性和容错性。在 .NET 生态中,NServiceBus 提供了强大的功能,支持多种传输方式如 RabbitMQ 和 Azure Service Bus。通过异步消息传递模式,各组件可以独立运作,即使某部分出现故障也不会影响整体系统。 示例代码展示了如何使用 NServiceBus 发送和接收消息,简化了系统的设计和维护。
142 3
|
8月前
|
消息中间件 存储 监控
消息队列系统中的确认机制在分布式系统中如何实现
消息队列系统中的确认机制在分布式系统中如何实现
|
8月前
|
消息中间件 存储 监控
【10月更文挑战第2天】消息队列系统中的确认机制在分布式系统中如何实现
【10月更文挑战第2天】消息队列系统中的确认机制在分布式系统中如何实现
105 4
|
8月前
|
存储 开发框架 .NET
C#语言如何搭建分布式文件存储系统
C#语言如何搭建分布式文件存储系统
163 2