PostgreSQL并行HashJoin解读

本文涉及的产品
云原生数据库 PolarDB PostgreSQL 版,标准版 2核4GB 50GB
云原生数据库 PolarDB MySQL 版,通用型 2核8GB 50GB
简介: PostgreSQL并行HashJoin解读

PostgreSQl并行查询简介

并行查询是 PostgreSQL 的一个大特性,对于单一sql查询,从传统单一进程,提升到多个进程协同完成。由于并行查询使用了更多的cpu/mem/io资源,提高了任务的执行效率。

并行查询设计的难点是需要协调多个worker进程间的同步:

1)在barrier点上多个进程需要全部前进到这个点上,比如:build inner hash table,必须等该状态完成了,所有的worker才能往下走;

2)在达到barrier点之前,多个进程的工作量尽量按照‘能者多劳’,执行快的进程多做一些工作量,比如:scan,build hash table,probe等;

3)此外,实现上要避免死锁,避免内存分配碎片,处理进程出错;

本文主要讲述并行查询中最复杂的功能:paralle hash join。先从单进程的hashjoin逐步过渡到多进程并行的hashjoin。

并行hashjoin是从9.6就开始演进,但是只支持outer table的并行;11版本支持outer和inner都并行执行;

并行hashjoin的执行计划

一个例子

select count(*)
  from lineitem
  join orders on l_orderkey = o_orderkey
 where o_totalprice > 5.00;

10版本的并行hash join计划

Finalize Aggregate
   ->  Gather
         Workers Planned: 2
         ->  Partial Aggregate
               ->  Hash Join
                     Hash Cond: (lineitem.l_orderkey = orders.o_orderkey)
                     ->  Parallel Seq Scan on lineitem
                     ->  Hash
                           ->  Seq Scan on orders
                                 Filter: (o_totalprice > 5.00)

过程是:

1)每个worker扫描全部的inner table;

2)在各自的私有内存中build一个hash表;

3)每个worker并行的扫描outer table,并行执行join的probe;

11版本的并行hash join计划

Finalize Aggregate
   ->  Gather
         Workers Planned: 2
         ->  Partial Aggregate
               ->  Parallel Hash Join
                     Hash Cond: (lineitem.l_orderkey = orders.o_orderkey)
                     ->  Parallel Seq Scan on lineitem
                     ->  Parallel Hash
                           ->  Parallel Seq Scan on orders
                                 Filter: (o_totalprice > 5.00)

过程是:

1)每个worker并行扫描部分inner table(涉及parallel seq scan);

2)在共享内存中并行的build一个hash表;

3)每个worker并行的扫描outer table,并行执行join的probe;

hashjoin的设计

单进程hashjoin的时间线:

image.png

1)先扫描inner table;

2)建立hash表;

3)在扫描outer table;

平均总的执行时间 = inner plan(扫描+build) + outer plan

outer表并发执行的时间线

在Postgres10中,每个worker进程都各自在自己的私有内存中建立inner表的hash,然后并发的扫描outer表,并执行join。

下图是3个worker并发执行的时间线。

如前面所说,每个worker完整的执行inner plan,在各自的私有内存build hash table,outer table是并行的扫描,每个worker扫描一部分,并执行join,这样所有worker的结果集的并集就是最终的结果集。

image.png

只对outer plan进行了并行化。该方法适用于nner table很小的时候。它的问题是:

1)仅仅对子系统进行优化;

2)每个worker内都有一份inner table的拷贝,浪费了内存;

阿姆达尔定律指出:对系统中的一个子系统优化改进,受限于可使用该优化方法的时间占比。说白了是顶了天了优化的效果最多是把该被优化的子系统降到接近0。

但是该定律的重要性是说明了:收益递减。要把一个子系统优化从80%优化到40%付出的代价是变多的。

上图中,worker2执行的比较慢,总的执行时间可能取决于worker2,但是也可能worker1和worker3在执行outer plan进行并行的join时适当的多干些活。

平均总的执行时间 = iner plan + (outer plan / worker_num)

inner和outer都并行执行的时间线

该方案是Posrgres11中的并行hashjoin算法。

inner plan也被多个worker并行执行,在共享内存中进程安全的build出一个共享的hash table。

注意点是:在进行join之前,要等待所有的worker都把inner plan执行完成,也就是要build出一份完成的hash table。因此图中的虚线是所有worker进程的barrier点,只有等所有的进程都达到了虚线,才能继续下面的步骤。

这是通过pg的barrier机制实现。

image.png

在build hash tabl阶段,worker2执行的慢,该阶段的执行速度也不一定取决于最慢的worker,同理,worker1和worker3可能会多干些活。但是一定要满足在barrier处等待所有的woker都完成build阶段;

平均总的执行时间 = (inner plan + outer plan) / worker_num

