dataCoord的Compaction分析2

本文涉及的产品
RDS PostgreSQL Serverless,0.5-4RCU 50GB 3个月
推荐场景:
对影评进行热评分析
阿里云百炼推荐规格 ADB PostgreSQL,4核16GB 100GB 1个月
简介: dataCoord的Compaction分析2

dataCoord的Compaction分析2

milvus版本:2.3.2

流程图:

compaction数据流向.jpg

compaction用来合并对象存储的小文件,将小的segment合并为大的segment。

Compaction 有一个配置项来控制是否启用自动压缩。此配置是全局的,会影响系统中的所有集合。

dataCoord.enableCompaction = true
dataCoord.compaction.enableAutoCompaction = true

enableAutoCompaction生效的前提是enableCompaction为true。

增加了collection级别的控制。

compaction相关参数(全局):

dataCoord.enableCompaction = true
dataCoord.compaction.enableAutoCompaction = true
dataCoord.compaction.indexBasedCompaction = true
dataCoord.compaction.global.interval = 60 #默认60秒,触发compaction信号
dataCoord.compaction.check.interval = 10 #默认10秒,更新状态
dataCoord.segment.smallProportion = 0.5 #默认0.5
dataCoord.compaction.max.segment = 30 #默认30
dataCoord.compaction.min.segment = 3 #默认3

在collection级别设置属性:

collection.autocompaction.enabled = true

python设置代码:

hello_milvus.set_properties({
   
   "collection.autocompaction.enabled": True})

1.执行compaction的代码

func (t *compactionTrigger) start() {
   
   
    t.quit = make(chan struct{
   
   })
    t.globalTrigger = time.NewTicker(Params.DataCoordCfg.GlobalCompactionInterval.GetAsDuration(time.Second))
    t.wg.Add(2)
    go func() {
   
   
        defer logutil.LogPanic()
        defer t.wg.Done()

        for {
   
   
            select {
   
   
            case <-t.quit:
                log.Info("compaction trigger quit")
                return
            case signal := <-t.signals:
                switch {
   
   
                case signal.isGlobal:
                    // 处理全局compaction信号
                    t.handleGlobalSignal(signal)
                default:
                    // collection级别信号
                    t.handleSignal(signal)
                    // shouldn't reset, otherwise a frequent flushed collection will affect other collections
                    // t.globalTrigger.Reset(Params.DataCoordCfg.GlobalCompactionInterval)
                }
            }
        }
    }()
    // 触发全局compaction信号
    go t.startGlobalCompactionLoop()
}

t.handleGlobalSignal(signal) 用来处理全局信号。

t.handleSignal(signal) 用来处理collection级别的信号。

go t.startGlobalCompactionLoop() 定时触发全局信号。

collection信号在flush的时候触发。

2.触发全局

// triggerCompaction trigger a compaction if any compaction condition satisfy.
func (t *compactionTrigger) triggerCompaction() error {
   
   
    id, err := t.allocSignalID()
    if err != nil {
   
   
        return err
    }
    signal := &compactionSignal{
   
   
        id:       id,
        isForce:  false,
        isGlobal: true,
    }
    t.signals <- signal
    return nil
}

3.触发collection级别

// triggerSingleCompaction triger a compaction bundled with collection-partition-channel-segment
func (t *compactionTrigger) triggerSingleCompaction(collectionID, partitionID, segmentID int64, channel string) error {
   
   
    // If AutoCompaction disabled, flush request will not trigger compaction
    if !Params.DataCoordCfg.EnableAutoCompaction.GetAsBool() {
   
   
        return nil
    }

    id, err := t.allocSignalID()
    if err != nil {
   
   
        return err
    }
    signal := &compactionSignal{
   
   
        id:           id,
        isForce:      false,
        isGlobal:     false,
        collectionID: collectionID,
        partitionID:  partitionID,
        segmentID:    segmentID,
        channel:      channel,
    }
    t.signals <- signal
    return nil
}

调用堆栈:

SaveBinlogPaths()(internal\datacoord\services.go)
  |--s.compactionTrigger.triggerSingleCompaction()

当调用flush的时候触发。

