基于Spark的电影推荐系统实现

简介: 基于Spark的电影推荐系统实现

一、业务场景

受某视频网站委托,需开发一套基于Spark的大数据机器学习系统,应用协同过滤算法对网站客户进行电影推荐。


二、数据集说明

本案例所使用的数据集说明如下:

 评分数据集:/data/dataset/ml/movielens/ratings.csv

 电影数据集:/data/dataset/ml/movielens/movies.csv


三、操作步骤

阶段一、启动HDFS、Spark集群服务和zeppelin服务器

1、启动HDFS集群

 在Linux终端窗口下,输入以下命令,启动HDFS集群:

1.  $ start-dfs.sh

2、启动Spark集群

 在Linux终端窗口下,输入以下命令,启动Spark集群:

1.  $ cd /opt/spark
2.  $ ./sbin/start-all.sh

3、启动zeppelin服务器

 在Linux终端窗口下,输入以下命令,启动zeppelin服务器:

1.  $ zeppelin-daemon.sh start

4、验证以上进程是否已启动

 在Linux终端窗口下,输入以下命令,查看启动的服务进程:

1.  $ jps

如果显示以下6个进程,则说明各项服务启动正常,可以继续下一阶段。

1.  2288 NameNode
2.  2402 DataNode
3.  2603 SecondaryNameNode
4.  2769 Master
5.  2891 Worker
6.  2984 ZeppelinServer

阶段二、准备案例中用到的数据集

1、将本案例要用到的数据集上传到HDFS文件系统的”/data/dataset/ml/“目录下。在Linux终端窗口下,输入以下命令:

1.  $ hdfs dfs -mkdir -p /data/dataset/ml
2.  $ hdfs dfs -put /data/dataset/ml/movielens /data/dataset/ml/

2、在Linux终端窗口下,输入以下命令,查看HDFS上是否已经上传了该数据集:

1.  $ hdfs dfs -ls /data/dataset/ml/movielens

这时应该看到movielens目录及其中的训练数据集已经上传到了HDFS的”/data/dataset/ml/“目录下。


阶段三、对数据集进行探索和分析

1、新建一个zeppelin notebook文件,并命名为movie_project。

 2、先导入案例中要用到的机器学习库。在notebook单元格中,输入以下代码:

1.  // 导入相关的包
2.  import org.apache.spark.mllib.evaluation.RankingMetrics
3.  import org.apache.spark.ml.evaluation.RegressionEvaluator
4.  import org.apache.spark.ml.recommendation.ALS
5.  import org.apache.spark.ml.tuning.{ParamGridBuilder, CrossValidator}
6.  import org.apache.spark.sql.functions._
7.  import org.apache.spark.sql.types._
8.  import spark.implicits._

同时按下【Shift+Enter】键,执行以上代码,输出内容如下:

import org.apache.spark.mllib.evaluation.RankingMetrics
import org.apache.spark.ml.evaluation.RegressionEvaluator
import org.apache.spark.ml.recommendation.ALS
import org.apache.spark.ml.tuning.{ParamGridBuilder, CrossValidator}
import org.apache.spark.sql.functions.
import org.apache.spark.sql.types.
import spark.implicits._

3、加载数据集。在notebook单元格中,输入以下代码:

1.  // 加载数据。因为不需要timestamp列,因此立即删除它
2.  val file = "hdfs://localhost:9000/data/dataset/ml/movielens/ratings.csv"
3.       
4.  val ratingsDF1 = spark.read.option("header", "true").
5.                              option("inferSchema", "true").
6.                              csv(file).
7.                              drop("timestamp")
8.       
9.  ratingsDF1.count
10. ratingsDF1.printSchema

同时按下【Shift+Enter】键,执行以上代码,输出内容如下:

file: String = /data/spark_demo/movielens/ratings.csv
ratingsDF1: org.apache.spark.sql.DataFrame = [userId: int, movieId: int … 1 more field]
res97: Long = 100836
root
  |— userId: integer (nullable = true)
  |— movieId: integer (nullable = true)
  |— rating: double (nullable = true)

由以上输出内容可以看出,该数据集共有3个字段,分别是用户id、电影id和该用户对该电影的评分。


4、查看前5条数据。在notebook单元格中,输入以下代码:

1.  ratingsDF1.show(5)

