Spark排序算法系列之GBTs使用方式介绍

本文涉及的产品
EMR Serverless StarRocks,5000CU*H 48000GB*H
简介: 在本篇文章中你可以学到: Spark MLLib包中的GBDT使用方式 模型的通过保存、加载、预测 PipeLine ML包中的GBDT

01

前言


【Spark排序算法系列】主要介绍的是目前推荐系统或者广告点击方面用的比较广的几种算法,和他们在Spark中的应用实现,本篇文章主要介绍GBDT算法,本系列还包括(持续更新):

  • Spark排序算法系列之LR(逻辑回归)

  • Spark排序算法系列之模型融合(GBDT+LR)

  • Spark排序算法系列之XGBoost

  • Spark排序算法系列之FTRL(Follow-the-regularized-Leader)

  • Spark排序算法系列之FM与FFM


在本篇文章中你可以学到:

  • Spark MLLib包中的GBDT使用方式

  • 模型的通过保存、加载、预测

  • PipeLine

  • ML包中的GBDT

02


概述


LR因为其容易并行最早应用到推荐排序中的,但学习能力有限,需要大量的特征工程来增加模型的学习能力。但大量的特征工程耗时耗力,且不一定带来效果的提升,因此在如何能有效的发现特征组合,来缩短LR特征实验周期的背景下,GBDT被应用了起来。GBDT模型全称是Gradient Boosting Decision Tree,梯度提升决策树。是属于Boosing算法中的一种,关于Boosting的介绍可以参考文章集成学习(Ensemble Learning)

关于GBDT算法理解可参考:


其实相信很多人对Spark 机器学习包(ml和mllib)中的GBDT傻傻分不清楚,这里我们先来捋一捋。Spark中的GBDT较GBTs——梯度提升树,因为其是基于决策树(Decision Tree,DT)实现的,所以叫GBDT。Spark 中的GBDT算法存在于ml包和mllib包中,mllib是基于RDD的,ml包则是针对DataFrame的,ml包中的GBDT分为分类和回归,在实际使用过程中,需要根据具体情况进行衡量和选择。由于在实际生产环境中使用基于RDD的较多,所以下面将着重介绍下MLLib包中的GBTs,ML包中的将进行简单说明。


03


MLLib中的GBTs

在前边提到了GBTs是基于决策树(Decision Tree,DT),所以不了解DT的朋友可以先进行学习,这里就不做过多解释。GBDT 在Spark中的使用方式如下描述


创建Spark对象并加载数据

 
  1. // 文件路径

  2. val file = "data/new_sample_libsvm_data.txt"

  3. val model_path = "output/gbdt/"

  4. // 创建spark对象

  5. val spark = SparkSession.builder().master("local[5]")

  6.       .appName("GBDT_MLLib_Model_Train")

  7.       .getOrCreate()

  8. Logger.getRootLogger.setLevel(Level.WARN)

  9. // 使用MLUtils加载文件,并拆分成训练集和测试集

  10. val split = MLUtils.loadLibSVMFile(spark.sparkContext, file).randomSplit(Array(0.7,0.3))

  11. val (train, test) = (split(0), split(1))