从中可以看出,Postgres社区对并行执行方案的演进路径:现在Postgres10版本并行化了outer plan,稳定后,再在Postgres11版本并行化inner plan。

multi-batch机制

不论是并行的hashjoin还是单进程的hash join都要解决inner table在内存装不的问题。

解决方法是:

1)建立多个batch;

2)对inner table进行分片,分片的临时数据写入相应batch下的小文件;

3)在逐个batch的build hash table和join;

单进程multi-batch

image.png

1)扫描inner table:

a) 对属于batch-0的tuple,直接在内存中构建hash table,和上面讲的单个batch相同;

b)对不属于batch-0的tuple,写入相应batch的inner tuple文件;

2)单进程扫描outer table:直接写入相应batch的outer tuple文件;

3)单进程依次遍历每个batch;

4)把属于该batch的inner tuple文件加载到内存中(batch-0除外,因为它已经在内存中了);

5)在该batch内,执行单进程的join的逻辑;

并行化multi-batch+只并行outer

image.png

每个worker在扫描inner table时做2件事情:

1)扫描inner table:

a) 对属于batch-0的tuple,直接在内存中构建hash table,和上面讲的单个batch相同;

b)对不属于batch-0的tuple,写入相应batch的inner tuple文件;

2) 并行扫描outer table:直接写入相应batch的outer tuple文件;

3)依次遍历每个batch;

4)把属于该batch的inner tuple文件加载到内存中(batch-0除外,因为它已经在内存中了);

5)在该batch内,执行单进程的join的逻辑;

为了避免多个worker同时往一个batch写文件加锁的问题,每个batch给每个worker都分配一个inner文件和outer文件。

并行化multi-batch+同时并行outer和inner

image.png

1)并行扫描并行扫描inner table,属于batch-0的tuple直接在内存中构建一个shared hash table,不属于的写入对应batch的inner tuple文件;

2)并行扫描outer table,写入对应batch的outer_tuples文件;

3)并行的对batch-0执行join;

4)woker1和worker2先完成batch-0的join任务后,分别领取batch1和batch2的join;

5)worker3执行batch3的join;

6)worker1和worker2并行的执行batch4,也是build一个shared hash table,同样需要barrier;

7)worker3只能领取最后的batch5;

8)当worker1和worker2完成了对batch4的join后,会attach到batch5上,3个worker并行的执行;

对于batch-5:

1)开始只有worker3处理;

2)后面worker-1和worker-2也加入进来,3个进程一起处理;

attach的细节:先对batch5进行处理的worker3可能build完hash table,也可能没有。因此内部维护了一个简单的状态机:

1)如果没有build 完成,这个时候worker1和worker2加入进来(也可能只有worker1进来了)那么在加进来的worker并行的build hash table,在join之前必须barrier;

2)如果已经build完成了,那么无需barrier;

3)实现的关键在barrier的相关模块中,每个worker加进来执行attach时,barrier中维护计数,在需要等待的地方判断计数是否归零,没有看到归零的进程等待,直到计数为0;

总结并行hash join的效果

在内存能装下inner table的场景

image.png

在内存装不下inner table的场景

image.png

可以看到,该方案最大程度的进行并发,’能者多劳‘,不让进程闲着。动态的分配扫描任务,既可以batch内部并行执行,也可以batch之间并行执行。

并行hash join的实现

hash join 的 plan node

image.png

在hash join中关键数据结构是HashJoinState和HashState。

HashJoinState对应的exec函数是ExecHashJoinImpl负责维护驱动整个join的过程:

1)驱动inner plan;

2)驱动outer plan;

3)probe;

HashState的exec函数是MultiExecParallelHash,负责并行的build inner表。

进程模型

image.png

1)处理psql连接的进程做为leader进程;

2)leader进程负责估算大小并初始化DSM;

3)把用户sql的plan tree和其他参数序列化到DSM中;

4)Postgres提供的bgworker机制来启动n个worker:在全局的BackgroundWorkerlist上添加一些node,然后给PostMaster进程发信号,由PostMaster进程来启动worker,参数是DSM的handler;

5)每个worker都attach到DSM,并从ParallerWorkerMain函数开始,反序列plan tree;

6)每个worker执行plan tree,具体进入到parallel-aware的算子;

内存管理

image.png

PostgreSQL是多进程的模型,进程间通过share memory来通信。pg在启动时申请一大片固定的share memory,在这片内存上分配大概50多个结构体以及buffer。每当有新连接进来时就fork一个新的进程,这个进程和main进程共享这些内存。

基于多进程共享内存的编程范式:

1)首先由leader进程分配出共享内存(比如:mmap),返回handle指针;

2)每个worker进程创建私有的xxxDesc结构体;

3)worker进程attach上来,xxxDesc指向handle,注册一些回调函数,refcnt++等;

dsm

