milvus insert api流程源码分析

本文涉及的产品
阿里云百炼推荐规格 ADB PostgreSQL,4核16GB 100GB 1个月
RDS PostgreSQL Serverless,0.5-4RCU 50GB 3个月
推荐场景:
对影评进行热评分析
简介: milvus insert api流程源码分析

Insert API执行流程源码解析

milvus版本:v2.3.2

Insert这个API写入数据,流程较长,是milvus的核心API之一,本文介绍大致的写入流程。

整体架构:

architecture.png

Insert 的数据流向:

insert数据流向.jpg

1.客户端sdk发出Insert API请求。

import numpy as np
from pymilvus import (
    connections,
    FieldSchema, CollectionSchema, DataType,
    Collection,
)

num_entities, dim = 2000, 8

print("start connecting to Milvus")
connections.connect("default", host="192.168.230.71", port="19530")

fields = [
    FieldSchema(name="pk", dtype=DataType.VARCHAR, is_primary=True, auto_id=False, max_length=100),
    FieldSchema(name="random", dtype=DataType.DOUBLE),
    FieldSchema(name="embeddings", dtype=DataType.FLOAT_VECTOR, dim=dim)
]

schema = CollectionSchema(fields, "hello_milvus is the simplest demo to introduce the APIs")

print("Create collection `hello_milvus`")
hello_milvus = Collection("hello_milvus", schema, consistency_level="Strong",shards_num=2)


print("Start inserting entities")
rng = np.random.default_rng(seed=19530)
entities = [
    # provide the pk field because `auto_id` is set to False
    [str(i) for i in range(num_entities)],
    rng.random(num_entities).tolist(),  # field random, only supports list
    rng.random((num_entities, dim)),    # field embeddings, supports numpy.ndarray and list
]

insert_result = hello_milvus.insert(entities)

hello_milvus.flush()
AI 代码解读

客户端SDK向proxy发送一个Insert API请求,向数据库写入数据。

这个例子向数据库写入2000条数据,每条数据是一个8维向量。

insert_milvus.jpg

2.客户端接受API请求,将request封装为insertTask,并压入dmQueue队列。

注意这里是dmQueue。DDL类型的是ddQueue。

代码路径:internal\proxy\impl.go

// Insert insert records into collection.
func (node *Proxy) Insert(ctx context.Context, request *milvuspb.InsertRequest) (*milvuspb.MutationResult, error) {
   
   
    ......
    // request封装为task
    it := &insertTask{
   
   
        ctx:       ctx,
        Condition: NewTaskCondition(ctx),
        insertMsg: &msgstream.InsertMsg{
   
   
            BaseMsg: msgstream.BaseMsg{
   
   
                HashValues: request.HashKeys,
            },
            InsertRequest: msgpb.InsertRequest{
   
   
                Base: commonpbutil.NewMsgBase(
                    commonpbutil.WithMsgType(commonpb.MsgType_Insert),
                    commonpbutil.WithMsgID(0),
                    commonpbutil.WithSourceID(paramtable.GetNodeID()),
                ),
                DbName:         request.GetDbName(),
                CollectionName: request.CollectionName,
                PartitionName:  request.PartitionName,
                FieldsData:     request.FieldsData,
                NumRows:        uint64(request.NumRows),
                Version:        msgpb.InsertDataVersion_ColumnBased,
            },
        },
        idAllocator:   node.rowIDAllocator,
        segIDAssigner: node.segAssigner,
        chMgr:         node.chMgr,
        chTicker:      node.chTicker,
    }

    ......
    // 将task压入dmQueue队列

    if err := node.sched.dmQueue.Enqueue(it); err != nil {
   
   
        ......
    }

    ......
    // 等待任务执行完
    if err := it.WaitToFinish(); err != nil {
   
   
        ......
    }

    ......
}
AI 代码解读

InsertRequest结构:

type InsertRequest struct {
   
   
    Base                 *commonpb.MsgBase     
    DbName               string                
    CollectionName       string                
    PartitionName        string                
    FieldsData           []*schemapb.FieldData 
    HashKeys             []uint32              
    NumRows              uint32                
    XXX_NoUnkeyedLiteral struct{
   
   }              
    XXX_unrecognized     []byte                
    XXX_sizecache        int32                 
}

