TokuDB · 特性分析 · 导入数据大杀器:Loader

本文涉及的产品
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
Redis 开源版,标准版 2GB
推荐场景:
搭建游戏排行榜
简介: Loader简介Loader设计思路是把<pk_key,pk_value>二元组缓存在内存中,对每个索引在后台计算索引key。等所有的数据插入操作完成后,对每个索引进行排序,最后用排好序的索引key数组建立FT文件。创建FT的过程:依次创建每个叶子节点(leaf node)和中间节点(internal node)。最后维护元数据,这里包括index descriptor,bloc

Loader简介

Loader设计思路是把<pk_key,pk_value>二元组缓存在内存中,对每个索引在后台计算索引key。等所有的数据插入操作完成后,对每个索引进行排序,最后用排好序的索引key数组建立FT文件。

创建FT的过程:依次创建每个叶子节点(leaf node)和中间节点(internal node)。最后维护元数据,这里包括index descriptor,block allocation table和FT header。

对于数据量很大的索引,因为loader预留的空间有限无法保存所有的key,所以排序的过程可能会有多轮。

每轮把内存的索引key进行merge sort,之后写入临时文件。最后,把所有经过部分排序的临时文件进行多路归并,得到一个有序的索引key数组。

Tokudb loader的主要应用场景:1)外部导入数据,2)同步创建索引。

使用loader有个限制:每个索引必须是空的。所以,loader不适合增量数据插入场景。

以导入数据场景为例:

  • 创建表:(pk和二级索引)
  • 调用db_env->create_loader创建loader
  • 不断调用loader->put()插入数据
  • 中途出错,调用loader->abort()
  • 全部成功,最后调用loader->close()
  • 若loader处理出错,loader使用者需显式drop table删除第一步中创建的表

酱,新的表就可以使用了。

主要数据结构

Note:由于篇幅有限,本文只列出了数据结构重要字段和代码片段。

DB_LOADER

描述了loader提供给Tokudb handler的接口。

typedef struct __toku_loader DB_LOADER;
struct __toku_loader {
  struct __toku_loader_internal *i;
  int (*set_error_callback)(DB_LOADER *loader, void (*error_cb)(DB *db, int i, int err, DBT *key, DBT *val, void *error_extra), void *error_extra);
  int (*set_poll_function)(DB_LOADER *loader, int (*poll_func)(void *extra, float progress), void *poll_extra);
  int (*put)(DB_LOADER *loader, DBT *key, DBT* val);
  int (*close)(DB_LOADER *loader);
  int (*abort)(DB_LOADER *loader);
};

__toku_loader_internal

描述了一些全局信息(如env和事务txn),还有一些callback函数(poll,error)

除此之外,__toku_loader_internal还包含索引的信息:

  • env:全局的db_env
  • txn:创建loader上下文指定的事务
  • N指的是新建索引个数
  • src_db是源索引,一般是NULL
  • dbs是新建索引数组,共N个
  • db_flags是数组,表示每个新建索引的put_flags,pk的put_flags是在set_main_dict_put_flags生成;非pk索引的一般是0
  • dbt_flags也是数组,一般是DB_DBT_REALLOC
  • loader_flags:可能是LOADER_COMPRESS_INTERMEDIATES表示临时文件保存的中间结果需要压缩,一般是0
  • temp_file_template:临时文件的文件名模板
  • err_key:记录loader->put失败的key
  • err_val:记录loader->put失败的value
  • err_i:未使用,本意是希望记录loader->put在写哪个dictionary失败的。loader是pipleline方式实现的,而在loader->put阶段无法确定是在哪个dictionary失败的
  • err_errno:记录loader->put失败的errno
  • inames_in_env:每个新建索引对应的文件名
struct __toku_loader_internal {
    DB_ENV *env;
    DB_TXN *txn;
    FTLOADER ft_loader;
    int N;
    DB **dbs; /* [N] */
    DB *src_db;
    uint32_t *db_flags;
    uint32_t *dbt_flags;
    uint32_t loader_flags;
    void (*error_callback)(DB *db, int i, int err, DBT *key, DBT *val, void *error_extra);
    void *error_extra;
    int  (*poll_func)(void *poll_extra, float progress);
    void *poll_extra;
    char *temp_file_template;
    DBT err_key;   /* error key */
    DBT err_val;   /* error val */
    int err_i;     /* error i   */
    int err_errno;
    char **inames_in_env; /* [N]  inames of new files to be created */

};

ft_loader

Loader内部主要数据结构的定义在ft_loader,用于生成FT文件。这个数据结构比较重要,也比较大,我们分成几个部分来介绍。

struct ft_loader_s {
    ...
    generate_row_for_put_func generate_row_for_put;
    CACHETABLE cachetable;
    bool did_reserve_memory;
    bool compress_intermediates;
    bool allow_puts;
    uint64_t reserved_memory;
    ...
}
  • generate_row_for_put:为每个索引生成索引key的callback函数
  • cachetable:全局buffer pool指针
  • did_reserve_memory:是否从cachetable里reserve内存
  • compress_intermediates:中间结果是否需要压缩
  • allow_puts:是否接收数据输入;设置成false,表示把directory重定向到空的FT
  • reserved_memory:did_reserve_memory为TRUE,表示从cachetable中reserve了多少内存;did_reserve_memory为FALSE时,使用是512M内存
struct ft_loader_s {
    ...
    DB *src_db;
    int N;
    DB **dbs; // N of these
    DESCRIPTOR *descriptors; // N of these.
    TXNID *root_xids_that_created; // N of these.
    const char **new_fnames_in_env; // N of these.
    ft_compare_func *bt_compare_funs; // N of these
    ...
}
  • N,src_db,dbs跟__toku_loader_internal里相应字段的是一样的
  • descriptors:descriptors[which_db]表示索引的descriptor
  • root_xids_that_created:root_xids_that_created[which_db]表示索引创建时的root txnid
  • new_fnames_in_env:new_fnames_in_env[which_db]表示索引的文件名
  • bt_compare_funs:bt_compare_funs[which_db]表示索引的比较函数
struct ft_loader_s {
    ...
    uint64_t n_rows;
    struct rowset primary_rowset;
    struct rowset primary_rowset_temp;
    QUEUE primary_rowset_queue;
    toku_pthread_t extractor_thread;
    bool extractor_live;
    struct rowset *rows;// N of these.
    uint64_t *extracted_datasizes; // N of these.
    DBT  *last_key;// N of these.
    struct file_infos file_infos;
    ...
}
  • n_rows:一共有多少行数据
  • primary_rowset:缓存<pk_key,pk_value>二元组的数组,loader->put的数据先被缓存到这里
  • primary_rowset_temp:没有被使用,怀疑是legacy code
  • primary_rowset_queue:当primary_rowset占用一定量的内存时,loader->put所在线程会调用函数enqueue_for_extraction,把当前primary_rowset克隆一份,然后挂到primary_rowset_queue队尾。后台extractor线程在不停取这个队列上的rowset,依次进行处理
  • extractor_thread:extractor线程
  • extractor_live:extractor线程是否正在工作
  • rows:rows[which_db]保存索引的key
  • extracted_datasizes:extracted_datasizes[which_db]表示索引缓存了多少数据
  • last_key:没有被使用,怀疑是legacy code
  • file_infos:记录打开文件的信息
