QA 机器人实现最后一步——代码的封装和提供接口

本文涉及的产品
云数据库 MongoDB,独享型 2核8GB
推荐场景:
构建全方位客户视图
云数据库 Tair(兼容Redis),内存型 2GB
Redis 开源版,标准版 2GB
推荐场景:
搭建游戏排行榜
简介: QA 机器人实现最后一步——代码的封装和提供接口

代码封装和对外提供接


目标


能够完成封装的代码

能够使用grpc对外提供接口

能够使用supervisord完成服务的管理


1. 完成代码的封装


代码封装过程中,需要注意,在整个结构中,我们有很多的结算结果是dump到本地的,为了防止后续每次的重复计算。所以laod的结果,应该提前加载到内容,而不是每次调用load义词


1.1 完成意图识别代码封装


完成判断用户意图的代码,即在使用fasttext的模型,判断用户输入句子的分类

import fastText
import re
from lib import jieba_cut
fc_word_mode = fastText.load_model("./classify/data/ft_classify.model")
fc_word_mode = fastText.load_model("./classify/data/ft_classify_words.model")
def is_QA(sentence_info):
    python_qs_list = [" ".join(sentence_info["cuted_sentence"])]
    result = fc_word_mode.predict(python_qs_list)
    python_qs_list = [" ".join(sentence_info["cuted_word_sentence"])]
    words_result = fc_word_mode.predict(python_qs_list)
    for index, (label,acc,word_label,word_acc) in enumerate(zip(*result,*words_result)):
        label = label[0]
        acc = acc[0]
        word_label = word_label[0]
        word_acc = word_acc[0]
        #以label_qa为准,如果预测结果是label_chat,则label_qa的概率=1-labele_chat
        if label == "__label__chat":
            label = "__label__QA"
            acc = 1-acc
        if word_label == "__label__chat":
            word_label = "__label__QA"
            word_acc = 1 - word_acc
        if acc>0.95 or word_acc>0.95:
            #是QA
            return True
        else:
            return False


1.2 完成对chatbot代码的封装


提供predict的接口

"""
准备闲聊的模型
"""
import pickle
from lib import jieba_cut
import numpy as np
from chatbot import Sequence2Sequence
class Chatbot:
    def __init__(self,ws_path="./chatbot/data/ws.pkl",save_path="./chatbot/model/seq2seq_chatbot.ckpt"):
        self.ws_chatbot = pickle.load(open(ws_path, "rb"))
        self.save_path = save_path
    #TODO .....
    def predict(self,s):
        """
        :param s:没有分词的
        :param ws:
        :param ws_words:
        :return:
        """
        #TODO ...
        return ans


1.3 完成对问答系统召回的封装


"""
进行召回的方法
"""
import os
import pickle
class Recall:
    def __init__(self,topk=20):
        # 准备问答的mode等模块
        self.topk = topk
    def predict(self,sentence):
        """
        :param sentence:
        :param debug:
        :return: [recall list],[entity]
        """
        #TODO recall
        return recall_list
    def get_answer(self,s):
        return self.QA_dict[s]


1.4 完成对问答排序模型的封装


"""
深度学习排序
"""
import tensorflow as tf
import pickle
from DNN2 import SiamsesNetwork
from lib import jieba_cut
class DNNSort():
    def __init__(self):
        #使用词语和单字两个模型的均值作为最后的结果
        self.dnn_sort_words = DNNSortWords()
        self.dnn_sort_single_word = DNNSortSingleWord()
    def predict(self,s,c_list):
        sort1 = self.dnn_sort_words.predict(s,c_list)
        sort2 = self.dnn_sort_single_word.predict(s,c_list)
        for i in sort1:
            sort1[i] = (sort1[i]+ sort2[i])/2
        sorts = sorted(sort1.items(),key=lambda x:x[-1],reverse=True)
        return sorts[0][0],sorts[0][1]
class DNNSortWords:
    def __init__(self,ws_path="./DNN2/data/ws_80000.pkl",save_path="./DNN2/model_keras/esim_model_softmax.ckpt"):
        self.ws = pickle.load(open(ws_path, "rb"))
        self.save_path = save_path
    #TOOD ...
    def predict(self,s,c_list):
        """
        :param s:没有分词的
        :param c_list: 带比较的列表
        :param ws:
        :param ws_words:
        :return:
        """
        #TOOD ...
        return sim_dict
