CreateIndex API执行流程_milvus源码解析2

本文涉及的产品
云解析 DNS,旗舰版 1个月
全局流量管理 GTM,标准版 1个月
公共DNS(含HTTPDNS解析),每月1000万次HTTP解析
简介: CreateIndex API执行流程_milvus源码解析2

CreateIndex API执行流程源码解析2

milvus版本:v2.3.2

上一篇介绍了CreateIndex对etcd元数据的操作,这里介绍另一个操作。

整体架构:

architecture.png

CreateIndex 的数据流向:

create_index数据流向3.jpg

1.dataCoord执行CreateIndex。

func (s *Server) CreateIndex(ctx context.Context, req *indexpb.CreateIndexRequest) (*commonpb.Status, error) {
   
   
    ......
    // 分配indexID,indexID=0
    indexID, err := s.meta.CanCreateIndex(req)
    ......

    err = s.meta.CreateIndex(index)
    ......

    // 将collectionID发送到channel,其它的goroutine进行消费。
    select {
   
   
    case s.notifyIndexChan <- req.GetCollectionID():
    default:
    }

    ......
}

上一篇已经分析过s.meta.CreateIndex(),这里重点分析s.notifyIndexChan。将collectionID发送到channel,其它的goroutine进行消费。

2.消费notifyIndexChan也在dataCoord。

代码路径:internal\datacoord\index_service.go

启动dataCoord服务的时候会调用此方法。

func (s *Server) createIndexForSegmentLoop(ctx context.Context) {
   
   
    log.Info("start create index for segment loop...")
    defer s.serverLoopWg.Done()

    ticker := time.NewTicker(time.Minute)
    defer ticker.Stop()
    for {
   
   
        select {
   
   
        case <-ctx.Done():
            log.Warn("DataCoord context done, exit...")
            return
        case <-ticker.C:
            segments := s.meta.GetHasUnindexTaskSegments()
            for _, segment := range segments {
   
   
                if err := s.createIndexesForSegment(segment); err != nil {
   
   
                    log.Warn("create index for segment fail, wait for retry", zap.Int64("segmentID", segment.ID))
                    continue
                }
            }
        case collectionID := <-s.notifyIndexChan:
            log.Info("receive create index notify", zap.Int64("collectionID", collectionID))
            // 获取collection的segment信息
            segments := s.meta.SelectSegments(func(info *SegmentInfo) bool {
   
   
                return isFlush(info) && collectionID == info.CollectionID
            })
            for _, segment := range segments {
   
   
                if err := s.createIndexesForSegment(segment); err != nil {
   
   
                    log.Warn("create index for segment fail, wait for retry", zap.Int64("segmentID", segment.ID))
                    continue
                }
            }
        case segID := <-s.buildIndexCh:
            log.Info("receive new flushed segment", zap.Int64("segmentID", segID))
            segment := s.meta.GetSegment(segID)
            if segment == nil {
   
   
                log.Warn("segment is not exist, no need to build index", zap.Int64("segmentID", segID))
                continue
            }
            if err := s.createIndexesForSegment(segment); err != nil {
   
   
                log.Warn("create index for segment fail, wait for retry", zap.Int64("segmentID", segment.ID))
                continue
            }
        }
    }
}

走case collectionID := <-s.notifyIndexChan这个分支。

3.进入s.createIndexesForSegment()

func (s *Server) createIndexesForSegment(segment *SegmentInfo) error {
   
   
    // 要创建的索引的信息:索引名称,维度等信息
    indexes := s.meta.GetIndexesForCollection(segment.CollectionID, "")
    for _, index := range indexes {
   
   
        if _, ok := segment.segmentIndexes[index.IndexID]; !ok {
   
   
            if err := s.createIndexForSegment(segment, index.IndexID); err != nil {
   
   
                log.Warn("create index for segment fail", zap.Int64("segmentID", segment.ID),
                    zap.Int64("indexID", index.IndexID))
                return err
            }
        }
    }
    return nil
}

