From d7034ae8a510315af8ec5d077082861b18866c28 Mon Sep 17 00:00:00 2001
From: amit <amitlangote09@gmail.com>
Date: Wed, 27 Jul 2016 15:47:39 +0900
Subject: [PATCH 6/7] Tuple routing for partitioned tables.

Both COPY FROM and INSERT are covered by this commit.  Routing to foreign
partitions is not supported at the moment.

To implement tuple-routing, introduce a PartitionDispatch data structure.
Each partitioned table in a partition tree gets one and contains info
such as a pointer to its partition descriptor, partition key execution
state, global sequence numbers of its leaf partitions and a way to link
to the PartitionDispatch objects of any of its partitions that are
partitioned themselves. Starting with the PartitionDispatch object of the
root partitioned table and a tuple to route, one can get the global
sequence number of the leaf partition that the tuple gets routed to,
if one exists.
---
 src/backend/catalog/partition.c        |  342 +++++++++++++++++++++++++++++++-
 src/backend/commands/copy.c            |  151 ++++++++++++++-
 src/backend/commands/tablecmds.c       |    1 +
 src/backend/executor/execMain.c        |   58 ++++++-
 src/backend/executor/nodeModifyTable.c |  130 ++++++++++++
 src/backend/parser/analyze.c           |    8 +
 src/include/catalog/partition.h        |   11 +
 src/include/executor/executor.h        |    6 +
 src/include/nodes/execnodes.h          |    8 +
 src/test/regress/expected/insert.out   |   52 +++++
 src/test/regress/sql/insert.sql        |   25 +++
 11 files changed, 786 insertions(+), 6 deletions(-)

diff --git a/src/backend/catalog/partition.c b/src/backend/catalog/partition.c
index 197fcef..710829a 100644
--- a/src/backend/catalog/partition.c
+++ b/src/backend/catalog/partition.c
@@ -113,6 +113,28 @@ typedef struct PartitionRangeBound
 	bool	lower;		/* this is the lower (vs upper) bound */
 } PartitionRangeBound;
 
+/*-----------------------
+ * PartitionDispatch - information about one partitioned table in a partition
+ * hierarchy required to route a tuple to one of its partitions
+ *
+ *	relid		OID of the table
+ *	key			Partition key information of the table
+ *	keystate	Execution state required for expressions in the partition key
+ *	partdesc	Partition descriptor of the table
+ *	indexes		Array with partdesc->nparts members; a member >= 0 is the
+ *				sequence number of a leaf partition, a member < 0 is the
+ *				negated PartitionDispatch array offset of a partitioned child
+ *-----------------------
+ */
+typedef struct PartitionDispatchData
+{
+	Oid						relid;
+	PartitionKey			key;
+	List				   *keystate;	/* list of ExprState */
+	PartitionDesc			partdesc;
+	int					   *indexes;
+} PartitionDispatchData;
+
 static int32 qsort_partition_list_value_cmp(const void *a, const void *b, void *arg);
 static int32 qsort_partition_rbound_cmp(const void *a, const void *b, void *arg);
 
