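Quick Start
Open the Gbdt-FM model and click "Open in DSW" in the upper-right corner.
GBDT + FM: Integrated Model Training and Service Deployment
GBDT+FM is an extension of the GBDT+LR model. It uses GBDT to select and combine features automatically, producing a new discrete feature vector, and then feeds that vector to an FM model to produce the final prediction. The model can combine many kinds of features (user, item, and context) to generate fairly comprehensive recommendations, and it is widely used in CTR (click-through rate) prediction. This article shows how to build a GBDT+FM model quickly with Alink on DSW, and how to deploy the resulting model as a service.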
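The two-step idea described above can be sketched in plain Python. This is only an illustration under stated assumptions, not Alink's implementation: the two hand-written "trees", the weights `W0`, `W`, and the factor matrix `V` are all made up, whereas a real GBDT learns its trees from data and a real FM learns its parameters by training.

```python
import math

# Two hypothetical "trees" (assumptions for illustration only).
# Each maps a sample to the index of the leaf it falls into.
def tree_age(sample):
    return 0 if sample["age"] < 30 else 1                 # 2 leaves

def tree_hours(sample):
    h = sample["hours_per_week"]
    return 0 if h < 20 else (1 if h < 45 else 2)          # 3 leaves

TREES = [(tree_age, 2), (tree_hours, 3)]

def gbdt_encode(sample):
    """Concatenate one one-hot block per tree, marking the leaf the sample reaches."""
    vec = []
    for tree, n_leaves in TREES:
        block = [0.0] * n_leaves
        block[tree(sample)] = 1.0
        vec.extend(block)
    return vec

def fm_score(x, w0, w, V):
    """Second-order FM score: w0 + sum_i w_i*x_i + sum_{i<j} <V_i, V_j>*x_i*x_j."""
    linear = sum(wi * xi for wi, xi in zip(w, x))
    pairwise = 0.0
    for f in range(len(V[0])):
        s = sum(V[i][f] * x[i] for i in range(len(x)))
        s2 = sum((V[i][f] * x[i]) ** 2 for i in range(len(x)))
        pairwise += 0.5 * (s * s - s2)                    # standard O(k*n) identity
    return w0 + linear + pairwise

def sigmoid(z):
    return 1.0 / (1.0 + math.exp(-z))

# Made-up FM parameters for a 5-dimensional encoded vector with 2 latent factors.
W0 = -0.5
W = [0.1, 0.4, -0.3, 0.2, 0.6]
V = [[0.1, 0.2], [0.3, -0.1], [0.0, 0.5], [0.2, 0.2], [-0.4, 0.1]]

sample = {"age": 51, "hours_per_week": 40}
encoded = gbdt_encode(sample)          # discrete feature vector produced by the "GBDT"
score = fm_score(encoded, W0, W, V)    # FM consumes the encoded vector
prob = sigmoid(score)
print(encoded, round(score, 4), round(prob, 4))
```

Because only one leaf per tree is active, the pairwise FM term captures interactions between leaves of different trees, which is what a plain LR on the same encoded vector cannot model.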
Runtime Requirements
1. PyAlink is preinstalled in the official PAI-DSW images; at least 4 GB of memory is required.
2. This notebook can be run as-is; no other files need to be prepared.
from pyalink.alink import *
useLocalEnv(2)
Use one of the following commands to start using PyAlink:
 - useLocalEnv(parallelism, flinkHome=None, config=None): run PyAlink scripts locally.
 - useRemoteEnv(host, port, parallelism, flinkHome=None, localIp="localhost", config=None): run PyAlink scripts on a Flink cluster.
 - getMLEnv(): run PyAlink scripts as PyFlink scripts, support 'flink run -py xxx.py'.
Call resetEnv() to reset environment and switch to another.

JVM listening on 127.0.0.1:58007
MLEnv(benv=<pyflink.dataset.execution_environment.ExecutionEnvironment object at 0x7fdd965a0ed0>, btenv=<pyflink.table.table_environment.BatchTableEnvironment object at 0x7fdd965a0ad0>, senv=<pyflink.datastream.stream_execution_environment.StreamExecutionEnvironment object at 0x7fdd965a0b50>, stenv=<pyflink.table.table_environment.StreamTableEnvironment object at 0x7fdd965cc050>)
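Scaling to Larger Data
In this example, useLocalEnv runs the Alink job locally (that is, inside the DSW container), using multiple threads to simulate distributed computation.
For larger data, use usePAIEnv to submit the job to a large cluster; see help(usePAIEnv) for details.
Data Preparation
Source of the Adult dataset: https://archive.ics.uci.edu/ml/datasets/Adult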
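The Adult dataset (also known as the "Census Income" dataset) was extracted from the U.S. Census database. It contains 48,842 records: 23.93% have an annual income above $50K and 76.07% have an annual income of at most $50K. The data is already split into 32,561 training records and 16,281 test records. The class variable is whether annual income exceeds $50K. The attributes cover 14 fields such as age, work class, education, and occupation; 8 of them are categorical and the other 6 are continuous numeric. It is a classification dataset used to predict whether annual income exceeds $50K.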
PATH = "https://alink-test-data.oss-cn-hangzhou.aliyuncs.com/"
TRAIN_FILE = "adult_train.csv"
TEST_FILE = "adult_test.csv"
SCHEMA_STRING = (
    "age bigint, workclass string, fnlwgt bigint, education string, education_num bigint,"
    + " marital_status string, occupation string, relationship string, race string, sex string, "
    + "capital_gain bigint, capital_loss bigint, hours_per_week bigint, native_country string, label string"
)

# Training set
trainData = CsvSourceBatchOp() \
    .setFilePath(PATH + TRAIN_FILE) \
    .setFieldDelimiter(",") \
    .setSchemaStr(SCHEMA_STRING)

# Test set
testData = CsvSourceBatchOp() \
    .setFilePath(PATH + TEST_FILE) \
    .setFieldDelimiter(",") \
    .setSchemaStr(SCHEMA_STRING)

# Print the first 5 training rows once the job runs
trainData.lazyPrint(5)
BatchOperator.execute()
| age | workclass | fnlwgt | education | education_num | marital_status | occupation | relationship | race | sex | capital_gain | capital_loss | hours_per_week | native_country | label |
0 | 51 | Private | 166934 | HS-grad | 9 | Married-civ-spouse | Machine-op-inspct | Husband | White | Male | 0 | 0 | 40 | United-States | >50K |
1 | 80 | Self-emp-not-inc | 26865 | 7th-8th | 4 | Never-married | Farming-fishing | Unmarried | White | Male | 0 | 0 | 20 | United-States | <=50K |
2 | 24 | Private | 227594 | Some-college | 10 | Never-married | Sales | Own-child | White | Female | 0 | 0 | 20 | United-States | <=50K |
3 | 50 | Private | 93690 | HS-grad | 9 | Married-civ-spouse | Transport-moving | Husband | White | Male | 0 | 0 | 40 | United-States | >50K |
4 | 35 | Local-gov | 226311 | Some-college | 10 | Divorced | Adm-clerical | Own-child | White | Female | 0 | 0 | 38 | United-States | <=50K |
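As a quick sanity check, the schema string used for loading can be parsed in plain Python to confirm that it matches the dataset description (8 categorical and 6 numeric attributes, plus the label). The helper `parse_schema` below is a hypothetical name introduced for this sketch; it is not part of PyAlink.

```python
SCHEMA_STRING = (
    "age bigint, workclass string, fnlwgt bigint, education string, education_num bigint,"
    + " marital_status string, occupation string, relationship string, race string, sex string, "
    + "capital_gain bigint, capital_loss bigint, hours_per_week bigint, native_country string, label string"
)

def parse_schema(schema):
    """Split a 'name type, name type, ...' string into (name, type) pairs."""
    columns = []
    for item in schema.split(","):
        name, col_type = item.split()   # split() ignores the uneven whitespace
        columns.append((name, col_type))
    return columns

columns = parse_schema(SCHEMA_STRING)
features = [(n, t) for n, t in columns if n != "label"]
categorical = [n for n, t in features if t == "string"]
numerical = [n for n, t in features if t == "bigint"]

print(len(columns), len(categorical), len(numerical))
print("categorical:", categorical)
print("numerical:", numerical)
```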
Model Training
Algorithm documentation:
- https://www.yuque.com/pinshu/alink_doc/intro
- https://www.yuque.com/pinshu/alink_doc/gbdtencoder
- https://www.yuque.com/pinshu/alink_doc/fmclassifier
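We train the model end to end by placing the two operators, GbdtEncoder and FM, in a single Pipeline. GbdtEncoder encodes the input data and passes the encoded result to FM for training. The result is a pipeline model that can be used for inference on data or deployed as a service.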
featureCols = ["age", "fnlwgt", "education_num", "capital_gain", "capital_loss", "hours_per_week",
               "workclass", "education", "marital_status", "occupation", "relationship", "race", "sex",
               "native_country"]
numericalCols = ["age", "fnlwgt", "education_num", "capital_gain", "capital_loss", "hours_per_week"]
label = "label"
vecCol = "vec"

# Stage 1: GbdtEncoder writes the encoded vector to vecCol.
# Stage 2: FmClassifier trains on that vector.
gbdtFmPipe = Pipeline() \
    .add(
        GbdtEncoder() \
            .setLabelCol(label) \
            .setFeatureCols(featureCols) \
            .setReservedCols([label]) \
            .setPredictionCol(vecCol)) \
    .add(
        FmClassifier() \
            .setVectorCol(vecCol) \
            .setLabelCol(label) \
            .setReservedCols([label]) \
            .setPredictionDetailCol("detail") \
            .setPredictionCol("pred"))

model = gbdtFmPipe.fit(trainData)
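The way a pipeline chains its stages during training can be illustrated with a toy stand-in written in plain Python. This is a conceptual sketch only: `ToyPipeline`, `BucketEncoder`, and `BucketClassifier` are hypothetical classes invented here, they are far simpler than GbdtEncoder and FmClassifier, and unlike Alink (whose `fit` returns a separate pipeline model) the toy `fit` simply returns itself.

```python
from collections import Counter

class BucketEncoder:
    """Stand-in for the encoder stage: learns one threshold, emits a bucket index."""
    def fit(self, rows):
        values = sorted(r["x"] for r in rows)
        self.threshold = values[len(values) // 2]
        return self

    def transform(self, rows):
        return [dict(r, vec=int(r["x"] >= self.threshold)) for r in rows]

class BucketClassifier:
    """Stand-in for the classifier stage: learns the majority label per bucket."""
    def fit(self, rows):
        counts = {}
        for r in rows:
            counts.setdefault(r["vec"], Counter())[r["label"]] += 1
        self.majority = {b: c.most_common(1)[0][0] for b, c in counts.items()}
        return self

    def transform(self, rows):
        return [dict(r, pred=self.majority[r["vec"]]) for r in rows]

class ToyPipeline:
    def __init__(self):
        self.stages = []

    def add(self, stage):
        self.stages.append(stage)
        return self

    def fit(self, rows):
        # Each stage is trained on the output of the previous stage.
        for stage in self.stages:
            rows = stage.fit(rows).transform(rows)
        return self

    def transform(self, rows):
        for stage in self.stages:
            rows = stage.transform(rows)
        return rows

train = [
    {"x": 1, "label": "<=50K"}, {"x": 2, "label": "<=50K"},
    {"x": 8, "label": ">50K"}, {"x": 9, "label": ">50K"},
]
toy_model = ToyPipeline().add(BucketEncoder()).add(BucketClassifier()).fit(train)
predictions = [r["pred"] for r in toy_model.transform([{"x": 10}, {"x": 0}])]
print(predictions)
```

The point of the design is that the caller sees a single model: raw rows go in, predictions come out, and the intermediate encoded column never has to be managed by hand.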
Model Evaluation
Algorithm documentation:
- https://www.yuque.com/pinshu/alink_doc/evalbinaryclassbatchop
- https://www.yuque.com/pinshu/alink_doc/jsonvaluebatchop
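In the evaluation stage, we first run inference on testData with the model trained above, then evaluate the predictions with the EvalBinaryClassBatchOp component, and finally extract the evaluation results with the JsonValueBatchOp component.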
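result = model.transform(testData)
result.lazyPrint(5)

# Evaluate the predictions, then extract three metrics from the JSON in the "Data" column.
# All three JSON paths use the explicit "$." root prefix for consistency.
EvalBinaryClassBatchOp() \
    .setPredictionDetailCol("detail") \
    .setLabelCol(label) \
    .linkFrom(result) \
    .link(JsonValueBatchOp() \
        .setSelectedCol("Data") \
        .setReservedCols([]) \
        .setOutputCols(["Accuracy", "AUC", "ConfusionMatrix"]) \
        .setJsonPath(["$.Accuracy", "$.AUC", "$.ConfusionMatrix"])) \
    .print()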
| label | pred | detail |
0 | <=50K | <=50K | {"<=50K":"0.9995387333228002",">50K":"4.612666771997954E-4"} |
1 | >50K | >50K | {"<=50K":"1.812868328410211E-5",">50K":"0.9999818713167159"} |
2 | <=50K | <=50K | {"<=50K":"0.999317570963927",">50K":"6.824290360729248E-4"} |
3 | <=50K | <=50K | {"<=50K":"0.9691711111422066",">50K":"0.030828888857793415"} |
4 | <=50K | <=50K | {"<=50K":"0.9650003558051857",">50K":"0.03499964419481427"} |
| Accuracy | AUC | ConfusionMatrix |
0 | 0.851852 | 0.904172 | [[2251,817],[1595,11618]] |
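The reported accuracy can be re-derived from the confusion matrix with a few lines of plain Python. Accuracy only needs the diagonal, so it does not depend on the matrix layout. The precision and recall lines, however, assume that rows are predicted labels and columns are actual labels, with ">50K" as the positive class; that layout is an assumption of this sketch and should be checked against the Alink documentation.

```python
# Confusion matrix copied from the evaluation output above.
confusion = [[2251, 817], [1595, 11618]]

# Assumed layout (not confirmed by the source): rows = predicted, columns = actual,
# positive class first. Under that assumption:
(tp, fp), (fn, tn) = confusion

total = tp + fp + fn + tn
accuracy = (tp + tn) / total          # layout-independent: diagonal / total
precision = tp / (tp + fp)            # depends on the assumed layout
recall = tp / (tp + fn)               # depends on the assumed layout

print("records:", total)
print("accuracy:", round(accuracy, 6))
print("precision:", round(precision, 4), "recall:", round(recall, 4))
```

The record count matches the 16,281 test rows, and the accuracy matches the value printed by the evaluation component.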
Comparison with GBDT+LR
Algorithm documentation:
- https://www.yuque.com/pinshu/alink_doc/evalbinaryclassbatchop
- https://www.yuque.com/pinshu/alink_doc/jsonvaluebatchop
- https://www.yuque.com/pinshu/alink_doc/logisticregression
- https://www.yuque.com/pinshu/alink_doc/gbdtencoder
The comparison shows that GBDT+FM performs somewhat better than GBDT+LR: on the same data, its AUC is about 0.7 percentage points higher.
# Same encoder as before, but with LogisticRegression as the second stage.
gbdtLrPipe = Pipeline() \
    .add(
        GbdtEncoder() \
            .setLabelCol(label) \
            .setFeatureCols(featureCols) \
            .setReservedCols([label]) \
            .setPredictionCol(vecCol)) \
    .add(
        LogisticRegression() \
            .setVectorCol(vecCol) \
            .setLabelCol(label) \
            .setReservedCols([label]) \
            .setPredictionDetailCol("detail") \
            .setPredictionCol("pred"))

lrModel = gbdtLrPipe.fit(trainData)
resultLr = lrModel.transform(testData)
resultFm = model.transform(testData)

# Evaluate GBDT+LR
EvalBinaryClassBatchOp() \
    .setPredictionDetailCol("detail") \
    .setLabelCol(label) \
    .linkFrom(resultLr) \
    .link(JsonValueBatchOp() \
        .setSelectedCol("Data") \
        .setReservedCols([]) \
        .setOutputCols(["Accuracy", "AUC", "ConfusionMatrix"]) \
        .setJsonPath(["$.Accuracy", "$.AUC", "$.ConfusionMatrix"])) \
    .print()

# Evaluate GBDT+FM
EvalBinaryClassBatchOp() \
    .setPredictionDetailCol("detail") \
    .setLabelCol(label) \
    .linkFrom(resultFm) \
    .link(JsonValueBatchOp() \
        .setSelectedCol("Data") \
        .setReservedCols([]) \
        .setOutputCols(["Accuracy", "AUC", "ConfusionMatrix"]) \
        .setJsonPath(["$.Accuracy", "$.AUC", "$.ConfusionMatrix"])) \
    .print()
| Accuracy | AUC | ConfusionMatrix |
0 | 0.847 | 0.89727 | [[2432,1077],[1414,11358]] |
| Accuracy | AUC | ConfusionMatrix |
0 | 0.851852 | 0.904172 | [[2251,817],[1595,11618]] |
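The size of the gap between the two models can be computed directly from the two result rows above (GBDT+LR first, GBDT+FM second). This is plain arithmetic on the printed metrics; the dictionary keys are just labels chosen for this sketch.

```python
# Metrics copied from the two evaluation outputs above.
results = {
    "GBDT+LR": {"Accuracy": 0.847, "AUC": 0.89727},
    "GBDT+FM": {"Accuracy": 0.851852, "AUC": 0.904172},
}

# Differences expressed in percentage points (1 point = 0.01).
auc_gain_pp = (results["GBDT+FM"]["AUC"] - results["GBDT+LR"]["AUC"]) * 100
acc_gain_pp = (results["GBDT+FM"]["Accuracy"] - results["GBDT+LR"]["Accuracy"]) * 100

print("AUC gain (percentage points):", round(auc_gain_pp, 2))
print("Accuracy gain (percentage points):", round(acc_gain_pp, 2))
```

These numbers come from a single train/test run, so they indicate a modest advantage on this dataset rather than a general guarantee.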
Saving the Model
In this stage, AkSinkBatchOp writes the model to a file system. This can be the local file system (as in the code shown) or a network file system such as OSS. To target a network file system, first build its path:
fs = OssFileSystem("3.4.1", "oss-cn-hangzhou-zmf.aliyuncs.com", "name", "************", "**********")
filePath = FilePath("/model/gbdt_fm_model.ak", fs)
Then pass that path as a parameter to the AkSinkBatchOp component:
AkSinkBatchOp().setFilePath(filePath).setOverwriteSink(True)
This writes the model to the network file system.
modelData = model.save()
filePath = "/tmp/gbdt_fm_model.ak"

# The model file can also be written to OSS, which allows deploying it to EAS directly;
# that requires an OSS AccessKey ID and secret. Here it is written to /tmp instead.
# fs = OssFileSystem("3.4.1", "oss-cn-hangzhou-zmf.aliyuncs.com", "name", "************", "**********")
# filePath = FilePath("/model/gbdt_fm_model.ak", fs)

modelData.link(AkSinkBatchOp().setFilePath(filePath).setOverwriteSink(True))
BatchOperator.execute()
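Loading the Model and Running Inference
The path used to load the model follows the same rules as when saving it: it can be on the local file system (as in the code shown) or on a network file system such as OSS.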
model = PipelineModel.load(filePath)
result = model.transform(testData).lazyPrint(5)
BatchOperator.execute()
| label | pred | detail |
0 | <=50K | <=50K | {"<=50K":"0.9995387333228002",">50K":"4.612666771997954E-4"} |
1 | >50K | >50K | {"<=50K":"1.812868328410211E-5",">50K":"0.9999818713167159"} |
2 | <=50K | <=50K | {"<=50K":"0.999317570963927",">50K":"6.824290360729248E-4"} |
3 | <=50K | <=50K | {"<=50K":"0.9691711111422066",">50K":"0.030828888857793415"} |
4 | <=50K | <=50K | {"<=50K":"0.9650003558051857",">50K":"0.03499964419481427"} |
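The detail column holds a JSON object whose values are probabilities encoded as strings. A minimal sketch of how a downstream consumer might parse one such value with the standard library is shown below; `parse_detail` and the 0.3 threshold are hypothetical choices made for this example, not part of Alink.

```python
import json

# One detail value copied from the first output row above.
detail = '{"<=50K":"0.9995387333228002",">50K":"4.612666771997954E-4"}'

def parse_detail(detail_json):
    """Convert the detail JSON into a {label: probability} dict of floats."""
    return {label: float(p) for label, p in json.loads(detail_json).items()}

probs = parse_detail(detail)

# The label with the highest probability agrees with the pred column.
pred = max(probs, key=probs.get)

# A custom decision threshold (hypothetical value) can be applied instead of argmax.
THRESHOLD = 0.3
flag_high_income = probs[">50K"] >= THRESHOLD

print(pred, probs[">50K"], flag_high_income)
```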
Model Deployment
The model can be deployed from the command line:
!./eascmd64 -i {EAS AccessKeyId} -k {EAS AccessKeySecret} -e pai-eas.cn-beijing.aliyuncs.com create config.json
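It can also be deployed in one click through the Alibaba Cloud PAI console by filling in a few parameters. For details, see the documentation: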