这篇paper介绍了TUM的内存数据库系统HyPer中使用的,基于小块数据(morsel)来驱动的并行查询执行框架。用来解决many-cores(NUMA)环境下,分析型查询的scalability问题。
背景
现代cpu的体系架构下,core在不断增多,memory的容量也在持续增大,因此单点的memory controller逐渐出现了性能的瓶颈,为了解决这个问题引入了decentralized的多controller结构,也就是NUMA,但由于跨NUMA socket的非均匀访问特性,导致即使单机内也显现出了集群的特点:数据的访问性能取决于访问线程和目标内存是否存在于同一socker内。
传统的Volcano执行框架采用了一种优美而简单的模型,使得各个算子之间充分解耦,算子完全不感知并行执行的存在,在planning期间会决定计划的并行度,并为其分配固定的线程数,各个算子的执行和并行无关,只有exchange operator负责并行的数据交互,各个pipeline之间,并不共享数据,而是分发数据。
而HyPer思路则完全不同,使用了一种自适应的数据驱动的调度执行方案:系统使用固定数量的线程池,各个query的执行过程中的数据,被切分为细粒度的单元(morsel),然后结合对其进行处理的operator pipeline,封装为task,交给worker thread pool去执行。
通过动态的调度策略,尽量保证数据的本地化,这样每个query的并行度是可以动态调整的,所有worker执行所有query的tasks,使吞吐量最大化,并且结合调度策略可以实现load banlance,没有动态的thread创建/销毁,可以做到NUMA-aware,也就是尽可能倾向于NUMA-local的执行,worker尽可能只处理本socket内的局部数据,除非其发生空闲,从而最小化跨socket的数据访问。
总体架构
从全局角度,存在一个dispatcher来保存各个query传递过来的任务(pipeline job),每个任务是一个query的一个subplan,对应着底层要被处理的数据(在各个NUMA node memory中),有一个全局的线程池,主动向dispatcher请求task来执行。
dispatcher根据调度策略,分配给其某个pipeline job中的NUMA-local的(morsel数据+job code),整体作为一个task,交给thread执行,thread执行完成后再继续请求任务,此外如果完成了一个pipeline job后,中间结果会进行完全的物化,来等待下一个pipeline job的处理,而下一个job在处理这些输入数据时仍然会做全局的“逻辑上”动态的morsel切分,保证数据仍然均匀被处理。
上图中不同颜色代表不同的NUMA-node内的cores和memory,一个thread请求task时,尽量分配给其local morsel的task,当不再有task可执行时,就从其他node中steal一个继续处理,避免idle worker thread,保证负载均衡。
对于一个pipeline job,当有一组threads去执行它时,这种细粒度的morsel,可以使各个worker争抢任务,类似PolarDB中parallel scan的机制,让workload均衡,减少data skew的影响。
对于任一个query,其执行过程中就可以基于调度策略动态调整dop,而且由于morsel很小,这种调整也会比较平滑。
从整体来看,所有query交错的执行,保证了整体的吞吐量,而且可以集成不同的调度策略(基于优先级..)。
Dispatcher
dispatcher本身是一个逻辑的概念,并不存在一个物理线程,本质就是一个全局的pipeline job队列(无锁),当worker向其要task时,根据其所在NUMA node,分配对应node的job,并动态”切分“出一个本地的morsel数据,封装为task交给worker执行。
dispatcher并没有一个独立的全局线程,因为可能成为contention点,还会占用其他worker thread的执行资源,而是把调度逻辑实现在了wokrer thread的请求处理中。
Worker thread
Worker thread在系统初始化时就创建了固定的数量,不会再修改,每个worker thread会绑定特定的core,worker thread拿到一个task后(多数是local morsel,steal时是remote morsel)开始处理,将结果物化到local storage area中,或者是写入全局的share structure中(hash table…),worker的调度时机就是在一个morsel处理完成时。
全局shared data structure是以interleaved方式分配在多个socket之间,由kernel来控制具体分配策略。
单个query
每个query在由query compiler将plan拆分为若干了plan segment,每个subplan是最大化的pipeline operators集合,直到遇到pipeline breaker,然后将各个pipeline operators进行CodeGen,生成machine code来保证高效执行。
每个query会生成一个QEPobject,负责管理pipeline segment之间的依赖关系,并将segment转换为pipeline job,在一个pipeline job执行完成时,会触发QEPobject的被动状态机,选择下一个可以执行的pipeline job,传递给dispatcher,也就是挂到全局的job队列中。
每个pipeline输入是一个单元的morsel数据,输出的结果也要物化在NUMA本地的存储area中,由后续pipeline处理。
并行化算子
除了这种全新的调度执行框架,HyPer中还实现了各个算子的高效并行化。
hash join
Hash join是多个线程并行build,并写入到一个全局shared hash table中,这个过程分为2个步骤。
- 各个线程读取numa-local数据并进行filtering,结果物化到本地的临时存储中
- 第1步完成后整个hash table的大小可以精确得知,因此可以采用更加高效的lock-free hash table结构,避免了动态grow的开销,然后各个thread并发的把每个record的pointer,插入到hash table中
lock-free hash table 使用了一种early filtering的优化,也就是在每个bucket list的header ptr中,加入一个tag,通过查看这个tag就可以知道是否有能命中的data item(具体原理没细讲,但应该类似bloom filter)
这个tag和list pointer集成到一个64bit的单元中,这样每次插入元素时,对header pointer+tag的修改,就可以通过一个CAS操作原子性的完成,如果失败了则重试即可,实现了高效的并发insert,算法如下:
完成build后,probe就可以流水线的进行了,而且可以跨越多个hash table,如下图中,R的一条记录可以先probe S,再probe T从而完成join。
对于hash table/storage area,HyPer使用了large page(2MB)的方式,这样可以减小page table大小,尽量放入cache中,此外由于page少了page fault也会大大减少,减少TLB miss的次数。具体的分配使用了mmap,由kernel来管理内存策略。
table scan
为了实现morsel-driven,需要将data 均匀分布在各个NUMA node中,最简单的就是round-robin。也可以根据query情况,按照join列进行hash分布,这样实现co-located join,尽量让数据保持本地化处理。(当然还是有可能有steal的)
Note: 即使没有hash分布,也不影响morsel-driven这种NUMA-aware的特性,本地的数据仍然是尽量在本地处理的。
group by + aggregation
分组聚集总体上采用了local aggregation -> repartition -> parallel aggregation的策略。
每个worker thread先做numa-local的局部聚集,然后将结果放入一个本地的partition结构中,后续会把这些paritition进行分发,各个worker thread接收到对应的partitions,做二次的hash聚集。
Sort
对于main-memory数据库系统,研究 表明hash的执行方式比sort会更加高效,因此HyPer中只有Order by/TopN这样的算子使用了排序操作。
其策略是 local order by -> 并行mergesort。
首先各个worker thread 在本地进行order by或者维护一个top k的heap,然后在本地排序后,根据等距key的原则记录一些local separators,然后将所有local separators汇总+排序,并计算出global separators(具体算法没细讲。。)
基于global separators,各个local run可以找到对应的点,然后各自分发到目标thread,做并行的merge sort。
总结
HyPer的性能数据是非常亮眼的:
对于TPC-H 100G,没有任何索引(只有主键)的情况下,所有查询都在3s以内执行完成,是当时最快的single-server执行结果。
总结下其基本思路,这些是其他database系统也可以借鉴的:
- fine-grained 动态调度
- NUMA-aware尽量保证本地化处理
- 线程间的高效synchronization
- full operator parallelism,各个算子都有不错的并行执行策略