From 47ece23cb0f818365e97f923dd2d03e898d8e2a4 Mon Sep 17 00:00:00 2001
From: Koval Dmitry <d.koval@postgrespro.ru>
Date: Thu, 26 Oct 2023 03:35:24 +0300
Subject: [PATCH v22 4/4] SPLIT PARTITION optimization

---
 src/backend/commands/tablecmds.c              | 810 +++++++++++++-----
 src/test/regress/expected/partition_split.out | 307 +++++++
 src/test/regress/sql/partition_split.sql      | 152 ++++
 3 files changed, 1040 insertions(+), 229 deletions(-)

diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c
index b0d2e4af91..52e8ad2b35 100644
--- a/src/backend/commands/tablecmds.c
+++ b/src/backend/commands/tablecmds.c
@@ -20654,20 +20654,47 @@ typedef struct SplitPartitionContext
 	BulkInsertState bistate;	/* state of bulk inserts for partition */
 	TupleTableSlot *dstslot;	/* slot for insert row into partition */
 	Relation	partRel;		/* relation for partition */
+	SinglePartitionSpec *sps;	/* info about single partition (from SQL
+								 * command) */
 }			SplitPartitionContext;
 
+/*
+ * Struct with context of SPLIT PARTITION operation
+ */
+typedef struct SplitInfo
+{
+	PartitionCmd *cmd;			/* SPLIT PARTITION command info */
+
+	Relation	rel;			/* partitioned table */
+	Relation	splitRel;		/* split partition */
+
+	Oid			defaultPartOid; /* identifier of DEFAULT-partition in rel (if
+								 * exists) */
+	List	   *partContexts;	/* list of structs SplitPartitionContext (each
+								 * struct for each new partition) */
+	SplitPartitionContext *defaultPartCtx;	/* pointer to DEFAULT-partition in
+											 * partContexts list (if exists) */
+	EState	   *estate;			/* working state */
+}			SplitInfo;
 
 /*
- * createSplitPartitionContext: create context for partition and fill it
+ * createSplitPartitionContext: create context for partition
  */
 static SplitPartitionContext *
-createSplitPartitionContext(Relation partRel)
+createSplitPartitionContext(SinglePartitionSpec * sps)
 {
-	SplitPartitionContext *pc;
+	SplitPartitionContext *pc = (SplitPartitionContext *) palloc0(sizeof(SplitPartitionContext));
 
-	pc = (SplitPartitionContext *) palloc0(sizeof(SplitPartitionContext));
-	pc->partRel = partRel;
+	pc->sps = sps;
+	return pc;
+}
 
