阿里云InfluxDB®之snapshot及其内存优化

简介: 作为阿里在APM和IOT领域的重要布局,时序数据库承载着阿里对于物理网和未来应用监控市场的未来和排头兵,作为业内排名第一的时序数据库InfluxDB,其在国内和国际都拥有了大量的用户,阿里适逢其时,重磅推出了阿里云 InfluxDB®。

简介

作为阿里在APM和IOT领域的重要布局,时序数据库承载着阿里对于物理网和未来应用监控市场的未来和排头兵,作为业内排名第一的时序数据库InfluxDB,其在国内和国际都拥有了大量的用户,阿里适逢其时,重磅推出了阿里云 InfluxDB®。
         限于篇幅,本文仅就InfluxDB的其中一个模块:snapshot,对其机制和内存使用的优化进行分析。

为什么要做snapshot

InfluxDB采用的是TSM引擎,TSM 存储引擎主要由几个部分组成: cache、wal、tsm file、compactor
s1

TSM存储引擎,其核心思想类似于LSM Tree,它会将最近的数据缓存在磁盘中,在达到预设的阈值之后就会触发snapshot,也就是我们常说的快照刷盘。
内存的作用是为了缓存,加速查询。snapshot主要是解决数据持久化落盘问题。

Snapshot的工作机制

由于snapshot是将cache中的数据刷到磁盘,那么首先,我们来看一下Cache的内部结构。

Cache的内部结构

s2

如上图所示,每一个cache内部划分成了16个partition。每1个partition内部包含一个map,所有的map其key为SeriesKey,value为entry。

// Value represents a TSM-encoded value.
type Value interface {
    // UnixNano returns the timestamp of the value in nanoseconds since unix epoch.
    UnixNano() int64

    // Value returns the underlying value.
    Value() interface{}

    // Size returns the number of bytes necessary to represent the value and its timestamp.
    Size() int

    // String returns the string representation of the value and its timestamp.
    String() string

    // internalOnly is unexported to ensure implementations of Value
    // can only originate in this package.
    internalOnly()
}

// Values represents a slice of  values.
type Values []Value

// entry is a set of values and some metadata.
type entry struct {
    mu     sync.RWMutex
    values Values // All stored values.
    vtype byte
}

entry是一个Value类型的数组,Value本身是一个接口,按照值类型的不同分为:FloatValue、StringValue、BooleanValue、IntegerValue、FloatValue、StringValue。 
以FloatValue为例,每一种类型的Value包含了一个int类型的时间戳和具体的值value。

type FloatValue struct {
    unixnano int64
    value    float64
}

Snapshot的流程

从代码层面上来讲,从总体的概要流程如下:
s3

下面让我们来逐个分析下:

Snapshot的入口

if e.enableCompactionsOnOpen {
   e.SetCompactionsEnabled(true)
}

Snapshot的机制

// compactCache continually checks if the WAL cache should be written to disk.
func (e *Engine) compactCache() {
    t := time.NewTicker(time.Second)
    defer t.Stop()
    for {
        e.mu.RLock()
        quit := e.snapDone
        e.mu.RUnlock()

        select {
        case <-quit:
            tsdb.UpdateCacheSize(e.id, 0, e.logger)
            return

        case <-t.C:
            e.Cache.UpdateAge()
            tsdb.UpdateCacheSize(e.id, e.Cache.Size(), e.logger)
            if e.ShouldCompactCache(time.Now()) {
                start := time.Now()
                e.traceLogger.Info("Compacting cache", zap.String("path", e.path))
                err := e.WriteSnapshot()
                if err != nil && err != errCompactionsDisabled {
                    e.logger.Info("Error writing snapshot", zap.Error(err))
                    atomic.AddInt64(&e.stats.CacheCompactionErrors, 1)
                } else {
                    atomic.AddInt64(&e.stats.CacheCompactions, 1)
                }
                atomic.AddInt64(&e.stats.CacheCompactionDuration, time.Since(start).Nanoseconds())
            }
        }
    }
}

