From 20ad3320d9d5f16756d3fd0bba2db5df74117d35 Mon Sep 17 00:00:00 2001
From: Alvaro Herrera <alvherre@alvh.no-ip.org>
Date: Wed, 26 Feb 2020 17:34:54 -0300
Subject: [PATCH] Enable BEFORE row-level triggers for partitioned tables

... with the limitation that if partitioning columns are changed, the
operation is aborted.  (The reason for this limitation is that it might
require re-routing the tuple, which doesn't work.)
---
 src/backend/commands/tablecmds.c       |   7 --
 src/backend/commands/trigger.c         | 106 ++++++++++++++++++++++---
 src/backend/partitioning/partdesc.c    |   2 +-
 src/include/utils/reltrigger.h         |   1 +
 src/test/regress/expected/triggers.out |  53 ++++++++++++-
 src/test/regress/sql/triggers.sql      |  39 ++++++++-
 6 files changed, 182 insertions(+), 26 deletions(-)

diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c
index 02a7c04fdb..3b560067a4 100644
--- a/src/backend/commands/tablecmds.c
+++ b/src/backend/commands/tablecmds.c
@@ -16503,13 +16503,6 @@ CloneRowTriggersToPartition(Relation parent, Relation partition)
 			 !OidIsValid(trigForm->tgparentid)))
 			continue;
 
-		/*
-		 * Complain if we find an unexpected trigger type.
-		 */
-		if (!TRIGGER_FOR_AFTER(trigForm->tgtype))
-			elog(ERROR, "unexpected trigger \"%s\" found",
-				 NameStr(trigForm->tgname));
-
 		/* Use short-lived context for CREATE TRIGGER */
 		oldcxt = MemoryContextSwitchTo(perTupCxt);
 
diff --git a/src/backend/commands/trigger.c b/src/backend/commands/trigger.c
index 6e8b7223fe..d4687de211 100644
--- a/src/backend/commands/trigger.c
+++ b/src/backend/commands/trigger.c
@@ -53,10 +53,12 @@
 #include "utils/acl.h"
 #include "utils/builtins.h"
 #include "utils/bytea.h"
+#include "utils/datum.h"
 #include "utils/fmgroids.h"
 #include "utils/inval.h"
 #include "utils/lsyscache.h"
 #include "utils/memutils.h"
+#include "utils/partcache.h"
 #include "utils/rel.h"
 #include "utils/snapmgr.h"
 #include "utils/syscache.h"
@@ -82,6 +84,9 @@ static int	MyTriggerDepth = 0;
 /* Local function prototypes */
 static void ConvertTriggerToFK(CreateTrigStmt *stmt, Oid funcoid);
 static void SetTriggerFlags(TriggerDesc *trigdesc, Trigger *trigger);
