Re: WIP: [[Parallel] Shared] Hash

From: Thomas Munro <thomas(dot)munro(at)enterprisedb(dot)com>
To: Andres Freund <andres(at)anarazel(dot)de>
Cc: Rafia Sabih <rafia(dot)sabih(at)enterprisedb(dot)com>, Ashutosh Bapat <ashutosh(dot)bapat(at)enterprisedb(dot)com>, Peter Geoghegan <pg(at)heroku(dot)com>, Haribabu Kommi <kommi(dot)haribabu(at)gmail(dot)com>, Pg Hackers <pgsql-hackers(at)postgresql(dot)org>
Subject: Re: WIP: [[Parallel] Shared] Hash
Date: 2017-03-09 11:58:53
Message-ID: CAEepm=1oZsEvnB3xtjavEEx-P02ivQV0_Adt=vQjftmn-nahng@mail.gmail.com
Lists: pgsql-hackers

On Wed, Mar 8, 2017 at 12:58 PM, Andres Freund <andres(at)anarazel(dot)de> wrote:
> 0002: Check hash join work_mem usage at the point of chunk allocation.
>
> Modify the existing hash join code to detect work_mem exhaustion at
> the point where chunks are allocated, instead of checking after every
> tuple insertion. This matches the logic used for estimating, and more
> importantly allows for some parallelism in later patches.
>
> diff --git a/src/backend/executor/nodeHash.c b/src/backend/executor/nodeHash.c
> index 406c180..af1b66d 100644
> --- a/src/backend/executor/nodeHash.c
> +++ b/src/backend/executor/nodeHash.c
> @@ -48,7 +48,8 @@ static void ExecHashSkewTableInsert(HashJoinTable hashtable,
>                          int bucketNumber);
>  static void ExecHashRemoveNextSkewBucket(HashJoinTable hashtable);
>
> -static void *dense_alloc(HashJoinTable hashtable, Size size);
> +static void *dense_alloc(HashJoinTable hashtable, Size size,
> +                         bool respect_work_mem);
>
> I still dislike this, but maybe Robert's point of:
>
> On 2017-02-16 08:57:21 -0500, Robert Haas wrote:
>> On Wed, Feb 15, 2017 at 9:36 PM, Andres Freund <andres(at)anarazel(dot)de> wrote:
>> > Isn't it kinda weird to do this from within dense_alloc()? I mean that
>> > dumps a lot of data to disk, frees a bunch of memory and so on - not
>> > exactly what "dense_alloc" implies. Isn't the free()ing part also
>> > dangerous, because the caller might actually use some of that memory,
>> > like e.g. in ExecHashRemoveNextSkewBucket() or such. I haven't looked
>> > deeply enough to check whether that's an active bug, but it seems like
>> > inviting one if not.
>>
>> I haven't looked at this, but one idea might be to just rename
>> dense_alloc() to ExecHashBlahBlahSomething(). If there's a real
>> abstraction layer problem here then we should definitely fix it, but
>> maybe it's just the angle at which you hold your head.
>
> Is enough.

There is a problem here. It can determine that it needs to increase
the number of batches, effectively splitting the current batch, but
then the caller goes on to insert the current tuple anyway, even
though it may no longer belong in this batch. I will post a fix for
that soon. I will also refactor it so that it doesn't do that work
inside dense_alloc. You're right, that's too weird.
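
To illustrate why the tuple may no longer belong in the current batch,
here is a minimal standalone sketch. It assumes the batch number is
taken from the hash value by shifting past the bucket bits and masking
with nbatch - 1, with nbatch a power of two. The helper names
batch_for_hash and tuple_still_in_batch are made up for illustration
and are not part of the patch.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/*
 * Hypothetical helper: derive a batch number from a hash value.
 * Assumes nbatch is a power of two, so masking selects the batch bits.
 */
static uint32_t
batch_for_hash(uint32_t hashvalue, int log2_nbuckets, uint32_t nbatch)
{
    assert(nbatch > 0 && (nbatch & (nbatch - 1)) == 0);
    return (hashvalue >> log2_nbuckets) & (nbatch - 1);
}

/*
 * Hypothetical check a caller could make after the number of batches
 * has grown: does the tuple in hand still belong to the current batch?
 */
static bool
tuple_still_in_batch(uint32_t hashvalue, int log2_nbuckets,
                     uint32_t new_nbatch, uint32_t curbatch)
{
    return batch_for_hash(hashvalue, log2_nbuckets, new_nbatch) == curbatch;
}
```

With 1024 buckets (log2_nbuckets = 10), a tuple with hash value 0x400
is in batch 0 while nbatch is 1, but moves to batch 1 once nbatch
becomes 2, so inserting it into the in-memory table for batch 0 after
the split would be wrong.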

In the meantime, here is a new patch series addressing the other
things you raised.

> 0003: Scan for unmatched tuples in a hash join one chunk at a time.
>
>
> @@ -1152,8 +1155,65 @@ bool
>  ExecScanHashTableForUnmatched(HashJoinState *hjstate, ExprContext *econtext)
>  {
>      HashJoinTable hashtable = hjstate->hj_HashTable;
> -    HashJoinTuple hashTuple = hjstate->hj_CurTuple;
> +    HashJoinTuple hashTuple;
> +    MinimalTuple tuple;
> +
> +    /*
> +     * First, process the queue of chunks holding tuples that are in regular
> +     * (non-skew) buckets.
> +     */
> +    for (;;)
> +    {
> +        /* Do we need a new chunk to scan? */
> +        if (hashtable->current_chunk == NULL)
> +        {
> +            /* Have we run out of chunks to scan? */
> +            if (hashtable->unmatched_chunks == NULL)
> +                break;
> +
> +            /* Pop the next chunk from the front of the queue. */
> +            hashtable->current_chunk = hashtable->unmatched_chunks;
> +            hashtable->unmatched_chunks = hashtable->current_chunk->next;
> +            hashtable->current_chunk_index = 0;
> +        }
> +
> +        /* Have we reached the end of this chunk yet? */
> +        if (hashtable->current_chunk_index >= hashtable->current_chunk->used)
> +        {
> +            /* Go around again to get the next chunk from the queue. */
> +            hashtable->current_chunk = NULL;
> +            continue;
> +        }
> +
> +        /* Take the next tuple from this chunk. */
> +        hashTuple = (HashJoinTuple)
> +            (hashtable->current_chunk->data + hashtable->current_chunk_index);
> +        tuple = HJTUPLE_MINTUPLE(hashTuple);
> +        hashtable->current_chunk_index +=
> +            MAXALIGN(HJTUPLE_OVERHEAD + tuple->t_len);
> +
> +        /* Is it unmatched? */
> +        if (!HeapTupleHeaderHasMatch(tuple))
> +        {
> +            TupleTableSlot *inntuple;
> +
> +            /* insert hashtable's tuple into exec slot */
> +            inntuple = ExecStoreMinimalTuple(tuple,
> +                                             hjstate->hj_HashTupleSlot,
> +                                             false); /* do not pfree */
> +            econtext->ecxt_innertuple = inntuple;
> +
> +            /* reset context each time (see below for explanation) */
> +            ResetExprContext(econtext);
> +            return true;
> +        }
> +    }
>
> I suspect this might actually be slower than the current/old logic,
> because the current_chunk tests are repeated every loop. I think
> retaining the two loops the previous code had makes sense, i.e. one to
> find a relevant chunk, and one to iterate through all tuples in a chunk,
> checking for an unmatched one.

Ok, I've updated it to use two loops as suggested. I couldn't measure
any speedup as a result but it's probably better code that way.
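
The two-loop shape can be sketched outside the executor as follows.
This is only an illustration under simplifying assumptions: tuples are
fixed-size array elements rather than variable-length MinimalTuples
packed into chunk->data, and the names SketchTuple, SketchChunk,
SketchScan and next_unmatched are invented here and do not appear in
the patch.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Simplified stand-ins for the hash table's tuples and chunks. */
typedef struct SketchTuple
{
    bool        matched;
    int         value;
} SketchTuple;

typedef struct SketchChunk
{
    struct SketchChunk *next;
    size_t      ntuples;
    SketchTuple tuples[4];
} SketchChunk;

typedef struct SketchScan
{
    SketchChunk *current_chunk;     /* chunk being scanned, or NULL */
    SketchChunk *unmatched_chunks;  /* queue of chunks still to scan */
    size_t      current_index;      /* next tuple within current_chunk */
} SketchScan;

/* Return the next unmatched tuple, or NULL when all chunks are done. */
static SketchTuple *
next_unmatched(SketchScan *scan)
{
    assert(scan != NULL);

    /* Outer loop: find a chunk to scan. */
    for (;;)
    {
        SketchChunk *chunk = scan->current_chunk;

        if (chunk == NULL)
        {
            if (scan->unmatched_chunks == NULL)
                return NULL;
            /* Pop the next chunk from the front of the queue. */
            chunk = scan->current_chunk = scan->unmatched_chunks;
            scan->unmatched_chunks = chunk->next;
            scan->current_index = 0;
        }

        /* Inner loop: walk the tuples of this chunk only. */
        while (scan->current_index < chunk->ntuples)
        {
            SketchTuple *tuple = &chunk->tuples[scan->current_index++];

            if (!tuple->matched)
                return tuple;
        }

        /* Chunk exhausted; go around to fetch the next one. */
        scan->current_chunk = NULL;
    }
}
```

The chunk-selection test now runs once per chunk (and once per call
that resumes a scan), while the inner loop only compares the index
against the chunk's tuple count.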

> Have you run a performance comparison pre/post this patch? I don't
> think there'd be a lot, but it seems important to verify that. I'd just
> run a tpc-h pre/post comparison (prewarmed, fully cache resident,
> parallelism disabled, hugepages is my personal recipe for the least
> run-over-run variance).

I haven't been able to measure any difference in TPC-H results yet. I
tried to contrive a simple test where there is a measurable
difference. I created a pair of tables and repeatedly ran two FULL
OUTER JOIN queries. In Q1 no unmatched tuples are found in the hash
table, and in Q2 every tuple in the hash table turns out to be
unmatched. I consistently measure just over 10% improvement.

CREATE TABLE t1 AS
SELECT generate_series(1, 10000000) AS id, 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaa';

CREATE TABLE t2 AS
SELECT generate_series(10000001, 20000000) AS id,
'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaa';

SET work_mem = '1GB';

-- Q1
SELECT COUNT(*)
FROM t1 FULL OUTER JOIN t1 other USING (id);

-- Q2
SELECT COUNT(*)
FROM t1 FULL OUTER JOIN t2 USING (id);

master: Q1 = 9.280s, Q2 = 9.645s
0003-hj-refactor-unmatched-v6.patch: Q1 = 8.341s, Q2 = 8.661s
0003-hj-refactor-unmatched-v7.patch: Q1 = 8.186s, Q2 = 8.642s
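
For reference, the percentages behind that statement can be reproduced
with a trivial helper. percent_improvement is a made-up name; the
timings are the ones listed above.

```c
#include <assert.h>

/* Relative reduction in run time, as a percentage of the baseline. */
static double
percent_improvement(double before_seconds, double after_seconds)
{
    assert(before_seconds > 0.0);
    return 100.0 * (before_seconds - after_seconds) / before_seconds;
}
```

With the v7 numbers, percent_improvement(9.280, 8.186) comes out a
little under 12% for Q1 and percent_improvement(9.645, 8.642) a little
over 10% for Q2.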

> 0004: Add a barrier primitive for synchronizing backends.
>
>
> +/*-------------------------------------------------------------------------
> + *
> + * barrier.c
> + * Barriers for synchronizing cooperating processes.
> + *
> + * Copyright (c) 2017, PostgreSQL Global Development Group
> + *
> + * This implementation of barriers allows for static sets of participants
> + * known up front, or dynamic sets of participants which processes can join
> + * or leave at any time. In the dynamic case, a phase number can be used to
> + * track progress through a parallel algorithm; in the static case it isn't
> + * needed.
>
> Why would a phase id generally not be needed in the static case?
> There's also further references to it ("Increments the current phase.")
> that don't quite jibe with that.

I've extended that text at the top to explain.

Short version: there is always a phase internally; that comment refers
to the need for client code to examine it. Dynamic barrier users
probably need to care what it is, since progress can be made while
they're not attached so they need a way to find out about that after
they attach, but static barriers generally don't need to care about
the phase number because nothing can happen without explicit action
from all participants so they should be in sync automatically.
Hopefully the new comments explain that better.

> + * IDENTIFICATION
> + * src/backend/storage/ipc/barrier.c
>
> This could use a short example usage scenario. Without knowing existing
> usages of the "pattern", it's probably hard to grasp.

Examples added.

> + *-------------------------------------------------------------------------
> + */
> +
> +#include "storage/barrier.h"
>
> Aren't you missing an include of postgres.h here?

Fixed.

> +bool
> +BarrierWait(Barrier *barrier, uint32 wait_event_info)
> +{
> +    bool first;
> +    bool last;
> +    int start_phase;
> +    int next_phase;
> +
> +    SpinLockAcquire(&barrier->mutex);
> +    start_phase = barrier->phase;
> +    next_phase = start_phase + 1;
> +    ++barrier->arrived;
> +    if (barrier->arrived == 1)
> +        first = true;
> +    else
> +        first = false;
> +    if (barrier->arrived == barrier->participants)
> +    {
> +        last = true;
> +        barrier->arrived = 0;
> +        barrier->phase = next_phase;
> +    }
> +    else
> +        last = false;
> +    SpinLockRelease(&barrier->mutex);
>
> Hm. So what's the defined concurrency protocol for non-static barriers,
> when they attach after the spinlock here has been released? I think the
> concurrency aspects deserve some commentary. Afaics it'll correctly
> just count as the next phase - without any blocking - but that shouldn't
> have to be inferred.

It may join at start_phase or next_phase depending on what happened
above. If we just advanced the phase (by being the last to arrive)
then another backend that attaches will be joining at phase ==
next_phase, and if that new backend calls BarrierWait it'll be waiting
for the phase after that.

> Things might get wonky if that new participant
> then starts waiting for the new phase, violating the assert below...

> + Assert(barrier->phase == start_phase || barrier->phase == next_phase);

I've added a comment near that assertion that explains the reason the
assertion holds.

Short version: The caller is attached, so there is no way for the
phase to advance beyond next_phase without the caller's participation;
the only possibilities to consider in the wait loop are "we're still
waiting" or "the final participant arrived or detached, advancing the
phase and releasing me".

Put another way, no waiting backend can ever see phase advance beyond
next_phase, because in order to do so, the waiting backend would need
to run BarrierWait again; barrier->arrived can never reach
barrier->participants a second time while we're in that wait loop.
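
The argument can be checked against a small single-process model of
the counters. This is a sketch, not the patch's code: it ignores the
spinlock and condition variable, ModelBarrier and the model_* functions
are hypothetical names, model_attach is assumed to simply bump
participants and report the current phase, and model_detach returns
whether waiters were released (the quoted BarrierDetach returns whether
the caller was the last to detach instead).

```c
#include <assert.h>
#include <stdbool.h>

/* Only the counters that matter for the phase invariant. */
typedef struct ModelBarrier
{
    int         phase;
    int         participants;
    int         arrived;
} ModelBarrier;

/* Assumed attach behaviour: join and report the phase joined at. */
static int
model_attach(ModelBarrier *b)
{
    b->participants++;
    return b->phase;
}

/* The counting part of a wait; true if this arrival advanced the phase. */
static bool
model_arrive(ModelBarrier *b)
{
    b->arrived++;
    if (b->arrived == b->participants)
    {
        b->arrived = 0;
        b->phase++;
        return true;
    }
    return false;
}

/* Detach; true if that released the remaining waiters. */
static bool
model_detach(ModelBarrier *b)
{
    assert(b->participants > 0);
    b->participants--;
    if (b->participants > 0 && b->arrived == b->participants)
    {
        b->arrived = 0;
        b->phase++;
        return true;
    }
    return false;
}
```

Take A and B attached at phase 0. A arrives and waits, C attaches, B
arrives, then C detaches: the phase moves to 1 and A is released. If a
new participant D attaches and arrives, and B arrives again, before A
has even woken up, arrived can still only reach participants - 1, so
the phase stays at 1 until A calls the wait function again.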

> +/*
> + * Detach from a barrier. This may release other waiters from BarrierWait and
> + * advance the phase, if they were only waiting for this backend. Return
> + * true if this participant was the last to detach.
> + */
> +bool
> +BarrierDetach(Barrier *barrier)
> +{
> +    bool release;
> +    bool last;
> +
> +    SpinLockAcquire(&barrier->mutex);
> +    Assert(barrier->participants > 0);
> +    --barrier->participants;
> +
> +    /*
> +     * If any other participants are waiting and we were the last participant
> +     * waited for, release them.
> +     */
> +    if (barrier->participants > 0 &&
> +        barrier->arrived == barrier->participants)
> +    {
> +        release = true;
> +        barrier->arrived = 0;
> +        barrier->phase++;
> +    }
> +    else
> +        release = false;
> +
> +    last = barrier->participants == 0;
> +    SpinLockRelease(&barrier->mutex);
> +
> +    if (release)
> +        ConditionVariableBroadcast(&barrier->condition_variable);
> +
> +    return last;
> +}
>
> Doesn't this, again, run into danger of leading to an assert failure in
> the loop in BarrierWait?

I believe this code is correct. The assertion in BarrierWait can't
fail, because waiters know that there is no way for the phase to get
any further ahead without their help (because they are attached):
again, the only possibilities are phase == start_phase (implying that
they received a spurious condition variable signal) or phase ==
next_phase (the last backend being waited on has finally arrived or
detached, allowing other participants to proceed).

I've attached a test module that starts N workers, and makes the
workers attach, call BarrierWait a random number of times, then
detach, and then rinse and repeat, until the phase reaches some large
number and they all exit. This exercises many random interleavings of
attach, wait and detach. Run CREATE EXTENSION test_barrier, then something
like SELECT test_barrier_reattach_random(4, 1000000), to verify that no
assertion fails and it always completes.

> +#include "postgres.h"
>
> Huh, that normally shouldn't be in a header. I see you introduced that
> in a bunch of other places too - that really doesn't look right to me.

Fixed.

--
Thomas Munro
http://www.enterprisedb.com

Attachment                    Content-Type              Size
parallel-shared-hash-v7.tgz   application/x-gzip        55.1 KB
test-barrier.patch            application/octet-stream  6.7 KB