每隔1秒钟检查一次,是否达到snapshot的条件。
snapshot的条件有两个:
是否达到配置的阈值。(默认情况下是25M)
离上次snapshot的间隔是否超越:cache-snapshot-write-cold-duration的配置。(默认情况下是10min)

Snapshot的具体实现

那么,这里涉及两个问题:

1、落盘的文件格式如何?
2、snapshot刷盘的过程本身是如何进行的?
我们先看落盘的文件格式:

/>

TSM文件包括了三个部分:Series Data Section、Series Index Section、 Footer。

1、Series Data Section生成:
Series Data Section有若干个Series Data Block组成。
其中对于Series Data Block,这个是在内存中完成组装的,具体是有cacheKeyIterator.encode函数完成,代码如下:

func (c *cacheKeyIterator) encode() {
    concurrency := runtime.GOMAXPROCS(0)
    n := len(c.ready)

    // Divide the keyset across each CPU
    chunkSize := 1
    idx := uint64(0)

    for i := 0; i < concurrency; i++ {
        // Run one goroutine per CPU and encode a section of the key space concurrently
        go func() {
            tenc := getTimeEncoder(tsdb.DefaultMaxPointsPerBlock)
            fenc := getFloatEncoder(tsdb.DefaultMaxPointsPerBlock)
            benc := getBooleanEncoder(tsdb.DefaultMaxPointsPerBlock)
            uenc := getUnsignedEncoder(tsdb.DefaultMaxPointsPerBlock)
            senc := getStringEncoder(tsdb.DefaultMaxPointsPerBlock)
            ienc := getIntegerEncoder(tsdb.DefaultMaxPointsPerBlock)

            defer putTimeEncoder(tenc)
            defer putFloatEncoder(fenc)
            defer putBooleanEncoder(benc)
            defer putUnsignedEncoder(uenc)
            defer putStringEncoder(senc)
            defer putIntegerEncoder(ienc)

            for {
                i := int(atomic.AddUint64(&idx, uint64(chunkSize))) - chunkSize

                if i >= n {
                    break
                }

                key := c.order[i]
                values := c.cache.values(key)

                for len(values) > 0 {

                    end := len(values)
                    if end > c.size {
                        end = c.size
                    }

                    minTime, maxTime := values[0].UnixNano(), values[end-1].UnixNano()
                    var b []byte
                    var err error

                    switch values[0].(type) {
                    case FloatValue:
                        b, err = encodeFloatBlockUsing(nil, values[:end], tenc, fenc)
                    case IntegerValue:
                        b, err = encodeIntegerBlockUsing(nil, values[:end], tenc, ienc)
                    case UnsignedValue:
                        b, err = encodeUnsignedBlockUsing(nil, values[:end], tenc, uenc)
                    case BooleanValue:
                        b, err = encodeBooleanBlockUsing(nil, values[:end], tenc, benc)
                    case StringValue:
                        b, err = encodeStringBlockUsing(nil, values[:end], tenc, senc)
                    default:
                        b, err = Values(values[:end]).Encode(nil)
                    }

                    values = values[end:]

                    c.blocks[i] = append(c.blocks[i], cacheBlock{
                        k:       key,
                        minTime: minTime,
                        maxTime: maxTime,
                        b:       b,
                        err:     err,
                    })

                    if err != nil {
                        c.err = err
                    }
                }
                // Notify this key is fully encoded
                c.ready[i] <- struct{}{}
            }
        }()
    }
}

其中针对几个不同的数据类型分别通过不同Encoder来进行组装,最后是形成了一个cacheBlock的二维数组,并保存在iter当中。
接下来的问题就是如何将这些二维数组刷盘。

