Apache Spark源码走读(十一)浅谈mllib中线性回归的算法实现&Spark MLLib中拟牛顿法L-BFGS的源码实现

简介: 本文简要描述线性回归算法在Spark MLLib中的具体实现,涉及线性回归算法本身及线性回归并行处理的理论基础,然后对代码实现部分进行走读。第二部分讲解Spark MLLib中拟牛顿法L-BFGS的源码实现。

<一>浅谈mllib中线性回归的算法实现

概要

本文简要描述线性回归算法在Spark MLLib中的具体实现,涉及线性回归算法本身及线性回归并行处理的理论基础,然后对代码实现部分进行走读。

线性回归模型

机器学习算法是的主要目的是找到最能够对数据做出合理解释的模型,这个模型是假设函数,一步步的推导基本遵循这样的思路

  1. 假设函数
  2. 为了找到最好的假设函数,需要找到合理的评估标准,一般来说使用损失函数来做为评估标准
  3. 根据损失函数推出目标函数
  4. 现在问题转换成为如何找到目标函数的最优解,也就是目标函数的最优化

具体到线性回归来说,上述就转换为:

梯度下降法

那么如何求得损失函数的最优解,针对最小二乘法来说可以使用梯度下降法。

算法实现

随机梯度下降

正则化

 如何解决这些问题呢?可以采用收缩方法(shrinkage method),收缩方法又称为正则化(regularization)。 主要是岭回归(ridge regression)和lasso回归。通过对最小二乘估计加 入罚约束,使某些系数的估计为0。

线性回归的代码实现

上面讲述了一些数学基础,在将这些数学理论用代码来实现的时候,最主要的是把握住相应的假设函数和最优化算法是什么,有没有相应的正则化规则。

对于线性回归,这些都已经明确,分别为

  1. Y = A*X + B 假设函数
  2. 随机梯度下降法
  3. 岭回归或Lasso法,或什么都没有

那么Spark mllib针对线性回归的代码实现也是依据该步骤来组织的代码,其类图如下所示

函数调用路径

train->run,run函数的处理逻辑

  1. 利用最优化算法来求得最优解,optimizer.optimize
  2. 根据最优解创建相应的回归模型, createModel

runMiniBatchSGD是真正计算Gradient和Loss的地方

def runMiniBatchSGD(
      data: RDD[(Double, Vector)],
      gradient: Gradient,
      updater: Updater,
      stepSize: Double,
      numIterations: Int,
      regParam: Double,
      miniBatchFraction: Double,
      initialWeights: Vector): (Vector, Array[Double]) = {

    val stochasticLossHistory = new ArrayBuffer[Double](numIterations)

    val numExamples = data.count()
    val miniBatchSize = numExamples * miniBatchFraction

    // if no data, return initial weights to avoid NaNs
    if (numExamples == 0) {

      logInfo("GradientDescent.runMiniBatchSGD returning initial weights, no data found")
      return (initialWeights, stochasticLossHistory.toArray)

    }

    // Initialize weights as a column vector
    var weights = Vectors.dense(initialWeights.toArray)
    val n = weights.size

    /**
     * For the first iteration, the regVal will be initialized as sum of weight squares
     * if it's L2 updater; for L1 updater, the same logic is followed.
     */
    var regVal = updater.compute(
      weights, Vectors.dense(new Array[Double](weights.size)), 0, 1, regParam)._2

    for (i  (c, v) match { case ((grad, loss), (label, features)) =>
            val l = gradient.compute(features, label, bcWeights.value, Vectors.fromBreeze(grad))
            (grad, loss + l)
          },
          combOp = (c1, c2) => (c1, c2) match { case ((grad1, loss1), (grad2, loss2)) =>
            (grad1 += grad2, loss1 + loss2)
          })

      /**
       * NOTE(Xinghao): lossSum is computed using the weights from the previous iteration
       * and regVal is the regularization value computed in the previous iteration as well.
       */
      stochasticLossHistory.append(lossSum / miniBatchSize + regVal)
      val update = updater.compute(
        weights, Vectors.fromBreeze(gradientSum / miniBatchSize), stepSize, i, regParam)
      weights = update._1
      regVal = update._2
    }

    logInfo("GradientDescent.runMiniBatchSGD finished. Last 10 stochastic losses %s".format(
      stochasticLossHistory.takeRight(10).mkString(", ")))

    (weights, stochasticLossHistory.toArray)

  }

 上述代码中最需要引起重视的部分是aggregate函数的使用,先看下aggregate函数的定义

