高并发场景下,如何让你的向量语义检索快人一步?

本文涉及的产品
对象存储 OSS,OSS 加速器 50 GB 1个月
简介: 当需要同时检索多条查询(如批量问答、RAG 多路召回、多用户并发搜索),逐条串行执行会导致整体耗时随查询数线性增长。通过并发执行多条检索请求,可以将总耗时从 N × 单次延迟 降低到接近 1 × 单次延迟,显著提升吞吐量。本文介绍两种并发方式:CLI 并发和 SDK 并发,适用于批量语义搜索、 RAG 多路召回、多模态批量检索等场景。

当需要同时检索多条查询(如批量问答、RAG 多路召回、多用户并发搜索),逐条串行执行会导致整体耗时随查询数线性增长。通过并发执行多条检索请求,可以将总耗时从 N × 单次延迟 降低到接近 1 × 单次延迟,显著提升吞吐量。

本文介绍两种并发方式:CLI 并发SDK 并发,适用于以下场景:

  • 批量语义搜索:一次性提交多条查询文本,快速获取全部检索结果。
  • RAG 多路召回:为同一用户请求同时发起多条不同角度的检索,降低端到端延迟。
  • 多模态批量检索:同时检索文本、图片、视频等不同模态的向量数据。


选择并发方式

方式

适用场景

特点

CLI 并发

运维脚本、一次性批量检索、无需编写代码的快速验证

输入文本自动 Embedding;无需管理向量维度;Shell 脚本即可实现

SDK 并发

业务服务集成、需要精细控制(过滤条件、结果后处理)、高性能后端

直接调用 API,可设置过滤条件;复用客户端连接;支持 Python 和 Go

如何选择

  • 如果你有一批查询文本,希望快速拿到结果而不想写代码 → 使用 CLI 并发
  • 如果你在开发业务服务,需要将向量检索嵌入到应用逻辑中 → 使用 SDK 并发

说明:CLI 方式内置了 Embedding 模型调用,输入文本即可检索。SDK 方式需要传入已生成的向量,适合已有 Embedding 流程的场景。


通过 CLI 并发检索

CLI 并发通过启动多个 oss-vectors-embed query 进程实现并行检索。以下提供三种实现方式,按复杂度递增排列。

开始前,请确保满足以下条件:

  • 已安装 OSS Vectors Embed CLI。安装方式请参见使用OSS Vectors Embed CLI工具写入和检索向量数据
  • 已配置环境变量 OSS_ACCESS_KEY_IDOSS_ACCESS_KEY_SECRETDASHSCOPE_API_KEY
  • 已创建向量 Bucket 和向量索引,且索引维度与所用 Embedding 模型输出维度一致。

将以下示例中的占位符替换为实际值:

占位符

说明

<your-account-id>

阿里云账号 ID

<your-vector-bucket>

向量 Bucket 名称

<your-index>

向量索引名称

xargs 快速并发

如果不需要复杂的流程控制,xargs -P 是最简单的 CLI 并发方式,一行命令即可完成。

cat queries.txt | xargs -P 5 -I {} \
  oss-vectors-embed \
    --account-id "<your-account-id>" \
    --vectors-region cn-hangzhou \
    query \
    --vector-bucket-name "<your-vector-bucket>" \
    --index-name "<your-index>" \
    --model-id text-embedding-v4 \
    --text-value "{}" \
    --top-k 10

-P 5 表示最多 5 个进程并行执行。检索结果直接输出到终端,适合快速验证。如需保存结果,可将输出重定向到文件。

Shell 后台并发

通过 & 将多个查询命令放入后台并行执行,wait 等待全部完成。适合查询数量较少(10 条以内)的场景。

#!/bin/bash
ACCOUNT_ID="<your-account-id>"
REGION="cn-hangzhou"
BUCKET="<your-vector-bucket>"
INDEX="<your-index>"
MODEL="text-embedding-v4"
queries=(
  "如何配置生命周期规则"
  "对象存储有哪些存储类型"
  "如何设置跨区域复制"
)
mkdir -p ./query-results
# 并发启动所有查询,每条结果写入独立文件
for i in "${!queries[@]}"; do
  oss-vectors-embed \
    --account-id "$ACCOUNT_ID" \
    --vectors-region "$REGION" \
    query \
    --vector-bucket-name "$BUCKET" \
    --index-name "$INDEX" \
    --model-id "$MODEL" \
    --text-value "${queries[$i]}" \
    --top-k 10 \
    --return-metadata \
    > "./query-results/result_${i}.json" 2>&1 &
