【DSW Gallery】特征平台

本文涉及的产品
模型在线服务 PAI-EAS,A10/V100等 500元 1个月
交互式建模 PAI-DSW,每月250计算时 3个月
模型训练 PAI-DLC,100CU*H 3个月
简介: 特征平台是专门用来存储,共享,管理机器学习模型特征的存储库。特征平台可以方便的向多人、多团队共享特征,提供安全,高效且统一的存储,保证离线在线的一致性。

直接使用

请打开基于特征平台,并点击右上角 “ 在DSW中打开” 。

image.png


pip install https://feature-store-py.oss-cn-beijing.aliyuncs.com/package/feature_store_py-0.1.6-py3-none-any.whl


import unittest
import sys
import os
from os.path import dirname, join, abspath
from feature_store_py.fs_client import FeatureStoreClient
from feature_store_py.fs_project import FeatureStoreProject
from feature_store_py.fs_datasource import UrlDataSource, MaxComputeDataSource, DatahubDataSource, HologresDataSource, SparkDataSource, LabelInput, TrainingSetOutput
from feature_store_py.fs_type import FSTYPE
from feature_store_py.fs_schema import Schema, Field
from feature_store_py.fs_feature_view import FeatureView
from feature_store_py.fs_features import FeatureSelector
from feature_store_py.fs_config import EASDeployConfig, LabelInputConfig, PartitionConfig, FeatureViewConfig, TrainSetOutputConfig
import logging
logger = logging.getLogger("foo")
logger.addHandler(logging.StreamHandler(stream=sys.stdout))

数据集介绍

我们使用的是开源电影数据集,数据集官网:http://moviedata.csuldw.com

其中主要使用的是 movie 数据,user 数据,rating 数据。这三份数据可以对应推荐流程中的物料表,用户表,label 表。

我们将展示如果使用 feature store 方便的将三份数据的特征整合在一起离线训练模型,并且完成后续上线服务。

Project

我们可以通过 project 可以创建多个项目空间,每个项目空间是独立的。project 里会配置基本的信息,每个 project 会对应一个 offlinestore 和 onlinestore。

运行 notebook 需要 feature store server 端配合运行,购买完 pairec 配置中心实例后,在配置中心可以看到服务接口地址 (host) 和 token.

host = ""
token = ""
fs = FeatureStoreClient(host, token)
cur_project_name = "fs_movie_7"
offline_datasource_id = 38
online_datasource_id = 1
project = fs.get_project(cur_project_name)
if project is None:
  project = fs.create_project(cur_project_name, offline_datasource_id, online_datasource_id)

获取对应的 project

project = fs.get_project(cur_project_name)

打印该project的信息

project.print_summary()

FeatureEntity

FeatureEntity 描述了一组相关的特征集合。多个 FeatureView 可以关联一个 FeatureEntity。 每个Entity 都会有一个 Entity JoinId , 通过 JoinId 可以关联多个 FeatureView 特征。每一个 FeatureView 都有一个主键(索引键)来获取其下面的特征数据,但是这里的索引键可以和 JoinId 定义的名称不一样。 这里我们创建 movie, user, rating 三个 Entity。

cur_entity_name_movie = "movie_data"
join_id = 'movie_id'
entity_id = None
entity_id = project.get_entity(cur_entity_name_movie)
if entity_id is None:
  entity_id = project.create_entity(name = cur_entity_name_movie, join_id=join_id)
print("entity_id = ", entity_id)

获取对应的entity

获取对应的entity

打印该entity的信息

feature_entity.print_summary()
cur_entity_name_user = "user_data"
join_id = 'user_md5'
entity_id = None
entity_id = project.get_entity(cur_entity_name_user)
if entity_id is None:
  entity_id = project.create_entity(name = cur_entity_name_user, join_id=join_id)