class DNNSortSingleWord:
    def __init__(self,ws_path="./DNN2/data/ws_word.pkl",save_path="./DNN2/data/esim_word_model_softmax.ckpt"):
        self.ws = pickle.load(open(ws_path, "rb"))
        self.save_path = save_path
        #TOOD ...
    def predict(self,s,c_list):
        """
        :param s:没有分词的
        :param c_list: 带比较的列表
        :param ws:
        :param ws_words:
        :return:
        """
    #TOOD ...
        return sim_dict


1.5 实现对聊天记录的保存


不同的用户,连续10分钟内的对话认为是一轮对话,如果10分还没有下一次对话,认为该轮对话结束,如果10分钟后开始对话,认为是下一轮对话。是要是为了保存不同轮中的聊天主题,后续可以实现基本的对话管理。比如用户刚问了python相关的问题,后续如果问题中不带主体,那么就把redis中的python作为其主体

主要实现逻辑为:

  1. 使用redis存储用户基本的数据
  2. 使用mongodb存储对话记录

具体思路如下:

  1. 根据用户id,获取对话id,根据对话id判断当前的对话是否存在
  2. 如果对话id存在:
  1. 更新对话的entity,上一次对话的时间,设置对话id的过期时间
  2. 保存数据到mongodb
  1. 如果对话id不存在:
  1. 创建用户的基础信息(user_id,entiry,对话时间)
  2. 把用户的基础信息存入redis,同时设置对话id和过期四火箭
  3. 保存数据到mongodb中
"""
获取,更新用户的信息
"""
from pymongo import MongoClient
import redis
from uuid import uuid1
import time
import json
"""
### redis
{
user_id:"id",
user_background:{}
last_entity:[]
last_conversation_time:int(time):
}
userid_conversation_id:""
### monodb 存储对话记录
{user_id:,conversion_id:,from:user/bot,message:"",create_time,entity:[],attention:[]}
"""
HOST = "localhost"
CNVERSION_EXPERID_TIME = 60 * 10  # 10分钟,连续10分钟没有通信,意味着会话结束
class MessageManager:
    def __init__(self):
        self.client = MongoClient(host=HOST)
        self.m = self.client["toutiao"]["dialogue"]
        self.r = redis.Redis(host=HOST, port=6379, db=10)
    def last_entity(self, user_id):
        """最近一次的entity"""
        return json.loads(self.r.hget(user_id, "entity"))
    def gen_conversation_id(self):
        return uuid1().hex
    def bot_message_pipeline(self, user_id, message):
        """保存机器人的回复记录"""
        conversation_id_key = "{}_conversion_id".format(user_id)
        conversation_id = self.user_exist(conversation_id_key)
        if conversation_id:
            # 更新conversation_id的过期时间
            self.r.expire(conversation_id_key, CNVERSION_EXPERID_TIME)
            data = {"user_id": user_id,
                    "conversation_id": conversation_id,
                    "from": "bot",
                    "message": message,
                    "create_time": int(time.time()),
                    }
            self.m.save(data)
        else:
            raise ValueError("没有会话id,但是机器人尝试回复....")
    def user_message_pipeline(self, user_id, message, create_time, attention, entity=[]):
        # 确定用户相关的信息
        # 1. 用户是否存在
        # 2.1 用户存在,返回用户的最近的entity,存入最近的对话
        # 3.1 判断是否为新的对话,如果是新对话,开启新的回话,update用户的对话信息
        # 3.2 如果不是新的对话,update用户的对话信息
        # 3. 更新用户的基本信息
        # 4  返回用户相关信息
        # 5. 调用预测接口,发来对话的结构
        # 要保存的data数据,缺少conversation_id
        data = {
            "user_id": user_id,
            "from": "user",
            "message": message,
            "create_time": create_time,
            "entity": json.dumps(entity),
            "attention": attention,
        }
        conversation_id_key = "{}_conversion_id".format(user_id)
        conversation_id = self.user_exist(conversation_id_key)
        print("conversation_id",conversation_id)
        if conversation_id:
            if entity:
                # 更新当前用户的 last_entity
                self.r.hset(user_id, "last_entity", json.dumps(entity))
            # 更新最后的对话时间
            self.r.hset(user_id, "last_conversion_time", create_time)
            # 设置conversation id的过期时间
            self.r.expire(conversation_id_key, CNVERSION_EXPERID_TIME)
            # 保存聊天记录到mongodb中
            data["conversation_id"] = conversation_id
            self.m.save(data)
            print("mongodb 保存数据成功")
        else:
            # 不存在
            user_basic_info = {
                "user_id": user_id,
                "last_conversion_time": create_time,
                "last_entity": json.dumps(entity)
            }
            self.r.hmset(user_id, user_basic_info)
            print("redis存入 user_basic_info success")
            conversation_id = self.gen_conversation_id()
            print("生成conversation_id",conversation_id)
            # 设置会话的id
            self.r.set(conversation_id_key, conversation_id, ex=CNVERSION_EXPERID_TIME)
            # 保存聊天记录到mongodb中
            data["conversation_id"] = conversation_id
            self.m.save(data)
            print("mongodb 保存数据成功")
    def user_exist(self, conversation_id_key):
        """
        判断用户是否存在
        :param user_id:用户id
        :return:
        """
        conversation_id = self.r.get(conversation_id_key)
        if conversation_id:
            conversation_id = conversation_id.decode()
        print("load conversation_id",conversation_id)
        return conversation_id


