SparkML机器学习实战:应用回归算法,预测二手房

简介: SparkML机器学习实战:应用回归算法,预测二手房

一、业务场景

受某房产中介委托,需开发一套机器学习系统,当用户将要售卖的二手房挂到二手房网站上时,该机器学习系统能自动根据该二手房的相关信息给出合理的销售价格预测,以指导客户报价。


二、数据集说明

本案例所使用的数据集为纯文本文件,说明如下:

 数据集路径:/data/dataset/ml/house/train.csv


三、操作步骤

阶段一、启动HDFS、Spark集群服务和zeppelin服务器

1、启动HDFS集群

 在Linux终端窗口下,输入以下命令,启动HDFS集群:

1.  $ start-dfs.sh

2、启动Spark集群

 在Linux终端窗口下,输入以下命令,启动Spark集群:


1.  $ cd /opt/spark
2.  $ ./sbin/start-all.sh

3、启动zeppelin服务器

 在Linux终端窗口下,输入以下命令,启动zeppelin服务器:

1.  $ zeppelin-daemon.sh start

4、验证以上进程是否已启动

 在Linux终端窗口下,输入以下命令,查看启动的服务进程:


1.  $ jps

如果显示以下6个进程,则说明各项服务启动正常,可以继续下一阶段。

2288 NameNode
2402 DataNode
2603 SecondaryNameNode
2769 Master
2891 Worker
2984 ZeppelinServer

阶段二、准备案例中用到的数据集

1、将本案例要用到的数据集上传到HDFS文件系统的/data/dataset/ml/目录下。在Linux终端窗口下,输入以下命令:

1.  $ hdfs dfs -mkdir -p /data/dataset/ml
2.  $ hdfs dfs -put /data/dataset/ml/house /data/dataset/ml/

2、在Linux终端窗口下,输入以下命令,查看HDFS上是否已经上传了该数据集:

1.  $ hdfs dfs -ls /data/dataset/ml/house

这时应该看到house目录及其中的训练数据集已经上传到了HDFS的/data/dataset/ml/目录下。


阶段三、对数据集进行探索和分析

1、新建一个zeppelin notebook文件,并命名为house_project。

 2、先导入案例中要用到的机器学习库。在notebook单元格中,输入以下代码:

1.  // 导入相关的包
2.  import org.apache.spark.sql.functions._
3.  import org.apache.spark.ml.Pipeline
4.  import org.apache.spark.ml.feature.{StringIndexer,VectorAssembler,RFormula}
5.  import org.apache.spark.ml.regression.LinearRegression
6.  import org.apache.spark.ml.evaluation.RegressionEvaluator
7.  import org.apache.spark.mllib.evaluation.RegressionMetrics

同时按下【Shift+Enter】键,执行以上代码,输出内容如下:

import org.apache.spark.sql.functions._
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.feature.{StringIndexer, VectorAssembler, RFormula}
import org.apache.spark.ml.regression.LinearRegression
import org.apache.spark.ml.evaluation.RegressionEvaluator
import org.apache.spark.mllib.evaluation.RegressionMetrics

3、加载训练数据集。在notebook单元格中,输入以下代码:

1.  // 加载数据文件
2.  val file = "hdfs://localhost:9000/data/dataset/ml/house/train.csv"
3.  val house_data = spark.read.option("header", "true").option("inferSchema","true").csv(file)

同时按下【Shift+Enter】键,执行以上代码,输出内容如下:


file: String = /data/dataset/ml/house/train.csv
house_data: org.apache.spark.sql.DataFrame = [Id: int, MSSubClass: int … 79 more fields]

由以上输出内容可以看出,该数据集共有81个字段。


4、简单数据探索,查看数据模式。在notebook单元格中,输入以下代码:


1.  // 简单的数据探索
2.  house_data.printSchema

同时按下【Shift+Enter】,执行以上代码,输出内容如下:

root
  |— Id: integer (nullable = true)
  |— MSSubClass: integer (nullable = true)
  |— MSZoning: string (nullable = true)
  |— LotFrontage: string (nullable = true)
  |— LotArea: integer (nullable = true)
  |— Street: string (nullable = true)
  |— Alley: string (nullable = true)
  |— LotShape: string (nullable = true)
  |— LandContour: string (nullable = true)
  |— Utilities: string (nullable = true)
  |— LotConfig: string (nullable = true)
  |— LandSlope: string (nullable = true)
  |— Neighborhood: string (nullable = true)
  |— Condition1: string (nullable = true)
  |— Condition2: string (nullable = true)
  |— BldgType: string (nullable = true)
  |— HouseStyle: string (nullable = true)
  |— OverallQual: integer (nullable = true)
  |— OverallCond: integer (nullable = true)
  |— YearBuilt: integer (nullable = true)
  |— YearRemodAdd: integer (nullable = true)
  |— RoofStyle: string (nullable = true)
  |— RoofMatl: string (nullable = true)
  |— Exterior1st: string (nullable = true)
  |— Exterior2nd: string (nullable = true)
  |— MasVnrType: string (nullable = true)
  |— MasVnrArea: string (nullable = true)
  |— ExterQual: string (nullable = true)
  |— ExterCond: string (nullable = true)
  |— Foundation: string (nullable = true)
  |— BsmtQual: string (nullable = true)
  |— BsmtCond: string (nullable = true)
  |— BsmtExposure: string (nullable = true)
  |— BsmtFinType1: string (nullable = true)
  |— BsmtFinSF1: integer (nullable = true)
  |— BsmtFinType2: string (nullable = true)
  |— BsmtFinSF2: integer (nullable = true)
  |— BsmtUnfSF: integer (nullable = true)
  |— TotalBsmtSF: integer (nullable = true)
  |— Heating: string (nullable = true)
  |— HeatingQC: string (nullable = true)
  |— CentralAir: string (nullable = true)
  |— Electrical: string (nullable = true)
  |— 1stFlrSF: integer (nullable = true)
  |— 2ndFlrSF: integer (nullable = true)
  |— LowQualFinSF: integer (nullable = true)
  |— GrLivArea: integer (nullable = true)
  |— BsmtFullBath: integer (nullable = true)
  |— BsmtHalfBath: integer (nullable = true)
  |— FullBath: integer (nullable = true)
  |— HalfBath: integer (nullable = true)
  |— BedroomAbvGr: integer (nullable = true)
  |— KitchenAbvGr: integer (nullable = true)
  |— KitchenQual: string (nullable = true)
  |— TotRmsAbvGrd: integer (nullable = true)
  |— Functional: string (nullable = true)
  |— Fireplaces: integer (nullable = true)
  |— FireplaceQu: string (nullable = true)
  |— GarageType: string (nullable = true)
  |— GarageYrBlt: string (nullable = true)
  |— GarageFinish: string (nullable = true)
  |— GarageCars: integer (nullable = true)
  |— GarageArea: integer (nullable = true)
  |— GarageQual: string (nullable = true)
  |— GarageCond: string (nullable = true)
  |— PavedDrive: string (nullable = true)
  |— WoodDeckSF: integer (nullable = true)
  |— OpenPorchSF: integer (nullable = true)
  |— EnclosedPorch: integer (nullable = true)
  |— 3SsnPorch: integer (nullable = true)
  |— ScreenPorch: integer (nullable = true)
  |— PoolArea: integer (nullable = true)
  |— PoolQC: string (nullable = true)
  |— Fence: string (nullable = true)
  |— MiscFeature: string (nullable = true)
  |— MiscVal: integer (nullable = true)
  |— MoSold: integer (nullable = true)
  |— YrSold: integer (nullable = true)
  |— SaleType: string (nullable = true)
  |— SaleCondition: string (nullable = true)
  |— SalePrice: integer (nullable = true)

5、选择数据列子集,只保留作为特征使用的11个列。在notebook单元格中,输入以下代码:

1.  // 选择作为特征使用的列(11个字段)
2.  val cols = Seq[String]("SalePrice", "LotArea", "RoofStyle", "Heating", "1stFlrSF", "2ndFlrSF", "BedroomAbvGr","KitchenAbvGr", "GarageCars", "TotRmsAbvGrd", "YearBuilt")
3.  val colNames = cols.map(n => col(n))       // Column
4.       
5.  // 只选择所需的列
6.  val skinny_house_data = house_data.select(colNames:_*)     // 可变参数

同时按下【Shift+Enter】,执行以上代码,输出内容如下:


1.  // 选择作为特征使用的列(11个字段)
2.  val cols = Seq[String]("SalePrice", "LotArea", "RoofStyle", "Heating", "1stFlrSF", "2ndFlrSF", "BedroomAbvGr","KitchenAbvGr", "GarageCars", "TotRmsAbvGrd", "YearBuilt")
3.  val colNames = cols.map(n => col(n))       // Column
4.       
5.  // 只选择所需的列
6.  val skinny_house_data = house_data.select(colNames:_*)     // 可变参数

