Morsel-Driven Parallelism: A NUMA-Aware Query Evaluation Framework for the Many-Core Age 论文解读

本文涉及的产品
云数据库 RDS SQL Server,基础系列 2核4GB
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
云原生数据库 PolarDB 分布式版,标准版 2核8GB
简介: 这篇paper介绍了TUM的内存数据库系统HyPer中使用的,基于小块数据(morsel)来驱动的并行查询执行框架。用来解决many-cores(NUMA)环境下,分析型查询的scalability问题。

这篇paper介绍了TUM的内存数据库系统HyPer中使用的,基于小块数据(morsel)来驱动的并行查询执行框架。用来解决many-cores(NUMA)环境下,分析型查询的scalability问题。

背景

现代cpu的体系架构下,core在不断增多,memory的容量也在持续增大,因此单点的memory controller逐渐出现了性能的瓶颈,为了解决这个问题引入了decentralized的多controller结构,也就是NUMA,但由于跨NUMA socket的非均匀访问特性,导致即使单机内也显现出了集群的特点:数据的访问性能取决于访问线程和目标内存是否存在于同一socker内。

传统的Volcano执行框架采用了一种优美而简单的模型,使得各个算子之间充分解耦,算子完全不感知并行执行的存在,在planning期间会决定计划的并行度,并为其分配固定的线程数,各个算子的执行和并行无关,只有exchange operator负责并行的数据交互,各个pipeline之间,并不共享数据,而是分发数据。

而HyPer思路则完全不同,使用了一种自适应的数据驱动的调度执行方案:系统使用固定数量的线程池,各个query的执行过程中的数据,被切分为细粒度的单元(morsel),然后结合对其进行处理的operator pipeline,封装为task,交给worker thread pool去执行。

通过动态的调度策略,尽量保证数据的本地化,这样每个query的并行度是可以动态调整的,所有worker执行所有query的tasks,使吞吐量最大化,并且结合调度策略可以实现load banlance,没有动态的thread创建/销毁,可以做到NUMA-aware,也就是尽可能倾向于NUMA-local的执行,worker尽可能只处理本socket内的局部数据,除非其发生空闲,从而最小化跨socket的数据访问。

总体架构

从全局角度,存在一个dispatcher来保存各个query传递过来的任务(pipeline job),每个任务是一个query的一个subplan,对应着底层要被处理的数据(在各个NUMA node memory中),有一个全局的线程池,主动向dispatcher请求task来执行。

dispatcher根据调度策略,分配给其某个pipeline job中的NUMA-local的(morsel数据+job code),整体作为一个task,交给thread执行,thread执行完成后再继续请求任务,此外如果完成了一个pipeline job后,中间结果会进行完全的物化,来等待下一个pipeline job的处理,而下一个job在处理这些输入数据时仍然会做全局的“逻辑上”动态的morsel切分,保证数据仍然均匀被处理。

image.png

上图中不同颜色代表不同的NUMA-node内的cores和memory,一个thread请求task时,尽量分配给其local morsel的task,当不再有task可执行时,就从其他node中steal一个继续处理,避免idle worker thread,保证负载均衡。

对于一个pipeline job,当有一组threads去执行它时,这种细粒度的morsel,可以使各个worker争抢任务,类似PolarDB中parallel scan的机制,让workload均衡,减少data skew的影响。

对于任一个query,其执行过程中就可以基于调度策略动态调整dop,而且由于morsel很小,这种调整也会比较平滑。

从整体来看,所有query交错的执行,保证了整体的吞吐量,而且可以集成不同的调度策略(基于优先级..)。

Dispatcher

dispatcher本身是一个逻辑的概念,并不存在一个物理线程,本质就是一个全局的pipeline job队列(无锁),当worker向其要task时,根据其所在NUMA node,分配对应node的job,并动态”切分“出一个本地的morsel数据,封装为task交给worker执行。

dispatcher并没有一个独立的全局线程,因为可能成为contention点,还会占用其他worker thread的执行资源,而是把调度逻辑实现在了wokrer thread的请求处理中。

