kuairand-27k的Parquet 数据导出与上传到 MaxCompute 完整流程(hstu格式)

简介: 本文详解如何将本地kuairand-27k(1257行×14列)Parquet推荐数据集,经探查、类型映射(int64→bigint,list→array<bigint>),通过pyodps上传至阿里云MaxCompute表,含完整环境配置、建表与批量上传代码。

概述

本文介绍如何将本地 Parquet 文件(kuairand-27k 推荐系统数据集)导出并上传到阿里云 MaxCompute 表,包含:数据探查、类型映射、建表、上传的完整代码。


1. 环境准备

依赖安装

pip install pandas pyarrow pyodps
  • pandas + pyarrow:读取 Parquet 文件
  • pyodps:阿里云 MaxCompute Python SDK

凭证配置

通过环境变量传入 AccessKey,避免硬编码:

export ACCESS_ID="your_access_id"
export ACCESS_KEY="your_access_key"

2. 数据探查:读取 Parquet 文件

在上传之前,先了解 Parquet 文件的数据结构:

import pandas as pd

df = pd.read_parquet("kuairand-27k-train-0.parquet")

print(f"数据总行数: {len(df)}")
print(f"数据列数: {len(df.columns)}")
print(f"\n列名列表:")
print(df.columns.tolist())
print(f"\n数据类型:")
print(df.dtypes)

# 逐列查看前5条数据
for col in df.columns:
    print(f"\n--- {col} (dtype: {df[col].dtype}) ---")
    for i in range(min(5, len(df))):
        val = df[col].iloc[i]
        if isinstance(val, (list,)):
            print(f"  [{i}] len={len(val)}, 前10个值: {val[:10]}")
        else:
            print(f"  [{i}] {val}")

探查结果

本数据集共 1257 行、14 列,结构如下:

列名 Python 类型 说明
user_id int64 用户 ID
user_active_degree int64 用户活跃度
follow_user_num_range int64 关注人数区间
fans_user_num_range int64 粉丝人数区间
friend_user_num_range int64 好友人数区间
register_days_range int64 注册天数区间
video_id list(int) 用户历史交互视频序列
action_timestamp list(int) 行为时间戳序列
action_weight list(int) 行为权重序列(bitmask 编码)
watch_time list(int) 观看时长序列
item_video_id list(int) 候选视频 ID 序列
item_action_weight list(int) 候选视频行为标签
item_target_watchtime list(int) 候选视频目标观看时长
item_query_time list(int) 候选请求时间戳

3. 类型映射:Parquet → MaxCompute

Parquet/Python 类型 MaxCompute 类型
int64(标量) bigint
list(int)(数组) array<bigint>

4. 完整上传脚本

#!/usr/bin/env python3
"""
将 kuairand-27k-train-0.parquet 数据上传到 MaxCompute 表 pairec_kuairand_train
"""

import os
import pandas as pd
import numpy as np
from odps import ODPS
from odps.models import TableSchema as Schema, Column

# ========== 1. 配置连接参数 ==========
project_name = "pairec_mc"
access_id = os.environ["ACCESS_ID"]
access_key = os.environ["ACCESS_KEY"]
endpoint = "http://service.cn.maxcompute.aliyun.com/api"

# ========== 2. 连接 MaxCompute ==========
odps = ODPS(access_id, access_key, project_name, endpoint=endpoint)
print("MaxCompute 连接成功")

# ========== 3. 建表 ==========
TABLE_NAME = "pairec_kuairand_train"

# 先删除已有表(如需覆盖写入)
if odps.exist_table(TABLE_NAME):
    print(f"表 {TABLE_NAME} 已存在,正在删除...")
    odps.delete_table(TABLE_NAME)
    print(f"表 {TABLE_NAME} 已删除")

# 定义表 schema
columns = [
    # 用户侧标量字段 (bigint)
    Column(name="user_id", type="bigint"),
    Column(name="user_active_degree", type="bigint"),
    Column(name="follow_user_num_range", type="bigint"),
    Column(name="fans_user_num_range", type="bigint"),
    Column(name="friend_user_num_range", type="bigint"),
    Column(name="register_days_range", type="bigint"),
    # 历史序列字段 (array<bigint>)
    Column(name="video_id", type="array<bigint>"),
    Column(name="action_timestamp", type="array<bigint>"),
    Column(name="action_weight", type="array<bigint>"),
    Column(name="watch_time", type="array<bigint>"),
    # 候选物料字段 (array<bigint>)
    Column(name="item_video_id", type="array<bigint>"),
    Column(name="item_action_weight", type="array<bigint>"),
    Column(name="item_target_watchtime", type="array<bigint>"),
    Column(name="item_query_time", type="array<bigint>"),
]