+/*
+ * fillSplitPartitionContext: fill partition context
+ */
+static void
+fillSplitPartitionContext(SplitPartitionContext * pc)
+{
 	/*
 	 * Prepare a BulkInsertState for table_tuple_insert. The FSM is empty, so
 	 * don't bother using it.
@@ -20678,67 +20705,66 @@ createSplitPartitionContext(Relation partRel)
 	pc->dstslot = MakeSingleTupleTableSlot(RelationGetDescr(pc->partRel),
 										   table_slot_callbacks(pc->partRel));
 	ExecStoreAllNullTuple(pc->dstslot);
-
-	return pc;
 }
 
 /*
  * deleteSplitPartitionContext: delete context for partition
  */
 static void
-deleteSplitPartitionContext(SplitPartitionContext * pc, int ti_options)
+deleteSplitPartitionContext(SplitPartitionContext * pc)
 {
-	ExecDropSingleTupleTableSlot(pc->dstslot);
-	FreeBulkInsertState(pc->bistate);
+	if (pc->dstslot)
+		ExecDropSingleTupleTableSlot(pc->dstslot);
 
-	table_finish_bulk_insert(pc->partRel, ti_options);
+	if (pc->bistate)
+	{
+		/* The FSM is empty, so don't bother using it. */
+		int			ti_options = TABLE_INSERT_SKIP_FSM;
+
+		FreeBulkInsertState(pc->bistate);
+		table_finish_bulk_insert(pc->partRel, ti_options);
+	}
 
 	pfree(pc);
 }
 
 /*
- * moveSplitTableRows: scan split partition (splitRel) of partitioned table
- * (rel) and move rows into new partitions.
+ * createSplitInfo: create SPLIT PARTITION command context, contexts for new
+ *					partitions and generate constraints for them.
+ *					We need to use constraints for optimization.
  *
- * New partitions description:
- * partlist: list of pointers to SinglePartitionSpec structures.
- * newPartRels: list of Relation's.
+ * cmd: SPLIT PARTITION command info.
+ * rel: partitioned table.
+ * splitRel: split partition.
  * defaultPartOid: oid of DEFAULT partition, for table rel.
  */
-static void
-moveSplitTableRows(Relation rel, Relation splitRel, List *partlist, List *newPartRels, Oid defaultPartOid)
+static SplitInfo *
+createSplitInfo(PartitionCmd *cmd, Relation rel, Relation splitRel,
+				Oid defaultPartOid)
 {
-	/* The FSM is empty, so don't bother using it. */
-	int			ti_options = TABLE_INSERT_SKIP_FSM;
-	CommandId	mycid;
-	EState	   *estate;
-	ListCell   *listptr,
-			   *listptr2;
-	TupleTableSlot *srcslot;
-	ExprContext *econtext;
-	TableScanDesc scan;
-	Snapshot	snapshot;
-	MemoryContext oldCxt;
 	List	   *partContexts = NIL;
-	TupleConversionMap *tuple_map;
-	SplitPartitionContext *defaultPartCtx = NULL,
-			   *pc;
-	bool		isOldDefaultPart = false;
+	SplitInfo  *si;
+	ListCell   *listptr;
 
-	mycid = GetCurrentCommandId(true);
+	si = (SplitInfo *) palloc0(sizeof(SplitInfo));
 
-	estate = CreateExecutorState();
+	si->cmd = cmd;
+	si->rel = rel;
+	si->splitRel = splitRel;
 
-	forboth(listptr, partlist, listptr2, newPartRels)
+	si->defaultPartOid = defaultPartOid;
+	si->estate = CreateExecutorState();
+
+	/* Create context for each new partition and fill it. */
+	foreach(listptr, cmd->partlist)
 	{
 		SinglePartitionSpec *sps = (SinglePartitionSpec *) lfirst(listptr);
-
-		pc = createSplitPartitionContext((Relation) lfirst(listptr2));
+		SplitPartitionContext *pc = createSplitPartitionContext(sps);
 
 		if (sps->bound->is_default)
 		{
 			/* We should not create constraint for detached DEFAULT partition. */
-			defaultPartCtx = pc;
+			si->defaultPartCtx = pc;
 		}
 		else
 		{
@@ -20746,9 +20772,8 @@ moveSplitTableRows(Relation rel, Relation splitRel, List *partlist, List *newPar
 
 			/* Build expression execution states for partition check quals. */
 			partConstraint = get_qual_from_partbound(rel, sps->bound);
-			partConstraint =
-				(List *) eval_const_expressions(NULL,
-												(Node *) partConstraint);
+			partConstraint = (List *) eval_const_expressions(NULL, (Node *) partConstraint);
+
 			/* Make boolean expression for ExecCheck(). */
 			partConstraint = list_make1(make_ands_explicit(partConstraint));
 
@@ -20756,11 +20781,10 @@ moveSplitTableRows(Relation rel, Relation splitRel, List *partlist, List *newPar
 			 * Map the vars in the constraint expression from rel's attnos to
 			 * splitRel's.
 			 */
-			partConstraint = map_partition_varattnos(partConstraint,
-													 1, splitRel, rel);
+			partConstraint = map_partition_varattnos(partConstraint, 1, splitRel, rel);
 
 			pc->partqualstate =
-				ExecPrepareExpr((Expr *) linitial(partConstraint), estate);
+				ExecPrepareExpr((Expr *) linitial(partConstraint), si->estate);
 			Assert(pc->partqualstate != NULL);
 		}
 
@@ -20768,129 +20792,75 @@ moveSplitTableRows(Relation rel, Relation splitRel, List *partlist, List *newPar
 		partContexts = lappend(partContexts, pc);
 	}
 
-	/*
-	 * Create partition context for DEFAULT partition. We can insert values
-	 * into this partition in case spaces with values between new partitions.
-	 */
-	if (!defaultPartCtx && OidIsValid(defaultPartOid))
+	si->partContexts = partContexts;
+
+	return si;
+}
+
+/*
+ * deleteSplitInfo: delete SPLIT PARTITION command context
+ */
+static void
+deleteSplitInfo(SplitInfo * si)
+{
+	ListCell   *listptr;
+
+	FreeExecutorState(si->estate);
+
+	foreach(listptr, si->partContexts)
+		deleteSplitPartitionContext((SplitPartitionContext *) lfirst(listptr));
+
+	pfree(si);
+}
+
+/*
+ * checkNewPartitions: simple check of the new partitions.
+ *
+ * cmd: SPLIT PARTITION command info.
+ * splitRelOid: split partition Oid.
+ *
+ * Returns true if one of the new partitions has the same name as the split
+ * partition.
+ */
+static bool
+checkNewPartitions(PartitionCmd *cmd, Oid splitRelOid)
+{
+	Oid			namespaceId;
+	ListCell   *listptr;
+	bool		isSameName = false;
+	char		relname[NAMEDATALEN];
+
+	foreach(listptr, cmd->partlist)
 	{
-		/* Indicate that we allocate context for old DEFAULT partition */
-		isOldDefaultPart = true;
-		defaultPartCtx = createSplitPartitionContext(table_open(defaultPartOid, AccessExclusiveLock));
+		Oid			existing_relid;
+		SinglePartitionSpec *sps = (SinglePartitionSpec *) lfirst(listptr);
+
+		strlcpy(relname, sps->name->relname, NAMEDATALEN);
+
+		/*
+		 * Look up the namespace in which we are supposed to create the
+		 * partition, check we have permission to create there, lock it
+		 * against concurrent drop, and mark stmt->relation as
+		 * RELPERSISTENCE_TEMP if a temporary namespace is selected.
+		 */
+		namespaceId =
+			RangeVarGetAndCheckCreationNamespace(sps->name, NoLock, NULL);
+
+		/*
+		 * This would fail later on anyway, if the relation already exists.
+		 * But by catching it here we can emit a nicer error message.
+		 */
+		existing_relid = get_relname_relid(relname, namespaceId);
+		if (existing_relid == splitRelOid && !isSameName)
+			/* One new partition can have the same name as split partition. */
+			isSameName = true;
+		else if (existing_relid != InvalidOid)
+			ereport(ERROR,
+					(errcode(ERRCODE_DUPLICATE_TABLE),
+					 errmsg("relation \"%s\" already exists", relname)));
 	}
 
-	econtext = GetPerTupleExprContext(estate);
-
-	/* Create necessary tuple slot. */
-	srcslot = MakeSingleTupleTableSlot(RelationGetDescr(splitRel),
-									   table_slot_callbacks(splitRel));
-
-	/*
-	 * Map computing for moving attributes of split partition to new partition
-	 * (for first new partition but other new partitions can use the same
-	 * map).
-	 */
-	pc = (SplitPartitionContext *) lfirst(list_head(partContexts));
-	tuple_map = convert_tuples_by_name(RelationGetDescr(splitRel),
-									   RelationGetDescr(pc->partRel));
-
-	/* Scan through the rows. */
-	snapshot = RegisterSnapshot(GetLatestSnapshot());
-	scan = table_beginscan(splitRel, snapshot, 0, NULL);
-
-	/*
-	 * Switch to per-tuple memory context and reset it for each tuple
-	 * produced, so we don't leak memory.
-	 */
-	oldCxt = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
-
-	while (table_scan_getnextslot(scan, ForwardScanDirection, srcslot))
-	{
-		bool		found = false;
-		TupleTableSlot *insertslot;
-
-		/* Extract data from old tuple. */
-		slot_getallattrs(srcslot);
-
-		econtext->ecxt_scantuple = srcslot;
-
-		/* Search partition for current slot srcslot. */
-		foreach(listptr, partContexts)
-		{
-			pc = (SplitPartitionContext *) lfirst(listptr);
-
-			if (pc->partqualstate /* skip DEFAULT partition */ &&
-				ExecCheck(pc->partqualstate, econtext))
-			{
-				found = true;
-				break;
-			}
-			ResetExprContext(econtext);
-		}
-		if (!found)
-		{
-			/* Use DEFAULT partition if it exists. */
-			if (defaultPartCtx)
-				pc = defaultPartCtx;
-			else
-				ereport(ERROR,
-						(errcode(ERRCODE_CHECK_VIOLATION),
-						 errmsg("can not find partition for split partition row"),
-						 errtable(splitRel)));
-		}
-
-		if (tuple_map)
-		{
-			/* Need to use map for copy attributes. */
-			insertslot = execute_attr_map_slot(tuple_map->attrMap, srcslot, pc->dstslot);
-		}
-		else
-		{
-			/* Copy attributes directly. */
-			insertslot = pc->dstslot;
-
-			ExecClearTuple(insertslot);
-
-			memcpy(insertslot->tts_values, srcslot->tts_values,
-				   sizeof(Datum) * srcslot->tts_nvalid);
-			memcpy(insertslot->tts_isnull, srcslot->tts_isnull,
-				   sizeof(bool) * srcslot->tts_nvalid);
-
-			ExecStoreVirtualTuple(insertslot);
-		}
-
-		/* Write the tuple out to the new relation. */
-		table_tuple_insert(pc->partRel, insertslot, mycid, ti_options, pc->bistate);
-
-		ResetExprContext(econtext);
-
-		CHECK_FOR_INTERRUPTS();
-	}
-
-	MemoryContextSwitchTo(oldCxt);
-
-	table_endscan(scan);
-	UnregisterSnapshot(snapshot);
-
-	if (tuple_map)
-		free_conversion_map(tuple_map);
-
-	ExecDropSingleTupleTableSlot(srcslot);
-
-	FreeExecutorState(estate);
-
-	foreach(listptr, partContexts)
-		deleteSplitPartitionContext((SplitPartitionContext *) lfirst(listptr), ti_options);
-
-	/* Need to close table and free buffers for DEFAULT partition. */
-	if (isOldDefaultPart)
-	{
-		Relation defaultPartRel = defaultPartCtx->partRel;
-
-		deleteSplitPartitionContext(defaultPartCtx, ti_options);
-		/* Keep the lock until commit. */
-		table_close(defaultPartRel, NoLock);
-	}
+	return isSameName;
 }
 
 /*
@@ -20947,6 +20917,414 @@ createPartitionTable(RangeVar *newPartName, RangeVar *modelRelName,
 				   NULL);
 }
 
+/*
+ * createNewPartitions: simple check of the new partitions.
+ *
+ * si: SPLIT PARTITION command context.
+ * splitName: split partition name.
+ * pcWithAllRows: context of partition that contains all the rows of the split
+ * 				  partition or NULL if no such partition exists.
+ *
+ * Function returns name of split partition (and can change it in case of
+ * optimization with split partition renaming).
+ */
+static RangeVar *
+createNewPartitions(SplitInfo * si, RangeVar *splitName,
+					SplitPartitionContext * pcWithAllRows,
+					AlterTableUtilityContext *context)
+{
+	ListCell   *listptr;
+	Oid			splitRelOid;
+	RangeVar   *splitPartName = splitName;
+
+	splitRelOid = RelationGetRelid(si->splitRel);
+
+	foreach(listptr, si->partContexts)
+	{
+		SplitPartitionContext *pc = (SplitPartitionContext *) lfirst(listptr);
+
+		if (pc == pcWithAllRows)
+		{
+			/* Need to reuse splitRel for partition instead of creation. */
+
+			/*
+			 * We must bump the command counter to make the split partition
+			 * tuple visible for rename.
+			 */
+			CommandCounterIncrement();
+
+			/*
+			 * Rename split partition to new partition.
+			 */
+			RenameRelationInternal(splitRelOid, pc->sps->name->relname, false, false);
+			splitPartName = makeRangeVar(get_namespace_name(RelationGetNamespace(si->splitRel)),
+										 pc->sps->name->relname, -1);
+
+			/*
+			 * We must bump the command counter to make the split partition
+			 * tuple visible after rename.
+			 */
+			CommandCounterIncrement();
+
+			pc->partRel = si->splitRel;
+			/* No need to open relation : splitRel is already opened. */
+		}
+		else
+		{
+			createPartitionTable(pc->sps->name, splitPartName, context);
+
+			/* Open the new partition and acquire exclusive lock on it. */
+			pc->partRel = table_openrv(pc->sps->name, AccessExclusiveLock);
+		}
+	}
+
+	return splitPartName;
+}
+
+/*
+ * moveSplitTableRows: scan split partition (splitRel) of partitioned table
+ * (rel) and move rows into new partitions.
+ *
+ * si: SPLIT PARTITION command context.
+ */
+static void
+moveSplitTableRows(SplitInfo * si)
+{
+	/* The FSM is empty, so don't bother using it. */
+	int			ti_options = TABLE_INSERT_SKIP_FSM;
+	CommandId	mycid;
+	ListCell   *listptr;
+	TupleTableSlot *srcslot;
+	ExprContext *econtext;
+	TableScanDesc scan;
+	Snapshot	snapshot;
+	MemoryContext oldCxt;
+	TupleConversionMap *tuple_map;
+	SplitPartitionContext *pc = NULL;
+	bool		isOldDefaultPart = false;
+	SplitPartitionContext *defaultPartCtx = si->defaultPartCtx;
+
+	mycid = GetCurrentCommandId(true);
+
+	/* Prepare new partitions contexts for insert rows. */
+	foreach(listptr, si->partContexts)
+		fillSplitPartitionContext((SplitPartitionContext *) lfirst(listptr));
+
+	/*
+	 * Create partition context for DEFAULT partition. We can insert values
+	 * into this partition in case spaces with values between new partitions.
+	 */
+	if (!defaultPartCtx && OidIsValid(si->defaultPartOid))
+	{
+		/* Indicate that we allocate context for old DEFAULT partition */
+		isOldDefaultPart = true;
+		defaultPartCtx = createSplitPartitionContext(NULL);
+		defaultPartCtx->partRel = table_open(si->defaultPartOid, AccessExclusiveLock);
+		fillSplitPartitionContext(defaultPartCtx);
+	}
+
+	econtext = GetPerTupleExprContext(si->estate);
+
+	/* Create necessary tuple slot. */
+	srcslot = MakeSingleTupleTableSlot(RelationGetDescr(si->splitRel),
+									   table_slot_callbacks(si->splitRel));
+
+	/*
+	 * Map computing for moving attributes of split partition to new partition
+	 * (for first new partition but other new partitions can use the same
+	 * map).
+	 */
+	pc = (SplitPartitionContext *) lfirst(list_head(si->partContexts));
+	tuple_map = convert_tuples_by_name(RelationGetDescr(si->splitRel),
+									   RelationGetDescr(pc->partRel));
+
+	/* Scan through the rows. */
+	snapshot = RegisterSnapshot(GetLatestSnapshot());
+	scan = table_beginscan(si->splitRel, snapshot, 0, NULL);
+
+	/*
+	 * Switch to per-tuple memory context and reset it for each tuple
+	 * produced, so we don't leak memory.
+	 */
+	oldCxt = MemoryContextSwitchTo(GetPerTupleMemoryContext(si->estate));
+
+	while (table_scan_getnextslot(scan, ForwardScanDirection, srcslot))
+	{
+		bool		found = false;
+		TupleTableSlot *insertslot;
+
+		/* Extract data from old tuple. */
+		slot_getallattrs(srcslot);
+
+		econtext->ecxt_scantuple = srcslot;
+
+		/* Search partition for current slot srcslot. */
+		foreach(listptr, si->partContexts)
+		{
+			pc = (SplitPartitionContext *) lfirst(listptr);
+
+			if (pc->partqualstate /* skip DEFAULT partition */ &&
+				ExecCheck(pc->partqualstate, econtext))
+			{
+				found = true;
+				break;
+			}
+			ResetExprContext(econtext);
+		}
+		if (!found)
+		{
+			/* Use DEFAULT partition if it exists. */
+			if (defaultPartCtx)
+				pc = defaultPartCtx;
+			else
+				ereport(ERROR,
+						(errcode(ERRCODE_CHECK_VIOLATION),
+						 errmsg("can not find partition for split partition row"),
+						 errtable(si->splitRel)));
+		}
+
+		if (tuple_map)
+		{
+			/* Need to use map for copy attributes. */
+			insertslot = execute_attr_map_slot(tuple_map->attrMap, srcslot, pc->dstslot);
+		}
+		else
+		{
+			/* Copy attributes directly. */
+			insertslot = pc->dstslot;
+
+			ExecClearTuple(insertslot);
+
+			memcpy(insertslot->tts_values, srcslot->tts_values,
+				   sizeof(Datum) * srcslot->tts_nvalid);
+			memcpy(insertslot->tts_isnull, srcslot->tts_isnull,
+				   sizeof(bool) * srcslot->tts_nvalid);
+
+			ExecStoreVirtualTuple(insertslot);
+		}
+
+		/* Write the tuple out to the new relation. */
+		table_tuple_insert(pc->partRel, insertslot, mycid, ti_options, pc->bistate);
+
+		ResetExprContext(econtext);
+
+		CHECK_FOR_INTERRUPTS();
+	}
+
+	MemoryContextSwitchTo(oldCxt);
+
+	table_endscan(scan);
+	UnregisterSnapshot(snapshot);
+
+	if (tuple_map)
+		free_conversion_map(tuple_map);
+
+	ExecDropSingleTupleTableSlot(srcslot);
+
+	/* Need to close table and free buffers for DEFAULT partition. */
+	if (isOldDefaultPart)
+	{
+		Relation	defaultPartRel = defaultPartCtx->partRel;
+
+		deleteSplitPartitionContext(defaultPartCtx);
+		/* Keep the lock until commit. */
+		table_close(defaultPartRel, NoLock);
+	}
+}
+
+/*
+ * findNewPartForSlot: find partition that contains slot value.
+ *
+ * si: SPLIT PARTITION context.
+ * checkPc: partition context for check slot value (can be NULL).
+ * slot: value to check.
+ */
+static SplitPartitionContext *
+findNewPartForSlot(SplitInfo * si, SplitPartitionContext * checkPc, TupleTableSlot *slot)
+{
+	ListCell   *listptr;
+	ExprContext *econtext;
+	MemoryContext oldCxt;
+	SplitPartitionContext *result = NULL;
+
+	econtext = GetPerTupleExprContext(si->estate);
+
+	/* Make sure the tuple is fully deconstructed. */
+	slot_getallattrs(slot);
+
+	econtext->ecxt_scantuple = slot;
+
+	/*
+	 * Switch to per-tuple memory context and reset it after each check, so we
+	 * don't leak memory.
+	 */
+	oldCxt = MemoryContextSwitchTo(GetPerTupleMemoryContext(si->estate));
+
+	if (checkPc)
+	{
+		if (ExecCheck(checkPc->partqualstate, econtext))
+		{
+			ResetExprContext(econtext);
+			result = checkPc;
+		}
+	}
+	else
+	{
+		/* Search partition for current slot srcslot. */
+		foreach(listptr, si->partContexts)
+		{
+			SplitPartitionContext *pc = (SplitPartitionContext *) lfirst(listptr);
+
+			if (pc->partqualstate /* skip DEFAULT partition */ &&
+				ExecCheck(pc->partqualstate, econtext))
+			{
+				ResetExprContext(econtext);
+				result = pc;
+				break;
+			}
+			ResetExprContext(econtext);
+		}
+
+		/* We not found partition with borders but exists DEFAULT partition. */
+		if (!result && si->defaultPartCtx)
+			result = si->defaultPartCtx;
+
+		/*
+		 * "result" can be NULL here because can be spaces between of the new
+		 * partitions and rows from the spaces can be moved to the DEFAULT
+		 * partition of the partitioned table.
+		 */
+	}
+
+	MemoryContextSwitchTo(oldCxt);
+
+	return result;
+}
+
+/*
+ * findNewPartWithAllRows: find partition that contains all the rows of the
+ * split partition; returns partition context if partition was found.
+ *
+ * si: SPLIT PARTITION context.
+ */
+static SplitPartitionContext *
+findNewPartWithAllRows(SplitInfo * si)
+{
+	PartitionKey key = RelationGetPartitionKey(si->rel);
+	ListCell   *index;
+	int			partnatts;
+	SplitPartitionContext *result = NULL;
+	AttrMap    *map;
+	AttrNumber *partattrs;
+	int			i;
+
+	/* We can use optimization for BY RANGE partitioning only. */
+	if (key->strategy != PARTITION_STRATEGY_RANGE)
+		return NULL;
+
+	partnatts = get_partition_natts(key);
+
+	/*
+	 * Partition key contains columns of partitioned tables si->rel but index
+	 * contains columns of si->splitRel. So we need a map for convert
+	 * attributes numbers (si->rel) -> (si->splitRel).
+	 */
+	map = build_attrmap_by_name_if_req(RelationGetDescr(si->splitRel),
+									   RelationGetDescr(si->rel),
+									   false);
+	if (map)
+	{
+		/*
+		 * Columns order in a partitioned table and split partition is
+		 * different. So need to create a new array with attribute numbers.
+		 */
+		partattrs = palloc(sizeof(AttrNumber) * partnatts);
+		for (i = 0; i < partnatts; i++)
+		{
+			AttrNumber	attr_num = get_partition_col_attnum(key, i);
+
+			partattrs[i] = map->attnums[attr_num - 1];
+		}
+	}
+	else
+	{
+		/* We can use array of partition key. */
+		partattrs = key->partattrs;
+	}
+
+	/* Scan all indexes of split partition. */
+	foreach(index, RelationGetIndexList(si->splitRel))
+	{
+		Oid			thisIndexOid = lfirst_oid(index);
+		Relation	indexRel = index_open(thisIndexOid, AccessShareLock);
+
+		/*
+		 * Index should be valid, btree (for searching min/max) and contain
+		 * the same columns as partition key.
+		 */
+		if (indexRel->rd_index->indisvalid &&
+			indexRel->rd_rel->relam == BTREE_AM_OID &&
+			indexRel->rd_index->indnatts == partnatts)
+		{
+			for (i = 0; i < indexRel->rd_index->indnatts; i++)
+			{
+				if (indexRel->rd_index->indkey.values[i] != partattrs[i])
+					break;
+			}
+
+			/* Index found? */
+			if (i == indexRel->rd_index->indnatts)
+			{
+				IndexScanDesc indexScan;
+				TupleTableSlot *slot;
+
+				indexScan = index_beginscan(si->splitRel, indexRel, SnapshotAny, 0, 0);
+				do
+				{
+					SplitPartitionContext *pc;
+
+					/* Search a minimum index value. */
+					index_rescan(indexScan, NULL, 0, NULL, 0);
+					slot = table_slot_create(si->splitRel, NULL);
+					if (!index_getnext_slot(indexScan, ForwardScanDirection, slot))
+					{
+						ExecDropSingleTupleTableSlot(slot);
+						break;
+					}
+					/* Find partition context for minimum index value. */
+					pc = findNewPartForSlot(si, NULL, slot);
+					ExecDropSingleTupleTableSlot(slot);
+
+					/* Search a maximum index value. */
+					index_rescan(indexScan, NULL, 0, NULL, 0);
+					slot = table_slot_create(si->splitRel, NULL);
+					if (!index_getnext_slot(indexScan, BackwardScanDirection, slot))
+					{
+						ExecDropSingleTupleTableSlot(slot);
+						break;
+					}
+					/* Check partition context "pc" for maximum index value. */
+					result = findNewPartForSlot(si, pc, slot);
+					ExecDropSingleTupleTableSlot(slot);
+				} while (0);
+
+				index_endscan(indexScan);
+				index_close(indexRel, AccessShareLock);
+				goto done;
+			}
+		}
+		index_close(indexRel, AccessShareLock);
+	}
+
+done:
+	if (map)
+	{
+		pfree(partattrs);
+		free_attrmap(map);
+	}
+	return result;
+}
+
 /*
  * ALTER TABLE <name> SPLIT PARTITION <partition-name> INTO <partition-list>
  */
@@ -20956,16 +21334,14 @@ ATExecSplitPartition(List **wqueue, AlteredTableInfo *tab, Relation rel,
 {
 	Relation	splitRel;
 	Oid			splitRelOid;
-	char		relname[NAMEDATALEN];
-	Oid			namespaceId;
-	ListCell   *listptr,
-			   *listptr2;
+	ListCell   *listptr;
 	bool		isSameName = false;
 	char		tmpRelName[NAMEDATALEN];
-	List	   *newPartRels = NIL;
 	ObjectAddress object;
 	RangeVar   *splitPartName = cmd->name;
 	Oid			defaultPartOid;
+	SplitPartitionContext *pcWithAllRows;
+	SplitInfo  *si;
 
 	defaultPartOid = get_default_oid_from_partdesc(RelationGetPartitionDesc(rel, true));
 
@@ -20984,35 +21360,7 @@ ATExecSplitPartition(List **wqueue, AlteredTableInfo *tab, Relation rel,
 	splitRelOid = RelationGetRelid(splitRel);
 
 	/* Check descriptions of new partitions. */
-	foreach(listptr, cmd->partlist)
-	{
-		Oid			existing_relid;
-		SinglePartitionSpec *sps = (SinglePartitionSpec *) lfirst(listptr);
-
-		strlcpy(relname, sps->name->relname, NAMEDATALEN);
-
-		/*
-		 * Look up the namespace in which we are supposed to create the
-		 * partition, check we have permission to create there, lock it
-		 * against concurrent drop, and mark stmt->relation as
-		 * RELPERSISTENCE_TEMP if a temporary namespace is selected.
-		 */
-		namespaceId =
-			RangeVarGetAndCheckCreationNamespace(sps->name, NoLock, NULL);
-
-		/*
-		 * This would fail later on anyway, if the relation already exists.
-		 * But by catching it here we can emit a nicer error message.
-		 */
-		existing_relid = get_relname_relid(relname, namespaceId);
-		if (existing_relid == splitRelOid && !isSameName)
-			/* One new partition can have the same name as split partition. */
-			isSameName = true;
-		else if (existing_relid != InvalidOid)
-			ereport(ERROR,
-					(errcode(ERRCODE_DUPLICATE_TABLE),
-					 errmsg("relation \"%s\" already exists", relname)));
-	}
+	isSameName = checkNewPartitions(cmd, splitRelOid);
 
 	/* Detach split partition. */
 	RemoveInheritance(splitRel, rel, false);
@@ -21033,8 +21381,7 @@ ATExecSplitPartition(List **wqueue, AlteredTableInfo *tab, Relation rel,
 		/* Rename partition. */
 		sprintf(tmpRelName, "split-%u-%X-tmp", RelationGetRelid(rel), MyProcPid);
 		RenameRelationInternal(splitRelOid, tmpRelName, false, false);
-		splitPartName = makeRangeVar(
-									 get_namespace_name(RelationGetNamespace(splitRel)),
+		splitPartName = makeRangeVar(get_namespace_name(RelationGetNamespace(splitRel)),
 									 tmpRelName, -1);
 
 		/*
@@ -21044,43 +21391,48 @@ ATExecSplitPartition(List **wqueue, AlteredTableInfo *tab, Relation rel,
 		CommandCounterIncrement();
 	}
 
+	/* Create SPLIT PARTITION context. */
+	si = createSplitInfo(cmd, rel, splitRel, defaultPartOid);
+
+	/*
+	 * Optimization: if exist a new partition that contains all the rows of
+	 * the split partition then do not copy rows, rename the split partition.
+	 */
+	pcWithAllRows = findNewPartWithAllRows(si);
+
 	/* Create new partitions (like split partition), without indexes. */
-	foreach(listptr, cmd->partlist)
+	splitPartName = createNewPartitions(si, splitPartName, pcWithAllRows, context);
+
+	if (!pcWithAllRows)
 	{
-		SinglePartitionSpec *sps = (SinglePartitionSpec *) lfirst(listptr);
-		Relation	newPartRel;
-
-		createPartitionTable(sps->name, splitPartName, context);
-
-		/* Open the new partition and acquire exclusive lock on it. */
-		newPartRel = table_openrv(sps->name, AccessExclusiveLock);
-
-		newPartRels = lappend(newPartRels, newPartRel);
+		/* Copy data from split partition to new partitions. */
+		moveSplitTableRows(si);
+		/* Keep the lock until commit. */
+		table_close(splitRel, NoLock);
 	}
 
-	/* Copy data from split partition to new partitions. */
-	moveSplitTableRows(rel, splitRel, cmd->partlist, newPartRels, defaultPartOid);
-	/* Keep the lock until commit. */
-	table_close(splitRel, NoLock);
-
 	/* Attach new partitions to partitioned table. */
-	forboth(listptr, cmd->partlist, listptr2, newPartRels)
+	foreach(listptr, si->partContexts)
 	{
-		SinglePartitionSpec *sps = (SinglePartitionSpec *) lfirst(listptr);
-		Relation	newPartRel = (Relation) lfirst(listptr2);
+		SplitPartitionContext *pc = (SplitPartitionContext *) lfirst(listptr);
 
 		/* wqueue = NULL: verification for each cloned constraint is not need. */
-		attachPartitionTable(NULL, rel, newPartRel, sps->bound);
+		attachPartitionTable(NULL, rel, pc->partRel, pc->sps->bound);
 		/* Keep the lock until commit. */
-		table_close(newPartRel, NoLock);
+		table_close(pc->partRel, NoLock);
 	}
 
-	/* Drop split partition. */
-	object.classId = RelationRelationId;
-	object.objectId = splitRelOid;
-	object.objectSubId = 0;
-	/* Probably DROP_CASCADE is not needed. */
-	performDeletion(&object, DROP_RESTRICT, 0);
+	if (!pcWithAllRows)
+	{
+		/* Drop split partition. */
+		object.classId = RelationRelationId;
+		object.objectId = splitRelOid;
+		object.objectSubId = 0;
+		/* Probably DROP_CASCADE is not needed. */
+		performDeletion(&object, DROP_RESTRICT, 0);
+	}
+
+	deleteSplitInfo(si);
 }
 
 /*
@@ -21226,8 +21578,8 @@ ATExecMergePartitions(List **wqueue, AlteredTableInfo *tab, Relation rel,
 							RelationGetRelationName(pc->partRel))));
 
 		/*
-		 * Checking that two partitions have the same name was before,
-		 * in function transformPartitionCmdForMerge().
+		 * Checking that two partitions have the same name was before, in
+		 * function transformPartitionCmdForMerge().
 		 */
 		if (equal(name, cmd->name))
 			/* One new partition can have the same name as merged partition. */
diff --git a/src/test/regress/expected/partition_split.out b/src/test/regress/expected/partition_split.out
index 08bf021796..ece707fdc6 100644
--- a/src/test/regress/expected/partition_split.out
+++ b/src/test/regress/expected/partition_split.out
@@ -1414,4 +1414,311 @@ SELECT * FROM sales_others;
 
 DROP TABLE sales_range;
 --
+--
+-- Tests for SPLIT optimization (BY RANGE partitioning): if one of the new
+-- partitions contains all the rows of the split partition, then we can rename
+-- the split partition instead of creating a new partition and moving the rows.
+--
+-- 1. Optimization should be used.
+--
+CREATE TABLE test(name text, i int) PARTITION BY RANGE (i);
+CREATE TABLE test_1 PARTITION OF test FOR VALUES FROM (1) TO (10);
+CREATE TABLE test_def PARTITION OF test DEFAULT;
+CREATE INDEX idx_test_i ON test(i);
+INSERT INTO test(name, i) VALUES
+  ('a1', 1), ('a5', 5), ('a15', 15), ('a12', 12), ('a17', 17), ('a11', 11);
+-- should be rows 15, 12, 17, 11:
+SELECT i FROM test_def;
+ i  
+----
+ 15
+ 12
+ 17
+ 11
+(4 rows)
+
+SELECT 'test_def'::regclass::oid AS prev_oid \gset
+ALTER TABLE test SPLIT PARTITION test_def INTO 
+  (PARTITION test_def DEFAULT, PARTITION test_2 FOR VALUES FROM (11) TO (19));
+-- should be 't' (table "test_2" after SPLIT should be the same as table
+-- "test_def" before SPLIT):
+SELECT 'test_2'::regclass::oid=:prev_oid;
+ ?column? 
+----------
+ t
+(1 row)
+
+-- should be rows 15, 12, 17, 11:
+SELECT i FROM test_2;
+ i  
+----
+ 15
+ 12
+ 17
+ 11
+(4 rows)
+
+-- should be 0 rows:
+SELECT i FROM test_def;
+ i 
+---
+(0 rows)
+
+-- should be 6 rows:
+SELECT * FROM test;
+ name | i  
+------+----
+ a1   |  1
+ a5   |  5
+ a15  | 15
+ a12  | 12
+ a17  | 17
+ a11  | 11
+(6 rows)
+
+DROP TABLE test CASCADE;
+--
+-- 2. Optimization cannot be used because not exists btree-index on the
+-- partition key (it is used to check the placement of rows in the partitions).
+--
+CREATE TABLE test(name text, i int) PARTITION BY RANGE (i);
+CREATE TABLE test_1 PARTITION OF test FOR VALUES FROM (1) TO (10);
+CREATE TABLE test_def PARTITION OF test DEFAULT;
+INSERT INTO test(name, i) VALUES
+  ('a1', 1), ('a5', 5), ('a15', 15), ('a12', 12), ('a17', 17), ('a11', 11);
+SELECT 'test_def'::regclass::oid AS prev_oid \gset
+ALTER TABLE test SPLIT PARTITION test_def INTO 
+  (PARTITION test_def DEFAULT, PARTITION test_2 FOR VALUES FROM (11) TO (19));
+-- should be 'f' (tables "test_2" and "test_def" should be different):
+SELECT 'test_2'::regclass::oid=:prev_oid;
+ ?column? 
+----------
+ f
+(1 row)
+
+-- should be rows 15, 12, 17, 11:
+SELECT i FROM test_2;
+ i  
+----
+ 15
+ 12
+ 17
+ 11
+(4 rows)
+
+-- should be 6 rows:
+SELECT * FROM test;
+ name | i  
+------+----
+ a1   |  1
+ a5   |  5
+ a15  | 15
+ a12  | 12
+ a17  | 17
+ a11  | 11
+(6 rows)
+
+DROP TABLE test CASCADE;
+--
+-- 3. Optimization cannot be used because rows should be moved into different
+-- partitions.
+--
+CREATE TABLE test(name text, i int) PARTITION BY RANGE (i);
+CREATE TABLE test_1 PARTITION OF test FOR VALUES FROM (1) TO (10);
+CREATE TABLE test_def PARTITION OF test DEFAULT;
+CREATE INDEX idx_test_i ON test(i);
+INSERT INTO test(name, i) VALUES
+  ('a1', 1), ('a5', 5), ('a15', 15), ('a12', 12), ('a27', 27), ('a11', 11);
+SELECT 'test_def'::regclass::oid AS prev_oid \gset
+ALTER TABLE test SPLIT PARTITION test_def INTO 
+  (PARTITION test_def DEFAULT, PARTITION test_2 FOR VALUES FROM (11) TO (19));
+-- should be 'f' (tables "test_2" and "test_def" should be different):
+SELECT 'test_2'::regclass::oid=:prev_oid;
+ ?column? 
+----------
+ f
+(1 row)
+
+-- should be rows 15, 12, 11:
+SELECT i FROM test_2;
+ i  
+----
+ 15
+ 12
+ 11
+(3 rows)
+
+-- should be 6 rows:
+SELECT * FROM test;
+ name | i  
+------+----
+ a1   |  1
+ a5   |  5
+ a15  | 15
+ a12  | 12
+ a11  | 11
+ a27  | 27
+(6 rows)
+
+DROP TABLE test CASCADE;
+--
+-- 4. Optimization should be used, DEFAUT partition renames to DEFAULT
+-- partition.
+--
+CREATE TABLE test(name text, i int) PARTITION BY RANGE (i);
+CREATE TABLE test_1 PARTITION OF test FOR VALUES FROM (1) TO (10);
+CREATE TABLE test_def PARTITION OF test DEFAULT;
+CREATE INDEX idx_test_i ON test(i);
+INSERT INTO test(name, i) VALUES
+  ('a1', 1), ('a5', 5), ('a25', 25), ('a22', 22), ('a27', 27), ('a21', 21);
+-- should be rows 25, 22, 27, 21:
+SELECT i FROM test_def;
+ i  
+----
+ 25
+ 22
+ 27
+ 21
+(4 rows)
+
+SELECT 'test_def'::regclass::oid AS prev_oid \gset
+ALTER TABLE test SPLIT PARTITION test_def INTO 
+  (PARTITION test_def DEFAULT, PARTITION test_2 FOR VALUES FROM (11) TO (19));
+-- should be 't' (table "test_def" after SPLIT should be the same as table
+-- "test_def" before SPLIT):
+SELECT 'test_def'::regclass::oid=:prev_oid;
+ ?column? 
+----------
+ t
+(1 row)
+
+-- should be 0 rows:
+SELECT i FROM test_2;
+ i 
+---
+(0 rows)
+
+-- should be rows 25, 22, 27, 21:
+SELECT i FROM test_def;
+ i  
+----
+ 25
+ 22
+ 27
+ 21
+(4 rows)
+
+-- should be 6 rows:
+SELECT * FROM test;
+ name | i  
+------+----
+ a1   |  1
+ a5   |  5
+ a25  | 25
+ a22  | 22
+ a27  | 27
+ a21  | 21
+(6 rows)
+
+DROP TABLE test CASCADE;
+--
+-- 5. Optimization should be used, 2-column partition key + different columns
+-- order in partitions.
+--
+CREATE TABLE test_2colkey(s smallint, b bigint, t text) PARTITION BY RANGE (b, s);
+CREATE TABLE test_2colkey_1 PARTITION OF test_2colkey FOR VALUES FROM (1000000001, 1) TO (1000000100, 100);
+CREATE TABLE test_2colkey_def(i int, b bigint, s smallint, t text);
+ALTER TABLE test_2colkey_def DROP COLUMN i;
+ALTER TABLE test_2colkey ATTACH PARTITION test_2colkey_def DEFAULT;
+CREATE INDEX idx_test_2colkey_s_b ON test_2colkey(b, s);
+INSERT INTO test_2colkey (b, s, t) VALUES (1000000010, 3, 'value_10_3');
+INSERT INTO test_2colkey (b, s, t) VALUES (1000000120, 4, 'value_120_4');
+INSERT INTO test_2colkey (b, s, t) VALUES (1000000003, 5, 'value_3_5');
+INSERT INTO test_2colkey (b, s, t) VALUES (1000000124, 2, 'value_124_2');
+-- should be 4 rows:
+SELECT b, s FROM test_2colkey;
+     b      | s 
+------------+---
+ 1000000010 | 3
+ 1000000003 | 5
+ 1000000120 | 4
+ 1000000124 | 2
+(4 rows)
+
+-- should be 2 rows:
+SELECT b, s FROM test_2colkey_def;
+     b      | s 
+------------+---
+ 1000000120 | 4
+ 1000000124 | 2
+(2 rows)
+
+SELECT 'test_2colkey_def'::regclass::oid AS prev_oid \gset
+ALTER TABLE test_2colkey SPLIT PARTITION test_2colkey_def INTO
+  (PARTITION test_2colkey_2 FOR VALUES FROM (1000000101, 1) TO (1000000200, 100),
+   PARTITION test_2colkey_def DEFAULT);
+-- should be 't' (table "test_2colkey_2" after SPLIT should be the same as table
+-- "test_2colkey_def" before SPLIT):
+SELECT 'test_2colkey_2'::regclass::oid=:prev_oid;
+ ?column? 
+----------
+ t
+(1 row)
+
+-- should be 2 rows:
+SELECT b, s FROM test_2colkey_2;
+     b      | s 
+------------+---
+ 1000000120 | 4
+ 1000000124 | 2
+(2 rows)
+
+-- should be 0 rows:
+SELECT b, s FROM test_2colkey_def;
+ b | s 
+---+---
+(0 rows)
+
+-- should be 6 rows:
+SELECT b, s FROM test_2colkey;
+     b      | s 
+------------+---
+ 1000000010 | 3
+ 1000000003 | 5
+ 1000000120 | 4
+ 1000000124 | 2
+(4 rows)
+
+--
+-- 5.1. Optimization cannot be used.
+--
+INSERT INTO test_2colkey (b, s, t) VALUES (1000000200, 1, 'value_200_1');
+SELECT 'test_2colkey_2'::regclass::oid AS prev_oid \gset
+ALTER TABLE test_2colkey SPLIT PARTITION test_2colkey_2 INTO
+  (PARTITION test_2colkey_2 FOR VALUES FROM (1000000101, 1) TO (1000000150, 100),
+   PARTITION test_2colkey_3 FOR VALUES FROM (1000000151, 1) TO (1000000200, 100));
+-- should be 'f' (optimization is not used):
+SELECT 'test_2colkey_2'::regclass::oid=:prev_oid;
+ ?column? 
+----------
+ f
+(1 row)
+
+-- should be 2 rows:
+SELECT b, s FROM test_2colkey_2;
+     b      | s 
+------------+---
+ 1000000120 | 4
+ 1000000124 | 2
+(2 rows)
+
+-- should be 1 row:
+SELECT b, s FROM test_2colkey_3;
+     b      | s 
+------------+---
+ 1000000200 | 1
+(1 row)
+
+DROP TABLE test_2colkey CASCADE;
+--
 DROP SCHEMA partition_split_schema;
diff --git a/src/test/regress/sql/partition_split.sql b/src/test/regress/sql/partition_split.sql
index 58e17f33e8..627977ef30 100644
--- a/src/test/regress/sql/partition_split.sql
+++ b/src/test/regress/sql/partition_split.sql
@@ -829,5 +829,157 @@ SELECT * FROM sales_others;
 
 DROP TABLE sales_range;
 
+--
+--
+-- Tests for SPLIT optimization (BY RANGE partitioning): if one of the new
+-- partitions contains all the rows of the split partition, then we can rename
+-- the split partition instead of creating a new partition and moving the rows.
+--
+-- 1. Optimization should be used.
+--
+CREATE TABLE test(name text, i int) PARTITION BY RANGE (i);
+CREATE TABLE test_1 PARTITION OF test FOR VALUES FROM (1) TO (10);
+CREATE TABLE test_def PARTITION OF test DEFAULT;
+CREATE INDEX idx_test_i ON test(i);
+INSERT INTO test(name, i) VALUES
+  ('a1', 1), ('a5', 5), ('a15', 15), ('a12', 12), ('a17', 17), ('a11', 11);
+-- should be rows 15, 12, 17, 11:
+SELECT i FROM test_def;
+SELECT 'test_def'::regclass::oid AS prev_oid \gset
+ALTER TABLE test SPLIT PARTITION test_def INTO 
+  (PARTITION test_def DEFAULT, PARTITION test_2 FOR VALUES FROM (11) TO (19));
+-- should be 't' (table "test_2" after SPLIT should be the same as table
+-- "test_def" before SPLIT):
+SELECT 'test_2'::regclass::oid=:prev_oid;
+-- should be rows 15, 12, 17, 11:
+SELECT i FROM test_2;
+-- should be 0 rows:
+SELECT i FROM test_def;
+-- should be 6 rows:
+SELECT * FROM test;
+DROP TABLE test CASCADE;
+--
+-- 2. Optimization cannot be used because not exists btree-index on the
+-- partition key (it is used to check the placement of rows in the partitions).
+--
+CREATE TABLE test(name text, i int) PARTITION BY RANGE (i);
+CREATE TABLE test_1 PARTITION OF test FOR VALUES FROM (1) TO (10);
+CREATE TABLE test_def PARTITION OF test DEFAULT;
+INSERT INTO test(name, i) VALUES
+  ('a1', 1), ('a5', 5), ('a15', 15), ('a12', 12), ('a17', 17), ('a11', 11);
+SELECT 'test_def'::regclass::oid AS prev_oid \gset
+ALTER TABLE test SPLIT PARTITION test_def INTO 
+  (PARTITION test_def DEFAULT, PARTITION test_2 FOR VALUES FROM (11) TO (19));
+-- should be 'f' (tables "test_2" and "test_def" should be different):
+SELECT 'test_2'::regclass::oid=:prev_oid;
+-- should be rows 15, 12, 17, 11:
+SELECT i FROM test_2;
+-- should be 6 rows:
+SELECT * FROM test;
+DROP TABLE test CASCADE;
+--
+-- 3. Optimization cannot be used because rows should be moved into different
+-- partitions.
+--
+CREATE TABLE test(name text, i int) PARTITION BY RANGE (i);
+CREATE TABLE test_1 PARTITION OF test FOR VALUES FROM (1) TO (10);
+CREATE TABLE test_def PARTITION OF test DEFAULT;
+CREATE INDEX idx_test_i ON test(i);
+INSERT INTO test(name, i) VALUES
+  ('a1', 1), ('a5', 5), ('a15', 15), ('a12', 12), ('a27', 27), ('a11', 11);
+SELECT 'test_def'::regclass::oid AS prev_oid \gset
+ALTER TABLE test SPLIT PARTITION test_def INTO 
+  (PARTITION test_def DEFAULT, PARTITION test_2 FOR VALUES FROM (11) TO (19));
+-- should be 'f' (tables "test_2" and "test_def" should be different):
+SELECT 'test_2'::regclass::oid=:prev_oid;
+-- should be rows 15, 12, 11:
+SELECT i FROM test_2;
+-- should be 6 rows:
+SELECT * FROM test;
+DROP TABLE test CASCADE;
+--
+-- 4. Optimization should be used, DEFAUT partition renames to DEFAULT
+-- partition.
+--
+CREATE TABLE test(name text, i int) PARTITION BY RANGE (i);
+CREATE TABLE test_1 PARTITION OF test FOR VALUES FROM (1) TO (10);
+CREATE TABLE test_def PARTITION OF test DEFAULT;
+CREATE INDEX idx_test_i ON test(i);
+INSERT INTO test(name, i) VALUES
+  ('a1', 1), ('a5', 5), ('a25', 25), ('a22', 22), ('a27', 27), ('a21', 21);
+-- should be rows 25, 22, 27, 21:
+SELECT i FROM test_def;
+SELECT 'test_def'::regclass::oid AS prev_oid \gset
+ALTER TABLE test SPLIT PARTITION test_def INTO 
+  (PARTITION test_def DEFAULT, PARTITION test_2 FOR VALUES FROM (11) TO (19));
+-- should be 't' (table "test_def" after SPLIT should be the same as table
+-- "test_def" before SPLIT):
+SELECT 'test_def'::regclass::oid=:prev_oid;
+-- should be 0 rows:
+SELECT i FROM test_2;
+-- should be rows 25, 22, 27, 21:
+SELECT i FROM test_def;
+-- should be 6 rows:
+SELECT * FROM test;
+DROP TABLE test CASCADE;
+--
+-- 5. Optimization should be used, 2-column partition key + different columns
+-- order in partitions.
+--
+CREATE TABLE test_2colkey(s smallint, b bigint, t text) PARTITION BY RANGE (b, s);
+CREATE TABLE test_2colkey_1 PARTITION OF test_2colkey FOR VALUES FROM (1000000001, 1) TO (1000000100, 100);
+CREATE TABLE test_2colkey_def(i int, b bigint, s smallint, t text);
+ALTER TABLE test_2colkey_def DROP COLUMN i;
+ALTER TABLE test_2colkey ATTACH PARTITION test_2colkey_def DEFAULT;
+CREATE INDEX idx_test_2colkey_s_b ON test_2colkey(b, s);
+
+INSERT INTO test_2colkey (b, s, t) VALUES (1000000010, 3, 'value_10_3');
+INSERT INTO test_2colkey (b, s, t) VALUES (1000000120, 4, 'value_120_4');
+INSERT INTO test_2colkey (b, s, t) VALUES (1000000003, 5, 'value_3_5');
+INSERT INTO test_2colkey (b, s, t) VALUES (1000000124, 2, 'value_124_2');
+
+-- should be 4 rows:
+SELECT b, s FROM test_2colkey;
+-- should be 2 rows:
+SELECT b, s FROM test_2colkey_def;
+
+SELECT 'test_2colkey_def'::regclass::oid AS prev_oid \gset
+
+ALTER TABLE test_2colkey SPLIT PARTITION test_2colkey_def INTO
+  (PARTITION test_2colkey_2 FOR VALUES FROM (1000000101, 1) TO (1000000200, 100),
+   PARTITION test_2colkey_def DEFAULT);
+
+-- should be 't' (table "test_2colkey_2" after SPLIT should be the same as table
+-- "test_2colkey_def" before SPLIT):
+SELECT 'test_2colkey_2'::regclass::oid=:prev_oid;
+
+-- should be 2 rows:
+SELECT b, s FROM test_2colkey_2;
+-- should be 0 rows:
+SELECT b, s FROM test_2colkey_def;
+-- should be 6 rows:
+SELECT b, s FROM test_2colkey;
+
+--
+-- 5.1. Optimization cannot be used.
+--
+INSERT INTO test_2colkey (b, s, t) VALUES (1000000200, 1, 'value_200_1');
+
+SELECT 'test_2colkey_2'::regclass::oid AS prev_oid \gset
+
+ALTER TABLE test_2colkey SPLIT PARTITION test_2colkey_2 INTO
+  (PARTITION test_2colkey_2 FOR VALUES FROM (1000000101, 1) TO (1000000150, 100),
+   PARTITION test_2colkey_3 FOR VALUES FROM (1000000151, 1) TO (1000000200, 100));
+
+-- should be 'f' (optimization is not used):
+SELECT 'test_2colkey_2'::regclass::oid=:prev_oid;
+
+-- should be 2 rows:
+SELECT b, s FROM test_2colkey_2;
+-- should be 1 row:
+SELECT b, s FROM test_2colkey_3;
+
+DROP TABLE test_2colkey CASCADE;
+
 --
 DROP SCHEMA partition_split_schema;
-- 
2.39.2

