ClickHouse内核分析-MergeTree的Merge和Mutation机制

本文涉及的产品
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
Redis 开源版,标准版 2GB
推荐场景:
搭建游戏排行榜
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
简介: 作者:仁劼

5_6_3

注:以下分析基于开源 v19.15.2.2-stable 版本进行

引言

ClickHouse内核分析系列文章,继上一篇文章之后,这次我将为大家介绍MergeTree存储引擎的异步Merge和Mutation机制。建议读者先补充上一篇文章的基础知识,这样会比较容易理解。

MergeTree Mutation功能介绍

在上一篇系列文章中,我已经介绍过ClickHouse内核中的MergeTree存储一旦生成一个Data Part,这个Data Part就不可再更改了。所以从MergeTree存储内核层面,ClickHouse就不擅长做数据更新删除操作。但是绝大部分用户场景中,难免会出现需要手动订正、修复数据的场景。所以ClickHouse为用户设计了一套离线异步机制来支持低频的Mutation(改、删)操作。

Mutation命令执行

ALTER TABLE [db.]table DELETE WHERE filter_expr;
ALTER TABLE [db.]table UPDATE column1 = expr1 [, ...] WHERE filter_expr;

ClickHouse的方言把Delete和Update操作也加入到了Alter Table的范畴中,它并不支持裸的Delete或者Update操作。当用户执行一个如上的Mutation操作获得返回时,ClickHouse内核其实只做了两件事情:

  1. 检查Mutation操作是否合法;
  2. 保存Mutation命令到存储文件中,唤醒一个异步处理merge和mutation的工作线程;

两者的主体逻辑分别在MutationsInterpreter::validate函数和StorageMergeTree::mutate函数中。

MutationsInterpreter::validate函数dry run一个异步Mutation执行的全过程,其中涉及到检查Mutation是否合法的判断原则是列值更新后记录的分区键和排序键不能有变化。因为分区键和排序键一旦发生变化,就会导致多个Data Part之间之间Merge逻辑的复杂化。剩余的Mutation执行过程可以看做是打开一个Data Part的BlockInputStream,在这个BlockStream的基础上封装删除操作的FilterBlockInputStream,再加上更新操作的ExpressionBlockInputStream,最后把数据通过BlockOutputStream写回到新的Data Part中。这里简单介绍一下ClickHouse的计算层实现,整体上它是一个火山模型的计算引擎,数据的各种filer、投影、join、agg都是通过BlockStrem抽象实现,在BlockStream中数据是按照Block进行传输处理的,而Block中的数据又是按照列模式组织,这使得ClickHouse在单列的计算上可以批量化并使用一些SIMD指令加速。BlockOutputStream承担了MergeTree Data Part列存写入和索引构建的全部工作,我会在后续的文章中会详细展开介绍ClickHouse计算层中各类功能的BlockStream,以及BlockOutputStream中构建索引的实现细节。

在Mutation命令的执行过程中,我们可以看到MergeTree会把整条Alter命令保存到存储文件夹下,然后创建一个MergeTreeMutationEntry对象保存到表的待修改状态中,最后唤醒一个异步处理merge和 mutation的工作线程。这里有一个关键的问题,因为Mutation的实际操作是异步发生的,在用户的Alter命令返回之后仍然会有数据写入,系统如何在异步订正的过程中排除掉Alter命令之后写入的数据呢?下一节中我会介绍MergeTree中Data Part的Version机制,它可以在Data Part级别解决上面的问题。但是因为ClickHouse写入链路的异步性,ClickHouse仍然无法保证Alter命令前Insert的每条纪录都被更新,只能确保Alter命令前已经存在的Data Part都会被订正,推荐用户只用来订正T+1场景的离线数据。

异步Merge&Mutation

Batch Insert和Mutation的数据一致性

struct MergeTreePartInfo
{
    String partition_id;
    Int64 min_block = 0;
    Int64 max_block = 0;
    UInt32 level = 0;
    Int64 mutation = 0;   /// If the part has been mutated or contains mutated parts, is equal to mutation version number.
    ...
    /// Get block number that can be used to determine which mutations we still need to apply to this part
    /// (all mutations with version greater than this block number).
    Int64 getDataVersion() const { return mutation ? mutation : min_block; }
    ...    
    bool operator<(const MergeTreePartInfo & rhs) const
    {
        return std::forward_as_tuple(partition_id, min_block, max_block, level, mutation)
            < std::forward_as_tuple(rhs.partition_id, rhs.min_block, rhs.max_block, rhs.level, rhs.mutation);
    }
}