同时按下【Shift+Enter】键,执行以上代码,输出内容如下:

274f8ef7ebda49b59c66adf8143e00ed.png


5、对数据进行简单统计。在notebook单元格中,输入以下代码:

1.  // 看看被评分的电影总共有多少部:
2.  ratingsDF1.select("movieId").distinct().count
3.       
4.  // 看看有多少用户参与评分:
5.  ratingsDF1.select("userId").distinct().count

同时按下Shift+Enter键,执行以上代码,输出内容如下:

Long = 9724
Long = 610

由以上输出内容可以看出,该评分数据集中,参与的用户有610名,被评论的电影有9724部。


6、快速检查谁是活跃的电影评分者。在notebook单元格中,输入以下代码:

1.  // 快速检查谁是活跃的电影评分者
2.  val ratingsByUserDF = ratingsDF1.groupBy("userId").count()
3.  ratingsByUserDF.orderBy($"count".desc).show(10)

同时按下【Shift+Enter】键,执行以上代码,输出内容如下:

2d05849c03b746509271539b0e6bed32.png


由以上输出内容可以看出,参与度最高用的用户其id是414。


7、分析每部电影的最大评分数量。在notebook单元格中,输入以下代码:

1.  // 分析每部电影的最大评分数量
2.  val ratingsByMovieDF = ratingsDF1.groupBy("movieId").count()
3.  ratingsByMovieDF.orderBy($"count".desc).show(10)

同时按下【Shift+Enter】键,执行以上代码,输出内容如下:

058085cb7ac147378074001a9132c624.png


由以上输出内容可以看出,评价数超过300的电影有三部,其中评价数最多的电影其id是356。


8、数据拆分,将原始数据集拆分为训练集和测试集,其中训练集占80%,测试集占20%。在notebook单元格中,输入以下代码:

1.  // 为训练和测试准备数据
2.  val Array(trainingData, testData) = ratingsDF1.randomSplit(Array(0.8, 0.2))

同时按下【Shift+Enter】键,执行以上代码,输出内容如下:


trainingData: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [userId: int, movieId: int … 1 more field]
testData: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [userId: int, movieId: int … 1 more field]

9、Spark实现了ALS算法(Alternating Least Square),这一步建立ALS的实例。在notebook单元格中,输入以下代码:

1.  // 建立ALS的实例
2.  val als = new ALS().setRank(12).
3.                      setMaxIter(10).
4.                      setRegParam(0.03).
5.                      setUserCol("userId").
6.                      setItemCol("movieId").
7.                      setRatingCol("rating")

同时按下【Shift+Enter】键,执行以上代码,输出内容如下:

als: org.apache.spark.ml.recommendation.ALS = als_10a2c5c69e40

10、训练模型,并设置模型的冷启动策略。在notebook单元格中,输入以下代码:

1.  // 训练模型
2.  val model = als.fit(trainingData)
3.       
4.  // 从Spark 2.2.0开始,可以将coldStartStrategy参数设置为drop,以便删除包含NaN值的预测的DataFrame中的任何行。
5.  // 然后将在非nan数据上计算评估度量,该度量将是有效的。
6.  model.setColdStartStrategy("drop")

同时按下【Shift+Enter】键,执行以上代码,输出内容如下:


model: org.apache.spark.ml.recommendation.ALSModel = als_10a2c5c69e40
res169: model.type = als_10a2c5c69e40

11、执行预测,并查看预测结果。在notebook单元格中,输入以下代码:

1.  // 执行预测
2.  val predictions = model.transform(testData)
3.       
4.  // 查看预测结果
5.  predictions.sort("userId").show(10)

同时按下【Shift+Enter】键,执行以上代码,输出内容如下:

4dbf6af0c4fc40268b1cb6971eb4d1e8.png


由以上输出内容可以看出,rating列为标签列,prediction为预测结果列。


12、有的预测值为NaN(非数字),这会影响到rmse的计算,因些需要先删除结果集中的NaN值。在notebook单元格中,输入以下代码:


1.  val predictions_dropNaN = predictions.na.drop(Array("prediction"))
2.  predictions_dropNaN.count

同时按下【Shift+Enter】键,执行以上代码,输出内容如下:

predictions_dropNaN: org.apache.spark.sql.DataFrame = [userId: int, movieId: int … 2 more fields]
res178: Long = 19333