4.进入s.createIndexForSegment()

在segment上建立索引。

func (s *Server) createIndexForSegment(segment *SegmentInfo, indexID UniqueID) error {
   
   
    log.Info("create index for segment", zap.Int64("segmentID", segment.ID), zap.Int64("indexID", indexID))
    buildID, err := s.allocator.allocID(context.Background())
    if err != nil {
   
   
        return err
    }
    segIndex := &model.SegmentIndex{
   
   
        SegmentID:    segment.ID,
        CollectionID: segment.CollectionID,
        PartitionID:  segment.PartitionID,
        NumRows:      segment.NumOfRows,
        IndexID:      indexID,
        BuildID:      buildID,
        CreateTime:   uint64(segment.ID),
        WriteHandoff: false,
    }
    // 添加segment-index
    // 前缀/segment-index/{collectionID}/{partitionID}/{segmentID}/{buildID}
    if err = s.meta.AddSegmentIndex(segIndex); err != nil {
   
   
        return err
    }
    // 入队,通知IndexNode执行索引创建任务
    s.indexBuilder.enqueue(buildID)
    return nil
}

segIndex变量:

segIndex.jpg

s.meta.AddSegmentIndex(segIndex)在etcd添加segment-index。

==前缀/segment-index/{collectionID}/{partitionID}/{segmentID}/{buildID}==

func (kc *Catalog) CreateSegmentIndex(ctx context.Context, segIdx *model.SegmentIndex) error {
   
   
    // key规则
    key := BuildSegmentIndexKey(segIdx.CollectionID, segIdx.PartitionID, segIdx.SegmentID, segIdx.BuildID)

    value, err := proto.Marshal(model.MarshalSegmentIndexModel(segIdx))
    if err != nil {
   
   
        return err
    }
    // 写入etcd
    err = kc.MetaKv.Save(key, string(value))
    if err != nil {
   
   
        log.Error("failed to save segment index meta in etcd", zap.Int64("buildID", segIdx.BuildID),
            zap.Int64("segmentID", segIdx.SegmentID), zap.Error(err))
        return err
    }
    return nil
}

5.进入s.indexBuilder.enqueue(buildID)

此函数作用:入队,通知IndexNode执行索引创建任务。

create_index数据流向2.jpg

6.进入process()

