From d67c82e870c640e6f4ba25b3da5acf54df7165d2 Mon Sep 17 00:00:00 2001
From: Justin Pryzby <pryzbyj@telsasoft.com>
Date: Fri, 8 May 2020 02:17:32 -0500
Subject: [PATCH v6 1/3] Allow INSERT SELECT to use a BulkInsertState

---
 src/backend/executor/nodeModifyTable.c | 22 ++++++++++++++++++++--
 src/backend/parser/gram.y              |  7 ++++++-
 src/backend/tcop/utility.c             |  4 ++++
 src/backend/utils/misc/guc.c           | 11 +++++++++++
 src/include/executor/nodeModifyTable.h |  2 ++
 src/include/nodes/execnodes.h          |  3 +++
 src/include/parser/kwlist.h            |  1 +
 src/test/regress/expected/insert.out   | 23 +++++++++++++++++++++++
 src/test/regress/sql/insert.sql        | 13 +++++++++++++
 9 files changed, 83 insertions(+), 3 deletions(-)

diff --git a/src/backend/executor/nodeModifyTable.c b/src/backend/executor/nodeModifyTable.c
index 29e07b7228..26ff964105 100644
--- a/src/backend/executor/nodeModifyTable.c
+++ b/src/backend/executor/nodeModifyTable.c
@@ -72,6 +72,8 @@ static TupleTableSlot *ExecPrepareTupleRouting(ModifyTableState *mtstate,
 											   ResultRelInfo *targetRelInfo,
 											   TupleTableSlot *slot,
 											   ResultRelInfo **partRelInfo);
+/* guc */
+bool insert_in_bulk = false;
 
 /*
  * Verify that the tuples to be produced by INSERT or UPDATE match the
@@ -594,7 +596,7 @@ ExecInsert(ModifyTableState *mtstate,
 			table_tuple_insert_speculative(resultRelationDesc, slot,
 										   estate->es_output_cid,
 										   0,
-										   NULL,
+										   NULL, /* Bulk insert not supported */
 										   specToken);
 
 			/* insert index entries for tuple */
@@ -631,10 +633,17 @@ ExecInsert(ModifyTableState *mtstate,
 		}
 		else
 		{
+			if (proute && mtstate->prevResultRelInfo != resultRelInfo)
+			{
+				if (mtstate->bistate)
+					ReleaseBulkInsertStatePin(mtstate->bistate);
+				mtstate->prevResultRelInfo = resultRelInfo;
+			}
+
 			/* insert the tuple normally */
 			table_tuple_insert(resultRelationDesc, slot,
 							   estate->es_output_cid,
-							   0, NULL);
+							   0, mtstate->bistate);
 
 			/* insert index entries for tuple */
 			if (resultRelInfo->ri_NumIndices > 0)
@@ -2232,6 +2241,9 @@ ExecInitModifyTable(ModifyTable *node, EState *estate, int eflags)
 
 	mtstate->mt_arowmarks = (List **) palloc0(sizeof(List *) * nplans);
 	mtstate->mt_nplans = nplans;
+	mtstate->bistate = NULL;
+	if (operation == CMD_INSERT && insert_in_bulk)
+		mtstate->bistate = GetBulkInsertState();
 
 	/* set up epqstate with dummy subplan data for the moment */
 	EvalPlanQualInit(&mtstate->mt_epqstate, estate, NULL, NIL, node->epqParam);
@@ -2698,6 +2710,12 @@ ExecEndModifyTable(ModifyTableState *node)
 														   resultRelInfo);
 	}
 
