【DSW Gallery】COMMON_IO使用指南

本文涉及的产品
交互式建模 PAI-DSW,每月250计算时 3个月
模型训练 PAI-DLC,100CU*H 3个月
模型在线服务 PAI-EAS,A10/V100等 500元 1个月
简介: COMMON_IO模块提供了TableReader和TableWriter两个接口,使用TableReader可以读取ODPS Table中的数据,使用TableWriter可以将数据写入ODPS Table。

直接使用

请打开基于COMMON_IO使用指南,并点击右上角 “ 在DSW中打开” 。

image.png


为了方便读写MaxCompute Table数据,我们基于MaxCompute Tunnel开发了COMMON_IO模块,它提供了TableReader和TableWriter两个接口,使用TableReader可以读取MaxCompute Table中的数据,使用TableWriter可以将数据写入MaxCompute Table,使用这两个接口时需要先在配置文件中配置账户AK等信息,否则无权读写MaxCompute Table。

说明:

- COMMON_IO已在DLC/DSW官方镜像中安装, 暂不支持自定义镜像;

- COMMON_IO适用PyTorch任务读取MaxCompute Table数据场景;

- COMMON_IO适用机器学习任务写MaxCompute Table场景;

1. 准备工作:配置账户信息

配置文件内容格式如下所示,包含了MaxCompute access_id、access_key以及endpoint信息。

access_id/access_key获取方式参见链接

end_point填入您的MaxCompute项目所在区域对应的Endpoint,可参考链接,例如杭州region endpoint为:http://service.cn-hangzhou.maxcompute.aliyun.com/api

access_id=xxxx

access_key=xxxx

end_point=http://xxxx

在代码中通过以下方式指定配置文件路径

os.environ['ODPS_CONFIG_FILE_PATH'] = '<your MaxCompute config file path>'

2. TableReader使用说明

2.1 接口说明

接口定义

common_io.table.TableReader(
    table,
    selected_cols="",
    excluded_cols="",
    slice_id=0,
    slice_count=1)

image.png

接口方法

reader.read(num_records=1, allow_smaller_final_batch=False)

  • 顺序读取num_records值对应的行数并返回,默认读取1行。当num_records参数超出未读的行数时,返回读取到的所有行。当未读取到记录时,抛出异常(Exception: "End of table reached!")。
  • Read读取操作返回一个python数组,数组中每个元素为表的一行数据组成的一个tuple。

reader.start_pos

  • 获取读取的表(分片)起始位置

reader.end_pos

  • 获取读取的表(分片)结束位置

reader.offset_pos

  • 获取正在读取的位置

reader.get_row_count()

  • 返回表的行数。如果设置slice_id和slice_count,则返回分片大小

reader.get_schema()

  • 获取表的schema

reader.seek(offset=0)

  • 定位到相应行,下一个Read操作将从定位的行开始

reader.close()

  • 关闭reader

2.2 使用示例

假设在algo_platform_dev项目中存储了一张名为test的表,内容如下所示。

image.png

以下代码实现了使用TableReader读取itemid、name及price列的数据。

import os
import common_io
# 指定配置文件路径
os.environ['ODPS_CONFIG_FILE_PATH'] = "/mnt/workspace/tunnel_io/odps_config.ini"
# 打开一个表,返回reader对象
reader = common_io.table.TableReader(
    "odps://algo_platform_dev/tables/test", 
    selected_cols="itemid,name,price")
# 获得表的总行数
total_records_num = reader.get_row_count()
print("total_records_num:", total_records_num)
batch_size = 2
# 读表,返回值将是一个python数组,形式为[(itemid, name, price)*2]
records = reader.read(batch_size)
print("records:", records)
records = reader.read(batch_size, True)
print("records:", records)
try:
    # 继续读取将抛出异常, 原因是数据已全部读取完毕
    records = reader.read(batch_size, True)
except common_io.exception.OutOfRangeException:
    pass
# 关闭reader
reader.close()
total_records_num: 3
records: [(25, 'Apple', 5.0), (38, 'Pear', 4.5)]
records: [(17, 'Watermelon', 2.2)]

3. TableWriter使用说明

3.1 接口说明

接口定义

common_io.table.TableWriter(
    table,
    slice_id=0)

image.png

接口方法

writer.write(values, indices)

  • values为需要写入的数据,类型为python数组、或者np.ndarray
  • col_indices为写入数据对应的列号,类型为python tuple

writer.close()

  • 关闭close,close调用后数据才会真正写入

3.2 使用示例

假设在algo_platform_dev项目中存储了一张名为test的表,一共四列数据,分别为:

itemid(bigint)、name(string)、price(double)、virtual(bool)

下面的示例展示了如何将数据写入test表。

import os
import common_io
# 指定配置文件路径
os.environ['ODPS_CONFIG_FILE_PATH'] = "/mnt/workspace/tunnel_io/odps_config.ini"
# 准备数据
values = [(25, "Apple", 5.0, False),
          (38, "Pear", 4.5, False),
          (17, "Watermelon", 2.2, False)]
# 打开一个表,返回writer对象
writer = common_io.table.TableWriter("odps://algo_platform_dev/tables/test")
# 将数据写至表中的第0-3列
records = writer.write(values, col_indices=[0, 1, 2, 3])
# 关闭writer, 执行close后,数据才会真正写入
writer.close()

4. 最佳实践

4.1 构建pytorch dataset

基于common_io构建dataset示例如下,构建了一个流式dataset。

