dataCoord的Compaction分析2

本文涉及的产品
RDS PostgreSQL Serverless,0.5-4RCU 50GB 3个月
推荐场景:
对影评进行热评分析
阿里云百炼推荐规格 ADB PostgreSQL,4核16GB 100GB 1个月
简介: dataCoord的Compaction分析2

dataCoord的Compaction分析2

milvus版本:2.3.2

流程图:

compaction数据流向.jpg

compaction用来合并对象存储的小文件,将小的segment合并为大的segment。

Compaction 有一个配置项来控制是否启用自动压缩。此配置是全局的,会影响系统中的所有集合。

dataCoord.enableCompaction = true
dataCoord.compaction.enableAutoCompaction = true

enableAutoCompaction生效的前提是enableCompaction为true。

增加了collection级别的控制。

compaction相关参数(全局):

dataCoord.enableCompaction = true
dataCoord.compaction.enableAutoCompaction = true
dataCoord.compaction.indexBasedCompaction = true
dataCoord.compaction.global.interval = 60 #默认60秒,触发compaction信号
dataCoord.compaction.check.interval = 10 #默认10秒,更新状态
dataCoord.segment.smallProportion = 0.5 #默认0.5
dataCoord.compaction.max.segment = 30 #默认30
dataCoord.compaction.min.segment = 3 #默认3

在collection级别设置属性:

collection.autocompaction.enabled = true

python设置代码:

hello_milvus.set_properties({
   
   "collection.autocompaction.enabled": True})

1.执行compaction的代码

func (t *compactionTrigger) start() {
   
   
    t.quit = make(chan struct{
   
   })
    t.globalTrigger = time.NewTicker(Params.DataCoordCfg.GlobalCompactionInterval.GetAsDuration(time.Second))
    t.wg.Add(2)
    go func() {
   
   
        defer logutil.LogPanic()
        defer t.wg.Done()

        for {
   
   
            select {
   
   
            case <-t.quit:
                log.Info("compaction trigger quit")
                return
            case signal := <-t.signals:
                switch {
   
   
                case signal.isGlobal:
                    // 处理全局compaction信号
                    t.handleGlobalSignal(signal)
                default:
                    // collection级别信号
                    t.handleSignal(signal)
                    // shouldn't reset, otherwise a frequent flushed collection will affect other collections
                    // t.globalTrigger.Reset(Params.DataCoordCfg.GlobalCompactionInterval)
                }
            }
        }
    }()
    // 触发全局compaction信号
    go t.startGlobalCompactionLoop()
}

t.handleGlobalSignal(signal) 用来处理全局信号。

t.handleSignal(signal) 用来处理collection级别的信号。

go t.startGlobalCompactionLoop() 定时触发全局信号。

collection信号在flush的时候触发。

2.触发全局

// triggerCompaction trigger a compaction if any compaction condition satisfy.
func (t *compactionTrigger) triggerCompaction() error {
   
   
    id, err := t.allocSignalID()
    if err != nil {
   
   
        return err
    }
    signal := &compactionSignal{
   
   
        id:       id,
        isForce:  false,
        isGlobal: true,
    }
    t.signals <- signal
    return nil
}

3.触发collection级别

// triggerSingleCompaction triger a compaction bundled with collection-partition-channel-segment
func (t *compactionTrigger) triggerSingleCompaction(collectionID, partitionID, segmentID int64, channel string) error {
   
   
    // If AutoCompaction disabled, flush request will not trigger compaction
    if !Params.DataCoordCfg.EnableAutoCompaction.GetAsBool() {
   
   
        return nil
    }

    id, err := t.allocSignalID()
    if err != nil {
   
   
        return err
    }
    signal := &compactionSignal{
   
   
        id:           id,
        isForce:      false,
        isGlobal:     false,
        collectionID: collectionID,
        partitionID:  partitionID,
        segmentID:    segmentID,
        channel:      channel,
    }
    t.signals <- signal
    return nil
}

调用堆栈:

SaveBinlogPaths()(internal\datacoord\services.go)
  |--s.compactionTrigger.triggerSingleCompaction()

当调用flush的时候触发。

