Spark实现协同过滤CF算法实践

简介: UI矩阵–>II矩阵–>排序

Spark编写Scala实现CF算法


UI矩阵–>II矩阵–>排序


package spark.example
import org.apache.spark._
import SparkContext._
import scala.collection.mutable.ArrayBuffer
import scala.math._
object CollaborativeFiltering {
  def main(args: Array[String]) {
    if (args.length != 5) {
        System.err.println("Usage: spark.example.CollaborativeFiltering <1:input> <2:output> <3:topn> <4:max_prefs_per_user> <5:score_threshold>")
        System.exit(1)
    }
    val conf = new SparkConf().setAppName("CollaborativeFiltering")
    conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    val sc = new SparkContext(conf)
    val lines = sc.textFile(args(0))
    val output_path = args(1).toString
    val topn = args(2).toInt
    val max_prefs_per_user = args(3).toInt
    val score_threshold = args(4).toDouble
    /*
     * Step 1.
     * Obtain UI Matrix:
     */
    val ui_rdd = lines.map { x =>
      val fields = x.split("  ")
      (fields(0).toString, (fields(1).toString, fields(2).toDouble))
    }.filter { x =>
      x._2._2 > score_threshold
    }.groupByKey(4).flatMap { x =>
      val user = x._1
      val is_list = x._2
      val is_arr = is_list.toArray
      var is_arr_len = is_arr.length
      if (is_arr_len > max_prefs_per_user) {
        is_arr_len = max_prefs_per_user
      }
      val i_us_arr = ArrayBuffer[(String, (String, Double))]()
      for (i <- 0 until is_arr_len) {
        i_us_arr += ((is_arr(i)._1, (user, is_arr(i)._2)))
      }
      i_us_arr
    }.groupByKey().flatMap { x =>
      val item = x._1
      val u_list = x._2
      val us_arr = u_list.toArray
      var sum: Double = 0
      for (i <- 0 until us_arr.length) {
        sum += pow(us_arr(i)._2, 2)
      }
      sum = sqrt(sum)
      val u_is_arr = ArrayBuffer[(String, (String, Double))]()
      for (i <- 0 until us_arr.length) {
        u_is_arr += ((us_arr(i)._1, (item, us_arr(i)._2 / sum)))
      }
      u_is_arr
    }.groupByKey().cache()
    /*
     * Step 2.
     * Obtain II Matrix:
     */
    val ii_rdd = ui_rdd.flatMap { x =>
      val is_arr = x._2.toArray.sortBy(_._1)
      val ii_s_arr = ArrayBuffer[((String, String), Double)]()
      for (i <- 0 until is_arr.length) {
        for (j <- (i + 1) until is_arr.length) {
          ii_s_arr += (((is_arr(i)._1, is_arr(j)._1), is_arr(i)._2 * is_arr(j)._2))
        }
      }
      ii_s_arr
    }.groupByKey().map { x =>
      val ii_pair = x._1
      val s_list = x._2
      val s_arr = s_list.toArray
      val len = s_arr.length
      var s:Double = 0.0
      for (i <- 0 until len) {
        s += s_arr(i)
      }
      (ii_pair._1, (ii_pair._2, s))
    }.flatMap { x =>
      val arr = ArrayBuffer[(String, (String, Double))]()
      arr += ((x._1, (x._2._1, x._2._2)))
      arr += ((x._2._1, (x._1, x._2._2)))
      arr
    }.groupByKey().map { x =>
      val bs_list = x._2
      val bs_arr = bs_list.toArray.sortWith(_._2 > _._2)
      var l = bs_arr.length
      if (l > topn) {
        l = topn
      }
      val s = new StringBuilder
      for (i <- 0 until l) {
        val score = "%1.8f" format bs_arr(i)._2
        val tmp_s = bs_arr(i)._1 + ":" + score
        s.append(tmp_s)
        if (i != (l - 1)) {
          s.append(",")
        }
      }
      x._1 + "\t" + "\t" + s
    }.saveAsTextFile(output_path)
  }
}

输出结果:

image.png

目录
相关文章
|
1月前
|
机器学习/深度学习 算法 搜索推荐
从理论到实践,Python算法复杂度分析一站式教程,助你轻松驾驭大数据挑战!
【10月更文挑战第4天】在大数据时代,算法效率至关重要。本文从理论入手,介绍时间复杂度和空间复杂度两个核心概念,并通过冒泡排序和快速排序的Python实现详细分析其复杂度。冒泡排序的时间复杂度为O(n^2),空间复杂度为O(1);快速排序平均时间复杂度为O(n log n),空间复杂度为O(log n)。文章还介绍了算法选择、分而治之及空间换时间等优化策略,帮助你在大数据挑战中游刃有余。
58 4
|
1月前
|
机器学习/深度学习 搜索推荐 算法
协同过滤算法
协同过滤算法
76 0
|
1月前
|
存储 分布式计算 算法
大数据-106 Spark Graph X 计算学习 案例:1图的基本计算、2连通图算法、3寻找相同的用户
大数据-106 Spark Graph X 计算学习 案例:1图的基本计算、2连通图算法、3寻找相同的用户
60 0
|
3月前
|
搜索推荐 前端开发 数据可视化
【优秀python web毕设案例】基于协同过滤算法的酒店推荐系统,django框架+bootstrap前端+echarts可视化,有后台有爬虫
本文介绍了一个基于Django框架、协同过滤算法、ECharts数据可视化以及Bootstrap前端技术的酒店推荐系统,该系统通过用户行为分析和推荐算法优化,提供个性化的酒店推荐和直观的数据展示,以提升用户体验。
153 1
|
17天前
|
分布式计算 Java 开发工具
阿里云MaxCompute-XGBoost on Spark 极限梯度提升算法的分布式训练与模型持久化oss的实现与代码浅析
本文介绍了XGBoost在MaxCompute+OSS架构下模型持久化遇到的问题及其解决方案。首先简要介绍了XGBoost的特点和应用场景,随后详细描述了客户在将XGBoost on Spark任务从HDFS迁移到OSS时遇到的异常情况。通过分析异常堆栈和源代码,发现使用的`nativeBooster.saveModel`方法不支持OSS路径,而使用`write.overwrite().save`方法则能成功保存模型。最后提供了完整的Scala代码示例、Maven配置和提交命令,帮助用户顺利迁移模型存储路径。
|
1月前
|
机器学习/深度学习 JSON 搜索推荐
深度学习的协同过滤的推荐算法-毕设神器
深度学习的协同过滤的推荐算法-毕设神器
41 4
|
1月前
|
机器学习/深度学习 算法 Python
探索机器学习中的决策树算法:从理论到实践
【10月更文挑战第5天】本文旨在通过浅显易懂的语言,带领读者了解并实现一个基础的决策树模型。我们将从决策树的基本概念出发,逐步深入其构建过程,包括特征选择、树的生成与剪枝等关键技术点,并以一个简单的例子演示如何用Python代码实现一个决策树分类器。文章不仅注重理论阐述,更侧重于实际操作,以期帮助初学者快速入门并在真实数据上应用这一算法。
|
1月前
|
机器学习/深度学习 人工智能 Rust
MindSpore QuickStart——LSTM算法实践学习
MindSpore QuickStart——LSTM算法实践学习
40 2
|
30天前
|
机器学习/深度学习 算法 数据建模
计算机前沿技术-人工智能算法-生成对抗网络-算法原理及应用实践
计算机前沿技术-人工智能算法-生成对抗网络-算法原理及应用实践
25 0