浅析机器学习工作流Kubflow Pipelines

本文涉及的产品
容器服务 Serverless 版 ACK Serverless,952元额度 多规格
容器服务 Serverless 版 ACK Serverless,317元额度 多规格
简介: KubeflowKubeflow 简述Kubeflow项目是基于容器和Kubernetes构建,旨在为数据科学家、机器学习工程师、系统运维人员提供面向机器学习业务的敏捷部署、开发、训练、发布和管理平台。它利用了云原生技术的优势,让用户更快速、方便的部署、使用和管理当前最流行的机器学习软件。

Kubeflow


Kubeflow 简述

Kubeflow项目是基于容器和Kubernetes构建,旨在为数据科学家、机器学习工程师、系统运维人员提供面向机器学习业务的敏捷部署、开发、训练、发布和管理平台。它利用了云原生技术的优势,让用户更快速、方便的部署、使用和管理当前最流行的机器学习软件。

Kubeflow集成了很多领域的开源项目,比如Jupyter、TF Serving、Katib、Fairing、Argo等。可以针对机器学习的不同阶段:数据预处理、模型训练、模型预测、服务管理等进行管理。只要安装了Kubernetes,可以在本地、机房、云环境中部署。

网络异常,图片无法展示
|


Kubeflow的核心组件

网络异常,图片无法展示
|