对于并行查询,多个worker间需要通信,但是需要的内存量是不确定的。pg提供动态的创建共享内存机制(dsm)给并行查询分配内存。

并行查询的leader进程负责创建dsm,follower进程attach上去,attach动作是为了生成一个私有的dsm描述符,并把dsm中的refcnt加1。

dsa

dsm机制是通过系统调用(linux平台是mmap)来分配出来的内存,不支持小内存分配;而且如果在dsm上分配几个有关联关系的的小对象是有问题,比如:

strct A {

B *b;

}

dsm在每个进程上的地址空间是不同,指针b的地址是创建该dsm的进程锁看到的地址,为了解决这两个问题,提供dsa机制:

1)在dsm上分配小块内存;

2)通过dsa_pointer描述结构体之间的引用关系;dsa_pointer本质上是一个相对hanle指针的偏移量;

batch内部的内存

image.png

一个batch的构成:

1)ParallelHashJoinBatch结构,管理该batch的基础信息,比如:bucket的个数;

2)两个SharedTuplestore,一个inner table,一个outer table;

3)一个大数组做为hash的表头,长度是bucket个数;

4)一个chunk组成的list,每个chunk的大小是32KB,存储一个个的hashtuple,每个hashtuple都有一个头部,在hash表中存的是这个hashtuple的dsa_pointer;

在插入bucket时,通过对buckets[bucketno]的head进行cas,实现线程安全的hash inert。

for (;;)
    {
        tuple->next.shared = dsa_pointer_atomic_read(head);
        if (dsa_pointer_atomic_compare_exchange(head,
                                                &tuple->next.shared,
                                                tuple_shared))
            break;
    }

并行的build inner hash table

image.png

1)每个worker并行的扫描inner table;

2)把tuple写入到batch中对应的inner tule文件里,每个worker都有一个文件;

3)对于attach上去的batch则是直接写入内存;

这样图是不精确的,worker1和worker2都应该attach到batch-0上:一边扫描一遍插入batch-0的内存,其他的batch则是写入文件,这样总的内存使用量就是一个bath了,因为batch的引入本来就是为了解决内存不足的问题。

并行hashjoin的状态机

image.png

实现并行hashjoin算法的函数是在ExecHashJoinImpl里,这个函数同时实现了单进程版本的hashjoin和并行版本的 hashjon算法。

BUILD_HASHTABLE状态:

1) 在MultiExecParallelHash函数中;

2)并行的build inner hash table,写入inner tuple文件;

HJ_BUILD_HASHING_OUTER状态:

1)这个状态是在build inner之后,所有worker从barrier上detach下来,自动++phase得来的;

2)所有的woker并行扫描;

3)写入对应的batch的outer_tuples文件;

4)到这一步,除了batch0,inner tuple文件和outer tuple文件已经就位;

HJ_NEED_NEW_BATCH状态:

1)ExecParallelHashJoinNewBatch函数;

2)每个worker选择一个batch,直到所有batch完成:

a)给这个batch在dsa上分配bucket(batch0除外);

b)等待所有woker完成分配内存;

c)把该batch上的inner文件中的tuple加载该batch的hashtable中;

d) 选定一个batch开始处理,其他的batch可能还没有开始处理:

HJ_NEED_NEW_OUTER状态:

1)每个worker从自己负责的batch中读取outer tuple文件;

2)当自己的batch中所有outer tuple文件处理完毕后,再次回到HJ_NEED_NEW_BATCH,挑选下一个batch处理;

HJ_SCAN_BUCKET状态:

1)执行hash查找;

2)当执行完成后,回到HJ_NEED_NEW_OUTER状态,处理下一个outer tuple;

HJ_FILL_OUTER_TUPLES状态:

外连接,需要填充所有的outer,因此需要回到OUTER

HJ_FILL_INNER_TUPLES状态:

内连接,需要填充所有的inner,因此需要回到NEW_BATCH;

并行的关键是:每次一个worker处理完一个batch后,都回到HJ_NEED_NEW_BATCH去领取下一个batch,如果当前只剩下一个batch了,那么所有的workerattach上去并行的处理。

总结

PostgreSQL的并行hashjoin的演进,体现了复杂feature的迭代过程。多个worker进程通过共享内存‘争抢’一些工作量来处理,‘能者多劳’处理快的worker不会因为得不到任务而处于idle的状态。

并行查询需要高改造10来个算子,使得这些算子感知多进程之间通信,可想代码量是很大的。

可以考虑使用类似gp的motion也许会有意外的收获,只不过gp中节点间是通过interconnect通信,这里通过共享内存通信。

