diff --git a/src/backend/executor/nodeAgg.c b/src/backend/executor/nodeAgg.c
index f1989b10ea..8c743d7561 100644
--- a/src/backend/executor/nodeAgg.c
+++ b/src/backend/executor/nodeAgg.c
@@ -247,6 +247,7 @@
 #include "utils/datum.h"
 #include "utils/dynahash.h"
 #include "utils/expandeddatum.h"
+#include "utils/logtape.h"
 #include "utils/lsyscache.h"
 #include "utils/memutils.h"
 #include "utils/syscache.h"
@@ -288,8 +289,9 @@ typedef struct HashAggSpill
 	int			n_partitions;	/* number of output partitions */
 	int			partition_bits; /* number of bits for partition mask
								   log2(n_partitions) parent partition bits */
-	BufFile   **partitions;		/* output partition files */
+	int		   *partitions;		/* output logtape numbers */
 	int64	   *ntuples;		/* number of tuples in each partition */
+	LogicalTapeSet *lts;		/* tape set holding the output partitions */
 } HashAggSpill;
 
 /*
@@ -298,11 +300,12 @@
  */
 typedef struct HashAggBatch
 {
-	BufFile    *input_file;		/* input partition */
+	int			input_tape;		/* input partition tape number */
 	int			input_bits;		/* number of bits for input partition mask */
 	int64		input_tuples;	/* number of tuples in this batch */
 	int			setno;			/* grouping set */
 	HashAggSpill spill;			/* spill output */
+	LogicalTapeSet *lts;		/* tape set holding input_tape */
 } HashAggBatch;
 
 static void select_current_set(AggState *aggstate, int setno, bool is_hash);
@@ -359,9 +362,8 @@
 static void hash_spill_init(HashAggSpill *spill, int input_bits,
							uint64 input_tuples, double hashentrysize);
 static Size hash_spill_tuple(HashAggSpill *spill, int input_bits,
							 TupleTableSlot *slot, uint32 hash);
-static MinimalTuple hash_read_spilled(BufFile *file, uint32 *hashp);
-static HashAggBatch *hash_batch_new(BufFile *input_file, int setno,
-									int64 input_tuples, int input_bits);
+static MinimalTuple hash_read_spilled(LogicalTapeSet *lts, int tapenum, uint32 *hashp);
+static HashAggBatch *hash_batch_new(LogicalTapeSet *lts, int tapenum, int setno, int64 input_tuples, int input_bits);
 static void hash_finish_initial_spills(AggState *aggstate);
 static void hash_spill_finish(AggState *aggstate, HashAggSpill *spill, int setno, int input_bits);
@@ -2462,7 +2464,7 @@ agg_refill_hash_table(AggState *aggstate)
 
 		CHECK_FOR_INTERRUPTS();
 
-		tuple = hash_read_spilled(batch->input_file, &hash);
+		tuple = hash_read_spilled(batch->lts, batch->input_tape, &hash);
 		if (tuple == NULL)
 			break;
 
@@ -2490,8 +2492,8 @@
 								batch->input_tuples, aggstate->hashentrysize);
 			}
 
-			aggstate->hash_disk_used += hash_spill_tuple(
-				&batch->spill, batch->input_bits, slot, hash);
+			/* TODO: re-spilled bytes are currently not counted in hash_disk_used */
+			hash_spill_tuple(&batch->spill, batch->input_bits, slot, hash);
 		}
 
 		/* Advance the aggregates (or combine functions) */
@@ -2504,8 +2506,6 @@
 		ResetExprContext(aggstate->tmpcontext);
 	}
 
-	BufFileClose(batch->input_file);
-
 	aggstate->current_phase = 0;
 	aggstate->phase = &aggstate->phases[aggstate->current_phase];
 
@@ -2690,6 +2690,9 @@ hash_spill_init(HashAggSpill *spill, int input_bits, uint64 input_groups,
 {
 	int			npartitions;
 	int			partition_bits;
+	int			i;
+	int			j;
+	int			old_npartitions;
 
 	npartitions = hash_choose_num_spill_partitions(input_groups,
												   hashentrysize);
@@ -2702,10 +2705,33 @@ hash_spill_init(HashAggSpill *spill, int input_bits, uint64 input_groups,
 	/* number of partitions will be a power of two */
 	npartitions = 1L << partition_bits;
 
-	spill->partition_bits = partition_bits;
-	spill->n_partitions = npartitions;
-	spill->partitions = palloc0(sizeof(BufFile *) * npartitions);
-	spill->ntuples = palloc0(sizeof(int64) * npartitions);
+	if (spill->lts == NULL)
+	{
+		spill->partition_bits = partition_bits;
+		spill->n_partitions = npartitions;
+		spill->partitions = palloc0(sizeof(int) * npartitions);
+		for (i = 0; i < spill->n_partitions; ++i)
+		{
+			spill->partitions[i] = i;
+		}
+		spill->ntuples = palloc0(sizeof(int64) * spill->n_partitions);
+		spill->lts = LogicalTapeSetCreate(npartitions, NULL, NULL, 0);	/* TODO: is worker 0 correct here? */
+	}
+	else	/* respill: add new tapes to the existing tape set */
+	{
+		old_npartitions = LogicalTapeGetNTapes(spill->lts);
+		spill->partition_bits = my_log2(npartitions);
+		spill->n_partitions = (1L << spill->partition_bits);
+		spill->partitions = palloc0(sizeof(int) * npartitions);
+		j = old_npartitions;
+		for (i = 0; i < spill->n_partitions; ++i)
+		{
+			spill->partitions[i] = j;
+			j++;
+		}
+		spill->ntuples = palloc0(sizeof(int64) * spill->n_partitions);
+		spill->lts = LogicalTapeSetExtend(spill->lts, spill->n_partitions);
+	}
 }
 
 /*
@@ -2720,8 +2746,6 @@ hash_spill_tuple(HashAggSpill *spill, int input_bits, TupleTableSlot *slot,
 {
 	int			partition;
 	MinimalTuple tuple;
-	BufFile    *file;
-	int			written;
 	int			total_written = 0;
 	bool		shouldFree;
 
@@ -2743,23 +2767,11 @@ hash_spill_tuple(HashAggSpill *spill, int input_bits, TupleTableSlot *slot,
 	spill->ntuples[partition]++;
 
-	if (spill->partitions[partition] == NULL)
-		spill->partitions[partition] = BufFileCreateTemp(false);
-	file = spill->partitions[partition];
-
-	written = BufFileWrite(file, (void *) &hash, sizeof(uint32));
-	if (written != sizeof(uint32))
-		ereport(ERROR,
-				(errcode_for_file_access(),
-				 errmsg("could not write to HashAgg temporary file: %m")));
-	total_written += written;
+	LogicalTapeWrite(spill->lts, spill->partitions[partition], (void *) &hash, sizeof(uint32));
+	total_written += sizeof(uint32);
 
-	written = BufFileWrite(file, (void *) tuple, tuple->t_len);
-	if (written != tuple->t_len)
-		ereport(ERROR,
-				(errcode_for_file_access(),
-				 errmsg("could not write to HashAgg temporary file: %m")));
-	total_written += written;
+	LogicalTapeWrite(spill->lts, spill->partitions[partition], (void *) tuple, tuple->t_len);
+	total_written += tuple->t_len;
 
 	if (shouldFree)
 		pfree(tuple);
@@ -2772,38 +2784,37 @@
  * read the next tuple from a batch file. Return NULL if no more.
  */
 static MinimalTuple
-hash_read_spilled(BufFile *file, uint32 *hashp)
+hash_read_spilled(LogicalTapeSet *lts, int tapenum, uint32 *hashp)
 {
 	MinimalTuple tuple;
 	uint32		t_len;
 	size_t		nread;
 	uint32		hash;
 
-	nread = BufFileRead(file, &hash, sizeof(uint32));
+	nread = LogicalTapeRead(lts, tapenum, &hash, sizeof(uint32));
 	if (nread == 0)
 		return NULL;
 	if (nread != sizeof(uint32))
 		ereport(ERROR,
 				(errcode_for_file_access(),
-				 errmsg("could not read from HashAgg temporary file: %m")));
+				 errmsg("could not read hash value from HashAgg spill tape: %m")));
 	if (hashp != NULL)
 		*hashp = hash;
 
-	nread = BufFileRead(file, &t_len, sizeof(t_len));
+	nread = LogicalTapeRead(lts, tapenum, &t_len, sizeof(t_len));
 	if (nread != sizeof(uint32))
 		ereport(ERROR,
 				(errcode_for_file_access(),
-				 errmsg("could not read from HashAgg temporary file: %m")));
+				 errmsg("could not read tuple length from HashAgg spill tape: %m")));
 
 	tuple = (MinimalTuple) palloc(t_len);
 	tuple->t_len = t_len;
 
-	nread = BufFileRead(file, (void *)((char *)tuple + sizeof(uint32)),
-						t_len - sizeof(uint32));
+	nread = LogicalTapeRead(lts, tapenum, (void *) ((char *) tuple + sizeof(uint32)), t_len - sizeof(uint32));
 	if (nread != t_len - sizeof(uint32))
 		ereport(ERROR,
 				(errcode_for_file_access(),
-				 errmsg("could not read from HashAgg temporary file: %m")));
+				 errmsg("could not read tuple data from HashAgg spill tape: %m")));
 
 	return tuple;
 }
@@ -2815,15 +2826,17 @@ hash_read_spilled(BufFile *file, uint32 *hashp)
  * be done. Should be called in the aggregate's memory context.
  */
 static HashAggBatch *
-hash_batch_new(BufFile *input_file, int setno, int64 input_tuples,
+hash_batch_new(LogicalTapeSet *lts, int tapenum, int setno, int64 input_tuples,
 			   int input_bits)
 {
 	HashAggBatch *batch = palloc0(sizeof(HashAggBatch));
 
-	batch->input_file = input_file;
+	batch->input_tape = tapenum;
 	batch->input_bits = input_bits;
 	batch->input_tuples = input_tuples;
 	batch->setno = setno;
+	batch->lts = lts;
+	batch->spill.lts = lts;		/* share the same logical tape set */
 
 	/* batch->spill will be set only after spilling this batch */
 
@@ -2860,7 +2873,7 @@ hash_finish_initial_spills(AggState *aggstate)
 /*
  * hash_spill_finish
  *
- * Transform spill files into new batches.
+ * Transform spill files into new batches. XXX: partitions are then empty and reusable.
  */
 static void
 hash_spill_finish(AggState *aggstate, HashAggSpill *spill, int setno, int input_bits)
@@ -2872,28 +2885,20 @@ hash_spill_finish(AggState *aggstate, HashAggSpill *spill, int setno, int input_
 	for (i = 0; i < spill->n_partitions; i++)
 	{
-		BufFile    *file = spill->partitions[i];
 		MemoryContext oldContext;
 		HashAggBatch *new_batch;
 
-		/* partition is empty */
-		if (file == NULL)
-			continue;
-
-		/* rewind file for reading */
-		if (BufFileSeek(file, 0, 0L, SEEK_SET))
-			ereport(ERROR,
-					(errcode_for_file_access(),
-					 errmsg("could not rewind HashAgg temporary file: %m")));
-
 		oldContext = MemoryContextSwitchTo(aggstate->ss.ps.state->es_query_cxt);
-		new_batch = hash_batch_new(file, setno, spill->ntuples[i],
-								   spill->partition_bits + input_bits);
+		LogicalTapeRewindForRead(spill->lts, spill->partitions[i], 0);
+		new_batch = hash_batch_new(spill->lts, spill->partitions[i], setno, spill->ntuples[i],
+								   spill->partition_bits + input_bits);
 		aggstate->hash_batches = lappend(aggstate->hash_batches, new_batch);
 		aggstate->hash_batches_used++;
 
 		MemoryContextSwitchTo(oldContext);
 	}
 
+	if (!list_member_ptr(aggstate->lts_list, spill->lts))
+		aggstate->lts_list = lappend(aggstate->lts_list, spill->lts);
 	pfree(spill->ntuples);
 	pfree(spill->partitions);
 }
@@ -2904,13 +2909,10 @@ hash_spill_finish(AggState *aggstate, HashAggSpill *spill, int setno, int input_
 static void
 hash_reset_spill(HashAggSpill *spill)
 {
-	int			i;
-	for (i = 0; i < spill->n_partitions; i++)
+	if (spill->lts != NULL)
 	{
-		BufFile    *file = spill->partitions[i];
-
-		if (file != NULL)
-			BufFileClose(file);
+		LogicalTapeSetClose(spill->lts);
+		spill->lts = NULL;
 	}
 	if (spill->ntuples != NULL)
 		pfree(spill->ntuples);
@@ -2940,16 +2942,19 @@ hash_reset_spills(AggState *aggstate)
 	foreach(lc, aggstate->hash_batches)
 	{
 		HashAggBatch *batch = (HashAggBatch*) lfirst(lc);
 
-		if (batch->input_file != NULL)
-		{
-			BufFileClose(batch->input_file);
-			batch->input_file = NULL;
-		}
 		hash_reset_spill(&batch->spill);
 		pfree(batch);
 	}
 	list_free(aggstate->hash_batches);
 	aggstate->hash_batches = NIL;
+
+	foreach(lc, aggstate->lts_list)
+	{
+		LogicalTapeSet *lts = (LogicalTapeSet *) lfirst(lc);
+		LogicalTapeSetClose(lts);
+	}
+	list_free(aggstate->lts_list);
+	aggstate->lts_list = NIL;
 }
diff --git a/src/backend/utils/sort/logtape.c b/src/backend/utils/sort/logtape.c
index 8985b9e095..677b992743 100644
--- a/src/backend/utils/sort/logtape.c
+++ b/src/backend/utils/sort/logtape.c
@@ -202,7 +202,7 @@ struct LogicalTapeSet
 
 	/* The array of logical tapes. */
 	int			nTapes;			/* # of logical tapes in set */
-	LogicalTape tapes[FLEXIBLE_ARRAY_MEMBER];	/* has nTapes nentries */
+	LogicalTape *tapes;			/* has nTapes entries */
 };
 
 static void ltsWriteBlock(LogicalTapeSet *lts, long blocknum, void *buffer);
@@ -518,8 +518,8 @@ LogicalTapeSetCreate(int ntapes, TapeShare *shared, SharedFileSet *fileset,
 	 * Create top-level struct including per-tape LogicalTape structs.
 	 */
 	Assert(ntapes > 0);
-	lts = (LogicalTapeSet *) palloc(offsetof(LogicalTapeSet, tapes) +
-									ntapes * sizeof(LogicalTape));
+	lts = (LogicalTapeSet *) palloc0(sizeof(LogicalTapeSet));
+	lts->tapes = (LogicalTape *) palloc0(ntapes * sizeof(LogicalTape));
 	lts->nBlocksAllocated = 0L;
 	lts->nBlocksWritten = 0L;
 	lts->nHoleBlocks = 0L;
@@ -577,6 +577,45 @@ LogicalTapeSetCreate(int ntapes, TapeShare *shared, SharedFileSet *fileset,
 	return lts;
 }
 
+LogicalTapeSet *
+LogicalTapeSetExtend(LogicalTapeSet *lts, int ntoextend)
+{
+	LogicalTape *lt;
+	int			i;
+
+	/*
+	 * Extend the tapes array with 'ntoextend' additional per-tape structs.
+	 */
+	Assert(ntoextend > 0);
+	lts->tapes = (LogicalTape *) repalloc(lts->tapes, (lts->nTapes + ntoextend) * sizeof(LogicalTape));
+	lts->nTapes = lts->nTapes + ntoextend;
+
+	/*
+	 * Initialize the new per-tape structs.  Note we allocate the I/O buffer and
+	 * the first block for a tape only when it is first actually written to.  This
+	 * avoids wasting memory space when we overestimate the number of tapes needed.
+	 */
+	for (i = lts->nTapes - ntoextend; i < lts->nTapes; i++)
+	{
+		lt = &lts->tapes[i];
+		lt->writing = true;
+		lt->frozen = false;
+		lt->dirty = false;
+		lt->firstBlockNumber = -1L;
+		lt->curBlockNumber = -1L;
+		lt->nextBlockNumber = -1L;
+		lt->offsetBlockNumber = 0L;
+		lt->buffer = NULL;
+		lt->buffer_size = 0;
+		/* palloc() larger than MaxAllocSize would fail */
+		lt->max_size = MaxAllocSize;
+		lt->pos = 0;
+		lt->nbytes = 0;
+	}
+
+	return lts;
+}
+
 /*
  * Close a logical tape set and release all resources.
  */
@@ -1083,3 +1122,9 @@ LogicalTapeSetBlocks(LogicalTapeSet *lts)
 {
 	return lts->nBlocksAllocated - lts->nHoleBlocks;
 }
+
+int
+LogicalTapeGetNTapes(LogicalTapeSet *lts)
+{
+	return lts->nTapes;
+}
diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h
index 8d4a36a353..d45473101c 100644
--- a/src/include/nodes/execnodes.h
+++ b/src/include/nodes/execnodes.h
@@ -2083,6 +2083,7 @@ typedef struct AggState
 	uint64		hash_disk_used; /* bytes of disk space used */
 	int			hash_batches_used;	/* batches used during entire execution */
 	List	   *hash_batches;	/* hash batches remaining to be processed */
+	List	   *lts_list;		/* logical tape sets used by hash agg spills */
 
 	AggStatePerHash perhash;	/* array of per-hashtable data */
 	AggStatePerGroup *hash_pergroup;	/* grouping set indexed array of
diff --git a/src/include/utils/logtape.h b/src/include/utils/logtape.h
index 081b03880a..c2f5c72665 100644
--- a/src/include/utils/logtape.h
+++ b/src/include/utils/logtape.h
@@ -56,6 +56,7 @@ typedef struct TapeShare
 
 extern LogicalTapeSet *LogicalTapeSetCreate(int ntapes, TapeShare *shared,
 											SharedFileSet *fileset, int worker);
+extern LogicalTapeSet *LogicalTapeSetExtend(LogicalTapeSet *lts, int ntoextend);
 extern void LogicalTapeSetClose(LogicalTapeSet *lts);
 extern void LogicalTapeSetForgetFreeSpace(LogicalTapeSet *lts);
 extern size_t LogicalTapeRead(LogicalTapeSet *lts, int tapenum,
@@ -74,5 +75,6 @@ extern void LogicalTapeSeek(LogicalTapeSet *lts, int tapenum,
 							long *blocknum, int *offset);
 extern void LogicalTapeTell(LogicalTapeSet *lts, int tapenum,
 							long *blocknum, int *offset);
 extern long LogicalTapeSetBlocks(LogicalTapeSet *lts);
+extern int LogicalTapeGetNTapes(LogicalTapeSet *lts);
 #endif							/* LOGTAPE_H */
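
Usage sketch (illustration only, not part of the patch): a minimal example of how the LogicalTapeSetExtend() and LogicalTapeGetNTapes() additions above could be exercised, mirroring the write / extend / rewind / read cycle that nodeAgg.c now performs. The function name logtape_extend_demo and the demo values are hypothetical; the NULL/0 arguments to LogicalTapeSetCreate() and the 0 read-buffer size follow the calls used in the patch.

#include "postgres.h"

#include "utils/logtape.h"

/* hypothetical demo, not part of the patch */
static void
logtape_extend_demo(void)
{
	/* private tape set: no shared fileset, worker 0, as in hash_spill_init() */
	LogicalTapeSet *lts = LogicalTapeSetCreate(2, NULL, NULL, 0);
	int64		val = 42;

	/* write a value to tape 0 */
	LogicalTapeWrite(lts, 0, (void *) &val, sizeof(val));

	/* grow the set by two tapes; the new tapes are numbered 2 and 3 */
	lts = LogicalTapeSetExtend(lts, 2);
	Assert(LogicalTapeGetNTapes(lts) == 4);

	val = 43;
	LogicalTapeWrite(lts, 2, (void *) &val, sizeof(val));

	/* switch tape 0 from writing to reading; the patch also passes 0 for the buffer size */
	LogicalTapeRewindForRead(lts, 0, 0);
	if (LogicalTapeRead(lts, 0, &val, sizeof(val)) != sizeof(val))
		elog(ERROR, "unexpected end of logical tape");

	LogicalTapeSetClose(lts);
}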