Morsel-Driven Parallelism: A NUMA-Aware Query Evaluation Framework for the Many-Core Age 论文解读

本文涉及的产品
RDS SQL Server Serverless,2-4RCU 50GB 3个月
推荐场景:
云数据库 RDS SQL Server,基础系列 2核4GB
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
简介: 这篇paper介绍了TUM的内存数据库系统HyPer中使用的,基于小块数据(morsel)来驱动的并行查询执行框架。用来解决many-cores(NUMA)环境下,分析型查询的scalability问题。

这篇paper介绍了TUM的内存数据库系统HyPer中使用的,基于小块数据(morsel)来驱动的并行查询执行框架。用来解决many-cores(NUMA)环境下,分析型查询的scalability问题。

背景

现代cpu的体系架构下,core在不断增多,memory的容量也在持续增大,因此单点的memory controller逐渐出现了性能的瓶颈,为了解决这个问题引入了decentralized的多controller结构,也就是NUMA,但由于跨NUMA socket的非均匀访问特性,导致即使单机内也显现出了集群的特点:数据的访问性能取决于访问线程和目标内存是否存在于同一socker内。

传统的Volcano执行框架采用了一种优美而简单的模型,使得各个算子之间充分解耦,算子完全不感知并行执行的存在,在planning期间会决定计划的并行度,并为其分配固定的线程数,各个算子的执行和并行无关,只有exchange operator负责并行的数据交互,各个pipeline之间,并不共享数据,而是分发数据。

而HyPer思路则完全不同,使用了一种自适应的数据驱动的调度执行方案:系统使用固定数量的线程池,各个query的执行过程中的数据,被切分为细粒度的单元(morsel),然后结合对其进行处理的operator pipeline,封装为task,交给worker thread pool去执行。

通过动态的调度策略,尽量保证数据的本地化,这样每个query的并行度是可以动态调整的,所有worker执行所有query的tasks,使吞吐量最大化,并且结合调度策略可以实现load banlance,没有动态的thread创建/销毁,可以做到NUMA-aware,也就是尽可能倾向于NUMA-local的执行,worker尽可能只处理本socket内的局部数据,除非其发生空闲,从而最小化跨socket的数据访问。

总体架构

从全局角度,存在一个dispatcher来保存各个query传递过来的任务(pipeline job),每个任务是一个query的一个subplan,对应着底层要被处理的数据(在各个NUMA node memory中),有一个全局的线程池,主动向dispatcher请求task来执行。

dispatcher根据调度策略,分配给其某个pipeline job中的NUMA-local的(morsel数据+job code),整体作为一个task,交给thread执行,thread执行完成后再继续请求任务,此外如果完成了一个pipeline job后,中间结果会进行完全的物化,来等待下一个pipeline job的处理,而下一个job在处理这些输入数据时仍然会做全局的“逻辑上”动态的morsel切分,保证数据仍然均匀被处理。

image.png

上图中不同颜色代表不同的NUMA-node内的cores和memory,一个thread请求task时,尽量分配给其local morsel的task,当不再有task可执行时,就从其他node中steal一个继续处理,避免idle worker thread,保证负载均衡。

对于一个pipeline job,当有一组threads去执行它时,这种细粒度的morsel,可以使各个worker争抢任务,类似PolarDB中parallel scan的机制,让workload均衡,减少data skew的影响。

对于任一个query,其执行过程中就可以基于调度策略动态调整dop,而且由于morsel很小,这种调整也会比较平滑。

从整体来看,所有query交错的执行,保证了整体的吞吐量,而且可以集成不同的调度策略(基于优先级..)。

Dispatcher

dispatcher本身是一个逻辑的概念,并不存在一个物理线程,本质就是一个全局的pipeline job队列(无锁),当worker向其要task时,根据其所在NUMA node,分配对应node的job,并动态”切分“出一个本地的morsel数据,封装为task交给worker执行。