+	if (node->bistate)
+	{
+		FreeBulkInsertState(node->bistate);
+		table_finish_bulk_insert(node->rootResultRelInfo->ri_RelationDesc, 0);
+	}
+
 	/*
 	 * Close all the partitioned tables, leaf partitions, and their indices
 	 * and release the slot used for tuple routing, if set.
diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y
index efc9c99754..5915c8c414 100644
--- a/src/backend/parser/gram.y
+++ b/src/backend/parser/gram.y
@@ -631,7 +631,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
 	ASSERTION ASSIGNMENT ASYMMETRIC AT ATTACH ATTRIBUTE AUTHORIZATION
 
 	BACKWARD BEFORE BEGIN_P BETWEEN BIGINT BINARY BIT
-	BOOLEAN_P BOTH BY
+	BOOLEAN_P BOTH BULK BY
 
 	CACHE CALL CALLED CASCADE CASCADED CASE CAST CATALOG_P CHAIN CHAR_P
 	CHARACTER CHARACTERISTICS CHECK CHECKPOINT CLASS CLOSE
@@ -9886,6 +9886,9 @@ transaction_mode_item:
 			| NOT DEFERRABLE
 					{ $$ = makeDefElem("transaction_deferrable",
 									   makeIntConst(false, @1), @1); }
+			| BULK
+					{ $$ = makeDefElem("bulk",
+									   makeIntConst(true, @1), @1); }
 		;
 
 /* Syntax with commas is SQL-spec, without commas is Postgres historical */
@@ -15157,6 +15160,7 @@ unreserved_keyword:
 			| BACKWARD
 			| BEFORE
 			| BEGIN_P
+			| BULK
 			| BY
 			| CACHE
 			| CALL
@@ -15668,6 +15672,7 @@ bare_label_keyword:
 			| BIT
 			| BOOLEAN_P
 			| BOTH
+			| BULK
 			| BY
 			| CACHE
 			| CALL
diff --git a/src/backend/tcop/utility.c b/src/backend/tcop/utility.c
index 81ac9b1cb2..a0a4034409 100644
--- a/src/backend/tcop/utility.c
+++ b/src/backend/tcop/utility.c
@@ -611,6 +611,10 @@ standard_ProcessUtility(PlannedStmt *pstmt,
 									SetPGVariable("transaction_deferrable",
 												  list_make1(item->arg),
 												  true);
+								else if (strcmp(item->defname, "bulk") == 0)
+									SetPGVariable("bulk_insert",
+												  list_make1(item->arg),
+												  true);
 							}
 						}
 						break;
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index bb34630e8e..170c96e749 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -54,6 +54,7 @@
 #include "libpq/pqformat.h"
 #include "miscadmin.h"
 #include "optimizer/cost.h"
+#include "executor/nodeModifyTable.h"
 #include "optimizer/geqo.h"
 #include "optimizer/optimizer.h"
 #include "optimizer/paths.h"
@@ -2036,6 +2037,16 @@ static struct config_bool ConfigureNamesBool[] =
 		NULL, NULL, NULL
 	},
 
