PySpark ML (评估器)

简介: PySpark 估计器使用方法介绍

PySpark ML (评估器)

评估器简介

ML中的评估器主要是对于机器学习算法的使用,包括预测、分类、聚类等,本文中会介绍多种模型的使用方式以及使用一些模型来实现简单的案例。

分类

  • LogisticRegression

    逻辑回归(仅支持二分类问题)

  • DecisionTreeClassifier

    决策树

  • GBTClassifier

    提督提升决策树

  • RandomForestClassifier

    随机森林

  • NaiveBayes

    朴素贝叶斯

  • MultilayerPerceptronClassifier

    多层感知器

  • OneVsRest

    将多分类问题简化为二分类问题

回归

  • AFTSurvivalRegression

    加速失效时间回归模型

  • DecisionTreeRegressor

    决策树回归

  • GBTRegressor

    梯度提升决策树回归

  • GeneralizedLinearRegression

    广义线性回归

  • IsotonicRegression

    拟合一个形式自由、非递减的行到数据中。

  • LinearRegression

    线性回归

  • RandomForestRegressor

    随机森林回归(预测)

聚类

  • BisectingKMeans

    二分K均值算法

  • KMeans

    K均值算法

  • GaussianMixture

    高斯混合模型

  • LDA

    LDA模型

评估器应用(分类)

from pyspark.sql import SparkSession
from pyspark import SparkConf, SparkContext
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import StringIndexer, VectorAssembler

spark = SparkSession.builder.master('local[1]').appName(
    'learn_ml').getOrCreate()

# 载入数据
df0 = spark.read.csv('mushrooms.csv',
                     header=True,
                     inferSchema=True,
                     encoding='utf-8')
# 查看是否有缺失值
df0.toPandas().isna().values.any()
# False 没有缺失值

# 先使用StringIndexer将字符转化为数值,然后将特征整合到一起
old_columns_names = df0.columns
new_columns_names = [name + '-new' for name in old_columns_names]
for i in range(len(old_columns_names)):
    indexer = StringIndexer(inputCol=old_columns_names[i],
                            outputCol=new_columns_names[i])
    df0 = indexer.fit(df0).transform(df0)
vecAss = VectorAssembler(inputCols=new_columns_names[1:], outputCol='features')
df0 = vecAss.transform(df0)
# 更换label列名
df0 = df0.withColumnRenamed(new_columns_names[0], 'label')

# 创建新的只有label和features的表
dfi = df0.select(['label', 'features'])

# 查看数据
# dfi.show(5, truncate=0)

# 将数据集分为训练集和测试集
train_data, test_data = dfi.randomSplit([4.0, 1.0], 100)

blor = LogisticRegression(regParam=0.01)
blorModel = blor.fit(train_data)
result = blorModel.transform(test_data)

# 计算准确率
result.filter(result.label == result.prediction).count() / result.count()
# 0.9661954517516902

评估器应用(预测/回归)

from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import GBTRegressor
from pyspark.ml.evaluation import RegressionEvaluator

spark = SparkSession.builder.appName('learn_regression').master(
    'local[1]').getOrCreate()
# 数据导入
df_train = spark.read.csv('boston/train.csv',
                          header=True,
                          inferSchema=True,
                          encoding='utf-8')
df_test = spark.read.csv('boston/test.csv',
                         header=True,
                         inferSchema=True,
                         encoding='utf-8')
# 表合并
from pyspark.sql.functions import lit
df_test = df_test.withColumn('medv', lit(22.77))
df0 = df_train.union(df_test).sort('ID')
# df0.show(3)



def feature_converter(df):
    vecAss = VectorAssembler(inputCols=df0.columns[1:-1], outputCol='features')
    df_va = vecAss.transform(df)
    return df_va


# 按照7:3的方式划分训练集和测试集
train_data, test_data = feature_converter(df0).select(
    ['features', 'medv']).randomSplit([7.0, 3.0], 101)