Worker thread

Worker thread在系统初始化时就创建了固定的数量,不会再修改,每个worker thread会绑定特定的core,worker thread拿到一个task后(多数是local morsel,steal时是remote morsel)开始处理,将结果物化到local storage area中,或者是写入全局的share structure中(hash table…),worker的调度时机就是在一个morsel处理完成时。

全局shared data structure是以interleaved方式分配在多个socket之间,由kernel来控制具体分配策略。

单个query

每个query在由query compiler将plan拆分为若干了plan segment,每个subplan是最大化的pipeline operators集合,直到遇到pipeline breaker,然后将各个pipeline operators进行CodeGen,生成machine code来保证高效执行。

每个query会生成一个QEPobject,负责管理pipeline segment之间的依赖关系,并将segment转换为pipeline job,在一个pipeline job执行完成时,会触发QEPobject的被动状态机,选择下一个可以执行的pipeline job,传递给dispatcher,也就是挂到全局的job队列中。

每个pipeline输入是一个单元的morsel数据,输出的结果也要物化在NUMA本地的存储area中,由后续pipeline处理。

并行化算子

除了这种全新的调度执行框架,HyPer中还实现了各个算子的高效并行化。

hash join

Hash join是多个线程并行build,并写入到一个全局shared hash table中,这个过程分为2个步骤。

  1. 各个线程读取numa-local数据并进行filtering,结果物化到本地的临时存储中
  2. 第1步完成后整个hash table的大小可以精确得知,因此可以采用更加高效的lock-free hash table结构,避免了动态grow的开销,然后各个thread并发的把每个record的pointer,插入到hash table中

lock-free hash table 使用了一种early filtering的优化,也就是在每个bucket list的header ptr中,加入一个tag,通过查看这个tag就可以知道是否有能命中的data item(具体原理没细讲,但应该类似bloom filter)

image.png

这个tag和list pointer集成到一个64bit的单元中,这样每次插入元素时,对header pointer+tag的修改,就可以通过一个CAS操作原子性的完成,如果失败了则重试即可,实现了高效的并发insert,算法如下:

image.png

完成build后,probe就可以流水线的进行了,而且可以跨越多个hash table,如下图中,R的一条记录可以先probe S,再probe T从而完成join。

image.png

对于hash table/storage area,HyPer使用了large page(2MB)的方式,这样可以减小page table大小,尽量放入cache中,此外由于page少了page fault也会大大减少,减少TLB miss的次数。具体的分配使用了mmap,由kernel来管理内存策略。

table scan

为了实现morsel-driven,需要将data 均匀分布在各个NUMA node中,最简单的就是round-robin。也可以根据query情况,按照join列进行hash分布,这样实现co-located join,尽量让数据保持本地化处理。(当然还是有可能有steal的)

Note: 即使没有hash分布,也不影响morsel-driven这种NUMA-aware的特性,本地的数据仍然是尽量在本地处理的。

group by + aggregation

分组聚集总体上采用了local aggregation -> repartition -> parallel aggregation的策略。

image.png

每个worker thread先做numa-local的局部聚集,然后将结果放入一个本地的partition结构中,后续会把这些paritition进行分发,各个worker thread接收到对应的partitions,做二次的hash聚集。

Sort

对于main-memory数据库系统,研究 表明hash的执行方式比sort会更加高效,因此HyPer中只有Order by/TopN这样的算子使用了排序操作。

其策略是 local order by -> 并行mergesort。

首先各个worker thread 在本地进行order by或者维护一个top k的heap,然后在本地排序后,根据等距key的原则记录一些local separators,然后将所有local separators汇总+排序,并计算出global separators(具体算法没细讲。。)

基于global separators,各个local run可以找到对应的点,然后各自分发到目标thread,做并行的merge sort。

image.png

总结

HyPer的性能数据是非常亮眼的:

image.png

对于TPC-H 100G,没有任何索引(只有主键)的情况下,所有查询都在3s以内执行完成,是当时最快的single-server执行结果。

