From d9ad05ea0197b0d79e281cf2c4a366d596085077 Mon Sep 17 00:00:00 2001
From: amit <amitlangote09@gmail.com>
Date: Fri, 6 Jan 2017 10:28:04 +0900
Subject: [PATCH 3/3] Support bulk-insert mode for partitioned tables

Currently, the heap layer (hio.c) supports a bulk-insert mode, which
is currently used by certain callers in copy.c, createas.c, etc.
Callers must pass a BulkInsertState object down to heapam routines
like heap_insert() or heap_multi_insert() along with the input row(s)
to engage this mode.

A single BulkInsertState object is good only for a given heap relation.
In case of a partitioned table, successive input rows may be mapped to
different partitions, so different heap relations.  We must use a separate
BulkInsertState object for each partition and switch to the same every
time a given partition is selected.

Also, if we are able to use multi-insert mode in CopyFrom() and hence
will buffer tuples, we must maintain separate buffer spaces and buffered
tuples counts for every partition.  Although, maximum limits on the
number of buffered tuples and buffered tuple size (across partitions)
are still the old compile-time constants, not scaled based on, say,
number of partitions.  It might be possible to raise that limit so that
enough tuples are buffered per partition in the worst case that input
tuples are randomly ordered.
---
 src/backend/commands/copy.c | 168 ++++++++++++++++++++++++++++----------------
 1 file changed, 109 insertions(+), 59 deletions(-)

diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c
index 9624c93f6b..c4397b4f78 100644
--- a/src/backend/commands/copy.c
+++ b/src/backend/commands/copy.c
@@ -2296,15 +2296,20 @@ CopyFrom(CopyState cstate)
 	ErrorContextCallback errcallback;
 	CommandId	mycid = GetCurrentCommandId(true);
 	int			hi_options = 0; /* start with default heap_insert options */
-	BulkInsertState bistate = NULL;
 	uint64		processed = 0;
-	bool		useHeapMultiInsert;
-	int			nBufferedTuples = 0;
 
-#define MAX_BUFFERED_TUPLES 1000
-	HeapTuple  *bufferedTuples = NULL;	/* initialize to silence warning */
-	Size		bufferedTuplesSize = 0;
-	int			firstBufferedLineNo = 0;
+#define MAX_BUFFERED_TUPLES			1000
+#define MAX_BUFFERED_TUPLES_SIZE	65535
+	int		num_heaps;
+	bool   *useHeapMultiInsert = NULL;
+	BulkInsertState *bistate = NULL;
+	HeapTuple **bufferedTuples = NULL;	/* initialize to silence warning */
+	Size	   *bufferedTuplesSize = NULL;
+	int		   *firstBufferedLineNo = NULL;
+	int		   *nBufferedTuples = NULL;
+	Size		bufferedTuplesSize_total = 0;
+	int			nBufferedTuples_total = 0;
+	int			i;
 
 	Assert(cstate->rel);
 
@@ -2449,21 +2454,44 @@ CopyFrom(CopyState cstate)
 	 * BEFORE/INSTEAD OF triggers, or we need to evaluate volatile default
 	 * expressions. Such triggers or expressions might query the table we're
 	 * inserting to, and act differently if the tuples that have already been
-	 * processed and prepared for insertion are not there.  We also can't do
-	 * it if the table is partitioned.
+	 * processed and prepared for insertion are not there.
+	 *
+	 * In case of a regular table there is only one heap, whereas in case of
+	 * a partitioned table, there are as many heaps as there are partitions.
+	 * We must manage buffered tuples separately for each heap.
 	 */