struct ft_loader_s {
    ...
    LSN load_lsn;
    TXNID load_root_xid;
    struct merge_fileset *fs;// N of these.
    QUEUE *fractal_queues; // N of these.
    toku_pthread_t *fractal_threads;// N of these.
    bool *fractal_threads_live; // N of these.
    unsigned fractal_workers;
    ...
};
  • load_lsn:第一个loader->put之前的lsn位置
  • load_root_xid:创建ft_loader时,局部事务loader_txn的txnid
  • fs:fs[which_db]表示索引的merge信息
  • fractal_queues:fractal_queues[which_db]表示索引的fractal线程使用的队列
  • fractal_threads:fractal_threads[which_db]表示索引的fractal线程
  • fractal_threads_live:fractal_threads_live[which_db]索引的fractal线程是否正在运行
  • fractal_workers:是否有fractal线程正在构建索引FT,因为merge和构建FT的过程可能会并发,这个字段主要用来计算merge阶段可用的内存大小

Loader主要流程

Loader处理过程大致分为两个阶段:extractor阶段和merge阶段。

Extractor阶段创建extractor后台线程生成索引的key,并存储到rows[which_db]中;merge阶段创建fractal后台线程生成FT文件。

Extractor阶段,loader使用者不停地调用loader->put插入数据,当primary_rowset里面缓存的数据超过一定量后,它会把primary_rowset克隆一份挂到primary_rowset_queue尾部;后台的extractor线程不停地从primary_rowset_queue读取rowset,并对rowset中的每行row计算索引key,然后把生成的索引key缓存到其对应的rowset(rows[which_db])中。如果rows[which_db]中缓存的数据超过一定量后,需要对属于当前primary_rowset的索引key排序并写到临时文件中。每个临时文件中存储的是部分排序的key的有序序列。

Loader插入数据过程结束,每个索引需要对所有的key进行排序。rows[which_db]的空间有限,前面排好序的key可能已经写入临时文件中。

此时,primary_rowset可能还有数据需要extractor线程处理,那么需要把primary_rowset挂到primary_rowset_queue队尾,最后把primary_rowset_queue队列设置成EOF,等待evictor线程处理完退出。

当evictor线程结束后,loader会为每个索引创建fractal_threads[which_db]线程,在loader当前上下文把索引的所有临时文件中的数据按照key升序的方式排序,然后把排好序的key发送给fractal_queues[which_db], 由fractal线程创建FT文件。

创建loader

Loader使用者调用db_env->create_loader创建loader数据结构并进行初始化,也就是初始化前面我们介绍的这些字段。

真正执行创建loader的函数是toku_loader_create_loader。

在外部数据导入和同步创建索引的场景下,loader_flags中并没有设置DB_PRELOCKED_WRITE标记,需要调用toku_db_pre_acquire_table_lock获取区间整个表区间的range锁。

前面提过,loader只适用空表/空索引的情况,因为在loader->close的阶段会创建一个新的FT文件,并在tokudb.directory里修改索引和索引文件的映射关系。整个过程都执行成功的情况下,老的FT文件在关闭时被删除。所以,在创建loader的过程一般都需要确认db是空的。

在locked_load_inames对每个索引生成新的FT文件名,并修改tokudb.directory里面的映射关系,最后对每个FT索引生成loader相关的undo和redo log。这些操作是在loader_txn保护下进行的。

int
toku_loader_create_loader(DB_ENV *env,
                          DB_TXN *txn,
                          DB_LOADER **blp,
                          DB *src_db,
                          int N,
                          DB *dbs[],
                          uint32_t db_flags[/*N*/],
                          uint32_t dbt_flags[/*N*/],
                          uint32_t loader_flags,
                          bool check_empty) {
    // lock tables and check empty
    for(int i=0;i<N;i++) {
        if (!(loader_flags&DB_PRELOCKED_WRITE)) {
            rval = toku_db_pre_acquire_table_lock(dbs[i], txn);
            if (rval!=0) {
                goto create_exit;
            }
        }
        if (check_empty) {
            bool empty = toku_ft_is_empty_fast(dbs[i]->i->ft_handle);
            if (!empty) {
                rval = ENOTEMPTY;
                goto create_exit;
            }
        }
    }

    {
        if (env->i->open_flags & DB_INIT_TXN) {
            rval = env->txn_begin(env, txn, &loader_txn, 0);
            if (rval) {
                goto create_exit;
            }
        }

        ft_compare_func compare_functions[N];
        for (int i=0; i<N; i++) {
            compare_functions[i] = env->i->bt_compare;
        }

        // time to open the big kahuna
        char **XMALLOC_N(N, new_inames_in_env);
        for (int i = 0; i < N; i++) {
            new_inames_in_env[i] = nullptr;
        }
        FT_HANDLE *XMALLOC_N(N, fts);
        for (int i=0; i<N; i++) {
            fts[i] = dbs[i]->i->ft_handle;
        }
        LSN load_lsn;
        rval = locked_load_inames(env, loader_txn, N, dbs, new_inames_in_env, &load_lsn, puts_allowed);
        if ( rval!=0 ) {
        }
        TOKUTXN ttxn = loader_txn ? db_txn_struct_i(loader_txn)->tokutxn : NULL;
        rval = toku_ft_loader_open(&loader->i->ft_loader,
                                   env->i->cachetable,
                                   env->i->generate_row_for_put,
                                   src_db,
                                   N,
                                   fts, 
                                   dbs,
                                   (const char **)new_inames_in_env,
                                   compare_functions,
                                   loader->i->temp_file_template,
                                   load_lsn,
                                   ttxn,
                                   puts_allowed,
                                   env->get_loader_memory_size(env),
                                   compress_intermediates,
                                   puts_allowed);
        if ( rval!=0 ) {
        }

        loader->i->inames_in_env = new_inames_in_env;
        toku_free(fts);

        if (!puts_allowed) {
            rval = ft_loader_close_and_redirect(loader);
            assert_zero(rval);
            loader->i->ft_loader = NULL;
            // close the ft_loader and skip to the redirection
            rval = 0;
        }

        rval = loader_txn->commit(loader_txn, 0);
        assert_zero(rval);
        loader_txn = nullptr;

        rval = 0;
    }
    *blp = loader;
 create_exit:
    if (loader_txn) {
        int r  = loader_txn->abort(loader_txn);
        assert_zero(r);
        loader_txn = nullptr;
    }
    if (rval == 0) {
    }
    else {
        free_loader(loader);
    }
    return rval;
}

