一、 项目样式展现
下面主要展现的是项目的整体部分,主要分为推荐页,热门页以及新闻详情页。
(因为审*核问题,下图打了点码hhh)
二、后端目录结构
news_rec_sys/ conf/ dao_config.py controller/ dao/ materials/ news_scrapy/ user_proccess/ material_proccess recpocess/ recall/ rank/ online.py offline.py scheduler/ server.py
conf/dao_config.py: 候选整体配置文件
controller/ : 项目中用于操作数据库的接口
dao/ : 项目的实体类,对应数据库表。
如entity文件夹的定义mysql数据库中的表结构;
mongo_server.py通过pymongo获取mongo实例中的collections,用于存储新闻及其画像、用户画像;
mysql_server.py基于sqlalchemy创建连接某个mysql的会话session,存储用户及其行为信息;
redis_server.py访问redis中的字典。
materials/: 项目的物料部分,主要用户爬取物料以及处理用户画像和新闻画像
recpocess/: 项目的推荐模块,主要包含召回和排序,以及一些线上服务和线下处理部分
scheduler: 项目的定时任务的脚本部分,
server.py: 项目后端的入口部分,主要包含项目整体的后端接口部分。
在该项目中:
(1)前端主要使用的是Vue框架+mint-ui;
(2)后端主要使用的是Flask+Mysql+Mongodb+Redis来完成的;
(3)前后端采用分离的方式,通过json的数据格式进行数据传递。
其中该项目后端的主要逻辑在在server.py中,其中主要包含用户注册,登录,推荐列表,热门列表,获取新闻详情页以及用户的行为等功能。
1、用户注册登录
为了能够对用户进行千人千面的推荐,因此需要每个使用该系统的人都需要明确先进行注册登入,为每个用户生成唯一的用户id,根据用户的历史行为,实现对用户进行个性化推荐的效果。
(1)注册部分:
这部分在scheduler文件夹中的server.py文件中。在函数头前也是通过@app.route即flask中的路由,将url和注册函数绑定。
def register(): """用户注册""" request_str = request.get_data() request_dict = json.loads(request_str) user = RegisterUser() user.username = request_dict["username"] user.passwd = request_dict["passwd"] # 查询当前用户名是否已经被用过了 result = UserAction().user_is_exist(user, "register") if result != 0: return jsonify({"code": 500, "mgs": "this username is exists"}) user.userid = snowflake.client.get_guid() # 雪花算法生成唯一的用户id user.age = request_dict["age"] user.gender = request_dict["gender"] user.city = request_dict["city"] save_res = UserAction().save_user(user) # 将注册用户信息加入mysql if not save_res: return jsonify({"code": 500, "mgs": "register fail."}) return jsonify({"code": 200, "msg": "register success."})
可以看到,上面的注册部分主要是记录一些用户的一些基础属性,并将用户的注册信息写入msyql表当中。注意,为了防止并发问题导致用户id出现冲负问题,因此这里采用了Twitter的雪花算法来为给每个用户生成一个唯一的id。
雪花算法的启服,生成唯一的用户ID,在terminal命令:
snowflake_start_server --address=127.0.0.1 --port=8910 --dc=1 --worker=1
(2)登录部分:
@app.route('/recsys/login', methods=["POST"]) def login(): """用户登录 """ request_str = request.get_data() request_dict = json.loads(request_str) user = RegisterUser() user.username = request_dict["username"] user.passwd = request_dict["passwd"] # 查询数据库中的用户名或者密码是否存在 try: result = UserAction().user_is_exist(user, "login") # print(result,"login") if result == 1: return jsonify({"code": 200, "msg": "login success"}) elif result == 2: # 密码错误 return jsonify({"code": 500, "msg": "passwd is error"}) else: return jsonify({"code": 500, "msg": "this username is not exist!"}) except Exception as e: return jsonify({"code": 500, "mgs": "login fail."})
用户登陆部分,前端通过将输入的账号密码通过POST请求传给 /recsys/login,通过UserAction().user_is_exist()方法查询数据库中的用户名或者密码是否存在,其中1表示账号密码正确,2表示密码错误,0表示用户不存在。
2、推荐页列表
在项目样式展现的部分中,第一附图就是推荐页列表的样式,通过瀑布流的方式将新闻内容进行展现。
@app.route('/recsys/rec_list', methods=["GET"]) def rec_list(): """推荐页""" user_name = request.args.get('user_id') page_id = request.args.get('page_id') # 查询用户的id user_id = UserAction().get_user_id_by_name(user_name) if not user_id: return False if user_id is None or page_id is None: return jsonify({"code": 2000, "msg": "user_id or page_id is none!"}) try: # 获取推荐列表新闻信息 rec_news_list = recsys_server.get_rec_list(user_id, page_id) if len(rec_news_list) == 0: return jsonify({"code": 500, "msg": "rec_list data is empty."}) return jsonify({"code": 200, "msg": "request rec_list success.", "data": rec_news_list, "user_id": user_id}) except Exception as e: print(str(e)) return jsonify({"code": 500, "msg": "redis fail."})
该部分的主要逻辑是前端通过请求 “/recsys/rec_list” 接口,后端通过前端传递过来的用户姓名,从数据库中获取用户id,再根据用户id去推荐服务(recsys_server)中获取到推荐列表。
2.1、获取用户推荐列表
我们知道用户的推荐列表是通过推荐服务的 get_rec_list(user_id, page_id) 接口获取到的。其中需要两个参数:
user_id:通过用户id,我们可以去redis中查找已经给用户构建好的新闻列表,将新闻信息返回给前端。
page_id:通过page id定位到目前已经给用户推荐到列表的位置,然后在从该位置之后去新的新闻内容。
def get_rec_list(self, user_id, page_id): """给定页面的展示范围进行展示 user_id 后面做个性化推荐的时候需要用到""" # 根据page id计算需要获取redis中哪些范围的news_id, 假设每一页展示10个新闻 s = (int(page_id) - 1) * 10 e = s + 9 # 返回的是一个news_id列表 news_id_list = self.reclist_redis_db.zrange("rec_list", start=s, end=e) # 根据news_id获取新闻的具体内容,并返回一个列表,列表中的元素是按照顺序展示的新闻信息字典 news_info_list = [] news_expose_list = [] for news_id in news_id_list: news_info_dict = self._get_news_simple(news_id) news_info_list.append(news_info_dict) news_expose_list.append(news_info_dict["news_id"]) # 记录在用户曝光表上[user_exposure] self._save_user_exposure(user_id,news_expose_list) # 曝光落表 return news_info_list
这里的逻辑,主要是先根据page id,计算从redis中推荐列表取的范围。在得到新闻id列表之后,通过_get_news_simple() 方法从mysql何redis中获取新闻列表所需的展现内容。
为了提高用户体验,这里考虑将已经在推荐列表中给用户曝光过的新闻,当天内不会再通过热门页对用户进行曝光。因此这里需要利用_save_user_exposure()方法来将已经曝光过的新闻存储到redis中,这样在热门推荐中,针对用户的曝光会对热门推荐的内容进行过滤。
返回的数据格式如下:
"data": [ { "news_id": "4bfb8aab-bcd8-4c74-b7fd-92b28ca5df69", "cate": "国内", "read_num": 0, "likes": 0, "collections": 0, "ctime": "2021-11-30 12:07", "title": "....." }, ... { "news_id": "4ded60ac-aa2f-408b-af4d-09ca0c58b50a", "cate": "国内", "read_num": 6, "likes": 1, "collections": 0, "ctime": "2021-11-30 10:44", "title": "......" }]
3、热门推荐页
热门推荐页部分,前端通过请求’/recsys/hot_list'接口,通过传递用户姓名和当前页号来获取热门新闻列表。主要的逻辑和第二小点的获取推荐页相同,区别在于热门新闻信息主要是通过推荐服务(recsys_server)中的get_hot_list()方法来获取到热门新闻推荐列表。
@app.route('/recsys/hot_list', methods=["GET"]) def hot_list(): """热门页面""" if request.method == "GET": user_name = request.args.get('user_id') page_id = request.args.get('page_id') if user_name is None or page_id is None: return jsonify({"code": 2000, "msg": "user_name or page_id is none!"}) # 查询用户的id user_id = UserAction().get_user_id_by_name(user_name) if not user_id: return False try: # # 获取热门列表新闻信息 rec_news_list = recsys_server.get_hot_list(user_id) if len(rec_news_list) == 0: return jsonify({"code": 200, "msg": "request redis data fail."}) # rec_news_list = recsys_server.get_hot_list(user_id, page_id) return jsonify({"code": 200, "msg": "request hot_list success.", "data": rec_news_list, "user_id": user_id}) except Exception as e: print(str(e)) return jsonify({"code": 2000, "msg": "request hot_list fail."})
可以看到这里其实在后端逻辑上和推荐列表部分相似,主要在于get_hot_list()和get_rec_list()的区别;而热门推荐部分内在的细节内容,将会在后面详细介绍,这里不再赘述。
4、新闻详情页
在项目样式展现的部分中,第三附图就是新闻详情页的样式。该部分主要包含一些新闻的详细信息,其中还有两个按钮,用于收集用户的显性反馈,用户可以根据自己对该文章的喜好程度进行喜欢和收藏的反馈内容。
@app.route('/recsys/news_detail', methods=["GET"]) def news_detail(): """一篇文章的详细信息""" user_name = request.args.get('user_name') news_id = request.args.get('news_id') user_id = UserAction().get_user_id_by_name(user_name) # if news_id is None or user_id is None: if news_id is None or user_name is None: return jsonify({"code": 2000, "msg": "news_id is none or user_name is none!"}) try: news_detail = recsys_server.get_news_detail(news_id) if UserAction().get_likes_counts_by_user(user_id,news_id) > 0: news_detail["likes"] = True else: news_detail["likes"] = False if UserAction().get_coll_counts_by_user(user_id,news_id) > 0: news_detail["collections"] = True else: news_detail["collections"] = False # print("test",news_detail) return jsonify({"code": 0, "msg": "request news_detail success.", "data": news_detail}) except Exception as e: print(str(e)) return jsonify({"code": 2000, "msg": "error"})
上面就是详情页的后端逻辑,通过用户名字从mysql中获取用户id信息。防止用户id或者 page id出现空值的情况,需要进行判断。紧接着通过recsys_server服务的get_news_detail()方法,根据新闻的id进行获取内容。
如果用户对该新闻之前点击过喜欢或收藏,再次点击该新闻应该在喜欢或收藏按钮应该是点亮状态,因此还需要根据mysql中再次查询用户与该新闻是否存在记录,并将结果返回给前端,将其进行点亮展示。这里采用两个字段likes和collections,通过True,False来判断用户对该文章之前是否点击过喜欢或收藏。
返回的数据格式如下:
{ "code": 0, "data": { "news_id": "4ded60ac-aa2f-408b-af4d-09ca0c58b50a", "cate": "娱乐", "title": "......”", "content": "......”。", "collections": true, "read_num": 6, "likes": true, "ctime": "2021-11-30 10:44", "url": "https://news.sina.com.cn/c/2021-11-30/doc-ikyakumx1093113.shtml" }, "msg": "request news_detail success." }
5、用户的行为
在该系统中,用户在看新闻时主要会留下三种用户行为:
一是阅读,即用户在点击一篇新闻的详细页时,用户产生的行为;
二是喜欢,在新闻详情页下面会存在喜欢按钮,用户可以通过点击按钮触发系统记录该行为;
三是收藏,和喜欢行为同理,需要通过用户主动的方式来触发。
因此在用户点进一篇新闻的详情页时候,前端会发送一个请求,并给后端传递一个json格式数据:
{ "user_name":"wang", "news_id":"0a745412-db48-4e37-bf13-9a5b56028f7e", "action_time":1638532127190, "action_type":"read" }
在点击喜欢或收藏按钮的时候同样会产生一个请求,并发送json
数据:
//点击喜欢 { "user_name":"wang", "news_id":"0a745412-db48-4e37-bf13-9a5b56028f7e", "action_time":1638532127190, "action_type":"like:ture" } //点击收藏 { "user_name":"wang", "news_id":"0a745412-db48-4e37-bf13-9a5b56028f7e", "action_time":1638532127190, "action_type":"collections:true" }
通过前端的传递的数据,后端对应的接口可以通过传递的参数对用户行为进行记录:
@app.route('/recsys/action', methods=["POST"]) def actions(): """用户的行为:阅读,点赞,收藏""" request_str = request.get_data() request_dict = json.loads(request_str) username = request_dict.get('user_name') newsid = request_dict.get('news_id') actiontype = request_dict.get("action_type") actiontime = request_dict.get("action_time") userid = UserAction().get_user_id_by_name(username) # 获取用户 id if not userid: return jsonify({"code": 2000, "msg": "user not register"}) action_type_list = actiontype.split(":") if len(action_type_list) == 2: _action_type = action_type_list[0] if action_type_list[1] == "false": # 如果这个参数为false的话, 表示数据库中存在记录 需要删除数据 if _action_type=="likes": UserAction().del_likes_by_user(userid,newsid) # 删除用户喜欢记录 elif _action_type=="collections": UserAction().del_coll_by_user(userid,newsid) # 删除用户收藏记录 else: if _action_type=="likes": # 如果这个参数为true的话, 表示数据库中不存在记录 需要添加数据 userlikes = UserLikes() userlikes.new(userid,username,newsid) UserAction().save_one_action(userlikes) # 记录用户喜欢记录 elif _action_type=="collections": usercollections = UserCollections() usercollections.new(userid,username,newsid) UserAction().save_one_action(usercollections) # 记录用户收藏记录 try: # 落日志 logitem = LogItem() logitem.new(userid,newsid,action_type_list[0]) LogController().save_one_log(logitem) # 更新redis中的展示数据 新闻侧 recsys_server.update_news_dynamic_info(news_id=newsid,action_type=action_type_list) return jsonify({"code": 200, "msg": "action success"}) except Exception as e: print(str(e)) return jsonify({"code": 2000, "msg": "action error"})
上述代码中主要存在三部分内容:
(1)用户行为记录:
在前端传递过来的数据中存在一个字段 "action_type":"like:ture" 或 "action_type":"like:false"(收藏行为类似),对于action_type参数,其值会是一个组合字符串,冒号前面表示用户的具体行为,冒号后面表示用户当前的行为是点击喜欢还是取消喜欢(例如用户误触导致,用户再次点击则会取消)。
通过true和false我们不仅可以知道当前用户是点击还是取消,其实还可以知道在数据库中是否存在该用户对该新闻的行为记录。原因是当传递来的是false时,表明like的状态是从true变为false,因此数据库中肯定会存在该记录,如果是true,表明like的状态是从false变为true,表明此时数据库中不存在该用户对该新闻的行为记录。通过这样的方式,我们可以比较简单的对数据库进行操作,记录用户的行为。
(2)用户行为落日志:
在企业中,任何系统都会有日志的存在,其中最主要的作用是,日志相当于一个监控器,可以随时监测系统是否出现故障,通过日志可以及时定位系统中可能存在的问题。
但是我们说的日志还有所区别,我们这里所说的日志主要是记录的一些线上信息,通过日志的方式进行记录,类似于我们这个系统,我们需要通过分析这样的用户行为来更好的了解用户兴趣,从而进行更加个性化的推荐。因此我们可以借助日志的方式来记录有意义的用户数据,通过日志数据去分析数据,构建模型。
在我们的新闻推荐系统中这么做的原因:
认识到日志的意义,即可以直接通过日志获取一些线上有意义的用户数据。
通过日志数据,可以帮助我们更新用户画像中的一些动态特征。
在后面构建模型时,我们也能获取到用户的一些点击率,收藏率的建模,为后面的工作提供数据基础。
上诉代码中,我们通过 LogController() 的 save_one_log() 方法对数据进行了存储到了mysql中。
(3)新闻动态数据更新
由于我们在展现时会显示该新闻的阅读人数、喜欢人数和收藏人数,因此用户的行为实际上会改变新闻这三个属性。因此我们需要更新redis中新闻的这些动态的数据。
主要是通过推荐服务里面的 update_news_dynamic_info()方法进行更新。
def update_news_dynamic_info(self, news_id,action_type): """更新新闻展示的详细信息""" news_dynamic_info_str = self.dynamic_news_info_redis_db.get("dynamic_news_detail:" + news_id) news_dynamic_info_str = news_dynamic_info_str.replace("'", '"' ) # 将单引号都替换成双引号 news_dynamic_info_dict = json.loads(news_dynamic_info_str) if len(action_type) == 2: if action_type[1] == "true": news_dynamic_info_dict[action_type[0]] +=1 elif action_type[1] == "false": news_dynamic_info_dict[action_type[0]] -=1 else: news_dynamic_info_dict["read_num"] +=1 news_dynamic_info_str = json.dumps(news_dynamic_info_dict) news_dynamic_info_str = news_dynamic_info_str.replace('"', "'" ) res = self.dynamic_news_info_redis_db.set("dynamic_news_detail:" + news_id, news_dynamic_info_str) return res
上述代码主要是新闻动态特征更新的部分,主要是获取redis中的信息,根据前端传递过来的行为来更新对用新闻属性的值。更改完之后,从新将新的结果从新存储到redis中。