由以上输出内容可以看出,删除prediction列具有NaN值的记录以后,结果集中还有19333条记录。


13、设置一个评估器(evaluator)来计算RMSE度量指标。在notebook单元格中,输入以下代码:

1.  // 设置一个评估器(evaluator)来计算RMSE度量指标
2.  val evaluator = new RegressionEvaluator().setMetricName("rmse").
3.                                            setLabelCol("rating").
4.                                            setPredictionCol("prediction")
5.       
6.  val rmse = evaluator.evaluate(predictions_dropNaN)
7.  println(s"Root-mean-square error = ${rmse}")

同时按下【Shift+Enter】键,执行以上代码,输出内容如下:


evaluator: org.apache.spark.ml.evaluation.RegressionEvaluator = regEval_7943cc497104
rmse: Double = 1.017470307395966
Root-mean-square error = 1.017470307395966

由以上输出内容可以看出,根均方差(rmse)值为

1.017470307395966

14、使用ALSModel来执行推荐。在notebook单元格中,输入以下代码:

1.  // 为所有用户推荐排名前五的电影
2.  model.recommendForAllUsers(5).show(false)

同时按下【Shift+Enter】键,执行以上代码,输出内容如下:


c4de20ce65374258890dcfec14e49c03.png


15、为每部电影推荐top 3个用户。在notebook单元格中,输入以下代码:

1.  // 为每部电影推荐top 3个用户
2.  val recMovies = model.recommendForAllItems(3)
3.  recMovies.show(5,false)

同时按下【Shift+Enter】键,执行以上代码,输出内容如下:


b91332055b8f4bff8b2376c232cae99c.png

由以上输出内容可以看出,为每部电影推荐前3个用户。但是不好的一点是,我们只能看到电影的id,而不是电影的名称。


16、读取电影数据集,这样就能看到电影标题了。在notebook单元格中,输入以下代码:

1.  // 读取电影数据集,这样就能看到电影标题了
2.  val moviesFile = "hdfs://localhost:9000/data/dataset/ml/movielens/movies.csv"
3.  val moviesDF = spark.read.option("header", "true").option("inferSchema", "true").csv(moviesFile)
4.       
5.  val recMoviesWithInfoDF = recMovies.join(moviesDF, "movieId")
6.  recMoviesWithInfoDF.select("movieId", "title", "recommendations").show(5, false)

同时按下【Shift+Enter】键,执行以上代码,输出内容如下:


a7a61fdd01e54354a25eeebafd794eb7.png

由以上输出内容可以看出,现在每部电影的id和名称都是可见的了。


17、使用CrossValidator对ALS模型进行调优。在notebook单元格中,输入以下代码:

1.  // 使用CrossValidator对ALS模型进行调优
2.  val paramGrid = new ParamGridBuilder()
3.                      .addGrid(als.regParam, Array(0.05, 0.15))
4.                      .addGrid(als.rank, Array(12,20))
5.                      .build
6.       
7.  val crossValidator=new CrossValidator()
8.                          .setEstimator(als)
9.                          .setEvaluator(evaluator)
10.                         .setEstimatorParamMaps(paramGrid)
11.                         .setNumFolds(3)
12.      
13. // 打印出4个超参数组合
14. crossValidator.getEstimatorParamMaps.foreach(println)

同时按下【Shift+Enter】键,执行以上代码,输出内容如下:

paramGrid: Array[org.apache.spark.ml.param.ParamMap] =
Array({
  als_10a2c5c69e40-rank: 12,
  als_10a2c5c69e40-regParam: 0.05
}, {
  als_10a2c5c69e40-rank: 20,
  als_10a2c5c69e40-regParam: 0.05
}, {
  als_10a2c5c69e40-rank: 12,
  als_10a2c5c69e40-regParam: 0.15
}, {
  als_10a2c5c69e40-rank: 20,
  als_10a2c5c69e40-regParam: 0.15
})
crossValidator: org.apache.spark.ml.tuning.CrossValidator = cv_efc8cf15a3ab
{
  als_10a2c5c69e40-rank: 12,
  als_10a2c5c69e40-regParam: 0.05
}
{
  als_10a2c5c69e40-rank: 20,
  als_10a2c5c69e40-regParam: 0.05
}
{
  als_10a2c5c69e40-rank: 12,
  als_10a2c5c69e40-regParam: 0.15
}
{
  als_10a2c5c69e40-rank: 20,
  als_10a2c5c69e40-regParam: 0.15
}

