API请求执行流程_milvus源码解析

本文涉及的产品
云解析 DNS,旗舰版 1个月
全局流量管理 GTM,标准版 1个月
公共DNS(含HTTPDNS解析),每月1000万次HTTP解析
简介: API请求执行流程_milvus源码解析

API请求执行流程

1.milvus客户端发起api rpc请求,请求内容为request。

2.proxy接受api请求,将request包装为task。

3.将task压入队列。

4.调度器执行队列中的task。

api请求执行流程.jpg

以创建collection的API(CreateCollection)为例:

1.客户端发起创建collection的请求。

from pymilvus import (
    connections,
    FieldSchema, CollectionSchema, DataType,
    Collection,
)

num_entities, dim = 3000, 1024

print(f"start connecting to Milvus")
connections.connect("default", host="192.168.230.71", port="19530")

fields = [
    FieldSchema(name="pk", dtype=DataType.VARCHAR, is_primary=True, auto_id=False, max_length=100),
    FieldSchema(name="random", dtype=DataType.DOUBLE),
    FieldSchema(name="embeddings", dtype=DataType.FLOAT_VECTOR, dim=dim)
]

schema = CollectionSchema(fields, "hello_milvus is the simplest demo to introduce the APIs")

print("Create collection `hello_milvus`")
hello_milvus = Collection("hello_milvus2", schema, consistency_level="Strong",shards_num=2)

2.proxy接受客户端发送过来的request,将其包装为createCollectionTask。

将createCollectionTask压入队列ddTaskQueue,等待调度器执行。

代码路径:internal\proxy\impl.go