如上图展示了工业上机器学习的整体流程,从数据采集,验证,到模型训练再到服务发布等。而图中的每一个小组件都是Kubeflow中包含的。可见,kubeflow的野心之大,同时另一方面也表达了它的功能之强。以下为每个组件的简介:

  • Jupyter: 创建和管理多用户交互式Jupyter notebooks
  • Tensorflow/PyTorch: 当前主要支持的机器学习引擎
  • Seldon: 提供在 Kubernetes 上对机器学习模型的部署
  • TF-Serving: 提供对 Tensorflow 模型的在线部署,支持版本控制及无需停止线上服务切换模型等功能
  • Argo: 基于 Kubernetes 的工作流引擎
  • Pipelines: 是一个基于Argo实现了面向机器学习场景的工作流项目,提供机器学习流程的创建、编排调度和管理,还提供了一个Web UI
  • Ambassador: 对外提供统一服务的网关(API Gateway)
  • Istio: 提供微服务的管理,Telemetry 收集
  • Ksonnet: Kubeflow 使用 ksonnet 来向 kubernetes 集群部署需要的 k8s 资源
  • Operator:针对不同的机器学习框架提供资源调度和分布式训练的能力(TF-OperatorPyTorch-OperatorCaffe2-OperatorMPI-OperatorMXNet-Operator
  • Katib:基于各个Operator实现的超参数搜索和简单的模型结构搜索的系统,支持并行搜索和分布式训练等。超参优化在实际的工作中还没有被大规模的应用,所以这部分的技术还需要一些时间来成熟
  • Pachyderm:Pachyderm版本控制数据,类似于Git对代码的处理。 您可以跟踪一段时间内的数据状态,对历史数据进行回溯测试,与队友共享数据,以及恢复到以前的数据状态

Kubeflow 特点

  • 基于Kubernetes,具有云原生的特性:弹性伸缩、高可用、DevOps等
  • 集成大量机器学习所用到的工具

上面简要介绍了Kubeflow的基本特性。接下来,我们进行Kubeflow Pipelines组件的详细介绍。


KubeFlow Pipelines 简述


Kubeflow v0.1.3之后, Pipelines已经成为Kubeflow的核心组件。Kubeflow的目的主要是为了简化在Kubernetes上运行机器学习任务的流程,最终希望能够实现一套完整可用的流水线, 来实现机器学习从数据到模型的一整套端到端的过程。 而Pipelines是一个工作流平台,能够编译部署机器学习的工作流。所以从这个层面来说,pipeline能够成为Kubeflow的核心组件一点也不意外。

Kubeflow Pipelines 平台包括:

  • 用于管理和跟踪实验、作业和运行的用户界面 (UI)。
  • 用于调度多个步骤 ML 工作流的引擎。
  • 用于定义和操作管道和组件的 SDK。
  • 使用 SDK用于与系统交互的Notebooks


KubeFlow Pipelines 架构图


网络异常,图片无法展示
|


上图为Kubeflow Pipelines的架构图,主要分为八个部分:

  • Python SDK: 用于创建kubeflow Pipelines 组件的特定语言(DSL)。
  • DSL Compiler: 将Python代码转换成YAML静态配置文件(DSL编译器)。
  • Pipeline Web Server: Pipeline的前端服务,它收集各种数据以显示相关视图:当前正在运行的pipeline列表,pipeline执行的历史记录,有关各个pipeline运行的调试信息和执行状态等。
  • Pipeline Service: Pipeline的后端服务,调用K8S服务从YAML创建 pipeline运行。
  • Kubernetes Resources: 创建CRDs运行Pipeline。
  • Machine Learning Metadata Service: 用于监视由Pipeline Service创建的Kubernetes资源,并将这些资源的状态持久化在ML元数据服务中(存储任务流容器之间的input/output数据交互)。
  • Artifact Storage: 用于存储MetadataArtifactKubeflow Pipelines将元数据存储在MySQL数据库中,将工件制品存储在Minio服务器或Cloud Storage等工件存储中。
  • Orchestration controllers:任务编排,比如Argo Workflow控制器,它可以协调任务驱动的工作流。


KubeFlow Pipelines 主要特点


  • 端到端编排:启用和简化机器学习工作流的编排。
  • 轻松实验:让您可以轻松尝试多种想法和方法并管理您的各种试验/实验。
  • 易于重复使用:使您能够重用组件和工作流以快速创建端到端的解决方案,而无需每次都重新构建。


示例


背景

当训练一个新的 ML 模型任务时,大多数数据科学家和 ML 工程师可能会首先开发一些新的 Python 脚本或交互式notebooks,这些脚本或交互式notebooks执行必要的数据提取和预处理,以构建用于训练模型的干净数据集。然后,他们可能会创建几个额外的脚本或notebooks来尝试不同类型的模型或不同的机器学习框架。最后,他们将收集和探索指标以评估每个模型在测试数据集上的表现,然后确定将哪个模型部署到生产中。

网络异常,图片无法展示
|


这显然是对真正的机器学习工作流程的过度简化,但关键是这种通用方法需要大量人工参与,并且除了最初开发它的工程师之外,任何人都不能轻松的重复使用。

我们可以使用KubeFlow Pipelines来解决这些问题。与其将数据准备、模型训练、模型验证和模型部署视为针对我们正在处理的特定模型的单个代码库,我们可以将此工作流程视为一系列单独的模块化步骤,每个步骤都专注于一个具体任务。

网络异常,图片无法展示
|


环境准备

pip install kfp
复制代码


设计工作流程

我们将总共创建四个组件,如下图所示:

网络异常,图片无法展示
|


  • preprocess-data:该组件将从 sklearn.datasets 加载波士顿房价数据集,然后将数据集拆分为训练集和测试集。
  • train-model:该组件将训练一个模型,使用波士顿房价数据集来预测波士顿房屋的中值。
  • test-model:该组件将计算并输出模型在测试数据集上的均方误差。
  • deploy-model:我们不会在本文中关注模型部署,因此该组件只会记录一条消息,说明它正在部署模型。 在实际场景中,这可能是将任何模型部署到生产环境的通用组件。

接下来我们进行代码编写,进行组件开发,进行镜像制作。

预处理组件开发(preprocess-data)

首先,编写预处理代码preprocess.py

import numpy as np
from sklearn import datasets
from sklearn.model_selection import train_test_split
def _preprocess_data():
     X, y = datasets.load_boston(return_X_y=True)
     X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.33)
     np.save('x_train.npy', X_train)
     np.save('x_test.npy', X_test)
     np.save('y_train.npy', y_train)
     np.save('y_test.npy', y_test)
if __name__ == '__main__':
     print('Preprocessing data...')
     _preprocess_data()
复制代码


然后,编写镜像文件Dockerfile

FROM python:3.7-slim
WORKDIR /app
RUN pip install -U scikit-learn numpy
COPY preprocess.py ./preprocess.py
ENTRYPOINT [ "python", "preprocess.py" ]
复制代码


接下来构建镜像:

docker build -t wintfru/boston_pipeline_preprocess:v1  -f Dockerfile .
复制代码


最后推送镜像到远程仓库:

docker push wintfru/boston_pipeline_preprocess:v1
复制代码


剩余组件开发(train-model、test-model、deploy-model)

剩余组件开发与预处理组件开发(preprocess-data)流程类似,详情请见参考文档

构建工作流

首先,我们编排工作流pipeline.py:

import kfp
from kfp import dsl
def preprocess_op():
    return dsl.ContainerOp(
        name='Preprocess Data',
        image='wintfru/boston_pipeline_preprocess:v1',
        arguments=[],
        file_outputs={
            'x_train': '/app/x_train.npy',
            'x_test': '/app/x_test.npy',
            'y_train': '/app/y_train.npy',
            'y_test': '/app/y_test.npy',
        }
    )
def train_op(x_train, y_train):
    return dsl.ContainerOp(
        name='Train Model',
        image='wintfru/boston_pipeline_train:v1',
        arguments=[
            '--x_train', x_train,
            '--y_train', y_train
        ],
        file_outputs={
            'model': '/app/model.pkl'
        }
    )
def test_op(x_test, y_test, model):
    return dsl.ContainerOp(
        name='Test Model',
        image='wintfru/boston_pipeline_test:v1',
        arguments=[
            '--x_test', x_test,
            '--y_test', y_test,
            '--model', model
        ],
        file_outputs={
            'mean_squared_error': '/app/output.txt'
        }
    )
def deploy_model_op(model):
    return dsl.ContainerOp(
        name='Deploy Model',
        image='wintfru/boston_pipeline_deploy:v1',
        arguments=[
            '--model', model
        ]
    )
@dsl.pipeline(
    name='Boston Housing Pipeline',
    description='An example pipeline that trains and logs a regression model.'
)
def boston_pipeline():
    _preprocess_op = preprocess_op()
    _train_op = train_op(
        dsl.InputArgumentPath(_preprocess_op.outputs['x_train']),
        dsl.InputArgumentPath(_preprocess_op.outputs['y_train'])
    ).after(_preprocess_op)
    _test_op = test_op(
        dsl.InputArgumentPath(_preprocess_op.outputs['x_test']),
        dsl.InputArgumentPath(_preprocess_op.outputs['y_test']),
        dsl.InputArgumentPath(_train_op.outputs['model'])
    ).after(_train_op)
    deploy_model_op(
        dsl.InputArgumentPath(_train_op.outputs['model'])
    ).after(_test_op)
# client = kfp.Client()
# client.create_run_from_pipeline_func(boston_pipeline, arguments={})
if __name__ == '__main__':
    kfp.compiler.Compiler().compile(boston_pipeline, __file__ + '.yaml')
复制代码


然后,我们编译pipeline.py,编译成一个Kubernetes任务yaml配置的文件。

python pipeline.py
复制代码


执行工作流

首先,进入Kubflow Pipelines 图形用户界面上传Yaml文件。

网络异常,图片无法展示
|


然后运行该工作流,运行完成之后的DAG图如下所示。

网络异常,图片无法展示
|


我们也可以查看每个组件的输入和输出结果以及控制台日志等。

网络异常,图片无法展示
|



到此为止,整个Kubflow Pipelines工作流程就已经完成。


总结


本文介绍了Kubflow及Kubflow Pipelines的基本架构和组件以及如何使用Kubflow Pipelines实现一个简单的机器学习工作流,用于加载一些数据、训练模型、在保持数据集上对其进行评估,然后“部署”它。通过使用 Kubeflow Pipelines,我们能够将此工作流中的每个步骤封装到工作流组件中,每个组件都在自己的、隔离的 Docker 容器环境中运行。 这种封装促进了我们机器学习工作流程中步骤之间的松耦合,并开辟了在未来工作流中重用组件的可能性。 例如,我们的训练组件中没有任何特定于波士顿房价数据集的内容。 任何时候我们想使用 Sklearn 训练回归模型时,我们都可以重用这个组件。

当然,本文只是触及了 Kubeflow Pipelines 的表面,但希望本文能帮助您了解Kubeflow Pipelines组件的基础知识。

