PostgreSQL 10.0 preview, multi-core parallelism enhancement - parallel hash join with a shared hash table, saving hash-table memory and improving efficiency

Tags

PostgreSQL , 10.0 , multi-core parallelism , shared hash table , hash join


Background

PostgreSQL 9.6 already supports parallel hash joins, but every participating worker process builds its own copy of the hash table, which wastes memory. For a small inner table this hardly matters, but for a large one the waste is substantial.

PostgreSQL 10.0 therefore improves on this by using a shared hash table. The proposal below is quoted from the pgsql-hackers mailing list:

Hi hackers,  

In PostgreSQL 9.6, hash joins can be parallelised under certain  
conditions, but a copy of the hash table is built in every  
participating backend.  That means that memory and CPU time are  
wasted.  In many cases, that's OK: if the hash table contents are  
small and cheap to compute, then we don't really care, we're just  
happy that the probing can be done in parallel.  But in cases where  
the hash table is large and/or expensive to build, we could do much  
better.  I am working on that problem.  

To recap the situation in 9.6, a hash join can appear below a Gather  
node and it looks much the same as a non-parallel hash join except  
that it has a partial outer plan:  

      ->  Hash Join  
            ->  <partial outer plan>  
            ->  Hash  
                  ->  <non-partial parallel-safe inner plan>  

A partial plan is one that has some kind of 'scatter' operation as its  
ultimate source of tuples.  Currently the only kind of scatter  
operation is a Parallel Seq Scan (but see also the Parallel Index Scan  
and Parallel Bitmap Scan proposals).  The scatter operation enables  
parallelism in all the executor nodes above it, as far as the  
enclosing 'gather' operation which must appear somewhere above it.  
Currently the only kind of gather operation is a Gather node (but see  
also the Gather Merge proposal which adds a new one).  

The inner plan is built from a non-partial parallel-safe path and will  
be run in every worker.  

Note that a Hash Join node in 9.6 isn't parallel-aware itself: it's  
not doing anything special at execution time to support parallelism.  
The planner has determined that correct partial results will be  
produced by this plan, but the executor nodes are blissfully unaware  
of parallelism.  

PROPOSED NEW PLAN VARIANTS  

Shortly I will post a patch which introduces two new hash join plan  
variants that are parallel-aware:  

1.  Parallel Hash Join with Shared Hash  

      ->  Parallel Hash Join  
            ->  <partial outer plan>  
            ->  Shared Hash  
                  ->  <non-partial parallel-safe inner plan>  

In this case, there is only one copy of the hash table and only one  
participant loads it.  The other participants wait patiently for one  
chosen backend to finish building the hash table, and then they all  
wake up and probe.  

Call the number of participants P, being the number of workers + 1  
(for the leader).  Compared to a non-shared hash plan, we avoid  
wasting CPU and IO resources running P copies of the inner plan in  
parallel (something that is not well captured in our costing model for  
parallel query today), and we can allow ourselves to use a hash table  
P times larger while sticking to the same overall space target of  
work_mem * P.  
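
To make the memory arithmetic concrete (illustrative numbers, not taken from the post): with work_mem = 64MB and 3 workers plus the leader, P = 4. A non-shared plan builds four private copies of the same hash table, each capped at 64MB, whereas a shared hash plan builds a single table that may use the whole 64MB * 4 = 256MB budget, so an inner relation of, say, 200MB can be hashed in one batch instead of being split into multiple batches.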

2.  Parallel Hash Join with Parallel Shared Hash  

      ->  Parallel Hash Join  
            ->  <partial outer plan>  
            ->  Parallel Shared Hash  
                  ->  <partial inner plan>  

In this case, the inner plan is run in parallel by all participants.  
We have the advantages of a shared hash table as described above, and  
now we can also divide the work of running the inner plan and hashing  
the resulting tuples by P participants.  Note that Parallel Shared  
Hash is acting as a special kind of gather operation that is the  
counterpart to the scatter operation contained in the inner plan.  

PERFORMANCE  

So far I have been unable to measure any performance degradation  
compared with unpatched master for hash joins with non-shared hash.  
That's good because it means that I didn't slow existing plans down  
when I introduced a bunch of conditional branches to existing hash  
join code.  

Laptop testing shows greater than 2x speedups on several of the TPC-H  
queries with single batches, and no slowdowns.  I will post test  
numbers on big rig hardware in the coming weeks when I have the  
batching code in more complete and stable shape.  

IMPLEMENTATION  

I have taken the approach of extending the existing hash join  
algorithm, rather than introducing separate hash join executor nodes  
or a fundamentally different algorithm.  Here's a short description of  
what the patch does:  

1.  SHARED HASH TABLE  

To share data between participants, the patch uses two other patches I  
have proposed:  DSA areas[1], which provide a higher level interface  
to DSM segments to make programming with processes a little more like  
programming with threads, and in particular a per-parallel-query DSA  
area[2] that is made available for any executor node that needs some  
shared work space.  
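
As a side note, the DSA interface referred to in [1] (shown below roughly as it later landed in PostgreSQL 10; this sketch is not code from the hash join patch) hands out dsa_pointer values rather than raw addresses, because each backend may map the shared memory at a different address:

    #include "postgres.h"
    #include "utils/dsa.h"

    typedef struct SharedHashHeader
    {
        dsa_pointer buckets;            /* relative pointer to the bucket array */
        Size        nbuckets;
    } SharedHashHeader;

    /* Run by one participant: allocate the shared table inside the DSA area. */
    static dsa_pointer
    create_shared_hash(dsa_area *area, Size nbuckets)
    {
        dsa_pointer       hp = dsa_allocate(area, sizeof(SharedHashHeader));
        SharedHashHeader *header = (SharedHashHeader *) dsa_get_address(area, hp);

        header->nbuckets = nbuckets;
        header->buckets = dsa_allocate0(area, nbuckets * sizeof(dsa_pointer));

        return hp;                      /* a dsa_pointer is meaningful in every backend */
    }

    /* Run by every participant: translate the shared pointer to a local address. */
    static SharedHashHeader *
    attach_shared_hash(dsa_area *area, dsa_pointer hp)
    {
        return (SharedHashHeader *) dsa_get_address(area, hp);
    }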

The patch uses atomic operations to push tuples into the hash table  
buckets while building, rehashing and loading, and then the hash table  
is immutable during probing (except for match flags used to implement  
outer joins).  The existing memory chunk design is retained for dense  
allocation of tuples, which provides a convenient way to rehash the  
table when its size changes.  
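
Below is a minimal sketch of that build-phase insertion idea, using C11 atomics and ordinary pointers so it stays self-contained; in the actual patch the bucket heads are dsa_pointer values in the shared area and are updated with PostgreSQL's atomic operations:

    #include <stdatomic.h>
    #include <stddef.h>

    typedef struct HashTuple
    {
        struct HashTuple *next;         /* next tuple in the same bucket */
        unsigned int      hashvalue;
        /* ... tuple data follows ... */
    } HashTuple;

    typedef struct SharedHashTable
    {
        size_t               nbuckets;  /* a power of two */
        _Atomic(HashTuple *) buckets[]; /* bucket heads, shared by all participants */
    } SharedHashTable;

    /* Push a tuple onto its bucket's list without taking any lock. */
    static void
    shared_hash_insert(SharedHashTable *ht, HashTuple *tuple)
    {
        size_t     bucketno = tuple->hashvalue & (ht->nbuckets - 1);
        HashTuple *head = atomic_load(&ht->buckets[bucketno]);

        do
        {
            tuple->next = head;         /* link ourselves in front of the current head */
        } while (!atomic_compare_exchange_weak(&ht->buckets[bucketno],
                                               &head, tuple));
    }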

2.  WORK COORDINATION  

To coordinate parallel work, this patch uses two other patches:  
barriers[3], to implement a 'barrier' or 'phaser' synchronisation  
primitive, and those in turn use the condition variables proposed by  
Robert Haas.  

Barriers provide a way for participants to break work up into phases  
that they unanimously agree to enter together, which is a basic  
requirement for parallelising hash joins.  It is not safe to insert  
into the hash table until exactly one participant has created it; it  
is not safe to probe the hash table until all participants have  
finished inserting into it; it is not safe to scan it for unmatched  
tuples until all participants have finished probing it; it is not safe  
to discard it and start loading the next batch until ... you get the  
idea.  You could also construct appropriate synchronisation using  
various other interlocking primitives or flow control systems, but  
fundamentally these wait points would exist at some level, and I think  
this way is quite clean and simple.  YMMV.  
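
For intuition, here is a minimal sketch of those wait points using a plain fixed-size POSIX barrier (essentially the "simple pthread-style barrier" approach discussed in the next paragraph, which as explained there is not enough for the real patch); the helper functions are stand-ins for the executor steps:

    #include <pthread.h>
    #include <stdbool.h>

    /* Stand-ins for the real executor work; bodies omitted. */
    static void create_hash_table(void) {}
    static void insert_my_share_of_inner_tuples(void) {}
    static void probe_with_my_share_of_outer_tuples(void) {}
    static void scan_my_share_of_unmatched_tuples(void) {}

    static pthread_barrier_t hj_barrier;    /* initialised elsewhere for P participants */

    static void
    parallel_hash_join_participant(bool elected_builder)
    {
        if (elected_builder)
            create_hash_table();                /* exactly one participant creates it */
        pthread_barrier_wait(&hj_barrier);      /* not safe to insert until it exists */

        insert_my_share_of_inner_tuples();
        pthread_barrier_wait(&hj_barrier);      /* not safe to probe until building is done */

        probe_with_my_share_of_outer_tuples();
        pthread_barrier_wait(&hj_barrier);      /* not safe to look for unmatched tuples yet */

        scan_my_share_of_unmatched_tuples();    /* only needed for outer joins */
    }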

If we had exactly W workers and the leader didn't participate, then we  
could use a simple pthread- or MPI-style barrier without an  
explicit notion of 'phase'.  We would simply take the existing hash  
join code, add the shared hash table, add barrier waits at various  
points and make sure that all participants always hit all of those  
points in the same order, and it should All Just Work.   But we have a  
variable party size and a dual-role leader process, and I want to  
highlight the specific problems that causes here because they increase  
the patch size significantly:  

Problem 1:  We don't know how many workers will actually start.  We  
know how many were planned, but at execution time we may have  
exhausted limits and actually get a smaller number.  So we can't use  
"static" barriers like the classic barriers in POSIX or MPI where the  
group size is known up front.  We need "dynamic" barriers with attach  
and detach operations.  As soon as you have varying party size you  
need some kind of explicit model of the current phase, so that a new  
participant can know what to do when it joins.  For that reason, this  
patch uses a phase number to track progress through the parallel hash  
join.  See MultiExecHash and ExecHashJoin which have switch statements  
allowing a newly joined participant to synchronise their own state  
machine and program counter with the phase.  
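
Below is an illustrative, thread-based sketch of a dynamic barrier with attach/detach and an explicit phase number, together with how a late-attaching worker could use the phase to synchronise its own state machine; the phase names and helpers are invented for illustration and do not come from the patch:

    #include <pthread.h>

    enum { PHJ_BUILDING, PHJ_PROBING, PHJ_UNMATCHED, PHJ_DONE };

    typedef struct DynamicBarrier
    {
        pthread_mutex_t mutex;
        pthread_cond_t  cond;
        int             participants;   /* currently attached */
        int             arrived;        /* arrived at the current wait point */
        int             phase;          /* advances each time all attached have arrived */
    } DynamicBarrier;

    /* Attach and learn the current phase, so a late joiner knows where to start. */
    static int
    barrier_attach(DynamicBarrier *b)
    {
        pthread_mutex_lock(&b->mutex);
        b->participants++;
        int phase = b->phase;
        pthread_mutex_unlock(&b->mutex);
        return phase;
    }

    static void
    barrier_wait(DynamicBarrier *b)
    {
        pthread_mutex_lock(&b->mutex);
        if (++b->arrived == b->participants)
        {
            b->arrived = 0;
            b->phase++;                         /* last arrival advances the phase */
            pthread_cond_broadcast(&b->cond);
        }
        else
        {
            int phase = b->phase;
            while (b->phase == phase)           /* sleep until the phase moves on */
                pthread_cond_wait(&b->cond, &b->mutex);
        }
        pthread_mutex_unlock(&b->mutex);
    }

    static void
    barrier_detach(DynamicBarrier *b)
    {
        pthread_mutex_lock(&b->mutex);
        b->participants--;
        if (b->participants > 0 && b->arrived == b->participants)
        {
            b->arrived = 0;
            b->phase++;                         /* we were the last straggler: release the rest */
            pthread_cond_broadcast(&b->cond);
        }
        pthread_mutex_unlock(&b->mutex);
    }

    /* Stand-ins for the real executor work; bodies omitted. */
    static void help_build(void) {}
    static void help_probe(void) {}
    static void scan_unmatched(void) {}

    /* A worker that starts late synchronises its state machine with the phase. */
    static void
    worker_joins_hash_join(DynamicBarrier *b)
    {
        switch (barrier_attach(b))
        {
            case PHJ_BUILDING:
                help_build();
                barrier_wait(b);
                /* FALLTHROUGH */
            case PHJ_PROBING:
                help_probe();
                barrier_wait(b);
                /* FALLTHROUGH */
            case PHJ_UNMATCHED:
                scan_unmatched();
                barrier_wait(b);
                /* FALLTHROUGH */
            default:
                break;                          /* PHJ_DONE: nothing left to help with */
        }
        barrier_detach(b);
    }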

Problem 2:  One participant is not like the others: Gather may or may  
not decide to run its subplan directly if the worker processes aren't  
producing any tuples (and the proposed Gather Merge is the same).  The  
problem is that it also needs to consume tuples from the fixed-size  
queues of the regular workers.  A deadlock could arise if the leader's  
plan blocks waiting for other participants while another participant  
has filled its output queue and is waiting for the leader to consume.  
One way to avoid such deadlocks is to follow the rule that the leader  
should never wait for other participants if there is any possibility  
that they have emitted tuples.  The simplest way to do that would be  
to have shared hash plans refuse to run in the leader by returning  
NULL to signal the end of this partial tuple stream, but then we'd  
lose a CPU compared to non-shared hash plans.  The latest point the  
leader can exit while respecting that rule is at the end of probing  
the first batch.  That is the approach taken by the patch currently.  
See ExecHashCheckForEarlyExit for logic and discussion.  It would be  
better to be able to use the leader in later batches too, but as far  
as I can see that'd require changes that are out of scope for this  
patch.  One idea would be an executor protocol change allowing plans  
running in the leader to detach and yield, saying 'I have no further  
tuples right now, but I'm not finished; try again later', and then  
reattach when you call it back.  Clearly that sails close to  
asynchronous execution territory.  
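
That deadlock-avoidance rule can be paraphrased in code roughly as follows (hypothetical names; the real logic lives in ExecHashCheckForEarlyExit and is more involved):

    #include <stdbool.h>

    typedef struct ParticipantState
    {
        bool is_leader;                 /* true in the leader, false in workers */
        bool finished_probing_batch0;   /* done probing the first batch */
        bool more_batches;              /* the join has batches beyond the first */
    } ParticipantState;

    /*
     * The leader cannot have emitted any tuples before probing begins, so it
     * may safely help build and probe batch 0.  After that it may already
     * have filled its output queue, so it must never block waiting for the
     * workers again: if more batches remain, it detaches and leaves them to
     * the workers.
     */
    static bool
    leader_must_detach_now(const ParticipantState *s)
    {
        return s->is_leader && s->finished_probing_batch0 && s->more_batches;
    }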

Problem 3:  If the leader drops out after the first batch to solve  
problem 2, then it may leave behind batch files which must be  
processed by other participants.  I had originally planned to defer  
work on batch file sharing until a later iteration, thinking that it  
would be a nice performance improvement to redistribute work from  
uneven batch files, but it turns out to be necessary for correct  
results because of participants exiting early.  I am working on a very  
simple batch sharing system to start with...  Participants still  
generate their own batch files, and then new operations BufFileExport  
and BufFileImport are used to grant read-only access to the BufFile to  
other participants.  Each participant reads its own batch files  
entirely and then tries to read from every other participant's batch  
files until they are all exhausted, using a shared read head.  The  
per-tuple locking granularity, extra seeking and needless buffering in  
every backend on batch file reads aren't great, and I'm still figuring  
out temporary file cleanup/ownership semantics.  There may be an  
opportunity to make use of 'unified' BufFile concepts from Peter  
Geoghegan's work, or create some new reusable shared tuple spilling  
infrastructure.  
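
To illustrate the shared read head idea (file format and names are invented here; BufFileExport/BufFileImport themselves are not shown): each backend keeps its own handle on an exported batch file, while the next-unread position is shared and advanced under a per-tuple lock, which is exactly the locking granularity the author flags as suboptimal:

    #include <pthread.h>
    #include <stdio.h>
    #include <stdlib.h>

    /* Shared between participants (in real life this would live in DSM). */
    typedef struct SharedReadHead
    {
        pthread_mutex_t lock;   /* per-tuple locking granularity, as noted above */
        long            offset; /* next unread byte in the exported batch file */
    } SharedReadHead;

    /*
     * Read the next length-prefixed tuple from an exported batch file, using
     * the shared read head so that each tuple is consumed by exactly one
     * participant.  'fp' is this backend's own handle on the file.  Returns a
     * malloc'd tuple, or NULL when the file is exhausted; the caller frees it.
     */
    static void *
    shared_batch_next_tuple(SharedReadHead *head, FILE *fp, unsigned int *len_out)
    {
        void        *tuple = NULL;
        unsigned int len;

        pthread_mutex_lock(&head->lock);
        if (fseek(fp, head->offset, SEEK_SET) == 0 &&
            fread(&len, sizeof(len), 1, fp) == 1)
        {
            tuple = malloc(len);
            if (tuple != NULL && fread(tuple, len, 1, fp) == 1)
            {
                head->offset = ftell(fp);   /* advance the shared read head */
                *len_out = len;
            }
            else
            {
                free(tuple);
                tuple = NULL;
            }
        }
        pthread_mutex_unlock(&head->lock);
        return tuple;
    }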

3.  COSTING  

For now, I have introduced a GUC called cpu_shared_tuple_cost which  
provides a straw-man model of the overhead of exchanging tuples via a  
shared hash table, and the extra process coordination required.  If  
it's zero then a non-shared hash plan (ie multiple copies) has the  
same cost as a shared hash plan, even though the non-shared hash plan  
wastefully runs P copies of the plan.  If cost represents runtime and  
we assume perfectly spherical cows running without interference  
from each other, that makes some kind of sense, but it doesn't account  
for the wasted resources and contention caused by running the same  
plan in parallel.  I don't know what to do about that yet.  If  
cpu_shared_tuple_cost is a positive number, as it probably should be  
(more on that later), then shared hash tables look more expensive than  
non-shared ones, which is technically true (CPU cache sharing etc) but  
unhelpful because what you lose there you tend to gain by not running  
all those plans in parallel.  In other words cpu_shared_tuple_cost  
doesn't really model the cost situation at all well, but it's a useful  
GUC for development purposes for now as positive and negative numbers  
can be used to turn the feature on and off for testing...  As for  
work_mem, it seems to me that 9.6 already established that work_mem is  
a per participant limit, and it would be only fair to let a shared  
plan use a total of work_mem * P too.  I am still working on work_mem  
accounting and reporting.  Accounting for the parallelism in parallel  
shared hash plans is easy though: their estimated tuple count is  
already divided by P in the underlying partial path, and that is a  
fairly accurate characterisation of what's going to happen at  
execution time:  it's often going to go a lot faster, and those plans  
are the real goal of this work.  

STATUS  

Obviously this is a work in progress.  I am actively working on the following:  

* rescan  
* batch number increases  
* skew buckets  
* costing model and policy/accounting for work_mem  
* shared batch file reading  
* preloading next batch  
* debugging and testing  
* tidying and refactoring  

The basic approach is visible and simple cases are working though, so  
I am submitting this WIP work for a round of review in the current  
commitfest and hoping to get some feedback and ideas.  I will post the  
patch in a follow-up email shortly...  Thanks for reading!  

[1] https://www.postgresql.org/message-id/flat/CAEepm=1z5WLuNoJ80PaCvz6EtG9dN0j-KuHcHtU6QEfcPP5-qA(at)mail(dot)gmail(dot)com#CAEepm=1z5WLuNoJ80PaCvz6EtG9dN0j-KuHcHtU6QEfcPP5-qA@mail.gmail.com  
[2] https://www.postgresql.org/message-id/flat/CAEepm%3D0HmRefi1%2BxDJ99Gj5APHr8Qr05KZtAxrMj8b%2Bay3o6sA%40mail.gmail.com  
[3] https://www.postgresql.org/message-id/flat/CAEepm%3D2_y7oi01OjA_wLvYcWMc9_d%3DLaoxrY3eiROCZkB_qakA%40mail.gmail.com  

--   
Thomas Munro  
http://www.enterprisedb.com  

For the full discussion of this patch, see the mailing list thread linked at the end of this article.

The PostgreSQL community works very rigorously: a patch may be discussed on the mailing list for months or even years and revised repeatedly based on feedback, so by the time it is merged into master it is already very mature. That is why PostgreSQL's stability is so widely known.

References

https://commitfest.postgresql.org/13/871/

https://www.postgresql.org/message-id/flat/CAEepm=2W=cOkiZxcg6qiFQP-dHUe09aqTrEMM7yJDrHMhDv_RA@mail.gmail.com#CAEepm=2W=cOkiZxcg6qiFQP-dHUe09aqTrEMM7yJDrHMhDv_RA@mail.gmail.com
