像Google一样构建机器学习系统2 - 开发你的机器学习工作流

本文涉及的产品
容器镜像服务 ACR,镜像仓库100个 不限时长
容器服务 Serverless 版 ACK Serverless,952元额度 多规格
容器服务 Serverless 版 ACK Serverless,317元额度 多规格
简介: 按照上篇文章搭建了一套Kubeflow Pipelines之后,我们一起小试牛刀,用一个真实的案例,学习如何开发一套基于Kubeflow Pipelines的机器学习工作流。 准备工作 机器学习工作流是一个任务驱动的流程,同时也是数据驱动的流程,这里涉及到数据的导入和准备,模型训练Checkpoint的导出评估,到最终模型的导出。

本系列将利用阿里云容器服务,帮助您上手Kubeflow Pipelines.

按照上篇文章搭建了一套Kubeflow Pipelines之后,我们一起小试牛刀,用一个真实的案例,学习如何开发一套基于Kubeflow Pipelines的机器学习工作流。

准备工作

机器学习工作流是一个任务驱动的流程,同时也是数据驱动的流程,这里涉及到数据的导入和准备,模型训练Checkpoint的导出评估,到最终模型的导出。这就需要分布式存储作为传输的媒介,这里使用NAS作为分布式存储。

  • 创建分布式存储,这里以NAS为例。这里NFS_SERVER_IP需要替换成真实NAS服务器地址

1.创建阿里云NAS服务,可以参考文档

2.需要在 NFS Server 中创建 /data

# mkdir -p /nfs
# mount -t nfs -o vers=4.0 NFS_SERVER_IP:/ /nfs
# mkdir -p /data
# cd /
# umount /nfs

3.创建对应的Persistent Volume.

# cat nfs-pv.yaml
apiVersion: v1
kind: PersistentVolume
metadata:
  name: user-susan
  labels:
    user-susan: pipelines
spec:
  persistentVolumeReclaimPolicy: Retain
  capacity:
    storage: 10Gi
  accessModes:
  - ReadWriteMany
  nfs:
    server: NFS_SERVER_IP
    path: "/data"
    
# kubectl create -f nfs-pv.yaml

4.创建Persistent Volume Claim

# cat nfs-pvc.yaml
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: user-susan
  annotations:
    description: "this is the mnist demo"
    owner: Tom
spec:
  accessModes:
    - ReadWriteMany
  resources:
    requests:
       storage: 5Gi
  selector:
    matchLabels:
      user-susan: pipelines
# kubectl create -f nfs-pvc.yaml

开发Pipeline

由于Kubeflow Pipelines提供的例子都是依赖于Google的存储服务,这导致国内的用户无法真正体验Pipelines的能力。阿里云容器服务团队提供了训练MNIST模型的例子,方便您在阿里云上使用和学习Kubeflow Pipelines。具体步骤为3步:
(1)下载数据
(2)利用TensorFlow进行模型训练
(3)模型导出

这3个步骤中后一个步骤都依赖与前一个步骤完成。

在Kubeflow Pipelines中可以用Python代码描述了这样一个流程, 完整代码可以查看standalone_pipeline.py。我们在这个例子中使用了arena_op这是对于Kubeflow默认的container_op封装,能够实现对于分布式训练MPI和PS模式的无缝衔接,另外也支持使用GPU和RDMA等异构设备和分布式存储的简单接入,同时也方便从git源同步代码。是一个比较实用的工具API。而arena_op是基于开源项目Arena

@dsl.pipeline(
  name='pipeline to run jobs',
  description='shows how to run pipeline jobs.'
)
def sample_pipeline(learning_rate='0.01',
    dropout='0.9',
    model_version='1',
    commit='f097575656f927d86d99dd64931042e1a9003cb2'):
  """A pipeline for end to end machine learning workflow."""
  data=["user-susan:/training"]
  gpus=1

