MergeTree启动原理

简介: MergeTree启动原理

part结构


我们知道,在clickhouse中,MergeTree表由一个个part组成。每个part对应一个目录,该目录下有两类文件:元数据文件,数据文件和projection part目录(如果该表创建了projection的话)

$ ll ./20210321_310_310_0 
total 36
-rw-r----- 1 root root   28 Feb  9 14:26 primary.idx
-rw-r----- 1 root root    4 Feb  9 14:26 partition.dat
-rw-r----- 1 root root    4 Feb  9 14:26 minmax_day.idx
-rw-r----- 1 root root   10 Feb  9 14:26 default_compression_codec.txt
-rw-r----- 1 root root 1648 Feb  9 14:26 data.mrk3
-rw-r----- 1 root root 2110 Feb  9 14:26 data.bin
-rw-r----- 1 root root    1 Feb  9 14:26 count.txt
-rw-r----- 1 root root  790 Feb  9 14:26 columns.txt
-rw-r----- 1 root root  252 Feb  9 14:26 checksums.txt

其中元数据文件包括:


  • partition.dat: 记录本part所在分区的值


  • primary.idx: 主键索引文件,记录每个Granule起始行的主键值,需配合.mrk3文件使用


  • count.txt: 记录本part的数据行数


  • checksums.txt: 记录本part下所有文件的size和checksum


  • minmax_XX.idx: 分区索引或二级minmax索引文件,记录对应字段的min值和max值
  • default_compression_codec.txt: 记录该part默认的压缩编码,如ZTSD, LZ4等


  • *.mrk, *.mrk2, *.mrk3: 不同版本的mark文件,它记录着每个Granule的起始行数、在对应的数据文件中的偏移位置和解压后的Block中的偏移位置。


  • uuid.txt: 记录本part的唯一id。仅在assign_part_uuids = true时才会出现。


  • ttl.txt: 记录本part的过期时间


其中数据文件包括:


  • XX.bin。part有三种类型,Wide, Compact, InMemory。当part类型为Wide时,每个字段对应一个bin文件和mark文件。当part类型为Compact时,part目录下仅有一个全局的data.bin和data.mrk3文件。当part类型为InMemory时,没有part目录,只有一个全局的wal.bin文件,因此InMemory Part对启动性能的影响很小,不在我们考虑范围之内。



启动流程


clickhouse-server启动之后:


  • 首先加载系统库(包括system/information_schema/INFORMATION_SCHEMA)的元数据,然后attach系统库下的表。


  • 然后调用loadMetadata加载除了系统库之外的database


  • 最后创建HTTP、TCP等server,提供对外查询


系统库中表的数量有限,主要是第二步比较耗时

