Re: WIP: [[Parallel] Shared] Hash

From: Andres Freund <andres(at)anarazel(dot)de>
To: Thomas Munro <thomas(dot)munro(at)enterprisedb(dot)com>
Cc: Rafia Sabih <rafia(dot)sabih(at)enterprisedb(dot)com>, Ashutosh Bapat <ashutosh(dot)bapat(at)enterprisedb(dot)com>, Peter Geoghegan <pg(at)heroku(dot)com>, Haribabu Kommi <kommi(dot)haribabu(at)gmail(dot)com>, Pg Hackers <pgsql-hackers(at)postgresql(dot)org>
Subject: Re: WIP: [[Parallel] Shared] Hash
Date: 2017-03-07 23:58:54
Message-ID: 20170307235854.rjmpiza6ztfy5wmm@alap3.anarazel.de
Lists: pgsql-hackers

Hi,

0001: Do hash join work_mem accounting in chunks.

Don't think there's much left to say.

0002: Check hash join work_mem usage at the point of chunk allocation.

Modify the existing hash join code to detect work_mem exhaustion at
the point where chunks are allocated, instead of checking after every
tuple insertion. This matches the logic used for estimating, and more
importantly allows for some parallelism in later patches.
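
As I read it, the shape of the change is a per-chunk check along these
lines (a sketch, not the patch's exact code):

static void *
dense_alloc(HashJoinTable hashtable, Size size, bool respect_work_mem)
{
	/*
	 * Check work_mem once per chunk rather than once per tuple: if
	 * another chunk would push us over spaceAllowed, increase the
	 * number of batches, dumping tuples to batch files.
	 */
	if (respect_work_mem &&
		hashtable->growEnabled &&
		hashtable->spaceUsed + HASH_CHUNK_SIZE >= hashtable->spaceAllowed)
		ExecHashIncreaseNumBatches(hashtable);

	/* ... then carve the requested size out of the current chunk ... */
}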

diff --git a/src/backend/executor/nodeHash.c b/src/backend/executor/nodeHash.c
index 406c180..af1b66d 100644
--- a/src/backend/executor/nodeHash.c
+++ b/src/backend/executor/nodeHash.c
@@ -48,7 +48,8 @@ static void ExecHashSkewTableInsert(HashJoinTable hashtable,
int bucketNumber);
static void ExecHashRemoveNextSkewBucket(HashJoinTable hashtable);

-static void *dense_alloc(HashJoinTable hashtable, Size size);
+static void *dense_alloc(HashJoinTable hashtable, Size size,
+ bool respect_work_mem);

I still dislike this, but maybe Robert's point of:

On 2017-02-16 08:57:21 -0500, Robert Haas wrote:
> On Wed, Feb 15, 2017 at 9:36 PM, Andres Freund <andres(at)anarazel(dot)de> wrote:
> > Isn't it kinda weird to do this from within dense_alloc()? I mean that
> > dumps a lot of data to disk, frees a bunch of memory and so on - not
> > exactly what "dense_alloc" implies. Isn't the free()ing part also
> > dangerous, because the caller might actually use some of that memory,
> > like e.g. in ExecHashRemoveNextSkewBucket() or such. I haven't looked
> > deeply enough to check whether that's an active bug, but it seems like
> > inviting one if not.
>
> I haven't looked at this, but one idea might be to just rename
> dense_alloc() to ExecHashBlahBlahSomething(). If there's a real
> abstraction layer problem here then we should definitely fix it, but
> maybe it's just the angle at which you hold your head.

is enough.

0003: Scan for unmatched tuples in a hash join one chunk at a time.

@@ -1152,8 +1155,65 @@ bool
ExecScanHashTableForUnmatched(HashJoinState *hjstate, ExprContext *econtext)
{
HashJoinTable hashtable = hjstate->hj_HashTable;
- HashJoinTuple hashTuple = hjstate->hj_CurTuple;
+ HashJoinTuple hashTuple;
+ MinimalTuple tuple;
+
+ /*
+ * First, process the queue of chunks holding tuples that are in regular
+ * (non-skew) buckets.
+ */
+ for (;;)
+ {
+ /* Do we need a new chunk to scan? */
+ if (hashtable->current_chunk == NULL)
+ {
+ /* Have we run out of chunks to scan? */
+ if (hashtable->unmatched_chunks == NULL)
+ break;
+
+ /* Pop the next chunk from the front of the queue. */
+ hashtable->current_chunk = hashtable->unmatched_chunks;
+ hashtable->unmatched_chunks = hashtable->current_chunk->next;
+ hashtable->current_chunk_index = 0;
+ }
+
+ /* Have we reached the end of this chunk yet? */
+ if (hashtable->current_chunk_index >= hashtable->current_chunk->used)
+ {
+ /* Go around again to get the next chunk from the queue. */
+ hashtable->current_chunk = NULL;
+ continue;
+ }
+
+ /* Take the next tuple from this chunk. */
+ hashTuple = (HashJoinTuple)
+ (hashtable->current_chunk->data + hashtable->current_chunk_index);
+ tuple = HJTUPLE_MINTUPLE(hashTuple);
+ hashtable->current_chunk_index +=
+ MAXALIGN(HJTUPLE_OVERHEAD + tuple->t_len);
+
+ /* Is it unmatched? */
+ if (!HeapTupleHeaderHasMatch(tuple))
+ {
+ TupleTableSlot *inntuple;
+
+ /* insert hashtable's tuple into exec slot */
+ inntuple = ExecStoreMinimalTuple(tuple,
+ hjstate->hj_HashTupleSlot,
+ false); /* do not pfree */
+ econtext->ecxt_innertuple = inntuple;
+
+ /* reset context each time (see below for explanation) */
+ ResetExprContext(econtext);
+ return true;
+ }
+ }

I suspect this might actually be slower than the current/old logic,
because the current_chunk tests are repeated on every iteration. I
think retaining the two loops the previous code had makes sense,
i.e. one to find a relevant chunk, and one to iterate through all
tuples in a chunk, checking for an unmatched one.
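
Untested sketch of the structure I mean, using the patch's own fields:

/* Outer loop: find a chunk with tuples left to scan. */
while (hashtable->current_chunk != NULL ||
	   hashtable->unmatched_chunks != NULL)
{
	HashMemoryChunk chunk = hashtable->current_chunk;

	if (chunk == NULL)
	{
		/* Pop the next chunk from the front of the queue. */
		chunk = hashtable->current_chunk = hashtable->unmatched_chunks;
		hashtable->unmatched_chunks = chunk->next;
		hashtable->current_chunk_index = 0;
	}

	/* Inner loop: scan all tuples in this chunk. */
	while (hashtable->current_chunk_index < chunk->used)
	{
		HashJoinTuple hashTuple = (HashJoinTuple)
			(chunk->data + hashtable->current_chunk_index);
		MinimalTuple tuple = HJTUPLE_MINTUPLE(hashTuple);

		hashtable->current_chunk_index +=
			MAXALIGN(HJTUPLE_OVERHEAD + tuple->t_len);

		if (!HeapTupleHeaderHasMatch(tuple))
		{
			/* ... store it in hj_HashTupleSlot and return true, as above ... */
		}
	}

	/* Chunk exhausted, move on to the next one. */
	hashtable->current_chunk = NULL;
}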

Have you run a performance comparison pre/post this patch? I don't
think there'd be a big difference, but it seems important to verify
that. I'd just run a TPC-H pre/post comparison (prewarmed, fully cache
resident, parallelism disabled, huge pages enabled - that's my personal
recipe for the least run-over-run variance).

0004: Add a barrier primitive for synchronizing backends.

+/*-------------------------------------------------------------------------
+ *
+ * barrier.c
+ * Barriers for synchronizing cooperating processes.
+ *
+ * Copyright (c) 2017, PostgreSQL Global Development Group
+ *
+ * This implementation of barriers allows for static sets of participants
+ * known up front, or dynamic sets of participants which processes can join
+ * or leave at any time. In the dynamic case, a phase number can be used to
+ * track progress through a parallel algorithm; in the static case it isn't
+ * needed.

Why would a phase id generally not be needed in the static case?
There are also further references to it ("Increments the current
phase.") that don't quite jibe with that.

+ * IDENTIFICATION
+ * src/backend/storage/ipc/barrier.c

This could use a short example usage scenario. Without knowing existing
uses of the "pattern", it's probably hard to grasp.
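
E.g. something along these lines - note that I'm guessing at the init
and attach APIs, the wait event, and the work functions; this is just
the pattern I mean:

/* Once, during shared memory setup: start with no participants. */
BarrierInit(&shared->barrier, 0);

/* In each participating backend: */
BarrierAttach(&shared->barrier);

do_build_phase_work();

/* Block until all attached participants arrive; advances the phase. */
BarrierWait(&shared->barrier, WAIT_EVENT_PARALLEL_FINISH);

do_probe_phase_work();

/* Stop being waited for by the others. */
BarrierDetach(&shared->barrier);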

+ *-------------------------------------------------------------------------
+ */
+
+#include "storage/barrier.h"

Aren't you missing an include of postgres.h here?

To quote postgres.h:
* This should be the first file included by PostgreSQL backend modules.
* Client-side code should include postgres_fe.h instead.
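
I.e. presumably the file should simply start with:

#include "postgres.h"

#include "storage/barrier.h"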

+bool
+BarrierWait(Barrier *barrier, uint32 wait_event_info)
+{
+ bool first;
+ bool last;
+ int start_phase;
+ int next_phase;
+
+ SpinLockAcquire(&barrier->mutex);
+ start_phase = barrier->phase;
+ next_phase = start_phase + 1;
+ ++barrier->arrived;
+ if (barrier->arrived == 1)
+ first = true;
+ else
+ first = false;
+ if (barrier->arrived == barrier->participants)
+ {
+ last = true;
+ barrier->arrived = 0;
+ barrier->phase = next_phase;
+ }
+ else
+ last = false;
+ SpinLockRelease(&barrier->mutex);

Hm. So what's the defined concurrency protocol for non-static barriers,
when a new participant attaches after the spinlock here has been
released? I think the concurrency aspects deserve some commentary.
Afaics the newcomer will correctly just be counted towards the next
phase - without any blocking - but that shouldn't have to be inferred.
Things might get wonky if that new participant then starts waiting for
the new phase, violating the assert below...

+ /*
+ * Otherwise we have to wait for the last participant to arrive and
+ * advance the phase.
+ */
+ ConditionVariablePrepareToSleep(&barrier->condition_variable);
+ for (;;)
+ {
+ bool advanced;
+
+ SpinLockAcquire(&barrier->mutex);
+ Assert(barrier->phase == start_phase || barrier->phase == next_phase);
+ advanced = barrier->phase == next_phase;
+ SpinLockRelease(&barrier->mutex);
+ if (advanced)
+ break;

+ ConditionVariableSleep(&barrier->condition_variable, wait_event_info);
+ }
+ ConditionVariableCancelSleep();
+
+ return first;
+}

+/*
+ * Detach from a barrier. This may release other waiters from BarrierWait and
+ * advance the phase, if they were only waiting for this backend. Return
+ * true if this participant was the last to detach.
+ */
+bool
+BarrierDetach(Barrier *barrier)
+{
+ bool release;
+ bool last;
+
+ SpinLockAcquire(&barrier->mutex);
+ Assert(barrier->participants > 0);
+ --barrier->participants;
+
+ /*
+ * If any other participants are waiting and we were the last participant
+ * waited for, release them.
+ */
+ if (barrier->participants > 0 &&
+ barrier->arrived == barrier->participants)
+ {
+ release = true;
+ barrier->arrived = 0;
+ barrier->phase++;
+ }
+ else
+ release = false;
+
+ last = barrier->participants == 0;
+ SpinLockRelease(&barrier->mutex);
+
+ if (release)
+ ConditionVariableBroadcast(&barrier->condition_variable);
+
+ return last;
+}

Doesn't this, again, run into danger of leading to an assert failure in
the loop in BarrierWait?

+++ b/src/include/storage/barrier.h
@@ -0,0 +1,42 @@
+/*-------------------------------------------------------------------------
+ *
+ * barrier.h
+ * Barriers for synchronizing workers.
+ *
+ * Copyright (c) 2017, PostgreSQL Global Development Group
+ *
+ * src/include/storage/barrier.h
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef BARRIER_H
+#define BARRIER_H
+
+/*
+ * For the header previously known as "barrier.h", please include
+ * "port/atomics.h", which deals with atomics, compiler barriers and memory
+ * barriers.
+ */
+
+#include "postgres.h"

Huh, that normally shouldn't be in a header. I see you introduced that
in a bunch of other places too - that really doesn't look right to me.
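
The usual pattern is that every .c file includes postgres.h first
itself, and a header only pulls in what it directly needs - here
presumably something like:

#ifndef BARRIER_H
#define BARRIER_H

/* no postgres.h here - .c files are expected to include it first */
#include "storage/condition_variable.h"
#include "storage/spin.h"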

- Andres