一般使用loader的场景都需要往loader里面插入数据,所以allow_puts一般是TRUE。初始化结束后,需要创建extractor线程。

Extractor线程前面有提到过,是从primary_rowset_queue不断地拉rowset,对rowset里面的每行数据生成索引<key,value>的二元组,并把这些二元组缓存到rows[which_db]里面。当rows[which_db]缓存的数据量积累到一定程度会进行排序,然后把排序好的<key,value>写到临时文件中。

顺便说下,除了pk和clustering index以外,二级索引的value都是NULL。

toku_ft_loader_internal_init负责初始化数据,这里主要是初始化rowset(索引的rowset:rows[which_db]和接收输入的primary_rowset),创建primary_rowset_queue队列。这部分代码比较直观,读者可以自行分析。

函数toku_ft_loader_internal_init注册了3个重要的callback函数loader->put,loader->close和loader->abort分别处理数据插入,loader关闭和loader异常退出。

int toku_ft_loader_open (FTLOADER *blp, /* out */
                         CACHETABLE cachetable,
                         generate_row_for_put_func g,
                         DB *src_db,
                         int N, FT_HANDLE fts[/*N*/], 
                         DB* dbs[/*N*/],
                         const char *new_fnames_in_env[/*N*/],
                         ft_compare_func bt_compare_functions[/*N*/],
                         const char *temp_file_template,
                         LSN load_lsn,
                         TOKUTXN txn,
                         bool reserve_memory,
                         uint64_t reserve_memory_size,
                         bool compress_intermediates,
                         bool allow_puts) {
    int result = 0;
    {
        int r = toku_ft_loader_internal_init(blp, cachetable, g, 
                                             src_db,
                                             N, fts, dbs,
                                             new_fnames_in_env,
                                             bt_compare_functions,
                                             temp_file_template,
                                             load_lsn,
                                             txn,
                                             reserve_memory,
                                             reserve_memory_size,
                                             compress_intermediates,
                                             allow_puts);
        if (r!=0) result = r;
    }
    if (result==0 && allow_puts) {
        FTLOADER bl = *blp;
        int r = toku_pthread_create(&bl->extractor_thread, NULL, extractor_thread, (void*)bl);
        if (r==0) {
            bl->extractor_live = true;
        } else  {
            result = r;
            (void) toku_ft_loader_internal_destroy(bl, true);
        }
    }
    return result;
}

值得一提的是memory_per_rowset_during_extract函数。在extractor阶段rowset的大小是函数memory_per_rowset_during_extract计算出来的。

这个阶段分配的内存主要包括两个部分:

  • rowset:primary row + rows(N个) + primary_rowset_queue队列的长度 + 挂到primary_rowset_queue分配的rowset + 存放索引排序结果的rowset(N个)
  • 临时文件写缓冲区(N个):大小16M
static uint64_t memory_per_rowset_during_extract (FTLOADER bl)
// Return how much memory can be allocated for each rowset.
{
    if (size_factor==1) {
        return 16*1024;
    } else {
        // There is a primary rowset being maintained by the foreground thread.
        // There could be two more in the queue.
        // There is one rowset for each index (bl->N) being filled in.
        // Later we may have sort_and_write operations spawning in parallel, and will need to account for that.
        int n_copies = (1 // primary rowset
                        +EXTRACTOR_QUEUE_DEPTH  // the number of primaries in the queue
                        +bl->N // the N rowsets being constructed by the extractor thread.
                        +bl->N // the N sort buffers
                        +1     // Give the extractor thread one more so that it can have temporary space for sorting.  This is overkill.
                        );
        int64_t extra_reserved_memory = bl->N * FILE_BUFFER_SIZE;  // for each index we are writing to a file at any given time.
        int64_t tentative_rowset_size = ((int64_t)(bl->reserved_memory - extra_reserved_memory))/(n_copies);
        return MAX(tentative_rowset_size, (int64_t)MIN_ROWSET_MEMORY);
    }
}

loader->put

Loader的处理过程分为两个阶段:extract阶段和merge阶段。

Extract阶段是从extractor线程命名而来,extractor线程不断地从primary_rowset_queue拉rowset,对rowset里面的row计算索引key并保存到loader->rows[which_db]里面。

Merge阶段是数据插入结束,显式调用loader->close方法。Loader对每个db,把所有部分排序的数据进行归并得到一个按索引key升序的<key,value>二元组序列,最后根据这个序列生成FT文件。

Loader->put的处理过程就是extract阶段。

  • 前景线程(创建loader上下文所在线程)不断的接收数据,并把数据缓存到primary_rowset,当其缓存的数据量大于primary_rowset->memory_budget时,会把primary_rowset克隆一份挂到primary_rowset_queue队尾并重新初始化primary_rowset,前景线程就可以继续接收数据了
  • 背景线程(extractor)不断从primary_rowset_queue拉rowset,并计算索引key和value。当其缓存的数据量大于primary_rowset->memory_budget时,对索引的<key,value>二元组按照key升序方式进行merge-sort并把排好序的<key,value>序列转存到临时文件中

在extractor阶段,只有一个背景线程(extractor线程),连接前景线程和extractor线程的是primary_rowset_queue。

screenshot.png

Loader的使用者调用loader->put,其实是调用函数toku_loader_put,这个函数是在loader创建阶段注册的put方法的callback函数。

这个函数首先需要检查LOADER_DISALLOW_PUTS是否设置,这个标记设置表示只使用loader来重定向director,而不是真正插入数据。所以,检查到这个标记被设置就直接退出了。

一般使用loader的用法都没有设置LOADER_DISALLOW_PUTS标记。

插入数据是在函数toku_ft_loader_put处理的。

int toku_loader_put(DB_LOADER *loader, DBT *key, DBT *val)
{
    int r = 0;
    int i = 0;

    if (loader->i->loader_flags & LOADER_DISALLOW_PUTS) {
        r = EINVAL;
        goto cleanup;
    }
    else {
        r = toku_ft_loader_put(loader->i->ft_loader, key, val);
    }
    if ( r != 0 ) {
    }
 cleanup:
    return r;
}

在toku_ft_loader_put中,重复检查是否设置了LOADER_DISALLOW_PUTS,如果设置了返回EINVAL并退出。

n_rows表示loader一共接收了多少行数据,这是个全局的counter。

toku_ft_loader_put是loader_do_put简单封装,真正处理put的在loader_do_put。