try
    {
auto & database_catalog = DatabaseCatalog::instance();
/// We load temporary database first, because projections need it.
        database_catalog.initializeAndLoadTemporaryDatabase();
        loadMetadataSystem(global_context);
/// After attaching system databases we can initialize system log.
        global_context->initializeSystemLogs();
        global_context->setSystemZooKeeperLogAfterInitializationIfNeeded();
/// After the system database is created, attach virtual system tables (in addition to query_log and part_log)
        attachSystemTablesServer(global_context, *database_catalog.getSystemDatabase(), has_zookeeper);
        attachInformationSchema(global_context, *database_catalog.getDatabase(DatabaseCatalog::INFORMATION_SCHEMA));
        attachInformationSchema(global_context, *database_catalog.getDatabase(DatabaseCatalog::INFORMATION_SCHEMA_UPPERCASE));
/// Firstly remove partially dropped databases, to avoid race with MaterializedMySQLSyncThread,
        /// that may execute DROP before loadMarkedAsDroppedTables() in background,
        /// and so loadMarkedAsDroppedTables() will find it and try to add, and UUID will overlap.
        database_catalog.loadMarkedAsDroppedTables();
/// Then, load remaining databases
        loadMetadata(global_context, default_database);
        startupSystemTables();
        database_catalog.loadDatabases();
/// After loading validate that default database exists
        database_catalog.assertDatabaseExists(default_database);
    }
  ...
    {
        attachSystemTablesAsync(global_context, *DatabaseCatalog::instance().getSystemDatabase(), async_metrics);
        {
std::lock_guard lock(servers_lock);
            createServers(config(), listen_hosts, listen_try, server_pool, async_metrics, servers);
if (servers.empty())
throw Exception(
"No servers started (add valid listen_host and 'tcp_port' or 'http_port' to configuration file.)",
                    ErrorCodes::NO_ELEMENTS_IN_CONFIG);
        }
if (servers.empty())
throw Exception("No servers started (add valid listen_host and 'tcp_port' or 'http_port' to configuration file.)",
                ErrorCodes::NO_ELEMENTS_IN_CONFIG);

loadMetadata中,


  • 首先遍历metadata目录,获取每个database的名称和元数据路径


  • 对于每个database,加载对应的.sql文件,获取create database query并执行, 并将该database加入到catalog中


  • 执行TablesLoader::loadTables,对每个database对象执行



  • loadStoredObjects,即完成该database下每个table的加载。对于Engine为MergeTree Family的表来说,加载的主要工作就是读取所有part下的元数据文件到内存中。


  • 执行TablesLoader::startupTables,对每个database对象执行startupTables。对于非Replicated的MergeTree Family表,启动包括清理过期part、清理过期WAL、清理空part、启动后台disk move任务(如果该表配置了多disk的话)等工作。对于Replicated的MergeTree Family表,启动包括开启副本间同步、副本选主、启用副本、启动后台disk move等工作


void loadMetadata(ContextMutablePtr context, const String & default_database_name)
{
    ...
for (fs::directory_iterator it(path); it != dir_end; ++it)
    {
if (it->is_symlink())
continue;
const auto current_file = it->path().filename().string();
if (!it->is_directory())
        {
/// TODO: DETACH DATABASE PERMANENTLY ?
            if (fs::path(current_file).extension() == ".sql")
            {
                String db_name = fs::path(current_file).stem();
if (!isSystemOrInformationSchema(db_name))
                    databases.emplace(unescapeForFileName(db_name), fs::path(path) / db_name);
            }
    ...
    TablesLoader::Databases loaded_databases;
for (const auto & [name, db_path] : databases)
    {
        loadDatabase(context, name, db_path, has_force_restore_data_flag);
        loaded_databases.insert({name, DatabaseCatalog::instance().getDatabase(name)});
    }
    TablesLoader loader{context, std::move(loaded_databases), has_force_restore_data_flag, /* force_attach */ true};
    loader.loadTables();
    loader.startupTables();
    ...
}

DatabaseOrdinary::loadStoredObjects


  • 首先遍历database目录是下所有*.sql文件。对于每个文件,读取其中的建表语句并解析成AST


  • 对于该database目录下的每张表,生成对应的Storage对象,并attach到本database中。注意这个过程以表为最小粒度并发的,并发度等于cpu物理核数
void DatabaseOrdinary::loadStoredObjects(
    ContextMutablePtr local_context, bool force_restore, bool force_attach, bool skip_startup_tables)
{
/** Tables load faster if they are loaded in sorted (by name) order.
      * Otherwise (for the ext4 filesystem), `DirectoryIterator` iterates through them in some order,
      *  which does not correspond to order tables creation and does not correspond to order of their location on disk.
      */
    ParsedTablesMetadata metadata;
    loadTablesMetadata(local_context, metadata);
    ...
/// Attach tables.
    for (const auto & name_with_path_and_query : metadata.parsed_tables)
    {
const auto & name = name_with_path_and_query.first;
const auto & path = name_with_path_and_query.second.path;
const auto & ast = name_with_path_and_query.second.ast;
const auto & create_query = ast->as<const ASTCreateQuery &>();
if (!create_query.is_dictionary)
        {
            pool.scheduleOrThrowOnError([&]()
            {
                loadTableFromMetadata(local_context, path, name, ast, force_restore);
/// Messages, so that it's not boring to wait for the server to load for a long time.
                logAboutProgress(log, ++tables_processed, total_tables, watch);
            });
        }
    }
    pool.wait();
    ...
}

那么MergeTree表加载元数据的操作发生在哪里呢?发生在Storage的构造函数中,参考StorageReplicatedMergeTree::StorageReplicatedMergeTree -> MergeTreeData::loadDataParts。在loadDataParts


  • 首先遍历每一个disk下的表的数据目录,收集其中的part。注意这个过程是并行的,并行度为本表关联的disk数量。


  • 对于InMemory part, 加载wal文件中的数据。
  • 对于Wide或Compact part,记录part名到disk的映射关系。


  • 然后加载Wide和Compact part。loadDataPartsFromDisk
  • 最后加载InMemory part. loadDataPartsFromWAL
void MergeTreeData::loadDataParts(bool skip_sanity_checks)
{
  ...
for (const auto & disk_ptr : disks)
    {
if (disk_ptr->isBroken())
continue;
auto & disk_parts = disk_part_map[disk_ptr->getName()];
auto & disk_wal_parts = disk_wal_part_map[disk_ptr->getName()];
        pool.scheduleOrThrowOnError([&, disk_ptr]()
        {
for (auto it = disk_ptr->iterateDirectory(relative_data_path); it->isValid(); it->next())
            {
/// Skip temporary directories, file 'format_version.txt' and directory 'detached'.
                if (startsWith(it->name(), "tmp") || it->name() == MergeTreeData::FORMAT_VERSION_FILE_NAME
|| it->name() == MergeTreeData::DETACHED_DIR_NAME)
continue;
if (!startsWith(it->name(), MergeTreeWriteAheadLog::WAL_FILE_NAME))
                    disk_parts.emplace_back(std::make_pair(it->name(), disk_ptr));
else if (it->name() == MergeTreeWriteAheadLog::DEFAULT_WAL_FILE_NAME && settings->in_memory_parts_enable_wal)
                {
std::unique_lock lock(wal_init_lock);
if (write_ahead_log != nullptr)
throw Exception(
"There are multiple WAL files appeared in current storage policy. You need to resolve this manually",
                            ErrorCodes::CORRUPTED_DATA);
                    write_ahead_log = std::make_shared<MergeTreeWriteAheadLog>(*this, disk_ptr, it->name());
for (auto && part : write_ahead_log->restore(metadata_snapshot, getContext()))
                        disk_wal_parts.push_back(std::move(part));
                }
    ...
if (num_parts > 0)
        loadDataPartsFromDisk(
            broken_parts_to_detach, duplicate_parts_to_remove, pool, num_parts, parts_queue, skip_sanity_checks, settings);
if (!parts_from_wal.empty())
        loadDataPartsFromWAL(broken_parts_to_detach, duplicate_parts_to_remove, parts_from_wal, part_lock);
}

一个MergeTree表的In memory是相当有限的,我们重点分析Wide和Compact part的加载,即loadDataPartsFromDisk。每个part的加载流程如下:


  • 加载uuid.txt文件


  • 加载columns.txt文件


  • 加载checksums.txt文件


  • 加载任意mark文件。对于Compact part,只有一个mark文件:data.mrk3


  • 从checksums中获取每个column和secondary index文件的大小。


  • 加载primary.idx文件


  • 加载ttl.txt文件


  • 加载projection parts中的metadata文件。


  • 根据checksums校验metadata和data文件的一致性


  • 加载default_compression_codec.txt文件


注意MergeTree表加载part的过程是并行的,并发度默认为cpu硬件核数。当MergeTree表完成所有part的加载后,便可利用元数据构造的分区索引、主键索引、二级索引、Projection加速查询。这就是为什么clickhouse-server必须等待所有表完成加载后才可对外提供查询服务的原因。



为什么慢


从上面的流程可以看到,加载一个MergeTree part最多读取8个metadata 文件。假如clickhouse示例上有70w part, metadata文件的数量为560w。如果MergeTree还创建了一个或多个projection, metadata文件数量可能会超过1000w。对这么多文件的并发读取会瞬间把disk ioutil打满,disk read成为性能瓶颈。这就是为什么MergeTree启动慢的原因。



启动加速优化


优化方案


优化的核心在于减少disk io次数。因此我们决定引入RocksDB用于缓存metadata file。RocksDB启动之后会自身的LSM文件,文件个数远远少于metadata文件数,文件位置在磁盘上也更加集中。当所有part的metadata都缓存到RocksDB时,clickhouse启动时仅从RocksDB中就可获得所有part的metadata,总体来看disk io的次数大大减小了。


缓存结构


RocksDB中:


  • key: 考虑到同一张表的part可能分布在不同的disk中,所以key由两部分组成,disk name和part相对disk的路径,如default:store/b47/b47ee9c8-afa1-41ac-adfb-2e0dc42de819/6_7_7_0/primary.idx
  • value: metadata文件的内容


缓存一致性


既然用RockDB作为缓存层,那么如何保证缓存层和持久化层也就是磁盘文件的一致性呢?主要分为两大类场景:


  • 读meta:当需要读取某个part的元数据时,首先从缓存层查询,如果缓存未命中, 则读取持久化层的结果,并将该结果更新到缓存层并返回结果。如果缓存命中则直接返回结果。该操作主要发生在clickhouse-server启动阶段


  • 写meta:当需要更新某个part的元数据时,对缓存层和持久化层进行双写。该操作可能发生在插入数据、删除分区、后台数据合并/移动、ALTER TABLE等场景下。


那么是否需要考虑缓存的并发问题呢? 一个cache key唯一对应一个metadata文件,而clickhouse MergeTree的设计已经保证了不可能有多写或读写并发的情况出现。因此在设计缓存层时无需考虑这个问题。



如何检测不一致


增加了一层缓存层,那么


源码走读

相关文章
|
存储 分布式数据库 Hbase
HBase scan过程简析
HBase scan过程简析。 scan过程总体上是分层处理的,与存储上的组织方式一致,脉络比较清晰; 具体来说,就是region->store→hfile/memstore,分别都有对应的scanner实现进行数据读取; scan请求本身设置的条件,以及server和table层面的一些参数限制,会根据需要分布在不同层次的scanner中进行处理; 2.
2177 0
HBase scan过程简析
|
5月前
|
存储 SQL 消息中间件
ClickHouse(12)ClickHouse合并树MergeTree家族表引擎之AggregatingMergeTree详细解析
AggregatingMergeTree是ClickHouse的一种表引擎,它优化了MergeTree的合并逻辑,通过将相同主键(排序键)的行聚合为一行并存储聚合函数状态来减少行数。适用于增量数据聚合和物化视图。建表语法中涉及AggregateFunction和SimpleAggregateFunction类型。插入数据需使用带-State-的聚合函数,查询时使用GROUP BY和-Merge-。处理逻辑包括按排序键聚合、在合并分区时计算、以分区为单位聚合等。常用于物化视图配合普通MergeTree使用。查阅更多资料可访问相关链接。
242 4
|
5月前
|
存储 SQL 算法
ClickHouse(13)ClickHouse合并树MergeTree家族表引擎之CollapsingMergeTree详细解析
CollapsingMergeTree是ClickHouse的一种表引擎,它扩展了`MergeTree`,通过折叠行来优化存储和查询效率。当`Sign`列值为1和-1的成对行存在时,该引擎会异步删除除`Sign`外其他字段相同的行,只保留最新状态。建表语法中,`sign`列必须为`Int8`类型,用来标记状态(1)和撤销(-1)。写入时,应确保状态和撤销行的对应关系以保证正确折叠。查询时,可能需要使用聚合函数如`sum(Sign * x)`配合`GROUP BY`来处理折叠后的数据。使用`FINAL`修饰符可强制折叠,但效率较低。系列文章提供了更多关于ClickHouse及其表引擎的详细解析。
160 1
|
22天前
|
存储 算法 NoSQL
大数据-138 - ClickHouse 集群 表引擎详解3 - MergeTree 存储结构 数据标记 分区 索引 标记 压缩协同
大数据-138 - ClickHouse 集群 表引擎详解3 - MergeTree 存储结构 数据标记 分区 索引 标记 压缩协同
29 0
|
3月前
|
SQL 存储 关系型数据库
实时计算 Flink版产品使用问题之同步MySQL多张表的过程中,内存释放依赖于什么
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
5月前
|
传感器 存储 SQL
ClickHouse(15)ClickHouse合并树MergeTree家族表引擎之GraphiteMergeTree详细解析
GraphiteMergeTree是ClickHouse用于优化Graphite数据存储和汇总的表引擎,适合需要瘦身和高效查询Graphite数据的开发者。它基于MergeTree,减少存储空间并提升查询效率。创建表时需包括Path、Time、Value和Version列。配置涉及pattern、regexp、function和retention,用于指定聚合函数和数据保留规则。文章还提供了建表语句示例和相关资源链接。
82 1
|
5月前
|
存储 SQL 关系型数据库
ClickHouse(11)ClickHouse合并树MergeTree家族表引擎之SummingMergeTree详细解析
`SummingMergeTree`是`MergeTree`引擎的变种,它合并相同主键的行并计算数值列的总和,从而节省存储空间和加速查询。通常与`MergeTree`配合使用,存储聚合数据以避免数据丢失。创建`SummingMergeTree`表时,可选参数`columns`指定要汇总的数值列。未指定时,默认汇总所有非主键数值列。注意,聚合可能不完整,查询时需用`SUM`和`GROUP BY`。文章还介绍了建表语法、数据处理规则以及对嵌套数据结构和`AggregateFunction`列的处理。查阅更多ClickHouse相关内容可访问相关链接。
191 5
|
5月前
|
存储 SQL 算法
ClickHouse(14)ClickHouse合并树MergeTree家族表引擎之VersionedCollapsingMergeTree详细解析
VersionedCollapsingMergeTree是ClickHouse的一种优化引擎,扩展了MergeTree,支持多线程异步插入和高效的数据折叠。它通过Sign和Version列处理对象状态的变化,Sign表示行的状态(正向或撤销),Version追踪状态版本。引擎自动删除旧状态,减少存储占用。在查询时,需注意可能需使用GROUP BY和聚合函数确保数据折叠,因为ClickHouse不保证查询结果已折叠。文章还提供了建表语法、使用示例和相关资源链接。
127 0
|
6月前
|
存储 SQL 关系型数据库
ClickHouse(09)ClickHouse合并树MergeTree家族表引擎之MergeTree详细解析
ClickHouse的MergeTree系列引擎是其高性能大数据存储的核心,特别适合大量数据的快速插入。数据按主键排序,支持分区和数据副本,提供数据采样功能。建表时,通过`ENGINE = MergeTree()`指定引擎,`ORDER BY`指定排序键,可选`PARTITION BY`分区,`SAMPLE BY`进行采样。此外,MergeTree支持多种索引和设置,如`index_granularity`控制索引粒度。查询时,ClickHouse利用主键和索引来高效检索数据,尤其在使用等值或范围条件时。
56 0
|
6月前
|
SQL 消息中间件 关系型数据库
ClickHouse(10)ClickHouse合并树MergeTree家族表引擎之ReplacingMergeTree详细解析
`ReplacingMergeTree`是ClickHouse的一种表引擎,用于数据去重。与`MergeTree`不同,它在合并分区时删除重复行,但不保证无重复。去重基于`ORDER BY`列,在ver列未指定时保留最新行,否则保留ver值最大者。数据处理策略包括延迟合并导致的不确定性及按分区去重。`CREATE TABLE`语法中,`ReplacingMergeTree`需要指定可选的`ver`列。相关系列文章提供了更深入的解析。
174 0