GBDT参数初始化

 
  1. def getBoostingParam(): BoostingStrategy = {

  2.   val boostingStrategy = BoostingStrategy.defaultParams("Regression")

  3.   //一般不需要修改模型的学习率,除非模型训练结果值变化很大时,降低学习率的值

  4.   boostingStrategy.setLearningRate(boostingStrategy.getLearningRate())

  5.   // 迭代次数

  6.   boostingStrategy.setNumIterations(10)

  7.   // 设置树的深度,若树过深,容易导致过拟合

  8.   boostingStrategy.treeStrategy.setMaxDepth(5)

  9.   // 连续特征离散化的最大数量

  10.   boostingStrategy.treeStrategy.setMaxBins(10)

  11.   // 分裂后节点包含最少的实例个数

  12.   boostingStrategy.treeStrategy.setMinInstancesPerNode(2)

  13.   // 设置分类的数量

  14.   boostingStrategy.treeStrategy.setNumClasses(2)

  15.   // 设置最小信息增益值

  16.   boostingStrategy.treeStrategy.setMinInfoGain(1e-4)

  17.   //设置基类纯度值的计算方法,针对分类问题,支持基尼系数和信息增益(org.apache.spark.mllib.tree.impurity.Entropy)

  18.   boostingStrategy.treeStrategy.setImpurity(org.apache.spark.mllib.tree.impurity.Gini)

  19.   boostingStrategy.treeStrategy.categoricalFeaturesInfo = Map[Int, Int]()

  20.   boostingStrategy

  21. }


模型训练与测试集加载

 
  1. // 创建模型

  2. val model = GradientBoostedTrees.train(train, boostingStrategy)

  3. // 测试集测试

  4. val test_result = test.map( line => {

  5.   val pre_label = model.predict(line.features)

  6.   //计算各个树预测值与树权重的点积;计算得到的结果值作为模型计算得到的分数值,用于排序

  7.   // 查看predict源码

  8.   val treePredictions = model.trees.map(_.predict(line.features))

  9.   val pre_score = blas.ddot(model.numTrees, treePredictions, 1, model.treeWeights, 1)

  10.   // 将森林生成的各个树得到的预测分值进行变形,得到最终用户点击的预估值

  11.   val score = Math.pow(1+Math.pow(Math.E, -2 * pre_score), -1)

  12.   (line.label, pre_label, score)

  13. })

  14. // 打印10条结果

  15. test_result.take(10).foreach(println)


效果验证

 
  1. def evaluteResult(test_result: RDD[(Double, Double, Double)]): Unit = {

  2.   // MSE

  3.   val testMSE = test_result.map{ case(real, pre, _) => math.pow((real - pre), 2)}.mean()

  4.   println(s"Test Mean Squared Error = $testMSE")

  5.   // AUC

  6.   val metrics = new BinaryClassificationMetrics(test_result.map(x => (x._2,x._1)).sortByKey(ascending = true),numBins = 2)

  7.   println(s"0-1 label AUC is = ${metrics.areaUnderROC}")

  8.   val metrics1 = new BinaryClassificationMetrics(test_result.map(x => (x._3,x._1)).sortByKey(ascending = true),numBins = 2)

  9.   println(s"score-label AUC is = ${metrics1.areaUnderROC}")

  10.   // 错误率

  11.   val error = test_result.filter(x => x._1!=x._2).count().toDouble / test_result.count()

  12.   println(s"error is = $error")

  13.   // 准确率

  14.   val accuracy = test_result.filter(x => x._1==x._2).count().toDouble / test_result.count()

  15.   println(s"accuracy is = $accuracy")

  16. }

模型保存

 
  1. def saveModel(model: GradientBoostedTreesModel, model_path: Any): Unit = {

  2.   // 保存模型文件 obj

  3.   val out_obj = new ObjectOutputStream(new FileOutputStream(model_path+"model.obj"))

  4.   out_obj.writeObject(model)


  5.   // 保存模型信息

  6.   val model_info=new BufferedWriter(new FileWriter(model_path+"model_info.txt"))

  7.   model_info.write(model.toDebugString)

  8.   model_info.flush()

  9.   model_info.close()


  10.   // 保存模型权重

  11.   val model_weights=new BufferedWriter(new FileWriter(model_path+"model_weights.txt"))

  12.   model_weights.write(model.treeWeights.zipWithIndex.map(x=>s"第${x._2+1}棵树的权重是:${x._1}").mkString("\n"))

  13.   model_weights.flush()

  14.   model_weights.close()

  15. }


