【DSW Gallery】特征平台

简介: 特征平台是专门用来存储,共享,管理机器学习模型特征的存储库。特征平台可以方便的向多人、多团队共享特征,提供安全,高效且统一的存储,保证离线在线的一致性。

直接使用

请打开基于特征平台,并点击右上角 “ 在DSW中打开” 。

image.png


pip install https://feature-store-py.oss-cn-beijing.aliyuncs.com/package/feature_store_py-0.1.6-py3-none-any.whl


import unittest
import sys
import os
from os.path import dirname, join, abspath
from feature_store_py.fs_client import FeatureStoreClient
from feature_store_py.fs_project import FeatureStoreProject
from feature_store_py.fs_datasource import UrlDataSource, MaxComputeDataSource, DatahubDataSource, HologresDataSource, SparkDataSource, LabelInput, TrainingSetOutput
from feature_store_py.fs_type import FSTYPE
from feature_store_py.fs_schema import Schema, Field
from feature_store_py.fs_feature_view import FeatureView
from feature_store_py.fs_features import FeatureSelector
from feature_store_py.fs_config import EASDeployConfig, LabelInputConfig, PartitionConfig, FeatureViewConfig, TrainSetOutputConfig
import logging
logger = logging.getLogger("foo")
logger.addHandler(logging.StreamHandler(stream=sys.stdout))

数据集介绍

我们使用的是开源电影数据集,数据集官网:http://moviedata.csuldw.com

其中主要使用的是 movie 数据,user 数据,rating 数据。这三份数据可以对应推荐流程中的物料表,用户表,label 表。

我们将展示如果使用 feature store 方便的将三份数据的特征整合在一起离线训练模型,并且完成后续上线服务。

Project

我们可以通过 project 可以创建多个项目空间,每个项目空间是独立的。project 里会配置基本的信息,每个 project 会对应一个 offlinestore 和 onlinestore。

运行 notebook 需要 feature store server 端配合运行,购买完 pairec 配置中心实例后,在配置中心可以看到服务接口地址 (host) 和 token.

host = ""
token = ""
fs = FeatureStoreClient(host, token)
cur_project_name = "fs_movie_7"
offline_datasource_id = 38
online_datasource_id = 1
project = fs.get_project(cur_project_name)
if project is None:
  project = fs.create_project(cur_project_name, offline_datasource_id, online_datasource_id)

获取对应的 project

project = fs.get_project(cur_project_name)

打印该project的信息

project.print_summary()

FeatureEntity

FeatureEntity 描述了一组相关的特征集合。多个 FeatureView 可以关联一个 FeatureEntity。 每个Entity 都会有一个 Entity JoinId , 通过 JoinId 可以关联多个 FeatureView 特征。每一个 FeatureView 都有一个主键(索引键)来获取其下面的特征数据,但是这里的索引键可以和 JoinId 定义的名称不一样。 这里我们创建 movie, user, rating 三个 Entity。

cur_entity_name_movie = "movie_data"
join_id = 'movie_id'
entity_id = None
entity_id = project.get_entity(cur_entity_name_movie)
if entity_id is None:
  entity_id = project.create_entity(name = cur_entity_name_movie, join_id=join_id)
print("entity_id = ", entity_id)

获取对应的entity

获取对应的entity

打印该entity的信息

feature_entity.print_summary()
cur_entity_name_user = "user_data"
join_id = 'user_md5'
entity_id = None
entity_id = project.get_entity(cur_entity_name_user)
if entity_id is None:
  entity_id = project.create_entity(name = cur_entity_name_user, join_id=join_id)
print("entity_id = ", entity_id)
cur_entity_name_ratings = "rating_data"
join_id = 'rating_id'
entity_id = None
entity_id = project.get_entity(cur_entity_name_ratings)
if entity_id is None:
  entity_id = project.create_entity(name = cur_entity_name_ratings, join_id=join_id)
print("entity_id = ", entity_id)

FeatureView

FeatureStore是一个特征管理平台,当外部的数据进入到 FS 中, 需要通过 FeatureView。 FeatureView 指定了数据从哪里来(DataSource), 数据进入FS 需要哪些转换(特征工程/Transformation), 特征 schema (特征名称+类型),数据需要放到哪里(OnlineStore/OfflineStore)、特征meta(主键、事件时间、分区键, FeatureEntity, ttl )。

FeatureView 会分为两种类型, BatchFeatureView 和 StreamFeatureView 。 BatchFeatureView 可以把离线数据注入到 FS 中, StreamFeatureView 支持实时特征的写入。 BatchFeatureView 会把数据管理到 OfflineStore 里, 然后可以选择同步到 OnlineStore 里。StreamFeatureView 会把数据写入到 OnlineStore 里,然后同步到 OfflineStore 里, 但实际上我们会把同样的数据同时写入到里面。

