Insert API执行流程_milvus源码解析

本文涉及的产品
阿里云百炼推荐规格 ADB PostgreSQL,4核16GB 100GB 1个月
RDS PostgreSQL Serverless,0.5-4RCU 50GB 3个月
推荐场景:
对影评进行热评分析
简介: Insert API执行流程_milvus源码解析

Insert API执行流程源码解析

milvus版本:v2.3.2

Insert这个API写入数据,流程较长,是milvus的核心API之一,本文介绍大致的写入流程。

整体架构:

architecture.png

Insert 的数据流向:

insert数据流向.jpg

1.客户端sdk发出Insert API请求。

import numpy as np
from pymilvus import (
    connections,
    FieldSchema, CollectionSchema, DataType,
    Collection,
)

num_entities, dim = 2000, 8

print("start connecting to Milvus")
connections.connect("default", host="192.168.230.71", port="19530")

fields = [
    FieldSchema(name="pk", dtype=DataType.VARCHAR, is_primary=True, auto_id=False, max_length=100),
    FieldSchema(name="random", dtype=DataType.DOUBLE),
    FieldSchema(name="embeddings", dtype=DataType.FLOAT_VECTOR, dim=dim)
]

schema = CollectionSchema(fields, "hello_milvus is the simplest demo to introduce the APIs")

print("Create collection `hello_milvus`")
hello_milvus = Collection("hello_milvus", schema, consistency_level="Strong",shards_num=2)


print("Start inserting entities")
rng = np.random.default_rng(seed=19530)
entities = [
    # provide the pk field because `auto_id` is set to False
    [str(i) for i in range(num_entities)],
    rng.random(num_entities).tolist(),  # field random, only supports list
    rng.random((num_entities, dim)),    # field embeddings, supports numpy.ndarray and list
]

insert_result = hello_milvus.insert(entities)

hello_milvus.flush()

客户端SDK向proxy发送一个Insert API请求,向数据库写入数据。

这个例子向数据库写入2000条数据,每条数据是一个8维向量。

insert_milvus.jpg

2.客户端接受API请求,将request封装为insertTask,并压入dmQueue队列。

注意这里是dmQueue。DDL类型的是ddQueue。

代码路径:internal\proxy\impl.go

// Insert insert records into collection.
func (node *Proxy) Insert(ctx context.Context, request *milvuspb.InsertRequest) (*milvuspb.MutationResult, error) {
   
   
    ......
    // request封装为task
    it := &insertTask{
   
   
        ctx:       ctx,
        Condition: NewTaskCondition(ctx),
        insertMsg: &msgstream.InsertMsg{
   
   
            BaseMsg: msgstream.BaseMsg{
   
   
                HashValues: request.HashKeys,
            },
            InsertRequest: msgpb.InsertRequest{
   
   
                Base: commonpbutil.NewMsgBase(
                    commonpbutil.WithMsgType(commonpb.MsgType_Insert),
                    commonpbutil.WithMsgID(0),
                    commonpbutil.WithSourceID(paramtable.GetNodeID()),
                ),
                DbName:         request.GetDbName(),
                CollectionName: request.CollectionName,
                PartitionName:  request.PartitionName,
                FieldsData:     request.FieldsData,
                NumRows:        uint64(request.NumRows),
                Version:        msgpb.InsertDataVersion_ColumnBased,
            },
        },
        idAllocator:   node.rowIDAllocator,
        segIDAssigner: node.segAssigner,
        chMgr:         node.chMgr,
        chTicker:      node.chTicker,
    }

    ......
    // 将task压入dmQueue队列

    if err := node.sched.dmQueue.Enqueue(it); err != nil {
   
   
        ......
    }

    ......
    // 等待任务执行完
    if err := it.WaitToFinish(); err != nil {
   
   
        ......
    }

    ......
}

InsertRequest结构:

type InsertRequest struct {
   
   
    Base                 *commonpb.MsgBase     
    DbName               string                
    CollectionName       string                
    PartitionName        string                
    FieldsData           []*schemapb.FieldData 
    HashKeys             []uint32              
    NumRows              uint32                
    XXX_NoUnkeyedLiteral struct{
   
   }              
    XXX_unrecognized     []byte                
    XXX_sizecache        int32                 
}

