PySpark ML (评估器)

简介: PySpark 估计器使用方法介绍

PySpark ML (评估器)

评估器简介

ML中的评估器主要是对于机器学习算法的使用,包括预测、分类、聚类等,本文中会介绍多种模型的使用方式以及使用一些模型来实现简单的案例。

分类

  • LogisticRegression

    逻辑回归(仅支持二分类问题)

  • DecisionTreeClassifier

    决策树

  • GBTClassifier

    提督提升决策树

  • RandomForestClassifier

    随机森林

  • NaiveBayes

    朴素贝叶斯

  • MultilayerPerceptronClassifier

    多层感知器

  • OneVsRest

    将多分类问题简化为二分类问题

回归

  • AFTSurvivalRegression

    加速失效时间回归模型

  • DecisionTreeRegressor

    决策树回归

  • GBTRegressor

    梯度提升决策树回归

  • GeneralizedLinearRegression

    广义线性回归

  • IsotonicRegression

    拟合一个形式自由、非递减的行到数据中。

  • LinearRegression

    线性回归

  • RandomForestRegressor

    随机森林回归(预测)

聚类

  • BisectingKMeans

    二分K均值算法

  • KMeans

    K均值算法

  • GaussianMixture

    高斯混合模型

  • LDA

    LDA模型

评估器应用(分类)

from pyspark.sql import SparkSession
from pyspark import SparkConf, SparkContext
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import StringIndexer, VectorAssembler

spark = SparkSession.builder.master('local[1]').appName(
    'learn_ml').getOrCreate()

# 载入数据
df0 = spark.read.csv('mushrooms.csv',
                     header=True,
                     inferSchema=True,
                     encoding='utf-8')
# 查看是否有缺失值
df0.toPandas().isna().values.any()
# False 没有缺失值

# 先使用StringIndexer将字符转化为数值,然后将特征整合到一起
old_columns_names = df0.columns
new_columns_names = [name + '-new' for name in old_columns_names]
for i in range(len(old_columns_names)):
    indexer = StringIndexer(inputCol=old_columns_names[i],
                            outputCol=new_columns_names[i])
    df0 = indexer.fit(df0).transform(df0)
vecAss = VectorAssembler(inputCols=new_columns_names[1:], outputCol='features')
df0 = vecAss.transform(df0)
# 更换label列名
df0 = df0.withColumnRenamed(new_columns_names[0], 'label')

# 创建新的只有label和features的表
dfi = df0.select(['label', 'features'])

# 查看数据
# dfi.show(5, truncate=0)

# 将数据集分为训练集和测试集
train_data, test_data = dfi.randomSplit([4.0, 1.0], 100)

blor = LogisticRegression(regParam=0.01)
blorModel = blor.fit(train_data)
result = blorModel.transform(test_data)

# 计算准确率
result.filter(result.label == result.prediction).count() / result.count()
# 0.9661954517516902

评估器应用(预测/回归)

from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import GBTRegressor
from pyspark.ml.evaluation import RegressionEvaluator

spark = SparkSession.builder.appName('learn_regression').master(
    'local[1]').getOrCreate()
# 数据导入
df_train = spark.read.csv('boston/train.csv',
                          header=True,
                          inferSchema=True,
                          encoding='utf-8')
df_test = spark.read.csv('boston/test.csv',
                         header=True,
                         inferSchema=True,
                         encoding='utf-8')
# 表合并
from pyspark.sql.functions import lit
df_test = df_test.withColumn('medv', lit(22.77))
df0 = df_train.union(df_test).sort('ID')
# df0.show(3)



def feature_converter(df):
    vecAss = VectorAssembler(inputCols=df0.columns[1:-1], outputCol='features')
    df_va = vecAss.transform(df)
    return df_va


# 按照7:3的方式划分训练集和测试集
train_data, test_data = feature_converter(df0).select(
    ['features', 'medv']).randomSplit([7.0, 3.0], 101)