BatchFeatureView

DataSource 中的特征数据写入 FS 中,有两种情况。

1. 数据直接写入

UrlDataSource 写入到 maxcompute 的 offlinestore, 那么定义的 FeatureView 的 schema 需要手动创建。

path = 'https://feature-store-test.oss-cn-beijing.aliyuncs.com/dataset/moviedata_all/movies.csv'
delimiter = ','
omit_header = True
ds = UrlDataSource(path, delimiter, omit_header)
print(ds)

schema 定义了字段的名称和类型。

movie_schema = Schema(
    Field(name='movie_id', type=FSTYPE.STRING),
    Field(name='name', type=FSTYPE.STRING),
    Field(name='alias', type=FSTYPE.STRING),
    Field(name='actores', type=FSTYPE.STRING),
    Field(name='cover', type=FSTYPE.STRING),
    Field(name='directors', type=FSTYPE.STRING),
    Field(name='double_score', type=FSTYPE.STRING),
    Field(name='double_votes', type=FSTYPE.STRING),
    Field(name='genres', type=FSTYPE.STRING),
    Field(name='imdb_id', type=FSTYPE.STRING),
    Field(name='languages', type=FSTYPE.STRING),
    Field(name='mins', type=FSTYPE.STRING),
    Field(name='official_site', type=FSTYPE.STRING),
    Field(name='regions', type=FSTYPE.STRING),
    Field(name='release_data', type=FSTYPE.STRING),
    Field(name='slug', type=FSTYPE.STRING),
    Field(name='story', type=FSTYPE.STRING),
    Field(name='tags', type=FSTYPE.STRING),
    Field(name='year', type=FSTYPE.STRING),
    Field(name='actor_ids', type=FSTYPE.STRING),
    Field(name='director_ids', type=FSTYPE.STRING),
    Field(name='dt', type=FSTYPE.STRING)
)
print(movie_schema)

新建 batch_feature_view

feature_view_movie_name = "feature_view_movie"
batch_feature_view = project.get_feature_view(feature_view_movie_name)
if batch_feature_view is None:
  batch_feature_view = project.create_batch_feature_view(name=feature_view_movie_name, owner='yancheng', schema=movie_schema, online = True, entity= cur_entity_name_movie, primary_key='movie_id', partitions=['dt'], ttl=-1)
batch_feature_view = project.get_feature_view(feature_view_movie_name)
batch_feature_view.print_summary()

数据写入 mc 表

cur_task = batch_feature_view.write_table(ds, partitions={'dt':'20220830'})
cur_task.wait()

查看当前 task 的信息

print(cur_task.task_summary)

数据同步到 onlinestore 中

cur_task = batch_feature_view.publish_table({'dt':'20220830'})
cur_task.wait()
print(cur_task.task_summary)

获取对应的FeatureView

batch_feature_view = project.get_feature_view(feature_view_movie_name)

打印该FeatureView的信息

batch_feature_view.print_summary()

我们按此顺序,依次导入 users 表, ratings 表。

users_path = 'https://feature-store-test.oss-cn-beijing.aliyuncs.com/dataset/moviedata_all/users.csv'
ds = UrlDataSource(users_path, delimiter, omit_header)
print(ds)
user_schema = Schema(
  Field(name='user_md5', type=FSTYPE.STRING),
  Field(name='user_nickname', type=FSTYPE.STRING),
  Field(name='ds', type=FSTYPE.STRING)
)
print(user_schema)
feature_view_user_name = "feature_view_users_1"
batch_feature_view = project.get_feature_view(feature_view_user_name)
if batch_feature_view is None:
  batch_feature_view = project.create_batch_feature_view(name=feature_view_user_name, owner='yancheng', schema=user_schema, online = True, entity= cur_entity_name_user, primary_key='user_md5',ttl=-1, partitions=['ds'])
write_table_task = batch_feature_view.write_table(ds, {'ds':'20220830'})
write_table_task.wait()
print(write_table_task.task_summary)
cur_task = batch_feature_view.publish_table({'ds':'20220830'})
cur_task.wait()
print(cur_task.task_summary)
batch_feature_view = project.get_feature_view(feature_view_user_name)
batch_feature_view.print_summary()
ratings_path = 'https://feature-store-test.oss-cn-beijing.aliyuncs.com/dataset/moviedata_all/ratings.csv'
ds = UrlDataSource(ratings_path, delimiter, omit_header)
print(ds)
ratings_schema = Schema(
  Field(name='rating_id', type=FSTYPE.STRING),
  Field(name='user_md5', type=FSTYPE.STRING),
  Field(name='movie_id', type=FSTYPE.STRING),
  Field(name='rating', type=FSTYPE.STRING),
  Field(name='rating_time', type=FSTYPE.STRING),
  Field(name='dt', type=FSTYPE.STRING)
)
feature_view_rating_name = "feature_view_ratings"
batch_feature_view = project.get_feature_view(feature_view_rating_name)
if batch_feature_view is None:
  batch_feature_view = project.create_batch_feature_view(name=feature_view_rating_name, owner='yancheng', schema=ratings_schema, online = True, entity= cur_entity_name_ratings, primary_key='rating_id', event_time='rating_time', partitions=['dt'])