print("entity_id = ", entity_id)
cur_entity_name_ratings = "rating_data"
join_id = 'rating_id'
entity_id = None
entity_id = project.get_entity(cur_entity_name_ratings)
if entity_id is None:
  entity_id = project.create_entity(name = cur_entity_name_ratings, join_id=join_id)
print("entity_id = ", entity_id)

FeatureView

FeatureStore是一个特征管理平台,当外部的数据进入到 FS 中, 需要通过 FeatureView。 FeatureView 指定了数据从哪里来(DataSource), 数据进入FS 需要哪些转换(特征工程/Transformation), 特征 schema (特征名称+类型),数据需要放到哪里(OnlineStore/OfflineStore)、特征meta(主键、事件时间、分区键, FeatureEntity, ttl )。

FeatureView 会分为两种类型, BatchFeatureView 和 StreamFeatureView 。 BatchFeatureView 可以把离线数据注入到 FS 中, StreamFeatureView 支持实时特征的写入。 BatchFeatureView 会把数据管理到 OfflineStore 里, 然后可以选择同步到 OnlineStore 里。StreamFeatureView 会把数据写入到 OnlineStore 里,然后同步到 OfflineStore 里, 但实际上我们会把同样的数据同时写入到里面。

BatchFeatureView

DataSource 中的特征数据写入 FS 中,有两种情况。

1. 数据直接写入

UrlDataSource 写入到 maxcompute 的 offlinestore, 那么定义的 FeatureView 的 schema 需要手动创建。

path = 'https://feature-store-test.oss-cn-beijing.aliyuncs.com/dataset/moviedata_all/movies.csv'
delimiter = ','
omit_header = True
ds = UrlDataSource(path, delimiter, omit_header)
print(ds)

schema 定义了字段的名称和类型。

movie_schema = Schema(
    Field(name='movie_id', type=FSTYPE.STRING),
    Field(name='name', type=FSTYPE.STRING),
    Field(name='alias', type=FSTYPE.STRING),
    Field(name='actores', type=FSTYPE.STRING),
    Field(name='cover', type=FSTYPE.STRING),
    Field(name='directors', type=FSTYPE.STRING),
    Field(name='double_score', type=FSTYPE.STRING),
    Field(name='double_votes', type=FSTYPE.STRING),
    Field(name='genres', type=FSTYPE.STRING),
    Field(name='imdb_id', type=FSTYPE.STRING),
    Field(name='languages', type=FSTYPE.STRING),
    Field(name='mins', type=FSTYPE.STRING),
    Field(name='official_site', type=FSTYPE.STRING),
    Field(name='regions', type=FSTYPE.STRING),
    Field(name='release_data', type=FSTYPE.STRING),
    Field(name='slug', type=FSTYPE.STRING),
    Field(name='story', type=FSTYPE.STRING),
    Field(name='tags', type=FSTYPE.STRING),
    Field(name='year', type=FSTYPE.STRING),
    Field(name='actor_ids', type=FSTYPE.STRING),
    Field(name='director_ids', type=FSTYPE.STRING),
    Field(name='dt', type=FSTYPE.STRING)
)
print(movie_schema)

新建 batch_feature_view

feature_view_movie_name = "feature_view_movie"
batch_feature_view = project.get_feature_view(feature_view_movie_name)
if batch_feature_view is None:
  batch_feature_view = project.create_batch_feature_view(name=feature_view_movie_name, owner='yancheng', schema=movie_schema, online = True, entity= cur_entity_name_movie, primary_key='movie_id', partitions=['dt'], ttl=-1)
batch_feature_view = project.get_feature_view(feature_view_movie_name)
batch_feature_view.print_summary()

数据写入 mc 表

cur_task = batch_feature_view.write_table(ds, partitions={'dt':'20220830'})
cur_task.wait()

查看当前 task 的信息

print(cur_task.task_summary)

数据同步到 onlinestore 中

cur_task = batch_feature_view.publish_table({'dt':'20220830'})
cur_task.wait()
print(cur_task.task_summary)

