LotusDB 设计与实现—3 内存 memtable

简介: 顾名思义,memtable 是内存中维护的组件,在 LSM Tree 存储模型中,memtable 相当于一块内存 buffer,数据写入到 WAL 后,然后在 memtable 中更新。memtable 的数据积累到一定的阈值之后,批量 Flush 到磁盘,这样做的目的是延迟写磁盘,并且将随机的 IO 写入转换为批量的顺序 IO,这也是 LSM 存储模型的核心思路。
LotusDB 是一个全新的 KV 存储引擎,Github 地址: https://github.com/flower-corp/lotusdb,希望大家多多支持呀,点个 star 或者参与进来!


顾名思义,memtable 是内存中维护的组件,在 LSM Tree 存储模型中,memtable 相当于一块内存 buffer,数据写入到 WAL 后,然后在 memtable 中更新。memtable 的数据积累到一定的阈值之后,批量 Flush 到磁盘,这样做的目的是延迟写磁盘,并且将随机的 IO 写入转换为批量的顺序 IO,这也是 LSM 存储模型的核心思路。


内存中的 memtable 一般会有多个,一个 memtable 写满之后,会转为不可变的 memtable,不可变的 memtable 不能接收新的写入,并且等待被后台线程 Flush 持久化到磁盘。


LotusDB 的一个 Column Family 结构体中,维护了最新的 memtable,记为 activeMem,以及不可变的 memtable 数组,即 immuMems。

Column Family 这个命名借鉴于 rocksdb,表示一个 key/value 的命名空间,可以理解为表的概念。


从 Column Family 的结构体定义中可以查看:

// ColumnFamily is a namespace of keys and values.
// Each key/value pair in LotusDB is associated with exactly one Column Family.
// If no Column Family is specified, key-value pair is associated with Column Family "cf_default".
// Column Families provide a way to logically partition the database.
type ColumnFamily struct {
  // Active memtable for writing.
  activeMem *memtable
  // Immutable memtables, waiting to be flushed to disk.
  immuMems []*memtable
  // Value Log(Put value into value log according to options ValueThreshold).
  vlog *valueLog
  // Store keys and meta info.
  indexer index.Indexer
  // When the active memtable is full, send it to the flushChn, see listenAndFlush.
  flushChn  chan *memtable
  flushLock sync.RWMutex // guarantee flush and compaction exclusive.
    // ...
}


当有数据写入时,需要判断当前 memtable 是否已满,如果已满,则将 memtable 存放到 immuMems 这个 slice 里面,然后将 memtable 通过 channel 发送,由 channel 的另一端进行 flush 工作。发送到 channel 之后,需要创建一个新的 memtable 做为 activeMem。


大致的逻辑如下代码:

func (cf *ColumnFamily) waitWritesMemSpace(size uint32) error {
    // check whether memtable is full
  if !cf.activeMem.isFull(size) {
    return nil
  }
  timer := time.NewTimer(cf.opts.MemSpaceWaitTimeout)
  defer timer.Stop()
  select {
  case cf.flushChn <- cf.activeMem:
    cf.immuMems = append(cf.immuMems, cf.activeMem)
    // open a new active memtable.
        // ...
    if table, err := openMemtable(memOpts); err != nil {
      return err
    } else {
      cf.activeMem = table
    }
  case <-timer.C:
    return ErrWaitMemSpaceTimeout
  }
  return nil
}

如果当前 memtable 容量和数量都达到了最大值, 不可变的 memtable 还来不及 flush,这时候新的写入需要阻塞等待,等待的超时时间可以配置,默认是 100ms。

但实际上,这种情况发生的概率较低,只有在 memtable 阈值小、数量少,并且有大量写入的情况下才有可能发生,如果在写入的过程中遇到了类似 wait memtable space timeout 的错误,建议调大 memtable 的阈值,或者增加超时时间的配置。


需要注意在 memtable 中,如果是删除数据,那么实际上也是添加记录,并不会真正去执行删除,只是添加的记录加上了一个特殊的标记,一般称为墓碑值。

具体在 LotusDB 里面,添加记录操作的是 LogEntry 这个结构体,所以在里面加上了一个 Type 字段,标识这个 LogEntry 数据的类型,然后再添加到 memtable 中:

// put new writes to memtable.
func (mt *memtable) put(key []byte, value []byte, deleted bool, opts WriteOptions) error {
  entry := &logfile.LogEntry{
    Key:   key,
    Value: value,
  }
  if opts.ExpiredAt > 0 {
    entry.ExpiredAt = opts.ExpiredAt
  }
  if deleted {
    entry.Type = logfile.TypeDelete
  }
    // ....
}


在 channel 的另一端(listenAndFlush 方法中),启动了一个 goroutine 进行监听,如果有新的 memtable 数据进来,则会开始 flush 操作。LotusDB 的 memtable 的具体数据结构采用的是跳表,所以就取出对应的跳表的迭代器,从头开始遍历跳表中的数据。