type FieldData struct {
   
   
    Type      DataType 
    FieldName string   
    // Types that are valid to be assigned to Field:
    //
    //    *FieldData_Scalars
    //    *FieldData_Vectors
    Field                isFieldData_Field 
    FieldId              int64             
    IsDynamic            bool              
    XXX_NoUnkeyedLiteral struct{
   
   }          
    XXX_unrecognized     []byte            
    XXX_sizecache        int32             
}

type isFieldData_Field interface {
   
   
    isFieldData_Field()
}

type FieldData_Scalars struct {
   
   
    Scalars *ScalarField
}

type FieldData_Vectors struct {
   
   
    Vectors *VectorField
}
AI 代码解读

客户端通过grpc发送数据,FieldData.Field存储接受的数据。

isFieldData_Field是一个接口,有2个实现:FieldData_Scalars和FieldData_Vectors。

真正存储数据的就是这2个实现。

3.执行insertTask的3个方法PreExecute、Execute、PostExecute。

PreExecute()一般为参数校验等工作。

Execute()一般为真正执行逻辑。

PostExecute()执行完后的逻辑,什么都不做,返回nil。

代码路径:internal\proxy\task_insert.go

func (it *insertTask) Execute(ctx context.Context) error {
   
   
    ......
    collectionName := it.insertMsg.CollectionName
    // 根据collectionName得到collectionID
    collID, err := globalMetaCache.GetCollectionID(it.ctx, it.insertMsg.GetDbName(), collectionName)
    log := log.Ctx(ctx)
    if err != nil {
   
   
        ......
    }
    it.insertMsg.CollectionID = collID

    getCacheDur := tr.RecordSpan()
    // 得到stream,类型为mqMsgStream
    stream, err := it.chMgr.getOrCreateDmlStream(collID)
    if err != nil {
   
   
        return err
    }
    getMsgStreamDur := tr.RecordSpan()
    // by-dev-rootcoord-dml_0_445811557825249939v0
    // by-dev-rootcoord-dml_1_445811557825249939v1
    // 如果shardNum=2,则获取2个虚拟channel
    channelNames, err := it.chMgr.getVChannels(collID)
    if err != nil {
   
   
        ......
    }

    ......

    // assign segmentID for insert data and repack data by segmentID
    // msgPck包含segmentID
    var msgPack *msgstream.MsgPack
    if it.partitionKeys == nil {
   
   
        // 分配segmentID
        // 重新打包为2个msgstream.TsMsg,分别发送给2个虚拟通道
        msgPack, err = repackInsertData(it.TraceCtx(), channelNames, it.insertMsg, it.result, it.idAllocator, it.segIDAssigner)
    } else {
   
   
        msgPack, err = repackInsertDataWithPartitionKey(it.TraceCtx(), channelNames, it.partitionKeys, it.insertMsg, it.result, it.idAllocator, it.segIDAssigner)
    }
    if err != nil {
   
   
        ......
    }
    ......
    // 生产数据,将数据写入mq
    err = stream.Produce(msgPack)
    if err != nil {
   
   
        ......
    }
    ......
}
AI 代码解读

repackInsertData()这个函数还涉及到了segmentID的分配。

总结:

1.Insert由proxy向mq(pulsar)写入数据。通过虚拟channel写入。

2.在pulsar创建topic,向topic写入数据。