# 1. prepare data
  prepare_data = arena.standalone_job_op(
    name="prepare-data",
    image="byrnedo/alpine-curl",
    data=data,
    command="mkdir -p /training/dataset/mnist && \
  cd /training/dataset/mnist && \
  curl -O https://code.aliyun.com/xiaozhou/tensorflow-sample-code/raw/master/data/t10k-images-idx3-ubyte.gz && \
  curl -O https://code.aliyun.com/xiaozhou/tensorflow-sample-code/raw/master/data/t10k-labels-idx1-ubyte.gz && \
  curl -O https://code.aliyun.com/xiaozhou/tensorflow-sample-code/raw/master/data/train-images-idx3-ubyte.gz && \
  curl -O https://code.aliyun.com/xiaozhou/tensorflow-sample-code/raw/master/data/train-labels-idx1-ubyte.gz")

  # 2. downalod source code and train the models
  train = arena.standalone_job_op(
    name="train",
    image="tensorflow/tensorflow:1.11.0-gpu-py3",
    sync_source="https://code.aliyun.com/xiaozhou/tensorflow-sample-code.git",
    env=["GIT_SYNC_REV=%s" % (commit)],
    gpus=gpus,
    data=data,
    command='''
    echo %s;python code/tensorflow-sample-code/tfjob/docker/mnist/main.py \
    --max_steps 500 --data_dir /training/dataset/mnist \
    --log_dir /training/output/mnist  --learning_rate %s \
    --dropout %s''' % (prepare_data.output, learning_rate, dropout),
    metrics=["Train-accuracy:PERCENTAGE"])
  # 3. export the model
  export_model = arena.standalone_job_op(
    name="export-model",
    image="tensorflow/tensorflow:1.11.0-py3",
    sync_source="https://code.aliyun.com/xiaozhou/tensorflow-sample-code.git",
    env=["GIT_SYNC_REV=%s" % (commit)],
    data=data,
    command="echo %s;python code/tensorflow-sample-code/tfjob/docker/mnist/export_model.py --model_version=%s --checkpoint_path=/training/output/mnist /training/output/models" % (train.output, model_version))

Kubeflow Pipelines会将上面的代码转化成一个有向无环图(DAG),其中的每一个节点就是Component(组件),而Component(组件)之间的连线代表它们之间的依赖关系。从Pipelines UI可以看到DAG图:

4-pipeline-dag.jpg

首先具体理解一下数据准备的部分,这里我们提供了arena.standalone_job_op的Python API, 需要指定该步骤的名称:name,需要使用的容器镜像:image,要使用的数据以及其对应到容器内部的挂载目录:data,这里的data是一个数组格式, 如data=["user-susan:/training"],表示可以挂载到多个数据。 user-susan是之前创建的Persistent Volume Claim,而/training为容器内部的挂载目录。

prepare_data = arena.standalone_job_op(
    name="prepare-data",
    image="byrnedo/alpine-curl",
    data=data,
    command="mkdir -p /training/dataset/mnist && \
  cd /training/dataset/mnist && \
  curl -O https://code.aliyun.com/xiaozhou/tensorflow-sample-code/raw/master/data/t10k-images-idx3-ubyte.gz && \
  curl -O https://code.aliyun.com/xiaozhou/tensorflow-sample-code/raw/master/data/t10k-labels-idx1-ubyte.gz && \
  curl -O https://code.aliyun.com/xiaozhou/tensorflow-sample-code/raw/master/data/train-images-idx3-ubyte.gz && \
  curl -O https://code.aliyun.com/xiaozhou/tensorflow-sample-code/raw/master/data/train-labels-idx1-ubyte.gz")

而上述步骤实际上是从指定地址利用curl下载数据到分布式存储对应的目录/training/dataset/mnist,请注意这里的/training为分布式存储的根目录,类似大家熟悉的根mount点;而/training/dataset/mnist是子目录。其实后面的步骤可以通过使用同样的根mount点,读到数据,进行运算。

第二步是利用下载到分布式存储的数据,并通过git指定固定commit id下载代码,并进行模型训练