这里需要注意判断,如果 memtable 中的 key 被标记为删除或已过期的话,也需要记录一下,否则,则说明是有效的 key/value 数据,那么便会将数据追加写到 value log 中,获取到对应数据的索引信息,即文件 id 和位置偏移 offset。

for iter.SeekToFirst(); iter.Valid(); iter.Next() {
    key := iter.Key()
    node := &index.IndexerNode{Key: key}
    mv := decodeMemValue(iter.Value())
    // delete invalid keys from indexer.
    if mv.typ == byte(logfile.TypeDelete) || (mv.expiredAt != 0 && mv.expiredAt <= time.Now().Unix()) {
        deletedKeys = append(deletedKeys, key)
    } else {
        // wiret data into value log.
        valuePos, esize, err := cf.vlog.Write(&logfile.LogEntry{
            Key:       key,
            Value:     mv.value,
            ExpiredAt: mv.expiredAt,
        })
        if err != nil {
            return err
        }
        node.Meta = &index.IndexerMeta{
            Fid:       valuePos.Fid,
            Offset:    valuePos.Offset,
            EntrySize: esize,
        }
        nodes = append(nodes, node)
    }
}


遍历完 memtable 中的数据之后,再调用索引提供的方法,针对无效的数据进行批量删除,针对有效的数据进行批量添加,完成之后需要调用 Sync 方法持久化写入的内容,保证数据的一致性。

func (cf *ColumnFamily) flushUpdateIndex(nodes []*index.IndexerNode, keys [][]byte) error {
  cf.flushLock.Lock()
  defer cf.flushLock.Unlock()
  // must put and delete in batch.
  writeOpts := index.WriteOptions{SendDiscard: true}
  if _, err := cf.indexer.PutBatch(nodes, writeOpts); err != nil {
    return err
  }
  if len(keys) > 0 {
    if err := cf.indexer.DeleteBatch(keys, writeOpts); err != nil {
      return err
    }
  }
  // must fsync before delete wal.
  if err := cf.indexer.Sync(); err != nil {
    return err
  }
  return nil
}


Flush 完成之后,便会将 immuMems 这个数组中的 memtable 删除掉,给后续新的 memtable 空出位置。


memtable 相关的配置项:

ColumnFamilyOptions:

MemtableSize:一个 memtable 所占内存空间的阈值,默认 64MB

MemtableNums:最多可存在的 memtable 的数量,默认 5

MemSpaceWaitTimeout:等待 memtable 空闲空间的超时,默认 100ms


相关文章
|
4月前
|
存储 分布式计算 Hadoop
HadoopCPU、内存、存储限制
【7月更文挑战第13天】
277 14
|
3月前
|
存储 编译器 C语言
【C语言篇】数据在内存中的存储(超详细)
浮点数就采⽤下⾯的规则表⽰,即指数E的真实值加上127(或1023),再将有效数字M去掉整数部分的1。
366 0
|
21天前
|
存储 C语言
数据在内存中的存储方式
本文介绍了计算机中整数和浮点数的存储方式,包括整数的原码、反码、补码,以及浮点数的IEEE754标准存储格式。同时,探讨了大小端字节序的概念及其判断方法,通过实例代码展示了这些概念的实际应用。
43 1
|
25天前
|
存储
共用体在内存中如何存储数据
共用体(Union)在内存中为所有成员分配同一段内存空间,大小等于最大成员所需的空间。这意味着所有成员共享同一块内存,但同一时间只能存储其中一个成员的数据,无法同时保存多个成员的值。
|
30天前
|
存储 弹性计算 算法
前端大模型应用笔记(四):如何在资源受限例如1核和1G内存的端侧或ECS上运行一个合适的向量存储库及如何优化
本文探讨了在资源受限的嵌入式设备(如1核处理器和1GB内存)上实现高效向量存储和检索的方法,旨在支持端侧大模型应用。文章分析了Annoy、HNSWLib、NMSLib、FLANN、VP-Trees和Lshbox等向量存储库的特点与适用场景,推荐Annoy作为多数情况下的首选方案,并提出了数据预处理、索引优化、查询优化等策略以提升性能。通过这些方法,即使在资源受限的环境中也能实现高效的向量检索。
|
1月前
|
存储 编译器
数据在内存中的存储
数据在内存中的存储
41 4
|
1月前
|
存储 Java
JVM知识体系学习四:排序规范(happens-before原则)、对象创建过程、对象的内存中存储布局、对象的大小、对象头内容、对象如何定位、对象如何分配
这篇文章详细地介绍了Java对象的创建过程、内存布局、对象头的MarkWord、对象的定位方式以及对象的分配策略,并深入探讨了happens-before原则以确保多线程环境下的正确同步。
53 0
JVM知识体系学习四:排序规范(happens-before原则)、对象创建过程、对象的内存中存储布局、对象的大小、对象头内容、对象如何定位、对象如何分配
|
1月前
|
存储 机器学习/深度学习 人工智能
数据在内存中的存储
数据在内存中的存储
|
1月前
|
存储 C语言
深入C语言内存:数据在内存中的存储
深入C语言内存:数据在内存中的存储