相关文章
提升用户体验:电商API如何优化购物车与支付流程
本文探讨电商购物车与支付流程优化,通过API技术提升用户体验。一是购物车原子化操作,如ETag版本控制解决冲突、局部更新减少传输;二是支付流程聚合与降级,包括JWT单次验证、动态支付选项及备用渠道切换;三是实时库存保护,利用分布式锁防止超卖。案例显示,优化后购物车冲突减少92%,支付耗时降至2.1秒,移动端转化率提升18%。API作为体验引擎,未来将与前端深度协同,推动更优购物闭环。
56 1
深度解析:爬虫技术获取淘宝商品详情并封装为API的全流程应用
本文探讨了如何利用爬虫技术获取淘宝商品详情并封装为API。首先介绍了爬虫的核心原理与工具,包括Python的Requests、BeautifulSoup和Scrapy等库。接着通过实战案例展示了如何分析淘宝商品页面结构、编写爬虫代码以及突破反爬虫策略。随后讲解了如何使用Flask框架将数据封装为API,并部署到服务器供外部访问。最后强调了在开发过程中需遵守法律与道德规范,确保数据使用的合法性和正当性。
1688商品详情API实战:Python调用全流程与数据解析技巧
本文介绍了1688电商平台的商品详情API接口,助力电商从业者高效获取商品信息。接口可返回商品基础属性、价格体系、库存状态、图片描述及商家详情等多维度数据,支持全球化语言设置。通过Python示例代码展示了如何调用该接口,帮助用户快速上手,适用于选品分析、市场研究等场景。
手把手教你调用京东商品详情 API:从申请到数据抓取全流程
京东商品详情API为电商从业者、分析师及开发者提供高效数据支持,助力优化业务与研究。该接口具备丰富数据(商品属性、价格、描述、图片、评价等)与灵活请求方式(GET/POST),满足多样化需求,是数字化时代电商应用开发与分析的有力工具。
295 13
淘宝商品详情API的调用流程(python请求示例以及json数据示例返回参考)
JSON数据示例:需要提供一个结构化的示例,展示商品详情可能包含的字段,如商品标题、价格、库存、描述、图片链接、卖家信息等。考虑到稳定性,示例应基于淘宝开放平台的标准响应格式。
深度实操:京东商品详情API接入全流程与技术要点剖析
京东商品详情API接口用于获取商品基础信息(标题、价格、库存状态、用户评价等),支持单个或多个商品查询。适用于商品列表展示、竞品分析、价格监控、库存管理、营销活动和数据分析等场景。通过发送HTTP请求(GET/POST)调用接口,服务器返回JSON格式数据,可使用Python等语言解析处理。示例代码中展示了如何用requests库调用API并获取商品详情。
AI 驱动 API 研发提效:解析 Apipost 在 API 文档生成场景的全流程能力
随着AI和大模型技术发展,软件开发步入智能化时代。API研发作为核心环节,高效调试与文档编写至关重要。Apipost是国内领先的API协同工具,提供从API设计到性能测试的完整闭环,支持AI驱动的文档生成、Markdown润色及OpenAPI生成等功能,显著提升团队效率。其AI功能可一键补全参数描述、生成示例代码与响应样例,减少重复劳动,助力开发者专注于业务逻辑优化。无论是快速迭代还是大规模接口设计,Apipost都为高效协作提供了强大支持。
66 0
物流轨迹订阅查询API调用全流程
本文介绍了物流轨迹订阅查询 API 从传统物流到智慧物流的转型价值与应用。该接口支持主流快递公司,通过标准化数据格式返回物流状态与详细路径信息,助力物流可视化、智能调度和精准客服。核心功能包括基于快递单号查询物流状态与轨迹,提供如快递公司、运单号、签收状态等关键信息。同时,文章还提供了调用流程及 Python 示例代码,便于开发者集成使用。未来,随着 AI 和物联网技术的发展,物流轨迹查询将向智慧物流生态演进,进一步提升行业效率与用户体验。
142 0
GitHub官方开源MCP服务!GitHub MCP Server:无缝集成GitHub API,实现Git流程完全自动化
GitHub MCP Server是基于Model Context Protocol的服务器工具,提供与GitHub API的无缝集成,支持自动化处理问题、Pull Request和仓库管理等功能。
772 2
GitHub官方开源MCP服务!GitHub MCP Server:无缝集成GitHub API,实现Git流程完全自动化
【03】支付宝支付商户申请下户到配置完整流程-对签约申请已通过商户进行开通API支付-创建应用-申请支付宝公钥-应用公钥-支付宝密钥-配合支付宝官方证书生成工具+配置完整流程-优雅草卓伊凡
【03】支付宝支付商户申请下户到配置完整流程-对签约申请已通过商户进行开通API支付-创建应用-申请支付宝公钥-应用公钥-支付宝密钥-配合支付宝官方证书生成工具+配置完整流程-优雅草卓伊凡
383 0
【03】支付宝支付商户申请下户到配置完整流程-对签约申请已通过商户进行开通API支付-创建应用-申请支付宝公钥-应用公钥-支付宝密钥-配合支付宝官方证书生成工具+配置完整流程-优雅草卓伊凡
AI助理

你好,我是AI助理

可以解答问题、推荐解决方案等

登录插画

登录以查看您的控制台资源

管理云资源
状态一览
快捷访问