Re: [HACKERS] Parallel Hash take II

From: Andres Freund <andres(at)anarazel(dot)de>
To: Thomas Munro <thomas(dot)munro(at)enterprisedb(dot)com>
Cc: Rushabh Lathia <rushabh(dot)lathia(at)gmail(dot)com>, Prabhat Sahu <prabhat(dot)sahu(at)enterprisedb(dot)com>, Peter Geoghegan <pg(at)bowt(dot)ie>, Robert Haas <robertmhaas(at)gmail(dot)com>, Pg Hackers <pgsql-hackers(at)postgresql(dot)org>, Rafia Sabih <rafia(dot)sabih(at)enterprisedb(dot)com>, Ashutosh Bapat <ashutosh(dot)bapat(at)enterprisedb(dot)com>, Haribabu Kommi <kommi(dot)haribabu(at)gmail(dot)com>, Oleg Golovanov <rentech(at)mail(dot)ru>
Subject: Re: [HACKERS] Parallel Hash take II
Date: 2017-12-13 22:45:07
Message-ID: 20171213224507.ntl2n3w2qp4qhss6@alap3.anarazel.de
Lists: pgsql-hackers

Hi,

Looking at the latest version of the tuplestore patch:

diff --git a/src/backend/utils/sort/sharedtuplestore.c b/src/backend/utils/sort/sharedtuplestore.c
new file mode 100644
index 00000000000..d1233221a58
--- /dev/null
+++ b/src/backend/utils/sort/sharedtuplestore.c
@@ -0,0 +1,583 @@
+/*-------------------------------------------------------------------------
+ *
+ * sharedtuplestore.c
+ * Simple mechanism for sharing tuples between backends.
+ *
+ * This module provides a shared temporary tuple storage mechanism, providing
+ * a parallel-aware subset of the features of tuplestore.c. Multiple backends
+ * can write to a SharedTuplestore, and then multiple backends can later scan
+ * the stored tuples. Currently, the only scan type supported is a parallel
+ * scan where each backend reads an arbitrary subset of the tuples that were
+ * written.

Cool.

+/* Chunk written to disk. */
+typedef struct SharedTuplestoreChunk
+{
+ int ntuples; /* Number of tuples in this chunk. */
+ bool overflow; /* Continuation of previous chunk? */
+ char data[FLEXIBLE_ARRAY_MEMBER];
+} SharedTuplestoreChunk;

Ah. I was thinking we could make the 'overflow' variable an int,
indicating the remaining length of the oversized tuple. That'd allow
concurrent processes to skip ahead to the end of the oversized tuple
after hitting it.
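
Roughly what I have in mind, sketch only (the exact semantics of the
counter are obviously up for discussion):

    typedef struct SharedTuplestoreChunk
    {
        int     ntuples;    /* Number of tuples in this chunk. */
        int     overflow;   /* If this chunk continues an oversized tuple,
                             * the number of bytes of that tuple remaining,
                             * counted from the start of this chunk's data
                             * area; otherwise 0. */
        char    data[FLEXIBLE_ARRAY_MEMBER];
    } SharedTuplestoreChunk;

A reader that claims such a chunk could then compute how many further
chunks the tuple occupies and bump the shared read position past all of
them in one step, instead of claiming and discarding every overflow
chunk individually.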

+/*
+ * Write a tuple. If a meta-data size was provided to sts_initialize, then a
+ * pointer to meta data of that size must be provided.
+ */
+void
+sts_puttuple(SharedTuplestoreAccessor *accessor, void *meta_data,
+ MinimalTuple tuple)
+{
+ size_t size;
+
+ /* Do we have our own file yet? */
+ if (accessor->write_file == NULL)
+ {
+ SharedTuplestoreParticipant *participant;
+ char name[MAXPGPATH];
+
+ /* Create one. Only this backend will write into it. */
+ sts_filename(name, accessor, accessor->participant);
+ accessor->write_file = BufFileCreateShared(accessor->fileset, name);
+
+ /* Set up the shared state for this backend's file. */
+ participant = &accessor->sts->participants[accessor->participant];
+ participant->writing = true; /* for assertions only */
+ }
+
+ /* Do we have space? */
+ size = accessor->sts->meta_data_size + tuple->t_len;
+ if (accessor->write_pointer + size >= accessor->write_end)
+ {
+ if (accessor->write_chunk == NULL)
+ {
+ /* First time through. Allocate chunk. */
+ accessor->write_chunk = (SharedTuplestoreChunk *)
+ MemoryContextAllocZero(accessor->context,
+ STS_CHUNK_PAGES * BLCKSZ);
+ accessor->write_chunk->ntuples = 0;
+ accessor->write_pointer = &accessor->write_chunk->data[0];
+ accessor->write_end = (char *)
+ accessor->write_chunk + STS_CHUNK_PAGES * BLCKSZ;
+ }
+ else
+ {
+ /* See if flushing helps. */
+ sts_flush_chunk(accessor);
+ }
+
+ /* It may still not be enough in the case of a gigantic tuple. */
+ if (accessor->write_pointer + size >= accessor->write_end)
+ {
+ size_t written;
+
+ /*
+ * We'll write the beginning of the oversized tuple, and then
+ * write the rest in some number of 'overflow' chunks.
+ */
+ if (accessor->write_pointer + accessor->sts->meta_data_size >=
+ accessor->write_end)
+ elog(ERROR, "meta-data too long");

That seems more like an Assert than a proper elog? Given that we're
calculating size just a few lines above...
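
I.e. something like:

    Assert(accessor->write_pointer + accessor->sts->meta_data_size <
           accessor->write_end);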

+ if (accessor->sts->meta_data_size > 0)
+ memcpy(accessor->write_pointer, meta_data,
+ accessor->sts->meta_data_size);
+ written = accessor->write_end - accessor->write_pointer -
+ accessor->sts->meta_data_size;
+ memcpy(accessor->write_pointer + accessor->sts->meta_data_size,
+ tuple, written);

Also, shouldn't the same Assert() be here as well if you have it above?

+static MinimalTuple
+sts_read_tuple(SharedTuplestoreAccessor *accessor, void *meta_data)
+{
+ MinimalTuple tuple;
+ uint32 size;
+ size_t remaining_size;
+ size_t this_chunk_size;
+ char *destination;
+
+ /*
+ * We'll keep track of bytes read from this chunk so that we can detect an
+ * overflowing tuple and switch to reading overflow pages.
+ */
+ if (accessor->sts->meta_data_size > 0)
+ {
+ if (BufFileRead(accessor->read_file,
+ meta_data,
+ accessor->sts->meta_data_size) !=
+ accessor->sts->meta_data_size)
+ ereport(ERROR,
+ (errcode_for_file_access(),
+ errmsg("could not read from shared tuplestore temporary file"),
+ errdetail("Short read while reading meta-data")));

The errdetail doesn't follow the style guide (it's not a complete
sentence ending with a period), and seems internal-ish. I'm ok with
keeping it, but perhaps we should change all of these to
errdetail_internal()? Seems pointless to translate all of them.
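
I.e. roughly (with the detail message turned into a proper sentence):

    ereport(ERROR,
            (errcode_for_file_access(),
             errmsg("could not read from shared tuplestore temporary file"),
             errdetail_internal("Short read while reading meta-data.")));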

+MinimalTuple
+sts_parallel_scan_next(SharedTuplestoreAccessor *accessor, void *meta_data)
+{
+ SharedTuplestoreParticipant *p;
+ BlockNumber read_page;
+ bool eof;
+
+ for (;;)
+ {
+ /* Can we read more tuples from the current chunk? */
+ if (accessor->read_ntuples < accessor->read_ntuples_available)
+ return sts_read_tuple(accessor, meta_data);
+
+ /* Find the location of a new chunk to read. */
+ p = &accessor->sts->participants[accessor->read_participant];
+
+ LWLockAcquire(&p->lock, LW_EXCLUSIVE);
+ eof = p->read_page >= p->npages;
+ if (!eof)
+ {
+ read_page = p->read_page;
+ p->read_page += STS_CHUNK_PAGES;
+ }
+ LWLockRelease(&p->lock);

So if we went to the world I'm suggesting, with overflow containing the
length remaining until the end of the tuple, this'd probably have to
look a bit different.
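
Hand-waving over the exact arithmetic, I'm thinking of something roughly
along these lines (assuming the chunk header carries the remaining byte
count as sketched above):

    if (chunk_header.overflow > 0)
    {
        /*
         * Continuation of an oversized tuple: work out how many more
         * chunks it occupies and push the shared read position past
         * them in one step, rather than claiming and discarding each
         * overflow chunk individually.
         */
        size_t      chunk_data_size = STS_CHUNK_PAGES * BLCKSZ -
            offsetof(SharedTuplestoreChunk, data);
        BlockNumber chunks_remaining =
            (chunk_header.overflow + chunk_data_size - 1) / chunk_data_size;

        LWLockAcquire(&p->lock, LW_EXCLUSIVE);
        p->read_page = Max(p->read_page,
                           read_page + chunks_remaining * STS_CHUNK_PAGES);
        LWLockRelease(&p->lock);
        continue;
    }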

+ if (!eof)
+ {
+ SharedTuplestoreChunk chunk_header;
+
+ /* Make sure we have the file open. */
+ if (accessor->read_file == NULL)
+ {
+ char name[MAXPGPATH];
+
+ sts_filename(name, accessor, accessor->read_participant);
+ accessor->read_file =
+ BufFileOpenShared(accessor->fileset, name);
+ if (accessor->read_file == NULL)
+ elog(ERROR, "could not open temporary file %s", name);

Isn't this more an Assert, or perhaps nothing at all? There's no way
BufFileOpenShared should ever return NULL, no?
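
I.e. at most something like:

    Assert(accessor->read_file != NULL);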

+
+ /* Seek and load the chunk header. */
+ if (BufFileSeekBlock(accessor->read_file, read_page) != 0)
+ ereport(ERROR,
+ (errcode_for_file_access(),
+ errmsg("could not read from shared tuplestore temporary file"),
+ errdetail("Could not seek to next block")));
+ if (BufFileRead(accessor->read_file, &chunk_header,
+ offsetof(SharedTuplestoreChunk, data)) !=
+ offsetof(SharedTuplestoreChunk, data))
+ ereport(ERROR,
+ (errcode_for_file_access(),
+ errmsg("could not read from shared tuplestore temporary file"),
+ errdetail("Short read while reading chunk header")));
+
+ /* If this is an overflow chunk, we skip it. */
+ if (chunk_header.overflow)
+ continue;
+
+ accessor->read_ntuples = 0;
+ accessor->read_ntuples_available = chunk_header.ntuples;
+ accessor->read_bytes = offsetof(SharedTuplestoreChunk, data);

Perhaps add a comment somewhere around here that we'll just loop around
and call sts_read_tuple() in the next iteration?
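
I.e. something along the lines of:

    /*
     * Loop around; the next iteration will return tuples from this
     * chunk via sts_read_tuple() until it is exhausted.
     */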

Greetings,

Andres Freund
