【DSW Gallery】特征平台

本文涉及的产品
模型在线服务 PAI-EAS,A10/V100等 500元 1个月
模型训练 PAI-DLC,5000CU*H 3个月
交互式建模 PAI-DSW,每月250计算时 3个月
简介: 特征平台是专门用来存储,共享,管理机器学习模型特征的存储库。特征平台可以方便的向多人、多团队共享特征,提供安全,高效且统一的存储,保证离线在线的一致性。

直接使用

请打开基于特征平台,并点击右上角 “ 在DSW中打开” 。

image.png


pip install https://feature-store-py.oss-cn-beijing.aliyuncs.com/package/feature_store_py-0.1.6-py3-none-any.whl


import unittest
import sys
import os
from os.path import dirname, join, abspath
from feature_store_py.fs_client import FeatureStoreClient
from feature_store_py.fs_project import FeatureStoreProject
from feature_store_py.fs_datasource import UrlDataSource, MaxComputeDataSource, DatahubDataSource, HologresDataSource, SparkDataSource, LabelInput, TrainingSetOutput
from feature_store_py.fs_type import FSTYPE
from feature_store_py.fs_schema import Schema, Field
from feature_store_py.fs_feature_view import FeatureView
from feature_store_py.fs_features import FeatureSelector
from feature_store_py.fs_config import EASDeployConfig, LabelInputConfig, PartitionConfig, FeatureViewConfig, TrainSetOutputConfig
import logging
logger = logging.getLogger("foo")
logger.addHandler(logging.StreamHandler(stream=sys.stdout))

数据集介绍

我们使用的是开源电影数据集,数据集官网:http://moviedata.csuldw.com

其中主要使用的是 movie 数据,user 数据,rating 数据。这三份数据可以对应推荐流程中的物料表,用户表,label 表。

我们将展示如果使用 feature store 方便的将三份数据的特征整合在一起离线训练模型,并且完成后续上线服务。

Project

我们可以通过 project 可以创建多个项目空间,每个项目空间是独立的。project 里会配置基本的信息,每个 project 会对应一个 offlinestore 和 onlinestore。

运行 notebook 需要 feature store server 端配合运行,购买完 pairec 配置中心实例后,在配置中心可以看到服务接口地址 (host) 和 token.

host = ""
token = ""
fs = FeatureStoreClient(host, token)
cur_project_name = "fs_movie_7"
offline_datasource_id = 38
online_datasource_id = 1
project = fs.get_project(cur_project_name)
if project is None:
  project = fs.create_project(cur_project_name, offline_datasource_id, online_datasource_id)

获取对应的 project

project = fs.get_project(cur_project_name)

打印该project的信息

project.print_summary()

FeatureEntity

FeatureEntity 描述了一组相关的特征集合。多个 FeatureView 可以关联一个 FeatureEntity。 每个Entity 都会有一个 Entity JoinId , 通过 JoinId 可以关联多个 FeatureView 特征。每一个 FeatureView 都有一个主键(索引键)来获取其下面的特征数据,但是这里的索引键可以和 JoinId 定义的名称不一样。 这里我们创建 movie, user, rating 三个 Entity。

cur_entity_name_movie = "movie_data"
join_id = 'movie_id'
entity_id = None
entity_id = project.get_entity(cur_entity_name_movie)
if entity_id is None:
  entity_id = project.create_entity(name = cur_entity_name_movie, join_id=join_id)
print("entity_id = ", entity_id)

获取对应的entity

获取对应的entity

打印该entity的信息

feature_entity.print_summary()
cur_entity_name_user = "user_data"
join_id = 'user_md5'
entity_id = None
entity_id = project.get_entity(cur_entity_name_user)
if entity_id is None:
  entity_id = project.create_entity(name = cur_entity_name_user, join_id=join_id)
print("entity_id = ", entity_id)
cur_entity_name_ratings = "rating_data"
join_id = 'rating_id'
entity_id = None
entity_id = project.get_entity(cur_entity_name_ratings)
if entity_id is None:
  entity_id = project.create_entity(name = cur_entity_name_ratings, join_id=join_id)
