Spark's machine learning library now ships two API surfaces: RDD-based and DataFrame-based. According to the official Spark site, the RDD-based APIs entered maintenance mode in 2.0, the primary machine learning API is now the DataFrame-based API in the spark.ml package, and the RDD-based APIs are expected to be removed entirely after 3.0. Having spent two weeks learning Spark MLlib, I decided to move over to the DataFrame-based interface. Since the documentation I had covered only the RDD-based interface, I went and read the Spark MLlib source. The DataFrame-based API lives in the org.apache.spark.ml package; its main class structure is sketched below.
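A rough sketch of that hierarchy, simplified from my reading of the 2.0 source (type parameters and mixins abbreviated):

// Simplified view of the org.apache.spark.ml class hierarchy:
//
//   Params                              // parameter handling, mixed in everywhere
//     └─ PipelineStage
//          ├─ Estimator[M]              // fit(dataset): M
//          │     └─ Predictor[...]      // supervised learners, e.g. LinearRegression
//          └─ Transformer               // transform(dataset): DataFrame
//                └─ Model[M]            // a fitted Transformer produced by an Estimator
//                      └─ PredictionModel[...]   // e.g. LinearRegressionModel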
Let's start with a linear regression example, examples/ml/LinearRegressionExample.scala, which first defines a LinearRegression object:
val lir = new LinearRegression()
.setFeaturesCol("features")
.setLabelCol("label")
.setRegParam(params.regParam)
.setElasticNetParam(params.elasticNetParam)
.setMaxIter(params.maxIter)
.setTol(params.tol)
Then the fit method is called on the training data, returning a trained model lirModel, which is an instance of LinearRegressionModel.
val lirModel = lir.fit(training)
At this point the MLlib workflow is clear, and it resembles many single-machine machine learning libraries: define a model and set its parameters, fit it to the training data, and get back a trained model.
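In spark.ml terms that pattern is fit on an Estimator followed by transform on the resulting Model; a minimal sketch (variable names are mine):

// The general spark.ml pattern: an Estimator's fit produces a Model,
// and the Model's transform appends a prediction column to the DataFrame.
val estimator = new LinearRegression()   // an Estimator
val model = estimator.fit(trainingDF)    // returns a fitted Model
val scored = model.transform(testDF)     // DataFrame with a "prediction" column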
Now let's look up LinearRegression and LinearRegressionModel in the source; their class dependencies are as follows:
LinearRegression is a Predictor and LinearRegressionModel is a Model: the Predictor is the learning algorithm, and the Model is what training produces. Besides these there is a family of classes inheriting from Params, which represents parameters; a Predictor and its Model share one set of parameters.
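Paraphrased from the source (simplified, not verbatim; the real declarations carry extra type parameters and mixins):

// Simplified from org.apache.spark.ml, for orientation only:
abstract class Predictor[F, L <: Predictor[F, L, M], M <: PredictionModel[F, M]]
  extends Estimator[M] with PredictorParams {
  def fit(dataset: Dataset[_]): M                // train, return the fitted model
}
abstract class PredictionModel[F, M <: PredictionModel[F, M]]
  extends Model[M] with PredictorParams {
  def transform(dataset: Dataset[_]): DataFrame  // append the prediction column
}
// PredictorParams extends Params; this shared trait is why a Predictor and
// its Model expose the same labelCol/featuresCol/predictionCol parameters.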
Now let's run a first machine learning example with Spark MLlib. The data is regression data I had stored in a txt file: over 5.5 million rows, 13 columns, the first column the label and the remaining twelve the features. I'll demonstrate both interfaces, starting with the old one:
1. Load the raw data:
scala> import org.apache.spark.mllib.linalg._
import org.apache.spark.mllib.linalg._
scala> import org.apache.spark.mllib.regression._
import org.apache.spark.mllib.regression._
scala> val raw_data = sc.textFile("data/my/y_x.txt")
raw_data: org.apache.spark.rdd.RDD[String] = data/my/y_x.txt MapPartitionsRDD[1] at textFile at <console>:30
2. Convert the format; the RDD-based interface takes LabeledPoint as its input type:
scala> val data = raw_data.map{ line =>
     | val arr = line.split(' ').map(_.toDouble)
     | val label = arr.head
     | val features = Vectors.dense(arr.tail)
     | LabeledPoint(label, features)
     | }
data: org.apache.spark.rdd.RDD[org.apache.spark.mllib.regression.LabeledPoint] = MapPartitionsRDD[2] at map at <console>:32
3. Split into train and test sets:
scala> val splits = data.randomSplit(Array(0.8, 0.2))
splits: Array[org.apache.spark.rdd.RDD[org.apache.spark.mllib.regression.LabeledPoint]] = Array(MapPartitionsRDD[3] at randomSplit at <console>:34, MapPartitionsRDD[4] at randomSplit at <console>:34)
scala> val train_set = splits(0)
train_set: org.apache.spark.rdd.RDD[org.apache.spark.mllib.regression.LabeledPoint] = MapPartitionsRDD[3] at randomSplit at <console>:34
scala> val test_set = splits(1)
test_set: org.apache.spark.rdd.RDD[org.apache.spark.mllib.regression.LabeledPoint] = MapPartitionsRDD[4] at randomSplit at <console>:34
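One caveat: randomSplit draws a new random seed on each run, so pin the seed if you want a reproducible split, e.g.:

// Fix the seed for a reproducible train/test split:
val splits = data.randomSplit(Array(0.8, 0.2), seed = 42L)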
4. Train a model with LinearRegressionWithSGD.train:
scala> val lr = LinearRegressionWithSGD.train(train_set, 100, 0.0001)
warning: there was one deprecation warning; re-run with -deprecation for details
16/08/26 09:20:44 WARN Executor: 1 block locks were not released by TID = 0:
[rdd_3_0]
lr: org.apache.spark.mllib.regression.LinearRegressionModel = org.apache.spark.mllib.regression.LinearRegressionModel: intercept = 0.0, numFeatures = 12
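Note the intercept = 0.0: the static train helpers fit the model without an intercept. If you need one, the builder-style API (itself deprecated since 2.0) should allow it; a sketch I have not run against this data:

// Sketch: mutable builder API, mirroring the train(...) parameters above.
val alg = new LinearRegressionWithSGD()
alg.setIntercept(true)
alg.optimizer.setNumIterations(100).setStepSize(0.0001)
val lrWithIntercept = alg.run(train_set)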
5. Evaluate the model:
scala> val pred_labels = test_set.map(lp => (lp.label, lr.predict(lp.features)))
pred_labels: org.apache.spark.rdd.RDD[(Double, Double)] = MapPartitionsRDD[17] at map at <console>:42
scala> val mse = pred_labels.map{ case (v, p) => math.pow(p-v,2) }.mean
mse: Double = 0.05104150735910074
Now the new interface:
1. Load the raw data:
scala> import org.apache.spark.ml.linalg._
import org.apache.spark.ml.linalg._
scala> import org.apache.spark.ml.regression._
import org.apache.spark.ml.regression._
scala> import org.apache.spark.sql._
import org.apache.spark.sql._
scala> val raw_data = spark.read.text("data/my/y_x.txt")
raw_data: org.apache.spark.sql.DataFrame = [value: string]
2. Convert the data:
scala> val data = raw_data.rdd.map { case Row(line: String) =>
     | val arr = line.split(' ').map(_.toDouble)
     | val label = arr.head
     | val features = Vectors.dense(arr.tail)
     | (label, features)
     | }
data: org.apache.spark.rdd.RDD[(Double, org.apache.spark.ml.linalg.Vector)] = MapPartitionsRDD[4] at map at <console>:34
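As an aside, naming the columns during conversion would let LinearRegression's default "label"/"features" parameters work unchanged; a sketch (relies on spark.implicits._, which spark-shell imports for you):

// Alternative: give the columns the default names up front, so no
// setLabelCol/setFeaturesCol calls are needed later.
val df = data.toDF("label", "features")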
3. Split the dataset:
scala> val splits = data.randomSplit(Array(0.8, 0.2))
splits: Array[org.apache.spark.rdd.RDD[(Double, org.apache.spark.ml.linalg.Vector)]] = Array(MapPartitionsRDD[5] at randomSplit at <console>:36, MapPartitionsRDD[6] at randomSplit at <console>:36)
scala> val train_set = splits(0).toDS
train_set: org.apache.spark.sql.Dataset[(Double, org.apache.spark.ml.linalg.Vector)] = [_1: double, _2: vector]
scala> val test_set = splits(1).toDS
test_set: org.apache.spark.sql.Dataset[(Double, org.apache.spark.ml.linalg.Vector)] = [_1: double, _2: vector]
4. Create a LinearRegression object and set its parameters. Here we set the LabelCol and FeaturesCol columns: they default to "label" and "features", while our columns are named "_1" and "_2".
scala> val lir = new LinearRegression
lir: org.apache.spark.ml.regression.LinearRegression = linReg_c4e70a01bcd3
scala> lir.setLabelCol("_1")
res0: org.apache.spark.ml.regression.LinearRegression = linReg_c4e70a01bcd3
scala> lir.setFeaturesCol("_2")
res1: org.apache.spark.ml.regression.LinearRegression = linReg_c4e70a01bcd3
5. Train the model:
scala> val model = lir.fit(train_set)
16/08/26 09:45:16 WARN Executor: 1 block locks were not released by TID = 0:
[rdd_9_0]
16/08/26 09:45:16 WARN WeightedLeastSquares: regParam is zero, which might cause numerical instability and overfitting.
model: org.apache.spark.ml.regression.LinearRegressionModel = linReg_c4e70a01bcd3
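The fitted model can be inspected directly; a quick sketch using the coefficients, intercept and summary members of LinearRegressionModel:

// Print the fitted parameters and the training summary's RMSE:
println(s"coefficients: ${model.coefficients}, intercept: ${model.intercept}")
println(s"training RMSE: ${model.summary.rootMeanSquaredError}")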
6. Evaluate the model:
scala> val res = model.transform(test_set)
res: org.apache.spark.sql.DataFrame = [_1: double, _2: vector ... 1 more field]
scala> import org.apache.spark.ml.evaluation._
import org.apache.spark.ml.evaluation._
scala> val eva = new RegressionEvaluator()
eva: org.apache.spark.ml.evaluation.RegressionEvaluator = regEval_8fc6cce63aa9
scala> eva.setLabelCol("_1")
res6: eva.type = regEval_8fc6cce63aa9
scala> eva.setMetricName("mse")
res7: eva.type = regEval_8fc6cce63aa9
scala> eva.evaluate(res)
res8: Double = 0.027933653533088666
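For reference, RegressionEvaluator's metricName accepts "rmse" (the default), "mse", "r2" and "mae", so the same evaluator can report other metrics, e.g.:

// Switch the metric and re-evaluate:
eva.setMetricName("rmse")
println(eva.evaluate(res))   // root-mean-squared error instead of MSE

It is also worth noting that the new interface's MSE (0.0279) comes out lower than the SGD run's (0.051); the WeightedLeastSquares warning above suggests fit solved the problem by weighted least squares rather than SGD, which plausibly explains the gap.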