【DSW Gallery】如何在DLC上提交ElasticBatch任务

本文涉及的产品
模型在线服务 PAI-EAS,A10/V100等 500元 1个月
交互式建模 PAI-DSW,每月250计算时 3个月
模型训练 PAI-DLC,5000CU*H 3个月
简介: ElasticBatch是一种分布式离线弹性批量推理作业类型, 本文将介绍ElasticBatch SDK接口以及如何在DLC上提交ElasticBatch任务。

直接使用

请打开基于如何在DLC上提交ElasticBatch任务,并点击右上角 “ 在DSW中打开” 。

image.png

1. ElasticBatch介绍

ElasticBatch 是一种分布式离线弹性批量推理作业类型。ElasticBatch作业具有以下特点

  • 任务等待时间大大降低, 部分worker有资源即可运行;
  • 支持自动检测慢机并启动backup worker替换,避免任务长尾或者Hang;
  • 支持数据分片全局动态分发,让快节点可以处理更多数据;
  • 支持任务早停,数据全部处理完成后,未启动的worker不再启动,避免增加任务结束时间;
  • 支持容错处理,偶发失败worker会被重新拉起执行;
  • 支持多种IO数据源,比如OSS、NAS以及MaxCompute Table等;
  • 支持使用各种深度学习框架推理模型,比如PyTorch、TensorFlow以及OneFlow等;

ElasticBatch job包含AIMaster和Worker两类节点,AIMaster负责job的全局管控,包括弹性扩缩容、数据分片全局动态分发、慢机检测以及容错处理。Worker是工作节点,从AIMaster获取一个数据分片后,进行数据读取、预处理、推理模型预测、数据后处理以及数据写回,之后重复上述流程,直到无法获取新的数据分片。

image.png

2. ElasticBatch任务提交方式

ElasticBatch任务可以通过DLC-Web或DLC Python SDK提交。

ElasticBatch任务在提交时建议打开容错监控,避免任务运行过程中出现的偶发异常导致任务失败。

ElasticBatch任务推荐的容错监控配置如下所示

--job-execution-mode=Async --fault-tolerant-policy=OnFailure --max-num-of-same-error=20

2.1 在DLC-Web提交

在DLC-Web创建任务时,任务类型选择ElasticBatch即可创建ElasticBatch任务,节点镜像可以根据您的推理模型来选择,比如使用PyTorch模型,那么您可以选择PyTorch镜像作为任务运行环境。

image.png

2.2 使用DLC Python SDK提交#

如果您的环境未安装DLC Python SDK,那么可以通过以下命令安装

!pip install alibabacloud-pai-dlc20201203 -U -q

下面是DLC Python SDK提交ElasticBatch任务示例,示例选择TF镜像作为离线推理运行环境,设置了3个运行worker并开启了容错监控。

import time
from alibabacloud_pai_dlc20201203.client import Client
from alibabacloud_pai_dlc20201203.models import CreateJobRequest, JobSpec, JobSettings
from alibabacloud_tea_openapi.models import Config
workspace_id = "***已有的AI工作空间ID***"
region_id = "cn-hangzhou" # Region,可以是cn-hangzhou,cn-shanghai,cn-shenzhen等
config = Config(
    access_key_id="***你的access_key_id***",
    access_key_secret="***你的access_key_secret***",
    region_id=region_id,
    endpoint= "pai-dlc.{}.aliyuncs.com".format(region_id))
dlc_client = Client(config)
docker_image = "registry.{}.aliyuncs.com/pai-dlc/tensorflow-training:1.15-cpu-py36-ubuntu18.04".format(region_id)
job_spec = [JobSpec(
    type = "Worker",
    pod_count = 3,
    image = docker_image,
    ecs_spec = "ecs.c6.large",)]
settings = JobSettings(
    enable_error_monitoring_in_aimaster = True,
    error_monitoring_args = "--job-execution-mode=Async --fault-tolerant-policy=OnFailure --max-num-of-same-error=20")
create_job_req = CreateJobRequest(
    display_name = "ElasticBatchJobDemo",
    job_type = "ElasticBatchJob",
    workspace_id = workspace_id,
    job_specs = job_spec,
    user_command = "sleep 100",
    settings = settings, )
