Insert API执行流程_milvus源码解析

简介: Insert API执行流程_milvus源码解析

Insert API执行流程源码解析

milvus版本:v2.3.2

Insert这个API写入数据,流程较长,是milvus的核心API之一,本文介绍大致的写入流程。

整体架构:

architecture.png

Insert 的数据流向:

insert数据流向.jpg

1.客户端sdk发出Insert API请求。

import numpy as np
from pymilvus import (
    connections,
    FieldSchema, CollectionSchema, DataType,
    Collection,
)

num_entities, dim = 2000, 8

print("start connecting to Milvus")
connections.connect("default", host="192.168.230.71", port="19530")

fields = [
    FieldSchema(name="pk", dtype=DataType.VARCHAR, is_primary=True, auto_id=False, max_length=100),
    FieldSchema(name="random", dtype=DataType.DOUBLE),
    FieldSchema(name="embeddings", dtype=DataType.FLOAT_VECTOR, dim=dim)
]

schema = CollectionSchema(fields, "hello_milvus is the simplest demo to introduce the APIs")

print("Create collection `hello_milvus`")
hello_milvus = Collection("hello_milvus", schema, consistency_level="Strong",shards_num=2)


print("Start inserting entities")
rng = np.random.default_rng(seed=19530)
entities = [
    # provide the pk field because `auto_id` is set to False
    [str(i) for i in range(num_entities)],
    rng.random(num_entities).tolist(),  # field random, only supports list
    rng.random((num_entities, dim)),    # field embeddings, supports numpy.ndarray and list
]

insert_result = hello_milvus.insert(entities)

hello_milvus.flush()

客户端SDK向proxy发送一个Insert API请求,向数据库写入数据。

这个例子向数据库写入2000条数据,每条数据是一个8维向量。

insert_milvus.jpg

2.客户端接受API请求,将request封装为insertTask,并压入dmQueue队列。

注意这里是dmQueue。DDL类型的是ddQueue。

代码路径:internal\proxy\impl.go

// Insert insert records into collection.
func (node *Proxy) Insert(ctx context.Context, request *milvuspb.InsertRequest) (*milvuspb.MutationResult, error) {
   
   
    ......
    // request封装为task
    it := &insertTask{
   
   
        ctx:       ctx,
        Condition: NewTaskCondition(ctx),
        insertMsg: &msgstream.InsertMsg{
   
   
            BaseMsg: msgstream.BaseMsg{
   
   
                HashValues: request.HashKeys,
            },
            InsertRequest: msgpb.InsertRequest{
   
   
                Base: commonpbutil.NewMsgBase(
                    commonpbutil.WithMsgType(commonpb.MsgType_Insert),
                    commonpbutil.WithMsgID(0),
                    commonpbutil.WithSourceID(paramtable.GetNodeID()),
                ),
                DbName:         request.GetDbName(),
                CollectionName: request.CollectionName,
                PartitionName:  request.PartitionName,
                FieldsData:     request.FieldsData,
                NumRows:        uint64(request.NumRows),
                Version:        msgpb.InsertDataVersion_ColumnBased,
            },
        },
        idAllocator:   node.rowIDAllocator,
        segIDAssigner: node.segAssigner,
        chMgr:         node.chMgr,
        chTicker:      node.chTicker,
    }

    ......
    // 将task压入dmQueue队列

    if err := node.sched.dmQueue.Enqueue(it); err != nil {
   
   
        ......
    }

    ......
    // 等待任务执行完
    if err := it.WaitToFinish(); err != nil {
   
   
        ......
    }

    ......
}

InsertRequest结构:

type InsertRequest struct {
   
   
    Base                 *commonpb.MsgBase     
    DbName               string                
    CollectionName       string                
    PartitionName        string                
    FieldsData           []*schemapb.FieldData 
    HashKeys             []uint32              
    NumRows              uint32                
    XXX_NoUnkeyedLiteral struct{
   
   }              
    XXX_unrecognized     []byte                
    XXX_sizecache        int32                 
}

type FieldData struct {
   
   
    Type      DataType 
    FieldName string   
    // Types that are valid to be assigned to Field:
    //
    //    *FieldData_Scalars
    //    *FieldData_Vectors
    Field                isFieldData_Field 
    FieldId              int64             
    IsDynamic            bool              
    XXX_NoUnkeyedLiteral struct{
   
   }          
    XXX_unrecognized     []byte            
    XXX_sizecache        int32             
}

type isFieldData_Field interface {
   
   
    isFieldData_Field()
}

type FieldData_Scalars struct {
   
   
    Scalars *ScalarField
}

type FieldData_Vectors struct {
   
   
    Vectors *VectorField
}

客户端通过grpc发送数据,FieldData.Field存储接受的数据。

isFieldData_Field是一个接口,有2个实现:FieldData_Scalars和FieldData_Vectors。

真正存储数据的就是这2个实现。

3.执行insertTask的3个方法PreExecute、Execute、PostExecute。

PreExecute()一般为参数校验等工作。

Execute()一般为真正执行逻辑。

PostExecute()执行完后的逻辑,什么都不做,返回nil。

代码路径:internal\proxy\task_insert.go