schema = Schema(columns=columns)
odps.create_table(TABLE_NAME, schema)
print(f"表 {TABLE_NAME} 创建成功")

# ========== 4. 读取 Parquet 并上传数据 ==========
PARQUET_PATH = "kuairand-27k-train-0.parquet"
df = pd.read_parquet(PARQUET_PATH)
print(f"Parquet 读取完成,共 {len(df)} 行")

# 将 numpy 数组转为 Python list(PyODPS 要求原生 Python 类型)
array_cols = [
    "video_id", "action_timestamp", "action_weight", "watch_time",
    "item_video_id", "item_action_weight", "item_target_watchtime", "item_query_time"
]
for col in array_cols:
    df[col] = df[col].apply(
        lambda x: list(x) if isinstance(x, np.ndarray) else (x if isinstance(x, list) else [])
    )

# 标量列确保为 Python int
scalar_cols = [
    "user_id", "user_active_degree", "follow_user_num_range",
    "fans_user_num_range", "friend_user_num_range", "register_days_range"
]
for col in scalar_cols:
    df[col] = df[col].astype(int)

# 使用 Tunnel 上传
table = odps.get_table(TABLE_NAME)
print(f"开始上传数据(共 {len(df)} 行)...")

with table.open_writer() as writer:
    records = []
    for idx, row in df.iterrows():
        record = [
            int(row["user_id"]),
            int(row["user_active_degree"]),
            int(row["follow_user_num_range"]),
            int(row["fans_user_num_range"]),
            int(row["friend_user_num_range"]),
            int(row["register_days_range"]),
            list(row["video_id"]),
            list(row["action_timestamp"]),
            list(row["action_weight"]),
            list(row["watch_time"]),
            list(row["item_video_id"]),
            list(row["item_action_weight"]),
            list(row["item_target_watchtime"]),
            list(row["item_query_time"]),
        ]
        records.append(table.new_record(record))

        if (idx + 1) % 100 == 0:
            print(f"  已处理 {idx + 1}/{len(df)} 行")

    writer.write(records)

print(f"数据上传完成!共上传 {len(records)} 行到 {TABLE_NAME}")

5. 执行

source env.sh  # 设置 ACCESS_ID、ACCESS_KEY 环境变量
python upload_to_odps.py

预期输出:

project_name: pairec
endpoint: http://service.cn.maxcompute.aliyun.com/api
MaxCompute 连接成功
表 pairec_kuairand_train 创建成功
Parquet 读取完成,共 1257 行
开始上传数据(共 1257 行)...
  已处理 100/1257 行
  ...
  已处理 1200/1257 行
数据上传完成!共上传 1257 行到 pairec_kuairand_train

6. 常见问题与注意事项

Endpoint 与 Project 不匹配

MaxCompute 的 Project 绑定到特定 Region 的 Endpoint。如果报 Project not found,需确认 Project 所在的 Region 并切换对应 Endpoint:

Endpoint 适用场景
service.cn.maxcompute.aliyun.com 公网(华东2)

AccessKey 与 Endpoint 网络域不匹配

公网 AccessKey 只能用于公网 Endpoint,内网 AccessKey 只能用于内网 Endpoint,混用会报 AccessKeyIdNotFound

PyODPS 要求原生 Python 类型

open_writer() 写入数据时,不支持 numpy 的 int64ndarray 类型,需显式转换为 Python 原生的 intlist,否则会抛出类型错误。

批量上传性能

对于大数据量(百万行以上),建议分批写入(如每批 5000 条),避免单次 writer.write() 内存过大:

BATCH_SIZE = 5000
with table.open_writer() as writer:
    batch = []
    for idx, row in df.iterrows():
        batch.append(table.new_record([...]))
        if len(batch) >= BATCH_SIZE:
            writer.write(batch)
            batch = []
    if batch:
        writer.write(batch)