+	{
+		{"bulk_insert", PGC_USERSET, CLIENT_CONN_STATEMENT,
+			gettext_noop("Sets the transaction to bulk insert mode."),
+			gettext_noop("A ring buffer of limited size will be used."),
+		},
+		&insert_in_bulk,
+		false,
+		NULL, NULL, NULL
+	},
+
 	/* End-of-list marker */
 	{
 		{NULL, 0, 0, NULL, NULL}, NULL, false, NULL, NULL, NULL
diff --git a/src/include/executor/nodeModifyTable.h b/src/include/executor/nodeModifyTable.h
index 46a2dc9511..09c312a052 100644
--- a/src/include/executor/nodeModifyTable.h
+++ b/src/include/executor/nodeModifyTable.h
@@ -15,6 +15,8 @@
 
 #include "nodes/execnodes.h"
 
+extern PGDLLIMPORT bool insert_in_bulk;
+
 extern void ExecComputeStoredGenerated(ResultRelInfo *resultRelInfo,
 									   EState *estate, TupleTableSlot *slot,
 									   CmdType cmdtype);
diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h
index f6824bf2e1..c79d13aa44 100644
--- a/src/include/nodes/execnodes.h
+++ b/src/include/nodes/execnodes.h
@@ -14,6 +14,7 @@
 #ifndef EXECNODES_H
 #define EXECNODES_H
 
+#include "access/heapam.h"
 #include "access/tupconvert.h"
 #include "executor/instrument.h"
 #include "fmgr.h"
@@ -1176,6 +1177,8 @@ typedef struct ModifyTableState
 	List	  **mt_arowmarks;	/* per-subplan ExecAuxRowMark lists */
 	EPQState	mt_epqstate;	/* for evaluating EvalPlanQual rechecks */
 	bool		fireBSTriggers; /* do we need to fire stmt triggers? */
+	BulkInsertState	bistate;	/* state for bulk insert like INSERT SELECT */
+	ResultRelInfo	*prevResultRelInfo; /* last child inserted with bistate */
 
 	/*
 	 * Slot for storing tuples in the root partitioned table's rowtype during
diff --git a/src/include/parser/kwlist.h b/src/include/parser/kwlist.h
index 71dcdf2889..0991da11e7 100644
--- a/src/include/parser/kwlist.h
+++ b/src/include/parser/kwlist.h
@@ -60,6 +60,7 @@ PG_KEYWORD("binary", BINARY, TYPE_FUNC_NAME_KEYWORD, BARE_LABEL)
 PG_KEYWORD("bit", BIT, COL_NAME_KEYWORD, BARE_LABEL)
 PG_KEYWORD("boolean", BOOLEAN_P, COL_NAME_KEYWORD, BARE_LABEL)
 PG_KEYWORD("both", BOTH, RESERVED_KEYWORD, BARE_LABEL)
+PG_KEYWORD("bulk", BULK, UNRESERVED_KEYWORD, BARE_LABEL)
 PG_KEYWORD("by", BY, UNRESERVED_KEYWORD, BARE_LABEL)
 PG_KEYWORD("cache", CACHE, UNRESERVED_KEYWORD, BARE_LABEL)
 PG_KEYWORD("call", CALL, UNRESERVED_KEYWORD, BARE_LABEL)
diff --git a/src/test/regress/expected/insert.out b/src/test/regress/expected/insert.out
index da50ee3b67..da0dae6240 100644
--- a/src/test/regress/expected/insert.out
+++ b/src/test/regress/expected/insert.out
@@ -462,6 +462,29 @@ Partitions: part_aa_bb FOR VALUES IN ('aa', 'bb'),
             part_xx_yy FOR VALUES IN ('xx', 'yy'), PARTITIONED,
             part_default DEFAULT, PARTITIONED
 
+-- bulk inserts
+truncate hash_parted;
+begin bulk;
+create index on hash_parted(a);
+-- make sure small inserts are flushed
+insert into hash_parted values(11);
+insert into hpart0 values(12);
+select * from hash_parted;
+ a  
+----
+ 12
+ 11
+(2 rows)
+
+-- exercise bulk insert to partitions
+insert into hash_parted select generate_series(1,9999);
+select count(1) from hash_parted;
+ count 
+-------
+ 10001
+(1 row)
+
+commit;
 -- cleanup
 drop table range_parted, list_parted;
 drop table hash_parted;
diff --git a/src/test/regress/sql/insert.sql b/src/test/regress/sql/insert.sql
index 963faa1614..d3a94f053b 100644
--- a/src/test/regress/sql/insert.sql
+++ b/src/test/regress/sql/insert.sql
@@ -280,6 +280,19 @@ from hash_parted order by part;
 -- partitions
 \d+ list_parted
 
+-- bulk inserts
+truncate hash_parted;
+begin bulk;
+create index on hash_parted(a);
+-- make sure small inserts are flushed
+insert into hash_parted values(11);
+insert into hpart0 values(12);
+select * from hash_parted;
+-- exercise bulk insert to partitions
+insert into hash_parted select generate_series(1,9999);
+select count(1) from hash_parted;
+commit;
+
 -- cleanup
 drop table range_parted, list_parted;
 drop table hash_parted;
-- 
2.17.0