def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U = {
    // Clone the zero value since we will also be serializing it as part of tasks
    var jobResult = Utils.clone(zeroValue, sc.env.closureSerializer.newInstance())
    val cleanSeqOp = sc.clean(seqOp)
    val cleanCombOp = sc.clean(combOp)
    val aggregatePartition = (it: Iterator[T]) => it.aggregate(zeroValue)(cleanSeqOp, cleanCombOp)
    val mergeResult = (index: Int, taskResult: U) => jobResult = combOp(jobResult, taskResult)
    sc.runJob(this, aggregatePartition, mergeResult)
    jobResult
  }

aggregate函数有三个入参,一是初始值ZeroValue,二是seqOp,三为combOp.

  1. seqOp seqOp会被并行执行,具体由各个executor上的task来完成计算
  2. combOp combOp则是串行执行, 其中combOp操作在JobWaiter的taskSucceeded函数中被调用

为了进一步加深对aggregate函数的理解,现举一个小小例子。启动spark-shell后,运行如下代码

val z = sc. parallelize (List (1 ,2 ,3 ,4 ,5 ,6),2)
z.aggregate (0)(math.max(_, _), _ + _)
// 运 行 结 果 为 9
res0: Int = 9

仔细观察一下运行时的日志输出, aggregate提交的job由一个stage(stage0)组成,由于整个数据集被分成两个partition,所以为stage0创建了两个task并行处理。

LeastSquareGradient

讲完了aggregate函数的执行过程, 回过头来继续讲组成seqOp的gradient.compute函数。

LeastSquareGradient用来计算梯度和误差,注意cmopute中cumGraident会返回改变后的结果。这里计算公式依据的就是cost-function中的▽Q(w)

class LeastSquaresGradient extends Gradient {
  override def compute(data: Vector, label: Double, weights: Vector): (Vector, Double) = {
    val brzData = data.toBreeze
    val brzWeights = weights.toBreeze
    val diff = brzWeights.dot(brzData) - label
    val loss = diff * diff
    val gradient = brzData * (2.0 * diff)

    (Vectors.fromBreeze(gradient), loss)
  }

  override def compute(
      data: Vector,
      label: Double,
      weights: Vector,
      cumGradient: Vector): Double = {
    val brzData = data.toBreeze
    val brzWeights = weights.toBreeze
    //dot表示点积,是接受在实数R上的两个向量并返回一个实数标量的二元运算,它的结果是欧几里得空间的标准内积。
    //两个向量的点积写作a·b。点乘的结果叫做点积,也称作数量积
    val diff = brzWeights.dot(brzData) - label

    //下面这句话完成y += a*x
    brzAxpy(2.0 * diff, brzData, cumGradient.toBreeze)

    diff * diff
  }
}

在上述代码中频繁出现breeze相关的函数,你一定会很好奇,这是个什么新鲜玩艺。

说 开 了 其 实 一 点 也 不 稀 奇, 由 于 计 算 中 有 大 量 的 矩 阵(Matrix)及 向量(Vector)计算,为了更好支持和封装这些计算引入了breeze库。

Breeze, Epic及Puck是scalanlp中三大支柱性项目, 具体可参数www.scalanlp.org

正则化过程

根据本次迭代出来的梯度和误差对权重系数进行更新,这个时候就需要用上正则化规则了。也就是下述语句会触发权重系数的更新

  val update = updater.compute(
     weights, Vectors.fromBreeze(gradientSum / miniBatchSize), stepSize, i, regParam)