type FieldData struct {
   
   
    Type      DataType 
    FieldName string   
    // Types that are valid to be assigned to Field:
    //
    //    *FieldData_Scalars
    //    *FieldData_Vectors
    Field                isFieldData_Field 
    FieldId              int64             
    IsDynamic            bool              
    XXX_NoUnkeyedLiteral struct{
   
   }          
    XXX_unrecognized     []byte            
    XXX_sizecache        int32             
}

type isFieldData_Field interface {
   
   
    isFieldData_Field()
}

type FieldData_Scalars struct {
   
   
    Scalars *ScalarField
}

type FieldData_Vectors struct {
   
   
    Vectors *VectorField
}

客户端通过grpc发送数据,FieldData.Field存储接受的数据。

isFieldData_Field是一个接口,有2个实现:FieldData_Scalars和FieldData_Vectors。

真正存储数据的就是这2个实现。

3.执行insertTask的3个方法PreExecute、Execute、PostExecute。

PreExecute()一般为参数校验等工作。

Execute()一般为真正执行逻辑。

PostExecute()执行完后的逻辑,什么都不做,返回nil。

代码路径:internal\proxy\task_insert.go

func (it *insertTask) Execute(ctx context.Context) error {
   
   
    ......
    collectionName := it.insertMsg.CollectionName
    // 根据collectionName得到collectionID
    collID, err := globalMetaCache.GetCollectionID(it.ctx, it.insertMsg.GetDbName(), collectionName)
    log := log.Ctx(ctx)
    if err != nil {
   
   
        ......
    }
    it.insertMsg.CollectionID = collID

    getCacheDur := tr.RecordSpan()
    // 得到stream,类型为mqMsgStream
    stream, err := it.chMgr.getOrCreateDmlStream(collID)
    if err != nil {
   
   
        return err
    }
    getMsgStreamDur := tr.RecordSpan()
    // by-dev-rootcoord-dml_0_445811557825249939v0
    // by-dev-rootcoord-dml_1_445811557825249939v1
    // 如果shardNum=2,则获取2个虚拟channel
    channelNames, err := it.chMgr.getVChannels(collID)
    if err != nil {
   
   
        ......
    }

    ......

    // assign segmentID for insert data and repack data by segmentID
    // msgPck包含segmentID
    var msgPack *msgstream.MsgPack
    if it.partitionKeys == nil {
   
   
        // 分配segmentID
        // 重新打包为2个msgstream.TsMsg,分别发送给2个虚拟通道
        msgPack, err = repackInsertData(it.TraceCtx(), channelNames, it.insertMsg, it.result, it.idAllocator, it.segIDAssigner)
    } else {
   
   
        msgPack, err = repackInsertDataWithPartitionKey(it.TraceCtx(), channelNames, it.partitionKeys, it.insertMsg, it.result, it.idAllocator, it.segIDAssigner)
    }
    if err != nil {
   
   
        ......
    }
    ......
    // 生产数据,将数据写入mq
    err = stream.Produce(msgPack)
    if err != nil {
   
   
        ......
    }
    ......
}

总结:

1.Insert由proxy向mq(pulsar)写入数据。通过虚拟channel写入。

2.在pulsar创建topic,向topic写入数据。

