PostgreSQL 10.0 preview 多核并行增强 - 并行hash join支持shared hashdata, 节约哈希表内存提高效率

本文涉及的产品
RDS PostgreSQL Serverless,0.5-4RCU 50GB 3个月
推荐场景:
对影评进行热评分析
云原生数据库 PolarDB 分布式版,标准版 2核8GB
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
简介:

标签

PostgreSQL , 10.0 , 多核并行增强 , shared hash表 , hash join


背景

PostgreSQL 9.6支持哈希JOIN并行,但是每个worker进程都需要复制一份哈希表,所以会造成内存的浪费,小表无妨,但是大表的浪费是非常大的。

因此10.0做了一个改进,使用共享的哈希表。

Hi hackers,  

In PostgreSQL 9.6, hash joins can be parallelised under certain  
conditions, but a copy of the hash table is built in every  
participating backend.  That means that memory and CPU time are  
wasted.  In many cases, that's OK: if the hash table contents are  
small and cheap to compute, then we don't really care, we're just  
happy that the probing can be done in parallel.  But in cases where  
the hash table is large and/or expensive to build, we could do much  
better.  I am working on that problem.  

To recap the situation in 9.6, a hash join can appear below a Gather  
node and it looks much the same as a non-parallel hash join except  
that it has a partial outer plan:  

      ->  Hash Join  
            ->  <partial outer plan>  
            ->  Hash  
                  ->  <non-partial parallel-safe inner plan>  

A partial plan is one that has some kind of 'scatter' operation as its  
ultimate source of tuples.  Currently the only kind of scatter  
operation is a Parallel Seq Scan (but see also the Parallel Index Scan  
and Parallel Bitmap Scan proposals).  The scatter operation enables  
parallelism in all the executor nodes above it, as far as the  
enclosing 'gather' operation which must appear somewhere above it.  
Currently the only kind of gather operation is a Gather node (but see  
also the Gather Merge proposal which adds a new one).  

The inner plan is built from a non-partial parallel-safe path and will  
be run in every worker.  

Note that a Hash Join node in 9.6 isn't parallel-aware itself: it's  
not doing anything special at execution time to support parallelism.  
The planner has determined that correct partial results will be  
produced by this plan, but the executor nodes are blissfully unaware  
of parallelism.  

PROPOSED NEW PLAN VARIANTS  

Shortly I will post a patch which introduces two new hash join plan  
variants that are parallel-aware:  

1.  Parallel Hash Join with Shared Hash  

      ->  Parallel Hash Join  
            ->  <partial outer plan>  
            ->  Shared Hash  
                  ->  <non-partial parallel-safe inner plan>  

In this case, there is only one copy of the hash table and only one  
participant loads it.  The other participants wait patiently for one  
chosen backend to finish building the hash table, and then they all  
wake up and probe.  

Call the number of participants P, being the number of workers + 1  
(for the leader).  Compared to a non-shared hash plan, we avoid  
wasting CPU and IO resources running P copies of the inner plan in  
parallel (something that is not well captured in our costing model for  
parallel query today), and we can allow ourselves to use a hash table  
P times larger while sticking to the same overall space target of  
work_mem * P.  

2.  Parallel Hash Join with Parallel Shared Hash  

      ->  Parallel Hash Join  
            ->  <partial outer plan>  
            ->  Parallel Shared Hash  
                  ->  <partial inner plan>  

In this case, the inner plan is run in parallel by all participants.  
We have the advantages of a shared hash table as described above, and  
now we can also divide the work of running the inner plan and hashing  
the resulting tuples by P participants.  Note that Parallel Shared  
Hash is acting as a special kind of gather operation that is the  
counterpart to the scatter operation contained in the inner plan.  

PERFORMANCE  

So far I have been unable to measure any performance degradation  
compared with unpatched master for hash joins with non-shared hash.  
That's good because it means that I didn't slow existing plans down  
when I introduced a bunch of conditional branches to existing hash  
join code.  

Laptop testing shows greater than 2x speedups on several of the TPC-H  
queries with single batches, and no slowdowns.  I will post test  
numbers on big rig hardware in the coming weeks when I have the  
batching code in more complete and stable shape.  

IMPLEMENTATION  