由以上输出内容可以看出,经过转换以后,最终的DataFrame只保留了11列。


6、查看转换以后的数据。在notebook单元格中,输入以下代码:

cols: Seq[String] = List(SalePrice, LotArea, RoofStyle, Heating, 1stFlrSF, 2ndFlrSF, BedroomAbvGr, KitchenAbvGr, GarageCars, TotRmsAbvGrd, YearBuilt)
colNames: Seq[org.apache.spark.sql.Column] = List(SalePrice, LotArea, RoofStyle, Heating, 1stFlrSF, 2ndFlrSF, BedroomAbvGr, KitchenAbvGr, GarageCars, TotRmsAbvGrd, YearBuilt)
skinny_house_data: org.apache.spark.sql.DataFrame = [SalePrice: int, LotArea: int … 9 more fields]

同时按下【Shift+Enter】,执行以上代码,输出内容如下:


Array[org.apache.spark.sql.Row] = Array([208500,8450,Gable,GasA,856,854,3,1,2,8,2003])

7、查看Schema。在notebook单元格中,输入以下代码:

1.  skinny_house_data.printSchema

同时按下【Shift+Enter】,执行以上代码,输出内容如下:

root
  |— SalePrice: integer (nullable = true)
  |— LotArea: integer (nullable = true)
  |— RoofStyle: string (nullable = true)
  |— Heating: string (nullable = true)
  |— 1stFlrSF: integer (nullable = true)
  |— 2ndFlrSF: integer (nullable = true)
  |— BedroomAbvGr: integer (nullable = true)
  |— KitchenAbvGr: integer (nullable = true)
  |— GarageCars: integer (nullable = true)
  |— TotRmsAbvGrd: integer (nullable = true)
  |— YearBuilt: integer (nullable = true)

8、对数据集进行整理,通过相加”1stFlrSF”和”2ndFlrSF”列的值,创建一个名为”TotalSF”的新列(即将1楼面积和2楼面积相加),并将”SalePrice”列转换为double类型。在notebook单元格中,输入以下代码:

1.  // 通过相加"1stFlrSF"和"2ndFlrSF"列的值,创建一个名为"TotalSF"的新列(即将1楼面积和2楼面积相加)
2.  // 将"SalePrice"列转换为double
3.  val skinny_house_data1 = skinny_house_data.withColumn("TotalSF", $"1stFlrSF" + $"2ndFlrSF").
4.                                             drop("1stFlrSF","2ndFlrSF").
5.                                             withColumn("SalePrice", $"SalePrice".cast("double"))
6.  skinny_house_data1.show(3)

同时按下【Shift+Enter】,执行以上代码,输出内容如下:


834ae105134d42c58c662f8a737f0153.png

由以上输出内容可以看出,在将”1stFlrSF”和”2ndFlrSF”列合并之后,将这两个列删除,现在DataFrame仅有10列了。


9、检查名为”SalePrice”的标签列的统计信息。在notebook单元格中,输入以下代码:

1.  skinny_house_data1.describe("SalePrice").show

同时按下【Shift+Enter】,执行以上代码,输出内容如下:

7f8b62100beb409b87c4c62bad89e1b2.png


由以上输出内容可以看出所有二手房的均价、最高价和最低价。


10、将数据拆分为训练数据集和测试数据集。在notebook单元格中,输入以下代码:

1.  val Array(training, test) = skinny_house_data1.randomSplit(Array(0.8, 0.2))

同时按下【Shift+Enter】,执行以上代码,输出内容如下:

training: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [SalePrice: double, LotArea: int … 8 more fields]
test: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [SalePrice: double, LotArea: int … 8 more fields]

11、创建estimators和transformers以设置一个管道(pipeline)。

  在notebook单元格中,输入以下代码:

1.  // 设置无效的分类值处理策略为skip,以避免在评估(evaluation)时发生错误
2.  val roofStyleIndxr = new StringIndexer().setInputCol("RoofStyle").
3.                                           setOutputCol("RoofStyleIdx").
4.                                           setHandleInvalid("skip")
5.       
6.  // 同理
7.  val heatingIndxr = new StringIndexer().setInputCol("Heating").
8.                                         setOutputCol("HeatingIdx").
9.                                         setHandleInvalid("skip")
10.      
11. // 将多个特征组装为一个特性向量
12. val assembler = new VectorAssembler().setInputCols(Array("LotArea", "RoofStyleIdx","HeatingIdx","LotArea", "BedroomAbvGr","KitchenAbvGr", "GarageCars","TotRmsAbvGrd", "YearBuilt","TotalSF")).setOutputCol("features")
13.      
14. // 创建线性回归算法(一个Estimator),指定SalePrice为标签列
15. val linearRegression = new LinearRegression().setLabelCol("SalePrice")