print("entity_id = ", entity_id)

FeatureView

FeatureStore是一个特征管理平台,当外部的数据进入到 FS 中, 需要通过 FeatureView。 FeatureView 指定了数据从哪里来(DataSource), 数据进入FS 需要哪些转换(特征工程/Transformation), 特征 schema (特征名称+类型),数据需要放到哪里(OnlineStore/OfflineStore)、特征meta(主键、事件时间、分区键, FeatureEntity, ttl )。

FeatureView 会分为两种类型, BatchFeatureView 和 StreamFeatureView 。 BatchFeatureView 可以把离线数据注入到 FS 中, StreamFeatureView 支持实时特征的写入。 BatchFeatureView 会把数据管理到 OfflineStore 里, 然后可以选择同步到 OnlineStore 里。StreamFeatureView 会把数据写入到 OnlineStore 里,然后同步到 OfflineStore 里, 但实际上我们会把同样的数据同时写入到里面。

BatchFeatureView

DataSource 中的特征数据写入 FS 中,有两种情况。

1. 数据直接写入

UrlDataSource 写入到 maxcompute 的 offlinestore, 那么定义的 FeatureView 的 schema 需要手动创建。

path = 'https://feature-store-test.oss-cn-beijing.aliyuncs.com/dataset/moviedata_all/movies.csv'
delimiter = ','
omit_header = True
ds = UrlDataSource(path, delimiter, omit_header)
print(ds)

schema 定义了字段的名称和类型。

movie_schema = Schema(
    Field(name='movie_id', type=FSTYPE.STRING),
    Field(name='name', type=FSTYPE.STRING),
    Field(name='alias', type=FSTYPE.STRING),
    Field(name='actores', type=FSTYPE.STRING),
    Field(name='cover', type=FSTYPE.STRING),
    Field(name='directors', type=FSTYPE.STRING),
    Field(name='double_score', type=FSTYPE.STRING),
    Field(name='double_votes', type=FSTYPE.STRING),
    Field(name='genres', type=FSTYPE.STRING),
    Field(name='imdb_id', type=FSTYPE.STRING),
    Field(name='languages', type=FSTYPE.STRING),
    Field(name='mins', type=FSTYPE.STRING),
    Field(name='official_site', type=FSTYPE.STRING),
    Field(name='regions', type=FSTYPE.STRING),
    Field(name='release_data', type=FSTYPE.STRING),
    Field(name='slug', type=FSTYPE.STRING),
    Field(name='story', type=FSTYPE.STRING),
    Field(name='tags', type=FSTYPE.STRING),
    Field(name='year', type=FSTYPE.STRING),
    Field(name='actor_ids', type=FSTYPE.STRING),
    Field(name='director_ids', type=FSTYPE.STRING),
    Field(name='dt', type=FSTYPE.STRING)
)
print(movie_schema)

新建 batch_feature_view

feature_view_movie_name = "feature_view_movie"
batch_feature_view = project.get_feature_view(feature_view_movie_name)
if batch_feature_view is None:
  batch_feature_view = project.create_batch_feature_view(name=feature_view_movie_name, owner='yancheng', schema=movie_schema, online = True, entity= cur_entity_name_movie, primary_key='movie_id', partitions=['dt'], ttl=-1)
batch_feature_view = project.get_feature_view(feature_view_movie_name)
batch_feature_view.print_summary()

数据写入 mc 表

cur_task = batch_feature_view.write_table(ds, partitions={'dt':'20220830'})
cur_task.wait()

查看当前 task 的信息

print(cur_task.task_summary)

数据同步到 onlinestore 中

cur_task = batch_feature_view.publish_table({'dt':'20220830'})
cur_task.wait()
print(cur_task.task_summary)

获取对应的FeatureView

batch_feature_view = project.get_feature_view(feature_view_movie_name)

