Insert API执行流程_milvus源码解析

本文涉及的产品
RDS PostgreSQL Serverless,0.5-4RCU 50GB 3个月
推荐场景:
对影评进行热评分析
阿里云百炼推荐规格 ADB PostgreSQL,4核16GB 100GB 1个月
简介: Insert API执行流程_milvus源码解析

Insert API执行流程源码解析

milvus版本:v2.3.2

Insert这个API写入数据,流程较长,是milvus的核心API之一,本文介绍大致的写入流程。

整体架构:

architecture.png

Insert 的数据流向:

insert数据流向.jpg

1.客户端sdk发出Insert API请求。

import numpy as np
from pymilvus import (
    connections,
    FieldSchema, CollectionSchema, DataType,
    Collection,
)

num_entities, dim = 2000, 8

print("start connecting to Milvus")
connections.connect("default", host="192.168.230.71", port="19530")

fields = [
    FieldSchema(name="pk", dtype=DataType.VARCHAR, is_primary=True, auto_id=False, max_length=100),
    FieldSchema(name="random", dtype=DataType.DOUBLE),
    FieldSchema(name="embeddings", dtype=DataType.FLOAT_VECTOR, dim=dim)
]

schema = CollectionSchema(fields, "hello_milvus is the simplest demo to introduce the APIs")

print("Create collection `hello_milvus`")
hello_milvus = Collection("hello_milvus", schema, consistency_level="Strong",shards_num=2)


print("Start inserting entities")
rng = np.random.default_rng(seed=19530)
entities = [
    # provide the pk field because `auto_id` is set to False
    [str(i) for i in range(num_entities)],
    rng.random(num_entities).tolist(),  # field random, only supports list
    rng.random((num_entities, dim)),    # field embeddings, supports numpy.ndarray and list
]

insert_result = hello_milvus.insert(entities)

hello_milvus.flush()

客户端SDK向proxy发送一个Insert API请求,向数据库写入数据。

这个例子向数据库写入2000条数据,每条数据是一个8维向量。

insert_milvus.jpg

2.客户端接受API请求,将request封装为insertTask,并压入dmQueue队列。

注意这里是dmQueue。DDL类型的是ddQueue。

代码路径:internal\proxy\impl.go

// Insert insert records into collection.
func (node *Proxy) Insert(ctx context.Context, request *milvuspb.InsertRequest) (*milvuspb.MutationResult, error) {
   
   
    ......
    // request封装为task
    it := &insertTask{
   
   
        ctx:       ctx,
        Condition: NewTaskCondition(ctx),
        insertMsg: &msgstream.InsertMsg{
   
   
            BaseMsg: msgstream.BaseMsg{
   
   
                HashValues: request.HashKeys,
            },
            InsertRequest: msgpb.InsertRequest{
   
   
                Base: commonpbutil.NewMsgBase(
                    commonpbutil.WithMsgType(commonpb.MsgType_Insert),
                    commonpbutil.WithMsgID(0),
                    commonpbutil.WithSourceID(paramtable.GetNodeID()),
                ),
                DbName:         request.GetDbName(),
                CollectionName: request.CollectionName,
                PartitionName:  request.PartitionName,
                FieldsData:     request.FieldsData,
                NumRows:        uint64(request.NumRows),
                Version:        msgpb.InsertDataVersion_ColumnBased,
            },
        },
        idAllocator:   node.rowIDAllocator,
        segIDAssigner: node.segAssigner,
        chMgr:         node.chMgr,
        chTicker:      node.chTicker,
    }

    ......
    // 将task压入dmQueue队列

    if err := node.sched.dmQueue.Enqueue(it); err != nil {
   
   
        ......
    }

    ......
    // 等待任务执行完
    if err := it.WaitToFinish(); err != nil {
   
   
        ......
    }

    ......
}

InsertRequest结构:

type InsertRequest struct {
   
   
    Base                 *commonpb.MsgBase     
    DbName               string                
    CollectionName       string                
    PartitionName        string                
    FieldsData           []*schemapb.FieldData 
    HashKeys             []uint32              
    NumRows              uint32                
    XXX_NoUnkeyedLiteral struct{
   
   }              
    XXX_unrecognized     []byte                
    XXX_sizecache        int32                 
}

type FieldData struct {
   
   
    Type      DataType 
    FieldName string   
    // Types that are valid to be assigned to Field:
    //
    //    *FieldData_Scalars
    //    *FieldData_Vectors
    Field                isFieldData_Field 
    FieldId              int64             
    IsDynamic            bool              
    XXX_NoUnkeyedLiteral struct{
   
   }          
    XXX_unrecognized     []byte            
    XXX_sizecache        int32             
}

type isFieldData_Field interface {
   
   
    isFieldData_Field()
}

type FieldData_Scalars struct {
   
   
    Scalars *ScalarField
}

type FieldData_Vectors struct {
   
   
    Vectors *VectorField
}

客户端通过grpc发送数据,FieldData.Field存储接受的数据。

isFieldData_Field是一个接口,有2个实现:FieldData_Scalars和FieldData_Vectors。

真正存储数据的就是这2个实现。

3.执行insertTask的3个方法PreExecute、Execute、PostExecute。

PreExecute()一般为参数校验等工作。

Execute()一般为真正执行逻辑。

PostExecute()执行完后的逻辑,什么都不做,返回nil。

代码路径:internal\proxy\task_insert.go

func (it *insertTask) Execute(ctx context.Context) error {
   
   
    ......
    collectionName := it.insertMsg.CollectionName
    // 根据collectionName得到collectionID
    collID, err := globalMetaCache.GetCollectionID(it.ctx, it.insertMsg.GetDbName(), collectionName)
    log := log.Ctx(ctx)
    if err != nil {
   
   
        ......
    }
    it.insertMsg.CollectionID = collID

    getCacheDur := tr.RecordSpan()
    // 得到stream,类型为mqMsgStream
    stream, err := it.chMgr.getOrCreateDmlStream(collID)
    if err != nil {
   
   
        return err
    }
    getMsgStreamDur := tr.RecordSpan()
    // by-dev-rootcoord-dml_0_445811557825249939v0
    // by-dev-rootcoord-dml_1_445811557825249939v1
    // 如果shardNum=2,则获取2个虚拟channel
    channelNames, err := it.chMgr.getVChannels(collID)
    if err != nil {
   
   
        ......
    }

    ......

    // assign segmentID for insert data and repack data by segmentID
    // msgPck包含segmentID
    var msgPack *msgstream.MsgPack
    if it.partitionKeys == nil {
   
   
        // 分配segmentID
        // 重新打包为2个msgstream.TsMsg,分别发送给2个虚拟通道
        msgPack, err = repackInsertData(it.TraceCtx(), channelNames, it.insertMsg, it.result, it.idAllocator, it.segIDAssigner)
    } else {
   
   
        msgPack, err = repackInsertDataWithPartitionKey(it.TraceCtx(), channelNames, it.partitionKeys, it.insertMsg, it.result, it.idAllocator, it.segIDAssigner)
    }
    if err != nil {
   
   
        ......
    }
    ......
    // 生产数据,将数据写入mq
    err = stream.Produce(msgPack)
    if err != nil {
   
   
        ......
    }
    ......
}

总结:

1.Insert由proxy向mq(pulsar)写入数据。通过虚拟channel写入。

2.在pulsar创建topic,向topic写入数据。

