From 3e8e43232b08eac40bb5f634be158b8577fc844f Mon Sep 17 00:00:00 2001
From: Andres Freund <andres@anarazel.de>
Date: Sun, 20 Jan 2019 13:28:26 -0800
Subject: [PATCH v23] tableam: multi_insert and slotify COPY.

Author:
Reviewed-By:
Discussion: https://postgr.es/m/
Backpatch:
---
 src/backend/access/heap/heapam.c         |  23 +-
 src/backend/access/heap/heapam_handler.c |   1 +
 src/backend/commands/copy.c              | 322 ++++++++++++-----------
 src/include/access/heapam.h              |   3 +-
 src/include/access/tableam.h             |  14 +
 src/include/nodes/execnodes.h            |   6 +
 6 files changed, 204 insertions(+), 165 deletions(-)

diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index f3812dd5871..71d14789c98 100644
--- a/src/backend/access/heap/heapam.c
+++ b/src/backend/access/heap/heapam.c
@@ -2106,7 +2106,7 @@ heap_prepare_insert(Relation relation, HeapTuple tup, TransactionId xid,
  * temporary context before calling this, if that's a problem.
  */
 void
-heap_multi_insert(Relation relation, HeapTuple *tuples, int ntuples,
+heap_multi_insert(Relation relation, TupleTableSlot **slots, int ntuples,
 				  CommandId cid, int options, BulkInsertState bistate)
 {
 	TransactionId xid = GetCurrentTransactionId();
@@ -2127,11 +2127,18 @@ heap_multi_insert(Relation relation, HeapTuple *tuples, int ntuples,
 	saveFreeSpace = RelationGetTargetPageFreeSpace(relation,
 												   HEAP_DEFAULT_FILLFACTOR);
 
-	/* Toast and set header data in all the tuples */
+	/* Toast and set header data in all the slots */
 	heaptuples = palloc(ntuples * sizeof(HeapTuple));
 	for (i = 0; i < ntuples; i++)
-		heaptuples[i] = heap_prepare_insert(relation, tuples[i],
-											xid, cid, options);
+	{
+		HeapTuple tuple;
+
+		tuple = ExecFetchSlotHeapTuple(slots[i], true, NULL);
+		slots[i]->tts_tableOid = RelationGetRelid(relation);
+		tuple->t_tableOid = slots[i]->tts_tableOid;
+		heaptuples[i] = heap_prepare_insert(relation, tuple, xid, cid,
+											options);
+	}
 
 	/*
 	 * We're about to do the actual inserts -- but check for conflict first,
@@ -2361,13 +2368,9 @@ heap_multi_insert(Relation relation, HeapTuple *tuples, int ntuples,
 			CacheInvalidateHeapTuple(relation, heaptuples[i], NULL);
 	}
 
-	/*
-	 * Copy t_self fields back to the caller's original tuples. This does
-	 * nothing for untoasted tuples (tuples[i] == heaptuples[i)], but it's
-	 * probably faster to always copy than check.
-	 */
+	/* copy t_self fields back to the caller's slots */
 	for (i = 0; i < ntuples; i++)
-		tuples[i]->t_self = heaptuples[i]->t_self;
+		slots[i]->tts_tid = heaptuples[i]->t_self;
 
 	pgstat_count_heap_insert(relation, ntuples);
 }
diff --git a/src/backend/access/heap/heapam_handler.c b/src/backend/access/heap/heapam_handler.c
index 0e1a1fe7b6f..692b949d6fc 100644
--- a/src/backend/access/heap/heapam_handler.c
+++ b/src/backend/access/heap/heapam_handler.c
@@ -539,6 +539,7 @@ static const TableAmRoutine heapam_methods = {
 	.tuple_complete_speculative = heapam_tuple_complete_speculative,
 	.tuple_delete = heapam_tuple_delete,
 	.tuple_update = heapam_tuple_update,
+	.multi_insert = heap_multi_insert,
 	.tuple_lock = heapam_tuple_lock,
 
 	.tuple_fetch_row_version = heapam_fetch_row_version,
diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c
index 705df8900ba..5ecc4f42835 100644
--- a/src/backend/commands/copy.c
+++ b/src/backend/commands/copy.c
@@ -315,13 +315,12 @@ static CopyState BeginCopyTo(ParseState *pstate, Relation rel, RawStmt *query,
 static void EndCopyTo(CopyState cstate);
 static uint64 DoCopyTo(CopyState cstate);
 static uint64 CopyTo(CopyState cstate);
-static void CopyOneRowTo(CopyState cstate,
-			 Datum *values, bool *nulls);
+static void CopyOneRowTo(CopyState cstate, TupleTableSlot *slot);
 static void CopyFromInsertBatch(CopyState cstate, EState *estate,
 					CommandId mycid, int hi_options,
-					ResultRelInfo *resultRelInfo, TupleTableSlot *myslot,
+					ResultRelInfo *resultRelInfo,
 					BulkInsertState bistate,
-					int nBufferedTuples, HeapTuple *bufferedTuples,
+					int nBufferedTuples, TupleTableSlot **bufferedSlots,
 					uint64 firstBufferedLineNo);
 static bool CopyReadLine(CopyState cstate);
 static bool CopyReadLineText(CopyState cstate);
@@ -2072,33 +2071,27 @@ CopyTo(CopyState cstate)
 
 	if (cstate->rel)
 	{
-		Datum	   *values;
-		bool	   *nulls;
+		TupleTableSlot *slot;
 		TableScanDesc scandesc;
-		HeapTuple	tuple;
-
-		values = (Datum *) palloc(num_phys_attrs * sizeof(Datum));
-		nulls = (bool *) palloc(num_phys_attrs * sizeof(bool));
 
 		scandesc = table_beginscan(cstate->rel, GetActiveSnapshot(), 0, NULL);
+		slot = table_slot_create(cstate->rel, NULL);
 
 		processed = 0;
-		while ((tuple = heap_getnext(scandesc, ForwardScanDirection)) != NULL)
+		while (table_scan_getnextslot(scandesc, ForwardScanDirection, slot))
 		{
 			CHECK_FOR_INTERRUPTS();
 
-			/* Deconstruct the tuple ... faster than repeated heap_getattr */
-			heap_deform_tuple(tuple, tupDesc, values, nulls);
+			/* Deconstruct the tuple ... */
+			slot_getallattrs(slot);
 
 			/* Format and send the data */
-			CopyOneRowTo(cstate, values, nulls);
+			CopyOneRowTo(cstate, slot);
 			processed++;
 		}
 
+		ExecDropSingleTupleTableSlot(slot);
 		table_endscan(scandesc);
-
-		pfree(values);
-		pfree(nulls);
 	}
 	else
 	{
@@ -2124,7 +2117,7 @@ CopyTo(CopyState cstate)
  * Emit one row during CopyTo().
  */
 static void
-CopyOneRowTo(CopyState cstate, Datum *values, bool *nulls)
+CopyOneRowTo(CopyState cstate, TupleTableSlot *slot)
 {
 	bool		need_delim = false;
 	FmgrInfo   *out_functions = cstate->out_functions;
@@ -2141,11 +2134,14 @@ CopyOneRowTo(CopyState cstate, Datum *values, bool *nulls)
 		CopySendInt16(cstate, list_length(cstate->attnumlist));
 	}
 
+	/* Make sure the tuple is fully deconstructed */
+	slot_getallattrs(slot);
+
 	foreach(cur, cstate->attnumlist)
 	{
 		int			attnum = lfirst_int(cur);
-		Datum		value = values[attnum - 1];
-		bool		isnull = nulls[attnum - 1];
+		Datum		value = slot->tts_values[attnum - 1];
+		bool		isnull = slot->tts_isnull[attnum - 1];
 
 		if (!cstate->binary)
 		{
@@ -2310,19 +2306,14 @@ limit_printout_length(const char *str)
 uint64
 CopyFrom(CopyState cstate)
 {
-	HeapTuple	tuple;
-	TupleDesc	tupDesc;
-	Datum	   *values;
-	bool	   *nulls;
 	ResultRelInfo *resultRelInfo;
 	ResultRelInfo *target_resultRelInfo;
 	ResultRelInfo *prevResultRelInfo = NULL;
 	EState	   *estate = CreateExecutorState(); /* for ExecConstraints() */
 	ModifyTableState *mtstate;
 	ExprContext *econtext;
-	TupleTableSlot *myslot;
+	TupleTableSlot *singleslot = NULL;
 	MemoryContext oldcontext = CurrentMemoryContext;
-	MemoryContext batchcontext;
 
 	PartitionTupleRouting *proute = NULL;
 	ErrorContextCallback errcallback;
@@ -2338,8 +2329,7 @@ CopyFrom(CopyState cstate)
 
 #define MAX_BUFFERED_TUPLES 1000
 #define RECHECK_MULTI_INSERT_THRESHOLD 1000
-	HeapTuple  *bufferedTuples = NULL;	/* initialize to silence warning */
-	Size		bufferedTuplesSize = 0;
+	Size		bufferedInputSize = 0;
 	uint64		firstBufferedLineNo = 0;
 	uint64		lastPartitionSampleLineNo = 0;
 	uint64		nPartitionChanges = 0;
@@ -2381,8 +2371,6 @@ CopyFrom(CopyState cstate)
 							RelationGetRelationName(cstate->rel))));
 	}
 
-	tupDesc = RelationGetDescr(cstate->rel);
-
 	/*----------
 	 * Check to see if we can avoid writing WAL
 	 *
@@ -2517,10 +2505,6 @@ CopyFrom(CopyState cstate)
 
 	ExecInitRangeTable(estate, cstate->range_table);
 
-	/* Set up a tuple slot too */
-	myslot = ExecInitExtraTupleSlot(estate, tupDesc,
-									&TTSOpsHeapTuple);
-
 	/*
 	 * Set up a ModifyTableState so we can let FDW(s) init themselves for
 	 * foreign-table result relation(s).
@@ -2642,7 +2626,20 @@ CopyFrom(CopyState cstate)
 		else
 			insertMethod = CIM_MULTI;
 
-		bufferedTuples = palloc(MAX_BUFFERED_TUPLES * sizeof(HeapTuple));
+		resultRelInfo->ri_batchInsertSlots =
+			palloc0(MAX_BUFFERED_TUPLES * sizeof(TupleTableSlot *));
+	}
+
+	/*
+	 * If not using batch mode (which allocates slots as needed) set up a
+	 * tuple slot too. When inserting into a partitioned table, we also need
+	 * one, even if we might batch insert, to read the tuple in the root
+	 * partition's form.
+	 */
+	if (insertMethod == CIM_SINGLE || insertMethod == CIM_MULTI_CONDITIONAL)
+	{
+		singleslot = table_slot_create(resultRelInfo->ri_RelationDesc,
+									   &estate->es_tupleTable);
 	}
 
 	has_before_insert_row_trig = (resultRelInfo->ri_TrigDesc &&
@@ -2659,9 +2656,6 @@ CopyFrom(CopyState cstate)
 	 */
 	ExecBSInsertTriggers(estate, resultRelInfo);
 
-	values = (Datum *) palloc(tupDesc->natts * sizeof(Datum));
-	nulls = (bool *) palloc(tupDesc->natts * sizeof(bool));
-
 	bistate = GetBulkInsertState();
 	econtext = GetPerTupleExprContext(estate);
 
@@ -2671,17 +2665,9 @@ CopyFrom(CopyState cstate)
 	errcallback.previous = error_context_stack;
 	error_context_stack = &errcallback;
 
-	/*
-	 * Set up memory context for batches. For cases without batching we could
-	 * use the per-tuple context, but it's simpler to just use it every time.
-	 */
-	batchcontext = AllocSetContextCreate(CurrentMemoryContext,
-										 "batch context",
-										 ALLOCSET_DEFAULT_SIZES);
-
 	for (;;)
 	{
-		TupleTableSlot *slot;
+		TupleTableSlot *myslot;
 		bool		skip_tuple;
 
 		CHECK_FOR_INTERRUPTS();
@@ -2692,20 +2678,37 @@ CopyFrom(CopyState cstate)
 		 */
 		ResetPerTupleExprContext(estate);
 
+		if (insertMethod == CIM_SINGLE || proute)
+		{
+			myslot = singleslot;
+			Assert(myslot != NULL);
+		}
+		else
+		{
+			Assert(resultRelInfo == target_resultRelInfo);
+
+			if (resultRelInfo->ri_batchInsertSlots[nBufferedTuples] == NULL)
+			{
+				resultRelInfo->ri_batchInsertSlots[nBufferedTuples] =
+					table_slot_create(resultRelInfo->ri_RelationDesc,
+									  &estate->es_tupleTable);
+			}
+			myslot = resultRelInfo->ri_batchInsertSlots[nBufferedTuples];
+		}
+
 		/*
 		 * Switch to per-tuple context before calling NextCopyFrom, which does
 		 * evaluate default expressions etc. and requires per-tuple context.
 		 */
 		MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
 
-		if (!NextCopyFrom(cstate, econtext, values, nulls))
+		ExecClearTuple(myslot);
+
+		/* Directly store the values/nulls array in the slot */
+		if (!NextCopyFrom(cstate, econtext, myslot->tts_values, myslot->tts_isnull))
 			break;
 
-		/* Switch into per-batch memory context before forming the tuple. */
-		MemoryContextSwitchTo(batchcontext);
-
-		/* And now we can form the input tuple. */
-		tuple = heap_form_tuple(tupDesc, values, nulls);
+		ExecStoreVirtualTuple(myslot);
 
 		/*
 		 * Constraints might reference the tableoid column, so (re-)initialize
@@ -2716,10 +2719,6 @@ CopyFrom(CopyState cstate)
 		/* Triggers and stuff need to be invoked in query context. */
 		MemoryContextSwitchTo(oldcontext);
 
-		/* Place tuple in tuple slot --- but slot shouldn't free it */
-		slot = myslot;
-		ExecStoreHeapTuple(tuple, slot, false);
-
 		if (cstate->whereClause)
 		{
 			econtext->ecxt_scantuple = myslot;
@@ -2738,7 +2737,7 @@ CopyFrom(CopyState cstate)
 			 * if the found partition is not suitable for INSERTs.
 			 */
 			resultRelInfo = ExecFindPartition(mtstate, target_resultRelInfo,
-											  proute, slot, estate);
+											  proute, myslot, estate);
 
 			if (prevResultRelInfo != resultRelInfo)
 			{
@@ -2752,38 +2751,16 @@ CopyFrom(CopyState cstate)
 					 */
 					if (nBufferedTuples > 0)
 					{
-						MemoryContext	oldcontext;
+						TupleTableSlot **slots =
+							prevResultRelInfo->ri_batchInsertSlots;
 
 						CopyFromInsertBatch(cstate, estate, mycid, hi_options,
-											prevResultRelInfo, myslot, bistate,
-											nBufferedTuples, bufferedTuples,
+											prevResultRelInfo, bistate,
+											nBufferedTuples,
+											slots,
 											firstBufferedLineNo);
 						nBufferedTuples = 0;
-						bufferedTuplesSize = 0;
+						bufferedInputSize = 0;
-
-						/*
-						 * The tuple is already allocated in the batch context, which
-						 * we want to reset.  So to keep the tuple we copy it into the
-						 * short-lived (per-tuple) context, reset the batch context
-						 * and then copy it back into the per-batch one.
-						 */
-						oldcontext = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
-						tuple = heap_copytuple(tuple);
-						MemoryContextSwitchTo(oldcontext);
-
-						/* cleanup the old batch */
-						MemoryContextReset(batchcontext);
-
-						/* copy the tuple back to the per-batch context */
-						oldcontext = MemoryContextSwitchTo(batchcontext);
-						tuple = heap_copytuple(tuple);
-						MemoryContextSwitchTo(oldcontext);
-
-						/*
-						 * Also push the tuple copy to the slot (resetting the context
-						 * invalidated the slot contents).
-						 */
-						ExecStoreHeapTuple(tuple, slot, false);
 					}
 
 					nPartitionChanges++;
@@ -2878,26 +2854,64 @@ CopyFrom(CopyState cstate)
 			 * rowtype.
 			 */
 			map = resultRelInfo->ri_PartitionInfo->pi_RootToPartitionMap;
-			if (map != NULL)
+			if (insertMethod == CIM_SINGLE ||
+				(insertMethod == CIM_MULTI_CONDITIONAL && !leafpart_use_multi_insert))
 			{
-				TupleTableSlot *new_slot;
-				MemoryContext oldcontext;
+				/* non batch insert */
 
-				new_slot = resultRelInfo->ri_PartitionInfo->pi_PartitionTupleSlot;
-				Assert(new_slot != NULL);
-
-				slot = execute_attr_map_slot(map->attrMap, slot, new_slot);
+				if (map != NULL)
+				{
+					TupleTableSlot *new_slot;
 
+					new_slot = resultRelInfo->ri_PartitionInfo->pi_PartitionTupleSlot;
+					myslot = execute_attr_map_slot(map->attrMap, myslot, new_slot);
+				}
+			}
+			else
+			{
 				/*
-				 * Get the tuple in the per-batch context, so that it will be
-				 * freed after each batch insert.
+				 * Batch insert into partitioned table.
 				 */
-				oldcontext = MemoryContextSwitchTo(batchcontext);
-				tuple = ExecCopySlotHeapTuple(slot);
-				MemoryContextSwitchTo(oldcontext);
+
+				TupleTableSlot **slots =
+					resultRelInfo->ri_batchInsertSlots;
+				TupleTableSlot *new_slot;
+
+				/* no other path available for partitioned table */
+				Assert(insertMethod == CIM_MULTI_CONDITIONAL);
+
+				/* Ensure partition ResultRelInfo has a batchInsertSlots array */
+				if (!slots)
+				{
+					slots = palloc0(MAX_BUFFERED_TUPLES * sizeof(TupleTableSlot *));
+					resultRelInfo->ri_batchInsertSlots = slots;
+				}
+				if (slots[nBufferedTuples] == NULL)
+				{
+					slots[nBufferedTuples] =
+						table_slot_create(resultRelInfo->ri_RelationDesc,
+										  &estate->es_tupleTable);
+				}
+				new_slot = slots[nBufferedTuples];
+
+				if (map != NULL)
+					myslot = execute_attr_map_slot(map->attrMap, myslot, new_slot);
+				else
+				{
+					/*
+					 * This looks more expensive than it is (Believe me, I
+					 * optimized it away. Twice). The input is in virtual
+					 * form, and we'll materialize the slot below - for most
+					 * slot types the copy performs the work materialization
+					 * would later require anyway.
+					 */
+					ExecCopySlot(new_slot, myslot);
+					myslot = new_slot;
+				}
 			}
 
-			slot->tts_tableOid = RelationGetRelid(resultRelInfo->ri_RelationDesc);
+			/* ensure that triggers etc. see the right relation */
+			myslot->tts_tableOid = RelationGetRelid(resultRelInfo->ri_RelationDesc);
 		}
 
 		skip_tuple = false;
@@ -2905,7 +2919,7 @@ CopyFrom(CopyState cstate)
 		/* BEFORE ROW INSERT Triggers */
 		if (has_before_insert_row_trig)
 		{
-			if (!ExecBRInsertTriggers(estate, resultRelInfo, slot))
+			if (!ExecBRInsertTriggers(estate, resultRelInfo, myslot))
 				skip_tuple = true;	/* "do nothing" */
 		}
 
@@ -2918,7 +2932,7 @@ CopyFrom(CopyState cstate)
 			 */
 			if (has_instead_insert_row_trig)
 			{
-				ExecIRInsertTriggers(estate, resultRelInfo, slot);
+				ExecIRInsertTriggers(estate, resultRelInfo, myslot);
 			}
 			else
 			{
@@ -2928,7 +2942,7 @@ CopyFrom(CopyState cstate)
 				 */
 				if (resultRelInfo->ri_FdwRoutine == NULL &&
 					resultRelInfo->ri_RelationDesc->rd_att->constr)
-					ExecConstraints(resultRelInfo, slot, estate);
+					ExecConstraints(resultRelInfo, myslot, estate);
 
 				/*
 				 * Also check the tuple against the partition constraint, if
@@ -2938,7 +2952,7 @@ CopyFrom(CopyState cstate)
 				 */
 				if (resultRelInfo->ri_PartitionCheck &&
 					(proute == NULL || has_before_insert_row_trig))
-					ExecPartitionCheck(resultRelInfo, slot, estate, true);
+					ExecPartitionCheck(resultRelInfo, myslot, estate, true);
 
 				/*
 				 * Perform multi-inserts when enabled, or when loading a
@@ -2950,8 +2964,17 @@ CopyFrom(CopyState cstate)
 					/* Add this tuple to the tuple buffer */
 					if (nBufferedTuples == 0)
 						firstBufferedLineNo = cstate->cur_lineno;
-					bufferedTuples[nBufferedTuples++] = tuple;
-					bufferedTuplesSize += tuple->t_len;
+
+					/*
+					 * The slot previously might point into the per-tuple
+					 * context. For batching it needs to be longer lived.
+					 */
+					ExecMaterializeSlot(myslot);
+
+					Assert(resultRelInfo->ri_batchInsertSlots[nBufferedTuples] == myslot);
+
+					nBufferedTuples++;
+					bufferedInputSize += cstate->line_buf.len;
 
 					/*
 					 * If the buffer filled up, flush it.  Also flush if the
@@ -2960,17 +2983,15 @@ CopyFrom(CopyState cstate)
 					 * buffer when the tuples are exceptionally wide.
 					 */
 					if (nBufferedTuples == MAX_BUFFERED_TUPLES ||
-						bufferedTuplesSize > 65535)
+						bufferedInputSize > 65535)
 					{
 						CopyFromInsertBatch(cstate, estate, mycid, hi_options,
-											resultRelInfo, myslot, bistate,
-											nBufferedTuples, bufferedTuples,
+											resultRelInfo, bistate,
+											nBufferedTuples,
+											resultRelInfo->ri_batchInsertSlots,
 											firstBufferedLineNo);
 						nBufferedTuples = 0;
-						bufferedTuplesSize = 0;
-
-						/* free memory occupied by tuples from the batch */
-						MemoryContextReset(batchcontext);
+						bufferedInputSize = 0;
 					}
 				}
 				else
@@ -2980,12 +3001,12 @@ CopyFrom(CopyState cstate)
 					/* OK, store the tuple */
 					if (resultRelInfo->ri_FdwRoutine != NULL)
 					{
-						slot = resultRelInfo->ri_FdwRoutine->ExecForeignInsert(estate,
-																			   resultRelInfo,
-																			   slot,
-																			   NULL);
+						myslot = resultRelInfo->ri_FdwRoutine->ExecForeignInsert(estate,
+																				 resultRelInfo,
+																				 myslot,
+																				 NULL);
 
-						if (slot == NULL)	/* "do nothing" */
+						if (myslot == NULL)	/* "do nothing" */
 							continue;	/* next tuple please */
 
 						/*
@@ -2993,27 +3014,26 @@ CopyFrom(CopyState cstate)
 						 * column, so (re-)initialize tts_tableOid before
 						 * evaluating them.
 						 */
-						slot->tts_tableOid = RelationGetRelid(resultRelInfo->ri_RelationDesc);
+						myslot->tts_tableOid = RelationGetRelid(resultRelInfo->ri_RelationDesc);
 					}
 					else
 					{
-						tuple = ExecFetchSlotHeapTuple(slot, true, NULL);
-						heap_insert(resultRelInfo->ri_RelationDesc, tuple,
-									mycid, hi_options, bistate);
-						ItemPointerCopy(&tuple->t_self, &slot->tts_tid);
-						slot->tts_tableOid = RelationGetRelid(resultRelInfo->ri_RelationDesc);
+						/* OK, store the tuple; index entries are created below */
+						table_insert(resultRelInfo->ri_RelationDesc, myslot,
+									 mycid, hi_options, bistate);
 					}
 
+
 					/* And create index entries for it */
 					if (resultRelInfo->ri_NumIndices > 0)
-						recheckIndexes = ExecInsertIndexTuples(slot,
+						recheckIndexes = ExecInsertIndexTuples(myslot,
 															   estate,
 															   false,
 															   NULL,
 															   NIL);
 
 					/* AFTER ROW INSERT Triggers */
-					ExecARInsertTriggers(estate, resultRelInfo, slot,
+					ExecARInsertTriggers(estate, resultRelInfo, myslot,
 										 recheckIndexes, cstate->transition_capture);
 
 					list_free(recheckIndexes);
@@ -3035,26 +3055,26 @@ CopyFrom(CopyState cstate)
 		if (insertMethod == CIM_MULTI_CONDITIONAL)
 		{
 			CopyFromInsertBatch(cstate, estate, mycid, hi_options,
-								prevResultRelInfo, myslot, bistate,
-								nBufferedTuples, bufferedTuples,
+								prevResultRelInfo, bistate,
+								nBufferedTuples,
+								prevResultRelInfo->ri_batchInsertSlots,
 								firstBufferedLineNo);
 		}
 		else
 			CopyFromInsertBatch(cstate, estate, mycid, hi_options,
-								resultRelInfo, myslot, bistate,
-								nBufferedTuples, bufferedTuples,
+								resultRelInfo, bistate,
+								nBufferedTuples,
+								resultRelInfo->ri_batchInsertSlots,
 								firstBufferedLineNo);
 	}
 
 	/* Done, clean up */
 	error_context_stack = errcallback.previous;
 
 	FreeBulkInsertState(bistate);
 
 	MemoryContextSwitchTo(oldcontext);
 
-	MemoryContextDelete(batchcontext);
-
 	/*
 	 * In the old protocol, tell pqcomm that we can process normal protocol
 	 * messages again.
@@ -3068,9 +3088,6 @@ CopyFrom(CopyState cstate)
 	/* Handle queued AFTER triggers */
 	AfterTriggerEndQuery(estate);
 
-	pfree(values);
-	pfree(nulls);
-
 	ExecResetTupleTable(estate->es_tupleTable, false);
 
 	/* Allow the FDW to shut down */
@@ -3108,8 +3125,7 @@ CopyFrom(CopyState cstate)
 static void
 CopyFromInsertBatch(CopyState cstate, EState *estate, CommandId mycid,
 					int hi_options, ResultRelInfo *resultRelInfo,
-					TupleTableSlot *myslot, BulkInsertState bistate,
-					int nBufferedTuples, HeapTuple *bufferedTuples,
+					BulkInsertState bistate, int nBufferedTuples, TupleTableSlot **bufferedSlots,
 					uint64 firstBufferedLineNo)
 {
 	MemoryContext oldcontext;
@@ -3129,12 +3145,12 @@ CopyFromInsertBatch(CopyState cstate, EState *estate, CommandId mycid,
 	 * before calling it.
 	 */
 	oldcontext = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
-	heap_multi_insert(resultRelInfo->ri_RelationDesc,
-					  bufferedTuples,
-					  nBufferedTuples,
-					  mycid,
-					  hi_options,
-					  bistate);
+	table_multi_insert(resultRelInfo->ri_RelationDesc,
+					   bufferedSlots,
+					   nBufferedTuples,
+					   mycid,
+					   hi_options,
+					   bistate);
 	MemoryContextSwitchTo(oldcontext);
 
 	/*
@@ -3148,12 +3164,11 @@ CopyFromInsertBatch(CopyState cstate, EState *estate, CommandId mycid,
 			List	   *recheckIndexes;
 
 			cstate->cur_lineno = firstBufferedLineNo + i;
-			ExecStoreHeapTuple(bufferedTuples[i], myslot, false);
 			recheckIndexes =
-				ExecInsertIndexTuples(myslot,
-									  estate, false, NULL, NIL);
+				ExecInsertIndexTuples(bufferedSlots[i], estate, false, NULL,
+									  NIL);
 			ExecARInsertTriggers(estate, resultRelInfo,
-								 myslot,
+								 bufferedSlots[i],
 								 recheckIndexes, cstate->transition_capture);
 			list_free(recheckIndexes);
 		}
@@ -3170,13 +3185,15 @@ CopyFromInsertBatch(CopyState cstate, EState *estate, CommandId mycid,
 		for (i = 0; i < nBufferedTuples; i++)
 		{
 			cstate->cur_lineno = firstBufferedLineNo + i;
-			ExecStoreHeapTuple(bufferedTuples[i], myslot, false);
 			ExecARInsertTriggers(estate, resultRelInfo,
-								 myslot,
+								 bufferedSlots[i],
 								 NIL, cstate->transition_capture);
 		}
 	}
 
+	for (i = 0; i < nBufferedTuples; i++)
+		ExecClearTuple(bufferedSlots[i]);
+
 	/* reset cur_lineno and line_buf_valid to what they were */
 	cstate->line_buf_valid = line_buf_valid;
 	cstate->cur_lineno = save_cur_lineno;
@@ -4966,11 +4983,8 @@ copy_dest_receive(TupleTableSlot *slot, DestReceiver *self)
 	DR_copy    *myState = (DR_copy *) self;
 	CopyState	cstate = myState->cstate;
 
-	/* Make sure the tuple is fully deconstructed */
-	slot_getallattrs(slot);
-
-	/* And send the data */
-	CopyOneRowTo(cstate, slot->tts_values, slot->tts_isnull);
+	/* Send the data */
+	CopyOneRowTo(cstate, slot);
 	myState->processed++;
 
 	return true;
diff --git a/src/include/access/heapam.h b/src/include/access/heapam.h
index 4c077755d54..ed0e2de144d 100644
--- a/src/include/access/heapam.h
+++ b/src/include/access/heapam.h
@@ -36,6 +36,7 @@
 #define HEAP_INSERT_SPECULATIVE 0x0010
 
 typedef struct BulkInsertStateData *BulkInsertState;
+struct TupleTableSlot;
 
 #define MaxLockTupleMode	LockTupleExclusive
 
@@ -143,7 +144,7 @@ extern void ReleaseBulkInsertStatePin(BulkInsertState bistate);
 
 extern void heap_insert(Relation relation, HeapTuple tup, CommandId cid,
 			int options, BulkInsertState bistate);
-extern void heap_multi_insert(Relation relation, HeapTuple *tuples, int ntuples,
+extern void heap_multi_insert(Relation relation, struct TupleTableSlot **slots, int ntuples,
 				  CommandId cid, int options, BulkInsertState bistate);
 extern TM_Result heap_delete(Relation relation, ItemPointer tid,
 			CommandId cid, Snapshot crosscheck, bool wait,
diff --git a/src/include/access/tableam.h b/src/include/access/tableam.h
index 37890dc2f5c..2bd92cbd530 100644
--- a/src/include/access/tableam.h
+++ b/src/include/access/tableam.h
@@ -350,6 +350,9 @@ typedef struct TableAmRoutine
 								 LockTupleMode *lockmode,
 								 bool *update_indexes);
 
+	void		(*multi_insert) (Relation rel, TupleTableSlot **slots, int nslots,
+								 CommandId cid, int options, struct BulkInsertStateData *bistate);
+
 	/* see table_insert() for reference about parameters */
 	TM_Result	(*tuple_lock) (Relation rel,
 							   ItemPointer tid,
@@ -875,6 +878,17 @@ table_update(Relation rel, ItemPointer otid, TupleTableSlot *slot,
 										 lockmode, update_indexes);
 }
 
+/*
+ *	table_multi_insert	- insert multiple tuples into a table
+ */
+static inline void
+table_multi_insert(Relation rel, TupleTableSlot **slots, int nslots,
+				   CommandId cid, int options, struct BulkInsertStateData *bistate)
+{
+	rel->rd_tableam->multi_insert(rel, slots, nslots,
+								  cid, options, bistate);
+}
+
 /*
  * Lock a tuple in the specified mode.
  *
diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h
index 869c303e157..1311b854a99 100644
--- a/src/include/nodes/execnodes.h
+++ b/src/include/nodes/execnodes.h
@@ -476,6 +476,12 @@ typedef struct ResultRelInfo
 	/* relation descriptor for root partitioned table */
 	Relation	ri_PartitionRoot;
 
+	/*
+	 * When batch inserting into a table, it might be necessary to have one
+	 * slot per input row, up to the largest possible batch size.
+	 */
+	TupleTableSlot **ri_batchInsertSlots;
+
 	/* Additional information specific to partition tuple routing */
 	struct PartitionRoutingInfo *ri_PartitionInfo;
 } ResultRelInfo;
-- 
2.21.0.dirty

