CreateIndex API执行流程_milvus源码解析

本文涉及的产品
全局流量管理 GTM,标准版 1个月
云解析 DNS,旗舰版 1个月
公共DNS(含HTTPDNS解析),每月1000万次HTTP解析
简介: CreateIndex API执行流程_milvus源码解析

CreateIndex API执行流程源码解析

milvus版本:v2.3.2

整体架构:

architecture.png

CreateIndex 的数据流向:

create_index数据流向.jpg

1.客户端sdk发出CreateIndex API请求。

import numpy as np
from pymilvus import (
    connections,
    FieldSchema, CollectionSchema, DataType,
    Collection,
)

num_entities, dim = 2000, 8


print("start connecting to Milvus")
connections.connect("default", host="192.168.230.71", port="19530")


fields = [
    FieldSchema(name="pk", dtype=DataType.VARCHAR, is_primary=True, auto_id=False, max_length=100),
    FieldSchema(name="random", dtype=DataType.DOUBLE),
    FieldSchema(name="embeddings", dtype=DataType.FLOAT_VECTOR, dim=dim)
]

schema = CollectionSchema(fields, "hello_milvus is the simplest demo to introduce the APIs")

print("Create collection `hello_milvus`")
hello_milvus = Collection("hello_milvus", schema, consistency_level="Strong")


print("Start inserting entities")
rng = np.random.default_rng(seed=19530)
entities = [
    # provide the pk field because `auto_id` is set to False
    [str(i) for i in range(num_entities)],
    rng.random(num_entities).tolist(),  # field random, only supports list
    rng.random((num_entities, dim)),    # field embeddings, supports numpy.ndarray and list
]

insert_result = hello_milvus.insert(entities)

hello_milvus.flush()

print("Start Creating index IVF_FLAT")
index = {
   
   
    "index_type": "IVF_FLAT",
    "metric_type": "L2",
    "params": {
   
   "nlist": 8},
}

hello_milvus.create_index("embeddings", index,index_name="idx_embeddings")

客户端SDK向proxy发送一个CreateIndex API请求,在embeddings列上创建一个名为idx_embeddings的索引。

2.客户端接受API请求,将request封装为createIndexTask,并压入ddQueue队列。

代码路径:internal\proxy\impl.go

// CreateIndex create index for collection.
func (node *Proxy) CreateIndex(ctx context.Context, request *milvuspb.CreateIndexRequest) (*commonpb.Status, error) {
   
   
    ......

    // request封装为task
    cit := &createIndexTask{
   
   
        ctx:                ctx,
        Condition:          NewTaskCondition(ctx),
        req:                request,
        rootCoord:          node.rootCoord,
        datacoord:          node.dataCoord,
        replicateMsgStream: node.replicateMsgStream,
    }

    ......
    // 将task压入ddQueue队列

    if err := node.sched.ddQueue.Enqueue(cit); err != nil {
   
   
        ......
    }

    ......
    // 等待cct执行完
    if err := cit.WaitToFinish(); err != nil {
   
   
        ......
    }

    ......
}

3.执行createIndexTask的3个方法PreExecute、Execute、PostExecute。

PreExecute()一般为参数校验等工作。

Execute()一般为真正执行逻辑。

代码路径:internal\proxy\task_index.go

func (cit *createIndexTask) Execute(ctx context.Context) error {
   
   
    ......
    req := &indexpb.CreateIndexRequest{
   
   
        CollectionID:    cit.collectionID,
        FieldID:         cit.fieldSchema.GetFieldID(),
        IndexName:       cit.req.GetIndexName(),
        TypeParams:      cit.newTypeParams,
        IndexParams:     cit.newIndexParams,
        IsAutoIndex:     cit.isAutoIndex,
        UserIndexParams: cit.newExtraParams,
        Timestamp:       cit.BeginTs(),
    }
    cit.result, err = cit.datacoord.CreateIndex(ctx, req)
    ......
    SendReplicateMessagePack(ctx, cit.replicateMsgStream, cit.req)
    return nil
}

从代码可以看出调用了datacoord的CreateIndex接口。

4.进入datacoord的CreateIndex接口。

代码路径:internal\datacoord\index_service.go

