阿里云搜索开发工作台面向企业及开发者提供先进的AI搜索开发平台,内置实践打磨的多模态数据解析、文档切分、文本向量、查询分析、大模型文本生成、效果测评等丰富的组件化服务以及开发模版,同时,可选多种引擎能力,用户可灵活调用,实现智能搜索、检索增强生成(RAG)、多模态搜索等搜索相关场景的搭建。
首次免费开通搜索开发工作台,每个账号可获赠100次服务免费调用额度 >>
针对知识库在线问答场景,搜索开发工作台提供完整的RAG开发链路搭建方法,整体链路包含数据预处理、检索服务以及问答总结生成三大模块。搜索开发工作台将各模块可用的算法服务组件化,可灵活选择各模块使用的服务,如文档解析、排序、问答总结服务等,快速生成开发代码。搜索开发工作台以API形式透出服务,开发者只需将代码下载到本地,根据本文操作步骤替换API_KEY、API调用地址、本地知识库等信息,即可快速搭建基于RAG开发链路的知识库问答应用。
1. 技术原理
检索增强生成RAG(Retrieval-Augmented Generation)结合了检索技术和生成技术的人工智能方法,旨在提升模型生成内容的相关性、准确性和多样性。处理生成任务时,RAG首先在大量外部数据或知识库中检索与输入最相关的片段,然后将检索到的信息与原始输入一起输入到大语言模型(LLM)中,作为提示或者上下文引导模型生成更精确和丰富的回答。这种方法允许模型在生成响应时不仅依赖于其内部的参数和训练数据,还可以使用外部最新或特定领域的信息提升回答准确性。
2. 应用场景
知识库在线问答常用于企业内部知识库检索与总结、垂直领域的在线问答等业务场景,基于客户的专业知识库文档,通过RAG(检索增强生成)技术和LLM(大语言模型),理解和响应复杂的自然语言查询,帮助企业客户通过自然语言快速从PDF、WORD、表格、图片文档中检索到所需信息。
3. 前提条件
- 开通阿里云搜索开发工作台服务,详情请参见开通服务。
- 获取服务调用地址和身份鉴权信息,详情请参见获取服务接入地址、获取API-KEY。
搜索开发工作台支持通过公网和VPC地址调用服务,且可通过VPC实现跨地域调用服务。目前支持上海、杭州、深圳、北京、张家口、青岛地域的用户,通过VPC地址调用搜索开发工作台的服务。
- 创建阿里云Elasticsearch(ES)实例,要求ES 8.5及以上版本,详情请参见创建阿里云Elasticsearch实例。通过公网或私网访问阿里云ES实例时,需要将待访问设备的IP地址加入实例的访问白名单中,详情请参见配置实例公网或私网访问白名单。
- Python版本3.7及以上,在开发环境中引入Python包依赖aiohttp 3.8.6、elasticsearch 8.14。
4. RAG开发链路搭建
说明
为方便用户使用,搜索开发工作台提供两种类型的开发框架:
- Python SDK。
- 如果业务已经使用LangChain开发框架,在开发框架中选择LangChain。
4.1 步骤一:完成服务选型和代码下载
根据知识库和业务需要,选择RAG链路中需要使用的算法服务以及开发框架,本文以Python SDK开发框架为例介绍如何搭建RAG链路。
- 登录搜索开发工作台控制台。
- 选择上海地域,切换到阿里云搜索开发工作台,切换到目标空间。
说明
- 目前仅支持在上海地域开通阿里云搜索开发工作台功能。
- 空间用于隔离和管理数据,首次开通搜索开发工作台服务后,系统自动创建一个default空间,支持创建空间。
- 在左侧导航栏选择场景中心,选择RAG场景-知识库在线问答右侧的进入。
- 根据服务信息结合业务特点,从下拉列表中选择所需服务,服务详情页面可查看服务详细信息。
说明
- 通过API调用RAG链路中的算法服务时,需要提供服务ID(service_id),如文档内容解析服务的ID为ops-document-analyze-001。
- 从服务列表中切换服务后,生成代码中的service_id会同步更新。当代码下载到本地环境后,您仍可以更改service_id,调用对应服务。
环节 |
服务说明 |
文档内容解析 |
文档内容解析服务(ops-document-analyze-001):提供通用文档解析服务,支持从非结构化文档(文本、表格、图片等)中提取标题、分段等逻辑层级结构,以结构化格式输出。 |
图片内容解析 |
|
文档切片 |
文档切片服务(ops-document-split-001):提供通用文本切片服务,支持基于文档段落、文本语义、指定规则,对HTML、Markdown、txt格式的结构化数据进行拆分,同时支持以富文本形式提取文档中的代码、图片以及表格。 |
文本向量化 |
|
文本稀疏向量化 |
提供将文本数据转化为稀疏向量形式表达的服务,稀疏向量存储空间更小,常用于表达关键词和词频信息,可与稠密向量搭配进行混合检索,提升检索效果。 OpenSearch文本稀疏向量化服务(ops-text-sparse-embedding-001):提供多语言(100+)文本向量化服务,输入文本最大长度8192。 |
查询分析 |
查询分析服务001(ops-query-analyze-001)提供通用的Query分析服务,可基于大语言模型对用户输入的Query进行意图理解,以及相似问题扩展。 |
搜索引擎 |
|
排序服务 |
BGE重排模型(ops-bge-reranker-larger):通用文档打分服务,支持根据query与文档内容的相关性,按分数由高到低对文档排序,并输出打分结果。 |
大模型 |
|
- 完成服务选型后,单击配置完成,进入代码查询查看和下载代码。
按照应用调用RAG链路时的运行流程,将代码分为离线文档处理和在线用户问答处理两部分:
流程 |
作用 |
说明 |
离线文档处理 |
负责文档处理,包含文档解析、图片提取、切片、向量化以及将文档处理结果写入ES索引。 |
使用主函数document_pipeline_execute完成以下流程,可通过文档URL或Base64编码输入待处理文档。
|
在线问答处理 |
负责处理用户在线查询,包含生成查询向量、查询分析、检索相关文档切片、排序检索结果以及根据检索结果生成回答。 |
使用主函数query_pipeline_execute完成以下流程,对用户查询进行处理并输出回答。
|
分别选择代码查询下的文档处理流程和在线问答流程,单击复制代码或者下载文件,将代码下载到本地。
4.2 步骤二:本地环境适配和测试RAG开发链路
将代码分别下载到本地的两个文件后,例如online.py和offline.py,需要配置代码中的关键参数。
类别 |
参数 |
说明 |
阿里云搜索开发工作台 |
api_key |
API调用密钥,获取方式请参见管理API Key。 |
aisearch_endpoint |
API调用地址,获取方式请参见获取服务接入地址。 说明 注意需要去掉“http://”。 支持通过公网和VPC两种方式调用API。 |
|
workspace_name |
搜索开发工作台中的空间名称。 |
|
service_id |
服务ID,为操作方便,可以分别在离线文档处理(offline.py)和在线问答处理代码(online.py)中,通过service_id_config配置各项服务以及ID。
|
|
ES搜索引擎 |
es_host |
Elasticsearch(ES)实例访问地址,通过公网或私网访问阿里云ES实例时,需要先将待访问设备的IP地址加入实例的访问白名单中,详情参见配置实例公网或私网访问白名单。 |
es_auth |
访问Elasticsearch实例时的账号和密码,账号为elastic,密码为您创建实例时设置的密码。如果忘记密码可重置,具体操作请参见重置实例访问密码。 |
|
其他参数 |
如使用示例数据则无需修改 |
完成参数配置后即可在Python 3.7及以上版本环境中,先后运行offline.py离线文档处理文件和online.py在线问答处理文件测试运行结果是否正确。
如知识库文档为搜索开发工作台介绍,对文档提问:搜索开发工作台可以做什么?
运行结果如下:
- 离线文档处理结果
- 在线问答处理结果
- offline.py文件:
# RAG离线链路-ElasticSearch引擎 # 环境需求: # Python版本:3.7及以上 # ES集群版本:8.9及以上:如果是阿里云ES需要提前开通并设置访问ip白名单 https://help.aliyun.com/zh/es/user-guide/configure-a-public-or-private-ip-address-whitelist-for-an-elasticsearch-cluster # 包需求: # pip install alibabacloud_searchplat20240529 # pip install elasticsearch # OpenSearch搜索开发工作台配置 aisearch_endpoint = "xxx.platform-cn-shanghai.opensearch.aliyuncs.com" api_key = "OS-xxx" workspace_name = "default" service_id_config = {"extract": "ops-document-analyze-001", "split": "ops-document-split-001", "text_embedding": "ops-text-embedding-001", "text_sparse_embedding": "ops-text-sparse-embedding-001", "image_analyze": "ops-image-analyze-ocr-001"} # ES配置 es_host = 'http://es-cn-xxx.public.elasticsearch.aliyuncs.com:9200' es_auth = ('elastic', 'xxx') # 输入文档url,示例文档为opensearch产品说明文档 document_url = "https://help.aliyun.com/zh/open-search/search-platform/product-overview/introduction-to-search-platform?spm=a2c4g.11186623.0.0.7ab93526WDzQ8z" import asyncio from typing import List from elasticsearch import AsyncElasticsearch from elasticsearch import helpers from alibabacloud_tea_openapi.models import Config from alibabacloud_searchplat20240529.client import Client from alibabacloud_searchplat20240529.models import GetDocumentSplitRequest, CreateDocumentAnalyzeTaskRequest, \ CreateDocumentAnalyzeTaskRequestDocument, GetDocumentAnalyzeTaskStatusRequest, \ GetDocumentSplitRequestDocument, GetTextEmbeddingRequest, GetTextEmbeddingResponseBodyResultEmbeddings, \ GetTextSparseEmbeddingRequest, GetTextSparseEmbeddingResponseBodyResultSparseEmbeddings, \ CreateImageAnalyzeTaskRequestDocument, CreateImageAnalyzeTaskRequest, CreateImageAnalyzeTaskResponse, \ GetImageAnalyzeTaskStatusRequest, GetImageAnalyzeTaskStatusResponse async def poll_task_result(ops_client, task_id, service_id, interval=5): while True: request = GetDocumentAnalyzeTaskStatusRequest(task_id=task_id) response = await ops_client.get_document_analyze_task_status_async(workspace_name, service_id, request) status = response.body.result.status if status == "PENDING": await asyncio.sleep(interval) elif status == "SUCCESS": return response else: raise Exception("document analyze task failed") def is_analyzable_url(url:str): if not url: return False image_extensions = {'.jpg', '.jpeg', '.png', '.bmp', '.tiff'} return url.lower().endswith(tuple(image_extensions)) async def image_analyze(ops_client, url): try: print("image analyze :" + url) if url.startswith("//"): url = "https:" + url if not is_analyzable_url(url): print(url + " is not analyzable.") return url image_analyze_service_id = service_id_config["image_analyze"] document = CreateImageAnalyzeTaskRequestDocument( url=url, ) request = CreateImageAnalyzeTaskRequest(document=document) response: CreateImageAnalyzeTaskResponse = ops_client.create_image_analyze_task(workspace_name, image_analyze_service_id, request) task_id = response.body.result.task_id while True: request = GetImageAnalyzeTaskStatusRequest(task_id=task_id) response: GetImageAnalyzeTaskStatusResponse = ops_client.get_image_analyze_task_status(workspace_name, image_analyze_service_id, request) status = response.body.result.status if status == "PENDING": await asyncio.sleep(5) elif status == "SUCCESS": return url + response.body.result.data.content else: print("image analyze error: " + response.body.result.error) return url except Exception as e: print(f"image analyze Exception : {e}") def chunk_list(lst, chunk_size): for i in range(0, len(lst), chunk_size): yield lst[i:i + chunk_size] async def write_to_es(doc_list): es = AsyncElasticsearch( [es_host], basic_auth=es_auth, verify_certs=False, # 不使用SSL证书校验 request_timeout=30, max_retries=10, retry_on_timeout=True ) index_name = 'dense_vertex_index' # 删除已有索引 if await es.indices.exists(index=index_name): await es.indices.delete(index=index_name) # 创建向量索引, 指定embedding字段为dense_vector, content字段为text, source字段为keyword index_mappings = { "mappings": { "properties": { "emb": { "type": "dense_vector", "index": True, "similarity": "cosine", "dims": 1536 # 根据embedding模型输出维度修改 }, "content": { "type": "text" }, "source_doc": { "type": "keyword" } } } } await es.indices.create(index=index_name, body=index_mappings) # 上传embedding结果到上一步创建好的索引 actions = [] for i, doc in enumerate(doc_list): action = { "_index": index_name, "_id": doc['id'], "_source": { "emb": doc['embedding'], "content": doc['content'], "source_doc": document_url } } actions.append(action) try: await helpers.async_bulk(es, actions) except Exception as e: for error in e.errors: print(error) # 确认上传成功 await asyncio.sleep(2) query = { "query": { "ids": { "values": [doc_list[0]["id"]] } } } res = await es.search(index=index_name, body=query) if len(res['hits']['hits']) > 0: print("ES write success") await es.close() async def document_pipeline_execute(document_url: str = None, document_base64: str = None, file_name: str = None): # 生成opensearch开发平台client config = Config(bearer_token=api_key, endpoint=aisearch_endpoint, protocol="http") ops_client = Client(config=config) # Step 1: 文档解析 document_analyze_request = CreateDocumentAnalyzeTaskRequest( document=CreateDocumentAnalyzeTaskRequestDocument(url=document_url, content=document_base64, file_name=file_name, file_type='html')) document_analyze_response = await ops_client.create_document_analyze_task_async(workspace_name=workspace_name, service_id=service_id_config[ "extract"], request=document_analyze_request) print("document-analyze task_id:" + document_analyze_response.body.result.task_id) extraction_result = await poll_task_result(ops_client, document_analyze_response.body.result.task_id, service_id_config["extract"]) print("document-analyze done") document_content = extraction_result.body.result.data.content content_type = extraction_result.body.result.data.content_type # Step 2: 文档切片 document_split_request = GetDocumentSplitRequest( GetDocumentSplitRequestDocument(content=document_content, content_type=content_type)) document_split_result = await ops_client.get_document_split_async(workspace_name, service_id_config["split"], document_split_request) print("document-split done, chunks count: " + str(len(document_split_result.body.result.chunks)) + " rich text count:" + str(len(document_split_result.body.result.rich_texts))) # Step 3: 文本向量化 # 提取切片结果。图片切片会通过图片解析服务提取出文本内容 doc_list = ([{"id": chunk.meta.get("id"), "content": chunk.content} for chunk in document_split_result.body.result.chunks] + [{"id": chunk.meta.get("id"), "content": chunk.content} for chunk in document_split_result.body.result.rich_texts if chunk.meta.get("type") != "image"] + [{"id": chunk.meta.get("id"), "content": await image_analyze(ops_client, chunk.content)} for chunk in document_split_result.body.result.rich_texts if chunk.meta.get("type") == "image"] ) chunk_size = 32 # 一次最多允许计算32个embedding all_text_embeddings: List[GetTextEmbeddingResponseBodyResultEmbeddings] = [] for chunk in chunk_list([text["content"] for text in doc_list], chunk_size): response = await ops_client.get_text_embedding_async(workspace_name, service_id_config["text_embedding"], GetTextEmbeddingRequest(chunk)) all_text_embeddings.extend(response.body.result.embeddings) all_text_sparse_embeddings: List[GetTextSparseEmbeddingResponseBodyResultSparseEmbeddings] = [] for chunk in chunk_list([text["content"] for text in doc_list], chunk_size): response = await ops_client.get_text_sparse_embedding_async(workspace_name, service_id_config["text_sparse_embedding"], GetTextSparseEmbeddingRequest(chunk, input_type="document", return_token=True)) all_text_sparse_embeddings.extend(response.body.result.sparse_embeddings) for i in range(len(doc_list)): doc_list[i]["embedding"] = all_text_embeddings[i].embedding doc_list[i]["sparse_embedding"] = all_text_sparse_embeddings[i].embedding print("text-embedding done") # Step 4: 写入ElasticSearch存储引擎 await write_to_es(doc_list) if __name__ == "__main__": # 运行异步任务 # import nest_asyncio # 如果在Jupyter notebook中运行,反注释这两行 # nest_asyncio.apply() # 如果在Jupyter notebook中运行,反注释这两行 asyncio.run(document_pipeline_execute(document_url)) # asyncio.run(document_pipeline_execute(document_base64="eHh4eHh4eHg...", file_name="attention.pdf")) #另外一种调用方式
- online.py文件:
# RAG在线链路-ElasticSearch引擎 # 环境需求: # Python版本:3.7及以上 # ES集群版本:8.9以上(如果是阿里云ES 需要提前开通并设置访问ip白名单 https://help.aliyun.com/zh/es/user-guide/configure-a-public-or-private-ip-address-whitelist-for-an-elasticsearch-cluster) # 包需求: # pip install alibabacloud_searchplat20240529 # pip install elasticsearch # OpenSearch搜索开发工作台配置 api_key = "OS-xxx" aisearch_endpoint = "xxx.platform-cn-shanghai.opensearch.aliyuncs.com" workspace_name = "default" service_id_config = { "rank": "ops-bge-reranker-larger", "text_embedding": "ops-text-embedding-001", "text_sparse_embedding": "ops-text-sparse-embedding-001", "llm": "ops-qwen-turbo", "query_analyze": "ops-query-analyze-001" } # ES配置 es_host = 'http://es-cn-xxx.public.elasticsearch.aliyuncs.com:9200' es_auth = ('elastic', 'xxx') # 用户query: user_query = "OpenSearch搜索开发平台可以做什么?" import asyncio from elasticsearch import AsyncElasticsearch from alibabacloud_tea_openapi.models import Config from alibabacloud_searchplat20240529.client import Client from alibabacloud_searchplat20240529.models import GetTextEmbeddingRequest, \ GetDocumentRankRequest, GetTextGenerationRequest, GetTextGenerationRequestMessages, \ GetQueryAnalysisRequest # 生成opensearch开发平台client config = Config(bearer_token=api_key, endpoint=aisearch_endpoint, protocol="http") ops_client = Client(config=config) async def es_retrieve(query): es = AsyncElasticsearch( [es_host], basic_auth=es_auth, verify_certs=False, request_timeout=30, max_retries=10, retry_on_timeout=True ) index_name = 'dense_vertex_index' # query向量化 query_emb_result = await ops_client.get_text_embedding_async(workspace_name, service_id_config["text_embedding"], GetTextEmbeddingRequest(input=[query], input_type="query")) query_emb = query_emb_result.body.result.embeddings[0].embedding query = { "field": "emb", "query_vector": query_emb, "k": 5, # 返回文档切片数量 "num_candidates": 100 # HNSW搜索参数efsearch } res = await es.search(index=index_name, knn=query) search_results = [item['_source']['content'] for item in res['hits']['hits']] await es.close() return search_results # 在线问答流水线,输入是用户问题 async def query_pipeline_execute(): # Step 1: query分析 query_analyze_response = ops_client.get_query_analysis(workspace_name, service_id_config['query_analyze'], GetQueryAnalysisRequest(query=user_query)) print("query analysis rewrite result:" + query_analyze_response.body.result.query) # Step 2: 召回文档 all_query_results = [] user_query_results = await es_retrieve(user_query) all_query_results.extend(user_query_results) rewrite_query_results = await es_retrieve(query_analyze_response.body.result.query) all_query_results.extend(rewrite_query_results) for extend_query in query_analyze_response.body.result.queries: extend_query_result = await es_retrieve(extend_query) all_query_results.extend(extend_query_result) # 对所有召回结果进行去重 remove_duplicate_results = list(set(all_query_results)) # Step 3: 对召回文档进行重排序 rerank_top_k = 8 score_results = await ops_client.get_document_rank_async(workspace_name, service_id_config["rank"],GetDocumentRankRequest(remove_duplicate_results, user_query)) rerank_results = [remove_duplicate_results[item.index] for item in score_results.body.result.scores[:rerank_top_k]] # Step 4: 调用大模型回答问题 docs = '\n'.join([f"<article>{s}</article>" for s in rerank_results]) messages = [ GetTextGenerationRequestMessages(role="system", content="You are a helpful assistant."), GetTextGenerationRequestMessages(role="user", content=f"""已知信息包含多个独立文档,每个文档在<article>和</article>之间,已知信息如下:\n'''{docs}''' \n\n根据上述已知信息,详细且有条理地回答用户的问题。确保答案充分回答了问题并且正确使用了已知信息。如果信息不足以回答问题,请说“根据已知信息无法回答该问题”。 不要使用不在已知信息中的内容生成答案,确保答案中每一个陈述在上述已知信息中有相应内容支撑。答案请使用中文。 \n问题是:'''{user_query}'''""""") ] response = await ops_client.get_text_generation_async(workspace_name, service_id_config["llm"], GetTextGenerationRequest(messages=messages)) print("大模型最终回答: ", response.body.result.text) if __name__ == "__main__": # import nest_asyncio # 如果在Jupyter notebook中运行,反注释这两行 # nest_asyncio.apply() # 如果在Jupyter notebook中运行,反注释这两行 asyncio.run(query_pipeline_execute())
4.3 常见问题
代码运行期间,由于资源未及时释放可能出现Unclosed connector相关提示,无需处理。