YAML焦虑再见:PythonSDK助力大规模Argo Workflows构建

本文涉及的产品
容器服务 Serverless 版 ACK Serverless,952元额度 多规格
容器服务 Serverless 版 ACK Serverless,317元额度 多规格
简介: Hera优雅的对接Python生态体系与Argo Workflows框架,将繁琐复杂的工作流设计转化为直观简明的创作体验。它不仅为大规模任务编排开创了一条免受YAML复杂性困扰的通途,还为数据工程师铺设了平滑的桥梁,让他们能够借助熟悉的Python语言,无缝构造和优化机器学习工作流。

【阅读原文】戳:YAML焦虑再见:PythonSDK助力大规模Argo Workflows构建

Argo Workflows是一个开源的工作流管理系统,专为Kubernetes设计,旨在帮助用户创建和运行复杂的工作流程。它允许用户定义一系列的任务,这些任务可以按照特定的顺序执行,也可以设置任务间的依赖关系,从而实现自动化的工作流程编排。

 

使用Argo Workflows的场景非常广泛,包括定时任务、机器学习、仿真计算、科学计算、ETL数据处理、模型训练、CI/CD等。

 

 

Argo Workflows默认使用YAML格式进行编排,对于初次接触或者不熟悉YAML格式及Argo Workflows的人来说,使用YAML来编排复杂的工作流可能会显得有些挑战性。YAML虽然简洁且易于阅读,但是编写大型或复杂的工作流配置时,确实可能因为其严格的缩进规则和较为繁琐的结构而显得有些棘手。

 

Hera是一个用于构建和提交Argo工作流程的Python SDK框架,其主要目标是简化工作流程的构建和提交,尤其是对于数据科学家,使用Python能更好的兼容平时的使用习惯,克服YAML的阻碍。使用Hera PythonSDK具有以下优势:

 

1) 简洁性:编写代码简短易懂,大大提高编写效率。

 

2) 支持复杂工作流:在编写复杂工作流时,如果用YAML进行编辑的话,容易出现语法问题。

 

3) Python生态集成:每个Function就是一个Template,非常容易和Python生态的框架进行集成。

 

4) 可测试性:能够直接使用Python测试框架来提升代码质量。

 

ACK One Serverless Argo工作流集群托管了Argo Workflow,本文将介绍使用如何使用Hera和ACK One Serveless Argo集群进行交互,其架构如下所示:

 

 

 

1、开通Argo工作流集群并获取访问认证Token

 

参考:

 

1)创建工作流集群:

https://help.aliyun.com/zh/ack/distributed-cloud-container-platform-for-kubernetes/user-guide/create-a-workflow-cluster

 

2)开通Argo Server:

https://help.aliyun.com/zh/ack/distributed-cloud-container-platform-for-kubernetes/user-guide/enable-argo-server-for-a-workflow-cluster

 

3)开通Argo Server公网访问(专线用户可选):

https://help.aliyun.com/zh/ack/distributed-cloud-container-platform-for-kubernetes/user-guide/enable-public-access-to-the-argo-server

 

4) 创建并获取集群Token:

 

kubectl create token default -n default

 

 


2、开启Hera PythonSDK之旅

 

1) 安装Hera:

安装Hera非常简便,只需一条命令:

 

pip install hera-workflows

 

2) 编写并提交Workflows

在Argo Workflows中,DAG(有向无环图)是一种常用的方式来定义复杂的任务依赖关系,其中"Diamond"结构是指一个常见的工作流模式,其中两个或多个任务并行执行后,它们的结果汇聚到一个共同的后续任务。这种结构在需要合并不同数据流或处理结果的场景中非常有用。

下面是一个具体的示例,展示如何使用Hera定义一个具有"Diamond"结构的工作流,即两个任务taskA和taskB并行运行,它们的输出都作为输入给到taskC:

 

 

a. Simple DAG diamond

 

# 导入相关包
from hera.workflows import DAG, Workflow, script
from hera.shared import global_config
import urllib3
urllib3.disable_warnings()
# 配置访问地址和token
global_config.host = "https://argo.{{clusterid}}.{{region-id}}.alicontainer.com:2746"
global_config.token = "abcdefgxxxxxx" # 填入之前获取的token
global_config.verify_ssl = ""
# 装饰器函数script是 Hera 实现近乎原生的 Python 函数编排的关键功能。
# 它允许您在 Hera 上下文管理器(例如Workflow或Steps上下文)下调用该函数,
# 该函数在任何 Hera 上下文之外仍将正常运行,这意味着您可以在给定函数上编写单元测试。
# 该示例是打印输入的信息。
@script()
def echo(message: str):
    print(message)
# 构建workflow,Workflow是 Argo 中的主要资源,
# 也是 Hera 的关键类,负责保存模板、设置入口点和运行模板。
with Workflow(
    generate_name="dag-diamond-",
    entrypoint="diamond",
) as w:
    with DAG(name="diamond"):
        A = echo(name="A", arguments={"message": "A"}) # 构建template
        B = echo(name="B", arguments={"message": "B"})
        C = echo(name="C", arguments={"message": "C"})
        D = echo(name="D", arguments={"message": "D"})
        A >> [B, C] >> D # 构建依赖关系,B、C任务依赖A,D依赖B和C
