From 501d73488159af3e172e117439054bc06dd09edf Mon Sep 17 00:00:00 2001
From: Heikki Linnakangas <heikki.linnakangas@iki.fi>
Date: Wed, 21 Oct 2020 21:57:42 +0300
Subject: [PATCH v5 1/2] Refactor LogicalTapeSet/LogicalTape interface.

All the tape functions, like LogicalTapeRead and LogicalTapeWrite, now
take a LogicalTape as argument, instead of LogicalTapeSet+tape number.
You can create any number of LogicalTapes in a single LogicalTapeSet, and
you don't need to decide the number upfront, when you create the tape set.

This makes the tape management in hash agg spilling in nodeAgg.c simpler.
---
 src/backend/executor/nodeAgg.c     | 187 ++++--------
 src/backend/utils/sort/logtape.c   | 456 ++++++++++++-----------------
 src/backend/utils/sort/tuplesort.c | 229 +++++++--------
 src/include/nodes/execnodes.h      |   3 +-
 src/include/utils/logtape.h        |  37 ++-
 5 files changed, 359 insertions(+), 553 deletions(-)

diff --git a/src/backend/executor/nodeAgg.c b/src/backend/executor/nodeAgg.c
index 39bea204d16..c99a0de4ddb 100644
--- a/src/backend/executor/nodeAgg.c
+++ b/src/backend/executor/nodeAgg.c
@@ -208,7 +208,16 @@
  *
  *	  Spilled data is written to logical tapes. These provide better control
  *	  over memory usage, disk space, and the number of files than if we were
- *	  to use a BufFile for each spill.
+ *	  to use a BufFile for each spill.  We don't know the number of tapes needed
+ *	  at the start of the algorithm (because it can recurse), so a tape set is
+ *	  allocated at the beginning, and individual tapes are created as needed.
+ *	  As a particular tape is read, logtape.c recycles its disk space. When a
+ *	  tape is read to completion, it is destroyed entirely.
+ *
+ *	  Tapes' buffers can take up substantial memory when many tapes are open at
+ *	  once. We only need one tape open at a time in read mode (using a buffer
+ *	  that's a multiple of BLCKSZ); but we need one tape open in write mode (each
+ *	  requiring a buffer of size BLCKSZ) for each partition.
  *
  *	  Note that it's possible for transition states to start small but then
  *	  grow very large; for instance in the case of ARRAY_AGG. In such cases,
@@ -311,27 +320,6 @@
  */
 #define CHUNKHDRSZ 16
 
