Re: WIP: [[Parallel] Shared] Hash

From: Rafia Sabih <rafia(dot)sabih(at)enterprisedb(dot)com>
To: Thomas Munro <thomas(dot)munro(at)enterprisedb(dot)com>
Cc: Andres Freund <andres(at)anarazel(dot)de>, Ashutosh Bapat <ashutosh(dot)bapat(at)enterprisedb(dot)com>, Peter Geoghegan <pg(at)heroku(dot)com>, Haribabu Kommi <kommi(dot)haribabu(at)gmail(dot)com>, Pg Hackers <pgsql-hackers(at)postgresql(dot)org>
Subject: Re: WIP: [[Parallel] Shared] Hash
Date: 2017-03-13 07:40:49
Message-ID: CAOGQiiP+cQ6zVq4YS2UZdGq3jcqR3c6FrvFN6KxLuJ1WtrSscQ@mail.gmail.com
Lists: pgsql-hackers

On Thu, Mar 9, 2017 at 5:28 PM, Thomas Munro <thomas(dot)munro(at)enterprisedb(dot)com>
wrote:

> On Wed, Mar 8, 2017 at 12:58 PM, Andres Freund <andres(at)anarazel(dot)de> wrote:
> > 0002: Check hash join work_mem usage at the point of chunk allocation.
> >
> > Modify the existing hash join code to detect work_mem exhaustion at
> > the point where chunks are allocated, instead of checking after every
> > tuple insertion. This matches the logic used for estimating, and more
> > importantly allows for some parallelism in later patches.
> >
> > diff --git a/src/backend/executor/nodeHash.c b/src/backend/executor/nodeHash.c
> > index 406c180..af1b66d 100644
> > --- a/src/backend/executor/nodeHash.c
> > +++ b/src/backend/executor/nodeHash.c
> > @@ -48,7 +48,8 @@ static void ExecHashSkewTableInsert(HashJoinTable hashtable,
> >                                     int bucketNumber);
> >  static void ExecHashRemoveNextSkewBucket(HashJoinTable hashtable);
> >
> > -static void *dense_alloc(HashJoinTable hashtable, Size size);
> > +static void *dense_alloc(HashJoinTable hashtable, Size size,
> > +                        bool respect_work_mem);
> > I still dislike this, but maybe Robert's point of:
> >
> > On 2017-02-16 08:57:21 -0500, Robert Haas wrote:
> >> On Wed, Feb 15, 2017 at 9:36 PM, Andres Freund <andres(at)anarazel(dot)de>
> wrote:
> >> > Isn't it kinda weird to do this from within dense_alloc()? I mean
> that
> >> > dumps a lot of data to disk, frees a bunch of memory and so on - not
> >> > exactly what "dense_alloc" implies. Isn't the free()ing part also
> >> > dangerous, because the caller might actually use some of that memory,
> >> > like e.g. in ExecHashRemoveNextSkewBucket() or such. I haven't looked
> >> > deeply enough to check whether that's an active bug, but it seems like
> >> > inviting one if not.
> >>
> >> I haven't looked at this, but one idea might be to just rename
> >> dense_alloc() to ExecHashBlahBlahSomething(). If there's a real
> >> abstraction layer problem here then we should definitely fix it, but
> >> maybe it's just the angle at which you hold your head.
> >
> > Is enough.
>
> There is a problem here. It can determine that it needs to increase
> the number of batches, effectively splitting the current batch, but
> then the caller goes on to insert the current tuple anyway, even
> though it may no longer belong in this batch. I will post a fix for
> that soon. I will also refactor it so that it doesn't do that work
> inside dense_alloc. You're right, that's too weird.
>
> In the meantime, here is a new patch series addressing the other
> things you raised.
>
> > 0003: Scan for unmatched tuples in a hash join one chunk at a time.
> >
> >
> > @@ -1152,8 +1155,65 @@ bool
> > ExecScanHashTableForUnmatched(HashJoinState *hjstate, ExprContext *econtext)
> > {
> >     HashJoinTable hashtable = hjstate->hj_HashTable;
> > -   HashJoinTuple hashTuple = hjstate->hj_CurTuple;
> > +   HashJoinTuple hashTuple;
> > +   MinimalTuple tuple;
> > +
> > +   /*
> > +    * First, process the queue of chunks holding tuples that are in
> > +    * regular (non-skew) buckets.
> > +    */
> > +   for (;;)
> > +   {
> > +       /* Do we need a new chunk to scan? */
> > +       if (hashtable->current_chunk == NULL)
> > +       {
> > +           /* Have we run out of chunks to scan? */
> > +           if (hashtable->unmatched_chunks == NULL)
> > +               break;
> > +
> > +           /* Pop the next chunk from the front of the queue. */
> > +           hashtable->current_chunk = hashtable->unmatched_chunks;
> > +           hashtable->unmatched_chunks = hashtable->current_chunk->next;
> > +           hashtable->current_chunk_index = 0;
> > +       }
> > +
> > +       /* Have we reached the end of this chunk yet? */
> > +       if (hashtable->current_chunk_index >= hashtable->current_chunk->used)
> > +       {
> > +           /* Go around again to get the next chunk from the queue. */
> > +           hashtable->current_chunk = NULL;
> > +           continue;
> > +       }
> > +
> > +       /* Take the next tuple from this chunk. */
> > +       hashTuple = (HashJoinTuple)
> > +           (hashtable->current_chunk->data + hashtable->current_chunk_index);
> > +       tuple = HJTUPLE_MINTUPLE(hashTuple);
> > +       hashtable->current_chunk_index +=
> > +           MAXALIGN(HJTUPLE_OVERHEAD + tuple->t_len);
> > +
> > +       /* Is it unmatched? */
> > +       if (!HeapTupleHeaderHasMatch(tuple))
> > +       {
> > +           TupleTableSlot *inntuple;
> > +
> > +           /* insert hashtable's tuple into exec slot */
> > +           inntuple = ExecStoreMinimalTuple(tuple,
> > +                                            hjstate->hj_HashTupleSlot,
> > +                                            false);    /* do not pfree */
> > +           econtext->ecxt_innertuple = inntuple;
> > +
> > +           /* reset context each time (see below for explanation) */
> > +           ResetExprContext(econtext);
> > +           return true;
> > +       }
> > +   }
> >
> > I suspect this might actually be slower than the current/old logic,
> > because the current_chunk tests are repeated every loop. I think
> > retaining the two loops the previous code had makes sense, i.e. one to
> > find a relevant chunk, and one to iterate through all tuples in a chunk,
> > checking for an unmatched one.
>
> Ok, I've updated it to use two loops as suggested. I couldn't measure
> any speedup as a result but it's probably better code that way.
>
> > Have you run a performance comparison pre/post this patch? I don't
> > think there'd be a lot, but it seems important to verify that. I'd just
> > run a tpc-h pre/post comparison (prewarmed, fully cache resident,
> > parallelism disabled, hugepages is my personal recipe for the least
> > run-over-run variance).
>
> I haven't been able to measure any difference in TPCH results yet. I
> tried to contrive a simple test where there is a measurable
> difference. I created a pair of tables and repeatedly ran two FULL
> OUTER JOIN queries. In Q1 no unmatched tuples are found in the hash
> table, and in Q2 every tuple in the hash table turns out to be
> unmatched. I consistently measure just over 10% improvement.
>
> CREATE TABLE t1 AS
> SELECT generate_series(1, 10000000) AS id, 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaa';
>
> CREATE TABLE t2 AS
> SELECT generate_series(10000001, 20000000) AS id, 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaa';
>
> SET work_mem = '1GB';
>
> -- Q1
> SELECT COUNT(*)
> FROM t1 FULL OUTER JOIN t1 other USING (id);
>
> -- Q2
> SELECT COUNT(*)
> FROM t1 FULL OUTER JOIN t2 USING (id);
>
> master: Q1 = 9.280s, Q2 = 9.645s
> 0003-hj-refactor-unmatched-v6.patch: Q1 = 8.341s, Q2 = 8.661s
> 0003-hj-refactor-unmatched-v7.patch: Q1 = 8.186s, Q2 = 8.642s
>
> > 0004: Add a barrier primitive for synchronizing backends.
> >
> >
> > +/*-------------------------------------------------------------------------
> > + *
> > + * barrier.c
> > + *     Barriers for synchronizing cooperating processes.
> > + *
> > + * Copyright (c) 2017, PostgreSQL Global Development Group
> > + *
> > + * This implementation of barriers allows for static sets of participants
> > + * known up front, or dynamic sets of participants which processes can join
> > + * or leave at any time.  In the dynamic case, a phase number can be used to
> > + * track progress through a parallel algorithm; in the static case it isn't
> > + * needed.
> >
> > Why would a phase id generally not be needed in the static case?
> > There's also further references to it ("Increments the current phase.")
> > that don't quite jibe with that.
>
> I've extended that text at the top to explain.
>
> Short version: there is always a phase internally; that comment refers
> to the need for client code to examine it. Dynamic barrier users
> probably need to care what it is, since progress can be made while
> they're not attached so they need a way to find out about that after
> they attach, but static barriers generally don't need to care about
> the phase number because nothing can happen without explicit action
> from all participants so they should be in sync automatically.
> Hopefully the new comments explain that better.
>
> > + * IDENTIFICATION
> > + * src/backend/storage/ipc/barrier.c
> >
> > This could use a short example usage scenario. Without knowing existing
> > usages of the "pattern", it's probably hard to grasp.
>
> Examples added.
>
> > + *-------------------------------------------------------------------------
> > + */
> > +
> > +#include "storage/barrier.h"
> >
> > Aren't you missing an include of postgres.h here?
>
> Fixed.
>
> > +bool
> > +BarrierWait(Barrier *barrier, uint32 wait_event_info)
> > +{
> > +   bool        first;
> > +   bool        last;
> > +   int         start_phase;
> > +   int         next_phase;
> > +
> > +   SpinLockAcquire(&barrier->mutex);
> > +   start_phase = barrier->phase;
> > +   next_phase = start_phase + 1;
> > +   ++barrier->arrived;
> > +   if (barrier->arrived == 1)
> > +       first = true;
> > +   else
> > +       first = false;
> > +   if (barrier->arrived == barrier->participants)
> > +   {
> > +       last = true;
> > +       barrier->arrived = 0;
> > +       barrier->phase = next_phase;
> > +   }
> > +   else
> > +       last = false;
> > +   SpinLockRelease(&barrier->mutex);
> >
> > Hm. So what's the defined concurrency protocol for non-static barriers,
> > when they attach after the spinlock here has been released? I think the
> > concurrency aspects deserve some commentary. Afaics it'll correctly
> > just count as the next phase - without any blocking - but that shouldn't
> > have to be inferred.
>
> It may join at start_phase or next_phase depending on what happened
> above. If we just advanced the phase (by being the last to arrive),
> then another backend that attaches will join at phase == next_phase,
> and if that new backend calls BarrierWait it'll be waiting for the
> phase after that.
>
> > Things might get wonky if that new participant
> > then starts waiting for the new phase, violating the assert below...
>
> > +           Assert(barrier->phase == start_phase || barrier->phase == next_phase);
>
> I've added a comment near that assertion that explains the reason the
> assertion holds.
>
> Short version: The caller is attached, so there is no way for the
> phase to advance beyond next_phase without the caller's participation;
> the only possibilities to consider in the wait loop are "we're still
> waiting" or "the final participant arrived or detached, advancing the
> phase and releasing me".
>
> Put another way, no waiting backend can ever see phase advance beyond
> next_phase, because in order to do so, the waiting backend would need
> to run BarrierWait again; barrier->arrived can never reach
> barrier->participants a second time while we're in that wait loop.
>
> > +/*
> > + * Detach from a barrier.  This may release other waiters from
> > + * BarrierWait and advance the phase, if they were only waiting for this
> > + * backend.  Return true if this participant was the last to detach.
> > + */
> > +bool
> > +BarrierDetach(Barrier *barrier)
> > +{
> > +   bool        release;
> > +   bool        last;
> > +
> > +   SpinLockAcquire(&barrier->mutex);
> > +   Assert(barrier->participants > 0);
> > +   --barrier->participants;
> > +
> > +   /*
> > +    * If any other participants are waiting and we were the last
> > +    * participant waited for, release them.
> > +    */
> > +   if (barrier->participants > 0 &&
> > +       barrier->arrived == barrier->participants)
> > +   {
> > +       release = true;
> > +       barrier->arrived = 0;
> > +       barrier->phase++;
> > +   }
> > +   else
> > +       release = false;
> > +
> > +   last = barrier->participants == 0;
> > +   SpinLockRelease(&barrier->mutex);
> > +
> > +   if (release)
> > +       ConditionVariableBroadcast(&barrier->condition_variable);
> > +
> > +   return last;
> > +}
> >
> > Doesn't this, again, run into danger of leading to an assert failure in
> > the loop in BarrierWait?
>
> I believe this code is correct. The assertion in BarrierWait can't
> fail, because waiters know that there is no way for the phase to get
> any further ahead without their help (because they are attached):
> again, the only possibilities are phase == start_phase (implying that
> they received a spurious condition variable signal) or phase ==
> next_phase (the last backend being waited on has finally arrived or
> detached, allowing other participants to proceed).
>
> I've attached a test module that starts N workers, and makes the
> workers attach, call BarrierWait a random number of times, then
> detach, and then rinse and repeat, until the phase reaches some large
> number and they all exit. This exercises every interleaving of attach,
> wait, and detach. CREATE EXTENSION test_barrier, then something
> like SELECT test_barrier_reattach_random(4, 1000000) to verify that no
> assertions are thrown and it always completes.
>
> > +#include "postgres.h"
> >
> > Huh, that normally shouldn't be in a header. I see you introduced that
> > in a bunch of other places too - that really doesn't look right to me.
>
> Fixed.
>
In an attempt to test v7 of this patch on TPC-H at scale factor 20, I found a
few regressions:
Q21: 52 secs on HEAD, 400 secs with patch
Q8: 8 secs on HEAD, 14 secs with patch

However, to avoid being framed as some sort of "jinx" [;)], I'd also like to
report a few cases of improvement:
Q3: improved from 58 secs on HEAD to 44 secs with patch
Q9: improved from 81 secs on HEAD to 48 secs with patch
Q10: improved from 57 secs on HEAD to 47 secs with patch
Q14: improved from 9 secs on HEAD to 5 secs with patch

The details of this experimental setup are as follows:
scale-factor: 20
work_mem = 1GB
shared_buffers = 10GB

Please find the output plans on HEAD and with the patch in the attached tar
file. In case you require any more information, please let me know.
--
Regards,
Rafia Sabih
EnterpriseDB: http://www.enterprisedb.com/

Attachment Content-Type Size
ph.tar.gz application/x-gzip 68.1 KB