// CreateIndex create an index on collection.
// Index building is asynchronous, so when an index building request comes, an IndexID is assigned to the task and
// will get all flushed segments from DataCoord and record tasks with these segments. The background process
// indexBuilder will find this task and assign it to IndexNode for execution.
func (s *Server) CreateIndex(ctx context.Context, req *indexpb.CreateIndexRequest) (*commonpb.Status, error) {
   
   
    ......
    // 分配indexID,indexID=0
    indexID, err := s.meta.CanCreateIndex(req)
    ......

    if indexID == 0 {
   
   
        // 分配indexID
        indexID, err = s.allocator.allocID(ctx)
        ......
    }

    index := &model.Index{
   
   
        CollectionID:    req.GetCollectionID(),
        FieldID:         req.GetFieldID(),
        IndexID:         indexID,
        IndexName:       req.GetIndexName(),
        TypeParams:      req.GetTypeParams(),
        IndexParams:     req.GetIndexParams(),
        CreateTime:      req.GetTimestamp(),
        IsAutoIndex:     req.GetIsAutoIndex(),
        UserIndexParams: req.GetUserIndexParams(),
    }

    // Get flushed segments and create index

    err = s.meta.CreateIndex(index)
    ......

    // 将collectionID发送到channel,其它的goroutine进行消费。
    select {
   
   
    case s.notifyIndexChan <- req.GetCollectionID():
    default:
    }

    ......
}

变量index:

index_model.jpg

5.进入s.meta.CreateIndex()

代码路径:internal\datacoord\index_meta.go

func (m *meta) CreateIndex(index *model.Index) error {
   
   
    ......
    // 写入etcd元数据
    if err := m.catalog.CreateIndex(m.ctx, index); err != nil {
   
   
        ......
    }

    m.updateCollectionIndex(index)
    ......
}

在这里重点研究m.catalog.CreateIndex()这个方法做了什么事情。

func (kc *Catalog) CreateIndex(ctx context.Context, index *model.Index) error {
   
   
    key := BuildIndexKey(index.CollectionID, index.IndexID)

    value, err := proto.Marshal(model.MarshalIndexModel(index))
    if err != nil {
   
   
        return err
    }

    err = kc.MetaKv.Save(key, string(value))
    if err != nil {
   
   
        return err
    }
    return nil
}

在etcd会产生1个key。

==field-index/445834678636119060/445834678636519085==

value的值的结构为indexpb.FieldIndex,然后进行protobuf序列化后存入etcd。

因此etcd存储的是二进制数据。

&indexpb.FieldIndex{
   
   
    IndexInfo: &indexpb.IndexInfo{
   
   
        CollectionID:    index.CollectionID,
        FieldID:         index.FieldID,
        IndexName:       index.IndexName,
        IndexID:         index.IndexID,
        TypeParams:      index.TypeParams,
        IndexParams:     index.IndexParams,
        IsAutoIndex:     index.IsAutoIndex,
        UserIndexParams: index.UserIndexParams,
    },
    Deleted:    index.IsDeleted,
    CreateTime: index.CreateTime,
}

fieldindex.jpg

跟踪BuildIndexKey()函数,即可以得到key的规则。整理如下:

key规则:

  • 前缀/field-index/{collectionID}/{IndexID}

可以反映index属于哪个collection。Index的value可以反映属于哪个field。

不能反映属于哪个partition、哪个segment。

总结:

  • CreateIndex由proxy传递给协调器dataCoord操作etcd。
  • CreateIndex最终会在etcd上写入1种类型的key(其实还有一种,在另一篇中进行介绍)。
