阿里云百炼应用实践系列-基于LlamaIndex的文档问答助手

简介: 本文以百炼官方文档问答助手为例,介绍如何基于百炼平台打造基于LlamaIndex的RAG文档问答产品。我们基于百炼平台的底座能力,以官方帮助文档为指定知识库,搭建了问答服务,支持钉钉、Web访问。介绍了相关技术方案和主要代码,供开发者参考。

1. 内容介绍

本文以百炼官方文档问答助手为例,介绍如何基于百炼平台打造基于LlamaIndex的RAG文档问答产品。


百炼官方帮助文档内容较多,且不定时更新,对于需要经常查阅文档的同学来说,如何快速检索到所需信息是一个挑战。因此,我们基于百炼平台的底座能力,以官方帮助文档为指定知识库,搭建了问答服务,支持钉钉、Web访问,旨在提供友好、便捷的百炼文档问答服务。本项目也是基于百炼平台能力的RAG最佳实践之一。


测试效果-钉钉机器人:

image.png


测试效果-Web:

image.png


2. 技术方案


2.1 总体框架

本项目采用兼容LlamaIndex的RAG方案,使用Python语言开发。百炼平台Dashscope sdk提供了相关组件能力,具体接口说明可以参阅官方帮助文档。总体框架如下:

yuque_diagram.jpg


其中,

  • 百炼平台:使用到了大模型服务和数据中心两个能力,其中大模型服务提供通义千问系列大模型的问答服务;数据中心提供了“数据管理-导入数据”和“数据应用-知识索引”能力,将文档内容导入知识库中,供检索使用。
  • 问答机器人服务,主要包括:
  • Web Service:提供Web服务。
  • Chat Service:调用大模型问答API,检索知识库,并将结果以流式输出,对接Web和钉钉机器人。
  • Indexing Service:从网页抓取数据,通过百炼平台导入数据并创建索引,最终导入知识库,用于后续的检索。


本项目的开发对象是“问答机器人服务”部分,接下来将分别介绍WebService、Chat Service和Indexing Service的技术方案。由于我们基于百炼平台能力进行开发,因此需要先做好百炼环境基础准备:


2.2 Web Service

Python的Web Service有较成熟的开源框架可以使用。比如本项目使用主流的Python Web服务框架FastAPI,使用Gunicorn部署HTTP服务,具体代码不再赘述。添加的Web API主要有两个:

  • "/chat":接收问题,返回答案。
  • "/updateIndex": 更新知识库。可设置为单次更新,或者定时自动更新。


2.3 Chat Service

主要介绍大模型问答的实现方法。


2.3.1 大模型问答

大模型问答部分基于兼容LlamaIndex的Dashscope sdk实现,关键代码如下:


首先,设置大模型相关的模板:

# 设置 system_prompt
QA_SYSTEM_PROMPT = ChatMessage(
    content=("""
     # 角色
     您是百炼灵积的答疑机器人,能够基于百炼灵积相关的知识库的内容回答关于阿里云百炼灵积的产品使用、API/SDK调用和常见错误等相关问题。
     ## 技能
     ### 技能1:精准信息提取
     - **深入理解**:快速消化并理解文档内容,包括阿里云百炼灵积的产品文档、技术文档、API/SDK、常见问题等多种类型。
     - **问题对应**:精确匹配用户提问与文档中的相关信息,无需依赖外部知识或先前经验。
     ### 技能2:上下文应用
     - **间接引用**:在回答问题时,需间接体现对文档内容的理解,避免直接引用或提及“上下文”,确保回复自然流畅且基于当前文档。
     - **逻辑推理**:基于文档逻辑推导答案,即使答案未被直接陈述,也能通过综合分析得出合理结论。
     ## 工作原则
     1. **严格遵循上下文**:所有答案必须严格依据提供的文档内容,不得掺杂外部信息或假设。
     2. **表达方式**:在构造答案时,避免直接指示词如“根据上下文...”,而是通过整合信息形成独立、明确的回答。
     ## 知识库与工具
     - **依赖材料**:利用已提供的文档内容作为唯一信息源,进行问题解答。
     ## 限制
     - 仅限于文档内信息的查询与解答,不扩展至通用知识或未提及的外部资源。
     """),
    role=MessageRole.SYSTEM,
)

