Re: Parallel Hash take II

From: Thomas Munro <thomas(dot)munro(at)enterprisedb(dot)com>
To: Prabhat Sahu <prabhat(dot)sahu(at)enterprisedb(dot)com>
Cc: Rushabh Lathia <rushabh(dot)lathia(at)gmail(dot)com>, Peter Geoghegan <pg(at)bowt(dot)ie>, Robert Haas <robertmhaas(at)gmail(dot)com>, Andres Freund <andres(at)anarazel(dot)de>, Pg Hackers <pgsql-hackers(at)postgresql(dot)org>, Rafia Sabih <rafia(dot)sabih(at)enterprisedb(dot)com>, Ashutosh Bapat <ashutosh(dot)bapat(at)enterprisedb(dot)com>, Haribabu Kommi <kommi(dot)haribabu(at)gmail(dot)com>, Oleg Golovanov <rentech(at)mail(dot)ru>
Subject: Re: Parallel Hash take II
Date: 2017-10-24 09:10:53
Views: Raw Message | Whole Thread | Download mbox
Lists: pgsql-hackers

On Tue, Sep 19, 2017 at 8:06 AM, Robert Haas <robertmhaas(at)gmail(dot)com> wrote:
> On Thu, Sep 14, 2017 at 10:01 AM, Thomas Munro
> <thomas(dot)munro(at)enterprisedb(dot)com> wrote:
>> 3. Gather Merge and Parallel Hash Join may have a deadlock problem.
> [...]
> Thomas and I spent about an hour and a half brainstorming about this
> just now.
> [...]
> First, as long as nbatches == 1, we can use a hash
> table of up to size (participants * work_mem); if we have to switch to
> multiple batches, then just increase the number of batches enough that
> the current memory usage drops below work_mem. Second, following an
> idea originally by Ashutosh Bapat whose relevance to this issue Thomas
> Munro realized during our discussion, we can make all the batches
> small enough to fit in work_mem (rather than participants * work_mem
> as the current patch does) and spread them across the workers (in the
> style of Parallel Append, including potentially deploying multiple
> workers against the same batch if there are fewer batches than
> workers).

Here is an updated patch set that does that ^.

Assorted details:

1. To avoid deadlocks, we can only wait at barriers when we know that
all other attached backends have either arrived already or are
actively executing the code preceding the barrier wait, so that
progress is guaranteed. The rules is that executor nodes can remain
attached to a barrier after they've emitted a tuple, which is useful
for resource management (ie avoids inventing a separate reference
counting scheme), but must never again wait for it. With that
programming rule there can be no deadlock between executor nodes.

2. Multiple batches are processed at the same time in parallel,
rather than being processed serially. Participants try to spread
themselves out over different batches to reduce contention.

3. I no longer try to handle outer joins. I have an idea for how to
do that while adhering to the above deadlock-avoidance programming
rule, but I would like to consider that for a later patch.

4. I moved most of the parallel-aware code into ExecParallelHash*()
functions that exist alongside the private hash table versions. This
avoids uglifying the existing hash join code and introducing
conditional branches all over the place, as requested by Andres. This
made some of the earlier refactoring patches unnecessary.

5. Inner batch repartitioning, if required, is now completed up front
for Parallel Hash. Rather than waiting until we try to load hash
tables back into memory to discover that they don't fit, this version
tracks the size of hash table contents while writing the batches out.
That change has various pros and cons, but its main purpose is to
remove dependencies between batches.

6. Outer batch repartitioning is now done up front too, if it's
necessary. This removes the dependencies that exist between batch 0
and later batches, but means that outer batch 0 is now written to disk
if for multi-batch joins. I don't see any way to avoid that while
adhering to the deadlock avoidance rule stated above. If we've
already started emitting tuples for batch 0 (by returning control to
higher nodes) then we have no deadlock-free way to wait for the scan
of the outer relation to finish, so we can't safely process later
batches. Therefore we must buffer batch 0's tuples. This renders the
skew optimisation useless.

7. There is now some book-keeping state for each batch. For typical
cases the total space used is negligible but obviously you can
contrive degenerate cases where it eats a lot of memory (say by
creating a million batches, which is unlikely to work well for other
reasons). I have some ideas on reducing their size, but on the other
hand I also have some ideas on increasing it profitably... (this is
the perfect place to put the Bloom filters discussed elsewhere that
would make up for the loss of the skew optimisation, for selective
joins; a subject for another day).

8. Barrier API extended slightly. (1) BarrierWait() is renamed to
BarrierArriveAndWait(). (2) BarrierArriveAndDetach() is new. The new
function is the mechanism required to get from PHJ_BATCH_PROBING to
PHJ_BATCH_DONE without waiting, and corresponds to the operation known
as Phaser.arriveAndDeregister() in Java (and maybe also
barrier::arrive_and_drop() in the C++ concurrency TS and Clock.drop()
in X10, not sure).

9. I got rid of PARALLEL_KEY_EXECUTOR_NODE_NTH(). Previously I
wanted more than one reserved smh_toc key per executor node. Robert
didn't like that.

10. I got rid of "LeaderGate". That earlier attempt at deadlock
avoidance clearly missed the mark. (I think it probably defended
against the Gather + full TupleQueue deadlock but not the
GatherMerge-induced deadlock so it was a useless non-general

11. The previous incarnation of SharedTuplestore had a built-in
concept of partitions, which allowed the number of partitions to be
expanded any time but only allowed one partition to be read back in at
a time. That worked for the earlier kind of sequential partition
processing but doesn't work for parallel partition processing. I
chose to get rid of the built-in partitioning concept and create
separate SharedTuplestores for each batch, since I now have a place to
store per-batch information in memory.

12. The previous incarnation of BufFileSet had a concept of "stripe"
used for distributing temporary files over tablespaces; this is now
gone from the API. Files are distributed over your configured
temp_tablespaces without help from client code.

13. For now the rescan optimisation (ie reusing the hash table) is
not enabled for Parallel Hash. I had to provide
ExecHashJoinReInitializeDSM (for 41b0dd98), but haven't figured out
how to reach it yet and didn't want to wait any longer before posting
a new patch so that function is effectively blind code and will
probably require adjustments.

I've also attached a test that shows a worker starting up and
attaching at every phase.

Thomas Munro

Attachment Content-Type Size
parallel-hash-v21.patchset.tgz application/x-gzip 60.4 KB
parallel-hash-all-arrival-phases.sql application/octet-stream 3.3 KB

In response to


Browse pgsql-hackers by date

  From Date Subject
Next Message Alvaro Herrera 2017-10-24 10:39:12 Re: SIGSEGV in BRIN autosummarize
Previous Message Ivan Kartyshov 2017-10-24 07:56:15 Re: WIP: long transactions on hot standby feedback replica / proof of concept