+static void ReportTriggerPartkeyChange(ResultRelInfo *relinfo,
+					HeapTuple oldtuple, HeapTuple newtuple,
+					const char *trigname);
 static bool GetTupleForTrigger(EState *estate,
 							   EPQState *epqstate,
 							   ResultRelInfo *relinfo,
@@ -226,18 +231,6 @@ CreateTrigger(CreateTrigStmt *stmt, const char *queryString,
 		 */
 		if (stmt->row)
 		{
-			/*
-			 * BEFORE triggers FOR EACH ROW are forbidden, because they would
-			 * allow the user to direct the row to another partition, which
-			 * isn't implemented in the executor.
-			 */
-			if (stmt->timing != TRIGGER_TYPE_AFTER)
-				ereport(ERROR,
-						(errcode(ERRCODE_WRONG_OBJECT_TYPE),
-						 errmsg("\"%s\" is a partitioned table",
-								RelationGetRelationName(rel)),
-						 errdetail("Partitioned tables cannot have BEFORE / FOR EACH ROW triggers.")));
-
 			/*
 			 * Disallow use of transition tables.
 			 *
@@ -1977,6 +1970,7 @@ RelationBuildTriggers(Relation relation)
 		build->tgtype = pg_trigger->tgtype;
 		build->tgenabled = pg_trigger->tgenabled;
 		build->tgisinternal = pg_trigger->tgisinternal;
+		build->tgisclone = OidIsValid(pg_trigger->tgparentid);
 		build->tgconstrrelid = pg_trigger->tgconstrrelid;
 		build->tgconstrindid = pg_trigger->tgconstrindid;
 		build->tgconstraint = pg_trigger->tgconstraint;
@@ -2280,6 +2274,8 @@ equalTriggerDescs(TriggerDesc *trigdesc1, TriggerDesc *trigdesc2)
 				return false;
 			if (trig1->tgisinternal != trig2->tgisinternal)
 				return false;
+			if (trig1->tgisclone != trig2->tgisclone)
+				return false;
 			if (trig1->tgconstrrelid != trig2->tgconstrrelid)
 				return false;
 			if (trig1->tgconstrindid != trig2->tgconstrindid)
@@ -2576,6 +2572,14 @@ ExecBRInsertTriggers(EState *estate, ResultRelInfo *relinfo,
 		}
 		else if (newtuple != oldtuple)
 		{
+			/*
+			 * If this trigger is a clone of some trigger in a partitioned
+			 * table, disallow it from changing the partitioning key.
+			 */
+			if (trigger->tgisclone)
+				ReportTriggerPartkeyChange(relinfo, oldtuple, newtuple,
+										   trigger->tgname);
+
 			ExecForceStoreHeapTuple(newtuple, slot, false);
 
 			if (should_free)
@@ -3102,6 +3106,10 @@ ExecBRUpdateTriggers(EState *estate, EPQState *epqstate,
 		{
 			ExecForceStoreHeapTuple(newtuple, newslot, false);
 
+			if (trigger->tgisclone)
+				ReportTriggerPartkeyChange(relinfo, trigtuple, newtuple,
+										   trigger->tgname);
+
 			/*
 			 * If the tuple returned by the trigger / being stored, is the old
 			 * row version, and the heap tuple passed to the trigger was
@@ -3411,6 +3419,80 @@ GetTupleForTrigger(EState *estate,
 	return true;
 }
 
+/*
+ * When a tuple in a partition is being modified by a BEFORE trigger, raise an
+ * error if the partitioning key columns are changed by the trigger.
+ */
+static void
+ReportTriggerPartkeyChange(ResultRelInfo *relinfo,
+					HeapTuple oldtuple, HeapTuple newtuple,
+					const char *trigname)
+{
+	Oid			current_rel_id;
+
+	/*
+	 * For each partitioning level, climbing up from the leaf, check whether
+	 * the partitioning key columns are being bytewise modified.  If any
+	 * column is modified, abort the operation.
+	 *
+	 * Repeat the check until we reach the root of the partitioning hierarchy.
+	 */
+	current_rel_id = relinfo->ri_RelationDesc->rd_id;
+	for (;;)
+	{
+		PartitionKey	key;
+		Oid				parent;
+		Relation		prel;
+
+		parent = get_partition_parent(current_rel_id);
+		prel = relation_open(parent, NoLock);	/* we already hold lock */
+		key = RelationGetPartitionKey(prel);
+		for (int i = 0; i < key->partnatts; i++)
+		{
+			Form_pg_attribute att;
+			AttrNumber	attnum = key->partattrs[i];
+			bool	isnull;
+			Datum	oatt,
+					natt;
+
+			/*
+			 * FIXME [at least] two problems here: 1. we repeat attr
+			 * extraction for every partitioning level; 2. we need to map
+			 * attribute numbers.  Maybe it'd be better to be passed slots, not
+			 * heap-tuples.
+			 */
+			oatt = heap_getattr(oldtuple, attnum, prel->rd_att, &isnull);
+			Assert(!isnull);	/* partkeys cannot be null */
+			natt = heap_getattr(newtuple, attnum, prel->rd_att, &isnull);
+			Assert(!isnull);
+
+			att = TupleDescAttr(prel->rd_att, attnum - 1);
+
+			/*
+			 * No need for datum_image_eq detoasting here; if the partition key
+			 * changed at all, report that as an error.
+			 */
+			if (!datumIsEqual(oatt, natt,
+							  att->attbyval, att->attlen))
+				ereport(ERROR,
+						(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+						 errmsg("changing partitioning columns in a before trigger is not supported"),
+						 errdetail("Column \"%s\" was changed by trigger \"%s\".",
+								   NameStr(att->attname), trigname)));
+		}
+
+		/* up one level */
+		current_rel_id = parent;
+		relation_close(prel, NoLock);
+
+		/* If we just checked the root, we're done */
+		if (current_rel_id == RelationGetRelid(relinfo->ri_PartitionRoot))
+			break;
+
+		CHECK_FOR_INTERRUPTS();		/* for luck */
+	}
+}
+
 /*
  * Is trigger enabled to fire?
  */
diff --git a/src/backend/partitioning/partdesc.c b/src/backend/partitioning/partdesc.c
index bdda42e40f..e7f23542e8 100644
--- a/src/backend/partitioning/partdesc.c
+++ b/src/backend/partitioning/partdesc.c
@@ -307,7 +307,7 @@ CreatePartitionDirectory(MemoryContext mcxt)
  *
  * The purpose of this function is to ensure that we get the same
  * PartitionDesc for each relation every time we look it up.  In the
- * face of current DDL, different PartitionDescs may be constructed with
+ * face of concurrent DDL, different PartitionDescs may be constructed with
  * different views of the catalog state, but any single particular OID
  * will always get the same PartitionDesc for as long as the same
  * PartitionDirectory is used.
diff --git a/src/include/utils/reltrigger.h b/src/include/utils/reltrigger.h
index 28df43d833..b22acb034e 100644
--- a/src/include/utils/reltrigger.h
+++ b/src/include/utils/reltrigger.h
@@ -29,6 +29,7 @@ typedef struct Trigger
 	int16		tgtype;
 	char		tgenabled;
 	bool		tgisinternal;
+	bool		tgisclone;
 	Oid			tgconstrrelid;
 	Oid			tgconstrindid;
 	Oid			tgconstraint;
diff --git a/src/test/regress/expected/triggers.out b/src/test/regress/expected/triggers.out
index 22e65cc1ec..dfd7cef743 100644
--- a/src/test/regress/expected/triggers.out
+++ b/src/test/regress/expected/triggers.out
@@ -1958,10 +1958,6 @@ drop table my_table;
 create table parted_trig (a int) partition by list (a);
 create function trigger_nothing() returns trigger
   language plpgsql as $$ begin end; $$;
-create trigger failed before insert or update or delete on parted_trig
-  for each row execute procedure trigger_nothing();
-ERROR:  "parted_trig" is a partitioned table
-DETAIL:  Partitioned tables cannot have BEFORE / FOR EACH ROW triggers.
 create trigger failed instead of update on parted_trig
   for each row execute procedure trigger_nothing();
 ERROR:  "parted_trig" is a table
@@ -2246,6 +2242,55 @@ NOTICE:  aasvogel <- woof!
 NOTICE:  trigger parted_trig on parted1_irreg AFTER INSERT for ROW: (a,b)=(3,aasvogel)
 NOTICE:  trigger parted_trig_odd on parted1_irreg AFTER INSERT for ROW: (a,b)=(3,aasvogel)
 drop table parted_irreg_ancestor;
+-- Before triggers and partitions
+create table parted (a int, b int, c text) partition by list (a);
+create table parted_1 partition of parted for values in (1)
+  partition by list (b);
+create table parted_1_1 partition of parted_1 for values in (1);
+create function parted_trigfunc() returns trigger language plpgsql as $$
+begin
+  new.a = new.a + 1;
+  return new;
+end;
+$$;
+insert into parted values (1, 1, 'uno uno');    -- works
+create trigger t before insert or update or delete on parted
+  for each row execute procedure parted_trigfunc();
+insert into parted values (1, 1, 'uno uno v2');    -- fail
+ERROR:  changing partitioning columns in a before trigger is not supported
+DETAIL:  Column "a" was changed by trigger "t".
+update parted set c = c || 'v3';                   -- fail
+ERROR:  changing partitioning columns in a before trigger is not supported
+DETAIL:  Column "a" was changed by trigger "t".
+create or replace function parted_trigfunc() returns trigger language plpgsql as $$
+begin
+  new.b = new.b + 1;
+  return new;
+end;
+$$;
+insert into parted values (1, 1, 'uno uno v4');    -- fail
+ERROR:  changing partitioning columns in a before trigger is not supported
+DETAIL:  Column "b" was changed by trigger "t".
+update parted set c = c || 'v5';                   -- fail
+ERROR:  changing partitioning columns in a before trigger is not supported
+DETAIL:  Column "b" was changed by trigger "t".
+create or replace function parted_trigfunc() returns trigger language plpgsql as $$
+begin
+  new.c = new.c || ' and so';
+  return new;
+end;
+$$;
+insert into parted values (1, 1, 'uno uno');       -- works
+update parted set c = c || ' v6';                   -- works
+select tableoid::regclass, * from parted;
+  tableoid  | a | b |            c             
+------------+---+---+--------------------------
+ parted_1_1 | 1 | 1 | uno uno v6 and so
+ parted_1_1 | 1 | 1 | uno uno and so v6 and so
+(2 rows)
+
+drop table parted;
+drop function parted_trigfunc();
 --
 -- Constraint triggers and partitioned tables
 create table parted_constr_ancestor (a int, b text)
diff --git a/src/test/regress/sql/triggers.sql b/src/test/regress/sql/triggers.sql
index 0f61fdf0ea..3d9f14618c 100644
--- a/src/test/regress/sql/triggers.sql
+++ b/src/test/regress/sql/triggers.sql
@@ -1348,8 +1348,6 @@ drop table my_table;
 create table parted_trig (a int) partition by list (a);
 create function trigger_nothing() returns trigger
   language plpgsql as $$ begin end; $$;
-create trigger failed before insert or update or delete on parted_trig
-  for each row execute procedure trigger_nothing();
 create trigger failed instead of update on parted_trig
   for each row execute procedure trigger_nothing();
 create trigger failed after update on parted_trig
@@ -1561,6 +1559,43 @@ insert into parted1_irreg values ('aardwolf', 2);
 insert into parted_irreg_ancestor values ('aasvogel', 3);
 drop table parted_irreg_ancestor;
 
+-- Before triggers and partitions
+create table parted (a int, b int, c text) partition by list (a);
+create table parted_1 partition of parted for values in (1)
+  partition by list (b);
+create table parted_1_1 partition of parted_1 for values in (1);
+create function parted_trigfunc() returns trigger language plpgsql as $$
+begin
+  new.a = new.a + 1;
+  return new;
+end;
+$$;
+insert into parted values (1, 1, 'uno uno');    -- works
+create trigger t before insert or update or delete on parted
+  for each row execute procedure parted_trigfunc();
+insert into parted values (1, 1, 'uno uno v2');    -- fail
+update parted set c = c || 'v3';                   -- fail
+create or replace function parted_trigfunc() returns trigger language plpgsql as $$
+begin
+  new.b = new.b + 1;
+  return new;
+end;
+$$;
+insert into parted values (1, 1, 'uno uno v4');    -- fail
+update parted set c = c || 'v5';                   -- fail
+create or replace function parted_trigfunc() returns trigger language plpgsql as $$
+begin
+  new.c = new.c || ' and so';
+  return new;
+end;
+$$;
+insert into parted values (1, 1, 'uno uno');       -- works
+update parted set c = c || ' v6';                   -- works
+select tableoid::regclass, * from parted;
+
+drop table parted;
+drop function parted_trigfunc();
+
 --
 -- Constraint triggers and partitioned tables
 create table parted_constr_ancestor (a int, b text)
-- 
2.20.1