int toku_ft_loader_put (FTLOADER bl, DBT *key, DBT *val)
/* Effect: Put a key-value pair into the ft loader.  Called by DB_LOADER->put().
 * Return value: 0 on success, an error number otherwise.
 */
{
    if (!bl->allow_puts || ft_loader_get_error(&bl->error_callback))
        return EINVAL; // previous panic
    bl->n_rows++;
    return loader_do_put(bl, key, val);
}

在loader_do_put,首先把<pk_key,pk_value>插入到primary_rowset。

rowset包括两部分:

  • rows:是行数据的描述符
  • data:是存储数据的缓冲区

每个<key,value>二元组是按照到达的顺序存储到primary_rowset->data指向的缓冲区中,先key后value连续存储;row.off记录了这行数据在rows.data中的偏移位置;row.klen和row.vlen分别表示key/value的长度。

rows和data都是动态数组,随着数据量增大可以动态扩展内存。

struct row {
    size_t off; // the offset in the data array.
    int klen,vlen;
};

struct rowset {
    uint64_t memory_budget;
    size_t n_rows, n_rows_limit;
    struct row *rows;
    size_t n_bytes, n_bytes_limit;
    char *data;
};

数据插入到primary_rowset,如果primary_rowset中缓存的数据量(rows+data)大于memory_budget,意味着primary_rowset满了,loader前景线程会把primary_rowset克隆一份挂到primary_rowset_queue尾部,并重新初始化primary_rowset。

Extract阶段,rowset的memory_budget是memory_per_rowset_during_extract计算的。前面有介绍过,这里不再赘述。

static int row_wont_fit (struct rowset *rows, size_t size)
/* Effect: Return nonzero if adding a row of size SIZE would be too big (bigger than the buffer limit) */
{
    // Account for the memory used by the data and also the row structures.
    size_t memory_in_use = (rows->n_rows*sizeof(struct row)
                            + rows->n_bytes);
    return (rows->memory_budget <  memory_in_use + size);
}

static int loader_do_put(FTLOADER bl,
                         DBT *pkey,
                         DBT *pval)
{
    int result;
    result = add_row(&bl->primary_rowset, pkey, pval);
    if (result == 0 && row_wont_fit(&bl->primary_rowset, 0)) {
        // queue the rows for further processing by the extractor thread
        enqueue_for_extraction(bl);
        {
            int r = init_rowset(&bl->primary_rowset, memory_per_rowset_during_extract(bl));
            // bl->primary_rowset will get destroyed by toku_ft_loader_abort
            if (r != 0)
                result = r;
        }
    }
    return result;
}

static void enqueue_for_extraction (FTLOADER bl) {
    struct rowset *XMALLOC(enqueue_me);
    *enqueue_me = bl->primary_rowset;
    zero_rowset(&bl->primary_rowset);
    int r = toku_queue_enq(bl->primary_rowset_queue, (void*)enqueue_me, 1, NULL);
    resource_assert_zero(r);
}

呵呵,到这里extract的前景线程就处理完了,可以等待新的数据到来。

下面,我们一起看下extract阶段的背景线程。这个线程是在函数toku_ft_loader_open创建的。Extractor线程不断从primary_rowset_queue拉rowset,对每个rowset调用process_primary_rows进行处理。如果插入数据结束,用户会调用loader->close往primary_rowset_queue挂一个EOF,让extractor线程结束。

static void* extractor_thread (void *blv) {
    FTLOADER bl = (FTLOADER)blv;
    int r = 0;
    while (1) {
        void *item;
        {
            int rq = toku_queue_deq(bl->primary_rowset_queue, &item, NULL, NULL);
            if (rq==EOF) break;
            invariant(rq==0); // other errors are arbitrarily bad.
        }
        struct rowset *primary_rowset = (struct rowset *)item;
        {
            r = process_primary_rows(bl, primary_rowset);
            if (r)
                ft_loader_set_panic(bl, r, false, 0, nullptr, nullptr);
        }
    }

    if (r == 0) {
        r = finish_primary_rows(bl);
        if (r)
            ft_loader_set_panic(bl, r, false, 0, nullptr, nullptr);

    }
    return NULL;
}

函数process_primary_rows是process_primary_rows_internal的简单封装。

函数process_primary_rows_internal有两个循环:外层循环是依次处理每个索引的db,里层循环是为extractor线程当前的primary_rowset的每行row生成索引的key和value。

bl->generate_row_for_put是为索引生成key和value的函数,基于<pkey,pval>二元组为索引bl->dbs[which_db]生成索引key和value。

生成的索引key和value会被缓存到索引的rowset里面,即bl->rows[which_db];bl->rows[which_db]满时,会对这个索引的rowset按照key升序的顺序排序并转存到临时文件中。

static int process_primary_rows_internal (FTLOADER bl, struct rowset *primary_rowset)
{
    int error_count = 0;
    int *XMALLOC_N(bl->N, error_codes);

    for (int i = 0; i < bl->N; i++) {
        error_codes[i] = 0;
        struct rowset *rows = &(bl->rows[i]);
        struct merge_fileset *fs = &(bl->fs[i]);
        ft_compare_func compare = bl->bt_compare_funs[i];

        for (size_t prownum=0; prownum<primary_rowset->n_rows; prownum++) {
            if (error_count) break;

            struct row *prow = &primary_rowset->rows[prownum];
            DBT pkey,pval;
            pkey.data = primary_rowset->data + prow->off;
            pkey.size = prow->klen;
            pval.data = primary_rowset->data + prow->off + prow->klen;
            pval.size = prow->vlen;

            DBT_ARRAY key_array;
            DBT_ARRAY val_array;
            if (bl->dbs[i] != bl->src_db) {
                int r = bl->generate_row_for_put(bl->dbs[i], bl->src_db, &dest_keys, &dest_vals, &pkey, &pval);
                if (r != 0) {
                    error_codes[i] = r;
                    inc_error_count();
                    break;
                }
                paranoid_invariant(dest_keys.size <= dest_keys.capacity);
                paranoid_invariant(dest_vals.size <= dest_vals.capacity);
                paranoid_invariant(dest_keys.size == dest_vals.size);

                key_array = dest_keys;
                val_array = dest_vals;
            } else {
                key_array.size = key_array.capacity = 1;
                key_array.dbts = &pkey;

                val_array.size = val_array.capacity = 1;
                val_array.dbts = &pval;
            }
            for (uint32_t row = 0; row < key_array.size; row++) {
                DBT *dest_key = &key_array.dbts[row];
                DBT *dest_val = &val_array.dbts[row];

                bl->extracted_datasizes[i] += ft_loader_leafentry_size(dest_key->size, dest_val->size, leafentry_xid(bl, i));

                if (row_wont_fit(rows, dest_key->size + dest_val->size)) {
                    int r = sort_and_write_rows(*rows, fs, bl, i, bl->dbs[i], compare);
                    init_rowset(rows, memory_per_rowset_during_extract(bl));
                    if (r != 0) {
                        error_codes[i] = r;
                        inc_error_count();
                        break;
                    }
                }
                int r = add_row(rows, dest_key, dest_val);
                if (r != 0) {
                    break;
                }
            }
        }
    }
    toku_dbt_array_destroy(&dest_keys);
    toku_dbt_array_destroy(&dest_vals);

    destroy_rowset(primary_rowset);
    toku_free(primary_rowset);
    int r = 0;
    if (error_count > 0) {
    }
    toku_free(error_codes);
    return r;
}