-	if ((resultRelInfo->ri_TrigDesc != NULL &&
-		 (resultRelInfo->ri_TrigDesc->trig_insert_before_row ||
-		  resultRelInfo->ri_TrigDesc->trig_insert_instead_row)) ||
-		cstate->volatile_defexprs ||
-		cstate->rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
-	{
-		useHeapMultiInsert = false;
-	}
-	else
+	num_heaps = cstate->num_partitions > 0 ? cstate->num_partitions : 1;
+
+	bufferedTuples = (HeapTuple **) palloc0(num_heaps * sizeof(HeapTuple *));
+	useHeapMultiInsert = (bool *) palloc(num_heaps * sizeof(bool));
+	nBufferedTuples = (int *) palloc0(num_heaps * sizeof(int));
+	bufferedTuplesSize = (Size *) palloc0(num_heaps * sizeof(Size));
+	firstBufferedLineNo = (int *) palloc0(num_heaps * sizeof(int));
+
+	/* Also, maintain separate bulk-insert state for every heap */
+	bistate = (BulkInsertState *) palloc(num_heaps * sizeof(BulkInsertState));
+
+	for (i = 0; i < num_heaps; i++)
 	{
-		useHeapMultiInsert = true;
-		bufferedTuples = palloc(MAX_BUFFERED_TUPLES * sizeof(HeapTuple));
+		/*
+		 * In case of a partitioned table, we check the individual partition's
+		 * TriggerDesc and not the root parent table's.
+		 */
+		ResultRelInfo *cur_rel = cstate->partitions ? cstate->partitions + i
+													: resultRelInfo;
+
+		if ((cur_rel->ri_TrigDesc != NULL &&
+			(cur_rel->ri_TrigDesc->trig_insert_before_row ||
+			 cur_rel->ri_TrigDesc->trig_insert_instead_row)) ||
+			cstate->volatile_defexprs)
+			useHeapMultiInsert[i] = false;
+		else
+			useHeapMultiInsert[i] = true;
+
+		if (useHeapMultiInsert[i])
+			bufferedTuples[i] = palloc(MAX_BUFFERED_TUPLES *
+									   sizeof(HeapTuple));
+		bistate[i] = GetBulkInsertState();
 	}
 
 	/* Prepare to catch AFTER triggers. */
@@ -2480,14 +2508,6 @@ CopyFrom(CopyState cstate)
 	values = (Datum *) palloc(tupDesc->natts * sizeof(Datum));
 	nulls = (bool *) palloc(tupDesc->natts * sizeof(bool));
 
-	/*
-	 * FIXME: We don't engage the bulk-insert mode for partitioned tables,
-	 * because the the heap relation is most likely change from one row to
-	 * next due to tuple-routing.
-	 */
-	if (cstate->rel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE)
-		bistate = GetBulkInsertState();
-
 	econtext = GetPerTupleExprContext(estate);
 
 	/* Set up callback to identify error line number */
@@ -2502,15 +2522,16 @@ CopyFrom(CopyState cstate)
 					   *oldslot;
 		bool		skip_tuple;
 		Oid			loaded_oid = InvalidOid;
+		int			cur_heap = 0;
 
 		CHECK_FOR_INTERRUPTS();
 
-		if (nBufferedTuples == 0)
+		if (nBufferedTuples_total == 0)
 		{
 			/*
 			 * Reset the per-tuple exprcontext. We can only do this if the
-			 * tuple buffer is empty. (Calling the context the per-tuple
-			 * memory context is a bit of a misnomer now.)
+			 * there are no buffered tuples. (Calling the context the
+			 * per-tuple memory context is a bit of a misnomer now.)
 			 */
 			ResetPerTupleExprContext(estate);
 		}
@@ -2561,6 +2582,7 @@ CopyFrom(CopyState cstate)
 												estate);
 			Assert(leaf_part_index >= 0 &&
 				   leaf_part_index < cstate->num_partitions);
+			cur_heap = leaf_part_index;
 
 			/*
 			 * Save the old ResultRelInfo and switch to the one corresponding
@@ -2589,7 +2611,13 @@ CopyFrom(CopyState cstate)
 			{
 				Relation	partrel = resultRelInfo->ri_RelationDesc;
 
+				/*
+				 * Allocate memory for the converted tuple in the per-tuple
+				 * context just like the original tuple.
+				 */
+				MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
 				tuple = do_convert_tuple(tuple, map);
