skin778 2020-05-21 1224浏览量
注:以下分析基于开源 v19.15.2.2-stable 版本进行
ClickHouse内核分析系列文章,继上一篇文章 MergeTree查询链路 之后,这次我将为大家介绍MergeTree存储引擎的异步Merge和Mutation机制。建议读者先补充上一篇文章的基础知识,这样会比较容易理解。
在上一篇系列文章中,我已经介绍过ClickHouse内核中的MergeTree存储一旦生成一个Data Part,这个Data Part就不可再更改了。所以从MergeTree存储内核层面,ClickHouse就不擅长做数据更新删除操作。但是绝大部分用户场景中,难免会出现需要手动订正、修复数据的场景。所以ClickHouse为用户设计了一套离线异步机制来支持低频的Mutation(改、删)操作。
ALTER TABLE [db.]table DELETE WHERE filter_expr;
ALTER TABLE [db.]table UPDATE column1 = expr1 [, ...] WHERE filter_expr;
ClickHouse的方言把Delete和Update操作也加入到了Alter Table的范畴中,它并不支持裸的Delete或者Update操作。当用户执行一个如上的Mutation操作获得返回时,ClickHouse内核其实只做了两件事情:
两者的主体逻辑分别在MutationsInterpreter::validate函数和StorageMergeTree::mutate函数中。
MutationsInterpreter::validate函数dry run一个异步Mutation执行的全过程,其中涉及到检查Mutation是否合法的判断原则是列值更新后记录的分区键和排序键不能有变化。因为分区键和排序键一旦发生变化,就会导致多个Data Part之间之间Merge逻辑的复杂化。剩余的Mutation执行过程可以看做是打开一个Data Part的BlockInputStream,在这个BlockStream的基础上封装删除操作的FilterBlockInputStream,再加上更新操作的ExpressionBlockInputStream,最后把数据通过BlockOutputStream写回到新的Data Part中。这里简单介绍一下ClickHouse的计算层实现,整体上它是一个火山模型的计算引擎,数据的各种filer、投影、join、agg都是通过BlockStrem抽象实现,在BlockStream中数据是按照Block进行传输处理的,而Block中的数据又是按照列模式组织,这使得ClickHouse在单列的计算上可以批量化并使用一些SIMD指令加速。BlockOutputStream承担了MergeTree Data Part列存写入和索引构建的全部工作,我会在后续的文章中会详细展开介绍ClickHouse计算层中各类功能的BlockStream,以及BlockOutputStream中构建索引的实现细节。
在Mutation命令的执行过程中,我们可以看到MergeTree会把整条Alter命令保存到存储文件夹下,然后创建一个MergeTreeMutationEntry对象保存到表的待修改状态中,最后唤醒一个异步处理merge和 mutation的工作线程。这里有一个关键的问题,因为Mutation的实际操作是异步发生的,在用户的Alter命令返回之后仍然会有数据写入,系统如何在异步订正的过程中排除掉Alter命令之后写入的数据呢?下一节中我会介绍MergeTree中Data Part的Version机制,它可以在Data Part级别解决上面的问题。但是因为ClickHouse写入链路的异步性,ClickHouse仍然无法保证Alter命令前Insert的每条纪录都被更新,只能确保Alter命令前已经存在的Data Part都会被订正,推荐用户只用来订正T+1场景的离线数据。
struct MergeTreePartInfo
{
String partition_id;
Int64 min_block = 0;
Int64 max_block = 0;
UInt32 level = 0;
Int64 mutation = 0; /// If the part has been mutated or contains mutated parts, is equal to mutation version number.
...
/// Get block number that can be used to determine which mutations we still need to apply to this part
/// (all mutations with version greater than this block number).
Int64 getDataVersion() const { return mutation ? mutation : min_block; }
...
bool operator<(const MergeTreePartInfo & rhs) const
{
return std::forward_as_tuple(partition_id, min_block, max_block, level, mutation)
< std::forward_as_tuple(rhs.partition_id, rhs.min_block, rhs.max_block, rhs.level, rhs.mutation);
}
}
在具体展开MergeTree的异步merge和mutation机制之前,先需要详细介绍一下MergeTree中对Data Part的管理方式。每个Data Part都有一个MergeTreePartInfo对象来保存它的meta信息,MergeTreePartInfo类的结构如上方代码所示。
解释了MergeTreePartInfo类中的信息含义,我们就可以理解上一节中遗留的异步Mutation如何选择哪些Data Parts需要订正的问题。系统可以通过MergeTreePartInfo::getDataVersion() { return mutation ? mutation : min_block }函数来判断当前Data Part是否需要进行某个mutation订正,比较两者version即可。
ClickHouse内核中异步merge、mutation工作由统一的工作线程池来完成,这个线程池的大小用户可以通过参数background_pool_size进行设置。线程池中的线程Task总体逻辑如下,可以看出这个异步Task主要做三块工作:清理残留文件,merge Data Parts 和 mutate Data Part。
BackgroundProcessingPoolTaskResult StorageMergeTree::mergeMutateTask()
{
....
try
{
/// Clear old parts. It is unnecessary to do it more than once a second.
if (auto lock = time_after_previous_cleanup.compareAndRestartDeferred(1))
{
{
/// TODO: Implement tryLockStructureForShare.
auto lock_structure = lockStructureForShare(false, "");
clearOldPartsFromFilesystem();
clearOldTemporaryDirectories();
}
clearOldMutations();
}
///TODO: read deduplicate option from table config
if (merge(false /*aggressive*/, {} /*partition_id*/, false /*final*/, false /*deduplicate*/))
return BackgroundProcessingPoolTaskResult::SUCCESS;
if (tryMutatePart())
return BackgroundProcessingPoolTaskResult::SUCCESS;
return BackgroundProcessingPoolTaskResult::ERROR;
}
...
}
需要清理的残留文件分为三部分:过期的Data Part,临时文件夹,过期的Mutation命令文件。如下方代码所示,MergeTree Data Part的生命周期包含多个阶段,创建一个Data Part的时候分两阶段执行Temporary->Precommitted->Commited,淘汰一个Data Part的时候也可能会先经过一个Outdated状态,再到Deleting状态。在Outdated状态下的Data Part仍然是可查的。异步Task在收集Outdated Data Part的时候会根据它的shared_ptr计数来判断当前是否有查询Context引用它,没有的话才进行删除。清理临时文件的逻辑较为简单,在数据文件夹中遍历搜索"tmp_"开头的文件夹,并判断创建时长是否超过temporary_directories_lifetime。临时文件夹主要在ClickHouse的两阶段提交过程可能造成残留。最后是清理数据已经全部订正完成的过期Mutation命令文件。
enum class State
{
Temporary, /// the part is generating now, it is not in data_parts list
PreCommitted, /// the part is in data_parts, but not used for SELECTs
Committed, /// active data part, used by current and upcoming SELECTs
Outdated, /// not active data part, but could be used by only current SELECTs, could be deleted after SELECTs finishes
Deleting, /// not active data part with identity refcounter, it is deleting right now by a cleaner
DeleteOnDestroy, /// part was moved to another disk and should be deleted in own destructor
};
StorageMergeTree::merge函数是MergeTree异步Merge的核心逻辑,Data Part Merge的工作除了通过后台工作线程自动完成,用户还可以通过Optimize命令来手动触发。自动触发的场景中,系统会根据后台空闲线程的数据来启发式地决定本次Merge最大可以处理的数据量大小,max_bytes_to_merge_at_min_space_in_pool和max_bytes_to_merge_at_max_space_in_pool参数分别决定当空闲线程数最大时可处理的数据量上限以及只剩下一个空闲线程时可处理的数据量上限。当用户的写入量非常大的时候,应该适当调整工作线程池的大小和这两个参数。当用户手动触发merge时,系统则是根据disk剩余容量来决定可处理的最大数据量。
接下来介绍merge过程中最核心的逻辑:如何选择Data Parts进行merge?为了方便理解,这里先介绍一下Data Parts在MergeTree表引擎中的管理组织方式。上一节中提到的MergeTreePartInfo类中定义了比较操作符,MergeTree中的Data Parts就是按照这个比较操作符进行排序管理,排序键是(partition_id, min_block, max_block, level, mutation),索引管理结构如下图所示:
自动Merge的处理逻辑,首先是通过MergeTreeDataMergerMutator::selectPartsToMerge函数筛选出本次merge要合并的Data Parts,这个筛选过程需要准守三个原则:
所以我们上面的Data Parts组织关系逻辑示意图中,相同颜色的Data Parts是可以合并的。虽然图中三个不同颜色的Data Parts序列都是可以合并的,但是合并工作线程每次只会挑选其中某个序列的一小段进行合并(如前文所述,系统会限定每次合并的Data Parts的数据量)。对于如何从这些序列中挑选出最佳的一段区间,ClickHouse抽象出了IMergeSelector类来实现不同的逻辑。当前主要有两种不同的merge策略:TTL数据淘汰策略和常规策略。
StorageMergeTree::tryMutatePart函数是MergeTree异步mutation的核心逻辑,主体逻辑如下。系统每次都只会订正一个Data Part,但是会聚合多个mutation任务批量完成,这点实现非常的棒。因为在用户真实业务场景中一次数据订正逻辑中可能会包含多个Mutation命令,把这多个mutation操作聚合到一起订正效率上就非常高。系统每次选择一个排序键最小的并且需要订正Data Part进行操作,本意上就是把数据从前往后进行依次订正。
Mutation功能是MergeTree表引擎最新推出一大功能,从我个人的角度看在实现完备度上还有一下两点需要去优化:
for (const auto & part : getDataPartsVector())
{
...
size_t current_ast_elements = 0;
for (auto it = mutations_begin_it; it != mutations_end_it; ++it)
{
MutationsInterpreter interpreter(shared_from_this(), it->second.commands, global_context);
size_t commands_size = interpreter.evaluateCommandsSize();
if (current_ast_elements + commands_size >= max_ast_elements)
break;
current_ast_elements += commands_size;
commands.insert(commands.end(), it->second.commands.begin(), it->second.commands.end());
}
auto new_part_info = part->info;
new_part_info.mutation = current_mutations_by_version.rbegin()->first;
future_part.parts.push_back(part);
future_part.part_info = new_part_info;
future_part.name = part->getNewName(new_part_info);
tagger.emplace(future_part, MergeTreeDataMergerMutator::estimateNeededDiskSpace({part}), *this, true);
break;
}
最后在经过后台工作线程一轮merge和mutation操作之后,上一节中展示的MergeTree表引擎中的Data Parts可能发生的变化如下图所示,2020-05-10数据分区下的头两个Data Parts被merge到了一起,并且完成了Mutation 37和Mutation 39的数据订正,新产生的Data Part如红色所示:
Clickhouse产品链接:https://www.aliyun.com/product/clickhouse
ClickHouse内核分析系列文章:
MergeTree查询链路
希望通过内核分析系列文章,让大家更好地了解这款世界领先的列式存储分析型数据库。
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。
帮用户承担一切数据库风险,给您何止是安心!