// SaveBinlogPaths updates segment related binlog path
// works for Checkpoints and Flush
func (s *Server) SaveBinlogPaths(ctx context.Context, req *datapb.SaveBinlogPathsRequest) (*commonpb.Status, error) {
   
   
    ......

    if req.GetFlushed() {
   
   
        s.segmentManager.DropSegment(ctx, req.SegmentID)
        s.flushCh <- req.SegmentID

        if !req.Importing && Params.DataCoordCfg.EnableCompaction.GetAsBool() {
   
   
            err = s.compactionTrigger.triggerSingleCompaction(segment.GetCollectionID(), segment.GetPartitionID(),
                segmentID, segment.GetInsertChannel())
            if err != nil {
   
   
                log.Warn("failed to trigger single compaction")
            } else {
   
   
                log.Info("compaction triggered for segment")
            }
        }
    }
    return merr.Success(), nil
}

4.进入handleSignal()

// handleSignal processes segment flush caused partition-chan level compaction signal
func (t *compactionTrigger) handleSignal(signal *compactionSignal) {
   
   
    ......

    if !signal.isForce && !t.isCollectionAutoCompactionEnabled(coll) {
   
   
        log.RatedInfo(20, "collection auto compaction disabled",
            zap.Int64("collectionID", collectionID),
        )
        return
    }

    ......

    plans := t.generatePlans(segments, signal.isForce, isDiskIndex, ct)
    for _, plan := range plans {
   
   
        ......
    }
}

isCollectionAutoCompactionEnabled()判断是否设置collection级别。

func (t *compactionTrigger) isCollectionAutoCompactionEnabled(coll *collectionInfo) bool {
   
   
    enabled, err := getCollectionAutoCompactionEnabled(coll.Properties)
    if err != nil {
   
   
        log.Warn("collection properties auto compaction not valid, returning false", zap.Error(err))
        return false
    }
    return enabled
}

进入getCollectionAutoCompactionEnabled():

// getCollectionAutoCompactionEnabled returns whether auto compaction for collection is enabled.
// if not set, returns global auto compaction config.
func getCollectionAutoCompactionEnabled(properties map[string]string) (bool, error) {
   
   
    v, ok := properties[common.CollectionAutoCompactionKey]
    if ok {
   
   
        enabled, err := strconv.ParseBool(v)
        if err != nil {
   
   
            return false, err
        }
        return enabled, nil
    }
    return Params.DataCoordCfg.EnableAutoCompaction.GetAsBool(), nil
}

common.CollectionAutoCompactionKey=collection.autocompaction.enabled

目录
相关文章
|
存储 SQL API
milvus insert api流程源码分析
milvus insert api流程源码分析
499 3
|
人工智能 自然语言处理 API
向量检索服务实践测评
向量检索服务是一种基于阿里云自研的向量引擎 Proxima 内核,提供具备水平拓展、全托管、云原生的高效向量检索服务。向量检索服务将强大的向量管理、查询等能力,通过简洁易用的 SDK/API 接口透出,方便在大模型知识库搭建、多模态 AI 搜索等多种应用场景上集成。
138987 5
|
开发工具 Python
milvus的delete操作
milvus的delete操作
1496 0
|
Go API 数据库
milvus的db和collection信息查询
milvus的db和collection信息查询
1429 0
|
存储 Linux 数据安全/隐私保护
安装部署milvus单机版(快速体验)
安装部署milvus单机版(快速体验)
3098 0
|
开发工具 数据库 git
向量检索服务体验评测
通过一个实用的例子带你全方位了解向量检索服务DashVector
120860 4
|
存储 算法 数据挖掘
向量数据库技术分享
向量数据库主要用于支持高效的向量检索场景(以图搜图、以文搜图等),通过本次培训可以掌握向量数据库的核心理论以及两种向量索引技术的特点、场景与算法原理,并通过实战案例掌握向量数据库的应用与性能优化策略。
1361 3
|
存储 Cloud Native NoSQL
向量数据库汇总
向量数据库汇总
954 0
|
11月前
|
传感器 机器学习/深度学习 数据采集
AI在环保中的角色:污染监测与防治
【10月更文挑战第6天】AI在环保领域的应用,不仅提升了污染监测的精准度和防治效率,还推动了环保技术的创新和升级。作为未来环保事业的重要力量,AI正以其独特的优势,为构建更加绿色、可持续的生态环境贡献着智慧与力量。我们有理由相信,在AI的助力下,我们的地球将变得更加美好。
|
Cloud Native 关系型数据库 新能源