@@ -126,12 +148,22 @@ static PartitionRangeBound *make_one_range_bound(PartitionKey key, int index, Li
 static int32 partition_rbound_cmp(PartitionKey key,
 					 Datum *datums1, RangeDatumContent *content1, bool lower1,
 					 PartitionRangeBound *b2);
+static int32 partition_rbound_datum_cmp(PartitionKey key,
+						   Datum *rb_datums, RangeDatumContent *rb_content,
+						   Datum *tuple_datums);
 
 static int32 partition_bound_cmp(PartitionKey key, PartitionBoundInfo boundinfo,
 					int offset, void *probe, bool probe_is_bound);
 static int partition_bound_bsearch(PartitionKey key, PartitionBoundInfo boundinfo,
 						void *probe, bool probe_is_bound, bool *is_equal);
 
+/* Support get_partition_for_tuple() */
+static void FormPartitionKeyDatum(PartitionDispatch pd,
+							TupleTableSlot *slot,
+							EState *estate,
+							Datum *values,
+							bool *isnull);
+
 /*
  * RelationBuildPartitionDesc
  *		Form rel's partition descriptor
@@ -895,6 +927,115 @@ RelationGetPartitionQual(Relation rel, bool recurse)
 	return generate_partition_qual(rel, recurse);
 }
 
+/* Append the N OIDs of arr to list (odd index name avoids capture) */
+#define OID_ARRAY_TO_LIST(arr, N, list) \
+	do\
+	{\
+		int		oid_array_idx_;\
+		for (oid_array_idx_ = 0; oid_array_idx_ < (N); oid_array_idx_++)\
+			(list) = lappend_oid((list), (arr)[oid_array_idx_]);\
+	} while(0)
+
+/*
+ * RelationGetPartitionDispatchInfo
+ *		Returns information necessary to route tuples down a partition tree
+ *
+ * The result is an array with one PartitionDispatch per partitioned table in
+ * the tree, in breadth-first order with rel itself at offset 0.
+ * All the partitions will be locked with lockmode, unless it is NoLock.
+ * A list of the OIDs of all the leaf partitions of rel is returned in
+ * *leaf_part_oids; a leaf's position in it is its sequence number.
+ */
+PartitionDispatch *
+RelationGetPartitionDispatchInfo(Relation rel, int lockmode,
+								 List **leaf_part_oids)
+{
+	PartitionDesc	rootpartdesc = RelationGetPartitionDesc(rel);
+	PartitionDispatchData **pd;
+	List	   *all_parts = NIL,
+			   *parted_rels = NIL;
+	ListCell   *lc;
+	int			i,
+				k,
+				num_parted,
+				next_parted;
+
+	/*
+	 * Lock partitions and collect OIDs of the partitioned ones to prepare
+	 * their PartitionDispatch objects.
+	 *
+	 * Cannot use find_all_inheritors() here, because the order of OIDs in
+	 * parted_rels must be the breadth-first order induced by the individual
+	 * partition descriptors; the index assignment below relies on it.
+	 */
+	parted_rels = lappend_oid(parted_rels, RelationGetRelid(rel));
+	num_parted = 1;
+	OID_ARRAY_TO_LIST(rootpartdesc->oids, rootpartdesc->nparts, all_parts);
+	foreach(lc, all_parts)
+	{
+		Relation		partrel = heap_open(lfirst_oid(lc), lockmode);
+		PartitionDesc	partdesc = RelationGetPartitionDesc(partrel);
+
+		/*
+		 * If this partition is a partitioned table, add its children to the
+		 * end of the list, so that they are processed as well.
+		 */
+		if (partdesc)
+		{
+			num_parted++;
+			parted_rels = lappend_oid(parted_rels, lfirst_oid(lc));
+			OID_ARRAY_TO_LIST(partdesc->oids, partdesc->nparts, all_parts);
+		}
+
+		heap_close(partrel, NoLock);
+	}
+
+	/* Generate PartitionDispatch objects for all partitioned tables */
+	pd = (PartitionDispatchData **) palloc(num_parted *
+										sizeof(PartitionDispatchData *));
+	*leaf_part_oids = NIL;
+	i = k = 0;
+	next_parted = 1;		/* pd[0] is rel itself */
+	foreach(lc, parted_rels)
+	{
+		/* We locked all partitions above */
+		Relation	partrel = heap_open(lfirst_oid(lc), NoLock);
+		PartitionDesc partdesc = RelationGetPartitionDesc(partrel);
+		int			j;
+
+		pd[i] = (PartitionDispatch) palloc(sizeof(PartitionDispatchData));
+		pd[i]->relid = RelationGetRelid(partrel);
+		pd[i]->key = RelationGetPartitionKey(partrel);
+		pd[i]->keystate = NIL;
+		pd[i]->partdesc = partdesc;
+		pd[i]->indexes = (int *) palloc(partdesc->nparts * sizeof(int));
+		heap_close(partrel, NoLock);
+
+		for (j = 0; j < partdesc->nparts; j++)
+		{
+			Oid		partrelid = partdesc->oids[j];
+
+			if (get_rel_relkind(partrelid) != RELKIND_PARTITIONED_TABLE)
+			{
+				*leaf_part_oids = lappend_oid(*leaf_part_oids, partrelid);
+				pd[i]->indexes[j] = k++;
+			}
+			else
+			{
+				/*
+				 * parted_rels was built breadth-first and this loop meets
+				 * partitioned children in that same order, so the next
+				 * unassigned pd[] offset is this child's (not i + 1).
+				 */
+				pd[i]->indexes[j] = -(next_parted++);
+			}
+		}
+		i++;
+	}
+
+	return pd;
+}
+
 /* Module-local functions */
 
 /*
@@ -1329,6 +1470,172 @@ generate_partition_qual(Relation rel, bool recurse)
 	return result;
 }
 
+/* ----------------
+ *		FormPartitionKeyDatum
+ *			Fill values[] and isnull[] with the partition key of the tuple
+ *			held in slot.
+ *
+ *	pd				dispatch info of the partitioned table whose key is
+ *					wanted; its keystate is set up here on first use
+ *	slot			slot holding the tuple to extract the key from
+ *	estate			executor state for evaluating any partition key
+ *					expressions (must be non-NULL)
+ *	values			Array of partition key Datums (output area)
+ *	isnull			Array of is-null indicators (output area)
+ *
+ * The ecxt_scantuple slot of estate's per-tuple expr context must point to
+ * the slot passed in.
+ * ----------------
+ */
+static void
+FormPartitionKeyDatum(PartitionDispatch pd,
+					  TupleTableSlot *slot,
+					  EState *estate,
+					  Datum *values,
+					  bool *isnull)
+{
+	PartitionKey key = pd->key;
+	ListCell   *next_expr;
+	int			attno;
+
+	/* Build the expression states lazily, the first time they are needed */
+	if (pd->keystate == NIL && key->partexprs != NIL)
+	{
+		/* Caller must have pointed the per-tuple context at our slot */
+		Assert(estate != NULL &&
+			   GetPerTupleExprContext(estate)->ecxt_scantuple == slot);
+
+		pd->keystate = (List *) ExecPrepareExpr((Expr *) key->partexprs,
+												estate);
+	}
+
+	next_expr = list_head(pd->keystate);
+	for (attno = 0; attno < key->partnatts; attno++)
+	{
+		AttrNumber	keycol = key->partattrs[attno];
+
+		if (keycol == 0)
+		{
+			/* Key column is an expression; consume the next ExprState */
+			if (next_expr == NULL)
+				elog(ERROR, "wrong number of partition key expressions");
+			values[attno] = ExecEvalExprSwitchContext(
+										(ExprState *) lfirst(next_expr),
+										GetPerTupleExprContext(estate),
+										&isnull[attno],
+										NULL);
+			next_expr = lnext(next_expr);
+		}
+		else
+		{
+			/* Key column is a plain attribute; fetch it from the slot */
+			values[attno] = slot_getattr(slot, keycol, &isnull[attno]);
+		}
+	}
+
+	/* Every prepared expression must have been consumed */
+	if (next_expr != NULL)
+		elog(ERROR, "wrong number of partition key expressions");
+}
+
+/*
+ * get_partition_for_tuple
+ *		Finds a leaf partition for tuple contained in *slot
+ *
+ * Routing starts at pd[0] and descends through partitioned children.
+ * Returns the sequence number of the leaf partition found, or -1 if there
+ * is none, in which case *failed_at is set to the OID of the partitioned
+ * table for which no matching partition exists.
+ */
+int
+get_partition_for_tuple(PartitionDispatch *pd,
+						TupleTableSlot *slot,
+						EState *estate,
+						Oid *failed_at)
+{
+	PartitionDispatch parent;
+	Datum	values[PARTITION_MAX_KEYS];
+	bool	isnull[PARTITION_MAX_KEYS];
+	int		cur_offset,
+			cur_index;
+	int		i;
+
+	/* start with the root partitioned table */
+	parent = pd[0];
+	while(true)
+	{
+		PartitionKey	key = parent->key;
+		PartitionDesc	partdesc = parent->partdesc;
+
+		/* Quick exit */
+		if (partdesc->nparts == 0)
+		{
+			*failed_at = parent->relid;
+			return -1;
+		}
+
+		/* Extract partition key from tuple */
+		FormPartitionKeyDatum(parent, slot, estate, values, isnull);
+
+		if (key->strategy == PARTITION_STRATEGY_RANGE)
+		{
+			/* Disallow nulls in the range partition key of the tuple */
+			for (i = 0; i < key->partnatts; i++)
+				if (isnull[i])
+					ereport(ERROR,
+					(errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED),
+					 errmsg("range partition key of row contains null")));
+		}
+
+		/* A NULL key with no null-accepting partition stays unrouted (-1) */
+		cur_index = -1;
+		if (partdesc->boundinfo->has_null && isnull[0])
+			/* Tuple maps to the null-accepting list partition */
+			cur_index = partdesc->boundinfo->null_index;
+		else if (!isnull[0])
+		{
+			/* Non-null key; bsearch in partdesc->boundinfo */
+			bool	equal = false;
+
+			cur_offset = partition_bound_bsearch(key, partdesc->boundinfo,
+												 values, false, &equal);
+			switch (key->strategy)
+			{
+				case PARTITION_STRATEGY_LIST:
+					if (cur_offset >= 0 && equal)
+						cur_index = partdesc->boundinfo->indexes[cur_offset];
+					break;
+
+				case PARTITION_STRATEGY_RANGE:
+					/*
+					 * Offset returned is such that the bound at offset is
+					 * found to be less or equal with the tuple. So, the
+					 * bound at offset+1 would be the upper bound.
+					 */
+					cur_index = partdesc->boundinfo->indexes[cur_offset+1];
+					break;
+			}
+		}
+
+		/*
+		 * cur_index < 0 means we failed to find a partition of this parent.
+		 * cur_index >= 0 means we either found the leaf partition, or the
+		 * next parent to find a partition of.
+		 */
+		if (cur_index < 0)
+		{
+			*failed_at = parent->relid;
+			return -1;
+		}
+		else if (parent->indexes[cur_index] < 0)
+			parent = pd[-parent->indexes[cur_index]];
+		else
+			break;
+	}
+
+	return parent->indexes[cur_index];
+}
+
 /*
  * qsort_partition_list_value_cmp
  *
@@ -1461,6 +1768,36 @@ partition_rbound_cmp(PartitionKey key,
 }
 
 /*
+ * partition_rbound_datum_cmp
+ *
+ * Return whether range bound (specified in rb_datums and rb_content) is
+ * <, =, or > partition key of tuple (tuple_datums), as a negative, zero,
+ * or positive value respectively
+ */
+static int32
+partition_rbound_datum_cmp(PartitionKey key,
+						   Datum *rb_datums, RangeDatumContent *rb_content,
+						   Datum *tuple_datums)
+{
+	int		i;
+	int32	cmpval = 0;		/* defined result even if no column is compared */
+
+	for (i = 0; i < key->partnatts; i++)
+	{
+		/* An infinite bound column settles the comparison by itself */
+		if (rb_content[i] != RANGE_DATUM_FINITE)
+			return rb_content[i] == RANGE_DATUM_NEG_INF ? -1 : 1;
+		cmpval = DatumGetInt32(FunctionCall2Coll(&key->partsupfunc[i],
+												 key->partcollation[i],
+												 rb_datums[i],
+												 tuple_datums[i]));
+		if (cmpval != 0)
+			break;
+	}
+
+	return cmpval;
+}
+
+/*
  * partition_bound_cmp
  * 
  * Return whether the bound at offset in boundinfo is <=, =, >= the argument
@@ -1499,7 +1836,10 @@ partition_bound_cmp(PartitionKey key, PartitionBoundInfo boundinfo,
 											  bound_datums, content, lower,
 											  (PartitionRangeBound *) probe);
 			}
-
+			else
+				cmpval = partition_rbound_datum_cmp(key,
+													bound_datums, content,
+													(Datum *) probe);
 			break;
 		}
 	}
diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c
index 7a2bf94..7d76ead 100644
--- a/src/backend/commands/copy.c
+++ b/src/backend/commands/copy.c
@@ -161,6 +161,10 @@ typedef struct CopyStateData
 	ExprState **defexprs;		/* array of default att expressions */
 	bool		volatile_defexprs;		/* is any of defexprs volatile? */
 	List	   *range_table;