train = arena.standalone_job_op(
    name="train",
    image="tensorflow/tensorflow:1.11.0-gpu-py3",
    sync_source="https://code.aliyun.com/xiaozhou/tensorflow-sample-code.git",
    env=["GIT_SYNC_REV=%s" % (commit)],
    gpus=gpus,
    data=data,
    command='''
    echo %s;python code/tensorflow-sample-code/tfjob/docker/mnist/main.py \
    --max_steps 500 --data_dir /training/dataset/mnist \
    --log_dir /training/output/mnist  --learning_rate %s \
    --dropout %s''' % (prepare_data.output, learning_rate, dropout),
    metrics=["Train-accuracy:PERCENTAGE"])

可以看到这个步骤比数据准备要相对复杂一点,除了和第一步骤中的name,image, data和command之外,在模型训练步骤中,还需要指定:

  • 获取代码的方式: 从可重现实验的角度来看,对于运行试验代码的追本溯源,是非常重要的一环。可以在API调用时指定sync_source的git代码源,同时通过设定envGIT_SYNC_REV指定训练代码的commit id
  • gpu: 默认为0,就是不使用GPU;如果为大于0的整数值,就代表该步骤需要这个数量的GPU数。
  • metrics: 同样是从可重现和可比较的实验目的出发,用户可以将需要的一系列指标导出,并且通过Pipelines UI上直观的显示和比较。具体使用方法分为两步,1.在调用API时以数组的形式指定要收集指标的metrics name和指标的展示格式PERCENTAGE或者是RAW,比如metrics=["Train-accuracy:PERCENTAGE"]。2.由于Pipelines默认会从stdout日志中收集指标,你需要在真正运行的模型代码中输出{metrics name}={value}或者{metrics name}:{value}, 可以参考具体样例代码

4-metrics.jpg

值得注意的是:

在本步骤中指定了和prepare_data相同的data参数["user-susan:/training"],就可以在训练代码中读到对应的数据,比如--data_dir /training/dataset/mnist

另外由于该步骤依赖于prepare_data,可以在方法中通过指定prepare_data.output表示两个步骤的依赖关系。

最后export_model是基于train训练产生的checkpoint,生成训练模型:

export_model = arena.standalone_job_op(
    name="export-model",
    image="tensorflow/tensorflow:1.11.0-py3",
    sync_source="https://code.aliyun.com/xiaozhou/tensorflow-sample-code.git",
    env=["GIT_SYNC_REV=%s" % (commit)],
    data=data,
    command="echo %s;python code/tensorflow-sample-code/tfjob/docker/mnist/export_model.py --model_version=%s --checkpoint_path=/training/output/mnist /training/output/models" % (train.output, model_version))

export_model和第二步train类似,甚至要更为简单,它只是从git同步模型导出代码并且利用共享目录/training/output/mnist中的checkpoint执行模型导出。

整个工作流程看起来还是很直观的, 下面就可以定义一个Python方法将整个流程贯穿在一起。

@dsl.pipeline(
  name='pipeline to run jobs',
  description='shows how to run pipeline jobs.'
)
def sample_pipeline(learning_rate='0.01',
    dropout='0.9',
    model_version='1',
    commit='f097575656f927d86d99dd64931042e1a9003cb2'):

@dsl.pipeline是表示工作流的装饰器,这个装饰器中需要定义两个属性,分别是namedescription

入口方法sample_pipeline中定义了4个参数learning_rate,dropout,model_versioncommit,分别可以在上面的trainexport_model阶段使用。这里的参数的值实际上是 dsl.PipelineParam类型,定义成dsl.PipelineParam的目的在于可以通过Kubeflow Pipelines的原生UI可以将其转换成输入表单,表单的关键字是参数名称,而默认值为参数的值. 值得注意的是,这里的dsl.PipelineParam对应值的实际上只能是字符串和数字型;而数组和map,以及自定义类型都是无法通过转型进行变换的。

而实际上,这些参数都可以在用户提交工作流时进行覆盖,以下就是提交工作流对应的UI:

4-input.jpg

提交Pipeline

您可以在自己的Kubernetes内将前面开发工作流的Python DSL提交到Kubeflow Pipelines服务中, 实际提交代码很简单:

  KFP_SERVICE="ml-pipeline.kubeflow.svc.cluster.local:8888"
  import kfp.compiler as compiler
  compiler.Compiler().compile(sample_pipeline, __file__ + '.tar.gz')
  client = kfp.Client(host=KFP_SERVICE)
  try:
    experiment_id = client.get_experiment(experiment_name=EXPERIMENT_NAME).id
  except:
    experiment_id = client.create_experiment(EXPERIMENT_NAME).id
  run = client.run_pipeline(experiment_id, RUN_ID, __file__ + '.tar.gz',
                            params={'learning_rate':learning_rate,
                                     'dropout':dropout,
                                    'model_version':model_version,
                                    'commit':commit})

利用compiler.compile将Python代码编译成执行引擎(Argo)识别的DAG配置文件

通过Kubeflow Pipeline的客户端创建或者找到已有的实验,并且提交之前编译出的DAG配置文件

在集群内准备一个python3的环境,并且安装Kubeflow Pipelines SDK

# kubectl create job pipeline-client --namespace kubeflow --image python:3 -- sleep infinity
# kubectl  exec -it -n kubeflow $(kubectl get po -l job-name=pipeline-client -n kubeflow | grep -v NAME| awk '{print $1}') bash

登录到Python3的环境后,执行如下命令,连续提交两个不同参数的任务

# pip3 install http://kubeflow.oss-cn-beijing.aliyuncs.com/kfp/0.1.14/kfp.tar.gz --upgrade
# pip3 install http://kubeflow.oss-cn-beijing.aliyuncs.com/kfp-arena/kfp-arena-0.4.tar.gz --upgrade
# curl -O https://raw.githubusercontent.com/cheyang/pipelines/update_standalone_sample/samples/arena-samples/standalonejob/standalone_pipeline.py
# python3 standalone_pipeline.py --learning_rate 0.0001 --dropout 0.8 --model_version 2
# python3 standalone_pipeline.py --learning_rate 0.0005 --dropout 0.8 --model_version 3

查看运行结果

登录到Kubeflow Pipelines的UI: https://{pipeline地址}/pipeline/#/experiments, 比如

https://11.124.285.171/pipeline/#/experiments

4-experiment.jpg

点击Compare runs按钮,可以比较两个实验的输入,花费的时间和精度等一系列指标。让实验可追溯是让实验可重现的第一步;而利用Kubeflow Pipelines本身的实验管理能力则是开启实验可重现的第一步。

4-comparsion.jpg

总结

实现一个可以运行的Kubeflow Pipeline需要的步骤是:

1.构建Pipeline(流水线)中需要的最小执行单元Component(组件),如果是利用原生定义的dsl.container_ops,需要构建两部分代码:

  • 构建运行时代码:通常是为每个步骤构建容器镜像,作为Pipelines和真正执行业务逻辑代码之间的适配器。它所做的事情为获取Pipelines上下文的输入参数,调用业务逻辑代码,并且将需要传递到下个步骤的输出按照Pipelines的规则放到容器内的指定位置,由底层工作流组件负责传递。 这样产生的结果是运行时代码与业务逻辑代码会耦合在一起。可以参考Kubeflow Pipelines的例子
  • 构建客户端代码:这个步骤通常是长成下面的样子, 熟悉Kubernetes的朋友会发现这个步骤实际上就是在编写Pod Spec:
container_op = dsl.ContainerOp(
        name=name,
        image='<train-image>',
        arguments=[
            '--input_dir', input_dir,
            '--output_dir', output_dir,
            '--model_name', model_name,
            '--model_version', model_version,
            '--epochs', epochs
        ],
        file_outputs={'output': '/output.txt'}
    )
container_op.add_volume(k8s_client.V1Volume(
            host_path=k8s_client.V1HostPathVolumeSource(
                path=persistent_volume_path),
            name=persistent_volume_name))
