Re: Parallel Hash take II

From: Andres Freund <andres(at)anarazel(dot)de>
To: Thomas Munro <thomas(dot)munro(at)enterprisedb(dot)com>
Cc: Rushabh Lathia <rushabh(dot)lathia(at)gmail(dot)com>, Prabhat Sahu <prabhat(dot)sahu(at)enterprisedb(dot)com>, Peter Geoghegan <pg(at)bowt(dot)ie>, Robert Haas <robertmhaas(at)gmail(dot)com>, Pg Hackers <pgsql-hackers(at)postgresql(dot)org>, Rafia Sabih <rafia(dot)sabih(at)enterprisedb(dot)com>, Ashutosh Bapat <ashutosh(dot)bapat(at)enterprisedb(dot)com>, Haribabu Kommi <kommi(dot)haribabu(at)gmail(dot)com>, Oleg Golovanov <rentech(at)mail(dot)ru>
Subject: Re: Parallel Hash take II
Date: 2017-11-08 03:40:32
Message-ID: 20171108034032.jxjw4xxtxhxzfakx@alap3.anarazel.de
Lists: pgsql-hackers

Hi,

* avoids wasting memory on duplicated hash tables
* avoids wasting disk space on duplicated batch files
* avoids wasting CPU executing duplicate subplans

What's the last one referring to?

+static void
+MultiExecParallelHash(HashState *node)
+{

+ switch (BarrierPhase(build_barrier))
+ {
+ case PHJ_BUILD_ALLOCATING:
+
+ /*
+ * Either I just allocated the initial hash table in
+ * ExecHashTableCreate(), or someone else is doing that. Either
+ * way, wait for everyone to arrive here so we can proceed, and
+ * then fall through.
+ */
+ BarrierArriveAndWait(build_barrier, WAIT_EVENT_HASH_BUILD_ALLOCATING);

Can you add a /* fallthrough */ comment here? Gcc warns if you
don't. While we currently have lotsa other places without the
annotation, it seems reasonable to have it in new code.
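
I.e. roughly this, reusing the quoted code; only the annotation is new:

			BarrierArriveAndWait(build_barrier, WAIT_EVENT_HASH_BUILD_ALLOCATING);

			/* FALLTHROUGH */

		case PHJ_BUILD_HASHING_INNER: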

+ case PHJ_BUILD_HASHING_INNER:
+
+ /*
+ * It's time to begin hashing, or if we just arrived here then
+ * hashing is already underway, so join in that effort. While
+ * hashing we have to be prepared to help increase the number of
+ * batches or buckets at any time, and if we arrived here when
+ * that was already underway we'll have to help complete that work
+ * immediately so that it's safe to access batches and buckets
+ * below.
+ */
+ if (PHJ_GROW_BATCHES_PHASE(BarrierAttach(&pstate->grow_batches_barrier)) !=
+ PHJ_GROW_BATCHES_ELECTING)
+ ExecParallelHashIncreaseNumBatches(hashtable);
+ if (PHJ_GROW_BUCKETS_PHASE(BarrierAttach(&pstate->grow_buckets_barrier)) !=
+ PHJ_GROW_BUCKETS_ELECTING)
+ ExecParallelHashIncreaseNumBuckets(hashtable);
+ ExecParallelHashEnsureBatchAccessors(hashtable);

"accessors" sounds a bit weird for a bunch of pointers, but maybe that's
just my ESL senses tingling wrongly.

/* ----------------------------------------------------------------
@@ -240,12 +427,15 @@ ExecEndHash(HashState *node)
* ----------------------------------------------------------------
*/
HashJoinTable
-ExecHashTableCreate(Hash *node, List *hashOperators, bool keepNulls)
+ExecHashTableCreate(HashState *state, List *hashOperators, bool keepNulls)

+ /*
+ * Parallel Hash tries to use the combined work_mem of all workers to
+ * avoid the need to batch. If that won't work, it falls back to work_mem
+ * per worker and tries to process batches in parallel.
+ */

One day we're going to need a better approach to this. I have no idea
how, but this per-node, and now per-node * max_parallelism, approach has
only implementation simplicity as its benefit.

+static HashJoinTuple
+ExecParallelHashLoadTuple(HashJoinTable hashtable, MinimalTuple tuple,
+ dsa_pointer *shared)
+{

+static void
+ExecParallelHashJoinSetUpBatches(HashJoinTable hashtable, int nbatch)
+{

+/*
+ * Get the first tuple in a given bucket identified by number.
+ */
+static HashJoinTuple
+ExecHashFirstTupleInBucket(HashJoinTable hashtable, int bucketno)
+{
+ if (hashtable->parallel_state)
+ {
+ dsa_pointer p =
+ dsa_pointer_atomic_read(&hashtable->buckets.shared[bucketno]);

Can you make this, and possibly a few other places, more readable by
introducing a temporary variable?
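
E.g. something roughly like this (untested sketch, going from memory of
the patch's field names):

static HashJoinTuple
ExecHashFirstTupleInBucket(HashJoinTable hashtable, int bucketno)
{
	if (hashtable->parallel_state)
	{
		dsa_pointer_atomic *bucket = &hashtable->buckets.shared[bucketno];
		dsa_pointer	first = dsa_pointer_atomic_read(bucket);

		return (HashJoinTuple) dsa_get_address(hashtable->area, first);
	}
	else
		return hashtable->buckets.unshared[bucketno];
}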

+/*
+ * Insert a tuple at the front of a chain of tuples in DSA memory atomically.
+ */
+static void
+ExecParallelHashPushTuple(dsa_pointer_atomic *head,
+ HashJoinTuple tuple,
+ dsa_pointer tuple_shared)
+{
+ do
+ {
+ tuple->next.shared = dsa_pointer_atomic_read(head);
+ } while (!dsa_pointer_atomic_compare_exchange(head,
+ &tuple->next.shared,
+ tuple_shared));
+}

This is hard to read.
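
Maybe something along these lines would be easier to follow (untested
sketch, same behaviour as far as I can tell):

static void
ExecParallelHashPushTuple(dsa_pointer_atomic *head,
						  HashJoinTuple tuple,
						  dsa_pointer tuple_shared)
{
	for (;;)
	{
		/* link the new tuple to the current head of the chain */
		tuple->next.shared = dsa_pointer_atomic_read(head);

		/* swing the head to the new tuple; retry if somebody beat us to it */
		if (dsa_pointer_atomic_compare_exchange(head,
												&tuple->next.shared,
												tuple_shared))
			break;
	}
}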

+ * While in the phase PHJ_BUILD_HASHING_INNER a separate pair of barriers may
+ * be used repeatedly as required to coordinate expansions in the number of
+ * batches or buckets. Their phases are as follows:
+ *
+ * PHJ_GROW_BATCHES_ELECTING -- initial state
+ * PHJ_GROW_BATCHES_ALLOCATING -- one allocates new batches
+ * PHJ_GROW_BATCHES_REPARTITIONING -- all rep

s/rep/repartition/?

#include "access/htup_details.h"
+#include "access/parallel.h"
#include "executor/executor.h"
#include "executor/hashjoin.h"
#include "executor/nodeHash.h"
#include "executor/nodeHashjoin.h"
#include "miscadmin.h"
+#include "pgstat.h"
#include "utils/memutils.h"
-
+#include "utils/sharedtuplestore.h"

deletes a separator newline.


/* ----------------------------------------------------------------
@@ -138,6 +236,18 @@ ExecHashJoin(PlanState *pstate)
/* no chance to not build the hash table */
node->hj_FirstOuterTupleSlot = NULL;
}
+ else if (hashNode->parallel_state != NULL)
+ {
+ /*
+ * The empty-outer optimization is not implemented for
+ * shared hash tables, because no one participant can
+ * determine that there are no outer tuples, and it's not
+ * yet clear that it's worth the synchronization overhead
+ * of reaching consensus to figure that out. So we have
+ * to build the hash table.
+ */
+ node->hj_FirstOuterTupleSlot = NULL;
+ }

Hm. Isn't MultiExecParallelHash already doing so?

- node->hj_JoinState = HJ_NEED_NEW_OUTER;
+ if (hashtable->parallel_state)
+ {
+ Barrier *build_barrier;
+
+ build_barrier = &hashtable->parallel_state->build_barrier;
+ if (BarrierPhase(build_barrier) == PHJ_BUILD_HASHING_OUTER)
+ {
+ /*
+ * If multi-batch, we need to hash the outer relation
+ * up front.
+ */
+ if (hashtable->nbatch > 1)
+ ExecParallelHashJoinPartitionOuter(node);
+ BarrierArriveAndWait(build_barrier,
+ WAIT_EVENT_HASH_BUILD_HASHING_OUTER);
+ }
+ Assert(BarrierPhase(build_barrier) == PHJ_BUILD_DONE);
+
+ /* Each backend should now select a batch to work on. */
+ hashtable->curbatch = -1;
+ node->hj_JoinState = HJ_NEED_NEW_BATCH;
+
+ continue;
+ }
+ else
+ node->hj_JoinState = HJ_NEED_NEW_OUTER;

You know what I'm going to say about all these branches, and sigh.

If we don't split this into two versions, we should at least store
hashNode->parallel_state in a local var, so the compiler doesn't have to
pull that out of memory after every external function call (of which
there are a lot). In common cases it'll end up in a callee-saved
register, and most of the called functions won't be too register
starved (on x86-64).
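
I.e. something like this near the top of the function (sketch only; type
name from memory):

	HashState  *hashNode = (HashState *) innerPlanState(node);
	ParallelHashJoinState *parallel_state = hashNode->parallel_state;

	...

	if (parallel_state != NULL)
	{
		...
	}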

+/*
+ * Choose a batch to work on, and attach to it. Returns true if successful,
+ * false if there are no more batches.
+ */
+static bool
+ExecParallelHashJoinNewBatch(HashJoinState *hjstate)
+{

+ /*
+ * This batch is ready to probe. Return control to
+ * caller. We stay attached to batch_barrier so that the
+ * hash table stays alive until everyone's finish probing

*finished?

+ case PHJ_BATCH_DONE:
+
+ /*
+ * Already done. Detach and go around again (if any
+ * remain).
+ */
+ BarrierDetach(batch_barrier);
+
+ /*
+ * We didn't work on this batch, but we need to observe
+ * its size for EXPLAIN.
+ */
+ ExecParallelHashUpdateSpacePeak(hashtable, batchno);
+ hashtable->batches[batchno].done = true;
+ hashtable->curbatch = -1;
+ break;

Hm, maybe I'm missing something, but why is it guaranteed that "we
didn't work on this batch"?

+void
+ExecShutdownHashJoin(HashJoinState *node)
+{
+ /*
+ * By the time ExecEndHashJoin runs in a worker, shared memory has been
+ * destroyed. So this is our last chance to do any shared memory cleanup.
+ */

This comment doesn't really make much sense to me.

+void
+ExecHashJoinReInitializeDSM(HashJoinState *state, ParallelContext *cxt)
+{

could use a header comment.
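
Perhaps something along the lines of (just a suggestion, adjust it to
whatever the function actually guarantees):

/*
 * Reset shared hash join state stored in DSM before a rescan, so that
 * the next set of workers starts from a clean slate.
 */
void
ExecHashJoinReInitializeDSM(HashJoinState *state, ParallelContext *cxt)
{
	...
}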

a) The executor side is starting to look good.
b) This is a lot of code.
c) I'm tired, planner has to wait till tomorrow.

- Andres