# 设置 message_templates
QA_PROMPT_TMPL_MSGS = [
    QA_SYSTEM_PROMPT,
    ChatMessage(
        content=(
            "下面是上下文信息: \n"
            "---------------------\n"
            "{context_str}\n"
            "---------------------\n"
            "在给定的上下文信息下,根据问题回答。"
            "问题: {query_str}\n"
        ),
        role=MessageRole.USER,
    ),
]

# 设置 context_template
CONTEXT_TEMPLATE = (
    "上下文信息如下:"
    "\n--------------------\n"
    "{context_str}"
    "\n--------------------\n"
)

# 设置 prompt_template
PROMPT_TEMPLATE = (
    "请根据阿里云百炼灵积的相关知识回答问题。\n"
    "问题: {prompt_str}"
)


然后,使用大模型进行问答:

# 获取知识库索引,其中your_index_name为用户创建的知识库名称
index = DashScopeCloudIndex('your_index_name')
# 获取大模型
llm = DashScope(model_name=DashScopeGenerationModels.QWEN_MAX,
                max_tokens=None,
                incremental_output=False)
text_qa_template = ChatPromptTemplate(message_templates=QA_PROMPT_TMPL_MSGS)
chat_engine = index.as_chat_engine(llm=llm,
                                   streaming=stream,
                                   chat_mode=ChatMode.CONTEXT,
                                   text_qa_template=text_qa_template,
                                   system_prompt=QA_SYSTEM_PROMPT.content,
                                   context_template=CONTEXT_TEMPLATE)
# 格式化prompt,其中question为用户原始问题
prompt = PROMPT_TEMPLATE.format(prompt_str={question})
# 获取答案,传入历史对话信息作为chat history
responses = chat_engine.chat(prompt, chat_history={history_messages})


其中,上述chat_history的设置,是为了大模型能够更好地支持多轮对话。将历史对话信息保存起来,在每次问答时作为chat_history参数和prompt一起传入chat engine。

chat history messages需要自行维护,每次问答结束后,需要保存的信息包括:

  • session_id:本次会话ID,由用户的request传入。
  • question: 用户问题。
  • answer:大模型输出的答案。

chat history的维护可以使用diskcache等缓存组件等,具体不作展开。


2.3.2 对接Web

为了方便用户及时查看回答内容,采用流式输出方案。

问答机器人服务作为发送端,使用Python实现。本项目基于FastAPI,使用StreamingResponse和Generator实现,关键代码示例如下:

# 生成答案
def stream_chat(self, request: ChatRequest) -> Generator[str, None, None]:
    responses = query_engine.stream_chat(prompt, chat_history=messages)
    for response in responses.response_gen:
        assistant_message += response
        yield ChatResponse(session_id=request.session_id, content=text)

# 封装答案
def chat_generator(request: ChatRequest) -> Generator[str, None, None]:
    try:
        results = chat_service.stream_chat(request=request)
        for result in results:
            data = ResponseModel(request_id=request.request_id,
                                 data=result).json()
            yield f"data: {data}\n\n"

# 流式返回答案
@router.post("/chat", summary="chat api")
async def chat(request: ChatRequest):
    return StreamingResponse(chat_generator(request=request), media_type="text/event-stream")


2.3.3 对接钉钉机器人

本节主要介绍如何通过钉钉Webhook群聊机器人进行文档问答。


1)创建机器人

首先,在钉钉群聊中,添加一个自定义机器人:

image.png


在这里,Webhook需要勾选上加签,并保存签名token,在后续发送消息时需要使用到。同时勾选开启Outgoing机制,后续需要再接收消息中使用。

image.png