目录
相关文章
|
8天前
|
IDE JavaScript API
1688寻源通API对接流程以及说明
1688寻源通API(这里主要指的是跨境寻原通数据接口)的对接流程及说明如下:
|
8天前
|
搜索推荐 测试技术 API
探秘电商API:从测试到应用的深度解析与实战指南
电商API是电子商务背后的隐形引擎,支撑着从商品搜索、购物车更新到支付处理等各个环节的顺畅运行。它通过定义良好的接口,实现不同系统间的数据交互与功能集成,确保订单、库存和物流等信息的实时同步。RESTful、GraphQL和WebSocket等类型的API各自适用于不同的应用场景,满足多样化的需求。在测试方面,使用Postman、SoapUI和jMeter等工具进行全面的功能、性能和安全测试,确保API的稳定性和可靠性。未来,随着人工智能、大数据和物联网技术的发展,电商API将进一步智能化和标准化,为用户提供更个性化的购物体验,并推动电商行业的持续创新与进步。
26 4
|
8天前
|
搜索推荐 API 开发者
深度解析:利用商品详情 API 接口实现数据获取与应用
在电商蓬勃发展的今天,数据成为驱动业务增长的核心。商品详情API接口作为连接海量商品数据的桥梁,帮助运营者、商家和开发者获取精准的商品信息(如价格、描述、图片、评价等),优化策略、提升用户体验。通过理解API概念、工作原理及不同平台特点,掌握获取权限、构建请求、处理响应和错误的方法,可以将数据应用于商品展示、数据分析、竞品分析和个性化推荐等场景,助力电商创新与发展。未来,随着技术进步,API接口将与人工智能、大数据深度融合,带来更多变革。
33 3
|
14天前
|
数据挖掘 API 数据安全/隐私保护
深度解析:获取亚马逊畅销榜API接口及实战应用
Lazada 淘宝详情 API 是连接 Lazada 和淘宝商品数据的桥梁,帮助电商从业者获取淘宝商品的详细信息(如标题、描述、价格等),并应用于 Lazada 平台。它在市场调研、产品选品、价格策略和数据分析等方面为商家提供支持,助力优化运营策略。通过 Python 示例代码展示了 API 的实际应用,并强调了数据准确性、API 使用限制及数据安全的重要性。
39 10
|
22天前
|
JSON 前端开发 搜索推荐
关于商品详情 API 接口 JSON 格式返回数据解析的示例
本文介绍商品详情API接口返回的JSON数据解析。最外层为`product`对象,包含商品基本信息(如id、name、price)、分类信息(category)、图片(images)、属性(attributes)、用户评价(reviews)、库存(stock)和卖家信息(seller)。每个字段详细描述了商品的不同方面,帮助开发者准确提取和展示数据。具体结构和字段含义需结合实际业务需求和API文档理解。
|
7天前
|
搜索推荐 数据挖掘 API
微店商品详情接口(微店API系列)
微店商品详情接口是微店API的重要组成部分,帮助开发者和商家获取商品的详细信息(如标题、价格、库存等),并将其集成到应用程序或数据分析系统中。该接口支持HTTP GET/POST请求,返回JSON/XML格式数据,需通过AppKey和AppSecret进行身份验证和签名加密。应用场景包括商品信息同步、数据分析与市场调研、个性化推荐系统等,助力商业决策和业务拓展。
32 13
|
9天前
|
供应链 数据挖掘 API
1688app 商品详情接口系列(1688API)
1688作为国内知名批发采购平台,提供了一系列商品详情接口(API),助力企业和开发者获取商品基础、价格、库存及供应商信息。通过Python示例代码展示如何调用这些接口,应用场景涵盖采购决策辅助、数据分析与市场调研、电商平台整合及供应链管理系统的优化,为企业和采购商提供有力的数据支持,提升业务效率和竞争力。
54 15
|
16天前
|
JSON 搜索推荐 API
京东店铺所有商品接口系列(京东 API)
本文介绍如何使用Python调用京东API获取店铺商品信息。前期需搭建Python环境,安装`requests`库并熟悉`json`库的使用。接口采用POST请求,参数包括`app_key`、`method`、`timestamp`、`v`、`sign`和业务参数`360buy_param_json`。通过示例代码展示如何生成签名并发送请求。应用场景涵盖店铺管理、竞品分析、数据统计及商品推荐系统,帮助商家优化运营和提升竞争力。
61 23
|
8天前
|
JSON 数据挖掘 开发者
1688 商品评论接口系列(1688API)
1688商品评论接口助力电商数据分析与优化。通过该接口,开发者可获取指定商品的评论数据(如昵称、内容、评分等),支持情感分析和质量反馈收集。接口采用HTTP GET/POST请求,返回JSON格式数据。Python示例代码展示如何调用接口并处理响应。应用场景包括商家产品优化、客户服务提升、市场调研及电商平台数据分析。
|
8天前
|
供应链 数据挖掘 BI
1688 买家订单,订单物流,订单回传接口系列(1688 寻源通 API)
1688作为国内领先的批发采购平台,提供了买家订单、订单物流及订单回传三大API接口,助力企业实现订单管理、物流跟踪和信息反馈的自动化。通过这些接口,企业可以获取订单详情、物流状态,并将处理结果回传至平台,提升运营效率。Python示例代码展示了如何使用这些接口进行数据交互,适用于电商内部管理、物流跟踪及数据分析等场景。

热门文章

最新文章

推荐镜像

更多