基于 EventBridge + DashVector 打造 RAG 全链路动态语义检索能力

简介: 本文将演示如何使用事件总线(EventBridge),向量检索服务(DashVector),函数计算(FunctionCompute)结合灵积模型服务[1]上的 Embedding API[2],来从 0 到 1 构建基于文本索引的构建+向量检索基础上的语义搜索能力。具体来说,我们将基于 OSS 文本文档动态插入数据,进行实时的文本语义搜索,查询最相似的相关内容。

本文将演示如何使用事件总线(EventBridge),向量检索服务(DashVector),函数计算(FunctionCompute)结合灵积模型服务[1]上的 Embedding API[2],来从 0 到 1 构建基于文本索引的构建+向量检索基础上的语义搜索能力。具体来说,我们将基于 OSS 文本文档动态插入数据,进行实时的文本语义搜索,查询最相似的相关内容。


本文中将用到事件总线(EventBridge),它是阿里云提供的一款无服务器事件总线服务,支持阿里云服务、自定义应用、SaaS 应用以标准化、中心化的方式接入,并能够以标准化的 CloudEvents 1.0 协议在这些应用之间路由事件,帮助您轻松构建松耦合、分布式的事件驱动架构。


1. RAG 背景概述


大语言模型(LLM)作为自然语言处理领域的核心技术,具有丰富的自然语言处理能力。但其训练语料库具有一定的局限性,一般由普适知识、常识性知识,如维基百科、新闻、小说,和各种领域的专业知识组成。导致 LLM 在处理特定领域的知识表示和应用时存在一定的局限性,特别对于垂直领域内,或者企业内部等私域专属知识。


实现专属领域的知识问答的关键,在于如何让 LLM 能够理解并获取存在于其训练知识范围外的特定领域知识。同时可以通过特定 Prompt 构造,提示 LLM 在回答特定领域问题的时候,理解意图并根据注入的领域知识来做出回答。在通常情况下,用户的提问是完整的句子,而不像搜索引擎只输入几个关键字。这种情况下,直接使用关键字与企业知识库进行匹配的效果往往不太理想,同时长句本身还涉及分词、权重等处理。相比之下,倘若我们把提问的文本,和知识库的内容,都先转化为高质量向量,再通过向量检索将匹配过程转化为语义搜索,那么提取相关知识点就会变得简单而高效。


本文将介绍如何构建一个完全动态的 RAG 入库方案,通过 EventBridge 拉取 OSS 非结构化数据,同时将数据投递至 DashVector 向量数据库,从而实现完整的 RAG Ingestion 流程。


2. 流程概述


2.1 数据集成(Ingestion)

image.png

2.2 数据检索(Search)

image.png

3. 操作流程


3.1 前提条件

  • DashScope 控制台开通灵积模型服务,并获得 API-KEY 的获取与配置。
  • 开通 DashVector 向量检索服务,并获得 API-KEY 的获取与配置。
  • 开通 OSS 服务。
  • 开通 FC 服务。
  • 开通 EventBridge 服务。
开通灵积模型服务

1. 点击进入 DashScope 控制台[3],开通灵积相关服务

image.png

2. 点击进入“API-KEY”管理,获取相关 KEY 信息

image.png

开通 DashVector 服务

1. 若第一次配置,请点击“新建 DashVector Cluster[4]”,跳转创建新的 Cluster;点击“创建免费 Cluster”快速体验创建向量数据库

image.png

2. 选择“立即购买”

image.png

3. 点击创建“Collection”

image.png

4. 填写纬度为“1536”,距离度量方式“Cosine”,点击“确认”

image.png

5. 点击“API-KEY 管理”,获取 DashVector 的 API KEY

image.png

3.2 创建 Ingestion 数据集成任务

进入 EventBridge 控制台[5]

image.png

配置 OSS 源
  • OSS Bucket:选择空白存储桶实验,若无请自行创建;
  • OSS 前缀:该项可根据诉求填写,若无前缀 EB 将拉取整个 Bucket 内容;本次演示不配置;
  • 文档加载:目前支持解析 TextLoder 作为文档加载器;
  • 加载模式:“单文档加载”单个文件作为一条数据加载,“分块加载”按照分隔符加载数据;本次演示使用单文档加载。

image.png

配置过滤

可根据诉求添加过滤规则,本次演示使用“匹配全部事件”。

image.png

配置转换

转换部分主要是将原始数据转成向量化数据,为投递至 DashVector 做数据准备。

image.png

函数代码如下,函数环境为 Python 3.10:


# -*- coding: utf-8 -*-
import os
import ast
import copy
import json
import logging
import dashscope
from dashscope import TextEmbedding
from http import HTTPStatus

logger = logging.getLogger()
logger.setLevel(level=logging.INFO)