注:上述的POST地址必须是公网可以访问的地址。


2) 对接机器人

首先,通过Outgoing机制,服务端接受机器人消息,也就是群聊里用户@机器人时,可以接收到用户的消息。


具体Python的参考代码如下:

@router.post("/ding_talk_service", summary="dingding chat")
async def ding_webhook_chat(message: Dict, request: Request):
    log.info("start ding webhook chat, ip: %s, request: %s" %
             (request.client.host, json.dumps(message, ensure_ascii=False)))
    #这里的token,也就是Outgoging机制中配置的token
    token = request.headers.get("token")
    if token != settings.DING_WEBHOOK_TOKEN:
        log.error("invalid token, message: %s" % message)
        return {"errorCode": 403,
                "errorMessage": "Token does not match"}
    #这里通过一个异步机制来做,防止钉钉outgoing请求超时
    asyncio.ensure_future(chat_service.webhook_chat(message=message))
    log.info("end ding webhook chat, ip: %s" % request.client.host)
    return {"errorCode": 0,
            "errorMessage": "success"}

async def webhook_chat(self, message: Dict):
    # 会话id
    conversation_id = message.get("conversationId")
    # 消息id
    msg_id = message.get("msgId")
    # 消息类型
    message_type = message.get("msgtype")
    try:
        # 文本消息内容
        if message_type == "text":
            text = message.get("text", {}).get("content", None)
        else:
            log.error("unknown supported type: %s, conversation_id: %s, msg_id: %s" %
                      (conversation_id, msg_id, message_type))
            return
        request = ChatRequest(request_id=msg_id,
                              session_id=conversation_id,
                              session_type="text_chat",
                              content=text)
        responses = self._stream_chat(request)
        response_text = ""
        # 替换钉钉不支持的markdown字符
        pattern = r'```[a-z]*\n|```'
        replaced_text = re.sub(pattern, "", response_text, flags=re.DOTALL)
        # 发送消息到钉钉群
        await self._send_webhook_message(message=message,
                                         text=replaced_text)
    except Exception as e:
        log.error("failed to chat with stream, conversation id: %s, msg id: %s err: %s" %
                  (conversation_id, msg_id, traceback.format_exc()))
        waiting_text = "小助手出小差了,请您稍后在重试......"
        await self._send_webhook_message(message=message,
                                         text=waiting_text)


发送消息的代码如下,这里采用了Markdown的消息格式:

async def _send_webhook_message(self,
                                message: Dict,
                                text: str):
    conversation_id = message.get("conversationId")
    msg_id = message.get("msgId")
    sender_nick = message.get("senderNick")
    session_webhook = message.get("sessionWebhook")
    at_users = message.get("atUsers")
    result = {
        "msgtype": "markdown",
        "at": {
            "isAtAll": "false",
            "atUserIds": at_users
        },
        "markdown": {
              "title": "百炼答疑机器人",
              "text": text
        }
    }
    headers = {
        "Content-Type": "application/json"
    }
    result_json = json.dumps(result, ensure_ascii=False)
    log.info("send result to ding webhook, conversation_id: %s, msg_id: %s, content: %s" %
             (conversation_id, msg_id, result_json))

    async with aiohttp.ClientSession() as session:
        async with session.post(session_webhook, data=result_json, headers=headers) as response:
            text = await response.text()
            # response = requests.post(session_webhook, data=json.dumps(result), headers=headers)
            if response.status == 200:
                log.info("send success, conversationId: %s, msgId: %s, text: %s" %
                         (conversation_id, msg_id, text))
                else:
                    log.error(
                        "send fail, conversationId: %s, msgId: %s, err: %s" %
                        (conversation_id, msg_id, text))


钉钉Webhook支持更多的消息格式,具体可以参考文档:

https://open.dingtalk.com/document/orgapp/custom-robots-send-group-messages


2.4 Indexing Service