获取对应的FeatureView

batch_feature_view = project.get_feature_view(feature_view_movie_name)

打印该FeatureView的信息

batch_feature_view.print_summary()

我们按此顺序,依次导入 users 表, ratings 表。

users_path = 'https://feature-store-test.oss-cn-beijing.aliyuncs.com/dataset/moviedata_all/users.csv'
ds = UrlDataSource(users_path, delimiter, omit_header)
print(ds)
user_schema = Schema(
  Field(name='user_md5', type=FSTYPE.STRING),
  Field(name='user_nickname', type=FSTYPE.STRING),
  Field(name='ds', type=FSTYPE.STRING)
)
print(user_schema)
feature_view_user_name = "feature_view_users_1"
batch_feature_view = project.get_feature_view(feature_view_user_name)
if batch_feature_view is None:
  batch_feature_view = project.create_batch_feature_view(name=feature_view_user_name, owner='yancheng', schema=user_schema, online = True, entity= cur_entity_name_user, primary_key='user_md5',ttl=-1, partitions=['ds'])
write_table_task = batch_feature_view.write_table(ds, {'ds':'20220830'})
write_table_task.wait()
print(write_table_task.task_summary)
cur_task = batch_feature_view.publish_table({'ds':'20220830'})
cur_task.wait()
print(cur_task.task_summary)
batch_feature_view = project.get_feature_view(feature_view_user_name)
batch_feature_view.print_summary()
ratings_path = 'https://feature-store-test.oss-cn-beijing.aliyuncs.com/dataset/moviedata_all/ratings.csv'
ds = UrlDataSource(ratings_path, delimiter, omit_header)
print(ds)
ratings_schema = Schema(
  Field(name='rating_id', type=FSTYPE.STRING),
  Field(name='user_md5', type=FSTYPE.STRING),
  Field(name='movie_id', type=FSTYPE.STRING),
  Field(name='rating', type=FSTYPE.STRING),
  Field(name='rating_time', type=FSTYPE.STRING),
  Field(name='dt', type=FSTYPE.STRING)
)
feature_view_rating_name = "feature_view_ratings"
batch_feature_view = project.get_feature_view(feature_view_rating_name)
if batch_feature_view is None:
  batch_feature_view = project.create_batch_feature_view(name=feature_view_rating_name, owner='yancheng', schema=ratings_schema, online = True, entity= cur_entity_name_ratings, primary_key='rating_id', event_time='rating_time', partitions=['dt'])
cur_task = batch_feature_view.write_table(ds, {'dt':'20220831'})
cur_task.wait()
print(cur_task.task_summary)
batch_feature_view = project.get_feature_view(feature_view_rating_name)
batch_feature_view.print_summary()

Offlinestore

离线特征数据存储的数据仓库,在我们系统中是 MaxCompute 或者是 DS 上的 HDFS,但是通过 spark 进行数据写入。 通过 offlinestore, 我们可以生成 training set 数据,也就是样本,用于模型训练。再一个可以生成 batch predition 数据, 用于批量预测。

Onlinestore

在线预测时,需要低延迟的获取特征数据, onlinestore 提供在线特征数据的存储。我们目前优先支持 hologres 或者 redis。

在线特征的获取

我们可以从 FeatureView 的角度获取在线特征

feature_view_movie_name = "feature_view_movie"
batch_feature_view = project.get_feature_view(feature_view_movie_name)
ret_features_1 = batch_feature_view.get_online_features(join_ids={'movie_id':['26357307']}, features=['name', 'actores', 'regions'])
print("ret_features = ", ret_features_1)
feature_view_movie_name = "feature_view_movie"
batch_feature_view = project.get_feature_view(feature_view_movie_name)
ret_features_2 = batch_feature_view.get_online_features(join_ids={'movie_id':['30444960']}, features=['name', 'actores', 'regions'])
print("ret_features = ", ret_features_2)