同时按下【Shift+Enter】,执行以上代码,输出内容如下:

roofStyleIndxr: org.apache.spark.ml.feature.StringIndexer = strIdx_b6f3651520c7
heatingIndxr: org.apache.spark.ml.feature.StringIndexer = strIdx_05d504adda48
assembler: org.apache.spark.ml.feature.VectorAssembler = vecAssembler_51b012c1b303
linearRegression: org.apache.spark.ml.regression.LinearRegression = linReg_be6dd30866fd

12、查看线性加归算法的各参数含义和默认值。在notebook单元格中,输入以下代码:

1.  linearRegression.explainParams

同时按下【Shift+Enter】,执行以上代码,输出内容如下:

String =
aggregationDepth: suggested depth for treeAggregate (>= 2) (default: 2)
elasticNetParam: the ElasticNet mixing parameter, in range [0, 1]. For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty (default: 0.0)
epsilon: The shape parameter to control the amount of robustness. Must be > 1.0. (default: 1.35)
featuresCol: features column name (default: features)
fitIntercept: whether to fit an intercept term (default: true)
labelCol: label column name (default: label, current: SalePrice)
loss: The loss function to be optimized. Supported options: squaredError, huber. (Default squaredError) (default: squaredError)
maxIter: maximum number of iterations (>= 0) (default: 100)
predictionCol: prediction column name (default: prediction)
regParam: regulariza…

由以上输出内容可以看出,LinearRegression算法具有哪些参数,以及这些参数的默认值和当前值分别是多少。


13、设置管道并使用该管道训练模型。在notebook单元格中,输入以下代码:

1.  // 设置pipeline
2.  val pipeline = new Pipeline().setStages(Array(roofStyleIndxr, heatingIndxr, assembler, linearRegression))
3.  
4.  // 训练该pipeline
5.  val model = pipeline.fit(training)

同时按下【Shift+Enter】,执行以上代码,输出内容如下:

pipeline: org.apache.spark.ml.Pipeline = pipeline_df9c90368f8d
model: org.apache.spark.ml.PipelineModel = pipeline_df9c90368f8d

这样,我们就得到了一个经过回归算法训练的预测模型。


14、执行预测。在notebook单元格中,输入以下代码:

pipeline: org.apache.spark.ml.Pipeline = pipeline_df9c90368f8d
model: org.apache.spark.ml.PipelineModel = pipeline_df9c90368f8d

同时按下【Shift+Enter】,执行以上代码,输出内容如下:

predictions: org.apache.spark.sql.DataFrame = [SalePrice: double, LotArea: int … 12 more fields]

由以上输出内容可以看出,在拟合数据后,得到一个Word2VecModel的模型对象。


15、查看预测结果。在notebook单元格中,输入以下代码:

1.  predictions.select($"SalePrice",$"prediction").show(10,false)

同时按下【Shift+Enter】,执行以上代码,输出内容如下:

47f81aec971e452eb09259d35af6b4d6.png


16、执行模型性能的评估。在notebook单元格中,输入以下代码:

1.  // 执行模型性能的评估,默认的度量标准是ROC下面的面积(这里改用rmse)
2.  val evaluator = new RegressionEvaluator().setLabelCol("SalePrice").
3.                                            setPredictionCol("prediction").
4.                                            setMetricName("rmse")
5.  val rmse = evaluator.evaluate(predictions)

同时按下【Shift+Enter】,执行以上代码,输出内容如下:


evaluator: org.apache.spark.ml.evaluation.RegressionEvaluator = regEval_9371eb8014a0
rmse: Double = 42370.856215530395

由以上输出内容可以看出,该次预测的结果并不理想。可以尝试从数据拆分开始多运行几次,每次的结果可能都不相同。