排序结束,就把索引在当前rowset里面的<key,value>二元组写入到临时文件中,每个临时文件里的数据都是按照key有序的。为了尽量合并这些部分排序的中间结果,loader为每个临时文件维护了prev_key,里面存储了当前打开临时文件的最大key。

如果当前rowset的所有key都比prev_key大,那么就可以把当前结果合并到正在打开的临时文件中,不必另起一个新文件,此时需要更新prev_key。

如果不能合并,就需要关闭当前临时文件,另起一个新的临时文件写入,并重新维护prev_key。

每个索引都为临时文件配置了16M的缓冲区。开启一个新的临时文件是在函数extend_fileset里面实现的,把一个预先分配好的16M缓冲区attach到那个文件。

int sort_and_write_rows (struct rowset rows, struct merge_fileset *fs, FTLOADER bl, int which_db, DB *dest_db, ft_compare_func compare)
{
    int result;
    if (rows.n_rows == 0) {
        result = 0;
    } else {
        result = sort_rows(&rows, which_db, dest_db, compare, bl);

        if (result == 0) {
            DBT min_rowset_key = make_dbt(rows.data+rows.rows[0].off, rows.rows[0].klen);
            if (fs->have_sorted_output && compare(dest_db, &fs->prev_key, &min_rowset_key) < 0) {
                // write everything to the same output if the max key in the temp file (prev_key) is < min of the sorted rowset
                result = write_rowset_to_file(bl, fs->sorted_output, rows);
                if (result == 0) {
                    // set the max key in the temp file to the max key in the sorted rowset
                    result = toku_dbt_set(rows.rows[rows.n_rows-1].klen, rows.data + rows.rows[rows.n_rows-1].off, &fs->prev_key, NULL);
                }
            } else {
                // write the sorted rowset into a new temp file
                if (fs->have_sorted_output) {
                    fs->have_sorted_output = false;
                    result = ft_loader_fi_close(&bl->file_infos, fs->sorted_output, true);
                }
                if (result == 0) {
                    FIDX sfile = FIDX_NULL;
                    result = extend_fileset(bl, fs, &sfile);
                    if (result == 0) {
                        result = write_rowset_to_file(bl, sfile, rows);
                        if (result == 0) {
                            fs->have_sorted_output = true;
                            fs->sorted_output = sfile;
                            // set the max key in the temp file to the max key in the sorted rowset
                            result = toku_dbt_set(rows.rows[rows.n_rows-1].klen, rows.data + rows.rows[rows.n_rows-1].off, &fs->prev_key, NULL);
                        }
                    }
                }
            }
        }
    }

    destroy_rowset(&rows);

    return result;
}

呵呵,数据插入结束,loader会调用toku_queue_eof往primary_rowset_queue挂一个EOF通知evictor线程结束。

loader->close

我们一起回顾一下批量数据导入的应用场景,数据插入结束后,load的使用者调用loader->close()。

函数toku_loader_close是toku_ft_loader_internal_init注册的loader->close方法。

如果loader->put阶段处理出错,toku_loader_close会根据LOADER_DISALLOW_PUTS标记是否被设置做不同的处理;如果LOADER_DISALLOW_PUTS没有被设置,它会直接调用redirect_loader_to_empty_dictionaries做隐式abort,把每个索引redirect到空的FT上。如果LOADER_DISALLOW_PUTS被设置直接返回错误。

如果loader->put阶段处理成功,但是toku_loader_close内部执行出错,它会直接调用函数redirect_loader_to_empty_dictionaries做隐式abort,把每个索引redirect到空的FT上。

如果之前的处理都正常,toku_loader_close只处理LOADER_DISALLOW_PUTS没有被设置的情况,它会调用ft_loader_close_and_redirect进入merge阶段的处理。

int toku_loader_close(DB_LOADER *loader)
{
    if ( loader->i->err_errno != 0 ) {
        if ( loader->i->error_callback != NULL ) {
            loader->i->error_callback(loader->i->dbs[loader->i->err_i], loader->i->err_i, loader->i->err_errno, &loader->i->err_key, &loader->i->err_val, loader->i->error_extra);
        }
        if (!(loader->i->loader_flags & LOADER_DISALLOW_PUTS ) ) {
            r = toku_ft_loader_abort(loader->i->ft_loader, true);
            redirect_loader_to_empty_dictionaries(loader);
        }
        else {
            r = loader->i->err_errno;
        }
    }
    else { // no error outstanding
        if (!(loader->i->loader_flags & LOADER_DISALLOW_PUTS ) ) {
            r = ft_loader_close_and_redirect(loader);
            if (r) {
                redirect_loader_to_empty_dictionaries(loader);
            }
        }
    }
    free_loader(loader);
    return r;
}

函数ft_loader_close_and_redirect调用toku_ft_loader_close把每个索引的<key,value>二元组按照key升序的顺序排序并生成FT文件。然后,调用toku_dictionary_redirect把每个索引redirect到刚刚生成的FT上。

static int ft_loader_close_and_redirect(DB_LOADER *loader) {
    int r;
    // use the bulk loader
    // in case you've been looking - here is where the real work is done!
    r = toku_ft_loader_close(loader->i->ft_loader, loader->i->error_callback,
                             loader->i->error_extra, loader->i->poll_func, loader->i->poll_extra);
    if ( r==0 ) {
        for (int i=0; i<loader->i->N; i++) {
            toku_multi_operation_client_lock(); //Must hold MO lock for dictionary_redirect.
            r = toku_dictionary_redirect(loader->i->inames_in_env[i],
                                         loader->i->dbs[i]->i->ft_handle,
                                         db_txn_struct_i(loader->i->txn)->tokutxn);
            toku_multi_operation_client_unlock();
            if ( r!=0 ) break;
        }
    }
    return r;
}

函数toku_ft_loader_close负责结束loader的extractor阶段,把之前缓存的rowset挂到primary_rowset_queue,并在最后往primary_rowset_queue挂一个EOF,通知extractor线程停止。