在具体展开MergeTree的异步merge和mutation机制之前,先需要详细介绍一下MergeTree中对Data Part的管理方式。每个Data Part都有一个MergeTreePartInfo对象来保存它的meta信息,MergeTreePartInfo类的结构如上方代码所示。

  1. partition_id:表示所属的数据分区id。
  2. min_block、max_block:blockNumber是数据写入的一个版本信息,在上一篇系列文章中讲过,用户每次批量写入的数据都会生成一个Data Part。同一批写入的数据会被assign一个唯一的blockNumber,而这个blockNumber是在MergeTree表级别自增的。以及MergeTree在merge多个Data Part的时候会准守一个原则:在同一个数据分区下选择blockNumber区间相邻的若干个Data Parts进行合并,不会出现在同一个数据分区下Data Parts之间的blockNumber区间出现重合。所以Data Part中的min_block和max_block可以表示当前Data Part中数据的版本范围。
  3. level:表示Data Part所在的层级,新写入的Data Part都属于level 0。异步merge多个Data Part的过程中,系统会选择其中最大的level + 1作为新Data Part的level。这个信息可以一定程度反映出当前的Data Part是经历了多少次merge,但是不能准确表示,核心原因是MergeTree允许多个Data Part跨level进行merge的,为了最终一个数据分区内的数据merge成一个Data Part。
  4. mutation:和批量写入数据的版本号机制类似,MergeTree表的mutation命令也会被assign一个唯一的blockNumber作为版本号,这个版本号信息会保存在MergeTreeMutationEntry中,所以通过版本号信息我们可以看出数据写入和mutation命令之间的先后关系。Data Part中的这个mutation表示的则是当前这个Data Part已经完成的mutation操作,对每个Data Part来说它是按照mutation的blockNumber顺序依次完成所有的mutation。

解释了MergeTreePartInfo类中的信息含义,我们就可以理解上一节中遗留的异步Mutation如何选择哪些Data Parts需要订正的问题。系统可以通过MergeTreePartInfo::getDataVersion() { return mutation ? mutation : min_block }函数来判断当前Data Part是否需要进行某个mutation订正,比较两者version即可。

Merge&Mutation工作任务

ClickHouse内核中异步merge、mutation工作由统一的工作线程池来完成,这个线程池的大小用户可以通过参数background_pool_size进行设置。线程池中的线程Task总体逻辑如下,可以看出这个异步Task主要做三块工作:清理残留文件,merge Data Parts 和 mutate Data Part。

BackgroundProcessingPoolTaskResult StorageMergeTree::mergeMutateTask()
{
    ....
    try
    {
        /// Clear old parts. It is unnecessary to do it more than once a second.
        if (auto lock = time_after_previous_cleanup.compareAndRestartDeferred(1))
        {
            {
                /// TODO: Implement tryLockStructureForShare.
                auto lock_structure = lockStructureForShare(false, "");
                clearOldPartsFromFilesystem();
                clearOldTemporaryDirectories();
            }
            clearOldMutations();
        }
        ///TODO: read deduplicate option from table config
        if (merge(false /*aggressive*/, {} /*partition_id*/, false /*final*/, false /*deduplicate*/))
            return BackgroundProcessingPoolTaskResult::SUCCESS;
        if (tryMutatePart())
            return BackgroundProcessingPoolTaskResult::SUCCESS;
        return BackgroundProcessingPoolTaskResult::ERROR;
    }
   ...
}

需要清理的残留文件分为三部分:过期的Data Part,临时文件夹,过期的Mutation命令文件。如下方代码所示,MergeTree Data Part的生命周期包含多个阶段,创建一个Data Part的时候分两阶段执行Temporary->Precommitted->Commited,淘汰一个Data Part的时候也可能会先经过一个Outdated状态,再到Deleting状态。在Outdated状态下的Data Part仍然是可查的。异步Task在收集Outdated Data Part的时候会根据它的shared_ptr计数来判断当前是否有查询Context引用它,没有的话才进行删除。清理临时文件的逻辑较为简单,在数据文件夹中遍历搜索"tmp_"开头的文件夹,并判断创建时长是否超过temporary_directories_lifetime。临时文件夹主要在ClickHouse的两阶段提交过程可能造成残留。最后是清理数据已经全部订正完成的过期Mutation命令文件。

enum class State
    {
        Temporary,       /// the part is generating now, it is not in data_parts list
        PreCommitted,    /// the part is in data_parts, but not used for SELECTs
        Committed,       /// active data part, used by current and upcoming SELECTs
        Outdated,        /// not active data part, but could be used by only current SELECTs, could be deleted after SELECTs finishes
        Deleting,        /// not active data part with identity refcounter, it is deleting right now by a cleaner
        DeleteOnDestroy, /// part was moved to another disk and should be deleted in own destructor
    };