func (it *insertTask) Execute(ctx context.Context) error {
   
   
    ......
    collectionName := it.insertMsg.CollectionName
    // 根据collectionName得到collectionID
    collID, err := globalMetaCache.GetCollectionID(it.ctx, it.insertMsg.GetDbName(), collectionName)
    log := log.Ctx(ctx)
    if err != nil {
   
   
        ......
    }
    it.insertMsg.CollectionID = collID

    getCacheDur := tr.RecordSpan()
    // 得到stream,类型为mqMsgStream
    stream, err := it.chMgr.getOrCreateDmlStream(collID)
    if err != nil {
   
   
        return err
    }
    getMsgStreamDur := tr.RecordSpan()
    // by-dev-rootcoord-dml_0_445811557825249939v0
    // by-dev-rootcoord-dml_1_445811557825249939v1
    // 如果shardNum=2,则获取2个虚拟channel
    channelNames, err := it.chMgr.getVChannels(collID)
    if err != nil {
   
   
        ......
    }

    ......

    // assign segmentID for insert data and repack data by segmentID
    // msgPck包含segmentID
    var msgPack *msgstream.MsgPack
    if it.partitionKeys == nil {
   
   
        // 分配segmentID
        // 重新打包为2个msgstream.TsMsg,分别发送给2个虚拟通道
        msgPack, err = repackInsertData(it.TraceCtx(), channelNames, it.insertMsg, it.result, it.idAllocator, it.segIDAssigner)
    } else {
   
   
        msgPack, err = repackInsertDataWithPartitionKey(it.TraceCtx(), channelNames, it.partitionKeys, it.insertMsg, it.result, it.idAllocator, it.segIDAssigner)
    }
    if err != nil {
   
   
        ......
    }
    ......
    // 生产数据,将数据写入mq
    err = stream.Produce(msgPack)
    if err != nil {
   
   
        ......
    }
    ......
}

总结:

1.Insert由proxy向mq(pulsar)写入数据。通过虚拟channel写入。

2.在pulsar创建topic,向topic写入数据。

目录
相关文章
|
10天前
|
C语言
内核源码中遇到不会解析的宏怎么办?
内核源码中遇到不会解析的宏怎么办?
189 1
|
12天前
|
Java 程序员 API
Java并发基础:concurrent Flow API全面解析
java.util.concurrent.Flow定义了响应式编程的核心接口,促进了Java在异步数据处理和背压机制方面的标准化,这使得第三方库如Reactor和RxJava能够基于这些接口提供丰富的实现和功能,同时简化了响应式编程在Java中的使用,Flow API增强了Java在并发编程领域的灵活性,使得处理异步数据流变得更加自然和高效。
Java并发基础:concurrent Flow API全面解析
|
3天前
|
Java 关系型数据库 数据库连接
Spring源码解析--深入Spring事务原理
本文将带领大家领略Spring事务的风采,Spring事务是我们在日常开发中经常会遇到的,也是各种大小面试中的高频题,希望通过本文,能让大家对Spring事务有个深入的了解,无论开发还是面试,都不会让Spring事务成为拦路虎。
12 1
|
2天前
|
移动开发 API HTML5
HTML5响应式自动采集API壁纸系统源码自适应手机端
HTML5响应式自动采集API壁纸系统源码自适应手机端
26 11
HTML5响应式自动采集API壁纸系统源码自适应手机端
|
3天前
|
存储 Oracle Java
Java 包和 API 深度解析:组织代码,避免命名冲突
Java 中的包 用于将相关的类分组在一起。可以将其视为文件目录中的一个文件夹。我们使用包来避免名称冲突,并编写更易于维护的代码。 包分为两类: 内置包(来自 Java API 的包) 用户定义的包(创建自己的包)
177 2
|
4天前
|
API 开发工具 数据安全/隐私保护
API接口的对接流程和注意事项
随着互联网技术的发展和应用的普及,API接口已经成为不同系统、不同应用之间进行交互和数据交换的重要方式。API接口使得不同的系统能够互相调用对方的功能,提高了系统的灵活性和扩展性。但是,在进行API接口对接的过程中,需要注意一些流程和事项,以确保对接的顺利进行和系统的稳定运行。
|
5天前
|
数据挖掘 API 数据处理
获取商品详情信息API接口:1688开放平台功能解析
首先,要获取商品详情信息,开发者需要向1688开放平台申请相应的权限,并遵循其调用规则。在调用商品详情信息API接口时,需要传入相应的请求参数,如商品ID、调用时间戳、密钥等。这些参数对于确保接口的正常运行至关重要。
|
6天前
|
数据采集 安全 API
如何实时获取小红书笔记详情的API使用与解析
小红书是一个以分享消费经验、生活方式为主的社交平台,拥有大量的用户和内容。为了更好地了解用户在小红书上的行为和内容,许多开发者选择使用小红书开放平台提供的API接口。本文将介绍如何通过小红书笔记详情API实现实时数据获取,并给出相应的代码示例。
|
13天前
|
存储 API
milvus insert api的数据结构源码分析
milvus insert api的数据结构源码分析
770 6
milvus insert api的数据结构源码分析
|
1月前
|
XML Java 数据格式
Spring5源码(41)-tx:annotation-driven 标签解析过程
Spring5源码(41)-tx:annotation-driven 标签解析过程
185 0

相关产品

  • 云原生数据仓库 AnalyticDB PostgreSQL版
  • 推荐镜像

    更多