func (node *Proxy) CreateCollection(ctx context.Context, request *milvuspb.CreateCollectionRequest) (*commonpb.Status, error) {
   
   
    ...省略...
    // 将request包装为createCollectionTask
    cct := &createCollectionTask{
   
   
        ctx:                     ctx,
        Condition:               NewTaskCondition(ctx),
        CreateCollectionRequest: request,
        rootCoord:               node.rootCoord,
    }

    ...省略...
    // 将task压入队列ddTaskQueue
    if err := node.sched.ddQueue.Enqueue(cct); err != nil {
   
   
        log.Warn(
            rpcFailedToEnqueue(method),
            zap.Error(err))

    ...省略...
    // 等待task执行完成
    if err := cct.WaitToFinish(); err != nil {
   
   
        log.Warn(
            rpcFailedToWaitToFinish(method),
            zap.Error(err),
            zap.Uint64("BeginTs", cct.BeginTs()),
            zap.Uint64("EndTs", cct.EndTs()))

    ...省略...
}

3.调度器执行队列中的task。

会依次执行cct的PreExecute()、Execute()、PostExecute()方法。

PreExecute()一般用来做预处理。

Execute()真正执行task的任务。

PostExecute()用来task完成后执行的动作,一般直接返回nil,也就是什么都不做。

代码路径:internal\proxy\task.go

type createCollectionTask struct {
   
   
    Condition
    *milvuspb.CreateCollectionRequest
    ctx       context.Context
    rootCoord types.RootCoordClient
    result    *commonpb.Status
    schema    *schemapb.CollectionSchema
}

func (t *createCollectionTask) PreExecute(ctx context.Context) error {
   
   
    ...省略...
}

func (t *createCollectionTask) Execute(ctx context.Context) error {
   
   
    var err error
    t.result, err = t.rootCoord.CreateCollection(ctx, t.CreateCollectionRequest)
    return err
}

func (t *createCollectionTask) PostExecute(ctx context.Context) error {
   
   
    return nil
}

为什么会是PreExecute()、Execute()、PostExecute()这个顺序,这个就需要阅读task调度器的源码了。

代码路径:internal\proxy\task_scheduler.go

核心代码如下:

task压入队列后执行的是processTask()方法。

func (sched *taskScheduler) processTask(t task, q taskQueue) {
   
   
    ......

    err := t.PreExecute(ctx)

    ......
    err = t.Execute(ctx)

    ......
    err = t.PostExecute(ctx)
    ......
}

这里再思考另一个问题,processTask()是由谁调用的,调度器是什么时候启动的。

task_scheduler.go有一个方法Start()。由这个方法启动一个goroutine进行调度。

// ddQueue *ddTaskQueue
// dmQueue *dmTaskQueue
// dqQueue *dqTaskQueue
// dcQueue *ddTaskQueue

func (sched *taskScheduler) Start() error {
   
   
    sched.wg.Add(1)
    // ddQueue的调度,数据定义的task
    go sched.definitionLoop()

    sched.wg.Add(1)
    // dcQueue的调度,数据控制的task
    go sched.controlLoop()

    sched.wg.Add(1)
    // dmQueue的调度,数据操作的task
    go sched.manipulationLoop()

    sched.wg.Add(1)
    // dqQueue的调度,数据查询的task
    go sched.queryLoop()

    return nil
}

createCollectionTask是数据定义语言,走go sched.definitionLoop()这条路径。

// definitionLoop schedules the ddl tasks.
func (sched *taskScheduler) definitionLoop() {
   
   
    defer sched.wg.Done()
    for {
   
   
        select {
   
   
        case <-sched.ctx.Done():
            return
        case <-sched.ddQueue.utChan():
            if !sched.ddQueue.utEmpty() {
   
   
                t := sched.scheduleDdTask()
                sched.processTask(t, sched.ddQueue)
            }
        }
    }
}

在这里可以看到processTask()方法的调用。for循环里,只要通道有值就会调用processTask()方法。

这样PreExecute()、Execute()、PostExecute()的逻辑就搞清楚了。

目录
相关文章
|
2天前
|
监控 Java 应用服务中间件
高级java面试---spring.factories文件的解析源码API机制
【11月更文挑战第20天】Spring Boot是一个用于快速构建基于Spring框架的应用程序的开源框架。它通过自动配置、起步依赖和内嵌服务器等特性,极大地简化了Spring应用的开发和部署过程。本文将深入探讨Spring Boot的背景历史、业务场景、功能点以及底层原理,并通过Java代码手写模拟Spring Boot的启动过程,特别是spring.factories文件的解析源码API机制。
14 2
|
24天前
|
存储 缓存 搜索推荐
Lazada淘宝详情API的价值与应用解析
在电商行业,数据是驱动业务增长的核心。Lazada作为东南亚知名电商平台,其商品详情API对电商行业影响深远。本文探讨了Lazada商品详情API的重要性,包括提供全面准确的商品信息、增强平台竞争力、促进销售转化、支持用户搜索和发现需求、数据驱动决策、竞品分析、用户行为研究及提升购物体验。文章还介绍了如何通过Lazada提供的API接口、编写代码及使用第三方工具实现实时数据获取。
51 3
|
3天前
|
API 数据安全/隐私保护
抖音视频,图集无水印直链解析免费API接口教程
该接口用于解析抖音视频和图集的无水印直链地址。请求地址为 `https://cn.apihz.cn/api/fun/douyin.php`,支持POST或GET请求。请求参数包括用户ID、用户KEY和视频或图集地址。返回参数包括状态码、信息提示、作者昵称、标题、视频地址、封面、图集和类型。示例请求和返回数据详见文档。
|
21天前
|
存储 数据可视化 API
API接口数据获取流程的细化
本文概述了API的基础知识、获取API访问权限的方法、编写代码调用API的步骤、数据处理与分析技巧以及数据安全与合规的重要性,并提供了社交媒体数据分析、天气预报应用和电商数据分析等API数据获取的应用实例,旨在帮助读者全面了解和实践API接口数据获取的流程。
|
21天前
|
缓存 监控 API
抖音抖店 API 请求获取宝贝详情数据的调用频率限制如何调整?
抖音抖店API请求获取宝贝详情数据的调用频率受限,需遵循平台规则。开发者可通过提升账号等级、申请更高配额、优化业务逻辑(如缓存数据、异步处理、批量请求)及监控调整等方式来应对。
|
23天前
|
缓存 负载均衡 API
抖音抖店API请求获取宝贝详情数据、原价、销量、主图等参数可支持高并发调用接入演示
这是一个使用Python编写的示例代码,用于从抖音抖店API获取商品详情,包括原价、销量和主图等信息。示例展示了如何构建请求、处理响应及提取所需数据。针对高并发场景,建议采用缓存、限流、负载均衡、异步处理及代码优化等策略,以提升性能和稳定性。
|
3天前
|
JSON API 数据格式
携程API接口系列,酒店景点详情请求示例参考
携程API接口系列涵盖了酒店预订、机票预订、旅游度假产品预订、景点门票预订等多个领域,其中酒店和景点详情请求是较为常用的功能。以下提供酒店和景点详情请求的示例参考
|
1月前
|
JavaScript 前端开发 Java
多种语言请求API接口方法
每种语言和库的选择取决于具体需求、项目环境以及个人偏好。了解这些基本方法,开发者就可以根据项目需求选择合适的语言和库来高效地与API交互。
37 1
|
1月前
|
存储 数据可视化 JavaScript
可视化集成API接口请求+变量绑定+源码输出
可视化集成API接口请求+变量绑定+源码输出
43 4
|
1月前
|
JavaScript 前端开发 UED
Vue执行流程及渲染解析
【10月更文挑战第5天】

推荐镜像

更多