Insert API执行流程_milvus源码解析

本文涉及的产品
RDS PostgreSQL Serverless,0.5-4RCU 50GB 3个月
推荐场景:
对影评进行热评分析
阿里云百炼推荐规格 ADB PostgreSQL,4核16GB 100GB 1个月
简介: Insert API执行流程_milvus源码解析

Insert API执行流程源码解析

milvus版本:v2.3.2

Insert这个API写入数据,流程较长,是milvus的核心API之一,本文介绍大致的写入流程。

整体架构:

architecture.png

Insert 的数据流向:

insert数据流向.jpg

1.客户端sdk发出Insert API请求。

import numpy as np
from pymilvus import (
    connections,
    FieldSchema, CollectionSchema, DataType,
    Collection,
)

num_entities, dim = 2000, 8

print("start connecting to Milvus")
connections.connect("default", host="192.168.230.71", port="19530")

fields = [
    FieldSchema(name="pk", dtype=DataType.VARCHAR, is_primary=True, auto_id=False, max_length=100),
    FieldSchema(name="random", dtype=DataType.DOUBLE),
    FieldSchema(name="embeddings", dtype=DataType.FLOAT_VECTOR, dim=dim)
]

schema = CollectionSchema(fields, "hello_milvus is the simplest demo to introduce the APIs")

print("Create collection `hello_milvus`")
hello_milvus = Collection("hello_milvus", schema, consistency_level="Strong",shards_num=2)


print("Start inserting entities")
rng = np.random.default_rng(seed=19530)
entities = [
    # provide the pk field because `auto_id` is set to False
    [str(i) for i in range(num_entities)],
    rng.random(num_entities).tolist(),  # field random, only supports list
    rng.random((num_entities, dim)),    # field embeddings, supports numpy.ndarray and list
]

insert_result = hello_milvus.insert(entities)

hello_milvus.flush()

客户端SDK向proxy发送一个Insert API请求,向数据库写入数据。

这个例子向数据库写入2000条数据,每条数据是一个8维向量。

insert_milvus.jpg

2.客户端接受API请求,将request封装为insertTask,并压入dmQueue队列。

注意这里是dmQueue。DDL类型的是ddQueue。

代码路径:internal\proxy\impl.go

// Insert insert records into collection.
func (node *Proxy) Insert(ctx context.Context, request *milvuspb.InsertRequest) (*milvuspb.MutationResult, error) {
   
   
    ......
    // request封装为task
    it := &insertTask{
   
   
        ctx:       ctx,
        Condition: NewTaskCondition(ctx),
        insertMsg: &msgstream.InsertMsg{
   
   
            BaseMsg: msgstream.BaseMsg{
   
   
                HashValues: request.HashKeys,
            },
            InsertRequest: msgpb.InsertRequest{
   
   
                Base: commonpbutil.NewMsgBase(
                    commonpbutil.WithMsgType(commonpb.MsgType_Insert),
                    commonpbutil.WithMsgID(0),
                    commonpbutil.WithSourceID(paramtable.GetNodeID()),
                ),
                DbName:         request.GetDbName(),
                CollectionName: request.CollectionName,
                PartitionName:  request.PartitionName,
                FieldsData:     request.FieldsData,
                NumRows:        uint64(request.NumRows),
                Version:        msgpb.InsertDataVersion_ColumnBased,
            },
        },
        idAllocator:   node.rowIDAllocator,
        segIDAssigner: node.segAssigner,
        chMgr:         node.chMgr,
        chTicker:      node.chTicker,
    }

    ......
    // 将task压入dmQueue队列

    if err := node.sched.dmQueue.Enqueue(it); err != nil {
   
   
        ......
    }

    ......
    // 等待任务执行完
    if err := it.WaitToFinish(); err != nil {
   
   
        ......
    }

    ......
}

InsertRequest结构:

type InsertRequest struct {
   
   
    Base                 *commonpb.MsgBase     
    DbName               string                
    CollectionName       string                
    PartitionName        string                
    FieldsData           []*schemapb.FieldData 
    HashKeys             []uint32              
    NumRows              uint32                
    XXX_NoUnkeyedLiteral struct{
   
   }              
    XXX_unrecognized     []byte                
    XXX_sizecache        int32                 
}