# 选择算法并训练数据
gbt = GBTRegressor(maxIter=10, labelCol='medv', maxDepth=3)
gbt_model = gbt.fit(train_data)
# 对数据进行预测
result = gbt_model.transform(test_data)
# 计算测试数据的均方根误差
gbt_evaluator = RegressionEvaluator(labelCol='medv',
                                    metricName="rmse",
                                    predictionCol='prediction')
rmse = gbt_evaluator.evaluate(result)
print('测试数据的均方根误差(rmse):{}'.format(rmse))
# 测试数据的均方根误差(rmse):5.624145397622545

评估器应用(聚类)

from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.clustering import KMeans
from plotly.offline import iplot, init_notebook_mode
import plotly.graph_objs as go
import matplotlib.pyplot as plt
import numpy as np
%matplotlib inline

spark = SparkSession.builder.master('local[1]').appName(
    'learn_cluster').getOrCreate()
# 导入数据
df = spark.read.csv('Mall_Customers.csv', header=True, inferSchema=True)
# 更换列名
df = df.withColumnRenamed('Annual Income (k$)',
                          'Income').withColumnRenamed('Spending Score (1-100)',
                                                      'Spend')
# 查看数据
# df.show(3)

# 查看是否有缺失值
df.toPandas().isna().sum()

#选取特征项,将特征项合并成向量
vecAss = VectorAssembler(inputCols=df.columns[3:], outputCol='features')
df_km = vecAss.transform(df).select('CustomerID', 'features')

# k=5 创建模型
kmeans = KMeans(k=5, seed=1)
km_model = kmeans.fit(df_km)
centers = km_model.clusterCenters()
# 集簇中心点
centers
[
    np.array([55.2962963, 49.51851852]),
    np.array([25.72727273, 79.36363636]),
    np.array([86.53846154, 82.12820513]),
    np.array([88.2, 17.11428571]),
    np.array([26.30434783, 20.91304348])
]

# 获取聚类预测结果
transformed = km_model.transform(df_km).select('CustomerID', 'prediction')

# 合并表格
df_pred = df.join(transformed, 'CustomerID')

# 转化pandas dataframe 然后可视化
pd_df = df_pred.toPandas()
trace = go.Scatter(x=pd_df.Income,
                   y=pd_df.Spend,
                   mode='markers',
                   marker={
                       'size': 10,
                       'color': pd_df.prediction,
                       'colorscale': 'Viridis'
                   })
iplot([trace])

在这里插入图片描述

相关文章
|
7月前
|
SQL 分布式计算 大数据
PySpark
【6月更文挑战第15天】PySpark
92 6
|
8月前
|
分布式计算 Java Shell
PySpark部署安装
PySpark部署安装
218 0
|
存储 分布式计算 资源调度
Spark笔记(pyspark)1
Spark笔记(pyspark)
122 0
|
分布式计算 资源调度 Java
Spark笔记(pyspark)2
Spark笔记(pyspark)
128 0
|
机器学习/深度学习 数据采集 存储
初探 Spark ML 第一部分
初探 Spark ML 第一部分
126 1
|
SQL 机器学习/深度学习 分布式计算
spark与pyspark教程(一)
spark与pyspark教程(一)
427 0
|
存储 分布式计算 Spark
PySpark|RDD编程基础
PySpark数据结构RDD编程基础
PySpark|RDD编程基础
|
机器学习/深度学习 人工智能 分布式计算
PySpark数据分析基础:PySpark原理详解
PySpark数据分析基础:PySpark原理详解
436 1
PySpark数据分析基础:PySpark原理详解
|
机器学习/深度学习 数据采集 分布式计算
PySpark ML(转换器)
PySpark转换器详解
|
分布式计算 Spark
Spark:pyspark的WordCount实现
Spark:pyspark的WordCount实现
182 0
Spark:pyspark的WordCount实现