解锁云原生 AI 技能 - 开发你的机器学习工作流

本文涉及的产品
函数计算FC,每月15万CU 3个月
应用实时监控服务-应用监控,每月50GB免费额度
云原生网关 MSE Higress,422元/月
简介: 按照上篇文章《解锁云原生 AI 技能 | 在 Kubernetes 上构建机器学习系统》搭建了一套 Kubeflow Pipelines 之后,我们一起小试牛刀,用一个真实的案例,学习如何开发一套基于 Kubeflow Pipelines 的机器学习工作流。

按照上篇文章《解锁云原生 AI 技能 | 在 Kubernetes 上构建机器学习系统》搭建了一套 Kubeflow Pipelines 之后,我们一起小试牛刀,用一个真实的案例,学习如何开发一套基于 Kubeflow Pipelines 的机器学习工作流。

准备工作

机器学习工作流是一个任务驱动的流程,同时也是数据驱动的流程,这里涉及到数据的导入和准备、模型训练 Checkpoint 的导出评估、到最终模型的导出。这就需要分布式存储作为传输的媒介,此处使用 NAS 作为分布式存储。

  • 创建分布式存储,这里以 NAS 为例。此处 NFS_SERVER_IP 需要替换成真实 NAS 服务器地址
  1. 创建阿里云 NAS 服务,可以参考文档
  2. 需要在 NFS Server 中创建 /data
# mkdir -p /nfs
# mount -t nfs -o vers=4.0 NFS_SERVER_IP:/ /nfs
# mkdir -p /data
# cd /
# umount /nfs
  1. 创建对应的 Persistent Volume
# cat nfs-pv.yaml
apiVersion: v1
kind: PersistentVolume
metadata:
  name: user-susan
  labels:
    user-susan: pipelines
spec:
  persistentVolumeReclaimPolicy: Retain
  capacity:
    storage: 10Gi
  accessModes:
  - ReadWriteMany
  nfs:
    server: NFS_SERVER_IP
    path: "/data"
    
# kubectl create -f nfs-pv.yaml
  1. 创建 Persistent Volume Claim
# cat nfs-pvc.yaml
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: user-susan
  annotations:
    description: "this is the mnist demo"
    owner: Tom
spec:
  accessModes:
    - ReadWriteMany
  resources:
    requests:
       storage: 5Gi
  selector:
    matchLabels:
      user-susan: pipelines
# kubectl create -f nfs-pvc.yaml

开发 Pipeline

由于 Kubeflow Pipelines 提供的例子都是依赖于 Google 的存储服务,这导致国内的用户无法真正体验 Pipelines 的能力。为此,阿里云容器服务团队提供了基于 NAS 存储训练 MNIST 模型的例子,方便您在阿里云上使用和学习 Kubeflow Pipelines。具体步骤分 3 步: 

  • (1) 下载数据 
  • (2) 利用 TensorFlow 进行模型训练 
  • (3) 模型导出

在这 3 个步骤中,后一个步骤都依赖于前一个步骤而完成。
Kubeflow Pipelines 中可以用 Python 代码描述这样一个流程, 完整代码可以查看 standalone_pipeline.py
我们在例子中使用了基于开源项目 Arena 的 arena_op ,这是对于 Kubeflow 默认的 container_op 封装,它能够实现对于分布式训练 MPI 和 PS 模式的无缝衔接,另外也支持使用 GPU 和 RDMA 等异构设备和分布式存储的简单接入,同时方便从 git 源同步代码,是一个比较实用的工具 API。 

@dsl.pipeline(
  name='pipeline to run jobs',
  description='shows how to run pipeline jobs.'
)
def sample_pipeline(learning_rate='0.01',
    dropout='0.9',
    model_version='1',
    commit='f097575656f927d86d99dd64931042e1a9003cb2'):
  """A pipeline for end to end machine learning workflow."""
  data=["user-susan:/training"]
  gpus=1