Merge逻辑

StorageMergeTree::merge函数是MergeTree异步Merge的核心逻辑,Data Part Merge的工作除了通过后台工作线程自动完成,用户还可以通过Optimize命令来手动触发。自动触发的场景中,系统会根据后台空闲线程的数据来启发式地决定本次Merge最大可以处理的数据量大小,max_bytes_to_merge_at_min_space_in_pool和max_bytes_to_merge_at_max_space_in_pool参数分别决定当空闲线程数最大时可处理的数据量上限以及只剩下一个空闲线程时可处理的数据量上限。当用户的写入量非常大的时候,应该适当调整工作线程池的大小和这两个参数。当用户手动触发merge时,系统则是根据disk剩余容量来决定可处理的最大数据量。

接下来介绍merge过程中最核心的逻辑:如何选择Data Parts进行merge?为了方便理解,这里先介绍一下Data Parts在MergeTree表引擎中的管理组织方式。上一节中提到的MergeTreePartInfo类中定义了比较操作符,MergeTree中的Data Parts就是按照这个比较操作符进行排序管理,排序键是(partition_id, min_block, max_block, level, mutation),索引管理结构如下图所示:
image.png
自动Merge的处理逻辑,首先是通过MergeTreeDataMergerMutator::selectPartsToMerge函数筛选出本次merge要合并的Data Parts,这个筛选过程需要准守三个原则:

  1. 跨数据分区的Data Part之间不能合并;
  2. 合并的Data Parts之间必须是相邻(在上图的有序组织关系中相邻),只能在排序链表中按段合并,不能跳跃;
  3. 合并的Data Parts之间的mutation状态必须是一致的,如果Data Part A 后续还需要完成mutation-23而Data Part B后续不需要完成mutation-23(数据全部是在mutation命令之后写入或者已经完成mutation-23),则A和B不能进行合并;

所以我们上面的Data Parts组织关系逻辑示意图中,相同颜色的Data Parts是可以合并的。虽然图中三个不同颜色的Data Parts序列都是可以合并的,但是合并工作线程每次只会挑选其中某个序列的一小段进行合并(如前文所述,系统会限定每次合并的Data Parts的数据量)。对于如何从这些序列中挑选出最佳的一段区间,ClickHouse抽象出了IMergeSelector类来实现不同的逻辑。当前主要有两种不同的merge策略:TTL数据淘汰策略和常规策略。

  • TTL数据淘汰策略:TTL数据淘汰策略启用的条件比较苛刻,只有当某个Data Part中存在数据生命周期超时需要淘汰,并且距离上次使用TTL策略达到一定时间间隔(默认1小时)。TTL策略也非常简单,首先挑选出TTL超时最严重Data Part,把这个Data Part所在的数据分区作为要进行数据合并的分区,最后会把这个TTL超时最严重的Data Part前后连续的所有存在TTL过期的Data Part都纳入到merge的范围中。这个策略简单直接,每次保证优先合并掉最老的存在过期数据的Data Part。
  • 常规策略:这里的选举策略就比较复杂,基本逻辑是枚举每个可能合并的Data Parts区间,通过启发式规则判断是否满足合并条件,再有启发式规则进行算分,选取分数最好的区间。启发式判断是否满足合并条件的算法在SimpleMergeSelector.cpp::allow函数中,其中的主要思想分为以下几点:系统默认对合并的区间有一个Data Parts数量的限制要求(每5个Data Parts才能合并);如果当前数据分区中的Data Parts出现了膨胀,则适量放宽合并数量限制要求(最低可以两两merge);如果参与合并的Data Parts中有很久之前写入的Data Part,也适量放宽合并数量限制要求,放宽的程度还取决于要合并的数据量。第一条规则是为了提升写入性能,避免在高速写入时两两merge这种低效的合并方式。最后一条规则则是为了保证随着数据分区中的Data Part老化,老龄化的数据分区内数据全部合并到一个Data Part。中间的规则更多是一种保护手段,防止因为写入和频繁mutation的极端情况下,Data Parts出现膨胀。启发式算法的策略则是优先选择IO开销最小的Data Parts区间完成合并,尽快合并掉小数据量的Data Parts是对在线查询最有利的方式,数据量很大的Data Parts已经有了很较好的数据压缩和索引效率,合并操作对查询带来的性价比较低。

Mutation逻辑

