API请求执行流程_milvus源码解析

本文涉及的产品
全局流量管理 GTM,标准版 1个月
云解析 DNS,旗舰版 1个月
公共DNS(含HTTPDNS解析),每月1000万次HTTP解析
简介: API请求执行流程_milvus源码解析

API请求执行流程

1.milvus客户端发起api rpc请求,请求内容为request。

2.proxy接受api请求,将request包装为task。

3.将task压入队列。

4.调度器执行队列中的task。

api请求执行流程.jpg

以创建collection的API(CreateCollection)为例:

1.客户端发起创建collection的请求。

from pymilvus import (
    connections,
    FieldSchema, CollectionSchema, DataType,
    Collection,
)

num_entities, dim = 3000, 1024

print(f"start connecting to Milvus")
connections.connect("default", host="192.168.230.71", port="19530")

fields = [
    FieldSchema(name="pk", dtype=DataType.VARCHAR, is_primary=True, auto_id=False, max_length=100),
    FieldSchema(name="random", dtype=DataType.DOUBLE),
    FieldSchema(name="embeddings", dtype=DataType.FLOAT_VECTOR, dim=dim)
]

schema = CollectionSchema(fields, "hello_milvus is the simplest demo to introduce the APIs")

print("Create collection `hello_milvus`")
hello_milvus = Collection("hello_milvus2", schema, consistency_level="Strong",shards_num=2)

2.proxy接受客户端发送过来的request,将其包装为createCollectionTask。

将createCollectionTask压入队列ddTaskQueue,等待调度器执行。

代码路径:internal\proxy\impl.go