func (ib *indexBuilder) process(buildID UniqueID) bool {
   
   
    ......
    // 有一个定时器,默认1秒,配置项indexCoord.scheduler.interval
    switch state {
   
   
    case indexTaskInit:
        segment := ib.meta.GetSegment(meta.SegmentID)
        if !isSegmentHealthy(segment) || !ib.meta.IsIndexExist(meta.CollectionID, meta.IndexID) {
   
   
            log.Ctx(ib.ctx).Info("task is no need to build index, remove it", zap.Int64("buildID", buildID))
            if err := ib.meta.DeleteTask(buildID); err != nil {
   
   
                log.Ctx(ib.ctx).Warn("IndexCoord delete index failed", zap.Int64("buildID", buildID), zap.Error(err))
                return false
            }
            deleteFunc(buildID)
            return true
        }
        indexParams := ib.meta.GetIndexParams(meta.CollectionID, meta.IndexID)
        if isFlatIndex(getIndexType(indexParams)) || meta.NumRows < Params.DataCoordCfg.MinSegmentNumRowsToEnableIndex.GetAsInt64() {
   
   
            // 数量小于最低数量无需建立索引,默认为1024
            log.Ctx(ib.ctx).Info("segment does not need index really", zap.Int64("buildID", buildID),
                zap.Int64("segmentID", meta.SegmentID), zap.Int64("num rows", meta.NumRows))
            if err := ib.meta.FinishTask(&indexpb.IndexTaskInfo{
   
   
                BuildID:        buildID,
                State:          commonpb.IndexState_Finished,
                IndexFileKeys:  nil,
                SerializedSize: 0,
                FailReason:     "",
            }); err != nil {
   
   
                log.Ctx(ib.ctx).Warn("IndexCoord update index state fail", zap.Int64("buildID", buildID), zap.Error(err))
                return false
            }
            updateStateFunc(buildID, indexTaskDone)
            return true
        }
        // peek client
        // if all IndexNodes are executing task, wait for one of them to finish the task.
        nodeID, client := ib.nodeManager.PeekClient(meta)
        if client == nil {
   
   
            log.Ctx(ib.ctx).WithRateGroup("dc.indexBuilder", 1, 60).RatedInfo(5, "index builder peek client error, there is no available")
            return false
        }
        // update version and set nodeID
        if err := ib.meta.UpdateVersion(buildID, nodeID); err != nil {
   
   
            log.Ctx(ib.ctx).Warn("index builder update index version failed", zap.Int64("build", buildID), zap.Error(err))
            return false
        }
        // binLogs为向量数据文件的位置信息
        // files/insert_log/444517122896489678/444517122896489679/444517122896489694/102/444517122896236031
        // files/insert_log/{collectionID}/{partitionID}/{segmentID}/{fieldID}/444517122896236031
        binLogs := make([]string, 0)
        fieldID := ib.meta.GetFieldIDByIndexID(meta.CollectionID, meta.IndexID)
        for _, fieldBinLog := range segment.GetBinlogs() {
   
   
            if fieldBinLog.GetFieldID() == fieldID {
   
   
                for _, binLog := range fieldBinLog.GetBinlogs() {
   
   
                    binLogs = append(binLogs, binLog.LogPath)
                }
                break
            }
        }

        typeParams := ib.meta.GetTypeParams(meta.CollectionID, meta.IndexID)

        var storageConfig *indexpb.StorageConfig
        // 获取元数据存储类型:minio
        if Params.CommonCfg.StorageType.GetValue() == "local" {
   
   
            storageConfig = &indexpb.StorageConfig{
   
   
                RootPath:    Params.LocalStorageCfg.Path.GetValue(),
                StorageType: Params.CommonCfg.StorageType.GetValue(),
            }
        } else {
   
   
            storageConfig = &indexpb.StorageConfig{
   
   
                Address:          Params.MinioCfg.Address.GetValue(),
                AccessKeyID:      Params.MinioCfg.AccessKeyID.GetValue(),
                SecretAccessKey:  Params.MinioCfg.SecretAccessKey.GetValue(),
                UseSSL:           Params.MinioCfg.UseSSL.GetAsBool(),
                BucketName:       Params.MinioCfg.BucketName.GetValue(),
                RootPath:         Params.MinioCfg.RootPath.GetValue(),
                UseIAM:           Params.MinioCfg.UseIAM.GetAsBool(),
                IAMEndpoint:      Params.MinioCfg.IAMEndpoint.GetValue(),
                StorageType:      Params.CommonCfg.StorageType.GetValue(),
                Region:           Params.MinioCfg.Region.GetValue(),
                UseVirtualHost:   Params.MinioCfg.UseVirtualHost.GetAsBool(),
                CloudProvider:    Params.MinioCfg.CloudProvider.GetValue(),
                RequestTimeoutMs: Params.MinioCfg.RequestTimeoutMs.GetAsInt64(),
            }
        }
        req := &indexpb.CreateJobRequest{
   
   
            ClusterID:           Params.CommonCfg.ClusterPrefix.GetValue(),
            IndexFilePrefix:     path.Join(ib.chunkManager.RootPath(), common.SegmentIndexPath),
            BuildID:             buildID,
            DataPaths:           binLogs,
            IndexVersion:        meta.IndexVersion + 1,
            StorageConfig:       storageConfig,
            IndexParams:         indexParams,
            TypeParams:          typeParams,
            NumRows:             meta.NumRows,
            CurrentIndexVersion: ib.indexEngineVersionManager.GetCurrentIndexEngineVersion(),
        }
        // 通知IndexNode创建索引
        if err := ib.assignTask(client, req); err != nil {
   
   
            ......
        }
        ......
        // update index meta state to InProgress
        // 更新索引状态
        if err := ib.meta.BuildIndex(buildID); err != nil {
   
   
            ......
        }
        updateStateFunc(buildID, indexTaskInProgress)

    case indexTaskDone:
        ......

    default:
        ......
    }
    return true
}

