Re: WIP: [[Parallel] Shared] Hash

From: Thomas Munro <thomas(dot)munro(at)enterprisedb(dot)com>
To: Andres Freund <andres(at)anarazel(dot)de>
Cc: Peter Geoghegan <pg(at)bowt(dot)ie>, Rafia Sabih <rafia(dot)sabih(at)enterprisedb(dot)com>, Ashutosh Bapat <ashutosh(dot)bapat(at)enterprisedb(dot)com>, Haribabu Kommi <kommi(dot)haribabu(at)gmail(dot)com>, Pg Hackers <pgsql-hackers(at)postgresql(dot)org>, Robert Haas <robertmhaas(at)gmail(dot)com>
Subject: Re: WIP: [[Parallel] Shared] Hash
Date: 2017-03-31 04:53:12
Message-ID: CAEepm=1VjSE8Jskrz1UF9JJh9sD_wwVF468H3ZXxMD97_HxMHg@mail.gmail.com
Lists: pgsql-hackers

Hi hackers,

Thanks very much to Rafia for testing, and to Andres for his copious
review feedback. Here's a new version. Changes:

1. Keep all the backing files that are part of a BufFileSet in
subdirectories, as suggested by Andres. Now, instead of that
unpopular logic for scanning ranges of possible file paths to delete,
we can just blow away whole directories that group sets of related
files.

2. Don't expose the 'participant' and 'partition' concepts, which Andres
didn't like much, in the BufFile API. There is a new concept 'stripe' which
client code of BufFileSet can use to specify the participant number in
a more general way without saying so: it's really just a way to spread
files over tablespaces. I'm not sure if tablespaces are really used
that way much, but it seemed like Peter wasn't going to be too happy
with a proposal that didn't do *something* to respect the existing
temp_tablespaces GUC behaviour (and he'd be right). But I didn't
think it would make any kind of sense at all to stripe by 1GB segments
as private BufFiles do when writing from multiple processes, as I have
argued elsewhere, hence this scheme.

The 'qunique' function used here (basically a poor man's std::unique) is
one I proposed earlier, with the name suggested by Tom Lane. See:

https://www.postgresql.org/message-id/flat/CAEepm%3D2vmFTNpAmwbGGD2WaryM6T3hSDVKQPfUwjdD_5XY6vAA%40mail.gmail.com

3. Merged the single-batch and multi-batch patches into one.
Earlier I had the idea that it was easier to review them in layers
since I hoped people might catch a glimpse of the central simplicity
without being hit by a wall of multi-batch logic, but since Andres is
reviewing and disagrees, I give you 0010-hj-parallel-v11.patch which
weighs in at 32 files changed, 2278 insertions(+), 250 deletions(-).

4. Moved the DSM handling to the very end of resowner.c's cleanup.
Peter pointed out that it would otherwise happen before fd.c Files are
closed. He was concerned about a different aspect of that which I'm
not sure I fully understand, but at the very least it seemed to
represent a significant problem for this design on Windows. I
discussed this briefly with Robert off-list and he told me that there
is probably no good reason for the ordering that we have, and what's
more, there may be good arguments even outside this case for DSM
segments being cleaned up as late as possible, now that they contain
shared control information and not just tuple data as once had been
imagined. I can't think of any reason why this would not be safe.
Can you?

5. Implemented the empty inner relation optimisation.
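
To make item 2 concrete, here is a minimal sketch of a std::unique-style
helper and of mapping a stripe number onto a deduplicated tablespace list.
The names qunique_sketch, dedup_tablespaces and tablespace_for_stripe are
hypothetical, Oid is a stand-in typedef, and the modulo mapping is an
assumption about how a stripe might pick a tablespace, not the patch's code.

```c
#include <assert.h>
#include <stddef.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical stand-in for PostgreSQL's Oid type. */
typedef unsigned int Oid;

/* Three-way comparator shared by qsort and qunique_sketch. */
static int
oid_cmp(const void *a, const void *b)
{
    Oid l = *(const Oid *) a;
    Oid r = *(const Oid *) b;

    return (l > r) - (l < r);
}

/*
 * Remove adjacent duplicates from a sorted array in place and return the
 * number of elements kept, in the manner of std::unique.
 */
static size_t
qunique_sketch(void *array, size_t elements, size_t width,
               int (*compare) (const void *, const void *))
{
    char   *bytes = (char *) array;
    size_t  i, j;

    if (elements <= 1)
        return elements;

    for (i = 1, j = 0; i < elements; ++i)
    {
        /* Keep element i only if it differs from the last element kept. */
        if (compare(bytes + i * width, bytes + j * width) != 0 && ++j != i)
            memcpy(bytes + j * width, bytes + i * width, width);
    }
    return j + 1;
}

/* Sort and deduplicate a tablespace list; returns the new length. */
static size_t
dedup_tablespaces(Oid *tablespaces, size_t n)
{
    qsort(tablespaces, n, sizeof(Oid), oid_cmp);
    return qunique_sketch(tablespaces, n, sizeof(Oid), oid_cmp);
}

/* Assumed mapping: stripe k uses the (k mod n)th distinct tablespace. */
static Oid
tablespace_for_stripe(const Oid *tablespaces, size_t n, unsigned int stripe)
{
    assert(n > 0);
    return tablespaces[stripe % n];
}
```

With this mapping, a client that passes its participant number as the
stripe spreads its files over all configured tablespaces without the
BufFile layer knowing what a participant is.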

Some smaller changes and miles of feedback inline below:

On Mon, Mar 27, 2017 at 11:03 AM, Thomas Munro
<thomas(dot)munro(at)enterprisedb(dot)com> wrote:
> On Mon, Mar 27, 2017 at 9:41 AM, Andres Freund <andres(at)anarazel(dot)de> wrote:
>> SharedBufFile allows temporary files to be created by one backend and
>> then exported for read-only access by other backends, with clean-up
>> managed by reference counting associated with a DSM segment. This includes
>> changes to fd.c and buffile.c to support new kinds of temporary file.
>>
>>
>> diff --git a/src/backend/storage/file/buffile.c b/src/backend/storage/file/buffile.c
>> index 4ca0ea4..a509c05 100644
>> --- a/src/backend/storage/file/buffile.c
>> +++ b/src/backend/storage/file/buffile.c
>>
>> I think the new facilities should be explained in the file's header.
>
> Will do.

Done.

>> @@ -68,9 +71,10 @@ struct BufFile
>> * avoid making redundant FileSeek calls.
>> */
>>
>> - bool isTemp; /* can only add files if this is TRUE */
>> + bool isSegmented; /* can only add files if this is TRUE */
>>
>> That's a bit of a weird and uncommented upon change.
>
> I was trying to cut down on the number of places we use the word
> 'temporary' to activate various different behaviours. In this case,
> the only thing it controls is whether the BufFile is backed by one
> single fd.c File or many segments, so I figured it should be renamed.
>
> As Peter and you have pointed out, there may be a case for removing it
> altogether.

Done in 0007-hj-remove-buf-file-is-temp-v11.patch.

>> @@ -79,6 +83,8 @@ struct BufFile
>> */
>> ResourceOwner resowner;
>>
>> + BufFileTag tag; /* for discoverability between backends */
>>
>> Not perfectly happy with the name tag here, the name is a bit too
>> similar to BufferTag - something quite different.
>
> Yeah, will rename.

Done. That existed only because I had sharedbuffile.c which needed
special access to buffile.c via those weird 'tag' interfaces. In the
new version that isn't required, and a new struct BufFileSet is
provided by buffile.c/h.

>> +static void
>> +make_tagged_path(char *tempdirpath, char *tempfilepath,
>> + const BufFileTag *tag, int segment)
>> +{
>> + if (tag->tablespace == DEFAULTTABLESPACE_OID ||
>> + tag->tablespace == GLOBALTABLESPACE_OID)
>> + snprintf(tempdirpath, MAXPGPATH, "base/%s", PG_TEMP_FILES_DIR);
>> + else
>> + {
>> + snprintf(tempdirpath, MAXPGPATH, "pg_tblspc/%u/%s/%s",
>> + tag->tablespace, TABLESPACE_VERSION_DIRECTORY,
>> + PG_TEMP_FILES_DIR);
>> + }
>> +
>> + snprintf(tempfilepath, MAXPGPATH, "%s/%s%d.%d.%d.%d.%d", tempdirpath,
>> + PG_TEMP_FILE_PREFIX,
>> + tag->creator_pid, tag->set, tag->partition, tag->participant,
>> + segment);
>>
>> Is there a risk that this ends up running afoul of filename length
>> limits on some platforms?

The names are shorter now, and split over two levels:

pgsql_tmp37303.2.set/pgsql_tmp.p30.b0.0
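
A sketch of how such a two-level name could be assembled. The reading of
the example (creator PID 37303, set number 2, a client-chosen name
"p30.b0", segment 0) is an assumption, and set_dir_name and segment_path
are hypothetical helpers rather than functions from the patch.

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

#define SKETCH_TEMP_FILE_PREFIX "pgsql_tmp"   /* mirrors PG_TEMP_FILE_PREFIX */

/* Build "pgsql_tmp<pid>.<set>.set"; the field meanings are assumptions. */
static void
set_dir_name(char *out, size_t outlen, int creator_pid, int set_number)
{
    int n = snprintf(out, outlen, "%s%d.%d.set",
                     SKETCH_TEMP_FILE_PREFIX, creator_pid, set_number);

    /* Refuse silently truncated names. */
    assert(n > 0 && (size_t) n < outlen);
}

/* Build "<dir>/pgsql_tmp.<name>.<segment>" for one segment file. */
static void
segment_path(char *out, size_t outlen, const char *dir,
             const char *name, int segment)
{
    int n = snprintf(out, outlen, "%s/%s.%s.%d",
                     dir, SKETCH_TEMP_FILE_PREFIX, name, segment);

    assert(n > 0 && (size_t) n < outlen);
}
```

Because every segment of every BufFile in the set lives under the one
directory, cleanup only has to remove that directory and its contents.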

>> If we do decide not to change this: Why is that sufficient? Doesn't the
>> same problem exist for segments later than the first?
>
> It does exist and it is handled. The comment really should say
> "unlinking segment N + 1 (if it exists) before creating segment N".
> Will update.

I got rid of this. This doesn't come up anymore because the patch now
blows away whole directories. There is never a case where files left
over after a crash-restart would confuse us. There may be left over
directories, but if we find that we can't create a directory, we try
to delete it and all its contents first (i.e. to see if there was a
leftover directory from before a crash-restart) and then try again, so
individual segment files shouldn't be able to confuse us.

>> + * PathNameCreateTemporaryFile, PathNameOpenTemporaryFile and
>> + * PathNameDeleteTemporaryFile are used for temporary files that may be shared
>> + * between backends. A File created or opened with these functions is not
>> + * automatically deleted when the file is closed, but it is automatically
>> + * closed and end of transaction and counts agains the temporary file limit of
>> + * the backend that created it. Any File created this way must be explicitly
>> + * deleted with PathNameDeleteTemporaryFile. Automatic file deletion is not
>> + * provided because this interface is designed for use by buffile.c and
>> + * indirectly by sharedbuffile.c to implement temporary files with shared
>> + * ownership and cleanup.
>>
>> Hm. Those name are pretty easy to misunderstand, no? s/Temp/Shared/?
>
> Hmm. Yeah these may be better. Will think about that.

I like these names. This is fd.c providing named temporary files.
They are definitely temporary files still: they participate in the
total temp limit and logging/pgstat and they are automatically closed.
The only different things are: they have names permitting opening by
other backends, and (it follows) are not automatically deleted on
close. buffile.c takes over that duty using a BufFileSet.

>> +File
>> +PathNameOpenTemporaryFile(char *tempfilepath)
>> +{
>> + File file;
>> +
>> + /*
>> + * Open the file. Note: we don't use O_EXCL, in case there is an orphaned
>> + * temp file that can be reused.
>> + */
>> + file = PathNameOpenFile(tempfilepath, O_RDONLY | PG_BINARY, 0);
>>
>> If so, wouldn't we need to truncate the file?
>
> Yes, this lacks O_TRUNC. Thanks.

Actually the reason I did that is because I wanted to open the file
with O_RDONLY, which is incompatible with O_TRUNC. Misleading comment
removed.

>> + * A single SharedBufFileSet can manage any number of 'tagged' BufFiles that
>> + * are shared between a fixed number of participating backends. Each shared
>> + * BufFile can be written to by a single participant but can be read by any
>> + * backend after it has been 'exported'. Once a given BufFile is exported, it
>> + * becomes read-only and cannot be extended. To create a new shared BufFile,
>> + * a participant needs its own distinct participant number, and needs to
>> + * specify an arbitrary partition number for the file. To make it available
>> + * to other backends, it must be explicitly exported, which flushes internal
>> + * buffers and renders it read-only. To open a file that has been shared, a
>> + * backend needs to know the number of the participant that created the file,
>> + * and the partition number. It is the responsibily of calling code to ensure
>> + * that files are not accessed before they have been shared.
>>
>> Hm. One way to make this safer would be to rename files when exporting.
>> Should be sufficient to do this to the first segment, I guess.
>
> Interesting idea. Will think about that. That comment isn't great
> and repeats itself. Will improve.

Comment improved. I haven't investigated a file-renaming scheme for
exporting files yet.

>> + * Each file is identified by a partition number and a participant number, so
>> + * that a SharedBufFileSet can be viewed as a 2D table of individual files.
>>
>> I think using "files" as a term here is a bit dangerous - they're
>> individually segmented again, right?
>
> True. It's a 2D matrix of BufFiles. The word "file" is super
> overloaded here. Will fix.

No longer present.

>> +/*
>> + * The number of bytes of shared memory required to construct a
>> + * SharedBufFileSet.
>> + */
>> +Size
>> +SharedBufFileSetSize(int participants)
>> +{
>> + return offsetof(SharedBufFileSet, participants) +
>> + sizeof(SharedBufFileParticipant) * participants;
>> +}
>>
>> The function name sounds a bit like a function actuallize setting some
>> size... s/Size/DetermineSize/?
>
> Hmm yeah "set" as verb vs "set" as noun. I think "estimate" is the
> established word for this sort of thing (even though that seems
> strange because it sounds like it doesn't have to be exactly right:
> clearly in all these shmem-space-reservation functions it has to be
> exactly right). Will change.

Done. (Of course 'estimate' is both a noun and a verb too, and for
extra points pronounced differently...)
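
For reference, the size computation under discussion follows the usual
flexible-array pattern. A self-contained sketch, with hypothetical struct
and function names (SketchSet, sketch_set_estimate) standing in for the
patch's own:

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical per-participant slot. */
typedef struct SketchParticipant
{
    int  read_fileno;
    long read_offset;
} SketchParticipant;

/* Hypothetical shared header followed by one slot per participant. */
typedef struct SketchSet
{
    int               refcount;
    int               nparticipants;
    SketchParticipant participants[];   /* flexible array member */
} SketchSet;

/*
 * Bytes of shared memory needed for a set with the given number of
 * participants.  Despite the name, the result has to be exact.
 */
static size_t
sketch_set_estimate(int nparticipants)
{
    assert(nparticipants >= 0);
    return offsetof(SketchSet, participants) +
        sizeof(SketchParticipant) * (size_t) nparticipants;
}
```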

>>
>> +/*
>> + * Create a new file suitable for sharing. Each backend that calls this must
>> + * use a distinct participant number. Behavior is undefined if a participant
>> + * calls this more than once for the same partition number. Partitions should
>> + * ideally be numbered consecutively or in as small a range as possible,
>> + * because file cleanup will scan the range of known partitions looking for
>> + * files.
>> + */
>>
>> Wonder if we shouldn't just create a directory for all such files.
>
> Hmm. Yes, that could work well. Will try that.

Done.

>> I'm a bit unhappy with the partition terminology around this. It's
>> getting a bit confusing. We have partitions, participants and
>> segements. Most of them could be understood for something entirely
>> different than the meaning you have here...
>
> Ok. Let me try to explain [explanation...].
>
> (Perhaps SharedBufFileSet should be called PartitionedBufFileSet?)

I got rid of most of that terminology. Now I have BufFileSet which is
a set of named BufFiles and it's up to client code to manage the
namespace within it. SharedTuplestore happens to build names that
include partition and participant numbers, but that's its business.
There is also a 'stripe' number, which is used as a way to spread
files across multiple temp_tablespaces.

>> +static void
>> +shared_buf_file_on_dsm_detach(dsm_segment *segment, Datum datum)
>> +{
>> + bool unlink_files = false;
>> + SharedBufFileSet *set = (SharedBufFileSet *) DatumGetPointer(datum);
>> +
>> + SpinLockAcquire(&set->mutex);
>> + Assert(set->refcount > 0);
>> + if (--set->refcount == 0)
>> + unlink_files = true;
>> + SpinLockRelease(&set->mutex);
>>
>> I'm a bit uncomfortable with releasing a refcount, and then still using
>> the memory from the set... I don't think there's a concrete danger
>> here as the code stands, but it's a fairly dangerous pattern.
>
> Will fix.

I could fix that but I'd feel bad about doing more work while holding
the spinlock (even though it can't possibly be contended because we
are the last to detach). I have added a comment to explain that it's
safe to continue accessing the DSM segment while in this function
body.
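
The pattern being defended looks roughly like this: decide under the lock,
do the expensive work after releasing it. This sketch uses a C11
atomic_flag as a stand-in for a PostgreSQL spinlock, and all names
(SketchSharedSet, sketch_detach) are hypothetical.

```c
#include <assert.h>
#include <stdatomic.h>
#include <stdbool.h>

typedef struct SketchSharedSet
{
    atomic_flag mutex;          /* stand-in for a spinlock */
    int         refcount;
    int         files_unlinked; /* stand-in for the real cleanup work */
} SketchSharedSet;

static void
sketch_init(SketchSharedSet *set, int refcount)
{
    atomic_flag_clear(&set->mutex);
    set->refcount = refcount;
    set->files_unlinked = 0;
}

/* Drop one reference; returns true if the caller was the last to detach. */
static bool
sketch_detach(SketchSharedSet *set)
{
    bool unlink_files = false;

    /* Hold the lock only long enough to update the counter. */
    while (atomic_flag_test_and_set_explicit(&set->mutex,
                                             memory_order_acquire))
        ;
    assert(set->refcount > 0);
    if (--set->refcount == 0)
        unlink_files = true;
    atomic_flag_clear_explicit(&set->mutex, memory_order_release);

    /*
     * Assumption carried over from the email: the memory holding *set
     * stays mapped until this callback returns, so it can still be read
     * here even though our reference is gone.
     */
    if (unlink_files)
        set->files_unlinked = 1;

    return unlink_files;
}
```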

On Mon, Mar 27, 2017 at 10:47 AM, Andres Freund <andres(at)anarazel(dot)de> wrote:
> On 2017-03-23 20:35:09 +1300, Thomas Munro wrote:
>> Here is a new patch series responding to feedback from Peter and Andres:
>
> +
> +/* Per-participant shared state. */
> +typedef struct SharedTuplestoreParticipant
> +{
> + LWLock lock;
>
> Hm. No padding (ala LWLockMinimallyPadded / LWLockPadded) - but that's
> probably ok, for now.

I hunted around but didn't see an idiom for making this whole struct
cacheline-sized.
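
One generic way to pad a whole struct, modelled on the union trick that
LWLockPadded uses for a single lock. This is only a sketch: the 128-byte
line size and all type names are assumptions, and the int field stands in
for the real LWLock.

```c
#include <assert.h>
#include <stdbool.h>

#define SKETCH_CACHE_LINE_SIZE 128  /* assumed cache line size */

/* Hypothetical per-participant state. */
typedef struct SketchParticipantData
{
    int  lock_state;    /* stand-in for the LWLock */
    bool error;
    bool eof;
    int  read_fileno;
    long read_offset;
} SketchParticipantData;

/* The union is as large as its largest member, i.e. one cache line. */
typedef union SketchParticipantPadded
{
    SketchParticipantData data;
    char                  pad[SKETCH_CACHE_LINE_SIZE];
} SketchParticipantPadded;

/* Fail the build if the payload ever outgrows the padding. */
_Static_assert(sizeof(SketchParticipantData) <= SKETCH_CACHE_LINE_SIZE,
               "participant state no longer fits in one cache line");
```

Padding fixes the stride between array elements; it does not by itself
align the first element, which depends on how the array is allocated.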

> + bool error; /* Error occurred flag. */
> + bool eof; /* End of file reached. */
> + int read_fileno; /* BufFile segment file number. */
> + off_t read_offset; /* Offset within segment file. */
>
> Hm. I wonder if it'd not be better to work with 64bit offsets, and just
> separate that out upon segment access.

This falls out of the current two-part BufFileTell and BufFileSeek
interface. Since translation could be done trivially
(single_address_space_offset = fileno * MAX_PHYSICAL_FILESIZE +
offset), that might be a reasonable refactoring, but it seems to be
material for a separate patch, considering that other client code
would be affected, no?
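
The translation in question, spelled out as a sketch. The 1GB segment
size is an assumption matching the "1GB segments" mentioned above, and
sketch_combine and sketch_split are hypothetical names.

```c
#include <assert.h>
#include <stdint.h>

/* Assumed segment size: 1GB, as for private BufFiles. */
#define SKETCH_MAX_PHYSICAL_FILESIZE INT64_C(0x40000000)

/* (segment number, offset within segment) -> single 64-bit position. */
static int64_t
sketch_combine(int segno, int64_t offset)
{
    assert(segno >= 0);
    assert(offset >= 0 && offset < SKETCH_MAX_PHYSICAL_FILESIZE);
    return (int64_t) segno * SKETCH_MAX_PHYSICAL_FILESIZE + offset;
}

/* Single 64-bit position -> (segment number, offset within segment). */
static void
sketch_split(int64_t position, int *segno, int64_t *offset)
{
    assert(position >= 0);
    *segno = (int) (position / SKETCH_MAX_PHYSICAL_FILESIZE);
    *offset = position % SKETCH_MAX_PHYSICAL_FILESIZE;
}
```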

> +/* The main data structure in shared memory. */
>
> "main data structure" isn't particularly meaningful.

Fixed.

> +struct SharedTuplestore
> +{
> + int reading_partition;
> + int nparticipants;
> + int flags;
>
> Maybe add a comment saying /* flag bits from SHARED_TUPLESTORE_* */?

Done.

> + Size meta_data_size;
>
> What's this?

Comments added to every struct member.

> + SharedTuplestoreParticipant participants[FLEXIBLE_ARRAY_MEMBER];
>
> I'd add a comment here, that there's further data after participants.

Done.

> +};
>
> +
> +/* Per-participant backend-private state. */
> +struct SharedTuplestoreAccessor
> +{
>
> Hm. The name and it being backend-local are a bit conflicting.

Hmm. It's a (SharedTuplestore) Accessor, not a Shared (...). Not
sure if we have an established convention for this kind of thing...

> + int participant; /* My partitipant number. */
> + SharedTuplestore *sts; /* The shared state. */
> + int nfiles; /* Size of local files array. */
> + BufFile **files; /* Files we have open locally for writing. */
>
> Shouldn't this mention that it's indexed by partition?

Done.

> + BufFile *read_file; /* The current file to read from. */
> + int read_partition; /* The current partition to read from. */
> + int read_participant; /* The current participant to read from. */
> + int read_fileno; /* BufFile segment file number. */
> + off_t read_offset; /* Offset within segment file. */
> +};
>
>
> +/*
> + * Initialize a SharedTuplestore in existing shared memory. There must be
> + * space for sts_size(participants) bytes. If flags is set to the value
> + * SHARED_TUPLESTORE_SINGLE_PASS then each partition may only be read once,
> + * because underlying files will be deleted.
>
> Any reason not to use flags that are compatible with tuplestore.c?

tuplestore.c uses some executor.h flags like EXEC_FLAG_MARK.
sharedtuplestore.c's interface and capabilities are extremely
primitive and only really let it do exactly what I needed to do here.
Namely, every participant writes into its own set of partition files,
and then all together we perform a single "partial scan" in some
undefined order to get all the tuples back and share them out between
backends. Extending it to behave more like the real tuplestore may be
interesting for other projects (dynamic partitioning etc) but it
didn't seem like a good idea to speculate on what exactly would be
needed. This particular flag means 'please delete individual backing
files as we go after reading them', and I don't believe there is any
equivalent; someone thought the private HJ should do that so I figured
I should do it here too.

> + * Tuples that are stored may optionally carry a piece of fixed sized
> + * meta-data which will be retrieved along with the tuple. This is useful for
> + * the hash codes used for multi-batch hash joins, but could have other
> + * applications.
> + */
> +SharedTuplestoreAccessor *
> +sts_initialize(SharedTuplestore *sts, int participants,
> + int my_participant_number,
> + Size meta_data_size,
> + int flags,
> + dsm_segment *segment)
> +{
>
> Not sure I like that the naming here has little in common with
> tuplestore.h's api.

Hmm. I feel like its interface needs to be significantly different to
express the things it needs to do, especially at initialisation. As
for the tuple write/read interface, how would you improve this?

sts_puttuple(...);
sts_puttuple(...);
...
sts_end_write_all_partitions(...);

sts_prepare_partial_scan(...); /* in one backend only */
sts_begin_partial_scan(...);
... = sts_gettuple(...);
... = sts_gettuple(...);
...
sts_end_partial_scan(...);

One thought that I keep having: the private hash join code should also
use tuplestore. But a smarter tuplestore that knows how to hold onto
the hash value (the meta-data in my sharedtuplestore.c) and knows
about partitions (batches). It would be nice if the private and
shared batching code finished up harmonised in this respect.

> +
> +MinimalTuple
> +sts_gettuple(SharedTuplestoreAccessor *accessor, void *meta_data)
> +{
>
> This needs docs.

Done.

> + SharedBufFileSet *fileset = GetSharedBufFileSet(accessor->sts);
> + MinimalTuple tuple = NULL;
> +
> + for (;;)
> + {
>
> ...
> + /* Check if this participant's file has already been entirely read. */
> + if (participant->eof)
> + {
> + BufFileClose(accessor->read_file);
> + accessor->read_file = NULL;
> + LWLockRelease(&participant->lock);
> + continue;
>
> Why are we closing the file while holding the lock?

Fixed.

> +
> + /* Read the optional meta-data. */
> + eof = false;
> + if (accessor->sts->meta_data_size > 0)
> + {
> + nread = BufFileRead(accessor->read_file, meta_data,
> + accessor->sts->meta_data_size);
> + if (nread == 0)
> + eof = true;
> + else if (nread != accessor->sts->meta_data_size)
> + ereport(ERROR,
> + (errcode_for_file_access(),
> + errmsg("could not read from temporary file: %m")));
> + }
> +
> + /* Read the size. */
> + if (!eof)
> + {
> + nread = BufFileRead(accessor->read_file, &tuple_size, sizeof(tuple_size));
> + if (nread == 0)
> + eof = true;
>
> Why is it legal to have EOF here, if metadata previously didn't have an
> EOF? Perhaps add an error if accessor->sts->meta_data_size != 0?

Improved comments.

> + if (eof)
> + {
> + participant->eof = true;
> + if ((accessor->sts->flags & SHARED_TUPLESTORE_SINGLE_PASS) != 0)
> + SharedBufFileDestroy(fileset, accessor->read_partition,
> + accessor->read_participant);
> +
> + participant->error = false;
> + LWLockRelease(&participant->lock);
> +
> + /* Move to next participant's file. */
> + BufFileClose(accessor->read_file);
> + accessor->read_file = NULL;
> + continue;
> + }
> +
> + /* Read the tuple. */
> + tuple = (MinimalTuple) palloc(tuple_size);
> + tuple->t_len = tuple_size;
>
> Hm. Constantly re-allocing this doesn't strike me as a good idea (not to
> mention that the API doesn't mention this is newly allocated). Seems
> like it'd be a better idea to have a per-accessor buffer where this can
> be stored in - increased in size when necessary.

Done.
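
A sketch of the suggested per-accessor buffer, using realloc where the
real code would presumably use PostgreSQL's memory-context allocator.
SketchAccessor, sketch_tuple_buffer and the doubling growth policy are
illustrative assumptions.

```c
#include <assert.h>
#include <stdint.h>
#include <stdlib.h>

/* Hypothetical backend-private state holding a reusable tuple buffer. */
typedef struct SketchAccessor
{
    void   *tuple_buf;
    size_t  tuple_buf_size;
} SketchAccessor;

/*
 * Return a buffer of at least 'needed' bytes, growing the accessor's
 * buffer only when it is too small.  Returns NULL on allocation failure,
 * in which case the old buffer is left untouched.
 */
static void *
sketch_tuple_buffer(SketchAccessor *acc, size_t needed)
{
    assert(needed <= SIZE_MAX / 2);     /* keeps the doubling loop finite */

    if (needed > acc->tuple_buf_size)
    {
        size_t newsize = acc->tuple_buf_size ? acc->tuple_buf_size : 64;
        void  *newbuf;

        while (newsize < needed)
            newsize *= 2;

        newbuf = realloc(acc->tuple_buf, newsize);
        if (newbuf == NULL)
            return NULL;

        acc->tuple_buf = newbuf;
        acc->tuple_buf_size = newsize;
    }
    return acc->tuple_buf;
}
```

The consequence for callers is that a returned tuple is only valid until
the next fetch through the same accessor.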

On Tue, Mar 28, 2017 at 6:33 PM, Andres Freund <andres(at)anarazel(dot)de> wrote:
> On 2017-03-23 20:35:09 +1300, Thomas Munro wrote:
>> Here is a new patch series responding to feedback from Peter and Andres:
>
> Here's a review of 0007 & 0010 together - they're going to have to be
> applied together anyway...

I have now merged them FWIW.

> diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml
> index ac339fb566..775c9126c7 100644
> --- a/doc/src/sgml/config.sgml
> +++ b/doc/src/sgml/config.sgml
> @@ -3814,6 +3814,21 @@ ANY <replaceable class="parameter">num_sync</replaceable> ( <replaceable class="
> </listitem>
> </varlistentry>
>
> + <varlistentry id="guc-cpu-shared-tuple-cost" xreflabel="cpu_shared_tuple_cost">
> + <term><varname>cpu_shared_tuple_cost</varname> (<type>floating point</type>)
> + <indexterm>
> + <primary><varname>cpu_shared_tuple_cost</> configuration parameter</primary>
> + </indexterm>
> + </term>
> + <listitem>
> + <para>
> + Sets the planner's estimate of the cost of sharing rows in
> + memory during a parallel query.
> + The default is 0.001.
> + </para>
> + </listitem>
> + </varlistentry>
> +
>
> Isn't that really low in comparison to the other costs? I think
> specifying a bit more what this actually measures would be good too - is
> it putting the tuple in shared memory? Is it accessing it?

Yeah. It was really just to make the earlier Shared Hash consistently
more expensive than private Hash, by a tiny amount. Then it wouldn't
kick in until it could help you avoid batching.

I will try to come up with some kind of argument based on data...

> + <varlistentry id="guc-cpu-synchronization-cost" xreflabel="cpu_synchronization_cost">
> + <term><varname>cpu_synchronization_cost</varname> (<type>floating point</type>)
> + <indexterm>
> + <primary><varname>cpu_synchronization_cost</> configuration parameter</primary>
> + </indexterm>
> + </term>
> + <listitem>
> + <para>
> + Sets the planner's estimate of the cost of waiting at synchronization
> + points for other processes while executing parallel queries.
> + The default is 1.0.
> + </para>
> + </listitem>
> + </varlistentry>
>
> Isn't this also really cheap in comparison to a, probably cached, seq
> page read?

It's not really the synchronisation primitive itself, which is fast,
it's how long the other guys may spend doing other stuff before they
reach the barrier. Currently we have a block granularity parallel
query system, so really this is an estimation of how long the average
participant will have to wait for the last of its peers to finish
chewing on up to one page of tuples from its (ultimate) source of
parallelism. Yeah I'm waffling a bit because I don't have a
principled answer to this question yet...

> + if (HashJoinTableIsShared(hashtable))
> + {
> + /*
> + * Synchronize parallel hash table builds. At this stage we know that
> + * the shared hash table has been created, but we don't know if our
> + * peers are still in MultiExecHash and if so how far through. We use
> + * the phase to synchronize with them.
> + */
> + barrier = &hashtable->shared->barrier;
> +
> + switch (BarrierPhase(barrier))
> + {
> + case PHJ_PHASE_BEGINNING:
>
> Note pgindent will indent this further. Might be worthwhile to try to
> pgindent the file, revert some of the unintended damage.

Fixed switch statement indentation. I will try pgindent soon and see
how badly it all breaks.

> /*
> * set expression context
> */
>
> I'd still like this to be moved to the start.

Done.

> @@ -126,17 +202,79 @@ MultiExecHash(HashState *node)
> /* Not subject to skew optimization, so insert normally */
> ExecHashTableInsert(hashtable, slot, hashvalue);
> }
> - hashtable->totalTuples += 1;
> + hashtable->partialTuples += 1;
> + if (!HashJoinTableIsShared(hashtable))
> + hashtable->totalTuples += 1;
> }
> }
>
> FWIW, I'd put HashJoinTableIsShared() into a local var - the compiler
> won't be able to do that on its own because external function calls
> could invalidate the result.

Done in the hot loops.

> That brings me to a related topic: Have you measured whether your
> changes cause performance differences?

I have never succeeded in measuring any reproducible difference
between master with 0 workers and my patch with 0 workers on
various contrived queries and TPCH queries (except the ones where my
patch makes certain outer joins faster for known reasons). I suspect
it just spends too much time ping-ponging in and out of the node for
each tuple for tiny differences in coding to show up. But I could be
testing for the wrong things...

> + finish_loading(hashtable);
>
> I find the sudden switch to a different naming scheme in the same file a
> bit jarring.

Ok. I have now changed all of the static functions in nodeHash.c from
foo_bar to ExecHashFooBar.

> + if (HashJoinTableIsShared(hashtable))
> + BarrierDetach(&hashtable->shared->shrink_barrier);
> +
> + if (HashJoinTableIsShared(hashtable))
> + {
>
> Consecutive if blocks with the same condition...

Fixed.

>
> + bool elected_to_resize;
> +
> + /*
> + * Wait for all backends to finish building. If only one worker is
> + * running the building phase because of a non-partial inner plan, the
> + * other workers will pile up here waiting. If multiple worker are
> + * building, they should finish close to each other in time.
> + */
>
> That comment is outdated, isn't it?

Yes, fixed.

> /* resize the hash table if needed (NTUP_PER_BUCKET exceeded) */
> - if (hashtable->nbuckets != hashtable->nbuckets_optimal)
> - ExecHashIncreaseNumBuckets(hashtable);
> + ExecHashUpdate(hashtable);
> + ExecHashIncreaseNumBuckets(hashtable);
>
> So this now doesn't actually increase the number of buckets anymore.

Well, that function always returned early if it found there were already
enough buckets, so either the test at the call site or the one in the
function was redundant. I have renamed it to
ExecHashIncreaseNumBucketsIfNeeded() to make that clearer.

> + reinsert:
> + /* If the table was resized, insert tuples into the new buckets. */
> + ExecHashUpdate(hashtable);
> + ExecHashReinsertAll(hashtable);
>
> ReinsertAll just happens to do nothing if we didn't have to
> resize... Not entirely obvious, sure reads as if it were unconditional.
> Also, it's not actually "All" when batching is in use, no?

Renamed to ExecHashReinsertHashtableIfNeeded.

> + post_resize:
> + if (HashJoinTableIsShared(hashtable))
> + {
> + Assert(BarrierPhase(barrier) == PHJ_PHASE_RESIZING);
> + BarrierWait(barrier, WAIT_EVENT_HASH_RESIZING);
> + Assert(BarrierPhase(barrier) == PHJ_PHASE_REINSERTING);
> + }
> +
> + reinsert:
> + /* If the table was resized, insert tuples into the new buckets. */
> + ExecHashUpdate(hashtable);
> + ExecHashReinsertAll(hashtable);
>
> Hm. So even non-resizing backends reach this - but they happen to not
> do anything because there's no work queued up, right? That's, uh, not
> obvious.

Added comments to that effect.

> For me the code here would be a good bit easier to read if we had a
> MultiExecHash and MultiExecParallelHash. Half of MultiExecHash is just
> if(IsShared) blocks, and copying would avoid potential slowdowns.

Hmm. Yeah I have struggled with this question in several places. For
example I have ExecHashLoadPrivateTuple and ExecHashLoadSharedTuple
because the intertwangled version was unbearable. But in
MultiExecHash's case, I feel there is some value in showing that the
basic hash build steps are the same. The core loop, where the main
action really happens, is unchanged.

> + /*
> + * Set up for skew optimization, if possible and there's a need for
> + * more than one batch. (In a one-batch join, there's no point in
> + * it.)
> + */
> + if (nbatch > 1)
> + ExecHashBuildSkewHash(hashtable, node, num_skew_mcvs);
>
> So there's no equivalent to the skew optimization for parallel query
> yet... It doesn't sound like that should be particulalry hard on first
> blush?

Making the skew table shared, setting up buckets for MCVs, and building
and probing it are easy. It's work_mem exhaustion and shrinking and
related jiggery pokery that'll be tricky, but I'll shortly be looking
at that with vigour and vim. There may also be one or two empty
relation optimisations that I haven't got yet because they involve a
bit of extra communication.

> static void
> -ExecHashIncreaseNumBatches(HashJoinTable hashtable)
> +ExecHashIncreaseNumBatches(HashJoinTable hashtable, int nbatch)
>
> So this doesn't actually increase the number of batches anymore... At
> the very least this should mention that the main work is done in
> ExecHashShrink.

Yeah. Done.

> +/*
> + * Process the queue of chunks whose tuples need to be redistributed into the
> + * correct batches until it is empty. In the best case this will shrink the
> + * hash table, keeping about half of the tuples in memory and sending the rest
> + * to a future batch.
> + */
> +static void
> +ExecHashShrink(HashJoinTable hashtable)
>
> Should mention this really only is meaningful after
> ExecHashIncreaseNumBatches has run.

Updated.

> +{
> + long ninmemory;
> + long nfreed;
> + dsa_pointer chunk_shared;
> + HashMemoryChunk chunk;
>
> - /* If know we need to resize nbuckets, we can do it while rebatching. */
> - if (hashtable->nbuckets_optimal != hashtable->nbuckets)
> + if (HashJoinTableIsShared(hashtable))
> {
> - /* we never decrease the number of buckets */
> - Assert(hashtable->nbuckets_optimal > hashtable->nbuckets);
> + /*
> + * Since a newly launched participant could arrive while shrinking is
> + * already underway, we need to be able to jump to the correct place
> + * in this function.
> + */
> + switch (PHJ_SHRINK_PHASE(BarrierPhase(&hashtable->shared->shrink_barrier)))
> + {
> + case PHJ_SHRINK_PHASE_BEGINNING: /* likely case */
> + break;
> + case PHJ_SHRINK_PHASE_CLEARING:
> + goto clearing;
> + case PHJ_SHRINK_PHASE_WORKING:
> + goto working;
> + case PHJ_SHRINK_PHASE_DECIDING:
> + goto deciding;
> + }
>
> Hm, so we jump into different nesting levels here :/

I rewrote this without goto. Mea culpa.
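
One goto-free shape for "join at whatever phase is current" is a switch
whose cases fall through, so a late arrival runs only the remaining steps.
This is an illustration and not necessarily how the patch does it; the
phase names are taken from the quoted code, while the step flags and
sketch_shrink_from are hypothetical.

```c
#include <assert.h>

typedef enum SketchShrinkPhase
{
    PHJ_SHRINK_PHASE_BEGINNING,
    PHJ_SHRINK_PHASE_CLEARING,
    PHJ_SHRINK_PHASE_WORKING,
    PHJ_SHRINK_PHASE_DECIDING
} SketchShrinkPhase;

/* Bit flags recording which steps this participant executed. */
#define SKETCH_STEP_BEGIN   1u
#define SKETCH_STEP_CLEAR   2u
#define SKETCH_STEP_WORK    4u
#define SKETCH_STEP_DECIDE  8u

/* Run every step from the observed phase onwards, all at one nesting level. */
static unsigned
sketch_shrink_from(SketchShrinkPhase phase)
{
    unsigned done = 0;

    assert(phase <= PHJ_SHRINK_PHASE_DECIDING);

    switch (phase)
    {
        case PHJ_SHRINK_PHASE_BEGINNING:    /* likely case */
            done |= SKETCH_STEP_BEGIN;
            /* fall through */
        case PHJ_SHRINK_PHASE_CLEARING:
            done |= SKETCH_STEP_CLEAR;
            /* fall through */
        case PHJ_SHRINK_PHASE_WORKING:
            done |= SKETCH_STEP_WORK;
            /* fall through */
        case PHJ_SHRINK_PHASE_DECIDING:
            done |= SKETCH_STEP_DECIDE;
            break;
    }
    return done;
}
```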

> ok, ENOTIME for today...

Thanks! Was enough to keep me busy for some time...

> diff --git a/src/backend/executor/nodeHashjoin.c b/src/backend/executor/nodeHashjoin.c
> index f2c885afbe..87d8f3766e 100644
> --- a/src/backend/executor/nodeHashjoin.c
> +++ b/src/backend/executor/nodeHashjoin.c
> @@ -6,10 +6,78 @@
> * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
> * Portions Copyright (c) 1994, Regents of the University of California
> *
> - *
> * IDENTIFICATION
> * src/backend/executor/nodeHashjoin.c
> *
> + * NOTES:
> + *
> + * PARALLELISM
> + *
> + * Hash joins can participate in parallel queries in two ways: in
> + * non-parallel-aware mode, where each backend builds an identical hash table
> + * and then probes it with a partial outer relation, or parallel-aware mode
> + * where there is a shared hash table that all participants help to build. A
> + * parallel-aware hash join can save time and space by dividing the work up
> + * and sharing the result, but has extra communication overheads.
>
> There's a third, right? The hashjoin, and everything below it, could
> also not be parallel, but above it could be some parallel aware node
> (e.g. a parallel aware HJ).

Yeah that's the same thing: it's not aware of parallelism. Its outer
plan may be partial or not, and it doesn't even know. That's the
distinction I'm trying to make clear: actually doing something special
for parallelism. I've updated the text slightly to say that the outer
plan may be partial or not in a hash join that is under Gather.

> + * In both cases, hash joins use a private state machine to track progress
> + * through the hash join algorithm.
>
> That's not really parallel specific, right? Perhaps just say that
> parallel HJs use the normal state machine?

Updated.

> + * In a parallel-aware hash join, there is also a shared 'phase' which
> + * co-operating backends use to synchronize their local state machine and
> + * program counter with the multi-process join. The phase is managed by a
> + * 'barrier' IPC primitive.
>
> Hm. I wonder if 'phase' shouldn't just be name
> sharedHashJoinState. Might be a bit easier to understand than a
> different terminology.

Hmm. Well it is a lot like a state machine but it might be more
confusing to have both local and shared 'state'. I think 'phases' of
parallel computation are quite intuitive. I'm rather attached to this
terminology...

> + * The phases are as follows:
> + *
> + * PHJ_PHASE_BEGINNING -- initial phase, before any participant acts
> + * PHJ_PHASE_CREATING -- one participant creates the shmem hash table
> + * PHJ_PHASE_BUILDING -- all participants build the hash table
> + * PHJ_PHASE_RESIZING -- one participant decides whether to expand buckets
> + * PHJ_PHASE_REINSERTING -- all participants reinsert tuples if necessary
> + * PHJ_PHASE_PROBING -- all participants probe the hash table
> + * PHJ_PHASE_UNMATCHED -- all participants scan for unmatched tuples
>
> I think somewhere here - and probably around the sites it's happening -
> should mention that state transitions are done kinda implicitly via
> BarrierWait progressing to the numerically next phase. That's not
> entirely obvious (and actually limits what the barrier mechanism can be
> used for...).

Yeah. Added comments.
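
To illustrate the implicit transition, here is a single-process toy model of a phase-counting barrier. It is only a sketch: ToyBarrier and toy_barrier_arrive are hypothetical names, nothing blocks, and the choice of the last arrival as the elected participant is an assumption of the toy rather than a statement about the real barrier code.

```c
#include <assert.h>
#include <stdbool.h>

typedef struct ToyBarrier
{
    int         phase;          /* current phase number */
    int         participants;   /* number of attached participants */
    int         arrived;        /* arrivals counted in the current phase */
} ToyBarrier;

/*
 * Record one arrival.  When everyone has arrived, the phase advances to the
 * numerically next value; nobody ever names the next phase explicitly.
 * Returns true for exactly one arrival per phase (here, the last one).
 */
static bool
toy_barrier_arrive(ToyBarrier *barrier)
{
    assert(barrier->participants > 0);
    barrier->arrived++;
    if (barrier->arrived == barrier->participants)
    {
        barrier->arrived = 0;
        barrier->phase++;
        return true;
    }
    return false;
}
```

Because the next phase is always phase + 1, the phase constants have to be numbered in the order the algorithm visits them, which is the limitation Andres points out.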

On Wed, Mar 29, 2017 at 9:31 AM, Andres Freund <andres(at)anarazel(dot)de> wrote:
> - ExecHashJoinSaveTuple(tuple,
> - hashvalue,
> - &hashtable->innerBatchFile[batchno]);
> + if (HashJoinTableIsShared(hashtable))
> + sts_puttuple(hashtable->shared_inner_batches, batchno, &hashvalue,
> + tuple);
> + else
> + ExecHashJoinSaveTuple(tuple,
> + hashvalue,
> + &hashtable->innerBatchFile[batchno]);
> }
> }
>
> Why isn't this done inside of ExecHashJoinSaveTuple?

I had it that way earlier but the arguments got ugly. I suppose it
could take an SOMETHING_INNER/SOMETHING_OUTER enum and a partition
number.

I wonder if SharedTuplestore should be able to handle the private case too...

> @@ -1280,6 +1785,68 @@ ExecHashTableReset(HashJoinTable hashtable)
>
> + /* Rewind the shared read heads for this batch, inner and outer. */
> + sts_prepare_parallel_read(hashtable->shared_inner_batches,
> + curbatch);
> + sts_prepare_parallel_read(hashtable->shared_outer_batches,
> + curbatch);
>
> It feels somewhat wrong to do this in here, rather than on the callsites.

The private hash table code does the moral equivalent directly below:
it uses BufFileSeek to rewind the current inner and outer batch to the
start.

> + }
> +
> + /*
> + * Each participant needs to make sure that data it has written for
> + * this partition is now read-only and visible to other participants.
> + */
> + sts_end_write(hashtable->shared_inner_batches, curbatch);
> + sts_end_write(hashtable->shared_outer_batches, curbatch);
> +
> + /*
> + * Wait again, so that all workers see the new hash table and can
> + * safely read from batch files from any participant because they have
> + * all ended writing.
> + */
> + Assert(BarrierPhase(&hashtable->shared->barrier) ==
> + PHJ_PHASE_RESETTING_BATCH(curbatch));
> + BarrierWait(&hashtable->shared->barrier, WAIT_EVENT_HASH_RESETTING);
> + Assert(BarrierPhase(&hashtable->shared->barrier) ==
> + PHJ_PHASE_LOADING_BATCH(curbatch));
> + ExecHashUpdate(hashtable);
> +
> + /* Forget the current chunks. */
> + hashtable->current_chunk = NULL;
> + return;
> + }
>
> /*
> * Release all the hash buckets and tuples acquired in the prior pass, and
> @@ -1289,10 +1856,10 @@ ExecHashTableReset(HashJoinTable hashtable)
> oldcxt = MemoryContextSwitchTo(hashtable->batchCxt);
>
> /* Reallocate and reinitialize the hash bucket headers. */
> - hashtable->buckets = (HashJoinTuple *)
> - palloc0(nbuckets * sizeof(HashJoinTuple));
> + hashtable->buckets = (HashJoinBucketHead *)
> + palloc0(nbuckets * sizeof(HashJoinBucketHead));
>
> - hashtable->spaceUsed = nbuckets * sizeof(HashJoinTuple);
> + hashtable->spaceUsed = nbuckets * sizeof(HashJoinBucketHead);
>
> /* Cannot be more than our previous peak; we had this size before. */
> Assert(hashtable->spaceUsed <= hashtable->spacePeak);
> @@ -1301,6 +1868,22 @@ ExecHashTableReset(HashJoinTable hashtable)
>
> /* Forget the chunks (the memory was freed by the context reset above). */
> hashtable->chunks = NULL;
> +
> + /* Rewind the shared read heads for this batch, inner and outer. */
> + if (hashtable->innerBatchFile[curbatch] != NULL)
> + {
> + if (BufFileSeek(hashtable->innerBatchFile[curbatch], 0, 0L, SEEK_SET))
> + ereport(ERROR,
> + (errcode_for_file_access(),
> + errmsg("could not rewind hash-join temporary file: %m")));
> + }
> + if (hashtable->outerBatchFile[curbatch] != NULL)
> + {
> + if (BufFileSeek(hashtable->outerBatchFile[curbatch], 0, 0L, SEEK_SET))
> + ereport(ERROR,
> + (errcode_for_file_access(),
> + errmsg("could not rewind hash-join temporary file: %m")));
> + }
> }
>
> /*
> @@ -1310,12 +1893,21 @@ ExecHashTableReset(HashJoinTable hashtable)
> void
> ExecHashTableResetMatchFlags(HashJoinTable hashtable)
> {
> + dsa_pointer chunk_shared = InvalidDsaPointer;
> HashMemoryChunk chunk;
> HashJoinTuple tuple;
> int i;
>
> /* Reset all flags in the main table ... */
> - chunk = hashtable->chunks;
> + if (HashJoinTableIsShared(hashtable))
> + {
> + /* This only runs in the leader during rescan initialization. */
> + Assert(!IsParallelWorker());
> + hashtable->shared->chunk_work_queue = hashtable->shared->chunks;
> + chunk = pop_chunk_queue(hashtable, &chunk_shared);
> + }
> + else
> + chunk = hashtable->chunks;
>
> Hm - doesn't pop_chunk_queue empty the work queue?

Well first it puts the main chunks onto the work queue, and then it
pops them off one by one clearing flags until there is nothing left on
the work queue. But this is only running in one backend. It's not
very exciting. Do you see a bug here?
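
A toy version of that loop, as a sketch only: the types and function names are hypothetical, plain pointers stand in for dsa_pointer, and a single flag per chunk stands in for the per-tuple match flags.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

typedef struct ToyChunk
{
    struct ToyChunk *next;
    bool        matched;        /* stand-in for the per-tuple match flags */
} ToyChunk;

typedef struct ToyTable
{
    ToyChunk   *chunks;         /* main chunk list */
    ToyChunk   *work_queue;     /* chunks still to be processed */
} ToyTable;

/* Pop the head of the work queue, or return NULL when it is empty. */
static ToyChunk *
toy_pop_chunk(ToyTable *table)
{
    ToyChunk   *chunk = table->work_queue;

    if (chunk != NULL)
        table->work_queue = chunk->next;
    return chunk;
}

/* Put all chunks on the work queue, then pop them one by one, clearing flags. */
static int
toy_reset_match_flags(ToyTable *table)
{
    int         visited = 0;
    ToyChunk   *chunk;

    assert(table != NULL);
    table->work_queue = table->chunks;
    while ((chunk = toy_pop_chunk(table)) != NULL)
    {
        chunk->matched = false;
        visited++;
    }
    return visited;
}
```

In this toy, popping only advances the queue head, so the queue ends up empty while the chunk list itself is left intact.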

> +/*
> + * Load a tuple into shared dense storage, like 'load_private_tuple'. This
> + * version is for shared hash tables.
> + */
> +static HashJoinTuple
> +load_shared_tuple(HashJoinTable hashtable, MinimalTuple tuple,
> + dsa_pointer *shared, bool respect_work_mem)
> +{
>
> Hm. Are there issues with "blessed" records being stored in shared
> memory? I seem to recall you talking about it, but I see nothing
> addressing the issue here? (later) Ah, I see - you just prohibit
> parallelism in that case - might be worth pointing to.

Note added.

I had difficulty testing that. I couldn't create anonymous ROW(...)
values without the projection moving above the hash table. Andrew Gierth
showed me a way to prevent that with OFFSET 0 but that disabled
parallelism. I tested that code by writing extra test code to dump
the output of tlist_references_transient_type() on the tlists of
various test paths not in a parallel query. Ideas welcome, as I feel
like this belongs in a regression test.

> + /* Check if some other participant has increased nbatch. */
> + if (hashtable->shared->nbatch > hashtable->nbatch)
> + {
> + Assert(respect_work_mem);
> + ExecHashIncreaseNumBatches(hashtable, hashtable->shared->nbatch);
> + }
> +
> + /* Check if we need to help shrinking. */
> + if (hashtable->shared->shrink_needed && respect_work_mem)
> + {
> + hashtable->current_chunk = NULL;
> + LWLockRelease(&hashtable->shared->chunk_lock);
> + return NULL;
> + }
> +
> + /* Oversized tuples get their own chunk. */
> + if (size > HASH_CHUNK_THRESHOLD)
> + chunk_size = size + HASH_CHUNK_HEADER_SIZE;
> + else
> + chunk_size = HASH_CHUNK_SIZE;
> +
> + /* If appropriate, check if work_mem would be exceeded by a new chunk. */
> + if (respect_work_mem &&
> + hashtable->shared->grow_enabled &&
> + hashtable->shared->nbatch <= MAX_BATCHES_BEFORE_INCREASES_STOP &&
> + (hashtable->shared->size +
> + chunk_size) > (work_mem * 1024L *
> + hashtable->shared->planned_participants))
> + {
> + /*
> + * It would be exceeded. Let's increase the number of batches, so we
> + * can try to shrink the hash table.
> + */
> + hashtable->shared->nbatch *= 2;
> + ExecHashIncreaseNumBatches(hashtable, hashtable->shared->nbatch);
> + hashtable->shared->chunk_work_queue = hashtable->shared->chunks;
> + hashtable->shared->chunks = InvalidDsaPointer;
> + hashtable->shared->shrink_needed = true;
> + hashtable->current_chunk = NULL;
> + LWLockRelease(&hashtable->shared->chunk_lock);
> +
> + /* The caller needs to shrink the hash table. */
> + return NULL;
> + }
>
> Hm - we could end up calling ExecHashIncreaseNumBatches twice here?
> Probably harmless.

Yes. In the code higher up we could observe that someone else has
increased the number of batches: here we are just updating our local
hashtable->nbatch. Then further down we could decide that it needs to
be done again because we work out that this allocation will push us
over the work_mem limit. Really that function just *sets* the number
of batches. It's the code beginning with hashtable->shared->nbatch
*= 2 that actually increases the number of batches and sets up
the state for all participants to shrink the hash table and free up
some memory.
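
The two call sites can be sketched like this. It is a simplified model with hypothetical names (ToyShared, ToyLocal, toy_set_nbatch, toy_try_alloc); locking, the grow_enabled and batch-limit checks, and the early return for an already-pending shrink are left out.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

typedef struct ToyShared
{
    int         nbatch;         /* number of batches agreed by all */
    bool        shrink_needed;  /* participants must help shrink */
} ToyShared;

typedef struct ToyLocal
{
    int         nbatch;         /* this backend's view of nbatch */
    ToyShared  *shared;
} ToyLocal;

/* Just *sets* the local batch count; calling it twice is harmless. */
static void
toy_set_nbatch(ToyLocal *local, int nbatch)
{
    assert(nbatch >= local->nbatch);
    local->nbatch = nbatch;
}

/* Returns false if the caller must help shrink instead of allocating. */
static bool
toy_try_alloc(ToyLocal *local, size_t shared_size, size_t chunk_size,
              size_t budget)
{
    /* First call: catch up with an increase someone else already made. */
    if (local->shared->nbatch > local->nbatch)
        toy_set_nbatch(local, local->shared->nbatch);

    /* Second call: this allocation would exceed the budget, so double. */
    if (shared_size + chunk_size > budget)
    {
        local->shared->nbatch *= 2;     /* the real increase */
        toy_set_nbatch(local, local->shared->nbatch);
        local->shared->shrink_needed = true;
        return false;
    }
    return true;
}
```

Only the doubling of the shared counter changes anything for the other participants; both calls to toy_set_nbatch merely bring the local copy up to date.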

>
> /* ----------------------------------------------------------------
> * ExecHashJoin
> @@ -129,6 +200,14 @@ ExecHashJoin(HashJoinState *node)
> /* no chance to not build the hash table */
> node->hj_FirstOuterTupleSlot = NULL;
> }
> + else if (hashNode->shared_table_data != NULL)
> + {
> + /*
> + * The empty-outer optimization is not implemented for
> + * shared hash tables yet.
> + */
> + node->hj_FirstOuterTupleSlot = NULL;
>
> Hm, why is this checking for the shared-ness of the join in a different
> manner?

The usual manner is HashJoinTableIsShared(hashtable) but you see
Assert(hashtable == NULL) a few lines earlier; this is the
HJ_BUILD_HASHTABLE state where it hasn't been constructed yet. When
ExecHashTableCreate (a bit further down) constructs it, it'll assign
hashtable->shared = state->shared_table_data (to point to a bit of DSM
memory). The usual test is based on the HashJoinTable pointer
(usually called 'hashtable') because that is passed around almost
everywhere, so it's convenient to use.

> + if (HashJoinTableIsShared(hashtable))
> + {
> + /*
> + * An important optimization: if this is a
> + * single-batch join and not an outer join, there is
> + * no reason to synchronize again when we've finished
> + * probing.
> + */
> + Assert(BarrierPhase(&hashtable->shared->barrier) ==
> + PHJ_PHASE_PROBING_BATCH(hashtable->curbatch));
> + if (hashtable->nbatch == 1 && !HJ_FILL_INNER(node))
> + return NULL; /* end of join */
> +
> + /*
> + * Check if we are a leader that can't go further than
> + * probing the first batch, to avoid risk of deadlock
> + * against workers.
> + */
> + if (!LeaderGateCanContinue(&hashtable->shared->leader_gate))
> + {
> + /*
> + * Other backends will need to handle all future
> + * batches written by me. We don't detach until
> + * after we've finished writing to all batches so
> + * that they are flushed, otherwise another
> + * participant might try to read them too soon.
> + */
> + sts_end_write_all_partitions(hashNode->shared_inner_batches);
> + sts_end_write_all_partitions(hashNode->shared_outer_batches);
> + BarrierDetach(&hashtable->shared->barrier);
> + hashtable->detached_early = true;
> + return NULL;
> + }
> +
> + /*
> + * We can't start searching for unmatched tuples until
> + * all participants have finished probing, so we
> + * synchronize here.
> + */
> + Assert(BarrierPhase(&hashtable->shared->barrier) ==
> + PHJ_PHASE_PROBING_BATCH(hashtable->curbatch));
> + if (BarrierWait(&hashtable->shared->barrier,
> + WAIT_EVENT_HASHJOIN_PROBING))
> + {
> + /* Serial phase: prepare for unmatched. */
> + if (HJ_FILL_INNER(node))
> + {
> + hashtable->shared->chunk_work_queue =
> + hashtable->shared->chunks;
> + hashtable->shared->chunks = InvalidDsaPointer;
> + }
> + }
>
> Couldn't we skip that if this isn't an outer join? Not sure if the
> complication would be worth it...

Yes, well we don't even get this far in the very common case of a
single batch inner join (see the note above about an "important
optimization"). If it's outer you need this, and if there are
multiple batches it hardly matters if you have to go through this
extra step. But you're right that there are a few missed
opportunities here and there.

> +void
> +ExecShutdownHashJoin(HashJoinState *node)
> +{
> + /*
> + * By the time ExecEndHashJoin runs in a work, shared memory has been
>
> s/work/worker/

Fixed.

> + * destroyed. So this is our last chance to do any shared memory cleanup.
> + */
> + if (node->hj_HashTable)
> + ExecHashTableDetach(node->hj_HashTable);
> +}
>
> + There is no extra charge
> + * for probing the hash table for outer path row, on the basis that
> + * read-only access to a shared hash table shouldn't be any more
> + * expensive.
> + */
>
> Hm, that's debatable. !shared will mostly be on the local numa node,
> shared probably not.

Agreed, NUMA surely changes the situation for probing. I wonder if it
deserves a separate GUC. I'm actually quite hesitant to try to model
things like that because it seems like a can of worms. I will try to
come up with some numbers backed up with data though. Watch this
space.

> * Get hash table size that executor would use for inner relation.
> *
> + * Shared hash tables are allowed to use the work_mem of all participants
> + * combined to make up for the fact that there is only one copy shared by
> + * all.
>
> Hm. I don't quite understand that reasoning.

Our model for memory usage limits is that every instance of an
executor node is allowed to allocate up to work_mem. If I run a
parallel hash join in 9.6 with 3 workers and I have set work_mem to
10MB, then the system will attempt to stay under 10MB in each
participant, using up to 40MB across the 4 processes.

The goal of Parallel Shared Hash is to divide the work of building the
hash table up over the 4 backends, and combine the work_mem of the 4
backends to create a shared hash table. The total amount of memory
used is the same, but we make much better use of it. Make sense?
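
The arithmetic as a small sketch: the function name is made up for illustration, and the expression mirrors the work_mem * 1024L * planned_participants limit in the quoted patch, with work_mem given in kilobytes.

```c
#include <assert.h>
#include <stddef.h>

/*
 * Memory budget in bytes for one hash table: a private table gets one
 * participant's work_mem, a shared table gets the sum over all participants.
 */
static size_t
hash_budget_bytes(size_t work_mem_kb, int participants)
{
    assert(participants >= 1);
    return work_mem_kb * 1024 * (size_t) participants;
}
```

With work_mem at 10MB, hash_budget_bytes(10 * 1024, 1) gives the 10MB each of the 4 private tables may use, and hash_budget_bytes(10 * 1024, 4) gives the 40MB the single shared table may use: the same total either way.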

> * XXX for the moment, always assume that skew optimization will be
> * performed. As long as SKEW_WORK_MEM_PERCENT is small, it's not worth
> * trying to determine that for sure.
>
> If we don't do skew for parallelism, should we skip that bit?

I am looking into the skew optimisation. Will report back on that
soon, and also try to get some data relevant to costing.

--
Thomas Munro
http://www.enterprisedb.com

Attachment: parallel-shared-hash-v11.tgz (application/x-gzip, 64.8 KB)