# 1. prepare data
  prepare_data = arena.standalone_job_op(
    name="prepare-data",
    image="byrnedo/alpine-curl",
    data=data,
    command="mkdir -p /training/dataset/mnist && \
  cd /training/dataset/mnist && \
  curl -O https://code.aliyun.com/xiaozhou/tensorflow-sample-code/raw/master/data/t10k-images-idx3-ubyte.gz && \
  curl -O https://code.aliyun.com/xiaozhou/tensorflow-sample-code/raw/master/data/t10k-labels-idx1-ubyte.gz && \
  curl -O https://code.aliyun.com/xiaozhou/tensorflow-sample-code/raw/master/data/train-images-idx3-ubyte.gz && \
  curl -O https://code.aliyun.com/xiaozhou/tensorflow-sample-code/raw/master/data/train-labels-idx1-ubyte.gz")
  # 2. downalod source code and train the models
  train = arena.standalone_job_op(
    name="train",
    image="tensorflow/tensorflow:1.11.0-gpu-py3",
    sync_source="https://code.aliyun.com/xiaozhou/tensorflow-sample-code.git",
    env=["GIT_SYNC_REV=%s" % (commit)],
    gpus=gpus,
    data=data,
    command='''
    echo %s;python code/tensorflow-sample-code/tfjob/docker/mnist/main.py \
    --max_steps 500 --data_dir /training/dataset/mnist \
    --log_dir /training/output/mnist  --learning_rate %s \
    --dropout %s''' % (prepare_data.output, learning_rate, dropout),
    metrics=["Train-accuracy:PERCENTAGE"])
  # 3. export the model
  export_model = arena.standalone_job_op(
    name="export-model",
    image="tensorflow/tensorflow:1.11.0-py3",
    sync_source="https://code.aliyun.com/xiaozhou/tensorflow-sample-code.git",
    env=["GIT_SYNC_REV=%s" % (commit)],
    data=data,
    command="echo %s;python code/tensorflow-sample-code/tfjob/docker/mnist/export_model.py --model_version=%s --checkpoint_path=/training/output/mnist /training/output/models" % (train.output, model_version))

Kubeflow Pipelines 会将上面的代码转化成一个有向无环图 (DAG), 其中的每一个节点就是 Component (组件),而 Component (组件)之间的连线代表它们之间的依赖关系。从 Pipelines UI 可以看到 DAG 图:


首先具体理解一下数据准备的部分,这里我们提供了 arena.standalone_job_op 的 Python API,  需要指定该步骤的名称: name; 需要使用的容器镜像: image; 要使用的数据以及其对应到容器内部的挂载目录: data。
这里的 data 是一个数组格式, 如 data=["user-susan:/training"],表示可以挂载到多个数据。 其中 user-susan 是之前创建的 Persistent Volume Claim, 而 /training 为容器内部的挂载目录。

prepare_data = arena.standalone_job_op(
    name="prepare-data",
    image="byrnedo/alpine-curl",
    data=data,
    command="mkdir -p /training/dataset/mnist && \
  cd /training/dataset/mnist && \
  curl -O https://code.aliyun.com/xiaozhou/tensorflow-sample-code/raw/master/data/t10k-images-idx3-ubyte.gz && \
  curl -O https://code.aliyun.com/xiaozhou/tensorflow-sample-code/raw/master/data/t10k-labels-idx1-ubyte.gz && \
  curl -O https://code.aliyun.com/xiaozhou/tensorflow-sample-code/raw/master/data/train-images-idx3-ubyte.gz && \
  curl -O https://code.aliyun.com/xiaozhou/tensorflow-sample-code/raw/master/data/train-labels-idx1-ubyte.gz")

而上述步骤实际上是从指定地址利用 curl 下载数据到分布式存储对应的目录 /training/dataset/mnist,请注意这里的 /training 为分布式存储的根目录,类似大家熟悉的根 mount 点;而 /training/dataset/mnist 是子目录。其实后面的步骤可以通过使用同样的根 mount 点,读到数据,进行运算。
第二步是利用下载到分布式存储的数据,并通过 git 指定固定 commit id 下载代码,并进行模型训练。

