Re: WIP: [[Parallel] Shared] Hash

From: Andres Freund <andres(at)anarazel(dot)de>
To: Thomas Munro <thomas(dot)munro(at)enterprisedb(dot)com>
Cc: Peter Geoghegan <pg(at)bowt(dot)ie>, Rafia Sabih <rafia(dot)sabih(at)enterprisedb(dot)com>, Ashutosh Bapat <ashutosh(dot)bapat(at)enterprisedb(dot)com>, Haribabu Kommi <kommi(dot)haribabu(at)gmail(dot)com>, Pg Hackers <pgsql-hackers(at)postgresql(dot)org>, Robert Haas <robertmhaas(at)gmail(dot)com>
Subject: Re: WIP: [[Parallel] Shared] Hash
Date: 2017-03-28 20:31:41
Message-ID: 20170328203141.3ixg2nmdtxtsk4cq@alap3.anarazel.de
Lists: pgsql-hackers

Hi,

On 2017-03-27 22:33:03 -0700, Andres Freund wrote:
> On 2017-03-23 20:35:09 +1300, Thomas Munro wrote:
> > Here is a new patch series responding to feedback from Peter and Andres:
>
> Here's a review of 0007 & 0010 together - they're going to have to be
> applied together anyway...
> ...
> ok, ENOTIME for today...

Continuing where I dropped off, tired, yesterday.

- ExecHashJoinSaveTuple(tuple,
- hashvalue,
- &hashtable->innerBatchFile[batchno]);
+ if (HashJoinTableIsShared(hashtable))
+ sts_puttuple(hashtable->shared_inner_batches, batchno, &hashvalue,
+ tuple);
+ else
+ ExecHashJoinSaveTuple(tuple,
+ hashvalue,
+ &hashtable->innerBatchFile[batchno]);
}
}

Why isn't this done inside of ExecHashJoinSaveTuple?
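
To make the question concrete, here is a minimal standalone sketch of what
folding the branch into a single save routine could look like. Everything in
it is a toy stand-in and an assumption for illustration: ToyHashTable and
toy_save_tuple are hypothetical names, and the counters merely mark where the
patch would call sts_puttuple() or write to innerBatchFile[batchno].

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Toy stand-in; this is NOT PostgreSQL's HashJoinTable. */
typedef struct ToyHashTable
{
    bool is_shared;       /* plays the role of HashJoinTableIsShared() */
    int  shared_writes;   /* tuples routed to the shared tuplestore */
    int  private_writes;  /* tuples routed to a private batch file */
} ToyHashTable;

/*
 * Hypothetical unified entry point: the shared/private decision lives
 * here, so call sites no longer need their own if/else.
 */
void
toy_save_tuple(ToyHashTable *ht, int batchno, uint32_t hashvalue)
{
    assert(ht != NULL);
    (void) batchno;       /* unused in the toy model */
    (void) hashvalue;     /* unused in the toy model */

    if (ht->is_shared)
        ht->shared_writes++;    /* the patch would call sts_puttuple() here */
    else
        ht->private_writes++;   /* the patch would write to the batch file */
}
```

With that shape, every caller issues the same call regardless of whether the
table is shared, which is the point of the question above.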

@@ -1280,6 +1785,68 @@ ExecHashTableReset(HashJoinTable hashtable)

+ /* Rewind the shared read heads for this batch, inner and outer. */
+ sts_prepare_parallel_read(hashtable->shared_inner_batches,
+ curbatch);
+ sts_prepare_parallel_read(hashtable->shared_outer_batches,
+ curbatch);

It feels somewhat wrong to do this in here, rather than at the call sites.

+ }
+
+ /*
+ * Each participant needs to make sure that data it has written for
+ * this partition is now read-only and visible to other participants.
+ */
+ sts_end_write(hashtable->shared_inner_batches, curbatch);
+ sts_end_write(hashtable->shared_outer_batches, curbatch);
+
+ /*
+ * Wait again, so that all workers see the new hash table and can
+ * safely read from batch files from any participant because they have
+ * all ended writing.
+ */
+ Assert(BarrierPhase(&hashtable->shared->barrier) ==
+ PHJ_PHASE_RESETTING_BATCH(curbatch));
+ BarrierWait(&hashtable->shared->barrier, WAIT_EVENT_HASH_RESETTING);
+ Assert(BarrierPhase(&hashtable->shared->barrier) ==
+ PHJ_PHASE_LOADING_BATCH(curbatch));
+ ExecHashUpdate(hashtable);
+
+ /* Forget the current chunks. */
+ hashtable->current_chunk = NULL;
+ return;
+ }

/*
* Release all the hash buckets and tuples acquired in the prior pass, and
@@ -1289,10 +1856,10 @@ ExecHashTableReset(HashJoinTable hashtable)
oldcxt = MemoryContextSwitchTo(hashtable->batchCxt);

/* Reallocate and reinitialize the hash bucket headers. */
- hashtable->buckets = (HashJoinTuple *)
- palloc0(nbuckets * sizeof(HashJoinTuple));
+ hashtable->buckets = (HashJoinBucketHead *)
+ palloc0(nbuckets * sizeof(HashJoinBucketHead));

- hashtable->spaceUsed = nbuckets * sizeof(HashJoinTuple);
+ hashtable->spaceUsed = nbuckets * sizeof(HashJoinBucketHead);

/* Cannot be more than our previous peak; we had this size before. */
Assert(hashtable->spaceUsed <= hashtable->spacePeak);
@@ -1301,6 +1868,22 @@ ExecHashTableReset(HashJoinTable hashtable)

/* Forget the chunks (the memory was freed by the context reset above). */
hashtable->chunks = NULL;
+
+ /* Rewind the shared read heads for this batch, inner and outer. */
+ if (hashtable->innerBatchFile[curbatch] != NULL)
+ {
+ if (BufFileSeek(hashtable->innerBatchFile[curbatch], 0, 0L, SEEK_SET))
+ ereport(ERROR,
+ (errcode_for_file_access(),
+ errmsg("could not rewind hash-join temporary file: %m")));
+ }
+ if (hashtable->outerBatchFile[curbatch] != NULL)
+ {
+ if (BufFileSeek(hashtable->outerBatchFile[curbatch], 0, 0L, SEEK_SET))
+ ereport(ERROR,
+ (errcode_for_file_access(),
+ errmsg("could not rewind hash-join temporary file: %m")));
+ }
}

/*
@@ -1310,12 +1893,21 @@ ExecHashTableReset(HashJoinTable hashtable)
void
ExecHashTableResetMatchFlags(HashJoinTable hashtable)
{
+ dsa_pointer chunk_shared = InvalidDsaPointer;
HashMemoryChunk chunk;
HashJoinTuple tuple;
int i;

/* Reset all flags in the main table ... */
- chunk = hashtable->chunks;
+ if (HashJoinTableIsShared(hashtable))
+ {
+ /* This only runs in the leader during rescan initialization. */
+ Assert(!IsParallelWorker());
+ hashtable->shared->chunk_work_queue = hashtable->shared->chunks;
+ chunk = pop_chunk_queue(hashtable, &chunk_shared);
+ }
+ else
+ chunk = hashtable->chunks;

Hm - doesn't pop_chunk_queue empty the work queue?
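
A toy model of the concern, assuming a pop that simply advances the queue
head (I have not checked that this matches what pop_chunk_queue actually does
in the patch; ToyChunk, ToyShared and the toy_* functions are hypothetical):

```c
#include <assert.h>
#include <stddef.h>

/* Toy stand-ins; these are NOT the patch's chunk or shared-state types. */
typedef struct ToyChunk
{
    struct ToyChunk *next;
} ToyChunk;

typedef struct ToyShared
{
    ToyChunk *chunks;            /* list of loaded chunks */
    ToyChunk *chunk_work_queue;  /* chunks still waiting to be processed */
} ToyShared;

/* Hypothetical pop: returns the queue head and advances past it. */
ToyChunk *
toy_pop_chunk_queue(ToyShared *shared)
{
    ToyChunk *chunk;

    assert(shared != NULL);
    chunk = shared->chunk_work_queue;
    if (chunk != NULL)
        shared->chunk_work_queue = chunk->next;
    return chunk;
}

/* Pops until the queue is empty; returns how many chunks were visited. */
int
toy_drain(ToyShared *shared)
{
    int n = 0;

    while (toy_pop_chunk_queue(shared) != NULL)
        n++;
    return n;
}
```

Under that assumption, one full pass leaves chunk_work_queue empty while
chunks still points at the list, so anything that later expects a populated
work queue has to reset it first.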

+/*
+ * Load a tuple into shared dense storage, like 'load_private_tuple'. This
+ * version is for shared hash tables.
+ */
+static HashJoinTuple
+load_shared_tuple(HashJoinTable hashtable, MinimalTuple tuple,
+ dsa_pointer *shared, bool respect_work_mem)
+{

Hm. Are there issues with "blessed" records being stored in shared
memory? I seem to recall you talking about it, but I see nothing
addressing the issue here? (later) Ah, I see - you just prohibit
parallelism in that case - might be worth a comment pointing to that.

+ /* Check if some other participant has increased nbatch. */
+ if (hashtable->shared->nbatch > hashtable->nbatch)
+ {
+ Assert(respect_work_mem);
+ ExecHashIncreaseNumBatches(hashtable, hashtable->shared->nbatch);
+ }
+
+ /* Check if we need to help shrinking. */
+ if (hashtable->shared->shrink_needed && respect_work_mem)
+ {
+ hashtable->current_chunk = NULL;
+ LWLockRelease(&hashtable->shared->chunk_lock);
+ return NULL;
+ }
+
+ /* Oversized tuples get their own chunk. */
+ if (size > HASH_CHUNK_THRESHOLD)
+ chunk_size = size + HASH_CHUNK_HEADER_SIZE;
+ else
+ chunk_size = HASH_CHUNK_SIZE;
+
+ /* If appropriate, check if work_mem would be exceeded by a new chunk. */
+ if (respect_work_mem &&
+ hashtable->shared->grow_enabled &&
+ hashtable->shared->nbatch <= MAX_BATCHES_BEFORE_INCREASES_STOP &&
+ (hashtable->shared->size +
+ chunk_size) > (work_mem * 1024L *
+ hashtable->shared->planned_participants))
+ {
+ /*
+ * It would be exceeded. Let's increase the number of batches, so we
+ * can try to shrink the hash table.
+ */
+ hashtable->shared->nbatch *= 2;
+ ExecHashIncreaseNumBatches(hashtable, hashtable->shared->nbatch);
+ hashtable->shared->chunk_work_queue = hashtable->shared->chunks;
+ hashtable->shared->chunks = InvalidDsaPointer;
+ hashtable->shared->shrink_needed = true;
+ hashtable->current_chunk = NULL;
+ LWLockRelease(&hashtable->shared->chunk_lock);
+
+ /* The caller needs to shrink the hash table. */
+ return NULL;
+ }

Hm - we could end up calling ExecHashIncreaseNumBatches twice here?
Probably harmless.
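
The "probably harmless" part holds if the second call is a no-op. A minimal
sketch of that property, assuming the function returns early once the local
nbatch already matches the requested value (ToyTable and
toy_increase_num_batches are hypothetical, not the patch's code):

```c
#include <assert.h>

/* Toy stand-in for the per-backend view of the hash table. */
typedef struct ToyTable
{
    int nbatch;        /* number of batches this backend knows about */
    int repartitions;  /* how many times real repartitioning work ran */
} ToyTable;

/*
 * Hypothetical idempotent variant: a repeated call with the same target
 * does nothing, so calling it twice in a row is safe.
 */
void
toy_increase_num_batches(ToyTable *t, int nbatch)
{
    assert(nbatch > 0);

    if (nbatch <= t->nbatch)
        return;            /* already at (or past) the requested size */

    t->nbatch = nbatch;
    t->repartitions++;     /* the expensive work would happen here */
}
```

If the real function lacks such a guard, the double call would deserve a
closer look.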

/* ----------------------------------------------------------------
* ExecHashJoin
@@ -129,6 +200,14 @@ ExecHashJoin(HashJoinState *node)
/* no chance to not build the hash table */
node->hj_FirstOuterTupleSlot = NULL;
}
+ else if (hashNode->shared_table_data != NULL)
+ {
+ /*
+ * The empty-outer optimization is not implemented for
+ * shared hash tables yet.
+ */
+ node->hj_FirstOuterTupleSlot = NULL;

Hm, why is this checking for the shared-ness of the join in a different
manner?

+ if (HashJoinTableIsShared(hashtable))
+ {
+ /*
+ * An important optimization: if this is a
+ * single-batch join and not an outer join, there is
+ * no reason to synchronize again when we've finished
+ * probing.
+ */
+ Assert(BarrierPhase(&hashtable->shared->barrier) ==
+ PHJ_PHASE_PROBING_BATCH(hashtable->curbatch));
+ if (hashtable->nbatch == 1 && !HJ_FILL_INNER(node))
+ return NULL; /* end of join */
+
+ /*
+ * Check if we are a leader that can't go further than
+ * probing the first batch, to avoid risk of deadlock
+ * against workers.
+ */
+ if (!LeaderGateCanContinue(&hashtable->shared->leader_gate))
+ {
+ /*
+ * Other backends will need to handle all future
+ * batches written by me. We don't detach until
+ * after we've finished writing to all batches so
+ * that they are flushed, otherwise another
+ * participant might try to read them too soon.
+ */
+ sts_end_write_all_partitions(hashNode->shared_inner_batches);
+ sts_end_write_all_partitions(hashNode->shared_outer_batches);
+ BarrierDetach(&hashtable->shared->barrier);
+ hashtable->detached_early = true;
+ return NULL;
+ }
+
+ /*
+ * We can't start searching for unmatched tuples until
+ * all participants have finished probing, so we
+ * synchronize here.
+ */
+ Assert(BarrierPhase(&hashtable->shared->barrier) ==
+ PHJ_PHASE_PROBING_BATCH(hashtable->curbatch));
+ if (BarrierWait(&hashtable->shared->barrier,
+ WAIT_EVENT_HASHJOIN_PROBING))
+ {
+ /* Serial phase: prepare for unmatched. */
+ if (HJ_FILL_INNER(node))
+ {
+ hashtable->shared->chunk_work_queue =
+ hashtable->shared->chunks;
+ hashtable->shared->chunks = InvalidDsaPointer;
+ }
+ }

Couldn't we skip that if this isn't an outer join? Not sure if the
complication would be worth it...

+void
+ExecShutdownHashJoin(HashJoinState *node)
+{
+ /*
+ * By the time ExecEndHashJoin runs in a work, shared memory has been

s/work/worker/

+ * destroyed. So this is our last chance to do any shared memory cleanup.
+ */
+ if (node->hj_HashTable)
+ ExecHashTableDetach(node->hj_HashTable);
+}

+ There is no extra charge
+ * for probing the hash table for outer path row, on the basis that
+ * read-only access to a shared hash table shouldn't be any more
+ * expensive.
+ */

Hm, that's debatable. !shared will mostly be on the local NUMA node,
shared probably not.

* Get hash table size that executor would use for inner relation.
*
+ * Shared hash tables are allowed to use the work_mem of all participants
+ * combined to make up for the fact that there is only one copy shared by
+ * all.

Hm. I don't quite understand that reasoning.
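
For reference, the arithmetic the comment seems to describe, going by the
work_mem * 1024L * planned_participants check quoted earlier. This is only my
reading, and toy_shared_hash_budget_bytes is a hypothetical helper:

```c
#include <assert.h>
#include <stdint.h>

/*
 * Hypothetical helper: the memory budget for one shared hash table,
 * assuming work_mem is given in kilobytes and every planned participant
 * contributes its full work_mem to the single shared copy.
 */
int64_t
toy_shared_hash_budget_bytes(int work_mem_kb, int planned_participants)
{
    assert(work_mem_kb > 0 && planned_participants > 0);

    /* Widen before multiplying to avoid overflow in int arithmetic. */
    return (int64_t) work_mem_kb * 1024 * planned_participants;
}
```

With work_mem = 4096 kB and 3 participants this gives 12582912 bytes (12 MB),
the same total that three private 4 MB tables would occupy.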

* XXX for the moment, always assume that skew optimization will be
* performed. As long as SKEW_WORK_MEM_PERCENT is small, it's not worth
* trying to determine that for sure.

If we don't do skew for parallelism, should we skip that bit?

- Andres
