一、推荐项目流程
1.预处理阶段
对三个数据进行预处理,合并用户与物品相关信息,处理后的merge_base.data数据字段包含itemid、userid、用户信息(年龄、性别、收入、地区)、物品信息(名字、描述、时长、标签)、用户行为数据(收听时长)等。
2.粗排召回阶段
使用CB算法,基于内容进行jieba中文分词,计算itemid对应分词的tfidf分数,整理训练数据;使用mr协同过滤进行相关性计算,训练得到物品之间对应分数item-item能得到II矩阵;而CF算法则通过协同过滤将UI矩阵转成II矩阵。最后,将格式化后数据结果按k/v形式批量灌入redis数据库。
3.精排阶段
利用LR算法进行推荐排序,得到权重w、b用于模型构建rank_model。结合用户与物品标签获取用户与物品特征训练数据。
4.推荐流程阶段
加载特征数据及排序模型,检索redis数据库获取候选集,利用逻辑回归sigmoid函数打分并排序,最终利用可视化页面实现itemid->name进行top10评分相关推荐。
二、推荐系统思路框架
流程思路:
1、数据预处理(用户画像数据、物品元数据、用户行为数据)
2、召回(CB、CF算法)
3、LR训练模型的数据准备,即用户特征数据,物品特征数据
4、模型准备,即通过LR算法训练模型数据得到w,b
5、推荐系统流程:初始化加载data,获取user_feature.data,item_feature.data特征 ,用字典维护。
(1)解析请求:userid,itemid
(2)加载模型:加载排序模型(model.w,model.b)
(3)检索候选集合:利用cb,cf去redis里面检索数据库,得到候选集合
(4)获取用户特征user_feature.data :userid
(5)获取物品特征item_feature.data :itemid
(6)打分(逻辑回归函数sigmoid: 1 / (1 + exp(-wx))),排序
(7)top-n过滤(精排)
(8)数据包装(itemid->name),返回
数据集: 默认以’\001’分割
1.用户画像数据(user_profile.data)
字段:userid, gender, age, salary, location
userid,性别,年龄,收入,地域 00dee4bd83b6c115b865a64e92745000,男,26-35,10000-20000,台湾 000798c9a4cab6d6b3065e287d917000,女,36-45,5000-10000,陕西 00a2fe08da2621d2e9dd3ff7f89e7000,女,19-25,20000-100000,宁夏 009cb9b50e1c38e6e8d70b9accc08000,男,26-35,10000-20000,新疆 0182020bfb3e321a852abc857e2c9000,男,46-100,10000-20000,山西
2.物品元数据(music_meta)
字段:itemid, name, desc, total_timelen, location, tags
itemid,name,内容 ,时长,地域,标签 1084709100 outta my head 周杰伦MV-范特西 227 周杰伦MV 1119509100 GEE 少女时代 舞蹈,舞蹈教学 197 舞蹈,舞蹈教学,流行 1126809100 SPICA - You Don't Love Me 敬请期待先上预告 24 他不爱我预告 1144209100TAE MIN - Steps 总理和我在OST 218 东方神起TAE,流行
3.用户行为数据(user_watch_pref.sml)
userid,itemid,用户对item收听时长,点击时间(小时)
01e3fdf415107cd6046a07481fbed499 6470209102 555 16 01e3fdf415107cd6046a07481fbed499 6470209102 2024 22 012e5c128fbe16c302c9a12f9238f871 6470209102 1047 13 012e5c128fbe16c302c9a12f9238f871 6470209102 1155 10 01e3fdf415107cd6046a07481fbed499 6470209102 795 22
三、推荐系统步骤
1、数据预处理
总体思路:处理原始的数据,将用户画像数据 、物品元数据、用户行为数据,3份融合到一起,得到处理后merge_base.data,用于cb、cf算法进行计算。
python gen_base.py #coding=utf-8 import sys #找到三类原始数据文件,用户画像数据、物品元数据,用户行为数据 user_action_data = '../data/user_watch_pref.sml' music_meta_data = '../data/music_meta' user_profile_data = '../data/user_profile.data' ##将合并后的元数据放到新的文件里 output_file = '../data/merge_base.data' # 将3份数据merge后的结果输出,供下游数据处理 ofile = open(output_file, 'w') # step 1. 处理物品元数据,将处理后的结果放入字典里面,key是itemid,value为物品对应的信息,为最后写入做准备 item_info_dict = {} with open(music_meta_data, 'r') as fd: for line in fd: ss = line.strip().split('\001') if len(ss) != 6: continue itemid, name, desc, total_timelen, location, tags = ss item_info_dict[itemid] = '\001'.join([name, desc, total_timelen, location, tags]) # step 2. 处理用户画像数据,将处理后的结果放入字典里面,key是用户id,value是用户信息 user_profile_dict = {} with open(user_profile_data, 'r') as fd: for line in fd: ss = line.strip().split(',') if len(ss) != 5: continue userid, gender, age, salary, location = ss user_profile_dict[userid] = '\001'.join([gender, age, salary, location]) # step 3. 写入最后的信息,将用户行为数据进行处理,把step1和step2得到的数据一并归纳在文件里面 with open(user_action_data, 'r') as fd: for line in fd: ss = line.strip().split('\001') if len(ss) != 4: continue userid, itemid, watch_len, hour = ss if userid not in user_profile_dict: continue if itemid not in item_info_dict: continue ofile.write('\001'.join([userid, itemid, watch_len, hour, \ user_profile_dict[userid], item_info_dict[itemid]])) ofile.write("\n") ofile.close()
合并后得到类似下面数据merge_base.data
数据字段包含itemid、userid、用户信息(年龄、性别、收入、地区)、物品信息(名字、描述、时长、标签)、用户行为数据(收听时长)
2、【召回】CB算法
【注:CF、CB只是数据形态不一样,逻辑是一样的,都是计算矩阵两两元素之间的分数,即相关性,都可以套用协同过滤模板得到ii矩阵】
进一步说明:最后得到基于cb的 II 矩阵,输出得到 cb_result 数据,即 mr_cf 程序(只是数据源变了)。在这里输入数据是 cb_train.data ,内容是 token、itemid、score ,相当于CB算法基于item属性的CB,输出则是itemid、itemid、score。
即:正排表:item ——> token、token、token
倒排表:token ——> item、item、item
CF:user,item,score -》item item sim
CB:token,item,score -》item item sim
总体思路:将初始化好的用户,物品,用户行为数据进行处理,对item去重,目的是为了得到token,itemid,score,对于生成的数据里面的name,将itemName进行分词,得到tfidf权重,同时将desc进行分词,处理name和desc。
代码思路:元数据中tags已经分类好无需再次进行切分,只需要用idf词表查处权重即可,针对name、desc、tags三个分词结果,适当调整name权重比例,分别对这三类得出的分数再次进行分数权重划分,最后得到cb的初始数据。
(1)利用jieba分词,对item name进行中文分词,item —> name desc tag ,对每个item的这三个字段进行分词并计算score,将数据用字典token_dict维护执行python gen_cb_train.py
#coding=utf-8 import sys sys.path.append('../') reload(sys) sys.setdefaultencoding('utf-8') import jieba import jieba.posseg import jieba.analyse #读入初始数据 input_file = "../data/merge_base.data" # 输出cb训练数据 output_file = '../data/cb_train.data' outfile = open(output_file, 'w') #定义三类的权重分数(大小可自行设定) RATIO_FOR_NAME = 0.9 RATIO_FOR_DESC = 0.1 RATIO_FOR_TAGS = 0.05 #为tags读入idf权重值 idf_file = '../data/idf.txt' idf_dict = {} with open(idf_file, 'r') as fd: for line in fd: token, idf_score = line.strip().split(' ') idf_dict[token] = idf_score #开始处理初始数据 itemid_set = set() with open(input_file, 'r') as fd: for line in fd: ss = line.strip().split('\001') # 用户行为 userid = ss[0].strip() itemid = ss[1].strip() watch_len = ss[2].strip() hour = ss[3].strip() # 用户画像 gender = ss[4].strip() age = ss[5].strip() salary = ss[6].strip() user_location = ss[7].strip() # 物品元数据 name = ss[8].strip() desc = ss[9].strip() total_timelen = ss[10].strip() item_location = ss[11].strip() tags = ss[12].strip() # 对item去重,相同的itemid不用再计算,因为都一样,这里用到continue特性,当不同的时候才继续执行下面的代码 if itemid not in itemid_set: itemid_set.add(itemid) else: continue # 去掉重复后的itemid,然后我们进行分词,计算权重,放到字典里面 token_dict = {} #对name统计 for a in jieba.analyse.extract_tags(name, withWeight=True): token = a[0] score = float(a[1]) token_dict[token] = score * RATIO_FOR_NAME #对desc进行分词,这里需要注意的是描述一般会含有name中的词,这里我们把有的词的分数进行相加,没有的放入 for a in jieba.analyse.extract_tags(desc, withWeight=True): token = a[0] score = float(a[1]) if token in token_dict: token_dict[token] += score * RATIO_FOR_DESC else: token_dict[token] = score * RATIO_FOR_DESC # 对tags 进行分数计算 for tag in tags.strip().split(','): if tag not in idf_dict: continue else: if tag in token_dict: token_dict[tag] += float(idf_dict[tag]) * RATIO_FOR_TAGS else: token_dict[tag] = float(idf_dict[tag]) * RATIO_FOR_TAGS #循环遍历token_dict,输出toke,itemid,score for k, v in token_dict.items(): token = k.strip() score = str(v) ofile.write(','.join([token, itemid, score])) ofile.write("\n") outfile.close()
经过数据预处理,得到如下格式的cb训练数据:
tokenid itemid,score(itemid中的各个token在该item中的重要性)
哲,4090309101,0.896607562717
大连,4090309101,0.568628215367
舞曲,4090309101,0.713898826298
大美妞,4090309101,0.896607562717
网络,4090309101,0.465710816584
伤感,4090309101,0.628141853463
(注:最后一个字段不是传统的TF-IDF,因为分出的词在name,desc,tag里面权重不同,即切分单词在desc中重要性不同)
(2)用协同过滤算法跑出item-item数据
求相似度的II矩阵(相似的item配对,形成II矩阵)。
相似度计算:要用到MapReduce的框架来进行,只要用到shuffle阶段,对map出来的结果排序,reduce进行两两配对,主要是wordcount逻辑,主要说下注意的部分:我们需要把两两分数的过滤掉,或是把itemA和itemB相同的item过滤掉,因为这部分数据没有任何意义。
3.map阶段
总体思路:这里需要把初始化后的结果进行map排序,为了后续两两取 pair对,所以这里我们需要进行map,其实什么也不用操作输出即可
import sys import re for line in sys.stdin: ss = line.strip().split(',') if len(ss) != 3: continue r1 = u'[a-zA-Z0-9’!"#$%&\'()*+,-./:;<=>?@,。?★、…【】《》?“”‘’![\\]^_`{|}~]+' ss[0] = re.sub(r1,'',ss[0]) if len(ss[0]) == 0: continue print ','.join([ss[0], ss[1], ss[2]])
4.reduce阶段:
在pair reduce之前做过map操作,以token,item,score输出,所以token排序好的, 这里我们相当于求的是II矩阵,所以是相同的token的item进行相似度计算
总体思路:
1、进行user统计,若相同,把相同的user的item和score放入list里面
2、不相同,开始进行两两配对,循环该list,进行两两配对,求出相似度
import sys import math cur_token = None item_score_list = [] for line in sys.stdin: ss = line.strip().split(',') itemid = ss[1] score = float(ss[2]) if len(ss) != 3: continue if cur_token == None: cur_token = ss[0] if cur_token != ss[0]: #这里需要注意的是range的区间前闭后开,同时注意range中即使前闭后开,刚开始是从0即列表里面的第一个,循环到列表最后一个的前一个 for i in range(0,len(item_score_list)-1): for j in range(i+1,len(item_score_list)): item_a,score_a = item_score_list[i] item_b,score_b = item_score_list[j] #score = float(score_a * score_b)/float(math.sqrt(pow(score_a,2))*math.sqrt(pow(score_b,2))) #输出两遍的目的是为了形成II矩阵的对称 score = float(score_a*score_b) if item_a == item_b: continue if score < 0.08: continue print "%s\t%s\t%s" % (item_a, item_b, score) print "%s\t%s\t%s" % (item_b, item_a, score) cur_token = ss[0] item_score_list = [] item_score_list.append((itemid,float(score))) for i in range(0, len(item_score_list) - 1): for j in range(i + 1, len(item_score_list)): item_a, score_a = item_score_list[i] item_b, score_b = item_score_list[j] #score = (score_a * score_b) / (math.sqrt(pow(score_a, 2)) * math.sqrt(pow(score_b, 2)) # 输出两遍的目的是为了形成II矩阵的对称 score = float(score_a * score_b) if item_a == item_b: continue if score < 0.08: continue print "%s\t%s\t%s" % (item_a, item_b, score) print "%s\t%s\t%s" % (item_b, item_a, score)
最后得到基于cb的ii矩阵
(3)对数据格式化,item-> item list形式,整理出KV形式
python gen_reclist.py
思路:上步通过CB算法得到itemA,itemB,score需要把放入到redis库,先进行数据格式化,以itemA为key与itemA有相似度的itemB,和分数,以value的形式存入内存库
1、创建一个字典,将key放入itemA,value 放入与A对应的不同b和分数
2、循环遍历字典,将key加上前缀CB,value以从大到小的分数进行排序,并且相同的item以——分割,item和score间用:分割
#coding=utf-8 import sys infile = '../data/cb.result' outfile = '../data/cb_reclist.redis' ofile = open(outfile, 'w') MAX_RECLIST_SIZE = 100 PREFIX = 'CB_' rec_dict = {} with open(infile, 'r') as fd: for line in fd: itemid_A, itemid_B, sim_score = line.strip().split('\t') #判断itemA在不在该字典里面,若不在,创建一个key为itemA的列表,把与itemA相关联的itemB和score添加进去 if itemid_A not in rec_dict: rec_dict[itemid_A] = [] rec_dict[itemid_A].append((itemid_B, sim_score)) #循环遍历字典,格式化数据,把itemB和score中间以:分割,不同的itemB以_分割 for k, v in rec_dict.items(): key_item = PREFIX + k #接下来格式化数据,将数据以从大到小排列后再格式化 #排序,由于数据量大,我们只取100个 #排好序后,我们来格式化数据 reclist_result = '_'.join([':'.join([tu[0], str(round(float(tu[1]), 6))]) \ for tu in sorted(v, key=lambda x: x[1], reverse=True)[:MAX_RECLIST_SIZE]]) ofile.write(' '.join(['SET', key_item, reclist_result])) ofile.write("\n") ofile.close()
得到类似如下数据:根据itemid_a,返回相似对应itemid_b的score
SET CB_53051091
76 726100303:0.393048_953500302:0.393048_6193109237:0.348855
5.【召回】CF算法:
下文是利用Mapreduce实现CF算法,如果想利用Spark实现CF算法请参考的这篇文章https://blog.csdn.net/qq_36816848/article/details/113184759
(1)以userid itemid score形式整理训练数据
总体思路:首先和cb一样,对处理完的用户元数据,物品元数据,行为数据进行cf数据准备工作,我们的目的事输出: user,item
score,其中主要是的到用户对item的score,其中用户收听的音乐的时常和总的时长相除得到score
python gen_cf_train.py #coding=utf-8 import sys input_file = "../data/merge_base.data" # 输出cf训练数据 output_file = '../data/cf_train.data' ofile = open(output_file, 'w') key_dict = {} with open(input_file, 'r') as fd: for line in fd: ss = line.strip().split('\001') # 用户行为 userid = ss[0].strip() itemid = ss[1].strip() watch_len = ss[2].strip() hour = ss[3].strip() # 用户画像 gender = ss[4].strip() age = ss[5].strip() salary = ss[6].strip() user_location = ss[7].strip() # 物品元数据 name = ss[8].strip() desc = ss[9].strip() total_timelen = ss[10].strip() item_location = ss[11].strip() tags = ss[12].strip() #拼接key,为了将同一个用户对相同物品的时长全部得到,需要做个聚合 key = '_'.join([userid, itemid]) if key not in key_dict: key_dict[key] = [] key_dict[key].append((int(watch_len), int(total_timelen))) #循环处理相同用户对相同item的分数 for k, v in key_dict.items(): t_finished = 0 t_all = 0 # 对<userid, itemid>为key进行分数聚合 for vv in v: t_finished += vv[0] t_all += vv[1] # 得到userid对item的最终分数 score = float(t_finished) / float(t_all) userid, itemid = k.strip().split('_') ofile.write(','.join([userid, itemid, str(score)])) ofile.write("\n") ofile.close()
得到如下数据:cf_train.data
userid, itemid, score 0189c9fecdd47bb64720c23a960272d3,935400252,1.3
014e7a8f4544bcd156365d3f348399c2,068800255,1.21889952153
00af96daaf12d1afa11d102f9f98fc3b,405100213,0.0581395348837
00383d3536ce00ad469cb1c57946686a,732009535,1.5703125
00fa3f43730a4374a43e1edfab614bb4,720400256,1.41509433962
0027834e40d613c175f715052aa341af,411500272,1.30735930736
00e5dd1b98a94f1976e49ffedc830e84,177200319,0.764705882353
(2)用ALS协同过滤算法跑出item-item数据 (套用协同过滤)
II矩阵数据准备,redis数据分为这么几个部分,这部分的数据需要利用到MapReduce框架,进行map和reduce排序。