以岭回归为例,看其更新过程的代码实现。

class SquaredL2Updater extends Updater {
  override def compute(
      weightsOld: Vector,
      gradient: Vector,
      stepSize: Double,
      iter: Int,
      regParam: Double): (Vector, Double) = {
    // add up both updates from the gradient of the loss (= step) as well as
    // the gradient of the regularizer (= regParam * weightsOld)
    // w' = w - thisIterStepSize * (gradient + regParam * w)
    // w' = (1 - thisIterStepSize * regParam) * w - thisIterStepSize * gradient
    val thisIterStepSize = stepSize / math.sqrt(iter)
    val brzWeights: BV[Double] = weightsOld.toBreeze.toDenseVector
    brzWeights :*= (1.0 - thisIterStepSize * regParam)
    brzAxpy(-thisIterStepSize, gradient.toBreeze, brzWeights)
    val norm = brzNorm(brzWeights, 2.0)

    (Vectors.fromBreeze(brzWeights), 0.5 * regParam * norm * norm)
  }
}

结果预测

计算出权重系数(weights)和截距intecept,就可以用来创建线性回归模型LinearRegressionModel,利用模型的predict函数来对观测值进行预测

class LinearRegressionModel (
    override val weights: Vector,
    override val intercept: Double)
  extends GeneralizedLinearModel(weights, intercept) with RegressionModel with Serializable {

  override protected def predictPoint(
      dataMatrix: Vector,
      weightMatrix: Vector,
      intercept: Double): Double = {
    weightMatrix.toBreeze.dot(dataMatrix.toBreeze) + intercept
  }
}

 注意LinearRegression的构造函数需要权重(weights)和截距(intercept)作为入参,对新的变量做出预测需要调用predictPoint。

一个完整的示例程序

在spark-shell中执行如下语句来亲自体验一下吧。

import org.apache.spark.mllib.regression.LinearRegressionWithSGD
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.linalg.Vectors

// Load and parse the data
val data = sc.textFile("mllib/data/ridge-data/lpsa.data")
val parsedData = data.map { line =>
  val parts = line.split(',')
  LabeledPoint(parts(0).toDouble, Vectors.dense(parts(1).split(' ').map(_.toDouble)))
}

// Building the model
val numIterations = 100
val model = LinearRegressionWithSGD.train(parsedData, numIterations)

// Evaluate model on training examples and compute training error
val valuesAndPreds = parsedData.map { point =>
  val prediction = model.predict(point.features)
  (point.label, prediction)
}
val MSE = valuesAndPreds.map{case(v, p) => math.pow((v - p), 2)}.mean()
println("training Mean Squared Error = " + MSE)

小结

再次强调,找到对应的假设函数,用于评估的损失函数最优化求解方法正则化规则。

<二>Spark MLLib中拟牛顿法L-BFGS的源码实现

概要

本文就拟牛顿法L-BFGS的由来做一个简要的回顾,然后就其在spark mllib中的实现进行源码走读。

拟牛顿法

数学原理

 

代码实现

L-BFGS算法中使用到的正则化方法是SquaredL2Updater。

算法实现上使用到了由scalanlp的成员项目breeze库中的BreezeLBFGS函数,mllib中自定义了BreezeLBFGS所需要的DiffFunctions.



runLBFGS函数的源码实现如下:

def runLBFGS(
      data: RDD[(Double, Vector)],
      gradient: Gradient,
      updater: Updater,
      numCorrections: Int,
      convergenceTol: Double,
      maxNumIterations: Int,
      regParam: Double,
      initialWeights: Vector): (Vector, Array[Double]) = {

    val lossHistory = new ArrayBuffer[Double](maxNumIterations)

    val numExamples = data.count()

    val costFun =
      new CostFun(data, gradient, updater, regParam, numExamples)

    val lbfgs = new BreezeLBFGS[BDV[Double]](maxNumIterations, numCorrections, convergenceTol)

    val states =
      lbfgs.iterations(new CachedDiffFunction(costFun), initialWeights.toBreeze.toDenseVector)

    /**
     * NOTE: lossSum and loss is computed using the weights from the previous iteration
     * and regVal is the regularization value computed in the previous iteration as well.
     */
    var state = states.next()
    while(states.hasNext) {
      lossHistory.append(state.value)
      state = states.next()
    }
    lossHistory.append(state.value)
    val weights = Vectors.fromBreeze(state.x)

    logInfo("LBFGS.runLBFGS finished. Last 10 losses %s".format(
      lossHistory.takeRight(10).mkString(", ")))

    (weights, lossHistory.toArray)
  }