// SaveBinlogPaths updates segment related binlog path
// works for Checkpoints and Flush
func (s *Server) SaveBinlogPaths(ctx context.Context, req *datapb.SaveBinlogPathsRequest) (*commonpb.Status, error) {
   
   
    ......

    if req.GetFlushed() {
   
   
        s.segmentManager.DropSegment(ctx, req.SegmentID)
        s.flushCh <- req.SegmentID

        if !req.Importing && Params.DataCoordCfg.EnableCompaction.GetAsBool() {
   
   
            err = s.compactionTrigger.triggerSingleCompaction(segment.GetCollectionID(), segment.GetPartitionID(),
                segmentID, segment.GetInsertChannel())
            if err != nil {
   
   
                log.Warn("failed to trigger single compaction")
            } else {
   
   
                log.Info("compaction triggered for segment")
            }
        }
    }
    return merr.Success(), nil
}

4.进入handleSignal()

// handleSignal processes segment flush caused partition-chan level compaction signal
func (t *compactionTrigger) handleSignal(signal *compactionSignal) {
   
   
    ......

    if !signal.isForce && !t.isCollectionAutoCompactionEnabled(coll) {
   
   
        log.RatedInfo(20, "collection auto compaction disabled",
            zap.Int64("collectionID", collectionID),
        )
        return
    }

    ......

    plans := t.generatePlans(segments, signal.isForce, isDiskIndex, ct)
    for _, plan := range plans {
   
   
        ......
    }
}

isCollectionAutoCompactionEnabled()判断是否设置collection级别。

func (t *compactionTrigger) isCollectionAutoCompactionEnabled(coll *collectionInfo) bool {
   
   
    enabled, err := getCollectionAutoCompactionEnabled(coll.Properties)
    if err != nil {
   
   
        log.Warn("collection properties auto compaction not valid, returning false", zap.Error(err))
        return false
    }
    return enabled
}

进入getCollectionAutoCompactionEnabled():

// getCollectionAutoCompactionEnabled returns whether auto compaction for collection is enabled.
// if not set, returns global auto compaction config.
func getCollectionAutoCompactionEnabled(properties map[string]string) (bool, error) {
   
   
    v, ok := properties[common.CollectionAutoCompactionKey]
    if ok {
   
   
        enabled, err := strconv.ParseBool(v)
        if err != nil {
   
   
            return false, err
        }
        return enabled, nil
    }
    return Params.DataCoordCfg.EnableAutoCompaction.GetAsBool(), nil
}

common.CollectionAutoCompactionKey=collection.autocompaction.enabled

目录
相关文章
|
8月前
|
移动开发 分布式计算 Spark
Spark的几种去重的原理分析
Spark的几种去重的原理分析
152 0
|
8月前
|
存储 消息中间件 Kafka
Hudi 压缩(Compaction)实现分析
Hudi 压缩(Compaction)实现分析
276 1
|
8月前
|
分布式计算 测试技术 Apache
探索Apache Hudi核心概念 (3) - Compaction
探索Apache Hudi核心概念 (3) - Compaction
145 5
|
7月前
|
缓存 开发者 索引
深入解析 `org.elasticsearch.action.search.SearchRequest` 类
深入解析 `org.elasticsearch.action.search.SearchRequest` 类
|
8月前
|
存储 测试技术 API
Apache Hudi 负载类Payload使用案例剖析
Apache Hudi 负载类Payload使用案例剖析
157 4
|
8月前
|
分布式计算 Apache 调度
Apache Hudi 异步Compaction部署方式汇总
Apache Hudi 异步Compaction部署方式汇总
150 0
|
8月前
|
对象存储
milvus的compaction分析(小文件合并大文件)
milvus的compaction分析(小文件合并大文件)
1037 0
|
NoSQL 算法 分布式数据库
Cassandra Leveled Compaction Strategy
这篇文章19年写的,重新发到开发者社区翻新下。
149 0
Cassandra Leveled Compaction Strategy
|
SQL 分布式计算 大数据
【spark系列3】spark 3.0.1 AQE(Adaptive Query Exection)分析(一)
【spark系列3】spark 3.0.1 AQE(Adaptive Query Exection)分析(一)
695 0
【spark系列3】spark 3.0.1 AQE(Adaptive Query Exection)分析(一)
|
分布式计算 Spark
【spark系列3】spark 3.0.1 AQE(Adaptive Query Exection)分析(二)
【spark系列3】spark 3.0.1 AQE(Adaptive Query Exection)分析(二)
454 0