Analysis of Milvus Compaction (Merging Small Files into Large Files)

Summary: an analysis of Milvus compaction (merging small files into large files).

Compaction Analysis in dataCoord

Milvus version: 2.3.2

Flow diagram:

(Figure: compaction data flow)

Compaction merges small files in object storage, combining small segments into larger segments.

Compaction has a configuration item that controls whether automatic compaction is enabled. This setting is global and affects all collections in the system.

dataCoord.enableCompaction = true
dataCoord.compaction.enableAutoCompaction = true

enableAutoCompaction only takes effect when enableCompaction is true.
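To make that relationship concrete, below is a minimal, hypothetical Go sketch (plain booleans, not the Milvus paramtable API) of how the two flags interact: enableCompaction decides whether the compaction handler and trigger are started at all, while enableAutoCompaction only matters once compaction itself is enabled.

package main

import "fmt"

func main() {
    // Hypothetical flags mirroring dataCoord.enableCompaction and
    // dataCoord.compaction.enableAutoCompaction.
    enableCompaction := true
    enableAutoCompaction := true

    if !enableCompaction {
        // compactionHandler.start() / compactionTrigger.start() are never called,
        // so enableAutoCompaction has no effect in this case.
        fmt.Println("compaction disabled: no handler, no trigger started")
        return
    }
    if enableAutoCompaction {
        fmt.Println("automatic compaction signals will be generated")
    } else {
        fmt.Println("compaction runs only when triggered explicitly")
    }
}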

1. Start dataCoord:

func (s *Server) startDataCoord() {
    if Params.DataCoordCfg.EnableCompaction.GetAsBool() {
        s.compactionHandler.start()
        s.compactionTrigger.start()
    }
    s.startServerLoop()
    s.stateCode.Store(commonpb.StateCode_Healthy)
    sessionutil.SaveServerInfo(typeutil.DataCoordRole, s.session.ServerID)
}

2. Enter s.compactionHandler.start()

func (c *compactionPlanHandler) start() {
    interval := Params.DataCoordCfg.CompactionCheckIntervalInSeconds.GetAsDuration(time.Second)
    c.quit = make(chan struct{})
    c.wg.Add(1)

    go func() {
        defer c.wg.Done()
        ticker := time.NewTicker(interval)
        defer ticker.Stop()
        for {
            select {
            case <-c.quit:
                log.Info("compaction handler quit")
                return
            case <-ticker.C:
                cctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
                ts, err := c.allocator.allocTimestamp(cctx)
                if err != nil {
                    log.Warn("unable to alloc timestamp", zap.Error(err))
                    cancel()
                    continue
                }
                cancel()
                _ = c.updateCompaction(ts)
            }
        }
    }()
}

CompactionCheckIntervalInSeconds defaults to 10 seconds, so the handler updates compaction task state every 10 seconds.

3. Enter s.compactionTrigger.start()

func (t *compactionTrigger) start() {
    t.quit = make(chan struct{})
    t.globalTrigger = time.NewTicker(Params.DataCoordCfg.GlobalCompactionInterval.GetAsDuration(time.Second))
    t.wg.Add(2)
    go func() {
        defer logutil.LogPanic()
        defer t.wg.Done()

        for {
            select {
            case <-t.quit:
                log.Info("compaction trigger quit")
                return
            case signal := <-t.signals:
                switch {
                case signal.isGlobal:
                    // handle global compaction signals
                    t.handleGlobalSignal(signal)
                default:
                    // handle collection-level signals
                    t.handleSignal(signal)
                    // shouldn't reset, otherwise a frequent flushed collection will affect other collections
                    // t.globalTrigger.Reset(Params.DataCoordCfg.GlobalCompactionInterval)
                }
            }
        }
    }()
    // fire global compaction signals periodically
    go t.startGlobalCompactionLoop()
}

GlobalCompactionInterval defaults to 60 seconds.

This code starts two goroutines.

The first goroutine checks whether a compaction signal is global or collection-level and dispatches it to handleGlobalSignal or handleSignal accordingly; the second one (startGlobalCompactionLoop) periodically emits global compaction signals. A simplified sketch of this dispatch pattern follows.
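The sketch below is a self-contained illustration of that dispatch pattern, not the Milvus code: the compactionSignal fields (isGlobal, isForce, collectionID) and the short intervals are assumptions made for the example.

package main

import (
    "fmt"
    "time"
)

// compactionSignal is a simplified stand-in; field names are assumptions.
type compactionSignal struct {
    isGlobal     bool
    isForce      bool
    collectionID int64
}

func main() {
    signals := make(chan *compactionSignal, 16)
    quit := make(chan struct{})

    // goroutine 2: fire a global signal on every tick (GlobalCompactionInterval).
    globalTicker := time.NewTicker(200 * time.Millisecond) // 60s in a real deployment
    go func() {
        for {
            select {
            case <-quit:
                return
            case <-globalTicker.C:
                signals <- &compactionSignal{isGlobal: true}
            }
        }
    }()

    // goroutine 1: route signals to the global or collection-level handler.
    go func() {
        for {
            select {
            case <-quit:
                return
            case sig := <-signals:
                if sig.isGlobal {
                    fmt.Println("handleGlobalSignal: scan all collections")
                } else {
                    fmt.Printf("handleSignal: collection %d\n", sig.collectionID)
                }
            }
        }
    }()

    // a collection-level signal, e.g. produced after a segment flush
    signals <- &compactionSignal{collectionID: 1001}

    time.Sleep(500 * time.Millisecond)
    close(quit)
    globalTicker.Stop()
}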

4. Enter t.handleGlobalSignal(signal)

func (t *compactionTrigger) handleGlobalSignal(signal *compactionSignal) {
    t.forceMu.Lock()
    defer t.forceMu.Unlock()

    log := log.With(zap.Int64("compactionID", signal.id))
    // return segments grouped by channel-partition
    m := t.meta.GetSegmentsChanPart(func(segment *SegmentInfo) bool {
        // segment selector (selection criteria)
        return (signal.collectionID == 0 || segment.CollectionID == signal.collectionID) &&
            isSegmentHealthy(segment) &&
            isFlush(segment) &&
            !segment.isCompacting && // not compacting now
            !segment.GetIsImporting() // not importing now
    }) // m is list of chanPartSegments, which is channel-partition organized segments

    if len(m) == 0 {
        return
    }

    ts, err := t.allocTs()
    ......

    for _, group := range m {
        if !signal.isForce && t.compactionHandler.isFull() {
            break
        }
        if Params.DataCoordCfg.IndexBasedCompaction.GetAsBool() {
            group.segments = FilterInIndexedSegments(t.handler, t.meta, group.segments...)
        }

        isDiskIndex, err := t.updateSegmentMaxSize(group.segments)
        if err != nil {
            log.Warn("failed to update segment max size", zap.Error(err))
            continue
        }

        coll, err := t.getCollection(group.collectionID)
        ......

        if !signal.isForce && !t.isCollectionAutoCompactionEnabled(coll) {
            log.RatedInfo(20, "collection auto compaction disabled",
                zap.Int64("collectionID", group.collectionID),
            )
            return
        }

        ct, err := t.getCompactTime(ts, coll)
        ......
        // find segments that need compaction and generate merge plans
        plans := t.generatePlans(group.segments, signal.isForce, isDiskIndex, ct)
        for _, plan := range plans {
            segIDs := fetchSegIDs(plan.GetSegmentBinlogs())

            if !signal.isForce && t.compactionHandler.isFull() {
                ......
            }
            start := time.Now()
            if err := t.fillOriginPlan(plan); err != nil {
                ......
            }
            // execute compaction
            err := t.compactionHandler.execCompactionPlan(signal, plan)
            if err != nil {
                ......
            }

            segIDMap := make(map[int64][]*datapb.FieldBinlog, len(plan.SegmentBinlogs))
            for _, seg := range plan.SegmentBinlogs {
                segIDMap[seg.SegmentID] = seg.Deltalogs
            }

            log.Info("time cost of generating global compaction",
                zap.Any("segID2DeltaLogs", segIDMap),
                zap.Int64("planID", plan.PlanID),
                zap.Int64("time cost", time.Since(start).Milliseconds()),
                zap.Int64("collectionID", signal.collectionID),
                zap.String("channel", group.channelName),
                zap.Int64("partitionID", group.partitionID),
                zap.Int64s("segmentIDs", segIDs))
        }
    }
}
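GetSegmentsChanPart groups the filtered segments by (channel, partition), and plans are then generated per group. Here is a rough sketch of that grouping idea with simplified, hypothetical types; it is an illustration, not the Milvus implementation.

package main

import "fmt"

// Simplified segment metadata for illustration only.
type segmentInfo struct {
    ID          int64
    ChannelName string
    PartitionID int64
    Flushed     bool
    Compacting  bool
}

type chanPartKey struct {
    channel   string
    partition int64
}

// groupByChanPart mimics the idea of GetSegmentsChanPart: apply a selector,
// then bucket the surviving segments per (channel, partition).
func groupByChanPart(segs []*segmentInfo, selector func(*segmentInfo) bool) map[chanPartKey][]*segmentInfo {
    groups := make(map[chanPartKey][]*segmentInfo)
    for _, s := range segs {
        if !selector(s) {
            continue
        }
        k := chanPartKey{channel: s.ChannelName, partition: s.PartitionID}
        groups[k] = append(groups[k], s)
    }
    return groups
}

func main() {
    segs := []*segmentInfo{
        {ID: 1, ChannelName: "ch-0", PartitionID: 100, Flushed: true},
        {ID: 2, ChannelName: "ch-0", PartitionID: 100, Flushed: true},
        {ID: 3, ChannelName: "ch-1", PartitionID: 100, Flushed: false}, // filtered out: not flushed
    }
    groups := groupByChanPart(segs, func(s *segmentInfo) bool {
        return s.Flushed && !s.Compacting // flushed and not already compacting
    })
    for k, g := range groups {
        fmt.Printf("%s / partition %d -> %d segments\n", k.channel, k.partition, len(g))
    }
}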

5. Enter t.generatePlans()

func (t *compactionTrigger) generatePlans(segments []*SegmentInfo, force bool, isDiskIndex bool, compactTime *compactTime) []*datapb.CompactionPlan {
    // find segments need internal compaction
    // TODO add low priority candidates, for example if the segment is smaller than full 0.9 * max segment size but larger than small segment boundary, we only execute compaction when there are no compaction running actively
    var prioritizedCandidates []*SegmentInfo
    var smallCandidates []*SegmentInfo
    var nonPlannedSegments []*SegmentInfo

    // TODO, currently we lack of the measurement of data distribution, there should be another compaction help on redistributing segment based on scalar/vector field distribution
    for _, segment := range segments {
        segment := segment.ShadowClone()
        // TODO should we trigger compaction periodically even if the segment has no obvious reason to be compacted?
        // check whether single (internal) compaction is needed
        if force || t.ShouldDoSingleCompaction(segment, isDiskIndex, compactTime) {
            prioritizedCandidates = append(prioritizedCandidates, segment)
        } else if t.isSmallSegment(segment) {
            // check whether this is a small segment
            smallCandidates = append(smallCandidates, segment)
        } else {
            nonPlannedSegments = append(nonPlannedSegments, segment)
        }
    }

    var plans []*datapb.CompactionPlan
    // sort segment from large to small
    sort.Slice(prioritizedCandidates, func(i, j int) bool {
        if prioritizedCandidates[i].GetNumOfRows() != prioritizedCandidates[j].GetNumOfRows() {
            return prioritizedCandidates[i].GetNumOfRows() > prioritizedCandidates[j].GetNumOfRows()
        }
        return prioritizedCandidates[i].GetID() < prioritizedCandidates[j].GetID()
    })
    // sort from large to small
    sort.Slice(smallCandidates, func(i, j int) bool {
        if smallCandidates[i].GetNumOfRows() != smallCandidates[j].GetNumOfRows() {
            return smallCandidates[i].GetNumOfRows() > smallCandidates[j].GetNumOfRows()
        }
        return smallCandidates[i].GetID() < smallCandidates[j].GetID()
    })

    // Sort non-planned from small to large.
    sort.Slice(nonPlannedSegments, func(i, j int) bool {
        if nonPlannedSegments[i].GetNumOfRows() != nonPlannedSegments[j].GetNumOfRows() {
            return nonPlannedSegments[i].GetNumOfRows() < nonPlannedSegments[j].GetNumOfRows()
        }
        return nonPlannedSegments[i].GetID() > nonPlannedSegments[j].GetID()
    })

    getSegmentIDs := func(segment *SegmentInfo, _ int) int64 {
        return segment.GetID()
    }
    // greedy pick from large segment to small, the goal is to fill each segment to reach 512M
    // we must ensure all prioritized candidates is in a plan
    // TODO the compaction selection policy should consider if compaction workload is high
    for len(prioritizedCandidates) > 0 {
        ......
    }

    getSegIDsFromPlan := func(plan *datapb.CompactionPlan) []int64 {
        var segmentIDs []int64
        for _, binLog := range plan.GetSegmentBinlogs() {
            segmentIDs = append(segmentIDs, binLog.GetSegmentID())
        }
        return segmentIDs
    }
    var remainingSmallSegs []*SegmentInfo
    // check if there are small candidates left can be merged into large segments
    for len(smallCandidates) > 0 {
        var bucket []*SegmentInfo
        // pop out the first element
        segment := smallCandidates[0]
        bucket = append(bucket, segment)
        // index 0 holds the largest small segment
        smallCandidates = smallCandidates[1:]

        var result []*SegmentInfo
        free := segment.GetMaxRowNum() - segment.GetNumOfRows()
        // for small segment merge, we pick one largest segment and merge as much as small segment together with it
        // Why reverse?     try to merge as many segments as expected.
        // for instance, if a 255M and 255M is the largest small candidates, they will never be merged because of the MinSegmentToMerge limit.
        // result holds the segments selected for merging
        // MaxSegmentToMerge defaults to 30
        smallCandidates, result, _ = reverseGreedySelect(smallCandidates, free, Params.DataCoordCfg.MaxSegmentToMerge.GetAsInt()-1)
        bucket = append(bucket, result...)

        var size int64
        var targetRow int64
        for _, s := range bucket {
            size += s.getSegmentSize()
            targetRow += s.GetNumOfRows()
        }
        // only merge if candidate number is large than MinSegmentToMerge or if target row is large enough
        if len(bucket) >= Params.DataCoordCfg.MinSegmentToMerge.GetAsInt() ||
            len(bucket) > 1 && t.isCompactableSegment(targetRow, segment) {
            // generate a plan
            plan := segmentsToPlan(bucket, compactTime)
            log.Info("generate a plan for small candidates",
                zap.Int64s("plan segmentIDs", lo.Map(bucket, getSegmentIDs)),
                zap.Int64("target segment row", targetRow),
                zap.Int64("target segment size", size))
            plans = append(plans, plan)
        } else {
            remainingSmallSegs = append(remainingSmallSegs, bucket...)
        }
    }
    // Try adding remaining segments to existing plans.
    for i := len(remainingSmallSegs) - 1; i >= 0; i-- {
        s := remainingSmallSegs[i]
        if !isExpandableSmallSegment(s) {
            continue
        }
        // Try squeeze this segment into existing plans. This could cause segment size to exceed maxSize.
        for _, plan := range plans {
            if plan.TotalRows+s.GetNumOfRows() <= int64(Params.DataCoordCfg.SegmentExpansionRate.GetAsFloat()*float64(s.GetMaxRowNum())) {
                segmentBinLogs := &datapb.CompactionSegmentBinlogs{
                    SegmentID:           s.GetID(),
                    FieldBinlogs:        s.GetBinlogs(),
                    Field2StatslogPaths: s.GetStatslogs(),
                    Deltalogs:           s.GetDeltalogs(),
                }
                plan.TotalRows += s.GetNumOfRows()
                plan.SegmentBinlogs = append(plan.SegmentBinlogs, segmentBinLogs)
                log.Info("small segment appended on existing plan",
                    zap.Int64("segmentID", s.GetID()),
                    zap.Int64("target rows", plan.GetTotalRows()),
                    zap.Int64s("plan segmentID", getSegIDsFromPlan(plan)),
                )

                remainingSmallSegs = append(remainingSmallSegs[:i], remainingSmallSegs[i+1:]...)
                break
            }
        }
    }
    // If there are still remaining small segments, try adding them to non-planned segments.
    for _, npSeg := range nonPlannedSegments {
        bucket := []*SegmentInfo{npSeg}
        targetRow := npSeg.GetNumOfRows()
        for i := len(remainingSmallSegs) - 1; i >= 0; i-- {
            // Note: could also simply use MaxRowNum as limit.
            if targetRow+remainingSmallSegs[i].GetNumOfRows() <=
                int64(Params.DataCoordCfg.SegmentExpansionRate.GetAsFloat()*float64(npSeg.GetMaxRowNum())) {
                bucket = append(bucket, remainingSmallSegs[i])
                targetRow += remainingSmallSegs[i].GetNumOfRows()
                remainingSmallSegs = append(remainingSmallSegs[:i], remainingSmallSegs[i+1:]...)
            }
        }
        if len(bucket) > 1 {
            plan := segmentsToPlan(bucket, compactTime)
            plans = append(plans, plan)
            log.Info("generate a plan for to squeeze small candidates into non-planned segment",
                zap.Int64s("plan segmentIDs", lo.Map(bucket, getSegmentIDs)),
                zap.Int64("target segment row", targetRow),
            )
        }
    }
    return plans
}

t.isSmallSegment(segment) determines whether a segment counts as a small segment.

func (t *compactionTrigger) isSmallSegment(segment *SegmentInfo) bool {
    return segment.GetNumOfRows() < int64(float64(segment.GetMaxRowNum())*Params.DataCoordCfg.SegmentSmallProportion.GetAsFloat())
}

In this example, segment.GetMaxRowNum() is 3441480.

Params.DataCoordCfg.SegmentSmallProportion.GetAsFloat() is 0.5, so any segment with fewer than 3441480 × 0.5 = 1720740 rows is classified as small.
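A tiny, self-contained sketch of the same check with these example numbers (the values come from this particular deployment, not universal defaults):

package main

import "fmt"

// isSmall reproduces the isSmallSegment predicate with plain arguments.
func isSmall(numOfRows, maxRowNum int64, smallProportion float64) bool {
    return numOfRows < int64(float64(maxRowNum)*smallProportion)
}

func main() {
    const maxRowNum = 3441480
    const smallProportion = 0.5 // SegmentSmallProportion
    fmt.Println("threshold:", int64(float64(maxRowNum)*smallProportion)) // 1720740
    fmt.Println(isSmall(4000, maxRowNum, smallProportion))               // true: a 4000-row segment is small
    fmt.Println(isSmall(2000000, maxRowNum, smallProportion))            // false
}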

In this scenario only smallCandidates are involved, so the execution flow reduces to the following:

func (t *compactionTrigger) generatePlans(segments []*SegmentInfo, force bool, isDiskIndex bool, compactTime *compactTime) []*datapb.CompactionPlan {
    // find segments need internal compaction
    var prioritizedCandidates []*SegmentInfo
    var smallCandidates []*SegmentInfo
    var nonPlannedSegments []*SegmentInfo

    for _, segment := range segments {
        segment := segment.ShadowClone()
        // check whether single (internal) compaction is needed
        if force || t.ShouldDoSingleCompaction(segment, isDiskIndex, compactTime) {
            prioritizedCandidates = append(prioritizedCandidates, segment)
        } else if t.isSmallSegment(segment) {
            // check whether this is a small segment
            smallCandidates = append(smallCandidates, segment)
        } else {
            nonPlannedSegments = append(nonPlannedSegments, segment)
        }
    }

    var plans []*datapb.CompactionPlan
    // sort segment from large to small
    sort.Slice(prioritizedCandidates, func(i, j int) bool {
        if prioritizedCandidates[i].GetNumOfRows() != prioritizedCandidates[j].GetNumOfRows() {
            return prioritizedCandidates[i].GetNumOfRows() > prioritizedCandidates[j].GetNumOfRows()
        }
        return prioritizedCandidates[i].GetID() < prioritizedCandidates[j].GetID()
    })
    // sort from large to small
    sort.Slice(smallCandidates, func(i, j int) bool {
        if smallCandidates[i].GetNumOfRows() != smallCandidates[j].GetNumOfRows() {
            return smallCandidates[i].GetNumOfRows() > smallCandidates[j].GetNumOfRows()
        }
        return smallCandidates[i].GetID() < smallCandidates[j].GetID()
    })

    // Sort non-planned from small to large.
    sort.Slice(nonPlannedSegments, func(i, j int) bool {
        if nonPlannedSegments[i].GetNumOfRows() != nonPlannedSegments[j].GetNumOfRows() {
            return nonPlannedSegments[i].GetNumOfRows() < nonPlannedSegments[j].GetNumOfRows()
        }
        return nonPlannedSegments[i].GetID() > nonPlannedSegments[j].GetID()
    })

    getSegmentIDs := func(segment *SegmentInfo, _ int) int64 {
        return segment.GetID()
    }
    // greedy pick from large segment to small, the goal is to fill each segment to reach 512M
    // we must ensure all prioritized candidates is in a plan
    for len(prioritizedCandidates) > 0 {
        ......
    }

    getSegIDsFromPlan := func(plan *datapb.CompactionPlan) []int64 {
        var segmentIDs []int64
        for _, binLog := range plan.GetSegmentBinlogs() {
            segmentIDs = append(segmentIDs, binLog.GetSegmentID())
        }
        return segmentIDs
    }
    var remainingSmallSegs []*SegmentInfo
    // check if there are small candidates left can be merged into large segments
    for len(smallCandidates) > 0 {
        var bucket []*SegmentInfo
        // pop out the first element
        segment := smallCandidates[0]
        bucket = append(bucket, segment)
        // index 0 holds the largest small segment
        smallCandidates = smallCandidates[1:]

        var result []*SegmentInfo
        free := segment.GetMaxRowNum() - segment.GetNumOfRows()
        // for small segment merge, we pick one largest segment and merge as much as small segment together with it
        // Why reverse?     try to merge as many segments as expected.
        // for instance, if a 255M and 255M is the largest small candidates, they will never be merged because of the MinSegmentToMerge limit.
        // result holds the segments selected for merging
        // MaxSegmentToMerge defaults to 30
        smallCandidates, result, _ = reverseGreedySelect(smallCandidates, free, Params.DataCoordCfg.MaxSegmentToMerge.GetAsInt()-1)
        bucket = append(bucket, result...)

        var size int64
        var targetRow int64
        for _, s := range bucket {
            size += s.getSegmentSize()
            targetRow += s.GetNumOfRows()
        }
        // only merge if candidate number is large than MinSegmentToMerge or if target row is large enough
        // MinSegmentToMerge defaults to 3
        if len(bucket) >= Params.DataCoordCfg.MinSegmentToMerge.GetAsInt() ||
            len(bucket) > 1 && t.isCompactableSegment(targetRow, segment) {
            plan := segmentsToPlan(bucket, compactTime)
            log.Info("generate a plan for small candidates",
                zap.Int64s("plan segmentIDs", lo.Map(bucket, getSegmentIDs)),
                zap.Int64("target segment row", targetRow),
                zap.Int64("target segment size", size))
            plans = append(plans, plan)
        } else {
            remainingSmallSegs = append(remainingSmallSegs, bucket...)
        }
    }
    // Try adding remaining segments to existing plans.
    for i := len(remainingSmallSegs) - 1; i >= 0; i-- {
        ......
    }
    // If there are still remaining small segments, try adding them to non-planned segments.
    for _, npSeg := range nonPlannedSegments {
        ......
    }
    return plans
}

MinSegmentToMerge defaults to 3, i.e. at least three segments are required before a merge plan is generated (unless the bucket already has enough rows, per the isCompactableSegment branch above).

[INFO] [datacoord/compaction_trigger.go:664] ["generate a plan for small candidates"] ["plan segmentIDs"="[446150338661456776,446150338661256646,446150338661456735]"] ["target segment row"=12000] ["target segment size"=953490]
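The log above shows three small segments of 4000 rows each being merged into one plan (target segment row = 12000). The selection itself is a greedy bucketing pass: take the largest remaining small segment, fill its free row capacity with as many of the smallest candidates as fit (at most MaxSegmentToMerge - 1 of them), and emit a plan only if the bucket reaches MinSegmentToMerge members. The sketch below re-implements that idea in a simplified, self-contained form; it illustrates the policy and is not the actual reverseGreedySelect code.

package main

import (
    "fmt"
    "sort"
)

type seg struct {
    id     int64
    rows   int64
    maxRow int64
}

// planSmallSegments is a simplified illustration of the small-candidate policy:
// sort candidates from large to small, build one bucket per pass, fill it
// greedily from the smallest side ("reverse greedy"), and only emit a plan
// when the bucket has at least minToMerge members.
func planSmallSegments(small []seg, minToMerge, maxToMerge int) [][]int64 {
    sort.Slice(small, func(i, j int) bool { return small[i].rows > small[j].rows })

    var plans [][]int64
    for len(small) > 0 {
        head := small[0]
        small = small[1:]
        bucket := []seg{head}
        free := head.maxRow - head.rows

        // pick from the smallest candidates first so more segments fit
        for i := len(small) - 1; i >= 0 && len(bucket) < maxToMerge; i-- {
            if small[i].rows <= free {
                free -= small[i].rows
                bucket = append(bucket, small[i])
                small = append(small[:i], small[i+1:]...)
            }
        }

        if len(bucket) >= minToMerge {
            ids := make([]int64, 0, len(bucket))
            for _, s := range bucket {
                ids = append(ids, s.id)
            }
            plans = append(plans, ids)
        }
        // in Milvus, a rejected bucket goes to remainingSmallSegs instead of being dropped
    }
    return plans
}

func main() {
    small := []seg{
        {id: 446150338661456776, rows: 4000, maxRow: 3441480},
        {id: 446150338661256646, rows: 4000, maxRow: 3441480},
        {id: 446150338661456735, rows: 4000, maxRow: 3441480},
    }
    // MinSegmentToMerge = 3, MaxSegmentToMerge = 30 (defaults)
    fmt.Println(planSmallSegments(small, 3, 30)) // one plan containing all three segments
}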

6. Inspect the plans

plans := t.generatePlans(group.segments, signal.isForce, isDiskIndex, ct)

Looking at the value of plans — below are the segments to be merged, followed by the generated plans:

(Figure: segments to be merged)

(Figure: generated compaction plans)

7. Execute compaction

t.compactionHandler.execCompactionPlan(signal, plan)

8. Enter execCompactionPlan

// execCompactionPlan start to execute plan and return immediately
func (c *compactionPlanHandler) execCompactionPlan(signal *compactionSignal, plan *datapb.CompactionPlan) error {
    c.mu.Lock()
    defer c.mu.Unlock()

    nodeID, err := c.chManager.FindWatcher(plan.GetChannel())
    if err != nil {
        log.Error("failed to find watcher", zap.Int64("planID", plan.GetPlanID()), zap.Error(err))
        return err
    }

    log := log.With(zap.Int64("planID", plan.GetPlanID()), zap.Int64("nodeID", nodeID))
    c.setSegmentsCompacting(plan, true)

    task := &compactionTask{
        triggerInfo: signal,
        plan:        plan,
        state:       pipelining,
        dataNodeID:  nodeID,
    }
    c.plans[plan.PlanID] = task
    c.executingTaskNum++

    go func() {
        log.Info("acquire queue")
        c.acquireQueue(nodeID)

        ts, err := c.allocator.allocTimestamp(context.TODO())
        if err != nil {
            log.Warn("Alloc start time for CompactionPlan failed", zap.Error(err))
            // update plan ts to TIMEOUT ts
            c.updateTask(plan.PlanID, setState(executing), setStartTime(tsTimeout))
            return
        }
        c.updateTask(plan.PlanID, setStartTime(ts))
        err = c.sessions.Compaction(nodeID, plan)
        c.updateTask(plan.PlanID, setState(executing))
        if err != nil {
            log.Warn("try to Compaction but DataNode rejected", zap.Error(err))
            // do nothing here, prevent double release, see issue#21014
            // release queue will be done in `updateCompaction`
            return
        }
        log.Info("start compaction")
    }()
    return nil
}

Call stack:

execCompactionPlan()(internal\datacoord\compaction.go)
  |--c.sessions.Compaction(nodeID, plan)(same file as above)
    |--cli.Compaction(ctx, plan)(internal\datacoord\session_manager.go)
      |--Compaction()(internal\datanode\services.go)
        |--compactionTask.compact()(internal\datanode\compactor.go)

Summary:

Compaction-related parameters:

dataCoord.enableCompaction = true
dataCoord.compaction.enableAutoCompaction = true
dataCoord.compaction.indexBasedCompaction = true
dataCoord.compaction.global.interval = 60 # default 60 s; interval for firing global compaction signals
dataCoord.compaction.check.interval = 10 # default 10 s; interval for updating compaction task state
dataCoord.segment.smallProportion = 0.5 # default 0.5
dataCoord.compaction.max.segment = 30 # default 30
dataCoord.compaction.min.segment = 3 # default 3