costFun函数是算法实现中的重点

private class CostFun(
    data: RDD[(Double, Vector)],
    gradient: Gradient,
    updater: Updater,
    regParam: Double,
    numExamples: Long) extends DiffFunction[BDV[Double]] {

    private var i = 0

    override def calculate(weights: BDV[Double]) = {
      // Have a local copy to avoid the serialization of CostFun object which is not serializable.
      val localData = data
      val localGradient = gradient

      val (gradientSum, lossSum) = localData.aggregate((BDV.zeros[Double](weights.size), 0.0))(
          seqOp = (c, v) => (c, v) match { case ((grad, loss), (label, features)) =>
            val l = localGradient.compute(
              features, label, Vectors.fromBreeze(weights), Vectors.fromBreeze(grad))
            (grad, loss + l)
          },
          combOp = (c1, c2) => (c1, c2) match { case ((grad1, loss1), (grad2, loss2)) =>
            (grad1 += grad2, loss1 + loss2)
          })

      /**
       * regVal is sum of weight squares if it's L2 updater;
       * for other updater, the same logic is followed.
       */
      val regVal = updater.compute(
        Vectors.fromBreeze(weights),
        Vectors.dense(new Array[Double](weights.size)), 0, 1, regParam)._2

      val loss = lossSum / numExamples + regVal
      /**
       * It will return the gradient part of regularization using updater.
       *
       * Given the input parameters, the updater basically does the following,
       *
       * w' = w - thisIterStepSize * (gradient + regGradient(w))
       * Note that regGradient is function of w
       *
       * If we set gradient = 0, thisIterStepSize = 1, then
       *
       * regGradient(w) = w - w'
       *
       * TODO: We need to clean it up by separating the logic of regularization out
       *       from updater to regularizer.
       */
      // The following gradientTotal is actually the regularization part of gradient.
      // Will add the gradientSum computed from the data with weights in the next step.
      val gradientTotal = weights - updater.compute(
        Vectors.fromBreeze(weights),
        Vectors.dense(new Array[Double](weights.size)), 1, 1, regParam)._1.toBreeze

      // gradientTotal = gradientSum / numExamples + gradientTotal
      axpy(1.0 / numExamples, gradientSum, gradientTotal)

      i += 1

      (loss, gradientTotal)
    }
  }

}
目录
相关文章
|
14天前
|
机器学习/深度学习 前端开发 算法
婚恋交友系统平台 相亲交友平台系统 婚恋交友系统APP 婚恋系统源码 婚恋交友平台开发流程 婚恋交友系统架构设计 婚恋交友系统前端/后端开发 婚恋交友系统匹配推荐算法优化
婚恋交友系统平台通过线上互动帮助单身男女找到合适伴侣,提供用户注册、个人资料填写、匹配推荐、实时聊天、社区互动等功能。开发流程包括需求分析、技术选型、系统架构设计、功能实现、测试优化和上线运维。匹配推荐算法优化是核心,通过用户行为数据分析和机器学习提高匹配准确性。
47 3
|
2月前
|
搜索推荐 算法 C语言
【排序算法】八大排序(下)(c语言实现)(附源码)
本文继续学习并实现了八大排序算法中的后四种:堆排序、快速排序、归并排序和计数排序。详细介绍了每种排序算法的原理、步骤和代码实现,并通过测试数据展示了它们的性能表现。堆排序利用堆的特性进行排序,快速排序通过递归和多种划分方法实现高效排序,归并排序通过分治法将问题分解后再合并,计数排序则通过统计每个元素的出现次数实现非比较排序。最后,文章还对比了这些排序算法在处理一百万个整形数据时的运行时间,帮助读者了解不同算法的优劣。
139 7
|
2月前
|
搜索推荐 算法 C语言
【排序算法】八大排序(上)(c语言实现)(附源码)
本文介绍了四种常见的排序算法:冒泡排序、选择排序、插入排序和希尔排序。通过具体的代码实现和测试数据,详细解释了每种算法的工作原理和性能特点。冒泡排序通过不断交换相邻元素来排序,选择排序通过选择最小元素进行交换,插入排序通过逐步插入元素到已排序部分,而希尔排序则是插入排序的改进版,通过预排序使数据更接近有序,从而提高效率。文章最后总结了这四种算法的空间和时间复杂度,以及它们的稳定性。
118 8
|
7月前
|
搜索推荐 算法 小程序
基于Java协同过滤算法的电影推荐系统设计和实现(源码+LW+调试文档+讲解等)
基于Java协同过滤算法的电影推荐系统设计和实现(源码+LW+调试文档+讲解等)
|
3月前
|
分布式计算 大数据 Apache
利用.NET进行大数据处理:Apache Spark与.NET for Apache Spark
【10月更文挑战第15天】随着大数据成为企业决策和技术创新的关键驱动力,Apache Spark作为高效的大数据处理引擎,广受青睐。然而,.NET开发者面临使用Spark的门槛。本文介绍.NET for Apache Spark,展示如何通过C#和F#等.NET语言,结合Spark的强大功能进行大数据处理,简化开发流程并提升效率。示例代码演示了读取CSV文件及统计分析的基本操作,突显了.NET for Apache Spark的易用性和强大功能。
67 1
|
3月前
|
存储 算法 安全
ArrayList简介及使用全方位手把手教学(带源码),用ArrayList实现洗牌算法,3个人轮流拿牌(带全部源码)
文章全面介绍了Java中ArrayList的使用方法,包括其构造方法、常见操作、遍历方式、扩容机制,并展示了如何使用ArrayList实现洗牌算法的实例。
26 0
|
5月前
|
JSON 算法 API
京东以图搜图功能API接口调用算法源码python
京东图搜接口是一款强大工具,通过上传图片即可搜索京东平台上的商品。适合电商平台、比价应用及需商品识别服务的场景。使用前需了解接口功能并注册开发者账号获取Key和Secret;准备好图片的Base64编码和AppKey;生成安全签名后,利用HTTP客户端发送POST请求至接口URL;最后解析JSON响应数据以获取商品信息。
|
5月前
|
数据采集 机器学习/深度学习 算法
【python】python客户信息审计风险决策树算法分类预测(源码+数据集+论文)【独一无二】
【python】python客户信息审计风险决策树算法分类预测(源码+数据集+论文)【独一无二】
|
6月前
|
分布式计算 大数据 Spark
Spark大数据处理:技术、应用与性能优化(全)PDF书籍推荐分享
《Spark大数据处理:技术、应用与性能优化》深入浅出介绍Spark核心,涵盖部署、实战与性能调优,适合初学者。作者基于微软和IBM经验,解析Spark工作机制,探讨BDAS生态,提供实践案例,助力快速掌握。书中亦讨论性能优化策略。[PDF下载链接](https://zhangfeidezhu.com/?p=347)。![Spark Web UI](https://img-blog.csdnimg.cn/direct/16aaadbb4e13410f8cb2727c3786cc9e.png#pic_center)
167 1
Spark大数据处理:技术、应用与性能优化(全)PDF书籍推荐分享
|
5月前
|
分布式计算 Hadoop 大数据
大数据处理框架在零售业的应用:Apache Hadoop与Apache Spark
【8月更文挑战第20天】Apache Hadoop和Apache Spark为处理海量零售户数据提供了强大的支持
83 0

热门文章

最新文章

推荐镜像

更多