模型加载

 
  1. def getModel(path: String): Option[GradientBoostedTreesModel] = {

  2.   try {

  3.      val in = new ObjectInputStream(new FileInputStream(path))

  4.      val model = Option(in.readObject().asInstanceOf[GradientBoostedTreesModel])

  5.      in.close()

  6.      println("model load success !")

  7.      model

  8.   }catch {

  9.      case ex: ClassNotFoundException => {

  10.         println(ex.printStackTrace())

  11.         None

  12.      }

  13.      case ex: IOException => {

  14.         ex.printStackTrace()

  15.         None

  16.      }

  17.      case _: Throwable => throw new Exception

  18.   }

  19. }


数值预测

 
  1. def predict(model: GradientBoostedTreesModel,feature:Array[Double]):Double={

  2.   val featureScore=Vectors.dense(feature)

  3.   val treePredictions = model.trees.map(_.predict(featureScore))

  4.   //计算各个树预测值与树权重的点积;计算得到的结果值作为模型计算得到的分数值,用于排序。

  5.   val predictScore=blas.ddot(model.numTrees,treePredictions,1,model.treeWeights,1)

  6.   // 将森林生成的各个树得到的预测分值进行变形,得到最终用户点击的预估值

  7.   val score=Math.pow(1 + Math.pow(Math.E, -2 * predictScore),-1)

  8.   score

  9. }



04


Pipeline

Spark ML Pipeline 的出现,是受到了 scikit-learn 项目的启发,并且总结了 MLlib 在处理复杂机器学习问题上的弊端,旨在向用户提供基于 DataFrame 之上的更加高层次的 API 库,以更加方便的构建复杂的机器学习工作流式应用。一个 Pipeline 在结构上会包含一个或多个 PipelineStage,每一个 PipelineStage 都会完成一个任务,如数据集处理转化,模型训练,参数设置或数据预测等,这样的 PipelineStage 在 ML 里按照处理问题类型的不同都有相应的定义和实现。接下来,我们先来了解几个重要概念。

  • DataFrame

相比RDD,DF包含了 schema 信息,更类似传统数据库中的二维表格。它被 ML Pipeline 用来存储源数据。DataFrame 可以被用来保存各种类型的数据,如我们可以把特征向量存储在 DataFrame 的一列中,这样用起来是非常方便的。

  • Transformer

Transformer 中文可以被翻译成转换器,是一个 PipelineStage,实现上也是继承自 PipelineStage 类,主要是用来把 一个 DataFrame 转换成另一个 DataFrame,比如一个模型就是一个 Transformer,因为它可以把 一个不包含预测标签的测试数据集 DataFrame 打上标签转化成另一个包含预测标签的 DataFrame,显然这样的结果集可以被用来做分析结果的可视化。

  • Estimator

Estimator 中文可以被翻译成评估器或适配器,在 Pipeline 里通常是被用来操作 DataFrame 数据并生产一个 Transformer,如一个随机森林算法就是一个 Estimator,因为它可以通过训练特征数据而得到一个随机森林模型。实现上 Estimator 也是继承自 PipelineStage 类。

  • Parameter

Parameter 被用来设置 Transformer 或者 Estimator 的参数。

要构建一个 Pipeline,首先我们需要定义 Pipeline 中的各个 PipelineStage,如指标提取和转换模型训练等。有了这些处理特定问题的 Transformer 和 Estimator,我们就可以按照具体的处理逻辑来有序的组织 PipelineStages 并创建一个 Pipeline,如 :

val pipeline = new Pipeline().setStages(Array(stage1,stage2,stage3,…))

然后就可以把训练数据集作为入参并调用 Pipelin 实例的 fit 方法来开始以流的方式来处理源训练数据,这个调用会返回一个 PipelineModel 类实例,进而被用来预测测试数据的标签,它是一个 Transformer。

比如在构建一个GBDT模型的时候,流程是这样的

3a85e4976fb6a307015dd40d02d85a703597a69f



05