train = arena.standalone_job_op(
    name="train",
    image="tensorflow/tensorflow:1.11.0-gpu-py3",
    sync_source="https://code.aliyun.com/xiaozhou/tensorflow-sample-code.git",
    env=["GIT_SYNC_REV=%s" % (commit)],
    gpus=gpus,
    data=data,
    command='''
    echo %s;python code/tensorflow-sample-code/tfjob/docker/mnist/main.py \
    --max_steps 500 --data_dir /training/dataset/mnist \
    --log_dir /training/output/mnist  --learning_rate %s \
    --dropout %s''' % (prepare_data.output, learning_rate, dropout),
    metrics=["Train-accuracy:PERCENTAGE"])

可以看到这个步骤比数据准备要相对复杂一点,除了和第一步骤中的 name, image,  data 和 command 一样需要指定之外,在模型训练步骤中,还需要指定:

  • 获取代码的方式: 从可重现实验的角度来看,对于运行试验代码的追本溯源,是非常重要的一环。可以在 API 调用时指定 sync_source 的 git 代码源,同时通过设定 env 中 GIT_SYNC_REV 指定训练代码的 commit id;
  • gpu:  默认为 0,就是不使用 GPU;如果为大于 0 的整数值,就代表该步骤需要这个数量的 GPU 数;
  • metrics:  同样是从可重现和可比较的实验目的出发,用户可以将需要的一系列指标导出,并且通过 Pipelines UI 进行直观的显示和比较。具体使用方法分为两步:1. 在调用 API 时以数组的形式指定要收集指标的 metrics name 和指标的展示格式 PERCENTAGE 或者是 RAW,比如 metrics=["Train-accuracy:PERCENTAGE"]。 2. 由于 Pipelines 默认会从 stdout 日志中收集指标,你需要在真正运行的模型代码中输出 {metrics name}={value} 或者 {metrics name}:{value}, 可以参考具体样例代码



值得注意的是:

在本步骤中指定了和 prepare_data 相同的 data 参数 ["user-susan:/training"],就可以在训练代码中读到对应的数据,比如 --data_dir /training/dataset/mnist
另外由于该步骤依赖于 prepare_data,可以在方法中通过指定 prepare_data.output 表示两个步骤的依赖关系。

最后 export_model 是基于 train 训练产生的 checkpoint,生成训练模型:

export_model = arena.standalone_job_op(
    name="export-model",
    image="tensorflow/tensorflow:1.11.0-py3",
    sync_source="https://code.aliyun.com/xiaozhou/tensorflow-sample-code.git",
    env=["GIT_SYNC_REV=%s" % (commit)],
    data=data,
    command="echo %s;python code/tensorflow-sample-code/tfjob/docker/mnist/export_model.py --model_version=%s --checkpoint_path=/training/output/mnist /training/output/models" % (train.output, model_version))

export_model 和第二步 train 类似,甚至要更为简单,它只是从 git 同步模型导出代码并且利用共享目录 /training/output/mnist 中的 checkpoint 执行模型导出。
整个工作流程看起来还是很直观的, 下面就可以定义一个 Python 方法将整个流程贯穿在一起:

@dsl.pipeline(
  name='pipeline to run jobs',
  description='shows how to run pipeline jobs.'
)
def sample_pipeline(learning_rate='0.01',
    dropout='0.9',
    model_version='1',
    commit='f097575656f927d86d99dd64931042e1a9003cb2'):

@dsl.pipeline 是表示工作流的装饰器,这个装饰器中需要定义两个属性,分别是 name 和  description
入口方法 sample_pipeline 中定义了 4 个参数: learning_ratedropoutmodel_version 和 commit, 分别可以在上面的 train 和 export_model 阶段使用。这里的参数的值实际上是  dsl.PipelineParam 类型,定义成 dsl.PipelineParam 的目的在于可以通过 Kubeflow Pipelines 的原生 UI 将其转换成输入表单,表单的关键字是参数名称,而默认值为参数的值。值得注意的是,这里的 dsl.PipelineParam 对应值实际上只能是字符串和数字型;而数组和 map,以及自定义类型都是无法通过转型进行变换的。