+	PartitionDispatch	   *partition_dispatch_info;
+	int						num_partitions;
+	ResultRelInfo		   *partitions;
+	TupleConversionMap	  **partition_tupconv_maps;
 
 	/*
 	 * These variables are used to reduce overhead in textual COPY FROM.
@@ -1397,6 +1401,67 @@ BeginCopy(ParseState *pstate,
 					(errcode(ERRCODE_UNDEFINED_COLUMN),
 					 errmsg("table \"%s\" does not have OIDs",
 							RelationGetRelationName(cstate->rel))));
+
+		/*
+		 * Initialize state for CopyFrom tuple routing.  Watch out for
+		 * any foreign partitions.
+		 */
+		if (is_from && rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
+		{
+			PartitionDispatch *pd;
+			List		   *leaf_parts;
+			ListCell	   *cell;
+			int				i,
+							num_leaf_parts;
+			ResultRelInfo  *leaf_part_rri;
+
+			/* Get the tuple-routing information and lock partitions */
+			pd = RelationGetPartitionDispatchInfo(rel, RowExclusiveLock,
+												  &leaf_parts);
+			num_leaf_parts = list_length(leaf_parts);
+			cstate->partition_dispatch_info = pd;
+			cstate->num_partitions = num_leaf_parts;
+			cstate->partitions = (ResultRelInfo *) palloc(num_leaf_parts *
+														sizeof(ResultRelInfo));
+			cstate->partition_tupconv_maps = (TupleConversionMap **)
+						palloc0(num_leaf_parts * sizeof(TupleConversionMap *));
+
+			leaf_part_rri = cstate->partitions;
+			i = 0;
+			foreach(cell, leaf_parts)
+			{
+				Relation	partrel;
+
+				/*
+				 * All partitions locked above; will be closed after CopyFrom is
+				 * finished.
+				 */
+				partrel = heap_open(lfirst_oid(cell), NoLock);
+
+				/*
+				 * Verify result relation is a valid target for the current
+				 * operation.
+				 */
+				CheckValidResultRel(partrel, CMD_INSERT);
+
+				InitResultRelInfo(leaf_part_rri,
+								  partrel,
+								  1,		/* dummy */
+								  false,	/* no need for partition check */
+								  0);
+
+				/* Open partition indices */
+				ExecOpenIndices(leaf_part_rri, false);
+
+				if (!equalTupleDescs(tupDesc, RelationGetDescr(partrel)))
+					cstate->partition_tupconv_maps[i] =
+								convert_tuples_by_name(tupDesc,
+									RelationGetDescr(partrel),
+									gettext_noop("could not convert row type"));
+				leaf_part_rri++;
+				i++;
+			}
+		}
 	}
 	else
 	{
@@ -2255,6 +2320,7 @@ CopyFrom(CopyState cstate)
 	Datum	   *values;
 	bool	   *nulls;
 	ResultRelInfo *resultRelInfo;
+	ResultRelInfo *saved_resultRelInfo = NULL;
 	EState	   *estate = CreateExecutorState(); /* for ExecConstraints() */
 	ExprContext *econtext;
 	TupleTableSlot *myslot;
@@ -2281,6 +2347,7 @@ CopyFrom(CopyState cstate)
 	 * only hint about them in the view case.)
 	 */
 	if (cstate->rel->rd_rel->relkind != RELKIND_RELATION &&
+		cstate->rel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE &&
 		!(cstate->rel->trigdesc &&
 		  cstate->rel->trigdesc->trig_insert_instead_row))
 	{
@@ -2391,6 +2458,7 @@ CopyFrom(CopyState cstate)
 	InitResultRelInfo(resultRelInfo,
 					  cstate->rel,
 					  1,		/* dummy rangetable index */
+					  true,		/* do load partition check expression */
 					  0);
 
 	ExecOpenIndices(resultRelInfo, false);
@@ -2418,6 +2486,7 @@ CopyFrom(CopyState cstate)
 	if ((resultRelInfo->ri_TrigDesc != NULL &&
 		 (resultRelInfo->ri_TrigDesc->trig_insert_before_row ||
 		  resultRelInfo->ri_TrigDesc->trig_insert_instead_row)) ||
+		cstate->partition_dispatch_info != NULL ||
 		cstate->volatile_defexprs)
 	{
 		useHeapMultiInsert = false;
@@ -2442,7 +2511,11 @@ CopyFrom(CopyState cstate)
 	values = (Datum *) palloc(tupDesc->natts * sizeof(Datum));
 	nulls = (bool *) palloc(tupDesc->natts * sizeof(bool));
 
-	bistate = GetBulkInsertState();
+	if (useHeapMultiInsert)
+		bistate = GetBulkInsertState();
+	else
+		bistate = NULL;
+
 	econtext = GetPerTupleExprContext(estate);
 
 	/* Set up callback to identify error line number */
@@ -2494,6 +2567,56 @@ CopyFrom(CopyState cstate)
 		slot = myslot;
 		ExecStoreTuple(tuple, slot, InvalidBuffer, false);
 
+		/* Determine the partition to heap_insert the tuple into */
+		if (cstate->partition_dispatch_info)
+		{
+			int		leaf_part_index;
+			TupleConversionMap *map;
+
+			/*
+			 * Away we go ... If we end up not finding a partition after all,
+			 * ExecFindPartition() does not return and errors out instead.
+			 * Otherwise, the returned value indexes cstate->partitions[] and
+			 * cstate->partition_tupconv_maps[].
+			 */
+			leaf_part_index = ExecFindPartition(resultRelInfo,
+											cstate->partition_dispatch_info,
+												slot,
+												estate);
+			Assert(leaf_part_index >= 0 &&
+				   leaf_part_index < cstate->num_partitions);
+
+			/*
+			 * Save the old ResultRelInfo and switch to the one corresponding
+			 * to the selected partition.
+			 */
+			saved_resultRelInfo = resultRelInfo;
+			resultRelInfo = cstate->partitions + leaf_part_index;
+
+			/* We do not yet have a way to insert into a foreign partition */
+			if (resultRelInfo->ri_FdwRoutine)
+				ereport(ERROR,
+						(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+						 errmsg("cannot route inserted tuples to a foreign table")));
+
+			/* For ExecInsertIndexTuples() to work on the partition's indexes */
+			estate->es_result_relation_info = resultRelInfo;
+
+			/*
+			 * We might need to convert from the parent rowtype to the
+			 * partition rowtype.  Re-store the converted tuple so that the
+			 * slot, which later steps read, agrees with what is inserted.
+			 */
+			map = cstate->partition_tupconv_maps[leaf_part_index];
+			if (map)
+			{
+				tuple = do_convert_tuple(tuple, map);
+				ExecStoreTuple(tuple, slot, InvalidBuffer, false);
+			}
+
+			tuple->t_tableOid = RelationGetRelid(resultRelInfo->ri_RelationDesc);
+		}
+
 		skip_tuple = false;
 
 		/* BEFORE ROW INSERT Triggers */
@@ -2553,7 +2676,8 @@ CopyFrom(CopyState cstate)
 					List	   *recheckIndexes = NIL;
 
 					/* OK, store the tuple and create index entries for it */
-					heap_insert(cstate->rel, tuple, mycid, hi_options, bistate);
+					heap_insert(resultRelInfo->ri_RelationDesc, tuple, mycid,
+								hi_options, bistate);
 
 					if (resultRelInfo->ri_NumIndices > 0)
 						recheckIndexes = ExecInsertIndexTuples(slot,
@@ -2577,6 +2701,12 @@ CopyFrom(CopyState cstate)
 			 * tuples inserted by an INSERT command.
 			 */
 			processed++;
+
+			if (saved_resultRelInfo)
+			{
+				resultRelInfo = saved_resultRelInfo;
+				estate->es_result_relation_info = resultRelInfo;
+			}
 		}
 	}
 
@@ -2590,7 +2720,8 @@ CopyFrom(CopyState cstate)
 	/* Done, clean up */
 	error_context_stack = errcallback.previous;
 
-	FreeBulkInsertState(bistate);
+	if (bistate)
+		FreeBulkInsertState(bistate);
 
 	MemoryContextSwitchTo(oldcontext);
 
@@ -2614,6 +2745,20 @@ CopyFrom(CopyState cstate)
 
 	ExecCloseIndices(resultRelInfo);
 
+	/* Close all partitions and indices thereof */
+	if (cstate->partition_dispatch_info)
+	{
+		int		i;
+
+		for (i = 0; i < cstate->num_partitions; i++)
+		{
+			ResultRelInfo *resultRelInfo = cstate->partitions + i;
+
+			ExecCloseIndices(resultRelInfo);
+			heap_close(resultRelInfo->ri_RelationDesc, NoLock);
+		}
+	}
+
 	FreeExecutorState(estate);
 
 	/*
diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c
index 99c9d10..8a8eb01 100644
--- a/src/backend/commands/tablecmds.c
+++ b/src/backend/commands/tablecmds.c
@@ -1322,6 +1322,7 @@ ExecuteTruncate(TruncateStmt *stmt)
 		InitResultRelInfo(resultRelInfo,
 						  rel,
 						  0,	/* dummy rangetable index */
+						  false,
 						  0);
 		resultRelInfo++;
 	}