2. 使用GRPC对外提供服务


2.1 安装grpc相关环境


gRPC 的安装:`pip install grpcio`
安装 ProtoBuf 相关的 python 依赖库:`pip install protobuf`
安装 python grpc 的 protobuf 编译工具:`pip install grpcio-tools`


2.2 定义GRPC的接口


//chatbot.proto 文件
syntax = "proto3";
message ReceivedMessage {
    string user_id = 1; //用户id
    string user_message = 2; //当前用户传递的消息
    int32 create_time = 3; //当前消息发送的时间
}
message ResponsedMessage {
    string user_response = 1; //返回给用户的消息
    int32 create_time = 2; //返回给用户的时间
}
service ChatBotService {
  rpc Chatbot (ReceivedMessage) returns (ResponsedMessage);
}


2.3 编译生成protobuf文件


使用下面的命令编译,得到chatbot_pb2.pychatbot_pb2_grpc.py文件

python -m grpc_tools.protoc -I. –python_out=. –grpc_python_out=. ./chatbot.proto


2.4 使用grpc提供服务


import dialogue
from classify import is_QA
from dialogue.process_sentence import process_user_sentence
from chatbot_grpc import chatbot_pb2_grpc
from chatbot_grpc import chatbot_pb2
import time
class chatServicer(chatbot_pb2_grpc.ChatBotServiceServicer):
    def __init__(self):
        #提前加载各种模型
        self.recall = dialogue.Recall(topk=20)
        self.dnnsort = dialogue.DNNSort()
        self.chatbot = dialogue.Chatbot()
        self.message_manager = dialogue.MessageManager()
    def Chatbot(self, request, context):
        user_id = request.user_id
        message = request.user_message
        create_time = request.create_time
        #对用户的输出进行基础的处理,如分词
        message_info = process_user_sentence(message)
        if is_QA(message_info):
            attention = "QA"
            #实现对对话数据的保存
            self.message_manager.user_message_pipeline(user_id, message, create_time, attention, entity=message_info["entity"])
            recall_list,entity = self.recall.predict(message_info)
            line, score = self.dnnsort.predict(message,recall_list)
            if score > 0.7:
                ans = self.recall.get_answer(line)
                user_response = ans["ans"]
            else:
                user_response = "不好意思,这个问题我还没学习到..."
        else:
            attention = "chat"
            # 实现对对话数据的保存
            self.message_manager.user_message_pipeline(user_id,message,create_time,attention,entity=message_info["entity"])
            user_response = self.chatbot.predict(message)
        self.message_manager.bot_message_pipeline(user_id,user_response)
        user_response = user_response
        create_time = int(time.time())
        return chatbot_pb2.ResponsedMessage(user_response=user_response,create_time=create_time)
def serve():
    import grpc
    from concurrent import futures
    # 多线程服务器
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
    # 注册本地服务
    chatbot_pb2_grpc.add_ChatBotServiceServicer_to_server(chatServicer(), server)
    # 监听端口
    server.add_insecure_port("[::]:9999")
    # 开始接收请求进行服务
    server.start()
    # 使用 ctrl+c 可以退出服务
    try:
        time.sleep(1000)
    except KeyboardInterrupt:
        server.stop(0)
if __name__ == '__main__':
    serve()


3. 使用supervisor完成对服务的管理


3.1 编写简单的执行脚本


3. 使用supervisor完成对服务的管理
3.1 编写简单的执行脚本
#!/bin/bash


