part结构
我们知道,在clickhouse中,MergeTree表由一个个part组成。每个part对应一个目录,该目录下有两类文件:元数据文件,数据文件和projection part目录(如果该表创建了projection的话)
$ ll ./20210321_310_310_0 total 36 -rw-r----- 1 root root 28 Feb 9 14:26 primary.idx -rw-r----- 1 root root 4 Feb 9 14:26 partition.dat -rw-r----- 1 root root 4 Feb 9 14:26 minmax_day.idx -rw-r----- 1 root root 10 Feb 9 14:26 default_compression_codec.txt -rw-r----- 1 root root 1648 Feb 9 14:26 data.mrk3 -rw-r----- 1 root root 2110 Feb 9 14:26 data.bin -rw-r----- 1 root root 1 Feb 9 14:26 count.txt -rw-r----- 1 root root 790 Feb 9 14:26 columns.txt -rw-r----- 1 root root 252 Feb 9 14:26 checksums.txt
其中元数据文件包括:
- partition.dat: 记录本part所在分区的值
- primary.idx: 主键索引文件,记录每个Granule起始行的主键值,需配合.mrk3文件使用
- count.txt: 记录本part的数据行数
- checksums.txt: 记录本part下所有文件的size和checksum
- minmax_XX.idx: 分区索引或二级minmax索引文件,记录对应字段的min值和max值
- default_compression_codec.txt: 记录该part默认的压缩编码,如ZTSD, LZ4等
- *.mrk, *.mrk2, *.mrk3: 不同版本的mark文件,它记录着每个Granule的起始行数、在对应的数据文件中的偏移位置和解压后的Block中的偏移位置。
- uuid.txt: 记录本part的唯一id。仅在assign_part_uuids = true时才会出现。
- ttl.txt: 记录本part的过期时间
其中数据文件包括:
- XX.bin。part有三种类型,Wide, Compact, InMemory。当part类型为Wide时,每个字段对应一个bin文件和mark文件。当part类型为Compact时,part目录下仅有一个全局的data.bin和data.mrk3文件。当part类型为InMemory时,没有part目录,只有一个全局的wal.bin文件,因此InMemory Part对启动性能的影响很小,不在我们考虑范围之内。
启动流程
clickhouse-server启动之后:
- 首先加载系统库(包括system/information_schema/INFORMATION_SCHEMA)的元数据,然后attach系统库下的表。
- 然后调用loadMetadata加载除了系统库之外的database
- 最后创建HTTP、TCP等server,提供对外查询
系统库中表的数量有限,主要是第二步比较耗时
try { auto & database_catalog = DatabaseCatalog::instance(); /// We load temporary database first, because projections need it. database_catalog.initializeAndLoadTemporaryDatabase(); loadMetadataSystem(global_context); /// After attaching system databases we can initialize system log. global_context->initializeSystemLogs(); global_context->setSystemZooKeeperLogAfterInitializationIfNeeded(); /// After the system database is created, attach virtual system tables (in addition to query_log and part_log) attachSystemTablesServer(global_context, *database_catalog.getSystemDatabase(), has_zookeeper); attachInformationSchema(global_context, *database_catalog.getDatabase(DatabaseCatalog::INFORMATION_SCHEMA)); attachInformationSchema(global_context, *database_catalog.getDatabase(DatabaseCatalog::INFORMATION_SCHEMA_UPPERCASE)); /// Firstly remove partially dropped databases, to avoid race with MaterializedMySQLSyncThread, /// that may execute DROP before loadMarkedAsDroppedTables() in background, /// and so loadMarkedAsDroppedTables() will find it and try to add, and UUID will overlap. database_catalog.loadMarkedAsDroppedTables(); /// Then, load remaining databases loadMetadata(global_context, default_database); startupSystemTables(); database_catalog.loadDatabases(); /// After loading validate that default database exists database_catalog.assertDatabaseExists(default_database); } ... { attachSystemTablesAsync(global_context, *DatabaseCatalog::instance().getSystemDatabase(), async_metrics); { std::lock_guard lock(servers_lock); createServers(config(), listen_hosts, listen_try, server_pool, async_metrics, servers); if (servers.empty()) throw Exception( "No servers started (add valid listen_host and 'tcp_port' or 'http_port' to configuration file.)", ErrorCodes::NO_ELEMENTS_IN_CONFIG); } if (servers.empty()) throw Exception("No servers started (add valid listen_host and 'tcp_port' or 'http_port' to configuration file.)", ErrorCodes::NO_ELEMENTS_IN_CONFIG);
在loadMetadata
中,
- 首先遍历metadata目录,获取每个database的名称和元数据路径
- 对于每个database,加载对应的.sql文件,获取create database query并执行, 并将该database加入到catalog中
- 执行
TablesLoader::loadTables
,对每个database对象执行
loadStoredObjects
,即完成该database下每个table的加载。对于Engine为MergeTree Family的表来说,加载的主要工作就是读取所有part下的元数据文件到内存中。
- 执行
TablesLoader::startupTables
,对每个database对象执行startupTables
。对于非Replicated的MergeTree Family表,启动包括清理过期part、清理过期WAL、清理空part、启动后台disk move任务(如果该表配置了多disk的话)等工作。对于Replicated的MergeTree Family表,启动包括开启副本间同步、副本选主、启用副本、启动后台disk move等工作
void loadMetadata(ContextMutablePtr context, const String & default_database_name) { ... for (fs::directory_iterator it(path); it != dir_end; ++it) { if (it->is_symlink()) continue; const auto current_file = it->path().filename().string(); if (!it->is_directory()) { /// TODO: DETACH DATABASE PERMANENTLY ? if (fs::path(current_file).extension() == ".sql") { String db_name = fs::path(current_file).stem(); if (!isSystemOrInformationSchema(db_name)) databases.emplace(unescapeForFileName(db_name), fs::path(path) / db_name); } ... TablesLoader::Databases loaded_databases; for (const auto & [name, db_path] : databases) { loadDatabase(context, name, db_path, has_force_restore_data_flag); loaded_databases.insert({name, DatabaseCatalog::instance().getDatabase(name)}); } TablesLoader loader{context, std::move(loaded_databases), has_force_restore_data_flag, /* force_attach */ true}; loader.loadTables(); loader.startupTables(); ... }
在DatabaseOrdinary::loadStoredObjects
中
- 首先遍历database目录是下所有*.sql文件。对于每个文件,读取其中的建表语句并解析成AST
- 对于该database目录下的每张表,生成对应的Storage对象,并attach到本database中。注意这个过程以表为最小粒度并发的,并发度等于cpu物理核数
void DatabaseOrdinary::loadStoredObjects( ContextMutablePtr local_context, bool force_restore, bool force_attach, bool skip_startup_tables) { /** Tables load faster if they are loaded in sorted (by name) order. * Otherwise (for the ext4 filesystem), `DirectoryIterator` iterates through them in some order, * which does not correspond to order tables creation and does not correspond to order of their location on disk. */ ParsedTablesMetadata metadata; loadTablesMetadata(local_context, metadata); ... /// Attach tables. for (const auto & name_with_path_and_query : metadata.parsed_tables) { const auto & name = name_with_path_and_query.first; const auto & path = name_with_path_and_query.second.path; const auto & ast = name_with_path_and_query.second.ast; const auto & create_query = ast->as<const ASTCreateQuery &>(); if (!create_query.is_dictionary) { pool.scheduleOrThrowOnError([&]() { loadTableFromMetadata(local_context, path, name, ast, force_restore); /// Messages, so that it's not boring to wait for the server to load for a long time. logAboutProgress(log, ++tables_processed, total_tables, watch); }); } } pool.wait(); ... }
那么MergeTree表加载元数据的操作发生在哪里呢?发生在Storage的构造函数中,参考StorageReplicatedMergeTree::StorageReplicatedMergeTree
-> MergeTreeData::loadDataParts
。在loadDataParts
中
- 首先遍历每一个disk下的表的数据目录,收集其中的part。注意这个过程是并行的,并行度为本表关联的disk数量。
- 对于InMemory part, 加载wal文件中的数据。
- 对于Wide或Compact part,记录part名到disk的映射关系。
- 然后加载Wide和Compact part。
loadDataPartsFromDisk
- 最后加载InMemory part.
loadDataPartsFromWAL
void MergeTreeData::loadDataParts(bool skip_sanity_checks) { ... for (const auto & disk_ptr : disks) { if (disk_ptr->isBroken()) continue; auto & disk_parts = disk_part_map[disk_ptr->getName()]; auto & disk_wal_parts = disk_wal_part_map[disk_ptr->getName()]; pool.scheduleOrThrowOnError([&, disk_ptr]() { for (auto it = disk_ptr->iterateDirectory(relative_data_path); it->isValid(); it->next()) { /// Skip temporary directories, file 'format_version.txt' and directory 'detached'. if (startsWith(it->name(), "tmp") || it->name() == MergeTreeData::FORMAT_VERSION_FILE_NAME || it->name() == MergeTreeData::DETACHED_DIR_NAME) continue; if (!startsWith(it->name(), MergeTreeWriteAheadLog::WAL_FILE_NAME)) disk_parts.emplace_back(std::make_pair(it->name(), disk_ptr)); else if (it->name() == MergeTreeWriteAheadLog::DEFAULT_WAL_FILE_NAME && settings->in_memory_parts_enable_wal) { std::unique_lock lock(wal_init_lock); if (write_ahead_log != nullptr) throw Exception( "There are multiple WAL files appeared in current storage policy. You need to resolve this manually", ErrorCodes::CORRUPTED_DATA); write_ahead_log = std::make_shared<MergeTreeWriteAheadLog>(*this, disk_ptr, it->name()); for (auto && part : write_ahead_log->restore(metadata_snapshot, getContext())) disk_wal_parts.push_back(std::move(part)); } ... if (num_parts > 0) loadDataPartsFromDisk( broken_parts_to_detach, duplicate_parts_to_remove, pool, num_parts, parts_queue, skip_sanity_checks, settings); if (!parts_from_wal.empty()) loadDataPartsFromWAL(broken_parts_to_detach, duplicate_parts_to_remove, parts_from_wal, part_lock); }
一个MergeTree表的In memory是相当有限的,我们重点分析Wide和Compact part的加载,即loadDataPartsFromDisk
。每个part的加载流程如下:
- 加载uuid.txt文件
- 加载columns.txt文件
- 加载checksums.txt文件
- 加载任意mark文件。对于Compact part,只有一个mark文件:data.mrk3
- 从checksums中获取每个column和secondary index文件的大小。
- 加载primary.idx文件
- 加载ttl.txt文件
- 加载projection parts中的metadata文件。
- 根据checksums校验metadata和data文件的一致性
- 加载default_compression_codec.txt文件
注意MergeTree表加载part的过程是并行的,并发度默认为cpu硬件核数。当MergeTree表完成所有part的加载后,便可利用元数据构造的分区索引、主键索引、二级索引、Projection加速查询。这就是为什么clickhouse-server必须等待所有表完成加载后才可对外提供查询服务的原因。
为什么慢
从上面的流程可以看到,加载一个MergeTree part最多读取8个metadata 文件。假如clickhouse示例上有70w part, metadata文件的数量为560w。如果MergeTree还创建了一个或多个projection, metadata文件数量可能会超过1000w。对这么多文件的并发读取会瞬间把disk ioutil打满,disk read成为性能瓶颈。这就是为什么MergeTree启动慢的原因。
启动加速优化
优化方案
优化的核心在于减少disk io次数。因此我们决定引入RocksDB用于缓存metadata file。RocksDB启动之后会自身的LSM文件,文件个数远远少于metadata文件数,文件位置在磁盘上也更加集中。当所有part的metadata都缓存到RocksDB时,clickhouse启动时仅从RocksDB中就可获得所有part的metadata,总体来看disk io的次数大大减小了。
缓存结构
RocksDB中:
- key: 考虑到同一张表的part可能分布在不同的disk中,所以key由两部分组成,disk name和part相对disk的路径,如
default:store/b47/b47ee9c8-afa1-41ac-adfb-2e0dc42de819/6_7_7_0/primary.idx
- value: metadata文件的内容
缓存一致性
既然用RockDB作为缓存层,那么如何保证缓存层和持久化层也就是磁盘文件的一致性呢?主要分为两大类场景:
- 读meta:当需要读取某个part的元数据时,首先从缓存层查询,如果缓存未命中, 则读取持久化层的结果,并将该结果更新到缓存层并返回结果。如果缓存命中则直接返回结果。该操作主要发生在clickhouse-server启动阶段
- 写meta:当需要更新某个part的元数据时,对缓存层和持久化层进行双写。该操作可能发生在插入数据、删除分区、后台数据合并/移动、ALTER TABLE等场景下。
那么是否需要考虑缓存的并发问题呢? 一个cache key唯一对应一个metadata文件,而clickhouse MergeTree的设计已经保证了不可能有多写或读写并发的情况出现。因此在设计缓存层时无需考虑这个问题。
如何检测不一致
增加了一层缓存层,那么