type FieldData struct {
   
   
    Type      DataType 
    FieldName string   
    // Types that are valid to be assigned to Field:
    //
    //    *FieldData_Scalars
    //    *FieldData_Vectors
    Field                isFieldData_Field 
    FieldId              int64             
    IsDynamic            bool              
    XXX_NoUnkeyedLiteral struct{
   
   }          
    XXX_unrecognized     []byte            
    XXX_sizecache        int32             
}

type isFieldData_Field interface {
   
   
    isFieldData_Field()
}

type FieldData_Scalars struct {
   
   
    Scalars *ScalarField
}

type FieldData_Vectors struct {
   
   
    Vectors *VectorField
}

客户端通过grpc发送数据,FieldData.Field存储接受的数据。

isFieldData_Field是一个接口,有2个实现:FieldData_Scalars和FieldData_Vectors。

真正存储数据的就是这2个实现。

3.执行insertTask的3个方法PreExecute、Execute、PostExecute。

PreExecute()一般为参数校验等工作。

Execute()一般为真正执行逻辑。

PostExecute()执行完后的逻辑,什么都不做,返回nil。

代码路径:internal\proxy\task_insert.go

func (it *insertTask) Execute(ctx context.Context) error {
   
   
    ......
    collectionName := it.insertMsg.CollectionName
    // 根据collectionName得到collectionID
    collID, err := globalMetaCache.GetCollectionID(it.ctx, it.insertMsg.GetDbName(), collectionName)
    log := log.Ctx(ctx)
    if err != nil {
   
   
        ......
    }
    it.insertMsg.CollectionID = collID

    getCacheDur := tr.RecordSpan()
    // 得到stream,类型为mqMsgStream
    stream, err := it.chMgr.getOrCreateDmlStream(collID)
    if err != nil {
   
   
        return err
    }
    getMsgStreamDur := tr.RecordSpan()
    // by-dev-rootcoord-dml_0_445811557825249939v0
    // by-dev-rootcoord-dml_1_445811557825249939v1
    // 如果shardNum=2,则获取2个虚拟channel
    channelNames, err := it.chMgr.getVChannels(collID)
    if err != nil {
   
   
        ......
    }

    ......

    // assign segmentID for insert data and repack data by segmentID
    // msgPck包含segmentID
    var msgPack *msgstream.MsgPack
    if it.partitionKeys == nil {
   
   
        // 分配segmentID
        // 重新打包为2个msgstream.TsMsg,分别发送给2个虚拟通道
        msgPack, err = repackInsertData(it.TraceCtx(), channelNames, it.insertMsg, it.result, it.idAllocator, it.segIDAssigner)
    } else {
   
   
        msgPack, err = repackInsertDataWithPartitionKey(it.TraceCtx(), channelNames, it.partitionKeys, it.insertMsg, it.result, it.idAllocator, it.segIDAssigner)
    }
    if err != nil {
   
   
        ......
    }
    ......
    // 生产数据,将数据写入mq
    err = stream.Produce(msgPack)
    if err != nil {
   
   
        ......
    }
    ......
}

总结:

1.Insert由proxy向mq(pulsar)写入数据。通过虚拟channel写入。

2.在pulsar创建topic,向topic写入数据。