diff --git a/src/backend/executor/execMain.c b/src/backend/executor/execMain.c
index c7a6347..54fb771 100644
--- a/src/backend/executor/execMain.c
+++ b/src/backend/executor/execMain.c
@@ -826,6 +826,7 @@ InitPlan(QueryDesc *queryDesc, int eflags)
 			InitResultRelInfo(resultRelInfo,
 							  resultRelation,
 							  resultRelationIndex,
+							  true,
 							  estate->es_instrument);
 			resultRelInfo++;
 		}
@@ -1215,6 +1216,7 @@ void
 InitResultRelInfo(ResultRelInfo *resultRelInfo,
 				  Relation resultRelationDesc,
 				  Index resultRelationIndex,
+				  bool load_partition_check,
 				  int instrument_options)
 {
 	MemSet(resultRelInfo, 0, sizeof(ResultRelInfo));
@@ -1252,8 +1254,10 @@ InitResultRelInfo(ResultRelInfo *resultRelInfo,
 	resultRelInfo->ri_ConstraintExprs = NULL;
 	resultRelInfo->ri_junkFilter = NULL;
 	resultRelInfo->ri_projectReturning = NULL;
-	resultRelInfo->ri_PartitionCheck =
-						RelationGetPartitionQual(resultRelationDesc, true);
+	if (load_partition_check)
+		resultRelInfo->ri_PartitionCheck =
+							RelationGetPartitionQual(resultRelationDesc,
+													 true);
 }
 
 /*
@@ -1316,6 +1320,7 @@ ExecGetTriggerResultRel(EState *estate, Oid relid)
 	InitResultRelInfo(rInfo,
 					  rel,
 					  0,		/* dummy rangetable index */
+					  true,
 					  estate->es_instrument);
 	estate->es_trig_target_relations =
 		lappend(estate->es_trig_target_relations, rInfo);