打印该FeatureView的信息

batch_feature_view.print_summary()

我们按此顺序,依次导入 users 表, ratings 表。

users_path = 'https://feature-store-test.oss-cn-beijing.aliyuncs.com/dataset/moviedata_all/users.csv'
ds = UrlDataSource(users_path, delimiter, omit_header)
print(ds)
user_schema = Schema(
  Field(name='user_md5', type=FSTYPE.STRING),
  Field(name='user_nickname', type=FSTYPE.STRING),
  Field(name='ds', type=FSTYPE.STRING)
)
print(user_schema)
feature_view_user_name = "feature_view_users_1"
batch_feature_view = project.get_feature_view(feature_view_user_name)
if batch_feature_view is None:
  batch_feature_view = project.create_batch_feature_view(name=feature_view_user_name, owner='yancheng', schema=user_schema, online = True, entity= cur_entity_name_user, primary_key='user_md5',ttl=-1, partitions=['ds'])
write_table_task = batch_feature_view.write_table(ds, {'ds':'20220830'})
write_table_task.wait()
print(write_table_task.task_summary)
cur_task = batch_feature_view.publish_table({'ds':'20220830'})
cur_task.wait()
print(cur_task.task_summary)
batch_feature_view = project.get_feature_view(feature_view_user_name)
batch_feature_view.print_summary()
ratings_path = 'https://feature-store-test.oss-cn-beijing.aliyuncs.com/dataset/moviedata_all/ratings.csv'
ds = UrlDataSource(ratings_path, delimiter, omit_header)
print(ds)
ratings_schema = Schema(
  Field(name='rating_id', type=FSTYPE.STRING),
  Field(name='user_md5', type=FSTYPE.STRING),
  Field(name='movie_id', type=FSTYPE.STRING),
  Field(name='rating', type=FSTYPE.STRING),
  Field(name='rating_time', type=FSTYPE.STRING),
  Field(name='dt', type=FSTYPE.STRING)
)
feature_view_rating_name = "feature_view_ratings"
batch_feature_view = project.get_feature_view(feature_view_rating_name)
if batch_feature_view is None:
  batch_feature_view = project.create_batch_feature_view(name=feature_view_rating_name, owner='yancheng', schema=ratings_schema, online = True, entity= cur_entity_name_ratings, primary_key='rating_id', event_time='rating_time', partitions=['dt'])
cur_task = batch_feature_view.write_table(ds, {'dt':'20220831'})
cur_task.wait()
print(cur_task.task_summary)
batch_feature_view = project.get_feature_view(feature_view_rating_name)
batch_feature_view.print_summary()

Offlinestore

离线特征数据存储的数据仓库,在我们系统中是 MaxCompute 或者是 DS 上的 HDFS,但是通过 spark 进行数据写入。 通过 offlinestore, 我们可以生成 training set 数据,也就是样本,用于模型训练。再一个可以生成 batch predition 数据, 用于批量预测。

Onlinestore

在线预测时,需要低延迟的获取特征数据, onlinestore 提供在线特征数据的存储。我们目前优先支持 hologres 或者 redis。

在线特征的获取

我们可以从 FeatureView 的角度获取在线特征

feature_view_movie_name = "feature_view_movie"
batch_feature_view = project.get_feature_view(feature_view_movie_name)
ret_features_1 = batch_feature_view.get_online_features(join_ids={'movie_id':['26357307']}, features=['name', 'actores', 'regions'])
print("ret_features = ", ret_features_1)
feature_view_movie_name = "feature_view_movie"
batch_feature_view = project.get_feature_view(feature_view_movie_name)
ret_features_2 = batch_feature_view.get_online_features(join_ids={'movie_id':['30444960']}, features=['name', 'actores', 'regions'])
print("ret_features = ", ret_features_2)

FeatureSelector

当我们从 offlinestore 或者 onlinestore 获取特征时,需要明确的指出该获取哪些特征。可以从 FeatureView 的角度来选择特征.