目录
相关文章
|
2天前
|
监控 Java 应用服务中间件
高级java面试---spring.factories文件的解析源码API机制
【11月更文挑战第20天】Spring Boot是一个用于快速构建基于Spring框架的应用程序的开源框架。它通过自动配置、起步依赖和内嵌服务器等特性,极大地简化了Spring应用的开发和部署过程。本文将深入探讨Spring Boot的背景历史、业务场景、功能点以及底层原理,并通过Java代码手写模拟Spring Boot的启动过程,特别是spring.factories文件的解析源码API机制。
13 2
|
24天前
|
存储 缓存 搜索推荐
Lazada淘宝详情API的价值与应用解析
在电商行业,数据是驱动业务增长的核心。Lazada作为东南亚知名电商平台,其商品详情API对电商行业影响深远。本文探讨了Lazada商品详情API的重要性,包括提供全面准确的商品信息、增强平台竞争力、促进销售转化、支持用户搜索和发现需求、数据驱动决策、竞品分析、用户行为研究及提升购物体验。文章还介绍了如何通过Lazada提供的API接口、编写代码及使用第三方工具实现实时数据获取。
51 3
|
3天前
|
API 数据安全/隐私保护
抖音视频,图集无水印直链解析免费API接口教程
该接口用于解析抖音视频和图集的无水印直链地址。请求地址为 `https://cn.apihz.cn/api/fun/douyin.php`,支持POST或GET请求。请求参数包括用户ID、用户KEY和视频或图集地址。返回参数包括状态码、信息提示、作者昵称、标题、视频地址、封面、图集和类型。示例请求和返回数据详见文档。
|
21天前
|
存储 数据可视化 API
API接口数据获取流程的细化
本文概述了API的基础知识、获取API访问权限的方法、编写代码调用API的步骤、数据处理与分析技巧以及数据安全与合规的重要性,并提供了社交媒体数据分析、天气预报应用和电商数据分析等API数据获取的应用实例,旨在帮助读者全面了解和实践API接口数据获取的流程。
|
17天前
|
JSON 前端开发 JavaScript
API接口商品详情接口数据解析
商品详情接口通常用于提供特定商品的详细信息,这些信息比商品列表接口中的信息更加详细和全面。以下是一个示例的JSON数据格式,用于表示一个商品详情API接口的响应。这个示例假定API返回一个包含商品详细信息的对象。
|
17天前
|
存储 人工智能 大数据
拼多多详情API的价值与应用解析
拼多多作为中国电商市场的重要参与者,其开放平台提供的商品详情API接口为电商行业带来了新的机遇和挑战。该接口允许开发者通过编程方式获取商品的详细信息,包括标题、价格、描述、图片、规格参数和库存等,推动了电商运营的智能化和高效化。本文将深入解析拼多多详情API的价值与应用,帮助商家和开发者更好地理解和利用这一宝贵资源。
27 0
|
17天前
|
存储 监控 安全
API接口数据获取全流程用户指南
本文介绍了从明确需求到数据存储与管理的API接口数据获取全流程。首先,明确业务需求和选择合适的数据源;接着,准备API接口,包括审查文档、申请密钥和安全存储;然后,构建与发送请求,处理响应与数据;最后,进行数据存储与管理,并持续监控与优化,确保数据的安全与合规。通过这些步骤,用户可以高效地获取和管理数据,为数据分析和业务优化提供支持。
|
3天前
|
JSON API 数据格式
淘宝 / 天猫官方商品 / 订单订单 API 接口丨商品上传接口对接步骤
要对接淘宝/天猫官方商品或订单API,需先注册淘宝开放平台账号,创建应用获取App Key和App Secret。之后,详细阅读API文档,了解接口功能及权限要求,编写认证、构建请求、发送请求和处理响应的代码。最后,在沙箱环境中测试与调试,确保API调用的正确性和稳定性。
|
15天前
|
供应链 数据挖掘 API
电商API接口介绍——sku接口概述
商品SKU(Stock Keeping Unit)接口是电商API接口中的一种,专门用于获取商品的SKU信息。SKU是库存量单位,用于区分同一商品的不同规格、颜色、尺寸等属性。通过商品SKU接口,开发者可以获取商品的SKU列表、SKU属性、库存数量等详细信息。
|
17天前
|
JSON API 数据格式
店铺所有商品列表接口json数据格式示例(API接口)
当然,以下是一个示例的JSON数据格式,用于表示一个店铺所有商品列表的API接口响应

推荐镜像

更多