目录
相关文章
|
8天前
|
存储 设计模式 算法
【23种设计模式·全精解析 | 行为型模式篇】11种行为型模式的结构概述、案例实现、优缺点、扩展对比、使用场景、源码解析
行为型模式用于描述程序在运行时复杂的流程控制,即描述多个类或对象之间怎样相互协作共同完成单个对象都无法单独完成的任务,它涉及算法与对象间职责的分配。行为型模式分为类行为模式和对象行为模式,前者采用继承机制来在类间分派行为,后者采用组合或聚合在对象间分配行为。由于组合关系或聚合关系比继承关系耦合度低,满足“合成复用原则”,所以对象行为模式比类行为模式具有更大的灵活性。 行为型模式分为: • 模板方法模式 • 策略模式 • 命令模式 • 职责链模式 • 状态模式 • 观察者模式 • 中介者模式 • 迭代器模式 • 访问者模式 • 备忘录模式 • 解释器模式
【23种设计模式·全精解析 | 行为型模式篇】11种行为型模式的结构概述、案例实现、优缺点、扩展对比、使用场景、源码解析
|
8天前
|
设计模式 存储 安全
【23种设计模式·全精解析 | 创建型模式篇】5种创建型模式的结构概述、实现、优缺点、扩展、使用场景、源码解析
结构型模式描述如何将类或对象按某种布局组成更大的结构。它分为类结构型模式和对象结构型模式,前者采用继承机制来组织接口和类,后者釆用组合或聚合来组合对象。由于组合关系或聚合关系比继承关系耦合度低,满足“合成复用原则”,所以对象结构型模式比类结构型模式具有更大的灵活性。 结构型模式分为以下 7 种: • 代理模式 • 适配器模式 • 装饰者模式 • 桥接模式 • 外观模式 • 组合模式 • 享元模式
【23种设计模式·全精解析 | 创建型模式篇】5种创建型模式的结构概述、实现、优缺点、扩展、使用场景、源码解析
|
8天前
|
设计模式 存储 安全
【23种设计模式·全精解析 | 创建型模式篇】5种创建型模式的结构概述、实现、优缺点、扩展、使用场景、源码解析
创建型模式的主要关注点是“怎样创建对象?”,它的主要特点是"将对象的创建与使用分离”。这样可以降低系统的耦合度,使用者不需要关注对象的创建细节。创建型模式分为5种:单例模式、工厂方法模式抽象工厂式、原型模式、建造者模式。
【23种设计模式·全精解析 | 创建型模式篇】5种创建型模式的结构概述、实现、优缺点、扩展、使用场景、源码解析
|
6天前
|
安全 API 数据安全/隐私保护
速卖通AliExpress商品详情API接口深度解析与实战应用
速卖通(AliExpress)作为全球化电商的重要平台,提供了丰富的商品资源和便捷的购物体验。为了提升用户体验和优化商品管理,速卖通开放了API接口,其中商品详情API尤为关键。本文介绍如何获取API密钥、调用商品详情API接口,并处理API响应数据,帮助开发者和商家高效利用这些工具。通过合理规划API调用策略和确保合法合规使用,开发者可以更好地获取商品信息,优化管理和营销策略。
|
15天前
|
数据采集 JSON API
如何利用Python爬虫淘宝商品详情高级版(item_get_pro)API接口及返回值解析说明
本文介绍了如何利用Python爬虫技术调用淘宝商品详情高级版API接口(item_get_pro),获取商品的详细信息,包括标题、价格、销量等。文章涵盖了环境准备、API权限申请、请求构建和返回值解析等内容,强调了数据获取的合规性和安全性。
|
14天前
|
JSON 自然语言处理 Java
OpenAI API深度解析:参数、Token、计费与多种调用方式
随着人工智能技术的飞速发展,OpenAI API已成为许多开发者和企业的得力助手。本文将深入探讨OpenAI API的参数、Token、计费方式,以及如何通过Rest API(以Postman为例)、Java API调用、工具调用等方式实现与OpenAI的交互,并特别关注调用具有视觉功能的GPT-4o使用本地图片的功能。此外,本文还将介绍JSON模式、可重现输出的seed机制、使用代码统计Token数量、开发控制台循环聊天,以及基于最大Token数量的消息列表限制和会话长度管理的控制台循环聊天。
104 7
|
9天前
|
安全 搜索推荐 数据挖掘
陪玩系统源码开发流程解析,成品陪玩系统源码的优点
我们自主开发的多客陪玩系统源码,整合了市面上主流陪玩APP功能,支持二次开发。该系统适用于线上游戏陪玩、语音视频聊天、心理咨询等场景,提供用户注册管理、陪玩者资料库、预约匹配、实时通讯、支付结算、安全隐私保护、客户服务及数据分析等功能,打造综合性社交平台。随着互联网技术发展,陪玩系统正成为游戏爱好者的新宠,改变游戏体验并带来新的商业模式。
|
2月前
|
监控 Java 应用服务中间件
高级java面试---spring.factories文件的解析源码API机制
【11月更文挑战第20天】Spring Boot是一个用于快速构建基于Spring框架的应用程序的开源框架。它通过自动配置、起步依赖和内嵌服务器等特性,极大地简化了Spring应用的开发和部署过程。本文将深入探讨Spring Boot的背景历史、业务场景、功能点以及底层原理,并通过Java代码手写模拟Spring Boot的启动过程,特别是spring.factories文件的解析源码API机制。
86 2
|
3月前
|
缓存 Java 程序员
Map - LinkedHashSet&Map源码解析
Map - LinkedHashSet&Map源码解析
87 0
|
3月前
|
算法 Java 容器
Map - HashSet & HashMap 源码解析
Map - HashSet & HashMap 源码解析
68 0

推荐镜像

更多