LSM-Tree - LevelDb 源码解析(一)

本文涉及的产品
全局流量管理 GTM,标准版 1个月
云解析 DNS,旗舰版 1个月
公共DNS(含HTTPDNS解析),每月1000万次HTTP解析
简介: LSM-Tree - LevelDb 源码解析(一)

LSM-Tree - LevelDb 源码解析(一)

在上一篇文章[[LSM-Tree - LevelDb了解和实现]]中介绍了LevelDb,并且介绍了相关的数据结构和核心组件,最终介绍了有关LevelDB的核心读写部分以及为什么在这个数据库中写入的速度要比读取的速度快上好几倍。

这一节代码内容非常多,所以不建议在手机或者移动设备阅读,更适合在PC上观看。

LevelDB的源代码还是比较好懂的,好懂到我这个主要学JAVA只有基础C语言知识的人也能看懂,另一方面作者在关键的地方都给了注释,甚至告诉你为什么要这么设计,写的很好很棒让人落泪为什么自己没这样的同事,如果还是看不懂,作者也写了很多介绍文档在doc目录中告诉你核心组件的作用介绍。

总之,不要惧怕这个数据库,作为优秀代码和设计模式以及数据结构应用范本都非常值得学习和参考。

源码运行

LevelDB的编译是比较简单的,可以从官网直接克隆代码。github.com/google/leve…,具体操作步骤如下(可以参考源代码中的README文件):

git clone --recurse-submodules https://github.com/google/leveldb.git
mkdir -p build && cd build
cmake -DCMAKE_BUILD_TYPE=Release .. && cmake --build .

完成整个编译动作之后,我们可以新增一个动态库,一个静态库和test目录,接着就可以编写单元测试了,同时官方的源代码中有很多的单元测试可以提供自己编写的测试程序进行调试使用。

底层存储存储结构

除开某些元数据的存储,SSTable是整个数据库的主要结构,所有的SSTable文件本身的内容是不可修改的,虽然通常数据在内存中操作,但是数据不可能无限存储,当数据到达一定量之后就需要持久化到磁盘中,而压缩合并的处理就十分考验系统性能了,为此LevelDb使用分层的结构进行存储,下面我们从外部的使用结构开始来了解内部的设计。

首先是整个DB的源代码,DB源代码可以从下面的路径访问到:

github.com/google/leve…

我们需要了解DB存储结构,可以看到存储引擎的对外提供的接口十分简单,有点类似map的操作方法:

class LEVELDB_EXPORT DB {
public:
  // 设置数据库的key-value结构,如果没有返回OK则视为操作失败,
  // 备注:考虑默认打开sync=true操作,`Put` 方法在内部最终会调用 `Write` 方法,只是在上层为调用者提供了两个不同的选择。
  virtual Status Put(const WriteOptions& options, const Slice& key,
  const Slice& value) = 0;
  // 成功返回OK,如果异常则不反悔OK,如果什么都返回,说明呗删除的Key不存在,
  virtual Status Delete(const WriteOptions& options, const Slice& key) = 0;
  // 写入操作
  virtual Status Write(const WriteOptions& options, WriteBatch* updates) = 0;
  // 根据key获取数据
  virtual Status Get(const ReadOptions& options, const Slice& key,
  std::string* value) = 0;
}

GetPut 是 LevelDB 为上层提供的用于读写的接口,如果我们清楚内部的操作逻辑,那么这个数据库的内部结构和运作过程也基本清楚。

下面先从写入操作开始,看看数据是如何进入到LevelDb,以及内部是如何管理的:

write部分

关联:[[WriteLevel0Table]],[[Write]]。

Write的内部逻辑十分复杂,这里画一个基本的流程图:

image.png

我们从Write()的方法切入,简化代码之后大致的流程如下:

// mark1 为写入构建足够的空间,此时可以不需要加锁。
  Status status = MakeRoomForWrite(updates == nullptr);
  // mark2 通过 `AddRecord` 方法向日志中追加一条写操作的记录;
  status = log_->AddRecord(WriteBatchInternal::Contents(write_batch));
  // mark3 如果日志记录成功,则将数据进行写入
  if (status.ok()) {
    status = WriteBatchInternal::InsertInto(write_batch, mem_);
  }