import os
import re
import torch
import common_io
from torch.utils.data import Dataset
train_table = "odps://algo_platform_dev/tables/common_io_test"
class TableDataset(torch.utils.data.IterableDataset):
    def __init__(self, table_path, slice_id=0, slice_count=1):
        self.table_path = table_path
        reader = common_io.table.TableReader(table_path,
                                             slice_id=slice_id,
                                             slice_count=slice_count,
                                             num_threads=0)
        self.row_count = reader.get_row_count()
        self.start_pos = reader.start_pos
        self.end_pos = reader.end_pos
        reader.close()
        super(TableDataset, self).__init__()
        print("table total_row_count:{}, start_pos:{}, end_pos:{}".format(self.row_count, self.start_pos, self.end_pos))
    def __iter__(self):
        worker_info = torch.utils.data.get_worker_info()
        if worker_info is None:
            worker_id = 0
            num_workers = 1
        else:
            worker_id = worker_info.id
            num_workers = worker_info.num_workers
        print("worker_id:{}, num_workers:{}".format(worker_id, num_workers))
        table_start, table_end = self._get_slice_range(self.row_count, worker_id, num_workers, self.start_pos)
        table_path = "{}?start={}&end={}".format(self.table_path, table_start, table_end)
        print("table_path:%s" % table_path)
        def table_data_iterator():
            reader = common_io.table.TableReader(table_path, num_threads=1, capacity=1024)
            while True:
                try:
                    data = reader.read(num_records=1, allow_smaller_final_batch=True)
                except common_io.exception.OutOfRangeException:
                    reader.close()
                    break
                yield data
        return table_data_iterator()
    def _get_slice_range(self, row_count, worker_id, num_workers, baseline=0):
        # div-mod split, each slice data count max diff 1
        size = int(row_count / num_workers)
        split_point = row_count % num_workers
        if worker_id < split_point:
            start = worker_id * (size + 1) + baseline
            end = start + (size + 1)
        else:
            start = split_point * (size + 1) + (worker_id - split_point) * size + baseline
            end = start + size
        return start, end
slice_id = int(os.environ.get('RANK', 0))
slice_count = int(os.environ.get('WORLD_SIZE', 1))
train_dataset = TableDataset(train_table, slice_id, slice_count)
train_ld = torch.utils.data.DataLoader(
    train_dataset,
    batch_size=3,
    shuffle=False,
    pin_memory=False,
    sampler=None,
    num_workers=5,
    collate_fn=lambda x: x )
for data in train_ld:
    print(data)

5. FAQ

5.1 错误 No such file: /root/.odps_config.ini

该错误表示未找到配置文件,参考使用说明准备工作部分。

相关实践学习
使用PAI-EAS一键部署ChatGLM及LangChain应用
本场景中主要介绍如何使用模型在线服务(PAI-EAS)部署ChatGLM的AI-Web应用以及启动WebUI进行模型推理,并通过LangChain集成自己的业务数据。
机器学习概览及常见算法
机器学习(Machine Learning, ML)是人工智能的核心,专门研究计算机怎样模拟或实现人类的学习行为,以获取新的知识或技能,重新组织已有的知识结构使之不断改善自身的性能,它是使计算机具有智能的根本途径,其应用遍及人工智能的各个领域。 本课程将带你入门机器学习,掌握机器学习的概念和常用的算法。
相关文章
|
Kubernetes TensorFlow 算法框架/工具
Kubeflow 使用指南
Kubeflow(https://github.com/kubeflow)是基于Kubernetes(https://kubernets.io,容器编排与管理服务软件)和TensorFlow(https://tensorflow.org,深度学习库)的机器学习流程工具,使用Ksonnet进行应用包的管理。
12842 0
|
11天前
|
Java API Apache
java集成stable diffusion
通过REST API和JNI两种方法,我们可以在Java应用程序中集成Stable Diffusion模型。REST API方法更加简单和易于维护,而JNI方法则提供更高的性能。根据具体应用场景和需求,选择合适的集成方法,可以充分利用Stable Diffusion的强大功能,实现高效的图像生成和处理。
49 15
|
5月前
|
API Python
【Azure Compute Gallery】使用 Python 代码从 Azure Compute Gallery 复制 Image-Version
【Azure Compute Gallery】使用 Python 代码从 Azure Compute Gallery 复制 Image-Version
|
SQL Java 关系型数据库
JPA 之 QueryDSL-JPA 使用指南2
JPA 之 QueryDSL-JPA 使用指南2
865 1
|
7月前
|
存储 计算机视觉 Python
huggingface下载的.arrow数据集读取与使用说明
huggingface下载的.arrow数据集读取与使用说明
|
SQL Java 数据库连接
JPA 之 QueryDSL-JPA 使用指南
JPA 之 QueryDSL-JPA 使用指南
562 0
|
8月前
|
并行计算 计算机视觉 异构计算
ModelScope问题之官方模型demo运行时候报错如何解决
ModelScope模型报错是指在使用ModelScope平台进行模型训练或部署时遇到的错误和问题;本合集将收集ModelScope模型报错的常见情况和排查方法,帮助用户快速定位问题并采取有效措施。
242 3
|
开发工具
阿里云的镜像服务(mirrors.aliyun.com)可以同步 Google Cloud SDK 的软件包
阿里云的镜像服务(mirrors.aliyun.com)可以同步 Google Cloud SDK 的软件包
2254 3
|
存储 缓存 Prometheus
听GPT 讲Prometheus源代码--util
听GPT 讲Prometheus源代码--util
117 0
|
机器学习/深度学习 人工智能 算法
阿里云DLC运行DDP Sample
PAI提供的云原生基础AI平台,提供灵活、稳定、易用和高性能的机器学习训练环境。该平台支持多种算法框架、超大规模分布式深度学习任务运行及自定义算法框架。本文演示如何在DLC上面运行Pytorch DDP任务。
586 0