如果设置了LOADER_DISALLOW_PUTS,这个loader不接收任何数据输入,一般是用作特殊作用,即把每个索引redirect到空的FT上。

bl->extractor_live为FALSE表示extractor线程没有被启动,回顾一下函数toku_ft_loader_open的处理,只有在toku_ft_loader_internal_init返回成功并且没有设置LOADER_DISALLOW_PUTS的情况下,才会启动extractor线程,并把extractor_live设置为TRUE。

所以,bl->extractor_live为FALSE的条件在大部分情况下都是不成立的,笔者猜测这是为了防止loader的使用者在创建loader出错的情况下错误地使用loader->close方式释放loader。

处理merge的重头戏在函数toku_ft_loader_close_internal里,为每个索引创建fractal线程和相应的queue。

int toku_ft_loader_close (FTLOADER bl, ft_loader_error_func error_function, void *error_extra,
                          ft_loader_poll_func  poll_function,  void *poll_extra)
{
    int result = 0;
    int r;

    ft_loader_set_error_function(&bl->error_callback, error_function, error_extra);
    ft_loader_set_poll_function(&bl->poll_callback, poll_function, poll_extra);

    if (bl->extractor_live) {
        r = finish_extractor(bl);
        if (r)
            result = r;
        invariant(!bl->extractor_live);
    } else {
        r = finish_primary_rows(bl);
        if (r)
            result = r;
    }

    // check for an error during extraction
    if (result == 0) {
        r = ft_loader_call_error_function(&bl->error_callback);
        if (r)
            result = r;
    }

    if (result == 0) {
        r = toku_ft_loader_close_internal(bl);
        if (r && result == 0)
            result = r;
    } else
        toku_ft_loader_internal_destroy(bl, true);

    return result;
}

toku_ft_loader_close_internal为每个索引调用loader_do_i进行处理。这个函数还为每个索引创建fractal_queues[which_db]接收排好序的索引<key,value>二元组序列。

在extractor阶段部分排序的结果存储在临时文件中,bl->fs[which_db]中记录临时文件的数据结构。

Extractor阶段结束后,所有索引的<key,value>二元组应该都被处理过了,bl->rows[which_db].rows一定是NULL。

函数loader_do_i会根据dest_db原有的设置来配置新的FT,包括:node size,basement size,compression method和fanout。

一般情况下都没有设置LOADER_DISALLOW_PUTS标记,loader会为当前索引创建fractal线程,并调用merge_files函数把所有部分排序的结果进行归并产生按照索引key有序的<key,value>序列并转发到对应的fractal_queues[which_db]上,fractal线程根据这些<key,value>二元组产生新的FT文件。

若设置LOADER_DISALLOW_PUTS,loader会调用toku_loader_write_ft_from_q产生一个空的FT。

如果fractal线程在生成新FT过程中出错,fractal线程退出后bl->rows[which_db]可能还有未被处理过的索引<key,value>二元组。所以,loader_do_i在退出前需要释放rows->data和rows->rows。

在merge阶段中:

  • 前景线程:等待extractor线程把部分排序并转储临时文件的工作结束。循环处理每个索引,对这个索引的所有临时文件(部分排序结果)进行归并,生成一个全局按key有序的<key,value>二元组序列。并把这个<key,value>二元组序列转发给fractal_queues[which_db]
  • 背景线程(fractal):fractal[which_db]不断从fractal_queues[which_db]去拉排好序的<key,value>二元组序列,根据这个序列生成最终的FT文件。

在merge阶段有个N个背景线程,连接前景线程(创建loader的上下文)和背景线程 fractal[which_db]的是fractal_queues[which_db]。

screenshot.png

static int loader_do_i (FTLOADER bl,
                        int which_db,
                        DB *dest_db,
                        ft_compare_func compare,
                        const DESCRIPTOR descriptor,
                        const char *new_fname,
                        int progress_allocation // how much progress do I need to add into bl->progress by the end..
                        )
/* Effect: Handle the file creating for one particular DB in the bulk loader. */
/* Requires: The data is fully extracted, so we can do merges out of files and write the ft file. */
{
    struct merge_fileset *fs = &(bl->fs[which_db]);
    struct rowset *rows = &(bl->rows[which_db]);
    invariant(rows->data==NULL); // the rows should be all cleaned up already

    int r = toku_queue_create(&bl->fractal_queues[which_db], FRACTAL_WRITER_QUEUE_DEPTH);
    if (r) goto error;

    {
        mode_t mode = S_IRUSR+S_IWUSR + S_IRGRP+S_IWGRP;
        int fd = toku_os_open(new_fname, O_RDWR| O_CREAT | O_BINARY, mode); // #2621
        if (fd < 0) {
            r = get_error_errno(); goto error;
        }

        uint32_t target_nodesize, target_basementnodesize, target_fanout;
        enum toku_compression_method target_compression_method;
        r = dest_db->get_pagesize(dest_db, &target_nodesize);
        invariant_zero(r);
        r = dest_db->get_readpagesize(dest_db, &target_basementnodesize);
        invariant_zero(r);
        r = dest_db->get_compression_method(dest_db, &target_compression_method);
        invariant_zero(r);
        r = dest_db->get_fanout(dest_db, &target_fanout);
        invariant_zero(r);

        if (bl->allow_puts) {
            // a better allocation would be to figure out roughly how many merge passes we'll need.
            int allocation_for_merge = (2*progress_allocation)/3;
            progress_allocation -= allocation_for_merge;

            // This structure must stay live until the join below.
            struct fractal_thread_args fta = {
                bl,
                descriptor,
                fd,
                progress_allocation,
                bl->fractal_queues[which_db],
                bl->extracted_datasizes[which_db],
                0,
                which_db,
                target_nodesize,
                target_basementnodesize,
                target_compression_method,
                target_fanout
            };

            r = toku_pthread_create(bl->fractal_threads+which_db, NULL, fractal_thread, (void*)&fta);
            if (r) {
                int r2 __attribute__((__unused__)) = toku_queue_destroy(bl->fractal_queues[which_db]);
                // ignore r2, since we already have an error
                bl->fractal_queues[which_db] = nullptr;
                goto error;
            }
            invariant(bl->fractal_threads_live[which_db]==false);
            bl->fractal_threads_live[which_db] = true;

            r = merge_files(fs, bl, which_db, dest_db, compare, allocation_for_merge, bl->fractal_queues[which_db]);

            {
                void *toku_pthread_retval;
                int r2 = toku_pthread_join(bl->fractal_threads[which_db], &toku_pthread_retval);
                invariant(fta.bl==bl); // this is a gratuitous assertion to make sure that the fta struct is still live here.  A previous bug put that struct into a C block statement.
                resource_assert_zero(r2);
                invariant(toku_pthread_retval==NULL);
                invariant(bl->fractal_threads_live[which_db]);
                bl->fractal_threads_live[which_db] = false;
                if (r == 0) r = fta.errno_result;
            }
        } else {
            toku_queue_eof(bl->fractal_queues[which_db]);
            r = toku_loader_write_ft_from_q(bl, descriptor, fd, progress_allocation,
                                            bl->fractal_queues[which_db], bl->extracted_datasizes[which_db], which_db,
                                            target_nodesize, target_basementnodesize, target_compression_method, target_fanout);
        }
    }

 error: // this is the cleanup code.  Even if r==0 (no error) we fall through to here.
    if (bl->fractal_queues[which_db]) {
        int r2 = toku_queue_destroy(bl->fractal_queues[which_db]);
        invariant(r2==0);
        bl->fractal_queues[which_db] = nullptr;
    }

    // if we get here we need to free up the merge_fileset and the rowset, as well as the keys
    toku_free(rows->data); rows->data = NULL;
    toku_free(rows->rows); rows->rows = NULL;
    toku_free(fs->data_fidxs); fs->data_fidxs = NULL;
    return r;
}