整个执行流程如下:

  • 首先调用MakeRoomForWrite方法为即将进行的写入提供足够的空间。
  • 如果当前空间不足需要冻结当前的memtable,发生 Minor Compaction 并创建一个新的 MemTable 对象;
  • 如果满足触发Major Compaction需要对数据进行压缩并且对于SSTable进行合并
  • 通过AddRecord方法向日志中追加一条写操作记录。
  • 最终调用memtable往内存结构中添加key/value,完成最终写入操作。

将写入操作的源代码逻辑简化之后最终如下:

Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) {
  Writer w(&mutex_);
  w.batch = my_batch;
  MakeRoomForWrite(my_batch == NULL);
  uint64_t last_sequence = versions_->LastSequence();
  Writer* last_writer = &w;
  WriteBatch* updates = BuildBatchGroup(&last_writer);
  WriteBatchInternal::SetSequence(updates, last_sequence + 1);
  // 记录最终的操作记录点
  last_sequence += WriteBatchInternal::Count(updates);
  // 日志编写
  log_->AddRecord(WriteBatchInternal::Contents(updates));
  // 将数据写入memtable
  WriteBatchInternal::InsertInto(updates, mem_);
  versions_->SetLastSequence(last_sequence);
  return Status::OK();
}

对于压缩合并在源码中有如下判断,系统会定时检查是否可以进行压缩合并,这一些if/else用于多线程并发写入的时候进行合并写入的操作,当发现有且他线程在操作就会等待结果或者等到拿到锁之后接管合并写入的操作。

如果对于下面的代码有疑问,可以阅读[[LSM-Tree - LevelDb了解和实现]]中关于“合并写入”的部分,为了节省时间,可以在网页中直接输入关键字“合并写入”快速定位,这里假设读者已经了解基本的工作流程,就不再赘述了。

#LevelDb合并写入操作

void DBImpl::MaybeScheduleCompaction() {
mutex_.AssertHeld();
if (background_compaction_scheduled_) {
// Already scheduled
  // 正在压缩
} else if (shutting_down_.load(std::memory_order_acquire)) {
// DB is being deleted; no more background compactions
  // DB正在被删除;不再有后台压缩
} else if (!bg_error_.ok()) {
// Already got an error; no more changes
  // 已经发生异常,不能做更多改动。
} else if (imm_ == nullptr && manual_compaction_ == nullptr &&
  !versions_->NeedsCompaction()) {
  // 不需要合并则不工作
  } else {
  // 设置当前正常进行压缩合并
  background_compaction_scheduled_ = true;
  // 开始压缩合并
  env_->Schedule(&DBImpl::BGWork, this);
  }
}

不可变memtable

在write的函数内部有这样一串代码,此时会暂停解锁等待写入,这个写入又是干嘛的?

Status status = MakeRoomForWrite(updates == nullptr);

访问内部会发现通过一个while循环判断当前的 memtable状态,一旦发现memtable写入已经写满整个mem,则需要停止写入并且将当前的memtable转为imutiablememtable,并且创建新的mem切换写入,此时还会同时根据一些条件判断是否可以进行压缩mem。

源码中GUARDED_BY含义: GUARDED_BY是数据成员的属性,该属性声明数据成员受给定功能保护。对数据的读操作需要共享访问,而写操作则需要互斥访问。 该 GUARDED_BY属性声明线程必须先锁定listener_list_mutex才能对其进行读写listener_list,从而确保增量和减量操作是原子的。

mem可以看作是当前的系统备忘录或者说一个临时的记账板,这种设计和大多数的日志或者关系型数据库类似,都是先写入日志在进行后续的所有操作,日志优先于记录操作。同时根据日志写入操作加锁来完成并发操作的正常运行。

