Re: Parallel Hash take II

From: Andres Freund <andres(at)anarazel(dot)de>
To: Thomas Munro <thomas(dot)munro(at)enterprisedb(dot)com>
Cc: Rushabh Lathia <rushabh(dot)lathia(at)gmail(dot)com>, Prabhat Sahu <prabhat(dot)sahu(at)enterprisedb(dot)com>, Peter Geoghegan <pg(at)bowt(dot)ie>, Robert Haas <robertmhaas(at)gmail(dot)com>, Pg Hackers <pgsql-hackers(at)postgresql(dot)org>, Rafia Sabih <rafia(dot)sabih(at)enterprisedb(dot)com>, Ashutosh Bapat <ashutosh(dot)bapat(at)enterprisedb(dot)com>, Haribabu Kommi <kommi(dot)haribabu(at)gmail(dot)com>, Oleg Golovanov <rentech(at)mail(dot)ru>
Subject: Re: Parallel Hash take II
Date: 2017-11-07 21:01:55
Message-ID: 20171107210155.kuksdd324kgz5oev@alap3.anarazel.de
Lists: pgsql-hackers

Hi,

Here's a review of v24.

+set min_parallel_table_scan_size = 0;
+set parallel_setup_cost = 0;
+-- Make a simple relation with well distributed keys and correctly
+-- estimated size.
+create table simple as
+ select generate_series(1, 20000) AS id, 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa';
+alter table simple set (parallel_workers = 2);
+analyze simple;
+-- Make a relation whose size we will under-estimate. We want stats
+-- to say 1000 rows, but actually there are 20,000 rows.
+create table bigger_than_it_looks as
+ select generate_series(1, 20000) as id, 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa';
+alter table bigger_than_it_looks set (autovacuum_enabled = 'false');
+alter table bigger_than_it_looks set (parallel_workers = 2);
+delete from bigger_than_it_looks where id <= 19000;
+vacuum bigger_than_it_looks;
+analyze bigger_than_it_looks;
+insert into bigger_than_it_looks
+ select generate_series(1, 19000) as id, 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa';

It seems kinda easier to just manipulate ndistinct and reltuples...

+set max_parallel_workers_per_gather = 0;
+set work_mem = '4MB';

I hope there's a fair amount of slop here - with different architectures
you're going to see considerable size differences.

+-- The "good" case: batches required, but we plan the right number; we
+-- plan for 16 batches, and we stick to that number, and peak memory
+-- usage says within our work_mem budget
+-- non-parallel
+set max_parallel_workers_per_gather = 0;
+set work_mem = '128kB';

So how do we know that's actually the case we're testing, rather than
something arbitrarily different? There are, IIRC, tests somewhere that
just filter the JSON EXPLAIN output down to the relevant parts...

+/*
+ * Build the name for a given segment of a given BufFile.
+ */
+static void
+MakeSharedSegmentName(char *name, const char *buffile_name, int segment)
+{
+ snprintf(name, MAXPGPATH, "%s.%d", buffile_name, segment);
+}

Not a fan of this name - you're not "making" a filename here (as in
allocating or such). I think I'd just remove the Make prefix.

+/*
+ * Open a file that was previously created in another backend with
+ * BufFileCreateShared in the same SharedFileSet using the same name. The
+ * backend that created the file must have called BufFileClose() or
+ * BufFileExport() to make sure that it is ready to be opened by other
+ * backends and render it read-only.
+ */

Is it actually guaranteed that it's another backend / do we rely on
that?