@@ -2990,3 +2995,52 @@ EvalPlanQualEnd(EPQState *epqstate)
 	epqstate->planstate = NULL;
 	epqstate->origslot = NULL;
 }
+
+/*
+ * ExecFindPartition -- Route the heap tuple contained in *slot to a leaf
+ * partition of the partition tree described by pd
+ *
+ * estate must be non-NULL; its per-tuple expression context is pointed at
+ * slot so that expressions in the partition key(s) can be computed
+ *
+ * Returns the leaf partition sequence number exactly as reported by
+ * get_partition_for_tuple().  If that finds no leaf partition, this routine
+ * errors out, naming the partitioned table at which routing failed.
+ */
+int
+ExecFindPartition(ResultRelInfo *resultRelInfo, PartitionDispatch *pd,
+				  TupleTableSlot *slot, EState *estate)
+{
+	Oid			failed_at;
+	int			leaf_seqno;
+	Relation	rootrel;
+	Bitmapset  *modifiedCols;
+	char	   *val_desc;
+
+	/* get_partition_for_tuple() expects the scan tuple to be our slot */
+	GetPerTupleExprContext(estate)->ecxt_scantuple = slot;
+
+	leaf_seqno = get_partition_for_tuple(pd, slot, estate, &failed_at);
+	if (leaf_seqno >= 0)
+		return leaf_seqno;
+
+	/* Routing failed; build a description of the offending row */
+	rootrel = resultRelInfo->ri_RelationDesc;
+	modifiedCols = bms_union(GetInsertedColumns(resultRelInfo, estate),
+							 GetUpdatedColumns(resultRelInfo, estate));
+	val_desc = ExecBuildSlotValueDescription(RelationGetRelid(rootrel),
+											 slot,
+											 RelationGetDescr(rootrel),
+											 modifiedCols,
+											 64);
+
+	/* failed_at was set by get_partition_for_tuple() on failure */
+	Assert(OidIsValid(failed_at));
+	ereport(ERROR,
+			(errcode(ERRCODE_CHECK_VIOLATION),
+			 errmsg("no partition of relation \"%s\" found for row",
+					get_rel_name(failed_at)),
+			 val_desc ? errdetail("Failing row contains %s.", val_desc) : 0));
+
+	return leaf_seqno;		/* not reached when ereport(ERROR) exits */
+}
diff --git a/src/backend/executor/nodeModifyTable.c b/src/backend/executor/nodeModifyTable.c
index 6eccfb7..87a04aa 100644
--- a/src/backend/executor/nodeModifyTable.c
+++ b/src/backend/executor/nodeModifyTable.c
@@ -258,6 +258,7 @@ ExecInsert(ModifyTableState *mtstate,
 {
 	HeapTuple	tuple;
 	ResultRelInfo *resultRelInfo;
+	ResultRelInfo *saved_resultRelInfo = NULL;
 	Relation	resultRelationDesc;
 	Oid			newId;
 	List	   *recheckIndexes = NIL;
@@ -272,6 +273,56 @@ ExecInsert(ModifyTableState *mtstate,
 	 * get information on the (current) result relation
 	 */
 	resultRelInfo = estate->es_result_relation_info;
+
+	/* Determine the partition to heap_insert the tuple into */
+	if (mtstate->mt_partition_dispatch_info)
+	{
+		int		leaf_part_index;
+		TupleConversionMap *map;
+
+		/*
+		 * Away we go ... If we end up not finding a partition after all,
+		 * ExecFindPartition() does not return and errors out instead.
+		 * Otherwise, the returned value is to be used as an index into
+		 * arrays mt_partitions[] and mt_partition_tupconv_maps[] that
+		 * will get us the ResultRelInfo and TupleConversionMap for the
+		 * partition, respectively.
+		 */
+		leaf_part_index = ExecFindPartition(resultRelInfo,
+										mtstate->mt_partition_dispatch_info,
+											slot,
+											estate);
+		Assert(leaf_part_index >= 0 &&
+			   leaf_part_index < mtstate->mt_num_partitions);
+
+		/*
+		 * Save the old ResultRelInfo and switch to the one corresponding to
+		 * the selected partition.
+		 */
+		saved_resultRelInfo = resultRelInfo;
+		resultRelInfo = mtstate->mt_partitions + leaf_part_index;
+
+		/* We do not yet have a way to insert into a foreign partition */
+		if (resultRelInfo->ri_FdwRoutine)
+			ereport(ERROR,
+					(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+					 errmsg("cannot route inserted tuples to a foreign table")));
+
+		/* For ExecInsertIndexTuples() to work on the partition's indexes */
+		estate->es_result_relation_info = resultRelInfo;
+
+		/*
+		 * We might need to convert from the parent rowtype to the partition
+		 * rowtype.
+		 */
+		map = mtstate->mt_partition_tupconv_maps[leaf_part_index];
+		if (map)
+		{
+			tuple = do_convert_tuple(tuple, map);
+			ExecStoreTuple(tuple, slot, InvalidBuffer, false);
+		}
+	}
+
 	resultRelationDesc = resultRelInfo->ri_RelationDesc;
 
 	/*
@@ -511,6 +562,12 @@ ExecInsert(ModifyTableState *mtstate,
 
 	list_free(recheckIndexes);
 
+	if (saved_resultRelInfo)
+	{
+		resultRelInfo = saved_resultRelInfo;
+		estate->es_result_relation_info = resultRelInfo;
+	}
+
 	/*
 	 * Check any WITH CHECK OPTION constraints from parent views.  We are
 	 * required to do this after testing all constraints and uniqueness
@@ -1565,6 +1622,7 @@ ExecInitModifyTable(ModifyTable *node, EState *estate, int eflags)
 	Plan	   *subplan;
 	ListCell   *l;
 	int			i;
+	Relation	rel;
 
 	/* check for unsupported flags */
 	Assert(!(eflags & (EXEC_FLAG_BACKWARD | EXEC_FLAG_MARK)));
@@ -1655,6 +1713,69 @@ ExecInitModifyTable(ModifyTable *node, EState *estate, int eflags)
 
 	estate->es_result_relation_info = saved_resultRelInfo;
 
+	/* Build state for INSERT tuple routing */
+	rel = mtstate->resultRelInfo->ri_RelationDesc;
+	if (operation == CMD_INSERT &&
+		rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
+	{
+		PartitionDispatch  *pd;
+		int					i,
+							num_leaf_parts;
+		List			   *leaf_parts;
+		ListCell		   *cell;
+		ResultRelInfo	   *leaf_part_rri;
+
+		/* Form the partition node tree and lock partitions */
+		pd = RelationGetPartitionDispatchInfo(rel, RowExclusiveLock,
+											  &leaf_parts);
+		mtstate->mt_partition_dispatch_info = pd;
+		num_leaf_parts = list_length(leaf_parts);
+		mtstate->mt_num_partitions = num_leaf_parts;
+		mtstate->mt_partitions = (ResultRelInfo *)
+						palloc0(num_leaf_parts * sizeof(ResultRelInfo));
+		mtstate->mt_partition_tupconv_maps = (TupleConversionMap **)
+					palloc0(num_leaf_parts * sizeof(TupleConversionMap *));
+
+		leaf_part_rri = mtstate->mt_partitions;
+		i = 0;
+		foreach(cell, leaf_parts)
+		{
+			Oid			ftoid = lfirst_oid(cell);
+			Relation	part_rel;
+
+			part_rel = heap_open(ftoid, RowExclusiveLock);
+
+			/*
+			 * Verify result relation is a valid target for the current
+			 * operation
+			 */
+			CheckValidResultRel(part_rel, CMD_INSERT);
+
+			/* Last argument is instrument options, not executor flags */
+			InitResultRelInfo(leaf_part_rri,
+							  part_rel,
+							  1,		/* dummy */
+							  false,	/* no need for partition checks */
+							  estate->es_instrument);
+
+			/* Open partition indices (note: ON CONFLICT unsupported)*/
+			if (leaf_part_rri->ri_RelationDesc->rd_rel->relhasindex &&
+				leaf_part_rri->ri_IndexRelationDescs == NULL)
+				ExecOpenIndices(leaf_part_rri, false);
+
+			/* A conversion map is needed only if the rowtypes differ */
+			if (!equalTupleDescs(RelationGetDescr(rel),
+								 RelationGetDescr(part_rel)))
+				mtstate->mt_partition_tupconv_maps[i] =
+							convert_tuples_by_name(RelationGetDescr(rel),
+												   RelationGetDescr(part_rel),
+								  gettext_noop("could not convert row type"));
+
+			leaf_part_rri++;
+			i++;
+		}
+	}
+
 	/*
 	 * Initialize any WITH CHECK OPTION constraints if needed.
 	 */
@@ -1972,6 +2093,15 @@ ExecEndModifyTable(ModifyTableState *node)
 														   resultRelInfo);
 	}
 
+	/* Close all partitions and indices thereof */
+	for (i = 0; i < node->mt_num_partitions; i++)
+	{
+		ResultRelInfo *resultRelInfo = node->mt_partitions + i;
+
+		ExecCloseIndices(resultRelInfo);
+		heap_close(resultRelInfo->ri_RelationDesc, NoLock);
+	}
+
 	/*
 	 * Free the exprcontext
 	 */
diff --git a/src/backend/parser/analyze.c b/src/backend/parser/analyze.c
index 36f8c54..88380ba 100644
--- a/src/backend/parser/analyze.c
+++ b/src/backend/parser/analyze.c
@@ -806,8 +806,16 @@ transformInsertStmt(ParseState *pstate, InsertStmt *stmt)
 
 	/* Process ON CONFLICT, if any. */
 	if (stmt->onConflictClause)
+	{
+		/* Bail out if target relation is partitioned table */
+		if (pstate->p_target_rangetblentry->relkind == RELKIND_PARTITIONED_TABLE)
+			ereport(ERROR,
+					(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+					 errmsg("ON CONFLICT clause is not supported with partitioned tables")));
+
 		qry->onConflict = transformOnConflictClause(pstate,
 													stmt->onConflictClause);
+	}
 
 	/*
 	 * If we have a RETURNING clause, we need to add the target relation to
diff --git a/src/include/catalog/partition.h b/src/include/catalog/partition.h
index 70d8325..f76c5d9 100644
--- a/src/include/catalog/partition.h
+++ b/src/include/catalog/partition.h
@@ -14,6 +14,8 @@
 #define PARTITION_H
 
 #include "fmgr.h"
+#include "executor/tuptable.h"
+#include "nodes/execnodes.h"
 #include "parser/parse_node.h"
 #include "utils/rel.h"
 
@@ -36,6 +38,7 @@ typedef struct PartitionDescData
 } PartitionDescData;
 
 typedef struct PartitionDescData *PartitionDesc;
+typedef struct PartitionDispatchData *PartitionDispatch;
 
 extern void RelationBuildPartitionDesc(Relation relation);
 extern bool partition_bounds_equal(PartitionKey key,
@@ -45,4 +48,12 @@ extern void check_new_partition_bound(char *relname, Relation parent, Node *boun
 extern Oid get_partition_parent(Oid relid);
 extern List *get_qual_from_partbound(Relation rel, Relation parent, Node *bound);
 extern List *RelationGetPartitionQual(Relation rel, bool recurse);
+
+/* For tuple routing */
+extern PartitionDispatch *RelationGetPartitionDispatchInfo(Relation rel, int lockmode,
+								 List **leaf_part_oids);
+extern int get_partition_for_tuple(PartitionDispatch *pd,
+					TupleTableSlot *slot,
+					EState *estate,
+					Oid *failed_at);
 #endif   /* PARTITION_H */
diff --git a/src/include/executor/executor.h b/src/include/executor/executor.h
index 136276b..b4d09f9 100644
--- a/src/include/executor/executor.h
+++ b/src/include/executor/executor.h
@@ -14,6 +14,7 @@
 #ifndef EXECUTOR_H
 #define EXECUTOR_H
 
+#include "catalog/partition.h"
 #include "executor/execdesc.h"
 #include "nodes/parsenodes.h"
 
@@ -188,6 +189,7 @@ extern void CheckValidResultRel(Relation resultRel, CmdType operation);
 extern void InitResultRelInfo(ResultRelInfo *resultRelInfo,
 				  Relation resultRelationDesc,
 				  Index resultRelationIndex,
+				  bool load_partition_check,
 				  int instrument_options);
 extern ResultRelInfo *ExecGetTriggerResultRel(EState *estate, Oid relid);
 extern bool ExecContextForcesOids(PlanState *planstate, bool *hasoids);
@@ -211,6 +213,10 @@ extern void EvalPlanQualSetPlan(EPQState *epqstate,
 extern void EvalPlanQualSetTuple(EPQState *epqstate, Index rti,
 					 HeapTuple tuple);
 extern HeapTuple EvalPlanQualGetTuple(EPQState *epqstate, Index rti);
+extern int ExecFindPartition(ResultRelInfo *resultRelInfo,
+				  PartitionDispatch *pd,
+				  TupleTableSlot *slot,
+				  EState *estate);
 
 #define EvalPlanQualSetSlot(epqstate, slot)  ((epqstate)->origslot = (slot))
 extern void EvalPlanQualFetchRowMarks(EPQState *epqstate);
diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h
index ff8b66b..606cb21 100644
--- a/src/include/nodes/execnodes.h
+++ b/src/include/nodes/execnodes.h
@@ -16,6 +16,7 @@
 
 #include "access/genam.h"
 #include "access/heapam.h"
+#include "access/tupconvert.h"
 #include "executor/instrument.h"
 #include "lib/pairingheap.h"
 #include "nodes/params.h"
@@ -1147,6 +1148,13 @@ typedef struct ModifyTableState
 										 * tlist  */
 	TupleTableSlot *mt_conflproj;		/* CONFLICT ... SET ... projection
 										 * target */
+	struct PartitionDispatchData **mt_partition_dispatch_info;
+										/* Tuple-routing support info */
+	int				mt_num_partitions;	/* Number of members in each of the
+										 * following two arrays */
+	ResultRelInfo  *mt_partitions;	/* Per-partition result relation info */
+	TupleConversionMap **mt_partition_tupconv_maps;
+									/* Per-partition tuple conversion map */
 } ModifyTableState;
 
 /* ----------------
diff --git a/src/test/regress/expected/insert.out b/src/test/regress/expected/insert.out
index cc9f957..9706034 100644
--- a/src/test/regress/expected/insert.out
+++ b/src/test/regress/expected/insert.out
@@ -223,6 +223,58 @@ DETAIL:  Failing row contains (cc, 1).
 -- ok
 insert into part_EE_FF_1_10 values ('ff', 1);
 insert into part_EE_FF_10_20 values ('ff', 11);
+-- Check tuple routing for partitioned tables
+-- fail
+insert into range_parted values ('a', 0);
+ERROR:  no partition of relation "range_parted" found for row
+DETAIL:  Failing row contains (a, 0).
+-- ok
+insert into range_parted values ('a', 1);
+insert into range_parted values ('a', 10);
+-- fail
+insert into range_parted values ('a', 20);
+ERROR:  no partition of relation "range_parted" found for row
+DETAIL:  Failing row contains (a, 20).
+-- ok
+insert into range_parted values ('b', 1);
+insert into range_parted values ('b', 10);
+select tableoid::regclass, * from range_parted;
+    tableoid    | a | b  
+----------------+---+----
+ part_a_1_a_10  | a |  1
+ part_a_1_a_10  | a |  1
+ part_a_10_a_20 | a | 10
+ part_b_1_b_10  | b |  1
+ part_b_10_b_20 | b | 10
+ part_b_10_b_20 | b | 10
+(6 rows)
+
+-- ok
+insert into list_parted values (null, 1);
+insert into list_parted (a) values ('aA');
+-- fail (partition of part_EE_FF not found)
+insert into list_parted values ('EE', 0);
+ERROR:  no partition of relation "part_ee_ff" found for row
+DETAIL:  Failing row contains (EE, 0).
+insert into part_EE_FF values ('EE', 0);
+ERROR:  no partition of relation "part_ee_ff" found for row
+DETAIL:  Failing row contains (EE, 0).
+-- ok
+insert into list_parted values ('EE', 1);
+insert into part_EE_FF values ('EE', 10);
+select tableoid::regclass, * from list_parted;
+     tableoid     | a  | b  
+------------------+----+----
+ part_aa_bb       | aA |   
+ part_cc_dd       | cC |  1
+ part_null        |    |  0
+ part_null        |    |  1
+ part_ee_ff_1_10  | ff |  1
+ part_ee_ff_1_10  | EE |  1
+ part_ee_ff_10_20 | ff | 11
+ part_ee_ff_10_20 | EE | 10
+(8 rows)
+
 -- cleanup
 drop table range_parted cascade;
 NOTICE:  drop cascades to 4 other objects
diff --git a/src/test/regress/sql/insert.sql b/src/test/regress/sql/insert.sql
index 3a9430e..afecb74 100644
--- a/src/test/regress/sql/insert.sql
+++ b/src/test/regress/sql/insert.sql
@@ -137,6 +137,31 @@ insert into part_EE_FF_1_10 values ('cc', 1);
 insert into part_EE_FF_1_10 values ('ff', 1);
 insert into part_EE_FF_10_20 values ('ff', 11);
 
+-- Check tuple routing for partitioned tables
+
+-- fail
+insert into range_parted values ('a', 0);
+-- ok
+insert into range_parted values ('a', 1);
+insert into range_parted values ('a', 10);
+-- fail
+insert into range_parted values ('a', 20);
+-- ok
+insert into range_parted values ('b', 1);
+insert into range_parted values ('b', 10);
+select tableoid::regclass, * from range_parted;
+
+-- ok
+insert into list_parted values (null, 1);
+insert into list_parted (a) values ('aA');
+-- fail (partition of part_EE_FF not found)
+insert into list_parted values ('EE', 0);
+insert into part_EE_FF values ('EE', 0);
+-- ok
+insert into list_parted values ('EE', 1);
+insert into part_EE_FF values ('EE', 10);
+select tableoid::regclass, * from list_parted;
+
 -- cleanup
 drop table range_parted cascade;
 drop table list_parted cascade;
-- 
1.7.1

