API请求执行流程
1.milvus客户端发起api rpc请求,请求内容为request。
2.proxy接受api请求,将request包装为task。
3.将task压入队列。
4.调度器执行队列中的task。
以创建collection的API(CreateCollection)为例:
1.客户端发起创建collection的请求。
from pymilvus import (
connections,
FieldSchema, CollectionSchema, DataType,
Collection,
)
num_entities, dim = 3000, 1024
print(f"start connecting to Milvus")
connections.connect("default", host="192.168.230.71", port="19530")
fields = [
FieldSchema(name="pk", dtype=DataType.VARCHAR, is_primary=True, auto_id=False, max_length=100),
FieldSchema(name="random", dtype=DataType.DOUBLE),
FieldSchema(name="embeddings", dtype=DataType.FLOAT_VECTOR, dim=dim)
]
schema = CollectionSchema(fields, "hello_milvus is the simplest demo to introduce the APIs")
print("Create collection `hello_milvus`")
hello_milvus = Collection("hello_milvus2", schema, consistency_level="Strong",shards_num=2)
2.proxy接受客户端发送过来的request,将其包装为createCollectionTask。
将createCollectionTask压入队列ddTaskQueue,等待调度器执行。
代码路径:internal\proxy\impl.go
func (node *Proxy) CreateCollection(ctx context.Context, request *milvuspb.CreateCollectionRequest) (*commonpb.Status, error) {
...省略...
// 将request包装为createCollectionTask
cct := &createCollectionTask{
ctx: ctx,
Condition: NewTaskCondition(ctx),
CreateCollectionRequest: request,
rootCoord: node.rootCoord,
}
...省略...
// 将task压入队列ddTaskQueue
if err := node.sched.ddQueue.Enqueue(cct); err != nil {
log.Warn(
rpcFailedToEnqueue(method),
zap.Error(err))
...省略...
// 等待task执行完成
if err := cct.WaitToFinish(); err != nil {
log.Warn(
rpcFailedToWaitToFinish(method),
zap.Error(err),
zap.Uint64("BeginTs", cct.BeginTs()),
zap.Uint64("EndTs", cct.EndTs()))
...省略...
}
3.调度器执行队列中的task。
会依次执行cct的PreExecute()、Execute()、PostExecute()方法。
PreExecute()一般用来做预处理。
Execute()真正执行task的任务。
PostExecute()用来task完成后执行的动作,一般直接返回nil,也就是什么都不做。
代码路径:internal\proxy\task.go
type createCollectionTask struct {
Condition
*milvuspb.CreateCollectionRequest
ctx context.Context
rootCoord types.RootCoordClient
result *commonpb.Status
schema *schemapb.CollectionSchema
}
func (t *createCollectionTask) PreExecute(ctx context.Context) error {
...省略...
}
func (t *createCollectionTask) Execute(ctx context.Context) error {
var err error
t.result, err = t.rootCoord.CreateCollection(ctx, t.CreateCollectionRequest)
return err
}
func (t *createCollectionTask) PostExecute(ctx context.Context) error {
return nil
}
为什么会是PreExecute()、Execute()、PostExecute()这个顺序,这个就需要阅读task调度器的源码了。
代码路径:internal\proxy\task_scheduler.go
核心代码如下:
task压入队列后执行的是processTask()方法。
func (sched *taskScheduler) processTask(t task, q taskQueue) {
......
err := t.PreExecute(ctx)
......
err = t.Execute(ctx)
......
err = t.PostExecute(ctx)
......
}
这里再思考另一个问题,processTask()是由谁调用的,调度器是什么时候启动的。
task_scheduler.go有一个方法Start()。由这个方法启动一个goroutine进行调度。
// ddQueue *ddTaskQueue
// dmQueue *dmTaskQueue
// dqQueue *dqTaskQueue
// dcQueue *ddTaskQueue
func (sched *taskScheduler) Start() error {
sched.wg.Add(1)
// ddQueue的调度,数据定义的task
go sched.definitionLoop()
sched.wg.Add(1)
// dcQueue的调度,数据控制的task
go sched.controlLoop()
sched.wg.Add(1)
// dmQueue的调度,数据操作的task
go sched.manipulationLoop()
sched.wg.Add(1)
// dqQueue的调度,数据查询的task
go sched.queryLoop()
return nil
}
createCollectionTask是数据定义语言,走go sched.definitionLoop()这条路径。
// definitionLoop schedules the ddl tasks.
func (sched *taskScheduler) definitionLoop() {
defer sched.wg.Done()
for {
select {
case <-sched.ctx.Done():
return
case <-sched.ddQueue.utChan():
if !sched.ddQueue.utEmpty() {
t := sched.scheduleDdTask()
sched.processTask(t, sched.ddQueue)
}
}
}
}
在这里可以看到processTask()方法的调用。for循环里,只要通道有值就会调用processTask()方法。
这样PreExecute()、Execute()、PostExecute()的逻辑就搞清楚了。