Re: Parallel Full Hash Join

From: Melanie Plageman <melanieplageman(at)gmail(dot)com>
To: Thomas Munro <thomas(dot)munro(at)gmail(dot)com>
Cc: Ian Lawrence Barwick <barwick(at)gmail(dot)com>, Justin Pryzby <pryzby(at)telsasoft(dot)com>, vignesh C <vignesh21(at)gmail(dot)com>, Greg Nancarrow <gregn4422(at)gmail(dot)com>, Masahiko Sawada <sawada(dot)mshk(at)gmail(dot)com>, pgsql-hackers <pgsql-hackers(at)postgresql(dot)org>
Subject: Re: Parallel Full Hash Join
Date: 2023-03-25 20:51:59
Message-ID: 20230325205159.lzqhplrbrdqeug6o@liskov
Lists: pgsql-hackers

On Sat, Mar 25, 2023 at 09:21:34AM +1300, Thomas Munro wrote:
> * reuse the same unmatched_scan_{chunk,idx} variables as above
> * rename the list of chunks to scan to work_queue
> * fix race/memory leak if we see PHJ_BATCH_SCAN when we attach (it
> wasn't OK to just fall through)

ah, good catch.

> I don't love the way that both ExecHashTableDetachBatch() and
> ExecParallelPrepHashTableForUnmatched() duplicate logic relating to
> the _SCAN/_FREE protocol, but I'm struggling to find a better idea.
> Perhaps I just need more coffee.

I'm not sure if I have strong feelings either way.
To confirm I understand, though: in ExecHashTableDetachBatch(), the call
to BarrierArriveAndDetachExceptLast() serves only to advance the barrier
phase through _SCAN, right? It doesn't really matter if this worker is
the last worker since BarrierArriveAndDetach() handles that for us.
There isn't another barrier function to do this (and I mostly think it
is fine), but I did have to think on it for a bit.
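
To spell out how I'm reading that part of ExecHashTableDetachBatch(), here
is a simplified sketch (not the actual patch code, and the helper name is
made up), assuming the barrier.h API and the batch phases from this patch:

    static void
    detach_batch_sketch(ParallelHashJoinBatch *batch)
    {
        bool    attached = true;

        /* After attaching we always get at least to PHJ_BATCH_PROBE. */
        Assert(BarrierPhase(&batch->batch_barrier) == PHJ_BATCH_PROBE ||
               BarrierPhase(&batch->batch_barrier) == PHJ_BATCH_SCAN);

        if (BarrierPhase(&batch->batch_barrier) == PHJ_BATCH_PROBE)
        {
            /*
             * This call is only here to step the barrier through _SCAN:
             * everyone but the last arriver detaches, and the last
             * arriver stays attached while the phase advances to
             * PHJ_BATCH_SCAN.
             */
            attached = BarrierArriveAndDetachExceptLast(&batch->batch_barrier);
        }

        if (attached && BarrierArriveAndDetach(&batch->batch_barrier))
        {
            /*
             * Being the last worker is handled here, not above: we
             * detached last, the phase is now PHJ_BATCH_FREE, and we're
             * the process that frees the batch's shared resources.
             */
            Assert(BarrierPhase(&batch->batch_barrier) == PHJ_BATCH_FREE);
            /* ... free shared batch resources ... */
        }
    }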

Oh, and, unrelated: it might be worth updating the BarrierAttach()
function comment to mention BarrierArriveAndDetachExceptLast().

> I think your idea of opportunistically joining the scan if it's
> already running makes sense to explore for a later step, ie to make
> multi-batch PHFJ fully fair, and I think that should be a fairly easy
> code change, and I put in some comments where changes would be needed.

makes sense.

I have some very minor pieces of feedback, mainly about extraneous
commas that made me uncomfortable ;)

> From 8b526377eb4a4685628624e75743aedf37dd5bfe Mon Sep 17 00:00:00 2001
> From: Thomas Munro <thomas(dot)munro(at)gmail(dot)com>
> Date: Fri, 24 Mar 2023 14:19:07 +1300
> Subject: [PATCH v12 1/2] Scan for unmatched hash join tuples in memory order.
>
> In a full/right outer join, we need to scan every tuple in the hash
> table to find the ones that were not matched while probing, so that we

Given how you are using the word "so" here, I think that comma before it
is not needed.

> @@ -2083,58 +2079,45 @@ bool
> ExecScanHashTableForUnmatched(HashJoinState *hjstate, ExprContext *econtext)
> {
> HashJoinTable hashtable = hjstate->hj_HashTable;
> - HashJoinTuple hashTuple = hjstate->hj_CurTuple;
> + HashMemoryChunk chunk;
>
> - for (;;)
> + while ((chunk = hashtable->unmatched_scan_chunk))
> {
> - /*
> - * hj_CurTuple is the address of the tuple last returned from the
> - * current bucket, or NULL if it's time to start scanning a new
> - * bucket.
> - */
> - if (hashTuple != NULL)
> - hashTuple = hashTuple->next.unshared;
> - else if (hjstate->hj_CurBucketNo < hashtable->nbuckets)
> - {
> - hashTuple = hashtable->buckets.unshared[hjstate->hj_CurBucketNo];
> - hjstate->hj_CurBucketNo++;
> - }
> - else if (hjstate->hj_CurSkewBucketNo < hashtable->nSkewBuckets)
> + while (hashtable->unmatched_scan_idx < chunk->used)
> {
> - int j = hashtable->skewBucketNums[hjstate->hj_CurSkewBucketNo];
> + HashJoinTuple hashTuple = (HashJoinTuple)
> + (HASH_CHUNK_DATA(hashtable->unmatched_scan_chunk) +
> + hashtable->unmatched_scan_idx);
>
> - hashTuple = hashtable->skewBucket[j]->tuples;
> - hjstate->hj_CurSkewBucketNo++;
> - }
> - else
> - break; /* finished all buckets */
> + MinimalTuple tuple = HJTUPLE_MINTUPLE(hashTuple);
> + int hashTupleSize = (HJTUPLE_OVERHEAD + tuple->t_len);
>
> - while (hashTuple != NULL)
> - {
> - if (!HeapTupleHeaderHasMatch(HJTUPLE_MINTUPLE(hashTuple)))
> - {
> - TupleTableSlot *inntuple;
> + /* next tuple in this chunk */
> + hashtable->unmatched_scan_idx += MAXALIGN(hashTupleSize);
>
> - /* insert hashtable's tuple into exec slot */
> - inntuple = ExecStoreMinimalTuple(HJTUPLE_MINTUPLE(hashTuple),
> - hjstate->hj_HashTupleSlot,
> - false); /* do not pfree */
> - econtext->ecxt_innertuple = inntuple;
> + if (HeapTupleHeaderHasMatch(HJTUPLE_MINTUPLE(hashTuple)))
> + continue;
>
> - /*
> - * Reset temp memory each time; although this function doesn't
> - * do any qual eval, the caller will, so let's keep it
> - * parallel to ExecScanHashBucket.
> - */
> - ResetExprContext(econtext);

I don't think I had done this before. Good call.

> + /* insert hashtable's tuple into exec slot */
> + econtext->ecxt_innertuple =
> + ExecStoreMinimalTuple(HJTUPLE_MINTUPLE(hashTuple),
> + hjstate->hj_HashTupleSlot,
> + false);
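
Just to confirm I'm reading the new loop shape correctly, here is roughly
how I understand the chunk-order scan to continue once the tail of the
quoted hunk is included (a sketch only, not the patch verbatim; it ignores
skew tuples):

    while ((chunk = hashtable->unmatched_scan_chunk) != NULL)
    {
        while (hashtable->unmatched_scan_idx < chunk->used)
        {
            HashJoinTuple hashTuple = (HashJoinTuple)
                (HASH_CHUNK_DATA(chunk) + hashtable->unmatched_scan_idx);
            MinimalTuple tuple = HJTUPLE_MINTUPLE(hashTuple);

            /* step the cursor past this tuple before anything else */
            hashtable->unmatched_scan_idx +=
                MAXALIGN(HJTUPLE_OVERHEAD + tuple->t_len);

            if (HeapTupleHeaderHasMatch(tuple))
                continue;

            /* ... store the unmatched tuple in the slot and return true ... */
        }

        /* finished this chunk; move to the next one, starting at offset 0 */
        hashtable->unmatched_scan_chunk = chunk->next.unshared;
        hashtable->unmatched_scan_idx = 0;
    }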

> From 6f4e82f0569e5b388440ca0ef268dd307388e8f8 Mon Sep 17 00:00:00 2001
> From: Thomas Munro <thomas(dot)munro(at)gmail(dot)com>
> Date: Fri, 24 Mar 2023 15:23:14 +1300
> Subject: [PATCH v12 2/2] Parallel Hash Full Join.
>
> Full and right outer joins were not supported in the initial
> implementation of Parallel Hash Join, because of deadlock hazards (see

no comma needed before the "because" here

> discussion). Therefore FULL JOIN inhibited page-based parallelism,
> as the other join strategies can't do it either.

I actually don't quite understand what this means. It's been a while for
me, so perhaps I'm being dense, but what is page-based parallelism?
Also, I would put a comma after "Therefore" :)

> Add a new PHJ phase PHJ_BATCH_SCAN that scans for unmatched tuples on
> the inner side of one batch's hash table. For now, sidestep the
> deadlock problem by terminating parallelism there. The last process to
> arrive at that phase emits the unmatched tuples, while others detach and
> are free to go and work on other batches, if there are any, but
> otherwise they finish the join early.
>
> That unfairness is considered acceptable for now, because it's better
> than no parallelism at all. The build and probe phases are run in
> parallel, and the new scan-for-unmatched phase, while serial, is usually
> applied to the smaller of the two relations and is either limited by
> some multiple of work_mem, or it's too big and is partitioned into
> batches and then the situation is improved by batch-level parallelism.
> In future work on deadlock avoidance strategies, we may find a way to
> parallelize the new phase safely.

Is it worth mentioning something about parallel-oblivious parallel hash
join not being able to do this still? Or is that obvious?
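
Restating the handoff the commit message describes, as I understand it (a
hedged sketch, not the patch's actual function; the name is made up):

    /*
     * Sketch: the last worker to arrive at the end of probing stays
     * attached and performs the unmatched scan; everyone else detaches
     * and is free to work on another batch or finish the join early.
     */
    static bool
    prep_for_unmatched_scan_sketch(Barrier *batch_barrier)
    {
        Assert(BarrierPhase(batch_barrier) == PHJ_BATCH_PROBE);

        if (!BarrierArriveAndDetachExceptLast(batch_barrier))
            return false;   /* not last: already detached, no scan for us */

        /* Last worker: still attached, phase is now PHJ_BATCH_SCAN. */
        Assert(BarrierPhase(batch_barrier) == PHJ_BATCH_SCAN);
        return true;        /* caller emits the unmatched tuples */
    }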

> *
> @@ -2908,6 +3042,12 @@ ExecParallelHashTupleAlloc(HashJoinTable hashtable, size_t size,
> chunk->next.shared = hashtable->batches[curbatch].shared->chunks;
> hashtable->batches[curbatch].shared->chunks = chunk_shared;
>
> + /*
> + * Also make this the head of the work_queue list. This is used as a
> + * cursor for scanning all chunks in the batch.
> + */
> + hashtable->batches[curbatch].shared->work_queue = chunk_shared;
> +
> if (size <= HASH_CHUNK_THRESHOLD)
> {
> /*
> @@ -3116,18 +3256,31 @@ ExecHashTableDetachBatch(HashJoinTable hashtable)
> {
> int curbatch = hashtable->curbatch;
> ParallelHashJoinBatch *batch = hashtable->batches[curbatch].shared;
> + bool attached = true;
>
> /* Make sure any temporary files are closed. */
> sts_end_parallel_scan(hashtable->batches[curbatch].inner_tuples);
> sts_end_parallel_scan(hashtable->batches[curbatch].outer_tuples);
>
> - /* Detach from the batch we were last working on. */
> - if (BarrierArriveAndDetach(&batch->batch_barrier))
> + /* After attaching we always get at least to PHJ_BATCH_PROBE. */
> + Assert(BarrierPhase(&batch->batch_barrier) == PHJ_BATCH_PROBE ||
> + BarrierPhase(&batch->batch_barrier) == PHJ_BATCH_SCAN);
> +
> + /*
> + * Even if we aren't doing a full/right outer join, we'll step through
> + * the PHJ_BATCH_SCAN phase just to maintain the invariant that freeing
> + * happens in PHJ_BATCH_FREE, but that'll be wait-free.
> + */
> + if (BarrierPhase(&batch->batch_barrier) == PHJ_BATCH_PROBE)

full/right joins should never fall into this code path, right?

If so, would we be able to assert that? Maybe it doesn't make sense,
though...

> + attached = BarrierArriveAndDetachExceptLast(&batch->batch_barrier);
> + if (attached && BarrierArriveAndDetach(&batch->batch_barrier))
> {
> /*
> - * Technically we shouldn't access the barrier because we're no
> - * longer attached, but since there is no way it's moving after
> - * this point it seems safe to make the following assertion.
> + * We are no longer attached to the batch barrier, but we're the
> + * process that was chosen to free resources and it's safe to
> + * assert the current phase. The ParallelHashJoinBatch can't go
> + * away underneath us while we are attached to the build barrier,
> + * making this access safe.
> */
> Assert(BarrierPhase(&batch->batch_barrier) == PHJ_BATCH_FREE);

Otherwise, LGTM.

- Melanie