feature_view_name = 'feature_view_movie'
# 选择部分特征
feature_selector = FeatureSelector(feature_view_name, ['site_id', 'site_category'])
#选择全部特征
feature_selector = FeatureSelector(feature_view_name, '*')
# 支持别名
feature_selector = FeatureSelector(
    feature_view='user1',
    features = ['f1','f2', 'f3'],
    alias={"f1":"f1_1"} # 字段别名,最终会产出 f1_1 的字段名称 
)

TrainingSet

当我们要训练模型的时候,首先要构造样本表。样本表是由 label 数据和 特征数据组成。在与 FS 交互时, label 数据需要由客户提供,需要定义要获取的特征名称,然后根据主键进行 point-in-time join( 存在 event_time的情况下)

label_ds = MaxComputeDataSource(data_source_id=offline_datasource_id, table='fs_movie_6_feature_view_ratings_offline')
output_ds = MaxComputeDataSource(data_source_id=offline_datasource_id)
label_input = LabelInput(label_ds, event_time='rating_time')
train_set_output = TrainingSetOutput(output_ds)
feature_view_movie_name = "feature_view_movie"
feature_movie_selector = FeatureSelector(feature_view_movie_name, ['name', 'actores', 'regions','tags'])
feature_view_user_name = 'feature_view_users_1'
feature_user_selector = FeatureSelector(feature_view_user_name, ['user_nickname'])
train_set = project.create_training_set(label_input, train_set_output, [feature_movie_selector, feature_user_selector])
print("train_set = ", train_set)

Model

从 offlinestore 的角度讲,我们最终是训练出模型,变成服务进行业务的预测。 那么训练的样本可以从上面的 TrainingSet 获得, 然后就是模型训练,最终会部署成服务。

model_name = "fs_model_movie_rating_3"
owner = 'yancheng'
deploy_config = EASDeployConfig(ak_id= '',region='',config='')
cur_model = project.get_model(model_name)
if cur_model is None:
  cur_model = project.create_model(model_name, owner, train_set, deploy_config)
print("cur_model_train_set_table_name = ", cur_model.train_set_table_name)

导出样本表

实际训练的时候,我们需要导出样本表

指定 label 表以及各个 feature view 的分区, event_time

label_partitions = PartitionConfig(name = 'dt', value = '20220831')
label_input_config = LabelInputConfig(partition_config=label_partitions, event_time='1999-01-00 00:00:00')
movie_partitions = PartitionConfig(name = 'dt', value = '20220830')
feature_view_movie_config = FeatureViewConfig(name = 'feature_view_movie', partition_config=movie_partitions)
user_partitions = PartitionConfig(name = 'ds', value = '20220830')
feature_view_user_config = FeatureViewConfig(name = 'feature_view_users_1', partition_config=user_partitions)
feature_view_config_list = [feature_view_movie_config, feature_view_user_config]
train_set_partitions = PartitionConfig(name = 'dt', value = '20220831')
train_set_output_config = TrainSetOutputConfig(partition_config=train_set_partitions)

根据指定的条件,导出样本表