实际上,这些参数都可以在用户提交工作流时进行覆盖,以下就是提交工作流对应的 UI:

提交 Pipeline

您可以在自己的 Kubernetes 内将前面开发工作流的 Python DSL 提交到 Kubeflow Pipelines 服务中, 实际提交代码很简单:

KFP_SERVICE="ml-pipeline.kubeflow.svc.cluster.local:8888"
  import kfp.compiler as compiler
  compiler.Compiler().compile(sample_pipeline, __file__ + '.tar.gz')
  client = kfp.Client(host=KFP_SERVICE)
  try:
    experiment_id = client.get_experiment(experiment_name=EXPERIMENT_NAME).id
  except:
    experiment_id = client.create_experiment(EXPERIMENT_NAME).id
  run = client.run_pipeline(experiment_id, RUN_ID, __file__ + '.tar.gz',
                            params={'learning_rate':learning_rate,
                                     'dropout':dropout,
                                    'model_version':model_version,
                                    'commit':commit})

利用 compiler.compile 将 Python 代码编译成执行引擎 (Argo) 识别的 DAG 配置文件;
通过 Kubeflow Pipeline 的客户端创建或者找到已有的实验,并且提交之前编译出的 DAG 配置文件。

在集群内准备一个 python3 的环境,并且安装 Kubeflow Pipelines SDK:

# kubectl create job pipeline-client --namespace kubeflow --image python:3 -- sleep infinity
# kubectl  exec -it -n kubeflow $(kubectl get po -l job-name=pipeline-client -n kubeflow | grep -v NAME| awk '{print $1}') bash

登录到 Python3 的环境后,执行如下命令,连续提交两个不同参数的任务:

# pip3 install http://kubeflow.oss-cn-beijing.aliyuncs.com/kfp/0.1.14/kfp.tar.gz --upgrade
# pip3 install http://kubeflow.oss-cn-beijing.aliyuncs.com/kfp-arena/kfp-arena-0.4.tar.gz --upgrade
# curl -O https://raw.githubusercontent.com/cheyang/pipelines/update_standalone_sample/samples/arena-samples/standalonejob/standalone_pipeline.py
# python3 standalone_pipeline.py --learning_rate 0.0001 --dropout 0.8 --model_version 2
# python3 standalone_pipeline.py --learning_rate 0.0005 --dropout 0.8 --model_version 3

查看运行结果

