MySQL · RocksDB · Memtable flush分析

本文涉及的产品
RDS PostgreSQL Serverless,0.5-4RCU 50GB 3个月
推荐场景:
对影评进行热评分析
RDS SQL Server Serverless,2-4RCU 50GB 3个月
推荐场景:
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
简介: 概述首先我们知道在RocksDB中,最终数据的持久化都是保存在SST中,而SST则是由Memtable刷新到磁盘生成的,因此这次我们就主要来分析在RocksDB中何时以及如何来Flush内存数据(memtable)到SST.

概述

首先我们知道在RocksDB中,最终数据的持久化都是保存在SST中,而SST则是由Memtable刷新到磁盘生成的,因此这次我们就主要来分析在RocksDB中何时以及如何来Flush内存数据(memtable)到SST.

简单来说在RocksDB中,每一个ColumnFamily都有自己的Memtable,当Memtable超过固定大小之后(或者WAL文件超过限制),它将会被设置为immutable,然后会有后台的线程启动来刷新这个immutable memtable到磁盘(SST).

相关设置

  1. write_buffer_size 表示每个columnfamily的memtable的大小限制
  2. db_write_buffer_size 总的memtable的大小限制(所有的ColumnFamily).
  3. max_write_buffer_number 最大的memtable的个数
  4. min_write_buffer_number_to_merge 表示最小的可以被flush的memtable的个数

Flush Memtable的触发条件

在下面这几种条件下RocksDB会flush memtable到磁盘.

  1. 当某一个memtable的大小超过write_buffer_size.
  2. 当总的memtable的大小超过db_write_buffer_size.
  3. 当WAL文件的大小超过max_total_wal_size之后 最后一个条件的原因是,当WAL文件大小太大之后,我们需要清理WAL,因此此时我们需要将此WAL对应的数据都刷新到磁盘,也是刷新Memtable.

源码

首先在全局的DBImpl中包含了一个flush_queue_的队列,这个队列将会保存所有的将要被flush到磁盘的ColumnFamily.只有当当前的ColumnFamily满足flush条件(cfd->imm()->IsFlushPending())才会将此CF加入到flush队列.

class DBImpl {
................................
  std::deque<ColumnFamilyData*> flush_queue_;
...................
};

然后我们来看IsFlushPending的实现.这个函数的意思就是至少有一个memtable需要被flush.而MemTableList这个类则是保存了所有的immutable memtables.

bool MemTableList::IsFlushPending() const {
  if ((flush_requested_ && num_flush_not_started_ >= 1) ||
      (num_flush_not_started_ >= min_write_buffer_number_to_merge_)) {
    assert(imm_flush_needed.load(std::memory_order_relaxed));
    return true;
  }
  return false;
}

上面这几个变量的含义在注释中比较清楚, 而min_write_buffer_number_to_merge_就是min_write_buffer_number_to_merge.

  // the number of elements that still need flushing
  int num_flush_not_started_;

  // committing in progress
  bool commit_in_progress_;

  // Requested a flush of all memtables to storage
  bool flush_requested_;

可以看到在SchedulePendingFlush函数中,最终会将对应的ColumnFamily加入到flush queue中.

void DBImpl::SchedulePendingFlush(ColumnFamilyData* cfd,
                                  FlushReason flush_reason) {
  if (!cfd->queued_for_flush() && cfd->imm()->IsFlushPending()) {
    AddToFlushQueue(cfd, flush_reason);
    ++unscheduled_flushes_;
  }
}

而刷新MemTable到磁盘是一个后台线程来做的,这个后台线程叫做BGWorkFlush,最终这个函数会调用BackgroundFlush函数,而BackgroundFlush主要功能是在flush_queue_中找到一个ColumnFamily然后刷新它的memtable到磁盘.