ib.assignTask()通知indexNode创建索引。

7.进入ib.assignTask()

// assignTask sends the index task to the IndexNode, it has a timeout interval, if the IndexNode doesn't respond within
// the interval, it is considered that the task sending failed.
func (ib *indexBuilder) assignTask(builderClient types.IndexNodeClient, req *indexpb.CreateJobRequest) error {
   
   
    ......
    // rpc调用indexNode
    resp, err := builderClient.CreateJob(ctx, req)
    ......
}

8.进入builderClient.CreateJob()

代码路径:internal\indexnode\indexnode_service.go

func (i *IndexNode) CreateJob(ctx context.Context, req *indexpb.CreateJobRequest) (*commonpb.Status, error) {
   
   
    ......
    task := &indexBuildTask{
   
   
        ident:          fmt.Sprintf("%s/%d", req.ClusterID, req.BuildID),
        ctx:            taskCtx,
        cancel:         taskCancel,
        BuildID:        req.GetBuildID(),
        ClusterID:      req.GetClusterID(),
        node:           i,
        req:            req,
        cm:             cm,
        nodeID:         i.GetNodeID(),
        tr:             timerecord.NewTimeRecorder(fmt.Sprintf("IndexBuildID: %d, ClusterID: %s", req.BuildID, req.ClusterID)),
        serializedSize: 0,
    }
    ret := merr.Success()
    if err := i.sched.IndexBuildQueue.Enqueue(task); err != nil {
   
   
        ......
    }
    ......
}

task执行逻辑:

pipelines := []func(context.Context) error{
   
   t.Prepare, t.BuildIndex, t.SaveIndexFiles}

依次执行task的Prepare()、BuildIndex()、SaveIndexFiles()方法。

9.进入BuildIndex()

func (it *indexBuildTask) BuildIndex(ctx context.Context) error {
   
   
    ......
    var buildIndexInfo *indexcgowrapper.BuildIndexInfo
    buildIndexInfo, err = indexcgowrapper.NewBuildIndexInfo(it.req.GetStorageConfig())
    ......
    err = buildIndexInfo.AppendFieldMetaInfo(it.collectionID, it.partitionID, it.segmentID, it.fieldID, it.fieldType)
    ......

    err = buildIndexInfo.AppendIndexMetaInfo(it.req.IndexID, it.req.BuildID, it.req.IndexVersion)
    ......

    err = buildIndexInfo.AppendBuildIndexParam(it.newIndexParams)
    ......

    jsonIndexParams, err := json.Marshal(it.newIndexParams)
    ......

    err = buildIndexInfo.AppendBuildTypeParam(it.newTypeParams)
    ......

    for _, path := range it.req.GetDataPaths() {
   
   
        err = buildIndexInfo.AppendInsertFile(path)
        if err != nil {
   
   
            log.Ctx(ctx).Warn("append insert binlog path failed", zap.Error(err))
            return err
        }
    }

    it.currentIndexVersion = getCurrentIndexVersion(it.req.GetCurrentIndexVersion())
    if err := buildIndexInfo.AppendIndexEngineVersion(it.currentIndexVersion); err != nil {
   
   
        log.Ctx(ctx).Warn("append index engine version failed", zap.Error(err))
        return err
    }
    // cgo层调用CreateIndex()
    // it.index有个Upload()方法写入s3(SaveIndexFiles方法里调用it.index.Upload()写入s3)
    it.index, err = indexcgowrapper.CreateIndex(ctx, buildIndexInfo)
    if err != nil {
   
   
        ......
    }

    ......
}

