diff --git a/src/backend/executor/nodeHash.c b/src/backend/executor/nodeHash.c
index 6ffaa751f2..4d5a6872cc 100644
--- a/src/backend/executor/nodeHash.c
+++ b/src/backend/executor/nodeHash.c
@@ -80,6 +80,7 @@ static bool ExecParallelHashTuplePrealloc(HashJoinTable hashtable,
 static void ExecParallelHashMergeCounters(HashJoinTable hashtable);
 static void ExecParallelHashCloseBatchAccessors(HashJoinTable hashtable);
 
+static void ExecHashUpdateSpacePeak(HashJoinTable hashtable);
 
 /* ----------------------------------------------------------------
  *		ExecHash
@@ -193,10 +194,8 @@ MultiExecPrivateHash(HashState *node)
 	if (hashtable->nbuckets != hashtable->nbuckets_optimal)
 		ExecHashIncreaseNumBuckets(hashtable);
 
-	/* Account for the buckets in spaceUsed (reported in EXPLAIN ANALYZE) */
-	hashtable->spaceUsed += hashtable->nbuckets * sizeof(HashJoinTuple);
-	if (hashtable->spaceUsed > hashtable->spacePeak)
-		hashtable->spacePeak = hashtable->spaceUsed;
+	/* refresh info about peak used memory */
+	ExecHashUpdateSpacePeak(hashtable);
 
 	hashtable->partialTuples = hashtable->totalTuples;
 }
@@ -1647,12 +1646,56 @@ ExecHashTableInsert(HashJoinTable hashtable,
 
 		/* Account for space used, and back off if we've used too much */
 		hashtable->spaceUsed += hashTupleSize;
-		if (hashtable->spaceUsed > hashtable->spacePeak)
-			hashtable->spacePeak = hashtable->spaceUsed;
+
+		/* refresh info about peak used memory */
+		ExecHashUpdateSpacePeak(hashtable);
+
+		/*
+		 * Consider increasing the number of batches.
+		 *
+		 * Each batch requires a non-trivial amount of memory, because BufFile
+		 * includes a PGAlignedBlock (typically an 8kB buffer). So when
+		 * doubling the number of batches, we need to be careful and only
+		 * allow that if it actually has a chance of reducing memory usage.
+		 *
+		 * In particular, doubling the number of batches is pointless when
+		 *
+		 *		(spaceUsed / 2) < (nbatches * sizeof(BufFile))
+		 *
+		 * because we expect to save roughly 1/2 of memory currently used for
+		 * data (rows) at the price of doubling the memory used for BufFile.
+		 *
+		 * We can't stop adding batches entirely, because that would just mean
+		 * the batches would need more and more memory. So we need to increase
+		 * the number of batches, even if we can't enforce work_mem properly.
+		 * The goal is to minimize the overall memory usage of the hash join.
+		 *
+		 * Note: This applies mostly to cases of significant underestimates,
+		 * resulting in an explosion of the number of batches. The properly
+		 * estimated cases should generally end up using a merge join, due to
+		 * the high cost of the batched hash join.
+		 */
 		if (hashtable->spaceUsed +
-			hashtable->nbuckets_optimal * sizeof(HashJoinTuple)
+			hashtable->nbuckets_optimal * sizeof(HashJoinTuple) +
+			hashtable->nbatch * sizeof(PGAlignedBlock) * 2
 			> hashtable->spaceAllowed)
+		{
 			ExecHashIncreaseNumBatches(hashtable);
+
+			/*
+			 * Consider increasing the resize threshold.
+			 *
+			 * For well estimated cases this does nothing, because batches are
+			 * expected to account for only a small fraction of work_mem. But
+			 * if we significantly underestimate the number of batches, we may
+			 * end up in a situation where the BufFiles alone exceed work_mem.
+			 * So move the threshold a bit, to the next point where it'll make
+			 * sense to consider adding batches again.
+			 */
+			hashtable->spaceAllowed
+				= Max(hashtable->spaceAllowed,
+					  hashtable->nbatch * sizeof(PGAlignedBlock) * 3);
+		}
 	}
 	else
 	{
@@ -1893,6 +1936,21 @@ ExecHashGetBucketAndBatch(HashJoinTable hashtable,
 	}
 }
 
+static void
+ExecHashUpdateSpacePeak(HashJoinTable hashtable)
+{
+	Size		spaceUsed = hashtable->spaceUsed;
+
+	/* Account for the buckets in spaceUsed (reported in EXPLAIN ANALYZE) */
+	spaceUsed += hashtable->nbuckets * sizeof(HashJoinTuple);
+
+	/* Account for memory used for batch files (inner + outer) */
+	spaceUsed += hashtable->nbatch * sizeof(PGAlignedBlock) * 2;
+
+	if (spaceUsed > hashtable->spacePeak)
+		hashtable->spacePeak = spaceUsed;
+}
+
 /*
  * ExecScanHashBucket
  *		scan a hash bucket for matches to the current outer tuple
@@ -2272,8 +2330,9 @@ ExecHashBuildSkewHash(HashJoinTable hashtable, Hash *node, int mcvsToUse)
 			+ mcvsToUse * sizeof(int);
 		hashtable->spaceUsedSkew += nbuckets * sizeof(HashSkewBucket *)
 			+ mcvsToUse * sizeof(int);
-		if (hashtable->spaceUsed > hashtable->spacePeak)
-			hashtable->spacePeak = hashtable->spaceUsed;
+
+		/* refresh info about peak used memory */
+		ExecHashUpdateSpacePeak(hashtable);
 
 		/*
 		 * Create a skew bucket for each MCV hash value.
@@ -2322,8 +2381,9 @@ ExecHashBuildSkewHash(HashJoinTable hashtable, Hash *node, int mcvsToUse)
 			hashtable->nSkewBuckets++;
 			hashtable->spaceUsed += SKEW_BUCKET_OVERHEAD;
 			hashtable->spaceUsedSkew += SKEW_BUCKET_OVERHEAD;
-			if (hashtable->spaceUsed > hashtable->spacePeak)
-				hashtable->spacePeak = hashtable->spaceUsed;
+
+			/* refresh info about peak used memory */
+			ExecHashUpdateSpacePeak(hashtable);
 		}
 
 		free_attstatsslot(&sslot);
@@ -2411,8 +2471,10 @@ ExecHashSkewTableInsert(HashJoinTable hashtable,
 
 	/* Account for space used, and back off if we've used too much */
 	hashtable->spaceUsed += hashTupleSize;
 	hashtable->spaceUsedSkew += hashTupleSize;
-	if (hashtable->spaceUsed > hashtable->spacePeak)
-		hashtable->spacePeak = hashtable->spaceUsed;
+
+	/* refresh info about peak used memory */
+	ExecHashUpdateSpacePeak(hashtable);
+
 	while (hashtable->spaceUsedSkew > hashtable->spaceAllowedSkew)
 		ExecHashRemoveNextSkewBucket(hashtable);
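
A note on the arithmetic behind the new condition in ExecHashTableInsert(): the program below is a standalone sketch, not part of the patch. It assumes 8kB per batch file buffer (the PGAlignedBlock), two files per batch (inner + outer), and a hypothetical 4MB work_mem that tuple data has already filled; the real sizeof(BufFile) is somewhat larger than its 8kB buffer, so the crossover in practice comes slightly earlier.

#include <stdio.h>

#define BUFFILE_SIZE	8192	/* assumed size of one BufFile buffer */

int
main(void)
{
	long		work_mem = 4L * 1024 * 1024;	/* hypothetical 4MB budget */
	long		spaceUsed = work_mem;	/* tuple data is at the limit */
	int			nbatch;

	for (nbatch = 32; nbatch <= 4096; nbatch *= 2)
	{
		long		batch_mem = (long) nbatch * BUFFILE_SIZE * 2;

		/*
		 * Doubling nbatch frees roughly spaceUsed / 2 of tuple data but
		 * doubles batch_mem, so it only pays off while the saved half
		 * still exceeds the batch-file overhead.
		 */
		printf("nbatch %5d: batch files %5ld kB, doubling %s\n",
			   nbatch, batch_mem / 1024,
			   (spaceUsed / 2 > batch_mem) ? "helps" : "is pointless");
	}

	return 0;
}

With these illustrative numbers the crossover is at 128 batches: 2MB of batch file buffers equals half of the 4MB budget, which is the point past which plain doubling no longer reduces total memory and the patch instead lifts spaceAllowed to Max(spaceAllowed, nbatch * sizeof(PGAlignedBlock) * 3).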