dispatcher并没有一个独立的全局线程,因为可能成为contention点,还会占用其他worker thread的执行资源,而是把调度逻辑实现在了wokrer thread的请求处理中。

Worker thread

Worker thread在系统初始化时就创建了固定的数量,不会再修改,每个worker thread会绑定特定的core,worker thread拿到一个task后(多数是local morsel,steal时是remote morsel)开始处理,将结果物化到local storage area中,或者是写入全局的share structure中(hash table…),worker的调度时机就是在一个morsel处理完成时。

全局shared data structure是以interleaved方式分配在多个socket之间,由kernel来控制具体分配策略。

单个query

每个query在由query compiler将plan拆分为若干了plan segment,每个subplan是最大化的pipeline operators集合,直到遇到pipeline breaker,然后将各个pipeline operators进行CodeGen,生成machine code来保证高效执行。

每个query会生成一个QEPobject,负责管理pipeline segment之间的依赖关系,并将segment转换为pipeline job,在一个pipeline job执行完成时,会触发QEPobject的被动状态机,选择下一个可以执行的pipeline job,传递给dispatcher,也就是挂到全局的job队列中。

每个pipeline输入是一个单元的morsel数据,输出的结果也要物化在NUMA本地的存储area中,由后续pipeline处理。

并行化算子

除了这种全新的调度执行框架,HyPer中还实现了各个算子的高效并行化。

hash join

Hash join是多个线程并行build,并写入到一个全局shared hash table中,这个过程分为2个步骤。

  1. 各个线程读取numa-local数据并进行filtering,结果物化到本地的临时存储中
  2. 第1步完成后整个hash table的大小可以精确得知,因此可以采用更加高效的lock-free hash table结构,避免了动态grow的开销,然后各个thread并发的把每个record的pointer,插入到hash table中

lock-free hash table 使用了一种early filtering的优化,也就是在每个bucket list的header ptr中,加入一个tag,通过查看这个tag就可以知道是否有能命中的data item(具体原理没细讲,但应该类似bloom filter)

image.png

这个tag和list pointer集成到一个64bit的单元中,这样每次插入元素时,对header pointer+tag的修改,就可以通过一个CAS操作原子性的完成,如果失败了则重试即可,实现了高效的并发insert,算法如下:

image.png

完成build后,probe就可以流水线的进行了,而且可以跨越多个hash table,如下图中,R的一条记录可以先probe S,再probe T从而完成join。

image.png

对于hash table/storage area,HyPer使用了large page(2MB)的方式,这样可以减小page table大小,尽量放入cache中,此外由于page少了page fault也会大大减少,减少TLB miss的次数。具体的分配使用了mmap,由kernel来管理内存策略。

table scan

为了实现morsel-driven,需要将data 均匀分布在各个NUMA node中,最简单的就是round-robin。也可以根据query情况,按照join列进行hash分布,这样实现co-located join,尽量让数据保持本地化处理。(当然还是有可能有steal的)

Note: 即使没有hash分布,也不影响morsel-driven这种NUMA-aware的特性,本地的数据仍然是尽量在本地处理的。

group by + aggregation

分组聚集总体上采用了local aggregation -> repartition -> parallel aggregation的策略。

image.png

每个worker thread先做numa-local的局部聚集,然后将结果放入一个本地的partition结构中,后续会把这些paritition进行分发,各个worker thread接收到对应的partitions,做二次的hash聚集。

Sort

对于main-memory数据库系统,研究 表明hash的执行方式比sort会更加高效,因此HyPer中只有Order by/TopN这样的算子使用了排序操作。

其策略是 local order by -> 并行mergesort。

首先各个worker thread 在本地进行order by或者维护一个top k的heap,然后在本地排序后,根据等距key的原则记录一些local separators,然后将所有local separators汇总+排序,并计算出global separators(具体算法没细讲。。)

基于global separators,各个local run可以找到对应的点,然后各自分发到目标thread,做并行的merge sort。

image.png

总结

HyPer的性能数据是非常亮眼的:

image.png