函数merge_files进行多轮归并,每轮把前一轮的临时文件集合归并成数目更少的临时文件集合。每次归并时,顺序选择几个临时文件(输入集)进行归并,生成新的临时文件(输出文件)。每轮的所有输出文件构成输出集,被下一轮作为输入集使用。

新产生的临时文件(输出文件)包含所有输入集的<key,value>二元组,文件中的二元组是按照key有序排列的。每轮归并旨在对输入集中的<key,value>二元组按照key进行排序并减少下一轮的输入集的文件个数,直到输出集只包含一个输出文件。

最后一轮产生的索引<key,value>二元组序列,是按照key有序的全序序列。

loader把他们分成几批存储到rows[which_db]的rowset里面,并以rows[which_db]作为媒介转发给fractal线程。

函数merge_files在读取临时文件中部分排序的结果时,使用dbufio来异步读取临时文件中的key和value。dbufio是double buffering io的意思,维护两个buffer和一个异步I/O线程,前景线程永远从buf[0]读取数据,异步的I/O线程负责把buf[1]填满。

当buf[0]的数据被消耗掉后,会交换buf[0]和buf[1],并且告诉I/O线程把后面的数据读到buf[1](老的buf[0])里面。如果前景线程消耗数据太快,异步I/O线程的读操作还没有完成,前景线程会阻塞在dbufio内部的条件变量上。这是个典型的生产者消费者模型。

Merge阶段,需要使用内存的buffer主要包含三部分:

  • 多轮归并时的dbufio:每轮输入集大小 * DBUFIO_DEPTH(2)
  • fractol线程使用的内存:fractal_queue深度 + 每次挂到fractal_queue时分配的rowset + attach到pivot_file的buffer
  • fractal线程把node写入磁盘时存放序列化后数据的buffer(压缩和未压缩的)
// To compute a merge, we have a certain amount of memory to work with.
// We perform only one fanin at a time.
// If the fanout is F then we are using
//   F merges.  Each merge uses
//   DBUFIO_DEPTH buffers for double buffering.  Each buffer is of size at least MERGE_BUF_SIZE
// so the memory is
//   F*MERGE_BUF_SIZE*DBUFIO_DEPTH storage.
// We use some additional space to buffer the outputs.
//  That's FILE_BUFFER_SIZE for writing to a merge file if we are writing to a mergefile.
//  And we have FRACTAL_WRITER_ROWSETS*MERGE_BUF_SIZE per queue
//  And if we are doing a fractal, each worker could have have a fractal tree that it's working on.
//
// DBUFIO_DEPTH*F*MERGE_BUF_SIZE + FRACTAL_WRITER_ROWSETS*MERGE_BUF_SIZE + WORKERS*NODESIZE*2 <= RESERVED_MEMORY

static int64_t memory_avail_during_merge(FTLOADER bl, bool is_fractal_node) {
    // avail memory = reserved memory - WORKERS*NODESIZE*2 for the last merge stage only
    int64_t avail_memory = bl->reserved_memory;
    if (is_fractal_node) {
        // reserve space for the fractal writer thread buffers
        avail_memory -= (int64_t)ft_loader_get_fractal_workers_count(bl) * (int64_t)default_loader_nodesize * 2; // compressed and uncompressed buffers
    }
    return avail_memory;
}

static uint64_t memory_per_rowset_during_merge (FTLOADER bl, int merge_factor, bool is_fractal_node // if it is being sent to a q
                                                ) {
    int64_t memory_avail = memory_avail_during_merge(bl, is_fractal_node);
    int64_t nbuffers = DBUFIO_DEPTH * merge_factor;
    if (is_fractal_node)
        nbuffers += FRACTAL_WRITER_ROWSETS;
    return MAX(memory_avail / nbuffers, (int64_t)MIN_MERGE_BUF_SIZE);
}

以上是归并的过程。

生成新FT的过程是在函数toku_loader_write_ft_from_q处理的。这个函数太长,就不在这贴代码了。

大致过程是这样的:
1. 从fractal_queues[which_db]依次读取<key,value>二元组并建立leaf node,每个leaf上存储的数据量一定小于7/8 nodesize,后续数据保存在下一个leaf上。
2. 为了建立internal subtree,在创建leaf node的过程中,维护了一个pivot文件保存每个leaf node中存储的最大key,pivot文件中最后一个key是没有用的,后面的处理会丢弃掉最后一个key;除了pivot文件,loader还为internal node维护一个subtree_info数组记录每个leaf node的blocknum。
3. 建立internal subtree过程分多轮进行,每轮建立某一高度的internal node,从高度1开始。
4. 写descriptor
5. 写BTT
6. 写FT header

internal node的处理过程,每轮处理:
依次读取上一轮产生的pivot文件中key,每次读取15个pivot key,最后一个key用作建立下一轮的pivot文件。用剩下的14个pivot key和上一轮得到的subtree_info数组中的blocknum建立本轮的internal node,并把新创建internal node的blocknum存储到另一个subtree_info数组,提供给下轮处理使用。

每轮处理完成时,会生成下轮处理需要的pivot文件和subtree_info数组。height 1时读取的是创建leaf node时生成的pivot文件和subtree_info数组。

如果一切顺利,在这之后就调用toku_dictionary_redirect把原来db[which_db]上打开的txn和ft_handle重定向到新的FT上。

loader->abort

异常退出的时候,loader首先释放ft_loader中的内部数据结构,然后调用redirect_loader_to_empty_dictionaries创建带有LOADER_DISALLOW_PUTS标记的temporary loader。