StorageMergeTree::tryMutatePart函数是MergeTree异步mutation的核心逻辑,主体逻辑如下。系统每次都只会订正一个Data Part,但是会聚合多个mutation任务批量完成,这点实现非常的棒。因为在用户真实业务场景中一次数据订正逻辑中可能会包含多个Mutation命令,把这多个mutation操作聚合到一起订正效率上就非常高。系统每次选择一个排序键最小的并且需要订正Data Part进行操作,本意上就是把数据从前往后进行依次订正。

Mutation功能是MergeTree表引擎最新推出一大功能,从我个人的角度看在实现完备度上还有一下两点需要去优化:

  1. mutation没有实时可见能力。我这里的实时可见并不是指在存储上立即原地更新,而是给用户提供一种途径可以立即看到数据订正后的最终视图确保订正无误。类比在使用CollapsingMergeTree、SummingMergeTree等高级MergeTree引擎时,数据还没有完全merge到一个Data Part之前,存储层并没有一个数据的最终视图。但是用户可以通过Final查询模式,在计算引擎层实时聚合出数据的最终视图。这个原理对mutation实时可见也同样适用,在实时查询中通过FilterBlockInputStream和ExpressionBlockInputStream完成用户的mutation操作,给用户提供一个最终视图。
  2. mutation和merge相互独立执行。看完本文前面的分析,大家应该也注意到了目前Data Part的merge和mutation是相互独立执行的,Data Part在同一时刻只能是在merge或者mutation操作中。对于MergeTree这种存储彻底Immutable的设计,数据频繁merge、mutation会引入巨大的IO负载。实时上merge和mutation操作是可以合并到一起去考虑的,这样可以省去数据一次读写盘的开销。对数据写入压力很大又有频繁mutation的场景,会有很大帮助。
for (const auto & part : getDataPartsVector())
        {
            ...
            size_t current_ast_elements = 0;
            for (auto it = mutations_begin_it; it != mutations_end_it; ++it)
            {
                MutationsInterpreter interpreter(shared_from_this(), it->second.commands, global_context);
                size_t commands_size = interpreter.evaluateCommandsSize();
                if (current_ast_elements + commands_size >= max_ast_elements)
                    break;
                current_ast_elements += commands_size;
                commands.insert(commands.end(), it->second.commands.begin(), it->second.commands.end());
            }
            auto new_part_info = part->info;
            new_part_info.mutation = current_mutations_by_version.rbegin()->first;
            future_part.parts.push_back(part);
            future_part.part_info = new_part_info;
            future_part.name = part->getNewName(new_part_info);
            tagger.emplace(future_part, MergeTreeDataMergerMutator::estimateNeededDiskSpace({part}), *this, true);
            break;
        }

最后在经过后台工作线程一轮merge和mutation操作之后,上一节中展示的MergeTree表引擎中的Data Parts可能发生的变化如下图所示,2020-05-10数据分区下的头两个Data Parts被merge到了一起,并且完成了Mutation 37和Mutation 39的数据订正,新产生的Data Part如红色所示:
image.png
Clickhouse产品链接:https://www.aliyun.com/product/clickhouse
223.png

ClickHouse内核分析系列文章:

希望通过内核分析系列文章,让大家更好地了解这款世界领先的列式存储分析型数据库。
图片.gif