3.2 安装、配置supervisor


supervisor现在的官方版本还是python2的,但是可以使用下面的命令安装python3版本

pip3 install git+https://github.com/Supervisor/supervisor    


完成supervisor的配置文件的编写,conf中使用分号作为注释符号

;conf.d
[program:chat_service]
command=/root/chat_service/run.sh  ;执行的命令
stdout_logfile=/root/chat_service/log/out.log ;log的位置
stderr_logfile=/root/chat_service/log/error.log  ;错误log的位置
directory=/root/chat_service  ;路径
autostart=true  ;是否自动启动
autorestart=true  ;是否自动重启
startretries=10 ;失败的最大尝试次数


在supervisor的基础配置中添加上述配置文件

;/etc/supervisord/supervisor.conf 
[include]
files=/root/chat_service/conf.d


运行supervisord

supervisord -c /etc/supervisord/supervisor.conf

相关实践学习
【文生图】一键部署Stable Diffusion基于函数计算
本实验教你如何在函数计算FC上从零开始部署Stable Diffusion来进行AI绘画创作,开启AIGC盲盒。函数计算提供一定的免费额度供用户使用。本实验答疑钉钉群:29290019867
建立 Serverless 思维
本课程包括: Serverless 应用引擎的概念, 为开发者带来的实际价值, 以及让您了解常见的 Serverless 架构模式
目录
相关文章
|
6月前
|
运维 监控 安全
调用钉钉机器人API接口将堡垒机安全运维告警单发给运维人员
调用钉钉机器人API接口将堡垒机安全运维告警单发给运维人员
191 0
|
6月前
|
JSON 机器人 数据安全/隐私保护
钉钉中,如何获取机器人发送群聊消息接口返回的加密消息id(processQueryKey)?
钉钉中,如何获取机器人发送群聊消息接口返回的加密消息id(processQueryKey)?【1月更文挑战第5天】【1月更文挑战第24篇】
167 5
|
机器学习/深度学习 算法 机器人
路径规划算法:基于蜉蝣优化的机器人路径规划算法- 附matlab代码
路径规划算法:基于蜉蝣优化的机器人路径规划算法- 附matlab代码
路径规划算法:基于蜉蝣优化的机器人路径规划算法- 附matlab代码
|
3月前
|
机器人 API Python
智能对话机器人(通义版)会话接口API使用Quick Start
本文主要演示了如何使用python脚本快速调用智能对话机器人API接口,在参数获取的部分给出了具体的获取位置截图,这部分容易出错,第一次使用务必仔细参考接入参数获取的位置。
205 1
|
3月前
|
机器学习/深度学习 算法 机器人
【2023年第十三届APMCM亚太地区大学生数学建模竞赛】A题 水果采摘机器人的图像识别 Python代码解析
本文介绍了2023年第十三届APMCM亚太地区大学生数学建模竞赛A题的Python代码实现,详细阐述了水果采摘机器人图像识别问题的分析与解决策略,包括图像特征提取、数学模型建立、目标检测算法使用,以及苹果数量统计、位置估计、成熟度评估和质量估计等任务的编程实践。
74 0
【2023年第十三届APMCM亚太地区大学生数学建模竞赛】A题 水果采摘机器人的图像识别 Python代码解析
|
4月前
|
机器学习/深度学习 人工智能 自然语言处理
Midjourney是一个基于GPT-3.5系列接口开发的免费AI机器人
Midjourney是一个基于GPT-3.5系列接口开发的免费AI机器人
80 0
|
机器学习/深度学习 算法 机器人
路径规划算法:基于被囊群优化的机器人路径规划算法- 附matlab代码
路径规划算法:基于被囊群优化的机器人路径规划算法- 附matlab代码
|
机器学习/深度学习 算法 机器人
路径规划算法:基于蜻蜓分布优化的机器人路径规划算法- 附matlab代码
路径规划算法:基于蜻蜓分布优化的机器人路径规划算法- 附matlab代码
|
6月前
|
运维 监控 安全
【优化篇】调用钉钉机器人API接口将堡垒机安全运维告警单发给运维人员
【优化篇】调用钉钉机器人API接口将堡垒机安全运维告警单发给运维人员
129 0
|
机器学习/深度学习 传感器 算法
基于Dijkstra、A*和动态规划的移动机器人路径规划(Matlab代码实现)
基于Dijkstra、A*和动态规划的移动机器人路径规划(Matlab代码实现)

热门文章

最新文章