“关键代码”——POLARDB大赛冠军吐血整理,你盘吗【下篇】

本文涉及的产品
云原生数据库 PolarDB 分布式版,标准版 2核8GB
云原生数据库 PolarDB MySQL 版,通用型 2核4GB 50GB
云原生数据库 PolarDB PostgreSQL 版,标准版 2核4GB 50GB
简介: “上篇讲到:yche针对落幕不久的第一届POLARDB数据库性能大赛整理了一套详细的比赛攻略,(赛题:实现一个简化、高效的kv存储引擎,支持Write、Read以及Range接口),那么今天小天就要贡献出最核心的部分——关键代码~”

四、关键代码

1. 随机写入

  1. 实现逻辑

通过锁一个bucket使得key-value在bucket中一一对应, 并且使得bucket的meta-count被正确地更改; 写入之前先写bucket对应buffer, buffer满了之后进行阻塞的pwrite系统调用。

大体逻辑如下代码所示:

{
    unique_lock<mutex> lock(bucket_mtx_[bucket_id]);
    // Write value to the value file, with a tmp file as value_buffer.
    uint32_t val_buffer_offset = (mmap_meta_cnt_[bucket_id] % TMP_VALUE_BUFFER_SIZE) * VALUE_SIZE;
    char *value_buffer = mmap_value_aligned_buffer_view_[bucket_id];
    memcpy(value_buffer + val_buffer_offset, value.data(), VALUE_SIZE);

    // Write value to the value file.
    if ((mmap_meta_cnt_[bucket_id] + 1) % TMP_VALUE_BUFFER_SIZE == 0) {
        uint32_t in_bucket_id = mmap_meta_cnt_[bucket_id] - (TMP_VALUE_BUFFER_SIZE - 1);
        uint32_t fid;
        uint64_t foff;
        tie(fid, foff) = get_value_fid_foff(bucket_id, in_bucket_id);
        pwrite(value_file_dp_[fid], value_buffer, VALUE_SIZE * TMP_VALUE_BUFFER_SIZE, foff);
    }

    // Write key to the key file.
    uint32_t key_buffer_offset = (mmap_meta_cnt_[bucket_id] % TMP_KEY_BUFFER_SIZE);
    uint64_t *key_buffer = mmap_key_aligned_buffer_view_[bucket_id];
    key_buffer[key_buffer_offset] = key_int_big_endian;
    if (((mmap_meta_cnt_[bucket_id] + 1) % TMP_KEY_BUFFER_SIZE) == 0) {
        uint32_t in_bucket_id = (mmap_meta_cnt_[bucket_id] - (TMP_KEY_BUFFER_SIZE - 1));

        uint32_t fid;
        uint64_t foff;
        tie(fid, foff) = get_key_fid_foff(bucket_id, in_bucket_id);
        pwrite(key_file_dp_[fid], key_buffer, sizeof(uint64_t) * TMP_KEY_BUFFER_SIZE, foff);
    }

    // Update the meta data.
    mmap_meta_cnt_[bucket_id]++;
}
2. 优化: 调整文件个数为32

调整文件个数为32后, 可以利用不同线程写同一文件时候的阻塞,取得一定程度上的同步效果。 使得QueueDepth在基本所有时刻(包括最终快结束时刻)还处于大于8的水平, 来应对tail threads queue-depth打不高的挑战。

2. 并行索引构建设计

思路: 对每个Bucket构建SortedArray作为Index。

回顾: 文件设计中统一bucket的key-value对应起来了, 那么在构建中key的in-bucket offset和value的in-bucket offset是一样的。

每个worker处理对应的buckets, 逻辑上的buckets可以通过之前讲的K-V Log文件设计对应过去。 在整个数组被填充好了之后可以根据下面这个comparator函数对象进行排序。

