Re: Parallel Full Hash Join

From: Melanie Plageman <melanieplageman(at)gmail(dot)com>
To: Thomas Munro <thomas(dot)munro(at)gmail(dot)com>
Cc: pgsql-hackers <pgsql-hackers(at)postgresql(dot)org>
Subject: Re: Parallel Full Hash Join
Date: 2020-09-21 20:49:17
Message-ID: CAAKRu_bSJfCSNFBk+sKaE-ViFNdo_qvqW1MDEedbbG=99PrPHA@mail.gmail.com

On Wed, Sep 11, 2019 at 11:23 PM Thomas Munro <thomas(dot)munro(at)gmail(dot)com>
wrote:

>
> While thinking about looping hash joins (an alternative strategy for
> limiting hash join memory usage currently being investigated by
> Melanie Plageman in a nearby thread[1]), the topic of parallel query
> deadlock hazards came back to haunt me. I wanted to illustrate the
> problems I'm aware of with the concrete code where I ran into this
> stuff, so here is a new-but-still-broken implementation of $SUBJECT.
> This was removed from the original PHJ submission when I got stuck and
> ran out of time in the release cycle for 11. Since the original
> discussion is buried in long threads and some of it was also a bit
> confused, here's a fresh description of the problems as I see them.
> Hopefully these thoughts might help Melanie's project move forward,
> because it's closely related, but I didn't want to dump another patch
> into that other thread. Hence this new thread.
>
> I haven't succeeded in actually observing a deadlock with the attached
> patch (though I did last year, very rarely), but I also haven't tried
> very hard. The patch seems to produce the right answers and is pretty
> scalable, so it's really frustrating not to be able to get it over the
> line.
>
> Tuple queue deadlock hazard:
>
> If the leader process is executing the subplan itself and waiting for
> all processes to arrive in ExecParallelHashEndProbe() (in this patch)
> while another process has filled up its tuple queue and is waiting for
> the leader to read some tuples and unblock it, they will deadlock
> forever. That can't happen in the committed version of PHJ,
> because it never waits for barriers after it has begun emitting
> tuples.
>
> Some possible ways to fix this:
>
> 1. You could probably make it so that the PHJ_BATCH_SCAN_INNER phase
> in this patch (the scan for unmatched tuples) is executed by only one
> process, using the "detach-and-see-if-you-were-last" trick. Melanie
> proposed that for an equivalent problem in the looping hash join. I
> think it probably works, but it gives up a lot of parallelism and thus
> won't scale as nicely as the attached patch.
>

I have attached a patch which implements this
(v1-0001-Parallel-FOJ-ROJ-single-worker-scan-buckets.patch).

For starters, in order to support parallel FOJ and ROJ, I re-enabled
setting the match bit for the tuples in the hashtable, which commit
3e4818e9dd5be294d97c disabled. I did so using the code suggested in [1]:
reading the match bit to see if it is already set before setting it.
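
Concretely, the check-before-set amounts to something like the following
in the probe path (just a sketch using the existing htup_details.h
macros; the exact placement in the patch may differ):

    /*
     * Only dirty the shared cache line if the match bit isn't already
     * set, since many probing workers may hit the same inner tuple.
     */
    if (!HeapTupleHeaderHasMatch(HJTUPLE_MINTUPLE(hashTuple)))
        HeapTupleHeaderSetMatch(HJTUPLE_MINTUPLE(hashTuple));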

Then, all workers except the last one detach after exhausting the outer
side of a batch, leaving one worker to proceed to HJ_FILL_INNER, scan
the hash table, and emit unmatched inner tuples.
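
In the main state machine this looks roughly like the following once the
probe phase of a batch is exhausted (a sketch only;
BarrierArriveAndDetachExceptLast() is a placeholder name for a helper
with detach-all-but-last semantics, not necessarily what the patch calls
it):

    /*
     * Sketch: all but one worker leave the batch here; the survivor
     * stays attached and performs the unmatched inner scan alone.
     */
    if (BarrierArriveAndDetachExceptLast(&batch->batch_barrier))
        node->hj_JoinState = HJ_FILL_INNER;     /* we are the last one */
    else
        node->hj_JoinState = HJ_NEED_NEW_BATCH; /* detached; move on */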

I have also attached a variant on this patch which I am proposing to
replace it (v1-0001-Parallel-FOJ-ROJ-single-worker-scan-chunks.patch)
which has a new ExecParallelScanHashTableForUnmatched() in which the
single worker doing the unmatched scan scans one HashMemoryChunk at a
time and then frees them as it goes. I thought this might perform better
than the version which uses the buckets because 1) it should do a bit
less pointer chasing and 2) it frees each chunk of the hash table as it
scans it, which (maybe) would save a bit of time during
ExecHashTableDetachBatch() when it goes through and frees the hash
table. However, my preliminary tests showed a negligible difference
between this and the version using buckets. I will do a bit more
testing, though.
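
To sketch the chunk-scanning version (illustrative only -- the helper
name and the current-chunk/offset bookkeeping are invented here, and the
real patch frees each chunk with dsa_free() once it is exhausted):

    /* Return the next unmatched tuple in the current chunk, or NULL. */
    static HashJoinTuple
    ScanChunkForUnmatched(HashMemoryChunk chunk, size_t *idx)
    {
        while (*idx < chunk->used)
        {
            HashJoinTuple hashTuple = (HashJoinTuple)
                (HASH_CHUNK_DATA(chunk) + *idx);
            MinimalTuple tuple = HJTUPLE_MINTUPLE(hashTuple);

            /* step over this tuple for the next call */
            *idx += MAXALIGN(HJTUPLE_OVERHEAD + tuple->t_len);

            /* emit only tuples whose match bit was never set */
            if (!HeapTupleHeaderHasMatch(tuple))
                return hashTuple;
        }
        return NULL;    /* chunk exhausted; caller can free it now */
    }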

I tried a few other variants of these patches, including one in which
the workers detach from the batch inside of the batch loading and
probing phase machine, ExecParallelHashJoinNewBatch(). This meant that
all workers transition to HJ_FILL_INNER and then HJ_NEED_NEW_BATCH in
order to detach in the batch phase machine. This, however, involved
adding a lot of new variables to distinguish whether or not the
unmatched inner scan was already done, whether or not the current worker
was the one elected to do the scan, etc. Overall, it is probably
incorrect to use the HJ_NEED_NEW_BATCH state in this way. I had
originally tried this to avoid operating on the batch_barrier in the
main hash join state machine. I've found that the more places we add
code attaching to and detaching from the batch_barrier (and other PHJ
barriers, for that matter), the harder the code is to debug; however, I
think in this case it is required.

> 2. You could probably make it so that only the leader process drops
> out of executing the inner unmatched scan, and then I think you
> wouldn't have this very specific problem at the cost of losing some
> (but not all) parallelism (ie the leader), but there might be other
> variants of the problem. For example, a GatherMerge leader process
> might be blocked waiting for the next tuple from P1, while P2 is
> trying to write to a full queue, and P1 waits for P2.
>
> 3. You could introduce some kind of overflow for tuple queues, so
> that tuple queues can never block because they're full (until you run
> out of extra memory buffers or disk and error out). I haven't
> seriously looked into this but I'm starting to suspect it's the
> industrial strength general solution to the problem and variants of it
> that show up in other parallelism projects (Parallel Repartition). As
> Robert mentioned last time I talked about this[2], you'd probably only
> want to allow spooling (rather than waiting) when the leader is
> actually waiting for other processes; I'm not sure how exactly to
> control that.
>
> 4. <thinking-really-big>Goetz Graefe's writing about parallel sorting
> comes close to this topic, which he calls flow control deadlocks. He
> mentions the possibility of infinite spooling like (3) as a solution.
> He's describing a world where producers and consumers are running
> concurrently, and the consumer doesn't just decide to start running
> the subplan (what we call "leader participation"), so he doesn't
> actually have a problem like Gather deadlock. He describes
> planner-enforced rules that allow deadlock-free execution even with
> fixed-size tuple queue flow control by carefully controlling where
> order-forcing operators are allowed to appear, so he doesn't have a
> problem like Gather Merge deadlock. I'm not proposing we should
> create a whole bunch of producer and consumer processes to run
> different plan fragments, but I think you can virtualise the general
> idea in an async executor with "streams", and that also solves other
> problems when you start working with partitions in a world where it's
> not even sure how many workers will show up. I see this as a long
> term architectural goal requiring vast amounts of energy to achieve,
> hence my new interest in (3) for now.</thinking-really-big>
>
> Hypothetical inter-node deadlock hazard:
>
> Right now I think it is the case that whenever any node begins pulling
> tuples from a subplan, it continues to do so until either the query
> ends early or the subplan runs out of tuples. For example, Append
> processes its subplans one at a time until they're done -- it doesn't
> jump back and forth. Parallel Append doesn't necessarily run them in
> the order that they appear in the plan, but it still runs each one to
> completion before picking another one. If we ever had a node that
> didn't adhere to that rule, then two Parallel Full Hash Join nodes
> could deadlock, if some of the workers were stuck waiting in one
> while some were stuck waiting in the other.
>
> If we were happy to decree that that is a rule of the current
> PostgreSQL executor, then this hypothetical problem would go away.
> For example, consider the old patch I recently rebased[3] to allow
> Append over a bunch of FDWs representing remote shards to return
> tuples as soon as they're ready, not necessarily sequentially (and I
> think several others have worked on similar patches). To be
> committable under such a rule that applies globally to the whole
> executor, that patch would only be allowed to *start* them in any
> order, but once it's started pulling tuples from a given subplan it'd
> have to pull them all to completion before considering another node.
>
> (Again, that problem goes away in an async model like (4), which will
> also be able to do much more interesting things with FDWs, and it's
> the FDW thing that I think generates more interest in async execution
> than my rambling about abstract parallel query problems.)
>
>
The leader exclusion tactics and the spooling idea don't solve the
execution order deadlock possibility, so this "all except last detach
and last does unmatched inner scan" approach seems like the best way to
solve both types of deadlock.

There is another option, though, that could maintain some parallelism
for the unmatched inner scan.

This method is exactly like the "all except last detach and last does
unmatched inner scan" method from the perspective of the main hash join
state machine. The difference is in ExecParallelHashJoinNewBatch(). In
the batch_barrier phase machine, workers loop around looking for batches
that are not done.

In this "detach for now" method, all workers except the last one detach
from a batch after exhausting the outer side. They will mark the batch
they were just working on as "provisionally done" (as opposed to
"done"). The last worker advances the batch_barrier from
PHJ_BATCH_PROBING to PHJ_BATCH_SCAN_INNER.

All detached workers then proceed to HJ_NEED_NEW_BATCH and try to find
another batch to work on. If every remaining batch is either "done" or
"provisionally done", the worker will re-attach to batches that are
"provisionally done" and attempt to join in conducting the unmatched
inner scan. Once it finishes its work there, it will return to
HJ_NEED_NEW_BATCH, enter ExecParallelHashJoinNewBatch(), and mark the
batch as "done".
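
In terms of bookkeeping, I imagine this needs something like a per-batch
status in shared memory (names invented here for illustration, not taken
from the attached patches):

    /* Hypothetical per-batch state for the "detach for now" method. */
    typedef enum PHJBatchStatus
    {
        PHJ_BATCH_STATUS_NOT_DONE,           /* loading/probing in progress */
        PHJ_BATCH_STATUS_PROVISIONALLY_DONE, /* outer side exhausted; the
                                              * unmatched inner scan can
                                              * still accept helpers */
        PHJ_BATCH_STATUS_DONE                /* nothing left to do */
    } PHJBatchStatus;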

Because the worker detached from the batch, this method solves the tuple
queue flow control deadlock issue--this worker could not be attempting
to emit a tuple while the leader waits at the barrier for it. There is
no waiting at the barrier.

However, it is unclear to me whether or not this method is at risk of
inter-node/execution order deadlock. It seems no more at risk of that
than the existing code is.

If a worker never returns to the HashJoin after leaving to emit a tuple,
the query would not finish correctly under any of these methods (or on
master): workers are attached to the batch_barrier while emitting
tuples, and, though they may not wait at this barrier again, the
hashtable is cleaned up by the last participant to detach, which would
not happen if a worker never returned to the batch phase machine. I'm
not sure if this exhibits the problematic behavior detailed above, but,
if it does, it is not unique to this method.
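
(For reference, the cleanup-on-last-detach behavior I'm referring to is
the existing pattern in ExecHashTableDetachBatch(), condensed and
simplified here:)

    /* Detach from the batch we were just working on. */
    if (BarrierArriveAndDetach(&batch->batch_barrier))
    {
        /* Last one out frees the shared chunks for this batch. */
        while (DsaPointerIsValid(batch->chunks))
        {
            HashMemoryChunk chunk =
                dsa_get_address(hashtable->area, batch->chunks);
            dsa_pointer next = chunk->next.shared;

            dsa_free(hashtable->area, batch->chunks);
            batch->chunks = next;
        }
    }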

> Some other notes on the patch:
>
> Aside from the deadlock problem, there are some minor details to tidy
> up (handling of late starters probably not quite right, rescans not
> yet considered).

These would not be an issue with only one worker doing the scan, but
they would have to be handled in a parallel-enabled solution like the
one I suggested above.

> There is a fun hard-coded parameter that controls
> the parallel step size in terms of cache lines for the unmatched scan;
> I found that 8 was a lot faster than 4, but no slower than 128 on my
> laptop, so I set it to 8.

I didn't add this cache line optimization to my chunk scanning method. I
could do so. Do you think it is more relevant, less relevant, or the
same if only one worker is doing the unmatched inner scan?

> More thoughts along those micro-optimistic
> lines: instead of match bit in the header, you could tag the pointer
> and sometimes avoid having to follow it, and you could prefetch the
> next non-matching tuple's cacheline by looking ahead a bit.
>

I would be happy to try doing this once we get the rest of the patch
ironed out so that seeing how much of a performance difference it makes
is more straightforward.

>
> [1]
> https://www.postgresql.org/message-id/flat/CA%2BhUKGKWWmf%3DWELLG%3DaUGbcugRaSQbtm0tKYiBut-B2rVKX63g%40mail.gmail.com
> [2]
> https://www.postgresql.org/message-id/CA%2BTgmoY4LogYcg1y5JPtto_fL-DBUqvxRiZRndDC70iFiVsVFQ%40mail.gmail.com
> [3]
> https://www.postgresql.org/message-id/flat/CA%2BhUKGLBRyu0rHrDCMC4%3DRn3252gogyp1SjOgG8SEKKZv%3DFwfQ%40mail.gmail.com
>
>
>
[1]
https://www.postgresql.org/message-id/0F44E799048C4849BAE4B91012DB910462E9897A%40SHSMSX103.ccr.corp.intel.com

-- Melanie

Attachment Content-Type Size
v1-0001-Parallel-FOJ-ROJ-single-worker-scan-chunks.patch application/octet-stream 23.6 KB
v1-0001-Parallel-FOJ-ROJ-single-worker-scan-buckets.patch application/octet-stream 22.3 KB