MakeRoomForWrite方法中比较关键的部分都加了注释,很多操作作者都有介绍意图,代码逻辑都比较简单,多看几遍基本就了解了(C++语法看不懂不必过多纠结)

while (true) {
if (!bg_error_.ok()) {
  // Yield previous error
  s = bg_error_;
  break;
} else if (allow_delay && versions_->NumLevelFiles(0) >=
config::kL0_SlowdownWritesTrigger) {
  // 我们正接近于达到对L0文件数量的硬性限制。L0文件的数量。当我们遇到硬性限制时,与其将单个写操作延迟数而是在我们达到硬限制时,开始将每个mem单独写1ms以减少延迟变化。另外。这个延迟将一些CPU移交给压缩线程,因为 如果它与写入者共享同一个核心的话。
  mutex_.Unlock();
  env_->SleepForMicroseconds(1000);
  // 不要将一个单一的写入延迟超过一次
  allow_delay = false; 
  mutex_.Lock();
} else if (!force &&
(mem_->ApproximateMemoryUsage() <= options_.write_buffer_size)) {
  // 在当前的mem中还有空间
  break;
} else if (imm_ != nullptr) {
  // 我们已经填满了当前的memtable,但之前的的mem还在写入,所以需要等待
  background_work_finished_signal_.Wait();
} else if (versions_->NumLevelFiles(0) >= 
      config::kL0_StopWritesTrigger) {
  background_work_finished_signal_.Wait();
} else {
  // A试图切换到一个新的memtable并触发对旧memtable的压缩
  assert(versions_->PrevLogNumber() == 0);
  // 新建文件号
  uint64_t new_log_number = versions_->NewFileNumber(); //return next_file_number_++;
  WritableFile* lfile = nullptr;
  // 新建可写入文件, 内部通过一个map构建一个文件:文件状态的简易文件系统
  // typedef std::map<std::string, FileState*> FileSystem;
  s = env_->NewWritableFile(LogFileName(dbname_, new_log_number), &lfile);
  if (!s.ok()) {
    // 避免死循环重复新增文件号
    versions_->ReuseFileNumber(new_log_number);
    break;
  }
  delete log_;
  delete logfile_;
  logfile_ = lfile;
  logfile_number_ = new_log_number;
  // 写入日志
  log_ = new log::Writer(lfile);
  // **重点:imm_ 就是immutable 他将引用指向当前已经写满的mem,其实和mem对象没什么区别,就是加了一个互斥共享锁而已(写互斥,读共享)**
  imm_ = mem_;
  has_imm_.store(true, std::memory_order_release);
  // 新建新的memtable
  mem_ = new MemTable(internal_comparator_);
  // 引用至新块
  mem_->Ref();
  force = false; // Do not force another compaction if have room
  // 尝试对于已满mem压缩合并 ,此处承接上文
  MaybeScheduleCompaction();
}
}

下面用一个简单的示意图了解上面的大致流程:

![[Pasted image 20220330174257.png]]

对于[[SSTable]]的原始理论来说,这样的实现结构显然是有出入的。观察上面的条件判断也可以知道,在通常情况下memtable可以通过短暂的延迟读写请求等待压缩完成,但是一旦发现mem占用的内存过大,此时就需要给当前的mem加锁变为_imu状态,然后创建一个新的 MemTable 实例并且把新进来的请求转到新的mem中,这样就可以继续接受外界的写操作,不再需要等待 Minor Compaction 的结束了。

再次注意此处会通过函数 MaybeScheduleCompaction 是否进行压缩合并的操作判断。

log部分

了解完写入的大致操作流程之后,下面来看看LevelDb的日志管理也就是AddRecord()函数的操作:

image.png

但是日志的核心部分并不在AddRecord()内部,因为内部只有一些简单的字符串拼接操作,这里将核心放到了RecordType的部分,可以看到这里通过当前日志字符长度判断不同的类型,RecordType标识当前记录在块里面的位置:

//....
  enum RecordType {
    // Zero is reserved for preallocated files
    kZeroType = 0,
    kFullType = 1,
    // For fragments
    kFirstType = 2,
    kMiddleType = 3,
    kLastType = 4
  };
  //....
  RecordType type;
  const bool end = (left == fragment_length);
  if (begin && end) {
    type = kFullType;
  } else if (begin) {
    type = kFirstType;
  } else if (end) {
    type = kLastType;
  } else {
    type = kMiddleType;
  }

First:是用户记录第一个片段的类型, Last:是用户记录的最后一个片段的类型。 Middle:是一个用户记录的所有内部片段的类型。

从RecordType内部的定义可以看到日志固定为32KB大小,在日志文件中将分为多部分,但是一个日志只包含一个chunk:

  • 前面4个字节用于CRC校验
  • 接着两个字节是chuck数据长度
  • 接着是一个字节的类型标识(标识当前日志记录在块中位置)
  • 最后是数据payload部分

32kb大小选择也是有考究的,主要考虑日志记录行的磁盘对齐和日志读写,针对日志写的速度也非常快,通过fdatasync(...)方法将缓冲区fflush到磁盘中并且持久化。最后通过日志完成故障恢复的操作。

需要注意如果一个日志记录较大可能存在于多个block块中。

需要注意一个记录永远不会在一个块的最后六个字节内开始,理由是一个记录前面需要一些其他部分占用空间(也就是记录行的校验和数据长度标识信息等),为了保持单个日志块被拆分到多个文件以及压缩考虑,这种“浪费”是可以被接受。 如果读者非要清楚最后几个字节存储的是什么可以看下面的代码: dest_->Append(Slice("\x00\x00\x00\x00\x00\x00", leftover));

如果看不懂源代码,可以根据作者的md文档介绍也可以大致了解日志文件结构

record :=
      checksum: uint32     // crc32c of type and data[] ; little-endian
      length: uint16       // little-endian
      type: uint8          // One of FULL, FIRST, MIDDLE, LAST
      data: uint8[length]

我们可以根据描述简单画一个图:

image.png

日志写流程图

日志写的流程比较简单,主要分歧点是当前块剩余空间是否够写入一个header,并且最后6个字节将会填充空格进行补齐。在日志写入的过程中通过一个while(ture)不断判断buffer大小,如果大小超过32KB,则需要停止写入并且把开始写入到现在位置为一个chunk。

image.png

日志读流程图:

image.png

既然一个block大小为32kb,那么一次日志的读写也应该是32kb,接着便是扫描chunk,在扫描chunk的时候如果发现CRC校验不通过则返回错误信息, 如果数据破损则丢弃当前chunk。

整个读取通过while(true)循环read,直到读取到类型为Last的chunk,日志记录读取完成。

memtable比较有意思的特点是无论插入还是删除都是通过“新增”的方式实现的(你没有看错),内部通过Mainfest维护状态,同时根据版本号和序列号维护一条记录是新增还是删除并且保证读取到的内容是最新值,具体介绍同样在上一节[[LSM-Tree - LevelDb了解和实现]]中解释了对于记录的写入来说即使写入日志也是不能查询的(因为中间有可能存在断电故障导致真实记录没有写入),日志仅作为故障恢复,只有数据写入到mem新增数据才会被访问到。

关于mem新增和删除的代码如下:

namespace {
class MemTableInserter : public WriteBatch::Handler {
public:
  SequenceNumber sequence_;
  MemTable* mem_;
  void Put(const Slice& key, const Slice& value) override {
    mem_->Add(sequence_, kTypeValue, key, value);
    sequence_++;
  }
  void Delete(const Slice& key) override {
    mem_->Add(sequence_, kTypeDeletion, key, Slice());
    sequence_++;
  }
  };
} // namespace

Add()函数的内部通过一个[[skiplist 跳表]]完成数据的插入,在数据的node中包含了记录键值,为了保证读取的数据永远是最新的,记录需要在skiplist内部进行排序,节点排序使用的是比较常见的比较器Compare,如果用户想要自定义排序(例如处理不同的字符编码等)可以编写自己的比较器实现。

对于一条记录的结构我们也可以从Add()函数中看到作者的注释:

// Format of an entry is concatenation of:
  //  key_size     : varint32 of internal_key.size()
  //  key bytes    : char[internal_key.size()]
  //  tag          : uint64((sequence << 8) | type)
  //  value_size   : varint32 of value.size()
  //  value bytes  : char[value.size()]

[[VarInt32编码]]:在这里虽然是变长整型类型但是实际使用4个字节表示。 uint64((sequence << 8) | type:运算之后实际为7个字节的sequence长度 注意在tag和value_size中间有一个ValueType标记来标记记录是新增还是删除。

根据get()代码内部通过valueType进行区分,valuetype占用一个字节的空间进行判断新增还是删除记录,默认比较器判断新增或者删除记录逻辑如下:

if (comparator_.comparator.user_comparator()->Compare(
  Slice(key_ptr, key_length - 8), key.user_key()) == 0) {
  // Correct user key
  const uint64_t tag = DecodeFixed64(key_ptr + key_length - 8);
  switch (static_cast<ValueType>(tag & 0xff)) {
  case kTypeValue: {
    Slice v = GetLengthPrefixedSlice(key_ptr + key_length);
    value->assign(v.data(), v.size());
    return true;
  }
  case kTypeDeletion:
    *s = Status::NotFound(Slice());
    return true;
  }
}

根据代码定义和上面的描述画出下面的结构图:

image.png

Compare键排序

LevelDb的memtable通过跳表维护了键,内部默认情况下使用InternalKeyComparator对于键进行比较,下面是比较内部逻辑: 比较器通过 user_keysequence_number 进行排序,同时按照user_key进行升序排序,序列号通过插入的时间递增,以此来保证无论是增加还是删除都是获取到最新的信息。

/*
 一个用于内部键的比较器,它使用一个指定的比较器用于用户键部分比较,并通过递减序列号来打破平衡。
*/
int InternalKeyComparator::Compare(const Slice& akey, const Slice& bkey) const {
  // Order by:
  // 增加用户密钥(根据用户提供的比较器)。
  // 递减序列号
  // 递减类型(尽管序列号应该足以消除歧义)。
  int r = user_comparator_->Compare(ExtractUserKey(akey), ExtractUserKey(bkey));
  if (r == 0) {
    const uint64_t anum = DecodeFixed64(akey.data() + akey.size() - 8);
    const uint64_t bnum = DecodeFixed64(bkey.data() + bkey.size() - 8);
    if (anum > bnum) {
      r = -1;
    } else if (anum < bnum) {
      r = +1;
    }
  }
  return r;
}

需要注意被比较key可能包含完全不同的内容,这里读者肯定会有疑问对于key获取值进行提取信是否会有影响,然而从get的逻辑来看它可以通过键长度,和序列号等信息进行获取Key,并且获取是header的头部信息,所以key是任何类型都没有英雄。

记录查询

现在我们再回过头来看一下memtable是如何读取的,从memtable和imumemble的关系可以看出有点类似缓存,当memtable写满之后转为imumem并且等待同步至磁盘。

我们可以回顾一下上一节提到的读取等步骤: 读取的步骤:

  • 在memtable中获取指定Key,如果数据符合条件则结束查找。
  • 在Imumemtable中查找指定Key,如果数据符合条件则结束查找。
  • 按低层至高层的顺序在level i层的sstable文件中查找指定的key,若搜索到符合条件的数据项就会结束查找,否则返回Not Found错误,表示数据库中不存在指定的数据。

记录查询按照下面的层级关系进行搜索,首先是从当前内存中正在写入的memtable搜索,接着是imumemtable,再接着是存在于磁盘不同层级的SSTable,SSTable通过*.ldb的形式进行处理。

最终我们可以把LevelDb的查询看作下面的形式:

image.png

小结

这一部分我们了解了LevelDB源代码部分等基础结构DB,介绍了LevelDB的基础对外接口,其实LevelDB和map的接口十分类似,同时重点讲述了读写操作等源代码,以及内部合并压缩的一些细节。

另外记录查询等动作和之前介绍LevelDB等读写流程大致类似,当然代码简化了很多的内容,读者可以根据自己感兴趣的内容研究。

相关文章
|
1月前
|
监控 Java 应用服务中间件
高级java面试---spring.factories文件的解析源码API机制
【11月更文挑战第20天】Spring Boot是一个用于快速构建基于Spring框架的应用程序的开源框架。它通过自动配置、起步依赖和内嵌服务器等特性,极大地简化了Spring应用的开发和部署过程。本文将深入探讨Spring Boot的背景历史、业务场景、功能点以及底层原理,并通过Java代码手写模拟Spring Boot的启动过程,特别是spring.factories文件的解析源码API机制。
76 2
|
24天前
|
缓存 监控 Java
Java线程池提交任务流程底层源码与源码解析
【11月更文挑战第30天】嘿,各位技术爱好者们,今天咱们来聊聊Java线程池提交任务的底层源码与源码解析。作为一个资深的Java开发者,我相信你一定对线程池并不陌生。线程池作为并发编程中的一大利器,其重要性不言而喻。今天,我将以对话的方式,带你一步步深入线程池的奥秘,从概述到功能点,再到背景和业务点,最后到底层原理和示例,让你对线程池有一个全新的认识。
51 12
|
19天前
|
PyTorch Shell API
Ascend Extension for PyTorch的源码解析
本文介绍了Ascend对PyTorch代码的适配过程,包括源码下载、编译步骤及常见问题,详细解析了torch-npu编译后的文件结构和三种实现昇腾NPU算子调用的方式:通过torch的register方式、定义算子方式和API重定向映射方式。这对于开发者理解和使用Ascend平台上的PyTorch具有重要指导意义。
|
1月前
|
存储 安全 Linux
Golang的GMP调度模型与源码解析
【11月更文挑战第11天】GMP 调度模型是 Go 语言运行时系统的核心部分,用于高效管理和调度大量协程(goroutine)。它通过少量的操作系统线程(M)和逻辑处理器(P)来调度大量的轻量级协程(G),从而实现高性能的并发处理。GMP 模型通过本地队列和全局队列来减少锁竞争,提高调度效率。在 Go 源码中,`runtime.h` 文件定义了关键数据结构,`schedule()` 和 `findrunnable()` 函数实现了核心调度逻辑。通过深入研究 GMP 模型,可以更好地理解 Go 语言的并发机制。
|
1月前
|
消息中间件 缓存 安全
Future与FutureTask源码解析,接口阻塞问题及解决方案
【11月更文挑战第5天】在Java开发中,多线程编程是提高系统并发性能和资源利用率的重要手段。然而,多线程编程也带来了诸如线程安全、死锁、接口阻塞等一系列复杂问题。本文将深度剖析多线程优化技巧、Future与FutureTask的源码、接口阻塞问题及解决方案,并通过具体业务场景和Java代码示例进行实战演示。
54 3
|
2月前
|
存储
让星星⭐月亮告诉你,HashMap的put方法源码解析及其中两种会触发扩容的场景(足够详尽,有问题欢迎指正~)
`HashMap`的`put`方法通过调用`putVal`实现,主要涉及两个场景下的扩容操作:1. 初始化时,链表数组的初始容量设为16,阈值设为12;2. 当存储的元素个数超过阈值时,链表数组的容量和阈值均翻倍。`putVal`方法处理键值对的插入,包括链表和红黑树的转换,确保高效的数据存取。
63 5
|
2月前
|
Java Spring
Spring底层架构源码解析(三)
Spring底层架构源码解析(三)
160 5
|
2月前
|
缓存 Java 程序员
Map - LinkedHashSet&Map源码解析
Map - LinkedHashSet&Map源码解析
78 0
|
2月前
|
算法 Java 容器
Map - HashSet & HashMap 源码解析
Map - HashSet & HashMap 源码解析
63 0
|
2月前
|
存储 Java C++
Collection-PriorityQueue源码解析
Collection-PriorityQueue源码解析
66 0

推荐镜像

更多