FeatureSelector

当我们从 offlinestore 或者 onlinestore 获取特征时,需要明确的指出该获取哪些特征。可以从 FeatureView 的角度来选择特征.

feature_view_name = 'feature_view_movie'
# 选择部分特征
feature_selector = FeatureSelector(feature_view_name, ['site_id', 'site_category'])
#选择全部特征
feature_selector = FeatureSelector(feature_view_name, '*')
# 支持别名
feature_selector = FeatureSelector(
    feature_view='user1',
    features = ['f1','f2', 'f3'],
    alias={"f1":"f1_1"} # 字段别名,最终会产出 f1_1 的字段名称 
)

TrainingSet

当我们要训练模型的时候,首先要构造样本表。样本表是由 label 数据和 特征数据组成。在与 FS 交互时, label 数据需要由客户提供,需要定义要获取的特征名称,然后根据主键进行 point-in-time join( 存在 event_time的情况下)

label_ds = MaxComputeDataSource(data_source_id=offline_datasource_id, table='fs_movie_6_feature_view_ratings_offline')
output_ds = MaxComputeDataSource(data_source_id=offline_datasource_id)
label_input = LabelInput(label_ds, event_time='rating_time')
train_set_output = TrainingSetOutput(output_ds)
feature_view_movie_name = "feature_view_movie"
feature_movie_selector = FeatureSelector(feature_view_movie_name, ['name', 'actores', 'regions','tags'])
feature_view_user_name = 'feature_view_users_1'
feature_user_selector = FeatureSelector(feature_view_user_name, ['user_nickname'])
train_set = project.create_training_set(label_input, train_set_output, [feature_movie_selector, feature_user_selector])
print("train_set = ", train_set)

Model

从 offlinestore 的角度讲,我们最终是训练出模型,变成服务进行业务的预测。 那么训练的样本可以从上面的 TrainingSet 获得, 然后就是模型训练,最终会部署成服务。

model_name = "fs_model_movie_rating_3"
owner = 'yancheng'
deploy_config = EASDeployConfig(ak_id= '',region='',config='')
cur_model = project.get_model(model_name)
if cur_model is None:
  cur_model = project.create_model(model_name, owner, train_set, deploy_config)
print("cur_model_train_set_table_name = ", cur_model.train_set_table_name)

导出样本表

实际训练的时候,我们需要导出样本表

指定 label 表以及各个 feature view 的分区, event_time

label_partitions = PartitionConfig(name = 'dt', value = '20220831')
label_input_config = LabelInputConfig(partition_config=label_partitions, event_time='1999-01-00 00:00:00')
movie_partitions = PartitionConfig(name = 'dt', value = '20220830')
feature_view_movie_config = FeatureViewConfig(name = 'feature_view_movie', partition_config=movie_partitions)
user_partitions = PartitionConfig(name = 'ds', value = '20220830')
feature_view_user_config = FeatureViewConfig(name = 'feature_view_users_1', partition_config=user_partitions)
feature_view_config_list = [feature_view_movie_config, feature_view_user_config]
train_set_partitions = PartitionConfig(name = 'dt', value = '20220831')
train_set_output_config = TrainSetOutputConfig(partition_config=train_set_partitions)

根据指定的条件,导出样本表

