diff --git a/src/backend/commands/explain.c b/src/backend/commands/explain.c
index 799a22e9d5..c957043599 100644
--- a/src/backend/commands/explain.c
+++ b/src/backend/commands/explain.c
@@ -2612,6 +2612,8 @@ show_hash_info(HashState *hashstate, ExplainState *es)
 							   hinstrument.nbatch, es);
 		ExplainPropertyInteger("Original Hash Batches", NULL,
 							   hinstrument.nbatch_original, es);
+		ExplainPropertyInteger("In-Memory Hash Batches", NULL,
+							   hinstrument.nbatch_inmemory, es);
 		ExplainPropertyInteger("Peak Memory Usage", "kB",
 							   spacePeakKb, es);
 	}
@@ -2619,21 +2621,38 @@ show_hash_info(HashState *hashstate, ExplainState *es)
 			 hinstrument.nbuckets_original != hinstrument.nbuckets)
 	{
 		appendStringInfoSpaces(es->str, es->indent * 2);
-		appendStringInfo(es->str,
-						 "Buckets: %d (originally %d)  Batches: %d (originally %d)  Memory Usage: %ldkB\n",
-						 hinstrument.nbuckets,
-						 hinstrument.nbuckets_original,
-						 hinstrument.nbatch,
-						 hinstrument.nbatch_original,
-						 spacePeakKb);
+		if (hinstrument.nbatch != hinstrument.nbatch_inmemory)
+			appendStringInfo(es->str,
+							 "Buckets: %d (originally %d)  Batches: %d (originally %d, in-memory %d)  Memory Usage: %ldkB\n",
+							 hinstrument.nbuckets,
+							 hinstrument.nbuckets_original,
+							 hinstrument.nbatch,
+							 hinstrument.nbatch_original,
+							 hinstrument.nbatch_inmemory,
+							 spacePeakKb);
+		else
+			appendStringInfo(es->str,
+							 "Buckets: %d (originally %d)  Batches: %d (originally %d)  Memory Usage: %ldkB\n",
+							 hinstrument.nbuckets,
+							 hinstrument.nbuckets_original,
+							 hinstrument.nbatch,
+							 hinstrument.nbatch_original,
+							 spacePeakKb);
 	}
 	else
 	{
 		appendStringInfoSpaces(es->str, es->indent * 2);
-		appendStringInfo(es->str,
-						 "Buckets: %d  Batches: %d  Memory Usage: %ldkB\n",
-						 hinstrument.nbuckets, hinstrument.nbatch,
-						 spacePeakKb);
+		if (hinstrument.nbatch != hinstrument.nbatch_inmemory)
+			appendStringInfo(es->str,
+							 "Buckets: %d  Batches: %d (in-memory: %d)  Memory Usage: %ldkB\n",
+							 hinstrument.nbuckets, hinstrument.nbatch,
+							 hinstrument.nbatch_inmemory,
+							 spacePeakKb);
+		else
+			appendStringInfo(es->str,
+							 "Buckets: %d  Batches: %d  Memory Usage: %ldkB\n",
+							 hinstrument.nbuckets, hinstrument.nbatch,
+							 spacePeakKb);
 	}
 }
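
For illustration only (the counts below are hypothetical), a hash whose batch
count grew past the in-memory limit would now render in text format as

    Buckets: 65536 (originally 65536)  Batches: 256 (originally 64, in-memory 128)  Memory Usage: 3941kB

while a hash whose batches all fit in memory keeps the old single-line form.
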
diff --git a/src/backend/executor/nodeHash.c b/src/backend/executor/nodeHash.c
index 6ffaa751f2..4364eb7cdd 100644
--- a/src/backend/executor/nodeHash.c
+++ b/src/backend/executor/nodeHash.c
@@ -80,6 +80,7 @@ static bool ExecParallelHashTuplePrealloc(HashJoinTable hashtable,
 static void ExecParallelHashMergeCounters(HashJoinTable hashtable);
 static void ExecParallelHashCloseBatchAccessors(HashJoinTable hashtable);
 
+static void ExecHashUpdateSpacePeak(HashJoinTable hashtable);
 
 /* ----------------------------------------------------------------
  *		ExecHash
@@ -193,10 +194,8 @@ MultiExecPrivateHash(HashState *node)
 	if (hashtable->nbuckets != hashtable->nbuckets_optimal)
 		ExecHashIncreaseNumBuckets(hashtable);
 
-	/* Account for the buckets in spaceUsed (reported in EXPLAIN ANALYZE) */
-	hashtable->spaceUsed += hashtable->nbuckets * sizeof(HashJoinTuple);
-	if (hashtable->spaceUsed > hashtable->spacePeak)
-		hashtable->spacePeak = hashtable->spaceUsed;
+	/* refresh info about peak used memory */
+	ExecHashUpdateSpacePeak(hashtable);
 
 	hashtable->partialTuples = hashtable->totalTuples;
 }
@@ -433,6 +432,7 @@ ExecHashTableCreate(HashState *state, List *hashOperators, bool keepNulls)
 	size_t		space_allowed;
 	int			nbuckets;
 	int			nbatch;
+	int			nbatch_inmemory;
 	double		rows;
 	int			num_skew_mcvs;
 	int			log2_nbuckets;
@@ -462,7 +462,8 @@ ExecHashTableCreate(HashState *state, List *hashOperators, bool keepNulls)
 							state->parallel_state != NULL ?
 							state->parallel_state->nparticipants - 1 : 0,
 							&space_allowed,
-							&nbuckets, &nbatch, &num_skew_mcvs);
+							&nbuckets, &nbatch, &nbatch_inmemory,
+							&num_skew_mcvs);
 
 	/* nbuckets must be a power of 2 */
 	log2_nbuckets = my_log2(nbuckets);
@@ -489,6 +490,7 @@ ExecHashTableCreate(HashState *state, List *hashOperators, bool keepNulls)
 	hashtable->nSkewBuckets = 0;
 	hashtable->skewBucketNums = NULL;
 	hashtable->nbatch = nbatch;
+	hashtable->nbatch_inmemory = nbatch_inmemory;
 	hashtable->curbatch = 0;
 	hashtable->nbatch_original = nbatch;
 	hashtable->nbatch_outstart = nbatch;
@@ -498,6 +500,8 @@ ExecHashTableCreate(HashState *state, List *hashOperators, bool keepNulls)
 	hashtable->skewTuples = 0;
 	hashtable->innerBatchFile = NULL;
 	hashtable->outerBatchFile = NULL;
+	hashtable->innerOverflowFiles = NULL;
+	hashtable->outerOverflowFiles = NULL;
 	hashtable->spaceUsed = 0;
 	hashtable->spacePeak = 0;
 	hashtable->spaceAllowed = space_allowed;
@@ -559,16 +563,30 @@ ExecHashTableCreate(HashState *state, List *hashOperators, bool keepNulls)
 
 	if (nbatch > 1 && hashtable->parallel_state == NULL)
 	{
+		int			cnt = Min(nbatch, nbatch_inmemory);
+
 		/*
 		 * allocate and initialize the file arrays in hashCxt (not needed for
 		 * parallel case which uses shared tuplestores instead of raw files)
 		 */
 		hashtable->innerBatchFile = (BufFile **)
-			palloc0(nbatch * sizeof(BufFile *));
+			palloc0(cnt * sizeof(BufFile *));
 		hashtable->outerBatchFile = (BufFile **)
-			palloc0(nbatch * sizeof(BufFile *));
+			palloc0(cnt * sizeof(BufFile *));
 		/* The files will not be opened until needed... */
 		/* ... but make sure we have temp tablespaces established for them */
+
+		/* also allocate files for overflow batches */
+		if (nbatch > nbatch_inmemory)
+		{
+			int			nslices = (nbatch / nbatch_inmemory);
+
+			hashtable->innerOverflowFiles = (BufFile **)
+				palloc0(nslices * sizeof(BufFile *));
+			hashtable->outerOverflowFiles = (BufFile **)
+				palloc0(nslices * sizeof(BufFile *));
+		}
+
 		PrepareTempTablespaces();
 	}
 
@@ -665,6 +683,7 @@ ExecChooseHashTableSize(double ntuples, int tupwidth, bool useskew,
 						size_t *space_allowed,
 						int *numbuckets,
 						int *numbatches,
+						int *numbatches_inmemory,
 						int *num_skew_mcvs)
 {
 	int			tupsize;
@@ -675,6 +694,7 @@ ExecChooseHashTableSize(double ntuples, int tupwidth, bool useskew,
 	long		max_pointers;
 	long		mppow2;
 	int			nbatch = 1;
+	int			nbatch_inmemory = 1;
 	int			nbuckets;
 	double		dbuckets;
 
@@ -795,6 +815,7 @@ ExecChooseHashTableSize(double ntuples, int tupwidth, bool useskew,
 								space_allowed,
 								numbuckets,
 								numbatches,
+								numbatches_inmemory,
 								num_skew_mcvs);
 			return;
 		}
@@ -831,11 +852,22 @@ ExecChooseHashTableSize(double ntuples, int tupwidth, bool useskew,
 			nbatch <<= 1;
 	}
 
+	/*
+	 * See how many batches we can fit into memory (driven mostly by the
+	 * size of BufFile, with PGAlignedBlock being its largest part).  Each
+	 * batch needs one BufFile for the inner and one for the outer side,
+	 * so we stop doubling once the next step would exceed (work_mem/2).
+	 */
+	while ((nbatch_inmemory * 2) * sizeof(PGAlignedBlock) * 2
+		   <= (work_mem * 1024L / 2))
+		nbatch_inmemory *= 2;
+
 	Assert(nbuckets > 0);
 	Assert(nbatch > 0);
 
 	*numbuckets = nbuckets;
 	*numbatches = nbatch;
+	*numbatches_inmemory = nbatch_inmemory;
 }
 
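
A standalone sketch of the sizing rule added above, for playing with the
numbers; it mirrors (rather than reuses) the loop in ExecChooseHashTableSize()
and assumes a stock build where sizeof(PGAlignedBlock) == BLCKSZ == 8192:

#include <stdio.h>

#define FILE_BUFFER_SIZE 8192	/* sizeof(PGAlignedBlock), assumed */

static int
nbatch_inmemory_limit(int work_mem_kb)
{
	long		limit = work_mem_kb * 1024L / 2;
	int			nbatch_inmemory = 1;

	/*
	 * Two file buffers (inner + outer) per batch; stop doubling once the
	 * next step would push the buffers past work_mem/2.
	 */
	while ((nbatch_inmemory * 2) * (long) FILE_BUFFER_SIZE * 2 <= limit)
		nbatch_inmemory *= 2;

	return nbatch_inmemory;
}

int
main(void)
{
	/* with work_mem = 4MB this prints 128, i.e. 2MB of file buffers */
	printf("%d\n", nbatch_inmemory_limit(4096));
	return 0;
}
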
@@ -857,13 +889,27 @@ ExecHashTableDestroy(HashJoinTable hashtable)
 	 */
 	if (hashtable->innerBatchFile != NULL)
 	{
-		for (i = 1; i < hashtable->nbatch; i++)
+		int			n = Min(hashtable->nbatch, hashtable->nbatch_inmemory);
+
+		for (i = 1; i < n; i++)
 		{
 			if (hashtable->innerBatchFile[i])
 				BufFileClose(hashtable->innerBatchFile[i]);
 			if (hashtable->outerBatchFile[i])
 				BufFileClose(hashtable->outerBatchFile[i]);
 		}
+
+		/* number of batch slices */
+		n = hashtable->nbatch / hashtable->nbatch_inmemory;
+
+		for (i = 1; i < n; i++)
+		{
+			if (hashtable->innerOverflowFiles[i])
+				BufFileClose(hashtable->innerOverflowFiles[i]);
+
+			if (hashtable->outerOverflowFiles[i])
+				BufFileClose(hashtable->outerOverflowFiles[i]);
+		}
 	}
 
 	/* Release working memory (batchCxt is a child, so it goes away too) */
@@ -909,6 +955,8 @@ ExecHashIncreaseNumBatches(HashJoinTable hashtable)
 
 	if (hashtable->innerBatchFile == NULL)
 	{
+		/* XXX nbatch=1, no need to deal with nbatch_inmemory here */
+
 		/* we had no file arrays before */
 		hashtable->innerBatchFile = (BufFile **)
 			palloc0(nbatch * sizeof(BufFile *));
@@ -919,15 +967,50 @@ ExecHashIncreaseNumBatches(HashJoinTable hashtable)
 	}
 	else
 	{
+		int			nbatch_tmp = Min(nbatch, hashtable->nbatch_inmemory);
+
 		/* enlarge arrays and zero out added entries */
 		hashtable->innerBatchFile = (BufFile **)
-			repalloc(hashtable->innerBatchFile, nbatch * sizeof(BufFile *));
+			repalloc(hashtable->innerBatchFile, nbatch_tmp * sizeof(BufFile *));
 		hashtable->outerBatchFile = (BufFile **)
-			repalloc(hashtable->outerBatchFile, nbatch * sizeof(BufFile *));
-		MemSet(hashtable->innerBatchFile + oldnbatch, 0,
-			   (nbatch - oldnbatch) * sizeof(BufFile *));
-		MemSet(hashtable->outerBatchFile + oldnbatch, 0,
-			   (nbatch - oldnbatch) * sizeof(BufFile *));
+			repalloc(hashtable->outerBatchFile, nbatch_tmp * sizeof(BufFile *));
+
+		if (oldnbatch < nbatch_tmp)
+		{
+			MemSet(hashtable->innerBatchFile + oldnbatch, 0,
+				   (nbatch_tmp - oldnbatch) * sizeof(BufFile *));
+			MemSet(hashtable->outerBatchFile + oldnbatch, 0,
+				   (nbatch_tmp - oldnbatch) * sizeof(BufFile *));
+		}
+
+		if (nbatch > hashtable->nbatch_inmemory)
+		{
+			int			nslices = (nbatch / hashtable->nbatch_inmemory);
+
+			if (hashtable->innerOverflowFiles == NULL)
+			{
+				hashtable->innerOverflowFiles = (BufFile **)
+					palloc0(nslices * sizeof(BufFile *));
+				hashtable->outerOverflowFiles = (BufFile **)
+					palloc0(nslices * sizeof(BufFile *));
+			}
+			else
+			{
+				hashtable->innerOverflowFiles = (BufFile **)
+					repalloc(hashtable->innerOverflowFiles,
+							 nslices * sizeof(BufFile *));
+				hashtable->outerOverflowFiles = (BufFile **)
+					repalloc(hashtable->outerOverflowFiles,
+							 nslices * sizeof(BufFile *));
+
+				/* we double the number of batches, so we know the old
+				 * value was nslices/2 exactly */
+				memset(hashtable->innerOverflowFiles + nslices/2, 0,
+					   (nslices/2) * sizeof(BufFile *));
+				memset(hashtable->outerOverflowFiles + nslices/2, 0,
+					   (nslices/2) * sizeof(BufFile *));
+			}
+		}
 	}
 
 	MemoryContextSwitchTo(oldcxt);
@@ -999,11 +1082,18 @@ ExecHashIncreaseNumBatches(HashJoinTable hashtable)
 		}
 		else
 		{
+			BufFile   **batchFile;
+
 			/* dump it out */
 			Assert(batchno > curbatch);
+
+			batchFile = ExecHashGetBatchFile(hashtable, batchno,
+											 hashtable->innerBatchFile,
+											 hashtable->innerOverflowFiles);
+
 			ExecHashJoinSaveTuple(HJTUPLE_MINTUPLE(hashTuple),
 								  hashTuple->hashvalue,
-								  &hashtable->innerBatchFile[batchno]);
+								  batchFile);
 
 			hashtable->spaceUsed -= hashTupleSize;
 			nfreed++;
@@ -1647,22 +1737,33 @@ ExecHashTableInsert(HashJoinTable hashtable,
 
 		/* Account for space used, and back off if we've used too much */
 		hashtable->spaceUsed += hashTupleSize;
-		if (hashtable->spaceUsed > hashtable->spacePeak)
-			hashtable->spacePeak = hashtable->spaceUsed;
+
+		/* refresh info about peak used memory */
+		ExecHashUpdateSpacePeak(hashtable);
+
+		/* Consider increasing number of batches if we filled work_mem. */
 		if (hashtable->spaceUsed +
-			hashtable->nbuckets_optimal * sizeof(HashJoinTuple)
+			hashtable->nbuckets_optimal * sizeof(HashJoinTuple) +
+			Min(hashtable->nbatch, hashtable->nbatch_inmemory) * sizeof(PGAlignedBlock) * 2 /* inner + outer */
 			> hashtable->spaceAllowed)
 			ExecHashIncreaseNumBatches(hashtable);
 	}
 	else
 	{
+		BufFile   **batchFile;
+
 		/*
 		 * put the tuple into a temp file for later batches
 		 */
 		Assert(batchno > hashtable->curbatch);
+
+		batchFile = ExecHashGetBatchFile(hashtable, batchno,
+										 hashtable->innerBatchFile,
+										 hashtable->innerOverflowFiles);
+
 		ExecHashJoinSaveTuple(tuple,
 							  hashvalue,
-							  &hashtable->innerBatchFile[batchno]);
+							  batchFile);
 	}
 }
 
@@ -1893,6 +1994,108 @@ ExecHashGetBucketAndBatch(HashJoinTable hashtable,
 	}
 }
 
+int
+ExecHashGetBatchIndex(HashJoinTable hashtable, int batchno)
+{
+	int			slice,
+				curslice;
+
+	if (hashtable->nbatch <= hashtable->nbatch_inmemory)
+		return batchno;
+
+	slice = batchno / hashtable->nbatch_inmemory;
+	curslice = hashtable->curbatch / hashtable->nbatch_inmemory;
+
+	/* slices can't go backwards */
+	Assert(slice >= curslice);
+
+	/* overflow slice */
+	if (slice > curslice)
+		return -1;
+
+	/* current slice, compute index in the current array */
+	return (batchno % hashtable->nbatch_inmemory);
+}
+
+BufFile **
+ExecHashGetBatchFile(HashJoinTable hashtable, int batchno,
+					 BufFile **batchFiles, BufFile **overflowFiles)
+{
+	int			idx = ExecHashGetBatchIndex(hashtable, batchno);
+
+	/* get the right overflow file */
+	if (idx == -1)
+	{
+		int			slice = (batchno / hashtable->nbatch_inmemory);
+
+		return &overflowFiles[slice];
+	}
+
+	/* batch file in the current slice */
+	return &batchFiles[idx];
+}
+
+void
+ExecHashSwitchToNextBatchSlice(HashJoinTable hashtable)
+{
+	int			slice = (hashtable->curbatch / hashtable->nbatch_inmemory);
+
+	memset(hashtable->innerBatchFile, 0,
+		   hashtable->nbatch_inmemory * sizeof(BufFile *));
+
+	hashtable->innerBatchFile[0] = hashtable->innerOverflowFiles[slice];
+	hashtable->innerOverflowFiles[slice] = NULL;
+
+	memset(hashtable->outerBatchFile, 0,
+		   hashtable->nbatch_inmemory * sizeof(BufFile *));
+
+	hashtable->outerBatchFile[0] = hashtable->outerOverflowFiles[slice];
+	hashtable->outerOverflowFiles[slice] = NULL;
+}
+
+int
+ExecHashSwitchToNextBatch(HashJoinTable hashtable)
+{
+	int			batchidx;
+
+	hashtable->curbatch++;
+
+	/* see if we skipped to the next batch slice */
+	batchidx = ExecHashGetBatchIndex(hashtable, hashtable->curbatch);
+
+	/* Can't be -1, current batch is in the current slice by definition. */
+	Assert(batchidx >= 0 && batchidx < hashtable->nbatch_inmemory);
+
+	/*
+	 * If we skipped to the next slice of batches, reset the array of files
+	 * and use the overflow file as the first batch.
+	 */
+	if (batchidx == 0)
+		ExecHashSwitchToNextBatchSlice(hashtable);
+
+	return hashtable->curbatch;
+}
+
+static void
+ExecHashUpdateSpacePeak(HashJoinTable hashtable)
+{
+	Size		spaceUsed = hashtable->spaceUsed;
+
+	/* Account for the buckets in spaceUsed (reported in EXPLAIN ANALYZE) */
+	spaceUsed += hashtable->nbuckets * sizeof(HashJoinTuple);
+
+	/* Account for memory used for batch files (inner + outer) */
+	spaceUsed += Min(hashtable->nbatch, hashtable->nbatch_inmemory) *
+		sizeof(PGAlignedBlock) * 2;
+
+	/* Account for slice files (inner + outer) */
+	spaceUsed += (hashtable->nbatch / hashtable->nbatch_inmemory) *
+		sizeof(PGAlignedBlock) * 2;
+
+	if (spaceUsed > hashtable->spacePeak)
+		hashtable->spacePeak = spaceUsed;
+}
+
 /*
  * ExecScanHashBucket
  *		scan a hash bucket for matches to the current outer tuple
@@ -2272,8 +2475,9 @@ ExecHashBuildSkewHash(HashJoinTable hashtable, Hash *node, int mcvsToUse)
 			+ mcvsToUse * sizeof(int);
 		hashtable->spaceUsedSkew += nbuckets * sizeof(HashSkewBucket *)
 			+ mcvsToUse * sizeof(int);
-		if (hashtable->spaceUsed > hashtable->spacePeak)
-			hashtable->spacePeak = hashtable->spaceUsed;
+
+		/* refresh info about peak used memory */
+		ExecHashUpdateSpacePeak(hashtable);
 
 		/*
 		 * Create a skew bucket for each MCV hash value.
@@ -2322,8 +2526,9 @@ ExecHashBuildSkewHash(HashJoinTable hashtable, Hash *node, int mcvsToUse)
 		hashtable->nSkewBuckets++;
 		hashtable->spaceUsed += SKEW_BUCKET_OVERHEAD;
 		hashtable->spaceUsedSkew += SKEW_BUCKET_OVERHEAD;
-		if (hashtable->spaceUsed > hashtable->spacePeak)
-			hashtable->spacePeak = hashtable->spaceUsed;
+
+		/* refresh info about peak used memory */
+		ExecHashUpdateSpacePeak(hashtable);
 	}
 
 	free_attstatsslot(&sslot);
@@ -2411,8 +2616,10 @@ ExecHashSkewTableInsert(HashJoinTable hashtable,
 	/* Account for space used, and back off if we've used too much */
 	hashtable->spaceUsed += hashTupleSize;
 	hashtable->spaceUsedSkew += hashTupleSize;
-	if (hashtable->spaceUsed > hashtable->spacePeak)
-		hashtable->spacePeak = hashtable->spaceUsed;
+
+	/* refresh info about peak used memory */
+	ExecHashUpdateSpacePeak(hashtable);
+
 	while (hashtable->spaceUsedSkew > hashtable->spaceAllowedSkew)
 		ExecHashRemoveNextSkewBucket(hashtable);
 }
@@ -2488,10 +2695,17 @@ ExecHashRemoveNextSkewBucket(HashJoinTable hashtable)
 		}
 		else
 		{
+			BufFile   **batchFile;
+
 			/* Put the tuple into a temp file for later batches */
 			Assert(batchno > hashtable->curbatch);
+
+			batchFile = ExecHashGetBatchFile(hashtable, batchno,
+											 hashtable->innerBatchFile,
+											 hashtable->innerOverflowFiles);
+
 			ExecHashJoinSaveTuple(tuple, hashvalue,
-								  &hashtable->innerBatchFile[batchno]);
+								  batchFile);
 			pfree(hashTuple);
 			hashtable->spaceUsed -= tupleSize;
 			hashtable->spaceUsedSkew -= tupleSize;
@@ -2640,6 +2854,7 @@ ExecHashGetInstrumentation(HashInstrumentation *instrument,
 	instrument->nbuckets_original = hashtable->nbuckets_original;
 	instrument->nbatch = hashtable->nbatch;
 	instrument->nbatch_original = hashtable->nbatch_original;
+	instrument->nbatch_inmemory = Min(hashtable->nbatch, hashtable->nbatch_inmemory);
 	instrument->space_peak = hashtable->spacePeak;
 }
 
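
A toy model of the mapping implemented by ExecHashGetBatchIndex() above, with
hypothetical sizes: batches are grouped into slices of nbatch_inmemory,
batches in the current slice map to a slot in the in-memory file arrays, and
batches in later slices collapse to -1 (all their tuples go into the slice's
single overflow file):

#include <stdio.h>
#include <assert.h>

/* standalone model of ExecHashGetBatchIndex(), not PostgreSQL code */
static int
batch_index(int nbatch, int nbatch_inmemory, int curbatch, int batchno)
{
	int			slice,
				curslice;

	if (nbatch <= nbatch_inmemory)
		return batchno;

	slice = batchno / nbatch_inmemory;
	curslice = curbatch / nbatch_inmemory;

	/* slices can't go backwards */
	assert(slice >= curslice);

	if (slice > curslice)
		return -1;				/* overflow slice */

	return batchno % nbatch_inmemory;
}

int
main(void)
{
	/* nbatch = 16, nbatch_inmemory = 4, currently on batch 5 (slice 1) */
	printf("%d\n", batch_index(16, 4, 5, 6));	/* -> 2, same slice */
	printf("%d\n", batch_index(16, 4, 5, 7));	/* -> 3, same slice */
	printf("%d\n", batch_index(16, 4, 5, 9));	/* -> -1, slice 2 overflow */
	return 0;
}
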
diff --git a/src/backend/executor/nodeHashjoin.c b/src/backend/executor/nodeHashjoin.c
index 5922e60eed..a8db71925b 100644
--- a/src/backend/executor/nodeHashjoin.c
+++ b/src/backend/executor/nodeHashjoin.c
@@ -389,15 +389,22 @@ ExecHashJoinImpl(PlanState *pstate, bool parallel)
 				if (batchno != hashtable->curbatch &&
 					node->hj_CurSkewBucketNo == INVALID_SKEW_BUCKET_NO)
 				{
+					BufFile   **batchFile;
+
 					/*
 					 * Need to postpone this outer tuple to a later batch.
 					 * Save it in the corresponding outer-batch file.
 					 */
 					Assert(parallel_state == NULL);
 					Assert(batchno > hashtable->curbatch);
+
+					batchFile = ExecHashGetBatchFile(hashtable, batchno,
+													 hashtable->outerBatchFile,
+													 hashtable->outerOverflowFiles);
+
 					ExecHashJoinSaveTuple(ExecFetchSlotMinimalTuple(outerTupleSlot),
 										  hashvalue,
-										  &hashtable->outerBatchFile[batchno]);
+										  batchFile);
 
 					/* Loop around, staying in HJ_NEED_NEW_OUTER state */
 					continue;
@@ -849,17 +856,19 @@ ExecHashJoinOuterGetTuple(PlanState *outerNode,
 	}
 	else if (curbatch < hashtable->nbatch)
 	{
-		BufFile    *file = hashtable->outerBatchFile[curbatch];
+		BufFile   **file = ExecHashGetBatchFile(hashtable, curbatch,
+												hashtable->outerBatchFile,
+												hashtable->outerOverflowFiles);
 
 		/*
 		 * In outer-join cases, we could get here even though the batch file
 		 * is empty.
 		 */
-		if (file == NULL)
+		if (*file == NULL)
 			return NULL;
 
 		slot = ExecHashJoinGetSavedTuple(hjstate,
-										 file,
+										 *file,
 										 hashvalue,
 										 hjstate->hj_OuterTupleSlot);
 		if (!TupIsNull(slot))
@@ -946,9 +955,18 @@ ExecHashJoinNewBatch(HashJoinState *hjstate)
 	BufFile    *innerFile;
 	TupleTableSlot *slot;
 	uint32		hashvalue;
+	int			batchidx;
+	int			curbatch_old;
 
 	nbatch = hashtable->nbatch;
 	curbatch = hashtable->curbatch;
+	curbatch_old = curbatch;
+
+	/* index of the old batch */
+	batchidx = ExecHashGetBatchIndex(hashtable, curbatch);
+
+	/* has to be in the current slice of batches */
+	Assert(batchidx >= 0 && batchidx < hashtable->nbatch_inmemory);
 
 	if (curbatch > 0)
 	{
@@ -956,9 +974,9 @@ ExecHashJoinNewBatch(HashJoinState *hjstate)
 		 * We no longer need the previous outer batch file; close it right
 		 * away to free disk space.
 		 */
-		if (hashtable->outerBatchFile[curbatch])
-			BufFileClose(hashtable->outerBatchFile[curbatch]);
-		hashtable->outerBatchFile[curbatch] = NULL;
+		if (hashtable->outerBatchFile[batchidx])
+			BufFileClose(hashtable->outerBatchFile[batchidx]);
+		hashtable->outerBatchFile[batchidx] = NULL;
 	}
 	else						/* we just finished the first batch */
 	{
@@ -992,45 +1010,50 @@ ExecHashJoinNewBatch(HashJoinState *hjstate)
 	 * scan, we have to rescan outer batches in case they contain tuples that
 	 * need to be reassigned.
 	 */
-	curbatch++;
+	curbatch = ExecHashSwitchToNextBatch(hashtable);
+	batchidx = ExecHashGetBatchIndex(hashtable, curbatch);
+
 	while (curbatch < nbatch &&
-		   (hashtable->outerBatchFile[curbatch] == NULL ||
-			hashtable->innerBatchFile[curbatch] == NULL))
+		   (hashtable->outerBatchFile[batchidx] == NULL ||
+			hashtable->innerBatchFile[batchidx] == NULL))
 	{
-		if (hashtable->outerBatchFile[curbatch] &&
+		if (hashtable->outerBatchFile[batchidx] &&
 			HJ_FILL_OUTER(hjstate))
 			break;				/* must process due to rule 1 */
-		if (hashtable->innerBatchFile[curbatch] &&
+		if (hashtable->innerBatchFile[batchidx] &&
 			HJ_FILL_INNER(hjstate))
 			break;				/* must process due to rule 1 */
-		if (hashtable->innerBatchFile[curbatch] &&
+		if (hashtable->innerBatchFile[batchidx] &&
 			nbatch != hashtable->nbatch_original)
 			break;				/* must process due to rule 2 */
-		if (hashtable->outerBatchFile[curbatch] &&
+		if (hashtable->outerBatchFile[batchidx] &&
 			nbatch != hashtable->nbatch_outstart)
 			break;				/* must process due to rule 3 */
 		/* We can ignore this batch. */
 		/* Release associated temp files right away. */
-		if (hashtable->innerBatchFile[curbatch])
-			BufFileClose(hashtable->innerBatchFile[curbatch]);
-		hashtable->innerBatchFile[curbatch] = NULL;
-		if (hashtable->outerBatchFile[curbatch])
-			BufFileClose(hashtable->outerBatchFile[curbatch]);
-		hashtable->outerBatchFile[curbatch] = NULL;
-		curbatch++;
+		if (hashtable->innerBatchFile[batchidx])
+			BufFileClose(hashtable->innerBatchFile[batchidx]);
+		hashtable->innerBatchFile[batchidx] = NULL;
+		if (hashtable->outerBatchFile[batchidx])
+			BufFileClose(hashtable->outerBatchFile[batchidx]);
+		hashtable->outerBatchFile[batchidx] = NULL;
+
+		curbatch = ExecHashSwitchToNextBatch(hashtable);
+		batchidx = ExecHashGetBatchIndex(hashtable, curbatch);
 	}
 
 	if (curbatch >= nbatch)
+	{
+		hashtable->curbatch = curbatch_old;
 		return false;			/* no more batches */
-
-	hashtable->curbatch = curbatch;
+	}
 
 	/*
 	 * Reload the hash table with the new inner batch (which could be empty)
 	 */
 	ExecHashTableReset(hashtable);
 
-	innerFile = hashtable->innerBatchFile[curbatch];
+	innerFile = hashtable->innerBatchFile[batchidx];
 
 	if (innerFile != NULL)
 	{
@@ -1056,15 +1079,15 @@ ExecHashJoinNewBatch(HashJoinState *hjstate)
 		 * needed
 		 */
 		BufFileClose(innerFile);
-		hashtable->innerBatchFile[curbatch] = NULL;
+		hashtable->innerBatchFile[batchidx] = NULL;
 	}
 
 	/*
 	 * Rewind outer batch file (if present), so that we can start reading it.
 	 */
-	if (hashtable->outerBatchFile[curbatch] != NULL)
+	if (hashtable->outerBatchFile[batchidx] != NULL)
 	{
-		if (BufFileSeek(hashtable->outerBatchFile[curbatch], 0, 0L, SEEK_SET))
+		if (BufFileSeek(hashtable->outerBatchFile[batchidx], 0, 0L, SEEK_SET))
 			ereport(ERROR,
 					(errcode_for_file_access(),
 					 errmsg("could not rewind hash-join temporary file: %m")));
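
To make the slice switching concrete, here is a small standalone walk
(hypothetical sizes again) over the batch sequence, printing where
ExecHashSwitchToNextBatch() would see batchidx == 0 and hand the slice's
overflow file over as slot 0 of the reset in-memory arrays:

#include <stdio.h>

int
main(void)
{
	int			nbatch = 16;
	int			nbatch_inmemory = 4;
	int			curbatch;

	for (curbatch = 1; curbatch < nbatch; curbatch++)
	{
		if (curbatch % nbatch_inmemory == 0)
			printf("batch %d: new slice %d, overflow file becomes slot 0\n",
				   curbatch, curbatch / nbatch_inmemory);
		else
			printf("batch %d: slot %d of current slice\n",
				   curbatch, curbatch % nbatch_inmemory);
	}
	return 0;
}
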
diff --git a/src/backend/optimizer/path/costsize.c b/src/backend/optimizer/path/costsize.c
index c7400941ee..e324869c09 100644
--- a/src/backend/optimizer/path/costsize.c
+++ b/src/backend/optimizer/path/costsize.c
@@ -3170,6 +3170,7 @@ initial_cost_hashjoin(PlannerInfo *root, JoinCostWorkspace *workspace,
 	int			num_hashclauses = list_length(hashclauses);
 	int			numbuckets;
 	int			numbatches;
+	int			numbatches_inmemory;
 	int			num_skew_mcvs;
 	size_t		space_allowed;	/* unused */
 
@@ -3219,6 +3220,7 @@ initial_cost_hashjoin(PlannerInfo *root, JoinCostWorkspace *workspace,
 							&space_allowed,
 							&numbuckets,
 							&numbatches,
+							&numbatches_inmemory,
 							&num_skew_mcvs);
 
 	/*
diff --git a/src/include/executor/hashjoin.h b/src/include/executor/hashjoin.h
index a9f9872a78..311a0980ee 100644
--- a/src/include/executor/hashjoin.h
+++ b/src/include/executor/hashjoin.h
@@ -308,6 +308,7 @@ typedef struct HashJoinTableData
 	int		   *skewBucketNums; /* array indexes of active skew buckets */
 
 	int			nbatch;			/* number of batches */
+	int			nbatch_inmemory;	/* max number of in-memory batches */
 	int			curbatch;		/* current batch #; 0 during 1st pass */
 
 	int			nbatch_original;	/* nbatch when we started inner scan */
@@ -329,6 +330,9 @@ typedef struct HashJoinTableData
 	BufFile   **innerBatchFile; /* buffered virtual temp file per batch */
 	BufFile   **outerBatchFile; /* buffered virtual temp file per batch */
 
+	BufFile   **innerOverflowFiles; /* temp file for inner overflow batches */
+	BufFile   **outerOverflowFiles; /* temp file for outer overflow batches */
+
 	/*
 	 * Info about the datatype-specific hash functions for the datatypes being
	 * hashed.  These are arrays of the same length as the number of hash join
diff --git a/src/include/executor/nodeHash.h b/src/include/executor/nodeHash.h
index 8d700c06c5..bb6b24a1b4 100644
--- a/src/include/executor/nodeHash.h
+++ b/src/include/executor/nodeHash.h
@@ -16,6 +16,7 @@
 
 #include "access/parallel.h"
 #include "nodes/execnodes.h"
+#include "storage/buffile.h"
 
 struct SharedHashJoinBatch;
 
@@ -53,6 +54,11 @@ extern void ExecHashGetBucketAndBatch(HashJoinTable hashtable,
 									  uint32 hashvalue,
 									  int *bucketno,
 									  int *batchno);
+extern int	ExecHashGetBatchIndex(HashJoinTable hashtable, int batchno);
+extern BufFile **ExecHashGetBatchFile(HashJoinTable hashtable, int batchno,
+									  BufFile **batchFiles, BufFile **overflowFiles);
+extern void ExecHashSwitchToNextBatchSlice(HashJoinTable hashtable);
+extern int	ExecHashSwitchToNextBatch(HashJoinTable hashtable);
 extern bool ExecScanHashBucket(HashJoinState *hjstate, ExprContext *econtext);
 extern bool ExecParallelScanHashBucket(HashJoinState *hjstate, ExprContext *econtext);
 extern void ExecPrepHashTableForUnmatched(HashJoinState *hjstate);
@@ -66,6 +72,7 @@ extern void ExecChooseHashTableSize(double ntuples, int tupwidth, bool useskew,
 									size_t *space_allowed,
 									int *numbuckets,
 									int *numbatches,
+									int *numbatches_inmemory,
 									int *num_skew_mcvs);
 extern int	ExecHashGetSkewBucket(HashJoinTable hashtable, uint32 hashvalue);
 extern void ExecHashEstimate(HashState *node, ParallelContext *pcxt);
diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h
index 9959c9e31f..6c53c5abd2 100644
--- a/src/include/nodes/execnodes.h
+++ b/src/include/nodes/execnodes.h
@@ -2115,6 +2115,7 @@ typedef struct HashInstrumentation
 	int			nbuckets_original;	/* planned number of buckets */
 	int			nbatch;			/* number of batches at end of execution */
 	int			nbatch_original;	/* planned number of batches */
+	int			nbatch_inmemory;	/* number of batches kept in memory */
 	size_t		space_peak;		/* peak memory usage in bytes */
 } HashInstrumentation;
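
Finally, a back-of-the-envelope version of the accounting done by
ExecHashUpdateSpacePeak(): file buffers are charged for at most
nbatch_inmemory batches plus one overflow file per slice, inner and outer
side each.  All numbers below are hypothetical, and it assumes 8kB buffers
(sizeof(PGAlignedBlock) in a stock build) and 8-byte bucket pointers:

#include <stdio.h>

int
main(void)
{
	long		nbuckets = 65536;
	long		nbatch = 256;
	long		nbatch_inmemory = 128;
	long		tuple_bytes = 3 * 1024 * 1024;	/* hypothetical tuple data */
	long		buf = 8192;		/* sizeof(PGAlignedBlock), assumed */
	long		used = tuple_bytes;

	/* bucket array */
	used += nbuckets * 8;

	/* in-memory batch files, inner + outer */
	used += (nbatch < nbatch_inmemory ? nbatch : nbatch_inmemory) * buf * 2;

	/* one overflow file per slice, inner + outer */
	used += (nbatch / nbatch_inmemory) * buf * 2;

	printf("peak estimate: %ld kB\n", used / 1024);	/* -> 5664 kB */
	return 0;
}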