【DSW Gallery】COMMON_IO使用指南

本文涉及的产品
交互式建模 PAI-DSW,每月250计算时 3个月
模型在线服务 PAI-EAS,A10/V100等 500元 1个月
模型训练 PAI-DLC,100CU*H 3个月
简介: COMMON_IO模块提供了TableReader和TableWriter两个接口,使用TableReader可以读取ODPS Table中的数据,使用TableWriter可以将数据写入ODPS Table。

直接使用

请打开基于COMMON_IO使用指南,并点击右上角 “ 在DSW中打开” 。

image.png


为了方便读写MaxCompute Table数据,我们基于MaxCompute Tunnel开发了COMMON_IO模块,它提供了TableReader和TableWriter两个接口,使用TableReader可以读取MaxCompute Table中的数据,使用TableWriter可以将数据写入MaxCompute Table,使用这两个接口时需要先在配置文件中配置账户AK等信息,否则无权读写MaxCompute Table。

说明:

- COMMON_IO已在DLC/DSW官方镜像中安装, 暂不支持自定义镜像;

- COMMON_IO适用PyTorch任务读取MaxCompute Table数据场景;

- COMMON_IO适用机器学习任务写MaxCompute Table场景;

1. 准备工作:配置账户信息

配置文件内容格式如下所示,包含了MaxCompute access_id、access_key以及endpoint信息。

access_id/access_key获取方式参见链接

end_point填入您的MaxCompute项目所在区域对应的Endpoint,可参考链接,例如杭州region endpoint为:http://service.cn-hangzhou.maxcompute.aliyun.com/api

access_id=xxxx

access_key=xxxx

end_point=http://xxxx

在代码中通过以下方式指定配置文件路径

os.environ['ODPS_CONFIG_FILE_PATH'] = '<your MaxCompute config file path>'

2. TableReader使用说明

2.1 接口说明

接口定义

common_io.table.TableReader(
    table,
    selected_cols="",
    excluded_cols="",
    slice_id=0,
    slice_count=1)

image.png

接口方法

reader.read(num_records=1, allow_smaller_final_batch=False)

  • 顺序读取num_records值对应的行数并返回,默认读取1行。当num_records参数超出未读的行数时,返回读取到的所有行。当未读取到记录时,抛出异常(Exception: "End of table reached!")。
  • Read读取操作返回一个python数组,数组中每个元素为表的一行数据组成的一个tuple。

reader.start_pos

  • 获取读取的表(分片)起始位置

reader.end_pos

  • 获取读取的表(分片)结束位置

reader.offset_pos

  • 获取正在读取的位置

reader.get_row_count()

  • 返回表的行数。如果设置slice_id和slice_count,则返回分片大小

reader.get_schema()

  • 获取表的schema

reader.seek(offset=0)

  • 定位到相应行,下一个Read操作将从定位的行开始

reader.close()

  • 关闭reader

2.2 使用示例

假设在algo_platform_dev项目中存储了一张名为test的表,内容如下所示。

image.png

以下代码实现了使用TableReader读取itemid、name及price列的数据。

import os
import common_io
# 指定配置文件路径
os.environ['ODPS_CONFIG_FILE_PATH'] = "/mnt/workspace/tunnel_io/odps_config.ini"
# 打开一个表,返回reader对象
reader = common_io.table.TableReader(
    "odps://algo_platform_dev/tables/test", 
    selected_cols="itemid,name,price")
# 获得表的总行数
total_records_num = reader.get_row_count()
print("total_records_num:", total_records_num)
batch_size = 2
# 读表,返回值将是一个python数组,形式为[(itemid, name, price)*2]
records = reader.read(batch_size)
print("records:", records)
records = reader.read(batch_size, True)
print("records:", records)
try:
    # 继续读取将抛出异常, 原因是数据已全部读取完毕
    records = reader.read(batch_size, True)
except common_io.exception.OutOfRangeException:
    pass
# 关闭reader
reader.close()
total_records_num: 3
records: [(25, 'Apple', 5.0), (38, 'Pear', 4.5)]
records: [(17, 'Watermelon', 2.2)]

3. TableWriter使用说明

3.1 接口说明

接口定义

common_io.table.TableWriter(
    table,
    slice_id=0)

image.png

接口方法

writer.write(values, indices)

  • values为需要写入的数据,类型为python数组、或者np.ndarray
  • col_indices为写入数据对应的列号,类型为python tuple

writer.close()

  • 关闭close,close调用后数据才会真正写入

3.2 使用示例

假设在algo_platform_dev项目中存储了一张名为test的表,一共四列数据,分别为:

itemid(bigint)、name(string)、price(double)、virtual(bool)

下面的示例展示了如何将数据写入test表。

import os
import common_io
# 指定配置文件路径
os.environ['ODPS_CONFIG_FILE_PATH'] = "/mnt/workspace/tunnel_io/odps_config.ini"
# 准备数据
values = [(25, "Apple", 5.0, False),
          (38, "Pear", 4.5, False),
          (17, "Watermelon", 2.2, False)]
# 打开一个表,返回writer对象
writer = common_io.table.TableWriter("odps://algo_platform_dev/tables/test")
# 将数据写至表中的第0-3列
records = writer.write(values, col_indices=[0, 1, 2, 3])
# 关闭writer, 执行close后,数据才会真正写入
writer.close()

4. 最佳实践

4.1 构建pytorch dataset

基于common_io构建dataset示例如下,构建了一个流式dataset。