相关实践学习
使用PolarDB和ECS搭建门户网站
本场景主要介绍如何基于PolarDB和ECS实现搭建门户网站。
阿里云数据库产品家族及特性
阿里云智能数据库产品团队一直致力于不断健全产品体系,提升产品性能,打磨产品功能,从而帮助客户实现更加极致的弹性能力、具备更强的扩展能力、并利用云设施进一步降低企业成本。以云原生+分布式为核心技术抓手,打造以自研的在线事务型(OLTP)数据库Polar DB和在线分析型(OLAP)数据库Analytic DB为代表的新一代企业级云原生数据库产品体系, 结合NoSQL数据库、数据库生态工具、云原生智能化数据库管控平台,为阿里巴巴经济体以及各个行业的企业客户和开发者提供从公共云到混合云再到私有云的完整解决方案,提供基于云基础设施进行数据从处理、到存储、再到计算与分析的一体化解决方案。本节课带你了解阿里云数据库产品家族及特性。
相关文章
|
消息中间件 存储 关系型数据库
PostgreSQL技术大讲堂 - 第33讲:并行查询管理
PostgreSQL从小白到专家,技术大讲堂 - 第33讲:并行查询管理
630 1
|
关系型数据库 大数据 PostgreSQL
PostgreSQL16-新特性-并行聚合
PostgreSQL16-新特性-并行聚合
314 0
|
SQL 分布式计算 并行计算
PostgreSQL 并行计算解说 之10 - parallel 自定义并行函数(UDF)
标签 PostgreSQL , cpu 并行 , smp 并行 , 并行计算 , gpu 并行 , 并行过程支持 背景 PostgreSQL 11 优化器已经支持了非常多场合的并行。简单估计,已支持27余种场景的并行计算。 parallel seq scan parallel index scan
1785 0
|
SQL 大数据 关系型数据库
PostgreSQL 并行查询概述
大数据时代,人们使用数据库系统处理的数据量越来越大,请求越来越复杂,对数据库系统的大数据处理能力和混合负载能力提出更高的要求。PostgreSQL 作为世界上最先进的开源数据库,在大数据处理方面做了很多工作,如并行和分区。
6803 1
PostgreSQL 并行查询概述
|
SQL 分布式计算 并行计算
PostgreSQL 并行计算解说 之9 - parallel 自定义并行聚合
标签 PostgreSQL , cpu 并行 , smp 并行 , 并行计算 , gpu 并行 , 并行过程支持 背景 PostgreSQL 11 优化器已经支持了非常多场合的并行。简单估计,已支持27余种场景的并行计算。 parallel seq scan parallel index scan
674 0
|
SQL 分布式计算 并行计算
PostgreSQL 并行计算解说 之25 - parallel FDW scan (并行访问多个外部表) with parallel append (FDW must with IsForeignScanParallelSafe)
标签 PostgreSQL , cpu 并行 , smp 并行 , 并行计算 , gpu 并行 , 并行过程支持 背景 PostgreSQL 11 优化器已经支持了非常多场合的并行。简单估计,已支持27余种场景的并行计算。 parallel seq scan
434 0
|
并行计算 算法 关系型数据库
PostgreSQL 11 并行计算算法,参数,强制并行度设置
标签 PostgreSQL , 并行计算 背景 PostgreSQL 并行计算原理、应用参考: 《PostgreSQL 多场景 沙箱实验》 优化器并行计算的并行度计算方法 1、总worker进程数 postgres=# show ; max_worker_processes ---------------------- 128 (1 row) 2、所有会话,在同一时刻的QUERY,并行计算最大允许开启的WORKER数。
5450 0
|
存储 SQL 关系型数据库
PostgreSQL 多维、图像 欧式距离、向量距离、向量相似 查询优化 - cube,imgsmlr - 压缩、分段、异步并行
标签 PostgreSQL , 多维 , 图像 , cube , imgsmlr , 压缩 , 分段 , 异步并行 , 欧式距离 背景 向量相似搜索越来越流行,PostgreSQL imgsmlr,cube插件分别应用于向量类型的存储,以及向量距离排序输出搜索。
2905 0
|
SQL 关系型数据库 网络安全
PostgreSQL 大版本升级方法之一 - 不落地并行导出导入
标签 PostgreSQL , 大版本升级 , rds_dbsync , pg_dump , pg_restore 背景 尽量快的大版本升级的方法。 一、9.4以下版本,使用pg_dump并行导出,pg_restore并行导入,迁移 (导出使用源版本pg_dump,导入使用目标版本pg_restore。
2346 0
|
SQL 关系型数据库 测试技术
PostgreSQL 11 相似图像搜索插件 imgsmlr 性能测试与优化 2 - 单机分区表 (dblink 异步调用并行) (4亿图像)
标签 PostgreSQL , imgsmlr , GiST , 图像特征值 , 小波转换 背景 续 《PostgreSQL 11 相似图像搜索插件 imgsmlr 性能测试 1 - 单机单表 (4亿图像)》 使用分区表+dblink异步接口并行调用。
1605 0

热门文章

最新文章

推荐镜像

更多