dataCoord的Compaction分析2

简介: dataCoord的Compaction分析2

dataCoord的Compaction分析2

milvus版本:2.3.2

流程图:

compaction数据流向.jpg

compaction用来合并对象存储的小文件,将小的segment合并为大的segment。

Compaction 有一个配置项来控制是否启用自动压缩。此配置是全局的,会影响系统中的所有集合。

dataCoord.enableCompaction = true
dataCoord.compaction.enableAutoCompaction = true

enableAutoCompaction生效的前提是enableCompaction为true。

增加了collection级别的控制。

compaction相关参数(全局):

dataCoord.enableCompaction = true
dataCoord.compaction.enableAutoCompaction = true
dataCoord.compaction.indexBasedCompaction = true
dataCoord.compaction.global.interval = 60 #默认60秒,触发compaction信号
dataCoord.compaction.check.interval = 10 #默认10秒,更新状态
dataCoord.segment.smallProportion = 0.5 #默认0.5
dataCoord.compaction.max.segment = 30 #默认30
dataCoord.compaction.min.segment = 3 #默认3

在collection级别设置属性:

collection.autocompaction.enabled = true

python设置代码:

hello_milvus.set_properties({
   
   "collection.autocompaction.enabled": True})

1.执行compaction的代码

func (t *compactionTrigger) start() {
   
   
    t.quit = make(chan struct{
   
   })
    t.globalTrigger = time.NewTicker(Params.DataCoordCfg.GlobalCompactionInterval.GetAsDuration(time.Second))
    t.wg.Add(2)
    go func() {
   
   
        defer logutil.LogPanic()
        defer t.wg.Done()

        for {
   
   
            select {
   
   
            case <-t.quit:
                log.Info("compaction trigger quit")
                return
            case signal := <-t.signals:
                switch {
   
   
                case signal.isGlobal:
                    // 处理全局compaction信号
                    t.handleGlobalSignal(signal)
                default:
                    // collection级别信号
                    t.handleSignal(signal)
                    // shouldn't reset, otherwise a frequent flushed collection will affect other collections
                    // t.globalTrigger.Reset(Params.DataCoordCfg.GlobalCompactionInterval)
                }
            }
        }
    }()
    // 触发全局compaction信号
    go t.startGlobalCompactionLoop()
}

t.handleGlobalSignal(signal) 用来处理全局信号。

t.handleSignal(signal) 用来处理collection级别的信号。

go t.startGlobalCompactionLoop() 定时触发全局信号。

collection信号在flush的时候触发。

2.触发全局

// triggerCompaction trigger a compaction if any compaction condition satisfy.
func (t *compactionTrigger) triggerCompaction() error {
   
   
    id, err := t.allocSignalID()
    if err != nil {
   
   
        return err
    }
    signal := &compactionSignal{
   
   
        id:       id,
        isForce:  false,
        isGlobal: true,
    }
    t.signals <- signal
    return nil
}

3.触发collection级别

// triggerSingleCompaction triger a compaction bundled with collection-partition-channel-segment
func (t *compactionTrigger) triggerSingleCompaction(collectionID, partitionID, segmentID int64, channel string) error {
   
   
    // If AutoCompaction disabled, flush request will not trigger compaction
    if !Params.DataCoordCfg.EnableAutoCompaction.GetAsBool() {
   
   
        return nil
    }

    id, err := t.allocSignalID()
    if err != nil {
   
   
        return err
    }
    signal := &compactionSignal{
   
   
        id:           id,
        isForce:      false,
        isGlobal:     false,
        collectionID: collectionID,
        partitionID:  partitionID,
        segmentID:    segmentID,
        channel:      channel,
    }
    t.signals <- signal
    return nil
}

调用堆栈:

SaveBinlogPaths()(internal\datacoord\services.go)
  |--s.compactionTrigger.triggerSingleCompaction()

当调用flush的时候触发。

// SaveBinlogPaths updates segment related binlog path
// works for Checkpoints and Flush
func (s *Server) SaveBinlogPaths(ctx context.Context, req *datapb.SaveBinlogPathsRequest) (*commonpb.Status, error) {
   
   
    ......

    if req.GetFlushed() {
   
   
        s.segmentManager.DropSegment(ctx, req.SegmentID)
        s.flushCh <- req.SegmentID

        if !req.Importing && Params.DataCoordCfg.EnableCompaction.GetAsBool() {
   
   
            err = s.compactionTrigger.triggerSingleCompaction(segment.GetCollectionID(), segment.GetPartitionID(),
                segmentID, segment.GetInsertChannel())
            if err != nil {
   
   
                log.Warn("failed to trigger single compaction")
            } else {
   
   
                log.Info("compaction triggered for segment")
            }
        }
    }
    return merr.Success(), nil
}