ML包中的GBTs

从Spark 2.0开始,spark.mllib包中基于RDD的API已进入维护模式。 Spark的主要机器学习API现在是spark.ml包中基于DataFrame的API。接下来我们看下ml包中的GBDT(regression)。

加载数据训练模型

 

  1. // 文件路径

  2. val file = "data/sample_libsvm_data.txt"

  3. val model_path = "output/gbdt_ml/"

  4. // 创建spark对象

  5. val spark = SparkSession.builder().master("local[5]").appName("GBDT_ML_Model_Train").getOrCreate()

  6. Logger.getRootLogger.setLevel(Level.WARN)


  7. // 加载数据,并拆分为训练集和测试集


  8. // http://blog.leanote.com/post/kobeliuziyang/Spark%E8%AF%BB%E5%86%99%E6%95%B0%E6%8D%AE%E9%9B%86

  9. val data = spark.read.format("libsvm").load(file)

  10. data.printSchema()

  11. data.show(10)

  12. val Array(train, test) = data.randomSplit(Array(0.7,0.3))

  13. // 当特征值的不同个数大于4时,才认为该特征为连续型.

  14. val featureindexer = new VectorIndexer().setInputCol("features").setOutputCol("indexedFeatures").setMaxCategories(4).fit(data)


  15. // 创建gbdt model,这里只设置了最大迭代次数,其他也可以通过.setXXX进行设置

  16. val gbdt = new GBTRegressor().setLabelCol("label").setFeaturesCol("indexedFeatures").setMaxIter(10)


  17. // 在pipeline中进行设置

  18. val pipeline = new Pipeline().setStages(Array(featureindexer, gbdt))

  19. // 训练模型

  20. val model = pipeline.fit(train)

  21. // 测试集预测

  22. val predictions = model.transform(test)

  23. predictions.show(10)

  24. predictions.select("prediction","label","features").show(10)


模型评估

 
  1. def evaluteModel(predictions: DataFrame) = {

  2.      // 效果评估, 默认的支持:"mse", "rmse", "r2", "mae"

  3.      val evaluator = new RegressionEvaluator().setLabelCol("label").setPredictionCol("prediction")

  4.      val rmse = evaluator.setMetricName("rmse").evaluate(predictions)

  5.      println(s"Root Mean Squared Error(RMSE) on test data: $rmse")

  6.      val mse = evaluator.setMetricName("mse").evaluate(predictions)

  7.      println(s"Mean Squared Error(MSE) on test data: $mse")

  8.      val mae = evaluator.setMetricName("mae").evaluate(predictions)

  9.      println(s"Mean Absolute Error(MAE) on test data: $mae")

  10.      val r2 = evaluator.setMetricName("r2").evaluate(predictions)

  11.      println(s"Coefficient of Determination(r2) on test data: $r2")


  12.      // supports "areaUnderROC" (default), "areaUnderPR"

  13.      val evaluator_binary = new BinaryClassificationEvaluator().setLabelCol("label").setRawPredictionCol("prediction")

  14.      val auc = evaluator_binary.setMetricName("areaUnderROC").evaluate(predictions)

  15.      println(s"areaUnderROC: $auc")

  16.      val apr = evaluator_binary.setMetricName("areaUnderPR").evaluate(predictions)

  17.      println(s"areaUnderPR: $apr")


  18.      // supports "f1" (default), "weightedPrecision","weightedRecall", "accuracy")

  19.      val evaluator_mul = new MulticlassClassificationEvaluator().setLabelCol("label").setPredictionCol("prediction")

  20.      val f1 = evaluator_mul.setMetricName("f1").evaluate(predictions)

  21.      println(s"f1 is : $f1")

  22.      val precision = evaluator_mul.setMetricName("weightedPrecision").evaluate(predictions)

  23.      println(s"precision is : $precision")

  24.      val recall = evaluator_mul.setMetricName("weightedRecall").evaluate(predictions)

  25.      println(s"recall is : $recall")

  26.      val accuracy = evaluator_mul.setMetricName("accuracy").evaluate(predictions)

  27.      println(s"accuracy is : $accuracy")

  28.   }


