Re: [HACKERS] Parallel Hash take II

From: Andres Freund <andres(at)anarazel(dot)de>
To: Thomas Munro <thomas(dot)munro(at)enterprisedb(dot)com>
Cc: Rushabh Lathia <rushabh(dot)lathia(at)gmail(dot)com>, Prabhat Sahu <prabhat(dot)sahu(at)enterprisedb(dot)com>, Peter Geoghegan <pg(at)bowt(dot)ie>, Robert Haas <robertmhaas(at)gmail(dot)com>, Pg Hackers <pgsql-hackers(at)postgresql(dot)org>, Rafia Sabih <rafia(dot)sabih(at)enterprisedb(dot)com>, Ashutosh Bapat <ashutosh(dot)bapat(at)enterprisedb(dot)com>, Haribabu Kommi <kommi(dot)haribabu(at)gmail(dot)com>, Oleg Golovanov <rentech(at)mail(dot)ru>
Subject: Re: [HACKERS] Parallel Hash take II
Date: 2017-11-14 21:24:39
Message-ID: 20171114212439.rfj26sdhhcoohyhz@alap3.anarazel.de
Lists: pgsql-hackers

Hi,

On 2017-11-14 01:30:30 +1300, Thomas Munro wrote:
> > +-- The "good" case: batches required, but we plan the right number; we
> > +-- plan for 16 batches, and we stick to that number, and peak memory
> > +-- usage stays within our work_mem budget
> > +-- non-parallel
> > +set max_parallel_workers_per_gather = 0;
> > +set work_mem = '128kB';
> >
> > So how do we know that's actually the case we're testing rather than
> > something arbitrarily different? There's IIRC tests somewhere that just
> > filter the json explain output to the right parts...
>
> Yeah, good idea. My earlier attempts to dump out the hash join
> dimensions ran into problems with architecture sensitivity and then
> some run-to-run non-determinism in the parallel case (due to varying
> fragmentation depending on how many workers get involved in time).
> The attached version tells you about batch growth without reporting
> the exact numbers, except in the "ugly" case where we know that there
> is only one possible outcome because the extreme skew detector is
> guaranteed to go off after the first nbatch increase (I got rid of all
> other tuples except ones with the same key to make this true).

Hm. The way you access this doesn't quite seem right:
+--
+-- exercises for the hash join code
+--
+begin;
+set min_parallel_table_scan_size = 0;
+set parallel_setup_cost = 0;
+-- Extract bucket and batch counts from an explain analyze plan. In
+-- general we can't make assertions about how many batches (or
+-- buckets) will be required because it can vary, but we can in some
+-- special cases and we can check for growth.
+create or replace function hash_join_batches(query text)
+returns table (original int, final int) language plpgsql
+as
+$$
+declare
+  line text;
+  matches text[];
+begin
+  for line in
+    execute 'explain analyze ' || query
+  loop
+    matches := (regexp_matches(line, ' Batches: ([0-9]+) \(originally ([0-9]+)\)'));
+    if matches is not null then
+      original := matches[2]::int;
+      final := matches[1]::int;
+      return next;
+    else
+      matches := regexp_matches(line, ' Batches: ([0-9]+)');
+      if matches is not null then
+        original := matches[1]::int;
+        final := original;
+        return next;
+      end if;
+    end if;
+  end loop;
+end;
+$$;

Why not use format json and access the output that way? Then you can be
sure you access the right part of the tree and such?
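
E.g. something very roughly like this (untested sketch; the key names
below are what stock EXPLAIN emits for Hash nodes in json format,
"Hash Batches" / "Original Hash Batches" - whatever the patched
parallel code ends up emitting would have to be substituted):

create or replace function hash_join_batches(query text)
returns table (original int, final int) language plpgsql
as
$$
declare
  whole_plan json;
begin
  execute 'explain (analyze, format json) ' || query into whole_plan;
  return query
    with recursive plan_nodes(node) as (
      -- start at the root plan node ...
      select whole_plan -> 0 -> 'Plan'
      union all
      -- ... and walk the nested "Plans" arrays recursively
      select p.child
      from plan_nodes, json_array_elements(node -> 'Plans') as p(child)
    )
    select coalesce((node ->> 'Original Hash Batches')::int,
                    (node ->> 'Hash Batches')::int),
           (node ->> 'Hash Batches')::int
    from plan_nodes
    where node ->> 'Hash Batches' is not null;
end;
$$;

Then the tests could just assert on things like
select original > 1 as multibatch, final = original as stuck_to_plan
from hash_join_batches($$ ... $$); without depending on exact counts.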

> > + else
> > + {
> > + errno = stat_errno;
> > + elog(LOG, "could not stat file \"%s\": %m", path);
> > + }
> >
> > All these messages are "not expected to ever happen" ones, right?
>
> You'd have to suffer a nasty filesystem failure, remount read-only, or
> manually mess with permissions or something. Not sure where the line is,
> but I've changed all of these new elog calls to ereport.

Oh, I'd been fine keeping them as elogs. The one exception would have
been out-of-space cases which'll occur in practice.

> > +    if (vfdP->fdstate & FD_TEMP_FILE_LIMIT)
> > +    {
> > +        /* Subtract its size from current usage (do first in case of error) */
> > +        temporary_files_size -= vfdP->fileSize;
> > +        vfdP->fileSize = 0;
> > +    }
> >
> > So, is it right to do so unconditionally and without regard for errors?
> > If the file isn't deleted, it shouldn't be subtracted from fileSize. I
> > guess you're managing that through the flag, but that's not entirely
> > obvious.
>
> I think it is. Reasoning: The existing behaviour of fd.c is that if
> we don't manage to delete temporary files, we'll LOG something and
> forget about them (they'll be cleaned up eventually by a clean restart
> or human intervention).

IOW: Never ;)

> > +/*
> > + * Write a tuple. If a meta-data size was provided to sts_initialize, then a
> > + * pointer to meta data of that size must be provided.
> > + */
> > +void
> > +sts_puttuple(SharedTuplestoreAccessor *accessor, void *meta_data,
> > +             MinimalTuple tuple)
> > +{
> >
> > +    /* Do we have space? */
> > +    size = accessor->sts->meta_data_size + tuple->t_len;
> > +    if (accessor->write_pointer + size >= accessor->write_end)
> > +    {
> > +        /* Try flushing to see if that creates enough space. */
> > +        if (accessor->write_chunk != NULL)
> > +            sts_flush_chunk(accessor);
> > +
> > +        /*
> > +         * It may still not be enough in the case of a gigantic tuple, or if
> > +         * we haven't created a chunk buffer at all yet.
> > +         */
> > +        if (accessor->write_pointer + size >= accessor->write_end)
> > +        {
> > +            SharedTuplestoreParticipant *participant;
> > +            size_t space_needed;
> > +            int pages_needed;
> > +
> > +            /* How many pages to hold this data and the chunk header? */
> > +            space_needed = offsetof(SharedTuplestoreChunk, data) + size;
> > +            pages_needed = (space_needed + (BLCKSZ - 1)) / BLCKSZ;
> > +            pages_needed = Max(pages_needed, STS_MIN_CHUNK_PAGES);
> > +
> > +            /*
> > +             * Double the chunk size until it's big enough, and record that
> > +             * fact in the shared expansion log so that readers know about it.
> > +             */
> > +            participant = &accessor->sts->participants[accessor->participant];
> > +            while (accessor->write_pages < pages_needed)
> > +            {
> > +                accessor->write_pages *= 2;
> > +                participant->chunk_expansion_log[participant->chunk_expansions++] =
> > +                    accessor->write_page;
> > +            }
> >
> > Hm. Isn't that going to be pretty unfunny if you have one large and a
> > lot of small tuples?
>
> It will increase the parallel scan grain size, and then keep that size
> for the rest of the contents of one backend's output file. I am aware
> of two downsides to using a large parallel grain:

> 1. It determines the amount of unfairness when we run out of data:
> it's the maximum amount of extra data that the unlucky last worker can
> finish up with after all the others have finished. I think this
> effect is reduced by higher level factors: when a reader runs out of
> data in one backend's file, it'll start reading another backend's
> file. If it's hit the end of all backends' files and this is an outer
> batch, Parallel Hash will just go and work on another batch
> immediately.

Consider e.g. what happens if there's the occasional 500MB datum, and
the rest's very small...

> Better ideas?

Not really. I'm more than a bit suspicious of this solution, but I don't
really have a great suggestion otherwise. One way to combat extreme
size skew would be to put very large datums into different files.

But I think we probably can go with your approach for now, ignoring my
failure-prone spidey senses ;)

> > +    /* Find the location of a new chunk to read. */
> > +    p = &accessor->sts->participants[accessor->read_participant];
> > +
> > +    SpinLockAcquire(&p->mutex);
> > +    eof = p->read_page >= p->npages;
> > +    if (!eof)
> > +    {
> > +        /*
> > +         * Figure out how big this chunk is. It will almost always be the
> > +         * same as the last chunk loaded, but if there is one or more
> > +         * entry in the chunk expansion log for this page then we know
> > +         * that it doubled that number of times. This avoids the need to
> > +         * do IO to adjust the read head, so we don't need to hold up
> > +         * concurrent readers. (An alternative to this extremely rarely
> > +         * run loop would be to use more space storing the new size in the
> > +         * log so we'd have 'if' instead of 'while'.)
> > +         */
> > +        read_page = p->read_page;
> > +        while (p->chunk_expansion < p->chunk_expansions &&
> > +               p->chunk_expansion_log[p->chunk_expansion] == p->read_page)
> > +        {
> > +            p->chunk_pages *= 2;
> > +            p->chunk_expansion++;
> > +        }
> > +        chunk_pages = p->chunk_pages;
> > +
> > +        /* The next reader will start after this chunk. */
> > +        p->read_page += chunk_pages;
> > +    }
> > +    SpinLockRelease(&p->mutex);
> >
> > This looks more like the job of an lwlock rather than a spinlock.
>
> I switched to the alternative algorithm mentioned in parentheses in
> the comment. It uses a bit more space, but that loop is gone. In my
> mind this is much like Parallel Seq Scan: we acquire a spinlock just
> to advance the block pointer. The added complication is that we also
> check if the chunk size has changed, which clang renders as this many
> instructions:
>
> postgres[0x10047eee0] <+176>: movslq 0x144(%r15,%rbx), %rcx
> postgres[0x10047eee8] <+184>: cmpl 0x140(%r15,%rbx), %ecx
> postgres[0x10047eef0] <+192>: jge 0x10047ef16 ; <+230> at sharedtuplestore.c:489
> postgres[0x10047eef2] <+194>: leaq (%r15,%rbx), %rdx
> postgres[0x10047eef6] <+198>: cmpl %r12d, 0x40(%rdx,%rcx,8)
> postgres[0x10047eefb] <+203>: jne 0x10047ef16 ; <+230> at sharedtuplestore.c:489
> postgres[0x10047eefd] <+205>: leaq 0x144(%r15,%rbx), %rsi
> postgres[0x10047ef05] <+213>: leal 0x1(%rcx), %edi
> postgres[0x10047ef08] <+216>: movl %edi, (%rsi)
> postgres[0x10047ef0a] <+218>: movl 0x44(%rdx,%rcx,8), %ecx
> postgres[0x10047ef0e] <+222>: movl %ecx, 0x148(%r15,%rbx)
> postgres[0x10047ef16] <+230>: movl 0x148(%r15,%rbx), %r15d
>
> That should be OK, right?

It's not too bad. Personally I'm of the opinion though that pretty much
no new spinlocks should be added - their worst-case performance
characteristics are bad enough that they're only worth the
experimentation in cases where each cycle really matters and where
contention is unlikely.

> > One day we're going to need a better approach to this. I have no idea
> > how, but this per-node, and now per_node * max_parallelism, approach has
> > only implementation simplicity as its benefit.
>
> I agree, and I am interested in that subject. In the meantime, I
> think it'd be pretty unfair if parallel-oblivious hash join and
> sort-merge join and every other parallel plan get to use work_mem * p
> (and in some cases waste it with duplicate data), but Parallel Hash
> isn't allowed to do the same (and put it to good use).

I'm not sure I care about fairness between pieces of code ;)

> > -    node->hj_JoinState = HJ_NEED_NEW_OUTER;
> > +    if (hashtable->parallel_state)
> > +    {
> > +        Barrier *build_barrier;
> > +
> > +        build_barrier = &hashtable->parallel_state->build_barrier;
> > +        if (BarrierPhase(build_barrier) == PHJ_BUILD_HASHING_OUTER)
> > +        {
> > +            /*
> > +             * If multi-batch, we need to hash the outer relation
> > +             * up front.
> > +             */
> > +            if (hashtable->nbatch > 1)
> > +                ExecParallelHashJoinPartitionOuter(node);
> > +            BarrierArriveAndWait(build_barrier,
> > +                                 WAIT_EVENT_HASH_BUILD_HASHING_OUTER);
> > +        }
> > +        Assert(BarrierPhase(build_barrier) == PHJ_BUILD_DONE);
> > +
> > +        /* Each backend should now select a batch to work on. */
> > +        hashtable->curbatch = -1;
> > +        node->hj_JoinState = HJ_NEED_NEW_BATCH;
> > +
> > +        continue;
> > +    }
> > +    else
> > +        node->hj_JoinState = HJ_NEED_NEW_OUTER;
> >
> > You know what I'm going to say about all these branches, and sigh.
>
> BTW this is not per-tuple code -- it runs once at the end of hashing.
> Not sure what you're looking for here.

It was more a general statement about all the branches in nodeHashjoin
than about these specific branches. Should've made that clearer. There
are definitely branches in very common parts:
            case HJ_NEED_NEW_OUTER:

                /*
                 * We don't have an outer tuple, try to get the next one
                 */
                if (hashtable->parallel_state)
                    outerTupleSlot =
                        ExecParallelHashJoinOuterGetTuple(outerNode, node,
                                                          &hashvalue);
                else
                    outerTupleSlot =
                        ExecHashJoinOuterGetTuple(outerNode, node, &hashvalue);

I don't think you should do so now, but I think a reasonable approach
here would be to move the HJ_BUILD_HASHTABLE code into a separate
function (it really can't be hot). Then have specialized ExecHashJoin()
versions for parallel/non-parallel and potentially for outer/inner/anti.

> > If we don't split this into two versions, we at least should store
> > hashNode->parallel_state in a local var, so the compiler doesn't have to
> > pull that out of memory after every external function call (of which
> > there are a lot). In common cases it'll end up in a callee-saved
> > register, and most of the called functions won't be too register
> > starved (on x86-64).
>
> Hmm. Well I did that already in v24 -- in many places there is now a
> local variable called pstate.

See above piece of code, and a few others, in nodeHash.

> > I think it'd be better if we structured the file so we just set GUCs
> > with SET LOCAL inside a transaction.
>
> I wrapped the whole region of join.sql concerned with hash joins in a
> transaction that rolls back, so I don't have to write LOCAL. That's
> just as good, right?

Not really imo. Being able to read a test without going through all
previous ones is a lot better.
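
I.e. each test stanza carries its own settings, roughly like this
(sketch only, table name and GUC values made up):

begin;
set local work_mem = '128kB';
set local max_parallel_workers_per_gather = 2;
-- the query under test, readable without looking at earlier stanzas
select count(*) from simple r join simple s using (id);
rollback;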

Greetings,

Andres Freund