func (node *Proxy) CreateCollection(ctx context.Context, request *milvuspb.CreateCollectionRequest) (*commonpb.Status, error) {
   
   
    ...省略...
    // 将request包装为createCollectionTask
    cct := &createCollectionTask{
   
   
        ctx:                     ctx,
        Condition:               NewTaskCondition(ctx),
        CreateCollectionRequest: request,
        rootCoord:               node.rootCoord,
    }

    ...省略...
    // 将task压入队列ddTaskQueue
    if err := node.sched.ddQueue.Enqueue(cct); err != nil {
   
   
        log.Warn(
            rpcFailedToEnqueue(method),
            zap.Error(err))

    ...省略...
    // 等待task执行完成
    if err := cct.WaitToFinish(); err != nil {
   
   
        log.Warn(
            rpcFailedToWaitToFinish(method),
            zap.Error(err),
            zap.Uint64("BeginTs", cct.BeginTs()),
            zap.Uint64("EndTs", cct.EndTs()))

    ...省略...
}

3.调度器执行队列中的task。

会依次执行cct的PreExecute()、Execute()、PostExecute()方法。

PreExecute()一般用来做预处理。

Execute()真正执行task的任务。

PostExecute()用来task完成后执行的动作,一般直接返回nil,也就是什么都不做。

代码路径:internal\proxy\task.go

type createCollectionTask struct {
   
   
    Condition
    *milvuspb.CreateCollectionRequest
    ctx       context.Context
    rootCoord types.RootCoordClient
    result    *commonpb.Status
    schema    *schemapb.CollectionSchema
}

func (t *createCollectionTask) PreExecute(ctx context.Context) error {
   
   
    ...省略...
}

func (t *createCollectionTask) Execute(ctx context.Context) error {
   
   
    var err error
    t.result, err = t.rootCoord.CreateCollection(ctx, t.CreateCollectionRequest)
    return err
}

func (t *createCollectionTask) PostExecute(ctx context.Context) error {
   
   
    return nil
}

为什么会是PreExecute()、Execute()、PostExecute()这个顺序,这个就需要阅读task调度器的源码了。

代码路径:internal\proxy\task_scheduler.go

核心代码如下:

task压入队列后执行的是processTask()方法。

func (sched *taskScheduler) processTask(t task, q taskQueue) {
   
   
    ......

    err := t.PreExecute(ctx)

    ......
    err = t.Execute(ctx)

    ......
    err = t.PostExecute(ctx)
    ......
}

这里再思考另一个问题,processTask()是由谁调用的,调度器是什么时候启动的。

task_scheduler.go有一个方法Start()。由这个方法启动一个goroutine进行调度。

// ddQueue *ddTaskQueue
// dmQueue *dmTaskQueue
// dqQueue *dqTaskQueue
// dcQueue *ddTaskQueue

func (sched *taskScheduler) Start() error {
   
   
    sched.wg.Add(1)
    // ddQueue的调度,数据定义的task
    go sched.definitionLoop()

    sched.wg.Add(1)
    // dcQueue的调度,数据控制的task
    go sched.controlLoop()

    sched.wg.Add(1)
    // dmQueue的调度,数据操作的task
    go sched.manipulationLoop()

    sched.wg.Add(1)
    // dqQueue的调度,数据查询的task
    go sched.queryLoop()

    return nil
}

createCollectionTask是数据定义语言,走go sched.definitionLoop()这条路径。

// definitionLoop schedules the ddl tasks.
func (sched *taskScheduler) definitionLoop() {
   
   
    defer sched.wg.Done()
    for {
   
   
        select {
   
   
        case <-sched.ctx.Done():
            return
        case <-sched.ddQueue.utChan():
            if !sched.ddQueue.utEmpty() {
   
   
                t := sched.scheduleDdTask()
                sched.processTask(t, sched.ddQueue)
            }
        }
    }
}

在这里可以看到processTask()方法的调用。for循环里,只要通道有值就会调用processTask()方法。

这样PreExecute()、Execute()、PostExecute()的逻辑就搞清楚了。

目录
相关文章
|
8天前
|
XML JSON API
ServiceStack:不仅仅是一个高性能Web API和微服务框架,更是一站式解决方案——深入解析其多协议支持及简便开发流程,带您体验前所未有的.NET开发效率革命
【10月更文挑战第9天】ServiceStack 是一个高性能的 Web API 和微服务框架,支持 JSON、XML、CSV 等多种数据格式。它简化了 .NET 应用的开发流程,提供了直观的 RESTful 服务构建方式。ServiceStack 支持高并发请求和复杂业务逻辑,安装简单,通过 NuGet 包管理器即可快速集成。示例代码展示了如何创建一个返回当前日期的简单服务,包括定义请求和响应 DTO、实现服务逻辑、配置路由和宿主。ServiceStack 还支持 WebSocket、SignalR 等实时通信协议,具备自动验证、自动过滤器等丰富功能,适合快速搭建高性能、可扩展的服务端应用。
46 3
|
10天前
|
缓存 Java 程序员
Map - LinkedHashSet&Map源码解析
Map - LinkedHashSet&Map源码解析
26 0
|
10天前
|
算法 Java 容器
Map - HashSet & HashMap 源码解析
Map - HashSet & HashMap 源码解析
25 0
|
10天前
|
存储 Java C++
Collection-PriorityQueue源码解析
Collection-PriorityQueue源码解析
21 0
|
10天前
|
安全 Java 程序员
Collection-Stack&Queue源码解析
Collection-Stack&Queue源码解析
24 0
|
8天前
|
存储
让星星⭐月亮告诉你,HashMap的put方法源码解析及其中两种会触发扩容的场景(足够详尽,有问题欢迎指正~)
`HashMap`的`put`方法通过调用`putVal`实现,主要涉及两个场景下的扩容操作:1. 初始化时,链表数组的初始容量设为16,阈值设为12;2. 当存储的元素个数超过阈值时,链表数组的容量和阈值均翻倍。`putVal`方法处理键值对的插入,包括链表和红黑树的转换,确保高效的数据存取。
29 5
|
10天前
|
Java Spring
Spring底层架构源码解析(三)
Spring底层架构源码解析(三)
|
10天前
|
XML Java 数据格式
Spring底层架构源码解析(二)
Spring底层架构源码解析(二)
|
11天前
|
存储 Java API
从源码角度解析ArrayList.subList的几个坑!
从源码角度解析ArrayList.subList的几个坑!
|
5天前
|
前端开发 JavaScript UED
axios取消请求CancelToken的原理解析及用法示例
axios取消请求CancelToken的原理解析及用法示例
14 0

推荐镜像

更多