IV. Key Code
1. Random Write
- Implementation logic
Each bucket is guarded by its own lock, so that the keys and values within a bucket stay in one-to-one correspondence and the bucket's meta count is updated correctly. A write first lands in the bucket's buffer; once the buffer is full, it is flushed with a blocking pwrite system call.
The overall logic is as follows:
{
unique_lock<mutex> lock(bucket_mtx_[bucket_id]);
// Write value to the value file, with a tmp file as value_buffer.
uint32_t val_buffer_offset = (mmap_meta_cnt_[bucket_id] % TMP_VALUE_BUFFER_SIZE) * VALUE_SIZE;
char *value_buffer = mmap_value_aligned_buffer_view_[bucket_id];
memcpy(value_buffer + val_buffer_offset, value.data(), VALUE_SIZE);
// Write value to the value file.
if ((mmap_meta_cnt_[bucket_id] + 1) % TMP_VALUE_BUFFER_SIZE == 0) {
uint32_t in_bucket_id = mmap_meta_cnt_[bucket_id] - (TMP_VALUE_BUFFER_SIZE - 1);
uint32_t fid;
uint64_t foff;
tie(fid, foff) = get_value_fid_foff(bucket_id, in_bucket_id);
pwrite(value_file_dp_[fid], value_buffer, VALUE_SIZE * TMP_VALUE_BUFFER_SIZE, foff);
}
// Write key to the key file.
uint32_t key_buffer_offset = (mmap_meta_cnt_[bucket_id] % TMP_KEY_BUFFER_SIZE);
uint64_t *key_buffer = mmap_key_aligned_buffer_view_[bucket_id];
key_buffer[key_buffer_offset] = key_int_big_endian;
if (((mmap_meta_cnt_[bucket_id] + 1) % TMP_KEY_BUFFER_SIZE) == 0) {
uint32_t in_bucket_id = (mmap_meta_cnt_[bucket_id] - (TMP_KEY_BUFFER_SIZE - 1));
uint32_t fid;
uint64_t foff;
tie(fid, foff) = get_key_fid_foff(bucket_id, in_bucket_id);
pwrite(key_file_dp_[fid], key_buffer, sizeof(uint64_t) * TMP_KEY_BUFFER_SIZE, foff);
}
// Update the meta data.
mmap_meta_cnt_[bucket_id]++;
}
2. Optimization: use 32 data files
With the file count set to 32, threads writing to the same file block one another, which gives a useful degree of implicit synchronization. The device queue depth stays above 8 at essentially all times, including near the end of the write phase, which addresses the challenge that the tail threads alone cannot sustain a high queue depth. A sketch of the bucket-to-file mapping is given below.
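For reference, here is a minimal sketch of what the bucket-to-file mapping behind get_value_fid_foff might look like. The constant names follow the appendix, but the exact formulas (division-based file assignment, a fixed-size region reserved per bucket) are assumptions rather than the actual competition code:
#include <cstdint>
#include <utility>

// Constants follow the appendix; FILESYSTEM_BLOCK_SIZE = 4096 is an assumption.
static constexpr uint32_t BUCKET_NUM = 1024;
static constexpr uint32_t VAL_FILE_NUM = 32;
static constexpr uint32_t BUCKETS_PER_FILE = BUCKET_NUM / VAL_FILE_NUM;  // 32 buckets share one file
static constexpr uint64_t VALUE_SIZE = 4096;
static constexpr uint64_t FILESYSTEM_BLOCK_SIZE = 4096;
static constexpr uint64_t MAX_TOTAL_SIZE = 68ULL * 1024 * 1024;
static constexpr uint64_t MAX_VAL_BUCKET_SIZE =                          // max entries per bucket
        MAX_TOTAL_SIZE / BUCKET_NUM / FILESYSTEM_BLOCK_SIZE * FILESYSTEM_BLOCK_SIZE;

// Hypothetical reconstruction of get_value_fid_foff: which of the 32 value files
// a bucket lives in, and the absolute byte offset of its in_bucket_id-th value.
inline std::pair<uint32_t, uint64_t> get_value_fid_foff(uint32_t bucket_id,
                                                        uint32_t in_bucket_id) {
    uint32_t fid = bucket_id / BUCKETS_PER_FILE;                // a bucket never straddles two files
    uint64_t bucket_base = static_cast<uint64_t>(bucket_id % BUCKETS_PER_FILE)
                           * MAX_VAL_BUCKET_SIZE * VALUE_SIZE;  // fixed region reserved per bucket
    return {fid, bucket_base + static_cast<uint64_t>(in_bucket_id) * VALUE_SIZE};
}
Because 32 buckets map onto every file, writers from different threads regularly contend on the same file, which is exactly the implicit throttling described above; get_key_fid_foff would follow the same pattern with the key-file constants.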
2. Parallel Index Construction
Idea: build a sorted array per bucket as the index.
Recall: the file design keeps the keys and values of a bucket aligned, so during construction a key's in-bucket offset is identical to its value's in-bucket offset.
Each worker handles its share of buckets; logical buckets map onto files through the K-V log file layout described earlier. Once a bucket's array has been fully populated, it is sorted with the following comparator:
[](KeyEntry l, KeyEntry r) {
    if (l.key_ == r.key_) {
        return l.value_offset_ > r.value_offset_;
    } else {
        return l.key_ < r.key_;
    }
}
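The comparator assumes a KeyEntry record of roughly the following shape (field widths are inferred from how key_ and value_offset_ are used in this section, so treat this as a sketch rather than the exact struct):
#include <cstdint>

struct KeyEntry {
    uint64_t key_;           // key stored big-endian, so integer order matches byte order
    uint32_t value_offset_;  // in-bucket slot of the corresponding value
};
Ordering duplicate keys by descending value_offset_ places the most recently written version first among equal keys.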
Concretely, each thread's task has two parts: (1) read the keys and fill in the in-bucket offsets, and (2) sort. The 1024 buckets are divided evenly among 64 threads (keys are distributed roughly uniformly across buckets), and sorting overlaps with disk IO throughout construction.
The dominant cost of this phase is reading the key-log files (the sort cost is negligible); in total it takes about 0.2 seconds.
The detailed code is as follows:
vector <thread> workers(NUM_READ_KEY_THREADS);
for (uint32_t tid = 0; tid < NUM_READ_KEY_THREADS; ++tid) {
workers[tid] = thread([tid, local_buffers_g, this]() {
uint64_t *local_buffer = local_buffers_g[tid];
uint32_t avg = BUCKET_NUM / NUM_READ_KEY_THREADS;
for (uint32_t bucket_id = tid * avg; bucket_id < (tid + 1) * avg; bucket_id++) {
uint32_t entry_count = mmap_meta_cnt_[bucket_id];
if (entry_count > 0) {
uint32_t passes = entry_count / KEY_READ_BLOCK_COUNT;
uint32_t remain_entries_count = entry_count - passes * KEY_READ_BLOCK_COUNT;
uint32_t file_offset = 0;
auto fid_foff = get_key_fid_foff(bucket_id, 0);
uint32_t key_fid = fid_foff.first;
size_t read_offset = fid_foff.second;
for (uint32_t j = 0; j < passes; ++j) {
auto ret = pread(key_file_dp_[key_fid], local_buffer,
KEY_READ_BLOCK_COUNT * sizeof(uint64_t), read_offset);
if (ret != KEY_READ_BLOCK_COUNT * sizeof(uint64_t)) {
log_info("ret: %d, err: %s", ret, strerror(errno));
}
for (uint32_t k = 0; k < KEY_READ_BLOCK_COUNT; k++) {
index_[bucket_id][file_offset].key_ = local_buffer[k];
index_[bucket_id][file_offset].value_offset_ = file_offset;
file_offset++;
}
read_offset += KEY_READ_BLOCK_COUNT * sizeof(uint64_t);
}
if (remain_entries_count != 0) {
size_t num_bytes = (remain_entries_count * sizeof(uint64_t) + FILESYSTEM_BLOCK_SIZE - 1) /
FILESYSTEM_BLOCK_SIZE * FILESYSTEM_BLOCK_SIZE;
auto ret = pread(key_file_dp_[key_fid], local_buffer, num_bytes, read_offset);
if (ret < static_cast<ssize_t>(remain_entries_count * sizeof(uint64_t))) {
log_info("ret: %d, err: %s, fid:%zu off: %zu", ret, strerror(errno), key_fid,
read_offset);
}
for (uint32_t k = 0; k < remain_entries_count; k++) {
index_[bucket_id][file_offset].key_ = local_buffer[k];
index_[bucket_id][file_offset].value_offset_ = file_offset;
file_offset++;
}
}
sort(index_[bucket_id], index_[bucket_id] + entry_count, [](KeyEntry l, KeyEntry r) {
if (l.key_ == r.key_) {
return l.value_offset_ > r.value_offset_;
} else {
return l.key_ < r.key_;
}
});
}
}
});
}
3. Random Read
1. Implementation logic
The basic random-read flow is: query the index; return immediately if the key is not found; otherwise read the value file.
The index lookup is shown below; its core is a binary search with prefetch optimization (branchfree_search):
uint64_t big_endian_key_uint = bswap_64(TO_UINT64(key.data()));
KeyEntry tmp{};
tmp.key_ = big_endian_key_uint;
auto bucket_id = get_par_bucket_id(big_endian_key_uint);
auto it = index_[bucket_id] + branchfree_search(index_[bucket_id], mmap_meta_cnt_[bucket_id], tmp);
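branchfree_search itself is not listed above. A minimal sketch of a branch-free binary search with explicit prefetching, in the spirit of the description, might look like the following; it compares keys only, and __builtin_prefetch is a GCC/Clang builtin, so this is an assumption about the real implementation rather than a copy of it:
#include <cstddef>
#include <cstdint>

struct KeyEntry {        // as sketched in the index-construction part
    uint64_t key_;
    uint32_t value_offset_;
};

// Branch-free lower bound over a bucket's sorted index. The comparison result
// selects the next base pointer instead of branching, and the two candidate
// probe positions of the next step are prefetched ahead of time.
inline size_t branchfree_search(const KeyEntry *arr, size_t n, KeyEntry target) {
    const KeyEntry *base = arr;
    while (n > 1) {
        size_t half = n / 2;
        __builtin_prefetch(&base[half / 2]);         // approx. next probe if we stay left
        __builtin_prefetch(&base[half + half / 2]);  // approx. next probe if we move right
        base = (base[half - 1].key_ < target.key_) ? base + half : base;
        n -= half;
    }
    return static_cast<size_t>(base - arr) +
           ((n == 1 && base[0].key_ < target.key_) ? 1 : 0);
}
Returning the lower-bound position matches the not-found check that follows: the caller compares the resulting entry against the end of the bucket and against the queried key.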
The remaining key-not-found check and the value read are as follows:
if (it == index_[bucket_id] + mmap_meta_cnt_[bucket_id] || it->key_ != big_endian_key_uint) {
NotifyRandomReader(local_block_offset, tid);
return kNotFound;
}
uint32_t fid;
uint64_t foff;
std::tie(fid, foff) = get_value_fid_foff(bucket_id, it->value_offset_);
pread(value_file_dp_[fid], value_buffer, VALUE_SIZE, foff);
NotifyRandomReader(local_block_offset, tid);
value->assign(value_buffer, VALUE_SIZE);
2. Optimization: fine-grained synchronization
Idea: the synchronization policy is designed to keep the queue depth high enough (between 25 and 30) while letting the threads finish at nearly the same time, so that the phase where only a few threads are still issuing IO at low queue depth is kept as short as possible.
Implementation detail: four blocking queues, notify_queues_, act as the synchronization channel between the even and odd threads for the current and the next read round (threads with tid % 2 == 0 and tid % 2 == 1 notify each other).
Implementation detail 1 (initialization): tokens are enqueued into the even threads' blocking queue so that they can start.
if (local_block_offset == 0) {
if (tid == 0) {
notify_queues_.resize(4);
for (auto i = 0; i < 4; i++) {
// Even-0,1 Odd-2,3
notify_queues_[i] = new moodycamel::BlockingConcurrentQueue<int32_t>(NUM_THREADS);
}
for (uint32_t i = 0; i < NUM_THREADS / 2; i++) {
notify_queues_[0]->enqueue(1);
}
}
read_barrier_.Wait();
}
Implementation detail 2 (waiting): each round starts with a wait.
uint32_t current_round = local_block_offset - 1;
if ((current_round % SHRINK_SYNC_FACTOR) == 0) {
uint32_t notify_big_round_idx = get_notify_big_round(current_round);
if (tid % 2 == 0) {
notify_queues_[notify_big_round_idx % 2]->wait_dequeue(tmp_val);
} else {
notify_queues_[notify_big_round_idx % 2 + 2]->wait_dequeue(tmp_val);
}
}
Implementation detail 3 (notification): even threads notify odd threads for the current round, and odd threads notify even threads for the next round.
void EngineRace::NotifyRandomReader(uint32_t local_block_offset, int64_t tid) {
uint32_t current_round = local_block_offset - 1;
if ((current_round % SHRINK_SYNC_FACTOR) == SHRINK_SYNC_FACTOR - 1) {
uint32_t notify_big_round_idx = get_notify_big_round(current_round);
if (tid % 2 == 0) {
notify_queues_[(notify_big_round_idx) % 2 + 2]->enqueue(1); // Notify This Round
} else {
notify_queues_[(notify_big_round_idx + 1) % 2]->enqueue(1); // Notify Next Round
}
}
}
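get_notify_big_round is not shown in the snippets above. From its usage (one wait per SHRINK_SYNC_FACTOR rounds, with the parity of the result selecting the queue pair), it is presumably just the big-round index; the following one-liner is an assumption:
// Assumed helper: collapse every SHRINK_SYNC_FACTOR consecutive rounds into one
// "big round"; its parity then selects which pair of notify queues to use.
inline uint32_t get_notify_big_round(uint32_t current_round) {
    return current_round / SHRINK_SYNC_FACTOR;  // SHRINK_SYNC_FACTOR == 2, see the appendix
}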
4. Sequential Read and Concurrent Full Scan
- Pipelining memory access and disk IO
Main logic: a single IO coordinator thread keeps handing tasks to the IO threads so that IO is always in flight, while the other threads consume the data in memory; a barrier is taken per bucket so that the in-memory visiting does not hog too many resources.
Implementation detail 1 (telling the memory-visitor threads that a value buffer is ready): done with promises and futures (promises_, futures_). Each bucket has one promise standing for the buffer that will eventually be filled, and the promise is paired with a shared_future so that all visitors can wait on the same result.
Implementation detail 2 (telling the IO coordinator that free buffers are available): a blocking queue free_buffers_ tracks the free buffers; visitor threads push exhausted buffers into free_buffers_, and the IO coordinator pops buffers from it. A sketch of the wiring follows.
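The declarations behind the promise/future scheme are not shown in the snippets below; a minimal sketch of how promises_ and futures_ might be wired up (in the real code they are presumably members of EngineRace, and the container types plus the InitRangeFutures helper are assumptions):
#include <cstdint>
#include <future>
#include <vector>

static constexpr uint32_t BUCKET_NUM = 1024;

// One promise per bucket visit; the range query makes two passes over all buckets,
// hence 2 * BUCKET_NUM slots. Each promise is paired with a shared_future so that a
// single set_value() by the IO coordinator wakes every visitor waiting on that bucket.
std::vector<std::promise<char *>> promises_(2 * BUCKET_NUM);
std::vector<std::shared_future<char *>> futures_;

void InitRangeFutures() {
    futures_.reserve(promises_.size());
    for (auto &p : promises_) {
        futures_.push_back(p.get_future().share());
    }
}
free_buffers_ itself is simply a blocking queue of raw value buffers: the last visitor to finish a bucket pushes its buffer back, and the coordinator's pop blocks until a buffer becomes available.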
2. Concrete implementation
The implementation has three parts: the IO coordinator, the IO threads, and the memory-visitor threads.
The IO coordinator thread is shown below; its key call, ReadBucketToBuffer, saturates the device by keeping the request size at 128KB and the queue depth at 8:
single_range_io_worker_ = new thread([this]() {
// Odd Round.
log_info("In Range IO");
for (uint32_t next_bucket_idx = 0; next_bucket_idx < BUCKET_NUM; next_bucket_idx++) {
// 1st: Pop Buffer.
auto range_clock_beg = high_resolution_clock::now();
char *buffer = free_buffers_->pop(total_io_sleep_time_);
auto range_clock_end = high_resolution_clock::now();
double elapsed_time =
duration_cast<nanoseconds>(range_clock_end - range_clock_beg).count() /
static_cast<double>(1000000000);
total_blocking_queue_time_ += elapsed_time;
// 2nd: Read
ReadBucketToBuffer(next_bucket_idx, buffer);
promises_[next_bucket_idx].set_value(buffer);
}
log_info("In Range IO, Finish Odd Round");
// Even Round.
for (uint32_t next_bucket_idx = 0; next_bucket_idx < BUCKET_NUM; next_bucket_idx++) {
uint32_t future_id = next_bucket_idx + BUCKET_NUM;
char *buffer;
if (next_bucket_idx >= KEEP_REUSE_BUFFER_NUM) {
// 1st: Pop Buffer.
auto range_clock_beg = high_resolution_clock::now();
buffer = free_buffers_->pop(total_io_sleep_time_);
auto range_clock_end = high_resolution_clock::now();
double elapsed_time =
duration_cast<nanoseconds>(range_clock_end - range_clock_beg).count() /
static_cast<double>(1000000000);
total_blocking_queue_time_ += elapsed_time;
// 2nd: Read
ReadBucketToBuffer(next_bucket_idx, buffer);
} else {
buffer = cached_front_buffers_[next_bucket_idx];
}
promises_[future_id].set_value(buffer);
}
log_info("In Range IO, Finish Even Round");
});
ReadBucketToBuffer, the coordinator's routine for submitting the reads of a single bucket, saturates the device by keeping the request size at 128KB (VAL_AGG_NUM * VALUE_SIZE = 32 * 4KB) and the queue depth at 8. The detailed logic is as follows:
void EngineRace::ReadBucketToBuffer(uint32_t bucket_id, char *value_buffer) {
auto range_clock_beg = high_resolution_clock::now();
if (value_buffer == nullptr) {
return;
}
// Get fid, and off.
uint32_t fid;
uint64_t foff;
std::tie(fid, foff) = get_value_fid_foff(bucket_id, 0);
uint32_t value_num = mmap_meta_cnt_[bucket_id];
uint32_t remain_value_num = value_num % VAL_AGG_NUM;
uint32_t total_block_num = (remain_value_num == 0 ? (value_num / VAL_AGG_NUM) :
(value_num / VAL_AGG_NUM + 1));
uint32_t completed_block_num = 0;
uint32_t last_block_size = (remain_value_num == 0 ? (VALUE_SIZE * VAL_AGG_NUM) :
(remain_value_num * VALUE_SIZE));
uint32_t submitted_block_num = 0;
// Submit to Maintain Queue Depth.
while (completed_block_num < total_block_num) {
for (uint32_t io_id = 0; io_id < RANGE_QUEUE_DEPTH; io_id++) {
// Peek Completions If Possible.
if (range_worker_status_tls_[io_id] == WORKER_COMPLETED) {
completed_block_num++;
range_worker_status_tls_[io_id] = WORKER_IDLE;
}
// Submit If Possible.
if (submitted_block_num < total_block_num && range_worker_status_tls_[io_id] == WORKER_IDLE) {
size_t offset = submitted_block_num * (size_t) VAL_AGG_NUM * VALUE_SIZE;
uint32_t size = (submitted_block_num == (total_block_num - 1) ?
last_block_size : (VAL_AGG_NUM * VALUE_SIZE));
range_worker_status_tls_[io_id] = WORKER_SUBMITTED;
range_worker_task_tls_[io_id]->enqueue(
UserIOCB(value_buffer + offset, value_file_dp_[fid], size, offset + foff));
submitted_block_num++;
}
}
}
auto range_clock_end = high_resolution_clock::now();
double elapsed_time = duration_cast<nanoseconds>(range_clock_end - range_clock_beg).count() /
static_cast<double>(1000000000);
total_time_ += elapsed_time;
}
The corresponding IO thread logic is as follows:
void EngineRace::InitPoolingContext() {
io_threads_ = vector<thread>(RANGE_QUEUE_DEPTH);
range_worker_task_tls_.resize(RANGE_QUEUE_DEPTH);
range_worker_status_tls_ = new atomic_int[RANGE_QUEUE_DEPTH];
for (uint32_t io_id = 0; io_id < RANGE_QUEUE_DEPTH; io_id++) {
range_worker_task_tls_[io_id] = new moodycamel::BlockingConcurrentQueue<UserIOCB>();
range_worker_status_tls_[io_id] = WORKER_IDLE;
io_threads_[io_id] = thread([this, io_id]() {
UserIOCB user_iocb;
#ifdef IO_AFFINITY_EXP
setThreadSelfAffinity(io_id);
#endif
double wait_time = 0;
for (;;) {
range_worker_task_tls_[io_id]->wait_dequeue(user_iocb);
if (user_iocb.fd_ == FD_FINISHED) {
log_info("yes! notified, %d", io_id);
break;
} else {
pread(user_iocb.fd_, user_iocb.buffer_, user_iocb.size_, user_iocb.offset_);
range_worker_status_tls_[io_id] = WORKER_COMPLETED;
}
}
});
}
}
The corresponding memory-visitor thread logic is shown below. Each bucket starts with a barrier, and free_buffers_ is updated at the end of each bucket: the last thread to finish a bucket pushes the exhausted buffer back into the blocking queue.
static thread_local uint32_t bucket_future_id_beg = 0;
uint32_t lower_key_par_id = 0;
uint32_t upper_key_par_id = BUCKET_NUM - 1;
for (uint32_t bucket_id = lower_key_par_id; bucket_id < upper_key_par_id + 1; bucket_id++) {
range_barrier_ptr_->Wait();
uint32_t future_id = bucket_id + bucket_future_id_beg;
char *shared_buffer;
uint32_t relative_id = future_id % (2 * BUCKET_NUM);
if (relative_id >= BUCKET_NUM && relative_id < BUCKET_NUM + KEEP_REUSE_BUFFER_NUM) {
shared_buffer = cached_front_buffers_[relative_id - BUCKET_NUM];
} else {
if (tid == 0) {
auto wait_start_clock = high_resolution_clock::now();
shared_buffer = futures_[future_id].get();
auto wait_end_clock = high_resolution_clock::now();
double elapsed_time = duration_cast<nanoseconds>(wait_end_clock - wait_start_clock).count() /
static_cast<double>(1000000000);
wait_get_time_ += elapsed_time;
} else {
shared_buffer = futures_[future_id].get();
}
}
uint32_t in_par_id_beg = 0;
uint32_t in_par_id_end = mmap_meta_cnt_[bucket_id];
uint64_t prev_key = 0;
for (uint32_t in_par_id = in_par_id_beg; in_par_id < in_par_id_end; in_par_id++) {
// Skip the equalities.
uint64_t big_endian_key = index_[bucket_id][in_par_id].key_;
if (in_par_id != in_par_id_beg) {
if (big_endian_key == prev_key) {
continue;
}
}
prev_key = big_endian_key;
// Key (to little endian first).
(*(uint64_t *) polar_key_ptr_->data()) = bswap_64(big_endian_key);
// Value.
uint64_t val_id = index_[bucket_id][in_par_id].value_offset_;
polar_val_ptr_ = PolarString(shared_buffer + val_id * VALUE_SIZE, VALUE_SIZE);
// Visit Key/Value.
visitor.Visit(*polar_key_ptr_, polar_val_ptr_);
}
// End of inner loop, Submit IO Jobs.
int32_t my_order = ++bucket_consumed_num_[future_id];
if (my_order == total_range_num_threads_) {
if ((future_id % (2 * BUCKET_NUM)) < KEEP_REUSE_BUFFER_NUM) {
cached_front_buffers_[future_id] = shared_buffer;
} else {
free_buffers_->push(shared_buffer);
}
}
}
bucket_future_id_beg += BUCKET_NUM;
3. Optimization 1: enlarge the overlap between IO and memory access
The coordinator dispatches two passes of tasks back to back, minimizing the time during which IO and memory access are not overlapped. See the IO coordinator thread logic above (Odd Round, Even Round).
4. Optimization 2: reduce the number of IOs (make full use of the spare memory)
Caching a few buffers cuts down the IO count: the first few buffers are kept for the second range pass, so their reads are skipped.
The free_buffers_ logic gives precise control over which buffers are recycled for IO and which are cached. See the tail of the memory-visitor loop; the core code is:
// End of inner loop, Submit IO Jobs.
int32_t my_order = ++bucket_consumed_num_[future_id];
if (my_order == total_range_num_threads_) {
if ((future_id % (2 * BUCKET_NUM)) < KEEP_REUSE_BUFFER_NUM) {
cached_front_buffers_[future_id] = shared_buffer;
} else {
free_buffers_->push(shared_buffer);
}
}
5. Optimization 3: populate the value-buffer memory before the first IO
Populating the buffers costs about 0.06 seconds and lets them reach peak disk-read throughput on their very first use (saving about 0.1 seconds).
if (is_first && tid < MAX_TOTAL_BUFFER_NUM) {
// Really populate the physical memory.
log_info("Tid: %d, Load Physical Mem %d", tid, tid);
for (uint32_t off = 0; off < val_buffer_max_size_; off += FILESYSTEM_BLOCK_SIZE) {
value_shared_buffers_[tid][off] = -1;
}
is_first = false;
}
6. Optimization 4: set CPU affinity
Setting affinity reduces switching across NUMA nodes (the IO threads are pinned to cores 0-7), saving about 0.2 seconds. A sketch of setThreadSelfAffinity follows the quoted reference below.
Reference: https://blogs.igalia.com/dpino/2015/10/15/multicore-architectures-and-cpu-affinity/
If a process is running on a core which heavily interacts with an I/O device belonging to different NUMA node, performance degradation issues may appear. NUMA considerably benefits from the data locality principle, so devices and processes operating on the same data should run within the same NUMA node.
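setThreadSelfAffinity, used in the IO-thread snippet under IO_AFFINITY_EXP, is not listed above; on Linux it can be implemented roughly as follows (a sketch, assuming one dedicated core per IO thread):
#include <pthread.h>
#include <sched.h>

// Pin the calling thread to a single core, so the 8 IO threads stay on cores 0-7
// of the NUMA node closest to the device they poll.
inline void setThreadSelfAffinity(int core_id) {
    cpu_set_t cpu_set;
    CPU_ZERO(&cpu_set);
    CPU_SET(core_id, &cpu_set);
    pthread_setaffinity_np(pthread_self(), sizeof(cpu_set_t), &cpu_set);
}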
V. Competition Summary and Reflections
In this key-value storage engine competition we studied the technology behind the POLARDB storage engine and designed the file layout and concurrent query algorithms to squeeze the full IO capability out of the Optane storage. Alibaba has always been a company that values open, practical, and innovative technology; we thank the organizers for the event, which let us learn about the latest storage hardware and real industry needs, and exchange ideas with other strong teams.
VI. Appendix
1. Constants used in the code
1) Storage and index design
// Buffers.
#define TMP_KEY_BUFFER_SIZE (512)
#define TMP_VALUE_BUFFER_SIZE (4)
// Key/Value Files.
#define VALUE_SIZE (4096)
// Buckets.
#define BUCKET_DIGITS (10) // k-v-buckets must be the same for the range query
#define BUCKET_NUM (1 << BUCKET_DIGITS)
// Max Bucket Size * BUCKET_NUM.
#define MAX_TOTAL_SIZE (68 * 1024 * 1024)
#define KEY_FILE_DIGITS (5) // must make sure same bucket in the same file
#define KEY_FILE_NUM (1 << KEY_FILE_DIGITS)
#define MAX_KEY_BUCKET_SIZE (MAX_TOTAL_SIZE / BUCKET_NUM / FILESYSTEM_BLOCK_SIZE * FILESYSTEM_BLOCK_SIZE)
#define VAL_FILE_DIGITS (5)
#define VAL_FILE_NUM (1 << VAL_FILE_DIGITS) // must make sure same bucket in the same file
#define MAX_VAL_BUCKET_SIZE (MAX_TOTAL_SIZE / BUCKET_NUM / FILESYSTEM_BLOCK_SIZE * FILESYSTEM_BLOCK_SIZE)
2) Three-phase logic
// Write.
#define WRITE_BARRIER_NUM (16)
// Read.
#define NUM_READ_KEY_THREADS (NUM_THREADS)
#define NUM_FLUSH_TMP_THREADS (32u)
#define KEY_READ_BLOCK_COUNT (8192u)
// Range.
#define RECYCLE_BUFFER_NUM (2u)
#define KEEP_REUSE_BUFFER_NUM (3u)
#define MAX_TOTAL_BUFFER_NUM (RECYCLE_BUFFER_NUM + KEEP_REUSE_BUFFER_NUM)
#define SHRINK_SYNC_FACTOR (2) // the round count should be divisible by this
// Range Thread Pool.
#define RANGE_QUEUE_DEPTH (8u)
#define VAL_AGG_NUM (32)
#define WORKER_IDLE (0)
#define WORKER_SUBMITTED (1)
#define WORKER_COMPLETED (2)
#define FD_FINISHED (-2)