代码封装和对外提供接
目标
能够完成封装的代码
能够使用grpc对外提供接口
能够使用supervisord完成服务的管理
1. 完成代码的封装
代码封装过程中,需要注意,在整个结构中,我们有很多的结算结果是dump到本地的,为了防止后续每次的重复计算。所以laod的结果,应该提前加载到内容,而不是每次调用load义词
1.1 完成意图识别代码封装
完成判断用户意图的代码,即在使用fasttext的模型,判断用户输入句子的分类
import fastText import re from lib import jieba_cut fc_word_mode = fastText.load_model("./classify/data/ft_classify.model") fc_word_mode = fastText.load_model("./classify/data/ft_classify_words.model") def is_QA(sentence_info): python_qs_list = [" ".join(sentence_info["cuted_sentence"])] result = fc_word_mode.predict(python_qs_list) python_qs_list = [" ".join(sentence_info["cuted_word_sentence"])] words_result = fc_word_mode.predict(python_qs_list) for index, (label,acc,word_label,word_acc) in enumerate(zip(*result,*words_result)): label = label[0] acc = acc[0] word_label = word_label[0] word_acc = word_acc[0] #以label_qa为准,如果预测结果是label_chat,则label_qa的概率=1-labele_chat if label == "__label__chat": label = "__label__QA" acc = 1-acc if word_label == "__label__chat": word_label = "__label__QA" word_acc = 1 - word_acc if acc>0.95 or word_acc>0.95: #是QA return True else: return False
1.2 完成对chatbot代码的封装
提供predict的接口
""" 准备闲聊的模型 """ import pickle from lib import jieba_cut import numpy as np from chatbot import Sequence2Sequence class Chatbot: def __init__(self,ws_path="./chatbot/data/ws.pkl",save_path="./chatbot/model/seq2seq_chatbot.ckpt"): self.ws_chatbot = pickle.load(open(ws_path, "rb")) self.save_path = save_path #TODO ..... def predict(self,s): """ :param s:没有分词的 :param ws: :param ws_words: :return: """ #TODO ... return ans
1.3 完成对问答系统召回的封装
""" 进行召回的方法 """ import os import pickle class Recall: def __init__(self,topk=20): # 准备问答的mode等模块 self.topk = topk def predict(self,sentence): """ :param sentence: :param debug: :return: [recall list],[entity] """ #TODO recall return recall_list def get_answer(self,s): return self.QA_dict[s]
1.4 完成对问答排序模型的封装
""" 深度学习排序 """ import tensorflow as tf import pickle from DNN2 import SiamsesNetwork from lib import jieba_cut class DNNSort(): def __init__(self): #使用词语和单字两个模型的均值作为最后的结果 self.dnn_sort_words = DNNSortWords() self.dnn_sort_single_word = DNNSortSingleWord() def predict(self,s,c_list): sort1 = self.dnn_sort_words.predict(s,c_list) sort2 = self.dnn_sort_single_word.predict(s,c_list) for i in sort1: sort1[i] = (sort1[i]+ sort2[i])/2 sorts = sorted(sort1.items(),key=lambda x:x[-1],reverse=True) return sorts[0][0],sorts[0][1] class DNNSortWords: def __init__(self,ws_path="./DNN2/data/ws_80000.pkl",save_path="./DNN2/model_keras/esim_model_softmax.ckpt"): self.ws = pickle.load(open(ws_path, "rb")) self.save_path = save_path #TOOD ... def predict(self,s,c_list): """ :param s:没有分词的 :param c_list: 带比较的列表 :param ws: :param ws_words: :return: """ #TOOD ... return sim_dict class DNNSortSingleWord: def __init__(self,ws_path="./DNN2/data/ws_word.pkl",save_path="./DNN2/data/esim_word_model_softmax.ckpt"): self.ws = pickle.load(open(ws_path, "rb")) self.save_path = save_path #TOOD ... def predict(self,s,c_list): """ :param s:没有分词的 :param c_list: 带比较的列表 :param ws: :param ws_words: :return: """ #TOOD ... return sim_dict
1.5 实现对聊天记录的保存
不同的用户,连续10分钟内的对话认为是一轮对话,如果10分还没有下一次对话,认为该轮对话结束,如果10分钟后开始对话,认为是下一轮对话。是要是为了保存不同轮中的聊天主题,后续可以实现基本的对话管理。比如用户刚问了python相关的问题,后续如果问题中不带主体,那么就把redis中的python作为其主体
主要实现逻辑为:
- 使用redis存储用户基本的数据
- 使用mongodb存储对话记录
具体思路如下:
- 根据用户id,获取对话id,根据对话id判断当前的对话是否存在
- 如果对话id存在:
- 更新对话的entity,上一次对话的时间,设置对话id的过期时间
- 保存数据到mongodb
- 如果对话id不存在:
- 创建用户的基础信息(user_id,entiry,对话时间)
- 把用户的基础信息存入redis,同时设置对话id和过期四火箭
- 保存数据到mongodb中
""" 获取,更新用户的信息 """ from pymongo import MongoClient import redis from uuid import uuid1 import time import json """ ### redis { user_id:"id", user_background:{} last_entity:[] last_conversation_time:int(time): } userid_conversation_id:"" ### monodb 存储对话记录 {user_id:,conversion_id:,from:user/bot,message:"",create_time,entity:[],attention:[]} """ HOST = "localhost" CNVERSION_EXPERID_TIME = 60 * 10 # 10分钟,连续10分钟没有通信,意味着会话结束 class MessageManager: def __init__(self): self.client = MongoClient(host=HOST) self.m = self.client["toutiao"]["dialogue"] self.r = redis.Redis(host=HOST, port=6379, db=10) def last_entity(self, user_id): """最近一次的entity""" return json.loads(self.r.hget(user_id, "entity")) def gen_conversation_id(self): return uuid1().hex def bot_message_pipeline(self, user_id, message): """保存机器人的回复记录""" conversation_id_key = "{}_conversion_id".format(user_id) conversation_id = self.user_exist(conversation_id_key) if conversation_id: # 更新conversation_id的过期时间 self.r.expire(conversation_id_key, CNVERSION_EXPERID_TIME) data = {"user_id": user_id, "conversation_id": conversation_id, "from": "bot", "message": message, "create_time": int(time.time()), } self.m.save(data) else: raise ValueError("没有会话id,但是机器人尝试回复....") def user_message_pipeline(self, user_id, message, create_time, attention, entity=[]): # 确定用户相关的信息 # 1. 用户是否存在 # 2.1 用户存在,返回用户的最近的entity,存入最近的对话 # 3.1 判断是否为新的对话,如果是新对话,开启新的回话,update用户的对话信息 # 3.2 如果不是新的对话,update用户的对话信息 # 3. 更新用户的基本信息 # 4 返回用户相关信息 # 5. 调用预测接口,发来对话的结构 # 要保存的data数据,缺少conversation_id data = { "user_id": user_id, "from": "user", "message": message, "create_time": create_time, "entity": json.dumps(entity), "attention": attention, } conversation_id_key = "{}_conversion_id".format(user_id) conversation_id = self.user_exist(conversation_id_key) print("conversation_id",conversation_id) if conversation_id: if entity: # 更新当前用户的 last_entity self.r.hset(user_id, "last_entity", json.dumps(entity)) # 更新最后的对话时间 self.r.hset(user_id, "last_conversion_time", create_time) # 设置conversation id的过期时间 self.r.expire(conversation_id_key, CNVERSION_EXPERID_TIME) # 保存聊天记录到mongodb中 data["conversation_id"] = conversation_id self.m.save(data) print("mongodb 保存数据成功") else: # 不存在 user_basic_info = { "user_id": user_id, "last_conversion_time": create_time, "last_entity": json.dumps(entity) } self.r.hmset(user_id, user_basic_info) print("redis存入 user_basic_info success") conversation_id = self.gen_conversation_id() print("生成conversation_id",conversation_id) # 设置会话的id self.r.set(conversation_id_key, conversation_id, ex=CNVERSION_EXPERID_TIME) # 保存聊天记录到mongodb中 data["conversation_id"] = conversation_id self.m.save(data) print("mongodb 保存数据成功") def user_exist(self, conversation_id_key): """ 判断用户是否存在 :param user_id:用户id :return: """ conversation_id = self.r.get(conversation_id_key) if conversation_id: conversation_id = conversation_id.decode() print("load conversation_id",conversation_id) return conversation_id
2. 使用GRPC对外提供服务
2.1 安装grpc相关环境
gRPC 的安装:`pip install grpcio` 安装 ProtoBuf 相关的 python 依赖库:`pip install protobuf` 安装 python grpc 的 protobuf 编译工具:`pip install grpcio-tools`
2.2 定义GRPC的接口
//chatbot.proto 文件 syntax = "proto3"; message ReceivedMessage { string user_id = 1; //用户id string user_message = 2; //当前用户传递的消息 int32 create_time = 3; //当前消息发送的时间 } message ResponsedMessage { string user_response = 1; //返回给用户的消息 int32 create_time = 2; //返回给用户的时间 } service ChatBotService { rpc Chatbot (ReceivedMessage) returns (ResponsedMessage); }
2.3 编译生成protobuf文件
使用下面的命令编译,得到chatbot_pb2.py
和chatbot_pb2_grpc.py
文件
python -m grpc_tools.protoc -I. –python_out=. –grpc_python_out=. ./chatbot.proto
2.4 使用grpc提供服务
import dialogue from classify import is_QA from dialogue.process_sentence import process_user_sentence from chatbot_grpc import chatbot_pb2_grpc from chatbot_grpc import chatbot_pb2 import time class chatServicer(chatbot_pb2_grpc.ChatBotServiceServicer): def __init__(self): #提前加载各种模型 self.recall = dialogue.Recall(topk=20) self.dnnsort = dialogue.DNNSort() self.chatbot = dialogue.Chatbot() self.message_manager = dialogue.MessageManager() def Chatbot(self, request, context): user_id = request.user_id message = request.user_message create_time = request.create_time #对用户的输出进行基础的处理,如分词 message_info = process_user_sentence(message) if is_QA(message_info): attention = "QA" #实现对对话数据的保存 self.message_manager.user_message_pipeline(user_id, message, create_time, attention, entity=message_info["entity"]) recall_list,entity = self.recall.predict(message_info) line, score = self.dnnsort.predict(message,recall_list) if score > 0.7: ans = self.recall.get_answer(line) user_response = ans["ans"] else: user_response = "不好意思,这个问题我还没学习到..." else: attention = "chat" # 实现对对话数据的保存 self.message_manager.user_message_pipeline(user_id,message,create_time,attention,entity=message_info["entity"]) user_response = self.chatbot.predict(message) self.message_manager.bot_message_pipeline(user_id,user_response) user_response = user_response create_time = int(time.time()) return chatbot_pb2.ResponsedMessage(user_response=user_response,create_time=create_time) def serve(): import grpc from concurrent import futures # 多线程服务器 server = grpc.server(futures.ThreadPoolExecutor(max_workers=10)) # 注册本地服务 chatbot_pb2_grpc.add_ChatBotServiceServicer_to_server(chatServicer(), server) # 监听端口 server.add_insecure_port("[::]:9999") # 开始接收请求进行服务 server.start() # 使用 ctrl+c 可以退出服务 try: time.sleep(1000) except KeyboardInterrupt: server.stop(0) if __name__ == '__main__': serve()
3. 使用supervisor完成对服务的管理
3.1 编写简单的执行脚本
3. 使用supervisor完成对服务的管理 3.1 编写简单的执行脚本 #!/bin/bash
3.2 安装、配置supervisor
supervisor现在的官方版本还是python2的,但是可以使用下面的命令安装python3版本
pip3 install git+https://github.com/Supervisor/supervisor
完成supervisor的配置文件的编写,conf中使用分号作为注释符号
;conf.d [program:chat_service] command=/root/chat_service/run.sh ;执行的命令 stdout_logfile=/root/chat_service/log/out.log ;log的位置 stderr_logfile=/root/chat_service/log/error.log ;错误log的位置 directory=/root/chat_service ;路径 autostart=true ;是否自动启动 autorestart=true ;是否自动重启 startretries=10 ;失败的最大尝试次数
在supervisor的基础配置中添加上述配置文件
;/etc/supervisord/supervisor.conf [include] files=/root/chat_service/conf.d
运行supervisord
supervisord -c /etc/supervisord/supervisor.conf