CreateCollection API执行流程_milvus源码解析

本文涉及的产品
RDS PostgreSQL Serverless,0.5-4RCU 50GB 3个月
推荐场景:
对影评进行热评分析
阿里云百炼推荐规格 ADB PostgreSQL,4核16GB 100GB 1个月
简介: CreateCollection API执行流程_milvus源码解析

CreateCollection API执行流程源码解析

milvus版本:v2.3.2

CreateCollection这个API流程较长,也是milvus的核心API之一,涉及的内容比较复杂。这里只介绍和元数据相关的流程。

整体架构:

architecture.png

CreateCollection 的数据流向:

create_collection数据流向.jpg

1.客户端sdk发出CreateCollection API请求。

from pymilvus import (
    connections,
    FieldSchema, CollectionSchema, DataType,
    Collection,
)

num_entities, dim = 3000, 1024

print("start connecting to Milvus")
connections.connect("default", host="192.168.230.71", port="19530")

fields = [
    FieldSchema(name="pk", dtype=DataType.VARCHAR, is_primary=True, auto_id=False, max_length=100),
    FieldSchema(name="random", dtype=DataType.DOUBLE),
    FieldSchema(name="embeddings", dtype=DataType.FLOAT_VECTOR, dim=dim)
]

schema = CollectionSchema(fields, "hello_milvus is the simplest demo to introduce the APIs")

print("Create collection `hello_milvus`")
hello_milvus = Collection("hello_milvus", schema, consistency_level="Strong",shards_num=2)

客户端SDK向proxy发送一个CreateCollection API请求,创建一个名为hello_milvus的collection。

hello_milvus.jpg

2.客户端接受API请求,将request封装为createCollectionTask,并压入ddQueue队列。

代码路径:internal\proxy\impl.go

func (node *Proxy) CreateCollection(ctx context.Context, request *milvuspb.CreateCollectionRequest) (*commonpb.Status, error) {
   
   
    ......
    // request封装为task
    cct := &createCollectionTask{
   
   
        ctx:                     ctx,
        Condition:               NewTaskCondition(ctx),
        CreateCollectionRequest: request,
        rootCoord:               node.rootCoord,
    }

    ......
    // 将task压入ddQueue队列
    if err := node.sched.ddQueue.Enqueue(cct); err != nil {
   
   
        ......
    }

    ......
    // 等待cct执行完
    if err := cct.WaitToFinish(); err != nil {
   
   
        ......
    }

    ......
}

3.执行createCollectionTask的3个方法PreExecute、Execute、PostExecute。

PreExecute()一般为参数校验等工作。

Execute()一般为真正执行逻辑。

PostExecute()执行完后的逻辑,什么都不做,返回nil。

代码路径:internal\proxy\task.go

func (t *createCollectionTask) Execute(ctx context.Context) error {
   
   
    var err error
    t.result, err = t.rootCoord.CreateCollection(ctx, t.CreateCollectionRequest)
    return err
}

从代码可以看出调用了rootCoord的CreateCollection接口。

4.进入rootCoord的CreateCollection接口。

代码路径:internal\rootcoord\root_coord.go

继续将请求封装为rootcoord里的createCollectionTask

func (c *Core) CreateCollection(ctx context.Context, in *milvuspb.CreateCollectionRequest) (*commonpb.Status, error) {
   
   
    ......
    // 封装为createCollectionTask
    t := &createCollectionTask{
   
   
        baseTask: newBaseTask(ctx, c),
        Req:      in,
    }
    // 加入调度
    if err := c.scheduler.AddTask(t); err != nil {
   
   
        ......
    }
    // 等待task完成
    if err := t.WaitToFinish(); err != nil {
   
   
        ......
    }

    ......
}

5.执行createCollectionTask的Prepare、Execute、NotifyDone方法。

Execute()为核心方法。

代码路径:internal\rootcoord\create_collection_task.go

