We saw a case recently where a hash join was using much more memory than
it was supposed to, causing failure when the server ran out of memory.
The hash join code is supposed to spill tuples to disk when the
hashtable exceeds work_mem, but this failed to save us because the
algorithm is not adaptive. What it really does is divide the hash
key space into N batches where N is chosen at query startup based on the
planner's estimate of the number of rows to be processed. If that
estimate is way too small then an individual batch can be way too large,
but the code can't recover by adjusting N after the fact.
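To put rough numbers on that failure mode, here is a small arithmetic sketch. The sizing rule in choose_batches() is a hypothetical stand-in (not the actual executor formula), and the row width, row counts, and memory limit are made-up figures chosen only for illustration.

```python
import math

def choose_batches(est_rows, row_bytes, work_mem_bytes):
    """Hypothetical rule: enough batches that one estimated batch fits in work_mem."""
    return max(1, math.ceil(est_rows * row_bytes / work_mem_bytes))

WORK_MEM = 1_000_000      # assumed memory limit, in bytes
ROW_BYTES = 100           # assumed average tuple size

# N is fixed at startup from the planner's estimate...
n = choose_batches(est_rows=100_000, row_bytes=ROW_BYTES, work_mem_bytes=WORK_MEM)

# ...but the inner input turns out to be 100 times larger than estimated.
actual_rows = 10_000_000
bytes_per_batch = actual_rows * ROW_BYTES / n
overshoot = bytes_per_batch / WORK_MEM

print(n, bytes_per_batch, overshoot)
```

Under these assumptions each batch is as far over the limit as the estimate was off, and with N fixed there is nothing the executor can do about it.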
A somewhat related problem is that the HashAgg code doesn't have a way
to spill hashtable entries to disk at all, so it too can blow out memory
if the planner's estimate of the number of entries is way off. We've
seen reports of that happening, too.

Here's what I'm thinking of doing to fix it:
* Determine the number of in-memory hash buckets, K, at plan startup.
This is dependent on work_mem more than it is on the planner's
estimates, so there's no need for it to be adaptive. A tuple with
hash value H will go into bucket (H mod K) when it is processed.
* Estimate the number of batches N using the planner's estimate.
We will always choose N to be a power of 2. A tuple's batch number is
((H div K) mod N).
* Begin loading the hash table from the inner input. Tuples of batch
zero go into the hash table, tuples of higher batch numbers go into
holding files, one per batch.
* If the hash table size exceeds work_mem, double N (creating a bunch
of new empty holding files). Scan through the hash table for tuples
whose batch number is no longer zero according to the new calculation,
and dump them out to the appropriate one of the new holding files.
This should get rid of about half of the hash table entries if the
hash values are well dispersed. Essentially, we are looking at one
more bit of the hash value than we were using before.
* Lather, rinse, repeat until inner join input is completely read.
* Now begin scanning the outer join input. Tuples of batch number
zero (according to the current calculation) can be matched to the
current hashtable contents. Tuples of higher batch numbers are dropped
into holding files for the outer input, one per batch.
* After exhausting the outer input, we still have to match up tuples
of corresponding batches. To do this, we clear the in-memory hash table
and load it from tuples in the first unprocessed inner batch file.
If we had to increase N on-the-fly then it is possible that some of
these tuples no longer belong to that batch, but to some higher batch
number --- write such tuples to the proper batch file instead of putting
them into the hash table.
* If some batches are more heavily populated than others, it is possible
that we exceed work_mem here. No problem: we can play the same game of
increasing N even at this stage. This works because N is a power of 2,
so doubling it either leaves a tuple in its batch or moves it up by the
old value of N: tuples can only be reassigned to later batches, never to
earlier ones. (Of course, each on-the-fly increase in N means extra writes and
reads of tuples that were initially put in the wrong batch, so it's
still best to get as good an estimate as we can to start with.)
* While reading from an outer batch file, we have to check whether each
tuple is still considered to belong to the current batch, and dump it
out to the proper later batch file if not.
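The steps above can be sketched roughly as follows. This is a toy model in Python, not executor code: holding files are plain lists, work_mem is approximated by a tuple count (max_tuples), and the class name, method names, and the max_n cap are all assumptions made for the illustration. The cap is there only so that heavily duplicated keys, which no amount of doubling can separate, don't make the sketch loop forever.

```python
from collections import defaultdict

def batch_no(h, k, n):
    """Batch number ((H div K) mod N); n is assumed to be a power of 2."""
    return (h // k) % n

class AdaptiveHashJoin:
    def __init__(self, k, n, max_tuples, max_n=1 << 16):
        self.k, self.n = k, n
        self.max_tuples = max_tuples          # stand-in for work_mem
        self.max_n = max_n                    # hypothetical cap on doubling
        self.cur = 0                          # batch currently in memory
        self.buckets = defaultdict(list)      # (H mod K) -> [(h, key, row)]
        self.size = 0
        self.inner_files = defaultdict(list)  # batch -> spilled inner tuples
        self.outer_files = defaultdict(list)  # batch -> spilled outer tuples

    def _double(self):
        """Double N and dump tuples that no longer belong to the current batch."""
        self.n *= 2
        self.size = 0
        for b in list(self.buckets):
            keep = []
            for t in self.buckets[b]:
                nb = batch_no(t[0], self.k, self.n)
                (keep if nb == self.cur else self.inner_files[nb]).append(t)
            self.buckets[b] = keep
            self.size += len(keep)

    def _load(self, tuples):
        """Load the hash table, forwarding tuples that now belong to a later batch."""
        self.buckets.clear()
        self.size = 0
        for t in tuples:
            nb = batch_no(t[0], self.k, self.n)
            if nb != self.cur:
                self.inner_files[nb].append(t)
                continue
            self.buckets[t[0] % self.k].append(t)
            self.size += 1
            while self.size > self.max_tuples and self.n < self.max_n:
                self._double()

    def _probe(self, tuples):
        """Match current-batch outer tuples; defer the rest to later batches."""
        for h, key, row in tuples:
            nb = batch_no(h, self.k, self.n)
            if nb != self.cur:
                self.outer_files[nb].append((h, key, row))
                continue
            for h2, key2, row2 in self.buckets.get(h % self.k, ()):
                if h2 == h and key2 == key:
                    yield (row2, row)

    def run(self, inner, outer, hash_fn=hash):
        """inner and outer are iterables of (key, row); returns (inner_row, outer_row) pairs."""
        inner_src = [(hash_fn(key), key, row) for key, row in inner]
        outer_src = [(hash_fn(key), key, row) for key, row in outer]
        out = []
        while True:
            self._load(inner_src)
            out.extend(self._probe(outer_src))
            self.cur += 1
            if self.cur >= self.n:
                return out
            inner_src = self.inner_files.pop(self.cur, [])
            outer_src = self.outer_files.pop(self.cur, [])
```

Note that both _load and _probe recompute the batch number with the current N, which is what lets N grow at any point without losing tuples: a tuple found in the wrong file is simply passed along to a later one.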
We can use basically the same ideas to fix HashAgg. Here, the aggregate
state values play the part of the "inner join tuples", and the incoming
data to be aggregated plays the part of the "outer join tuples". When
the hash table gets too big, we double the number of batches and dump
currently accumulated aggregate states into a per-batch holding file.
Incoming data that hashes into the current batch can be accumulated into
its aggregate value and discarded. Incoming data that hashes into a
later batch is put into a "to-do" file. After we scan all the input, we
emit the current aggregate states, load up the hash table from
the next batch holding file, and then scan the current to-do file for
inputs of the new batch. Note that we'll use only one "to-do" file in
each pass, not one per batch. This implies more I/O but it is the only
way to preserve the guarantee that incoming values are accumulated into
their aggregate in order of arrival. The standard aggregates don't care
about that, but user-defined aggregate functions often do.
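A rough sketch of the HashAgg variant, under the same caveats as any toy model: holding files and the to-do file are Python lists, the memory limit is a count of groups (max_groups), and the function name, its parameters, and the max_n cap are assumptions for illustration. init() and step() stand in for an aggregate's initial state and transition function.

```python
from collections import defaultdict

def batch_no(h, k, n):
    """Batch number ((H div K) mod N); n is assumed to be a power of 2."""
    return (h // k) % n

def spilling_hash_agg(rows, init, step, k=4, n=1, max_groups=4,
                      max_n=1 << 16, hash_fn=hash):
    """rows is an iterable of (key, value); returns {key: final state}."""
    states = {}                   # in-memory table: key -> (h, state)
    holding = defaultdict(list)   # batch -> dumped (h, key, state)
    results = {}
    cur = 0

    def spill_if_needed():
        nonlocal n
        while len(states) > max_groups and n < max_n:
            n *= 2                # look at one more bit of the hash value
            for g in list(states):
                gh, st = states[g]
                b = batch_no(gh, k, n)
                if b != cur:      # state now belongs to a later batch
                    holding[b].append((gh, g, st))
                    del states[g]

    source = [(hash_fn(key), key, val) for key, val in rows]
    while True:
        todo = []                 # the single to-do file for this pass
        for h, key, val in source:
            if batch_no(h, k, n) != cur:
                todo.append((h, key, val))   # deferred, arrival order kept
            elif key in states:
                states[key] = (h, step(states[key][1], val))
            else:
                states[key] = (h, step(init(), val))
                spill_if_needed()
        results.update((g, st) for g, (gh, st) in states.items())
        states.clear()
        cur += 1
        if cur >= n:
            return results
        # Load the next batch's dumped states, forwarding any that have
        # since been reassigned to a later batch.
        for gh, g, st in holding.pop(cur, []):
            b = batch_no(gh, k, n)
            if b == cur:
                states[g] = (gh, st)
                spill_if_needed()
            else:
                holding[b].append((gh, g, st))
        source = todo
```

Because every deferred input passes through one list in arrival order, an order-sensitive aggregate (list concatenation, say) sees its values in the same order it would have without any spilling. The cost is that each pass rescans all the remaining deferred input.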
regards, tom lane