I have taken the approach of extending the existing hash join  
algorithm, rather than introducing separate hash join executor nodes  
or a fundamentally different algorithm.  Here's a short description of  
what the patch does:  

1.  SHARED HASH TABLE  

To share data between participants, the patch uses two other patches I  
have proposed:  DSA areas[1], which provide a higher level interface  
to DSM segments to make programming with processes a little more like  
programming with threads, and in particular a per-parallel-query DSA  
area[2] that is made available for any executor node that needs some  
shared work space.  

The patch uses atomic operations to push tuples into the hash table  
buckets while building, rehashing and loading, and then the hash table  
is immutable during probing (except for match flags used to implement  
outer joins).  The existing memory chunk design is retained for dense  
allocation of tuples, which provides a convenient way to rehash the  
table when its size changes.  

2.  WORK COORDINATION  

To coordinate parallel work, this patch uses two other patches:  
barriers[3], to implement a 'barrier' or 'phaser' synchronisation  
primitive, and those in turn use the condition variables proposed by  
Robert Haas.  

Barriers provide a way for participants to break work up into phases  
that they unanimously agree to enter together, which is a basic  
requirement for parallelising hash joins.  It is not safe to insert  
into the hash table until exactly one participant has created it; it  
is not safe to probe the hash table until all participants have  
finished inserting into it; it is not safe to scan it for unmatched  
tuples until all participants have finished probing it; it is not safe  
to discard it and start loading the next batch until ... you get the  
idea.  You could also construct appropriate synchronisation using  
various other interlocking primitives or flow control systems, but  
fundamentally these wait points would exist at some level, and I think  
this way is quite clean and simple.  YMMV.  

If we had exactly W workers and the leader didn't participate, then we  
could use a simple simple pthread- or MPI-style barrier without an  
explicit notion of 'phase'.  We would simply take the existing hash  
join code, add the shared hash table, add barrier waits at various  
points and make sure that all participants always hit all of those  
points in the same order, and it should All Just Work.   But we have a  
variable party size and a dual-role leader process, and I want to  
highlight the specific problems that causes here because they increase  
the patch size significantly:  

Problem 1:  We don't know how many workers will actually start.  We  
know how many were planned, but at execution time we may have  
exhausted limits and actually get a smaller number.  So we can't use  
"static" barriers like the classic barriers in POSIX or MPI where the  
group size is known up front.  We need "dynamic" barriers with attach  
and detach operations.  As soon as you have varying party size you  
need some kind of explicit model of the current phase, so that a new  
participant can know what to do when it joins.  For that reason, this  
patch uses a phase number to track progress through the parallel hash  
join.  See MultiExecHash and ExecHashJoin which have switch statements  
allowing a newly joined participant to synchronise their own state  
machine and program counter with the phase.  

Problem 2:  One participant is not like the others: Gather may or may  
not decide to run its subplan directly if the worker processes aren't  
producing any tuples (and the proposed Gather Merge is the same).  The  
problem is that it also needs to consume tuples from the fixed-size  
queues of the regular workers.  A deadlock could arise if the leader's  
plan blocks waiting for other participants while another participant  
has filled its output queue and is waiting for the leader to consume.  
One way to avoid such deadlocks is to follow the rule that the leader  
should never wait for other participants if there is any possibility  
that they have emitted tuples.  The simplest way to do that would be  
to have shared hash plans refuse to run in the leader by returning  
NULL to signal the end of this partial tuple stream, but then we'd  
lose a CPU compared to non-shared hash plans.  The latest point the  
leader can exit while respecting that rule is at the end of probing  
the first batch.  That is the approach taken by the patch currently.  
See ExecHashCheckForEarlyExit for logic and discussion.  It would be  
better to be able to use the leader in later batches too, but as far  
as I can see that'd require changes that are out of scope for this  
patch.  One idea would be an executor protocol change allowing plans  
running in the leader to detach and yield, saying 'I have no further  
tuples right now, but I'm not finished; try again later', and then  
reattach when you call it back.  Clearly that sails close to  
asynchronous execution territory.  