4.进入handleSignal()

// handleSignal processes segment flush caused partition-chan level compaction signal
func (t *compactionTrigger) handleSignal(signal *compactionSignal) {
   
   
    ......

    if !signal.isForce && !t.isCollectionAutoCompactionEnabled(coll) {
   
   
        log.RatedInfo(20, "collection auto compaction disabled",
            zap.Int64("collectionID", collectionID),
        )
        return
    }

    ......

    plans := t.generatePlans(segments, signal.isForce, isDiskIndex, ct)
    for _, plan := range plans {
   
   
        ......
    }
}

isCollectionAutoCompactionEnabled()判断是否设置collection级别。

func (t *compactionTrigger) isCollectionAutoCompactionEnabled(coll *collectionInfo) bool {
   
   
    enabled, err := getCollectionAutoCompactionEnabled(coll.Properties)
    if err != nil {
   
   
        log.Warn("collection properties auto compaction not valid, returning false", zap.Error(err))
        return false
    }
    return enabled
}

进入getCollectionAutoCompactionEnabled():

// getCollectionAutoCompactionEnabled returns whether auto compaction for collection is enabled.
// if not set, returns global auto compaction config.
func getCollectionAutoCompactionEnabled(properties map[string]string) (bool, error) {
   
   
    v, ok := properties[common.CollectionAutoCompactionKey]
    if ok {
   
   
        enabled, err := strconv.ParseBool(v)
        if err != nil {
   
   
            return false, err
        }
        return enabled, nil
    }
    return Params.DataCoordCfg.EnableAutoCompaction.GetAsBool(), nil
}

common.CollectionAutoCompactionKey=collection.autocompaction.enabled

目录
相关文章
|
存储 SQL API
milvus insert api流程源码分析
milvus insert api流程源码分析
617 3
|
人工智能 自然语言处理 API
向量检索服务实践测评
向量检索服务是一种基于阿里云自研的向量引擎 Proxima 内核,提供具备水平拓展、全托管、云原生的高效向量检索服务。向量检索服务将强大的向量管理、查询等能力,通过简洁易用的 SDK/API 接口透出,方便在大模型知识库搭建、多模态 AI 搜索等多种应用场景上集成。
139115 5
|
开发工具 Python
milvus的delete操作
milvus的delete操作
1644 0
|
Go API 数据库
milvus的db和collection信息查询
milvus的db和collection信息查询
1556 0
|
存储 Linux 数据安全/隐私保护
安装部署milvus单机版(快速体验)
安装部署milvus单机版(快速体验)
3723 0
|
存储 弹性计算 固态存储
阿里云服务器收费标准租用价格及价格计算器使用参考
阿里云服务器租用价格参考,不同时期阿里云服务器的租用价格不同,2024年阿里云多款云服务器的收费标准都做了降价调整,最高降幅达93%,同时,阿里云还推出了多款价格比较实惠的云服务器,现在购买阿里云轻量应用服务器2核2G3M带宽82元1年,经济型e实例ECS云服务器2核2G3M带宽新购和续费优惠价99元1年,通用算力型u1实例2核4G5M带宽新购和续费优惠价199元1年,4核8G云服务器955元1年,本文为大家介绍一下阿里云服务器的最新收费标准租用价格以及使用价格计算器查询云服务器价格的方法。
|
开发工具 数据库 git
向量检索服务体验评测
通过一个实用的例子带你全方位了解向量检索服务DashVector
121067 4
|
存储 算法 数据挖掘
向量数据库技术分享
向量数据库主要用于支持高效的向量检索场景(以图搜图、以文搜图等),通过本次培训可以掌握向量数据库的核心理论以及两种向量索引技术的特点、场景与算法原理,并通过实战案例掌握向量数据库的应用与性能优化策略。
1704 3
|
存储 Cloud Native NoSQL
向量数据库汇总
向量数据库汇总
1186 0
|
Cloud Native 关系型数据库 新能源