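Tags
PostgreSQL, 10.0, multi-core parallelism enhancements, shared hash table, hash join
Background
PostgreSQL 9.6 supports parallel hash joins, but every worker process has to build its own copy of the hash table, which wastes memory. That is harmless for small tables, but the waste becomes very large for big ones.
The improvement proposed during the 10.0 development cycle is therefore to use a single shared hash table. The original proposal, as posted to the pgsql-hackers mailing list, follows.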
Hi hackers,
In PostgreSQL 9.6, hash joins can be parallelised under certain
conditions, but a copy of the hash table is built in every
participating backend. That means that memory and CPU time are
wasted. In many cases, that's OK: if the hash table contents are
small and cheap to compute, then we don't really care, we're just
happy that the probing can be done in parallel. But in cases where
the hash table is large and/or expensive to build, we could do much
better. I am working on that problem.
To recap the situation in 9.6, a hash join can appear below a Gather
node and it looks much the same as a non-parallel hash join except
that it has a partial outer plan:
  -> Hash Join
       -> <partial outer plan>
       -> Hash
            -> <non-partial parallel-safe inner plan>
A partial plan is one that has some kind of 'scatter' operation as its
ultimate source of tuples. Currently the only kind of scatter
operation is a Parallel Seq Scan (but see also the Parallel Index Scan
and Parallel Bitmap Scan proposals). The scatter operation enables
parallelism in all the executor nodes above it, as far as the
enclosing 'gather' operation which must appear somewhere above it.
Currently the only kind of gather operation is a Gather node (but see
also the Gather Merge proposal which adds a new one).
The inner plan is built from a non-partial parallel-safe path and will
be run in every worker.
Note that a Hash Join node in 9.6 isn't parallel-aware itself: it's
not doing anything special at execution time to support parallelism.
The planner has determined that correct partial results will be
produced by this plan, but the executor nodes are blissfully unaware
of parallelism.
PROPOSED NEW PLAN VARIANTS
Shortly I will post a patch which introduces two new hash join plan
variants that are parallel-aware:
1. Parallel Hash Join with Shared Hash
  -> Parallel Hash Join
       -> <partial outer plan>
       -> Shared Hash
            -> <non-partial parallel-safe inner plan>
In this case, there is only one copy of the hash table and only one
participant loads it. The other participants wait patiently for one
chosen backend to finish building the hash table, and then they all
wake up and probe.
Call the number of participants P, being the number of workers + 1
(for the leader). Compared to a non-shared hash plan, we avoid
wasting CPU and IO resources running P copies of the inner plan in
parallel (something that is not well captured in our costing model for
parallel query today), and we can allow ourselves to use a hash table
P times larger while sticking to the same overall space target of
work_mem * P.
2. Parallel Hash Join with Parallel Shared Hash
  -> Parallel Hash Join
       -> <partial outer plan>
       -> Parallel Shared Hash
            -> <partial inner plan>
In this case, the inner plan is run in parallel by all participants.
We have the advantages of a shared hash table as described above, and
now we can also divide the work of running the inner plan and hashing
the resulting tuples by P participants. Note that Parallel Shared
Hash is acting as a special kind of gather operation that is the
counterpart to the scatter operation contained in the inner plan.
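As a rough illustration of the work_mem * P arithmetic used for the shared variants above, the following sketch compares the largest hash table that fits under each approach. It is not code from the patch: the function name, the dictionary keys and the kilobyte units are assumptions made for this example only.

```python
def hash_memory_budget(work_mem_kb: int, workers: int) -> dict:
    """Compare hash table budgets for P = workers + 1 participants."""
    participants = workers + 1  # the leader participates as well
    total_budget_kb = work_mem_kb * participants
    return {
        "participants": participants,
        "total_budget_kb": total_budget_kb,
        # Non-shared: every participant builds its own copy, so a single
        # table can be no larger than one work_mem.
        "non_shared_max_table_kb": work_mem_kb,
        # Shared: there is only one copy, so it may use the whole budget.
        "shared_max_table_kb": total_budget_kb,
    }


budget = hash_memory_budget(work_mem_kb=4096, workers=3)
print(budget)
```

With 3 workers plus the leader, P is 4, so the same overall space target allows a shared table four times larger than any single private copy.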
PERFORMANCE
So far I have been unable to measure any performance degradation
compared with unpatched master for hash joins with non-shared hash.
That's good because it means that I didn't slow existing plans down
when I introduced a bunch of conditional branches to existing hash
join code.
Laptop testing shows greater than 2x speedups on several of the TPC-H
queries with single batches, and no slowdowns. I will post test
numbers on big rig hardware in the coming weeks when I have the
batching code in more complete and stable shape.
IMPLEMENTATION
I have taken the approach of extending the existing hash join
algorithm, rather than introducing separate hash join executor nodes
or a fundamentally different algorithm. Here's a short description of
what the patch does:
1. SHARED HASH TABLE
To share data between participants, the patch uses two other patches I
have proposed: DSA areas[1], which provide a higher level interface
to DSM segments to make programming with processes a little more like
programming with threads, and in particular a per-parallel-query DSA
area[2] that is made available for any executor node that needs some
shared work space.
The patch uses atomic operations to push tuples into the hash table
buckets while building, rehashing and loading, and then the hash table
is immutable during probing (except for match flags used to implement
outer joins). The existing memory chunk design is retained for dense
allocation of tuples, which provides a convenient way to rehash the
table when its size changes.
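The idea of pushing tuples onto bucket chains with atomic operations can be sketched as a compare-and-swap retry loop. This is only a model of the technique, not the patch's code: Python has no hardware compare-and-swap, so the hypothetical AtomicRef class below emulates one with a lock, and all class and function names are assumptions for illustration.

```python
import threading


class AtomicRef:
    """Stand-in for an atomic pointer; the lock only emulates hardware CAS."""

    def __init__(self):
        self._value = None
        self._lock = threading.Lock()

    def load(self):
        return self._value

    def compare_and_swap(self, expected, new):
        with self._lock:
            if self._value is expected:
                self._value = new
                return True
            return False


class Node:
    def __init__(self, key, payload):
        self.key, self.payload, self.next = key, payload, None


class SharedHashTable:
    def __init__(self, nbuckets):
        self.buckets = [AtomicRef() for _ in range(nbuckets)]

    def insert(self, key, payload):
        node = Node(key, payload)
        bucket = self.buckets[hash(key) % len(self.buckets)]
        while True:
            head = bucket.load()
            node.next = head                      # link in front of the current head
            if bucket.compare_and_swap(head, node):
                return                            # nobody raced us; the push is done
            # Another participant changed the head first: retry.

    def probe(self, key):
        # Only called after building has finished, so the chain is immutable.
        node = self.buckets[hash(key) % len(self.buckets)].load()
        while node is not None:
            if node.key == key:
                yield node.payload
            node = node.next


def build(table, keys):
    for k in keys:
        table.insert(k, f"row-{k}")


table = SharedHashTable(nbuckets=8)
threads = [threading.Thread(target=build, args=(table, range(i, 400, 4)))
           for i in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()
```

Four builders insert disjoint key ranges into the same eight buckets, and every key can be found afterwards without any per-bucket lock being held during the probe.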
2. WORK COORDINATION
To coordinate parallel work, this patch uses two other patches:
barriers[3], to implement a 'barrier' or 'phaser' synchronisation
primitive, and those in turn use the condition variables proposed by
Robert Haas.
Barriers provide a way for participants to break work up into phases
that they unanimously agree to enter together, which is a basic
requirement for parallelising hash joins. It is not safe to insert
into the hash table until exactly one participant has created it; it
is not safe to probe the hash table until all participants have
finished inserting into it; it is not safe to scan it for unmatched
tuples until all participants have finished probing it; it is not safe
to discard it and start loading the next batch until ... you get the
idea. You could also construct appropriate synchronisation using
various other interlocking primitives or flow control systems, but
fundamentally these wait points would exist at some level, and I think
this way is quite clean and simple. YMMV.
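The wait points listed above can be modelled with an ordinary fixed-size barrier. The sketch below uses Python's threading.Barrier and a plain dict as the shared table. It is an illustration of the phase ordering only, with a party size that is known up front, which is an assumption the real executor cannot make; the variable and function names are invented for this example.

```python
import threading

PARTICIPANTS = 3
barrier = threading.Barrier(PARTICIPANTS)
hash_table = {}                 # created once, before any participant starts
table_lock = threading.Lock()   # coarse lock standing in for atomic inserts
matches = []
matches_lock = threading.Lock()


def participant(inner_rows, outer_keys):
    # Build phase: every participant inserts its share of the inner tuples.
    for key, value in inner_rows:
        with table_lock:
            hash_table.setdefault(key, []).append(value)
    barrier.wait()  # not safe to probe until everyone has finished inserting

    # Probe phase: the table is read-only from here on.
    local = [(key, v) for key in outer_keys for v in hash_table.get(key, [])]
    with matches_lock:
        matches.extend(local)
    barrier.wait()  # not safe to discard the table until everyone has probed


inner = [(k, f"inner-{k}") for k in range(9)]
outer = list(range(12))
threads = [
    threading.Thread(target=participant,
                     args=(inner[i::PARTICIPANTS], outer[i::PARTICIPANTS]))
    for i in range(PARTICIPANTS)
]
for t in threads:
    t.start()
for t in threads:
    t.join()
```

Removing the first barrier.wait() would let a fast participant probe a half-built table and silently miss matches, which is exactly the hazard the phases guard against.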
If we had exactly W workers and the leader didn't participate, then we
could use a simple pthread- or MPI-style barrier without an
explicit notion of 'phase'. We would simply take the existing hash
join code, add the shared hash table, add barrier waits at various
points and make sure that all participants always hit all of those
points in the same order, and it should All Just Work. But we have a
variable party size and a dual-role leader process, and I want to
highlight the specific problems that causes here because they increase
the patch size significantly:
Problem 1: We don't know how many workers will actually start. We
know how many were planned, but at execution time we may have
exhausted limits and actually get a smaller number. So we can't use
"static" barriers like the classic barriers in POSIX or MPI where the
group size is known up front. We need "dynamic" barriers with attach
and detach operations. As soon as you have varying party size you
need some kind of explicit model of the current phase, so that a new
participant can know what to do when it joins. For that reason, this
patch uses a phase number to track progress through the parallel hash
join. See MultiExecHash and ExecHashJoin which have switch statements
allowing a newly joined participant to synchronise their own state
machine and program counter with the phase.
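A "dynamic" barrier with attach and detach operations and an explicit phase number might look roughly like the sketch below. The DynamicBarrier class and its method names are hypothetical and do not reproduce the interface of the barrier patch; the point is only to show how a late joiner learns the current phase and skips work that is already complete.

```python
import threading

PHASES = ["build", "probe", "done"]


class DynamicBarrier:
    def __init__(self):
        self._cond = threading.Condition()
        self._participants = 0
        self._arrived = 0
        self._phase = 0

    @property
    def phase(self):
        return self._phase

    def attach(self):
        """Join the party and return the phase currently in progress."""
        with self._cond:
            self._participants += 1
            return self._phase

    def detach(self):
        """Leave the party; release waiters if we were the only one missing."""
        with self._cond:
            self._participants -= 1
            if self._participants > 0 and self._arrived == self._participants:
                self._advance()

    def arrive_and_wait(self):
        with self._cond:
            self._arrived += 1
            if self._arrived == self._participants:
                self._advance()
                return True          # last to arrive
            phase = self._phase
            while self._phase == phase:
                self._cond.wait()
            return False

    def _advance(self):
        self._arrived = 0
        self._phase += 1
        self._cond.notify_all()


def join_and_work(name, barrier, log, attached):
    phase = barrier.attach()         # synchronise with whatever phase is running
    attached.set()
    while PHASES[phase] != "done":
        log.append((name, PHASES[phase]))
        barrier.arrive_and_wait()
        phase += 1
    barrier.detach()


barrier = DynamicBarrier()
log = []
phase = barrier.attach()             # A attaches first and sees the build phase
log.append(("A", PHASES[phase]))
barrier.arrive_and_wait()            # A is the whole party, so the phase advances

attached = threading.Event()
late = threading.Thread(target=join_and_work, args=("B", barrier, log, attached))
late.start()
attached.wait()                      # B has now attached during the probe phase
log.append(("A", "probe"))
barrier.arrive_and_wait()            # both A and B must arrive to finish probing
barrier.detach()
late.join()
```

Participant B never runs the build step because attach() tells it the party has already moved on to probing, which is the role the switch statements on the phase number play in the description above.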
Problem 2: One participant is not like the others: Gather may or may
not decide to run its subplan directly if the worker processes aren't
producing any tuples (and the proposed Gather Merge is the same). The
problem is that it also needs to consume tuples from the fixed-size
queues of the regular workers. A deadlock could arise if the leader's
plan blocks waiting for other participants while another participant
has filled its output queue and is waiting for the leader to consume.
One way to avoid such deadlocks is to follow the rule that the leader
should never wait for other participants if there is any possibility
that they have emitted tuples. The simplest way to do that would be
to have shared hash plans refuse to run in the leader by returning
NULL to signal the end of this partial tuple stream, but then we'd
lose a CPU compared to non-shared hash plans. The latest point the
leader can exit while respecting that rule is at the end of probing
the first batch. That is the approach taken by the patch currently.
See ExecHashCheckForEarlyExit for logic and discussion. It would be
better to be able to use the leader in later batches too, but as far
as I can see that'd require changes that are out of scope for this
patch. One idea would be an executor protocol change allowing plans
running in the leader to detach and yield, saying 'I have no further
tuples right now, but I'm not finished; try again later', and then
reattach when you call it back. Clearly that sails close to
asynchronous execution territory.
Problem 3: If the leader drops out after the first batch to solve
problem 2, then it may leave behind batch files which must be
processed by other participants. I had originally planned to defer
work on batch file sharing until a later iteration, thinking that it
would be a nice performance improvement to redistribute work from
uneven batch files, but it turns out to be necessary for correct
results because of participants exiting early. I am working on a very
simple batch sharing system to start with... Participants still
generate their own batch files, and then new operations BufFileExport
and BufFileImport are used to grant read-only access to the BufFile to
other participants. Each participant reads its own batch files
entirely and then tries to read from every other participant's batch
files until they are all exhausted, using a shared read head. The
per-tuple locking granularity, extra seeking and needless buffering in
every backend on batch file reads aren't great, and I'm still figuring
out temporary file cleanup/ownership semantics. There may be an
opportunity to make use of 'unified' BufFile concepts from Peter
Geoghegan's work, or create some new reusable shared tuple spilling
infrastructure.
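The batch sharing scheme described in Problem 3 can be modelled in a few lines. In this sketch an in-memory list stands in for a BufFile, and SharedBatchFile and drain are hypothetical names; nothing here reflects the real BufFileExport or BufFileImport interfaces. Participant 0 plays the leader that has exited early and left its batch file behind.

```python
import threading


class SharedBatchFile:
    """A batch file with one read head shared by all readers."""

    def __init__(self, tuples):
        self._tuples = list(tuples)
        self._head = 0
        self._lock = threading.Lock()

    def read_next(self):
        with self._lock:             # per-tuple locking granularity
            if self._head >= len(self._tuples):
                return None          # this file is exhausted
            item = self._tuples[self._head]
            self._head += 1
            return item


def drain(my_index, files, out):
    # Read our own file first, then help with every other participant's file.
    order = [my_index] + [i for i in range(len(files)) if i != my_index]
    for i in order:
        while (item := files[i].read_next()) is not None:
            out.append(item)


# Three participants wrote batch files; participant 0 exited early.
files = [SharedBatchFile([(p, n) for n in range(50)]) for p in range(3)]
results = {1: [], 2: []}
threads = [threading.Thread(target=drain, args=(p, files, results[p]))
           for p in (1, 2)]
for t in threads:
    t.start()
for t in threads:
    t.join()
everything = results[1] + results[2]
```

Because the read head is shared, each tuple is handed to exactly one reader, so the abandoned file is fully processed without duplicates regardless of how the two remaining participants interleave.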
3. COSTING
For now, I have introduced a GUC called cpu_shared_tuple_cost which
provides a straw-man model of the overhead of exchanging tuples via a
shared hash table, and the extra process coordination required. If
it's zero then a non-shared hash plan (ie multiple copies) has the
same cost as a shared hash plan, even though the non-shared hash plan
wastefully runs P copies of the plan. If cost represents runtime and
we assume perfectly spherical cows running without interference
from each other, that makes some kind of sense, but it doesn't account
for the wasted resources and contention caused by running the same
plan in parallel. I don't know what to do about that yet. If
cpu_shared_tuple_cost is a positive number, as it probably should be
(more on that later), then shared hash tables look more expensive than
non-shared ones, which is technically true (CPU cache sharing etc) but
unhelpful because what you lose there you tend to gain by not running
all those plans in parallel. In other words cpu_shared_tuple_cost
doesn't really model the cost situation at all well, but it's a useful
GUC for development purposes for now as positive and negative numbers
can be used to turn the feature on and off for testing... As for
work_mem, it seems to me that 9.6 already established that work_mem is
a per participant limit, and it would be only fair to let a shared
plan use a total of work_mem * P too. I am still working on work_mem
accounting and reporting. Accounting for the parallelism in parallel
shared hash plans is easy though: their estimated tuple count is
already divided by P in the underlying partial path, and that is a
fairly accurate characterisation of what's going to happen at
execution time: it's often going to go a lot faster, and those plans
are the real goal of this work.
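The behaviour of the straw-man costing can be illustrated with a toy model. The formula below is an assumption written for this example and is not the planner's actual cost function; only the qualitative properties come from the description above: a zero cpu_shared_tuple_cost makes shared and non-shared builds cost the same, a positive value makes shared builds look more expensive, and the parallel shared variant divides the tuple count by P.

```python
def hash_build_cost(inner_tuples, participants, variant,
                    cpu_tuple_cost=0.01, cpu_shared_tuple_cost=0.0):
    """Toy per-participant cost of building the hash table (hypothetical)."""
    if variant == "non_shared":
        # Every participant hashes all inner tuples into a private table.
        return inner_tuples * cpu_tuple_cost
    if variant == "shared":
        # One participant hashes all tuples and pays the sharing overhead.
        return inner_tuples * (cpu_tuple_cost + cpu_shared_tuple_cost)
    if variant == "parallel_shared":
        # The partial inner plan already divides the tuple count by P.
        per_participant = inner_tuples / participants
        return per_participant * (cpu_tuple_cost + cpu_shared_tuple_cost)
    raise ValueError(f"unknown variant: {variant}")


zero = {v: hash_build_cost(1_000_000, 4, v)
        for v in ("non_shared", "shared", "parallel_shared")}
positive = {v: hash_build_cost(1_000_000, 4, v, cpu_shared_tuple_cost=0.005)
            for v in ("non_shared", "shared", "parallel_shared")}
print(zero)
print(positive)
```

Even in this toy form the weakness noted above is visible: the non-shared cost ignores the fact that P copies of the inner plan run at once, so a positive cpu_shared_tuple_cost penalises the shared plan without crediting it for the work it avoids.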
STATUS
Obviously this is a work in progress. I am actively working on the following:
* rescan
* batch number increases
* skew buckets
* costing model and policy/accounting for work_mem
* shared batch file reading
* preloading next batch
* debugging and testing
* tidying and refactoring
The basic approach is visible and simple cases are working though, so
I am submitting this WIP work for a round of review in the current
commitfest and hoping to get some feedback and ideas. I will post the
patch in a follow-up email shortly... Thanks for reading!
[1] https://www.postgresql.org/message-id/flat/CAEepm=1z5WLuNoJ80PaCvz6EtG9dN0j-KuHcHtU6QEfcPP5-qA(at)mail(dot)gmail(dot)com#CAEepm=1z5WLuNoJ80PaCvz6EtG9dN0j-KuHcHtU6QEfcPP5-qA@mail.gmail.com
[2] https://www.postgresql.org/message-id/flat/CAEepm%3D0HmRefi1%2BxDJ99Gj5APHr8Qr05KZtAxrMj8b%2Bay3o6sA%40mail.gmail.com
[3] https://www.postgresql.org/message-id/flat/CAEepm%3D2_y7oi01OjA_wLvYcWMc9_d%3DLaoxrY3eiROCZkB_qakA%40mail.gmail.com
--
Thomas Munro
http://www.enterprisedb.com
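For the full discussion of this patch, see the mailing list thread; the URL is at the end of this article.
The PostgreSQL community works very rigorously: a patch may be discussed on the mailing list for months or even years and revised repeatedly in response to feedback, so by the time it is merged into master it is already very mature. That is why PostgreSQL is so well known for its stability.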