其余相关代码可参考MLLib中的GBTs


本篇文章中涉及的代码可在github中找到:

https://github.com/Thinkgamer/Hadoop-Spark-Learning/tree/master/Spark



123本文转载自 https://mp.weixin.qq.com/s/2xBZS9pACQfZL1WWxN4oqQ

相关实践学习
基于EMR Serverless StarRocks一键玩转世界杯
基于StarRocks构建极速统一OLAP平台
快速掌握阿里云 E-MapReduce
E-MapReduce 是构建于阿里云 ECS 弹性虚拟机之上,利用开源大数据生态系统,包括 Hadoop、Spark、HBase,为用户提供集群、作业、数据等管理的一站式大数据处理分析服务。 本课程主要介绍阿里云 E-MapReduce 的使用方法。
相关文章
|
6月前
|
机器学习/深度学习 分布式计算 算法
Spark中的机器学习库MLlib是什么?请解释其作用和常用算法。
Spark中的机器学习库MLlib是什么?请解释其作用和常用算法。
110 0
|
1月前
|
存储 分布式计算 算法
大数据-106 Spark Graph X 计算学习 案例:1图的基本计算、2连通图算法、3寻找相同的用户
大数据-106 Spark Graph X 计算学习 案例:1图的基本计算、2连通图算法、3寻找相同的用户
60 0
|
17天前
|
分布式计算 Java 开发工具
阿里云MaxCompute-XGBoost on Spark 极限梯度提升算法的分布式训练与模型持久化oss的实现与代码浅析
本文介绍了XGBoost在MaxCompute+OSS架构下模型持久化遇到的问题及其解决方案。首先简要介绍了XGBoost的特点和应用场景,随后详细描述了客户在将XGBoost on Spark任务从HDFS迁移到OSS时遇到的异常情况。通过分析异常堆栈和源代码,发现使用的`nativeBooster.saveModel`方法不支持OSS路径,而使用`write.overwrite().save`方法则能成功保存模型。最后提供了完整的Scala代码示例、Maven配置和提交命令,帮助用户顺利迁移模型存储路径。
|
分布式计算 算法 大数据
大数据Spark MLlib推荐算法
大数据Spark MLlib推荐算法
253 0
|
分布式计算 算法 Java
ALS算法 java spark rdd简单实现
ALS算法 java spark rdd简单实现
129 0
|
分布式计算 算法 搜索推荐
Java编写的Spark ALS协同过滤推荐算法的源代码能共享一下
Java编写的Spark ALS协同过滤推荐算法的源代码能共享一下
116 0
|
分布式计算 数据挖掘 大数据
Spark 入门_代码编写方式|学习笔记
快速学习 Spark 入门_代码编写方式
Spark 入门_代码编写方式|学习笔记
|
分布式计算 算法 搜索推荐
Spark实现协同过滤CF算法实践
UI矩阵–>II矩阵–>排序
175 0
Spark实现协同过滤CF算法实践
|
存储 分布式计算 搜索推荐
【Spark MLlib】(六)协同过滤 (Collaborative Filtering) 算法分析
【Spark MLlib】(六)协同过滤 (Collaborative Filtering) 算法分析
350 0
【Spark MLlib】(六)协同过滤 (Collaborative Filtering) 算法分析
|
SQL 分布式计算 Scala
Pandas vs Spark:获取指定列的N种方式
本篇继续Pandas与Spark常用操作对比系列,针对常用到的获取指定列的多种实现做以对比。 注:此处的Pandas特指DataFrame数据结构,Spark特指spark.sql下的DataFrame数据结构。
619 0
Pandas vs Spark:获取指定列的N种方式