四、MR实现CF算法
1.归一化
归一化阶段我们主要是将相同的item进行单位模计算,把数据映射到0~1范围之内处理,更加便捷快速。因为我们要用到cos相似度计算公式,将相同的item的分数进行平方和再开根号,最后进行单位化。
相似度的计算公式:
2.取pair对 相同用户两两取pair,输出两次,形成II矩阵
3.计算总和 将相同pair的分数相加
(1)归一化:
1.map阶段,只要将转数据换成item,user,score ,因为我们要在reduce阶段进行相同item单位化,要充分用到shuffle阶段的排序。
#!usr/bin/python # -*- coding: UTF-8 -*- ''' 思路:转换成i,u,s的矩阵 ''' import sys for line in sys.stdin: ss = line.strip().split(',') if len(ss) != 3: continue u , i , s = ss print '\t'.join([i,u,s])
2.reduce阶段
我们需要将相同item平方和相加开根号,然后再单位化计算,最后输出。
#!usr/bin/python
# -*- coding: UTF-8 -*-
'''
在map的基础上将每个item进行归一化,map已经将相同的item排好序,根据map的结果进行给先平方再开根号:
思路 :
1、截取字符串,取出item,user,socre
2、在for循环中进行判断,当前的item和下一个是否相同,要是相同,将相同的放到列表(user,score)列表里面,否则往下执行
3、若不相同,循环user和score列表,计算模计算,然后再次循环,进行单位化计算
import sys import math cur_item = None user_score_list = [] for line in sys.stdin: ss = line.strip().split('\t') if len(ss) != 3: continue item = ss[0] userid = ss[1] score = ss[2] #wordcount判断,当前和下一个是否相同,相同添加到列表,不相同进行归一化计算 if cur_item == None: cur_item = item if cur_item != item: #定义sum sum = 0.0 #循环列表进行模向量计算 for ss in user_score_list: user,s = ss sum += pow(s,2) sum = math.sqrt(sum) #单位化计算 for touple in user_score_list: u,s = touple # 进行单位化完成后,我们输出重置成原来的user-item-score输出 print "%s\t%s\t%s" % (u, cur_item, float(s / sum)) #初始化这两个变量 cur_item = item user_score_list = [] user_score_list.append((userid,float(score))) #定义sum sum = 0.0 #循环列表进行模向量计算 for ss in user_score_list: user,s = ss sum += pow(s,2) sum = math.sqrt(sum) #单位化计算 for touple in user_score_list: u,s = touple # 进行单位化完成后,我们输出重置成原来的user-item-score输出 print "%s\t%s\t%s" % (u, cur_item, float(s / sum))
(2)两两取pair对
思路:两两取pair对,我们在map阶段,其实什么都不用做,保证输出user,itemid,score即可。**
map阶段:
#!usr/bin/python # -*- coding: UTF-8 -*- #在进行pair取对之前,什么都不需要做,输出就行 import sys for line in sys.stdin: u, i, s = line.strip().split('\t') print "%s\t%s\t%s" % (u, i, s)
reduce阶段:
将同一个用户下的item进行两两取对,因为要形成II矩阵,就必须以user为参考单位,相反形成uu矩阵,就必须以item参考,所以将同一个用户下的item进行两两取对,并将分数相乘,就得到临时这个相似度,因为还没有对相同pair对的分数相加,这个是最后一步要做的。
#!usr/bin/python
# -*- coding: UTF-8 -*-
'''
思路:进行map排好序之后,我们的会得到相同user对应的不同item和score,这里我们主要的思路是进行相同用户两两取pair
1、进行判断,当前用户和下一个用户是不是一样,若是不一样,我们进行两两取对,形成ii矩阵
2、若是相同,我们将不同的item和score放入list里面
'''
import sys cur_user = None item_score_list = [] for line in sys.stdin: user,item,score = line.strip().split('\t') if cur_user == None: cur_user= user if cur_user != user: #进行两两pair,利用range函数 for i in range(0,len(item_score_list)-1): for j in range(i+1,len(item_score_list)): item_a, score_a = item_score_list[i] item_b, score_b = item_score_list[j] # 输出两遍的目的是为了形成II矩阵的对称 print "%s\t%s\t%s" % (item_a, item_b, score_a * score_b) print "%s\t%s\t%s" % (item_b, item_a, score_a * score_b) cur_user = user item_score_list = [] item_score_list.append((item,float(score))) #进行两两pair,利用range函数 for i in range(0,len(item_score_list)-1): for j in range(i+1,len(item_score_list)): item_a, score_a = item_score_list[i] item_b, score_b = item_score_list[j] # 输出两遍的目的是为了形成II矩阵的对称 print "%s\t%s\t%s" % (item_a, item_b, score_a * score_b) print "%s\t%s\t%s" % (item_b, item_a, score_a * score_b)
(3)进行最终分数求和,我们最后的阶段是要将相同pair的分数相加才能得到两个item的相似度。
map阶段,将相同item对排序到一起,就要将pair组成一个key进行排序,将同一个partition后数据放倒一个reduce桶中,MapReduce框架shuffle阶段,key只是做排序,partition只是做分区,不要搞混了。
#!usr/bin/python # -*- coding: UTF-8 -*- ''' sum的map中,我们需要把相同的itemA,itemB组成key,为了使相同的key能够在shuffle阶段分配到同一个reduce中, 因为是计算item的相似度,要把相同的相加 ''' import sys for line in sys.stdin: item_a,item_b,score = line.strip().split('\t') key = '#'.join([item_a,item_b]) print '%s\t%s' %(key,score) reduce阶段主要任务就是将相同的item的pair对相加. ''' 思路:将相同的item的分数进行相加,得到最后的相似度 ''' import sys cur_item = None score = 0.0 for line in sys.stdin: item, s = line.strip().split('\t') if not cur_item: cur_item = item if cur_item != item: ss = item.split("#") if len(ss) != 2: continue item_a, item_b = ss print "%s\t%s\t%s" % (item_a, item_b, score) cur_item = item score = 0.0 score += float(s) ss = item.split("#") if len(ss) != 2: sys.exit() item_a, item_b = ss print "%s\t%s\t%s" % (item_a, item_b, score)
执行上述程序运行脚本run.sh
HADOOP_CMD="/usr/local/src/hadoop-2.6.5/bin/hadoop" STREAM_JAR_PATH="/usr/local/src/hadoop-2.6.5/share/hadoop/tools/lib/hadoop-streaming-2.6.5.jar" #要想cf代码直接改成cf_train.data INPUT_FILE_PATH_1="/cf_train.data" OUTPUT_PATH_1="/output1" OUTPUT_PATH_2="/output2" OUTPUT_PATH_3="/output3" $HADOOP_CMD fs -rmr -skipTrash $OUTPUT_PATH_1 $HADOOP_CMD fs -rmr -skipTrash $OUTPUT_PATH_2 $HADOOP_CMD fs -rmr -skipTrash $OUTPUT_PATH_3 Step 1. $HADOOP_CMD jar $STREAM_JAR_PATH \ -input $INPUT_FILE_PATH_1 \ -output $OUTPUT_PATH_1 \ -mapper "python 1_gen_ui_map.py" \ -reducer "python 1_gen_ui_reduce.py" \ -jobconf "mapreduce.map.memory.mb=4096" \ -file ./1_gen_ui_map.py \ -file ./1_gen_ui_reduce.py Step 2. $HADOOP_CMD jar $STREAM_JAR_PATH \ -input $OUTPUT_PATH_1 \ -output $OUTPUT_PATH_2 \ -mapper "python 2_gen_ii_pair_map.py" \ -reducer "python 2_gen_ii_pair_reduce.py" \ -jobconf "mapreduce.map.memory.mb=4096" \ -file ./2_gen_ii_pair_map.py \ -file ./2_gen_ii_pair_reduce.py Step 3. $HADOOP_CMD jar $STREAM_JAR_PATH \ -input $OUTPUT_PATH_2 \ -output $OUTPUT_PATH_3 \ -mapper "python 3_sum_map.py" \ -reducer "python 3_sum_reduce.py" \ -jobconf "mapreduce.map.memory.mb=8000" \ -file ./3_sum_map.py \ -file ./3_sum_reduce.py 最后得到基于cf的ii矩阵 cf_train.data ,执行得到cf.result (最后一列没有超过1的) 000000228 006900337 0.495383099617 000000228 237400301 0.4655287556 000000228 489600256 0.327370227556 000000228 880800319 0.6522021568 000000228 895300223 0.0654423912424
(3)对数据格式化,item-> item list形式,整理出KV形式
代码:python gen_reclist.py
结果统一放入放入redis,读kv,以itemid_A为key,其余两列追加为value,放到rec_dict。
区分key,加前缀CB_ ,SET为redis命令,实现批量灌入
#coding=utf-8 ''' 思路:这个处理的逻辑和CB中完全一样,不一样的是redis的key是CF开头 ''' import sys infile = '../data/cf.result' outfile = '../data/cf_reclist.redis' ofile = open(outfile, 'w') MAX_RECLIST_SIZE = 100 PREFIX = 'CF_' rec_dict = {} with open(input_file,'r') as fd: for line in fd: itemid_A, itemid_B, score = line.strip().split('\t') #判断itemA在不在该字典里面,若不在,创建一个key为itemA的列表,把与itemA相关联的itemB和score添加进去 if itemid_A not in rec_dict: rec_dict[itemid_A] = [] rec_dict[itemid_A].append((itemid_B, score)) #循环遍历字典,格式化数据,把itemB和score中间以:分割,不同的itemB以_分割 for k,v in rec_dict.items(): key = PREFIX+k #接下来格式化数据,将数据以从大到小排列后再格式化 #排序,由于数据量大,我们只取100个 list = sorted(v,key=lambda x:x[1],reverse=True)[:MAX_RECLIST_SIZE] #拍好序后,我们来格式化数据 result = '_'.join([':'.join([str(val[0]),str(round(float(val[1]),6))]) for val in list]) ofile.write(' '.join(['SET',key,result])) ofile.write("\n") ofile.close()
五、灌库(redis)
1.Centos中安装redis
本文下载的是redis-2.8.3,下载对应安装包并进行源码编译(需要C编译yum install gcc-c++ ),先执行make,然后进入src目录中,得到bin文件(redis-server 服务器,redis-cli 客户端)
2.启动redis server服务两种方法
]# ./src/redis-server
3.导入数据
首先连接服务,换一个终端执行:]# ./src/redis-cli,连接redis。
**3.1 灌数据(批量灌):**需要安装unix2dos进行格式转换(yum install unix2dos),安装完后执行unix2dos cb_reclist.reds命令。
再执行cat cb_reclist.redis | /usr/local/src/redis/redis-2.8.3/src/redis-cli --pipe
进入redis验证执行./src/redis-cli
获取数据并查看:127.0.0.1:6379> get CB_5305109176
726100303:0.393048_953500302:0.393048_6193109237:0.348855"
3.2 同上方法可以将cf灌库
unix2dos cf_reclist.redis
cat cf_reclist.redis | /usr/local/src/redis-2.8.3/src/redis-cli --pipe
4、LR训练模型的数据准备
准备我们自己的训练数据,其中标签label=watch_time / total_time
进入pre_data_for_rankmodel目录:python gen_samples.py,利用最开始的merge_base.data数据,最后输出samples.data。
思路:经过cb,cf算法,将数据已经放库,召回部分已经完成,接下来做排序模型,为逻辑回归准备样本数据
1、处理第一次将用户元数据,物品元数据,用户行为数据一起归并的数据,也就是merge_base.data,我们在这里需要得到用户画像数据,用户信息数据,标签数据
2、收取样本,标签,用户画像信息,物品信息
3、抽取用户画像信息,对性别和年龄生成样本数据
4、抽取item特征信息,分词获得token,score,做样本数据
5、拼接样本,生成最终的样本信息,作为模型进行训练 ‘’’
#coding=utf-8 import sys sys.path.append('../') reload(sys) sys.setdefaultencoding('utf-8') import jieba import jieba.analyse import jieba.posseg merge_base_infile = '../data/merge_base.data' output_file = '../data/samples.data' #我们这里需要再生成两个文件,一个是用户样本和item样本,因为要对实时推荐的化,必须使用这两个样本 output_user_feature_file = '../data/user_feature.data' output_item_feature_file = '../data/item_feature.data' #这里生成个类似name和id对应的字典信息 output_itemid_to_name_file = '../data/name_id.dict' #定义函数,来获取各类数据 def get_base_samples(infile): #放待处理样本数据 ret_samples_list = [] #放user用户数据 user_info_set = set() #放物品数据 item_info_set = set() item_name2id = {} item_id2name = {} with open(infile, 'r') as fd: for line in fd: ss = line.strip().split('\001') if len(ss) != 13: continue userid = ss[0].strip() itemid = ss[1].strip() #这两个时间为了计算label而使用 watch_time = ss[2].strip() total_time = ss[10].strip() #用户数据 gender = ss[4].strip() age = ss[5].strip() user_feature = '\001'.join([userid, gender, age]) #物品数据 name = ss[8].strip() item_feature = '\001'.join([itemid, name]) #计算标签 label = float(watch_time) / float(total_time) final_label = '0' if label >= 0.82: final_label = '1' elif label <= 0.3: final_label = '0' else: continue #接下来装在数据,并返回结果,首先我们装在itemid2name和itemname2id item_name2id[name] = itemid item_id2name[itemid] = name #装在待处理的标签数据 ret_samples_list.append([final_label, user_feature, item_feature]) user_info_set.add(user_feature) item_info_set.add(name) return ret_samples_list, user_info_set, item_info_set, item_name2id, item_id2name #step 1 程序的入口,开始调用函数,开始处理文件,得到相应的数据 base_sample_list, user_info_set, item_info_set, item_name2id, item_id2name = \ get_base_samples(merge_base_infile) #step 2 抽取用户画像信息,用户标签转换,将年龄和age进行转换,用于样本使用 user_fea_dict = {} for info in user_info_set: userid, gender, age = info.strip().split('\001') #设置标签idx,将男(1)和女(0)用数剧的形式表示,权重都设置为1 idx = 0 # default 女 if gender == '男': idx = 1 #将标签和权重拼接起来 gender_fea = ':'.join([str(idx), '1']) #性别设置完成,我们接下来设置年龄,将年龄进行划分,0-18,19-25,26-35,36-45 idx = 0 if age == '0-18': idx = 0 elif age == '19-25': idx = 1 elif age == '26-35': idx = 2 elif age == '36-45': idx = 3 else: idx = 4 idx += 2 age_fea = ':'.join([str(idx), '1']) user_fea_dict[userid] = ' '.join([gender_fea, age_fea]) #step 3 抽取物品特征,这里我们要用到分词,将name进行分词,并且把分词后的token转换成id,这里就需要我们来做生成tokenid词表 token_set = set() item_fs_dict = {} for name in item_info_set: token_score_list = [] for x,w in jieba.analyse.extract_tags(name,withWeight=True): token_score_list.append((x,w)) token_set.add(x) item_fs_dict[name] = token_score_list #进行token2id的转换 token_id_dict = {} #这里我们要用到刚刚利用set去重过的token列表,生成tokenid的字典表 for s in enumerate(list(token_set)): token_id_dict[s[1]] = s[0] #接下来,我们需要把第三步生成的item_fs_dict中name对应的token全部替换成id,然后当作字典,为下面的全量替换做准备 item_fea_dict = {} user_feature_offset = 10 for name ,fea in item_fs_dict.items(): token_score_list = [] for (token,score) in fea: if token not in token_id_dict: continue token_id = token_id_dict[token] + user_feature_offset token_score_list.append(':'.join([str(token_id),str(score)])) #接下来输出到字典中 item_fea_dict[name] = ' '.join(token_score_list) #step 4 将第一步输出的样本数据整体替换并且替换user_feature和item_feature,并输出到文件中 ofile = open(output_file,'w') for (label,userfea,itemfea) in base_sample_list: userid = userfea.strip().split('\001')[0] item_name = itemfea.strip().split('\001')[1] if userid not in user_fea_dict: continue if item_name not in item_fea_dict: continue ofile.write(' '.join([label,user_fea_dict[userid],item_fea_dict[item_name]])) ofile.write('\n') ofile.close() #step 5 为了能够实时使用userfeatre,我们需要输出一下 out_put_file = open(output_user_feature_file,'w') for userid,fea in user_fea_dict.items(): out_put_file.write('\t'.join([userid,fea])) out_put_file.write('\n') out_put_file.close() #step 6 输出item_feature out_file = open(output_item_feature_file,'w') for name,fea in item_fea_dict.items(): if name not in item_name2id: continue itemid = item_name2id[name] out_file.write('\t'.join([itemid,fea])) out_file.write('\n') #step 7 输出id2name的对应的字典 o_file = open(output_itemid_to_name_file,'w') for id,name in item_id2name.items(): o_file.write('\t'.join([id,name])) o_file.write('\n') o_file.close()
得到如下数据:
1.item_feature.data
2.user_feature.data
3.samples.data(将1、2拼接)
由userid、item_name拼接
六、模型准备
lr.py
思路:用到数据,需要写load_data的部分,
1.定义main,方法入口,然后进行load_data的编写
2.调用该方法的到x训练x测试,y训练,y测试,使用L1正则化或是L2正则化使得到结果更加可靠
3. 输出wegiht,和b偏置
# -*- coding: UTF-8 -*- import sys import numpy as np from scipy.sparse import csr_matrix from sklearn.model_selection import train_test_split from sklearn.linear_model import LogisticRegression input_file = sys.argv[1] def load_data(): #由于在计算过程用到矩阵计算,这里我们需要根据我们的数据设置行,列,和训练的数据准备 #标签列表 target_list = [] #行数列表 fea_row_list = [] #特征列表 fea_col_list = [] #分数列表 data_list = [] #设置行号计数器 row_idx = 0 max_col = 0 with open(input_file,'r') as fd: for line in fd: ss = line.strip().split(' ') #标签 label = ss[0] #特征 fea = ss[1:] #将标签放入标签列表中 target_list.append(int(label)) #开始循环处理特征: for fea_score in fea: sss = fea_score.strip().split(':') if len(sss) != 2: continue feature, score = sss #增加行 fea_row_list.append(row_idx) #增加列 fea_col_list.append(int(feature)) #填充分数 data_list.append(float(score)) if int(feature) > max_col: max_col = int(feature) row_idx += 1 row = np.array(fea_row_list) col = np.array(fea_col_list) data = np.array(data_list) fea_datasets = csr_matrix((data, (row, col)), shape=(row_idx, max_col + 1)) x_train, x_test, y_train, y_test = train_test_split(fea_datasets, s, test_size=0.2, random_state=0) return x_train, x_test, y_train, y_test def main(): x_train,x_test,y_train,y_test = load_data() #用L2正则话防止过拟合 model = LogisticRegression(penalty='l2') #模型训练 model.fit(x_train,y_train) ff_w = open('model.w', 'w') ff_b = open('model.b', 'w') #写入训练出来的W for w_list in model.coef_: for w in w_list: print >> ff_w, "w: ", w # 写入训练出来的B for b in model.intercept_: print >> ff_b, "b: ", b print "precision: ", model.score(x_test, y_test) print "MSE: ", np.mean((model.predict(x_test) - y_test) ** 2) if __name__ == '__main__': main()
七、推荐系统实现
推荐系统demo流程:
初始化工作:加载data,获取user_feature.data,item_feature.data特征 ,用字典维护。
(1)解析请求:userid,itemid
(2)加载模型:加载排序模型(model.w,model.b)
(3)检索候选集合:利用cb,cf去redis里面检索数据库,得到候选集合
(4)获取用户特征user_feature.data :userid
(5)获取物品特征item_feature.data :itemid
(6)打分(逻辑回归函数sigmoid: 1 / (1 + exp(-wx))),排序
(7)top-n过滤(精排)
(8)数据包装(itemid->name),返回
执行主函数 main.py
#coding=utf-8 import web import sys import redis import json import math urls = ( '/', 'index', '/test', 'test', ) app = web.application(urls, globals()) # 加载user特征 user_fea_dict = {} with open('../data/user_feature.data') as fd: for line in fd: userid, fea_list_str = line.strip().split('\t') user_fea_dict[userid] = fea_list_str # 加载item特征 item_fea_dict = {} with open('../data/item_feature.data') as fd: for line in fd: ss = line.strip().split('\t') if len(ss) != 2: continue itemid, fea_list_str = ss item_fea_dict[itemid] = fea_list_str class index: def GET(self): r = redis.Redis(host='master', port=6379,db=0) # step 1 : 解析请求,上面我们已经得到userid,itemid params = web.input() userid = params.get('userid', '') req_itemid = params.get('itemid', '') # step 2 : 加载模型 model_w_file_path = '../rankmodel/model.w' model_b_file_path = '../rankmodel/model.b' model_w_list = [] model_b = 0. with open (model_w_file_path, 'r') as fd: for line in fd: ss = line.strip().split(' ') if len(ss) != 3: continue model_w_list.append(float(ss[2].strip())) with open (model_b_file_path, 'r') as fd: for line in fd: ss = line.strip().split(' ') model_b = float(ss[2].strip()) # step 3 : 检索候选(match),这里我们分两次,cb,cf #将检索回来的item全部放到recallitem列表里面 rec_item_mergeall = [] # 3.1 cf cf_recinfo = 'null' key = '_'.join(['CF', req_itemid]) if r.exists(key): cf_recinfo = r.get(key) if len(cf_recinfo) > 6: for cf_iteminfo in cf_recinfo.strip().split('_'): item, score = cf_iteminfo.strip().split(':') rec_item_mergeall.append(item) # 3.2 cb cb_recinfo = 'null' key = '_'.join(['CB', req_itemid]) if r.exists(key): cb_recinfo = r.get(key) if len(cb_recinfo) > 6: for cb_iteminfo in cb_recinfo.strip().split('_'): item, score = cb_iteminfo.strip().split(':') rec_item_mergeall.append(item) # step 4: 获取用户特征,将获取的用户特征处理后放到字典里面,方便后续计算内积 user_fea = '' if userid in user_fea_dict: user_fea = user_fea_dict[userid] u_fea_dict = {} for fea_idx in user_fea.strip().split(' '): ss = fea_idx.strip().split(':') if len(ss) != 2: continue idx = int(ss[0].strip()) score = float(ss[1].strip()) u_fea_dict[idx] = score # step 5: 获取物品的特征 ,循环遍历刚刚得到itemid,判断item是否在item特征中,若在开始进行处理 rec_list = [] for itemid in rec_item_mergeall: if itemid in item_fea_dict: item_fea = item_fea_dict[itemid] i_fea_dict = dict() for fea_idx in item_fea.strip().split(' '): ss = fea_idx.strip().split(':') if len(ss) != 2: continue idx = int(ss[0].strip()) score = float(ss[1].strip()) i_fea_dict[idx] = score #得到召回item对应的特征和用户的特征,之后根据模型求出来的w,b,进行打分 wx_score = 0. #这里我们求个内积,wx,然后做sigmoid,先将两个字典拼接起来,然后计算分数 for fea, score in dict(u_fea_dict.items() + i_fea_dict.items()).items(): wx_score += (score * model_w_list[fea]) #**计算sigmoid: 1 / (1 + exp(-wx))** final_rec_score = 1 / (1 + math.exp(-(wx_score + model_b))) #将itemid和分数放入列表中,方便后续排序 rec_list.append((itemid, final_rec_score)) # step 6 : 精排序(rank) rec_sort_list = sorted(rec_list, key=lambda x:x[1], reverse=True) # step 7 : 过滤(filter)取top10 rec_fitler_list = rec_sort_list[:10] # step 8 : 返回+包装(return),进行将itemid转换成name item_dict = {} with open('../data/name_id.dict', 'r') as fd: for line in fd: raw_itemid, name = line.strip().split('\t') item_dict[raw_itemid] = name ret_list = [] for tup in rec_fitler_list: req_item_name = item_dict[req_itemid] item_name = item_dict[tup[0]] item_rank_score = str(tup[1]) ret_list.append(' -> '.join([req_item_name, item_name, item_rank_score])) ret = '\n'.join(ret_list) return ret class test: def GET(self): print web.input() return '222' if __name__ == "__main__": app.run()
七.推荐系统测试:
保证redis启动,执行python main.py 9999
(py27) [root@master rec_server]# 192.168.179.10:9999/?userid=00370d83b51febe3e8ae395afa95c684&itemid=3880409156
浏览器输入上述返回推荐top10。
可视化页面查看:进入web目录执行 python page_web.py 9999
输入useid_itemid