相关实践学习
使用PAI+LLaMA Factory微调Qwen2-VL模型,搭建文旅领域知识问答机器人
使用PAI和LLaMA Factory框架,基于全参方法微调 Qwen2-VL模型,使其能够进行文旅领域知识问答,同时通过人工测试验证了微调的效果。
机器学习概览及常见算法
机器学习(Machine Learning, ML)是人工智能的核心,专门研究计算机怎样模拟或实现人类的学习行为,以获取新的知识或技能,重新组织已有的知识结构使之不断改善自身的性能,它是使计算机具有智能的根本途径,其应用遍及人工智能的各个领域。 本课程将带你入门机器学习,掌握机器学习的概念和常用的算法。
相关文章
|
3月前
|
机器学习/深度学习 分布式计算 搜索推荐
PAI-Rec 召回引擎:构建高性能推荐系统的核心引擎
PAI-Rec是阿里云智能推荐平台的核心召回引擎,经阿里大规模场景验证。支持多路召回融合(U2I/I2I/向量/随机)、召回即过滤、毫秒级实时更新与分布式弹性架构,开箱即用,助力企业构建毫秒级、高精度、强实时的推荐系统。
430 9
|
3月前
|
机器学习/深度学习 自然语言处理 iOS开发
Feature Generator(FG)特征算子配置指南
本文档全面介绍Feature Generator(FG)的各类特征算子配置方法,涵盖基础(ID/原始特征)、计算(表达式)、交叉(组合)、查找(Lookup/Match)、文本(重叠/BM25)、序列、预处理(分词/归一化)及字符串处理(正则替换/切片)等9大类算子,附详细配置示例与说明。
433 9
|
3月前
|
机器学习/深度学习 JSON 自然语言处理
PAI-Rec 特征工程全解析:统计特征、实时特征、序列特征与 FG 特征算子
PAI-Rec是阿里云智能推荐的特征工程解决方案,支持离线统计、实时及序列特征自动衍生,并通过Feature Generator(17种内置算子)保障离线/在线特征一致性,大幅降低开发与维护成本。
599 9
|
机器学习/深度学习 分布式计算 DataWorks
EasyRec 使用介绍|学习笔记
快速学习 EasyRec 使用介绍。
2183 0
|
20天前
|
机器学习/深度学习 人工智能 网络架构
深度解析:Transformer 的“灵魂”——QKV 变换的物理直觉
本文用图书馆检索等生活隐喻,从物理意义与认知科学角度解析Transformer中QKV设计的精妙本质:解耦查询(q)、键(k)、值(v)三重角色,实现语义分离、避免自注意力“自恋”,模拟人类动态信息路由的认知过程。(239字)
350 13
|
3月前
|
机器学习/深度学习 搜索推荐 数据处理
PAI-Rec推荐开发平台:企业级智能推荐解决方案,驱动业务全域增长
PAI-Rec是阿里云一站式推荐系统平台,集成多路召回、多目标精排(如DBMTL)、GPU加速推理与灵活迭代能力,已助力电商、直播、音视频等多行业提升点击率、转化率与ROI,实现高效、低成本、可自主演进的智能推荐。
446 16
|
3月前
|
存储 搜索推荐 PyTorch
为什么使用 TorchRec 训练和推理更快
本文结合TorchEasyRec实践,从四大维度解析推荐系统加速:1)KeyedJaggedTensor统一变长特征,实现Embedding批量融合查找;2)自动分布式分片突破单卡显存瓶颈;3)TrainPipelineSparseDist流水线并行,重叠通信与计算;4)fbgemm-gpu融合优化器,减少显存访问。端到端提升训练效率与扩展性。
462 9
|
3月前
|
存储 搜索推荐 Python
TorchRec大量使用Jagged Tensor
Jagged Tensor(锯齿张量)是专为变长序列设计的紧凑存储格式,用values+lengths/offsets替代padding,显著节省内存与计算。广泛应用于推荐系统中用户行为、多值标签等不等长特征处理,如HSTU模型中的拼接、拆分与矩阵乘法操作。
383 8
|
3月前
|
并行计算 算法框架/工具 iOS开发
TorchRec在macos ARM芯片(Apple Silicon)上无法安装
JaggedTensor等在macOS ARM芯片上无法运行,主因是ARM64与x86_64架构不兼容,且TorchRec深度依赖CUDA——而Apple Silicon仅支持Metal。fbgemm-gpu缺失、Rosetta 2不支持CUDA指令,导致关键操作失败。建议改用MLX框架或标准PyTorch张量替代。
380 4
|
机器学习/深度学习 缓存 PyTorch
为什么要用TorchEasyRec processor?
TorchEasyRec处理器支持Intel和AMD的CPU服务器及GPU推理,兼容普通PyTorch模型。它具备TorchEasyRec的特征工程(FG)和模型推理功能,提供更快的推理性能,降低成本。通过Item Feature Cache特性,它能够缓存特征以减少网络传输,进一步提升特征工程与推理的速度。
389 2

热门文章

最新文章