Spark高级数据分析 · 3 推荐引擎

简介: ![](http://img3.douban.com/lpic/s28323514.jpg) ### 推荐算法流程 [推荐算法](http://www.atatech.org/articles/48372) ### 预备 ``` wget http://www.

推荐算法流程

[推荐算法](http://www.atatech.org/articles/48372)

预备

# Download the profiledata_06-May-2005 dataset archive.
wget http://www.iro.umontreal.ca/~lisa/datasets/profiledata_06-May-2005.tar.gz
# Unpack it: the Spark code below reads files from the extracted
# profiledata_06-May-2005/ directory, not from the .tar.gz itself.
tar -xzf profiledata_06-May-2005.tar.gz

# Start a local-mode Spark shell with 6g of driver memory.
cd /Users/erichan/garden/spark-1.6.0-bin-hadoop2.6/bin
./spark-shell --master local --driver-memory 6g

1 准备数据

// Root directory of the extracted profiledata_06-May-2005 dataset.
val data = "/Users/erichan/AliDrive/ml_spark/data/profiledata_06-May-2005"
// Raw lines of user_artist_data.txt; the second argument (10) is the partition hint
// passed to textFile.
val rawUserArtistData = sc.textFile(s"$data/user_artist_data.txt", 10)
// ALS requires IDs to be numeric: look at a sample line, then (optionally) at the
// range of the first and second columns.
rawUserArtistData.first()
//res3: String = 1092764  1000311
//rawUserArtistData.map(_.split(' ')(0).toDouble).stats()
//res10: org.apache.spark.util.StatCounter = (count: 24296858, mean: 1947573.265353, stdev: 496000.544975, max: 2443548.000000, min: 90.000000)
//rawUserArtistData.map(_.split(' ')(1).toDouble).stats()
//res11: org.apache.spark.util.StatCounter = (count: 24296858, mean: 1718704.093757, stdev: 2539389.040171, max: 10794401.000000, min: 1.000000)

val rawArtistData = sc.textFile(s"$data/artist_data.txt")
//rawArtistData.first
//res12: String = 1134999    06Crazy Life
// Parse each line into an (id, name) pair. The line is cut at its first tab; lines
// with nothing from the tab onward, or whose ID part is not an integer, are dropped.
val artistByID = rawArtistData.flatMap { line =>
  val (idPart, namePart) = line.span(_ != '\t')
  if (namePart.nonEmpty) {
    try {
      // trim removes the leading tab that span leaves on the name part
      Some((idPart.toInt, namePart.trim))
    } catch {
      case _: NumberFormatException => None
    }
  } else {
    None
  }
}

val rawArtistAlias = sc.textFile(data+"/artist_alias.txt")
// Map from an artist ID to the artist ID that should replace it, collected to the driver.
// Each line is expected to be "<id>\t<replacementId>".
val artistAlias = rawArtistAlias.flatMap { line =>
  // String.split drops trailing empty fields, so a line such as "123\t" yields a
  // single token; indexing tokens(1) on it would abort the whole job. Lines where
  // either field is missing or empty are skipped instead.
  line.split('\t') match {
    case Array(aliasID, finalID, _*) if aliasID.nonEmpty && finalID.nonEmpty =>
      Some((aliasID.toInt, finalID.toInt))
    case _ =>
      None
  }
}.collectAsMap()
//artistByID.lookup(1000010).head
//res14: String = Aerosmith

2 建模

import org.apache.spark.mllib.recommendation._
// Broadcast the alias map; the closure below reads it through .value.
val bArtistAlias = sc.broadcast(artistAlias)
// One Rating per input line: (user, artist after alias replacement, count).
val trainData = rawUserArtistData.map { line =>
  val fields = line.split(' ').map(_.toInt)
  val Array(user, artist, plays) = fields
  // Use the mapped ID when the artist has an alias entry, otherwise keep the ID as is.
  val resolvedArtist = bArtistAlias.value.getOrElse(artist, artist)
  Rating(user, resolvedArtist, plays)
}.cache()
// Train the implicit-feedback ALS model with the tutorial's hyper-parameters.
val model = ALS.trainImplicit(
  ratings = trainData,
  rank = 10,
  iterations = 5,
  lambda = 0.01,
  alpha = 1.0)

3 检验

// User whose listening history and recommendations are inspected by hand.
val sampleUserID = 2093760

// All raw (user, artist, count) token arrays belonging to that user.
val rawArtistsForUser = rawUserArtistData.map(_.split(' ')).filter {
  case Array(user, _, _) => user.toInt == sampleUserID
}

// Distinct artist IDs the user has already listened to.
val existingProducts = rawArtistsForUser.map { fields =>
  val Array(_, artist, _) = fields
  artist.toInt
}.collect().toSet

// Print the names of those artists.
val existingArtistNames = artistByID.filter { case (id, _) => existingProducts(id) }.values
existingArtistNames.collect().foreach(println)

// Top-5 recommendations for the same user.
val recommendations = model.recommendProducts(sampleUserID, 5)
recommendations.foreach(println)

// Print the names of the recommended artists.
val recommendedProductIDs = recommendations.map(_.product).toSet
val recommendedArtistNames = artistByID.filter { case (id, _) => recommendedProductIDs(id) }.values
recommendedArtistNames.collect().foreach(println)

4 评价

// Load the RunAUC helper object (its source is listed in the appendix) into the shell.
:load /Users/erichan/sourcecode/book/aas/ch03-recommender/src/main/scala/RunAUC.scala
// Rebuild the alias broadcast and the full set of ratings through the RunAUC helpers.
val bArtistAlias = sc.broadcast(RunAUC.buildArtistAlias(rawArtistAlias))
val allData = RunAUC.buildRatings(rawUserArtistData, bArtistAlias)
// Random split with weights 0.9 / 0.1 into a training set and a held-out (CV) set.
val Array(trainData, cvData) = allData.randomSplit(Array(0.9, 0.1))
trainData.cache()
cvData.cache()

// Every distinct product (artist) ID, collected to the driver and broadcast;
// areaUnderCurve samples its negative examples from this array.
val allItemIDs = allData.map(_.product).distinct().collect()
val bAllItemIDs = sc.broadcast(allItemIDs)

// Mean per-user AUC of the "most listened" baseline predictor on the held-out set.
val mostListenedAUC = RunAUC.areaUnderCurve(cvData, bAllItemIDs, RunAUC.predictMostListened(sc, trainData))
println(mostListenedAUC)
// Output recorded from one run:
//0.9395286660878177
// Release the cached splits now that the evaluation is done.
trainData.unpersist()
cvData.unpersist()

5 推荐

// Take 100 distinct user IDs and ask the model for 5 recommendations for each.
val someUsers = allData.map(_.user).distinct().take(100)
val someRecommendations = someUsers.map(model.recommendProducts(_, 5))
// Build one "user -> product, product, ..." line per user, then print them all.
val summaryLines = someRecommendations.map { recs =>
  val products = recs.map(_.product).mkString(", ")
  s"${recs.head.user} -> $products"
}
summaryLines.foreach(println)

附录

RunAUC.scala

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.mllib.recommendation._
import org.apache.spark.rdd.RDD

import scala.collection.Map
import scala.collection.mutable.ArrayBuffer
import scala.util.Random

/**
  * Helpers for evaluating a recommender by mean per-user AUC, together with the
  * data-preparation routines (alias map, ratings) used to build its input.
  *
  * Created by erichan
  * on 16/1/26.
  */
object RunAUC {
  /**
    * Computes the mean, over users, of a per-user AUC.
    *
    * For every user in `positiveData`, about as many "negative" items are sampled at
    * random from `bAllItemIDs` (excluding that user's positive items). Both sets are
    * scored with `predictFunction`, and the per-user AUC is the fraction of
    * (positive, negative) pairs in which the positive item scores strictly higher.
    * Sampling uses an unseeded `Random`, so results vary between runs.
    *
    * @param positiveData    held-out ratings treated as the positive examples
    * @param bAllItemIDs     broadcast array of all item IDs to sample negatives from
    * @param predictFunction scores (user, product) pairs, returning one `Rating` per pair
    * @return the mean of the per-user AUC values
    */
  def areaUnderCurve(
      positiveData: RDD[Rating],
      bAllItemIDs: Broadcast[Array[Int]],
      predictFunction: (RDD[(Int,Int)] => RDD[Rating])): Double = {
    // What this actually computes is AUC, per user. The result is actually something
    // that might be called "mean AUC".

    // Take held-out data as the "positive", and map to tuples
    val positiveUserProducts = positiveData.map(r => (r.user, r.product))
    // Make predictions for each of them, including a numeric score, and gather by user
    val positivePredictions = predictFunction(positiveUserProducts).groupBy(_.user)

    // BinaryClassificationMetrics.areaUnderROC is not used here since there are really lots of
    // small AUC problems, and it would be inefficient, when a direct computation is available.

    // Create a set of "negative" products for each user. These are randomly chosen
    // from among all of the other items, excluding those that are "positive" for the user.
    val negativeUserProducts = positiveUserProducts.groupByKey().mapPartitions {
      // mapPartitions operates on many (user,positive-items) pairs at once
      userIDAndPosItemIDs => {
        // Init an RNG and the item IDs set once for partition
        val random = new Random()
        val allItemIDs = bAllItemIDs.value
        userIDAndPosItemIDs.map { case (userID, posItemIDs) =>
          val posItemIDSet = posItemIDs.toSet
          val negative = new ArrayBuffer[Int]()
          var i = 0
          // Keep about as many negative examples per user as positive.
          // Duplicates are OK. The attempt counter i bounds the loop so it still
          // terminates when (nearly) every item is positive for this user.
          while (i < allItemIDs.size && negative.size < posItemIDSet.size) {
            val itemID = allItemIDs(random.nextInt(allItemIDs.size))
            if (!posItemIDSet.contains(itemID)) {
              negative += itemID
            }
            i += 1
          }
          // Result is a collection of (user,negative-item) tuples
          negative.map(itemID => (userID, itemID))
        }
      }
    }.flatMap(t => t)
    // flatMap breaks the collections above down into one big set of tuples

    // Make predictions on the rest:
    val negativePredictions = predictFunction(negativeUserProducts).groupBy(_.user)

    // Join positive and negative by user
    positivePredictions.join(negativePredictions).values.map {
      case (positiveRatings, negativeRatings) =>
        // AUC may be viewed as the probability that a random positive item scores
        // higher than a random negative one. Here the proportion of all positive-negative
        // pairs that are correctly ranked is computed. The result is equal to the AUC metric.
        var correct = 0L
        var total = 0L
        // For each pairing,
        for (positive <- positiveRatings;
             negative <- negativeRatings) {
          // Count the correctly-ranked pairs
          if (positive.rating > negative.rating) {
            correct += 1
          }
          total += 1
        }
        // Return AUC: fraction of pairs ranked correctly
        correct.toDouble / total
    }.mean() // Return mean AUC over users
  }

  /**
    * Baseline predictor: scores every (user, product) pair with the product's total
    * rating summed over `train`, regardless of the user. Products absent from `train`
    * score 0.0.
    *
    * @param sc      context used to broadcast the per-product totals
    * @param train   ratings from which the per-product totals are computed
    * @param allData (user, product) pairs to score
    * @return one `Rating` per input pair
    */
  def predictMostListened(sc: SparkContext, train: RDD[Rating])(allData: RDD[(Int,Int)]): RDD[Rating] = {
    val bListenCount =
      sc.broadcast(train.map(r => (r.product, r.rating)).reduceByKey(_ + _).collectAsMap())
    allData.map { case (user, product) =>
      Rating(user, product, bListenCount.value.getOrElse(product, 0.0))
    }
  }

  /**
    * Parses tab-separated "id, replacement id" lines into a map collected on the driver.
    *
    * Lines whose first or second field is missing or empty are skipped. (String.split
    * drops trailing empty fields, so a line like "123\t" has only one token and must
    * not be indexed at position 1.) Fields beyond the second are ignored.
    *
    * @param rawArtistAlias raw lines of the alias file
    * @return map from an artist ID to the ID that should replace it
    */
  def buildArtistAlias(rawArtistAlias: RDD[String]): Map[Int,Int] =
    rawArtistAlias.flatMap { line =>
      line.split('\t') match {
        case Array(aliasID, finalID, _*) if aliasID.nonEmpty && finalID.nonEmpty =>
          Some((aliasID.toInt, finalID.toInt))
        case _ =>
          None
      }
    }.collectAsMap()

  /**
    * Converts raw lines into `Rating`s, replacing artist IDs that have an alias entry.
    *
    * Each line is expected to hold exactly three space-separated integers:
    * user ID, artist ID and count.
    *
    * @param rawUserArtistData raw "user artist count" lines
    * @param bArtistAlias      broadcast alias map applied to each artist ID
    * @return one `Rating(user, resolved artist, count)` per line
    */
  def buildRatings(
      rawUserArtistData: RDD[String],
      bArtistAlias: Broadcast[Map[Int,Int]]): RDD[Rating] = {
    rawUserArtistData.map { line =>
      val Array(userID, artistID, count) = line.split(' ').map(_.toInt)
      // Fall back to the original ID when no alias entry exists.
      val finalArtistID = bArtistAlias.value.getOrElse(artistID, artistID)
      Rating(userID, finalArtistID, count)
    }
  }
}
目录
相关文章
|
2月前
|
SQL 分布式计算 API
Spark学习------SparkSQL(概述、编程、数据的加载和保存)
Spark学习------SparkSQL(概述、编程、数据的加载和保存)
52 2
|
4月前
|
分布式计算 大数据 Apache
【大数据技术】流数据、流计算、Spark Streaming、DStream的讲解(图文解释 超详细)
【大数据技术】流数据、流计算、Spark Streaming、DStream的讲解(图文解释 超详细)
62 0
|
4月前
|
SQL 分布式计算 数据挖掘
Spark_Day07:Spark SQL(DataFrame是什么和数据分析(案例讲解))
Spark_Day07:Spark SQL(DataFrame是什么和数据分析(案例讲解))
73 0
|
1月前
|
SQL 分布式计算 Java
Spark学习---SparkSQL(概述、编程、数据的加载和保存、自定义UDFA、项目实战)
Spark学习---SparkSQL(概述、编程、数据的加载和保存、自定义UDFA、项目实战)
76 1
|
1月前
|
存储 分布式计算 API
adb spark的lakehouse api访问内表数据,还支持算子下推吗
【2月更文挑战第21天】adb spark的lakehouse api访问内表数据,还支持算子下推吗
107 2
|
3月前
|
分布式计算 分布式数据库 API
Spark与HBase的集成与数据访问
Spark与HBase的集成与数据访问
|
3月前
|
JSON 分布式计算 关系型数据库
Spark中使用DataFrame进行数据转换和操作
Spark中使用DataFrame进行数据转换和操作
|
3月前
|
存储 分布式计算 调度
Spark任务调度与数据本地性
Spark任务调度与数据本地性
|
4月前
|
分布式计算 Java Spark
Spark Driver和Executor数据传递使用问题
Spark Driver和Executor数据传递使用问题
30 0
|
4月前
|
SQL 分布式计算 Apache
流数据湖平台Apache Paimon(六)集成Spark之DML插入数据
流数据湖平台Apache Paimon(六)集成Spark之DML插入数据
77 0