Status DBImpl::BackgroundFlush(bool* made_progress, JobContext* job_context,
                               LogBuffer* log_buffer) {
................................
  while (!flush_queue_.empty()) {
    // This cfd is already referenced
    auto first_cfd = PopFirstFromFlushQueue();

    if (first_cfd->IsDropped() || !first_cfd->imm()->IsFlushPending()) {
      // can't flush this CF, try next one
      if (first_cfd->Unref()) {
        delete first_cfd;
      }
      continue;
    }

    // found a flush!
    cfd = first_cfd;
    break;
  }

  if (cfd != nullptr) {
....................................
    status = FlushMemTableToOutputFile(cfd, mutable_cf_options, made_progress,
                                       job_context, log_buffer);
    if (cfd->Unref()) {
      delete cfd;
    }
  }
  return status;
}

通过上面可以看到最终会调用FlushMemTableToOutputFile来刷新Memtable到磁盘,等到最后我们来分析这个函数.

而这个刷新线程的调用是在MaybeScheduleFlushOrCompaction函数中进行的。这里可以看到刷新县城的限制是在max_flushes中设置的.

void DBImpl::MaybeScheduleFlushOrCompaction() {
..........................................
  auto bg_job_limits = GetBGJobLimits();
  bool is_flush_pool_empty =
      env_->GetBackgroundThreads(Env::Priority::HIGH) == 0;
  while (!is_flush_pool_empty && unscheduled_flushes_ > 0 &&
         bg_flush_scheduled_ < bg_job_limits.max_flushes) {
    unscheduled_flushes_--;
    bg_flush_scheduled_++;
    env_->Schedule(&DBImpl::BGWorkFlush, this, Env::Priority::HIGH, this);
  }
...........................................
}

在RocksDB中,有一个SwitchMemtable函数,这个函数用来将现在的memtable改变为immutable,然后再新建一个memtable,也就是说理论上来说每一次内存的memtable被刷新到磁盘之前肯定会调用这个函数.而在实现中,每一次调用SwitchMemtable之后,都会调用对应immutable memtable的FlushRequested函数来设置对应memtable的flush_requeseted_, 并且会调用上面的SchedulePendingFlush来将对应的ColumnFamily加入到flush_queue_队列中.因此这里我们就通过这几个函数的调用栈来分析RocksDB中何时会触发flush操作.

在RocksDB中会有四个地方会调用SwitchMemtable,分别是:

  1. DbImpl::HandleWriteBufferFull
  2. DBImpl::SwitchWAL
  3. DBImpl::FlushMemTable
  4. DBImpl::ScheduleFlushes

接下来我们就来一个个分析这几个函数.

先来看HandleWriteBufferFull.这个函数主要是处理所有ColumnFamily的memtable内存超过限制的情况.可以看到它会调用SwitchMemtable然后再将对应的cfd加入到flush_queue_,最后再来调用后台刷新线程.