indexcgowrapper.CreateIndex()调用C层创建索引。

10.进入SaveIndexFiles()

func (it *indexBuildTask) SaveIndexFiles(ctx context.Context) error {
   
   
    ......
    // c++层上传索引文件到s3
    indexFilePath2Size, err := it.index.UpLoad()
    ......

}

总结:

  • CreateIndex由proxy传递给协调器dataCoord操作etcd。
    • 前缀/field-index/{collectionID}/{IndexID}
    • 前缀/segment-index/{collectionID}/{partitionID}/{segmentID}/{buildID}
  • dataCoord通知indexNode创建索引,indexNode通过cgo调用C++层创建索引,并上传至s3。
目录
相关文章
|
11天前
|
监控 安全 开发工具
鸿蒙HarmonyOS应用开发 | HarmonyOS Next-从应用开发到上架全流程解析
HarmonyOS Next是华为推出的最新版本鸿蒙操作系统,强调多设备协同和分布式技术,提供丰富的开发工具和API接口。本文详细解析了从应用开发到上架的全流程,包括环境搭建、应用设计与开发、多设备适配、测试调试、应用上架及推广等环节,并介绍了鸿蒙原生应用开发者激励计划,帮助开发者更好地融入鸿蒙生态。通过DevEco Studio集成开发环境和华为提供的多种支持工具,开发者可以轻松创建并发布高质量的鸿蒙应用,享受技术和市场推广的双重支持。
159 11
|
9天前
|
存储 人工智能 API
AgentScope:阿里开源多智能体低代码开发平台,支持一键导出源码、多种模型API和本地模型部署
AgentScope是阿里巴巴集团开源的多智能体开发平台,旨在帮助开发者轻松构建和部署多智能体应用。该平台提供分布式支持,内置多种模型API和本地模型部署选项,支持多模态数据处理。
90 4
AgentScope:阿里开源多智能体低代码开发平台,支持一键导出源码、多种模型API和本地模型部署
|
8天前
|
存储 设计模式 算法
【23种设计模式·全精解析 | 行为型模式篇】11种行为型模式的结构概述、案例实现、优缺点、扩展对比、使用场景、源码解析
行为型模式用于描述程序在运行时复杂的流程控制,即描述多个类或对象之间怎样相互协作共同完成单个对象都无法单独完成的任务,它涉及算法与对象间职责的分配。行为型模式分为类行为模式和对象行为模式,前者采用继承机制来在类间分派行为,后者采用组合或聚合在对象间分配行为。由于组合关系或聚合关系比继承关系耦合度低,满足“合成复用原则”,所以对象行为模式比类行为模式具有更大的灵活性。 行为型模式分为: • 模板方法模式 • 策略模式 • 命令模式 • 职责链模式 • 状态模式 • 观察者模式 • 中介者模式 • 迭代器模式 • 访问者模式 • 备忘录模式 • 解释器模式
【23种设计模式·全精解析 | 行为型模式篇】11种行为型模式的结构概述、案例实现、优缺点、扩展对比、使用场景、源码解析
|
8天前
|
设计模式 存储 安全
【23种设计模式·全精解析 | 创建型模式篇】5种创建型模式的结构概述、实现、优缺点、扩展、使用场景、源码解析
结构型模式描述如何将类或对象按某种布局组成更大的结构。它分为类结构型模式和对象结构型模式,前者采用继承机制来组织接口和类,后者釆用组合或聚合来组合对象。由于组合关系或聚合关系比继承关系耦合度低,满足“合成复用原则”,所以对象结构型模式比类结构型模式具有更大的灵活性。 结构型模式分为以下 7 种: • 代理模式 • 适配器模式 • 装饰者模式 • 桥接模式 • 外观模式 • 组合模式 • 享元模式
【23种设计模式·全精解析 | 创建型模式篇】5种创建型模式的结构概述、实现、优缺点、扩展、使用场景、源码解析
|
8天前
|
设计模式 存储 安全
【23种设计模式·全精解析 | 创建型模式篇】5种创建型模式的结构概述、实现、优缺点、扩展、使用场景、源码解析
创建型模式的主要关注点是“怎样创建对象?”,它的主要特点是"将对象的创建与使用分离”。这样可以降低系统的耦合度,使用者不需要关注对象的创建细节。创建型模式分为5种:单例模式、工厂方法模式抽象工厂式、原型模式、建造者模式。
【23种设计模式·全精解析 | 创建型模式篇】5种创建型模式的结构概述、实现、优缺点、扩展、使用场景、源码解析
|
6天前
|
安全 API 数据安全/隐私保护
速卖通AliExpress商品详情API接口深度解析与实战应用
速卖通(AliExpress)作为全球化电商的重要平台,提供了丰富的商品资源和便捷的购物体验。为了提升用户体验和优化商品管理,速卖通开放了API接口,其中商品详情API尤为关键。本文介绍如何获取API密钥、调用商品详情API接口,并处理API响应数据,帮助开发者和商家高效利用这些工具。通过合理规划API调用策略和确保合法合规使用,开发者可以更好地获取商品信息,优化管理和营销策略。
|
15天前
|
数据采集 JSON API
如何利用Python爬虫淘宝商品详情高级版(item_get_pro)API接口及返回值解析说明
本文介绍了如何利用Python爬虫技术调用淘宝商品详情高级版API接口(item_get_pro),获取商品的详细信息,包括标题、价格、销量等。文章涵盖了环境准备、API权限申请、请求构建和返回值解析等内容,强调了数据获取的合规性和安全性。
|
14天前
|
JSON 自然语言处理 Java
OpenAI API深度解析:参数、Token、计费与多种调用方式
随着人工智能技术的飞速发展,OpenAI API已成为许多开发者和企业的得力助手。本文将深入探讨OpenAI API的参数、Token、计费方式,以及如何通过Rest API(以Postman为例)、Java API调用、工具调用等方式实现与OpenAI的交互,并特别关注调用具有视觉功能的GPT-4o使用本地图片的功能。此外,本文还将介绍JSON模式、可重现输出的seed机制、使用代码统计Token数量、开发控制台循环聊天,以及基于最大Token数量的消息列表限制和会话长度管理的控制台循环聊天。
103 7
|
14天前
|
域名解析 弹性计算 安全
阿里云服务器租用、注册域名、备案及域名解析完整流程参考(图文教程)
对于很多初次建站的用户来说,选购云服务器和注册应及备案和域名解析步骤必须了解的,目前轻量云服务器2核2G68元一年,2核4G4M服务器298元一年,域名注册方面,阿里云推出域名1元购买活动,新用户注册com和cn域名2年首年仅需0元,xyz和top等域名首年仅需1元。对于建站的用户来说,购买完云服务器并注册好域名之后,下一步还需要操作备案和域名绑定。本文为大家展示阿里云服务器的购买流程,域名注册、绑定以及备案的完整流程,全文以图文教程形式为大家展示具体细节及注意事项,以供新手用户参考。
|
9天前
|
安全 搜索推荐 数据挖掘
陪玩系统源码开发流程解析,成品陪玩系统源码的优点
我们自主开发的多客陪玩系统源码,整合了市面上主流陪玩APP功能,支持二次开发。该系统适用于线上游戏陪玩、语音视频聊天、心理咨询等场景,提供用户注册管理、陪玩者资料库、预约匹配、实时通讯、支付结算、安全隐私保护、客户服务及数据分析等功能,打造综合性社交平台。随着互联网技术发展,陪玩系统正成为游戏爱好者的新宠,改变游戏体验并带来新的商业模式。

热门文章

最新文章

推荐镜像

更多