Re: Parallel Hash take II

From: Thomas Munro <thomas(dot)munro(at)enterprisedb(dot)com>
To: Andres Freund <andres(at)anarazel(dot)de>
Cc: Rushabh Lathia <rushabh(dot)lathia(at)gmail(dot)com>, Prabhat Sahu <prabhat(dot)sahu(at)enterprisedb(dot)com>, Peter Geoghegan <pg(at)bowt(dot)ie>, Robert Haas <robertmhaas(at)gmail(dot)com>, Pg Hackers <pgsql-hackers(at)postgresql(dot)org>, Rafia Sabih <rafia(dot)sabih(at)enterprisedb(dot)com>, Ashutosh Bapat <ashutosh(dot)bapat(at)enterprisedb(dot)com>, Haribabu Kommi <kommi(dot)haribabu(at)gmail(dot)com>, Oleg Golovanov <rentech(at)mail(dot)ru>
Subject: Re: Parallel Hash take II
Date: 2017-11-13 12:30:30
Message-ID: CAEepm=1NapjQkDB+=689vjxMHU2yu1LC7k7V64AwC=PN38QjVQ@mail.gmail.com
Lists: pgsql-hackers

Hi Andres and Peter,

Please see below for inline responses to your feedback. New patch attached.

On Wed, Nov 8, 2017 at 10:01 AM, Andres Freund <andres(at)anarazel(dot)de> wrote:
> +set min_parallel_table_scan_size = 0;
> +set parallel_setup_cost = 0;
> +-- Make a simple relation with well distributed keys and correctly
> +-- estimated size.
> +create table simple as
> + select generate_series(1, 20000) AS id, 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa';
> +alter table simple set (parallel_workers = 2);
> +analyze simple;
> +-- Make a relation whose size we will under-estimate. We want stats
> +-- to say 1000 rows, but actually there are 20,000 rows.
> +create table bigger_than_it_looks as
> + select generate_series(1, 20000) as id, 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa';
> +alter table bigger_than_it_looks set (autovacuum_enabled = 'false');
> +alter table bigger_than_it_looks set (parallel_workers = 2);
> +delete from bigger_than_it_looks where id <= 19000;
> +vacuum bigger_than_it_looks;
> +analyze bigger_than_it_looks;
> +insert into bigger_than_it_looks
> + select generate_series(1, 19000) as id, 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa';
>
> It seems kinda easier to just manipulate ndistinct and reltuples...

Done.

> +set max_parallel_workers_per_gather = 0;
> +set work_mem = '4MB';
>
> I hope there's a fair amount of slop here - with different archs you're
> going to see quite some size differences.

Yeah, this is a problem I wrestled with. See next.

> +-- The "good" case: batches required, but we plan the right number; we
> +-- plan for 16 batches, and we stick to that number, and peak memory
> +-- usage says within our work_mem budget
> +-- non-parallel
> +set max_parallel_workers_per_gather = 0;
> +set work_mem = '128kB';
>
> So how do we know that's actually the case we're testing rather than
> something arbitrarily different? There's IIRC tests somewhere that just
> filter the json explain output to the right parts...

Yeah, good idea. My earlier attempts to dump out the hash join
dimensions ran into problems with architecture sensitivity and then
some run-to-run non-determinism in the parallel case (due to varying
fragmentation depending on how many workers get involved in time).
The attached version tells you about batch growth without reporting
the exact numbers, except in the "ugly" case where we know that there
is only one possible outcome because the extreme skew detector is
guaranteed to go off after the first nbatch increase (I got rid of all
other tuples except ones with the same key to make this true).

This exercise did reveal a bug in
0008-Show-hash-join-per-worker-information-in-EXPLAIN-ANA.patch
though: it is capturing shared instrumentation too soon in the
non-Parallel Hash case so the nbatch reported by EXPLAIN ANALYZE might
be too low if we grew while probing. Oops. Will post a fix for that.

> +/*
> + * Build the name for a given segment of a given BufFile.
> + */
> +static void
> +MakeSharedSegmentName(char *name, const char *buffile_name, int segment)
> +{
> + snprintf(name, MAXPGPATH, "%s.%d", buffile_name, segment);
> +}
>
> Not a fan of this name - you're not "making" a filename here (as in
> allocating or such). I think I'd just remove the Make prefix.

Done. I also changed some similar code where I'd used GetXXX when
building paths.

> +/*
> + * Open a file that was previously created in another backend with
> + * BufFileCreateShared in the same SharedFileSet using the same name. The
> + * backend that created the file must have called BufFileClose() or
> + * BufFileExport() to make sure that it is ready to be opened by other
> + * backends and render it read-only.
> + */
>
> Is it actually guaranteed that it's another backend / do we rely on
> that?

No, it could be any backend that is attached to the SharedFileSet,
including the current one. Wording improved.

> +BufFile *
> +BufFileOpenShared(SharedFileSet *fileset, const char *name)
> +{
>
> + /*
> + * If we didn't find any files at all, then no BufFile exists with this
> + * tag.
> + */
> + if (nfiles == 0)
> + return NULL;
>
> s/tag/name/?

Fixed.

> +/*
> + * Delete a BufFile that was created by BufFileCreateShared in the given
> + * SharedFileSet using the given name.
> + *
> + * It is not necessary to delete files explicitly with this function. It is
> + * provided only as a way to delete files proactively, rather than waiting for
> + * the SharedFileSet to be cleaned up.
> + *
> + * Only one backend should attempt to delete a given name, and should know
> + * that it exists and has been exported or closed.
> + */
> +void
> +BufFileDeleteShared(SharedFileSet *fileset, const char *name)
> +{
> + char segment_name[MAXPGPATH];
> + int segment = 0;
> + bool found = false;
> +
> + /*
> + * We don't know how many segments the file has. We'll keep deleting
> + * until we run out. If we don't manage to find even an initial segment,
> + * raise an error.
> + */
> + for (;;)
> + {
> + MakeSharedSegmentName(segment_name, name, segment);
> + if (!SharedFileSetDelete(fileset, segment_name, true))
> + break;
> + found = true;
> + ++segment;
> + }
>
> Hm. Do we properly delete all the files via the resowner mechanism if
> this fails midway? I.e. if there are no leading segments? Also wonder if
> this doesn't need a CFI check.

The resowner mechanism recursively deletes everything, so order
doesn't matter here. CFI added.

> +void
> +PathNameCreateTemporaryDir(const char *basedir, const char *directory)
> +{
> + if (mkdir(directory, S_IRWXU) < 0)
> + {
> + if (errno == EEXIST)
> + return;
> +
> + /*
> + * Failed. Try to create basedir first in case it's missing. Tolerate
> + * ENOENT to close a race against another process following the same
> + * algorithm.
> + */
> + if (mkdir(basedir, S_IRWXU) < 0 && errno != ENOENT)
> + elog(ERROR, "cannot create temporary directory \"%s\": %m",
> + basedir);
>
> ENOENT or EEXIST?

Oops, right, fixed.
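
To illustrate the corrected check, here is a minimal standalone sketch, not the patch's code. The name create_temp_dir and its 0/-1 return convention are hypothetical (the real function raises an error instead), and the final retry of the inner mkdir is my assumption about how the rest of the function proceeds:

```c
#include <assert.h>
#include <errno.h>
#include <stddef.h>
#include <sys/stat.h>
#include <sys/types.h>

/*
 * Hypothetical stand-in for PathNameCreateTemporaryDir: returns 0 on
 * success and -1 on failure (with errno set) instead of raising an error.
 */
int
create_temp_dir(const char *basedir, const char *directory)
{
	assert(basedir != NULL && directory != NULL);

	/* Fast path: created it, or somebody else already had. */
	if (mkdir(directory, S_IRWXU) == 0 || errno == EEXIST)
		return 0;

	/*
	 * Failed.  Try to create basedir first in case it's missing.  Tolerate
	 * EEXIST (not ENOENT): another process following the same algorithm may
	 * have created basedir between our two mkdir calls.
	 */
	if (mkdir(basedir, S_IRWXU) < 0 && errno != EEXIST)
		return -1;

	/* Retry; again, a concurrent creator is not an error. */
	if (mkdir(directory, S_IRWXU) < 0 && errno != EEXIST)
		return -1;

	return 0;
}
```

With ENOENT in the second test, the loser of the race would see EEXIST from mkdir(basedir) and report a spurious failure, which is exactly the case the comment says it wants to tolerate.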

> +File
> +PathNameCreateTemporaryFile(const char *path, bool error_on_failure)
> +{
> + File file;
> +
> + /*
> + * Open the file. Note: we don't use O_EXCL, in case there is an orphaned
> + * temp file that can be reused.
> + */
> + file = PathNameOpenFile(path, O_RDWR | O_CREAT | O_TRUNC | PG_BINARY);
> + if (file <= 0)
> + {
> + if (error_on_failure)
> + elog(ERROR, "could not create temporary file \"%s\": %m", path);
> + else
> + return file;
> + }
> +
> + /* Mark it for temp_file_limit accounting. */
> + VfdCache[file].fdstate |= FD_TEMP_FILE_LIMIT;
> +
> + /*
> + * We don't set FD_DELETE_AT_CLOSE for files opened this way, but we still
> + * want to make sure they get closed at end of xact.
> + */
> + ResourceOwnerEnlargeFiles(CurrentResourceOwner);
> + ResourceOwnerRememberFile(CurrentResourceOwner, file);
> + VfdCache[file].resowner = CurrentResourceOwner;
>
> So maybe I'm being pedantic here, but wouldn't the right order be to do
> ResourceOwnerEnlargeFiles() *before* creating the file? It's a memory
> allocating operation, so it can fail, which'd leak the file.

Fixed. See also commit c5269472.
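
The ordering rule can be sketched outside fd.c like this. Everything here (Owner, owner_enlarge, owner_remember, open_tracked_temp_file) is a hypothetical, much-simplified analogue of the resource owner machinery, using stdio's tmpfile() in place of a virtual file descriptor:

```c
#include <assert.h>
#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>

/* Hypothetical, simplified analogue of a resource owner's file array. */
typedef struct Owner
{
	FILE	  **files;
	size_t		nfiles;
	size_t		capacity;
} Owner;

/* Make room for one more entry.  This allocates, so it can fail. */
bool
owner_enlarge(Owner *owner)
{
	size_t		newcap;
	FILE	  **newfiles;

	if (owner->nfiles < owner->capacity)
		return true;
	newcap = owner->capacity ? owner->capacity * 2 : 4;
	newfiles = realloc(owner->files, newcap * sizeof(FILE *));
	if (newfiles == NULL)
		return false;
	owner->files = newfiles;
	owner->capacity = newcap;
	return true;
}

/* Record a file.  Cannot fail, provided owner_enlarge succeeded first. */
void
owner_remember(Owner *owner, FILE *file)
{
	assert(owner->nfiles < owner->capacity);
	owner->files[owner->nfiles++] = file;
}

/* Reserve first, then create: nothing can fail while a file is untracked. */
FILE *
open_tracked_temp_file(Owner *owner)
{
	FILE	   *file;

	if (!owner_enlarge(owner))
		return NULL;			/* nothing opened yet, so nothing leaks */
	file = tmpfile();
	if (file != NULL)
		owner_remember(owner, file);
	return file;
}
```

The point is only the order of operations: the step that can fail for lack of memory happens before the file exists, so there is no window in which an open file is unknown to its owner.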

> +/*
> + * Open a file that was created with PathNameCreateTemporaryFile, possibly in
> + * another backend. Files opened this way don't count agains the
>
> s/agains/against/

Fixed.

> + * temp_file_limit of the caller, are read-only and are automatically closed
> + * at the end of the transaction but are not deleted on close.
> + */
> +File
> +PathNameOpenTemporaryFile(const char *path)
> +{
> + File file;
> +
> + /* We open the file read-only. */
> + file = PathNameOpenFile(path, O_RDONLY | PG_BINARY);
> +
> + /* If no such file, then we don't raise an error. */
> + if (file <= 0 && errno != ENOENT)
> + elog(ERROR, "could not open temporary file \"%s\": %m", path);
> +
> + if (file > 0)
> + {
> + /*
> + * We don't set FD_DELETE_AT_CLOSE for files opened this way, but we
> + * still want to make sure they get closed at end of xact.
> + */
> + ResourceOwnerEnlargeFiles(CurrentResourceOwner);
> + ResourceOwnerRememberFile(CurrentResourceOwner, file);
> + VfdCache[file].resowner = CurrentResourceOwner;
>
> Same complaint as above, ResourceOwnerEnlargeFiles() should be done
> earlier.

Fixed.

> +/*
> + * Delete a file by pathname. Return true if the file existed, false if
> + * didn't.
> + */
> +bool
> +PathNameDeleteTemporaryFile(const char *path, bool error_on_failure)
> +{
> + struct stat filestats;
> + int stat_errno;
> +
> + /* Get the final size for pgstat reporting. */
> + if (stat(path, &filestats) != 0)
> + stat_errno = errno;
> + else
> + stat_errno = 0;
> +
> + /*
> + * Unlike FileClose's automatic file deletion code, we tolerate
> + * non-existence to support BufFileDeleteShared which doesn't know how
> + * many segments it has to delete until it runs out.
> + */
> + if (stat_errno == ENOENT)
> + return false;
> +
> + if (unlink(path) < 0)
> + {
> + if (errno != ENOENT)
> + elog(error_on_failure ? ERROR : LOG,
> + "cannot unlink temporary file \"%s\": %m", path);
> + return false;
> + }
> +
> + if (stat_errno == 0)
> + ReportTemporaryFileUsage(path, filestats.st_size);
> + else
> + {
> + errno = stat_errno;
> + elog(LOG, "could not stat file \"%s\": %m", path);
> + }
>
> All these messages are "not expected to ever happen" ones, right?

You'd have to suffer a nasty filesystem failure, remount read-only or
manually mess with permissions or something. Not sure where the line is,
but I've changed all of these new elog calls to ereport.

> + return true;
> +}
> +
> /*
> * close a file when done with it
> */
> @@ -1537,10 +1747,17 @@ FileClose(File file)
> Delete(file);
> }
>
> + if (vfdP->fdstate & FD_TEMP_FILE_LIMIT)
> + {
> + /* Subtract its size from current usage (do first in case of error) */
> + temporary_files_size -= vfdP->fileSize;
> + vfdP->fileSize = 0;
> + }
>
> So, is it right to do so unconditionally and without regard for errors?
> If the file isn't deleted, it shouldn't be subtracted from fileSize. I
> guess you're managing that through the flag, but that's not entirely
> obvious.

I think it is. Reasoning: The existing behaviour of fd.c is that if
we don't manage to delete temporary files, we'll LOG something and
forget about them (they'll be cleaned up eventually by a clean restart
or human intervention). If you have a filesystem that lets you create
and write files but not unlink them then Postgres will eventually eat
all your disk. The alternative would be not to adjust
temporary_files_size so that we prevent this backend from creating
more temporary files: that might make some kind of sense but it
wouldn't solve any real operational problems: you could still eat all
the disk space by disconnecting and reconnecting to get a new session
with a new temp filespace allowance. If we want a true space limit
we'll need to design that, but that seems out of scope for this
discussion.

> diff --git a/src/backend/storage/file/sharedfileset.c b/src/backend/storage/file/sharedfileset.c
> new file mode 100644
> index 00000000000..6da80838b37
> --- /dev/null
> +++ b/src/backend/storage/file/sharedfileset.c
> @@ -0,0 +1,240 @@
> +/*-------------------------------------------------------------------------
> + *
> + * sharedfileset.c
> + * Shared temporary file management.
> + *
> + * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
> + * Portions Copyright (c) 1994, Regents of the University of California
> + *
> + * IDENTIFICATION
> + * src/backend/storage/file/sharedfileset.c
> + *
> + *-------------------------------------------------------------------------
> + */
>
> A slightly bigger comment wouldn't hurt.

Done.

> +/*
> + * Attach to a set of directories that was created with SharedFileSetInit.
> + */
> +void
> +SharedFileSetAttach(SharedFileSet *fileset, dsm_segment *seg)
> +{
> + bool success;
> +
> + SpinLockAcquire(&fileset->mutex);
> + if (fileset->refcnt == 0)
> + success = false;
>
> I've not finished reading through this, but is this safe? If the
> segment's gone, is the spinlock guaranteed to still be a spinlock? I
> suspect this isn't a problem because just the underlying data is
> removed, but the SharedFileSet stays alive?

If you have a dsm_segment object, you are attached to it and it exists
(ie is mapped in your backend). The spinlock is guaranteed to be a
spinlock. The race being guarded against here is: a worker starts up
and attaches to the DSM segment and then tries to attach to the
SharedFileSet but finds that the other backends have all detached and
the refcount is 0. That's actually quite unlikely, because they
detach from the SharedFileSet as part of the process of detaching from
the DSM segment, so there is a pretty narrow window of time in which
SharedFileSet's refcount is 0 but the DSM segment still exists (they
haven't detached from that yet) and you managed to attach to the DSM
segment.

This is a variant of the problem SQL Smith found and I fixed in commit
fddf45b38097d14301d249fbeebca32e40233bd2. I think in general anything
that has its own reference count or other kind of state indicating "I
have been destroyed, you can't use me" and lives in a DSM segment
needs protection against this race. In general, I think parallel
workers that start up late (say in an error case where the leader gave
up or something) are bound to show racy error messages ranging from
"can't attach to this invalid DSM handle" to "I attached to the DSM
segment, but I can't seem to attach to <thing> inside it", which are
all just versions of "hey, where'd everyone go?" with different
timing.
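
The "attach only if not already destroyed" rule can be sketched as follows. This is an illustration under assumptions: the patch protects refcnt with a spinlock, whereas this standalone version swaps in a C11 compare-and-exchange loop, and FileSetSketch and the fileset_* functions are hypothetical names:

```c
#include <assert.h>
#include <stdatomic.h>
#include <stdbool.h>

/* Hypothetical stand-in for an object with its own refcount in a DSM segment. */
typedef struct FileSetSketch
{
	atomic_int	refcnt;
} FileSetSketch;

/* The creator holds the first reference. */
void
fileset_init(FileSetSketch *fs)
{
	atomic_init(&fs->refcnt, 1);
}

/*
 * Take a reference unless the count has already dropped to zero, in which
 * case the object has been destroyed and must not be resurrected.
 */
bool
fileset_attach(FileSetSketch *fs)
{
	int			old = atomic_load(&fs->refcnt);

	while (old != 0)
	{
		/* On failure, 'old' is refreshed with the current value. */
		if (atomic_compare_exchange_weak(&fs->refcnt, &old, old + 1))
			return true;
	}
	return false;				/* "hey, where'd everyone go?" */
}

/* Drop a reference; returns true if the caller was last and must clean up. */
bool
fileset_detach(FileSetSketch *fs)
{
	int			old = atomic_fetch_sub(&fs->refcnt, 1);

	assert(old > 0);
	return old == 1;
}
```

A late worker that gets false back from fileset_attach is in the narrow window described above: the memory is still mapped, but the thing living in it is already gone.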

> +/*
> + * Sorting hat to determine which tablespace a given shared temporary file
> + * belongs in.
> + */
> +static Oid
> +ChooseTablespace(const SharedFileSet *fileset, const char *name)
> +{
> + uint32 hash = hash_any((const unsigned char *) name, strlen(name));
> +
> + return fileset->tablespaces[hash % fileset->ntablespaces];
> +}
>
> Hm. I wonder if just round-robin through these isn't a better approach.

The problem is that all backends opening that file by name need to
agree on which tablespace it lives in, and I don't have per-file shmem
space to remember them. In an earlier version I required all callers
to tell me which "stripe number" the file lived in and to promise that
they would use the same stripe number for the same filename every time
in every backend, so then maybe the calling code could somehow figure
out how to round-robin the stripe numbers... but that seemed at best
clumsy. This way seemed much tidier, and has only one downside I
could think of: when you have only 2 files in the set there is a 50%
chance that they both finish up in the same tablespace. With any more
than that it should quickly approach even load balancing. It didn't
seem worth trying very hard to solve that problem, since people don't
really use temp_tablespaces for serious load balancing anyway AFAIK.
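
A standalone sketch of the idea, for anyone who wants to play with the distribution. FNV-1a is used here purely as a stand-in for hash_any(), and choose_slot is a hypothetical name; the only property that matters is that every backend computes the same slot from the same file name without any shared per-file state:

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* FNV-1a, used only as a stand-in for PostgreSQL's hash_any(). */
static uint32_t
fnv1a(const char *s)
{
	uint32_t	h = 2166136261u;

	for (; *s != '\0'; s++)
	{
		h ^= (unsigned char) *s;
		h *= 16777619u;
	}
	return h;
}

/*
 * Pick one of nslots tablespaces for the given file name.  The result
 * depends only on the name, so no coordination between backends is needed.
 */
size_t
choose_slot(const char *name, size_t nslots)
{
	assert(nslots > 0);
	return fnv1a(name) % nslots;
}
```

With two slots and two files, each file lands in either slot independently, hence the 50% chance of a collision mentioned above.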

> +/*
> + * Compute the full path of a file in a SharedFileSet.
> + */
> +static void
> +GetSharedFilePath(char *path, SharedFileSet *fileset, const char *name)
> +{
> + char dirpath[MAXPGPATH];
> +
> + GetSharedFileSetPath(dirpath, fileset, ChooseTablespace(fileset, name));
> + snprintf(path, MAXPGPATH, "%s/" PG_TEMP_FILE_PREFIX ".%s", dirpath, name);
> +}
> diff --git a/src/backend/utils/resowner/resowner.c b/src/backend/utils/resowner/resowner.c
> index 4c35ccf65eb..8b91d5a6ebe 100644
> --- a/src/backend/utils/resowner/resowner.c
> +++ b/src/backend/utils/resowner/resowner.c
> @@ -528,16 +528,6 @@ ResourceOwnerReleaseInternal(ResourceOwner owner,
> PrintRelCacheLeakWarning(res);
> RelationClose(res);
> }
> -
> - /* Ditto for dynamic shared memory segments */
> - while (ResourceArrayGetAny(&(owner->dsmarr), &foundres))
> - {
> - dsm_segment *res = (dsm_segment *) DatumGetPointer(foundres);
> -
> - if (isCommit)
> - PrintDSMLeakWarning(res);
> - dsm_detach(res);
> - }
> }
> else if (phase == RESOURCE_RELEASE_LOCKS)
> {
> @@ -654,6 +644,16 @@ ResourceOwnerReleaseInternal(ResourceOwner owner,
> PrintFileLeakWarning(res);
> FileClose(res);
> }
> +
> + /* Ditto for dynamic shared memory segments */
> + while (ResourceArrayGetAny(&(owner->dsmarr), &foundres))
> + {
> + dsm_segment *res = (dsm_segment *) DatumGetPointer(foundres);
> +
> + if (isCommit)
> + PrintDSMLeakWarning(res);
> + dsm_detach(res);
> + }
> }
>
> Is that entirely unproblematic? Are there any DSM callbacks that rely on
> locks still being held? Please split this part into a separate commit
> with such analysis.

I've removed this change. As far as I know I was wrong about Windows
needing this change for my patch set (due to FILE_SHARE_DELETE).

> +/* The initial size of chunks in pages. */
> +#define STS_MIN_CHUNK_PAGES 4
>
> Could use quick description at how you've arrived at that specific
> value.

Done. The new comment is:

/*
* The initial size of chunks, in pages. This is somewhat arbitrarily set to
* match the size of HASH_CHUNK, so that Parallel Hash obtains new chunks of
* tuples at approximately the same rate as it allocates new chunks of memory
* to insert them into.
*/

> +/* Chunk written to disk. */
> +typedef struct SharedTuplestoreChunk
> +{
> + int npages; /* Size of this chunk in BLCKSZ pages. */
> + int ntuples; /* Number of tuples in this chunk. */
> + char data[FLEXIBLE_ARRAY_MEMBER];
> +} SharedTuplestoreChunk;
> +
> +/* Per-participant shared state. */
> +typedef struct SharedTuplestoreParticipant
> +{
> + slock_t mutex;
> + BlockNumber read_page; /* Page number for next read. */
> + BlockNumber npages; /* Number of pages written. */
> + bool writing; /* Used only for assertions. */
> +
> + /*
> + * We need variable sized chunks, because we might be asked to store
> + * gigantic tuples. To avoid the locking contention that would come from
> + * reading chunk sizes from disk, we store the chunk size for ranges of
> + * the file in a compact format in memory. chunk_pages starts out at
> + * STS_MIN_CHUNK_PAGES and then doubles each time we reach a page listed
> + * in chunk_expansion_log.
> + */
> + BlockNumber chunk_expansion_log[sizeof(BlockNumber) * CHAR_BIT];
> + int chunk_expansions;
> + int chunk_expansion;
> + int chunk_pages;
>
> This needs more explanation.

I hope this is now explained a bit better -- see also a few points further down.

> +/*
> + * Initialize a SharedTuplestore in existing shared memory. There must be
> + * space for sts_estimate(participants) bytes. If flags is set to the value
> + * SHARED_TUPLESTORE_SINGLE_PASS, the files may in future be removed more
> + * eagerly (but this isn't yet implemented).
>
> s/is set to the value/includes the value/ - otherwise it's not really
> a flags argument.

Fixed.

> + * Tuples that are stored may optionally carry a piece of fixed sized
> + * meta-data which will be retrieved along with the tuple. This is useful for
> + * the hash codes used for multi-batch hash joins, but could have other
> + * applications.
>
> "hash codes"?

Fixed.

> +/*
> + * Prepare to rescan. Only participant should call this. After it returns,
> + * all participants should call sts_begin_parallel_scan() and then loop over
> + * sts_parallel_scan_next().
> + */
>
> s/should/may/? Also maybe document what happens with in-progress reads
> (or rather them not being allowed to exist)?

Done. Note that this interface is not currently used, but it could
obviously be used for rescans. In an earlier version this needed to
be called even for the first scan, but I got rid of that requirement.

> +/*
> + * Write a tuple. If a meta-data size was provided to sts_initialize, then a
> + * pointer to meta data of that size must be provided.
> + */
> +void
> +sts_puttuple(SharedTuplestoreAccessor *accessor, void *meta_data,
> + MinimalTuple tuple)
> +{
>
> + /* Do we have space? */
> + size = accessor->sts->meta_data_size + tuple->t_len;
> + if (accessor->write_pointer + size >= accessor->write_end)
> + {
> + /* Try flushing to see if that creates enough space. */
> + if (accessor->write_chunk != NULL)
> + sts_flush_chunk(accessor);
> +
> + /*
> + * It may still not be enough in the case of a gigantic tuple, or if
> + * we haven't created a chunk buffer at all yet.
> + */
> + if (accessor->write_pointer + size >= accessor->write_end)
> + {
> + SharedTuplestoreParticipant *participant;
> + size_t space_needed;
> + int pages_needed;
> +
> + /* How many pages to hold this data and the chunk header? */
> + space_needed = offsetof(SharedTuplestoreChunk, data) + size;
> + pages_needed = (space_needed + (BLCKSZ - 1)) / BLCKSZ;
> + pages_needed = Max(pages_needed, STS_MIN_CHUNK_PAGES);
> +
> + /*
> + * Double the chunk size until it's big enough, and record that
> + * fact in the shared expansion log so that readers know about it.
> + */
> + participant = &accessor->sts->participants[accessor->participant];
> + while (accessor->write_pages < pages_needed)
> + {
> + accessor->write_pages *= 2;
> + participant->chunk_expansion_log[participant->chunk_expansions++] =
> + accessor->write_page;
> + }
>
> Hm. Isn't that going to be pretty unfunny if you have one large and a
> lot of small tuples?

It will increase the parallel scan grain size, and then keep that size
for the rest of the contents of one backend's output file. I am aware
of two downsides to using a large parallel grain:

1. It determines the amount of unfairness when we run out of data:
it's the maximum amount of extra data that the unlucky last worker can
finish up with after all the others have finished. I think this
effect is reduced by higher level factors: when a reader runs out of
data in one backend's file, it'll start reading another backend's
file. If it's hit the end of all backends' files and this is an outer
batch, Parallel Hash will just go and work on another batch
immediately.

2. It affects the read-ahead heuristics. On Linux, if the parallel
scan grain size is larger than the read-ahead window and some other
backend advances the block counter between your reads then the scan
looks like random IO. Suppose you have the default read-ahead window
size of 512kB. You need to hit a tuple over 256kB in size *and* be
unlucky enough to have multiple backends reading the same file
concurrently (which I try to avoid as discussed elsewhere) to befuddle
the read-ahead heuristics. If this is ever a problem we could
consider explicit read-ahead advice.

A couple of alternatives seemed like bad ideas: we could read the
chunk size from the chunk headers, but then each read would be
dependent on (ie have to wait for) the preceding read, like Parallel
(btree) Index Scan and unlike Parallel Seq Scan; or we could track
changes in chunk size in shared memory in a more sophisticated way
that allows decreasing, but that would no longer be small, simple and
fixed size as I have it.

Better ideas?

BTW this code is covered by the regression test.
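
To put numbers on the grain growth, here is the sizing arithmetic from the quoted code as a standalone sketch. BLCKSZ = 8192 is the default build setting, CHUNK_HEADER_SIZE = 8 assumes two 4-byte ints ahead of data[], and the function names are hypothetical:

```c
#include <assert.h>
#include <stddef.h>

#define BLCKSZ 8192				/* default PostgreSQL block size */
#define STS_MIN_CHUNK_PAGES 4
#define CHUNK_HEADER_SIZE 8		/* assumed: two 4-byte ints before data[] */

/* How many pages are needed to hold 'size' bytes plus the chunk header? */
int
pages_needed_for(size_t size)
{
	size_t		space_needed = CHUNK_HEADER_SIZE + size;
	int			pages = (int) ((space_needed + (BLCKSZ - 1)) / BLCKSZ);

	return pages > STS_MIN_CHUNK_PAGES ? pages : STS_MIN_CHUNK_PAGES;
}

/* Double the chunk size until it is big enough; it never shrinks again. */
int
grow_chunk_pages(int chunk_pages, int pages_needed)
{
	assert(chunk_pages > 0);
	while (chunk_pages < pages_needed)
		chunk_pages *= 2;
	return chunk_pages;
}
```

Under these assumptions a single tuple of about 1,000,000 bytes needs 123 pages, which rounds the chunk size up from 4 to 128 pages (1MB), and that becomes the scan grain for the rest of that backend's file.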

> + /* Create the output buffer. */
> + if (accessor->write_chunk != NULL)
> + pfree(accessor->write_chunk);
> + accessor->write_chunk = (SharedTuplestoreChunk *)
> + palloc0(accessor->write_pages * BLCKSZ);
>
> Are we guaranteed to be in a long-lived memory context here?

I changed it so that it captures CurrentMemoryContext when you
initialise or attach and uses that for allocating buffers.

> +/*
> + * Get the next tuple in the current parallel scan.
> + */
> +MinimalTuple
> +sts_parallel_scan_next(SharedTuplestoreAccessor *accessor, void *meta_data)
> +{
> + SharedTuplestoreParticipant *p;
> + BlockNumber read_page;
> + int chunk_pages;
> + bool eof;
> +
> + for (;;)
> + {
> + /* Can we read more tuples from the current chunk? */
> + if (likely(accessor->read_ntuples < accessor->read_ntuples_available))
> + return sts_read_tuple(accessor, meta_data);
>
> I'm not convinced this is a good use of likely/unlikely (not biased and
> not performance critical enough).

Removed.

> + /* Find the location of a new chunk to read. */
> + p = &accessor->sts->participants[accessor->read_participant];
> +
> + SpinLockAcquire(&p->mutex);
> + eof = p->read_page >= p->npages;
> + if (!eof)
> + {
> + /*
> + * Figure out how big this chunk is. It will almost always be the
> + * same as the last chunk loaded, but if there is one or more
> + * entry in the chunk expansion log for this page then we know
> + * that it doubled that number of times. This avoids the need to
> + * do IO to adjust the read head, so we don't need to hold up
> + * concurrent readers. (An alternative to this extremely rarely
> + * run loop would be to use more space storing the new size in the
> + * log so we'd have 'if' instead of 'while'.)
> + */
> + read_page = p->read_page;
> + while (p->chunk_expansion < p->chunk_expansions &&
> + p->chunk_expansion_log[p->chunk_expansion] == p->read_page)
> + {
> + p->chunk_pages *= 2;
> + p->chunk_expansion++;
> + }
> + chunk_pages = p->chunk_pages;
> +
> + /* The next reader will start after this chunk. */
> + p->read_page += chunk_pages;
> + }
> + SpinLockRelease(&p->mutex);
>
> This looks more like the job of an lwlock rather than a spinlock.

I switched to the alternative algorithm mentioned in parentheses in
the comment. It uses a bit more space, but that loop is gone. In my
mind this is much like Parallel Seq Scan: we acquire a spinlock just
to advance the block pointer. The added complication is that we also
check if the chunk size has changed, which clang renders as this many
instructions:

postgres[0x10047eee0] <+176>: movslq 0x144(%r15,%rbx), %rcx
postgres[0x10047eee8] <+184>: cmpl 0x140(%r15,%rbx), %ecx
postgres[0x10047eef0] <+192>: jge 0x10047ef16 ; <+230> at sharedtuplestore.c:489
postgres[0x10047eef2] <+194>: leaq (%r15,%rbx), %rdx
postgres[0x10047eef6] <+198>: cmpl %r12d, 0x40(%rdx,%rcx,8)
postgres[0x10047eefb] <+203>: jne 0x10047ef16 ; <+230> at sharedtuplestore.c:489
postgres[0x10047eefd] <+205>: leaq 0x144(%r15,%rbx), %rsi
postgres[0x10047ef05] <+213>: leal 0x1(%rcx), %edi
postgres[0x10047ef08] <+216>: movl %edi, (%rsi)
postgres[0x10047ef0a] <+218>: movl 0x44(%rdx,%rcx,8), %ecx
postgres[0x10047ef0e] <+222>: movl %ecx, 0x148(%r15,%rbx)
postgres[0x10047ef16] <+230>: movl 0x148(%r15,%rbx), %r15d

That should be OK, right?
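
For readers without the new patch to hand, the 'if' variant can be sketched roughly like this. It is a single-threaded illustration, not the patch's code: ChunkExpansion, ReadState, claim_next_chunk and MAX_EXPANSIONS are hypothetical names, and in the real thing the body would run while holding the participant's spinlock:

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

#define MAX_EXPANSIONS 32		/* assumed bound on log entries */

/* One log entry: from 'page' onwards, chunks are 'chunk_pages' long. */
typedef struct ChunkExpansion
{
	uint32_t	page;
	uint32_t	chunk_pages;
} ChunkExpansion;

typedef struct ReadState
{
	uint32_t	read_page;		/* page number for next read */
	uint32_t	npages;			/* number of pages written */
	uint32_t	chunk_pages;	/* current chunk size */
	int			chunk_expansion;	/* next log entry to consume */
	int			chunk_expansions;	/* number of log entries */
	ChunkExpansion log[MAX_EXPANSIONS];
} ReadState;

/*
 * Claim the next chunk for the calling reader.  Returns false at end of
 * file.  Because the log stores the new size, a single 'if' is enough.
 */
bool
claim_next_chunk(ReadState *p, uint32_t *start_page, uint32_t *chunk_pages)
{
	assert(p->chunk_expansion <= p->chunk_expansions);

	if (p->read_page >= p->npages)
		return false;

	/* Did the chunk size change at exactly this page? */
	if (p->chunk_expansion < p->chunk_expansions &&
		p->log[p->chunk_expansion].page == p->read_page)
		p->chunk_pages = p->log[p->chunk_expansion++].chunk_pages;

	*start_page = p->read_page;
	*chunk_pages = p->chunk_pages;

	/* The next reader will start after this chunk. */
	p->read_page += p->chunk_pages;
	return true;
}
```

The critical section is a comparison, an optional load and store, and an addition, with no IO, which is the basis for comparing it to advancing the block pointer in Parallel Seq Scan.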

> +/*
> + * Create the name used for our shared BufFiles.
> + */
> +static void
> +make_name(char *name, SharedTuplestoreAccessor *accessor, int participant)
> +{
> + snprintf(name, MAXPGPATH, "%s.p%d", accessor->sts->name, participant);
> +}
>
> Name's a bit generic. And it's still not really making ;)

Renamed to sts_filename.

On Wed, Nov 8, 2017 at 10:32 AM, Peter Geoghegan <pg(at)bowt(dot)ie> wrote:
> On Tue, Nov 7, 2017 at 1:01 PM, Andres Freund <andres(at)anarazel(dot)de> wrote:
>> +/*
>> + * Build the name for a given segment of a given BufFile.
>> + */
>> +static void
>> +MakeSharedSegmentName(char *name, const char *buffile_name, int segment)
>> +{
>> + snprintf(name, MAXPGPATH, "%s.%d", buffile_name, segment);
>> +}
>>
>> Not a fan of this name - you're not "making" a filename here (as in
>> allocating or such). I think I'd just remove the Make prefix.
>
> +1
>
> Can we document the theory behind file naming here, if that isn't in
> the latest version? This is a routine private to parallel hash join
> (or shared tuplestore), not Buffile. Maybe Buffile should have some
> opinion on this, though. Just as a matter of style.

I have added some text about naming to BufFileCreateShared().

>> + /* Create the output buffer. */
>> + if (accessor->write_chunk != NULL)
>> + pfree(accessor->write_chunk);
>> + accessor->write_chunk = (SharedTuplestoreChunk *)
>> + palloc0(accessor->write_pages * BLCKSZ);
>>
>> Are we guaranteed to be in a long-lived memory context here?
>
> I imagine that Thomas looked to tuplestore_begin_heap() + interXact as
> a kind of precedent here. See comments above that function.

Yeah, but I had missed something important: I now capture the current
memory context when you initialise or attach to a SharedTuplestore.
That's the point at which the SharedTuplestoreAccessor is allocated,
and the caller needs to make sure that the right memory context is
active, but now that I hold onto it I can use the same one for future
buffer allocations, like the code highlighted above. In nodeHash.c I
now make sure that this always happens in hashtable->hashCxt.

On Wed, Nov 8, 2017 at 4:40 PM, Andres Freund <andres(at)anarazel(dot)de> wrote:
> Hi,
>
> * avoids wasting memory on duplicated hash tables
> * avoids wasting disk space on duplicated batch files
> * avoids wasting CPU executing duplicate subplans
>
> What's the last one referring to?

Erm, I was saying the same thing two different ways (with the
following line). Fixed.

> +static void
> +MultiExecParallelHash(HashState *node)
> +{
>
> + switch (BarrierPhase(build_barrier))
> + {
> + case PHJ_BUILD_ALLOCATING:
> +
> + /*
> + * Either I just allocated the initial hash table in
> + * ExecHashTableCreate(), or someone else is doing that. Either
> + * way, wait for everyone to arrive here so we can proceed, and
> + * then fall through.
> + */
> + BarrierArriveAndWait(build_barrier, WAIT_EVENT_HASH_BUILD_ALLOCATING);
>
> Can you add a /* fallthrough */ comment here? Gcc is warning if you
> don't. While we currently have lotsa other places not having the
> annotation, it seems reasonable to have it in new code.

Fixed all warnings from GCC 7.2 -Wimplicit-fallthrough=3.
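
For reference, the annotation looks like this in a reduced form. The enum and function are hypothetical and only show the shape of a phase switch where a late arrival joins at whatever phase is current and runs the remaining steps; as far as I know a comment of this form is accepted at warning level 3:

```c
#include <assert.h>

/* Hypothetical phases, loosely modelled on a build barrier. */
typedef enum SketchPhase
{
	SKETCH_ALLOCATING,
	SKETCH_HASHING,
	SKETCH_DONE
} SketchPhase;

/* Count the steps a participant still runs when it joins at 'phase'. */
int
steps_from(SketchPhase phase)
{
	int			steps = 0;

	switch (phase)
	{
		case SKETCH_ALLOCATING:
			steps++;			/* wait for the allocation to finish */
			/* fall through */
		case SKETCH_HASHING:
			steps++;			/* help with hashing */
			/* fall through */
		case SKETCH_DONE:
			break;
	}
	assert(steps >= 0 && steps <= 2);
	return steps;
}
```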

> + case PHJ_BUILD_HASHING_INNER:
> +
> + /*
> + * It's time to begin hashing, or if we just arrived here then
> + * hashing is already underway, so join in that effort. While
> + * hashing we have to be prepared to help increase the number of
> + * batches or buckets at any time, and if we arrived here when
> + * that was already underway we'll have to help complete that work
> + * immediately so that it's safe to access batches and buckets
> + * below.
> + */
> + if (PHJ_GROW_BATCHES_PHASE(BarrierAttach(&pstate->grow_batches_barrier)) !=
> + PHJ_GROW_BATCHES_ELECTING)
> + ExecParallelHashIncreaseNumBatches(hashtable);
> + if (PHJ_GROW_BUCKETS_PHASE(BarrierAttach(&pstate->grow_buckets_barrier)) !=
> + PHJ_GROW_BUCKETS_ELECTING)
> + ExecParallelHashIncreaseNumBuckets(hashtable);
> + ExecParallelHashEnsureBatchAccessors(hashtable);
>
> "accessors" sounds a bit weird for a bunch of pointers, but maybe that's
> just my ESL senses tingling wrongly.

Pointers to ParallelHashJoinBatchAccessor objects. That's where we
keep this backend's pointers into ParallelHashJoinBatch objects and
some other backend-local state. It's a pattern: you have some shared
object of type Foo, and every backend interacting with it will also
need an object of type FooAccessor if a simple pointer to the shared
object isn't enough. If you have a better name than 'Accessor' for
this then I'm all ears.

> /* ----------------------------------------------------------------
> @@ -240,12 +427,15 @@ ExecEndHash(HashState *node)
> * ----------------------------------------------------------------
> */
> HashJoinTable
> -ExecHashTableCreate(Hash *node, List *hashOperators, bool keepNulls)
> +ExecHashTableCreate(HashState *state, List *hashOperators, bool keepNulls)
>
> + /*
> + * Parallel Hash tries to use the combined work_mem of all workers to
> + * avoid the need to batch. If that won't work, it falls back to work_mem
> + * per worker and tries to process batches in parallel.
> + */
>
> One day we're going to need a better approach to this. I have no idea
> how, but this per-node, and now per_node * max_parallelism, approach has
> only implementation simplicity as its benefit.

I agree, and I am interested in that subject. In the meantime, I
think it'd be pretty unfair if parallel-oblivious hash join and
sort-merge join and every other parallel plan get to use work_mem * p
(and in some cases waste it with duplicate data), but Parallel Hash
isn't allowed to do the same (and put it to good use).

> +static HashJoinTuple
> +ExecParallelHashLoadTuple(HashJoinTable hashtable, MinimalTuple tuple,
> + dsa_pointer *shared)
> +{
>
> +static void
> +ExecParallelHashJoinSetUpBatches(HashJoinTable hashtable, int nbatch)
> +{
>
>
>
> +/*
> + * Get the first tuple in a given bucket identified by number.
> + */
> +static HashJoinTuple
> +ExecHashFirstTupleInBucket(HashJoinTable hashtable, int bucketno)
> +{
> + if (hashtable->parallel_state)
> + {
> + dsa_pointer p =
> + dsa_pointer_atomic_read(&hashtable->buckets.shared[bucketno]);
>
> Can you make this, and possibly a few other places, more readable by
> introducing a temporary variable?

Done.

> +/*
> + * Insert a tuple at the front of a chain of tuples in DSA memory atomically.
> + */
> +static void
> +ExecParallelHashPushTuple(dsa_pointer_atomic *head,
> + HashJoinTuple tuple,
> + dsa_pointer tuple_shared)
> +{
> + do
> + {
> + tuple->next.shared = dsa_pointer_atomic_read(head);
> + } while (!dsa_pointer_atomic_compare_exchange(head,
> + &tuple->next.shared,
> + tuple_shared));
> +}
>
> This is hard to read.

Changed to a for loop with a break.
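
For readers without the patch at hand, the for-loop-with-break shape of a lock-free push can be sketched as follows. This is not the code in the patch: it uses C11 <stdatomic.h> and plain addresses as a stand-in for dsa_pointer and the dsa_pointer_atomic_* functions, and DemoTuple and the demo_* names are hypothetical.

```c
#include <assert.h>
#include <stdatomic.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical stand-in for a tuple chained through shared memory. */
typedef struct DemoTuple
{
    uintptr_t   next;           /* address of next tuple in chain, 0 = end */
    int         value;
} DemoTuple;

/* Atomically insert 'tuple' at the front of the chain headed by '*head'. */
static void
demo_push_tuple(atomic_uintptr_t *head, DemoTuple *tuple)
{
    for (;;)
    {
        uintptr_t   expected = atomic_load(head);

        /* Link to the head we just observed... */
        tuple->next = expected;

        /* ...and publish only if the head has not changed since. */
        if (atomic_compare_exchange_weak(head, &expected, (uintptr_t) tuple))
            break;
        /* Another pusher won the race (or the weak CAS failed); retry. */
    }
}

/* Walk the chain and count its members (not safe against concurrent pushes). */
static int
demo_chain_length(atomic_uintptr_t *head)
{
    int         n = 0;

    for (uintptr_t p = atomic_load(head); p != 0; p = ((DemoTuple *) p)->next)
        n++;
    return n;
}
```

The loop body reads top to bottom as "read head, link, try to publish", with the retry implied by falling off the end, which is what makes it easier to follow than the do/while form.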

> + * While in the phase PHJ_BUILD_HASHING_INNER a separate pair of barriers may
> + * be used repeatedly as required to coordinate expansions in the number of
> + * batches or buckets. Their phases are as follows:
> + *
> + * PHJ_GROW_BATCHES_ELECTING -- initial state
> + * PHJ_GROW_BATCHES_ALLOCATING -- one allocates new batches
> + * PHJ_GROW_BATCHES_REPARTITIONING -- all rep
> s/rep/repartition/?

Fixed.

> -
> +#include "utils/sharedtuplestore.h"
>
> deletes a separator newline.

Fixed.

> /* ----------------------------------------------------------------
> @@ -138,6 +236,18 @@ ExecHashJoin(PlanState *pstate)
> /* no chance to not build the hash table */
> node->hj_FirstOuterTupleSlot = NULL;
> }
> + else if (hashNode->parallel_state != NULL)
> + {
> + /*
> + * The empty-outer optimization is not implemented for
> + * shared hash tables, because no one participant can
> + * determine that there are no outer tuples, and it's not
> + * yet clear that it's worth the synchronization overhead
> + * of reaching consensus to figure that out. So we have
> + * to build the hash table.
> + */
> + node->hj_FirstOuterTupleSlot = NULL;
> + }
>
> Hm. Isn't MultiExecParallelHash already doing so?

I do support the empty-inner (empty table) optimisation.
MultiExecParallelHash makes sure that all workers agree on the total
number of inner tuples. Therefore they agree on whether to give up
early and not bother scanning the outer relation. The synchronisation
point required to implement this was unavoidable anyway (it's the
"main" synchronisation point in this algorithm: everyone has to be
finished building the hash table before anyone can probe it).

I don't support the empty-outer optimisation. I could support that
like this: all workers try to pull one tuple from the outer relation,
before even building the hash table. Then they would synchronise with
each other and see if anybody managed to get a tuple. If none of them
did, then we can give up even earlier and skip building the hash
table. The question is: is it worth introducing an extra
synchronisation point to reach consensus? It seems unlikely that the
outer relation is empty, so my answer is "no". There may be other
complications I haven't thought of.
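
The consensus step described above could look roughly like the sketch below. It is purely illustrative and nothing like it exists in the patch: DemoOuterProbe and the demo_* functions are hypothetical, and the synchronisation point itself (the expensive part) is assumed to happen between the report and the check.

```c
#include <assert.h>
#include <stdatomic.h>
#include <stdbool.h>

/* Hypothetical shared state; in practice this would sit in shared memory. */
typedef struct DemoOuterProbe
{
    atomic_bool any_outer_tuple;    /* true once any worker fetched a tuple */
} DemoOuterProbe;

static void
demo_outer_probe_init(DemoOuterProbe *probe)
{
    atomic_init(&probe->any_outer_tuple, false);
}

/* Each worker calls this once, before the extra synchronisation point. */
static void
demo_report_first_fetch(DemoOuterProbe *probe, bool got_tuple)
{
    if (got_tuple)
        atomic_store(&probe->any_outer_tuple, true);
}

/*
 * Only meaningful after every worker has reported, i.e. after all of them
 * have passed the synchronisation point.
 */
static bool
demo_can_skip_build(DemoOuterProbe *probe)
{
    return !atomic_load(&probe->any_outer_tuple);
}
```

The flag itself is cheap; the cost being weighed is the additional wait for every worker to report before anyone may start building the hash table.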

> - node->hj_JoinState = HJ_NEED_NEW_OUTER;
> + if (hashtable->parallel_state)
> + {
> + Barrier *build_barrier;
> +
> + build_barrier = &hashtable->parallel_state->build_barrier;
> + if (BarrierPhase(build_barrier) == PHJ_BUILD_HASHING_OUTER)
> + {
> + /*
> + * If multi-batch, we need to hash the outer relation
> + * up front.
> + */
> + if (hashtable->nbatch > 1)
> + ExecParallelHashJoinPartitionOuter(node);
> + BarrierArriveAndWait(build_barrier,
> + WAIT_EVENT_HASH_BUILD_HASHING_OUTER);
> + }
> + Assert(BarrierPhase(build_barrier) == PHJ_BUILD_DONE);
> +
> + /* Each backend should now select a batch to work on. */
> + hashtable->curbatch = -1;
> + node->hj_JoinState = HJ_NEED_NEW_BATCH;
> +
> + continue;
> + }
> + else
> + node->hj_JoinState = HJ_NEED_NEW_OUTER;
>
> You know what I'm going to say about all these branches, and sigh.

BTW this is not per-tuple code -- it runs once at the end of hashing.
Not sure what you're looking for here.

> If we don't split this into two versions, we at least should store
> hashNode->parallel_state in a local var, so the compiler doesn't have to
> pull that out of memory after every external function call (of which
> there are a lot). In common cases it'll end up in a callee saved
> registers, and most of the called functions won't be too register
> starved (on x86-64).

Hmm. Well I did that already in v24 -- in many places there is now a
local variable called pstate.

> +/*
> + * Choose a batch to work on, and attach to it. Returns true if successful,
> + * false if there are no more batches.
> + */
> +static bool
> +ExecParallelHashJoinNewBatch(HashJoinState *hjstate)
> +{
>
>
> + /*
> + * This batch is ready to probe. Return control to
> + * caller. We stay attached to batch_barrier so that the
> + * hash table stays alive until everyone's finish probing
>
> *finished?

Fixed.

> + case PHJ_BATCH_DONE:
> +
> + /*
> + * Already done. Detach and go around again (if any
> + * remain).
> + */
> + BarrierDetach(batch_barrier);
> +
> + /*
> + * We didn't work on this batch, but we need to observe
> + * its size for EXPLAIN.
> + */
> + ExecParallelHashUpdateSpacePeak(hashtable, batchno);
> + hashtable->batches[batchno].done = true;
> + hashtable->curbatch = -1;
> + break;
>
> Hm, maybe I'm missing something, but why is it guaranteed that "we
> didn't work on this batch"?

We just attached to it (see switch(BarrierAttach(batch_barrier))) and
jumped to this label. If it's PHJ_BATCH_DONE, then we know that some
other backend(s) did all the work and brought it to that state,
because otherwise our own backend-local flag
hashtable->batches[batchno].done would have been true and we'd have
skipped it.

In any case -- sorry about this -- the following patch that adds
EXPLAIN ANALYZE support actually removes that little bit of code
there. In the version you quote I wanted to make sure that every
backend had a peek at every batch's peak size. The follow-on patch
introduces extra shm state to instrument stuff better so that we get
per worker data into explain.c.

> +void
> +ExecShutdownHashJoin(HashJoinState *node)
> +{
> + /*
> + * By the time ExecEndHashJoin runs in a worker, shared memory has been
> + * destroyed. So this is our last chance to do any shared memory cleanup.
> + */
>
> This comment doesn't really make much sense to me.

Tried again. The point is that ExecShutdownXXX has to leave things in
a state where ExecEndXXX won't try to follow any pointers into DSM
segments, because it'll be unmapped.
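
As a rough illustration of that contract, consider the sketch below. DemoNode and the demo_* functions are hypothetical, and an ordinary int stands in for memory inside a DSM segment; the only point is the ordering: copy out what is needed and clear the pointer at shutdown, so that the end routine never dereferences it.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical executor node state. */
typedef struct DemoNode
{
    int        *shared_counter; /* points into a mapped segment, or NULL */
    int         local_total;    /* backend-local copy that survives unmapping */
} DemoNode;

/* Last chance to touch shared memory: copy out, then forget the pointer. */
static void
demo_shutdown(DemoNode *node)
{
    if (node->shared_counter != NULL)
    {
        node->local_total = *node->shared_counter;
        node->shared_counter = NULL;
    }
}

/* Runs after the segment may be gone, so it uses backend-local state only. */
static int
demo_end(DemoNode *node)
{
    return node->local_total;
}
```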

> +void
> +ExecHashJoinReInitializeDSM(HashJoinState *state, ParallelContext *cxt)
> +{
>
> could use a header comment.

Standard boiler-plate copied.

> a) The executor side is starting to look good.

Thanks!

> b) This is a lot of code.

Yeah.

On Thu, Nov 9, 2017 at 8:12 AM, Andres Freund <andres(at)anarazel(dot)de> wrote:
> @@ -747,7 +747,7 @@ try_hashjoin_path(PlannerInfo *root,
> * never have any output pathkeys, per comments in create_hashjoin_path.
> */
> initial_cost_hashjoin(root, &workspace, jointype, hashclauses,
> - outer_path, inner_path, extra);
> + outer_path, inner_path, extra, false);
>
> if (add_path_precheck(joinrel,
> workspace.startup_cost, workspace.total_cost,
> @@ -761,6 +761,7 @@ try_hashjoin_path(PlannerInfo *root,
> extra,
> outer_path,
> inner_path,
> + false, /* parallel_hash */
> extra->restrictlist,
> required_outer,
> hashclauses));
> @@ -776,6 +777,10 @@ try_hashjoin_path(PlannerInfo *root,
> * try_partial_hashjoin_path
> * Consider a partial hashjoin join path; if it appears useful, push it into
> * the joinrel's partial_pathlist via add_partial_path().
> + * The outer side is partial. If parallel_hash is true, then the inner path
> + * must be partial and will be run in parallel to create one or more shared
> + * hash tables; otherwise the inner path must be complete and a copy of it
> + * is run in every process to create separate identical private hash tables.
> */
>
> When do we have "or more shared hash tables" rather than one? Are you
> thinking about subordinate nodes?

If there are multiple batches, we'll load several hash tables into
memory at the same time.

> @@ -1839,6 +1846,10 @@ hash_inner_and_outer(PlannerInfo *root,
> * able to properly guarantee uniqueness. Similarly, we can't handle
> * JOIN_FULL and JOIN_RIGHT, because they can produce false null
> * extended rows. Also, the resulting path must not be parameterized.
> + * We should be able to support JOIN_FULL and JOIN_RIGHT for Parallel
> + * Hash, since in that case we're back to a single hash table with a
> + * single set of match bits for each batch, but that will require
> + * figuring out a deadlock-free way to wait for the probe to finish.
> */
>
> s/should be able/would be able/?

Done. It would definitely work (and worked in the earlier
deadlock-prone design).

> index 6a45b68e5df..2d38a5efae8 100644
> --- a/src/backend/storage/ipc/barrier.c
> +++ b/src/backend/storage/ipc/barrier.c
> @@ -451,7 +451,6 @@ BarrierDetachImpl(Barrier *barrier, bool arrive)
> release = true;
> barrier->arrived = 0;
> ++barrier->phase;
> - Assert(barrier->selected);
> barrier->selected = false;
> }
>
> Uh, what?

A fixup squashed into the wrong patch. Fixed.

As for the change itself: when I introduced BarrierArriveAndDetach()
it became possible for the barrier to advance through phases without
anyone being selected, so the assertion wasn't valid.

> diff --git a/src/test/regress/expected/join.out b/src/test/regress/expected/join.out
> index 35523bd8065..40a076d976f 100644
> --- a/src/test/regress/expected/join.out
> +++ b/src/test/regress/expected/join.out
> @@ -5821,6 +5821,9 @@ analyze extremely_skewed;
> insert into extremely_skewed
> select 42 as id, 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa'
> from generate_series(1, 19000);
> +-- Make a relation with a couple of enormous tuples.
> +create table wide as select generate_series(1, 2) as id, rpad('', 320000, 'x') as t;
> +alter table wide set (parallel_workers = 2);
>
> I'm doubtful this is actually going to be a wide tuple - this'll get
> compressed down quite a bit, no?
>
> postgres[26465][1]=# SELECT octet_length(t), pg_column_size(t) FROM wide ;
> ┌──────────────┬────────────────┐
> │ octet_length │ pg_column_size │
> ├──────────────┼────────────────┤
> │ 320000 │ 3671 │
> │ 320000 │ 3671 │
> └──────────────┴────────────────┘
> (2 rows)
>
>
> (and yes, it's ridiculous that a compressed datum of that size still
> takes up 3kb)

The tuple is small on disk, but it allows me to create a large tuple
in the hash table with the following incantation (hat-tip to Andrew
Gierth for this trick):

select length(max(s.t))
from wide left join (select id, coalesce(t, '') || '' as t from
wide) s using (id);

A non-strict expression and a left join result in a projected
detoasted decompressed monster tuple which you can confirm by sticking
an elog into ExecParallelHashLoadTuple() like so:

2017-11-10 13:57:07.193 NZDT [7337] LOG: tuple size = 320040

> +-- parallel with parallel-aware hash join
> +set max_parallel_workers_per_gather = 2;
> +set work_mem = '128kB';
> +set enable_parallel_hash = on;
>
> I think it'd be better if we structured the file so we just sat guc's
> with SET LOCAL inside a transaction.

I wrapped the whole region of join.sql concerned with hash joins in a
transaction that rolls back, so I don't have to write LOCAL. That's
just as good, right?

> All-in-all this part looks fairly boring.

Thank you.

--
Thomas Munro
http://www.enterprisedb.com

Attachment: parallel-hash-v25.patchset.tgz (application/x-gzip, 65.7 KB)