dashscope.api_key='Your-API-KEY'

def handler(event, context):
  evt = json.loads(event)
  evtinput = evt['data']
  resp = dashscope.TextEmbedding.call(
        model=dashscope.TextEmbedding.Models.text_embedding_v1,
        api_key=os.getenv('DASHSCOPE_API_KEY'), 
        input= evtinput )
  if resp.status_code == HTTPStatus.OK:
        print(resp)
  else:
        print(resp)
  return resp


🔔 注意:需手动安装相关函数环境,相关文档参考《为函数安装第三方依赖》[6]


pip3 install dashvector dashscope -t .


返回样例:


{
    "code": "",
    "message": "",
    "output": {
        "embeddings": [
            {
                "embedding": [
                    -2.192838430404663,
                    -0.703125,
                    ... ...
                    -0.8980143070220947,
                    -0.9130208492279053,
                    -0.520526111125946,
                    -0.47154948115348816
                ],
                "text_index": 0
            }
        ]
    },
    "request_id": "e9f9a555-85f2-9d15-ada8-133af54352b8",
    "status_code": 200,
    "usage": {
        "total_tokens": 3
    }
}


配置向量数据库 Dashvector

选择创建好的向量数据库。

image.png

  • 数据映射:选择 Upsert 方式插入;
  • 向量:填写上游 Dashscope 的 TextEmbedding 投递的向量信息 $.output. embeddings[0].embedding;
  • 鉴权配置:获取的 DashVector API-KEY 参数。

image.png

3.3 创建 Search 数据检索任务

在进行数据检索时,需要首先对数据进行 embedding,然后将 embedding 后的向量值与数据库值做检索排序。最后填写 prompt 模版,通过自然语言理解和语义分析,理解数据检索意图。


该任务可以部署在云端函数计算,或者直接在本地环境执行;首先,我们创建 embedding.py,将需要检索的问题进行文本向量化,代码如下所示:


embedding.py


import os
import dashscope
from dashscope import TextEmbedding

def generate_embeddings(news):
    rsp = TextEmbedding.call(
        model=TextEmbedding.Models.text_embedding_v1,
        input=news
    )
    embeddings = [record['embedding'] for record in rsp.output['embeddings']]
    return embeddings if isinstance(news, list) else embeddings[0]


if __name__ == '__main__':
    dashscope.api_key = '{your-dashscope-api-key}'


然后,创建 search.py 文件,并将如下示例代码复制到 search.py 文件中,通过 DashVector 的向量检索能力来检索相似度的最高的内容。


search.py


from dashvector import Client

from embedding import generate_embeddings


def search_relevant_news(question):
    # 初始化 dashvector client
    client = Client(
      api_key='{your-dashvector-api-key}',
      endpoint='{your-dashvector-cluster-endpoint}'
    )

    # 获取存入的集合
    collection = client.get('news_embedings')
    assert collection

    # 向量检索:指定 topk = 1 
    rsp = collection.query(generate_embeddings(question), output_fields=['raw'],
                           topk=1)
    assert rsp
    return rsp.output[0].fields['raw']


创建 answer.py 文件,我们就可以按照特定的模板作为 prompt 向 LLM 发起提问了,在这里我们选用的 LLM 是通义千问(qwen-turbo),代码示例如下:


answer.py


from dashscope import Generation


def answer_question(question, context):
    prompt = f'''请基于```内的内容回答问题。"
  ```
  {context}
  ```
  我的问题是:{question}。
    '''

    rsp = Generation.call(model='qwen-turbo', prompt=prompt)
    return rsp.output.text


最后,创建 run.py 文件,并将如下示例代码复制到 run.py 文件中,并最终执行 run.py 文件。(验证时,可在绑定的 OSS Bucket 上传需要被检索的知识库信息。)


import dashscope

from search import search_relevant_news
from answer import answer_question

if __name__ == '__main__':
    dashscope.api_key = '{your-dashscope-api-key}'

    question = 'EventBridge 是什么,它有哪些能力?'
    context = search_relevant_news(question)
    answer = answer_question(question, context)

    print(f'question: {question}\n' f'answer: {answer}')


4. 总结


从本文的范例中,我们可以比较方便的使用 EventBridge 提供的 OSS To DashVector 离线数据流导入能力,开箱即用的构建强大向量检索服务能力,这些能力和各个 AI 模型结合,能够衍生多样的 AI 应用的可能。同样,Transform 部分使用了函数计算能力,可以更灵活的制定想要的 Split 切分算法,提供更灵活且具备生产力的 RAG 方案。


相关链接:

[1] 灵积模型服务

[2] Embedding API

[3] DashScope 控制台

[4] 新建 DashVector Cluster

[5] EventBridge 控制台

[6] 《为函数安装第三方依赖》


作者:肯梦

作者介绍
目录