-/*
- * Track all tapes needed for a HashAgg that spills. We don't know the maximum
- * number of tapes needed at the start of the algorithm (because it can
- * recurse), so one tape set is allocated and extended as needed for new
- * tapes. When a particular tape is already read, rewind it for write mode and
- * put it in the free list.
- *
- * Tapes' buffers can take up substantial memory when many tapes are open at
- * once. We only need one tape open at a time in read mode (using a buffer
- * that's a multiple of BLCKSZ); but we need one tape open in write mode (each
- * requiring a buffer of size BLCKSZ) for each partition.
- */
-typedef struct HashTapeInfo
-{
-	LogicalTapeSet *tapeset;
-	int			ntapes;
-	int		   *freetapes;
-	int			nfreetapes;
-	int			freetapes_alloc;
-} HashTapeInfo;
-
 /*
  * Represents partitioned spill data for a single hashtable. Contains the
  * necessary information to route tuples to the correct partition, and to
@@ -343,9 +331,8 @@ typedef struct HashTapeInfo
  */
 typedef struct HashAggSpill
 {
-	LogicalTapeSet *tapeset;	/* borrowed reference to tape set */
 	int			npartitions;	/* number of partitions */
-	int		   *partitions;		/* spill partition tape numbers */
+	LogicalTape **partitions;	/* spill partition tapes */
 	int64	   *ntuples;		/* number of tuples in each partition */
 	uint32		mask;			/* mask to find partition from hash value */
 	int			shift;			/* after masking, shift by this amount */
@@ -365,8 +352,7 @@ typedef struct HashAggBatch
 {
 	int			setno;			/* grouping set */
 	int			used_bits;		/* number of bits of hash already used */
-	LogicalTapeSet *tapeset;	/* borrowed reference to tape set */
-	int			input_tapenum;	/* input partition tape */
+	LogicalTape *input_tape;	/* input partition tape */
 	int64		input_tuples;	/* number of tuples in this batch */
 	double		input_card;		/* estimated group cardinality */
 } HashAggBatch;
@@ -442,22 +428,17 @@ static void hash_agg_update_metrics(AggState *aggstate, bool from_tape,
 									int npartitions);
 static void hashagg_finish_initial_spills(AggState *aggstate);
 static void hashagg_reset_spill_state(AggState *aggstate);
-static HashAggBatch *hashagg_batch_new(LogicalTapeSet *tapeset,
-									   int input_tapenum, int setno,
+static HashAggBatch *hashagg_batch_new(LogicalTape *input_tape, int setno,
 									   int64 input_tuples, double input_card,
 									   int used_bits);
 static MinimalTuple hashagg_batch_read(HashAggBatch *batch, uint32 *hashp);
-static void hashagg_spill_init(HashAggSpill *spill, HashTapeInfo *tapeinfo,
+static void hashagg_spill_init(HashAggSpill *spill, LogicalTapeSet *lts,
 							   int used_bits, double input_groups,
 							   double hashentrysize);
 static Size hashagg_spill_tuple(AggState *aggstate, HashAggSpill *spill,
 								TupleTableSlot *slot, uint32 hash);
 static void hashagg_spill_finish(AggState *aggstate, HashAggSpill *spill,
 								 int setno);
-static void hashagg_tapeinfo_init(AggState *aggstate);
-static void hashagg_tapeinfo_assign(HashTapeInfo *tapeinfo, int *dest,
-									int ndest);
-static void hashagg_tapeinfo_release(HashTapeInfo *tapeinfo, int tapenum);
 static Datum GetAggInitVal(Datum textInitVal, Oid transtype);
 static void build_pertrans_for_aggref(AggStatePerTrans pertrans,
 									  AggState *aggstate, EState *estate,
@@ -1887,12 +1868,12 @@ hash_agg_enter_spill_mode(AggState *aggstate)
 
 	if (!aggstate->hash_ever_spilled)
 	{
-		Assert(aggstate->hash_tapeinfo == NULL);
+		Assert(aggstate->hash_tapeset == NULL);
 		Assert(aggstate->hash_spills == NULL);
 
 		aggstate->hash_ever_spilled = true;
 
-		hashagg_tapeinfo_init(aggstate);
+		aggstate->hash_tapeset = LogicalTapeSetCreate(true, NULL, -1);
 
 		aggstate->hash_spills = palloc(sizeof(HashAggSpill) * aggstate->num_hashes);
 
@@ -1901,7 +1882,7 @@ hash_agg_enter_spill_mode(AggState *aggstate)
 			AggStatePerHash perhash = &aggstate->perhash[setno];
 			HashAggSpill *spill = &aggstate->hash_spills[setno];
 
-			hashagg_spill_init(spill, aggstate->hash_tapeinfo, 0,
+			hashagg_spill_init(spill, aggstate->hash_tapeset, 0,
 							   perhash->aggnode->numGroups,
 							   aggstate->hashentrysize);
 		}
@@ -1943,9 +1924,9 @@ hash_agg_update_metrics(AggState *aggstate, bool from_tape, int npartitions)
 		aggstate->hash_mem_peak = total_mem;
 
 	/* update disk usage */
-	if (aggstate->hash_tapeinfo != NULL)
+	if (aggstate->hash_tapeset != NULL)
 	{
-		uint64		disk_used = LogicalTapeSetBlocks(aggstate->hash_tapeinfo->tapeset) * (BLCKSZ / 1024);
+		uint64		disk_used = LogicalTapeSetBlocks(aggstate->hash_tapeset) * (BLCKSZ / 1024);
 
 		if (aggstate->hash_disk_used < disk_used)
 			aggstate->hash_disk_used = disk_used;
@@ -2132,7 +2113,7 @@ lookup_hash_entries(AggState *aggstate)
 			TupleTableSlot *slot = aggstate->tmpcontext->ecxt_outertuple;
 
 			if (spill->partitions == NULL)
-				hashagg_spill_init(spill, aggstate->hash_tapeinfo, 0,
+				hashagg_spill_init(spill, aggstate->hash_tapeset, 0,
 								   perhash->aggnode->numGroups,
 								   aggstate->hashentrysize);
 
@@ -2597,7 +2578,7 @@ agg_refill_hash_table(AggState *aggstate)
 	HashAggBatch *batch;
 	AggStatePerHash perhash;
 	HashAggSpill spill;
-	HashTapeInfo *tapeinfo = aggstate->hash_tapeinfo;
+	LogicalTapeSet *tapeset = aggstate->hash_tapeset;
 	bool		spill_initialized = false;
 
 	if (aggstate->hash_batches == NIL)
@@ -2693,7 +2674,7 @@ agg_refill_hash_table(AggState *aggstate)
 				 * that we don't assign tapes that will never be used.
 				 */
 				spill_initialized = true;
-				hashagg_spill_init(&spill, tapeinfo, batch->used_bits,
+				hashagg_spill_init(&spill, tapeset, batch->used_bits,
 								   batch->input_card, aggstate->hashentrysize);
 			}
 			/* no memory for a new group, spill */
@@ -2709,7 +2690,7 @@ agg_refill_hash_table(AggState *aggstate)
 		ResetExprContext(aggstate->tmpcontext);
 	}
 
-	hashagg_tapeinfo_release(tapeinfo, batch->input_tapenum);
+	LogicalTapeClose(batch->input_tape);
 
 	/* change back to phase 0 */
 	aggstate->current_phase = 0;
@@ -2884,67 +2865,6 @@ agg_retrieve_hash_table_in_memory(AggState *aggstate)
 	return NULL;
 }
 
-/*
- * Initialize HashTapeInfo
- */
-static void
-hashagg_tapeinfo_init(AggState *aggstate)
-{
-	HashTapeInfo *tapeinfo = palloc(sizeof(HashTapeInfo));
-	int			init_tapes = 16;	/* expanded dynamically */
-
-	tapeinfo->tapeset = LogicalTapeSetCreate(init_tapes, true, NULL, NULL, -1);
-	tapeinfo->ntapes = init_tapes;
-	tapeinfo->nfreetapes = init_tapes;
-	tapeinfo->freetapes_alloc = init_tapes;
-	tapeinfo->freetapes = palloc(init_tapes * sizeof(int));
-	for (int i = 0; i < init_tapes; i++)
-		tapeinfo->freetapes[i] = i;
-
-	aggstate->hash_tapeinfo = tapeinfo;
-}
-
-/*
- * Assign unused tapes to spill partitions, extending the tape set if
- * necessary.
- */
-static void
-hashagg_tapeinfo_assign(HashTapeInfo *tapeinfo, int *partitions,
-						int npartitions)
-{
-	int			partidx = 0;
-
-	/* use free tapes if available */
-	while (partidx < npartitions && tapeinfo->nfreetapes > 0)
-		partitions[partidx++] = tapeinfo->freetapes[--tapeinfo->nfreetapes];
-
-	if (partidx < npartitions)
-	{
-		LogicalTapeSetExtend(tapeinfo->tapeset, npartitions - partidx);
-
-		while (partidx < npartitions)
-			partitions[partidx++] = tapeinfo->ntapes++;
-	}
-}
-
-/*
- * After a tape has already been written to and then read, this function
- * rewinds it for writing and adds it to the free list.
- */
-static void
-hashagg_tapeinfo_release(HashTapeInfo *tapeinfo, int tapenum)
-{
-	/* rewinding frees the buffer while not in use */
-	LogicalTapeRewindForWrite(tapeinfo->tapeset, tapenum);
-	if (tapeinfo->freetapes_alloc == tapeinfo->nfreetapes)
-	{
-		tapeinfo->freetapes_alloc <<= 1;
-		tapeinfo->freetapes = repalloc(tapeinfo->freetapes,
-									   tapeinfo->freetapes_alloc * sizeof(int));
-	}
-	tapeinfo->freetapes[tapeinfo->nfreetapes++] = tapenum;
-}
-
 /*
  * hashagg_spill_init
  *
@@ -2952,7 +2872,7 @@ hashagg_tapeinfo_release(HashTapeInfo *tapeinfo, int tapenum)
  * of partitions to create, and initializes them.
  */
 static void
-hashagg_spill_init(HashAggSpill *spill, HashTapeInfo *tapeinfo, int used_bits,
+hashagg_spill_init(HashAggSpill *spill, LogicalTapeSet *tapeset, int used_bits,
 				   double input_groups, double hashentrysize)
 {
 	int			npartitions;
@@ -2961,13 +2881,13 @@ hashagg_spill_init(HashAggSpill *spill, HashTapeInfo *tapeinfo, int used_bits,
 	npartitions = hash_choose_num_partitions(input_groups, hashentrysize,
 											 used_bits, &partition_bits);
 
-	spill->partitions = palloc0(sizeof(int) * npartitions);
+	spill->partitions = palloc0(sizeof(LogicalTape *) * npartitions);
 	spill->ntuples = palloc0(sizeof(int64) * npartitions);
 	spill->hll_card = palloc0(sizeof(hyperLogLogState) * npartitions);
 
-	hashagg_tapeinfo_assign(tapeinfo, spill->partitions, npartitions);
+	for (int i = 0; i < npartitions; i++)
+		spill->partitions[i] = LogicalTapeCreate(tapeset);
 
-	spill->tapeset = tapeinfo->tapeset;
 	spill->shift = 32 - used_bits - partition_bits;
 	spill->mask = (npartitions - 1) << spill->shift;
 	spill->npartitions = npartitions;
@@ -2986,11 +2906,10 @@ static Size
 hashagg_spill_tuple(AggState *aggstate, HashAggSpill *spill,
 					TupleTableSlot *inputslot, uint32 hash)
 {
-	LogicalTapeSet *tapeset = spill->tapeset;
 	TupleTableSlot *spillslot;
 	int			partition;
 	MinimalTuple tuple;
-	int			tapenum;
+	LogicalTape *tape;
 	int			total_written = 0;
 	bool		shouldFree;
 
@@ -3029,12 +2948,12 @@ hashagg_spill_tuple(AggState *aggstate, HashAggSpill *spill,
 	 */
 	addHyperLogLog(&spill->hll_card[partition], hash_bytes_uint32(hash));
 
-	tapenum = spill->partitions[partition];
+	tape = spill->partitions[partition];
 
-	LogicalTapeWrite(tapeset, tapenum, (void *) &hash, sizeof(uint32));
+	LogicalTapeWrite(tape, (void *) &hash, sizeof(uint32));
 	total_written += sizeof(uint32);
 
-	LogicalTapeWrite(tapeset, tapenum, (void *) tuple, tuple->t_len);
+	LogicalTapeWrite(tape, (void *) tuple, tuple->t_len);
 	total_written += tuple->t_len;
 
 	if (shouldFree)
@@ -3050,15 +2969,14 @@ hashagg_spill_tuple(AggState *aggstate, HashAggSpill *spill,
  * be done.
  */
 static HashAggBatch *
-hashagg_batch_new(LogicalTapeSet *tapeset, int tapenum, int setno,
+hashagg_batch_new(LogicalTape *input_tape, int setno,
 				  int64 input_tuples, double input_card, int used_bits)
 {
 	HashAggBatch *batch = palloc0(sizeof(HashAggBatch));
 
 	batch->setno = setno;
 	batch->used_bits = used_bits;
-	batch->tapeset = tapeset;
-	batch->input_tapenum = tapenum;
+	batch->input_tape = input_tape;
 	batch->input_tuples = input_tuples;
 	batch->input_card = input_card;
 
@@ -3072,42 +2990,41 @@ hashagg_batch_new(LogicalTapeSet *tapeset, int tapenum, int setno,
 static MinimalTuple
 hashagg_batch_read(HashAggBatch *batch, uint32 *hashp)
 {
-	LogicalTapeSet *tapeset = batch->tapeset;
-	int			tapenum = batch->input_tapenum;
+	LogicalTape *tape = batch->input_tape;
 	MinimalTuple tuple;
 	uint32		t_len;
 	size_t		nread;
 	uint32		hash;
 
-	nread = LogicalTapeRead(tapeset, tapenum, &hash, sizeof(uint32));
+	nread = LogicalTapeRead(tape, &hash, sizeof(uint32));
 	if (nread == 0)
 		return NULL;
 	if (nread != sizeof(uint32))
 		ereport(ERROR,
 				(errcode_for_file_access(),
-				 errmsg("unexpected EOF for tape %d: requested %zu bytes, read %zu bytes",
-						tapenum, sizeof(uint32), nread)));
+				 errmsg("unexpected EOF for tape %p: requested %zu bytes, read %zu bytes",
+						tape, sizeof(uint32), nread)));
 	if (hashp != NULL)
 		*hashp = hash;
 
-	nread = LogicalTapeRead(tapeset, tapenum, &t_len, sizeof(t_len));
+	nread = LogicalTapeRead(tape, &t_len, sizeof(t_len));
 	if (nread != sizeof(uint32))
 		ereport(ERROR,
 				(errcode_for_file_access(),
-				 errmsg("unexpected EOF for tape %d: requested %zu bytes, read %zu bytes",
-						tapenum, sizeof(uint32), nread)));
+				 errmsg("unexpected EOF for tape %p: requested %zu bytes, read %zu bytes",
+						tape, sizeof(uint32), nread)));
 
 	tuple = (MinimalTuple) palloc(t_len);
 	tuple->t_len = t_len;
 
-	nread = LogicalTapeRead(tapeset, tapenum,
+	nread = LogicalTapeRead(tape,
 							(void *) ((char *) tuple + sizeof(uint32)),
 							t_len - sizeof(uint32));
 	if (nread != t_len - sizeof(uint32))
 		ereport(ERROR,
 				(errcode_for_file_access(),
-				 errmsg("unexpected EOF for tape %d: requested %zu bytes, read %zu bytes",
-						tapenum, t_len - sizeof(uint32), nread)));
+				 errmsg("unexpected EOF for tape %p: requested %zu bytes, read %zu bytes",
+						tape, t_len - sizeof(uint32), nread)));
 
 	return tuple;
 }
@@ -3164,8 +3081,7 @@ hashagg_spill_finish(AggState *aggstate, HashAggSpill *spill, int setno)
 
 	for (i = 0; i < spill->npartitions; i++)
 	{
-		LogicalTapeSet *tapeset = aggstate->hash_tapeinfo->tapeset;
-		int			tapenum = spill->partitions[i];
+		LogicalTape *tape = spill->partitions[i];
 		HashAggBatch *new_batch;
 		double		cardinality;
 
@@ -3177,10 +3093,9 @@ hashagg_spill_finish(AggState *aggstate, HashAggSpill *spill, int setno)
 		freeHyperLogLog(&spill->hll_card[i]);
 
 		/* rewinding frees the buffer while not in use */
-		LogicalTapeRewindForRead(tapeset, tapenum,
-								 HASHAGG_READ_BUFFER_SIZE);
+		LogicalTapeRewindForRead(tape, HASHAGG_READ_BUFFER_SIZE);
 
-		new_batch = hashagg_batch_new(tapeset, tapenum, setno,
+		new_batch = hashagg_batch_new(tape, setno,
 									  spill->ntuples[i], cardinality,
 									  used_bits);
 		aggstate->hash_batches = lcons(new_batch, aggstate->hash_batches);
@@ -3227,14 +3142,10 @@ hashagg_reset_spill_state(AggState *aggstate)
 	aggstate->hash_batches = NIL;
 
 	/* close tape set */
-	if (aggstate->hash_tapeinfo != NULL)
+	if (aggstate->hash_tapeset != NULL)
 	{
-		HashTapeInfo *tapeinfo = aggstate->hash_tapeinfo;
-
-		LogicalTapeSetClose(tapeinfo->tapeset);
-		pfree(tapeinfo->freetapes);
-		pfree(tapeinfo);
-		aggstate->hash_tapeinfo = NULL;
+		LogicalTapeSetClose(aggstate->hash_tapeset);
+		aggstate->hash_tapeset = NULL;
 	}
 }
 
diff --git a/src/backend/utils/sort/logtape.c b/src/backend/utils/sort/logtape.c
index debf12e1b0b..6d7f862fb5c 100644
--- a/src/backend/utils/sort/logtape.c
+++ b/src/backend/utils/sort/logtape.c
@@ -9,8 +9,7 @@
  * there is an annoying problem: the peak space usage is at least twice
  * the volume of actual data to be sorted.  (This must be so because each
  * datum will appear in both the input and output tapes of the final
- * merge pass.  For seven-tape polyphase merge, which is otherwise a
- * pretty good algorithm, peak usage is more like 4x actual data volume.)
+ * merge pass.)
  *
  * We can work around this problem by recognizing that any one tape
  * dataset (with the possible exception of the final output) is written
@@ -137,6 +136,8 @@ typedef struct TapeBlockTrailer
  */
 typedef struct LogicalTape
 {
+	LogicalTapeSet *tapeSet;	/* tape set this tape is part of */
+
 	bool		writing;		/* T while in write phase */
 	bool		frozen;			/* T if blocks should not be freed when read */
 	bool		dirty;			/* does buffer need to be written? */
@@ -180,11 +181,14 @@ typedef struct LogicalTape
  * This data structure represents a set of related "logical tapes" sharing
  * space in a single underlying file.  (But that "file" may be multiple files
  * if needed to escape OS limits on file size; buffile.c handles that for us.)
- * The number of tapes is fixed at creation.
+ * Tapes belonging to a tape set can be created and destroyed on-the-fly, on
+ * demand.
  */
 struct LogicalTapeSet
 {
 	BufFile    *pfile;			/* underlying file for whole tape set */
+	SharedFileSet *fileset;
+	int			worker;			/* worker # if shared, -1 for leader/serial */
 
 	/*
 	 * File size tracking.  nBlocksWritten is the size of the underlying file,
@@ -213,22 +217,16 @@ struct LogicalTapeSet
 	long		nFreeBlocks;	/* # of currently free blocks */
 	Size		freeBlocksLen;	/* current allocated length of freeBlocks[] */
 	bool		enable_prealloc;	/* preallocate write blocks? */
-
-	/* The array of logical tapes. */
-	int			nTapes;			/* # of logical tapes in set */
-	LogicalTape *tapes;			/* has nTapes nentries */
 };
 
+static LogicalTape *ltsCreateTape(LogicalTapeSet *lts);
 static void ltsWriteBlock(LogicalTapeSet *lts, long blocknum, void *buffer);
 static void ltsReadBlock(LogicalTapeSet *lts, long blocknum, void *buffer);
 static long ltsGetBlock(LogicalTapeSet *lts, LogicalTape *lt);
 static long ltsGetFreeBlock(LogicalTapeSet *lts);
 static long ltsGetPreallocBlock(LogicalTapeSet *lts, LogicalTape *lt);
 static void ltsReleaseBlock(LogicalTapeSet *lts, long blocknum);
-static void ltsConcatWorkerTapes(LogicalTapeSet *lts, TapeShare *shared,
-								 SharedFileSet *fileset);
-static void ltsInitTape(LogicalTape *lt);
-static void ltsInitReadBuffer(LogicalTapeSet *lts, LogicalTape *lt);
+static void ltsInitReadBuffer(LogicalTape *lt);
 
 
 /*
@@ -304,7 +302,7 @@ ltsReadBlock(LogicalTapeSet *lts, long blocknum, void *buffer)
  * Returns true if anything was read, 'false' on EOF.
  */
 static bool
-ltsReadFillBuffer(LogicalTapeSet *lts, LogicalTape *lt)
+ltsReadFillBuffer(LogicalTape *lt)
 {
 	lt->pos = 0;
 	lt->nbytes = 0;
@@ -321,9 +319,9 @@ ltsReadFillBuffer(LogicalTapeSet *lts, LogicalTape *lt)
 		datablocknum += lt->offsetBlockNumber;
 
 		/* Read the block */
-		ltsReadBlock(lts, datablocknum, (void *) thisbuf);
+		ltsReadBlock(lt->tapeSet, datablocknum, (void *) thisbuf);
 		if (!lt->frozen)
-			ltsReleaseBlock(lts, datablocknum);
+			ltsReleaseBlock(lt->tapeSet, datablocknum);
 		lt->curBlockNumber = lt->nextBlockNumber;
 
 		lt->nbytes += TapeBlockGetNBytes(thisbuf);
@@ -530,126 +528,13 @@ ltsReleaseBlock(LogicalTapeSet *lts, long blocknum)
 	}
 }
 
-/*
- * Claim ownership of a set of logical tapes from existing shared BufFiles.
- *
- * Caller should be leader process.  Though tapes are marked as frozen in
- * workers, they are not frozen when opened within leader, since unfrozen tapes
- * use a larger read buffer. (Frozen tapes have smaller read buffer, optimized
- * for random access.)
- */
-static void
-ltsConcatWorkerTapes(LogicalTapeSet *lts, TapeShare *shared,
-					 SharedFileSet *fileset)
-{
-	LogicalTape *lt = NULL;
-	long		tapeblocks = 0L;
-	long		nphysicalblocks = 0L;
-	int			i;
-
-	/* Should have at least one worker tape, plus leader's tape */
-	Assert(lts->nTapes >= 2);
-
-	/*
-	 * Build concatenated view of all BufFiles, remembering the block number
-	 * where each source file begins.  No changes are needed for leader/last
-	 * tape.
-	 */
-	for (i = 0; i < lts->nTapes - 1; i++)
-	{
-		char		filename[MAXPGPATH];
-		BufFile    *file;
-		int64		filesize;
-
-		lt = &lts->tapes[i];
-
-		pg_itoa(i, filename);
-		file = BufFileOpenFileSet(&fileset->fs, filename, O_RDONLY, false);
-		filesize = BufFileSize(file);
-
-		/*
-		 * Stash first BufFile, and concatenate subsequent BufFiles to that.
-		 * Store block offset into each tape as we go.
-		 */
-		lt->firstBlockNumber = shared[i].firstblocknumber;
-		if (i == 0)
-		{
-			lts->pfile = file;
-			lt->offsetBlockNumber = 0L;
-		}
-		else
-		{
-			lt->offsetBlockNumber = BufFileAppend(lts->pfile, file);
-		}
-		/* Don't allocate more for read buffer than could possibly help */
-		lt->max_size = Min(MaxAllocSize, filesize);
-		tapeblocks = filesize / BLCKSZ;
-		nphysicalblocks += tapeblocks;
-	}
-
-	/*
-	 * Set # of allocated blocks, as well as # blocks written.  Use extent of
-	 * new BufFile space (from 0 to end of last worker's tape space) for this.
-	 * Allocated/written blocks should include space used by holes left
-	 * between concatenated BufFiles.
-	 */
-	lts->nBlocksAllocated = lt->offsetBlockNumber + tapeblocks;
-	lts->nBlocksWritten = lts->nBlocksAllocated;
-
-	/*
-	 * Compute number of hole blocks so that we can later work backwards, and
-	 * instrument number of physical blocks.  We don't simply use physical
-	 * blocks directly for instrumentation because this would break if we ever
-	 * subsequently wrote to the leader tape.
-	 *
-	 * Working backwards like this keeps our options open.  If shared BufFiles
-	 * ever support being written to post-export, logtape.c can automatically
-	 * take advantage of that.  We'd then support writing to the leader tape
-	 * while recycling space from worker tapes, because the leader tape has a
-	 * zero offset (write routines won't need to have extra logic to apply an
-	 * offset).
-	 *
-	 * The only thing that currently prevents writing to the leader tape from
-	 * working is the fact that BufFiles opened using BufFileOpenFileSet() are
-	 * read-only by definition, but that could be changed if it seemed
-	 * worthwhile.  For now, writing to the leader tape will raise a "Bad file
-	 * descriptor" error, so tuplesort must avoid writing to the leader tape
-	 * altogether.
-	 */
-	lts->nHoleBlocks = lts->nBlocksAllocated - nphysicalblocks;
-}
-
-/*
- * Initialize per-tape struct.  Note we allocate the I/O buffer lazily.
- */
-static void
-ltsInitTape(LogicalTape *lt)
-{
-	lt->writing = true;
-	lt->frozen = false;
-	lt->dirty = false;
-	lt->firstBlockNumber = -1L;
-	lt->curBlockNumber = -1L;
-	lt->nextBlockNumber = -1L;
-	lt->offsetBlockNumber = 0L;
-	lt->buffer = NULL;
-	lt->buffer_size = 0;
-	/* palloc() larger than MaxAllocSize would fail */
-	lt->max_size = MaxAllocSize;
-	lt->pos = 0;
-	lt->nbytes = 0;
-	lt->prealloc = NULL;
-	lt->nprealloc = 0;
-	lt->prealloc_size = 0;
-}
-
 /*
  * Lazily allocate and initialize the read buffer. This avoids waste when many
  * tapes are open at once, but not all are active between rewinding and
  * reading.
  */
 static void
-ltsInitReadBuffer(LogicalTapeSet *lts, LogicalTape *lt)
+ltsInitReadBuffer(LogicalTape *lt)
 {
 	Assert(lt->buffer_size > 0);
 	lt->buffer = palloc(lt->buffer_size);
@@ -658,40 +543,32 @@ ltsInitReadBuffer(LogicalTapeSet *lts, LogicalTape *lt)
 	lt->nextBlockNumber = lt->firstBlockNumber;
 	lt->pos = 0;
 	lt->nbytes = 0;
-	ltsReadFillBuffer(lts, lt);
+	ltsReadFillBuffer(lt);
 }
 
 /*
- * Create a set of logical tapes in a temporary underlying file.
+ * Create a tape set, backed by a temporary underlying file.
  *
- * Each tape is initialized in write state.  Serial callers pass ntapes,
- * NULL argument for shared, and -1 for worker.  Parallel worker callers
- * pass ntapes, a shared file handle, NULL shared argument,  and their own
- * worker number.  Leader callers, which claim shared worker tapes here,
- * must supply non-sentinel values for all arguments except worker number,
- * which should be -1.
+ * The tape set is initially empty. Use LogicalTapeCreate() to create
+ * tapes in it.
  *
- * Leader caller is passing back an array of metadata each worker captured
- * when LogicalTapeFreeze() was called for their final result tapes.  Passed
- * tapes array is actually sized ntapes - 1, because it includes only
- * worker tapes, whereas leader requires its own leader tape.  Note that we
- * rely on the assumption that reclaimed worker tapes will only be read
- * from once by leader, and never written to again (tapes are initialized
- * for writing, but that's only to be consistent).  Leader may not write to
- * its own tape purely due to a restriction in the shared buffile
- * infrastructure that may be lifted in the future.
+ * Serial callers pass NULL argument for shared, and -1 for worker.  Parallel
+ * worker callers pass a shared file handle and their own worker number.
+ *
+ * Leader callers pass a shared file handle and -1 for worker. After creating
+ * the tape set, use LogicalTapeImport() to import the worker tapes into it.
+ *
+ * Currently, the leader will only import worker tapes into the set, it does
+ * not create tapes of its own, although in principle that should work.
  */
 LogicalTapeSet *
-LogicalTapeSetCreate(int ntapes, bool preallocate, TapeShare *shared,
-					 SharedFileSet *fileset, int worker)
+LogicalTapeSetCreate(bool preallocate, SharedFileSet *fileset, int worker)
 {
 	LogicalTapeSet *lts;
-	int			i;
 
 	/*
 	 * Create top-level struct including per-tape LogicalTape structs.
 	 */
-	Assert(ntapes > 0);
 	lts = (LogicalTapeSet *) palloc(sizeof(LogicalTapeSet));
 	lts->nBlocksAllocated = 0L;
 	lts->nBlocksWritten = 0L;
@@ -701,22 +578,21 @@ LogicalTapeSetCreate(int ntapes, bool preallocate, TapeShare *shared,
 	lts->freeBlocks = (long *) palloc(lts->freeBlocksLen * sizeof(long));
 	lts->nFreeBlocks = 0;
 	lts->enable_prealloc = preallocate;
-	lts->nTapes = ntapes;
-	lts->tapes = (LogicalTape *) palloc(ntapes * sizeof(LogicalTape));
 
-	for (i = 0; i < ntapes; i++)
-		ltsInitTape(&lts->tapes[i]);
+	lts->fileset = fileset;
+	lts->worker = worker;
 
 	/*
 	 * Create temp BufFile storage as required.
 	 *
-	 * Leader concatenates worker tapes, which requires special adjustment to
-	 * final tapeset data.  Things are simpler for the worker case and the
+	 * In leader, we hijack the BufFile of the first tape that's imported, and
+	 * concatenate the BufFiles of any subsequent tapes to that. Hence don't
+	 * create a BufFile here. Things are simpler for the worker case and the
 	 * serial case, though.  They are generally very similar -- workers use a
 	 * shared fileset, whereas serial sorts use a conventional serial BufFile.
 	 */
-	if (shared)
-		ltsConcatWorkerTapes(lts, shared, fileset);
+	if (fileset && worker == -1)
+		lts->pfile = NULL;
 	else if (fileset)
 	{
 		char		filename[MAXPGPATH];
@@ -731,26 +607,145 @@ LogicalTapeSetCreate(int ntapes, bool preallocate, TapeShare *shared,
 }
 
 /*
- * Close a logical tape set and release all resources.
+ * Claim ownership of a logical tape from an existing shared BufFile.
+ *
+ * Caller should be leader process.  Though tapes are marked as frozen in
+ * workers, they are not frozen when opened within leader, since unfrozen tapes
+ * use a larger read buffer. (Frozen tapes have smaller read buffer, optimized
+ * for random access.)
  */
-void
-LogicalTapeSetClose(LogicalTapeSet *lts)
+LogicalTape *
+LogicalTapeImport(LogicalTapeSet *lts, int worker, TapeShare *shared)
 {
 	LogicalTape *lt;
-	int			i;
+	long		tapeblocks;
+	char		filename[MAXPGPATH];
+	BufFile    *file;
+	int64		filesize;
 
-	BufFileClose(lts->pfile);
-	for (i = 0; i < lts->nTapes; i++)
+	lt = ltsCreateTape(lts);
+
+	/*
+	 * build concatenated view of all buffiles, remembering the block number
+	 * where each source file begins.
+	 */
+	pg_itoa(worker, filename);
+	file = BufFileOpenFileSet(&lts->fileset->fs, filename, O_RDONLY, false);
+	filesize = BufFileSize(file);
+
+	/*
+	 * Stash first BufFile, and concatenate subsequent BufFiles to that. Store
+	 * block offset into each tape as we go.
+	 */
+	lt->firstBlockNumber = shared->firstblocknumber;
+	if (lts->pfile == NULL)
 	{
-		lt = &lts->tapes[i];
-		if (lt->buffer)
-			pfree(lt->buffer);
+		lts->pfile = file;
+		lt->offsetBlockNumber = 0L;
 	}
-	pfree(lts->tapes);
+	else
+	{
+		lt->offsetBlockNumber = BufFileAppend(lts->pfile, file);
+	}
+	/* Don't allocate more for read buffer than could possibly help */
+	lt->max_size = Min(MaxAllocSize, filesize);
+	tapeblocks = filesize / BLCKSZ;
+
+	/*
+	 * Update # of allocated blocks and # blocks written to reflect the
+	 * imported BufFile.  Allocated/written blocks include space used by holes
+	 * left between concatenated BufFiles.  Also track the number of hole
+	 * blocks so that we can later work backwards to calculate the number of
+	 * physical blocks for instrumentation.
+	 */
+	lts->nHoleBlocks += lt->offsetBlockNumber - lts->nBlocksAllocated;
+
+	lts->nBlocksAllocated = lt->offsetBlockNumber + tapeblocks;
+	lts->nBlocksWritten = lts->nBlocksAllocated;
+
+	return lt;
+}
+
+/*
+ * Close a logical tape set and release all resources.
+ *
+ * NOTE: This doesn't close any of the tapes!  You must close them
+ * first, or you can let them be destroyed along with the memory context.
+ */
+void
+LogicalTapeSetClose(LogicalTapeSet *lts)
+{
+	BufFileClose(lts->pfile);
 	pfree(lts->freeBlocks);
 	pfree(lts);
 }
 
+/*
+ * Create a logical tape in the given tapeset.
+ *
+ * The tape is initialized in write state.
+ */
+LogicalTape *
+LogicalTapeCreate(LogicalTapeSet *lts)
+{
+	/*
+	 * The only thing that currently prevents creating new tapes in leader is
+	 * the fact that BufFiles opened using BufFileOpenShared() are read-only
+	 * by definition, but that could be changed if it seemed worthwhile.  For
+	 * now, writing to the leader tape will raise a "Bad file descriptor"
+	 * error, so tuplesort must avoid writing to the leader tape altogether.
+	 */
+	if (lts->fileset && lts->worker == -1)
+		elog(ERROR, "cannot create new tapes in leader process");
+
+	return ltsCreateTape(lts);
+}
+
+static LogicalTape *
+ltsCreateTape(LogicalTapeSet *lts)
+{
+	LogicalTape *lt;
+
+	/*
+	 * Create per-tape struct.  Note we allocate the I/O buffer lazily.
+	 */
+	lt = palloc(sizeof(LogicalTape));
+	lt->tapeSet = lts;
+	lt->writing = true;
+	lt->frozen = false;
+	lt->dirty = false;
+	lt->firstBlockNumber = -1L;
+	lt->curBlockNumber = -1L;
+	lt->nextBlockNumber = -1L;
+	lt->offsetBlockNumber = 0L;
+	lt->buffer = NULL;
+	lt->buffer_size = 0;
+	/* palloc() larger than MaxAllocSize would fail */
+	lt->max_size = MaxAllocSize;
+	lt->pos = 0;
+	lt->nbytes = 0;
+	lt->prealloc = NULL;
+	lt->nprealloc = 0;
+	lt->prealloc_size = 0;
+
+	return lt;
+}
+
+/*
+ * Close a logical tape.
+ *
+ * Note: This doesn't return any blocks to the free list!  You must read
+ * the tape to the end first, to reuse the space.  In current use, though,
+ * we only close tapes after fully reading them.
+ */
+void
+LogicalTapeClose(LogicalTape *lt)
+{
+	if (lt->buffer)
+		pfree(lt->buffer);
+	pfree(lt);
+}
+
 /*
  * Mark a logical tape set as not needing management of free space anymore.
  *
@@ -772,14 +767,11 @@ LogicalTapeSetForgetFreeSpace(LogicalTapeSet *lts)
  * There are no error returns; we ereport() on failure.
  */
 void
-LogicalTapeWrite(LogicalTapeSet *lts, int tapenum,
-				 void *ptr, size_t size)
+LogicalTapeWrite(LogicalTape *lt, void *ptr, size_t size)
 {
-	LogicalTape *lt;
+	LogicalTapeSet *lts = lt->tapeSet;
 	size_t		nthistime;
 
-	Assert(tapenum >= 0 && tapenum < lts->nTapes);
-	lt = &lts->tapes[tapenum];
 	Assert(lt->writing);
 	Assert(lt->offsetBlockNumber == 0L);
 
@@ -818,11 +810,11 @@ LogicalTapeWrite(LogicalTapeSet *lts, int tapenum,
 			 * First allocate the next block, so that we can store it in the
 			 * 'next' pointer of this block.
 			 */
-			nextBlockNumber = ltsGetBlock(lts, lt);
+			nextBlockNumber = ltsGetBlock(lt->tapeSet, lt);
 
 			/* set the next-pointer and dump the current block. */
 			TapeBlockGetTrailer(lt->buffer)->next = nextBlockNumber;
-			ltsWriteBlock(lts, lt->curBlockNumber, (void *) lt->buffer);
+			ltsWriteBlock(lt->tapeSet, lt->curBlockNumber, (void *) lt->buffer);
 
 			/* initialize the prev-pointer of the next block */
 			TapeBlockGetTrailer(lt->buffer)->prev = lt->curBlockNumber;
@@ -860,12 +852,9 @@ LogicalTapeWrite(LogicalTapeSet *lts, int tapenum,
  * byte buffer is used.
  */
 void
-LogicalTapeRewindForRead(LogicalTapeSet *lts, int tapenum, size_t buffer_size)
+LogicalTapeRewindForRead(LogicalTape *lt, size_t buffer_size)
 {
-	LogicalTape *lt;
-
-	Assert(tapenum >= 0 && tapenum < lts->nTapes);
-	lt = &lts->tapes[tapenum];
+	LogicalTapeSet *lts = lt->tapeSet;
 
 	/*
 	 * Round and cap buffer_size if needed.
@@ -907,7 +896,7 @@ LogicalTapeRewindForRead(LogicalTapeSet *lts, int tapenum, size_t buffer_size)
 									  lt->buffer_size - lt->nbytes);
 
 			TapeBlockSetNBytes(lt->buffer, lt->nbytes);
-			ltsWriteBlock(lts, lt->curBlockNumber, (void *) lt->buffer);
+			ltsWriteBlock(lt->tapeSet, lt->curBlockNumber, (void *) lt->buffer);
 		}
 		lt->writing = false;
 	}
@@ -939,61 +928,28 @@ LogicalTapeRewindForRead(LogicalTapeSet *lts, int tapenum, size_t buffer_size)
 	}
 }
 
-/*
- * Rewind logical tape and switch from reading to writing.
- *
- * NOTE: we assume the caller has read the tape to the end; otherwise
- * untouched data will not have been freed. We could add more code to free
- * any unread blocks, but in current usage of this module it'd be useless
- * code.
- */
-void
-LogicalTapeRewindForWrite(LogicalTapeSet *lts, int tapenum)
-{
-	LogicalTape *lt;
-
-	Assert(tapenum >= 0 && tapenum < lts->nTapes);
-	lt = &lts->tapes[tapenum];
-
-	Assert(!lt->writing && !lt->frozen);
-	lt->writing = true;
-	lt->dirty = false;
-	lt->firstBlockNumber = -1L;
-	lt->curBlockNumber = -1L;
-	lt->pos = 0;
-	lt->nbytes = 0;
-	if (lt->buffer)
-		pfree(lt->buffer);
-	lt->buffer = NULL;
-	lt->buffer_size = 0;
-}
-
 /*
  * Read from a logical tape.
  *
  * Early EOF is indicated by return value less than #bytes requested.
  */
 size_t
-LogicalTapeRead(LogicalTapeSet *lts, int tapenum,
-				void *ptr, size_t size)
+LogicalTapeRead(LogicalTape *lt, void *ptr, size_t size)
 {
-	LogicalTape *lt;
 	size_t		nread = 0;
 	size_t		nthistime;
 
-	Assert(tapenum >= 0 && tapenum < lts->nTapes);
-	lt = &lts->tapes[tapenum];
 	Assert(!lt->writing);
 
 	if (lt->buffer == NULL)
-		ltsInitReadBuffer(lts, lt);
+		ltsInitReadBuffer(lt);
 
 	while (size > 0)
 	{
 		if (lt->pos >= lt->nbytes)
 		{
 			/* Try to load more data into buffer. */
-			if (!ltsReadFillBuffer(lts, lt))
+			if (!ltsReadFillBuffer(lt))
 				break;			/* EOF */
 		}
 
@@ -1031,12 +987,10 @@ LogicalTapeRead(LogicalTapeSet *lts, int tapenum,
  * Serial sorts should set share to NULL.
  */
 void
-LogicalTapeFreeze(LogicalTapeSet *lts, int tapenum, TapeShare *share)
+LogicalTapeFreeze(LogicalTape *lt, TapeShare *share)
 {
-	LogicalTape *lt;
+	LogicalTapeSet *lts = lt->tapeSet;
 
-	Assert(tapenum >= 0 && tapenum < lts->nTapes);
-	lt = &lts->tapes[tapenum];
 	Assert(lt->writing);
 	Assert(lt->offsetBlockNumber == 0L);
 
@@ -1058,8 +1012,7 @@ LogicalTapeFreeze(LogicalTapeSet *lts, int tapenum, TapeShare *share)
 								  lt->buffer_size - lt->nbytes);
 
 		TapeBlockSetNBytes(lt->buffer, lt->nbytes);
-		ltsWriteBlock(lts, lt->curBlockNumber, (void *) lt->buffer);
-		lt->writing = false;
+		ltsWriteBlock(lt->tapeSet, lt->curBlockNumber, (void *) lt->buffer);
 	}
 	lt->writing = false;
 	lt->frozen = true;
@@ -1086,7 +1039,7 @@ LogicalTapeFreeze(LogicalTapeSet *lts, int tapenum, TapeShare *share)
 
 	if (lt->firstBlockNumber == -1L)
 		lt->nextBlockNumber = -1L;
-	ltsReadBlock(lts, lt->curBlockNumber, (void *) lt->buffer);
+	ltsReadBlock(lt->tapeSet, lt->curBlockNumber, (void *) lt->buffer);
 	if (TapeBlockIsLast(lt->buffer))
 		lt->nextBlockNumber = -1L;
 	else
@@ -1101,25 +1054,6 @@ LogicalTapeFreeze(LogicalTapeSet *lts, int tapenum, TapeShare *share)
 	}
 }
 
-/*
- * Add additional tapes to this tape set. Not intended to be used when any
- * tapes are frozen.
- */
-void
-LogicalTapeSetExtend(LogicalTapeSet *lts, int nAdditional)
-{
-	int			i;
-	int			nTapesOrig = lts->nTapes;
-
-	lts->nTapes += nAdditional;
-
-	lts->tapes = (LogicalTape *) repalloc(lts->tapes,
-										  lts->nTapes * sizeof(LogicalTape));
-
-	for (i = nTapesOrig; i < lts->nTapes; i++)
-		ltsInitTape(&lts->tapes[i]);
-}
-
 /*
  * Backspace the tape a given number of bytes.  (We also support a more
  * general seek interface, see below.)
@@ -1134,18 +1068,15 @@ LogicalTapeSetExtend(LogicalTapeSet *lts, int nAdditional)
  * that case.
  */
 size_t
-LogicalTapeBackspace(LogicalTapeSet *lts, int tapenum, size_t size)
+LogicalTapeBackspace(LogicalTape *lt, size_t size)
 {
-	LogicalTape *lt;
 	size_t		seekpos = 0;
 
-	Assert(tapenum >= 0 && tapenum < lts->nTapes);
-	lt = &lts->tapes[tapenum];
 	Assert(lt->frozen);
 	Assert(lt->buffer_size == BLCKSZ);
 
 	if (lt->buffer == NULL)
-		ltsInitReadBuffer(lts, lt);
+		ltsInitReadBuffer(lt);
 
 	/*
 	 * Easy case for seek within current block.
@@ -1175,7 +1106,7 @@ LogicalTapeBackspace(LogicalTapeSet *lts, int tapenum, size_t size)
 			return seekpos;
 		}
 
-		ltsReadBlock(lts, prev, (void *) lt->buffer);
+		ltsReadBlock(lt->tapeSet, prev, (void *) lt->buffer);
 
 		if (TapeBlockGetTrailer(lt->buffer)->next != lt->curBlockNumber)
 			elog(ERROR, "broken tape, next of block %ld is %ld, expected %ld",
@@ -1208,23 +1139,18 @@ LogicalTapeBackspace(LogicalTapeSet *lts, int tapenum, size_t size)
  * LogicalTapeTell().
  */
 void
-LogicalTapeSeek(LogicalTapeSet *lts, int tapenum,
-				long blocknum, int offset)
+LogicalTapeSeek(LogicalTape *lt, long blocknum, int offset)
 {
-	LogicalTape *lt;
-
-	Assert(tapenum >= 0 && tapenum < lts->nTapes);
-	lt = &lts->tapes[tapenum];
 	Assert(lt->frozen);
 	Assert(offset >= 0 && offset <= TapeBlockPayloadSize);
 	Assert(lt->buffer_size == BLCKSZ);
 
 	if (lt->buffer == NULL)
-		ltsInitReadBuffer(lts, lt);
+		ltsInitReadBuffer(lt);
 
 	if (blocknum != lt->curBlockNumber)
 	{
-		ltsReadBlock(lts, blocknum, (void *) lt->buffer);
+		ltsReadBlock(lt->tapeSet, blocknum, (void *) lt->buffer);
 		lt->curBlockNumber = blocknum;
 		lt->nbytes = TapeBlockPayloadSize;
 		lt->nextBlockNumber = TapeBlockGetTrailer(lt->buffer)->next;
@@ -1242,16 +1168,10 @@ LogicalTapeSeek(LogicalTapeSet *lts, int tapenum,
  * the position for a seek after freezing.  Not clear if anyone needs that.
  */
 void
-LogicalTapeTell(LogicalTapeSet *lts, int tapenum,
-				long *blocknum, int *offset)
+LogicalTapeTell(LogicalTape *lt, long *blocknum, int *offset)
 {
-	LogicalTape *lt;
-
-	Assert(tapenum >= 0 && tapenum < lts->nTapes);
-	lt = &lts->tapes[tapenum];
-
 	if (lt->buffer == NULL)
-		ltsInitReadBuffer(lts, lt);
+		ltsInitReadBuffer(lt);
 
 	Assert(lt->offsetBlockNumber == 0L);
 
@@ -1271,13 +1191,5 @@ LogicalTapeTell(LogicalTapeSet *lts, int tapenum,
 long
 LogicalTapeSetBlocks(LogicalTapeSet *lts)
 {
-#ifdef USE_ASSERT_CHECKING
-	for (int i = 0; i < lts->nTapes; i++)
-	{
-		LogicalTape *lt = &lts->tapes[i];
-
-		Assert(!lt->writing || lt->buffer == NULL);
-	}
-#endif
 	return lts->nBlocksWritten - lts->nHoleBlocks;
 }
diff --git a/src/backend/utils/sort/tuplesort.c b/src/backend/utils/sort/tuplesort.c
index b17347b2141..d5930f258d9 100644
--- a/src/backend/utils/sort/tuplesort.c
+++ b/src/backend/utils/sort/tuplesort.c
@@ -262,6 +262,7 @@ struct Tuplesortstate
 	MemoryContext sortcontext;	/* memory context holding most sort data */
 	MemoryContext tuplecontext; /* sub-context of sortcontext for tuple data */
 	LogicalTapeSet *tapeset;	/* logtape.c object for tapes in a temp file */
+	LogicalTape **tapes;
 
 	/*
 	 * These function pointers decouple the routines that must know what kind
@@ -290,7 +291,7 @@ struct Tuplesortstate
 	 * SortTuple struct!), and increase state->availMem by the amount of
 	 * memory space thereby released.
 	 */
-	void		(*writetup) (Tuplesortstate *state, int tapenum,
+	void		(*writetup) (Tuplesortstate *state, LogicalTape *tape,
 							 SortTuple *stup);
 
 	/*
@@ -299,7 +300,7 @@ struct Tuplesortstate
 	 * from the slab memory arena, or is palloc'd, see readtup_alloc().
 	 */
 	void		(*readtup) (Tuplesortstate *state, SortTuple *stup,
-							int tapenum, unsigned int len);
+							LogicalTape *tape, unsigned int len);
 
 	/*
 	 * This array holds the tuples now in sort memory.  If we are in state
@@ -393,7 +394,7 @@ struct Tuplesortstate
 	 * the next tuple to return.  (In the tape case, the tape's current read
 	 * position is also critical state.)
 	 */
-	int			result_tape;	/* actual tape number of finished output */
+	LogicalTape *result_tape;	/* tape of finished output */
 	int			current;		/* array index (only used if SORTEDINMEM) */
 	bool		eof_reached;	/* reached EOF (needed for cursors) */
 
@@ -599,9 +600,9 @@ struct Sharedsort
  */
 
 /* When using this macro, beware of double evaluation of len */
-#define LogicalTapeReadExact(tapeset, tapenum, ptr, len) \
+#define LogicalTapeReadExact(tape, ptr, len) \
 	do { \
-		if (LogicalTapeRead(tapeset, tapenum, ptr, len) != (size_t) (len)) \
+		if (LogicalTapeRead(tape, ptr, len) != (size_t) (len)) \
 			elog(ERROR, "unexpected end of data"); \
 	} while(0)
 
@@ -619,7 +620,7 @@ static void init_slab_allocator(Tuplesortstate *state, int numSlots);
 static void mergeruns(Tuplesortstate *state);
 static void mergeonerun(Tuplesortstate *state);
 static void beginmerge(Tuplesortstate *state);
-static bool mergereadnext(Tuplesortstate *state, int srcTape, SortTuple *stup);
+static bool mergereadnext(Tuplesortstate *state, int srcTapeIndex, SortTuple *stup);
 static void dumptuples(Tuplesortstate *state, bool alltuples);
 static void make_bounded_heap(Tuplesortstate *state);
 static void sort_bounded_heap(Tuplesortstate *state);
@@ -628,39 +629,39 @@ static void tuplesort_heap_insert(Tuplesortstate *state, SortTuple *tuple);
 static void tuplesort_heap_replace_top(Tuplesortstate *state, SortTuple *tuple);
 static void tuplesort_heap_delete_top(Tuplesortstate *state);
 static void reversedirection(Tuplesortstate *state);
-static unsigned int getlen(Tuplesortstate *state, int tapenum, bool eofOK);
-static void markrunend(Tuplesortstate *state, int tapenum);
+static unsigned int getlen(LogicalTape *tape, bool eofOK);
+static void markrunend(LogicalTape *tape);
 static void *readtup_alloc(Tuplesortstate *state, Size tuplen);
 static int	comparetup_heap(const SortTuple *a, const SortTuple *b,
 							Tuplesortstate *state);
 static void copytup_heap(Tuplesortstate *state, SortTuple *stup, void *tup);
-static void writetup_heap(Tuplesortstate *state, int tapenum,
+static void writetup_heap(Tuplesortstate *state, LogicalTape *tape,
 						  SortTuple *stup);
 static void readtup_heap(Tuplesortstate *state, SortTuple *stup,
-						 int tapenum, unsigned int len);
+						 LogicalTape *tape, unsigned int len);
 static int	comparetup_cluster(const SortTuple *a, const SortTuple *b,
 							   Tuplesortstate *state);
 static void copytup_cluster(Tuplesortstate *state, SortTuple *stup, void *tup);
-static void writetup_cluster(Tuplesortstate *state, int tapenum,
+static void writetup_cluster(Tuplesortstate *state, LogicalTape *tape,
 							 SortTuple *stup);
 static void readtup_cluster(Tuplesortstate *state, SortTuple *stup,
-							int tapenum, unsigned int len);
+							LogicalTape *tape, unsigned int len);
 static int	comparetup_index_btree(const SortTuple *a, const SortTuple *b,
 								   Tuplesortstate *state);
 static int	comparetup_index_hash(const SortTuple *a, const SortTuple *b,
 								  Tuplesortstate *state);
 static void copytup_index(Tuplesortstate *state, SortTuple *stup, void *tup);
-static void writetup_index(Tuplesortstate *state, int tapenum,
+static void writetup_index(Tuplesortstate *state, LogicalTape *tape,
 						   SortTuple *stup);
 static void readtup_index(Tuplesortstate *state, SortTuple *stup,
-						  int tapenum, unsigned int len);
+						  LogicalTape *tape, unsigned int len);
 static int	comparetup_datum(const SortTuple *a, const SortTuple *b,
 							 Tuplesortstate *state);
 static void copytup_datum(Tuplesortstate *state, SortTuple *stup, void *tup);
-static void writetup_datum(Tuplesortstate *state, int tapenum,
+static void writetup_datum(Tuplesortstate *state, LogicalTape *tape,
 						   SortTuple *stup);
 static void readtup_datum(Tuplesortstate *state, SortTuple *stup,
-						  int tapenum, unsigned int len);
+						  LogicalTape *tape, unsigned int len);
 static int	worker_get_identifier(Tuplesortstate *state);
 static void worker_freeze_result_tape(Tuplesortstate *state);
 static void worker_nomergeruns(Tuplesortstate *state);
@@ -888,7 +889,7 @@ tuplesort_begin_batch(Tuplesortstate *state)
 	 * inittapes(), if needed
 	 */
 
-	state->result_tape = -1;	/* flag that result tape has not been formed */
+	state->result_tape = NULL;	/* flag that result tape has not been formed */
 
 	MemoryContextSwitchTo(oldcontext);
 }
@@ -2221,7 +2222,7 @@ tuplesort_gettuple_common(Tuplesortstate *state, bool forward,
 				if (state->eof_reached)
 					return false;
 
-				if ((tuplen = getlen(state, state->result_tape, true)) != 0)
+				if ((tuplen = getlen(state->result_tape, true)) != 0)
 				{
 					READTUP(state, stup, state->result_tape, tuplen);
 
@@ -2254,8 +2255,7 @@ tuplesort_gettuple_common(Tuplesortstate *state, bool forward,
 				 * end of file; back up to fetch last tuple's ending length
 				 * word.  If seek fails we must have a completely empty file.
 				 */
-				nmoved = LogicalTapeBackspace(state->tapeset,
-											  state->result_tape,
+				nmoved = LogicalTapeBackspace(state->result_tape,
 											  2 * sizeof(unsigned int));
 				if (nmoved == 0)
 					return false;
@@ -2269,20 +2269,18 @@ tuplesort_gettuple_common(Tuplesortstate *state, bool forward,
 				 * Back up and fetch previously-returned tuple's ending length
 				 * word.  If seek fails, assume we are at start of file.
 				 */
-				nmoved = LogicalTapeBackspace(state->tapeset,
-											  state->result_tape,
+				nmoved = LogicalTapeBackspace(state->result_tape,
 											  sizeof(unsigned int));
 				if (nmoved == 0)
 					return false;
 				else if (nmoved != sizeof(unsigned int))
 					elog(ERROR, "unexpected tape position");
-				tuplen = getlen(state, state->result_tape, false);
+				tuplen = getlen(state->result_tape, false);
 
 				/*
 				 * Back up to get ending length word of tuple before it.
 				 */
-				nmoved = LogicalTapeBackspace(state->tapeset,
-											  state->result_tape,
+				nmoved = LogicalTapeBackspace(state->result_tape,
 											  tuplen + 2 * sizeof(unsigned int));
 				if (nmoved == tuplen + sizeof(unsigned int))
 				{
@@ -2299,15 +2297,14 @@ tuplesort_gettuple_common(Tuplesortstate *state, bool forward,
 					elog(ERROR, "bogus tuple length in backward scan");
 			}
 
-			tuplen = getlen(state, state->result_tape, false);
+			tuplen = getlen(state->result_tape, false);
 
 			/*
 			 * Now we have the length of the prior tuple, back up and read it.
 			 * Note: READTUP expects we are positioned after the initial
 			 * length word of the tuple, so back up to that point.
 			 */
-			nmoved = LogicalTapeBackspace(state->tapeset,
-										  state->result_tape,
+			nmoved = LogicalTapeBackspace(state->result_tape,
 										  tuplen);
 			if (nmoved != tuplen)
 				elog(ERROR, "bogus tuple length in backward scan");
@@ -2365,11 +2362,10 @@ tuplesort_gettuple_common(Tuplesortstate *state, bool forward,
 					tuplesort_heap_delete_top(state);
 
 					/*
-					 * Rewind to free the read buffer.  It'd go away at the
-					 * end of the sort anyway, but better to release the
-					 * memory early.
+					 * Close the tape.  It'd go away at the end of the sort
+					 * anyway, but better to release the memory early.
 					 */
-					LogicalTapeRewindForWrite(state->tapeset, srcTape);
+					LogicalTapeClose(state->tapes[srcTape]);
 					return true;
 				}
 				newtup.srctape = srcTape;
@@ -2667,9 +2663,12 @@ inittapes(Tuplesortstate *state, bool mergeruns)
 	/* Create the tape set and allocate the per-tape data arrays */
 	inittapestate(state, maxTapes);
 	state->tapeset =
-		LogicalTapeSetCreate(maxTapes, false, NULL,
+		LogicalTapeSetCreate(false,
 							 state->shared ? &state->shared->fileset : NULL,
 							 state->worker);
+	state->tapes = palloc(maxTapes * sizeof(LogicalTape *));
+	for (j = 0; j < maxTapes; j++)
+		state->tapes[j] = LogicalTapeCreate(state->tapeset);
 
 	state->currentRun = 0;
 
@@ -2919,7 +2918,7 @@ mergeruns(Tuplesortstate *state)
 
 	/* End of step D2: rewind all output tapes to prepare for merging */
 	for (tapenum = 0; tapenum < state->tapeRange; tapenum++)
-		LogicalTapeRewindForRead(state->tapeset, tapenum, state->read_buffer_size);
+		LogicalTapeRewindForRead(state->tapes[tapenum], state->read_buffer_size);
 
 	for (;;)
 	{
@@ -2981,11 +2980,14 @@ mergeruns(Tuplesortstate *state)
 		/* Step D6: decrease level */
 		if (--state->Level == 0)
 			break;
+
 		/* rewind output tape T to use as new input */
-		LogicalTapeRewindForRead(state->tapeset, state->tp_tapenum[state->tapeRange],
+		LogicalTapeRewindForRead(state->tapes[state->tp_tapenum[state->tapeRange]],
 								 state->read_buffer_size);
-		/* rewind used-up input tape P, and prepare it for write pass */
-		LogicalTapeRewindForWrite(state->tapeset, state->tp_tapenum[state->tapeRange - 1]);
+
+		/* close used-up input tape P, and create a new one for write pass */
+		LogicalTapeClose(state->tapes[state->tp_tapenum[state->tapeRange - 1]]);
+		state->tapes[state->tp_tapenum[state->tapeRange - 1]] = LogicalTapeCreate(state->tapeset);
 		state->tp_runs[state->tapeRange - 1] = 0;
 
 		/*
@@ -3013,18 +3015,21 @@ mergeruns(Tuplesortstate *state)
 	 * output tape while rewinding it.  The last iteration of step D6 would be
 	 * a waste of cycles anyway...
 	 */
-	state->result_tape = state->tp_tapenum[state->tapeRange];
+	state->result_tape = state->tapes[state->tp_tapenum[state->tapeRange]];
 	if (!WORKER(state))
-		LogicalTapeFreeze(state->tapeset, state->result_tape, NULL);
+		LogicalTapeFreeze(state->result_tape, NULL);
 	else
 		worker_freeze_result_tape(state);
 	state->status = TSS_SORTEDONTAPE;
 
-	/* Release the read buffers of all the other tapes, by rewinding them. */
+	/* Close all the other tapes, to release their read buffers. */
 	for (tapenum = 0; tapenum < state->maxTapes; tapenum++)
 	{
-		if (tapenum != state->result_tape)
-			LogicalTapeRewindForWrite(state->tapeset, tapenum);
+		if (state->tapes[tapenum] != state->result_tape)
+		{
+			LogicalTapeClose(state->tapes[tapenum]);
+			state->tapes[tapenum] = NULL;
+		}
 	}
 }
 
@@ -3037,7 +3042,8 @@ mergeruns(Tuplesortstate *state)
 static void
 mergeonerun(Tuplesortstate *state)
 {
-	int			destTape = state->tp_tapenum[state->tapeRange];
+	int			destTapeNum = state->tp_tapenum[state->tapeRange];
+	LogicalTape *destTape = state->tapes[destTapeNum];
 	int			srcTape;
 
 	/*
@@ -3080,7 +3086,7 @@ mergeonerun(Tuplesortstate *state)
 	 * When the heap empties, we're done.  Write an end-of-run marker on the
 	 * output tape, and increment its count of real runs.
 	 */
-	markrunend(state, destTape);
+	markrunend(destTape);
 	state->tp_runs[state->tapeRange]++;
 
 #ifdef TRACE_SORT
@@ -3146,17 +3152,18 @@ beginmerge(Tuplesortstate *state)
  * Returns false on EOF.
  */
 static bool
-mergereadnext(Tuplesortstate *state, int srcTape, SortTuple *stup)
+mergereadnext(Tuplesortstate *state, int srcTapeIndex, SortTuple *stup)
 {
+	LogicalTape *srcTape = state->tapes[srcTapeIndex];
 	unsigned int tuplen;
 
-	if (!state->mergeactive[srcTape])
+	if (!state->mergeactive[srcTapeIndex])
 		return false;			/* tape's run is already exhausted */
 
 	/* read next tuple, if any */
-	if ((tuplen = getlen(state, srcTape, true)) == 0)
+	if ((tuplen = getlen(srcTape, true)) == 0)
 	{
-		state->mergeactive[srcTape] = false;
+		state->mergeactive[srcTapeIndex] = false;
 		return false;
 	}
 	READTUP(state, stup, srcTape, tuplen);
@@ -3173,6 +3180,7 @@ mergereadnext(Tuplesortstate *state, int srcTape, SortTuple *stup)
 static void
 dumptuples(Tuplesortstate *state, bool alltuples)
 {
+	LogicalTape *destTape;
 	int			memtupwrite;
 	int			i;
 
@@ -3239,10 +3247,10 @@ dumptuples(Tuplesortstate *state, bool alltuples)
 #endif
 
 	memtupwrite = state->memtupcount;
+	destTape = state->tapes[state->tp_tapenum[state->destTape]];
 	for (i = 0; i < memtupwrite; i++)
 	{
-		WRITETUP(state, state->tp_tapenum[state->destTape],
-				 &state->memtuples[i]);
+		WRITETUP(state, destTape, &state->memtuples[i]);
 		state->memtupcount--;
 	}
 
@@ -3255,7 +3263,7 @@ dumptuples(Tuplesortstate *state, bool alltuples)
 	 */
 	MemoryContextReset(state->tuplecontext);
 
-	markrunend(state, state->tp_tapenum[state->destTape]);
+	markrunend(destTape);
 	state->tp_runs[state->destTape]++;
 	state->tp_dummy[state->destTape]--; /* per Alg D step D2 */
 
@@ -3289,9 +3297,7 @@ tuplesort_rescan(Tuplesortstate *state)
 			state->markpos_eof = false;
 			break;
 		case TSS_SORTEDONTAPE:
-			LogicalTapeRewindForRead(state->tapeset,
-									 state->result_tape,
-									 0);
+			LogicalTapeRewindForRead(state->result_tape, 0);
 			state->eof_reached = false;
 			state->markpos_block = 0L;
 			state->markpos_offset = 0;
@@ -3322,8 +3328,7 @@ tuplesort_markpos(Tuplesortstate *state)
 			state->markpos_eof = state->eof_reached;
 			break;
 		case TSS_SORTEDONTAPE:
-			LogicalTapeTell(state->tapeset,
-							state->result_tape,
+			LogicalTapeTell(state->result_tape,
 							&state->markpos_block,
 							&state->markpos_offset);
 			state->markpos_eof = state->eof_reached;
@@ -3354,8 +3359,7 @@ tuplesort_restorepos(Tuplesortstate *state)
 			state->eof_reached = state->markpos_eof;
 			break;
 		case TSS_SORTEDONTAPE:
-			LogicalTapeSeek(state->tapeset,
-							state->result_tape,
+			LogicalTapeSeek(state->result_tape,
 							state->markpos_block,
 							state->markpos_offset);
 			state->eof_reached = state->markpos_eof;
@@ -3697,11 +3701,11 @@ reversedirection(Tuplesortstate *state)
  */
 
 static unsigned int
-getlen(Tuplesortstate *state, int tapenum, bool eofOK)
+getlen(LogicalTape *tape, bool eofOK)
 {
 	unsigned int len;
 
-	if (LogicalTapeRead(state->tapeset, tapenum,
+	if (LogicalTapeRead(tape,
 						&len, sizeof(len)) != sizeof(len))
 		elog(ERROR, "unexpected end of tape");
 	if (len == 0 && !eofOK)
@@ -3710,11 +3714,11 @@ getlen(Tuplesortstate *state, int tapenum, bool eofOK)
 }
 
 static void
-markrunend(Tuplesortstate *state, int tapenum)
+markrunend(LogicalTape *tape)
 {
 	unsigned int len = 0;
 
-	LogicalTapeWrite(state->tapeset, tapenum, (void *) &len, sizeof(len));
+	LogicalTapeWrite(tape, (void *) &len, sizeof(len));
 }
 
 /*
@@ -3892,7 +3896,7 @@ copytup_heap(Tuplesortstate *state, SortTuple *stup, void *tup)
 }
 
 static void
-writetup_heap(Tuplesortstate *state, int tapenum, SortTuple *stup)
+writetup_heap(Tuplesortstate *state, LogicalTape *tape, SortTuple *stup)
 {
 	MinimalTuple tuple = (MinimalTuple) stup->tuple;
 
@@ -3903,13 +3907,10 @@ writetup_heap(Tuplesortstate *state, int tapenum, SortTuple *stup)
 	/* total on-disk footprint: */
 	unsigned int tuplen = tupbodylen + sizeof(int);
 
-	LogicalTapeWrite(state->tapeset, tapenum,
-					 (void *) &tuplen, sizeof(tuplen));
-	LogicalTapeWrite(state->tapeset, tapenum,
-					 (void *) tupbody, tupbodylen);
+	LogicalTapeWrite(tape, (void *) &tuplen, sizeof(tuplen));
+	LogicalTapeWrite(tape, (void *) tupbody, tupbodylen);
 	if (state->randomAccess)	/* need trailing length word? */
-		LogicalTapeWrite(state->tapeset, tapenum,
-						 (void *) &tuplen, sizeof(tuplen));
+		LogicalTapeWrite(tape, (void *) &tuplen, sizeof(tuplen));
 
 	if (!state->slabAllocatorUsed)
 	{
@@ -3920,7 +3921,7 @@ writetup_heap(Tuplesortstate *state, int tapenum, SortTuple *stup)
 
 static void
 readtup_heap(Tuplesortstate *state, SortTuple *stup,
-			 int tapenum, unsigned int len)
+			 LogicalTape *tape, unsigned int len)
 {
 	unsigned int tupbodylen = len - sizeof(int);
 	unsigned int tuplen = tupbodylen + MINIMAL_TUPLE_DATA_OFFSET;
@@ -3930,11 +3931,9 @@ readtup_heap(Tuplesortstate *state, SortTuple *stup,
 
 	/* read in the tuple proper */
 	tuple->t_len = tuplen;
-	LogicalTapeReadExact(state->tapeset, tapenum,
-						 tupbody, tupbodylen);
+	LogicalTapeReadExact(tape, tupbody, tupbodylen);
 	if (state->randomAccess)	/* need trailing length word? */
-		LogicalTapeReadExact(state->tapeset, tapenum,
-							 &tuplen, sizeof(tuplen));
+		LogicalTapeReadExact(tape, &tuplen, sizeof(tuplen));
 	stup->tuple = (void *) tuple;
 	/* set up first-column key value */
 	htup.t_len = tuple->t_len + MINIMAL_TUPLE_OFFSET;
@@ -4135,21 +4134,17 @@ copytup_cluster(Tuplesortstate *state, SortTuple *stup, void *tup)
 }
 
 static void
-writetup_cluster(Tuplesortstate *state, int tapenum, SortTuple *stup)
+writetup_cluster(Tuplesortstate *state, LogicalTape *tape, SortTuple *stup)
 {
 	HeapTuple	tuple = (HeapTuple) stup->tuple;
 	unsigned int tuplen = tuple->t_len + sizeof(ItemPointerData) + sizeof(int);
 
 	/* We need to store t_self, but not other fields of HeapTupleData */
-	LogicalTapeWrite(state->tapeset, tapenum,
-					 &tuplen, sizeof(tuplen));
-	LogicalTapeWrite(state->tapeset, tapenum,
-					 &tuple->t_self, sizeof(ItemPointerData));
-	LogicalTapeWrite(state->tapeset, tapenum,
-					 tuple->t_data, tuple->t_len);
+	LogicalTapeWrite(tape, &tuplen, sizeof(tuplen));
+	LogicalTapeWrite(tape, &tuple->t_self, sizeof(ItemPointerData));
+	LogicalTapeWrite(tape, tuple->t_data, tuple->t_len);
 	if (state->randomAccess)	/* need trailing length word? */
-		LogicalTapeWrite(state->tapeset, tapenum,
-						 &tuplen, sizeof(tuplen));
+		LogicalTapeWrite(tape, &tuplen, sizeof(tuplen));
 
 	if (!state->slabAllocatorUsed)
 	{
@@ -4160,7 +4155,7 @@ writetup_cluster(Tuplesortstate *state, int tapenum, SortTuple *stup)
 
 static void
 readtup_cluster(Tuplesortstate *state, SortTuple *stup,
-				int tapenum, unsigned int tuplen)
+				LogicalTape *tape, unsigned int tuplen)
 {
 	unsigned int t_len = tuplen - sizeof(ItemPointerData) - sizeof(int);
 	HeapTuple	tuple = (HeapTuple) readtup_alloc(state,
@@ -4169,16 +4164,13 @@ readtup_cluster(Tuplesortstate *state, SortTuple *stup,
 	/* Reconstruct the HeapTupleData header */
 	tuple->t_data = (HeapTupleHeader) ((char *) tuple + HEAPTUPLESIZE);
 	tuple->t_len = t_len;
-	LogicalTapeReadExact(state->tapeset, tapenum,
-						 &tuple->t_self, sizeof(ItemPointerData));
+	LogicalTapeReadExact(tape, &tuple->t_self, sizeof(ItemPointerData));
 	/* We don't currently bother to reconstruct t_tableOid */
 	tuple->t_tableOid = InvalidOid;
 	/* Read in the tuple body */
-	LogicalTapeReadExact(state->tapeset, tapenum,
-						 tuple->t_data, tuple->t_len);
+	LogicalTapeReadExact(tape, tuple->t_data, tuple->t_len);
 	if (state->randomAccess)	/* need trailing length word? */
-		LogicalTapeReadExact(state->tapeset, tapenum,
-							 &tuplen, sizeof(tuplen));
+		LogicalTapeReadExact(tape, &tuplen, sizeof(tuplen));
 	stup->tuple = (void *) tuple;
 	/* set up first-column key value, if it's a simple column */
 	if (state->indexInfo->ii_IndexAttrNumbers[0] != 0)
@@ -4392,19 +4384,16 @@ copytup_index(Tuplesortstate *state, SortTuple *stup, void *tup)
 }
 
 static void
-writetup_index(Tuplesortstate *state, int tapenum, SortTuple *stup)
+writetup_index(Tuplesortstate *state, LogicalTape *tape, SortTuple *stup)
 {
 	IndexTuple	tuple = (IndexTuple) stup->tuple;
 	unsigned int tuplen;
 
 	tuplen = IndexTupleSize(tuple) + sizeof(tuplen);
-	LogicalTapeWrite(state->tapeset, tapenum,
-					 (void *) &tuplen, sizeof(tuplen));
-	LogicalTapeWrite(state->tapeset, tapenum,
-					 (void *) tuple, IndexTupleSize(tuple));
+	LogicalTapeWrite(tape, (void *) &tuplen, sizeof(tuplen));
+	LogicalTapeWrite(tape, (void *) tuple, IndexTupleSize(tuple));
 	if (state->randomAccess)	/* need trailing length word? */
-		LogicalTapeWrite(state->tapeset, tapenum,
-						 (void *) &tuplen, sizeof(tuplen));
+		LogicalTapeWrite(tape, (void *) &tuplen, sizeof(tuplen));
 
 	if (!state->slabAllocatorUsed)
 	{
@@ -4415,16 +4404,14 @@ writetup_index(Tuplesortstate *state, int tapenum, SortTuple *stup)
 
 static void
 readtup_index(Tuplesortstate *state, SortTuple *stup,
-			  int tapenum, unsigned int len)
+			  LogicalTape *tape, unsigned int len)
 {
 	unsigned int tuplen = len - sizeof(unsigned int);
 	IndexTuple	tuple = (IndexTuple) readtup_alloc(state, tuplen);
 
-	LogicalTapeReadExact(state->tapeset, tapenum,
-						 tuple, tuplen);
+	LogicalTapeReadExact(tape, tuple, tuplen);
 	if (state->randomAccess)	/* need trailing length word? */
-		LogicalTapeReadExact(state->tapeset, tapenum,
-							 &tuplen, sizeof(tuplen));
+		LogicalTapeReadExact(tape, &tuplen, sizeof(tuplen));
 	stup->tuple = (void *) tuple;
 	/* set up first-column key value */
 	stup->datum1 = index_getattr(tuple,
@@ -4466,7 +4453,7 @@ copytup_datum(Tuplesortstate *state, SortTuple *stup, void *tup)
 }
 
 static void
-writetup_datum(Tuplesortstate *state, int tapenum, SortTuple *stup)
+writetup_datum(Tuplesortstate *state, LogicalTape *tape, SortTuple *stup)
 {
 	void	   *waddr;
 	unsigned int tuplen;
@@ -4491,13 +4478,10 @@ writetup_datum(Tuplesortstate *state, int tapenum, SortTuple *stup)
 
 	writtenlen = tuplen + sizeof(unsigned int);
 
-	LogicalTapeWrite(state->tapeset, tapenum,
-					 (void *) &writtenlen, sizeof(writtenlen));
-	LogicalTapeWrite(state->tapeset, tapenum,
-					 waddr, tuplen);
+	LogicalTapeWrite(tape, (void *) &writtenlen, sizeof(writtenlen));
+	LogicalTapeWrite(tape, waddr, tuplen);
 	if (state->randomAccess)	/* need trailing length word? */
-		LogicalTapeWrite(state->tapeset, tapenum,
-						 (void *) &writtenlen, sizeof(writtenlen));
+		LogicalTapeWrite(tape, (void *) &writtenlen, sizeof(writtenlen));
 
 	if (!state->slabAllocatorUsed && stup->tuple)
 	{
@@ -4508,7 +4492,7 @@ writetup_datum(Tuplesortstate *state, int tapenum, SortTuple *stup)
 
 static void
 readtup_datum(Tuplesortstate *state, SortTuple *stup,
-			  int tapenum, unsigned int len)
+			  LogicalTape *tape, unsigned int len)
 {
 	unsigned int tuplen = len - sizeof(unsigned int);
 
@@ -4522,8 +4506,7 @@ readtup_datum(Tuplesortstate *state, SortTuple *stup,
 	else if (!state->tuples)
 	{
 		Assert(tuplen == sizeof(Datum));
-		LogicalTapeReadExact(state->tapeset, tapenum,
-							 &stup->datum1, tuplen);
+		LogicalTapeReadExact(tape, &stup->datum1, tuplen);
 		stup->isnull1 = false;
 		stup->tuple = NULL;
 	}
@@ -4531,16 +4514,14 @@ readtup_datum(Tuplesortstate *state, SortTuple *stup,
 	{
 		void	   *raddr = readtup_alloc(state, tuplen);
 
-		LogicalTapeReadExact(state->tapeset, tapenum,
-							 raddr, tuplen);
+		LogicalTapeReadExact(tape, raddr, tuplen);
 		stup->datum1 = PointerGetDatum(raddr);
 		stup->isnull1 = false;
 		stup->tuple = raddr;
 	}
 
 	if (state->randomAccess)	/* need trailing length word? */
-		LogicalTapeReadExact(state->tapeset, tapenum,
-							 &tuplen, sizeof(tuplen));
+		LogicalTapeReadExact(tape, &tuplen, sizeof(tuplen));
 }
 
 /*
@@ -4652,7 +4633,7 @@ worker_freeze_result_tape(Tuplesortstate *state)
 	TapeShare	output;
 
 	Assert(WORKER(state));
-	Assert(state->result_tape != -1);
+	Assert(state->result_tape != NULL);
 	Assert(state->memtupcount == 0);
 
 	/*
@@ -4668,7 +4649,7 @@ worker_freeze_result_tape(Tuplesortstate *state)
 	 * Parallel worker requires result tape metadata, which is to be stored in
 	 * shared memory for leader
 	 */
-	LogicalTapeFreeze(state->tapeset, state->result_tape, &output);
+	LogicalTapeFreeze(state->result_tape, &output);
 
 	/* Store properties of output tape, and update finished worker count */
 	SpinLockAcquire(&shared->mutex);
@@ -4687,9 +4668,9 @@ static void
 worker_nomergeruns(Tuplesortstate *state)
 {
 	Assert(WORKER(state));
-	Assert(state->result_tape == -1);
+	Assert(state->result_tape == NULL);
 
-	state->result_tape = state->tp_tapenum[state->destTape];
+	state->result_tape = state->tapes[state->tp_tapenum[state->destTape]];
 	worker_freeze_result_tape(state);
 }
 
@@ -4733,9 +4714,13 @@ leader_takeover_tapes(Tuplesortstate *state)
 	 * randomAccess is disallowed for parallel sorts.
 	 */
 	inittapestate(state, nParticipants + 1);
-	state->tapeset = LogicalTapeSetCreate(nParticipants + 1, false,
-										  shared->tapes, &shared->fileset,
+	state->tapeset = LogicalTapeSetCreate(false,
+										  &shared->fileset,
 										  state->worker);
+	state->tapes = palloc(state->maxTapes * sizeof(LogicalTape *));
+	for (j = 0; j < nParticipants; j++)
+		state->tapes[j] = LogicalTapeImport(state->tapeset, j, &shared->tapes[j]);
+	/* tapes[nParticipants] represents the "leader tape", which is not used */
 
 	/* mergeruns() relies on currentRun for # of runs (in one-pass cases) */
 	state->currentRun = nParticipants;
diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h
index 37cb4f3d59a..2e8cbee69ff 100644
--- a/src/include/nodes/execnodes.h
+++ b/src/include/nodes/execnodes.h
@@ -41,6 +41,7 @@ struct ExprContext;
 struct RangeTblEntry;			/* avoid including parsenodes.h here */
 struct ExprEvalStep;			/* avoid including execExpr.h everywhere */
 struct CopyMultiInsertBuffer;
+struct LogicalTapeSet;
 
 
 /* ----------------
@@ -2316,7 +2317,7 @@ typedef struct AggState
 	bool		table_filled;	/* hash table filled yet? */
 	int			num_hashes;
 	MemoryContext hash_metacxt; /* memory for hash table itself */
-	struct HashTapeInfo *hash_tapeinfo; /* metadata for spill tapes */
+	struct LogicalTapeSet *hash_tapeset;	/* tape set for hash spill tapes */
 	struct HashAggSpill *hash_spills;	/* HashAggSpill for each grouping set,
 										 * exists only during first pass */
 	TupleTableSlot *hash_spill_rslot;	/* for reading spill files */
diff --git a/src/include/utils/logtape.h b/src/include/utils/logtape.h
index 85d2e03c631..758a19779c6 100644
--- a/src/include/utils/logtape.h
+++ b/src/include/utils/logtape.h
@@ -18,9 +18,13 @@
 
 #include "storage/sharedfileset.h"
 
-/* LogicalTapeSet is an opaque type whose details are not known outside logtape.c. */
-
+/*
+ * LogicalTapeSet and LogicalTape are opaque types whose details are not
+ * known outside logtape.c.
+ */
 typedef struct LogicalTapeSet LogicalTapeSet;
+typedef struct LogicalTape LogicalTape;
+
 
 /*
  * The approach tuplesort.c takes to parallel external sorts is that workers,
@@ -54,27 +58,20 @@ typedef struct TapeShare
  * prototypes for functions in logtape.c
  */
 
-extern LogicalTapeSet *LogicalTapeSetCreate(int ntapes, bool preallocate,
-											TapeShare *shared,
+extern LogicalTapeSet *LogicalTapeSetCreate(bool preallocate,
 											SharedFileSet *fileset, int worker);
+extern void LogicalTapeClose(LogicalTape *lt);
 extern void LogicalTapeSetClose(LogicalTapeSet *lts);
+extern LogicalTape *LogicalTapeCreate(LogicalTapeSet *lts);
+extern LogicalTape *LogicalTapeImport(LogicalTapeSet *lts, int worker, TapeShare *shared);
 extern void LogicalTapeSetForgetFreeSpace(LogicalTapeSet *lts);
-extern size_t LogicalTapeRead(LogicalTapeSet *lts, int tapenum,
-							  void *ptr, size_t size);
-extern void LogicalTapeWrite(LogicalTapeSet *lts, int tapenum,
-							 void *ptr, size_t size);
-extern void LogicalTapeRewindForRead(LogicalTapeSet *lts, int tapenum,
-									 size_t buffer_size);
-extern void LogicalTapeRewindForWrite(LogicalTapeSet *lts, int tapenum);
-extern void LogicalTapeFreeze(LogicalTapeSet *lts, int tapenum,
-							  TapeShare *share);
-extern void LogicalTapeSetExtend(LogicalTapeSet *lts, int nAdditional);
-extern size_t LogicalTapeBackspace(LogicalTapeSet *lts, int tapenum,
-								   size_t size);
-extern void LogicalTapeSeek(LogicalTapeSet *lts, int tapenum,
-							long blocknum, int offset);
-extern void LogicalTapeTell(LogicalTapeSet *lts, int tapenum,
-							long *blocknum, int *offset);
+extern size_t LogicalTapeRead(LogicalTape *lt, void *ptr, size_t size);
+extern void LogicalTapeWrite(LogicalTape *lt, void *ptr, size_t size);
+extern void LogicalTapeRewindForRead(LogicalTape *lt, size_t buffer_size);
+extern void LogicalTapeFreeze(LogicalTape *lt, TapeShare *share);
+extern size_t LogicalTapeBackspace(LogicalTape *lt, size_t size);
+extern void LogicalTapeSeek(LogicalTape *lt, long blocknum, int offset);
+extern void LogicalTapeTell(LogicalTape *lt, long *blocknum, int *offset);
 extern long LogicalTapeSetBlocks(LogicalTapeSet *lts);
 
 #endif							/* LOGTAPE_H */
-- 
2.30.2