done
wait
echo "全部查询完成,结果保存在 ./query-results/"

运行后输出:

全部查询完成,结果保存在 ./query-results/

每个结果文件包含 JSON 格式的检索结果,可通过 cat ./query-results/result_0.json | python3 -m json.tool 查看。

控制并发数的 Shell 脚本

当查询数量较多时(数十条以上),需要限制同时运行的进程数,避免超出 API 配额。以下脚本从文件逐行读取查询文本,控制最多 5 个进程同时执行。

#!/bin/bash
ACCOUNT_ID="<your-account-id>"
REGION="cn-hangzhou"
BUCKET="<your-vector-bucket>"
INDEX="<your-index>"
MODEL="text-embedding-v4"
MAX_CONCURRENT=5
QUERY_FILE="./queries.txt"  # 每行一条查询文本
mkdir -p ./query-results
run_query() {
  local idx=$1
  local text=$2
  oss-vectors-embed \
    --account-id "$ACCOUNT_ID" \
    --vectors-region "$REGION" \
    query \
    --vector-bucket-name "$BUCKET" \
    --index-name "$INDEX" \
    --model-id "$MODEL" \
    --text-value "$text" \
    --top-k 10 \
    > "./query-results/result_${idx}.json" 2>&1
}
idx=0
while IFS= read -r query_text; do
  run_query "$idx" "$query_text" &
  idx=$((idx + 1))
  # 达到并发上限时等待一个任务完成再继续
  if (( $(jobs -rp | wc -l) >= MAX_CONCURRENT )); then
    wait -n
  fi
done < "$QUERY_FILE"
wait
echo "全部 $idx 条查询完成"

运行前准备 queries.txt 文件,每行一条查询文本:

如何配置生命周期规则
对象存储有哪些存储类型
如何设置跨区域复制
Bucket 标签的使用限制
如何启用版本控制

运行后输出:

全部 5 条查询完成

Python 封装 CLI 并发

如果需要对 CLI 返回的结果进行后处理(如解析 JSON、汇总统计),可以用 Python asyncio 封装 CLI 调用。

import asyncio
import json
from pathlib import Path
ACCOUNT_ID = "<your-account-id>"
REGION = "cn-hangzhou"
BUCKET = "<your-vector-bucket>"
INDEX = "<your-index>"
MODEL = "text-embedding-v4"
MAX_CONCURRENT = 5
async def run_query(semaphore: asyncio.Semaphore, query_text: str, query_id: int):
    """异步执行单条 CLI query 命令"""
    async with semaphore:
        cmd = [
            "oss-vectors-embed",
            "--account-id", ACCOUNT_ID,
            "--vectors-region", REGION,
            "query",
            "--vector-bucket-name", BUCKET,
            "--index-name", INDEX,
            "--model-id", MODEL,
            "--text-value", query_text,
            "--top-k", "10",
            "--return-metadata",
        ]
        proc = await asyncio.create_subprocess_exec(
            *cmd,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )
        stdout, stderr = await proc.communicate()
        if proc.returncode == 0:
            result = json.loads(stdout.decode())
            print(f"查询 {query_id} 完成,返回 {len(result.get('results', [ ]))} 条结果")
            return {"query_id": query_id, "query_text": query_text, "result": result}
        else:
            print(f"查询 {query_id} 失败: {stderr.decode()}")
            return {"query_id": query_id, "query_text": query_text, "error": stderr.decode()}
async def batch_query(queries: list[str]):
    """批量并发执行多条查询"""
    semaphore = asyncio.Semaphore(MAX_CONCURRENT)
    tasks = [
        run_query(semaphore, text, idx)
        for idx, text in enumerate(queries)
    ]
    results = await asyncio.gather(*tasks)
    output_path = Path("./query-results/batch_results.json")
    output_path.parent.mkdir(parents=True, exist_ok=True)
    output_path.write_text(json.dumps(results, ensure_ascii=False, indent=2))
    print(f"汇总结果已保存到 {output_path}")
    return results
if __name__ == "__main__":
    queries = [
        "如何配置生命周期规则",
        "对象存储有哪些存储类型",
        "如何设置跨区域复制",
        "Bucket 标签的使用限制",
        "如何启用版本控制",
    ]
    asyncio.run(batch_query(queries))

运行后输出:

查询 0 完成,返回 10 条结果
查询 1 完成,返回 10 条结果
查询 2 完成,返回 10 条结果
查询 3 完成,返回 10 条结果
查询 4 完成,返回 10 条结果
汇总结果已保存到 query-results/batch_results.json


通过 SDK 并发检索

SDK 并发直接调用 query_vectors API,无需启动外部进程。适合需要精细控制检索参数(如设置过滤条件)或将检索集成到业务服务中的场景。

说明:SDK 方式需要传入已生成的查询向量(如 float32 数组),而非原始文本。如果你的场景是从文本出发检索,建议先通过 Embedding 模型生成向量,或直接使用上文的 CLI 并发方式。

Python SDK 并发检索

使用 alibabacloud-oss-v2 Python SDK 通过线程池并发调用 query_vectors 接口。开始前请安装 SDK:

pip install alibabacloud-oss-v2

确保已配置环境变量 OSS_ACCESS_KEY_IDOSS_ACCESS_KEY_SECRET,并已创建向量 Bucket 和索引。

基本示例

以下示例并发检索 5 组向量,使用 ThreadPoolExecutor 控制最多 5 个线程同时执行:

import json
from concurrent.futures import ThreadPoolExecutor, as_completed
from pathlib import Path
import alibabacloud_oss_v2 as oss
import alibabacloud_oss_v2.vectors as oss_vectors
ACCOUNT_ID = "<your-account-id>"
REGION = "cn-hangzhou"
BUCKET = "<your-vector-bucket>"
INDEX = "<your-index>"
MAX_CONCURRENT = 5
def create_vector_client():
    """创建向量检索客户端(全局复用,避免重复创建连接)"""
    credentials_provider = oss.credentials.EnvironmentVariableCredentialsProvider()
    cfg = oss.config.load_default()
    cfg.credentials_provider = credentials_provider
    cfg.region = REGION
    cfg.account_id = ACCOUNT_ID
    return oss_vectors.Client(cfg)
def run_query(client, query_vector, query_id, query_filter=None):
    """执行单条向量检索"""
    request = oss_vectors.models.QueryVectorsRequest(
        bucket=BUCKET,
        index_name=INDEX,
        query_vector=query_vector,
        filter=query_filter,
        return_distance=True,
        return_metadata=True,
        top_k=10,
    )
    result = client.query_vectors(request)
    print(f"查询 {query_id} 完成,status code: {result.status_code}")
    return {
        "query_id": query_id,
        "status_code": result.status_code,
        "vectors": [str(v) for v in result.vectors] if result.vectors else [ ],
    }
def batch_query(query_vectors):
    """批量并发执行多条向量检索"""
    client = create_vector_client()
    results = [ ]
    with ThreadPoolExecutor(max_workers=MAX_CONCURRENT) as executor:
        futures = {
            executor.submit(run_query, client, qv, idx): idx
            for idx, qv in enumerate(query_vectors)
        }
        for future in as_completed(futures):
            idx = futures[future]
            try:
                results.append(future.result())
            except Exception as e:
                print(f"查询 {idx} 失败: {e}")
                results.append({"query_id": idx, "error": str(e)})
    results.sort(key=lambda x: x["query_id"])
    output_path = Path("./query-results/sdk_batch_results.json")
    output_path.parent.mkdir(parents=True, exist_ok=True)
    output_path.write_text(json.dumps(results, ensure_ascii=False, indent=2))
    print(f"汇总结果已保存到 {output_path}")
    return results
if __name__ == "__main__":
    # 示例:5 组查询向量(维度需与索引一致,此处以 128 维为例)
    query_vectors = [
        {"float32": [0.1] * 128},
        {"float32": [0.2] * 128},
        {"float32": [0.3] * 128},
        {"float32": [0.4] * 128},
        {"float32": [0.5] * 128},
    ]
    batch_query(query_vectors)

运行后输出:

查询 0 完成,status code: 200
查询 2 完成,status code: 200
查询 1 完成,status code: 200
查询 4 完成,status code: 200
查询 3 完成,status code: 200
汇总结果已保存到 query-results/sdk_batch_results.json

说明:由于线程池并发执行,输出顺序可能与提交顺序不同,但最终结果按 query_id 排序保存。

带过滤条件的并发检索

实际业务中,不同查询可能需要搭配不同的过滤条件。以下示例为每条查询指定独立的 filter

