【阅读原文】戳:YAML焦虑再见:PythonSDK助力大规模Argo Workflows构建
Argo Workflows是一个开源的工作流管理系统,专为Kubernetes设计,旨在帮助用户创建和运行复杂的工作流程。它允许用户定义一系列的任务,这些任务可以按照特定的顺序执行,也可以设置任务间的依赖关系,从而实现自动化的工作流程编排。
使用Argo Workflows的场景非常广泛,包括定时任务、机器学习、仿真计算、科学计算、ETL数据处理、模型训练、CI/CD等。
Argo Workflows默认使用YAML格式进行编排,对于初次接触或者不熟悉YAML格式及Argo Workflows的人来说,使用YAML来编排复杂的工作流可能会显得有些挑战性。YAML虽然简洁且易于阅读,但是编写大型或复杂的工作流配置时,确实可能因为其严格的缩进规则和较为繁琐的结构而显得有些棘手。
Hera是一个用于构建和提交Argo工作流程的Python SDK框架,其主要目标是简化工作流程的构建和提交,尤其是对于数据科学家,使用Python能更好的兼容平时的使用习惯,克服YAML的阻碍。使用Hera PythonSDK具有以下优势:
1) 简洁性:编写代码简短易懂,大大提高编写效率。
2) 支持复杂工作流:在编写复杂工作流时,如果用YAML进行编辑的话,容易出现语法问题。
3) Python生态集成:每个Function就是一个Template,非常容易和Python生态的框架进行集成。
4) 可测试性:能够直接使用Python测试框架来提升代码质量。
ACK One Serverless Argo工作流集群托管了Argo Workflow,本文将介绍使用如何使用Hera和ACK One Serveless Argo集群进行交互,其架构如下所示:
1、开通Argo工作流集群并获取访问认证Token
参考:
1)创建工作流集群:
2)开通Argo Server:
3)开通Argo Server公网访问(专线用户可选):
4) 创建并获取集群Token:
kubectl create token default -n default
2、开启Hera PythonSDK之旅
1) 安装Hera:
安装Hera非常简便,只需一条命令:
pip install hera-workflows
2) 编写并提交Workflows
在Argo Workflows中,DAG(有向无环图)是一种常用的方式来定义复杂的任务依赖关系,其中"Diamond"结构是指一个常见的工作流模式,其中两个或多个任务并行执行后,它们的结果汇聚到一个共同的后续任务。这种结构在需要合并不同数据流或处理结果的场景中非常有用。
下面是一个具体的示例,展示如何使用Hera定义一个具有"Diamond"结构的工作流,即两个任务taskA和taskB并行运行,它们的输出都作为输入给到taskC:
a. Simple DAG diamond
# 导入相关包 from hera.workflows import DAG, Workflow, script from hera.shared import global_config import urllib3 urllib3.disable_warnings() # 配置访问地址和token global_config.host = "https://argo.{{clusterid}}.{{region-id}}.alicontainer.com:2746" global_config.token = "abcdefgxxxxxx" # 填入之前获取的token global_config.verify_ssl = "" # 装饰器函数script是 Hera 实现近乎原生的 Python 函数编排的关键功能。 # 它允许您在 Hera 上下文管理器(例如Workflow或Steps上下文)下调用该函数, # 该函数在任何 Hera 上下文之外仍将正常运行,这意味着您可以在给定函数上编写单元测试。 # 该示例是打印输入的信息。 @script() def echo(message: str): print(message) # 构建workflow,Workflow是 Argo 中的主要资源, # 也是 Hera 的关键类,负责保存模板、设置入口点和运行模板。 with Workflow( generate_name="dag-diamond-", entrypoint="diamond", ) as w: with DAG(name="diamond"): A = echo(name="A", arguments={"message": "A"}) # 构建template B = echo(name="B", arguments={"message": "B"}) C = echo(name="C", arguments={"message": "C"}) D = echo(name="D", arguments={"message": "D"}) A >> [B, C] >> D # 构建依赖关系,B、C任务依赖A,D依赖B和C # 创建workflow w.create()
提交工作流:
python simpleDAG.py
在控制台查看工作流运行状态,可以看到任务运行成功:
b. Map-Reduce
在Argo Workflows中实现MapReduce风格的数据处理,关键在于如何有效利用其DAG(有向无环图)模板来组织和协调多个任务,从而模拟Map和Reduce阶段。
以下是一个更加详细的示例,展示了如何使用Hera构建一个简单的MapReduce工作流,用于处理文本文件的单词计数任务,其中每一步都是一个Python函数,可以非常容易和Python生态进行集成。
from hera.workflows import DAG, Artifact, NoneArchiveStrategy, Parameter, OSSArtifact, Workflow, script from hera.shared import global_config import urllib3 urllib3.disable_warnings() # 设置访问地址 global_config.host = "https://argo.{{clusterid}}.{{region-id}}.alicontainer.com:2746" global_config.token = "abcdefgxxxxxx" # 填入之前获取的token global_config.verify_ssl = "" # 使用script装饰函数时,将script参数传递给script装饰器。这包括image、inputs、outputs、resources等。 @script( image="python:alpine3.6", inputs=Parameter(name="num_parts"), outputs=OSSArtifact(name="parts", path="/mnt/out", archive=NoneArchiveStrategy(), key="{{workflow.name}}/parts"), ) def split(num_parts: int) -> None: # 根据输入参数num_parts创建多个文件,文件中写入foo字符和parts编号 import json import os import sys os.mkdir("/mnt/out") part_ids = list(map(lambda x: str(x), range(num_parts))) for i, part_id in enumerate(part_ids, start=1): with open("/mnt/out/" + part_id + ".json", "w") as f: json.dump({"foo": i}, f) json.dump(part_ids, sys.stdout) # script中定义image、inputs、outputs @script( image="python:alpine3.6", inputs=[Parameter(name="part_id", value="0"), Artifact(name="part", path="/mnt/in/part.json"),], outputs=OSSArtifact( name="part", path="/mnt/out/part.json", archive=NoneArchiveStrategy(), key="{{workflow.name}}/results/{{inputs.parameters.part_id}}.json", ), ) def map_() -> None: # 根据文件中foo字符的个数,生成新文件,将foo内容parts编号乘以2,写入bar内容 import json import os os.mkdir("/mnt/out") with open("/mnt/in/part.json") as f: part = json.load(f) with open("/mnt/out/part.json", "w") as f: json.dump({"bar": part["foo"] * 2}, f) # script中定义image、inputs、outputs、resources @script( image="python:alpine3.6", inputs=OSSArtifact(name="results", path="/mnt/in", key="{{workflow.name}}/results"), outputs=OSSArtifact( name="total", path="/mnt/out/total.json", archive=NoneArchiveStrategy(), key="{{workflow.name}}/total.json" ), ) def reduce() -> None: # 计算每个parts的bar的值的总和。 import json import os os.mkdir("/mnt/out") total = 0 for f in list(map(lambda x: open("/mnt/in/" + x), os.listdir("/mnt/in"))): result = json.load(f) total = total + result["bar"] with open("/mnt/out/total.json", "w") as f: json.dump({"total": total}, f) # 构建workflow,输入name、设置入口点、namespace、全局参数等。 with Workflow(generate_name="map-reduce-", entrypoint="main", namespace="default", arguments=Parameter(name="num_parts", value="4")) as w: with DAG(name="main"): s = split(arguments=Parameter(name="num_parts", value="{{workflow.parameters.num_parts}}")) # 构建templetes m = map_( with_param=s.result, arguments=[Parameter(name="part_id", value="{{item}}"), OSSArtifact(name="part", key="{{workflow.name}}/parts/{{item}}.json"),], ) # 输入参数并构建templetes, s >> m >> reduce() # 构建任务依赖关系 # 创建工作流 w.create()
提交工作流:
python map-reduce.py
控制台查看工作流状态,可以看到任务运行成功:
3、总结
Argo Workflow编辑方式 | YAML | Hera Framework |
简洁性 | 较高 | 高,代码量少 |
复杂工作流编写难易程度 | 难 | 易 |
Python生态集成难易程度 | 难 | 易,丰富的Python Lib |
可测试性 | 难,容易出现语法错误 | 易,可使用测试框架 |
Hera优雅的对接Python生态体系与Argo Workflows框架,将繁琐复杂的工作流设计转化为直观简明的创作体验。它不仅为大规模任务编排开创了一条免受YAML复杂性困扰的通途,还为数据工程师铺设了平滑的桥梁,让他们能够借助熟悉的Python语言,无缝构造和优化机器学习工作流,加速实现从创意到部署的高效迭代循环,推动智能应用的迅速落地与持续演进。ACKOne Serveles Argo团队是国内最早使用和维护Argo Workflows的团队之一,在Argo Workflow使用方面积累众多的最佳实践,欢迎加入钉钉群号一同交流:35688562。
参考:
分布式工作流:
Argo Workflows:
https://github.com/argoproj/argo-workflows
Hera:
https://hera.readthedocs.io/en/stable/
Train LLM with Hera:
https://www.youtube.com/watch?v=nRYf3GkKpss
simple-diamond Yaml:
https://github.com/argoproj/argo-workflows/blob/main/examples/dag-diamond.yaml
map-reduce Yaml:
https://github.com/argoproj/argo-workflows/blob/main/examples/map-reduce.yaml
我们是阿里巴巴云计算和大数据技术幕后的核心技术输出者。
获取关于我们的更多信息~