dataCoord的Compaction分析2

简介: dataCoord的Compaction分析2

dataCoord的Compaction分析2

milvus版本:2.3.2

流程图:

compaction数据流向.jpg

compaction用来合并对象存储的小文件,将小的segment合并为大的segment。

Compaction 有一个配置项来控制是否启用自动压缩。此配置是全局的,会影响系统中的所有集合。

dataCoord.enableCompaction = true
dataCoord.compaction.enableAutoCompaction = true

enableAutoCompaction生效的前提是enableCompaction为true。

增加了collection级别的控制。

compaction相关参数(全局):

dataCoord.enableCompaction = true
dataCoord.compaction.enableAutoCompaction = true
dataCoord.compaction.indexBasedCompaction = true
dataCoord.compaction.global.interval = 60 #默认60秒,触发compaction信号
dataCoord.compaction.check.interval = 10 #默认10秒,更新状态
dataCoord.segment.smallProportion = 0.5 #默认0.5
dataCoord.compaction.max.segment = 30 #默认30
dataCoord.compaction.min.segment = 3 #默认3

在collection级别设置属性:

collection.autocompaction.enabled = true

python设置代码:

hello_milvus.set_properties({
   
   "collection.autocompaction.enabled": True})

1.执行compaction的代码

func (t *compactionTrigger) start() {
   
   
    t.quit = make(chan struct{
   
   })
    t.globalTrigger = time.NewTicker(Params.DataCoordCfg.GlobalCompactionInterval.GetAsDuration(time.Second))
    t.wg.Add(2)
    go func() {
   
   
        defer logutil.LogPanic()
        defer t.wg.Done()

        for {
   
   
            select {
   
   
            case <-t.quit:
                log.Info("compaction trigger quit")
                return
            case signal := <-t.signals:
                switch {
   
   
                case signal.isGlobal:
                    // 处理全局compaction信号
                    t.handleGlobalSignal(signal)
                default:
                    // collection级别信号
                    t.handleSignal(signal)
                    // shouldn't reset, otherwise a frequent flushed collection will affect other collections
                    // t.globalTrigger.Reset(Params.DataCoordCfg.GlobalCompactionInterval)
                }
            }
        }
    }()
    // 触发全局compaction信号
    go t.startGlobalCompactionLoop()
}

t.handleGlobalSignal(signal) 用来处理全局信号。

t.handleSignal(signal) 用来处理collection级别的信号。

go t.startGlobalCompactionLoop() 定时触发全局信号。

collection信号在flush的时候触发。

2.触发全局

// triggerCompaction trigger a compaction if any compaction condition satisfy.
func (t *compactionTrigger) triggerCompaction() error {
   
   
    id, err := t.allocSignalID()
    if err != nil {
   
   
        return err
    }
    signal := &compactionSignal{
   
   
        id:       id,
        isForce:  false,
        isGlobal: true,
    }
    t.signals <- signal
    return nil
}

3.触发collection级别

// triggerSingleCompaction triger a compaction bundled with collection-partition-channel-segment
func (t *compactionTrigger) triggerSingleCompaction(collectionID, partitionID, segmentID int64, channel string) error {
   
   
    // If AutoCompaction disabled, flush request will not trigger compaction
    if !Params.DataCoordCfg.EnableAutoCompaction.GetAsBool() {
   
   
        return nil
    }

    id, err := t.allocSignalID()
    if err != nil {
   
   
        return err
    }
    signal := &compactionSignal{
   
   
        id:           id,
        isForce:      false,
        isGlobal:     false,
        collectionID: collectionID,
        partitionID:  partitionID,
        segmentID:    segmentID,
        channel:      channel,
    }
    t.signals <- signal
    return nil
}

调用堆栈:

SaveBinlogPaths()(internal\datacoord\services.go)
  |--s.compactionTrigger.triggerSingleCompaction()

当调用flush的时候触发。

// SaveBinlogPaths updates segment related binlog path
// works for Checkpoints and Flush
func (s *Server) SaveBinlogPaths(ctx context.Context, req *datapb.SaveBinlogPathsRequest) (*commonpb.Status, error) {
   
   
    ......

    if req.GetFlushed() {
   
   
        s.segmentManager.DropSegment(ctx, req.SegmentID)
        s.flushCh <- req.SegmentID

        if !req.Importing && Params.DataCoordCfg.EnableCompaction.GetAsBool() {
   
   
            err = s.compactionTrigger.triggerSingleCompaction(segment.GetCollectionID(), segment.GetPartitionID(),
                segmentID, segment.GetInsertChannel())
            if err != nil {
   
   
                log.Warn("failed to trigger single compaction")
            } else {
   
   
                log.Info("compaction triggered for segment")
            }
        }
    }
    return merr.Success(), nil
}

4.进入handleSignal()