create_job_resp = dlc_client.create_job(create_job_req)
job_id = create_job_resp.body.job_id
while True:
    job = dlc_client.get_job(job_id).body
    print('job is {}'.format(job.status))
    if job.status in ('Succeeded', 'Failed', 'Stopped'):
        break
    time.sleep(10)

3. ElasticBatch SDK接口说明

您可以使用ElasticBatch SDK编写具体的离线推理代码,由于不同的数据源读写方式差别很大,ElasticBatch提供了针对MaxCompute Table以及POSIX File这两种数据源的SDK接口,下面给出具体接口说明以及使用示例。

3.1 MaxCompute Table数据源#

针对MaxCompute Table数据源,ElasticBatch基于PAIIO以及COMMON_IO做了二次封装,提供了简洁易用的数据输入输出接口。

3.1.2 接口说明

from aimaster import inference as elastic_inference
class MaxComputeTableConfig:
    def __init__(self,
                 input_table,
                 slice_size=4096,
                 output_table=None):
        """定义MaxComputeTable配置.
        参数:
           input_table: 输入表;
           slice_size: 分片大小;
           output_table: 输出表;
        """
class MaxComputeTableClient:
    def __init__(self, config):
        """定义MaxComputeTable Client, 使用该client可以创建输入dataset以及写表writer.
        参数:
            config: 上述定义的MaxComputeTableConfig对象;
        """
    def create_torch_dataset(self,
                          table_path=None,
                          batch_size=1,
                          selected_cols=None,
                          excluded_cols=None,
                          num_threads=1,
                          capacity=1024,
                          transform_fn=None):
        """创建PyTorch dataset, 内部是调用COMMON_IO实现,您可以参看COMMON_IO使用手册了解细节.
        参数:
            table_path: 带有分片信息的表路径;
            batch_size: 每次读取batch_size行数据进行处理;
            selected_cols: 取的列,格式为英文逗号,分隔的字符串。默认值表示读取所有列。该参数与excluded_cols不能同时使用;
            excluded_cols: 排除的列,格式为英文逗号,分隔的字符串。默认值表示读取所有列。该参数与selected_cols不能同时使用;
            num_threads: 内部数据预取线程数;
            capacity: 内部数据预取总量,单位行数;
            transform_fn: 一元函数,对输入的数据进行处理;
        """
    def create_tf_dataset(self,
                        record_defaults,
                        selected_cols=None,
                        excluded_cols=None,
                        num_threads=1,
                        capacity=1024):
        """创建TF dataset, 内部调用PAIIO TableRecordDataset实现,参数详情同TableRecordDataset.
        参数:
            record_defaults: 用于读出列的数据类型转换及列为空时的默认值;
            selected_cols: 取的列,格式为英文逗号,分隔的字符串。默认值表示读取所有列。该参数与excluded_cols不能同时使用;
            excluded_cols: 排除的列,格式为英文逗号,分隔的字符串。默认值表示读取所有列。该参数与selected_cols不能同时使用;
            num_threads: 内部数据预取线程数;
            capacity: 内部数据预取总量,单位行数;
        返回值:
            两个返回值(table_dataset, table_path_placeholder), 其中table_path_placeholder是table_dataset输入占位符
        """    
    def create_table_writer(self, buffer_size=0, filter_value=None):
        """创建写表的writer.
        参数:
            buffer_size: 写缓存大小,如果该值大于0,写操作会放到子进程中进行;
            filter_value: 需要过滤的值,写的数据中含有该值会被丢弃;
        返回值:
            返回一个writer对象,使用方式同COMMON_IO的writer;
        """

3.1.2 使用示例

  • TensorFlow模型推理示例