相关实践学习
通过Ingress进行灰度发布
本场景您将运行一个简单的应用,部署一个新的应用用于新的发布,并通过Ingress能力实现灰度发布。
容器应用与集群管理
欢迎来到《容器应用与集群管理》课程,本课程是“云原生容器Clouder认证“系列中的第二阶段。课程将向您介绍与容器集群相关的概念和技术,这些概念和技术可以帮助您了解阿里云容器服务ACK/ACK Serverless的使用。同时,本课程也会向您介绍可以采取的工具、方法和可操作步骤,以帮助您了解如何基于容器服务ACK Serverless构建和管理企业级应用。 学习完本课程后,您将能够: 掌握容器集群、容器编排的基本概念 掌握Kubernetes的基础概念及核心思想 掌握阿里云容器服务ACK/ACK Serverless概念及使用方法 基于容器服务ACK Serverless搭建和管理企业级网站应用
相关文章
|
机器学习/深度学习 消息中间件 存储
【干货篇】bilibili:基于 Flink 的机器学习工作流平台在 b 站的应用
介绍 b 站的机器学习工作流平台 ultron 在 b 站多个机器学习场景上的应用。
【干货篇】bilibili:基于 Flink 的机器学习工作流平台在 b 站的应用
|
6月前
|
机器学习/深度学习 Python
机器学习中的工作流机制
机器学习中的工作流机制
|
机器学习/深度学习 存储 Kubernetes
如何将 Apache Airflow 用于机器学习工作流
Apache Airflow 是一个流行的平台,用于在 Python 中创建、调度和监控工作流。 它在 Github 上有超过 15,000 颗星,被 Twitter、Airbnb 和 Spotify 等公司的数据工程师使用。 如果您使用的是 Apache Airflow,那么您的架构可能已经根据任务数量及其要求进行了演变。 在 Skillup.co 工作时,我们首先有几百个 DAG 来执行我们所有的数据工程任务,然后我们开始做机器学习。
|
机器学习/深度学习 SQL 存储
图解大数据 | 工作流与特征工程@Spark机器学习
本文介绍Spark中用于大数据机器学习的板块MLlib/ML,讲解机器学习工作流(Pipeline)及其构建方式,并详解讲解基于DataFrame的Spark ML特征工程,包括二值化、定边界离散化、标准化、特征抽取等。
569 0
图解大数据 | 工作流与特征工程@Spark机器学习
|
机器学习/深度学习 SQL Kubernetes
开源机器学习工作流Ploomber
简述 Ploomber 是为数据科学和机器学习构建可靠数据工作流的最简单方法。 当你以标准形式提供您的源代码,Ploomber 会自动为您构建工作流。 任务可以是 Python 函数、Jupyter Notebook、Python/R/shell 脚本和 SQL 脚本中的任何内容。当你准备就绪后,无需更改代码即可部署到 Airflow 或 Kubernetes(使用 Argo)。
|
机器学习/深度学习 数据采集 分布式计算
大数据 | Spark机器学习工作流开发指南
Spark.ml是在Spark 1.2开始引入的一个包,它旨在提供一套统一的高级API,帮助用户创建和优化实用的机器学习工作流,它在原来的MLlib的基础上进行了大量的改进和优化,让Spark生态更见坚不可摧,本文就来详细介绍一下Spark机器学习工作流的基本概念和用法。
大数据 | Spark机器学习工作流开发指南
|
机器学习/深度学习 固态存储 容器
像Google一样构建机器学习系统 - 在阿里云上搭建Kubeflow Pipelines
谈到机器学习工作流平台,Google的工程经验非常丰富,它的TensorFlow Extended机器学习平台支撑了Google的搜索,翻译,视频等核心业务;更重要的是其对机器学习领域工程效率问题的理解深刻,
5578 0
|
存储 TensorFlow 算法框架/工具
解锁云原生 AI 技能 - 开发你的机器学习工作流
按照上篇文章《解锁云原生 AI 技能 | 在 Kubernetes 上构建机器学习系统》搭建了一套 Kubeflow Pipelines 之后,我们一起小试牛刀,用一个真实的案例,学习如何开发一套基于 Kubeflow Pipelines 的机器学习工作流。
|
机器学习/深度学习 TensorFlow 算法框架/工具
像Google一样构建机器学习系统2 - 开发你的机器学习工作流
按照上篇文章搭建了一套Kubeflow Pipelines之后,我们一起小试牛刀,用一个真实的案例,学习如何开发一套基于Kubeflow Pipelines的机器学习工作流。 准备工作 机器学习工作流是一个任务驱动的流程,同时也是数据驱动的流程,这里涉及到数据的导入和准备,模型训练Checkpoint的导出评估,到最终模型的导出。
5579 0
|
机器学习/深度学习 关系型数据库 MySQL
Kubeflow Pipeline — 基于Kubernetes 的机器学习工作流
#### 介绍 Pipeline是Kubeflow社区最近开源的一个端到端工作流项目,帮助我们来管理,部署端到端的机器学习工作流。Kubeflow 是一个谷歌的开源项目,它将机器学习的代码像构建应用一样打包,使其他人也能够重复使用。 kubeflow/pipeline 提供了一个工作流方案,将这些机器学习中的应用代码按照流水线的方式编排,形成可重复的工作流。并提供平台,帮助编排,部署,管
7755 0