总结下其基本思路,这些是其他database系统也可以借鉴的:

  • fine-grained 动态调度
  • NUMA-aware尽量保证本地化处理
  • 线程间的高效synchronization
  • full operator parallelism,各个算子都有不错的并行执行策略
目录
相关文章
|
3月前
|
机器学习/深度学习 算法
【文献学习】Channel Estimation Method Based on Transformer in High Dynamic Environment
一种基于CNN和Transformer的信道估计方法,用于在高度动态环境中跟踪信道变化特征,并通过实验结果展示了其相比传统方法的性能提升。
57 0
|
Docker 容器
求助: 运行模型时报错module 'megatron_util.mpu' has no attribute 'get_model_parallel_rank'
运行ZhipuAI/Multilingual-GLM-Summarization-zh的官方代码范例时,报错AttributeError: MGLMTextSummarizationPipeline: module 'megatron_util.mpu' has no attribute 'get_model_parallel_rank' 环境是基于ModelScope官方docker镜像,尝试了各个版本结果都是一样的。
404 5
|
算法 数据可视化 机器人
Object SLAM: An Object SLAM Framework for Association, Mapping, and High-Level Tasks 论文解读
Object SLAM: An Object SLAM Framework for Association, Mapping, and High-Level Tasks 论文解读
84 0
|
机器学习/深度学习 人工智能 自然语言处理
OneEE: A One-Stage Framework for Fast Overlapping and Nested Event Extraction 论文解读
事件抽取(EE)是信息抽取的基本任务,旨在从非结构化文本中抽取结构化事件信息。大多数先前的工作集中于抽取平面事件,而忽略了重叠或嵌套的事件。
99 0
|
机器学习/深度学习 人工智能 自然语言处理
One SPRING to Rule Them Both Symmetric AMR Semantic Parsing and Generation without Complex Pipeline
在文本到AMR解析中,当前最先进的语义解析器集成了几个不同模块或组件的繁琐管道,并利用图重新分类,即在训练集的基础上开发的一组特定内容的启发式方法。
130 0
|
数据挖掘
MUSIED: A Benchmark for Event Detection from Multi-Source Heterogeneous Informal Texts 论文解读
事件检测(ED)从非结构化文本中识别和分类事件触发词,作为信息抽取的基本任务。尽管在过去几年中取得了显著进展
66 0
|
算法 数据处理
Volcano - An Extensible and Parallel Query Evaluation System 论文解读
前面写了一些关于优化器的文章,现在开个小差,写一些执行器的paper介绍,从这篇开始。 这篇是Graefe的Volcano Project的执行器框架,其概念已被广泛接受和使用,也就是我们最为熟悉的Volcano iterator的执行框架,关于volcano/cascades的优化器介绍
734 0
|
分布式计算 Apache Spark
《How to Integrate Spark MLlib and Apache Solr to Build Real-Time Entity Type Recognition System for Better Query Understanding》电子版地址
How to Integrate Spark MLlib and Apache Solr to Build Real-Time Entity Type Recognition System for Better Query Understanding
88 0
《How to Integrate Spark MLlib and Apache Solr to Build Real-Time Entity Type Recognition System for Better Query Understanding》电子版地址
|
机器学习/深度学习 算法 数据挖掘
Paper:He参数初始化之《Delving Deep into Rectifiers: Surpassing Human-Level Performance on ImageNet C》的翻译与解读
Paper:He参数初始化之《Delving Deep into Rectifiers: Surpassing Human-Level Performance on ImageNet Classification》的翻译与解读
|
SQL 监控 算法
Adaptive Execution of Compiled Queries 论文解读
本篇是TUM的内存数据库HyPer针对compile-based执行框架的改进。其中涉及到HyPer的动态编译和并行执行框架 动态编译文章的结尾提到了编译执行系统存在的2个问题,其中之一就是:不可控的编译时间。
498 0
Adaptive Execution of Compiled Queries 论文解读