Status DBImpl::HandleWriteBufferFull(WriteContext* write_context) {
...................................
  for (auto cfd : *versions_->GetColumnFamilySet()) {
...............................
  if (cfd_picked != nullptr) {
    status = SwitchMemtable(cfd_picked, write_context,
                            FlushReason::kWriteBufferFull);
    if (status.ok()) {
      cfd_picked->imm()->FlushRequested();
      SchedulePendingFlush(cfd_picked, FlushReason::kWriteBufferFull);
      MaybeScheduleFlushOrCompaction();
    }
  }
  return status;
}

这个函数的调用是在是在写WAL之前,也就是每次写WAL都会进行这个判断.

Status DBImpl::PreprocessWrite(const WriteOptions& write_options,
                               bool* need_log_sync,
                               WriteContext* write_context) {
..........................................
  if (UNLIKELY(status.ok() && write_buffer_manager_->ShouldFlush())) {
    // Before a new memtable is added in SwitchMemtable(),
    // write_buffer_manager_->ShouldFlush() will keep returning true. If another
    // thread is writing to another DB with the same write buffer, they may also
    // be flushed. We may end up with flushing much more DBs than needed. It's
    // suboptimal but still correct.
    status = HandleWriteBufferFull(write_context);
  }
........................................
}

可以看到会调用write_buffer的shouldflush来判断是否处理bufferfull.而这个函数很简单,就是判断memtable所使用的内存是否已经超过限制.

  // Should only be called from write thread
  bool ShouldFlush() const {
    if (enabled()) {
      if (mutable_memtable_memory_usage() > mutable_limit_) {
        return true;
      }
      if (memory_usage() >= buffer_size_ &&
          mutable_memtable_memory_usage() >= buffer_size_ / 2) {
        // If the memory exceeds the buffer size, we trigger more aggressive
        // flush. But if already more than half memory is being flushed,
        // triggering more flush may not help. We will hold it instead.
        return true;
      }
    }
    return false;
  }

而mutable_limit_和buffer_size_的初始化在这里,这里buffer_size_就是db_write_buffer_size这个可配置的选项.

WriteBufferManager::WriteBufferManager(size_t _buffer_size,
                                       std::shared_ptr<Cache> cache)
    : buffer_size_(_buffer_size),
      mutable_limit_(buffer_size_ * 7 / 8),

然后我们来看mutable_memtable_memory_usage和memory_usage,这两个函数用来返回整体的write_buffer所使用的内存(memory_used_)以及将要被释放的内存(memory_active_),比如一个memory table被标记为immutable,则表示这块内存将要被释放.

  // Only valid if enabled()
  size_t memory_usage() const {
    return memory_used_.load(std::memory_order_relaxed);
  }
  size_t mutable_memtable_memory_usage() const {
    return memory_active_.load(std::memory_order_relaxed);
  }

然后我们来看SwitchWAL,流程和上面的HandleWriteBufferFull基本一致.

Status DBImpl::SwitchWAL(WriteContext* write_context) {
...............................................
  for (auto cfd : *versions_->GetColumnFamilySet()) {
    if (cfd->IsDropped()) {
      continue;
    }
    if (cfd->OldestLogToKeep() <= oldest_alive_log) {
      status = SwitchMemtable(cfd, write_context);
      if (!status.ok()) {
        break;
      }
      cfd->imm()->FlushRequested();
      SchedulePendingFlush(cfd, FlushReason::kWriteBufferManager);
    }
  }
  MaybeScheduleFlushOrCompaction();
  return status;
}

这个函数被调用比较简单,就是判断是否WAL的大小是否已经超过了设置的wal大小(max_total_wal_size).可以看到它的调用也是在每次写WAL之前.

Status DBImpl::PreprocessWrite(const WriteOptions& write_options,
                               bool* need_log_sync,
                               WriteContext* write_context) {
.................................................
  if (UNLIKELY(status.ok() && !single_column_family_mode_ &&
               total_log_size_ > GetMaxTotalWalSize())) {
    status = SwitchWAL(write_context);
  }

然后是FlushMemTable,这个函数用来强制刷新刷新memtable到磁盘,比如用户直接调用Flush接口.可以看到和上面的集中情况基本一致,switchmemtable->flushrequested->maybescheduleflushorcompaction.

Status DBImpl::FlushMemTable(ColumnFamilyData* cfd,
                             const FlushOptions& flush_options,
                             FlushReason flush_reason, bool writes_stopped) {
  Status s;
  uint64_t flush_memtable_id = 0;
  {
.........................................

    // SwitchMemtable() will release and reacquire mutex during execution
    s = SwitchMemtable(cfd, &context);
    flush_memtable_id = cfd->imm()->GetLatestMemTableID();

    if (!writes_stopped) {
      write_thread_.ExitUnbatched(&w);
    }

    cfd->imm()->FlushRequested();

    // schedule flush
    SchedulePendingFlush(cfd, flush_reason);
    MaybeScheduleFlushOrCompaction();
  }
...........................
  return s;
}

最后我们来看最后一种情况,这种情况和前面三种有一个最大的区别就是前面三种情况的出现都是需要立即调用flush线程来刷新memtable到磁盘,而还有一种情况则是没那么紧急的情况,也就是说可以等到后面某个时间段再调用flush线程来刷新内容到磁盘.

在这种情况下,每一个memtable都会有一个状态叫做flush_state_,而每个memtable都有可能有三种状态.而状态的更新是通过UpdateFlushState来进行的.这里可以推测的到这些都是对于单个memtable的限制.

  enum FlushStateEnum { FLUSH_NOT_REQUESTED, FLUSH_REQUESTED, FLUSH_SCHEDULED };

void MemTable::UpdateFlushState() {
  auto state = flush_state_.load(std::memory_order_relaxed);
  if (state == FLUSH_NOT_REQUESTED && ShouldFlushNow()) {
    // ignore CAS failure, because that means somebody else requested
    // a flush
    flush_state_.compare_exchange_strong(state, FLUSH_REQUESTED,
                                         std::memory_order_relaxed,
                                         std::memory_order_relaxed);
  }
}

而UpdateFlushState什么时候会被调用呢,很简单,就是当你每次操作memtable的时候,比如update/add这些操作.

可以看到当shoudflushnow之后,将会设置flush_state_状态为FLUSH_REQUESTED,也就是此memtable将会被flush.

然后来看shouldflushnow函数,这个函数主要的判断就是判断是否当前MemTable的内存使用是否超过了write_buffer_size,如果超过了,那么就返回true.

bool MemTable::ShouldFlushNow() const {
  size_t write_buffer_size = write_buffer_size_.load(std::memory_order_relaxed);
  const double kAllowOverAllocationRatio = 0.6;

  // If arena still have room for new block allocation, we can safely say it
  // shouldn't flush.
  auto allocated_memory = table_->ApproximateMemoryUsage() +
                          range_del_table_->ApproximateMemoryUsage() +
                          arena_.MemoryAllocatedBytes();

  // if we can still allocate one more block without exceeding the
  // over-allocation ratio, then we should not flush.
  if (allocated_memory + kArenaBlockSize <
      write_buffer_size + kArenaBlockSize * kAllowOverAllocationRatio) {
    return false;
  }

  // if user keeps adding entries that exceeds write_buffer_size, we need to
  // flush earlier even though we still have much available memory left.
  if (allocated_memory >
      write_buffer_size + kArenaBlockSize * kAllowOverAllocationRatio) {
    return true;
  }

  return arena_.AllocatedAndUnused() < kArenaBlockSize / 4;
}

然后我们来看当设置了flush_state_状态之后,会做什么操作.对应的MEmtable有一个ShouldScheduleFlush函数,这个函数用来返回当前的memtable是否已经被设置flush_requested状态位。

 bool ShouldScheduleFlush() const {
    return flush_state_.load(std::memory_order_relaxed) == FLUSH_REQUESTED;
  }

而这个函数会在checkmemtablefull中被调用,这个函数主要用来将已经设置flush_state_为flush_requested的memtable的状态改变为flush_schedule(意思就是已经进入flush的调度队列),然后将这个columnfamily加入到对应的调度队列.

  void CheckMemtableFull() {
    if (flush_scheduler_ != nullptr) {
      auto* cfd = cf_mems_->current();
      assert(cfd != nullptr);
      if (cfd->mem()->ShouldScheduleFlush() &&
          cfd->mem()->MarkFlushScheduled()) {
        // MarkFlushScheduled only returns true if we are the one that
        // should take action, so no need to dedup further
        flush_scheduler_->ScheduleFlush(cfd);
      }
    }
  }

其中MarkFlushScheduled就是用来改变状态.

  bool MarkFlushScheduled() {
    auto before = FLUSH_REQUESTED;
    return flush_state_.compare_exchange_strong(before, FLUSH_SCHEDULED,
                                                std::memory_order_relaxed,
                                                std::memory_order_relaxed);
  }

而ScheduleFlush则是比较重要的一个函数,就是用来将对应的CF加入到flush调度队列(FlushScheduler).

void FlushScheduler::ScheduleFlush(ColumnFamilyData* cfd) {
#ifndef NDEBUG
  std::lock_guard<std::mutex> lock(checking_mutex_);
  assert(checking_set_.count(cfd) == 0);
  checking_set_.insert(cfd);
#endif  // NDEBUG
  cfd->Ref();
// Suppress false positive clang analyzer warnings.
#ifndef __clang_analyzer__
  Node* node = new Node{cfd, head_.load(std::memory_order_relaxed)};
  while (!head_.compare_exchange_strong(
      node->next, node, std::memory_order_relaxed, std::memory_order_relaxed)) {
    // failing CAS updates the first param, so we are already set for
    // retry.  TakeNextColumnFamily won't happen until after another
    // inter-thread synchronization, so we don't even need release
    // semantics for this CAS
  }
#endif  // __clang_analyzer__
}

而checkmemtablefull会在下面三种条件下被调用

  1. delete操作
  2. put操作
  3. merge操作.

然后我们来看flushscheduler如何来调度flush线程.首先在每次写WAL之前都会调用PreprocessWrite,然后这个函数会判断flush_scheduler是否为空(也就是是否有已经满掉的memtable需要刷新到磁盘).

Status DBImpl::PreprocessWrite(const WriteOptions& write_options,
                               bool* need_log_sync,
                               WriteContext* write_context) {
..................................................................
  if (UNLIKELY(status.ok() && !flush_scheduler_.Empty())) {
    status = ScheduleFlushes(write_context);
  }

而在SscheduleFlushes中,则会遍历之前所有的需要被flush的memtable,然后调用switchMemtable来进行后续操作.这里要注意在SwitchMemtable也会触发调用flush线程.

Status DBImpl::ScheduleFlushes(WriteContext* context) {
  ColumnFamilyData* cfd;
  while ((cfd = flush_scheduler_.TakeNextColumnFamily()) != nullptr) {
    auto status = SwitchMemtable(cfd, context, FlushReason::kWriteBufferFull);
    if (cfd->Unref()) {
      delete cfd;
    }
    if (!status.ok()) {
      return status;
    }
  }
  return Status::OK();
}

刷新memtable到sst

在RocksDB中刷新是通过FlushJob这个类来实现的,整个实现还是比较简单.最终这里是调用WriteLevel0Table来刷新内容到磁盘。这里就不分析sst的格式了,需要了解具体格式的可以看RocksDB的wiki.

Status FlushJob::Run(LogsWithPrepTracker* prep_tracker,
                     FileMetaData* file_meta) {
...........................................
  // This will release and re-acquire the mutex.
  Status s = WriteLevel0Table();

  if (s.ok() &&
      (shutting_down_->load(std::memory_order_acquire) || cfd_->IsDropped())) {
    s = Status::ShutdownInProgress(
        "Database shutdown or Column family drop during flush");
  }

  if (!s.ok()) {
    cfd_->imm()->RollbackMemtableFlush(mems_, meta_.fd.GetNumber());
  } else {
    TEST_SYNC_POINT("FlushJob::InstallResults");
    // Replace immutable memtable with the generated Table
    s = cfd_->imm()->InstallMemtableFlushResults(
        cfd_, mutable_cf_options_, mems_, prep_tracker, versions_, db_mutex_,
        meta_.fd.GetNumber(), &job_context_->memtables_to_free, db_directory_,
        log_buffer_);
  }
........................................................
}
相关实践学习
如何在云端创建MySQL数据库
开始实验后,系统会自动创建一台自建MySQL的 源数据库 ECS 实例和一台 目标数据库 RDS。
全面了解阿里云能为你做什么
阿里云在全球各地部署高效节能的绿色数据中心,利用清洁计算为万物互联的新世界提供源源不断的能源动力,目前开服的区域包括中国(华北、华东、华南、香港)、新加坡、美国(美东、美西)、欧洲、中东、澳大利亚、日本。目前阿里云的产品涵盖弹性计算、数据库、存储与CDN、分析与搜索、云通信、网络、管理与监控、应用服务、互联网中间件、移动服务、视频服务等。通过本课程,来了解阿里云能够为你的业务带来哪些帮助 &nbsp; &nbsp; 相关的阿里云产品:云服务器ECS 云服务器 ECS(Elastic Compute Service)是一种弹性可伸缩的计算服务,助您降低 IT 成本,提升运维效率,使您更专注于核心业务创新。产品详情: https://www.aliyun.com/product/ecs
目录
相关文章
|
2月前
|
存储 缓存 关系型数据库
MySQL事务日志-Redo Log工作原理分析
事务的隔离性和原子性分别通过锁和事务日志实现,而持久性则依赖于事务日志中的`Redo Log`。在MySQL中,`Redo Log`确保已提交事务的数据能持久保存,即使系统崩溃也能通过重做日志恢复数据。其工作原理是记录数据在内存中的更改,待事务提交时写入磁盘。此外,`Redo Log`采用简单的物理日志格式和高效的顺序IO,确保快速提交。通过不同的落盘策略,可在性能和安全性之间做出权衡。
1684 14
|
2月前
|
存储 关系型数据库 MySQL
基于案例分析 MySQL 权限认证中的具体优先原则
【10月更文挑战第26天】本文通过具体案例分析了MySQL权限认证中的优先原则,包括全局权限、数据库级别权限和表级别权限的设置与优先级。全局权限优先于数据库级别权限,后者又优先于表级别权限。在权限冲突时,更严格的权限将被优先执行,确保数据库的安全性与资源合理分配。
|
2月前
|
SQL 关系型数据库 MySQL
MySQL 更新1000万条数据和DDL执行时间分析
MySQL 更新1000万条数据和DDL执行时间分析
161 4
|
2月前
|
SQL 自然语言处理 关系型数据库
Vanna使用ollama分析本地MySQL数据库
这篇文章详细介绍了如何使用Vanna结合Ollama框架来分析本地MySQL数据库,实现自然语言查询功能,包括环境搭建和配置流程。
287 0
|
3月前
|
Oracle NoSQL 关系型数据库
主流数据库对比:MySQL、PostgreSQL、Oracle和Redis的优缺点分析
主流数据库对比:MySQL、PostgreSQL、Oracle和Redis的优缺点分析
583 2
|
3月前
|
存储 关系型数据库 MySQL
分析MySQL主从复制中AUTO_INCREMENT值不一致的问题
通过对 `AUTO_INCREMENT`不一致问题的深入分析和合理应对措施的实施,可以有效地维护MySQL主从复制环境中数据的一致性和完整性,确保数据库系统的稳定性和可靠性。
124 6
|
2月前
|
SQL 关系型数据库 MySQL
MySQL EXPLAIN该如何分析?
本文将详细介绍MySQL中`EXPLAIN`关键字的工作原理及结果字段解析,帮助优化查询性能。`EXPLAIN`可显示查询SQL的执行计划,其结果包括`id`、`select_type`、`table`等字段。通过具体示例和优化建议,帮助你理解和应用`EXPLAIN`,提升数据库查询效率。
116 0
|
3月前
|
存储 关系型数据库 MySQL
分析MySQL主从复制中AUTO_INCREMENT值不一致的问题
通过对 `AUTO_INCREMENT`不一致问题的深入分析和合理应对措施的实施,可以有效地维护MySQL主从复制环境中数据的一致性和完整性,确保数据库系统的稳定性和可靠性。
69 1
|
4月前
|
存储 JSON 关系型数据库
MySQL与JSON的邂逅:开启大数据分析新纪元
MySQL与JSON的邂逅:开启大数据分析新纪元
|
4月前
|
缓存 关系型数据库 MySQL
在Linux中,如何优化MySQL性能,包括索引优化和查询分析?
在Linux中,如何优化MySQL性能,包括索引优化和查询分析?

相关产品

  • 云数据库 RDS MySQL 版