Re: Avoiding hash join batch explosions with extreme skew and weird stats

From: Melanie Plageman <melanieplageman(at)gmail(dot)com>
To: Tomas Vondra <tomas(dot)vondra(at)2ndquadrant(dot)com>
Cc: Thomas Munro <thomas(dot)munro(at)gmail(dot)com>, Jeff Davis <pgsql(at)j-davis(dot)com>, Robert Haas <robertmhaas(at)gmail(dot)com>, pgsql-hackers <pgsql-hackers(at)postgresql(dot)org>, Jesse Zhang <sbjesse(at)gmail(dot)com>, Heikki Linnakangas <hlinnaka(at)iki(dot)fi>, david(dot)g(dot)kimura(at)gmail(dot)com, soumyadeep2007(at)gmail(dot)com
Subject: Re: Avoiding hash join batch explosions with extreme skew and weird stats
Date: 2020-08-31 22:13:06
Message-ID: CAAKRu_aLMRHX6_y=K5i5wBMTMQvoPMO8DT3eyCziTHjsY11cVA@mail.gmail.com
Lists: pgsql-hackers

Attached is the current version of adaptive hash join, with two
significant changes compared to v10:

1) Implements spilling of batch 0 for parallel-aware parallel hash join.
2) Moves "striping" of fallback batches from the "build" stage to the
"load" stage.

It includes several smaller changes as well.

Batch 0 spilling is necessary when the hash table for batch 0 cannot fit
in memory; it allows us to use the "hashloop" strategy for batch 0.

Spilling of batch 0 necessitated a few new pieces of code. The most
noticeable is probably the hash table eviction phase machine. If batch 0
was marked as a "fallback" batch in the PHJ_GROW_BATCHES_DECIDING phase
of ExecParallelHashIncreaseNumBatches(), any later attempt to insert a
tuple that would exceed space_allowed triggers eviction of the hash
table: ExecParallelHashTableEvictBatch0() evicts all batch 0 tuples
currently in memory into spill files in a batch 0 inner SharedTuplestore.

This means that when batch 0 is repartitioned in the future, both the
batch 0 spill files and the hash table need to be drained and their
tuples relocated into the new generation of batches and the hash table.
If enough memory is freed up by batch 0 tuples relocating to other
batches, it is possible that tuples from the batch 0 spill files will go
back into the hash table.

After batch 0 is evicted, the build stage proceeds as normal.
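
To make the trigger concrete, here is a simplified, self-contained
sketch of the control flow described above. This is not the patch code
-- the struct and function names below are made-up stand-ins for the
hash table, the space accounting, and the batch 0 inner SharedTuplestore
spill path.

/* Simplified model of the batch 0 eviction trigger; names are made up. */
#include <stdbool.h>
#include <stddef.h>
#include <stdio.h>

typedef struct ModelHashTable
{
    bool    batch0_fallback;    /* set in the PHJ_GROW_BATCHES_DECIDING phase */
    size_t  space_used;         /* bytes of batch 0 tuples currently in memory */
    size_t  space_allowed;      /* per-batch memory budget */
} ModelHashTable;

/* Stand-in for writing one tuple to the batch 0 inner SharedTuplestore. */
static void
model_spill_tuple(size_t len)
{
    printf("spilled a %zu-byte tuple to the batch 0 spill file\n", len);
}

/* Stand-in for ExecParallelHashTableEvictBatch0(): dump what is in memory. */
static void
model_evict_batch0(ModelHashTable *ht)
{
    printf("evicting %zu bytes of batch 0 tuples to spill files\n",
           ht->space_used);
    ht->space_used = 0;
}

static void
model_insert_batch0(ModelHashTable *ht, size_t len)
{
    if (ht->batch0_fallback && ht->space_used + len > ht->space_allowed)
    {
        /*
         * Batch 0 fell back and this insert would exceed space_allowed:
         * evict the in-memory tuples, then spill the new tuple. (Whether
         * the triggering tuple goes to the freed-up table or to the spill
         * file is a detail of the real patch; here it is simply spilled.)
         */
        model_evict_batch0(ht);
        model_spill_tuple(len);
        return;
    }
    ht->space_used += len;      /* normal in-memory insert */
}

int
main(void)
{
    ModelHashTable ht = {true, 90, 100};

    model_insert_batch0(&ht, 32);   /* triggers eviction */
    return 0;
}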

The main alternative to this design that we considered was to "close" the
hash table after it is full. That is, if batch 0 has been marked to fall
back, once it is full, all subsequent tuples pulled from the outer child
would bypass the hash table altogether and go directly into a spill
file.

We chose the hash table eviction route because I thought it might be
better to write chunks of the hash table into a file together rather
than sporadically writing new batch 0 tuples to spill files as they are
pulled out of the child node. However, since the same sts_puttuple() API
is used in both cases, it is quite possible this won't actually matter
and we will do the same amount of I/O.
Both designs involve changing the flow of the code for inserting and
repartitioning tuples, so I figured I would choose one, do some testing,
and try the other later after more discussion and review.

This patch also introduces a significant change to how tuples are split
into stripes. Previously, during the build stage, tuples were written to
spill files in the SharedTuplestore with a stripe number in the metadata
section of the MinimalTuple. For a batch that had been designated a
"fallback" batch, once space_allowed had been exhausted, the shared
stripe number would be incremented, and the new stripe number was
written into the metadata of the tuples subsequently written to the
files. Then, during loading, tuples were only loaded into the hash table
if their stripe number matched the current stripe number.
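
For contrast, here is a minimal sketch of that previous build-stage
scheme. It is simplified and self-contained: the types below are made
up, and in the real v10 code the stripe number lived in the MinimalTuple
metadata in the SharedTuplestore.

/* Sketch of the v10 build-stage striping scheme; names are made up. */
#include <stddef.h>
#include <stdio.h>

typedef struct SpilledTuple
{
    int     stripeno;   /* stamped on the tuple at build time */
    int     payload;    /* stand-in for the MinimalTuple itself */
} SpilledTuple;

int
main(void)
{
    /*
     * Build: each time space_allowed is exhausted, the shared stripe
     * number is bumped and stamped on all tuples subsequently written
     * to the batch's spill file.
     */
    SpilledTuple batch_file[] = {{0, 10}, {0, 11}, {1, 12}, {1, 13}, {2, 14}};
    int     current_stripe = 1;

    /*
     * Load: scan the whole file, but only load tuples whose stamped
     * stripe number matches the stripe currently being processed.
     */
    for (size_t i = 0; i < sizeof(batch_file) / sizeof(batch_file[0]); i++)
    {
        if (batch_file[i].stripeno == current_stripe)
            printf("loading tuple %d into the hash table\n",
                   batch_file[i].payload);
    }
    return 0;
}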

This had several downsides. It introduced a couple of new shared
variables -- the current stripe number for the batch and its size.
In master, during the normal mode of the "build" stage, shared variables
for the size or estimated_size of the batch are checked on each
allocation of an STS Chunk or HashMemoryChunk. During repartitioning,
however, because bailing out early was not an option, workers could use
backend-local variables to keep track of the size and merge them at the
end of repartitioning. That wasn't possible if we needed accurate stripe
numbers written into the tuples, so we had to add new shared variable
accesses to repartitioning.

To avoid this, Deep and I worked on moving the "striping" logic from the
"build" stage to the "load" stage for batches. Serial hash join already
did striping this way. With this patch, parallel hash join now also
pauses loading once space_allowed has been exhausted. The tricky part
was keeping track of multiple read_pages for a given file.
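
Here is a deliberately single-worker sketch of the load-side striping.
The constants are made up and there is no SharedTuplestore, just an
index into the spilled tuples; the parallel case additionally has to
remember each worker's partially consumed read_page assignment, which is
what the rest of this section is about.

/* Single-worker model of load-stage striping; names/constants made up. */
#include <stdio.h>

#define NTUPLES         10      /* tuples in the batch's spill file */
#define TUPLE_SIZE      1
#define SPACE_ALLOWED   4       /* per-batch memory budget */

int
main(void)
{
    int     next_to_load = 0;   /* resume point recorded when a stripe pauses */
    int     stripe = 0;

    while (next_to_load < NTUPLES)
    {
        int     space_used = 0;

        /* Load until the budget is exhausted, then pause this stripe. */
        while (next_to_load < NTUPLES &&
               space_used + TUPLE_SIZE <= SPACE_ALLOWED)
        {
            space_used += TUPLE_SIZE;
            next_to_load++;
        }
        printf("stripe %d: loaded through tuple %d, now probe the outer side\n",
               stripe, next_to_load - 1);
        /* ... probe, reset the hash table, resume from next_to_load ... */
        stripe++;
    }
    return 0;
}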

When tuples had explicit stripe numbers, we simply rewound the read_page
in the SharedTuplestoreParticipant to the earliest SharedTuplestoreChunk
that anyone had read and relied on the stripe numbers to avoid loading
tuples more than once. Now, each worker participating in reading from
the SharedTuplestore could have received a read_page "assignment" (four
blocks, currently) and then failed to allocate a HashMemoryChunk. We
cannot risk rewinding the read_page because there could be
SharedTuplestoreChunks that have already been loaded in between ones
that have not.

The design we went with was to "overflow" the tuples from this
SharedTuplestoreChunk onto the end of the write_file this worker wrote
-- if it participated in writing this STS -- or into a new write_file if
it did not. This entailed keeping track of who participated in the write
phase. SharedTuplestore participation now has three "modes" -- reading,
writing, and appending. During appending, workers can write to their own
file and read from any file.
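
A simplified, self-contained model of the append-overflow idea follows.
The worker and file bookkeeping below is made up; in the patch this is
the SharedTuplestore's "appending" mode.

/* Simplified model of the "append overflow" design; names are made up. */
#include <stdbool.h>
#include <stdio.h>

typedef struct ModelWorker
{
    int     id;
    bool    has_write_file;     /* did this worker write to the STS originally? */
} ModelWorker;

/* Stand-in for failing to allocate a HashMemoryChunk once the budget is full. */
static bool
model_try_load(int *space_used, int space_allowed, int chunk_size)
{
    if (*space_used + chunk_size > space_allowed)
        return false;
    *space_used += chunk_size;
    return true;
}

static void
model_append_overflow(ModelWorker *w, int chunk_size)
{
    /*
     * In "append" mode a worker may only write to its own file, creating
     * one if it did not participate in the original write phase, so the
     * shared read_page never has to be rewound.
     */
    if (!w->has_write_file)
    {
        printf("worker %d creates a new overflow file\n", w->id);
        w->has_write_file = true;
    }
    printf("worker %d appends a %d-block chunk to its own file\n",
           w->id, chunk_size);
}

int
main(void)
{
    ModelWorker w = {1, false};
    int     space_used = 0;
    int     space_allowed = 8;

    /* Each read_page "assignment" hands the worker a chunk of blocks. */
    for (int chunk = 0; chunk < 4; chunk++)
    {
        int     chunk_size = 4; /* four blocks per assignment, as in the patch */

        if (!model_try_load(&space_used, space_allowed, chunk_size))
            model_append_overflow(&w, chunk_size);  /* defer to a later stripe */
    }
    return 0;
}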

An alternative design I considered was to store the offset and length of
leftover blocks that still needed to be loaded into the hash table in
the SharedTuplestoreParticipant data structure, and have workers pick up
these "assignments" -- basically a SharedTuplestoreParticipant work
queue.
The main stumbling block I faced here was allocating a variable number
of things in shared memory: you don't know how many participants will
read from the file or how many stripes there will be until you've loaded
the file. In the worst case, you would need space for
nparticipants * nstripes - 1 offset/length combos (e.g. with 4
participants and 8 stripes, up to 31 of them). Since the number of
stripes isn't known until the file has been loaded, that shared memory
can't be allocated up front.

The downside of the "append overflow" design is that, currently, every
worker participating in loading a fallback batch writes an overflow
chunk for every fallback stripe.
Something could probably be done to check whether there is space in the
hash table before accepting an assignment of blocks to read from the
SharedTuplestore and advancing the shared read_page variable, which
might reduce how often workers have to overflow. However, I tried this
and it is very intrusive on the SharedTuplestore API (it would have to
know about the hash table). Also, oversized tuples would not be
addressed by such a pre-assignment check, since memory is allocated one
HashMemoryChunk at a time. So even if that were solved, the overflow
functionality would still be needed.

One note: I had to comment out a test in join_hash.sql which inserts
tuples each larger than work_mem, because it no longer executes
successfully.
Also, the number of stripes is not deterministic, so the tests in
join_hash.sql that compare fallback batches' stripe counts sometimes
fail.

Major outstanding TODOs:
--
- Potential redesign of stripe loading pausing and resumption
- The instrumentation for parallel fallback batches has some problems
- Deadlock hazard avoidance design of the stripe barrier still needs work
- Assorted smaller TODOs in the code

On Thu, Jun 25, 2020 at 5:22 PM Tomas Vondra <tomas(dot)vondra(at)2ndquadrant(dot)com>
wrote:

> On Thu, Jun 25, 2020 at 03:09:44PM -0700, Melanie Plageman wrote:
> >On Tue, Jun 23, 2020 at 3:24 PM Tomas Vondra <
> tomas(dot)vondra(at)2ndquadrant(dot)com>
> >wrote:
> >
> >> I assume
> >> you're working on a patch addressing the remaining TODOS, right?
> >>
> >
> >I wanted to get some feedback on the patch before working through the
> >TODOs to make sure I was on the right track.
> >
> >Now that you are reviewing this, I will focus all my attention
> >on addressing your feedback. If there are any TODOs that you feel are
> >most important, let me know, so I can start with those.
> >
> >Otherwise, I will prioritize parallel batch 0 spilling.
>
> Feel free to work on the batch 0 spilling, please. I still need to get
> familiar with various parts of the parallel hash join etc. so I don't
> have any immediate feedback which TODOs to work on first.
>
> >David Kimura plans to do a bit of work on parallel hash join batch 0
> >spilling tomorrow. Whatever is left after that, I will pick up next
> >week. Parallel hash join batch 0 spilling is the last large TODO that I
> >had.
> >
> >My plan was to then focus on the feedback (either about which TODOs are
> >most important or outside of the TODOs I've identified) I get from you
> >and anyone else who reviews this.
> >
>
> OK.
>

See list of patch contents above.

Tomas, I wasn't sure whether you would want a patchset that includes a
commit with just the differences between this version and v10, since you
had already started reviewing it.
This commit [1], on a branch of my fork, contains just the delta between
v10 and v11.
As a warning, I added a few updates to comments and such after squashing
the two in my current branch (which is what is in this patch).
I didn't intend to maintain the commits separately, as I felt that would
be more confusing for other reviewers.

>
> >
> >>
> >> nodeHash.c
> >> ----------
> >>
> >>
> >> 1) MultiExecPrivateHash says this
> >>
> >> /*
> >> * Not subject to skew optimization, so either insert normally
> >> * or save to batch file if it belongs to another stripe
> >> */
> >>
> >> I wonder what it means to "belong to another stripe". I understand what
> >> that means for batches, which are identified by batchno computed from
> >> the hash value. But I thought "stripes" are just work_mem-sized pieces
> >> of a batch, so I don't quite understand this. Especially when the code
> >> does not actually check "which stripe" the row belongs to.
> >>
> >>
> >I agree this was confusing.
> >
> >"belongs to another stripe" meant here that if batch 0 falls back and we
> >are still loading it, once we've filled up work_mem, we need to start
> >saving those tuples to a spill file for batch 0. I've changed the
> >comment to this:
> >
> >- * or save to batch file if it belongs to another stripe
> >+ * or save to batch file if batch 0 falls back and we have
> >+ * already filled the hashtable up to space_allowed.
> >
>
> OK. Silly question - what does "batch 0 falls back" mean? Does it mean
> that we realized the hash table for batch 0 would not fit into work_mem,
> so we switched to the "hashloop" strategy?
>

Exactly.

> >
> >> 2) I find the fields hashloop_fallback rather confusing. We have one in
> >> HashJoinTable (and it's array of BufFile items) and another one in
> >> ParallelHashJoinBatch (this time just bool).
> >>
> >> I think HashJoinTable should be renamed to hashloopBatchFile (similarly
> >> to the other BufFile arrays).
> >
> >
> >I think you are right about the name. I've changed the name in
> >HashJoinTableData to hashloopBatchFile.
> >
> >The array of BufFiles hashloop_fallback was only used by serial
> >hashjoin. The boolean hashloop_fallback variable is used only by
> >parallel hashjoin.
> >
> >The reason I had them named the same thing is that I thought it would be
> >nice to have a variable with the same name to indicate if a batch "fell
> >back" for both parallel and serial hashjoin--especially since we check
> >it in the main hashjoin state machine used by parallel and serial
> >hashjoin.
> >
> >In serial hashjoin, the BufFiles aren't identified by name, so I kept
> >them in that array. In parallel hashjoin, each ParallelHashJoinBatch has
> >the status saved (in the struct).
> >So, both represented the fall back status of a batch.
> >
> >However, I agree with you, so I've renamed the serial one to
> >hashloopBatchFile.
> >
>
> OK
>
> >>
> >> Although I'm not sure why we even need
> >> this file, when we have innerBatchFile? BufFile(s) are not exactly free,
> >> in fact it's one of the problems for hashjoins with many batches.
> >>
> >>
> >Interesting -- it didn't even occur to me to combine the bitmap with the
> >inner side batch file data.
> >It definitely seems like a good idea to save the BufFile given that so
> >little data will likely go in it and that it has a 1-1 relationship with
> >inner side batches.
> >
> >How might it work? Would you reserve some space at the beginning of the
> >file? When would you reserve the bytes (before adding tuples you won't
> >know how many bytes you need, so it might be hard to make sure there is
> >enough space.) Would all inner side files have space reserved or just
> >fallback batches?
> >
>
> Oh! So the hashloopBatchFile is only used for the bitmap? I haven't
> realized that. In that case it probably makes sense to keep it separate
> from the files with spilled tuples, interleaving that somehow would be
> way too complex, I think.
>
> However, do we need an array of those files? I thought we only need the
> bitmap until we process all rows from each "stripe" and then we can
> throw it away, right? Which would also mean we don't need to worry about
> the memory usage too much, because the 8kB buffer will go away after
> calling BufFileClose.
>
>
Good point! I will try this change.

Regards,
Melanie (VMWare)

[1]
https://github.com/melanieplageman/postgres/commit/c6843ef9e0767f80d928d87bdb1078c9d20346e3

Attachment: v11-0001-Implement-Adaptive-Hashjoin.patch (text/x-patch, 254.1 KB)