container_op.add_volume_mount(k8s_client.V1VolumeMount(
            mount_path=persistent_volume_path,
            name=persistent_volume_name))

利用原生定义的dsl.container_ops的好处在于灵活,由于开放了和Pipelines的交互接口,用户可以在container_ops这个层面做许多事情。但是它的问题在于:

  • 复用度低,每个Component都需要构建镜像和开发运行时代码
  • 复杂度高,使用者需要了解Kubernetes的概念,比如resource limit, PVC, node selector等一系列概念
  • 支持分布式训练困难,由于container_op为单容器操作,如果需要支持分布式训练就需要在container_ops中提交和管理类似TFJob的任务。这里会带来复杂度和安全性的双重挑战,复杂度比较好理解,安全性是说提交TFJob这类任务的权限会需要开放额外的权限给Pipeline的开发者。

另一种方式是使用arena_op这种可以重用的Component API,它使用通用运行时代码,可以免去重复构建运行时代码的工作;同时利用通用一套的arena_op API简化用户的使用;也支持Parameter Server和MPI等场景。建议您使用这种方式编译Pipelines

2.将构建好的Component(组件)拼接成Pipeline(流水线)

3.将Pipeline(流水线)编译后Argo的执行引擎(Argo)识别的DAG配置文件, 并提交的DAG配置文件到Kubeflow Pipelines, 并利用Kubeflow Pipelines自身的UI查看流程结果。

