Direct use
Open the notebook based on the feature platform, then click "Open in DSW" in the upper-right corner.
pip install https://feature-store-py.oss-cn-beijing.aliyuncs.com/package/feature_store_py-0.1.6-py3-none-any.whl
import unittest
import sys
import os
from os.path import dirname, join, abspath
from feature_store_py.fs_client import FeatureStoreClient
from feature_store_py.fs_project import FeatureStoreProject
from feature_store_py.fs_datasource import UrlDataSource, MaxComputeDataSource, DatahubDataSource, HologresDataSource, SparkDataSource, LabelInput, TrainingSetOutput
from feature_store_py.fs_type import FSTYPE
from feature_store_py.fs_schema import Schema, Field
from feature_store_py.fs_feature_view import FeatureView
from feature_store_py.fs_features import FeatureSelector
from feature_store_py.fs_config import EASDeployConfig, LabelInputConfig, PartitionConfig, FeatureViewConfig, TrainSetOutputConfig
import logging

# Send log output to stdout so it shows up in the notebook
logger = logging.getLogger("foo")
logger.addHandler(logging.StreamHandler(stream=sys.stdout))
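Dataset overview
We use an open-source movie dataset. Dataset website: http://moviedata.csuldw.com
We mainly use the movie, user, and rating data. These three datasets correspond to the item table, the user table, and the label table in a recommendation pipeline.
We will show how to use feature store to combine the features from these three datasets for offline model training, and then how to take the result online as a service.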
Project
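A project lets you create multiple project spaces, each independent of the others. A project holds the basic configuration, and each project corresponds to one offlinestore and one onlinestore.
Running this notebook requires the feature store server side. After purchasing a pairec configuration center instance, you can find the service endpoint (host) and the token in the configuration center.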
host = ""
token = ""
fs = FeatureStoreClient(host, token)
cur_project_name = "fs_movie_7"
offline_datasource_id = 38
online_datasource_id = 1
project = fs.get_project(cur_project_name)
if project is None:
    project = fs.create_project(cur_project_name, offline_datasource_id, online_datasource_id)
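The get-then-create pattern in the cell above is what makes it safe to re-run. A minimal sketch of that idea follows. `FakeRegistry` and `get_or_create` are hypothetical names used only for illustration; they stand in for the server and are not part of feature_store_py.

```python
from typing import Callable, Optional, TypeVar

T = TypeVar("T")


def get_or_create(get: Callable[[], Optional[T]], create: Callable[[], T]) -> T:
    """Return the existing object, or create it when the lookup returns None."""
    existing = get()
    return existing if existing is not None else create()


class FakeRegistry:
    """Hypothetical in-memory stand-in for the server-side project registry."""

    def __init__(self) -> None:
        self.projects = {}
        self.create_calls = 0

    def get_project(self, name):
        return self.projects.get(name)

    def create_project(self, name, offline_id, online_id):
        self.create_calls += 1
        self.projects[name] = {"name": name, "offline": offline_id, "online": online_id}
        return self.projects[name]


registry = FakeRegistry()
for _ in range(3):  # simulate re-running the cell three times
    project_info = get_or_create(
        lambda: registry.get_project("fs_movie_7"),
        lambda: registry.create_project("fs_movie_7", 38, 1),
    )
print(project_info, registry.create_calls)
```

Only the first iteration calls `create_project`; later iterations find the existing entry.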
Get the corresponding project
project = fs.get_project(cur_project_name)
Print the project's information
project.print_summary()
FeatureEntity
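A FeatureEntity describes a set of related features. Multiple FeatureViews can be associated with one FeatureEntity. Every Entity has an Entity JoinId, and features from multiple FeatureViews can be joined through that JoinId. Each FeatureView has a primary key (index key) used to fetch its feature data, but the name of that index key may differ from the name defined by the JoinId. Here we create three Entities: movie, user, and rating.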
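To make the relationship between a JoinId and per-view primary keys concrete, here is a small in-memory sketch. The view names `movie_basic` and `movie_stats`, the key name `mid`, and the sample rows are made up for illustration; the real lookup happens on the server.

```python
# Hypothetical in-memory model of one entity and two feature views.
entity = {"name": "movie_data", "join_id": "movie_id"}

feature_views = {
    # This view's primary key has the same name as the JoinId.
    "movie_basic": {"primary_key": "movie_id",
                    "rows": [{"movie_id": "m1", "name": "Movie One"}]},
    # This view's primary key is named differently but holds the same values.
    "movie_stats": {"primary_key": "mid",
                    "rows": [{"mid": "m1", "votes": 120}]},
}


def features_for(join_value):
    """Collect features from every view of the entity for one JoinId value."""
    merged = {entity["join_id"]: join_value}
    for view in feature_views.values():
        pk = view["primary_key"]
        for row in view["rows"]:
            if row[pk] == join_value:
                # Copy every column except the view's own key column.
                merged.update({k: v for k, v in row.items() if k != pk})
    return merged


movie_features = features_for("m1")
print(movie_features)
```

The JoinId is the shared name under which the result is keyed, while each view is still read through its own primary key column.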
cur_entity_name_movie = "movie_data"
join_id = 'movie_id'
entity_id = None
entity_id = project.get_entity(cur_entity_name_movie)
if entity_id is None:
    entity_id = project.create_entity(name = cur_entity_name_movie, join_id=join_id)
print("entity_id = ", entity_id)
Get the corresponding entity
feature_entity = project.get_entity(cur_entity_name_movie)
Print the entity's information
feature_entity.print_summary()
cur_entity_name_user = "user_data"
join_id = 'user_md5'
entity_id = None
entity_id = project.get_entity(cur_entity_name_user)
if entity_id is None:
    entity_id = project.create_entity(name = cur_entity_name_user, join_id=join_id)
print("entity_id = ", entity_id)

cur_entity_name_ratings = "rating_data"
join_id = 'rating_id'
entity_id = None
entity_id = project.get_entity(cur_entity_name_ratings)
if entity_id is None:
    entity_id = project.create_entity(name = cur_entity_name_ratings, join_id=join_id)
print("entity_id = ", entity_id)
FeatureView
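FeatureStore is a feature management platform. External data enters FS through a FeatureView. A FeatureView specifies where the data comes from (DataSource), which transformations the data needs on its way into FS (feature engineering / Transformation), the feature schema (feature names and types), where the data is stored (OnlineStore / OfflineStore), and the feature metadata (primary key, event time, partition key, FeatureEntity, ttl).
There are two types of FeatureView: BatchFeatureView and StreamFeatureView. A BatchFeatureView ingests offline data into FS, while a StreamFeatureView supports writing real-time features. A BatchFeatureView stores its data in the OfflineStore and can optionally sync it to the OnlineStore. A StreamFeatureView writes its data to the OnlineStore and then syncs it to the OfflineStore; in practice, however, the same data is written to both stores at the same time.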
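The two write paths can be pictured with a toy model. The class `ToyStores` and its method names are hypothetical and only mirror the description above; they are not the feature_store_py API.

```python
class ToyStores:
    """Toy offline/online stores, used only to illustrate the two write paths."""

    def __init__(self):
        self.offline = []  # full history, one entry per written row
        self.online = {}   # latest row per primary key

    def batch_write(self, rows):
        # Batch path: data lands in the offline store only.
        self.offline.extend(rows)

    def publish(self, primary_key):
        # Optional sync step: copy offline rows into the online store.
        for row in self.offline:
            self.online[row[primary_key]] = row

    def stream_write(self, row, primary_key):
        # Stream path: the same row is written to both stores.
        self.online[row[primary_key]] = row
        self.offline.append(row)


stores = ToyStores()
stores.batch_write([{"movie_id": "m1", "name": "Movie One"}])
online_before_publish = dict(stores.online)  # nothing is online yet
stores.publish("movie_id")
stores.stream_write({"movie_id": "m2", "name": "Movie Two"}, "movie_id")
print(online_before_publish, sorted(stores.online), len(stores.offline))
```

Batch data becomes visible online only after the explicit publish step, whereas a streamed row is available in both stores right away.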
BatchFeatureView
Feature data from a DataSource can be written into FS in two ways.
1. Direct write
When a UrlDataSource is written to the MaxCompute offlinestore, the schema of the FeatureView must be defined manually.
path = 'https://feature-store-test.oss-cn-beijing.aliyuncs.com/dataset/moviedata_all/movies.csv'
delimiter = ','
omit_header = True
ds = UrlDataSource(path, delimiter, omit_header)
print(ds)
The schema defines the name and type of each field.
movie_schema = Schema(
    Field(name='movie_id', type=FSTYPE.STRING),
    Field(name='name', type=FSTYPE.STRING),
    Field(name='alias', type=FSTYPE.STRING),
    Field(name='actores', type=FSTYPE.STRING),
    Field(name='cover', type=FSTYPE.STRING),
    Field(name='directors', type=FSTYPE.STRING),
    Field(name='double_score', type=FSTYPE.STRING),
    Field(name='double_votes', type=FSTYPE.STRING),
    Field(name='genres', type=FSTYPE.STRING),
    Field(name='imdb_id', type=FSTYPE.STRING),
    Field(name='languages', type=FSTYPE.STRING),
    Field(name='mins', type=FSTYPE.STRING),
    Field(name='official_site', type=FSTYPE.STRING),
    Field(name='regions', type=FSTYPE.STRING),
    Field(name='release_data', type=FSTYPE.STRING),
    Field(name='slug', type=FSTYPE.STRING),
    Field(name='story', type=FSTYPE.STRING),
    Field(name='tags', type=FSTYPE.STRING),
    Field(name='year', type=FSTYPE.STRING),
    Field(name='actor_ids', type=FSTYPE.STRING),
    Field(name='director_ids', type=FSTYPE.STRING),
    Field(name='dt', type=FSTYPE.STRING)
)
print(movie_schema)
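Because the header is omitted and the schema is written by hand, the column order of the file has to line up with the schema. The sketch below shows that mapping locally with the standard `csv` module. It assumes that `omit_header` means "skip the first line" and that the partition column is not in the file but supplied at write time; both are assumptions, and the sample rows are made up.

```python
import csv
import io

# A shortened, illustrative schema; "dt" is the partition column.
schema_fields = ["movie_id", "name", "dt"]
partition_keys = ["dt"]

csv_text = "MOVIE_ID,NAME\nm1,Movie One\nm2,Movie Two\n"  # made-up sample data


def read_rows(text, delimiter=",", omit_header=True, partitions=None):
    """Parse delimited text into dicts keyed by the schema's field names."""
    partitions = partitions or {}
    data_fields = [name for name in schema_fields if name not in partition_keys]
    reader = csv.reader(io.StringIO(text), delimiter=delimiter)
    if omit_header:
        next(reader, None)  # skip the header line
    rows = []
    for line_no, values in enumerate(reader, start=1):
        if len(values) != len(data_fields):
            raise ValueError(
                f"row {line_no}: expected {len(data_fields)} columns, got {len(values)}"
            )
        row = dict(zip(data_fields, values))  # positional mapping onto schema names
        row.update(partitions)                # partition value supplied by the caller
        rows.append(row)
    return rows


parsed_rows = read_rows(csv_text, partitions={"dt": "20220830"})
print(parsed_rows[0])
```

A column-count check like this one catches a schema that has drifted out of step with the file before anything is written.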
Create the batch_feature_view

feature_view_movie_name = "feature_view_movie"
batch_feature_view = project.get_feature_view(feature_view_movie_name)
if batch_feature_view is None:
    batch_feature_view = project.create_batch_feature_view(
        name=feature_view_movie_name,
        owner='yancheng',
        schema=movie_schema,
        online = True,
        entity= cur_entity_name_movie,
        primary_key='movie_id',
        partitions=['dt'],
        ttl=-1)

batch_feature_view = project.get_feature_view(feature_view_movie_name)
batch_feature_view.print_summary()

Write the data to the MaxCompute (mc) table

cur_task = batch_feature_view.write_table(ds, partitions={'dt':'20220830'})
cur_task.wait()

View the current task's information
print(cur_task.task_summary)
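The cells above call `wait()` on the returned task before reading `task_summary`. How the library implements `wait()` is not shown in this notebook; the sketch below only illustrates the general poll-until-done pattern. `FakeTask`, `refresh`, and `wait_for` are hypothetical names, and the status strings are assumptions.

```python
import time


class FakeTask:
    """Hypothetical task that reports 'running' twice and then 'success'."""

    def __init__(self):
        self._states = iter(["running", "running", "success"])
        self.status = "pending"

    def refresh(self):
        # Advance to the next simulated state; stay on the last one afterwards.
        self.status = next(self._states, self.status)


def wait_for(task, poll_interval=0.01, timeout=1.0):
    """Poll the task until it reaches a terminal state or the timeout expires."""
    deadline = time.monotonic() + timeout
    while True:
        task.refresh()
        if task.status in ("success", "failed"):
            return task.status
        if time.monotonic() >= deadline:
            raise TimeoutError("task did not finish in time")
        time.sleep(poll_interval)


final_status = wait_for(FakeTask())
print(final_status)
```

A timeout keeps a stuck job from blocking the notebook indefinitely.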
Sync the data to the onlinestore
cur_task = batch_feature_view.publish_table({'dt':'20220830'})
cur_task.wait()
print(cur_task.task_summary)
Get the corresponding FeatureView
batch_feature_view = project.get_feature_view(feature_view_movie_name)
Print the FeatureView's information
batch_feature_view.print_summary()
Following the same steps, we import the users table and then the ratings table.

users_path = 'https://feature-store-test.oss-cn-beijing.aliyuncs.com/dataset/moviedata_all/users.csv'
ds = UrlDataSource(users_path, delimiter, omit_header)
print(ds)

user_schema = Schema(
    Field(name='user_md5', type=FSTYPE.STRING),
    Field(name='user_nickname', type=FSTYPE.STRING),
    Field(name='ds', type=FSTYPE.STRING)
)
print(user_schema)

feature_view_user_name = "feature_view_users_1"
batch_feature_view = project.get_feature_view(feature_view_user_name)
if batch_feature_view is None:
    batch_feature_view = project.create_batch_feature_view(
        name=feature_view_user_name,
        owner='yancheng',
        schema=user_schema,
        online = True,
        entity= cur_entity_name_user,
        primary_key='user_md5',
        ttl=-1,
        partitions=['ds'])

write_table_task = batch_feature_view.write_table(ds, {'ds':'20220830'})
write_table_task.wait()
print(write_table_task.task_summary)

cur_task = batch_feature_view.publish_table({'ds':'20220830'})
cur_task.wait()
print(cur_task.task_summary)
batch_feature_view = project.get_feature_view(feature_view_user_name)
batch_feature_view.print_summary()

ratings_path = 'https://feature-store-test.oss-cn-beijing.aliyuncs.com/dataset/moviedata_all/ratings.csv'
ds = UrlDataSource(ratings_path, delimiter, omit_header)
print(ds)

ratings_schema = Schema(
    Field(name='rating_id', type=FSTYPE.STRING),
    Field(name='user_md5', type=FSTYPE.STRING),
    Field(name='movie_id', type=FSTYPE.STRING),
    Field(name='rating', type=FSTYPE.STRING),
    Field(name='rating_time', type=FSTYPE.STRING),
    Field(name='dt', type=FSTYPE.STRING)
)

feature_view_rating_name = "feature_view_ratings"
batch_feature_view = project.get_feature_view(feature_view_rating_name)
if batch_feature_view is None:
    batch_feature_view = project.create_batch_feature_view(
        name=feature_view_rating_name,
        owner='yancheng',
        schema=ratings_schema,
        online = True,
        entity= cur_entity_name_ratings,
        primary_key='rating_id',
        event_time='rating_time',
        partitions=['dt'])

cur_task = batch_feature_view.write_table(ds, {'dt':'20220831'})
cur_task.wait()
print(cur_task.task_summary)
batch_feature_view = project.get_feature_view(feature_view_rating_name)
batch_feature_view.print_summary()
Offlinestore
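The offlinestore is the data warehouse that stores offline feature data. In our system it is either MaxCompute or HDFS on DS; in the HDFS case, data is written through Spark. From the offlinestore we can generate training set data, that is, the samples used for model training. We can also generate batch prediction data for batch inference.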
Onlinestore
Online prediction needs low-latency access to feature data, and the onlinestore provides storage for online features. We currently prioritize support for Hologres and Redis.
Fetching online features
Online features can be fetched from the perspective of a FeatureView.

feature_view_movie_name = "feature_view_movie"
batch_feature_view = project.get_feature_view(feature_view_movie_name)
ret_features_1 = batch_feature_view.get_online_features(join_ids={'movie_id':['26357307']}, features=['name', 'actores', 'regions'])
print("ret_features = ", ret_features_1)

feature_view_movie_name = "feature_view_movie"
batch_feature_view = project.get_feature_view(feature_view_movie_name)
ret_features_2 = batch_feature_view.get_online_features(join_ids={'movie_id':['30444960']}, features=['name', 'actores', 'regions'])
print("ret_features = ", ret_features_2)
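Conceptually, an online lookup is a keyed read followed by a projection onto the requested features. The sketch below mimics the shape of the call above against a plain dictionary. The function name `lookup_online`, the sample rows, and the returned list-of-dicts shape are assumptions for illustration and may differ from what `get_online_features` actually returns.

```python
# Hypothetical online store: primary key value -> feature row.
online_store = {
    "m1": {"name": "Movie One", "actores": "A|B", "regions": "US", "tags": "drama"},
    "m2": {"name": "Movie Two", "actores": "C", "regions": "FR", "tags": "comedy"},
}


def lookup_online(join_ids, features):
    """Return the requested features for each key value that exists in the store."""
    (key_name, key_values), = join_ids.items()  # exactly one join key is expected
    results = []
    for value in key_values:
        row = online_store.get(value)
        if row is None:
            continue  # unknown keys are skipped in this sketch
        selected = {feature: row[feature] for feature in features}
        results.append({key_name: value, **selected})
    return results


online_result = lookup_online(
    join_ids={"movie_id": ["m1", "unknown"]},
    features=["name", "regions"],
)
print(online_result)
```

Requesting only the features a model needs keeps the response small, which matters for the low-latency path.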
FeatureSelector
When fetching features from the offlinestore or the onlinestore, you must state explicitly which features to fetch. Features can be selected from the perspective of a FeatureView.

feature_view_name = 'feature_view_movie'
# Select a subset of features
feature_selector = FeatureSelector(feature_view_name, ['site_id', 'site_category'])
# Select all features
feature_selector = FeatureSelector(feature_view_name, '*')
# Aliases are supported
feature_selector = FeatureSelector(
    feature_view='user1',
    features = ['f1','f2', 'f3'],
    alias={"f1":"f1_1"}  # field alias; the output field will be named f1_1
)
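The three selection modes (a subset, `'*'` for everything, and aliases) can be illustrated on a single row. `select_features` is a hypothetical helper written for this sketch; it shows the intended effect on column names, not how FeatureSelector is implemented.

```python
def select_features(row, features, alias=None):
    """Pick columns from a row and rename them according to the alias mapping."""
    alias = alias or {}
    names = list(row) if features == "*" else features
    return {alias.get(name, name): row[name] for name in names}


row = {"f1": 1, "f2": 2, "f3": 3, "f4": 4}

subset = select_features(row, ["f1", "f2"])
everything = select_features(row, "*")
renamed = select_features(row, ["f1", "f2", "f3"], alias={"f1": "f1_1"})
print(subset, everything, renamed)
```

Aliases are useful when two feature views expose a column with the same name and both end up in one training set.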
TrainingSet
To train a model, we first build a sample table. A sample table consists of label data and feature data. When working with FS, the label data must be provided by the customer, together with the names of the features to fetch. FS then joins them on the primary key, using a point-in-time join when an event_time exists.

label_ds = MaxComputeDataSource(data_source_id=offline_datasource_id, table='fs_movie_6_feature_view_ratings_offline')
output_ds = MaxComputeDataSource(data_source_id=offline_datasource_id)
label_input = LabelInput(label_ds, event_time='rating_time')
train_set_output = TrainingSetOutput(output_ds)

feature_view_movie_name = "feature_view_movie"
feature_movie_selector = FeatureSelector(feature_view_movie_name, ['name', 'actores', 'regions','tags'])
feature_view_user_name = 'feature_view_users_1'
feature_user_selector = FeatureSelector(feature_view_user_name, ['user_nickname'])
train_set = project.create_training_set(label_input, train_set_output, [feature_movie_selector, feature_user_selector])
print("train_set = ", train_set)
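A point-in-time join attaches to each label row the most recent feature values that were known at or before the label's event time, which avoids leaking future information into training samples. The sketch below shows the idea in plain Python. The `level` feature, the timestamps, and the helper names are made up; the actual join runs inside the offlinestore.

```python
from datetime import datetime


def ts(text):
    """Parse a 'YYYY-MM-DD HH:MM:SS' timestamp."""
    return datetime.strptime(text, "%Y-%m-%d %H:%M:%S")


# Hypothetical feature history for one user, with an event time per version.
feature_rows = [
    {"user_md5": "u1", "event_time": ts("2022-08-01 00:00:00"), "level": "new"},
    {"user_md5": "u1", "event_time": ts("2022-08-20 00:00:00"), "level": "active"},
]
labels = [
    {"user_md5": "u1", "rating_time": ts("2022-08-10 12:00:00"), "rating": "4"},
    {"user_md5": "u1", "rating_time": ts("2022-08-25 09:00:00"), "rating": "5"},
    {"user_md5": "u2", "rating_time": ts("2022-08-25 09:00:00"), "rating": "3"},
]


def point_in_time_join(labels, feature_rows, key, label_time, feature_time):
    """For each label, attach the latest feature row not newer than the label."""
    joined = []
    for label in labels:
        candidates = [
            row for row in feature_rows
            if row[key] == label[key] and row[feature_time] <= label[label_time]
        ]
        sample = dict(label)
        if candidates:
            latest = max(candidates, key=lambda row: row[feature_time])
            sample.update(
                {k: v for k, v in latest.items() if k not in (key, feature_time)}
            )
        joined.append(sample)
    return joined


samples = point_in_time_join(labels, feature_rows, "user_md5", "rating_time", "event_time")
for sample in samples:
    print(sample["user_md5"], sample["rating"], sample.get("level"))
```

The first label picks up the older feature version, the second picks up the newer one, and the label for `u2` keeps no feature value because no history exists for that key. When a feature view has no event_time, the join reduces to a plain key match.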
Model
From the offlinestore's perspective, the end goal is to train a model and turn it into a service for business prediction. The training samples come from the TrainingSet above; the model is then trained and finally deployed as a service.

model_name = "fs_model_movie_rating_3"
owner = 'yancheng'
deploy_config = EASDeployConfig(ak_id= '',region='',config='')
cur_model = project.get_model(model_name)
if cur_model is None:
    cur_model = project.create_model(model_name, owner, train_set, deploy_config)
print("cur_model_train_set_table_name = ", cur_model.train_set_table_name)
Export the sample table
For actual training, we need to export the sample table.
Specify the partitions and event_time for the label table and for each feature view.

label_partitions = PartitionConfig(name = 'dt', value = '20220831')
label_input_config = LabelInputConfig(partition_config=label_partitions, event_time='1999-01-00 00:00:00')

movie_partitions = PartitionConfig(name = 'dt', value = '20220830')
feature_view_movie_config = FeatureViewConfig(name = 'feature_view_movie', partition_config=movie_partitions)

user_partitions = PartitionConfig(name = 'ds', value = '20220830')
feature_view_user_config = FeatureViewConfig(name = 'feature_view_users_1', partition_config=user_partitions)

feature_view_config_list = [feature_view_movie_config, feature_view_user_config]
train_set_partitions = PartitionConfig(name = 'dt', value = '20220831')
train_set_output_config = TrainSetOutputConfig(partition_config=train_set_partitions)

Export the sample table according to the specified conditions

task = cur_model.export_train_set(label_input_config, feature_view_config_list, train_set_output_config)
task.wait()
print("task = ", task.task_summary)
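The export reads one partition from the label table and one from each feature view, joins them, and writes the result into the output partition. The sketch below reproduces that flow on in-memory tables. The table contents and helper names are made up, and the event_time filter from `LabelInputConfig` is left out because its exact semantics are not described here.

```python
# Hypothetical in-memory tables; each row carries its partition column.
tables = {
    "labels": [
        {"dt": "20220831", "movie_id": "m1", "user_md5": "u1", "rating": "5"},
        {"dt": "20220830", "movie_id": "m1", "user_md5": "u1", "rating": "2"},
    ],
    "feature_view_movie": [
        {"dt": "20220830", "movie_id": "m1", "name": "Movie One"},
        {"dt": "20220829", "movie_id": "m1", "name": "Old Name"},
    ],
    "feature_view_users_1": [
        {"ds": "20220830", "user_md5": "u1", "user_nickname": "alice"},
    ],
}

# Partition column and value to read from each table.
partitions = {
    "labels": ("dt", "20220831"),
    "feature_view_movie": ("dt", "20220830"),
    "feature_view_users_1": ("ds", "20220830"),
}


def read_partition(table):
    """Return only the rows that belong to the configured partition."""
    column, value = partitions[table]
    return [row for row in tables[table] if row[column] == value]


def export_train_set(output_partition):
    """Join the selected label partition with the selected feature partitions."""
    movies = {row["movie_id"]: row for row in read_partition("feature_view_movie")}
    users = {row["user_md5"]: row for row in read_partition("feature_view_users_1")}
    column, value = output_partition
    exported = []
    for label in read_partition("labels"):
        sample = {k: v for k, v in label.items() if k != "dt"}
        sample["name"] = movies.get(label["movie_id"], {}).get("name")
        sample["user_nickname"] = users.get(label["user_md5"], {}).get("user_nickname")
        sample[column] = value  # the sample lands in the output partition
        exported.append(sample)
    return exported


train_rows = export_train_set(("dt", "20220831"))
print(train_rows)
```

Pinning each input to an explicit partition makes the export reproducible: re-running it with the same configuration reads the same snapshot of labels and features.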