+				MemoryContextSwitchTo(oldcontext);
 
 				/*
 				 * We must use the partition's tuple descriptor from this
@@ -2599,7 +2627,7 @@ CopyFrom(CopyState cstate)
 				slot = cstate->partition_tuple_slot;
 				Assert(slot != NULL);
 				ExecSetSlotDescriptor(slot, RelationGetDescr(partrel));
-				ExecStoreTuple(tuple, slot, InvalidBuffer, true);
+				ExecStoreTuple(tuple, slot, InvalidBuffer, false);
 			}
 
 			tuple->t_tableOid = RelationGetRelid(resultRelInfo->ri_RelationDesc);
@@ -2634,29 +2662,47 @@ CopyFrom(CopyState cstate)
 					resultRelInfo->ri_PartitionCheck)
 					ExecConstraints(resultRelInfo, slot, oldslot, estate);
 
-				if (useHeapMultiInsert)
+				if (useHeapMultiInsert[cur_heap])
 				{
-					/* Add this tuple to the tuple buffer */
-					if (nBufferedTuples == 0)
-						firstBufferedLineNo = cstate->cur_lineno;
-					bufferedTuples[nBufferedTuples++] = tuple;
-					bufferedTuplesSize += tuple->t_len;
+					/* Add this tuple to the corresponding tuple buffer */
+					if (nBufferedTuples[cur_heap] == 0)
+						firstBufferedLineNo[cur_heap] = cstate->cur_lineno;
+					bufferedTuples[cur_heap][nBufferedTuples[cur_heap]++] =
+																	tuple;
+					bufferedTuplesSize[cur_heap] += tuple->t_len;
+
+					/* Count the current tuple toward the totals */
+					nBufferedTuples_total += nBufferedTuples[cur_heap];
+					bufferedTuplesSize_total += bufferedTuplesSize[cur_heap];
 
 					/*
-					 * If the buffer filled up, flush it.  Also flush if the
-					 * total size of all the tuples in the buffer becomes
-					 * large, to avoid using large amounts of memory for the
-					 * buffer when the tuples are exceptionally wide.
+					 * If enough tuples are buffered, flush them from the
+					 * individual buffers. Also flush if the total size of
+					 * all the buffered tuples becomes large, to avoid using
+					 * large amounts of buffer memory when the tuples are
+					 * exceptionally wide.
 					 */
-					if (nBufferedTuples == MAX_BUFFERED_TUPLES ||
-						bufferedTuplesSize > 65535)
+					if (nBufferedTuples_total == MAX_BUFFERED_TUPLES ||
+						bufferedTuplesSize_total > MAX_BUFFERED_TUPLES_SIZE)
 					{
-						CopyFromInsertBatch(cstate, estate, mycid, hi_options,
-											resultRelInfo, myslot, bistate,
-											nBufferedTuples, bufferedTuples,
-											firstBufferedLineNo);
-						nBufferedTuples = 0;
-						bufferedTuplesSize = 0;
+						for (i = 0; i < num_heaps; i++)
+						{
+							ResultRelInfo *cur_rel = cstate->partitions
+												   ? cstate->partitions + i
+												   : resultRelInfo;
+
+							CopyFromInsertBatch(cstate, estate, mycid,
+												hi_options, cur_rel,
+												myslot, bistate[i],
+												nBufferedTuples[i],
+												bufferedTuples[i],
+												firstBufferedLineNo[i]);
+							nBufferedTuples[i] = 0;
+							bufferedTuplesSize[i] = 0;
+						}
+
+						nBufferedTuples_total = 0;
+						bufferedTuplesSize_total = 0;
 					}
 				}
 				else
@@ -2665,7 +2711,7 @@ CopyFrom(CopyState cstate)
 
 					/* OK, store the tuple and create index entries for it */
 					heap_insert(resultRelInfo->ri_RelationDesc, tuple, mycid,
-								hi_options, bistate);
+								hi_options, bistate[cur_heap]);
 
 					if (resultRelInfo->ri_NumIndices > 0)
 						recheckIndexes = ExecInsertIndexTuples(slot,
@@ -2699,18 +2745,22 @@ CopyFrom(CopyState cstate)
 	}
 
 	/* Flush any remaining buffered tuples */
-	if (nBufferedTuples > 0)
+	for (i = 0; i < num_heaps; i++)
+	{
+		ResultRelInfo *cur_rel = cstate->partitions ? cstate->partitions + i
+													: resultRelInfo;
+
 		CopyFromInsertBatch(cstate, estate, mycid, hi_options,
-							resultRelInfo, myslot, bistate,
-							nBufferedTuples, bufferedTuples,
-							firstBufferedLineNo);
+							cur_rel, myslot, bistate[i],
+							nBufferedTuples[i], bufferedTuples[i],
+							firstBufferedLineNo[i]);
+
+		FreeBulkInsertState(bistate[i]);
+	}
 
 	/* Done, clean up */
 	error_context_stack = errcallback.previous;
 
-	if (bistate != NULL)
-		FreeBulkInsertState(bistate);
-
 	MemoryContextSwitchTo(oldcontext);
 
 	/*
@@ -2803,7 +2853,7 @@ CopyFromInsertBatch(CopyState cstate, EState *estate, CommandId mycid,
 	 * before calling it.
 	 */
 	oldcontext = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
-	heap_multi_insert(cstate->rel,
+	heap_multi_insert(resultRelInfo->ri_RelationDesc,
 					  bufferedTuples,
 					  nBufferedTuples,
 					  mycid,
-- 
2.11.0