cur_task = batch_feature_view.write_table(ds, {'dt':'20220831'})
cur_task.wait()
print(cur_task.task_summary)
batch_feature_view = project.get_feature_view(feature_view_rating_name)
batch_feature_view.print_summary()

Offlinestore

离线特征数据存储的数据仓库,在我们系统中是 MaxCompute 或者是 DS 上的 HDFS,但是通过 spark 进行数据写入。 通过 offlinestore, 我们可以生成 training set 数据,也就是样本,用于模型训练。再一个可以生成 batch predition 数据, 用于批量预测。

Onlinestore

在线预测时,需要低延迟的获取特征数据, onlinestore 提供在线特征数据的存储。我们目前优先支持 hologres 或者 redis。

在线特征的获取

我们可以从 FeatureView 的角度获取在线特征

feature_view_movie_name = "feature_view_movie"
batch_feature_view = project.get_feature_view(feature_view_movie_name)
ret_features_1 = batch_feature_view.get_online_features(join_ids={'movie_id':['26357307']}, features=['name', 'actores', 'regions'])
print("ret_features = ", ret_features_1)
feature_view_movie_name = "feature_view_movie"
batch_feature_view = project.get_feature_view(feature_view_movie_name)
ret_features_2 = batch_feature_view.get_online_features(join_ids={'movie_id':['30444960']}, features=['name', 'actores', 'regions'])
print("ret_features = ", ret_features_2)

FeatureSelector

当我们从 offlinestore 或者 onlinestore 获取特征时,需要明确的指出该获取哪些特征。可以从 FeatureView 的角度来选择特征.

feature_view_name = 'feature_view_movie'
# 选择部分特征
feature_selector = FeatureSelector(feature_view_name, ['site_id', 'site_category'])
#选择全部特征
feature_selector = FeatureSelector(feature_view_name, '*')
# 支持别名
feature_selector = FeatureSelector(
    feature_view='user1',
    features = ['f1','f2', 'f3'],
    alias={"f1":"f1_1"} # 字段别名,最终会产出 f1_1 的字段名称 
)

TrainingSet

当我们要训练模型的时候,首先要构造样本表。样本表是由 label 数据和 特征数据组成。在与 FS 交互时, label 数据需要由客户提供,需要定义要获取的特征名称,然后根据主键进行 point-in-time join( 存在 event_time的情况下)

label_ds = MaxComputeDataSource(data_source_id=offline_datasource_id, table='fs_movie_6_feature_view_ratings_offline')
output_ds = MaxComputeDataSource(data_source_id=offline_datasource_id)
label_input = LabelInput(label_ds, event_time='rating_time')
train_set_output = TrainingSetOutput(output_ds)
feature_view_movie_name = "feature_view_movie"
feature_movie_selector = FeatureSelector(feature_view_movie_name, ['name', 'actores', 'regions','tags'])
feature_view_user_name = 'feature_view_users_1'
feature_user_selector = FeatureSelector(feature_view_user_name, ['user_nickname'])
train_set = project.create_training_set(label_input, train_set_output, [feature_movie_selector, feature_user_selector])
print("train_set = ", train_set)

Model

从 offlinestore 的角度讲,我们最终是训练出模型,变成服务进行业务的预测。 那么训练的样本可以从上面的 TrainingSet 获得, 然后就是模型训练,最终会部署成服务。

model_name = "fs_model_movie_rating_3"
owner = 'yancheng'
deploy_config = EASDeployConfig(ak_id= '',region='',config='')
cur_model = project.get_model(model_name)
if cur_model is None:
  cur_model = project.create_model(model_name, owner, train_set, deploy_config)
print("cur_model_train_set_table_name = ", cur_model.train_set_table_name)

导出样本表

实际训练的时候,我们需要导出样本表

指定 label 表以及各个 feature view 的分区, event_time

label_partitions = PartitionConfig(name = 'dt', value = '20220831')
label_input_config = LabelInputConfig(partition_config=label_partitions, event_time='1999-01-00 00:00:00')
movie_partitions = PartitionConfig(name = 'dt', value = '20220830')
feature_view_movie_config = FeatureViewConfig(name = 'feature_view_movie', partition_config=movie_partitions)
user_partitions = PartitionConfig(name = 'ds', value = '20220830')
feature_view_user_config = FeatureViewConfig(name = 'feature_view_users_1', partition_config=user_partitions)
feature_view_config_list = [feature_view_movie_config, feature_view_user_config]
train_set_partitions = PartitionConfig(name = 'dt', value = '20220831')
train_set_output_config = TrainSetOutputConfig(partition_config=train_set_partitions)