func (t *createCollectionTask) Execute(ctx context.Context) error {
   
   
    // collID为collectionID,在Prepare()里分配
    // partIDs为partitionID,在Prepare()里分配
    collID := t.collID
    partIDs := t.partIDs
    // 产生时间戳
    ts, err := t.getCreateTs()
    if err != nil {
   
   
        return err
    }
    // vchanNames为虚拟channel,在Prepare()里分配
    // chanNames为物理channel,在Prepare()里分配
    vchanNames := t.channels.virtualChannels
    chanNames := t.channels.physicalChannels

    startPositions, err := t.addChannelsAndGetStartPositions(ctx, ts)
    if err != nil {
   
   
        t.core.chanTimeTick.removeDmlChannels(t.channels.physicalChannels...)
        return err
    }
    // 填充partition,创建collection的时候,默认只有一个名为"Default partition"的partition。
    partitions := make([]*model.Partition, len(partIDs))
    for i, partID := range partIDs {
   
   
        partitions[i] = &model.Partition{
   
   
            PartitionID:               partID,
            PartitionName:             t.partitionNames[i],
            PartitionCreatedTimestamp: ts,
            CollectionID:              collID,
            State:                     pb.PartitionState_PartitionCreated,
        }
    }
    // 填充collection
    // 可以看出collection由collID、dbid、schemaName、fields、vchanName、chanName、partition、shardNum等组成
    collInfo := model.Collection{
   
   
        CollectionID:         collID,
        DBID:                 t.dbID,
        Name:                 t.schema.Name,
        Description:          t.schema.Description,
        AutoID:               t.schema.AutoID,
        Fields:               model.UnmarshalFieldModels(t.schema.Fields),
        VirtualChannelNames:  vchanNames,
        PhysicalChannelNames: chanNames,
        ShardsNum:            t.Req.ShardsNum,
        ConsistencyLevel:     t.Req.ConsistencyLevel,
        StartPositions:       toKeyDataPairs(startPositions),
        CreateTime:           ts,
        State:                pb.CollectionState_CollectionCreating,
        Partitions:           partitions,
        Properties:           t.Req.Properties,
        EnableDynamicField:   t.schema.EnableDynamicField,
    }

    clone := collInfo.Clone()

    existedCollInfo, err := t.core.meta.GetCollectionByName(ctx, t.Req.GetDbName(), t.Req.GetCollectionName(), typeutil.MaxTimestamp)
    if err == nil {
   
   
        equal := existedCollInfo.Equal(*clone)
        if !equal {
   
   
            return fmt.Errorf("create duplicate collection with different parameters, collection: %s", t.Req.GetCollectionName())
        }

        log.Warn("add duplicate collection", zap.String("collection", t.Req.GetCollectionName()), zap.Uint64("ts", ts))
        return nil
    }
    // 分为多个step执行,每一个undoTask由todoStep和undoStep构成
    // 执行todoStep,报错则执行undoStep
    undoTask := newBaseUndoTask(t.core.stepExecutor)
    undoTask.AddStep(&expireCacheStep{
   
   
        baseStep:        baseStep{
   
   core: t.core},
        dbName:          t.Req.GetDbName(),
        collectionNames: []string{
   
   t.Req.GetCollectionName()},
        collectionID:    InvalidCollectionID,
        ts:              ts,
    }, &nullStep{
   
   })
    undoTask.AddStep(&nullStep{
   
   }, &removeDmlChannelsStep{
   
   
        baseStep:  baseStep{
   
   core: t.core},
        pChannels: chanNames,
    }) 
    undoTask.AddStep(&addCollectionMetaStep{
   
   
        baseStep: baseStep{
   
   core: t.core},
        coll:     &collInfo,
    }, &deleteCollectionMetaStep{
   
   
        baseStep:     baseStep{
   
   core: t.core},
        collectionID: collID,
        ts: ts,
    })

    undoTask.AddStep(&nullStep{
   
   }, &unwatchChannelsStep{
   
   
        baseStep:     baseStep{
   
   core: t.core},
        collectionID: collID,
        channels:     t.channels,
        isSkip:       !Params.CommonCfg.TTMsgEnabled.GetAsBool(),
    })
    undoTask.AddStep(&watchChannelsStep{
   
   
        baseStep: baseStep{
   
   core: t.core},
        info: &watchInfo{
   
   
            ts:             ts,
            collectionID:   collID,
            vChannels:      t.channels.virtualChannels,
            startPositions: toKeyDataPairs(startPositions),
            schema: &schemapb.CollectionSchema{
   
   
                Name:        collInfo.Name,
                Description: collInfo.Description,
                AutoID:      collInfo.AutoID,
                Fields:      model.MarshalFieldModels(collInfo.Fields),
            },
        },
    }, &nullStep{
   
   })
    undoTask.AddStep(&changeCollectionStateStep{
   
   
        baseStep:     baseStep{
   
   core: t.core},
        collectionID: collID,
        state:        pb.CollectionState_CollectionCreated,
        ts:           ts,
    }, &nullStep{
   
   })

    return undoTask.Execute(ctx)
}