// handleSignal processes segment flush caused partition-chan level compaction signal
func (t *compactionTrigger) handleSignal(signal *compactionSignal) {
   
   
    ......

    if !signal.isForce && !t.isCollectionAutoCompactionEnabled(coll) {
   
   
        log.RatedInfo(20, "collection auto compaction disabled",
            zap.Int64("collectionID", collectionID),
        )
        return
    }

    ......

    plans := t.generatePlans(segments, signal.isForce, isDiskIndex, ct)
    for _, plan := range plans {
   
   
        ......
    }
}

isCollectionAutoCompactionEnabled()判断是否设置collection级别。

func (t *compactionTrigger) isCollectionAutoCompactionEnabled(coll *collectionInfo) bool {
   
   
    enabled, err := getCollectionAutoCompactionEnabled(coll.Properties)
    if err != nil {
   
   
        log.Warn("collection properties auto compaction not valid, returning false", zap.Error(err))
        return false
    }
    return enabled
}

进入getCollectionAutoCompactionEnabled():

// getCollectionAutoCompactionEnabled returns whether auto compaction for collection is enabled.
// if not set, returns global auto compaction config.
func getCollectionAutoCompactionEnabled(properties map[string]string) (bool, error) {
   
   
    v, ok := properties[common.CollectionAutoCompactionKey]
    if ok {
   
   
        enabled, err := strconv.ParseBool(v)
        if err != nil {
   
   
            return false, err
        }
        return enabled, nil
    }
    return Params.DataCoordCfg.EnableAutoCompaction.GetAsBool(), nil
}

common.CollectionAutoCompactionKey=collection.autocompaction.enabled

目录
相关文章
|
存储 SQL API
milvus insert api流程源码分析
milvus insert api流程源码分析
785 3
|
人工智能 自然语言处理 API
向量检索服务实践测评
向量检索服务是一种基于阿里云自研的向量引擎 Proxima 内核,提供具备水平拓展、全托管、云原生的高效向量检索服务。向量检索服务将强大的向量管理、查询等能力,通过简洁易用的 SDK/API 接口透出,方便在大模型知识库搭建、多模态 AI 搜索等多种应用场景上集成。
139356 5
|
Go API 数据库
milvus的db和collection信息查询
milvus的db和collection信息查询
1770 0
|
开发工具 Python
milvus的delete操作
milvus的delete操作
1848 0
|
2月前
|
弹性计算 人工智能 编解码
告别“算力焦虑”?实测阿里云第九代ECS,AMX指令集带来的意外惊喜**
阿里云第九代ECS搭载英特尔®至强®6处理器,创新集成AMX矩阵加速与TDX机密计算:AMX提升AI推理效率、降低延迟与成本;TDX提供硬件级内存加密隔离,强化数据安全。适用于游戏、推荐、音视频及核心数据库等场景。(239字)
|
存储 弹性计算 固态存储
阿里云服务器收费标准租用价格及价格计算器使用参考
阿里云服务器租用价格参考,不同时期阿里云服务器的租用价格不同,2024年阿里云多款云服务器的收费标准都做了降价调整,最高降幅达93%,同时,阿里云还推出了多款价格比较实惠的云服务器,现在购买阿里云轻量应用服务器2核2G3M带宽82元1年,经济型e实例ECS云服务器2核2G3M带宽新购和续费优惠价99元1年,通用算力型u1实例2核4G5M带宽新购和续费优惠价199元1年,4核8G云服务器955元1年,本文为大家介绍一下阿里云服务器的最新收费标准租用价格以及使用价格计算器查询云服务器价格的方法。
|
11月前
|
人工智能 自然语言处理 搜索推荐
传统产品经理思维在AI时代‘失灵’,能力图谱如何助力AI产品经理构建认知框架?
本文AI产品专家三桥君探讨了AI产品经理在技术快速发展背景下如何通过构建AI能力图谱来指导智能产品设计。三桥君从知识与推理、自然语言处理、交互能力和辅助决策四个维度系统梳理AI核心能力,帮助产品经理理解技术边界与应用场景。能力图谱不仅是技术地图,更是方法论工具,能够有效指导从需求分析到产品落地的全流程,包括发现问题、设计闭环系统和规划产品路径。掌握这一框架将帮助AI产品经理突破技术认知局限,打造真正智能化的产品解决方案。
451 10
|
数据采集 人工智能 运维
工业巡检进入‘无人化+AI’时代:无人机智能系统的落地实践与未来
无人机智能巡检系统凭借高效性、智能化和精准性,解决了传统人工巡检效率低、成本高、漏检风险大的问题。该系统通过“空中机器人+AI分析”,实现多维度数据采集与分析,大幅提升巡检效率和准确性。广泛应用于能源、交通、工业等领域,助力运维模式升级,成为工业4.0时代基础设施运维的标配工具。
2087 19
工业巡检进入‘无人化+AI’时代:无人机智能系统的落地实践与未来
|
Cloud Native 关系型数据库 新能源
|
对象存储
milvus的compaction分析(小文件合并大文件)
milvus的compaction分析(小文件合并大文件)
1547 0