# Apache Spark Source Code Walkthrough (11): Linear Regression in MLlib and the L-BFGS Quasi-Newton Implementation in Spark MLlib


### The Linear Regression Model

1. Choose a hypothesis function.
2. To find the best hypothesis, a sensible evaluation criterion is needed; a loss function is generally used.
3. Derive the objective function from the loss function.
4. The problem then becomes finding the optimum of the objective function, i.e. an optimization problem.
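Written out concretely (standard textbook notation, not taken from the Spark source), the four steps above are:

```latex
% 1. hypothesis function
h_\theta(x) = \theta^T x
% 2. loss for a single example
\ell\bigl(\theta; x^{(i)}, y^{(i)}\bigr) = \bigl(h_\theta(x^{(i)}) - y^{(i)}\bigr)^2
% 3. objective function: average loss over m examples
J(\theta) = \frac{1}{2m} \sum_{i=1}^{m} \bigl(h_\theta(x^{(i)}) - y^{(i)}\bigr)^2
% 4. the optimization problem
\theta^\ast = \arg\min_\theta J(\theta)
```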

### Regularization

How can these problems be addressed? One option is shrinkage methods, also known as regularization, chiefly ridge regression and lasso regression: a penalty constraint is added to the least-squares estimate so that some coefficient estimates are shrunk toward, or exactly to, zero.
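The two penalized objectives differ only in the norm of the penalty (standard notation, not from the source):

```latex
% ridge regression: L2 penalty, shrinks coefficients toward zero
\hat{\beta}^{\mathrm{ridge}} = \arg\min_\beta \sum_{i=1}^{m} \bigl(y_i - x_i^T \beta\bigr)^2 + \lambda \lVert \beta \rVert_2^2
% lasso: L1 penalty, can drive coefficients exactly to zero
\hat{\beta}^{\mathrm{lasso}} = \arg\min_\beta \sum_{i=1}^{m} \bigl(y_i - x_i^T \beta\bigr)^2 + \lambda \lVert \beta \rVert_1
```

It is the L1 penalty of the lasso that produces exact zeros; ridge only shrinks magnitudes.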

### Linear Regression: Code Walkthrough

1. Hypothesis function: Y = A*X + B
2. Optimization: stochastic gradient descent (SGD)
3. Regularization: ridge, lasso, or none at all

train calls run; the run function does two things:

1. Use the optimization algorithm to find the optimal solution: optimizer.optimize
2. Create the corresponding regression model from that solution: createModel

```scala
def runMiniBatchSGD(
    data: RDD[(Double, Vector)],
    gradient: Gradient,
    updater: Updater,
    stepSize: Double,
    numIterations: Int,
    regParam: Double,
    miniBatchFraction: Double,
    initialWeights: Vector): (Vector, Array[Double]) = {

  val stochasticLossHistory = new ArrayBuffer[Double](numIterations)

  val numExamples = data.count()
  val miniBatchSize = numExamples * miniBatchFraction

  // if no data, return initial weights to avoid NaNs
  if (numExamples == 0) {
    return (initialWeights, stochasticLossHistory.toArray)
  }

  // Initialize weights as a column vector
  var weights = Vectors.dense(initialWeights.toArray)
  val n = weights.size

  /**
   * For the first iteration, the regVal will be initialized as sum of weight squares
   * if it's L2 updater; for L1 updater, the same logic is followed.
   */
  var regVal = updater.compute(
    weights, Vectors.dense(new Array[Double](weights.size)), 0, 1, regParam)._2

  for (i <- 1 to numIterations) {
    // Sample a subset (fraction miniBatchFraction) of the total data;
    // compute and sum up the subgradients on this subset (this is one map-reduce)
    val (gradientSum, lossSum) = data.sample(false, miniBatchFraction, 42 + i)
      .aggregate((BDV.zeros[Double](weights.size), 0.0))(
        seqOp = (c, v) => (c, v) match { case ((grad, loss), (label, features)) =>
          val l = gradient.compute(features, label, weights, Vectors.fromBreeze(grad))
          (grad, loss + l)
        },
        combOp = (c1, c2) => (c1, c2) match { case ((grad1, loss1), (grad2, loss2)) =>
          (grad1 += grad2, loss1 + loss2)
        })

    /**
     * NOTE(Xinghao): lossSum is computed using the weights from the previous iteration
     * and regVal is the regularization value computed in the previous iteration as well.
     */
    stochasticLossHistory.append(lossSum / miniBatchSize + regVal)
    val update = updater.compute(
      weights, Vectors.fromBreeze(gradientSum / miniBatchSize), stepSize, i, regParam)
    weights = update._1
    regVal = update._2
  }

  logInfo("GradientDescent.runMiniBatchSGD finished. Last 10 stochastic losses %s".format(
    stochasticLossHistory.takeRight(10).mkString(", ")))

  (weights, stochasticLossHistory.toArray)
}
```
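To see the shape of this loop without any Spark machinery, here is a minimal local sketch of the same update rule on plain Scala arrays. All names here are illustrative, not MLlib's, and this toy uses the full data set every iteration where the real code samples a mini-batch:

```scala
// Local sketch of the mini-batch SGD loop, without Spark (illustrative names).
object MiniBatchSGDSketch {
  // least-squares gradient for one example: d/dw (w.x - y)^2 = 2 * (w.x - y) * x
  def gradient(w: Array[Double], x: Array[Double], y: Double): Array[Double] = {
    val diff = w.zip(x).map { case (wi, xi) => wi * xi }.sum - y
    x.map(_ * 2.0 * diff)
  }

  def fit(data: Seq[(Double, Array[Double])],
          stepSize: Double,
          numIterations: Int): Array[Double] = {
    var w = Array.fill(data.head._2.length)(0.0)
    for (i <- 1 to numIterations) {
      // sum the per-example gradients (the role aggregate plays in the RDD version)
      val gradSum = data.map { case (y, x) => gradient(w, x, y) }
        .reduce((a, b) => a.zip(b).map { case (ai, bi) => ai + bi })
      // the step size decays as stepSize / sqrt(i), as in GradientDescent
      val thisStep = stepSize / math.sqrt(i)
      w = w.zip(gradSum).map { case (wi, gi) => wi - thisStep * gi / data.size }
    }
    w
  }
}
```

Fitting the noiseless relation y = 2x from three points, the weight converges close to 2.0 after a hundred iterations.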


The part of this code that deserves the most attention is the use of the aggregate function; first look at its definition:

```scala
def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U = {
  // Clone the zero value since we will also be serializing it as part of tasks
  var jobResult = Utils.clone(zeroValue, sc.env.closureSerializer.newInstance())
  val cleanSeqOp = sc.clean(seqOp)
  val cleanCombOp = sc.clean(combOp)
  val aggregatePartition = (it: Iterator[T]) => it.aggregate(zeroValue)(cleanSeqOp, cleanCombOp)
  val mergeResult = (index: Int, taskResult: U) => jobResult = combOp(jobResult, taskResult)
  sc.runJob(this, aggregatePartition, mergeResult)
  jobResult
}
```


aggregate takes three arguments: an initial value zeroValue, a seqOp function, and a combOp function. A simple example:

```scala
val z = sc.parallelize(List(1, 2, 3, 4, 5, 6), 2)
z.aggregate(0)(math.max(_, _), _ + _)
// the result is 9
res0: Int = 9
```
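The result is 9 because seqOp folds each of the two partitions with math.max (giving 3 and 6), and combOp then merges the per-partition results with +, starting from the zero value: 0 + 3 + 6 = 9. A local sketch of the same semantics without Spark:

```scala
// Simulate the two partitions of the RDD example explicitly.
val partitions = List(List(1, 2, 3), List(4, 5, 6))
// seqOp folds within each partition, starting from the zero value 0
val perPartition = partitions.map(_.foldLeft(0)(math.max)) // List(3, 6)
// combOp merges the per-partition results, again starting from 0
val result = perPartition.foldLeft(0)(_ + _)               // 9
```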


```scala
class LeastSquaresGradient extends Gradient {
  override def compute(data: Vector, label: Double, weights: Vector): (Vector, Double) = {
    val brzData = data.toBreeze
    val brzWeights = weights.toBreeze
    val diff = brzWeights.dot(brzData) - label
    val loss = diff * diff
    val gradient = brzData * (2.0 * diff)
    (Vectors.fromBreeze(gradient), loss)
  }

  override def compute(
      data: Vector,
      label: Double,
      weights: Vector,
      cumGradient: Vector): Double = {
    val brzData = data.toBreeze
    val brzWeights = weights.toBreeze
    // dot is the inner (dot) product: a binary operation that takes two vectors
    // over the reals and returns a real scalar -- the standard inner product of
    // Euclidean space, written a · b.
    val diff = brzWeights.dot(brzData) - label
    // the following line performs y += a*x (axpy)
    brzAxpy(2.0 * diff, brzData, cumGradient.toBreeze)
    diff * diff
  }
}
```
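The second overload exists so that callers can accumulate gradients in place instead of allocating a new vector per example. A local sketch of that pattern on plain arrays (illustrative names, not MLlib's):

```scala
// Accumulate 2 * diff * x into cumGradient in place (the axpy pattern y += a*x)
// and return only the loss, mirroring the second compute overload above.
def computeInPlace(w: Array[Double], x: Array[Double], y: Double,
                   cumGradient: Array[Double]): Double = {
  val diff = w.zip(x).map { case (wi, xi) => wi * xi }.sum - y
  var j = 0
  while (j < x.length) {
    cumGradient(j) += 2.0 * diff * x(j) // y += a * x, with a = 2 * diff
    j += 1
  }
  diff * diff
}
```

For w = (1, 2), x = (3, 4), y = 10: w.x = 11, diff = 1, so the loss is 1 and (6, 8) is added into the accumulator.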


Breeze, Epic, and Puck are the three pillar projects of ScalaNLP; see www.scalanlp.org for details.

### The Regularization Step

Regularization enters through the updater call in runMiniBatchSGD:

```scala
val update = updater.compute(
  weights, Vectors.fromBreeze(gradientSum / miniBatchSize), stepSize, i, regParam)
```


```scala
class SquaredL2Updater extends Updater {
  override def compute(
      weightsOld: Vector,
      gradient: Vector,
      stepSize: Double,
      iter: Int,
      regParam: Double): (Vector, Double) = {
    // add up both updates from the gradient of the loss (= step) as well as
    // the gradient of the regularizer (= regParam * weightsOld)
    // w' = w - thisIterStepSize * (gradient + regParam * w)
    // w' = (1 - thisIterStepSize * regParam) * w - thisIterStepSize * gradient
    val thisIterStepSize = stepSize / math.sqrt(iter)
    val brzWeights: BV[Double] = weightsOld.toBreeze.toDenseVector
    brzWeights :*= (1.0 - thisIterStepSize * regParam)
    brzAxpy(-thisIterStepSize, gradient.toBreeze, brzWeights)
    val norm = brzNorm(brzWeights, 2.0)

    (Vectors.fromBreeze(brzWeights), 0.5 * regParam * norm * norm)
  }
}
```
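The update rule in the comments can be checked with a small local sketch on plain arrays (illustrative names, not MLlib's):

```scala
// w' = (1 - step_i * regParam) * w - step_i * gradient, with step_i = stepSize / sqrt(iter);
// returns (newWeights, regularization value 0.5 * regParam * ||w'||^2)
def l2Update(w: Array[Double], grad: Array[Double],
             stepSize: Double, iter: Int, regParam: Double): (Array[Double], Double) = {
  val thisStep = stepSize / math.sqrt(iter)
  val newW = w.zip(grad).map { case (wi, gi) =>
    (1.0 - thisStep * regParam) * wi - thisStep * gi
  }
  val norm = math.sqrt(newW.map(x => x * x).sum)
  (newW, 0.5 * regParam * norm * norm)
}
```

With a zero gradient, the weights are purely shrunk: w = (1, 0) with stepSize 1, iter 1, regParam 0.1 gives w' = (0.9, 0) and regVal = 0.5 * 0.1 * 0.81 = 0.0405.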


### Making Predictions

```scala
class LinearRegressionModel(
    override val weights: Vector,
    override val intercept: Double)
  extends GeneralizedLinearModel(weights, intercept) with RegressionModel with Serializable {

  override protected def predictPoint(
      dataMatrix: Vector,
      weightMatrix: Vector,
      intercept: Double): Double = {
    weightMatrix.toBreeze.dot(dataMatrix.toBreeze) + intercept
  }
}
```


Note that the LinearRegressionModel constructor takes the weights and the intercept as arguments; predicting on new data is done by calling predictPoint.

### A Complete Example

```scala
import org.apache.spark.mllib.regression.LinearRegressionWithSGD
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.linalg.Vectors

// Load and parse the data
val data = sc.textFile("mllib/data/ridge-data/lpsa.data")
val parsedData = data.map { line =>
  val parts = line.split(',')
  LabeledPoint(parts(0).toDouble, Vectors.dense(parts(1).split(' ').map(_.toDouble)))
}

// Building the model
val numIterations = 100
val model = LinearRegressionWithSGD.train(parsedData, numIterations)

// Evaluate model on training examples and compute training error
val valuesAndPreds = parsedData.map { point =>
  val prediction = model.predict(point.features)
  (point.label, prediction)
}
val MSE = valuesAndPreds.map { case (v, p) => math.pow((v - p), 2) }.mean()
println("training Mean Squared Error = " + MSE)
```


### L-BFGS: Code Walkthrough

The regularization method used with the L-BFGS implementation is SquaredL2Updater.

The source of the runLBFGS function is as follows:

```scala
def runLBFGS(
    data: RDD[(Double, Vector)],
    gradient: Gradient,
    updater: Updater,
    numCorrections: Int,
    convergenceTol: Double,
    maxNumIterations: Int,
    regParam: Double,
    initialWeights: Vector): (Vector, Array[Double]) = {

  val lossHistory = new ArrayBuffer[Double](maxNumIterations)

  val numExamples = data.count()

  val costFun =
    new CostFun(data, gradient, updater, regParam, numExamples)

  val lbfgs = new BreezeLBFGS[BDV[Double]](maxNumIterations, numCorrections, convergenceTol)

  val states =
    lbfgs.iterations(new CachedDiffFunction(costFun), initialWeights.toBreeze.toDenseVector)

  /**
   * NOTE: lossSum and loss is computed using the weights from the previous iteration
   * and regVal is the regularization value computed in the previous iteration as well.
   */
  var state = states.next()
  while (states.hasNext) {
    lossHistory.append(state.value)
    state = states.next()
  }
  lossHistory.append(state.value)
  val weights = Vectors.fromBreeze(state.x)

  logInfo("LBFGS.runLBFGS finished. Last 10 losses %s".format(
    lossHistory.takeRight(10).mkString(", ")))

  (weights, lossHistory.toArray)
}
```
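BreezeLBFGS hides the actual quasi-Newton mechanics. As an illustration of what happens inside, here is a compact sketch of the classic two-loop recursion that L-BFGS uses to build a search direction from the last m (s, y) pairs. This is a toy with a fixed unit step rather than Breeze's line search, and every name in it is illustrative, not from Breeze:

```scala
// Toy L-BFGS: two-loop recursion plus a fixed-step outer loop (illustrative names).
object LBFGSSketch {
  type Vec = Array[Double]
  def dot(a: Vec, b: Vec): Double = a.zip(b).map { case (x, y) => x * y }.sum
  def axpy(a: Double, x: Vec, y: Vec): Vec = y.zip(x).map { case (yi, xi) => yi + a * xi }
  def scale(a: Double, x: Vec): Vec = x.map(_ * a)

  // two-loop recursion: approximate -H_k * g using the stored (s_i, y_i) pairs
  def direction(g: Vec, history: Seq[(Vec, Vec)]): Vec = {
    var q = g.clone
    val alphas = new Array[Double](history.size)
    // first loop: newest pair to oldest
    for (((s, y), idx) <- history.zipWithIndex.reverse) {
      val rho = 1.0 / dot(y, s)
      alphas(idx) = rho * dot(s, q)
      q = axpy(-alphas(idx), y, q)
    }
    // initial Hessian approximation H0 = gamma * I, gamma from the newest pair
    var r = history.lastOption match {
      case Some((s, y)) => scale(dot(s, y) / dot(y, y), q)
      case None         => q
    }
    // second loop: oldest pair to newest
    for (((s, y), idx) <- history.zipWithIndex) {
      val rho = 1.0 / dot(y, s)
      val beta = rho * dot(y, r)
      r = axpy(alphas(idx) - beta, s, r)
    }
    scale(-1.0, r) // descent direction
  }

  def minimize(grad: Vec => Vec, x0: Vec, m: Int, iters: Int): Vec = {
    var x = x0
    var history = Vector.empty[(Vec, Vec)]
    for (_ <- 1 to iters) {
      val g = grad(x)
      val d = direction(g, history)
      val xNew = axpy(1.0, d, x) // fixed unit step; real L-BFGS line-searches here
      val s = xNew.zip(x).map { case (a, b) => a - b }
      val yv = grad(xNew).zip(g).map { case (a, b) => a - b }
      // keep only curvature pairs with y.s > 0, and at most m of them
      if (dot(yv, s) > 1e-10) history = (history :+ ((s, yv))).takeRight(m)
      x = xNew
    }
    x
  }
}
```

On the quadratic f(x) = (x1 - 3)^2 + (x2 + 1)^2 this toy reaches the minimizer (3, -1) after a couple of iterations, because a single (s, y) pair already captures the (scaled-identity) Hessian.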


The CostFun class, which supplies the loss and gradient that L-BFGS needs, is the heart of the implementation:

```scala
private class CostFun(
    data: RDD[(Double, Vector)],
    gradient: Gradient,
    updater: Updater,
    regParam: Double,
    numExamples: Long) extends DiffFunction[BDV[Double]] {

  private var i = 0

  override def calculate(weights: BDV[Double]) = {
    // Have a local copy to avoid the serialization of CostFun object which is not serializable.
    val localData = data
    val localGradient = gradient

    val (gradientSum, lossSum) = localData.aggregate((BDV.zeros[Double](weights.size), 0.0))(
      seqOp = (c, v) => (c, v) match { case ((grad, loss), (label, features)) =>
        val l = localGradient.compute(
          features, label, Vectors.fromBreeze(weights), Vectors.fromBreeze(grad))
        (grad, loss + l)
      },
      combOp = (c1, c2) => (c1, c2) match { case ((grad1, loss1), (grad2, loss2)) =>
        (grad1 += grad2, loss1 + loss2)
      })

    /**
     * regVal is sum of weight squares if it's L2 updater;
     * for other updater, the same logic is followed.
     */
    val regVal = updater.compute(
      Vectors.fromBreeze(weights),
      Vectors.dense(new Array[Double](weights.size)), 0, 1, regParam)._2

    val loss = lossSum / numExamples + regVal
    /**
     * It will return the gradient part of regularization using updater.
     *
     * Given the input parameters, the updater basically does the following,
     *
     * w' = w - thisIterStepSize * (gradient + regGradient(w))
     * Note that regGradient is function of w
     *
     * If we set gradient = 0, thisIterStepSize = 1, then
     *
     * regGradient(w) = w - w'
     *
     * TODO: We need to clean it up by separating the logic of regularization out
     *       from updater to regularizer.
     */
    // Will add the gradientSum computed from the data with weights in the next step.
    val gradientTotal = weights - updater.compute(
      Vectors.fromBreeze(weights),
      Vectors.dense(new Array[Double](weights.size)), 1, 1, regParam)._1.toBreeze

    // gradientTotal = gradientSum / numExamples + gradientTotal
    axpy(1.0 / numExamples, gradientSum, gradientTotal)

    i += 1

    (loss, gradientTotal)
  }
}
```
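The trick worth pausing on is how CostFun recovers the regularization gradient: it calls the updater with a zero gradient and a step size of 1, so for SquaredL2Updater the output is w' = (1 - regParam) * w, and therefore w - w' = regParam * w, which is exactly the gradient of 0.5 * regParam * ||w||^2. A numeric sketch of this on plain arrays (illustrative, not MLlib code):

```scala
// Calling the L2 update with gradient = 0 and step = 1 yields w' = (1 - regParam) * w,
// so w - w' = regParam * w, the gradient of the penalty 0.5 * regParam * ||w||^2.
def regGradient(w: Array[Double], regParam: Double): Array[Double] = {
  val wPrime = w.map(_ * (1.0 - regParam)) // updater output with gradient = 0, step = 1
  w.zip(wPrime).map { case (wi, wpi) => wi - wpi }
}
```

For w = (2, -4) and regParam = 0.5 this returns (1, -2), i.e. 0.5 * w, as expected.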