相关实践学习
部署Stable Diffusion玩转AI绘画(GPU云服务器)
本实验通过在ECS上从零开始部署Stable Diffusion来进行AI绘画创作,开启AIGC盲盒。
目录
相关文章
|
15天前
|
机器学习/深度学习 算法 算法框架/工具
为什么使用C++进行机器学习开发
C++作为一种高性能语言,在某些性能要求极高或资源受限的场景下也具有非常重要的地位。C++的高效性和对底层硬件的控制能力,使其在大规模机器学习系统中发挥重要作用,尤其是当需要处理大数据或实时响应的系统时。
30 3
|
15天前
|
机器学习/深度学习 存储 人工智能
文本情感识别分析系统Python+SVM分类算法+机器学习人工智能+计算机毕业设计
使用Python作为开发语言,基于文本数据集(一个积极的xls文本格式和一个消极的xls文本格式文件),使用Word2vec对文本进行处理。通过支持向量机SVM算法训练情绪分类模型。实现对文本消极情感和文本积极情感的识别。并基于Django框架开发网页平台实现对用户的可视化操作和数据存储。
20 0
文本情感识别分析系统Python+SVM分类算法+机器学习人工智能+计算机毕业设计
|
28天前
|
SQL 监控 大数据
通过Google Dataflow,我们能够构建一个高效、可扩展且易于维护的实时数据处理系统
【9月更文挑战第7天】随着大数据时代的到来,企业对高效数据处理的需求日益增加,特别是在实时分析和事件驱动应用中。Google Dataflow作为Google Cloud Platform的一项服务,凭借其灵活、可扩展的特点,成为实时大数据处理的首选。本文将介绍Dataflow的基本概念、优势,并通过一个电商日志分析的实际案例和示例代码,展示如何构建高效的数据处理管道。Dataflow不仅支持自动扩展和高可用性,还提供了多种编程语言支持和与GCP其他服务的紧密集成,简化了整个数据处理流程。通过Dataflow,企业可以快速响应业务需求,优化用户体验。
35 3
|
2月前
|
机器学习/深度学习 PHP 开发者
探索PHP中的面向对象编程构建你的首个机器学习模型:以Python和scikit-learn为例
【8月更文挑战第30天】在PHP的世界中,面向对象编程(OOP)是一块基石,它让代码更加模块化、易于管理和维护。本文将深入探讨PHP中面向对象的魔法,从类和对象的定义开始,到继承、多态性、封装等核心概念,再到实战中如何应用这些理念来构建更健壮的应用。我们将通过示例代码,一起见证PHP中OOP的魔力,并理解其背后的设计哲学。
|
2月前
|
机器学习/深度学习 人工智能 Android开发
揭秘AI编程:从零开始构建你的第一个机器学习模型移动应用开发之旅:从新手到专家
【8月更文挑战第29天】本文将带你走进人工智能的奇妙世界,一起探索如何从零开始构建一个机器学习模型。我们将一步步解析整个过程,包括数据收集、预处理、模型选择、训练和测试等步骤,让你对AI编程有一个全面而深入的理解。无论你是AI初学者,还是有一定基础的开发者,都能在这篇文章中找到你需要的信息和启示。让我们一起开启这段激动人心的AI编程之旅吧! 【8月更文挑战第29天】在这篇文章中,我们将探索移动应用开发的奇妙世界。无论你是刚刚踏入这个领域的新手,还是已经有一定经验的开发者,这篇文章都将为你提供有价值的信息和指导。我们将从基础开始,逐步深入到更复杂的主题,包括移动操作系统的选择、开发工具的使用、
|
2月前
|
机器学习/深度学习 人工智能 算法
【悬念揭秘】ML.NET:那片未被探索的机器学习宝藏,如何让普通开发者一夜变身AI高手?——从零开始,揭秘构建智能应用的神秘旅程!
【8月更文挑战第28天】ML.NET 是微软推出的一款开源机器学习框架,专为希望在本地应用中嵌入智能功能的 .NET 开发者设计。无需深厚的数据科学背景,即可实现预测分析、推荐系统和图像识别等功能。它支持多种数据源,提供丰富的预处理工具和多样化的机器学习算法,简化了数据处理和模型训练流程。
37 1
|
2月前
|
机器学习/深度学习 Kubernetes Docker
机器学习开发的灵药:Docker容器
机器学习开发的灵药:Docker容器
|
2月前
|
机器学习/深度学习 数据处理 定位技术
构建您的首个机器学习项目:从理论到实践
【8月更文挑战第28天】本文旨在为初学者提供一个简明的指南,通过介绍一个基础的机器学习项目——预测房价——来揭示机器学习的神秘面纱。我们将从数据收集开始,逐步深入到数据处理、模型选择、训练和评估等环节。通过实际操作,你将学会如何利用Python及其强大的科学计算库来实现自己的机器学习模型。无论你是编程新手还是有一定经验的开发者,这篇文章都将为你打开一扇通往机器学习世界的大门。
|
2月前
|
机器学习/深度学习 自动驾驶 算法
揭秘机器学习:用Python构建你的首个预测模型
【8月更文挑战第26天】 机器学习,这个听起来既神秘又遥不可及的领域,实际上正悄然改变着我们的世界。从推荐系统到自动驾驶汽车,机器学习技术无处不在。本文将带你走进机器学习的世界,通过一个简单的Python代码示例,展示如何构建一个基本的线性回归模型来预测房价。不需要复杂的数学公式或深奥的理论,我们将以最直观的方式理解机器学习的核心概念。无论你是编程新手还是数据科学爱好者,这篇文章都将为你打开一扇新的大门,让你看到数据背后的力量。
|
2月前
|
人工智能 编解码 算法
使用PAI-DSW x Free Prompt Editing开发个人AIGC绘图小助理
本文介绍如何借助阿里云PAI-DSW及Free Prompt Editing算法开发个性化AIGC绘图助手,实现图像智能编辑与生成。首先需领取PAI-DSW免费试用资源并创建实例;随后通过运行教程文件完成环境搭建。WebUI界面预设了多种参数供调整,如图像分辨率、编辑层数量等,支持更改图像背景与风格等功能演示。完成实验后应及时清理资源以避免额外费用。此外,参与阿里云开发者社区的“AIGC绘图小助手”活动,不仅可获得免费云资源试用,还有机会赢得桌面折叠风扇、小度智能屏X9等礼品。
下一篇
无影云桌面