根据指定的条件,导出样本表

task = cur_model.export_train_set(label_input_config, feature_view_config_list, train_set_output_config)
task.wait()
print("task = ", task.task_summary)
相关实践学习
使用PAI+LLaMA Factory微调Qwen2-VL模型,搭建文旅领域知识问答机器人
使用PAI和LLaMA Factory框架,基于全参方法微调 Qwen2-VL模型,使其能够进行文旅领域知识问答,同时通过人工测试验证了微调的效果。
机器学习概览及常见算法
机器学习(Machine Learning, ML)是人工智能的核心,专门研究计算机怎样模拟或实现人类的学习行为,以获取新的知识或技能,重新组织已有的知识结构使之不断改善自身的性能,它是使计算机具有智能的根本途径,其应用遍及人工智能的各个领域。 本课程将带你入门机器学习,掌握机器学习的概念和常用的算法。
相关文章
|
机器学习/深度学习 存储 人工智能
阿里云机器学习PAI全新推出特征平台 (Feature Store),助力AI建模场景特征数据高效利用
机器学习平台 PAI 推出特征平台(PAI-FeatureStore),在所有需要特征的AI建模场景,用户可通过 Feature Store 轻松地共享和重用特征数据,减少资源和时间成本、提升工作效率。
|
4月前
|
SQL Java 数据库连接
MyBatis 分页机制详解:从 RowBounds 到物理分页实践
MyBatis分页策略解析:逻辑分页(RowBounds)将全量数据加载至内存,仅适用于小数据量;物理分页通过SQL层面限制返回数据,性能更优。推荐使用PageHelper插件,自动适配数据库方言,一行代码实现高效分页,避免OOM风险,提升系统稳定性。
|
API 语音技术
基于Asterisk和TTS/ASR语音识别的配置示例
本文介绍了如何在Asterisk服务器上配置TTS(文本转语音)和ASR(自动语音识别)引擎,包括安装Asterisk、选择并配置TTS和ASR引擎、编辑Asterisk配置文件以实现语音识别和合成的功能,以及测试配置的有效性。具体步骤涉及下载安装包、编辑配置文件、设置API密钥等。
1036 1
|
SQL 存储 人工智能
Flink 在蚂蚁实时特征平台的深度应用
本文整理自蚂蚁集团高级技术专家赵亮星云,在 Flink Forward Asia 2023 AI 特征工程专场的分享。
2694 3
Flink 在蚂蚁实时特征平台的深度应用
|
数据安全/隐私保护
CTF — 压缩包密码爆破
CTF — 压缩包密码爆破
2370 0
|
Linux 开发工具 C语言
mac/linux中vim永久显示行号、开启语法高亮
步骤1:   cp /usr/share/vim/vimrc ~/.vimrc   先复制一份vim配置模板到个人目录下   注:redhat 改成 cp /etc/vimrc ~/.vimrc 步骤2:   vi ~/.vimrc   进入insert模式,在最后加二行   syntax on   set nu! 保存收工。
2421 0
|
域名解析 缓存 监控
【域名解析 DNS 专栏】解析失败的 DNS 重试策略与配置优化
【5月更文挑战第28天】DNS解析在数字化时代关键但常遇失败,可能由网络、服务器或域名错误引起。实施智能重试策略(如指数级增长的重试间隔)和配置优化(如选用可靠DNS服务器、设置缓存、监控预警)能提高成功率和系统稳定性。示例代码展示基本DNS重试函数,强调需按业务需求调整策略并配合监控以保证高效稳定的DNS解析。
593 1
|
SQL Web App开发 流计算
Flink入坑指南第五章 - 语法糖 view
Flink入坑指南系列文章,从实际例子入手,一步步引导用户零基础入门实时计算/Flink,并成长为使用Flink的高阶用户。本文属个人原创,仅做技术交流之用,笔者才疏学浅,如有错误,欢迎指正。 什么是view(视图):视图无非就是存储在数据库中并具有名字的 SQL 语句,或者说是以预定义的 SQL 查询的形式存在的数据表的成分。
4474 1
|
Linux 开发工具 git
阿里云dsw实例git clone Hugging Face
因为网络及python包版本的原因,dsw实例在使用git指令下载hugging face资源的时候,总是会出现这样或那样的问题,本文基于实际测试遇到的情况,给出对应的解决方案。
4697 1
|
NoSQL Java 关系型数据库
Flink 动态更新配置,不需要重启作业
Flink 动态更新配置,不需要重启作业
801 1