相关文章
|
4天前
|
机器学习/深度学习 自然语言处理 算法
机器学习算法原理与应用:深入探索与实战
【5月更文挑战第2天】本文深入探讨机器学习算法原理,包括监督学习(如线性回归、SVM、神经网络)、非监督学习(聚类、PCA)和强化学习。通过案例展示了机器学习在图像识别(CNN)、自然语言处理(RNN/LSTM)和推荐系统(协同过滤)的应用。随着技术发展,机器学习正广泛影响各领域,但也带来隐私和算法偏见问题,需关注解决。
|
6天前
|
机器学习/深度学习 数据采集 TensorFlow
【Python机器学习专栏】使用Python进行图像分类的实战案例
【4月更文挑战第30天】本文介绍了使用Python和深度学习库TensorFlow、Keras进行图像分类的实战案例。通过CIFAR-10数据集,展示如何构建和训练一个卷积神经网络(CNN)模型,实现对10个类别图像的识别。首先安装必要库,然后加载数据集并显示图像。接着,建立基本CNN模型,编译并训练模型,最后评估其在测试集上的准确性。此案例为初学者提供了图像分类的入门教程,为进一步学习和优化打下基础。
|
6天前
|
机器学习/深度学习 运维 算法
【Python机器学习专栏】异常检测算法在Python中的实践
【4月更文挑战第30天】本文介绍了异常检测的重要性和在不同领域的应用,如欺诈检测和网络安全。文章概述了四种常见异常检测算法:基于统计、距离、密度和模型的方法。在Python实践中,使用scikit-learn库展示了如何实现这些算法,包括正态分布拟合、K-means聚类、局部异常因子(LOF)和孤立森林(Isolation Forest)。通过计算概率密度、距离、LOF值和数据点的平均路径长度来识别异常值。
|
6天前
|
机器学习/深度学习 数据可视化 算法
【Python机器学习专栏】t-SNE算法在数据可视化中的应用
【4月更文挑战第30天】t-SNE算法是用于高维数据可视化的非线性降维技术,通过最小化Kullback-Leibler散度在低维空间保持数据点间关系。其特点包括:高维到二维/三维映射、保留局部结构、无需预定义簇数量,但计算成本高。Python中可使用`scikit-learn`的`TSNE`类实现,结合`matplotlib`进行可视化。尽管计算昂贵,t-SNE在揭示复杂数据集结构上极具价值。
|
6天前
|
机器学习/深度学习 算法 数据挖掘
【Python机器学习专栏】关联规则学习:Apriori算法详解
【4月更文挑战第30天】Apriori算法是一种用于关联规则学习的经典算法,尤其适用于购物篮分析,以发现商品间的购买关联。该算法基于支持度和置信度指标,通过迭代生成频繁项集并提取满足阈值的规则。Python中可借助mlxtend库实现Apriori,例如处理购物篮数据,设置支持度和置信度阈值,找出相关规则。
|
6天前
|
机器学习/深度学习 算法 数据挖掘
【Python机器学习专栏】层次聚类算法的原理与应用
【4月更文挑战第30天】层次聚类是数据挖掘中的聚类技术,无需预设簇数量,能生成数据的层次结构。分为凝聚(自下而上)和分裂(自上而下)两类,常用凝聚层次聚类有最短/最长距离、群集平均和Ward方法。优点是自动确定簇数、提供层次结构,适合小到中型数据集;缺点是计算成本高、过程不可逆且对异常值敏感。在Python中可使用`scipy.cluster.hierarchy`进行实现。尽管有局限,层次聚类仍是各领域强大的分析工具。
|
2月前
|
机器学习/深度学习 存储 搜索推荐
利用机器学习算法改善电商推荐系统的效率
电商行业日益竞争激烈,提升用户体验成为关键。本文将探讨如何利用机器学习算法优化电商推荐系统,通过分析用户行为数据和商品信息,实现个性化推荐,从而提高推荐效率和准确性。
|
1月前
|
机器学习/深度学习 算法 搜索推荐
Machine Learning机器学习之决策树算法 Decision Tree(附Python代码)
Machine Learning机器学习之决策树算法 Decision Tree(附Python代码)
|
2月前
|
机器学习/深度学习 算法 数据可视化
实现机器学习算法时,特征选择是非常重要的一步,你有哪些推荐的方法?
实现机器学习算法时,特征选择是非常重要的一步,你有哪些推荐的方法?
30 1
|
2月前
|
机器学习/深度学习 数据采集 算法
解码癌症预测的密码:可解释性机器学习算法SHAP揭示XGBoost模型的预测机制
解码癌症预测的密码:可解释性机器学习算法SHAP揭示XGBoost模型的预测机制
142 0