当需要同时检索多条查询(如批量问答、RAG 多路召回、多用户并发搜索),逐条串行执行会导致整体耗时随查询数线性增长。通过并发执行多条检索请求,可以将总耗时从 N × 单次延迟 降低到接近 1 × 单次延迟,显著提升吞吐量。
本文介绍两种并发方式:CLI 并发和 SDK 并发,适用于以下场景:
- 批量语义搜索:一次性提交多条查询文本,快速获取全部检索结果。
- RAG 多路召回:为同一用户请求同时发起多条不同角度的检索,降低端到端延迟。
- 多模态批量检索:同时检索文本、图片、视频等不同模态的向量数据。
选择并发方式
方式 |
适用场景 |
特点 |
CLI 并发 |
运维脚本、一次性批量检索、无需编写代码的快速验证 |
输入文本自动 Embedding;无需管理向量维度;Shell 脚本即可实现 |
SDK 并发 |
业务服务集成、需要精细控制(过滤条件、结果后处理)、高性能后端 |
直接调用 API,可设置过滤条件;复用客户端连接;支持 Python 和 Go |
如何选择:
- 如果你有一批查询文本,希望快速拿到结果而不想写代码 → 使用 CLI 并发。
- 如果你在开发业务服务,需要将向量检索嵌入到应用逻辑中 → 使用 SDK 并发。
说明:CLI 方式内置了 Embedding 模型调用,输入文本即可检索。SDK 方式需要传入已生成的向量,适合已有 Embedding 流程的场景。
通过 CLI 并发检索
CLI 并发通过启动多个 oss-vectors-embed query 进程实现并行检索。以下提供三种实现方式,按复杂度递增排列。
开始前,请确保满足以下条件:
- 已安装 OSS Vectors Embed CLI。安装方式请参见使用OSS Vectors Embed CLI工具写入和检索向量数据。
- 已配置环境变量
OSS_ACCESS_KEY_ID、OSS_ACCESS_KEY_SECRET和DASHSCOPE_API_KEY。 - 已创建向量 Bucket 和向量索引,且索引维度与所用 Embedding 模型输出维度一致。
将以下示例中的占位符替换为实际值:
占位符 |
说明 |
|
阿里云账号 ID |
|
向量 Bucket 名称 |
|
向量索引名称 |
xargs 快速并发
如果不需要复杂的流程控制,xargs -P 是最简单的 CLI 并发方式,一行命令即可完成。
cat queries.txt | xargs -P 5 -I {} \ oss-vectors-embed \ --account-id "<your-account-id>" \ --vectors-region cn-hangzhou \ query \ --vector-bucket-name "<your-vector-bucket>" \ --index-name "<your-index>" \ --model-id text-embedding-v4 \ --text-value "{}" \ --top-k 10
-P 5 表示最多 5 个进程并行执行。检索结果直接输出到终端,适合快速验证。如需保存结果,可将输出重定向到文件。
Shell 后台并发
通过 & 将多个查询命令放入后台并行执行,wait 等待全部完成。适合查询数量较少(10 条以内)的场景。
#!/bin/bash ACCOUNT_ID="<your-account-id>" REGION="cn-hangzhou" BUCKET="<your-vector-bucket>" INDEX="<your-index>" MODEL="text-embedding-v4" queries=( "如何配置生命周期规则" "对象存储有哪些存储类型" "如何设置跨区域复制" ) mkdir -p ./query-results # 并发启动所有查询,每条结果写入独立文件 for i in "${!queries[@]}"; do oss-vectors-embed \ --account-id "$ACCOUNT_ID" \ --vectors-region "$REGION" \ query \ --vector-bucket-name "$BUCKET" \ --index-name "$INDEX" \ --model-id "$MODEL" \ --text-value "${queries[$i]}" \ --top-k 10 \ --return-metadata \ > "./query-results/result_${i}.json" 2>&1 & done wait echo "全部查询完成,结果保存在 ./query-results/"
运行后输出:
全部查询完成,结果保存在 ./query-results/
每个结果文件包含 JSON 格式的检索结果,可通过 cat ./query-results/result_0.json | python3 -m json.tool 查看。
控制并发数的 Shell 脚本
当查询数量较多时(数十条以上),需要限制同时运行的进程数,避免超出 API 配额。以下脚本从文件逐行读取查询文本,控制最多 5 个进程同时执行。
#!/bin/bash ACCOUNT_ID="<your-account-id>" REGION="cn-hangzhou" BUCKET="<your-vector-bucket>" INDEX="<your-index>" MODEL="text-embedding-v4" MAX_CONCURRENT=5 QUERY_FILE="./queries.txt" # 每行一条查询文本 mkdir -p ./query-results run_query() { local idx=$1 local text=$2 oss-vectors-embed \ --account-id "$ACCOUNT_ID" \ --vectors-region "$REGION" \ query \ --vector-bucket-name "$BUCKET" \ --index-name "$INDEX" \ --model-id "$MODEL" \ --text-value "$text" \ --top-k 10 \ > "./query-results/result_${idx}.json" 2>&1 } idx=0 while IFS= read -r query_text; do run_query "$idx" "$query_text" & idx=$((idx + 1)) # 达到并发上限时等待一个任务完成再继续 if (( $(jobs -rp | wc -l) >= MAX_CONCURRENT )); then wait -n fi done < "$QUERY_FILE" wait echo "全部 $idx 条查询完成"
运行前准备 queries.txt 文件,每行一条查询文本:
如何配置生命周期规则 对象存储有哪些存储类型 如何设置跨区域复制 Bucket 标签的使用限制 如何启用版本控制
运行后输出:
全部 5 条查询完成
Python 封装 CLI 并发
如果需要对 CLI 返回的结果进行后处理(如解析 JSON、汇总统计),可以用 Python asyncio 封装 CLI 调用。
import asyncio import json from pathlib import Path ACCOUNT_ID = "<your-account-id>" REGION = "cn-hangzhou" BUCKET = "<your-vector-bucket>" INDEX = "<your-index>" MODEL = "text-embedding-v4" MAX_CONCURRENT = 5 async def run_query(semaphore: asyncio.Semaphore, query_text: str, query_id: int): """异步执行单条 CLI query 命令""" async with semaphore: cmd = [ "oss-vectors-embed", "--account-id", ACCOUNT_ID, "--vectors-region", REGION, "query", "--vector-bucket-name", BUCKET, "--index-name", INDEX, "--model-id", MODEL, "--text-value", query_text, "--top-k", "10", "--return-metadata", ] proc = await asyncio.create_subprocess_exec( *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, ) stdout, stderr = await proc.communicate() if proc.returncode == 0: result = json.loads(stdout.decode()) print(f"查询 {query_id} 完成,返回 {len(result.get('results', [ ]))} 条结果") return {"query_id": query_id, "query_text": query_text, "result": result} else: print(f"查询 {query_id} 失败: {stderr.decode()}") return {"query_id": query_id, "query_text": query_text, "error": stderr.decode()} async def batch_query(queries: list[str]): """批量并发执行多条查询""" semaphore = asyncio.Semaphore(MAX_CONCURRENT) tasks = [ run_query(semaphore, text, idx) for idx, text in enumerate(queries) ] results = await asyncio.gather(*tasks) output_path = Path("./query-results/batch_results.json") output_path.parent.mkdir(parents=True, exist_ok=True) output_path.write_text(json.dumps(results, ensure_ascii=False, indent=2)) print(f"汇总结果已保存到 {output_path}") return results if __name__ == "__main__": queries = [ "如何配置生命周期规则", "对象存储有哪些存储类型", "如何设置跨区域复制", "Bucket 标签的使用限制", "如何启用版本控制", ] asyncio.run(batch_query(queries))
运行后输出:
查询 0 完成,返回 10 条结果 查询 1 完成,返回 10 条结果 查询 2 完成,返回 10 条结果 查询 3 完成,返回 10 条结果 查询 4 完成,返回 10 条结果 汇总结果已保存到 query-results/batch_results.json
通过 SDK 并发检索
SDK 并发直接调用 query_vectors API,无需启动外部进程。适合需要精细控制检索参数(如设置过滤条件)或将检索集成到业务服务中的场景。
说明:SDK 方式需要传入已生成的查询向量(如 float32 数组),而非原始文本。如果你的场景是从文本出发检索,建议先通过 Embedding 模型生成向量,或直接使用上文的 CLI 并发方式。
Python SDK 并发检索
使用 alibabacloud-oss-v2 Python SDK 通过线程池并发调用 query_vectors 接口。开始前请安装 SDK:
pip install alibabacloud-oss-v2
确保已配置环境变量 OSS_ACCESS_KEY_ID 和 OSS_ACCESS_KEY_SECRET,并已创建向量 Bucket 和索引。
基本示例
以下示例并发检索 5 组向量,使用 ThreadPoolExecutor 控制最多 5 个线程同时执行:
import json from concurrent.futures import ThreadPoolExecutor, as_completed from pathlib import Path import alibabacloud_oss_v2 as oss import alibabacloud_oss_v2.vectors as oss_vectors ACCOUNT_ID = "<your-account-id>" REGION = "cn-hangzhou" BUCKET = "<your-vector-bucket>" INDEX = "<your-index>" MAX_CONCURRENT = 5 def create_vector_client(): """创建向量检索客户端(全局复用,避免重复创建连接)""" credentials_provider = oss.credentials.EnvironmentVariableCredentialsProvider() cfg = oss.config.load_default() cfg.credentials_provider = credentials_provider cfg.region = REGION cfg.account_id = ACCOUNT_ID return oss_vectors.Client(cfg) def run_query(client, query_vector, query_id, query_filter=None): """执行单条向量检索""" request = oss_vectors.models.QueryVectorsRequest( bucket=BUCKET, index_name=INDEX, query_vector=query_vector, filter=query_filter, return_distance=True, return_metadata=True, top_k=10, ) result = client.query_vectors(request) print(f"查询 {query_id} 完成,status code: {result.status_code}") return { "query_id": query_id, "status_code": result.status_code, "vectors": [str(v) for v in result.vectors] if result.vectors else [ ], } def batch_query(query_vectors): """批量并发执行多条向量检索""" client = create_vector_client() results = [ ] with ThreadPoolExecutor(max_workers=MAX_CONCURRENT) as executor: futures = { executor.submit(run_query, client, qv, idx): idx for idx, qv in enumerate(query_vectors) } for future in as_completed(futures): idx = futures[future] try: results.append(future.result()) except Exception as e: print(f"查询 {idx} 失败: {e}") results.append({"query_id": idx, "error": str(e)}) results.sort(key=lambda x: x["query_id"]) output_path = Path("./query-results/sdk_batch_results.json") output_path.parent.mkdir(parents=True, exist_ok=True) output_path.write_text(json.dumps(results, ensure_ascii=False, indent=2)) print(f"汇总结果已保存到 {output_path}") return results if __name__ == "__main__": # 示例:5 组查询向量(维度需与索引一致,此处以 128 维为例) query_vectors = [ {"float32": [0.1] * 128}, {"float32": [0.2] * 128}, {"float32": [0.3] * 128}, {"float32": [0.4] * 128}, {"float32": [0.5] * 128}, ] batch_query(query_vectors)
运行后输出:
查询 0 完成,status code: 200 查询 2 完成,status code: 200 查询 1 完成,status code: 200 查询 4 完成,status code: 200 查询 3 完成,status code: 200 汇总结果已保存到 query-results/sdk_batch_results.json
说明:由于线程池并发执行,输出顺序可能与提交顺序不同,但最终结果按 query_id 排序保存。
带过滤条件的并发检索
实际业务中,不同查询可能需要搭配不同的过滤条件。以下示例为每条查询指定独立的 filter:
import json from concurrent.futures import ThreadPoolExecutor, as_completed from pathlib import Path import alibabacloud_oss_v2 as oss import alibabacloud_oss_v2.vectors as oss_vectors ACCOUNT_ID = "<your-account-id>" REGION = "cn-hangzhou" BUCKET = "<your-vector-bucket>" INDEX = "<your-index>" MAX_CONCURRENT = 5 def create_vector_client(): """创建向量检索客户端(全局复用,避免重复创建连接)""" credentials_provider = oss.credentials.EnvironmentVariableCredentialsProvider() cfg = oss.config.load_default() cfg.credentials_provider = credentials_provider cfg.region = REGION cfg.account_id = ACCOUNT_ID return oss_vectors.Client(cfg) def run_query(client, query_vector, query_id, query_filter=None): """执行单条向量检索""" request = oss_vectors.models.QueryVectorsRequest( bucket=BUCKET, index_name=INDEX, query_vector=query_vector, filter=query_filter, return_distance=True, return_metadata=True, top_k=10, ) result = client.query_vectors(request) print(f"查询 {query_id} 完成,status code: {result.status_code}") return { "query_id": query_id, "status_code": result.status_code, "vectors": [str(v) for v in result.vectors] if result.vectors else [ ], } def batch_query(query_vectors): """批量并发执行多条向量检索""" client = create_vector_client() results = [ ] with ThreadPoolExecutor(max_workers=MAX_CONCURRENT) as executor: futures = { executor.submit(run_query, client, qv, idx): idx for idx, qv in enumerate(query_vectors) } for future in as_completed(futures): idx = futures[future] try: results.append(future.result()) except Exception as e: print(f"查询 {idx} 失败: {e}") results.append({"query_id": idx, "error": str(e)}) results.sort(key=lambda x: x["query_id"]) output_path = Path("./query-results/sdk_batch_results.json") output_path.parent.mkdir(parents=True, exist_ok=True) output_path.write_text(json.dumps(results, ensure_ascii=False, indent=2)) print(f"汇总结果已保存到 {output_path}") return results if __name__ == "__main__": tasks = [ { "vector": {"float32": [0.1] * 128}, "filter": {"$and": [{"type": {"$in": ["tutorial"]}}]}, }, { "vector": {"float32": [0.2] * 128}, "filter": {"$and": [{"type": {"$nin": ["comedy", "documentary"]}}]}, }, { "vector": {"float32": [0.3] * 128}, "filter": None, # 不设过滤条件 }, ] client = create_vector_client() results = [ ] with ThreadPoolExecutor(max_workers=MAX_CONCURRENT) as executor: futures = { executor.submit( run_query, client, t["vector"], idx, t["filter"] ): idx for idx, t in enumerate(tasks) } for future in as_completed(futures): try: results.append(future.result()) except Exception as e: print(f"查询失败: {e}") for r in sorted(results, key=lambda x: x["query_id"]): print(f"查询 {r['query_id']}: 返回 {len(r.get('vectors', [ ]))} 条结果")
运行后输出:
查询 0 完成,status code: 200 查询 1 完成,status code: 200 查询 2 完成,status code: 200 查询 0: 返回 10 条结果 查询 1: 返回 10 条结果 查询 2: 返回 10 条结果
Go SDK 并发检索
使用 alibabacloud-oss-go-sdk-v2 Go SDK 通过 goroutine 并发调用 QueryVectors 接口。开始前请安装 SDK:
go get github.com/aliyun/alibabacloud-oss-go-sdk-v2
确保已配置环境变量 OSS_ACCESS_KEY_ID 和 OSS_ACCESS_KEY_SECRET,并已创建向量 Bucket 和索引。
基本示例
以下示例使用 sync.WaitGroup 和 channel 信号量并发检索 5 组向量:
package main import ( "context" "fmt" "log" "sync" "github.com/aliyun/alibabacloud-oss-go-sdk-v2/oss" "github.com/aliyun/alibabacloud-oss-go-sdk-v2/oss/credentials" "github.com/aliyun/alibabacloud-oss-go-sdk-v2/oss/vectors" ) const ( region = "cn-hangzhou" bucketName = "<your-vector-bucket>" accountId = "<your-account-id>" indexName = "<your-index>" maxConcurrent = 5 ) func main() { cfg := oss.LoadDefaultConfig(). WithCredentialsProvider(credentials.NewEnvironmentVariableCredentialsProvider()). WithRegion(region). WithAccountId(accountId) client := vectors.NewVectorsClient(cfg) // 5 组查询向量(维度需与索引一致) queryVectors := [ ]map[string]any{ {"float32": [ ]float32{0.1}}, {"float32": [ ]float32{0.2}}, {"float32": [ ]float32{0.3}}, {"float32": [ ]float32{0.4}}, {"float32": [ ]float32{0.5}}, } var wg sync.WaitGroup sem := make(chan struct{}, maxConcurrent) // channel 信号量控制并发数 for i, qv := range queryVectors { wg.Add(1) sem <- struct{}{} // 获取信号量,达到上限时阻塞 go func(idx int, queryVector map[string]any) { defer wg.Done() defer func() { <-sem }() // 释放信号量 request := &vectors.QueryVectorsRequest{ Bucket: oss.Ptr(bucketName), IndexName: oss.Ptr(indexName), QueryVector: queryVector, ReturnMetadata: oss.Ptr(true), ReturnDistance: oss.Ptr(true), TopK: oss.Ptr(10), } result, err := client.QueryVectors(context.TODO(), request) if err != nil { log.Printf("查询 %d 失败: %v", idx, err) return } fmt.Printf("查询 %d 完成,status code: %d\n", idx, result.StatusCode) }(i, qv) } wg.Wait() fmt.Println("全部查询完成") }
运行后输出:
查询 0 完成,status code: 200 查询 2 完成,status code: 200 查询 1 完成,status code: 200 查询 3 完成,status code: 200 查询 4 完成,status code: 200 全部查询完成
带过滤条件的并发检索
package main import ( "context" "fmt" "log" "sync" "github.com/aliyun/alibabacloud-oss-go-sdk-v2/oss" "github.com/aliyun/alibabacloud-oss-go-sdk-v2/oss/credentials" "github.com/aliyun/alibabacloud-oss-go-sdk-v2/oss/vectors" ) type queryTask struct { vector map[string]any filter map[string]any } const ( region = "cn-hangzhou" bucketName = "<your-vector-bucket>" accountId = "<your-account-id>" indexName = "<your-index>" maxConcurrent = 5 ) func main() { cfg := oss.LoadDefaultConfig(). WithCredentialsProvider(credentials.NewEnvironmentVariableCredentialsProvider()). WithRegion(region). WithAccountId(accountId) client := vectors.NewVectorsClient(cfg) tasks := [ ]queryTask{ { vector: map[string]any{"float32": [ ]float32{0.1}}, filter: map[string]any{ "$and": [ ]map[string]any{ {"type": map[string]any{"$in": [ ]string{"tutorial"}}}, }, }, }, { vector: map[string]any{"float32": [ ]float32{0.2}}, filter: map[string]any{ "$and": [ ]map[string]any{ {"type": map[string]any{"$nin": [ ]string{"comedy", "documentary"}}}, }, }, }, { vector: map[string]any{"float32": [ ]float32{0.3}}, filter: nil, // 不设过滤条件 }, } var wg sync.WaitGroup sem := make(chan struct{}, maxConcurrent) for i, task := range tasks { wg.Add(1) sem <- struct{}{} go func(idx int, t queryTask) { defer wg.Done() defer func() { <-sem }() request := &vectors.QueryVectorsRequest{ Bucket: oss.Ptr(bucketName), IndexName: oss.Ptr(indexName), QueryVector: t.vector, ReturnMetadata: oss.Ptr(true), ReturnDistance: oss.Ptr(true), TopK: oss.Ptr(10), } if t.filter != nil { request.Filter = t.filter } result, err := client.QueryVectors(context.TODO(), request) if err != nil { log.Printf("查询 %d 失败: %v", idx, err) return } fmt.Printf("查询 %d 完成,status code: %d\n", idx, result.StatusCode) }(i, task) } wg.Wait() fmt.Println("全部查询完成") }
运行后输出:
查询 0 完成,status code: 200 查询 1 完成,status code: 200 查询 2 完成,status code: 200 全部查询完成
并发性能调优
调优项 |
建议 |
说明 |
并发数 |
3~5 |
|
|
按需设置 |
返回结果越多,单次请求延迟越高。仅返回业务所需的数量 |
错误重试 |
间隔 1~2 秒 |
并发请求可能触发限流(HTTP 429),建议捕获错误后等待重试 |
CLI 结果输出 |
重定向到文件 |
多个进程同时输出到终端会导致内容交错,建议将每条结果写入独立文件 |
SDK 客户端复用 |
复用同一实例 |
避免为每条查询创建新的 |