目录
相关文章
|
13天前
|
搜索推荐 API 开发者
京东商品列表 API 接口全解析:从入门到精通
京东商品列表API是京东开放平台为开发者提供的核心数据接口,支持批量获取商品基础信息、价格、库存状态等多维度数据。它具备数据丰富性、灵活筛选与分页查询、稳定高效等特点,可满足市场分析、选品优化、比价工具及推荐系统开发等需求,为电商业务创新提供坚实支撑。通过标准化通道,助力第三方高效、合法地利用京东海量商品数据。
|
20天前
|
数据挖掘 API 开发者
深度解析!淘宝商品详情 API 接口的高效调用与实战应用
淘宝商品详情API为开发者提供高效获取商品信息的途径,支持名称、价格、销量等详细数据的提取。接口通过GET/POST请求方式调用,需携带商品ID与授权信息(如AppKey)。其特点包括数据全面、实时性强及安全性高,满足电商应用、数据分析等需求。本文还提供了Python调用示例,涵盖签名生成、参数构建及请求发送全流程,助力开发者快速集成淘宝商品数据至自身系统中。
|
1月前
|
架构师 安全 物联网
Apipost vs Apifox:高效API协作的差异化功能解析
作为企业级API架构师,深度体验APIPost与Apifox后发现几大亮点功能。目录级参数配置避免全局污染;WebSocket消息分组提升长连接管理效率;Socket.IO支持解决特定协议需求;接口锁定保障团队协作安全。大型团队适合APIPost的细粒度管控,复杂物联网项目需WebSocket分组,维护遗留系统离不开Socket.IO支持,初创团队可按需灵活选择。这些特性显著优化开发协作质量。
|
16天前
|
安全 API 数据安全/隐私保护
12种API认证全场景解析:从Basic到OAuth2.0,哪个认证最适合你的业务?
在API认证领域,从简单的Key-Value到高级的OAuth2.0和JWT,共有12种主流认证方式。本文详解了每种方式的意义、适用场景及优劣,并通过认证方式矩阵对比常见工具(如Postman、Apifox)的支持情况。此外,还介绍了企业级安全功能,如密钥保险箱、动态令牌和合规审计。选择合适的认证方式不仅能提升安全性,还能大幅提高开发效率。未来,自动化认证矩阵或将成为API调试的核心趋势。
|
1月前
|
存储 人工智能 API
离线VS强制登录?Apipost与Apifox的API工具理念差异深度解析
在代码开发中,工具是助手还是枷锁?本文通过对比Apipost和Apifox在断网环境下的表现,探讨API工具的选择对开发自由度的影响。Apifox强制登录限制了离线使用,而Apipost支持游客模式与本地存储,尊重开发者数据主权。文章从登录策略、离线能力、协作模式等方面深入分析,揭示工具背后的设计理念与行业趋势,帮助开发者明智选择,掌握数据控制权并提升工作效率。
|
2月前
|
算法 测试技术 C语言
深入理解HTTP/2:nghttp2库源码解析及客户端实现示例
通过解析nghttp2库的源码和实现一个简单的HTTP/2客户端示例,本文详细介绍了HTTP/2的关键特性和nghttp2的核心实现。了解这些内容可以帮助开发者更好地理解HTTP/2协议,提高Web应用的性能和用户体验。对于实际开发中的应用,可以根据需要进一步优化和扩展代码,以满足具体需求。
203 29
|
6月前
|
监控 Java 应用服务中间件
高级java面试---spring.factories文件的解析源码API机制
【11月更文挑战第20天】Spring Boot是一个用于快速构建基于Spring框架的应用程序的开源框架。它通过自动配置、起步依赖和内嵌服务器等特性,极大地简化了Spring应用的开发和部署过程。本文将深入探讨Spring Boot的背景历史、业务场景、功能点以及底层原理,并通过Java代码手写模拟Spring Boot的启动过程,特别是spring.factories文件的解析源码API机制。
178 2
|
2月前
|
前端开发 数据安全/隐私保护 CDN
二次元聚合短视频解析去水印系统源码
二次元聚合短视频解析去水印系统源码
69 3
|
2月前
|
JavaScript 算法 前端开发
JS数组操作方法全景图,全网最全构建完整知识网络!js数组操作方法全集(实现筛选转换、随机排序洗牌算法、复杂数据处理统计等情景详解,附大量源码和易错点解析)
这些方法提供了对数组的全面操作,包括搜索、遍历、转换和聚合等。通过分为原地操作方法、非原地操作方法和其他方法便于您理解和记忆,并熟悉他们各自的使用方法与使用范围。详细的案例与进阶使用,方便您理解数组操作的底层原理。链式调用的几个案例,让您玩转数组操作。 只有锻炼思维才能可持续地解决问题,只有思维才是真正值得学习和分享的核心要素。如果这篇博客能给您带来一点帮助,麻烦您点个赞支持一下,还可以收藏起来以备不时之需,有疑问和错误欢迎在评论区指出~
|
2月前
|
移动开发 前端开发 JavaScript
从入门到精通:H5游戏源码开发技术全解析与未来趋势洞察
H5游戏凭借其跨平台、易传播和开发成本低的优势,近年来发展迅猛。接下来,让我们深入了解 H5 游戏源码开发的技术教程以及未来的发展趋势。

推荐镜像

更多