创建collection涉及多个步骤,可以看出这里依次分为expireCacheStep、addCollectionMetaStep、watchChannelsStep、changeCollectionStateStep这几个步骤,关于etcd元数据的操作,这里重点关注addCollectionMetaStep。其余step另用篇幅进行讲解。

6.进入addCollectionMetaStep,执行其Execute()方法。

代码路径:internal\rootcoord\step.go

func (s *addCollectionMetaStep) Execute(ctx context.Context) ([]nestedStep, error) {
   
   
    err := s.core.meta.AddCollection(ctx, s.coll)
    return nil, err
}

在这里重点研究s.core.meta.AddCollection()这个方法做了什么事情。

调用栈如下:

CreateCollection()(internal\proxy\impl.go)
 |--Execute()(internal\proxy\task.go)
   |--t.rootCoord.CreateCollection()(同上)
     |--CreateCollection()(rpc调用,internal\rootcoord\root_coord.go)
       |--Execute()(internal\rootcoord\create_collection_task.go)
         |--Execute()(internal\rootcoord\step.go)
           |--s.core.meta.AddCollection()
             |--AddCollection()(internal\rootcoord\meta_table.go)
               |--mt.catalog.CreateCollection()
                 |--CreateCollection()(internal\metastore\kv\rootcoord\kv_catalog.go)
                   |--kc.Snapshot.Save()
                   |--etcd.SaveByBatchWithLimit()

create_collection堆栈.jpg

在etcd产生collection相关的key:

==root-coord/database/collection-info/1/445652621026918798==

value的值的结构为etcdpb.CollectionInfo,然后进行protobuf序列化后存入etcd。

因此etcd存储的是二进制数据。

collSchema := &schemapb.CollectionSchema{
   
   
    Name:               coll.Name,
    Description:        coll.Description,
    AutoID:             coll.AutoID,
    EnableDynamicField: coll.EnableDynamicField,
}

collectionPb := &pb.CollectionInfo{
   
   
    ID:                   coll.CollectionID,
    DbId:                 coll.DBID,
    Schema:               collSchema,
    CreateTime:           coll.CreateTime,
    VirtualChannelNames:  coll.VirtualChannelNames,
    PhysicalChannelNames: coll.PhysicalChannelNames,
    ShardsNum:            coll.ShardsNum,
    ConsistencyLevel:     coll.ConsistencyLevel,
    StartPositions:       coll.StartPositions,
    State:                coll.State,
    Properties:           coll.Properties,
}

collectionInfo.jpg

可以看出collection由ID、DbId、schema等组成,其中schema不记录Fields,也不记录partitionID、partitionName、FieldIndex。其它信息由另外的key-value记录。

func (kc *Catalog) CreateCollection(ctx context.Context, coll *model.Collection, ts typeutil.Timestamp) error {
   
   
    if coll.State != pb.CollectionState_CollectionCreating {
   
   
        return fmt.Errorf("cannot create collection with state: %s, collection: %s", coll.State.String(), coll.Name)
    }
    // 构建key的规则
    k1 := BuildCollectionKey(coll.DBID, coll.CollectionID)
    collInfo := model.MarshalCollectionModel(coll)
    // 序列化
    v1, err := proto.Marshal(collInfo)
    if err != nil {
   
   
        return fmt.Errorf("failed to marshal collection info: %s", err.Error())
    }

    // 写入etcd
    if err := kc.Snapshot.Save(k1, string(v1), ts); err != nil {
   
   
        return err
    }

    ......
}