task = cur_model.export_train_set(label_input_config, feature_view_config_list, train_set_output_config)
task.wait()
print("task = ", task.task_summary)
相关实践学习
使用PAI-EAS一键部署ChatGLM及LangChain应用
本场景中主要介绍如何使用模型在线服务(PAI-EAS)部署ChatGLM的AI-Web应用以及启动WebUI进行模型推理,并通过LangChain集成自己的业务数据。
机器学习概览及常见算法
机器学习(Machine Learning, ML)是人工智能的核心,专门研究计算机怎样模拟或实现人类的学习行为,以获取新的知识或技能,重新组织已有的知识结构使之不断改善自身的性能,它是使计算机具有智能的根本途径,其应用遍及人工智能的各个领域。 本课程将带你入门机器学习,掌握机器学习的概念和常用的算法。
相关文章
|
存储 机器学习/深度学习 人工智能
【DSW Gallery】DSW基础使用介绍
PAI-DSW是一款云端机器学习开发IDE,为您提供交互式编程环境,适用于不同水平的开发者。本文为您介绍PAI-DSW的功能特点以及界面的基础使用。
【DSW Gallery】DSW基础使用介绍
|
消息中间件 运维 算法
【DSW Gallery】IsolationForest算法解决异常检测问题
IsolationForest 是一种无监督的异常检测算法, 用于对无 label 的数据进行异常检测,并且支持将 IsolationForest 模型部署成一个流服务,用来对实时数据进行异常检测。该 Demo 将介绍如何在 DSW 中使用 IsolationForest 算法解决异常检测问题。
【DSW Gallery】IsolationForest算法解决异常检测问题
|
机器学习/深度学习 人工智能 自然语言处理
【DSW Gallery】基于EasyNLP的中文信息抽取
EasyNLP提供多种模型的训练及预测功能,旨在帮助自然语言开发者方便快捷地构建模型并应用于生产。本文以中文信息抽取为例,为您介绍如何在PAI-DSW中基于EasyNLP快速使用K-Global Pointer算法进行中文信息抽取模型的训练、评估、推理。
【DSW Gallery】基于EasyNLP的中文信息抽取
|
消息中间件 运维 算法
【DSW Gallery】OneClassSVM 算法解决异常检测问题
OneClassSVM 是一种无监督的异常检测算法, 用于对无 label 的数据进行异常检测,并且支持将 OneClassSVM 模型部署成一个流服务,用来对实时数据进行异常检测。该D emo 将介绍如何在 DSW 中使用 OneClassSVM 算法解决异常检测问题。
【DSW Gallery】OneClassSVM 算法解决异常检测问题
|
机器学习/深度学习 并行计算 数据可视化
【DSW Gallery】EasyCV-基于关键点的视频分类示例
EasyCV是基于Pytorch,以自监督学习和Transformer技术为核心的 all-in-one 视觉算法建模工具,并包含图像分类,度量学习,目标检测,姿态识别等视觉任务的SOTA算法。本文以基于关键点的视频分类为例,为您介绍如何在PAI-DSW中使用EasyCV。
【DSW Gallery】EasyCV-基于关键点的视频分类示例
|
算法 PyTorch 算法框架/工具
【DSW Gallery】基于EasyCV的视频分类示例
EasyCV是基于Pytorch,以自监督学习和Transformer技术为核心的 all-in-one 视觉算法建模工具,并包含图像分类,度量学习,目标检测,姿态识别等视觉任务的SOTA算法。本文以视频分类为例,为您介绍如何在PAI-DSW中使用EasyCV。
【DSW Gallery】基于EasyCV的视频分类示例
|
人工智能 并行计算 算法
【DSW Gallery】基于MOCOV2的自监督学习示例
EasyCV是基于Pytorch,以自监督学习和Transformer技术为核心的 all-in-one 视觉算法建模工具,并包含图像分类,度量学习,目标检测,姿态识别等视觉任务的SOTA算法。本文以自监督学习-MOCO为例,为您介绍如何在PAI-DSW中使用EasyCV。
【DSW Gallery】基于MOCOV2的自监督学习示例
|
机器学习/深度学习 算法
【DSW Gallery】如何使用EasyRec训练DeepFM模型
本文基于EasyRec 0.4.7 展示了如何使用EasyRec快速的训练一个DeepFM模型
【DSW Gallery】如何使用EasyRec训练DeepFM模型
|
缓存 自然语言处理 Shell
【DSW Gallery】基于预训练模型的多场景文本生成(以新闻标题生成为例)
EasyNLP提供多种模型的训练及预测功能,旨在帮助自然语言开发者方便快捷地构建模型并应用于生产。本文以中文文本生成为例,为您介绍如何在PAI-DSW中使用EasyNLP。
【DSW Gallery】基于预训练模型的多场景文本生成(以新闻标题生成为例)