一、使用sklearn数据挖掘
1.数据挖掘的步骤数据挖掘通常包括数据采集,数据分析,特征工程,训练模型,模型评估等步骤。显然,这不是巧合,这正是sklearn的设计风格。我们能够更加优雅地使用sklearn进行特征工程和模型训练工作。此时,不妨从一个基本的数据挖掘场景入手:
我们使用sklearn进行虚线框内的工作(sklearn也可以进行文本特征提取)。通过分析sklearn源码,我们可以看到除训练,预测和评估以外,处理其他工作的类都实现了3个方法:fit、transform和fit_transform。从命名中可以看到,fit_transform方法是先调用fit然后调用transform,我们只需要关注fit方法和transform方法即可。transform方法主要用来对特征进行转换。从可利用信息的角度来说,转换分为无信息转换和有信息转换。无信息转换是指不利用任何其他信息进行转换,比如指数、对数函数转换等。有信息转换从是否利用目标值向量又可分为无监督转换和有监督转换。无监督转换指只利用特征的统计信息的转换,统计信息包括均值、标准差、边界等等,比如标准化、PCA法降维等。有监督转换指既利用了特征信息又利用了目标值信息的转换,比如通过模型选择特征、LDA法降维等。通过总结常用的转换类,我们得到下表: 不难看到,只有有信息的转换类的fit方法才实际有用,显然fit方法的主要工作是获取特征信息和目标值信息,在这点上,fit方法和模型训练时的fit方法就能够联系在一起了:都是通过分析特征和目标值,提取有价值的信息,对于转换类来说是某些统计量,对于模型来说可能是特征的权值系数等。
另外,只有有监督的转换类的fit和transform方法才需要特征和目标值两个参数。fit方法无用不代表其没实现,而是除合法性校验以外,其并没有对特征和目标值进行任何处理,Normalizer的fit方法实现如下:
def fit(self, X, y=None): """Do nothing and return the estimator unchanged This method is just there to implement the usual API and hence work in pipelines.""" X = check_array(X, accept_sparse='csr') return self
基于这些特征处理工作都有共同的方法,那么试想可不可以将他们组合在一起?在本文假设的场景中,我们可以看到这些工作的组合形式有两种:流水线式和并行式。基于流水线组合的工作需要依次进行,前一个工作的输出是后一个工作的输入;基于并行式的工作可以同时进行,其使用同样的输入,所有工作完成后将各自的输出合并之后输出。sklearn提供了包pipeline来完成流水线式和并行式的工作。2. 数据初貌
在此,我们仍然使用IRIS数据集来进行说明。为了适应提出的场景,对原数据集需要稍微加工:
ffrom numpy import hstack, vstack, array, median, nan from numpy.random import choice from sklearn.datasets import load_iris iris = load_iris() #特征矩阵加工 #使用vstack增加一行含缺失值的样本(nan, nan, nan, nan) #使用hstack增加一列表示花的颜色(0-白、1-黄、2-红),花的颜色是随机的,意味着颜色并不影响花的分类 iris.data = hstack((choice([0, 1, 2], size=iris.data.shape[0]+1).reshape(-1,1), vstack((iris.data, array([nan, nan, nan, nan]).reshape(1,-1))))) #目标值向量加工 #增加一个目标值,对应含缺失值的样本,值为众数 iris.target = hstack((iris.target, array([median(iris.target)])))
3.关键技术
并行处理,流水线处理,自动化调参,持久化是使用sklearn优雅地进行数据挖掘的核心。并行处理和流水线处理将多个特征处理工作,甚至包括模型训练工作组合成一个工作(从代码的角度来说,即将多个对象组合成了一个对象)。
在组合的前提下,自动化调参技术帮我们省去了人工调参的反锁。训练好的模型是贮存在内存中的数据,持久化能够将这些数据保存在文件系统中,之后使用时无需再进行训练,直接从文件系统中加载即可。
二、并行处理
并行处理使得多个特征处理工作能够并行地进行。根据对特征矩阵的读取方式不同,可分为整体并行处理和部分并行处理。整体并行处理,即并行处理的每个工作的输入都是特征矩阵的整体;部分并行处理,即可定义每个工作需要输入的特征矩阵的列。1.整体并行处理
pipeline包提供了FeatureUnion类来进行整体并行处理:
from numpy import log1p from sklearn.preprocessing import FunctionTransformer from sklearn.preprocessing import Binarizer from sklearn.pipeline import FeatureUnion #新建将整体特征矩阵进行对数函数转换的对象 step2_1 = ('ToLog', FunctionTransformer(log1p)) #新建将整体特征矩阵进行二值化类的对象 step2_2 = ('ToBinary', Binarizer()) #新建整体并行处理对象 #该对象也有fit和transform方法,fit和transform方法均是并行地调用需要并行处理的对象的fit和transform方法 #参数transformer_list为需要并行处理的对象列表,该列表为二元组列表,第一元为对象的名称,第二元为对象 step2 = ('FeatureUnion', FeatureUnion(transformer_list=[step2_1, step2_2])) 2.部分并行处理整体并行处理有其缺陷,在一些场景下,我们只需要对特征矩阵的某些列进行转换,而不是所有列。pipeline并没有提供相应的类(仅OneHotEncoder类实现了该功能),需要我们在FeatureUnion的基础上进行优化: from sklearn.pipeline import FeatureUnion, _fit_one_transformer, _fit_transform_one, _transform_one from sklearn.externals.joblib import Parallel, delayed from scipy import sparse import numpy as np #部分并行处理,继承FeatureUnion class FeatureUnionExt(FeatureUnion): #相比FeatureUnion,多了idx_list参数,其表示每个并行工作需要读取的特征矩阵的列 def __init__(self, transformer_list, idx_list, n_jobs=1, transformer_weights=None): self.idx_list = idx_list FeatureUnion.__init__(self, transformer_list=map(lambda trans:(trans[0], trans[1]), transformer_list), n_jobs=n_jobs, transformer_weights=transformer_weights) #由于只部分读取特征矩阵,方法fit需要重构 def fit(self, X, y=None): transformer_idx_list = map(lambda trans, idx:(trans[0], trans[1], idx), self.transformer_list, self.idx_list) transformers = Parallel(n_jobs=self.n_jobs)( #从特征矩阵中提取部分输入fit方法 delayed(_fit_one_transformer)(trans, X[:,idx], y) for name, trans, idx in transformer_idx_list) self._update_transformer_list(transformers) return self #由于只部分读取特征矩阵,方法fit_transform需要重构 def fit_transform(self, X, y=None, **fit_params): transformer_idx_list = map(lambda trans, idx:(trans[0], trans[1], idx), self.transformer_list, self.idx_list) result = Parallel(n_jobs=self.n_jobs)( #从特征矩阵中提取部分输入fit_transform方法 delayed(_fit_transform_one)(trans, name, X[:,idx], y, self.transformer_weights, **fit_params) for name, trans, idx in transformer_idx_list) Xs, transformers = zip(*result) self._update_transformer_list(transformers) if any(sparse.issparse(f) for f in Xs): Xs = sparse.hstack(Xs).tocsr() else: Xs = np.hstack(Xs) return Xs #由于只部分读取特征矩阵,方法transform需要重构 def transform(self, X): transformer_idx_list = map(lambda trans, idx:(trans[0], trans[1], idx), self.transformer_list, self.idx_list) Xs = Parallel(n_jobs=self.n_jobs)( #从特征矩阵中提取部分输入transform方法 delayed(_transform_one)(trans, name, X[:,idx], self.transformer_weights) for name, trans, idx in transformer_idx_list) if any(sparse.issparse(f) for f in Xs): Xs = sparse.hstack(Xs).tocsr() else: Xs = np.hstack(Xs) return Xs
在本文提出的场景中,我们对特征矩阵的第1列(花的颜色)进行定性特征编码,对第2、3、4列进行对数函数转换,对第5列进行定量特征二值化处理。使用FeatureUnionExt类进行部分并行处理的代码如下:
from numpy import log1p from sklearn.preprocessing import OneHotEncoder from sklearn.preprocessing import FunctionTransformer from sklearn.preprocessing import Binarizer #新建将部分特征矩阵进行独热编码的对象 step2_1 = ('OneHotEncoder', OneHotEncoder(sparse=False)) #新建将部分特征矩阵进行对数函数转换的对象 step2_2 = ('ToLog', FunctionTransformer(log1p)) #新建将部分特征矩阵进行二值化类的对象 step2_3 = ('ToBinary', Binarizer()) #新建部分并行处理对象 #参数transformer_list为需要并行处理的对象列表,该列表为二元组列表,第一元为对象的名称,第二元为对象 #参数idx_list为相应的需要读取的特征矩阵的列 step2 = ('FeatureUnionExt', FeatureUnionExt(transformer_list=[step2_1, step2_2, step2_3], idx_list=[[0], [1, 2, 3], [4]]))
三、流水线
pipeline包提供了Pipeline类来进行流水线处理。流水线上除最后一个工作以外,其他都要执行fit_transform方法,且上一个工作输出作为下一个工作的输入。最后一个工作必须实现fit方法,输入为上一个工作的输出;但是不限定一定有transform方法,因为流水线的最后一个工作可能是训练!根据本文提出的场景,结合并行处理,构建完整的流水线的代码如下:
from numpy import log1p from sklearn.preprocessing import Imputer from sklearn.preprocessing import OneHotEncoder from sklearn.preprocessing import FunctionTransformer from sklearn.preprocessing import Binarizer from sklearn.preprocessing import MinMaxScaler from sklearn.feature_selection import SelectKBest from sklearn.feature_selection import chi2 from sklearn.decomposition import PCA from sklearn.linear_model import LogisticRegression from sklearn.pipeline import Pipeline #新建计算缺失值的对象 step1 = ('Imputer', Imputer()) #新建将部分特征矩阵进行定性特征编码的对象 step2_1 = ('OneHotEncoder', OneHotEncoder(sparse=False)) #新建将部分特征矩阵进行对数函数转换的对象 step2_2 = ('ToLog', FunctionTransformer(log1p)) #新建将部分特征矩阵进行二值化类的对象 step2_3 = ('ToBinary', Binarizer()) #新建部分并行处理对象,返回值为每个并行工作的输出的合并 step2 = ('FeatureUnionExt', FeatureUnionExt(transformer_list=[step2_1, step2_2, step2_3], idx_list=[[0], [1, 2, 3], [4]])) #新建无量纲化对象 step3 = ('MinMaxScaler', MinMaxScaler()) #新建卡方校验选择特征的对象 step4 = ('SelectKBest', SelectKBest(chi2, k=3)) #新建PCA降维的对象 step5 = ('PCA', PCA(n_components=2)) #新建逻辑回归的对象,其为待训练的模型作为流水线的最后一步 step6 = ('LogisticRegression', LogisticRegression(penalty='l2')) #新建流水线处理对象 #参数steps为需要流水线处理的对象列表,该列表为二元组列表,第一元为对象的名称,第二元为对象 pipeline = Pipeline(steps=[step1, step2, step3, step4, step5, step6])
四、自动化调参
网格搜索为自动化调参的常见技术之一,grid_search包提供了自动化调参的工具,包括GridSearchCV类。对组合好的对象进行训练以及调参的代码如下:
from sklearn.grid_search import GridSearchCV iris = load_iris() #新建网格搜索对象 #第一参数为待训练的模型 #param_grid为待调参数组成的网格,字典格式,键为参数名称(格式“对象名称__子对象名称__参数名称”),值为可取的参数值列表 grid_search = GridSearchCV(pipeline, param_grid={'FeatureUnionExt__ToBinary__threshold':[1.0, 2.0, 3.0, 4.0], 'LogisticRegression__C':[0.1, 0.2, 0.4, 0.8]}) #训练以及调参 grid_search.fit(iris.data, iris.target)
五、持久化
externals.joblib包提供了dump和load方法来持久化和加载内存数据: #持久化数据 #第一个参数为内存中的对象 #第二个参数为保存在文件系统中的名称 #第三个参数为压缩级别,0为不压缩,3为合适的压缩级别 dump(grid_search, 'grid_search.dmp', compress=3) #从文件系统中加载数据到内存中 grid_search = load('grid_search.dmp')
⭐回顾
注意:组合和持久化都会涉及pickle技术,在sklearn的技术文档中有说明,将lambda定义的函数作为FunctionTransformer的自定义转换函数将不能pickle化。