对于TPC-H 100G,没有任何索引(只有主键)的情况下,所有查询都在3s以内执行完成,是当时最快的single-server执行结果。

总结下其基本思路,这些是其他database系统也可以借鉴的:

  • fine-grained 动态调度
  • NUMA-aware尽量保证本地化处理
  • 线程间的高效synchronization
  • full operator parallelism,各个算子都有不错的并行执行策略
目录
相关文章
|
3月前
|
算法 数据挖掘
文献解读-Consistency and reproducibility of large panel next-generation sequencing: Multi-laboratory assessment of somatic mutation detection on reference materials with mismatch repair and proofreading deficiency
Consistency and reproducibility of large panel next-generation sequencing: Multi-laboratory assessment of somatic mutation detection on reference materials with mismatch repair and proofreading deficiency,大panel二代测序的一致性和重复性:对具有错配修复和校对缺陷的参考物质进行体细胞突变检测的多实验室评估
32 6
文献解读-Consistency and reproducibility of large panel next-generation sequencing: Multi-laboratory assessment of somatic mutation detection on reference materials with mismatch repair and proofreading deficiency
|
4月前
|
机器学习/深度学习 算法
【文献学习】Channel Estimation Method Based on Transformer in High Dynamic Environment
一种基于CNN和Transformer的信道估计方法,用于在高度动态环境中跟踪信道变化特征,并通过实验结果展示了其相比传统方法的性能提升。
65 0
|
数据挖掘
MUSIED: A Benchmark for Event Detection from Multi-Source Heterogeneous Informal Texts 论文解读
事件检测(ED)从非结构化文本中识别和分类事件触发词,作为信息抽取的基本任务。尽管在过去几年中取得了显著进展
73 0
|
机器学习/深度学习 人工智能 自然语言处理
OneEE: A One-Stage Framework for Fast Overlapping and Nested Event Extraction 论文解读
事件抽取(EE)是信息抽取的基本任务,旨在从非结构化文本中抽取结构化事件信息。大多数先前的工作集中于抽取平面事件,而忽略了重叠或嵌套的事件。
108 0
|
机器学习/深度学习 人工智能 自然语言处理
One SPRING to Rule Them Both Symmetric AMR Semantic Parsing and Generation without Complex Pipeline
在文本到AMR解析中,当前最先进的语义解析器集成了几个不同模块或组件的繁琐管道,并利用图重新分类,即在训练集的基础上开发的一组特定内容的启发式方法。
138 0
|
机器学习/深度学习 自然语言处理 索引
GTEE-DYNPREF: Dynamic Prefix-Tuning for Generative Template-based Event Extraction 论文解读
我们以基于模板的条件生成的生成方式考虑事件抽取。尽管将事件抽取任务转换为带有提示的序列生成问题的趋势正在上升,但这些基于生成的方法存在两个重大挑战
151 0
|
算法 数据可视化 机器人
Object SLAM: An Object SLAM Framework for Association, Mapping, and High-Level Tasks 论文解读
Object SLAM: An Object SLAM Framework for Association, Mapping, and High-Level Tasks 论文解读
97 0
|
机器学习/深度学习 自然语言处理 数据可视化
M2E2: Cross-media Structured Common Space for Multimedia Event Extraction 论文解读
我们介绍了一个新的任务,多媒体事件抽取(M2E2),旨在从多媒体文档中抽取事件及其参数。我们开发了第一个基准测试
116 0
|
算法 数据处理
Volcano - An Extensible and Parallel Query Evaluation System 论文解读
前面写了一些关于优化器的文章,现在开个小差,写一些执行器的paper介绍,从这篇开始。 这篇是Graefe的Volcano Project的执行器框架,其概念已被广泛接受和使用,也就是我们最为熟悉的Volcano iterator的执行框架,关于volcano/cascades的优化器介绍
753 0
PAT (Advanced Level) Practice - 1129 Recommendation System(25 分)
PAT (Advanced Level) Practice - 1129 Recommendation System(25 分)
109 0