+BufFile *
+BufFileOpenShared(SharedFileSet *fileset, const char *name)
+{

+ /*
+ * If we didn't find any files at all, then no BufFile exists with this
+ * tag.
+ */
+ if (nfiles == 0)
+ return NULL;

s/tag/name/?

+/*
+ * Delete a BufFile that was created by BufFileCreateShared in the given
+ * SharedFileSet using the given name.
+ *
+ * It is not necessary to delete files explicitly with this function. It is
+ * provided only as a way to delete files proactively, rather than waiting for
+ * the SharedFileSet to be cleaned up.
+ *
+ * Only one backend should attempt to delete a given name, and should know
+ * that it exists and has been exported or closed.
+ */
+void
+BufFileDeleteShared(SharedFileSet *fileset, const char *name)
+{
+ char segment_name[MAXPGPATH];
+ int segment = 0;
+ bool found = false;
+
+ /*
+ * We don't know how many segments the file has. We'll keep deleting
+ * until we run out. If we don't manage to find even an initial segment,
+ * raise an error.
+ */
+ for (;;)
+ {
+ MakeSharedSegmentName(segment_name, name, segment);
+ if (!SharedFileSetDelete(fileset, segment_name, true))
+ break;
+ found = true;
+ ++segment;
+ }

Hm. Do we properly delete all the files via the resowner mechanism if
this fails midway, i.e. if there are no leading segments? I also wonder
whether this loop needs a CHECK_FOR_INTERRUPTS() call.

+void
+PathNameCreateTemporaryDir(const char *basedir, const char *directory)
+{
+ if (mkdir(directory, S_IRWXU) < 0)
+ {
+ if (errno == EEXIST)
+ return;
+
+ /*
+ * Failed. Try to create basedir first in case it's missing. Tolerate
+ * ENOENT to close a race against another process following the same
+ * algorithm.
+ */
+ if (mkdir(basedir, S_IRWXU) < 0 && errno != ENOENT)
+ elog(ERROR, "cannot create temporary directory \"%s\": %m",
+ basedir);

ENOENT or EEXIST?

+File
+PathNameCreateTemporaryFile(const char *path, bool error_on_failure)
+{
+ File file;
+
+ /*
+ * Open the file. Note: we don't use O_EXCL, in case there is an orphaned
+ * temp file that can be reused.
+ */
+ file = PathNameOpenFile(path, O_RDWR | O_CREAT | O_TRUNC | PG_BINARY);
+ if (file <= 0)
+ {
+ if (error_on_failure)
+ elog(ERROR, "could not create temporary file \"%s\": %m", path);
+ else
+ return file;
+ }
+
+ /* Mark it for temp_file_limit accounting. */
+ VfdCache[file].fdstate |= FD_TEMP_FILE_LIMIT;
+
+ /*
+ * We don't set FD_DELETE_AT_CLOSE for files opened this way, but we still
+ * want to make sure they get closed at end of xact.
+ */
+ ResourceOwnerEnlargeFiles(CurrentResourceOwner);
+ ResourceOwnerRememberFile(CurrentResourceOwner, file);
+ VfdCache[file].resowner = CurrentResourceOwner;

So maybe I'm being pedantic here, but wouldn't the right order be to do
ResourceOwnerEnlargeFiles() *before* creating the file? It's a
memory-allocating operation, so it can fail, which would leak the file.

+/*
+ * Open a file that was created with PathNameCreateTemporaryFile, possibly in
+ * another backend. Files opened this way don't count agains the

s/agains/against/

+ * temp_file_limit of the caller, are read-only and are automatically closed
+ * at the end of the transaction but are not deleted on close.
+ */
+File
+PathNameOpenTemporaryFile(const char *path)
+{
+ File file;
+
+ /* We open the file read-only. */
+ file = PathNameOpenFile(path, O_RDONLY | PG_BINARY);
+
+ /* If no such file, then we don't raise an error. */
+ if (file <= 0 && errno != ENOENT)
+ elog(ERROR, "could not open temporary file \"%s\": %m", path);
+
+ if (file > 0)
+ {
+ /*
+ * We don't set FD_DELETE_AT_CLOSE for files opened this way, but we
+ * still want to make sure they get closed at end of xact.
+ */
+ ResourceOwnerEnlargeFiles(CurrentResourceOwner);
+ ResourceOwnerRememberFile(CurrentResourceOwner, file);
+ VfdCache[file].resowner = CurrentResourceOwner;

Same complaint as above, ResourceOwnerEnlargeFiles() should be done
earlier.

+/*
+ * Delete a file by pathname. Return true if the file existed, false if
+ * didn't.
+ */
+bool
+PathNameDeleteTemporaryFile(const char *path, bool error_on_failure)
+{
+ struct stat filestats;
+ int stat_errno;
+
+ /* Get the final size for pgstat reporting. */
+ if (stat(path, &filestats) != 0)
+ stat_errno = errno;
+ else
+ stat_errno = 0;
+
+ /*
+ * Unlike FileClose's automatic file deletion code, we tolerate
+ * non-existence to support BufFileDeleteShared which doesn't know how
+ * many segments it has to delete until it runs out.
+ */
+ if (stat_errno == ENOENT)
+ return false;
+
+ if (unlink(path) < 0)
+ {
+ if (errno != ENOENT)
+ elog(error_on_failure ? ERROR : LOG,
+ "cannot unlink temporary file \"%s\": %m", path);
+ return false;
+ }
+
+ if (stat_errno == 0)
+ ReportTemporaryFileUsage(path, filestats.st_size);
+ else
+ {
+ errno = stat_errno;
+ elog(LOG, "could not stat file \"%s\": %m", path);
+ }

All these messages are "not expected to ever happen" ones, right?

+ return true;
+}
+
/*
* close a file when done with it
*/
@@ -1537,10 +1747,17 @@ FileClose(File file)
Delete(file);
}

+ if (vfdP->fdstate & FD_TEMP_FILE_LIMIT)
+ {
+ /* Subtract its size from current usage (do first in case of error) */
+ temporary_files_size -= vfdP->fileSize;
+ vfdP->fileSize = 0;
+ }

So, is it right to do so unconditionally and without regard for errors?
If the file isn't deleted, its size shouldn't be subtracted from
temporary_files_size. I guess you're managing that through the flag, but
that's not entirely obvious.

diff --git a/src/backend/storage/file/sharedfileset.c b/src/backend/storage/file/sharedfileset.c
new file mode 100644
index 00000000000..6da80838b37
--- /dev/null
+++ b/src/backend/storage/file/sharedfileset.c
@@ -0,0 +1,240 @@
+/*-------------------------------------------------------------------------
+ *
+ * sharedfileset.c
+ * Shared temporary file management.
+ *
+ * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * IDENTIFICATION
+ * src/backend/storage/file/sharedfileset.c
+ *
+ *-------------------------------------------------------------------------
+ */

A slightly bigger comment wouldn't hurt.

+/*
+ * Attach to a set of directories that was created with SharedFileSetInit.
+ */
+void
+SharedFileSetAttach(SharedFileSet *fileset, dsm_segment *seg)
+{
+ bool success;
+
+ SpinLockAcquire(&fileset->mutex);
+ if (fileset->refcnt == 0)
+ success = false;

I've not finished reading through this, but is this safe? If the
segment's gone, is the spinlock guaranteed to still be a spinlock? I
suspect this isn't a problem because just the underlying data is
removed, but the SharedFileSet stays alive?

+static void
+GetSharedFileSetPath(char *path, SharedFileSet *fileset, Oid tablespace)
+{
+ char tempdirpath[MAXPGPATH];
+
+ GetTempTablespacePath(tempdirpath, tablespace);
+ snprintf(path, MAXPGPATH, "%s/%s%d.%d.sharedfileset" PG_TEMP_SUBDIR_SUFFIX,
+ tempdirpath, PG_TEMP_FILE_PREFIX,
+ fileset->creator_pid, fileset->number);
+}

+/*
+ * Sorting hat to determine which tablespace a given shared temporary file
+ * belongs in.
+ */
+static Oid
+ChooseTablespace(const SharedFileSet *fileset, const char *name)
+{
+ uint32 hash = hash_any((const unsigned char *) name, strlen(name));
+
+ return fileset->tablespaces[hash % fileset->ntablespaces];
+}

Hm. I wonder if just round-robining through these isn't a better approach.

+/*
+ * Compute the full path of a file in a SharedFileSet.
+ */
+static void
+GetSharedFilePath(char *path, SharedFileSet *fileset, const char *name)
+{
+ char dirpath[MAXPGPATH];
+
+ GetSharedFileSetPath(dirpath, fileset, ChooseTablespace(fileset, name));
+ snprintf(path, MAXPGPATH, "%s/" PG_TEMP_FILE_PREFIX ".%s", dirpath, name);
+}
diff --git a/src/backend/utils/resowner/resowner.c b/src/backend/utils/resowner/resowner.c
index 4c35ccf65eb..8b91d5a6ebe 100644
--- a/src/backend/utils/resowner/resowner.c
+++ b/src/backend/utils/resowner/resowner.c
@@ -528,16 +528,6 @@ ResourceOwnerReleaseInternal(ResourceOwner owner,
PrintRelCacheLeakWarning(res);
RelationClose(res);
}
-
- /* Ditto for dynamic shared memory segments */
- while (ResourceArrayGetAny(&(owner->dsmarr), &foundres))
- {
- dsm_segment *res = (dsm_segment *) DatumGetPointer(foundres);
-
- if (isCommit)
- PrintDSMLeakWarning(res);
- dsm_detach(res);
- }
}
else if (phase == RESOURCE_RELEASE_LOCKS)
{
@@ -654,6 +644,16 @@ ResourceOwnerReleaseInternal(ResourceOwner owner,
PrintFileLeakWarning(res);
FileClose(res);
}
+
+ /* Ditto for dynamic shared memory segments */
+ while (ResourceArrayGetAny(&(owner->dsmarr), &foundres))
+ {
+ dsm_segment *res = (dsm_segment *) DatumGetPointer(foundres);
+
+ if (isCommit)
+ PrintDSMLeakWarning(res);
+ dsm_detach(res);
+ }
}

Is that entirely unproblematic? Are there any DSM callbacks that rely on
locks still being held? Please split this part into a separate commit
with such analysis.

+/* The initial size of chunks in pages. */
+#define STS_MIN_CHUNK_PAGES 4

Could use quick description at how you've arrived at that specific
value.

+/* Chunk written to disk. */
+typedef struct SharedTuplestoreChunk
+{
+ int npages; /* Size of this chunk in BLCKSZ pages. */
+ int ntuples; /* Number of tuples in this chunk. */
+ char data[FLEXIBLE_ARRAY_MEMBER];
+} SharedTuplestoreChunk;
+
+/* Per-participant shared state. */
+typedef struct SharedTuplestoreParticipant
+{
+ slock_t mutex;
+ BlockNumber read_page; /* Page number for next read. */
+ BlockNumber npages; /* Number of pages written. */
+ bool writing; /* Used only for assertions. */
+
+ /*
+ * We need variable sized chunks, because we might be asked to store
+ * gigantic tuples. To avoid the locking contention that would come from
+ * reading chunk sizes from disk, we store the chunk size for ranges of
+ * the file in a compact format in memory. chunk_pages starts out at
+ * STS_MIN_CHUNK_PAGES and then doubles each time we reach a page listed
+ * in chunk_expansion_log.
+ */
+ BlockNumber chunk_expansion_log[sizeof(BlockNumber) * CHAR_BIT];
+ int chunk_expansions;
+ int chunk_expansion;
+ int chunk_pages;

This needs more explanation.

+/*
+ * Initialize a SharedTuplestore in existing shared memory. There must be
+ * space for sts_estimate(participants) bytes. If flags is set to the value
+ * SHARED_TUPLESTORE_SINGLE_PASS, the files may in future be removed more
+ * eagerly (but this isn't yet implemented).

s/is set to the value/includes the value/ - otherwise it's not really
a flags argument.

+ * Tuples that are stored may optionally carry a piece of fixed sized
+ * meta-data which will be retrieved along with the tuple. This is useful for
+ * the hash codes used for multi-batch hash joins, but could have other
+ * applications.

"hash codes"?

+/*
+ * Prepare to rescan. Only participant should call this. After it returns,
+ * all participants should call sts_begin_parallel_scan() and then loop over
+ * sts_parallel_scan_next().
+ */

s/should/may/? Also maybe document what happens with in-progress reads
(or rather, that they're not allowed to exist)?

+/*
+ * Write a tuple. If a meta-data size was provided to sts_initialize, then a
+ * pointer to meta data of that size must be provided.
+ */
+void
+sts_puttuple(SharedTuplestoreAccessor *accessor, void *meta_data,
+ MinimalTuple tuple)
+{

+ /* Do we have space? */
+ size = accessor->sts->meta_data_size + tuple->t_len;
+ if (accessor->write_pointer + size >= accessor->write_end)
+ {
+ /* Try flushing to see if that creates enough space. */
+ if (accessor->write_chunk != NULL)
+ sts_flush_chunk(accessor);
+
+ /*
+ * It may still not be enough in the case of a gigantic tuple, or if
+ * we haven't created a chunk buffer at all yet.
+ */
+ if (accessor->write_pointer + size >= accessor->write_end)
+ {
+ SharedTuplestoreParticipant *participant;
+ size_t space_needed;
+ int pages_needed;
+
+ /* How many pages to hold this data and the chunk header? */
+ space_needed = offsetof(SharedTuplestoreChunk, data) + size;
+ pages_needed = (space_needed + (BLCKSZ - 1)) / BLCKSZ;
+ pages_needed = Max(pages_needed, STS_MIN_CHUNK_PAGES);
+
+ /*
+ * Double the chunk size until it's big enough, and record that
+ * fact in the shared expansion log so that readers know about it.
+ */
+ participant = &accessor->sts->participants[accessor->participant];
+ while (accessor->write_pages < pages_needed)
+ {
+ accessor->write_pages *= 2;
+ participant->chunk_expansion_log[participant->chunk_expansions++] =
+ accessor->write_page;
+ }

Hm. Isn't that going to be pretty unfunny if you have one large tuple
and a lot of small ones?

+ /* Create the output buffer. */
+ if (accessor->write_chunk != NULL)
+ pfree(accessor->write_chunk);
+ accessor->write_chunk = (SharedTuplestoreChunk *)
+ palloc0(accessor->write_pages * BLCKSZ);

Are we guaranteed to be in a long-lived memory context here?

+/*
+ * Get the next tuple in the current parallel scan.
+ */
+MinimalTuple
+sts_parallel_scan_next(SharedTuplestoreAccessor *accessor, void *meta_data)
+{
+ SharedTuplestoreParticipant *p;
+ BlockNumber read_page;
+ int chunk_pages;
+ bool eof;
+
+ for (;;)
+ {
+ /* Can we read more tuples from the current chunk? */
+ if (likely(accessor->read_ntuples < accessor->read_ntuples_available))
+ return sts_read_tuple(accessor, meta_data);

I'm not convinced this is a good use of likely/unlikely (not biased and
not performance critical enough).

+ /* Find the location of a new chunk to read. */
+ p = &accessor->sts->participants[accessor->read_participant];
+
+ SpinLockAcquire(&p->mutex);
+ eof = p->read_page >= p->npages;
+ if (!eof)
+ {
+ /*
+ * Figure out how big this chunk is. It will almost always be the
+ * same as the last chunk loaded, but if there is one or more
+ * entry in the chunk expansion log for this page then we know
+ * that it doubled that number of times. This avoids the need to
+ * do IO to adjust the read head, so we don't need to hold up
+ * concurrent readers. (An alternative to this extremely rarely
+ * run loop would be to use more space storing the new size in the
+ * log so we'd have 'if' instead of 'while'.)
+ */
+ read_page = p->read_page;
+ while (p->chunk_expansion < p->chunk_expansions &&
+ p->chunk_expansion_log[p->chunk_expansion] == p->read_page)
+ {
+ p->chunk_pages *= 2;
+ p->chunk_expansion++;
+ }
+ chunk_pages = p->chunk_pages;
+
+ /* The next reader will start after this chunk. */
+ p->read_page += chunk_pages;
+ }
+ SpinLockRelease(&p->mutex);

This looks more like the job of an lwlock rather than a spinlock.

+/*
+ * Create the name used for our shared BufFiles.
+ */
+static void
+make_name(char *name, SharedTuplestoreAccessor *accessor, int participant)
+{
+ snprintf(name, MAXPGPATH, "%s.p%d", accessor->sts->name, participant);
+}

Name's a bit generic. And it's still not really making ;)

Going to buy some groceries and then look at the next patches.

- Andres
