Insert API执行流程源码解析
milvus版本:v2.3.2
Insert这个API写入数据,流程较长,是milvus的核心API之一,本文介绍大致的写入流程。
整体架构:
Insert 的数据流向:
1.客户端sdk发出Insert API请求。
# Client-side example (pymilvus): create a 2-shard collection, insert 2000
# entities, then flush. The insert() call is the request traced in this article.
import numpy as np
from pymilvus import (
connections,
FieldSchema, CollectionSchema, DataType,
Collection,
)
# 2000 entities; the vector field has dimension 8.
num_entities, dim = 2000, 8
print("start connecting to Milvus")
# Host/port are specific to the author's test environment.
connections.connect("default", host="192.168.230.71", port="19530")
# Schema: a client-supplied VARCHAR primary key (auto_id=False), a DOUBLE
# scalar field, and an 8-dimensional FLOAT_VECTOR field.
fields = [
FieldSchema(name="pk", dtype=DataType.VARCHAR, is_primary=True, auto_id=False, max_length=100),
FieldSchema(name="random", dtype=DataType.DOUBLE),
FieldSchema(name="embeddings", dtype=DataType.FLOAT_VECTOR, dim=dim)
]
schema = CollectionSchema(fields, "hello_milvus is the simplest demo to introduce the APIs")
print("Create collection `hello_milvus`")
# shards_num=2: this is why the proxy later resolves 2 virtual channels for the collection.
hello_milvus = Collection("hello_milvus", schema, consistency_level="Strong",shards_num=2)
print("Start inserting entities")
rng = np.random.default_rng(seed=19530)
# Column-based data: one column per field, in schema order (pk, random, embeddings).
entities = [
# provide the pk field because `auto_id` is set to False
[str(i) for i in range(num_entities)],
rng.random(num_entities).tolist(), # field random, only supports list
rng.random((num_entities, dim)), # field embeddings, supports numpy.ndarray and list
]
# Sends the Insert API request to Milvus.
insert_result = hello_milvus.insert(entities)
# flush() is a separate API request; this article only follows insert().
hello_milvus.flush()
客户端SDK向proxy发送一个Insert API请求,向数据库写入数据。
这个例子创建了一个shards_num=2的collection,并向其写入2000条数据。每条数据包含3个字段:VARCHAR类型的主键pk、DOUBLE类型的random,以及8维浮点向量embeddings。
2.proxy接收API请求,将request封装为insertTask,并压入dmQueue队列。
注意这里是dmQueue(DML类请求使用的队列);DDL类请求(如创建collection)则进入ddQueue。
代码路径:internal/proxy/impl.go
// Insert inserts records into a collection.
//
// Excerpt: "......" marks code elided by the article. The proxy wraps the
// client's request into an insertTask, pushes it onto the DML task queue
// (dmQueue), and blocks until the task has finished executing.
func (node *Proxy) Insert(ctx context.Context, request *milvuspb.InsertRequest) (*milvuspb.MutationResult, error) {
......
// Wrap the request into an insertTask.
it := &insertTask{
ctx: ctx,
Condition: NewTaskCondition(ctx),
insertMsg: &msgstream.InsertMsg{
BaseMsg: msgstream.BaseMsg{
HashValues: request.HashKeys,
},
// Internal insert message; the field data is forwarded as-is in
// column-based layout (InsertDataVersion_ColumnBased).
InsertRequest: msgpb.InsertRequest{
Base: commonpbutil.NewMsgBase(
commonpbutil.WithMsgType(commonpb.MsgType_Insert),
commonpbutil.WithMsgID(0),
commonpbutil.WithSourceID(paramtable.GetNodeID()),
),
DbName: request.GetDbName(),
CollectionName: request.CollectionName,
PartitionName: request.PartitionName,
FieldsData: request.FieldsData,
NumRows: uint64(request.NumRows),
Version: msgpb.InsertDataVersion_ColumnBased,
},
},
// Proxy-level helpers used later by insertTask.Execute
// (ID/segment assignment and DML channel management).
idAllocator: node.rowIDAllocator,
segIDAssigner: node.segAssigner,
chMgr: node.chMgr,
chTicker: node.chTicker,
}
......
// Push the task onto dmQueue (DML requests; DDL requests go to ddQueue).
if err := node.sched.dmQueue.Enqueue(it); err != nil {
......
}
......
// Block until the task has finished executing.
if err := it.WaitToFinish(); err != nil {
......
}
......
}
InsertRequest结构:
// InsertRequest is the client-facing Insert API request (milvuspb) received
// by Proxy.Insert. NOTE(review): excerpt of generated protobuf code; the
// struct tags appear to have been omitted by the article.
type InsertRequest struct {
Base *commonpb.MsgBase
DbName string
CollectionName string
PartitionName string
// FieldsData holds the inserted data column by column, one FieldData per field.
FieldsData []*schemapb.FieldData
// HashKeys is copied into the internal InsertMsg's BaseMsg.HashValues by Proxy.Insert.
HashKeys []uint32
// NumRows is widened to uint64 for the internal msgpb.InsertRequest.
NumRows uint32
// XXX_* fields are bookkeeping fields emitted by the protobuf code generator.
XXX_NoUnkeyedLiteral struct{
}
XXX_unrecognized []byte
XXX_sizecache int32
}
// FieldData carries the data of a single field (one column) of an insert
// request, identified by FieldName/FieldId.
type FieldData struct {
Type DataType
FieldName string
// Field holds the actual values (protobuf oneof): scalar data or vector data.
// Types that are valid to be assigned to Field:
//
// *FieldData_Scalars
// *FieldData_Vectors
Field isFieldData_Field
FieldId int64
IsDynamic bool
// XXX_* fields are bookkeeping fields emitted by the protobuf code generator.
XXX_NoUnkeyedLiteral struct{
}
XXX_unrecognized []byte
XXX_sizecache int32
}
// isFieldData_Field is the interface type of FieldData.Field. Its
// implementations are FieldData_Scalars and FieldData_Vectors; the unexported
// method restricts the implementations to types in this package.
type isFieldData_Field interface {
isFieldData_Field()
}
// FieldData_Scalars is the FieldData.Field variant that holds scalar column data.
type FieldData_Scalars struct {
Scalars *ScalarField
}
// FieldData_Vectors is the FieldData.Field variant that holds vector column data.
type FieldData_Vectors struct {
Vectors *VectorField
}
客户端通过gRPC发送数据,proxy接收到的数据存储在FieldData.Field中。
isFieldData_Field是一个接口,有2个实现:FieldData_Scalars和FieldData_Vectors。
真正存储数据的就是这2个实现。
3.执行insertTask的3个方法PreExecute、Execute、PostExecute。
PreExecute()一般为参数校验等工作。
Execute()一般为真正执行逻辑。
PostExecute()为执行完后的逻辑;对insertTask而言它什么都不做,直接返回nil。
代码路径:internal/proxy/task_insert.go
// Execute is the core of insertTask. It resolves the collection ID, obtains the
// collection's DML stream and virtual channels, repacks the rows (assigning
// segment IDs), and produces the resulting messages to the MQ.
// Excerpt: "......" marks code elided by the article.
func (it *insertTask) Execute(ctx context.Context) error {
......
collectionName := it.insertMsg.CollectionName
// Resolve collectionName to its collectionID via the global meta cache.
collID, err := globalMetaCache.GetCollectionID(it.ctx, it.insertMsg.GetDbName(), collectionName)
log := log.Ctx(ctx)
if err != nil {
......
}
it.insertMsg.CollectionID = collID
getCacheDur := tr.RecordSpan()
// Get (or create) the collection's DML stream; per the article its concrete
// type is mqMsgStream.
stream, err := it.chMgr.getOrCreateDmlStream(collID)
if err != nil {
return err
}
getMsgStreamDur := tr.RecordSpan()
// Fetch the collection's virtual channel (vchannel) names. With shards_num=2
// there are 2 of them, e.g.:
// by-dev-rootcoord-dml_0_445811557825249939v0
// by-dev-rootcoord-dml_1_445811557825249939v1
channelNames, err := it.chMgr.getVChannels(collID)
if err != nil {
......
}
......
// assign segmentID for insert data and repack data by segmentID
// (the resulting msgPack carries the assigned segment IDs)
var msgPack *msgstream.MsgPack
if it.partitionKeys == nil {
// Assign segment IDs and repack the rows into msgstream.TsMsg messages
// addressed to the vchannels. The article describes this as 2 messages
// for this 2-shard example, one per vchannel. NOTE(review): the exact
// message count depends on repackInsertData; confirm there.
msgPack, err = repackInsertData(it.TraceCtx(), channelNames, it.insertMsg, it.result, it.idAllocator, it.segIDAssigner)
} else {
// Partition-key collection: the repack also receives it.partitionKeys.
msgPack, err = repackInsertDataWithPartitionKey(it.TraceCtx(), channelNames, it.partitionKeys, it.insertMsg, it.result, it.idAllocator, it.segIDAssigner)
}
if err != nil {
......
}
......
// Produce the messages, i.e. write the data into the MQ via the DML stream.
err = stream.Produce(msgPack)
if err != nil {
......
}
......
}
repackInsertData()这个函数还涉及到了segmentID的分配。
总结:
1.Insert由proxy向mq(pulsar)写入数据。通过虚拟channel写入。
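2.数据最终写入pulsar的topic。每个虚拟channel对应一个物理channel,例如by-dev-rootcoord-dml_0_445811557825249939v0对应by-dev-rootcoord-dml_0。物理channel就是pulsar中的一个topic,不存在时会被创建。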