import aimaster
from aimaster import inference as elastic_inference
import tensorflow as tf
import time
input_table = "odps://project/tables/test_input"
output_table = "odps://project/tables/test_output"
# 定义data client, 通过client创建dataset以及writer
mc_config = elastic_inference.MaxComputeTableConfig(input_table, slice_size=1024, output_table=output_table)
mc_client = elastic_inference.MaxComputeTableClient(mc_config)
# 创建一个dataset, 用于读取表数据
table_dataset, table_path_placeholder = mc_client.create_tf_dataset(record_defaults=("",))
iterator = table_dataset.batch(128).make_initializable_iterator()
table_data = iterator.get_next()
# 创建一个writer, 用于将模型预测结果写表
writer = mc_client.create_table_writer()
with tf.Session() as sess:
    while True:
        try:
            # 获取一个表数据分片, E.g. odps://project/tables/test_input?start=0&end=1024
            table_path = mc_client.get_next_slice()
        except aimaster.data.OutOfRangeException:
            print("All table data have been processed")
            break
    # 处理当前拿到的表数据分片, 只有当前分片数据全部被处理完毕后才可以去获取下一个分片
    sess.run(iterator.initializer, feed_dict={table_path_placeholder:table_path})
    while True:
        try:
            # 读取表数据
            values = sess.run(table_data)
            # 这里模拟模型预测,实际使用时替换成真实的TF模型
            time.sleep(5)
            # 将预测结果写表
            writer.write(values, col_indices=(0,))
        except tf.errors.OutOfRangeError:
            # 表示该分片数据已经处理完成
            break
# 关闭writer
writer.close()
  • PyTorch模型推理示例
import aimaster
from aimaster import inference as elastic_inference
import torch
def get_next_dataLoader(mc_client):
    # 获取一个表数据分片, E.g. odps://algo_platform/tables/table_test?start=0&end=1024
    table_path = mc_client.get_next_slice()
    # 创建一个table dataset
    table_dataset = mc_client.create_torch_dataset(table_path, batch_size=1, num_threads=2, capacity=512)
    # 创建一个dataloader
    dataloader = torch.utils.data.DataLoader(table_dataset, batch_size=128, num_workers=3)
    return dataloader
def predict(values):
    # 这里模拟模型预测,实际使用时替换成真实的torch模型
    result = [(v,) for v in values[0][0]]
    return result
input_table = "odps://project/tables/test_input"
output_table = "odps://project/tables/test_output"
# 定义data client, 通过client创建dataset以及writer
mc_config = elastic_inference.MaxComputeTableConfig(input_table, slice_size=1024, output_table=output_table)
mc_client = elastic_inference.MaxComputeTableClient(mc_config)
# 创建一个writer, 用于写预测结果
writer = mc_client.create_table_writer()
while True:
    try:
        # 获取一个table dataLoader,如果没有分片要处理,抛aimaster.data.OutOfRangeException异常
        dataloader = get_next_dataLoader(mc_client)
    except aimaster.data.OutOfRangeException:
        print("All table data have been processed")
        break
    for data in dataloader:
        result = predict(data)
        writer.write(result, col_indices=(0,))
# 关闭writer
writer.close()

3.2 POSIX File数据源

如果您的数据存在OSS或NAS上,由于OSS或NAS数据源会被挂载到实例的POD中,在POD中可以直接使用POSIX接口进行访问,这里给出ElasticBatch SDK中有关POSIX File弹性推理接口。

3.2.1 接口说明

from aimaster import inference as elastic_inference
class PosixFileConfig: 
    def __init__(self, 
               input_dir,
               output_dir,
               slice_size,
               auto_create_output_dir=True,
               input_file_meta_path=""):
        """定义PosixFile配置.
        参数:
            input_dir: 输入文件目录
            output_dir: 输出文件目录
            slice_size: 分片大小, 单位是文件个数
            auto_create_output_dir: 是否自动创建输出目录
            input_file_meta_path: 输入文件的meta信息文件路径;
                如果为空,AIMaster会从input_dir中list file,这一步可能会很耗时;
                Meta文件要求Json格式,必须包含FileCount以及FileList两个key, 示例如下
                {
                  "FileCount": 2,
                  "FIleDir": "/root/data",
                  "FileList": [
                      "image_1.txt",
                      "image_2.txt",
                   ]
                }  
        """
class PosixFileClient:
    def __init__(self, config):
        """定义PosixFile Client.
        参数:
            config: 上述定义的PosixFileConfig对象;
        """
    def get_next_slice(self):
        """获取一个数据分片.
        返回值:
            包含多个文件路径的list, 比如 [path1, path2, ..., path1024];
        异常:
            如果没有分片可以获取将抛出 aimaster.data.OutOfRangeException 异常;
        """
    def commit_slice(self):
        """提交一个分片,表示此分片已经处理完成.
        """

3.2.2 使用示例

  • 简单示例
import aimaster
from aimaster import inference as elastic_inference
import time
data_config = elastic_inference.PosixFileConfig(
        input_dir="/root/data/input_files/",
        slice_size=1024,
        output_dir="/root/data/output_files/",
        auto_create_output_dir=True)