import os
import re
import torch
import common_io
from torch.utils.data import Dataset
train_table = "odps://algo_platform_dev/tables/common_io_test"
class TableDataset(torch.utils.data.IterableDataset):
    def __init__(self, table_path, slice_id=0, slice_count=1):
        self.table_path = table_path
        reader = common_io.table.TableReader(table_path,
                                             slice_id=slice_id,
                                             slice_count=slice_count,
                                             num_threads=0)
        self.row_count = reader.get_row_count()
        self.start_pos = reader.start_pos
        self.end_pos = reader.end_pos
        reader.close()
        super(TableDataset, self).__init__()
        print("table total_row_count:{}, start_pos:{}, end_pos:{}".format(self.row_count, self.start_pos, self.end_pos))
    def __iter__(self):
        worker_info = torch.utils.data.get_worker_info()
        if worker_info is None:
            worker_id = 0
            num_workers = 1
        else:
            worker_id = worker_info.id
            num_workers = worker_info.num_workers
        print("worker_id:{}, num_workers:{}".format(worker_id, num_workers))
        table_start, table_end = self._get_slice_range(self.row_count, worker_id, num_workers, self.start_pos)
        table_path = "{}?start={}&end={}".format(self.table_path, table_start, table_end)
        print("table_path:%s" % table_path)
        def table_data_iterator():
            reader = common_io.table.TableReader(table_path, num_threads=1, capacity=1024)
            while True:
                try:
                    data = reader.read(num_records=1, allow_smaller_final_batch=True)
                except common_io.exception.OutOfRangeException:
                    reader.close()
                    break
                yield data
        return table_data_iterator()
    def _get_slice_range(self, row_count, worker_id, num_workers, baseline=0):
        # div-mod split, each slice data count max diff 1
        size = int(row_count / num_workers)
        split_point = row_count % num_workers
        if worker_id < split_point:
            start = worker_id * (size + 1) + baseline
            end = start + (size + 1)
        else:
            start = split_point * (size + 1) + (worker_id - split_point) * size + baseline
            end = start + size
        return start, end
slice_id = int(os.environ.get('RANK', 0))
slice_count = int(os.environ.get('WORLD_SIZE', 1))
train_dataset = TableDataset(train_table, slice_id, slice_count)
train_ld = torch.utils.data.DataLoader(
    train_dataset,
    batch_size=3,
    shuffle=False,
    pin_memory=False,
    sampler=None,
    num_workers=5,
    collate_fn=lambda x: x )
for data in train_ld:
    print(data)

5. FAQ

5.1 错误 No such file: /root/.odps_config.ini

该错误表示未找到配置文件,参考使用说明准备工作部分。

相关实践学习
使用PAI+LLaMA Factory微调Qwen2-VL模型,搭建文旅领域知识问答机器人
使用PAI和LLaMA Factory框架,基于全参方法微调 Qwen2-VL模型,使其能够进行文旅领域知识问答,同时通过人工测试验证了微调的效果。
机器学习概览及常见算法
机器学习(Machine Learning, ML)是人工智能的核心,专门研究计算机怎样模拟或实现人类的学习行为,以获取新的知识或技能,重新组织已有的知识结构使之不断改善自身的性能,它是使计算机具有智能的根本途径,其应用遍及人工智能的各个领域。 本课程将带你入门机器学习,掌握机器学习的概念和常用的算法。
相关文章
pip镜像源大全及配置
在中国使用pip时,可以配置国内镜像源来提高安装速度和稳定性。以下是一些常见的国内镜像源:
19032 0
|
分布式计算 自然语言处理 DataWorks
高效使用 PyODPS 最佳实践
以更清晰的认知 PyODPS,DataWorks PyODPS 节点以及 PyODPS 何时在计算集群运行,开发者如何利用 PyODPS 更高效地进行数据开发。
18451 3
高效使用 PyODPS 最佳实践
conda常用操作和配置镜像源
conda常用操作和配置镜像源
31006 0
|
Linux 数据安全/隐私保护 Windows
更换(Pypi)pip源到国内镜像
pip国内的一些镜像 阿里云 http://mirrors.aliyun.com/pypi/simple/ 中国科技大学 https://pypi.mirrors.
247618 2
|
机器学习/深度学习 自然语言处理 算法
深度学习-生成式检索-论文速读-2024-09-14(下)
深度学习-生成式检索-论文速读-2024-09-14(下)
|
机器学习/深度学习 缓存 自然语言处理
PyTorch使用Tricks:梯度裁剪-防止梯度爆炸或梯度消失 !!
PyTorch使用Tricks:梯度裁剪-防止梯度爆炸或梯度消失 !!
1247 0
|
Shell
bash: accelerate: command not found
bash: accelerate: command not found
3825 3
|
Web App开发 测试技术 iOS开发
Mac OS 安装Wget
有些时候,我们希望直接通过 wget 来下载文件。 Mac OS 可以通过以下几种方式安装 Wget: 1、使用 port 命令 sudo port install wget2、使用 brew 命令 sudo brew install wget3、使用浏览器下载一个Wget的源码包,http://www.
32454 0
|
IDE Java Apache
commons-io如何添加和常见的用法
commons-io如何添加和常见的用法
|
机器学习/深度学习 人工智能 并行计算
【DSW Gallery】DSW镜像使用入门
介绍DSW中如何使用官方镜像、自定义镜像、第三方镜像地址来启动服务。DSW环境进行定制修改之后还可以选择停机保存环境或者保存镜像到ACR镜像仓库。
【DSW Gallery】DSW镜像使用入门