跟踪BuildCollectionKey()函数,不难得出key的规则。整理如下:

key规则:

  • 前缀/root-coord/database/collection-info/{dbID}/{collectionID}
  • 前缀/snapshots/root-coord/database/collection-info/{dbID}/{collectionID}_ts{时间戳}

根据路径能够反映出collection属于哪个DB。默认数据库名为default,dbID为1。

在etcd还会产生partition相关的key:

==root-coord/partitions/445653146967736660/445653146967736661==

value的值的结构为etcdpb.PartitionInfo,然后进行protobuf序列化后存入etcd。

因此etcd存储的是二进制数据。

&pb.PartitionInfo{
   
   
    PartitionID:               partition.PartitionID,
    PartitionName:             partition.PartitionName,
    PartitionCreatedTimestamp: partition.PartitionCreatedTimestamp,
    CollectionId:              partition.CollectionID,
    State:                     partition.State,
}

partitionInfo.jpg

可以看出来partition包括partitionID、partitionName、collectionId等。

for _, partition := range coll.Partitions {
   
   
    k := BuildPartitionKey(coll.CollectionID, partition.PartitionID)
    partitionInfo := model.MarshalPartitionModel(partition)
    v, err := proto.Marshal(partitionInfo)
    if err != nil {
   
   
        return err
    }
    kvs[k] = string(v)
}

跟踪BuildPartitionKey()函数,不难得出key的规则。整理如下:

key规则:

  • 前缀/root-coord/partitions/{collectionID}/{partitionID}

  • 前缀/snapshots/root-coord/partitions/{collectionID}/{partitionID}_ts{时间戳}

由路径可以反映出partition属于哪个collection。

一个collection可以包含多个partition。默认partition名为:_default。

分区名称可配置(milvus.yml):

common.defaultPartitionName

在etcd还会产生field相关的key:

==root-coord/fields/445653146967736660/100==

value的值的结构为schemapb.FieldSchema ,然后进行protobuf序列化后存入etcd。

因此etcd存储的是二进制数据。

&schemapb.FieldSchema{
   
   
    FieldID:        field.FieldID,
    Name:           field.Name,
    IsPrimaryKey:   field.IsPrimaryKey,
    Description:    field.Description,
    DataType:       field.DataType,
    TypeParams:     field.TypeParams,
    IndexParams:    field.IndexParams,
    AutoID:         field.AutoID,
    IsDynamic:      field.IsDynamic,
    IsPartitionKey: field.IsPartitionKey,
    DefaultValue:   field.DefaultValue,
    ElementType:    field.ElementType,
}

fieldInfo.jpg

fieldInfo记录了字段的filedID、name、description、datatype等信息。

for _, field := range coll.Fields {
   
   
    k := BuildFieldKey(coll.CollectionID, field.FieldID)
    fieldInfo := model.MarshalFieldModel(field)
    v, err := proto.Marshal(fieldInfo)
    if err != nil {
   
   
        return err
    }
    kvs[k] = string(v)
}

跟踪BuildFieldKey()函数,不难得出key的规则。整理如下:

key规则:

  • 前缀/root-coord/fields/{collectionID}/{fieldID}
  • 前缀/snapshots/root-coord/fields/{collectionID}/{fieldID}_ts{时间戳}

从路径可以反映field属于哪个collection。一个field就是一个字段。

kvs.jpg

将kvs批量写入etcd。kvs既有partition,又有field。

完整代码:

func (kc *Catalog) CreateCollection(ctx context.Context, coll *model.Collection, ts typeutil.Timestamp) error {
   
   
    if coll.State != pb.CollectionState_CollectionCreating {
   
   
        return fmt.Errorf("cannot create collection with state: %s, collection: %s", coll.State.String(), coll.Name)
    }
    // 构建collection的key规则
    k1 := BuildCollectionKey(coll.DBID, coll.CollectionID)
    // 填充collection
    collInfo := model.MarshalCollectionModel(coll)
    // 序列化
    v1, err := proto.Marshal(collInfo)
    if err != nil {
   
   
        return fmt.Errorf("failed to marshal collection info: %s", err.Error())
    }

    // 写入etcd,最终会写入2个key,一个原始的,一个加snapshots
    if err := kc.Snapshot.Save(k1, string(v1), ts); err != nil {
   
   
        return err
    }

    kvs := map[string]string{
   
   }

    // 构建partition
    for _, partition := range coll.Partitions {
   
   
        // 构建partition的key规则
        k := BuildPartitionKey(coll.CollectionID, partition.PartitionID)
        // 填充partition
        partitionInfo := model.MarshalPartitionModel(partition)
        // 序列化
        v, err := proto.Marshal(partitionInfo)
        if err != nil {
   
   
            return err
        }
        kvs[k] = string(v)
    }

    // 构建field
    for _, field := range coll.Fields {
   
   
        // 构建field的key规则
        k := BuildFieldKey(coll.CollectionID, field.FieldID)
        // 填充field
        fieldInfo := model.MarshalFieldModel(field)
        // 序列化
        v, err := proto.Marshal(fieldInfo)
        if err != nil {
   
   
            return err
        }
        kvs[k] = string(v)
    }

    // 批量写入etcd,传入一个key,最终会写入2个key,一个原始的,一个加snapshots
    return etcd.SaveByBatchWithLimit(kvs, maxTxnNum/2, func(partialKvs map[string]string) error {
   
   
        return kc.Snapshot.MultiSave(partialKvs, ts)
    })
}

使用etcd-manager查看etcd。

partition-key.jpg

field-key.jpg

客户端SDK使用了3个field,分别是pk、random、embeddings。

fields = [
    FieldSchema(name="pk", dtype=DataType.VARCHAR, is_primary=True, auto_id=False, max_length=100),
    FieldSchema(name="random", dtype=DataType.DOUBLE),
    FieldSchema(name="embeddings", dtype=DataType.FLOAT_VECTOR, dim=dim)
]

每一个field都分配有一个fieldID,例如本例中pk分配100、random分配101、embedding分配102。

但是注意还会产生2个fieldID,一个为0、一个为1。

总结:

1.CreateCollection由proxy传递给协调器rootCoord操作etcd。

2.CreateCollection最终会在etcd上写入3种类型的key

  • collection

    前缀/root-coord/database/collection-info/{dbID}/{collectionID}

  • partition

    前缀/root-coord/partitions/{collectionID}/{partitionID}

  • field

    前缀/root-coord/fields/{collectionID}/{fieldID}