import json
from concurrent.futures import ThreadPoolExecutor, as_completed
from pathlib import Path
import alibabacloud_oss_v2 as oss
import alibabacloud_oss_v2.vectors as oss_vectors
ACCOUNT_ID = "<your-account-id>"
REGION = "cn-hangzhou"
BUCKET = "<your-vector-bucket>"
INDEX = "<your-index>"
MAX_CONCURRENT = 5
def create_vector_client():
    """创建向量检索客户端(全局复用,避免重复创建连接)"""
    credentials_provider = oss.credentials.EnvironmentVariableCredentialsProvider()
    cfg = oss.config.load_default()
    cfg.credentials_provider = credentials_provider
    cfg.region = REGION
    cfg.account_id = ACCOUNT_ID
    return oss_vectors.Client(cfg)
def run_query(client, query_vector, query_id, query_filter=None):
    """执行单条向量检索"""
    request = oss_vectors.models.QueryVectorsRequest(
        bucket=BUCKET,
        index_name=INDEX,
        query_vector=query_vector,
        filter=query_filter,
        return_distance=True,
        return_metadata=True,
        top_k=10,
    )
    result = client.query_vectors(request)
    print(f"查询 {query_id} 完成,status code: {result.status_code}")
    return {
        "query_id": query_id,
        "status_code": result.status_code,
        "vectors": [str(v) for v in result.vectors] if result.vectors else [ ],
    }
def batch_query(query_vectors):
    """批量并发执行多条向量检索"""
    client = create_vector_client()
    results = [ ]
    with ThreadPoolExecutor(max_workers=MAX_CONCURRENT) as executor:
        futures = {
            executor.submit(run_query, client, qv, idx): idx
            for idx, qv in enumerate(query_vectors)
        }
        for future in as_completed(futures):
            idx = futures[future]
            try:
                results.append(future.result())
            except Exception as e:
                print(f"查询 {idx} 失败: {e}")
                results.append({"query_id": idx, "error": str(e)})
    results.sort(key=lambda x: x["query_id"])
    output_path = Path("./query-results/sdk_batch_results.json")
    output_path.parent.mkdir(parents=True, exist_ok=True)
    output_path.write_text(json.dumps(results, ensure_ascii=False, indent=2))
    print(f"汇总结果已保存到 {output_path}")
    return results
if __name__ == "__main__":
    tasks = [
        {
            "vector": {"float32": [0.1] * 128},
            "filter": {"$and": [{"type": {"$in": ["tutorial"]}}]},
        },
        {
            "vector": {"float32": [0.2] * 128},
            "filter": {"$and": [{"type": {"$nin": ["comedy", "documentary"]}}]},
        },
        {
            "vector": {"float32": [0.3] * 128},
            "filter": None,  # 不设过滤条件
        },
    ]
    client = create_vector_client()
    results = [ ]
    with ThreadPoolExecutor(max_workers=MAX_CONCURRENT) as executor:
        futures = {
            executor.submit(
                run_query, client, t["vector"], idx, t["filter"]
            ): idx
            for idx, t in enumerate(tasks)
        }
        for future in as_completed(futures):
            try:
                results.append(future.result())
            except Exception as e:
                print(f"查询失败: {e}")
    for r in sorted(results, key=lambda x: x["query_id"]):
        print(f"查询 {r['query_id']}: 返回 {len(r.get('vectors', [ ]))} 条结果")

运行后输出:

查询 0 完成,status code: 200
查询 1 完成,status code: 200
查询 2 完成,status code: 200
查询 0: 返回 10 条结果
查询 1: 返回 10 条结果
查询 2: 返回 10 条结果

Go SDK 并发检索

使用 alibabacloud-oss-go-sdk-v2 Go SDK 通过 goroutine 并发调用 QueryVectors 接口。开始前请安装 SDK:

go get github.com/aliyun/alibabacloud-oss-go-sdk-v2

确保已配置环境变量 OSS_ACCESS_KEY_IDOSS_ACCESS_KEY_SECRET,并已创建向量 Bucket 和索引。

基本示例

以下示例使用 sync.WaitGroup 和 channel 信号量并发检索 5 组向量:

package main
import (
  "context"
  "fmt"
  "log"
  "sync"
  "github.com/aliyun/alibabacloud-oss-go-sdk-v2/oss"
  "github.com/aliyun/alibabacloud-oss-go-sdk-v2/oss/credentials"
  "github.com/aliyun/alibabacloud-oss-go-sdk-v2/oss/vectors"
)
const (
  region        = "cn-hangzhou"
  bucketName    = "<your-vector-bucket>"
  accountId     = "<your-account-id>"
  indexName     = "<your-index>"
  maxConcurrent = 5
)
func main() {
  cfg := oss.LoadDefaultConfig().
    WithCredentialsProvider(credentials.NewEnvironmentVariableCredentialsProvider()).
    WithRegion(region).
    WithAccountId(accountId)
  client := vectors.NewVectorsClient(cfg)
  // 5 组查询向量(维度需与索引一致)
  queryVectors := [ ]map[string]any{
    {"float32": [ ]float32{0.1}},
    {"float32": [ ]float32{0.2}},
    {"float32": [ ]float32{0.3}},
    {"float32": [ ]float32{0.4}},
    {"float32": [ ]float32{0.5}},
  }
  var wg sync.WaitGroup
  sem := make(chan struct{}, maxConcurrent) // channel 信号量控制并发数
  for i, qv := range queryVectors {
    wg.Add(1)
    sem <- struct{}{} // 获取信号量,达到上限时阻塞
    go func(idx int, queryVector map[string]any) {
      defer wg.Done()
      defer func() { <-sem }() // 释放信号量
      request := &vectors.QueryVectorsRequest{
        Bucket:         oss.Ptr(bucketName),
        IndexName:      oss.Ptr(indexName),
        QueryVector:    queryVector,
        ReturnMetadata: oss.Ptr(true),
        ReturnDistance:  oss.Ptr(true),
        TopK:           oss.Ptr(10),
      }
      result, err := client.QueryVectors(context.TODO(), request)
      if err != nil {
        log.Printf("查询 %d 失败: %v", idx, err)
        return
      }
      fmt.Printf("查询 %d 完成,status code: %d\n",
        idx, result.StatusCode)
    }(i, qv)
  }
  wg.Wait()
  fmt.Println("全部查询完成")
}

运行后输出:

查询 0 完成,status code: 200
查询 2 完成,status code: 200
查询 1 完成,status code: 200
查询 3 完成,status code: 200
查询 4 完成,status code: 200
全部查询完成

带过滤条件的并发检索

package main
import (
  "context"
  "fmt"
  "log"
  "sync"
  "github.com/aliyun/alibabacloud-oss-go-sdk-v2/oss"
  "github.com/aliyun/alibabacloud-oss-go-sdk-v2/oss/credentials"
  "github.com/aliyun/alibabacloud-oss-go-sdk-v2/oss/vectors"
)
type queryTask struct {
  vector map[string]any
  filter map[string]any
}
const (
  region        = "cn-hangzhou"
  bucketName    = "<your-vector-bucket>"
  accountId     = "<your-account-id>"
  indexName     = "<your-index>"
  maxConcurrent = 5
)
func main() {
  cfg := oss.LoadDefaultConfig().
    WithCredentialsProvider(credentials.NewEnvironmentVariableCredentialsProvider()).
    WithRegion(region).
    WithAccountId(accountId)
  client := vectors.NewVectorsClient(cfg)
  tasks := [ ]queryTask{
    {
      vector: map[string]any{"float32": [ ]float32{0.1}},
      filter: map[string]any{
        "$and": [ ]map[string]any{
          {"type": map[string]any{"$in": [ ]string{"tutorial"}}},
        },
      },
    },
    {
      vector: map[string]any{"float32": [ ]float32{0.2}},
      filter: map[string]any{
        "$and": [ ]map[string]any{
          {"type": map[string]any{"$nin": [ ]string{"comedy", "documentary"}}},
        },
      },
    },
    {
      vector: map[string]any{"float32": [ ]float32{0.3}},
      filter: nil, // 不设过滤条件
    },
  }
  var wg sync.WaitGroup
  sem := make(chan struct{}, maxConcurrent)
  for i, task := range tasks {
    wg.Add(1)
    sem <- struct{}{}
    go func(idx int, t queryTask) {
      defer wg.Done()
      defer func() { <-sem }()
      request := &vectors.QueryVectorsRequest{
        Bucket:         oss.Ptr(bucketName),
        IndexName:      oss.Ptr(indexName),
        QueryVector:    t.vector,
        ReturnMetadata: oss.Ptr(true),
        ReturnDistance:  oss.Ptr(true),
        TopK:           oss.Ptr(10),
      }
      if t.filter != nil {
        request.Filter = t.filter
      }
      result, err := client.QueryVectors(context.TODO(), request)
      if err != nil {
        log.Printf("查询 %d 失败: %v", idx, err)
        return
      }
      fmt.Printf("查询 %d 完成,status code: %d\n", idx, result.StatusCode)
    }(i, task)
  }
  wg.Wait()
  fmt.Println("全部查询完成")
}