task = cur_model.export_train_set(label_input_config, feature_view_config_list, train_set_output_config)
task.wait()
print("task = ", task.task_summary)
相关实践学习
使用PAI+LLaMA Factory微调Qwen2-VL模型,搭建文旅领域知识问答机器人
使用PAI和LLaMA Factory框架,基于全参方法微调 Qwen2-VL模型,使其能够进行文旅领域知识问答,同时通过人工测试验证了微调的效果。
机器学习概览及常见算法
机器学习(Machine Learning, ML)是人工智能的核心,专门研究计算机怎样模拟或实现人类的学习行为,以获取新的知识或技能,重新组织已有的知识结构使之不断改善自身的性能,它是使计算机具有智能的根本途径,其应用遍及人工智能的各个领域。 本课程将带你入门机器学习,掌握机器学习的概念和常用的算法。
相关文章
|
6月前
|
人工智能 自然语言处理 IDE
Trae 开发工具与使用技巧
V哥推荐字节推出的AI原生IDE——Trae,这款工具大幅提升程序员开发效率。Trae定位为“AI协同编程”伙伴,支持零基础用户通过对话完成项目开发。其核心功能包括Builder模式自动生成代码、智能问答辅助开发、上下文引用与多模态开发等。对比Cursor和Windsurf,Trae在中文支持、全自动项目管理和免费模型使用上更具优势。新手可通过3步快速上手:启动Builder模式、一键运行调试、迭代优化。立即体验Trae,开启AI时代编程新篇章!
1321 2
|
10月前
|
开发者
解决 display: inline-block 产生的空白间隙问题
【10月更文挑战第27天】在实际应用中,可以根据具体的布局需求、代码结构和兼容性要求等因素,选择合适的方法来消除空白间隙,以实现更精确、更美观的页面布局效果。
|
10月前
|
API 语音技术
基于Asterisk和TTS/ASR语音识别的配置示例
本文介绍了如何在Asterisk服务器上配置TTS(文本转语音)和ASR(自动语音识别)引擎,包括安装Asterisk、选择并配置TTS和ASR引擎、编辑Asterisk配置文件以实现语音识别和合成的功能,以及测试配置的有效性。具体步骤涉及下载安装包、编辑配置文件、设置API密钥等。
655 1
|
SQL 存储 人工智能
Flink 在蚂蚁实时特征平台的深度应用
本文整理自蚂蚁集团高级技术专家赵亮星云,在 Flink Forward Asia 2023 AI 特征工程专场的分享。
2016 3
Flink 在蚂蚁实时特征平台的深度应用
|
12月前
|
数据安全/隐私保护
CTF — 压缩包密码爆破
CTF — 压缩包密码爆破
696 0
|
机器学习/深度学习 存储 人工智能
特征平台(Feature Store):您需要知道的关于特征平台的一切信息(Continuous)
特征平台已于 2021 年问世,成为实现 AI 的一项重要技术。 尽管高科技公司对特征平台充满热情,但大多数传统 ML 平台仍然缺少它们,并且在许多企业公司中相对不为人知。在这里,我们将介绍特征平台的常见功能,以及在你自己的工作中采用这种方法的利弊。
|
XML 开发框架 JSON
探索 SOAP:揭开 Web 服务的神秘面纱(上)
探索 SOAP:揭开 Web 服务的神秘面纱(上)
|
机器学习/深度学习 人工智能 分布式数据库
FFA 2023 专场解读:AI 特征工程、数据集成
今年 Flink Forward Asia(以下简称 FFA ) 重新回归线下,将于 12 月 8 - 9 日在北京望京凯悦酒店举办。
751 0
FFA 2023 专场解读:AI 特征工程、数据集成
|
供应链 算法
深度 | 5分钟读懂阿里零售通智慧供应链平台
大家好,先做个简单自我介绍,过去十年更多是在2B类业务方面做技术架构和研发工作,近两年专注在零售通供应链方面的技术架构和研发的工作。从技术视角分享二点最近几年感受比较深刻的,第一个点,从技术的架构的升级,从过去的电商架构到现在新零售的架构,比如从过去信息平台到交易平台再到现在供应链协同平台,其架构演进的核心动力是互联网、大数据等技术与商业不断融合和发展。
14196 0
|
Linux 开发工具 git
阿里云dsw实例git clone Hugging Face
因为网络及python包版本的原因,dsw实例在使用git指令下载hugging face资源的时候,总是会出现这样或那样的问题,本文基于实际测试遇到的情况,给出对应的解决方案。
4320 1