# 创建workflow
w.create()

 

提交工作流:

 

python simpleDAG.py

 

在控制台查看工作流运行状态,可以看到任务运行成功:

 

 

b. Map-Reduce

 

在Argo Workflows中实现MapReduce风格的数据处理,关键在于如何有效利用其DAG(有向无环图)模板来组织和协调多个任务,从而模拟Map和Reduce阶段。

 

以下是一个更加详细的示例,展示了如何使用Hera构建一个简单的MapReduce工作流,用于处理文本文件的单词计数任务,其中每一步都是一个Python函数,可以非常容易和Python生态进行集成。

 

from hera.workflows import DAG, Artifact, NoneArchiveStrategy, Parameter, OSSArtifact, Workflow, script
from hera.shared import global_config
import urllib3
urllib3.disable_warnings()
# 设置访问地址
global_config.host = "https://argo.{{clusterid}}.{{region-id}}.alicontainer.com:2746"
global_config.token = "abcdefgxxxxxx" # 填入之前获取的token
global_config.verify_ssl = ""
# 使用script装饰函数时,将script参数传递给script装饰器。这包括image、inputs、outputs、resources等。
@script(
    image="python:alpine3.6",
    inputs=Parameter(name="num_parts"),
    outputs=OSSArtifact(name="parts", path="/mnt/out", archive=NoneArchiveStrategy(), key="{{workflow.name}}/parts"),
)
def split(num_parts: int) -> None: # 根据输入参数num_parts创建多个文件,文件中写入foo字符和parts编号
    import json
    import os
    import sys
    os.mkdir("/mnt/out")
    part_ids = list(map(lambda x: str(x), range(num_parts)))
    for i, part_id in enumerate(part_ids, start=1):
        with open("/mnt/out/" + part_id + ".json", "w") as f:
            json.dump({"foo": i}, f)
    json.dump(part_ids, sys.stdout)
# script中定义image、inputs、outputs
@script(
    image="python:alpine3.6",
    inputs=[Parameter(name="part_id", value="0"), Artifact(name="part", path="/mnt/in/part.json"),],
    outputs=OSSArtifact(
        name="part",
        path="/mnt/out/part.json",
        archive=NoneArchiveStrategy(),
        key="{{workflow.name}}/results/{{inputs.parameters.part_id}}.json",
    ),
)
def map_() -> None: # 根据文件中foo字符的个数,生成新文件,将foo内容parts编号乘以2,写入bar内容
    import json
    import os
    os.mkdir("/mnt/out")
    with open("/mnt/in/part.json") as f:
        part = json.load(f)
    with open("/mnt/out/part.json", "w") as f:
        json.dump({"bar": part["foo"] * 2}, f)
# script中定义image、inputs、outputs、resources
@script(
    image="python:alpine3.6",
    inputs=OSSArtifact(name="results", path="/mnt/in", key="{{workflow.name}}/results"),
    outputs=OSSArtifact(
        name="total", path="/mnt/out/total.json", archive=NoneArchiveStrategy(), key="{{workflow.name}}/total.json"
    ),
)
def reduce() -> None: # 计算每个parts的bar的值的总和。
    import json
    import os
    os.mkdir("/mnt/out")
    total = 0
    for f in list(map(lambda x: open("/mnt/in/" + x), os.listdir("/mnt/in"))):
        result = json.load(f)
        total = total + result["bar"]
    with open("/mnt/out/total.json", "w") as f:
        json.dump({"total": total}, f)
# 构建workflow,输入name、设置入口点、namespace、全局参数等。
with Workflow(generate_name="map-reduce-", entrypoint="main", namespace="default", arguments=Parameter(name="num_parts", value="4")) as w:
    with DAG(name="main"):
        s = split(arguments=Parameter(name="num_parts", value="{{workflow.parameters.num_parts}}")) # 构建templetes
        m = map_(
            with_param=s.result,
            arguments=[Parameter(name="part_id", value="{{item}}"), OSSArtifact(name="part", key="{{workflow.name}}/parts/{{item}}.json"),],
        ) # 输入参数并构建templetes,
        s >> m >> reduce() # 构建任务依赖关系
# 创建工作流
w.create()

 

提交工作流:

 

python map-reduce.py

 

控制台查看工作流状态,可以看到任务运行成功:

 

 


 

3、总结


 

Argo Workflow编辑方式 YAML Hera Framework
简洁性 较高 高,代码量少
复杂工作流编写难易程度
Python生态集成难易程度 易,丰富的Python Lib
可测试性 难,容易出现语法错误 易,可使用测试框架