由以上输出内容可以看出,共打印了四组参数组合。


18、使用找到的最优模型来再次进行预测,并对预测结果进行评估。在notebook单元格中,输入以下代码:

1.  //这需要一段时间才能完成超过10个实验
2.  val cvModel = crossValidator.fit(trainingData)
3.  
4.  // 执行预测并删除空值
5.  val predictions2 = cvModel.transform(testData).na.drop
6.       
7.  val evaluator2 = new RegressionEvaluator()
8.                        .setMetricName("rmse")
9.                        .setLabelCol("rating")
10.                       .setPredictionCol("prediction")
11.      
12. val rmse2 = evaluator2.evaluate(predictions2)

同时按下【Shift+Enter】键,执行以上代码,输出内容如下:

cvModel: org.apache.spark.ml.tuning.CrossValidatorModel = cv_efc8cf15a3ab
predictions2: org.apache.spark.sql.DataFrame = [userId: int, movieId: int … 2 more fields]
evaluator2: org.apache.spark.ml.evaluation.RegressionEvaluator = regEval_4dd08e13c0e9
rmse2: Double = 0.9471342462991672
相关文章
|
16天前
|
机器学习/深度学习 数据采集 搜索推荐
利用Python和机器学习构建电影推荐系统
利用Python和机器学习构建电影推荐系统
32 1
|
5月前
|
搜索推荐 算法 小程序
基于Java协同过滤算法的电影推荐系统设计和实现(源码+LW+调试文档+讲解等)
基于Java协同过滤算法的电影推荐系统设计和实现(源码+LW+调试文档+讲解等)
|
1月前
|
搜索推荐 关系型数据库 数据库
#646253#基于django/neo4j的电影打分推荐系统
#646253#基于django/neo4j的电影打分推荐系统
18 0
|
3月前
|
存储 分布式计算 资源调度
Hadoop生态系统概览:从HDFS到Spark
【8月更文第28天】Hadoop是一个开源软件框架,用于分布式存储和处理大规模数据集。它由多个组件构成,旨在提供高可靠性、高可扩展性和成本效益的数据处理解决方案。本文将介绍Hadoop的核心组件,包括HDFS、MapReduce、YARN,并探讨它们如何与现代大数据处理工具如Spark集成。
265 0
|
3月前
|
分布式计算 Java Linux
【Deepin 20系统】Linux 系统安装Spark教程及使用
在Deepin 20系统上安装和使用Apache Spark的详细教程,包括安装Java JDK、下载和解压Spark安装包、配置环境变量和Spark配置文件、启动和关闭Spark集群的步骤,以及使用Spark Shell和PySpark进行简单操作的示例。
68 0
|
5月前
|
搜索推荐 算法 Java
基于SpringBoot+Vue电影推荐系统设计和实现(源码+LW+调试文档+讲解等)
基于SpringBoot+Vue电影推荐系统设计和实现(源码+LW+调试文档+讲解等)
|
5月前
|
机器学习/深度学习 分布式计算 算法
基于Spark中随机森林模型的天气预测系统
基于Spark中随机森林模型的天气预测系统
130 1
|
4月前
|
JavaScript Java 测试技术
基于springboot+vue.js+uniapp小程序的协同过滤电影推荐系统附带文章源码部署视频讲解等
基于springboot+vue.js+uniapp小程序的协同过滤电影推荐系统附带文章源码部署视频讲解等
47 0
|
5月前
|
JavaScript Java 测试技术
基于springboot+vue.js+uniapp小程序的个性化电影推荐系统附带文章源码部署视频讲解等
基于springboot+vue.js+uniapp小程序的个性化电影推荐系统附带文章源码部署视频讲解等
47 0
基于springboot+vue.js+uniapp小程序的个性化电影推荐系统附带文章源码部署视频讲解等
|
6月前
|
搜索推荐 算法 Java
电影推荐|基于SSM+vue的电影推荐系统的设计与实现(源码+数据库+文档)
电影推荐|基于SSM+vue的电影推荐系统的设计与实现(源码+数据库+文档)
133 3
下一篇
无影云桌面