Re: [HACKERS] Parallel Hash take II

From: Thomas Munro <thomas(dot)munro(at)enterprisedb(dot)com>
To: Andres Freund <andres(at)anarazel(dot)de>
Cc: Rushabh Lathia <rushabh(dot)lathia(at)gmail(dot)com>, Prabhat Sahu <prabhat(dot)sahu(at)enterprisedb(dot)com>, Peter Geoghegan <pg(at)bowt(dot)ie>, Robert Haas <robertmhaas(at)gmail(dot)com>, Pg Hackers <pgsql-hackers(at)postgresql(dot)org>, Rafia Sabih <rafia(dot)sabih(at)enterprisedb(dot)com>, Ashutosh Bapat <ashutosh(dot)bapat(at)enterprisedb(dot)com>, Haribabu Kommi <kommi(dot)haribabu(at)gmail(dot)com>, Oleg Golovanov <rentech(at)mail(dot)ru>
Subject: Re: [HACKERS] Parallel Hash take II
Date: 2017-12-20 06:20:57
Lists: pgsql-hackers

On Thu, Dec 14, 2017 at 10:12 PM, Andres Freund <andres(at)anarazel(dot)de> wrote:
> Looking at the main patch (v28).
> First off: This looks pretty good, the code's quite readable now
> (especially compared to earlier versions), the comments are good. Really
> like the nodeHash split, and the always inline hackery in nodeHashjoin.
> Think we're getting really really close.


> * ExecHashJoinImpl
> *
> - * This function implements the Hybrid Hashjoin algorithm.
> + * This function implements the Hybrid Hashjoin algorithm. By forcing it
> + * to be always inline many compilers are able to specialize it for
> + * parallel = true/false without repeating the code.
> *
> what about adding the above explanation for the always inline?

Ok, I added something like that.

> + /*
> + * So far we have no idea whether there are any other participants,
> + * and if so, what phase they are working on. The only thing we care
> + * about at this point is whether someone has already created the
> + * SharedHashJoinBatch objects, the main hash table for batch 0 and
> + * (if necessary) the skew hash table yet. One backend will be
> + * elected to do that now if necessary.
> + */
> The 'yet' sounds a bit weird in combination with the 'already'.

Fixed. Also removed an outdated mention of skew hash table.

> + static void
> + ExecParallelHashIncreaseNumBatches(HashJoinTable hashtable)
> + ...
> + /* Elect one participant to prepare the operation. */
> That's a good chunk of code. I'm ever so slightly inclined to put that
> into a separate function. But I'm not sure it'd look good. Feel entirely
> free to disregard.

Disregarding for now...

> + static HashJoinTuple
> + ExecParallelHashLoadTuple(HashJoinTable hashtable, MinimalTuple tuple,
> + dsa_pointer *shared)
> Not really happy with the name. ExecParallelHashTableInsert() calling
> ExecParallelHashLoadTuple() to insert a tuple into the hashtable doesn't
> quite strike me as right; the naming similarity to
> ExecParallelHashTableLoad seems problematic too.
> ExecParallelHashAllocTuple() or such?
> One could argue it'd not be a bad idea to keep a similar split as
> dense_alloc() and memcpy() have, but I'm not really convinced by
> myself. Hm.

Yeah, the names are confusing. So:

1. I changed ExecParallelHashTableLoad() to
ExecParallelHashTableInsertCurrentBatch(). It's just like
ExecParallelHashTableInsert(), except that it's not prepared to send
tuples to a different batch or to run out of memory: it's used only
for loading batches from disk, where the total size was already checked
by ExecParallelHashTuplePrealloc(), and in the parallel case there is
no 'send forward to future batch' requirement, so we might as well skip
the conditional stuff.

2. I changed ExecParallelHashLoadTuple() to
ExecParallelHashTupleAlloc(), and made it so that the caller must call
memcpy so that it's more like dense_alloc(). I have now been working
on PostgreSQL long enough that arbitrary differences in case
conventions don't even hurt my eyes anymore.

I also renamed ExecParallelHashTableAllocate() to
ExecParallelHashTableAlloc(), and ExecParallelPreallocate() to
ExecParallelHashTuplePrealloc(), to conform with that style.
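In toy form, the dense_alloc()-style split (the allocator hands back raw space, the caller does the memcpy) looks something like the sketch below. Arena and arena_alloc are invented stand-ins for illustration, not PostgreSQL code:

```c
#include <stddef.h>

/* A toy bump allocator standing in for the chunk-based tuple allocator. */
typedef struct Arena
{
	char	buf[1024];
	size_t	used;
} Arena;

/*
 * Hand back raw space of the requested size, or NULL if the chunk is
 * full.  The caller is responsible for copying the tuple bytes in,
 * mirroring the dense_alloc()-then-memcpy() division of labour.
 */
static void *
arena_alloc(Arena *arena, size_t size)
{
	void	   *p;

	if (arena->used + size > sizeof(arena->buf))
		return NULL;			/* out of space: caller must handle this */
	p = arena->buf + arena->used;
	arena->used += size;
	return p;
}
```

The point of the split is that the allocator stays oblivious to tuple contents, so the same code path works whether the bytes come from the outer plan or from a batch file on disk.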

> + /* Elect one participant to prepare the operation. */
> Imo that comment could use a one-line summary of what preparing means.


> + /*
> + * We probably also need a smaller bucket array. How many
> + * tuples do we expect per batch, assuming we have only
> + * half of them so far?
> Makes sense, but did cost me a minute of thinking. Maybe add a short
> explanation why.


> + /* Wait for the above to be finished. */
> + BarrierArriveAndWait(&pstate->grow_batches_barrier,
> + /* Fall through. */
> Just to make sure I understand: The "empty" phase is to protect the
> process of the electee doing the sizing calculations etc? And the
> reason that's not possible to do just by waiting for
> PHJ_GROW_BATCHES_REPARTITIONING is that somebody could dynamically
> arrive, i.e. it'd not be needed in a statically sized barrier?

Yeah, it's where you wait for the serial phase above to be finished.
It needs an entry point of its own for the reason you said: someone
might attach while the allocation is taking place, and therefore needs
to wait for the allocation phase to finish. By doing it this way
instead of (say) putting a wait at the start of the
PHJ_GROW_BATCHES_REPARTITIONING case, we allow someone to start
running at PHJ_GROW_BATCHES_REPARTITIONING phase and do useful work
immediately. Does that make sense?
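To make the phase-entry-point argument concrete, here is a standalone toy of the fall-through switch, with invented names (the real thing is the PHJ_GROW_BATCHES_* machinery with BarrierArriveAndWait() between phases, which this sketch omits):

```c
#include <string.h>

/* Invented stand-ins for the PHJ_GROW_BATCHES_* phases. */
typedef enum
{
	GROW_BATCHES_ELECT,			/* elect one participant to prepare */
	GROW_BATCHES_ALLOCATE,		/* serial phase: the electee allocates */
	GROW_BATCHES_REPARTITION	/* parallel phase: everyone repartitions */
} GrowPhase;

/*
 * Run the remaining steps for a participant that attaches at 'phase',
 * appending one letter per step to 'log'.  The real code waits on a
 * barrier between cases; the fall-through shows why each phase needs
 * its own entry point: someone attaching during the serial ALLOCATE
 * phase must wait it out, while someone attaching at REPARTITION can
 * start useful work immediately.
 */
static void
grow_batches_from(GrowPhase phase, char *log)
{
	switch (phase)
	{
		case GROW_BATCHES_ELECT:
			strcat(log, "E");
			/* fall through */
		case GROW_BATCHES_ALLOCATE:
			strcat(log, "A");
			/* fall through */
		case GROW_BATCHES_REPARTITION:
			strcat(log, "R");
			break;
	}
}
```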

> + /* reset temp memory each time to avoid leaks from qual expr */
> + ResetExprContext(econtext);
> +
> + if (ExecQual(hjclauses, econtext))
> I personally think it's better to avoid this pattern and store the
> result of the ExecQual() in a variable, ResetExprContext() and then act
> on the result. No need to keep memory around for longer, and for bigger
> contexts you're more likely to have all the metadata in cache.
> I'd previously thought about introducing ExecQualAndReset() or such...

Makes sense, but this code is identical to what's in
ExecScanHashBucket(), so I think we should keep it this way for now
and explore expression context lifetime improvements in a separate
patch? Looks like the same change could be made in other nodes too.
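For the record, the pattern Andres is suggesting looks roughly like the toy below: capture the qual result in a local, reset the context immediately, then act on the saved result. ExprCtx, ctx_reset and eval_qual are invented stand-ins for the per-tuple memory context machinery, not real executor code:

```c
#include <stdbool.h>

/* A toy expression context that just counts scratch allocations. */
typedef struct ExprCtx
{
	int			nallocs;
} ExprCtx;

static void
ctx_reset(ExprCtx *ctx)
{
	ctx->nallocs = 0;			/* the real ResetExprContext() frees memory */
}

/* A qual that "allocates" scratch memory while evaluating. */
static bool
eval_qual(ExprCtx *ctx, int value)
{
	ctx->nallocs++;
	return value > 0;
}

/*
 * Store the result, reset the context, then act on the result, so
 * scratch memory is not held any longer than needed.
 */
static bool
qual_and_reset(ExprCtx *ctx, int value)
{
	bool		matched = eval_qual(ctx, value);

	ctx_reset(ctx);
	return matched;
}
```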

> * src/backend/executor/nodeHashjoin.c
> *
> + *
> This is a pretty good explanation. How about adding a reference to it
> from nodeHash.c's header?


> +static TupleTableSlot * /* return: a tuple or NULL */
> +ExecHashJoin(PlanState *pstate)
> +{
> + /*
> + * On sufficiently smart compilers this should be inlined with the
> + * parallel-aware branches removed.
> + */
> + return ExecHashJoinImpl(pstate, false);
> Ah, the explanation I desired above is here. Still seems good to have a
> comment at the somewhat suspicious use of always_inline.


> +
> + /*
> + * If we started up so late that the shared batches have been freed
> + * already by ExecHashTableDetach(), then we are finished.
> + */
> + if (!DsaPointerIsValid(hashtable->parallel_state->batches))
> + return false;
> This is really the only place that weird condition is detected? And why
> is that possible in the first place? And if possible, shouldn't we have
> detected earlier? Also, if possible, what prevents this to occur in a
> way that test fails, because pstate->batches is freed, but not yet
> reset?

ExecParallelHashJoinNewBatch(), where this code appears, is generally
the place that we discover that we're finished. Normally we discover
that we're finished by seeing that there are no batches left to chew
on, by inspecting the per-batch state in shmem. This weird condition
arises when a worker starts up so late that the join is finished and
the shmem space used to track batches has already been freed. I
agree that that was badly explained and there was in fact something a
bit kooky about that coding. I have now changed it so that
ExecParallelHashEnsureBatchAccessors() detects this case and has a
better comment to explain it, and ExecParallelHashJoinNewBatch() now
just looks out for hashtable->batches == NULL with a comment referring
to the other place.

You can hit this case by hacking the code thus:

--- a/src/backend/access/transam/parallel.c
+++ b/src/backend/access/transam/parallel.c
@@ -986,6 +986,10 @@ ParallelWorkerMain(Datum main_arg)
 	Assert(ParallelWorkerNumber == -1);
 	memcpy(&ParallelWorkerNumber, MyBgworkerEntry->bgw_extra, sizeof(int));
 
+	elog(LOG, "delaying startup to stagger workers ...");
+	pg_usleep(ParallelWorkerNumber * 2000000);
+	elog(LOG, "... continuing");
+

... and then:

create table foo as
select generate_series(1, 1000000)::int a,
'xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx'::text b;
analyze foo;
set parallel_leader_participation = off;
explain analyze select count(*) from foo r join foo s using (a);

The point is that worker 0 completes the entire multi-batch join and
cleans up, freeing the parallel batch info, and then worker 1 arrives
2 seconds later and it can't even look up the batch info to figure out
which batches are finished. It has to be like that, because worker 0
doesn't even know about worker 1, and as far as it's concerned the
last one out turns out the lights.
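The "last one out turns out the lights" protocol can be boiled down to the toy below. SharedState, attach and detach are invented stand-ins; in the real code the "lights" are the DSA-allocated batch array freed by ExecHashTableDetach(), and the validity test is DsaPointerIsValid():

```c
#include <stdbool.h>

/* Toy shared state: the last detaching participant frees the batch info. */
typedef struct SharedState
{
	int			nattached;
	bool		batches_valid;	/* stands in for DsaPointerIsValid(batches) */
} SharedState;

/* Returns false if we arrived after the join finished and cleaned up. */
static bool
attach(SharedState *s)
{
	if (!s->batches_valid)
		return false;			/* the lights are already out */
	s->nattached++;
	return true;
}

static void
detach(SharedState *s)
{
	if (--s->nattached == 0)
		s->batches_valid = false;	/* last one out frees the batch info */
}
```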

> + while ((tuple = sts_parallel_scan_next(hashtable->batches[batchno].inner_tuples,
> + &hashvalue)))
> That's a fairly long line. Couldn't this whole block made more readable by
> using a temp variable for inner_tuples?


While running through every test I could think of, I discovered that
the multi-batch parallel-aware hash join with rescan regression test
failed to be multi-batch on 32-bit x86, so I needed to tweak the
number of rows in the table called "bar".

Thomas Munro

Attachment Content-Type Size
parallel-hash-v30.patch application/octet-stream 155.3 KB