Indexing service部分围绕数据的获取和导入展开。需要注意的是,目前Dashscope sdk不支持网页url作为数据来源,仅支持本地文档,因此本项目采用的方案是先将网页内容保存成pdf文件,然后使用Dashscope sdk将文件上传到数据中心。


2.4.1 数据抓取

注意:针对某一个网页的数据抓取,具体方案需根据待处理网页的特征而确定。同一个网页也可能在更新后发生结构上的变化,需要同步优化抓取方案。以下方案仅供参考。


经过调研,本项目采用开源软件playwright抓取数据。部分关键代码如下:


  • 递归展开目录中的所有标题
async def expand_ul(page, ul_element):
    li_elements = await ul_element.query_selector_all('li')
    for li in li_elements:
        li_id = await li.get_attribute('id')
        a_elements = await li.query_selector_all('a')
        for a in a_elements:
            i_elements = await a.query_selector_all('i')
            if i_elements:
                for i in i_elements:
                    await i.click()
                    await page.wait_for_timeout(50)
        child_ul_elements = await li.query_selector_all('ul')
        if child_ul_elements:
            for child_ul in child_ul_elements:
                await expand_ul(page, child_ul)


  • 提取ID、Url
a_elements = await root_ul_element.query_selector_all('a')
items: List[Metadata] = []
for a in a_elements:
    if await a.get_attribute('href'):
        href = await a.get_attribute('href')
        url = urljoin(page.url, href)
        id = await a.get_attribute('id')


  • 提取每个网页的更新时间,用于判断文档是否更新
async def get_update_time(page):
    update_time_element = await page.query_selector(".Header--updateTime--YXGPhcZ")
    if update_time_element:
        text = await update_time_element.text_content()
        time_matches = re.search(r'\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}', text)
        if time_matches:
            return time_matches.group(0)


  • 将网页保存成pdf文件
await page.goto(url)
await page.pdf(path='./local.pdf', format='A3')


2.4.2 数据导入

在导入数据前,如果不使用默认数据分类,则需要先在百炼平台上创建数据分类,参考:数据类目操作说明

数据分类创建后,在百炼控制台上可以看到:

image.png


然后,将保存的所有pdf批量上传到数据中心(可参考帮助文档 DashScopeParse),核心代码如下:

parse = DashScopeParse(result_type=ResultType.DASHSCOPE_DOCMIND,
                       category_id='数据分类名称',
                       verbose=False)
documents = await parse.aload_data(file_path=['pdf列表'])


数据导入后,在对应的数据分类下,可以看到已上传的文档:

image.png


2.4.3 知识库更新

接下来导入知识库,如果需要手动创建知识库,可参考帮助文档:创建知识库。或者,使用Dashscope sdk在导入知识库的同时自动创建知识库,参考文档:DashScopeCloudIndex。核心代码如下:

index = DashScopeCloudIndex.from_documents(
    documents=documents,
    name='知识库名称',
    verbose=True,
)


导入知识库后,在百炼控制台上可以查看已导入文档:

image.png


对于已经删除或者过期的文档,我们需要将其从知识库中剔除,使用delete_ref_doc接口:

index = DashScopeCloudIndex(name='知识库名称')
index.delete_ref_doc(ref_doc_ids=['待删除的doc id列表'])


其中,待删除的doc id从documents中获取,然后用数据库等方式进行维护。


3. 总结

本项目的操作流程总结如下:

  1. 开通百炼服务:开通阿里云百炼大模型服务产品
  2. 创建API_KEY:获取API-KEY
  3. 创建数据类目:数据类目操作说明
  4. 创建知识库:创建知识库
  5. 如果通过钉钉输出,配置钉钉机器人。
  6. 配置并运行问答机器人服务程序(包含知识库更新、大模型问答)。
  7. 通过Web或钉钉测试问答服务。


综上所述,本项目基于百炼平台的大模型服务、数据中心相关能力,较快地构建了以官方帮助文档为知识库的RAG问答机器人,支持Web和钉钉访问。

欢迎大家留言,一起交流探讨。

作者介绍
目录