协同过滤思想:
协同过滤算法是一种较为著名和常用的推荐算法,它基于对用户历史行为数据的挖掘发现用户的喜好偏向,并预测用户可能喜好的产品进行推荐。也就是常见的“猜你喜欢”,和“购买了该商品的人也喜欢”等功能。它的主要实现由:
●根据和你有共同喜好的人给你推荐
●根据你喜欢的物品给你推荐相似物品
●根据以上条件综合推荐
通过用户桥梁知道不同物品相关度:如正在访问itemA,看itemA又被哪些用户访问过,在进一步看这些用户历史上还看了哪些物品,比如物品B,C,D,经过倒排索引思路形成itemA=itemB,itemC,itemD,直接灌倒数据库(也可以通过切词获得的倒排表灌到数据库)。
协同过滤算法分类:
本文讨论以下两种情况:
1.基于用户协同过滤 User-based cf:朋友之间推荐
2.基于物品协同过滤 Item-based cf: 过去买过得东西推荐相关物品
喜欢程度,数越大,越喜欢。
User-Based CF:跟朋友之间,好友关系程度不一样,所以先算出用户相似关系程度再进行评分预测,预估打分。
item-based cf: 过去买过得东西推荐相关物品,构建物品与物品相似矩阵
一、MR实现协同过滤(开发代码多),分为3步map和reduce.
1、归一UI矩阵
1_gen_ui_map.py #!/usr/local/bin/python import sys for line in sys.stdin: #ss=line.strip().split(’\t’) ss=line.strip().split(’,’) if len(ss) != 3: continue u, i, s = ss print “%s\t%s\t%s” % (i, u, s) 1_gen_ui_reduce #!/usr/local/bin/python import sys import math cur_item = None user_score_list = [] for line in sys.stdin: item, user, score = line.strip().split("\t") if not cur_item: cur_item = item if item != cur_item: sum = 0.0 for tuple in user_score_list: (u, s) = tuple sum += pow(s, 2) sum = math.sqrt(sum) for tuple in user_score_list: (u, s) = tuple print “%s\t%s\t%s” % (u, cur_item, float(s / sum)) user_score_list = [] cur_item = item user_score_list.append((user, float(score))) sum = 0.0 for tuple in user_score_list: (u, s) = tuple sum += pow(s, 2) sum = math.sqrt(sum) for tuple in user_score_list: (u, s) = tuple print “%s\t%s\t%s” % (u, cur_item, float(s / sum))
2.两两取对
2_gen_ii_pair_map #!/usr/local/bin/python import sys for line in sys.stdin: u, i, s =line.strip().split(’\t’) print “%s\t%s\t%s” % (u, i, s) 2_gen_ii_pair_reduce #!/usr/local/bin/python import sys cur_user = None item_score_list = [] for line in sys.stdin: user, item, score = line.strip().split("\t") if not cur_user: cur_user = user if user != cur_user: for i in range(0, len(item_score_list) - 1): for j in range(i + 1, len(item_score_list)): item_a, score_a = item_score_list[i] item_b, score_b = item_score_list[j] print “%s\t%s\t%s” % (item_a, item_b, score_a * score_b) print “%s\t%s\t%s” % (item_b, item_a, score_a * score_b) item_score_list = [] cur_user = user item_score_list.append((item, float(score))) for i in range(0, len(item_score_list) - 1): for j in range(i + 1, len(item_score_list)): item_a, score_a = item_score_list[i] item_b, score_b = item_score_list[j] print “%s\t%s\t%s” % (item_a, item_b, score_a * score_b) print “%s\t%s\t%s” % (item_b, item_a, score_a * score_b
3.求和计算,相当wordcount
cat 3_sum_map.py #!/usr/local/bin/python import sys for line in sys.stdin: i_a, i_b, s = line.strip().split(’\t’) print “%s\t%s” % (i_a + “” + i_b, s) cat 3_sum_reduce.py #!/usr/local/bin/python import sys cur_ii_pair = None score = 0.0 for line in sys.stdin: ii_pair, s = line.strip().split("\t") if not cur_ii_pair: cur_ii_pair = ii_pair if ii_pair != cur_ii_pair: #item_a, item_b = cur_ii_pair.split(’’) ss = cur_ii_pair.split(’’) if len(ss) != 2: continue item_a, item_b = ss print “%s\t%s\t%s” % (item_a, item_b, score) cur_ii_pair = ii_pair score = 0.0 score += float(s) ss = cur_ii_pair.split(’’) if len(ss) != 2: sys.exit() item_a, item_b = ss print “%s\t%s\t%s” % (item_a, item_b, score)
二、spark协同过滤(代码易实现)
import org.apache.spark.{SparkConf, SparkContext} import scala.collection.mutable.ArrayBuffer import scala.math._ object cf { def main(args: Array[String]): Unit = { val conf = new SparkConf() conf.setMaster(“local[2]”) conf.setAppName(“CF Spark”) val sc = new SparkContext(conf) val lines = sc.textFile(args(0)) val output_path = args(1).toString val max_prefs_per_user = 20 val topn = 5 //Step 1. normalization val ui_rdd = lines.map { x => val ss = x.split("\t") val userid = ss(0).toString val itemid = ss(1).toString val score = ss(2).toDouble (userid, (itemid, score)) }.groupByKey().flatMap { x => val userid = x._1 val is_list = x._2 #is_arr 为 itemid,score val is_arr = is_list.toArray var is_arr_len = is_arr.length if (is_arr_len > max_prefs_per_user) { is_arr_len = max_prefs_per_user } var i_us_arr = new ArrayBuffer[(String, (String, Double))] for (i <- 0 until is_arr_len) { val itemid = is_arr(i)._1 val score = is_arr(i)._2 i_us_arr += ((itemid, (userid, score))) } i_us_arr }.groupByKey().flatMap { x => val itemid = x._1 val us_list = x._2 val us_arr = us_list.toArray var sum:Double = 0.0 for (i <- 0 until us_arr.length) { sum += pow(us_arr(i)._2, 2) } sum = sqrt(sum) var u_is_arr = new ArrayBuffer[(String, (String, Double))] for (i <- 0 until us_arr.length) { val userid = us_arr(i)._1 val score = us_arr(i)._2 / sum u_is_arr += ((userid, (itemid, score))) } u_is_arr }.groupByKey() //Step 2. pairs val pairs_rdd = ui_rdd.flatMap { x => val is_arr = x._2.toArray var ii_s_arr = new ArrayBuffer((String, String), Double) for (i <- 0 until is_arr.length - 1) { for (j <- i + 1 until is_arr.length) { val item_a = is_arr(i)._1 val item_b = is_arr(j)._1 val score_a = is_arr(i)._2 val score_b = is_arr(j)._2 ii_s_arr += (((item_a, item_b), score_a * score_b)) ii_s_arr += (((item_b, item_a), score_a * score_b)) } } ii_s_arr }.groupByKey() // Step3. sum and output pairs_rdd.map { x=> val ii_pair = x._1 val s_list = x._2 val s_arr = s_list.toArray val len = s_arr.length var score:Double = 0.0 for (i <- 0 until len){ score += s_arr(i) } val item_a = ii_pair._1 val item_b = ii_pair._2 (item_a, (item_b, score)) }.groupByKey().map { x=> val item_a = x._1 val rec_list = x.2 val rec_arr = rec_list.toArray.sortWith(._2 > _._2) var len = rec_arr.length #取前5 if (len > topn) { len = topn } val s = new StringBuilder for (i <- 0 until len){ val item = rec_arr(i)._1 val score = “%1.3f” format rec_arr(i)._2 s.append(item + “:” + score.toString) if (i != len -1) { s.append(",") } } item_a + “\t” + s }.saveAsTextFile(output_path) } }
三、Python实现(运用sklearn库)
import pandas as pd import numpy as np import warnings import random, math, os from tqdm import tqdm from sklearn.model_selection import train_test_split warnings.filterwarnings(‘ignore’)
评价指标
推荐系统推荐正确的商品数量占用户实际点击的商品数量
def Recall(Rec_dict, Val_dict): ‘’’ Rec_dict: 推荐算法返回的推荐列表, 形式:{uid: {item1, item2,…}, uid: {item1, item2,…}, …} Val_dict: 用户实际点击的商品列表, 形式:{uid: {item1, item2,…}, uid: {item1, item2,…}, …} ‘’’ hit_items = 0 all_items = 0 for uid, items in Val_dict.items(): rel_set = items rec_set = Rec_dict[uid] for item in rec_set: if item in rel_set: hit_items += 1 all_items += len(rel_set) return round(hit_items / all_items * 100, 2) #推荐系统推荐正确的商品数量占给用户实际推荐的商品数 def Precision(Rec_dict, Val_dict): ‘’’ Rec_dict: 推荐算法返回的推荐列表, 形式:{uid: {item1, item2,…}, uid: {item1, item2,…}, …} Val_dict: 用户实际点击的商品列表, 形式:{uid: {item1, item2,…}, uid: {item1, item2,…}, …} ‘’’ hit_items = 0 all_items = 0 for uid, items in Val_dict.items(): rel_set = items rec_set = Rec_dict[uid] for item in rec_set: if item in rel_set: hit_items += 1 all_items += len(rec_set) return round(hit_items / all_items * 100, 2) #所有被推荐的用户中,推荐的商品数量占这些用户实际被点击的商品数量 def Coverage(Rec_dict, Trn_dict): ‘’’ Rec_dict: 推荐算法返回的推荐列表, 形式:{uid: {item1, item2,…}, uid: {item1, item2,…}, …} Trn_dict: 训练集用户实际点击的商品列表, 形式:{uid: {item1, item2,…}, uid: {item1, item2,…}, …} ‘’’ rec_items = set() all_items = set() for uid in Rec_dict: for item in Trn_dict[uid]: all_items.add(item) for item in Rec_dict[uid]: rec_items.add(item) return round(len(rec_items) / len(all_items) * 100, 2) #使用平均流行度度量新颖度,如果平均流行度很高(即推荐的商品比较热门),说明推荐的新颖度比较低 def Popularity(Rec_dict, Trn_dict): ‘’’ Rec_dict: 推荐算法返回的推荐列表, 形式:{uid: {item1, item2,…}, uid: {item1, item2,…}, …} Trn_dict: 训练集用户实际点击的商品列表, 形式:{uid: {item1, item2,…}, uid: {item1, item2,…}, …} ‘’’ pop_items = {} for uid in Trn_dict: for item in Trn_dict[uid]: if item not in pop_items: pop_items[item] = 0 pop_items[item] += 1 pop, num = 0, 0 for uid in Rec_dict: for item in Rec_dict[uid]: pop += math.log(pop_items[item] + 1) # 物品流行度分布满足长尾分布,取对数可以使得平均值更稳定 num += 1 return round(pop / num, 3) #将几个评价指标指标函数一起调用 def rec_eval(val_rec_items, val_user_items, trn_user_items): print(‘recall:’,Recall(val_rec_items, val_user_items)) print(‘precision’,Precision(val_rec_items, val_user_items)) print(‘coverage’,Coverage(val_rec_items, trn_user_items)) print(‘Popularity’,Popularity(val_rec_items, trn_user_items)) def get_data(root_path): #读取数据 rnames = [‘user_id’,‘movie_id’,‘rating’,‘timestamp’] ratings = pd.read_csv(os.path.join(root_path, ‘ratings.dat’), sep=’::’, engine=‘python’, names=rnames) #分割训练和验证集 trn_data, val_data, _, _ = train_test_split(ratings, ratings, test_size=0.2) trn_data = trn_data.groupby(‘user_id’)[‘movie_id’].apply(list).reset_index() val_data = val_data.groupby(‘user_id’)[‘movie_id’].apply(list).reset_index() trn_user_items = {} val_user_items = {} #将数组构造成字典的形式{user_id: [item_id1, item_id2,…,item_idn]} for user, movies in zip(*(list(trn_data[‘user_id’]), list(trn_data[‘movie_id’]))): trn_user_items[user] = set(movies) for user, movies in zip(*(list(val_data[‘user_id’]), list(val_data[‘movie_id’]))): val_user_items[user] = set(movies) return trn_user_items, val_user_items def User_CF_Rec(trn_user_items, val_user_items, K, N): ‘’’ trn_user_items: 表示训练数据,格式为:{user_id1: [item_id1, item_id2,…,item_idn], user_id2…} val_user_items: 表示验证数据,格式为:{user_id1: [item_id1, item_id2,…,item_idn], user_id2…} K: K表示的是相似用户的数量,每个用户都选择与其最相似的K个用户 N: N表示的是给用户推荐的商品数量,给每个用户推荐相似度最大的N个商品 ‘’’ #建立item->users倒排表 .倒排表的格式为: {item_id1: {user_id1, user_id2, … , user_idn}, item_id2: …} 也就是每个item对应有那些用户有过点击 #建立倒排表的目的就是为了更好的统计用户之间共同交互的商品数量 print(‘建立倒排表…’) item_users = {} for uid, items in tqdm(trn_user_items.items()): # 遍历每一个用户的数据,其中包含了该用户所有交互的item for item in items: # 遍历该用户的所有item, 给这些item对应的用户列表添加对应的uid if item not in item_users: item_users[item] = set() item_users[item].add(uid) #计算用户协同过滤矩阵 #即利用item-users倒排表统计用户之间交互的商品数量,用户协同过滤矩阵的表示形式为:sim = {user_id1: {user_id2: num1}, user_id3:{user_id4: num2}, …} #协同过滤矩阵是一个双层的字典,用来表示用户之间共同交互的商品数量 #在计算用户协同过滤矩阵的同时还需要记录每个用户所交互的商品数量,其表示形式为: num = {user_id1:num1, user_id2:num2, …} sim = {} num = {} print(‘构建协同过滤矩阵…’) for item, users in tqdm(item_users.items()): # 遍历所有的item去统计,用户两辆之间共同交互的item数量 for u in users: if u not in num: # 如果用户u不在字典num中,提前给其在字典中初始化为0,否则后面的运算会报key error num[u] = 0 num[u] += 1 # 统计每一个用户,交互的总的item的数量 if u not in sim: # 如果用户u不在字典sim中,提前给其在字典中初始化为一个新的字典,否则后面的运算会报key error sim[u] = {} for v in users: if u != v: # 只有当u不等于v的时候才计算用户之间的相似度 if v not in sim[u]: sim[u][v] = 0 sim[u][v] += 1 #计算用户相似度矩阵 #用户协同过滤矩阵其实相当于是余弦相似度的分子部分,还需要除以分母,即两个用户分别交互的item数量的乘积 #两个用户分别交互的item数量的乘积就是上面统计的num字典 print(‘计算相似度…’) for u, users in tqdm(sim.items()): for v, score in users.items(): sim[u][v] = score / math.sqrt(num[u] * num[v]) # 余弦相似度分母部分 #对验证数据中的每个用户进行TopN推荐 #在对用户进行推荐之前需要先通过相似度矩阵得到与当前用户最相思的前K个用户, #然后对这K个用户交互的商品中除当前测试用户训练集中交互过的商品以外的商品计算最终的相似度分数 #最终推荐的候选商品的相似度分数是由多个用户对该商品分数的一个累加和 print(‘给测试用户进行推荐…’) items_rank = {} for u, _ in tqdm(val_user_items.items()): # 遍历测试集用户,给测试集中的每个用户进行推荐 items_rank[u] = {} # 初始化用户u的候选item的字典 for v, score in sorted(sim[u].items(), key=lambda x: x[1], reverse=True)[:K]: # 选择与用户u最相思的k个用户 for item in trn_user_items[v]: # 遍历相似用户之间交互过的商品 if item not in trn_user_items[u]: # 如果相似用户交互过的商品,测试用户在训练集中出现过,就不用进行推荐,直接跳过 if item not in items_rank[u]: items_rank[u][item] = 0 # 初始化用户u对item的相似度分数为0 items_rank[u][item] += score # 累加所有相似用户对同一个item的分数 print(‘为每个用户筛选出相似度分数最高的N个商品…’) items_rank = {k: sorted(v.items(), key=lambda x: x[1], reverse=True)[:N] for k, v in items_rank.items()} items_rank = {k: set([x[0] for x in v]) for k, v in items_rank.items()} # 将输出整合成合适的格式输出 return items_rank if name == “main”: root_path = ‘./data/ml-1m/’ trn_user_items, val_user_items = get_data(root_path) rec_items = User_CF_Rec(trn_user_items, val_user_items, 80, 10) rec_eval(rec_items, val_user_items, trn_user_items)