2、依次刷盘
前面我们已知iter中保留有这些cacheBlock,我们只需要遍历迭代器就可以将这些数据刷盘。但还有一个问题,Series Index Section如何生成?
因为Series Index Section最终由IndexEntry构成,而IndexEntry中minTime和maxTime、Size都可以由cacheBlock的数据得到,关键是Offset。
其实Offset的计算是随着迭代的过程,不断地往前走,就像在一个Buffer中,填满了一个Series Data Block,就会更新一次Offset。

关键的代码如下:

    n, err := t.w.Write(block) // Write的过程中,会更新t.n,也就是offset
    if err != nil {
        return err
    }
    n += len(checksum)

    // Record this block in index
    t.index.Add(key, blockType, minTime, maxTime, t.n, uint32(n)) //t.n 就是offset

总结以上所述,大体的过程是,encode生层Series Data Block, 在迭代过程中,生成了Series Index Section,最终,将Series Index Section Append到Series Data Block 就生成了TSM文件。

那么问题来了Series Index Section的保存时需要空间的,如果是Series Index Section占用的内存过大,则可能会因此加大了程序OOME的风险。

snapshot内存使用的优化

如果有n个cache,同时做snapshot。 则耗费的内存为: n IndexSize。 
例如:5db
4 retention, IndexSize = 50m。 则节省:5 4 50 = 1G的内存使用量。

所以,我们可以想到的一个优化点是:在snapshot过程当中利用文件来做Series Index Section的暂存区,从而节省这一部分内存。

验证

当我们利用磁盘来做Index的缓冲区时,系统在snapshot的过程中,会生成1个临时的索引文件,如下图所示。
s5

而不用磁盘来做Index缓冲区的时候,则不会生成这个文件,如下图所示。
s6

经过我们的长时间的稳定性测试,证实,在利用磁盘来做Index的缓冲区时,能有效降低系统大压力下的OOME概率。

商业化