目录
相关文章
|
3天前
|
数据采集 JSON API
如何利用Python爬虫淘宝商品详情高级版(item_get_pro)API接口及返回值解析说明
本文介绍了如何利用Python爬虫技术调用淘宝商品详情高级版API接口(item_get_pro),获取商品的详细信息,包括标题、价格、销量等。文章涵盖了环境准备、API权限申请、请求构建和返回值解析等内容,强调了数据获取的合规性和安全性。
|
1天前
|
JSON 自然语言处理 Java
OpenAI API深度解析:参数、Token、计费与多种调用方式
随着人工智能技术的飞速发展,OpenAI API已成为许多开发者和企业的得力助手。本文将深入探讨OpenAI API的参数、Token、计费方式,以及如何通过Rest API(以Postman为例)、Java API调用、工具调用等方式实现与OpenAI的交互,并特别关注调用具有视觉功能的GPT-4o使用本地图片的功能。此外,本文还将介绍JSON模式、可重现输出的seed机制、使用代码统计Token数量、开发控制台循环聊天,以及基于最大Token数量的消息列表限制和会话长度管理的控制台循环聊天。
26 7
|
2天前
|
域名解析 弹性计算 安全
阿里云服务器租用、注册域名、备案及域名解析完整流程参考(图文教程)
对于很多初次建站的用户来说,选购云服务器和注册应及备案和域名解析步骤必须了解的,目前轻量云服务器2核2G68元一年,2核4G4M服务器298元一年,域名注册方面,阿里云推出域名1元购买活动,新用户注册com和cn域名2年首年仅需0元,xyz和top等域名首年仅需1元。对于建站的用户来说,购买完云服务器并注册好域名之后,下一步还需要操作备案和域名绑定。本文为大家展示阿里云服务器的购买流程,域名注册、绑定以及备案的完整流程,全文以图文教程形式为大家展示具体细节及注意事项,以供新手用户参考。
|
8天前
|
供应链 搜索推荐 数据挖掘
1688搜索词推荐API接口:开发应用与收益全解析
在电商数据驱动时代,1688搜索词推荐API接口为开发者、供应商及电商从业者提供强大工具,优化业务流程,提升竞争力。该接口基于1688平台的海量数据,提供精准搜索词推荐,助力电商平台优化搜索体验,提高供应商商品曝光度与销售转化率,同时为企业提供市场分析与商业洞察,促进精准决策与成本降低。通过集成此API,各方可实现流量增长、销售额提升及运营优化,推动电商行业的创新发展。
19 0
|
6天前
|
人工智能 自然语言处理 API
Multimodal Live API:谷歌推出新的 AI 接口,支持多模态交互和低延迟实时互动
谷歌推出的Multimodal Live API是一个支持多模态交互、低延迟实时互动的AI接口,能够处理文本、音频和视频输入,提供自然流畅的对话体验,适用于多种应用场景。
45 3
Multimodal Live API:谷歌推出新的 AI 接口,支持多模态交互和低延迟实时互动
|
2天前
|
前端开发 API 数据库
Next 编写接口api
Next 编写接口api
|
8天前
|
XML JSON 缓存
阿里巴巴商品详情数据接口(alibaba.item_get) 丨阿里巴巴 API 实时接口指南
阿里巴巴商品详情数据接口(alibaba.item_get)允许商家通过API获取商品的详细信息,包括标题、描述、价格、销量、评价等。主要参数为商品ID(num_iid),支持多种返回数据格式,如json、xml等,便于开发者根据需求选择。使用前需注册并获得App Key与App Secret,注意遵守使用规范。
|
7天前
|
JSON API 开发者
淘宝买家秀数据接口(taobao.item_review_show)丨淘宝 API 实时接口指南
淘宝买家秀数据接口(taobao.item_review_show)可获取买家上传的图片、视频、评论等“买家秀”内容,为潜在买家提供真实参考,帮助商家优化产品和营销策略。使用前需注册开发者账号,构建请求URL并发送GET请求,解析响应数据。调用时需遵守平台规定,保护用户隐私,确保内容真实性。
|
7天前
|
搜索推荐 数据挖掘 API
淘宝天猫商品评论数据接口丨淘宝 API 实时接口指南
淘宝天猫商品评论数据接口(Taobao.item_review)提供全面的评论信息,包括文字、图片、视频评论、评分、追评等,支持实时更新和高效筛选。用户可基于此接口进行数据分析,支持情感分析、用户画像构建等,同时确保数据使用的合规性和安全性。使用步骤包括注册开发者账号、创建应用获取 API 密钥、发送 API 请求并解析返回数据。适用于电商商家、市场分析人员和消费者。
|
17天前
|
JSON API 开发工具
淘宝实时 API 接口丨淘宝商品详情接口(Taobao.item_get)
淘宝商品详情接口(Taobao.item_get)允许开发者获取商品的详细信息,包括基本信息、描述、卖家资料、图片、属性及销售情况等。开发者需注册账号、创建应用并获取API密钥,通过构建请求获取JSON格式数据,注意遵守平台规则,合理使用接口,确保数据准确性和时效性。

推荐镜像

更多