Spark机器学习3·推荐引擎(spark-shell)

本文涉及的产品
云原生网关 MSE Higress,422元/月
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
注册配置 MSE Nacos/ZooKeeper,118元/月
简介: ![](http://img3.douban.com/lpic/s28277325.jpg) [Spark机器学习](http://book.douban.com/subject/26593179/) ### 准备环境 - jblas https://gcc.


Spark机器学习

准备环境

git clone https://github.com/mikiobraun/jblas.git
cd jblas
mvn install

运行环境

cd /Users/erichan/Garden/spark-1.5.1-bin-cdh4

bin/spark-shell --name my_mlib --packages org.jblas:jblas:1.2.4-SNAPSHOT --driver-memory 4G --executor-memory 4G --driver-cores 2

推荐引擎

1 提取有效特征

val PATH = "/Users/erichan/sourcecode/book/Spark机器学习"
val rawData = sc.textFile(PATH+"/ml-100k/u.data")
rawData.first()

res1: String = 196 242 3 881250949

import org.apache.spark.mllib.recommendation.ALS
import org.apache.spark.mllib.recommendation.Rating
val rawRatings=rawData.map(_.split("\t").take(3))
val ratings = rawRatings.map { case Array(user, movie, rating) => Rating(user.toInt, movie.toInt, rating.toDouble) }
ratings.first()

res2: org.apache.spark.mllib.recommendation.Rating = Rating(196,242,3.0)

2 训练推荐模型

val model = ALS.train(ratings, 50, 10, 0.01) //rank=50, iterations=10, lambda=0.01
model.userFeatures.count

res3: Long = 943

model.productFeatures.count

res4: Long = 1682

3 使用模型

3.1.1 用户推荐
val predictedRating = model.predict(789, 123)

val userId = 789
val K = 10
val topKRecs = model.recommendProducts(userId, K)
println(topKRecs.mkString("\n"))

Rating(789,176,5.732688958436494)
Rating(789,201,5.682340265545152)
Rating(789,182,5.5902224300291214)
Rating(789,183,5.5877871075408585)
Rating(789,96,5.4425266495153455)
Rating(789,76,5.39730369058763)
Rating(789,195,5.356822356978749)
Rating(789,589,5.1464233861748925)
Rating(789,134,5.109287533257644)
Rating(789,518,5.106161562126567)

3.1.2 校验推荐
val movies = sc.textFile(PATH+"/ml-100k/u.item")
val titles = movies.map(line => line.split("\\|").take(2)).map(array => (array(0).toInt, array(1))).collectAsMap()
titles(123)
val moviesForUser = ratings.keyBy(_.user).lookup(789)
println(moviesForUser.size)

33

moviesForUser.sortBy(-_.rating).take(10).map(rating => (titles(rating.product), rating.rating)).foreach(println)

(Godfather, The (1972),5.0)
(Trainspotting (1996),5.0)
(Dead Man Walking (1995),5.0)
(Star Wars (1977),5.0)
(Swingers (1996),5.0)
(Leaving Las Vegas (1995),5.0)
(Bound (1996),5.0)
(Fargo (1996),5.0)
(Last Supper, The (1995),5.0)
(Private Parts (1997),4.0)

topKRecs.map(rating => (titles(rating.product), rating.rating)).foreach(println)

(Aliens (1986),5.732688958436494)
(Evil Dead II (1987),5.682340265545152)
(GoodFellas (1990),5.5902224300291214)
(Alien (1979),5.5877871075408585)
(Terminator 2: Judgment Day (1991),5.4425266495153455)
(Carlito's Way (1993),5.39730369058763)
(Terminator, The (1984),5.356822356978749)
(Wild Bunch, The (1969),5.1464233861748925)
(Citizen Kane (1941),5.109287533257644)
(Miller's Crossing (1990),5.106161562126567)

3.2.1 物品推荐
import org.jblas.DoubleMatrix
val aMatrix = new DoubleMatrix(Array(1.0, 2.0, 3.0))
def cosineSimilarity(vec1: DoubleMatrix, vec2: DoubleMatrix): Double = {
    vec1.dot(vec2) / (vec1.norm2() * vec2.norm2())
}
val itemId = 567
val itemFactor = model.productFeatures.lookup(itemId).head
val itemVector = new DoubleMatrix(itemFactor)
cosineSimilarity(itemVector, itemVector)

res10: Double = 1.0

val sims = model.productFeatures.map{ case (id, factor) =>
    val factorVector = new DoubleMatrix(factor)
    val sim = cosineSimilarity(factorVector, itemVector)
    (id, sim)
}
val sortedSims = sims.top(K)(Ordering.by[(Int, Double), Double] { case (id, similarity) => similarity })
println(sortedSims.mkString("\n"))

(567,1.0)
(413,0.7309050775072655)
(895,0.6992030886048359)
(853,0.6960095521899471)
(219,0.6806270119940826)
(302,0.6757242121714326)
(257,0.6721490667554395)
(160,0.6672080746572076)
(563,0.6621573120106216)
(1019,0.6591520069387037)

3.2.2 校验推荐
println(titles(itemId))

Wes Craven's New Nightmare (1994)

val sortedSims2 = sims.top(K + 1)(Ordering.by[(Int, Double), Double] { case (id, similarity) => similarity })
sortedSims2.slice(1, 11).map{ case (id, sim) => (titles(id), sim) }.mkString("\n")

res13: String =
(Tales from the Crypt Presents: Bordello of Blood (1996),0.7309050775072655)
(Scream 2 (1997),0.6992030886048359)
(Braindead (1992),0.6960095521899471)
(Nightmare on Elm Street, A (1984),0.6806270119940826)
(L.A. Confidential (1997),0.6757242121714326)
(Men in Black (1997),0.6721490667554395)
(Glengarry Glen Ross (1992),0.6672080746572076)
(Stephen King's The Langoliers (1995),0.6621573120106216)
(Die xue shuang xiong (Killer, The) (1989),0.6591520069387037)
(Evil Dead II (1987),0.655134288821937)

4 模型效果评估

4.1 均方差(Mean Squared Error,MSE)
val actualRating = moviesForUser.take(1)(0)
val predictedRating = model.predict(789, actualRating.product)
val squaredError = math.pow(predictedRating - actualRating.rating, 2.0)
val usersProducts = ratings.map{ case Rating(user, product, rating)  => (user, product)}
val predictions = model.predict(usersProducts).map{
    case Rating(user, product, rating) => ((user, product), rating)
}
val ratingsAndPredictions = ratings.map{
    case Rating(user, product, rating) => ((user, product), rating)
}.join(predictions)
val MSE = ratingsAndPredictions.map{
    case ((user, product), (actual, predicted)) =>  math.pow((actual - predicted), 2)
}.reduce(_ + _) / ratingsAndPredictions.count
println("Mean Squared Error = " + MSE)

Mean Squared Error = 0.08527363423596633

val RMSE = math.sqrt(MSE)
println("Root Mean Squared Error = " + RMSE)

Root Mean Squared Error = 0.2920164965134099

4.2 K值平均准确率(MAPK)
def avgPrecisionK(actual: Seq[Int], predicted: Seq[Int], k: Int): Double = {
  val predK = predicted.take(k)
  var score = 0.0
  var numHits = 0.0
  for ((p, i) <- predK.zipWithIndex) {
    if (actual.contains(p)) {
      numHits += 1.0
      score += numHits / (i.toDouble + 1.0)
    }
  }
  if (actual.isEmpty) {
    1.0
  } else {
    score / scala.math.min(actual.size, k).toDouble
  }
}
val actualMovies = moviesForUser.map(_.product)
val predictedMovies = topKRecs.map(_.product)
val apk10 = avgPrecisionK(actualMovies, predictedMovies, 10)
val itemFactors = model.productFeatures.map { case (id, factor) => factor }.collect()
val itemMatrix = new DoubleMatrix(itemFactors)
println(itemMatrix.rows, itemMatrix.columns)

(1682,50)

val imBroadcast = sc.broadcast(itemMatrix)
val allRecs = model.userFeatures.map{ case (userId, array) =>
  val userVector = new DoubleMatrix(array)
  val scores = imBroadcast.value.mmul(userVector)
  val sortedWithId = scores.data.zipWithIndex.sortBy(-_._1)
  val recommendedIds = sortedWithId.map(_._2 + 1).toSeq
  (userId, recommendedIds)
}
val userMovies = ratings.map{ case Rating(user, product, rating) => (user, product) }.groupBy(_._1)
val K = 10
val MAPK = allRecs.join(userMovies).map{ case (userId, (predicted, actualWithIds)) =>
  val actual = actualWithIds.map(_._2).toSeq
  avgPrecisionK(actual, predicted, K)
}.reduce(_ + _) / allRecs.count
println("Mean Average Precision at K = " + MAPK)

Mean Average Precision at K = 0.030001472840815356

4.3 MLib内置评估函数·RMSE和MSE
import org.apache.spark.mllib.evaluation.RegressionMetrics
val predictedAndTrue = ratingsAndPredictions.map { case ((user, product), (actual, predicted)) => (actual, predicted) }
val regressionMetrics = new RegressionMetrics(predictedAndTrue)
println("Mean Squared Error = " + regressionMetrics.meanSquaredError)

Mean Squared Error = 0.08527363423596633

println("Root Mean Squared Error = " + regressionMetrics.rootMeanSquaredError)

Root Mean Squared Error = 0.2920164965134099

4.4 MLib内置评估函数·MAP(平均准确率)
import org.apache.spark.mllib.evaluation.RankingMetrics
val predictedAndTrueForRanking = allRecs.join(userMovies).map{ case (userId, (predicted, actualWithIds)) =>
  val actual = actualWithIds.map(_._2)
  (predicted.toArray, actual.toArray)
}
val rankingMetrics = new RankingMetrics(predictedAndTrueForRanking)
println("Mean Average Precision = " + rankingMetrics.meanAveragePrecision)

Mean Average Precision = 0.07208991526855565

val MAPK2000 = allRecs.join(userMovies).map{ case (userId, (predicted, actualWithIds)) =>
  val actual = actualWithIds.map(_._2).toSeq
  avgPrecisionK(actual, predicted, 2000)
}.reduce(_ + _) / allRecs.count
println("Mean Average Precision = " + MAPK2000)

Mean Average Precision = 0.07208991526855561

相关实践学习
基于MSE实现微服务的全链路灰度
通过本场景的实验操作,您将了解并实现在线业务的微服务全链路灰度能力。
目录
相关文章
|
6月前
|
机器学习/深度学习 分布式计算 算法
Spark中的机器学习库MLlib是什么?请解释其作用和常用算法。
Spark中的机器学习库MLlib是什么?请解释其作用和常用算法。
110 0
|
6月前
|
机器学习/深度学习 数据采集 分布式计算
【机器学习】Spark ML 对数据进行规范化预处理 StandardScaler 与向量拆分
标准化Scaler是数据预处理技术,用于将特征值映射到均值0、方差1的标准正态分布,以消除不同尺度特征的影响,提升模型稳定性和精度。Spark ML中的StandardScaler实现此功能,通过`.setInputCol`、`.setOutputCol`等方法配置并应用到DataFrame数据。示例展示了如何在Spark中使用StandardScaler进行数据规范化,包括创建SparkSession,构建DataFrame,使用VectorAssembler和StandardScaler,以及将向量拆分为列。规范化有助于降低特征重要性,提高模型训练速度和计算效率。
115 6
|
6月前
|
机器学习/深度学习 分布式计算 算法
【机器学习】Spark ML 对数据特征进行 One-Hot 编码
One-Hot 编码是机器学习中将离散特征转换为数值表示的方法,每个取值映射为一个二进制向量,常用于避免特征间大小关系影响模型。Spark ML 提供 OneHotEncoder 进行编码,输入输出列可通过 `inputCol` 和 `outputCol` 参数设置。在示例中,先用 StringIndexer 对类别特征编码,再用 OneHotEncoder 转换,最后展示编码结果。注意 One-Hot 编码可能导致高维问题,可结合实际情况选择编码方式。
78 6
|
5月前
|
机器学习/深度学习 分布式计算 API
技术好文:Spark机器学习笔记一
技术好文:Spark机器学习笔记一
39 0
|
6月前
|
机器学习/深度学习 分布式计算 算法
使用Spark进行机器学习
【5月更文挑战第2天】使用Spark进行机器学习
69 2
|
6月前
|
机器学习/深度学习 分布式计算 算法
Spark MLlib简介与机器学习流程
Spark MLlib简介与机器学习流程
|
4天前
|
机器学习/深度学习 人工智能 算法
【手写数字识别】Python+深度学习+机器学习+人工智能+TensorFlow+算法模型
手写数字识别系统,使用Python作为主要开发语言,基于深度学习TensorFlow框架,搭建卷积神经网络算法。并通过对数据集进行训练,最后得到一个识别精度较高的模型。并基于Flask框架,开发网页端操作平台,实现用户上传一张图片识别其名称。
17 0
【手写数字识别】Python+深度学习+机器学习+人工智能+TensorFlow+算法模型
|
1月前
|
机器学习/深度学习 人工智能 自然语言处理
【MM2024】阿里云 PAI 团队图像编辑算法论文入选 MM2024
阿里云人工智能平台 PAI 团队发表的图像编辑算法论文在 MM2024 上正式亮相发表。ACM MM(ACM国际多媒体会议)是国际多媒体领域的顶级会议,旨在为研究人员、工程师和行业专家提供一个交流平台,以展示在多媒体领域的最新研究成果、技术进展和应用案例。其主题涵盖了图像处理、视频分析、音频处理、社交媒体和多媒体系统等广泛领域。此次入选标志着阿里云人工智能平台 PAI 在图像编辑算法方面的研究获得了学术界的充分认可。
【MM2024】阿里云 PAI 团队图像编辑算法论文入选 MM2024
|
25天前
|
机器学习/深度学习 算法 Java
机器学习、基础算法、python常见面试题必知必答系列大全:(面试问题持续更新)
机器学习、基础算法、python常见面试题必知必答系列大全:(面试问题持续更新)
|
1月前
|
机器学习/深度学习 人工智能 算法
【玉米病害识别】Python+卷积神经网络算法+人工智能+深度学习+计算机课设项目+TensorFlow+模型训练
玉米病害识别系统,本系统使用Python作为主要开发语言,通过收集了8种常见的玉米叶部病害图片数据集('矮花叶病', '健康', '灰斑病一般', '灰斑病严重', '锈病一般', '锈病严重', '叶斑病一般', '叶斑病严重'),然后基于TensorFlow搭建卷积神经网络算法模型,通过对数据集进行多轮迭代训练,最后得到一个识别精度较高的模型文件。再使用Django搭建Web网页操作平台,实现用户上传一张玉米病害图片识别其名称。
54 0
【玉米病害识别】Python+卷积神经网络算法+人工智能+深度学习+计算机课设项目+TensorFlow+模型训练