阿里云InfluxDB®现已正式商业化,欢迎访问购买页面(https://common-buy.aliyun.com/?commodityCode=hitsdb_influxdb_pre#/buy)与文档(https://help.aliyun.com/document_detail/113093.html?spm=a2c4e.11153940.0.0.57b04a02biWzGa)。

目录
相关文章
|
6月前
|
存储 安全 数据库
阿里云服务器计算型、通用型、内存型主要实例规格性能特点和适用场景汇总
阿里云服务器ECS计算型、通用型、内存型规格族属于独享型云服务器,在高负载不会出现计算资源争夺现象,因为每一个vCPU都对应一个Intel ® Xeon ®处理器核心的超线程,具有性能稳定且资源独享的特点。本文为大家整理汇总了阿里云服务器ECS计算型、通用型、内存型主要实例规格族具体实例规格有哪些,各个实例规格的性能特点和主要适用场景。
阿里云服务器计算型、通用型、内存型主要实例规格性能特点和适用场景汇总
|
7月前
|
弹性计算 安全 前端开发
阿里云服务器ECS通用型、计算型和内存型详细介绍和性能参数表
阿里云ECS实例有计算型(c)、通用型(g)和内存型(r)三种,主要区别在于CPU和内存比例。计算型CPU内存比1:2,如2核4G;通用型为1:4,如2核8G;内存型为1:8,如2核16G。随着技术迭代,有第五代至第八代产品,如c7、g5、r8a等。每代实例在CPU型号和主频上相同,但性能有所提升。实例性能参数包括网络带宽、收发包能力、连接数等。具体应用场景如计算型适合高网络包收发、通用型适合企业级应用,内存型适合内存数据库等。详细信息可参阅阿里云ECS页面。
400 0
|
2月前
|
弹性计算
阿里云2核16G云服务器多少钱?亲测ECS内存型r8i租赁价格
阿里云2核16G云服务器,内存型r8i实例1年6折优惠后价格为1901元,月付334.19元,按小时计费0.696221元。更多配置及优惠详情,请访问阿里云ECS页面。
|
3月前
|
存储 机器学习/深度学习 应用服务中间件
阿里云倚天云服务器实例:计算型c8y、通用型g8y、内存型r8y实例介绍
阿里云倚天云服务器是基于阿里云自研的倚天710 ARM架构CPU打造的高性能计算产品系列,它依托先进的第四代神龙架构,旨在为用户提供稳定可预期的超高效能体验。倚天云服务器在存储、网络性能及计算稳定性方面实现了显著提升,主要得益于其芯片级的快速路径加速技术。本文将深度解析阿里云倚天云服务器的计算型c8y、通用型g8y、内存型r8y实例,探讨其优势及适用场景,以供选择参考。
|
8月前
|
存储 弹性计算 固态存储
阿里云服务器CPU内存配置详细指南,如何选择合适云服务器配置?
阿里云服务器配置选择涉及CPU、内存、公网带宽和磁盘。个人开发者或中小企业推荐使用轻量应用服务器或ECS经济型e实例,如2核2G3M配置,适合低流量网站。企业用户则应选择企业级独享型ECS,如通用算力型u1、计算型c7或通用型g7,至少2核4G配置,公网带宽建议5M,系统盘可选SSD或ESSD云盘。选择时考虑实际应用需求和性能稳定性。
1020 6
|
3月前
|
弹性计算 关系型数据库 数据安全/隐私保护
阿里云国际版如何配置Windows服务器的虚拟内存
阿里云国际版如何配置Windows服务器的虚拟内存
|
5月前
|
弹性计算 固态存储 ice
阿里云服务器ECS内存型r8i、通用算力u1、r7、AMD内存r8a、高主频内存hfr8i价格和性能差异
2024年阿里云提供2核16G、4核32G及8核64G等多种服务器配置,用户可根据需求选择不同实例规格如内存型r8i、通用算力型u1等。以华北2(北京)为例,2核16G月费从286.2元起,4核32G从572.4元起,8核64G则从1144.8元起。公网带宽1Mbps预付费为23元/月,系统盘如ESSD PL1按量计费0.0021元/小时/GiB。具体价格与折扣请参考阿里云官网。
|
6月前
|
存储 缓存 安全
阿里云服务器实例规格选择参考:经济型、通用算力型、计算型、通用型、内存型区别
当我们在通过阿里云的各种活动选择云服务器实例规格的时候会发现,相同配置的云服务器往往有多个不同的实例可选,而且价格差别也比较大,这会是因为不同实例规格的由于采用的处理器不同,底层架构也有所不同(例如X86 计算架构与Arm 计算架构),因此不同实例的云服务器其性能与适用场景是有所不同。目前阿里云的活动中,主要的实例规格可分为经济型、通用算力型、计算型、通用型、内存型,对于很多初次接触阿里云服务器的用户来说,了解他们之间的差别就是比较重要的了,下面小编来为大家简单介绍下它们之间的区别。
阿里云服务器实例规格选择参考:经济型、通用算力型、计算型、通用型、内存型区别
|
6月前
|
存储 弹性计算 固态存储
阿里云服务器CPU内存配置怎么选?ECS实例规格有啥区别?
阿里云服务器配置选择需考虑ECS实例规格、CPU内存、公网带宽与系统盘。个人开发者或中小企业推荐轻量应用服务器或ECS经济型e实例(2核2G3M带宽,99元/年),适合搭建低流量网站。企业用户应选择企业级独享型如通用算力型u1、计算型c7或通用型g7实例,至少2核4G内存起,推荐5M公网带宽以平衡成本与性能。系统盘推荐ESSD云盘以获得更好的性能。更多详情及链接参见原文。
136 3
|
6月前
|
存储 弹性计算 程序员
新手程序员如何阿里云服务器配置?新人开发者CPU内存带宽存储怎么选?
对于新手开发者、个人或学生选择阿里云服务器,推荐ECS经济型e实例(ecs.e-c1m1.large),适用于小型网站或轻量应用。配置2核2G内存、3M固定带宽、40G ESSD系统盘,仅99元/年且续费同价。

热门文章

最新文章