Problem 3:  If the leader drops out after the first batch to solve  
problem 2, then it may leave behind batch files which must be  
processed by other participants.  I had originally planned to defer  
work on batch file sharing until a later iteration, thinking that it  
would be a nice performance improvement to redistribute work from  
uneven batch files, but it turns out to be necessary for correct  
results because of participants exiting early.  I am working on a very  
simple batch sharing system to start with...  Participants still  
generate their own batch files, and then new operations BufFileExport  
and BufFileImport are used to grant read-only access to the BufFile to  
other participants.  Each participant reads its own batch files  
entirely and then tries to read from every other participant's batch  
files until they are all exhausted, using a shared read head.  The  
per-tuple locking granularity, extra seeking and needless buffering in  
every backend on batch file reads aren't great, and I'm still figuring  
out temporary file cleanup/ownership semantics.  There may be an  
opportunity to make use of 'unified' BufFile concepts from Peter  
Geoghegan's work, or create some new reusable shared tuple spilling  
infrastructure.  

3.  COSTING  

For now, I have introduced a GUC called cpu_shared_tuple_cost which  
provides a straw-man model of the overhead of exchanging tuples via a  
shared hash table, and the extra process coordination required.  If  
it's zero then a non-shared hash plan (ie multiple copies) has the  
same cost as a shared hash plan, even though the non-shared hash plan  
wastefully runs P copies of the plan.  If cost represents runtime and  
and we assume perfectly spherical cows running without interference  
from each other, that makes some kind of sense, but it doesn't account  
for the wasted resources and contention caused by running the same  
plan in parallel.  I don't know what to do about that yet.  If  
cpu_shared_tuple_cost is a positive number, as it probably should be  
(more on that later), then shared hash tables look more expensive than  
non-shared ones, which is technically true (CPU cache sharing etc) but  
unhelpful because what you lose there you tend to gain by not running  
all those plans in parallel.  In other words cpu_shared_tuple_cost  
doesn't really model the cost situation at all well, but it's a useful  
GUC for development purposes for now as positive and negative numbers  
can be used to turn the feature on and off for testing...  As for  
work_mem, it seems to me that 9.6 already established that work_mem is  
a per participant limit, and it would be only fair to let a shared  
plan use a total of work_mem * P too.  I am still working on work_mem  
accounting and reporting.  Accounting for the parallelism in parallel  
shared hash plans is easy though: their estimated tuple count is  
already divided by P in the underlying partial path, and that is a  
fairly accurate characterisation of what's going to happen at  
execution time:  it's often going to go a lot faster, and those plans  
are the real goal of this work.  

STATUS  

Obviously this is a work in progress.  I am actively working on the following:  

* rescan  
* batch number increases  
* skew buckets  
* costing model and policy/accounting for work_mem  
* shared batch file reading  
* preloading next batch  
* debugging and testing  
* tidying and refactoring  

The basic approach is visible and simple cases are working though, so  
I am submitting this WIP work for a round of review in the current  
commitfest and hoping to get some feedback and ideas.  I will post the  
patch in a follow-up email shortly...  Thanks for reading!  

[1] https://www.postgresql.org/message-id/flat/CAEepm=1z5WLuNoJ80PaCvz6EtG9dN0j-KuHcHtU6QEfcPP5-qA(at)mail(dot)gmail(dot)com#CAEepm=1z5WLuNoJ80PaCvz6EtG9dN0j-KuHcHtU6QEfcPP5-qA@mail.gmail.com  
[2] https://www.postgresql.org/message-id/flat/CAEepm%3D0HmRefi1%2BxDJ99Gj5APHr8Qr05KZtAxrMj8b%2Bay3o6sA%40mail.gmail.com  
[3] https://www.postgresql.org/message-id/flat/CAEepm%3D2_y7oi01OjA_wLvYcWMc9_d%3DLaoxrY3eiROCZkB_qakA%40mail.gmail.com  

--   
Thomas Munro  
http://www.enterprisedb.com  

这个patch的讨论,详见邮件组,本文末尾URL。

PostgreSQL社区的作风非常严谨,一个patch可能在邮件组中讨论几个月甚至几年,根据大家的意见反复的修正,patch合并到master已经非常成熟,所以PostgreSQL的稳定性也是远近闻名的。

参考

https://commitfest.postgresql.org/13/871/

https://www.postgresql.org/message-id/flat/CAEepm=2W=cOkiZxcg6qiFQP-dHUe09aqTrEMM7yJDrHMhDv_RA@mail.gmail.com#CAEepm=2W=cOkiZxcg6qiFQP-dHUe09aqTrEMM7yJDrHMhDv_RA@mail.gmail.com