目录
相关文章
|
6月前
|
存储 SQL 消息中间件
ClickHouse(12)ClickHouse合并树MergeTree家族表引擎之AggregatingMergeTree详细解析
AggregatingMergeTree是ClickHouse的一种表引擎,它优化了MergeTree的合并逻辑,通过将相同主键(排序键)的行聚合为一行并存储聚合函数状态来减少行数。适用于增量数据聚合和物化视图。建表语法中涉及AggregateFunction和SimpleAggregateFunction类型。插入数据需使用带-State-的聚合函数,查询时使用GROUP BY和-Merge-。处理逻辑包括按排序键聚合、在合并分区时计算、以分区为单位聚合等。常用于物化视图配合普通MergeTree使用。查阅更多资料可访问相关链接。
294 4
|
6月前
|
存储 SQL 算法
ClickHouse(13)ClickHouse合并树MergeTree家族表引擎之CollapsingMergeTree详细解析
CollapsingMergeTree是ClickHouse的一种表引擎,它扩展了`MergeTree`,通过折叠行来优化存储和查询效率。当`Sign`列值为1和-1的成对行存在时,该引擎会异步删除除`Sign`外其他字段相同的行,只保留最新状态。建表语法中,`sign`列必须为`Int8`类型,用来标记状态(1)和撤销(-1)。写入时,应确保状态和撤销行的对应关系以保证正确折叠。查询时,可能需要使用聚合函数如`sum(Sign * x)`配合`GROUP BY`来处理折叠后的数据。使用`FINAL`修饰符可强制折叠,但效率较低。系列文章提供了更多关于ClickHouse及其表引擎的详细解析。
201 1
|
6月前
|
传感器 存储 SQL
ClickHouse(15)ClickHouse合并树MergeTree家族表引擎之GraphiteMergeTree详细解析
GraphiteMergeTree是ClickHouse用于优化Graphite数据存储和汇总的表引擎,适合需要瘦身和高效查询Graphite数据的开发者。它基于MergeTree,减少存储空间并提升查询效率。创建表时需包括Path、Time、Value和Version列。配置涉及pattern、regexp、function和retention,用于指定聚合函数和数据保留规则。文章还提供了建表语句示例和相关资源链接。
92 1
|
6月前
|
存储 SQL 关系型数据库
ClickHouse(11)ClickHouse合并树MergeTree家族表引擎之SummingMergeTree详细解析
`SummingMergeTree`是`MergeTree`引擎的变种,它合并相同主键的行并计算数值列的总和,从而节省存储空间和加速查询。通常与`MergeTree`配合使用,存储聚合数据以避免数据丢失。创建`SummingMergeTree`表时,可选参数`columns`指定要汇总的数值列。未指定时,默认汇总所有非主键数值列。注意,聚合可能不完整,查询时需用`SUM`和`GROUP BY`。文章还介绍了建表语法、数据处理规则以及对嵌套数据结构和`AggregateFunction`列的处理。查阅更多ClickHouse相关内容可访问相关链接。
231 5
|
6月前
|
存储 SQL 算法
ClickHouse(14)ClickHouse合并树MergeTree家族表引擎之VersionedCollapsingMergeTree详细解析
VersionedCollapsingMergeTree是ClickHouse的一种优化引擎,扩展了MergeTree,支持多线程异步插入和高效的数据折叠。它通过Sign和Version列处理对象状态的变化,Sign表示行的状态(正向或撤销),Version追踪状态版本。引擎自动删除旧状态,减少存储占用。在查询时,需注意可能需使用GROUP BY和聚合函数确保数据折叠,因为ClickHouse不保证查询结果已折叠。文章还提供了建表语法、使用示例和相关资源链接。
163 0
|
7月前
|
存储 SQL 关系型数据库
ClickHouse(09)ClickHouse合并树MergeTree家族表引擎之MergeTree详细解析
ClickHouse的MergeTree系列引擎是其高性能大数据存储的核心,特别适合大量数据的快速插入。数据按主键排序,支持分区和数据副本,提供数据采样功能。建表时,通过`ENGINE = MergeTree()`指定引擎,`ORDER BY`指定排序键,可选`PARTITION BY`分区,`SAMPLE BY`进行采样。此外,MergeTree支持多种索引和设置,如`index_granularity`控制索引粒度。查询时,ClickHouse利用主键和索引来高效检索数据,尤其在使用等值或范围条件时。
69 0
|
7月前
|
SQL 消息中间件 关系型数据库
ClickHouse(10)ClickHouse合并树MergeTree家族表引擎之ReplacingMergeTree详细解析
`ReplacingMergeTree`是ClickHouse的一种表引擎,用于数据去重。与`MergeTree`不同,它在合并分区时删除重复行,但不保证无重复。去重基于`ORDER BY`列,在ver列未指定时保留最新行,否则保留ver值最大者。数据处理策略包括延迟合并导致的不确定性及按分区去重。`CREATE TABLE`语法中,`ReplacingMergeTree`需要指定可选的`ver`列。相关系列文章提供了更深入的解析。
198 0
|
7月前
|
Java 关系型数据库 MySQL
实时计算 Flink版产品使用合集之通过scan.incremental.snapshot.chunk.key-column参数配置来处理无主键表的全量同步,增量数据进不来的原因是什么
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStreamAPI、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
GreenPlum Hash聚合简析
GreenPlum Hash聚合简析
113 0
|
SQL 存储 缓存
源码解读:semi join如何避免拉取大表数据?(一)
Hash join是解决复杂join的一个重要手段,但其无法避免拉取左右两端的数据到计算层进行计算,导致某些场景下执行效率不高。作为一种补充,bka join则可以利用OLTP数据库中的索引,通过join构造inner表的predicate命中表索引,在某些场景下有比较好的join效率。PolarDB-X是面向HTAP设计的分布式数据库,在复杂查询时也会重点考虑利用数据库的索引信息来提升join的查询效率,因此有了本文的semi bka join。
源码解读:semi join如何避免拉取大表数据?(一)