Hera优雅的对接Python生态体系与Argo Workflows框架,将繁琐复杂的工作流设计转化为直观简明的创作体验。它不仅为大规模任务编排开创了一条免受YAML复杂性困扰的通途,还为数据工程师铺设了平滑的桥梁,让他们能够借助熟悉的Python语言,无缝构造和优化机器学习工作流,加速实现从创意到部署的高效迭代循环,推动智能应用的迅速落地与持续演进。ACKOne Serveles Argo团队是国内最早使用和维护Argo Workflows的团队之一,在Argo Workflow使用方面积累众多的最佳实践,欢迎加入钉钉群号一同交流:35688562。

 

参考:

 

分布式工作流:

https://help.aliyun.com/zh/ack/distributed-cloud-container-platform-for-kubernetes/user-guide/overview-12?spm=a2c4g.11186623.0.0.2f066c0bcQJtWZ

 

Argo Workflows:

https://github.com/argoproj/argo-workflows

 

Hera:

https://hera.readthedocs.io/en/stable/

 

Train LLM with Hera:

https://www.youtube.com/watch?v=nRYf3GkKpss

 

simple-diamond Yaml:

https://github.com/argoproj/argo-workflows/blob/main/examples/dag-diamond.yaml

 

map-reduce Yaml:

https://github.com/argoproj/argo-workflows/blob/main/examples/map-reduce.yaml


我们是阿里巴巴云计算和大数据技术幕后的核心技术输出者。

欢迎关注 “阿里云基础设施”同名微信微博知乎

获取关于我们的更多信息~

相关实践学习
通过Ingress进行灰度发布
本场景您将运行一个简单的应用,部署一个新的应用用于新的发布,并通过Ingress能力实现灰度发布。
容器应用与集群管理
欢迎来到《容器应用与集群管理》课程,本课程是“云原生容器Clouder认证“系列中的第二阶段。课程将向您介绍与容器集群相关的概念和技术,这些概念和技术可以帮助您了解阿里云容器服务ACK/ACK Serverless的使用。同时,本课程也会向您介绍可以采取的工具、方法和可操作步骤,以帮助您了解如何基于容器服务ACK Serverless构建和管理企业级应用。 学习完本课程后,您将能够: 掌握容器集群、容器编排的基本概念 掌握Kubernetes的基础概念及核心思想 掌握阿里云容器服务ACK/ACK Serverless概念及使用方法 基于容器服务ACK Serverless搭建和管理企业级网站应用
相关文章
|
5月前
|
运维 负载均衡 Serverless
函数计算产品使用问题之yaml如果写多个function,可不可以yaml在构建的时候能构建多个函数
函数计算产品作为一种事件驱动的全托管计算服务,让用户能够专注于业务逻辑的编写,而无需关心底层服务器的管理与运维。你可以有效地利用函数计算产品来支撑各类应用场景,从简单的数据处理到复杂的业务逻辑,实现快速、高效、低成本的云上部署与运维。以下是一些关于使用函数计算产品的合集和要点,帮助你更好地理解和应用这一服务。
|
域名解析 Cloud Native jenkins
【Drone-初识篇】Drone借助GitLab构建CICD环境、以及编写 .drone.yaml 流水线
【Drone-初识篇】Drone借助GitLab构建CICD环境、以及编写 .drone.yaml 流水线
952 0
|
XML 测试技术 数据格式
|
2月前
|
JSON Kubernetes API
深入理解Kubernetes配置:编写高效的YAML文件
深入理解Kubernetes配置:编写高效的YAML文件
|
5月前
|
存储 运维 Serverless
函数计算产品使用问题之在YAML文件中配置了环境变量,但在PHP代码中无法读取到这些环境变量,是什么原因
函数计算产品作为一种事件驱动的全托管计算服务,让用户能够专注于业务逻辑的编写,而无需关心底层服务器的管理与运维。你可以有效地利用函数计算产品来支撑各类应用场景,从简单的数据处理到复杂的业务逻辑,实现快速、高效、低成本的云上部署与运维。以下是一些关于使用函数计算产品的合集和要点,帮助你更好地理解和应用这一服务。
|
1月前
|
Kubernetes 应用服务中间件 nginx
k8s学习--YAML资源清单文件托管服务nginx
k8s学习--YAML资源清单文件托管服务nginx
k8s学习--YAML资源清单文件托管服务nginx
|
1月前
|
Kubernetes Docker Perl
k8s常见故障--yaml文件检查没有问题 pod起不来(一直处于创建中)
k8s常见故障--yaml文件检查没有问题 pod起不来(一直处于创建中)
|
1月前
ingress相关yaml文件报错且相关资源一切正常解决方法
ingress相关yaml文件报错且相关资源一切正常解决方法
ingress相关yaml文件报错且相关资源一切正常解决方法
|
4月前
|
运维 Kubernetes Serverless
Serverless 应用引擎使用问题之s.yaml文件中如何使用外部环境变量
阿里云Serverless 应用引擎(SAE)提供了完整的微服务应用生命周期管理能力,包括应用部署、服务治理、开发运维、资源管理等功能,并通过扩展功能支持多环境管理、API Gateway、事件驱动等高级应用场景,帮助企业快速构建、部署、运维和扩展微服务架构,实现Serverless化的应用部署与运维模式。以下是对SAE产品使用合集的概述,包括应用管理、服务治理、开发运维、资源管理等方面。