pf_client = elastic_inference.PosixFileClient(data_config)
while True:
    try:
        # 获取一个slice,类型为list,包含slice_size个file path
        file_slice = pf_client.get_next_slice()
    except aimaster.data.OutOfRangeException:
        print("All file data have been processed")
        break
    # 这里模拟预测以及输出
    time.sleep(10)
    # 提交slice,表示当前slice数据已完成处理
    pf_client.commit_slice()
  • PyTorch inception_v3模型推理示例
# Code adapted from https://pytorch.org/hub/pytorch_vision_inception_v3/
import os
import sys
import torch
from PIL import Image
from torchvision import models
from torchvision import transforms
import aimaster
from aimaster import inference as elastic_inference
class DataInputor(object):
    def __init__(self, pf_client):
        self.pf_client = pf_client
        self.preprocess = self.get_data_preprocess()
    def get_next_dataloader(self):
        filepath_slice = self.pf_client.get_next_slice()
        for filepath in filepath_slice:
            input_image = Image.open(filepath)
            try:
                input_tensor = self.preprocess(input_image)
            except:
                 print("Invalid file data {}".format(filepath))
                 continue
            input_batch = input_tensor.unsqueeze(0)
            if torch.cuda.is_available():
                input_batch = input_batch.to('cuda')
            yield input_batch
    def get_data_preprocess(self):
        return transforms.Compose([
            transforms.Resize(299),
            transforms.CenterCrop(299),
            transforms.ToTensor(),
            transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),])
class DataWriter(object):
    def __init__(self, pf_client):
        self.pf_client = pf_client
        self.pre_slice_id = self.pf_client.get_slice_id()
        self.output_dir = self.pf_client.config.output_dir
        self.writer = None
    def write(self, data):
        new_slice_id = self.pf_client.get_slice_id()
        if self.pre_slice_id != new_slice_id:
            self._create_new_writer()
            self.pre_slice_id = new_slice_id
        top5_prob, top5_catid = data
        for i in range(top5_prob.size(0)):
            self.writer.write("{}: {}\n".format(self.categories[top5_catid[i]], top5_prob[i].item()))
    def close(self):
        if all([self.pre_slice_id, self.writer]):
            self.writer.close()
            self.pf_clinet.commit_slice(self.pre_slice_id)
    def _create_new_writer(self, new_slice_id):
        self.close()
        out_file = os.path.join(self.output_dir, new_slice_id)  
        self.writer = open(out_file, "w")
class InceptionV3Classification(object):
    def __init__(self, model_path):
        self.model_path = model_path
        self.model = models.resnet50(pretrained=False)
        self.model.load_state_dict(torch.load(self.model_path))
        self.model.eval()
        if torch.cuda.is_available():
            self.model.to('cuda')
        # wget https://raw.githubusercontent.com/pytorch/hub/master/imagenet_classes.txt
        with open("imagenet_classes.txt", "r") as f:
            self.categories = [s.strip() for s in f.readlines()]
    def predict(self, input_batch):
        with torch.no_grad():
            output = self.model(input_batch)
        probabilities = torch.nn.functional.softmax(output[0], dim=0)
        top5_prob, top5_catid = torch.topk(probabilities, 5)
        return (top5_prob, top5_catid)
data_config = elastic_inference.PosixFileConfig(
        input_dir='/root/data/cifar10/imagenet-large',
        slice_size=128,
        output_dir='/root/data/cifar10/output',
        auto_create_output_dir=True)
pf_client = elastic_inference.PosixFileClient(data_config)
data_inputor = DataInputor(pf_client)
predictor = InceptionV3Classification(model_path='/root/data/model/resnet50-19c8e357.pth')
data_writer = DataWriter(pf_client)
while True:
    try:
        dataloader = data_inputor.get_next_dataloader()
        for input_data in dataloader:
            predict_result = predictor.predict(input_data)
            data_writer.write(predict_result)
    except aimaster.data.OutOfRangeException:
      data_writer.close()
      print("All file data have been processed.")
      break