运行后输出:

查询 0 完成,status code: 200
查询 1 完成,status code: 200
查询 2 完成,status code: 200
全部查询完成


并发性能调优

调优项

建议

说明

并发数

3~5

query 并发建议不超过 5,与 put 命令的最大并发一致,避免触发限流

top_k

按需设置

返回结果越多,单次请求延迟越高。仅返回业务所需的数量

错误重试

间隔 1~2 秒

并发请求可能触发限流(HTTP 429),建议捕获错误后等待重试

CLI 结果输出

重定向到文件

多个进程同时输出到终端会导致内容交错,建议将每条结果写入独立文件

SDK 客户端复用

复用同一实例

避免为每条查询创建新的 Client,重复创建会增加连接建立和认证开销


相关文档

相关实践学习
对象存储OSS快速上手——如何使用ossbrowser
本实验是对象存储OSS入门级实验。通过本实验,用户可学会如何用对象OSS的插件,进行简单的数据存、查、删等操作。
相关文章
|
21小时前
|
运维 开发工具 数据安全/隐私保护
Vector 基于多索引表架构应对海量多租户的大规模向量检索
OSS 向量 Bucket 支持多索引架构,通过按租户/业务创建独立索引,实现数据隔离、并发检索与结果合并,兼顾高安全性与毫秒级响应,轻松应对千万级向量的多租户 RAG 与语义搜索场景。
|
21小时前
|
存储 文字识别 开发工具
Vector 构建原始文件和向量数据之间的映射关系
OSS 向量 Bucket 的检索结果返回的是向量 Key 和 Metadata,而非原始文件本身。要将检索结果关联回原始文件(如图片、文档、视频),需要在写入向量时构建映射关系。
|
2月前
|
存储 数据可视化 机器人
从 LangChain 到 LangGraph 构建可控 Agent 的工程实践
本文详解如何用 LangChain 的 `create_agent` 与 LangGraph 的 `StateGraph` 构建有状态、可控制、可调试的智能 Agent:支持条件判断、工具调用、流程分支与短期记忆,告别线性 Chain,迈向工程化图智能体。
1203 1
|
8月前
|
弹性计算 监控 网络协议
阿里云精品BGP线路EIP助力香港云服务器访问加速
香港云服务器因默认BGP线路问题常导致大陆访问延迟高、丢包严重。阿里云国际站推出精品BGP线路EIP,通过直连优化,实现低至80ms延迟、高稳定性跨境访问,助企业提升业务体验。
|
11月前
|
人工智能 Kubernetes Cloud Native
我们香港见!阿里云亮相 KubeCon China 2025
阿里云亮相KubeCon + CloudNativeCon China 2025,带来多个技术议题分享,覆盖容涉及 AI 模型分发、Argo 工作流、Fluid 数据管理、Kubernetes 运维等多个热门话题,欢迎大家前来与我们零距离交流,共同探讨云原生技术!
|
机器学习/深度学习 人工智能 算法
《探秘卷积神经网络的核心—卷积核》
卷积神经网络(CNN)在图像和语音识别等领域取得显著成就,卷积核作为其核心组件发挥关键作用。卷积核是滑动于输入数据上的小矩阵,通过卷积操作提取特征,参数共享机制减少模型复杂度并提高鲁棒性。不同类型的卷积核(如标准、深度可分离和扩张卷积核)适用于多种任务,为CNN的成功奠定基础。
1120 5
|
存储 缓存 Serverless
使用云存储构建云上推理平台
本文介绍了大模型分布式推理的工作流、IO分析、存储需求及解决方案。通过分布式缓存和P2P能力,优化了大规模并发场景下的模型加载与分发效率,提升了推理性能。NAS文件存储和OSS加速器在高并发读取和小模型缓存中表现出色,支持秒级加载和高效数据处理。阿里云存储为开发者提供了稳定、高效的推理环境,助力AI应用快速落地。
|
存储 人工智能 大数据
AI驱动下的云存储创新
随着大数据时代的到来,云存储作为数据存储和管理的核心基础设施,其重要性日益凸显。同时, AI 快速发展也为云存储的进化与创新提供了强大的驱动力。本话题将解读AI 驱动下云存储的进化趋势,分享阿里云存储的创新技术,助力企业实现数字化升级。
1020 5