# 选择算法并训练数据
gbt = GBTRegressor(maxIter=10, labelCol='medv', maxDepth=3)
gbt_model = gbt.fit(train_data)
# 对数据进行预测
result = gbt_model.transform(test_data)
# 计算测试数据的均方根误差
gbt_evaluator = RegressionEvaluator(labelCol='medv',
                                    metricName="rmse",
                                    predictionCol='prediction')
rmse = gbt_evaluator.evaluate(result)
print('测试数据的均方根误差(rmse):{}'.format(rmse))
# 测试数据的均方根误差(rmse):5.624145397622545

评估器应用(聚类)

from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.clustering import KMeans
from plotly.offline import iplot, init_notebook_mode
import plotly.graph_objs as go
import matplotlib.pyplot as plt
import numpy as np
%matplotlib inline

spark = SparkSession.builder.master('local[1]').appName(
    'learn_cluster').getOrCreate()
# 导入数据
df = spark.read.csv('Mall_Customers.csv', header=True, inferSchema=True)
# 更换列名
df = df.withColumnRenamed('Annual Income (k$)',
                          'Income').withColumnRenamed('Spending Score (1-100)',
                                                      'Spend')
# 查看数据
# df.show(3)

# 查看是否有缺失值
df.toPandas().isna().sum()

#选取特征项,将特征项合并成向量
vecAss = VectorAssembler(inputCols=df.columns[3:], outputCol='features')
df_km = vecAss.transform(df).select('CustomerID', 'features')

# k=5 创建模型
kmeans = KMeans(k=5, seed=1)
km_model = kmeans.fit(df_km)
centers = km_model.clusterCenters()
# 集簇中心点
centers
[
    np.array([55.2962963, 49.51851852]),
    np.array([25.72727273, 79.36363636]),
    np.array([86.53846154, 82.12820513]),
    np.array([88.2, 17.11428571]),
    np.array([26.30434783, 20.91304348])
]

# 获取聚类预测结果
transformed = km_model.transform(df_km).select('CustomerID', 'prediction')

# 合并表格
df_pred = df.join(transformed, 'CustomerID')

# 转化pandas dataframe 然后可视化
pd_df = df_pred.toPandas()
trace = go.Scatter(x=pd_df.Income,
                   y=pd_df.Spend,
                   mode='markers',
                   marker={
                       'size': 10,
                       'color': pd_df.prediction,
                       'colorscale': 'Viridis'
                   })
iplot([trace])

在这里插入图片描述

相关文章
|
3月前
|
分布式计算 API Apache
Dask与Apache Spark的对比
【8月更文挑战第10天】随着数据量激增,高效处理成为关键。本文对比了Python领域的两大工具——Dask与Apache Spark。Dask提供类似NumPy和Pandas的API,适用于中小规模数据;而Spark作为内存型处理引擎,擅长超大规模数据处理。我们通过代码实例展示了两者的使用方式,并分析了它们在性能、API及生态系统方面的异同。无论您追求易用性还是高性能,都能从中找到合适的选择。
95 4
|
5月前
|
SQL 分布式计算 大数据
PySpark
【6月更文挑战第15天】PySpark
64 6
|
6月前
|
分布式计算 Java Shell
PySpark部署安装
PySpark部署安装
170 0
|
分布式计算 资源调度 Java
Spark笔记(pyspark)2
Spark笔记(pyspark)
114 0
|
存储 分布式计算 资源调度
Spark笔记(pyspark)1
Spark笔记(pyspark)
107 0
|
机器学习/深度学习 数据采集 存储
初探 Spark ML 第一部分
初探 Spark ML 第一部分
121 1
|
SQL 机器学习/深度学习 分布式计算
spark与pyspark教程(一)
spark与pyspark教程(一)
401 0
|
机器学习/深度学习 人工智能 分布式计算
PySpark数据分析基础:PySpark原理详解
PySpark数据分析基础:PySpark原理详解
418 1
PySpark数据分析基础:PySpark原理详解
|
机器学习/深度学习 数据采集 分布式计算
PySpark ML(转换器)
PySpark转换器详解
|
分布式计算 Spark
Spark:pyspark的WordCount实现
Spark:pyspark的WordCount实现
172 0
Spark:pyspark的WordCount实现