登录到 Kubeflow Pipelines 的 UI: [https://](){pipeline地址}/pipeline/#/experiments, 比如:

https://11.124.285.171/pipeline/#/experiments



点击 Compare runs 按钮,可以比较两个实验的输入、花费的时间和精度等一系列指标。让实验可追溯是让实验可重现的第一步,而利用 Kubeflow Pipelines 本身的实验管理能力则是开启实验可重现的第一步。

总结

实现一个可以运行的 Kubeflow Pipeline 需要的步骤是:

  1. 构建 Pipeline (流水线)中需要的最小执行单元 Component (组件),如果是利用原生定义的 dsl.container_ops, 需要构建两部分代码:
  • 构建运行时代码:通常是为每个步骤构建容器镜像,作为 Pipelines 和真正执行业务逻辑代码之间的适配器。它所做的事情为获取 Pipelines 上下文的输入参数,调用业务逻辑代码,并且将需要传递到下个步骤的输出按照 Pipelines 的规则放到容器内的指定位置,由底层工作流组件负责传递。 这样产生的结果是运行时代码与业务逻辑代码会耦合在一起。可以参考 Kubeflow Pipelines 的例子
  • 构建客户端代码:这个步骤通常是长成下面的样子, 熟悉 Kubernetes 的朋友会发现这个步骤实际上就是在编写 Pod Spec:
container_op = dsl.ContainerOp(
        name=name,
        image='<train-image>',
        arguments=[
            '--input_dir', input_dir,
            '--output_dir', output_dir,
            '--model_name', model_name,
            '--model_version', model_version,
            '--epochs', epochs
        ],
        file_outputs={'output': '/output.txt'}
    )
container_op.add_volume(k8s_client.V1Volume(
            host_path=k8s_client.V1HostPathVolumeSource(
                path=persistent_volume_path),
            name=persistent_volume_name))
container_op.add_volume_mount(k8s_client.V1VolumeMount(
            mount_path=persistent_volume_path,
            name=persistent_volume_name))

利用原生定义的 dsl.container_ops 的好处在于灵活,由于开放了和 Pipelines 的交互接口,用户可以在 container_ops 这个层面做许多事情。但是它的问题在于:

  • 复用度低。每个 Component 都需要构建镜像和开发运行时代码;
  • 复杂度高。使用者需要了解 Kubernetes 的概念,比如 resource limit,  PVC,  node selector 等一系列概念;
  • 支持分布式训练困难。由于 container_op 为单容器操作,如果需要支持分布式训练就需要在 container_ops 中提交和管理类似 TFJob 的任务。这里会带来复杂度和安全性的双重挑战,复杂度比较好理解,安全性是说提交 TFJob 这类任务的权限会需要开放额外的权限给 Pipeline 的开发者。

另一种方式是使用 arena_op 这种可以重用的 Component API,它使用通用运行时代码,可以免去重复构建运行时代码的工作;同时利用通用一套的 arena_op API 简化用户的使用;也支持 Parameter Server 和 MPI 等场景。建议您使用这种方式编译 Pipelines。

  1. 将构建好的 Component (组件)拼接成 Pipeline (流水线);
  2. 将 Pipeline (流水线)编译成 Argo 的执行引擎 (Argo) 识别的 DAG 配置文件, 并提交 DAG 配置文件到 Kubeflow Pipelines,  利用 Kubeflow Pipelines 自身的 UI 查看流程结果。
相关实践学习
通过Ingress进行灰度发布
本场景您将运行一个简单的应用,部署一个新的应用用于新的发布,并通过Ingress能力实现灰度发布。
容器应用与集群管理
欢迎来到《容器应用与集群管理》课程,本课程是“云原生容器Clouder认证“系列中的第二阶段。课程将向您介绍与容器集群相关的概念和技术,这些概念和技术可以帮助您了解阿里云容器服务ACK/ACK Serverless的使用。同时,本课程也会向您介绍可以采取的工具、方法和可操作步骤,以帮助您了解如何基于容器服务ACK Serverless构建和管理企业级应用。 学习完本课程后,您将能够: 掌握容器集群、容器编排的基本概念 掌握Kubernetes的基础概念及核心思想 掌握阿里云容器服务ACK/ACK Serverless概念及使用方法 基于容器服务ACK Serverless搭建和管理企业级网站应用
相关文章
|
1月前
|
存储 人工智能 大数据
AI开发新范式,PAI模型构建平台升级发布
本次分享由阿里云智能集团产品专家高慧玲主讲,聚焦AI开发新范式及PAI模型构建平台的升级。分享分为四个部分,围绕“人人可用”和“面向生产”两大核心理念展开。通过降低AI工程化门槛、提供一站式全链路服务,PAI平台致力于帮助企业和开发者更高效地实现AI应用。案例展示中,介绍了多模态模型微调在文旅场景的应用,展示了如何快速复现并利用AI解决实际问题。最终目标是让AI技术更普及,赋能各行业,推动社会进步。
|
1天前
|
机器学习/深度学习 人工智能 自然语言处理
Java+机器学习基础:打造AI学习基础
随着人工智能(AI)技术的飞速发展,越来越多的开发者开始探索如何将AI技术应用到实际业务场景中。Java作为一种强大的编程语言,不仅在企业级应用开发中占据重要地位,在AI领域也展现出了巨大的潜力。本文将通过模拟一个AI应用,从背景历史、业务场景、优缺点、底层原理等方面,介绍如何使用Java结合机器学习技术来打造一个AI学习的基础Demo。
41 18
|
1月前
|
人工智能 运维 Cloud Native
云原生 Meetup,AI 应用工程化专场·广州站
欢迎莅临广州市海珠区鼎新路 88 号广州阿里中心,O-N-10-02 春秋书院。报名成功后,您将在活动前一周收到短信通知。
|
1月前
|
SQL 人工智能 关系型数据库
PolarDB-PG AI最佳实践 2 :PolarDB AI X EAS实现自定义库内模型推理最佳实践
PolarDB通过POLAR_AI插件支持使用SQL调用AI/ML模型,无需专业AI知识或额外部署环境。结合阿里云EAS在线模型服务,可轻松部署自定义模型,在SQL中实现如文本翻译等功能。
|
1月前
|
人工智能 安全 大数据
PAI年度发布:GenAI时代AI基础设施的演进
本文介绍了AI平台在大语言模型时代的新能力和发展趋势。面对推理请求异构化、持续训练需求及安全可信挑战,平台推出了一系列优化措施,包括LLM智能路由、多模态内容生成服务、serverless部署模式等,以提高资源利用效率和降低使用门槛。同时,发布了训推一体调度引擎、竞价任务等功能,助力企业更灵活地进行训练与推理任务管理。此外,PAI开发平台提供了丰富的工具链和最佳实践,支持从数据处理到模型部署的全流程开发,确保企业和开发者能高效、安全地构建AI应用,享受AI带来的红利。
|
1月前
|
人工智能 运维 监控
阿里云Milvus产品发布:AI时代云原生专业向量检索引擎
随着大模型和生成式AI的兴起,非结构化数据市场迅速增长,预计2027年占比将达到86.8%。Milvus作为开源向量检索引擎,具备极速检索、云原生弹性及社区支持等优势,成为全球最受欢迎的向量数据库之一。阿里云推出的全托管Milvus产品,优化性能3-10倍,提供企业级功能如Serverless服务、分钟级开通、高可用性和成本降低30%,助力企业在电商、广告推荐、自动驾驶等场景下加速AI应用构建,显著提升业务价值和稳定性。
|
2月前
|
人工智能 安全 算法
PAI负责任的AI解决方案: 安全、可信、隐私增强的企业级AI
在《PAI可信AI解决方案》会议中,分享了安全、可信、隐私增强的企业级AI。会议围绕三方面展开:首先通过三个案例介绍生活和技术层面的挑战;其次阐述构建AI的关键要素;最后介绍阿里云PAI的安全功能及未来展望,确保数据、算法和模型的安全与合规,提供全方位的可信AI解决方案。
|
3月前
|
机器学习/深度学习 数据采集 数据处理
Scikit-learn Pipeline完全指南:高效构建机器学习工作流
Scikit-learn管道是构建高效、鲁棒、可复用的机器学习工作流程的利器。通过掌握管道的使用,我们可以轻松地完成从数据预处理到模型训练、评估和部署的全流程,极大地提高工作效率。
62 2
Scikit-learn Pipeline完全指南:高效构建机器学习工作流
|
1月前
|
人工智能 容灾 Serverless
AI推理新纪元,PAI全球化模型推理服务的创新与实践
本次分享主题为“AI推理新纪元,PAI全球化模型推理服务的创新与实践”,由阿里云高级产品经理李林杨主讲。内容涵盖生成式AI时代推理服务的变化与挑战、play IM核心引擎的优势及ES专属网关的应用。通过LM智能路由、多模态异步生成等技术,PAI平台实现了30%以上的成本降低和显著性能提升,确保全球客户的业务稳定运行并支持异地容灾,目前已覆盖16个地域,拥有10万张显卡的推理集群。
|
1月前
|
人工智能 运维 API
PAI企业级能力升级:应用系统构建、高效资源管理、AI治理
PAI平台针对企业用户在AI应用中的复杂需求,提供了全面的企业级能力。涵盖权限管理、资源分配、任务调度与资产管理等模块,确保高效利用AI资源。通过API和SDK支持定制化开发,满足不同企业的特殊需求。典型案例中,某顶尖高校基于PAI构建了融合AI与HPC的科研计算平台,实现了作业、运营及运维三大中心的高效管理,成功服务于校内外多个场景。