相关实践学习
使用PAI-EAS一键部署ChatGLM及LangChain应用
本场景中主要介绍如何使用模型在线服务(PAI-EAS)部署ChatGLM的AI-Web应用以及启动WebUI进行模型推理,并通过LangChain集成自己的业务数据。
机器学习概览及常见算法
机器学习(Machine Learning, ML)是人工智能的核心,专门研究计算机怎样模拟或实现人类的学习行为,以获取新的知识或技能,重新组织已有的知识结构使之不断改善自身的性能,它是使计算机具有智能的根本途径,其应用遍及人工智能的各个领域。 本课程将带你入门机器学习,掌握机器学习的概念和常用的算法。
相关文章
|
存储 机器学习/深度学习 人工智能
【DSW Gallery】DSW基础使用介绍
PAI-DSW是一款云端机器学习开发IDE,为您提供交互式编程环境,适用于不同水平的开发者。本文为您介绍PAI-DSW的功能特点以及界面的基础使用。
【DSW Gallery】DSW基础使用介绍
|
6月前
|
编解码
PAI-EAS试用-ComfyUI
探索阿里云PAI平台的试用体验,试用地址在[PAI-EAS试用](https//developer.aliyun.com/topic/pai/svd?spm=a2c6h.27234800.J_6638147300.2.717259efR1C0is),提供丰富的资源,如A10服务器。
252 0
|
6月前
|
缓存 分布式计算 监控
DSW、DLC、EAS
DSW、DLC、EAS 是分别表示 "分布式共享内存"、"数据加载与缓存"、"增强型自动调度"的缩写,是 tuemo 工具中常用的三种技术。 1. DSW 分布式共享内存(Distributed Shared Memory)
683 1
|
机器学习/深度学习 人工智能 算法
【DSW Gallery】PAI-DSW快速入门
PAI-DSW是一款为AI开发者量身定制的云端机器学习交互式开发IDE,随时随地开启Notebook快速读取数据、开发算法、训练及部署模型。本文介绍如何快速上手PAI-DSW。
【DSW Gallery】PAI-DSW快速入门
|
存储 机器学习/深度学习 人工智能
【DSW Gallery】如何在DLC训练任务中挂载OSS
阿里云对象存储OSS(Object Storage Service)是一款海量、安全、低成本、高可靠的云存储服务。本文将介绍如何使用在DLC训练任务中挂载OSS,使用读写本地文件的方式来访问OSS中的数据。
【DSW Gallery】如何在DLC训练任务中挂载OSS
|
机器学习/深度学习 人工智能 Kubernetes
【DSW Gallery】介绍如何使用命令行工具提交DLC任务
本文介绍如何使用DLC命令行工具提交任务到指定的工作空间内. 同时,会介绍如何提交预付费和后付费的DLC训练任务
【DSW Gallery】介绍如何使用命令行工具提交DLC任务
|
存储 机器学习/深度学习 Kubernetes
【DSW Gallery】如何在DLC中进行Pytorch DDP分布式训练任务
本文基于Pytorch 1.8版本,介绍了如何使用DLC进行Pytorch DDP分布式训练任务.
【DSW Gallery】如何在DLC中进行Pytorch DDP分布式训练任务
|
机器学习/深度学习 监控 Kubernetes
【DSW Gallery】如何提交开启AIMaster容错监控的DLC任务
AIMaster是一个管控组件,起到任务监控、容错判断以及资源控制等作用。本文将介绍如何使用Python SDK提交开启AIMaster容错监控的DLC任务。
【DSW Gallery】如何提交开启AIMaster容错监控的DLC任务
|
机器学习/深度学习 IDE Cloud Native
【DSW Gallery】如何在DSW/DLC中使用企业版镜像服务ACR
PAI-DSW是一款云端机器学习开发IDE,为您提供交互式编程环境。用户可以使用官方镜像或者自定义镜像,创建DSW实例;进入DSW实例后,用户有root权限可以任意自定义环境(安装更新系统软件,Python包等),然后保存环境到ACR中,然后用于PAI-DLC进行分布式训练。本文将介绍如何在DSW/DLC中使用阿里云提供的容器镜像服务ACR。
【DSW Gallery】如何在DSW/DLC中使用企业版镜像服务ACR
|
机器学习/深度学习 人工智能 并行计算
【DSW Gallery】DSW镜像使用入门
介绍DSW中如何使用官方镜像、自定义镜像、第三方镜像地址来启动服务。DSW环境进行定制修改之后还可以选择停机保存环境或者保存镜像到ACR镜像仓库。
【DSW Gallery】DSW镜像使用入门