[](KeyEntry l, KeyEntry r) {
if (l.key_ == r.key_) {
    return l.value_offset_ > r.value_offset_;
} else {
    return l.key_ < r.key_;
}

具体逻辑: 每个线程分配到的一个任务分成两部分:1. 读取填充in-bucket-offset, 2. 排序。 1024 buckets被均匀地分到64个线程(key大致均匀地分布到每个bucket), 构建过程中排序和磁盘IO是overlap在一起的。

这个阶段主要时间开销在于读key-logs文件(sort开销可以忽略不计), 总开销大概 0.2 seconds左右。

详细代码如下:

vector <thread> workers(NUM_READ_KEY_THREADS);
for (uint32_t tid = 0; tid < NUM_READ_KEY_THREADS; ++tid) {
    workers[tid] = thread([tid, local_buffers_g, this]() {
        uint64_t *local_buffer = local_buffers_g[tid];
        uint32_t avg = BUCKET_NUM / NUM_READ_KEY_THREADS;
        for (uint32_t bucket_id = tid * avg; bucket_id < (tid + 1) * avg; bucket_id++) {
            uint32_t entry_count = mmap_meta_cnt_[bucket_id];
            if (entry_count > 0) {
                uint32_t passes = entry_count / KEY_READ_BLOCK_COUNT;
                uint32_t remain_entries_count = entry_count - passes * KEY_READ_BLOCK_COUNT;
                uint32_t file_offset = 0;

                auto fid_foff = get_key_fid_foff(bucket_id, 0);
                uint32_t key_fid = fid_foff.first;
                size_t read_offset = fid_foff.second;
                for (uint32_t j = 0; j < passes; ++j) {
                    auto ret = pread(key_file_dp_[key_fid], local_buffer,
                                    KEY_READ_BLOCK_COUNT * sizeof(uint64_t), read_offset);
                    if (ret != KEY_READ_BLOCK_COUNT * sizeof(uint64_t)) {
                        log_info("ret: %d, err: %s", ret, strerror(errno));
                    }
                    for (uint32_t k = 0; k < KEY_READ_BLOCK_COUNT; k++) {
                        index_[bucket_id][file_offset].key_ = local_buffer[k];
                        index_[bucket_id][file_offset].value_offset_ = file_offset;
                        file_offset++;
                    }
                    read_offset += KEY_READ_BLOCK_COUNT * sizeof(uint64_t);
                }

                if (remain_entries_count != 0) {
                    size_t num_bytes = (remain_entries_count * sizeof(uint64_t) + FILESYSTEM_BLOCK_SIZE - 1) /
                                        FILESYSTEM_BLOCK_SIZE * FILESYSTEM_BLOCK_SIZE;
                    auto ret = pread(key_file_dp_[key_fid], local_buffer, num_bytes, read_offset);
                    if (ret < static_cast<ssize_t>(remain_entries_count * sizeof(uint64_t))) {
                        log_info("ret: %d, err: %s, fid:%zu off: %zu", ret, strerror(errno), key_fid,
                                    read_offset);
                    }
                    for (uint32_t k = 0; k < remain_entries_count; k++) {
                        index_[bucket_id][file_offset].key_ = local_buffer[k];
                        index_[bucket_id][file_offset].value_offset_ = file_offset;
                        file_offset++;
                    }
                }

                sort(index_[bucket_id], index_[bucket_id] + entry_count, [](KeyEntry l, KeyEntry r) {
                    if (l.key_ == r.key_) {
                        return l.value_offset_ > r.value_offset_;
                    } else {
                        return l.key_ < r.key_;
                    }
                });
            }
        }
    });
}

3. 随机读取

1. 实现逻辑

随机读取基本逻辑就是:查询index, 如果是key-not-found就返回; 否则读文件。

查询index代码如下,其中主要用了带prefetch优化的二分查找:

uint64_t big_endian_key_uint = bswap_64(TO_UINT64(key.data()));

KeyEntry tmp{};
tmp.key_ = big_endian_key_uint;
auto bucket_id = get_par_bucket_id(big_endian_key_uint);

auto it = index_[bucket_id] + branchfree_search(index_[bucket_id], mmap_meta_cnt_[bucket_id], tmp);

剩余的key-not-found判断和读value逻辑如下所示:

if (it == index_[bucket_id] + mmap_meta_cnt_[bucket_id] || it->key_ != big_endian_key_uint) {
    NotifyRandomReader(local_block_offset, tid);
    return kNotFound;
}

uint32_t fid;
uint64_t foff;
std::tie(fid, foff) = get_value_fid_foff(bucket_id, it->value_offset_);

pread(value_file_dp_[fid], value_buffer, VALUE_SIZE, foff);
NotifyRandomReader(local_block_offset, tid);

value->assign(value_buffer, VALUE_SIZE);
2. 优化:细粒度同步

思路: 我们设计了同步策略来保证足够的queue-depth (25-30之间)的同时, 又使得不同线程可以尽量同时退出, 尽量避免少queue-depth打IO情况的出现。

实现细节: 我们引入了4个blocking queues notify_queues_来作为 偶数和奇数线程的 当前和下一轮读取的同步通信工具 (tid%2==0与tid%2==1线程互相通知)。

实现细节1 (初始化逻辑): 初始化时候放入偶数线程的blocking queue来让他们启动起来。

if (local_block_offset == 0) {
    if (tid == 0) {
        notify_queues_.resize(4);
        for (auto i = 0; i < 4; i++) {
            // Even-0,1  Odd-2,3
                notify_queues_[i] = new moodycamel::BlockingConcurrentQueue<int32_t>(NUM_THREADS);
        }
        for (uint32_t i = 0; i < NUM_THREADS / 2; i++) {
                notify_queues_[0]->enqueue(1);
            }
    }
    read_barrier_.Wait();
}

实现细节2 (等待逻辑): 每一round的开始的时候会有一个等待。

uint32_t current_round = local_block_offset - 1;
if ((current_round % SHRINK_SYNC_FACTOR) == 0) {
    uint32_t notify_big_round_idx = get_notify_big_round(current_round);
    if (tid % 2 == 0) {
        notify_queues_[notify_big_round_idx % 2]->wait_dequeue(tmp_val);
    } else {
        notify_queues_[notify_big_round_idx % 2 + 2]->wait_dequeue(tmp_val);
    }
}

实现细节3 (通知逻辑): 偶数线程通知奇数线程当前round, 奇数线程通知偶数线程下一round。

void EngineRace::NotifyRandomReader(uint32_t local_block_offset, int64_t tid) {
    uint32_t current_round = local_block_offset - 1;
    if ((current_round % SHRINK_SYNC_FACTOR) == SHRINK_SYNC_FACTOR - 1) {
        uint32_t notify_big_round_idx = get_notify_big_round(current_round);
        if (tid % 2 == 0) {
            notify_queues_[(notify_big_round_idx) % 2 + 2]->enqueue(1);   // Notify This Round
        } else {
            notify_queues_[(notify_big_round_idx + 1) % 2]->enqueue(1);   // Notify Next Round
        }
    }
}

4.顺序读取和并发全量遍历

  1. 内存访问和磁盘IO流水线设计

主体逻辑: 单个IO协调线程一直发任务让IO线程打IO, 其他线程消费内存, 每进行一个bucket进行一次barrier来防止visit内存占用太多资源。

实现细节1 (IO协调线程通知memory visit 线程 value buffer结果ready 的同步): 通过使用promise和future进行 (promises_, futures_). 每个bucket会对应一个promise, 来表示一个未来的获取到的返回结果(也就是读取完的buffer), 这个promise对应了一个shared_future, 使得所有visitors可以等待该返回结果。

实现细节2 (通知IO协调线程free buffers已经有了): 通过一个blocking queue free_buffers_来记录free buffers, visitor线程push buffer进入 free_buffers_, IO协调者从中pop buffer。

2. 具体实现

具体实现分为三个部分:IO协调者,IO线程,以及内存vistor线程。

对应的IO协调thread逻辑如下 (其中最重要的ReadBucketToBuffer 通过保证request-size = 128KB,和queue-depth=8来打满

single_range_io_worker_ = new thread([this]() {
    // Odd Round.
    log_info("In Range IO");
    for (uint32_t next_bucket_idx = 0; next_bucket_idx < BUCKET_NUM; next_bucket_idx++) {
        // 1st: Pop Buffer.
        auto range_clock_beg = high_resolution_clock::now();
        char *buffer = free_buffers_->pop(total_io_sleep_time_);
        auto range_clock_end = high_resolution_clock::now();
        double elapsed_time =
                duration_cast<nanoseconds>(range_clock_end - range_clock_beg).count() /
                    static_cast<double>(1000000000);
        total_blocking_queue_time_ += elapsed_time;

        // 2nd: Read
        ReadBucketToBuffer(next_bucket_idx, buffer);
        promises_[next_bucket_idx].set_value(buffer);
    }
    log_info("In Range IO, Finish Odd Round");

    // Even Round.
    for (uint32_t next_bucket_idx = 0; next_bucket_idx < BUCKET_NUM; next_bucket_idx++) {
        uint32_t future_id = next_bucket_idx + BUCKET_NUM;
        char *buffer;
        if (next_bucket_idx >= KEEP_REUSE_BUFFER_NUM) {
            // 1st: Pop Buffer.
            auto range_clock_beg = high_resolution_clock::now();
            buffer = free_buffers_->pop(total_io_sleep_time_);
            auto range_clock_end = high_resolution_clock::now();
            double elapsed_time =
                    duration_cast<nanoseconds>(range_clock_end - range_clock_beg).count() /
                        static_cast<double>(1000000000);
            total_blocking_queue_time_ += elapsed_time;

            // 2nd: Read
            ReadBucketToBuffer(next_bucket_idx, buffer);
        } else {
            buffer = cached_front_buffers_[next_bucket_idx];
        }
        promises_[future_id].set_value(buffer);
    }
    log_info("In Range IO, Finish Even Round");
});

其中IO协调thread具体的submit读单个bucket任务的ReadBucketToBufferh函数, 通过保证request-size = 128KB,和queue-depth=8来打满IO, 详细逻辑如下:

void EngineRace::ReadBucketToBuffer(uint32_t bucket_id, char *value_buffer) {
    auto range_clock_beg = high_resolution_clock::now();

    if (value_buffer == nullptr) {
        return;
    }

    // Get fid, and off.
    uint32_t fid;
    uint64_t foff;
    std::tie(fid, foff) = get_value_fid_foff(bucket_id, 0);

    uint32_t value_num = mmap_meta_cnt_[bucket_id];
    uint32_t remain_value_num = value_num % VAL_AGG_NUM;
    uint32_t total_block_num = (remain_value_num == 0 ? (value_num / VAL_AGG_NUM) :
                                (value_num / VAL_AGG_NUM + 1));
    uint32_t completed_block_num = 0;
    uint32_t last_block_size = (remain_value_num == 0 ? (VALUE_SIZE * VAL_AGG_NUM) :
                                (remain_value_num * VALUE_SIZE));
    uint32_t submitted_block_num = 0;

    // Submit to Maintain Queue Depth.
    while (completed_block_num < total_block_num) {
        for (uint32_t io_id = 0; io_id < RANGE_QUEUE_DEPTH; io_id++) {
            // Peek Completions If Possible.
            if (range_worker_status_tls_[io_id] == WORKER_COMPLETED) {
                completed_block_num++;
                range_worker_status_tls_[io_id] = WORKER_IDLE;
            }

            // Submit If Possible.
            if (submitted_block_num < total_block_num && range_worker_status_tls_[io_id] == WORKER_IDLE) {
                size_t offset = submitted_block_num * (size_t) VAL_AGG_NUM * VALUE_SIZE;
                uint32_t size = (submitted_block_num == (total_block_num - 1) ?
                                last_block_size : (VAL_AGG_NUM * VALUE_SIZE));
                range_worker_status_tls_[io_id] = WORKER_SUBMITTED;
                range_worker_task_tls_[io_id]->enqueue(
                UserIOCB(value_buffer + offset, value_file_dp_[fid], size, offset + foff));
                submitted_block_num++;
            }
        }
    }

    auto range_clock_end = high_resolution_clock::now();
    double elapsed_time = duration_cast<nanoseconds>(range_clock_end - range_clock_beg).count() /
                              static_cast<double>(1000000000);
    total_time_ += elapsed_time;
}

对应的IO线程逻辑如下:

void EngineRace::InitPoolingContext() {
    io_threads_ = vector<thread>(RANGE_QUEUE_DEPTH);
    range_worker_task_tls_.resize(RANGE_QUEUE_DEPTH);
    range_worker_status_tls_ = new atomic_int[RANGE_QUEUE_DEPTH];
    for (uint32_t io_id = 0; io_id < RANGE_QUEUE_DEPTH; io_id++) {
        range_worker_task_tls_[io_id] = new moodycamel::BlockingConcurrentQueue<UserIOCB>();
        range_worker_status_tls_[io_id] = WORKER_IDLE;
        io_threads_[io_id] = thread([this, io_id]() {
            UserIOCB user_iocb;
#ifdef IO_AFFINITY_EXP
            setThreadSelfAffinity(io_id);
#endif
            double wait_time = 0;
            for (;;) {
                range_worker_task_tls_[io_id]->wait_dequeue(user_iocb);

                if (user_iocb.fd_ == FD_FINISHED) {
                    log_info("yes! notified, %d", io_id);
                    break;
                } else {
                    pread(user_iocb.fd_, user_iocb.buffer_, user_iocb.size_, user_iocb.offset_);
                    range_worker_status_tls_[io_id] = WORKER_COMPLETED;
                }
            }
        });
    }
}

对应的内存visitor线程的逻辑如下: 其中每个bucket开始有个barrier过程, 在每次结束的时候会更新free_buffers_。 更新buffers的逻辑就是最后一个线程将使用完的buffer放入blocking queue。

// End of inner loop, Submit IO Jobs.
int32_t my_order = ++bucket_consumed_num_[future_id];
if (my_order == total_range_num_threads_) {
    if ((future_id % (2 * BUCKET_NUM)) < KEEP_REUSE_BUFFER_NUM) {
        cached_front_buffers_[future_id] = shared_buffer;
    } else {
        free_buffers_->push(shared_buffer);
    }
}
static thread_local uint32_t bucket_future_id_beg = 0;

uint32_t lower_key_par_id = 0;
uint32_t upper_key_par_id = BUCKET_NUM - 1;
for (uint32_t bucket_id = lower_key_par_id; bucket_id < upper_key_par_id + 1; bucket_id++) {
    range_barrier_ptr_->Wait();

    uint32_t future_id = bucket_id + bucket_future_id_beg;
    char *shared_buffer;
    uint32_t relative_id = future_id % (2 * BUCKET_NUM);
    if (relative_id >= BUCKET_NUM && relative_id < BUCKET_NUM + KEEP_REUSE_BUFFER_NUM) {
        shared_buffer = cached_front_buffers_[relative_id - BUCKET_NUM];
    } else {
        if (tid == 0) {
            auto wait_start_clock = high_resolutIOn_clock::now();
            shared_buffer = futures_[future_id].get();
            auto wait_end_clock = high_resolutIOn_clock::now();
            double elapsed_time = duratIOn_cast<nanoseconds>(wait_end_clock - wait_start_clock).count() /
                                          static_cast<double>(1000000000);
            wait_get_time_ += elapsed_time;
        } else {
            shared_buffer = futures_[future_id].get();
        }
    }

    uint32_t in_par_id_beg = 0;
    uint32_t in_par_id_end = mmap_meta_cnt_[bucket_id];
    uint64_t prev_key = 0;
    for (uint32_t in_par_id = in_par_id_beg; in_par_id < in_par_id_end; in_par_id++) {
        // Skip the equalities.
        uint64_t big_endian_key = index_[bucket_id][in_par_id].key_;
        if (in_par_id != in_par_id_beg) {
            if (big_endian_key == prev_key) {
                continue;
            }
        }
        prev_key = big_endian_key;

        // Key (to little endian first).
        (*(uint64_t *) polar_key_ptr_->data()) = bswap_64(big_endian_key);

        // Value.
        uint64_t val_id = index_[bucket_id][in_par_id].value_offset_;
        polar_val_ptr_ = PolarString(shared_buffer + val_id * VALUE_SIZE, VALUE_SIZE);

        // Visit Key/Value.
        visitor.Visit(*polar_key_ptr_, polar_val_ptr_);
    }

    // End of inner loop, Submit IO Jobs.
    int32_t my_order = ++bucket_consumed_num_[future_id];
    if (my_order == total_range_num_threads_) {
        if ((future_id % (2 * BUCKET_NUM)) < KEEP_REUSE_BUFFER_NUM) {
            cached_front_buffers_[future_id] = shared_buffer;
        } else {
            free_buffers_->push(shared_buffer);
        }
    }
}

bucket_future_id_beg += BUCKET_NUM;
3. 优化1: 增大IO和内存访问的overlap区域

每次处理两轮的任务, 来最小化没有overlapped的IO和内存访问的时间。 详细可见IO线程的逻辑(Odd Round, Even Round)。

4. 优化2: 减少IO数量 (充分利用剩余的内存)

利用cache一些buffers减少IO数量: cache前几块buffer来进行第二次range的优化, 减少IO数量。

我们设计了free_buffers_的逻辑来精确地控制IO buffers和cache的使用。 详细实现可见内存visitor线程收尾阶段,核心代码如下:

// End of inner loop, Submit IO Jobs.
int32_t my_order = ++bucket_consumed_num_[future_id];
if (my_order == total_range_num_threads_) {
    if ((future_id % (2 * BUCKET_NUM)) < KEEP_REUSE_BUFFER_NUM) {
        cached_front_buffers_[future_id] = shared_buffer;
    } else {
        free_buffers_->push(shared_buffer);
    }
}

5. 优化3: 第一次IO前Populate value-buffer内存

通过耗时0.06seconds 左右的内存populate,来使得buffer在第一次使用时候也能达到磁盘读取峰值性能(减少0.1秒)。

if (is_first && tid < MAX_TOTAL_BUFFER_NUM) {
    // Really populate the physical memory.
    log_info("Tid: %d, Load Physical Mem %d", tid, tid);
    for (uint32_t off = 0; off < val_buffer_max_size_; off += FILESYSTEM_BLOCK_SIZE) {
        value_shared_buffers_[tid][off] = -1;
    }
    is_first = false;
}

6. 优化4: 设置affinity
通过set-affinity减少numa之间的切换(io线程绑定到 core-id 0-7 ),(减少0.2秒)。

可参考文档:https://blogs.igalia.com/dpino/2015/10/15/multicore-architectures-and-cpu-affinity/

If a process is running on a core which heavily interacts with an I/O device belonging to different NUMA node, performance degradation issues may appear. NUMA considerably benefits from the data locality principle, so devices and processes operating on the same data should run within the same NUMA node.

五、比赛经验总结和感想

在Key-Value存储引擎比赛中,我们学习了POLARDB数据库存储引擎相关技术, 设计了相应的文件结构和并发查询算法,来充分榨干傲腾存储的IO能力。 阿里巴巴一向是一个注重开放,使用和创新技术的公司,感谢贵司举办的活动, 让我们了解最新的存储硬件,业界需求以及和其他高手互相切磋,共同进步。

六、附录

1.代码中的一些常量

1、储存于索引设计相关

// Buffers.
#define TMP_KEY_BUFFER_SIZE (512)
#define TMP_VALUE_BUFFER_SIZE (4)
// Key/Value Files.
#define VALUE_SIZE (4096)

// Buckets.
#define BUCKET_DIGITS (10)      // k-v-buckets must be the same for the range query
#define BUCKET_NUM (1 << BUCKET_DIGITS)

// Max Bucket Size * BUCKET_NUM.
#define MAX_TOTAL_SIZE (68 * 1024 * 1024)

#define KEY_FILE_DIGITS (5)     // must make sure same bucket in the same file
#define KEY_FILE_NUM (1 << KEY_FILE_DIGITS)
#define MAX_KEY_BUCKET_SIZE (MAX_TOTAL_SIZE / BUCKET_NUM / FILESYSTEM_BLOCK_SIZE * FILESYSTEM_BLOCK_SIZE)

#define VAL_FILE_DIGITS (5)
#define VAL_FILE_NUM (1 << VAL_FILE_DIGITS)  // must make sure same bucket in the same file
#define MAX_VAL_BUCKET_SIZE (MAX_TOTAL_SIZE / BUCKET_NUM / FILESYSTEM_BLOCK_SIZE * FILESYSTEM_BLOCK_S

2、三个阶段逻辑相关

// Write.
#define WRITE_BARRIER_NUM (16)
// Read.
#define NUM_READ_KEY_THREADS (NUM_THREADS)
#define NUM_FLUSH_TMP_THREADS (32u)
#define KEY_READ_BLOCK_COUNT (8192u)
// Range.
#define RECYCLE_BUFFER_NUM (2u)
#define KEEP_REUSE_BUFFER_NUM (3u)
#define MAX_TOTAL_BUFFER_NUM (RECYCLE_BUFFER_NUM + KEEP_REUSE_BUFFER_NUM)

#define SHRINK_SYNC_FACTOR (2)      // should be divided

// Range Thread Pool.
#define RANGE_QUEUE_DEPTH (8u)
#define VAL_AGG_NUM (32)

#define WORKER_IDLE (0)
#define WORKER_SUBMITTED (1)
#define WORKER_COMPLETED (2)
#define FD_FINISHED (-2)
相关实践学习
使用PolarDB和ECS搭建门户网站
本场景主要介绍基于PolarDB和ECS实现搭建门户网站。
阿里云数据库产品家族及特性
阿里云智能数据库产品团队一直致力于不断健全产品体系,提升产品性能,打磨产品功能,从而帮助客户实现更加极致的弹性能力、具备更强的扩展能力、并利用云设施进一步降低企业成本。以云原生+分布式为核心技术抓手,打造以自研的在线事务型(OLTP)数据库Polar DB和在线分析型(OLAP)数据库Analytic DB为代表的新一代企业级云原生数据库产品体系, 结合NoSQL数据库、数据库生态工具、云原生智能化数据库管控平台,为阿里巴巴经济体以及各个行业的企业客户和开发者提供从公共云到混合云再到私有云的完整解决方案,提供基于云基础设施进行数据从处理、到存储、再到计算与分析的一体化解决方案。本节课带你了解阿里云数据库产品家族及特性。
目录
相关文章
|
4月前
|
关系型数据库 分布式数据库 数据库
PolarDB产品使用问题之将RDS切换到PolarDB-X 2.0时,代码层的SQL该如何改动
PolarDB产品使用合集涵盖了从创建与管理、数据管理、性能优化与诊断、安全与合规到生态与集成、运维与支持等全方位的功能和服务,旨在帮助企业轻松构建高可用、高性能且易于管理的数据库环境,满足不同业务场景的需求。用户可以通过阿里云控制台、API、SDK等方式便捷地使用这些功能,实现数据库的高效运维与持续优化。
|
4月前
|
运维 关系型数据库 Java
PolarDB产品使用问题之使用List或Range分区表时,Java代码是否需要进行改动
PolarDB产品使用合集涵盖了从创建与管理、数据管理、性能优化与诊断、安全与合规到生态与集成、运维与支持等全方位的功能和服务,旨在帮助企业轻松构建高可用、高性能且易于管理的数据库环境,满足不同业务场景的需求。用户可以通过阿里云控制台、API、SDK等方式便捷地使用这些功能,实现数据库的高效运维与持续优化。
|
资源调度 关系型数据库 Shell
手把手教你如何参与开源项目的协作、贡献代码: 以PolarDB开源项目为例
开源协作是一种社会进化的体现吗? 昨天体验了一下ChatGPT, 对这几个回答深有感触, 开源协作一定是未来会长期存在的, 更大规模化的人类协作模式. 所以我想写一点东西, 来帮助更多人参与开源协作.
1273 3
手把手教你如何参与开源项目的协作、贡献代码: 以PolarDB开源项目为例
|
SQL 网络协议 关系型数据库
开源分布式数据库PolarDB-X源码解读——PolarDB-X源码解读(一):CN代码结构
开源分布式数据库PolarDB-X源码解读——PolarDB-X源码解读(一):CN代码结构
5135 0
|
SQL 存储 监控
开源分布式数据库PolarDB-X源码解读——PolarDB-X源码解读(三):CDC代码结构
开源分布式数据库PolarDB-X源码解读——PolarDB-X源码解读(三):CDC代码结构
5549 0
|
关系型数据库 分布式数据库 数据库
PolarDB-X开源代码更名公告
PolarDB-X开源以来一直和开源社区保持着良性的互动,根据社区开发者和用户的真实反馈,同时为了更好的服务广大用户和开发者,PolarDB-X决定优化开源代码仓库的地址与名称,新地址和新名称会更加便于理解与记忆。虽然名称做了改变,但是PolarDB-X开源的初心不会改变,会持续向社区输出卓越的分布式数据库技术,和广大用户与开发者共同创造国产数据库的美好未来。
PolarDB-X 1.0-API参考-1.0(2017版本)-错误码-服务器端错误代码表
错误代码 描述 HTTP 状态码 ServiceUnavailable The request has failed due to a temporary failure of the server. 503
247 0
|
SQL 算法 关系型数据库
PolarDB-X 1.0-常见问题-错误代码
本文档列出了PolarDB-X返回的常见错误码及解决方法
2315 0
|
1月前
|
关系型数据库 MySQL 分布式数据库
零基础教你用云数据库PolarDB搭建企业网站,完成就送桌面收纳桶!
零基础教你用云数据库PolarDB搭建企业网站,完成就送桌面收纳桶,邀请好友完成更有机会获得​小米Watch S3、小米体重称​等诸多好礼!
零基础教你用云数据库PolarDB搭建企业网站,完成就送桌面收纳桶!
下一篇
无影云桌面