相关实践学习
使用PolarDB和ECS搭建门户网站
本场景主要介绍基于PolarDB和ECS实现搭建门户网站。
阿里云数据库产品家族及特性
阿里云智能数据库产品团队一直致力于不断健全产品体系,提升产品性能,打磨产品功能,从而帮助客户实现更加极致的弹性能力、具备更强的扩展能力、并利用云设施进一步降低企业成本。以云原生+分布式为核心技术抓手,打造以自研的在线事务型(OLTP)数据库Polar DB和在线分析型(OLAP)数据库Analytic DB为代表的新一代企业级云原生数据库产品体系, 结合NoSQL数据库、数据库生态工具、云原生智能化数据库管控平台,为阿里巴巴经济体以及各个行业的企业客户和开发者提供从公共云到混合云再到私有云的完整解决方案,提供基于云基础设施进行数据从处理、到存储、再到计算与分析的一体化解决方案。本节课带你了解阿里云数据库产品家族及特性。
目录
相关文章
|
8月前
|
SQL 关系型数据库 PostgreSQL
把PostgreSQL的表导入SQLite
把PostgreSQL的表导入SQLite
119 0
|
5月前
|
SQL 分布式计算 大数据
"大数据计算难题揭秘:MaxCompute中hash join内存超限,究竟该如何破解?"
【8月更文挑战第20天】在大数据处理领域,阿里云的MaxCompute以高效稳定著称,但复杂的hash join操作常导致内存超限。本文通过一个实例解析此问题:数据分析师小王需对两个共计300GB的大表进行join,却遭遇内存不足。经分析发现,单个mapper任务内存默认为2GB,不足以支持大型hash表的构建。为此,提出三种解决方案:1) 提升mapper任务内存;2) 利用map join优化小表连接;3) 实施分而治之策略,将大表分割后逐一处理再合并结果。这些方法有助于提升大数据处理效率及稳定性。
117 0
|
6月前
|
SQL 监控 关系型数据库
实时计算 Flink版操作报错合集之在设置监控PostgreSQL数据库时,将wal_level设置为logical,出现一些表更新和删除操作报错,怎么办
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
7月前
|
关系型数据库 PostgreSQL
postgresql如何将没有关联关系的两张表的字段合并
【6月更文挑战第2天】postgresql如何将没有关联关系的两张表的字段合并
167 3
|
7月前
|
SQL 关系型数据库 数据库连接
ClickHouse(20)ClickHouse集成PostgreSQL表引擎详细解析
ClickHouse的PostgreSQL引擎允许直接查询和插入远程PostgreSQL服务器的数据。`CREATE TABLE`语句示例展示了如何定义这样的表,包括服务器信息和权限。查询在只读事务中执行,简单筛选在PostgreSQL端处理,复杂操作在ClickHouse端完成。`INSERT`通过`COPY`命令在PostgreSQL事务中进行。注意,数组类型的处理和Nullable列的行为。示例展示了如何从PostgreSQL到ClickHouse同步数据。一系列的文章详细解释了ClickHouse的各种特性和表引擎。
213 0
|
8月前
|
关系型数据库 PostgreSQL
postgresql将没有关联关系的两张表合并成一张
【5月更文挑战第4天】postgresql将没有关联关系的两张表合并成一张
271 5
|
7月前
|
SQL 关系型数据库 PostgreSQL
【sql】PostgreSQL物化视图表使用案例
【sql】PostgreSQL物化视图表使用案例
73 0
|
SQL 关系型数据库 数据库
postgresql中连接两张表更新第三张表(updata)
如何结合两张表的数据来更新第三张表
376 0
|
7月前
|
关系型数据库 PostgreSQL
postgresql如何将没有关联关系的两张表合并成一张
【6月更文挑战第2天】postgresql如何将没有关联关系的两张表合并成一张
153 0
|
8月前
|
存储 JSON 关系型数据库
PostgreSQL Json应用场景介绍和Shared Detoast优化
PostgreSQL Json应用场景介绍和Shared Detoast优化

热门文章

最新文章

相关产品

  • 云原生数据库 PolarDB
  • 云数据库 RDS PostgreSQL 版