这个temporary loader首先在tokudb.directory里,把原来的loader处理的那些映射关系<index, newFT>改成<index,emptyFT>。然后在创建ft_loader之后,直接调用ft_loader_close_and_redirect把db[which_db]上打开的txn和ft_handle重定向到空的FT上。

static void redirect_loader_to_empty_dictionaries(DB_LOADER *loader) {
    DB_LOADER* tmp_loader = NULL;
    int r = toku_loader_create_loader(
        loader->i->env,
        loader->i->txn,
        &tmp_loader,
        loader->i->src_db,
        loader->i->N,
        loader->i->dbs,
        loader->i->db_flags,
        loader->i->dbt_flags,
        LOADER_DISALLOW_PUTS,
        false
        );
    lazy_assert_zero(r);
    r = toku_loader_close(tmp_loader);
}

int
toku_loader_create_loader(DB_ENV *env,
                          DB_TXN *txn,
                          DB_LOADER **blp,
                          DB *src_db,
                          int N,
                          DB *dbs[],
                          uint32_t db_flags[/*N*/],
                          uint32_t dbt_flags[/*N*/],
                          uint32_t loader_flags,
                          bool check_empty) {
        rval = locked_load_inames(env, loader_txn, N, dbs, new_inames_in_env, &load_lsn, puts_allowed);
        if ( rval!=0 ) {
            free_inames(new_inames_in_env, N);
            toku_free(fts);
            goto create_exit;
        }
        TOKUTXN ttxn = loader_txn ? db_txn_struct_i(loader_txn)->tokutxn : NULL;
        rval = toku_ft_loader_open(&loader->i->ft_loader,
                                   env->i->cachetable,
                                   env->i->generate_row_for_put,
                                   src_db,
                                   N,
                                   fts, dbs,
                                   (const char **)new_inames_in_env,
                                   compare_functions,
                                   loader->i->temp_file_template,
                                   load_lsn,
                                   ttxn,
                                   puts_allowed,
                                   env->get_loader_memory_size(env),
                                   compress_intermediates,
                                   puts_allowed);
        if ( rval!=0 ) {
            free_inames(new_inames_in_env, N);
            toku_free(fts);
            goto create_exit;
        }

        loader->i->inames_in_env = new_inames_in_env;
        toku_free(fts);

        if (!puts_allowed) {
            rval = ft_loader_close_and_redirect(loader);
            assert_zero(rval);
            loader->i->ft_loader = NULL;
            // close the ft_loader and skip to the redirection
            rval = 0;
        }

酱,loader的处理过程大致就是这样~

目录
相关文章
|
SQL 存储 NoSQL
MSSQL · 特性分析 · 列存储技术做实时分析
摘要 数据分析指导商业行为的价值越来越高,使得用户对数据实时分析的要求变得越来越高。使用传统RDBMS数据分析架构,遇到了前所未有的挑战,高延迟、数据处理流程复杂和成本过高。这篇文章讨论如何利用SQL Server 2016列存储技术做实时数据分析,解决传统分析方法的痛点。 传统RDBMS数据分析 在过去很长一段时间,企业均选择传统的关系型数据库做OLAP和Data Warehouse工作。这一
3620 0
|
SQL 监控 Go
MSSQL · 应用案例 · 日志表设计优化与实现
摘要 这篇文章从日志表问题引入、日志表的共有特性、日志表的设计需求、设计思路以及设计详细实现的角度,阐述了在SQL Server数据库中如何最优化设计日志表来降低系统资源的占用和提高系统吞吐量。问题引入 在平时与客户服务与交流过程中,我们不止一次的被客人问及这样的场景:我们现在面临如何设计SQL Server日志表方案,如何最优化设计数据库日志记录表。
1531 0
|
存储 测试技术 OLAP
PgSQL · 特性介绍 · 列存元数据扫描介绍
摘要 本文通过对于阿里云分析型数据库HybridDB for postgresql 数据库的列存扫描的优化特征的解析,让大家了解列存元数据扫描是如何达到提升查询扫描的速度的效果。从而使的分析型查询执行时间进一步缩短。
2005 0
|
存储 测试技术 OLAP
PgSQL · 特性介绍 · 列存元数据扫描介绍
摘要 本文通过对于阿里云分析型数据库HybridDB for postgresql 数据库的列存扫描的优化特征的解析,让大家了解列存元数据扫描是如何达到提升查询扫描的速度的效果。从而使的分析型查询执行时间进一步缩短。最终能够更好的为阿里云的用户提供更高性价比的服务。 关键字 Meta data scan,HybridDB for postgresql, GreenPlum,column stor
3598 0
|
SQL 关系型数据库 索引
HybridDB · 最佳实践 · HybridDB 数据合并的方法与原理
引言 刚开始使用HybridDB的用户,有个问的比较多的问题:如何快速做数据“合并”(Merge)?所谓“合并”,就是把数据新版本更新到HybridDB中。如果数据已经存在,则将它们替换为新版本;如果不存在,将它们插入数据库中。一般是离线的做这种数据合并,例如每天一次批量把数据更新到HybridDB中。也有客户需要实时的更新,即做到分钟级甚至秒级延迟。这里我们介绍一下HybridDB中数据合并的
3007 0
|
关系型数据库 索引
TokuDB · 捉虫动态 · MRR 导致查询失败
问题背景 最近有用户在使用 TokuDB 时,遇到了一个查询报错的问题,这里给大家分享下。 具体的报错信息是这样的: mysql&gt; select * from t2 where uid &gt; 1 limit 10; ERROR 1030 (HY000): Got error 1 from storage engine 表结构如下: CREATE TABLE `t2` ( `
2214 0
|
SQL 缓存 关系型数据库
MySQL · 特性分析 · 执行计划缓存设计与实现
Plan Cache背景知识 一条SQL语句输入到MySQL服务器后,一般要经历:词法语法解析(parse),优化(optimize),生成执行计划(plan)和执行(execute)的过程。词法语法分析,优化以及生成执行计划,这三个阶段的主要输出是SQL语句的执行计划(plan),当SQL语句存在多种执行计划的时候,优化器会从这许多的执行计划中挑选出一个它认为最优的(通常是占用系统资源最少的,
2264 0
|
索引
TokuDB · 引擎特性 · REPLACE 语句优化
背景 MySQL 在标准 SQL 外,会扩展一些好用的语法,本文关注的 REPLACE 和 INSERT IGNORE 就属于这类。这 2 个语法都是对 INSERT 的扩展,语义是向表中插入数据,不同之处在于遇到 PK 或者 UK 冲突时的处理: INSERT:报 duplicate key 的错误,数据不插入; REPLACE:删除掉老冲突记录,插入新的记录; INSERT IG
1476 0
下一篇
无影云桌面