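PySpark ML (Estimators)
Introduction to Estimators
In Spark ML, an estimator is an algorithm that is fit on a DataFrame (by calling fit()) to produce a model, and that model is itself a transformer whose transform() adds prediction columns. Estimators cover classification, regression (prediction), clustering and more. This article introduces how to use a range of these models and then applies some of them to simple worked examples.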
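Classification
- LogisticRegression
Logistic regression (binary, and since Spark 2.1 also multiclass via multinomial logistic regression)
- DecisionTreeClassifier
Decision tree
- GBTClassifier
Gradient-boosted trees (binary classification only)
- RandomForestClassifier
Random forest
- NaiveBayes
Naive Bayes
- MultilayerPerceptronClassifier
Multilayer perceptron (feedforward neural network)
- OneVsRest
Reduces a multiclass problem to one binary problem per class, each solved by a base binary classifier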
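Regression
- AFTSurvivalRegression
Accelerated failure time (AFT) survival regression
- DecisionTreeRegressor
Decision tree regression
- GBTRegressor
Gradient-boosted tree regression
- GeneralizedLinearRegression
Generalized linear regression
- IsotonicRegression
Isotonic regression: fits a free-form, non-decreasing line to the data
- LinearRegression
Linear regression
- RandomForestRegressor
Random forest regression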
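"Free-form, non-decreasing line" can be made concrete with the pool adjacent violators algorithm (PAVA); Spark's documentation describes its IsotonicRegression as using a parallelized version of it. The sketch below is a plain sequential Python version, assuming the targets are already sorted by feature value and (by default) equally weighted; `isotonic_fit` is a hypothetical helper, not a Spark API.

```python
def isotonic_fit(y, w=None):
    """Least-squares non-decreasing fit to y (values already sorted by feature)."""
    if w is None:
        w = [1.0] * len(y)
    # Each block holds [weighted mean, total weight, number of points]
    blocks = []
    for yi, wi in zip(y, w):
        blocks.append([float(yi), float(wi), 1])
        # Pool adjacent blocks while the non-decreasing constraint is violated
        while len(blocks) > 1 and blocks[-2][0] > blocks[-1][0]:
            m2, w2, n2 = blocks.pop()
            m1, w1, n1 = blocks.pop()
            total = w1 + w2
            blocks.append([(m1 * w1 + m2 * w2) / total, total, n1 + n2])
    # Expand each block mean back to one fitted value per input point
    fitted = []
    for mean, _, n in blocks:
        fitted.extend([mean] * n)
    return fitted


print(isotonic_fit([1, 3, 2, 4, 3, 5]))
# [1.0, 2.5, 2.5, 3.5, 3.5, 5.0]
```

Each dip in the input (3 then 2, 4 then 3) is replaced by the average of the offending pair, so the result is a step function that never decreases.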
Clustering
- BisectingKMeans
Bisecting k-means
- KMeans
k-means
- GaussianMixture
Gaussian mixture model
- LDA
Latent Dirichlet Allocation, a topic model for text
Estimator example (classification)
from pyspark.sql import SparkSession
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import StringIndexer, VectorAssembler
spark = SparkSession.builder.master('local[1]').appName(
'learn_ml').getOrCreate()
# Load the data
df0 = spark.read.csv('mushrooms.csv',
header=True,
inferSchema=True,
encoding='utf-8')
# Check for missing values
df0.toPandas().isna().values.any()
# False: there are no missing values
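# First convert each string column to numeric indices with StringIndexer,
# then assemble the feature columns into a single vector.
# Note: the indices are used as plain numbers here; OneHotEncoder is the usual
# alternative when a nominal feature's categories have no natural order.
old_columns_names = df0.columns
new_columns_names = [name + '-new' for name in old_columns_names]
for i in range(len(old_columns_names)):
    indexer = StringIndexer(inputCol=old_columns_names[i],
                            outputCol=new_columns_names[i])
    df0 = indexer.fit(df0).transform(df0)
# The first column is the label; the remaining indexed columns are the features
vecAss = VectorAssembler(inputCols=new_columns_names[1:], outputCol='features')
df0 = vecAss.transform(df0)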
# Rename the indexed label column to 'label'
df0 = df0.withColumnRenamed(new_columns_names[0], 'label')
# Keep only the label and features columns
dfi = df0.select(['label', 'features'])
# Inspect the data
# dfi.show(5, truncate=0)
# Split into training and test sets (weights 4:1, seed 100)
train_data, test_data = dfi.randomSplit([4.0, 1.0], 100)
blor = LogisticRegression(regParam=0.01)
blorModel = blor.fit(train_data)
result = blorModel.transform(test_data)
# Accuracy: fraction of test rows whose prediction matches the label
result.filter(result.label == result.prediction).count() / result.count()
# 0.9661954517516902
Estimator example (prediction/regression)
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import GBTRegressor
from pyspark.ml.evaluation import RegressionEvaluator
spark = SparkSession.builder.appName('learn_regression').master(
'local[1]').getOrCreate()
# Load the data
df_train = spark.read.csv('boston/train.csv',
header=True,
inferSchema=True,
encoding='utf-8')
df_test = spark.read.csv('boston/test.csv',
header=True,
inferSchema=True,
encoding='utf-8')
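# Merge the two tables. test.csv has no medv column, so a constant placeholder
# (22.77) is added to make the schemas match. Those rows carry an artificial
# label, so wherever they land after the random split below they bias both
# training and the RMSE; splitting df_train alone avoids this.
from pyspark.sql.functions import lit
df_test = df_test.withColumn('medv', lit(22.77))
df0 = df_train.union(df_test).sort('ID')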
# df0.show(3)
def feature_converter(df):
    # Assemble every column except ID (first) and medv (last) into one vector
    vecAss = VectorAssembler(inputCols=df.columns[1:-1], outputCol='features')
    df_va = vecAss.transform(df)
    return df_va
# Split into training and test sets at 7:3
train_data, test_data = feature_converter(df0).select(
['features', 'medv']).randomSplit([7.0, 3.0], 101)
# Choose the algorithm and train it
gbt = GBTRegressor(maxIter=10, labelCol='medv', maxDepth=3)
gbt_model = gbt.fit(train_data)
# Predict on the test split
result = gbt_model.transform(test_data)
# Compute the root mean squared error (RMSE) on the test split
gbt_evaluator = RegressionEvaluator(labelCol='medv',
metricName="rmse",
predictionCol='prediction')
rmse = gbt_evaluator.evaluate(result)
print('Test RMSE: {}'.format(rmse))
# Test RMSE: 5.624145397622545
Estimator example (clustering)
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.clustering import KMeans
from plotly.offline import iplot, init_notebook_mode
import plotly.graph_objs as go
import numpy as np
# Enable offline plotly rendering inside a Jupyter notebook
init_notebook_mode(connected=True)
spark = SparkSession.builder.master('local[1]').appName(
'learn_cluster').getOrCreate()
# Load the data
df = spark.read.csv('Mall_Customers.csv', header=True, inferSchema=True)
# Rename columns to shorter names
df = df.withColumnRenamed('Annual Income (k$)',
'Income').withColumnRenamed('Spending Score (1-100)',
'Spend')
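# Inspect the data
# df.show(3)
# Check for missing values (count per column)
df.toPandas().isna().sum()
# Select the feature columns (Income and Spend) and assemble them into a vector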
vecAss = VectorAssembler(inputCols=df.columns[3:], outputCol='features')
df_km = vecAss.transform(df).select('CustomerID', 'features')
# Build and fit the model with k=5
kmeans = KMeans(k=5, seed=1)
km_model = kmeans.fit(df_km)
centers = km_model.clusterCenters()
# Cluster centers: one (Income, Spend) NumPy array per cluster
centers
# [array([55.2962963, 49.51851852]),
#  array([25.72727273, 79.36363636]),
#  array([86.53846154, 82.12820513]),
#  array([88.2, 17.11428571]),
#  array([26.30434783, 20.91304348])]
# Get the cluster assignment for each customer
transformed = km_model.transform(df_km).select('CustomerID', 'prediction')
# Join the assignments back to the original table
df_pred = df.join(transformed, 'CustomerID')
# Convert to a pandas DataFrame, then visualize
pd_df = df_pred.toPandas()
trace = go.Scatter(x=pd_df.Income,
y=pd_df.Spend,
mode='markers',
marker={
'size': 10,
'color': pd_df.prediction,
'colorscale': 'Viridis'
})
iplot([trace])