CreateCollection流程_dataSyncService执行流程_milvus源码解析
milvus版本:v2.3.2
CreateCollection这个API流程较长,也是milvus的核心API之一,涉及的内容比较复杂。这里介绍dataSyncService相关的流程。
本文基于【CreateCollection流程_addCollectionMetaStep_milvus源码解析】这篇文章。
整体架构:
CreateCollection 的数据流向:
1.客户端sdk发出CreateCollection API请求。
客户端SDK向proxy发送一个CreateCollection API请求,创建一个名为hello_milvus的collection。
dataCoord会向etcd写入2种类型的key:
- channelwatch/{nodeID}/{chName}
- datacoord-meta/channel-removal/{channelName}
2.dataNode启动时会watch etcd的key(channelwatch/)
这样当dataCoord写入此类型的key,就会触发dataNode相应的动作。
下面进行源码分析:
1.从dataNode启动开始
堆栈:
Run()
|--d.svr.Run()(cmd\components\data_node.go)
|--s.start()(internal\distributed\datanode\service.go)
|--s.datanode.Start()(同上)
|--Start()(grpc调用internal\datanode\data_node.go)
|--go node.StartWatchChannels(node.ctx)(同上)
go node.StartWatchChannels():开启一个goroutine进行watch。
2.进入node.StartWatchChannels()
代码路径:internal\datanode\event_manager.go
// StartWatchChannels starts a loop that watches channel allocation status via kv (etcd for now).
// It derives the watch prefix from DataCoordWatchSubPath and this node's ServerID, calls
// checkWatchedList once, and then passes every event received on the watch channel to
// handleChannelEvt until ctx is done.
func (node *DataNode) StartWatchChannels(ctx context.Context) {
defer node.stopWaiter.Done()
defer logutil.LogPanic()
// Key rule: channelwatch/{nodeID}
watchPrefix := path.Join(Params.CommonCfg.DataCoordWatchSubPath.GetValue(), fmt.Sprintf("%d", node.GetSession().ServerID))
log.Info("Start watch channel", zap.String("prefix", watchPrefix))
// Watch the key prefix on etcd, e.g. channelwatch/5.
// Creating a collection produces keys of the form channelwatch/{nodeID}/{chName}.
evtChan := node.watchKv.WatchWithPrefix(watchPrefix)
// After the watch is established, first check everything that already exists.
err := node.checkWatchedList()
if err != nil {
log.Warn("StartWatchChannels failed", zap.Error(err))
return
}
// Handle watch events.
for {
select {
case <-ctx.Done():
log.Info("watch etcd loop quit")
return
case event, ok := <-evtChan:
// Closed-channel handling is elided in this excerpt.
if !ok {
......
}
// Watch-error handling is elided in this excerpt.
if err := event.Err(); err != nil {
......
}
// Handle each event carried by this watch response.
for _, evt := range event.Events {
// We need to stay in order until events enqueued
node.handleChannelEvt(evt)
}
}
}
}
node.handleChannelEvt(evt)用来处理具体的event,主要是PUT和DELETE事件。
创建collection会向etcd写入kv,属于PUT事件。
删除collection会删除etcd的kv,属于DELETE事件。
3.进入node.handleChannelEvt()
代码路径:internal\datanode\data_node.go
// handleChannelEvt converts a raw kv watch event into an internal event and
// passes it to handleWatchInfo together with the event's key and value.
func (node *DataNode) handleChannelEvt(evt *clientv3.Event) {
	// Remains nil for any event type other than PUT or DELETE.
	var watchEvt *event

	if evt.Type == clientv3.EventTypePut {
		// datacoord shall put channels needs to be watched here
		watchEvt = &event{eventType: putEventType, version: evt.Kv.Version}
	} else if evt.Type == clientv3.EventTypeDelete {
		watchEvt = &event{eventType: deleteEventType, version: evt.Kv.Version}
	}

	kv := evt.Kv
	node.handleWatchInfo(watchEvt, string(kv.Key), kv.Value)
}
evt.Kv.Key的值:
by-dev/meta/channelwatch/6/by-dev-rootcoord-dml_0_445698323354747022v0
4.进入node.handleWatchInfo()
代码路径:internal\datanode\event_manager.go
// handleWatchInfo completes the event from the watched key/value and dispatches
// it to the channelEventManager registered under the event's vchannel name.
//
// A PUT event is dropped (with a log line) when its payload cannot be parsed,
// when its state is an end state, or when its Progress is non-zero. After a
// DELETE event has been handed to the manager, the manager is removed from
// eventManagerMap and closed.
func (node *DataNode) handleWatchInfo(e *event, key string, data []byte) {
	if e.eventType == putEventType {
		// Deserialize the value into a ChannelWatchInfo.
		info, parseErr := parsePutEventData(data)
		if parseErr != nil {
			log.Warn("fail to handle watchInfo", zap.Int("event type", e.eventType), zap.String("key", key), zap.Error(parseErr))
			return
		}

		if isEndWatchState(info.State) {
			log.Info("DataNode received a PUT event with an end State", zap.String("state", info.State.String()))
			return
		}

		if info.Progress != 0 {
			log.Info("DataNode received a PUT event with tickler update progress", zap.String("channel", info.Vchan.ChannelName), zap.Int64("version", e.version))
			return
		}

		e.info = info
		// Record the virtual channel name carried by the watch info.
		e.vChanName = info.GetVchan().GetChannelName()
		log.Info("DataNode is handling watchInfo PUT event", zap.String("key", key), zap.Any("watch state", info.GetState().String()))
	} else if e.eventType == deleteEventType {
		e.vChanName = parseDeleteEventKey(key)
		log.Info("DataNode is handling watchInfo DELETE event", zap.String("key", key))
	}

	candidate := newChannelEventManager(node.handlePutEvent, node.handleDeleteEvent, retryWatchInterval)
	mgr, existed := node.eventManagerMap.GetOrInsert(e.vChanName, candidate)
	if !existed {
		// Not loaded: presumably the manager was just inserted, so start its loop.
		mgr.Run()
	}

	mgr.handleEvent(*e)

	if e.eventType != deleteEventType {
		return
	}

	// Whenever a delete event comes, this eventManager will be removed from map
	if removed, found := node.eventManagerMap.GetAndRemove(e.vChanName); found {
		removed.Close()
	}
}
变量e的值:
newChannelEventManager()返回一个channelEventManager结构体:
// newChannelEventManager builds a channelEventManager with a 10-slot buffered
// event channel, an unbuffered close channel, the given PUT/DELETE handlers and
// the given retry interval.
func newChannelEventManager(handlePut func(*datapb.ChannelWatchInfo, int64) error,
	handleDel func(string), retryInterval time.Duration,
) *channelEventManager {
	mgr := new(channelEventManager)
	mgr.eventChan = make(chan event, 10)
	mgr.closeChan = make(chan struct{})
	mgr.handlePutEvent = handlePut    // handler invoked for PUT events
	mgr.handleDeleteEvent = handleDel // handler invoked for DELETE events
	mgr.retryInterval = retryInterval
	return mgr
}
handlePutEvent设置为node.handlePutEvent()。
handleDeleteEvent设置为node.handleDeleteEvent()。
4.1 进入actualManager.Run()(在node.handleWatchInfo()中被调用)
代码路径:internal\datanode\event_manager.go
// Run starts a goroutine, tracked by e.wg, that receives events from eventChan
// and invokes the PUT or DELETE handler for each one. The goroutine returns
// once a receive on closeChan succeeds.
func (e *channelEventManager) Run() {
	// dispatch routes a single event to the handler matching its type.
	dispatch := func(evt event) {
		if evt.eventType == putEventType {
			// PUT event: handled by e.handlePutEvent.
			if err := e.handlePutEvent(evt.info, evt.version); err != nil {
				// logging the error is convenient for follow-up investigation of problems
				log.Warn("handle put event failed", zap.String("vChanName", evt.vChanName), zap.Error(err))
			}
			return
		}
		if evt.eventType == deleteEventType {
			// DELETE event: handled by e.handleDeleteEvent.
			e.handleDeleteEvent(evt.vChanName)
		}
	}

	e.wg.Add(1)
	go func() {
		defer e.wg.Done()
		for {
			select {
			case <-e.closeChan:
				return
			case evt := <-e.eventChan:
				dispatch(evt)
			}
		}
	}()
}
event.version的值为1
event.info的值如下:
5.进入e.handlePutEvent(),其实就是进入node.handlePutEvent()
代码路径:internal\datanode\event_manager.go
// handlePutEvent processes a PUT watch event for one vchannel.
// It rebuilds the etcd key from DataCoordWatchSubPath, this node's ServerID and the
// vchannel name, creates an etcd tickler, and branches on watchInfo.State. For the
// Uncomplete/ToWatch states it calls flowgraphManager.addAndStartWithEtcdTickler and,
// on success, sets the state to WatchSuccess. The (possibly updated) watchInfo is then
// marshalled and written back with CompareVersionAndSwap using the tickler's version.
// Several error/branch bodies are elided ("......") in this excerpt.
func (node *DataNode) handlePutEvent(watchInfo *datapb.ChannelWatchInfo, version int64) (err error) {
// Virtual channel name, e.g. by-dev-rootcoord-dml_2_445698762473996462v0
vChanName := watchInfo.GetVchan().GetChannelName()
// Key, e.g. channelwatch/7/by-dev-rootcoord-dml_2_445698762473996462v0
key := path.Join(Params.CommonCfg.DataCoordWatchSubPath.GetValue(), fmt.Sprintf("%d", node.GetSession().ServerID), vChanName)
tickler := newEtcdTickler(version, key, watchInfo, node.watchKv, Params.DataNodeCfg.WatchEventTicklerInterval.GetAsDuration(time.Second))
switch watchInfo.State {
case datapb.ChannelWatchState_Uncomplete, datapb.ChannelWatchState_ToWatch:
// This is the branch taken in the flow walked through here.
if err := node.flowgraphManager.addAndStartWithEtcdTickler(node, watchInfo.GetVchan(), watchInfo.GetSchema(), tickler); err != nil {
......
} else {
log.Info("handle put event: new data sync service success", zap.String("vChanName", vChanName))
watchInfo.State = datapb.ChannelWatchState_WatchSuccess
}
case datapb.ChannelWatchState_ToRelease:
......
}
// Serialize the watch info so it can be written back to the kv store.
v, err := proto.Marshal(watchInfo)
if err != nil {
......
}
// Write back under the same key, conditioned on the tickler's version.
success, err := node.watchKv.CompareVersionAndSwap(key, tickler.version, string(v))
if err != nil {
......
}
log.Info("handle put event success", zap.String("key", key),
zap.String("state", watchInfo.State.String()), zap.String("vChanName", vChanName))
return nil
}
6.进入node.flowgraphManager.addAndStartWithEtcdTickler()
代码路径:internal\datanode\flow_graph_manager.go
// addAndStartWithEtcdTickler creates a DataSyncService for the given vchannel,
// starts it, registers it in fm.flowgraphs and increments the flow-graph metric.
// If fm.flowgraphs already contains the channel it logs a warning and returns
// nil without creating anything. A construction failure from
// newServiceWithEtcdTickler is logged and returned.
func (fm *flowgraphManager) addAndStartWithEtcdTickler(dn *DataNode, vchan *datapb.VchannelInfo, schema *schemapb.CollectionSchema, tickler *etcdTickler) error {
	chName := vchan.GetChannelName()
	logger := log.With(zap.String("channel", chName))

	if fm.flowgraphs.Contain(chName) {
		logger.Warn("try to add an existed DataSyncService")
		return nil
	}

	// Build the dataSyncService from the schema and vchannel info.
	watchInfo := &datapb.ChannelWatchInfo{
		Schema: schema,
		Vchan:  vchan,
	}
	svc, err := newServiceWithEtcdTickler(context.TODO(), dn, watchInfo, tickler)
	if err != nil {
		logger.Warn("fail to create new DataSyncService", zap.Error(err))
		return err
	}

	// Start the service before registering it under the channel name.
	svc.start()
	fm.flowgraphs.Insert(chName, svc)

	nodeLabel := fmt.Sprint(paramtable.GetNodeID())
	metrics.DataNodeNumFlowGraphs.WithLabelValues(nodeLabel).Inc()
	return nil
}
总结:
1.datanode对channelwatch/{nodeID}进行watch
2.当创建或者删除collection,对etcd进行PUT或者DELETE,触发datanode相应事件。
当PUT事件发生时,会触发dataSyncService的创建和启动。
一个shard对应一个vchannel,每个vchannel会启动一个对应的dataSyncService。