From 04f2921672cb82d04c6fdd7a66d2808162e02881 Mon Sep 17 00:00:00 2001
From: Andres Freund <andres@anarazel.de>
Date: Sun, 3 Apr 2022 12:54:14 -0700
Subject: [PATCH v70 08/27] pgstat: scaffolding for transactional stats
 creation / drop.

On its own this doesn't achieve anything, but it's a largely independent piece
of infrastructure, so committing it separately makes sense.

FIXME: Need to bump XLOG_PAGE_MAGIC

Author: Andres Freund <andres@anarazel.de>
Reviewed-By: Thomas Munro <thomas.munro@gmail.com>
Discussion: https://postgr.es/m/20220303021600.hs34ghqcw6zcokdh@alap3.anarazel.de
---
 src/include/access/xact.h                     |  40 +++-
 src/include/pgstat.h                          |  15 +-
 src/include/utils/pgstat_internal.h           |  13 ++
 src/backend/access/rmgrdesc/xactdesc.c        |  52 +++++
 src/backend/access/transam/twophase.c         |  45 ++++
 src/backend/access/transam/xact.c             |  64 ++++-
 src/backend/catalog/heap.c                    |   7 +
 src/backend/catalog/pg_proc.c                 |   5 +
 src/backend/commands/functioncmds.c           |   2 +
 src/backend/commands/subscriptioncmds.c       |   4 +-
 src/backend/postmaster/pgstat.c               |   4 +-
 src/backend/replication/slot.c                |   4 +-
 src/backend/storage/smgr/smgr.c               |   5 -
 src/backend/utils/activity/pgstat_function.c  |  22 ++
 src/backend/utils/activity/pgstat_relation.c  |  39 ++--
 src/backend/utils/activity/pgstat_replslot.c  |   4 +-
 .../utils/activity/pgstat_subscription.c      |  21 +-
 src/backend/utils/activity/pgstat_xact.c      | 220 ++++++++++++++++++
 src/tools/pgindent/typedefs.list              |   3 +
 19 files changed, 525 insertions(+), 44 deletions(-)

diff --git a/src/include/access/xact.h b/src/include/access/xact.h
index 062cc7e17d8..4e1e8735010 100644
--- a/src/include/access/xact.h
+++ b/src/include/access/xact.h
@@ -180,6 +180,7 @@ typedef struct SavedTransactionCharacteristics
 #define XACT_XINFO_HAS_ORIGIN			(1U << 5)
 #define XACT_XINFO_HAS_AE_LOCKS			(1U << 6)
 #define XACT_XINFO_HAS_GID				(1U << 7)
+#define XACT_XINFO_HAS_DROPPED_STATS	(1U << 8)
 
 /*
  * Also stored in xinfo, these indicating a variety of additional actions that
@@ -230,7 +231,7 @@ typedef struct xl_xact_assignment
 typedef struct xl_xact_xinfo
 {
 	/*
-	 * Even though we right now only require 1 byte of space in xinfo we use
+	 * Even though we right now only require two bytes of space in xinfo we use
 	 * four so following records don't have to care about alignment. Commit
 	 * records can be large, so copying large portions isn't attractive.
 	 */
@@ -257,6 +258,27 @@ typedef struct xl_xact_relfilenodes
 } xl_xact_relfilenodes;
 #define MinSizeOfXactRelfilenodes offsetof(xl_xact_relfilenodes, xnodes)
 
+/*
+ * A transactionally dropped statistics entry.
+ *
+ * Declared here rather than pgstat.h because pgstat.h can't be included from
+ * frontend code, but the WAL format needs to be readable by frontend
+ * programs.
+ */
+typedef struct xl_xact_stats_item
+{
+	int			kind;
+	Oid			dboid;
+	Oid			objoid;
+} xl_xact_stats_item;
+
+typedef struct xl_xact_stats_items
+{
+	int		nitems;
+	xl_xact_stats_item items[FLEXIBLE_ARRAY_MEMBER];
+} xl_xact_stats_items;
+#define MinSizeOfXactStatsItems offsetof(xl_xact_stats_items, items)
+
 typedef struct xl_xact_invals
 {
 	int			nmsgs;			/* number of shared inval msgs */
@@ -283,6 +305,7 @@ typedef struct xl_xact_commit
 	/* xl_xact_dbinfo follows if XINFO_HAS_DBINFO */
 	/* xl_xact_subxacts follows if XINFO_HAS_SUBXACT */
 	/* xl_xact_relfilenodes follows if XINFO_HAS_RELFILENODES */
+	/* xl_xact_stats_items follows if XINFO_HAS_DROPPED_STATS */
 	/* xl_xact_invals follows if XINFO_HAS_INVALS */
 	/* xl_xact_twophase follows if XINFO_HAS_TWOPHASE */
 	/* twophase_gid follows if XINFO_HAS_GID. As a null-terminated string. */
@@ -298,6 +321,7 @@ typedef struct xl_xact_abort
 	/* xl_xact_dbinfo follows if XINFO_HAS_DBINFO */
 	/* xl_xact_subxacts follows if XINFO_HAS_SUBXACT */
 	/* xl_xact_relfilenodes follows if XINFO_HAS_RELFILENODES */
+	/* xl_xact_stats_items follows if XINFO_HAS_DROPPED_STATS */
 	/* No invalidation messages needed. */
 	/* xl_xact_twophase follows if XINFO_HAS_TWOPHASE */
 	/* twophase_gid follows if XINFO_HAS_GID. As a null-terminated string. */
@@ -316,6 +340,8 @@ typedef struct xl_xact_prepare
 	int32		nsubxacts;		/* number of following subxact XIDs */
 	int32		ncommitrels;	/* number of delete-on-commit rels */
 	int32		nabortrels;		/* number of delete-on-abort rels */
+	int32		ncommitstats;	/* number of stats to drop on commit */
+	int32		nabortstats;	/* number of stats to drop on abort */
 	int32		ninvalmsgs;		/* number of cache invalidation messages */
 	bool		initfileinval;	/* does relcache init file need invalidation? */
 	uint16		gidlen;			/* length of the GID - GID follows the header */
@@ -342,6 +368,9 @@ typedef struct xl_xact_parsed_commit
 	int			nrels;
 	RelFileNode *xnodes;
 
+	int			nstats;
+	xl_xact_stats_item *stats;
+
 	int			nmsgs;
 	SharedInvalidationMessage *msgs;
 
@@ -349,6 +378,8 @@ typedef struct xl_xact_parsed_commit
 	char		twophase_gid[GIDSIZE];	/* only for 2PC */
 	int			nabortrels;		/* only for 2PC */
 	RelFileNode *abortnodes;	/* only for 2PC */
+	int			nabortstats;		/* only for 2PC */
+	xl_xact_stats_item *abortstats; /* only for 2PC */
 
 	XLogRecPtr	origin_lsn;
 	TimestampTz origin_timestamp;
@@ -370,6 +401,9 @@ typedef struct xl_xact_parsed_abort
 	int			nrels;
 	RelFileNode *xnodes;
 
+	int			nstats;
+	xl_xact_stats_item *stats;
+
 	TransactionId twophase_xid; /* only for 2PC */
 	char		twophase_gid[GIDSIZE];	/* only for 2PC */
 
@@ -449,6 +483,8 @@ extern int	xactGetCommittedChildren(TransactionId **ptr);
 extern XLogRecPtr XactLogCommitRecord(TimestampTz commit_time,
 									  int nsubxacts, TransactionId *subxacts,
 									  int nrels, RelFileNode *rels,
+									  int nstats,
+									  xl_xact_stats_item *stats,
 									  int nmsgs, SharedInvalidationMessage *msgs,
 									  bool relcacheInval,
 									  int xactflags,
@@ -458,6 +494,8 @@ extern XLogRecPtr XactLogCommitRecord(TimestampTz commit_time,
 extern XLogRecPtr XactLogAbortRecord(TimestampTz abort_time,
 									 int nsubxacts, TransactionId *subxacts,
 									 int nrels, RelFileNode *rels,
+									 int nstats,
+									 xl_xact_stats_item *stats,
 									 int xactflags, TransactionId twophase_xid,
 									 const char *twophase_gid);
 extern void xact_redo(XLogReaderState *record);
diff --git a/src/include/pgstat.h b/src/include/pgstat.h
index 87ad20eb9b5..6dfde3246de 100644
--- a/src/include/pgstat.h
+++ b/src/include/pgstat.h
@@ -987,6 +987,9 @@ extern void AtEOSubXact_PgStat(bool isCommit, int nestDepth);
 extern void AtPrepare_PgStat(void);
 extern void PostPrepare_PgStat(void);
 extern void pgstat_clear_snapshot(void);
+struct xl_xact_stats_item;
+extern int	pgstat_get_transactional_drops(bool isCommit, struct xl_xact_stats_item **items);
+extern void pgstat_execute_transactional_drops(int ndrops, struct xl_xact_stats_item *items, bool is_redo);
 
 /* Functions called from backends */
 extern void pgstat_report_stat(bool force);
@@ -1057,6 +1060,9 @@ extern void pgstat_report_connect(Oid dboid);
  * Functions in pgstat_function.c
  */
 
+extern void pgstat_create_function(Oid proid);
+extern void pgstat_drop_function(Oid proid);
+
 struct FunctionCallInfoBaseData;
 extern void pgstat_init_function_usage(struct FunctionCallInfoBaseData *fcinfo,
 									   PgStat_FunctionCallUsage *fcu);
@@ -1070,6 +1076,8 @@ extern PgStat_BackendFunctionEntry *find_funcstat_entry(Oid func_id);
  * Functions in pgstat_relation.c
  */
 
+extern void pgstat_create_relation(Relation rel);
+extern void pgstat_drop_relation(Relation rel);
 extern void pgstat_copy_relation_stats(Relation dstrel, Relation srcrel);
 
 extern void pgstat_relation_init(Relation rel);
@@ -1143,8 +1151,8 @@ extern PgStat_TableStatus *find_tabstat_entry(Oid rel_id);
 extern void pgstat_reset_replslot_counters(void);
 extern void pgstat_reset_replslot_counter(const char *name);
 extern void pgstat_report_replslot(const PgStat_StatReplSlotEntry *repSlotStat);
-extern void pgstat_report_replslot_create(const char *slotname);
-extern void pgstat_report_replslot_drop(const char *slotname);
+extern void pgstat_create_replslot(const char *slotname);
+extern void pgstat_drop_replslot(const char *slotname);
 
 
 /*
@@ -1170,7 +1178,8 @@ extern int	pgstat_slru_index(const char *name);
 extern void pgstat_reset_subscription_counters(void);
 extern void pgstat_reset_subscription_counter(Oid subid);
 extern void pgstat_report_subscription_error(Oid subid, bool is_apply_error);
-extern void pgstat_report_subscription_drop(Oid subid);
+extern void pgstat_create_subscription(Oid subid);
+extern void pgstat_drop_subscription(Oid subid);
 
 
 /*
diff --git a/src/include/utils/pgstat_internal.h b/src/include/utils/pgstat_internal.h
index 7f4d79470f7..16e99afa885 100644
--- a/src/include/utils/pgstat_internal.h
+++ b/src/include/utils/pgstat_internal.h
@@ -42,6 +42,16 @@ typedef struct PgStat_SubXactStatus
 
 	struct PgStat_SubXactStatus *prev;	/* higher-level subxact if any */
 
+	/*
+	 * Dropping the statistics for objects that dropped transactionally itself
+	 * needs to be transactional. Therefore we collect the stats dropped in
+	 * the current (sub-)transaction and only execute the stats drop when we
+	 * know if the transaction commits/aborts. To handle replicas and crashes,
+	 * stats drops are included in commit records.
+	 */
+	dlist_head	pending_drops;
+	int			pending_drops_count;
+
 	/*
 	 * Tuple insertion/deletion counts for an open transaction can't be
 	 * propagated into PgStat_TableStatus counters until we know if it is
@@ -134,6 +144,9 @@ extern bool pgstat_wal_pending(void);
  */
 
 extern PgStat_SubXactStatus *pgstat_xact_stack_level_get(int nest_level);
+extern void pgstat_drop_transactional(PgStat_Kind kind, Oid dboid, Oid objoid);
+extern void pgstat_create_transactional(PgStat_Kind kind, Oid dboid, Oid objoid);
+
 
 /*
  * Variables in pgstat.c
diff --git a/src/backend/access/rmgrdesc/xactdesc.c b/src/backend/access/rmgrdesc/xactdesc.c
index 025d556f6ce..d3f625d0726 100644
--- a/src/backend/access/rmgrdesc/xactdesc.c
+++ b/src/backend/access/rmgrdesc/xactdesc.c
@@ -84,6 +84,17 @@ ParseCommitRecord(uint8 info, xl_xact_commit *xlrec, xl_xact_parsed_commit *pars
 		data += xl_relfilenodes->nrels * sizeof(RelFileNode);
 	}
 
+	if (parsed->xinfo & XACT_XINFO_HAS_DROPPED_STATS)
+	{
+		xl_xact_stats_items *xl_drops = (xl_xact_stats_items *) data;
+
+		parsed->nstats = xl_drops->nitems;
+		parsed->stats = xl_drops->items;
+
+		data += MinSizeOfXactStatsItems;
+		data += xl_drops->nitems * sizeof(xl_xact_stats_item);
+	}
+
 	if (parsed->xinfo & XACT_XINFO_HAS_INVALS)
 	{
 		xl_xact_invals *xl_invals = (xl_xact_invals *) data;
@@ -179,6 +190,17 @@ ParseAbortRecord(uint8 info, xl_xact_abort *xlrec, xl_xact_parsed_abort *parsed)
 		data += xl_relfilenodes->nrels * sizeof(RelFileNode);
 	}
 
+	if (parsed->xinfo & XACT_XINFO_HAS_DROPPED_STATS)
+	{
+		xl_xact_stats_items *xl_drops = (xl_xact_stats_items *) data;
+
+		parsed->nstats = xl_drops->nitems;
+		parsed->stats = xl_drops->items;
+
+		data += MinSizeOfXactStatsItems;
+		data += xl_drops->nitems * sizeof(xl_xact_stats_item);
+	}
+
 	if (parsed->xinfo & XACT_XINFO_HAS_TWOPHASE)
 	{
 		xl_xact_twophase *xl_twophase = (xl_xact_twophase *) data;
@@ -244,6 +266,12 @@ ParsePrepareRecord(uint8 info, xl_xact_prepare *xlrec, xl_xact_parsed_prepare *p
 	parsed->abortnodes = (RelFileNode *) bufptr;
 	bufptr += MAXALIGN(xlrec->nabortrels * sizeof(RelFileNode));
 
+	parsed->stats = (xl_xact_stats_item *) bufptr;
+	bufptr += MAXALIGN(xlrec->ncommitstats * sizeof(xl_xact_stats_item));
+
+	parsed->abortstats = (xl_xact_stats_item *) bufptr;
+	bufptr += MAXALIGN(xlrec->nabortstats * sizeof(xl_xact_stats_item));
+
 	parsed->msgs = (SharedInvalidationMessage *) bufptr;
 	bufptr += MAXALIGN(xlrec->ninvalmsgs * sizeof(SharedInvalidationMessage));
 }
@@ -280,6 +308,25 @@ xact_desc_subxacts(StringInfo buf, int nsubxacts, TransactionId *subxacts)
 	}
 }
 
+static void
+xact_desc_stats(StringInfo buf, const char *label,
+				int ndropped, xl_xact_stats_item *dropped_stats)
+{
+	int			i;
+
+	if (ndropped > 0)
+	{
+		appendStringInfo(buf, "; %sdropped stats:", label);
+		for (i = 0; i < ndropped; i++)
+		{
+			appendStringInfo(buf, " %u/%u/%u",
+							 dropped_stats[i].kind,
+							 dropped_stats[i].dboid,
+							 dropped_stats[i].objoid);
+		}
+	}
+}
+
 static void
 xact_desc_commit(StringInfo buf, uint8 info, xl_xact_commit *xlrec, RepOriginId origin_id)
 {
@@ -295,6 +342,7 @@ xact_desc_commit(StringInfo buf, uint8 info, xl_xact_commit *xlrec, RepOriginId
 
 	xact_desc_relations(buf, "rels", parsed.nrels, parsed.xnodes);
 	xact_desc_subxacts(buf, parsed.nsubxacts, parsed.subxacts);
+	xact_desc_stats(buf, "", parsed.nstats, parsed.stats);
 
 	standby_desc_invalidations(buf, parsed.nmsgs, parsed.msgs, parsed.dbId,
 							   parsed.tsId,
@@ -338,6 +386,8 @@ xact_desc_abort(StringInfo buf, uint8 info, xl_xact_abort *xlrec, RepOriginId or
 						 LSN_FORMAT_ARGS(parsed.origin_lsn),
 						 timestamptz_to_str(parsed.origin_timestamp));
 	}
+
+	xact_desc_stats(buf, "", parsed.nstats, parsed.stats);
 }
 
 static void
@@ -353,6 +403,8 @@ xact_desc_prepare(StringInfo buf, uint8 info, xl_xact_prepare *xlrec, RepOriginI
 	xact_desc_relations(buf, "rels(commit)", parsed.nrels, parsed.xnodes);
 	xact_desc_relations(buf, "rels(abort)", parsed.nabortrels,
 						parsed.abortnodes);
+	xact_desc_stats(buf, "commit ", parsed.nstats, parsed.stats);
+	xact_desc_stats(buf, "abort ", parsed.nabortstats, parsed.abortstats);
 	xact_desc_subxacts(buf, parsed.nsubxacts, parsed.subxacts);
 
 	standby_desc_invalidations(buf, parsed.nmsgs, parsed.msgs, parsed.dbId,
diff --git a/src/backend/access/transam/twophase.c b/src/backend/access/transam/twophase.c
index 4dc8ccc12b9..b35da6f1aad 100644
--- a/src/backend/access/transam/twophase.c
+++ b/src/backend/access/transam/twophase.c
@@ -205,6 +205,8 @@ static void RecordTransactionCommitPrepared(TransactionId xid,
 											TransactionId *children,
 											int nrels,
 											RelFileNode *rels,
+											int nstats,
+											xl_xact_stats_item *stats,
 											int ninvalmsgs,
 											SharedInvalidationMessage *invalmsgs,
 											bool initfileinval,
@@ -214,6 +216,8 @@ static void RecordTransactionAbortPrepared(TransactionId xid,
 										   TransactionId *children,
 										   int nrels,
 										   RelFileNode *rels,
+										   int nstats,
+										   xl_xact_stats_item *stats,
 										   const char *gid);
 static void ProcessRecords(char *bufptr, TransactionId xid,
 						   const TwoPhaseCallback callbacks[]);
@@ -1046,6 +1050,8 @@ StartPrepare(GlobalTransaction gxact)
 	TransactionId *children;
 	RelFileNode *commitrels;
 	RelFileNode *abortrels;
+	xl_xact_stats_item *abortstats = NULL;
+	xl_xact_stats_item *commitstats = NULL;
 	SharedInvalidationMessage *invalmsgs;
 
 	/* Initialize linked list */
@@ -1071,6 +1077,10 @@ StartPrepare(GlobalTransaction gxact)
 	hdr.nsubxacts = xactGetCommittedChildren(&children);
 	hdr.ncommitrels = smgrGetPendingDeletes(true, &commitrels);
 	hdr.nabortrels = smgrGetPendingDeletes(false, &abortrels);
+	hdr.ncommitstats =
+		pgstat_get_transactional_drops(true, &commitstats);
+	hdr.nabortstats =
+		pgstat_get_transactional_drops(false, &abortstats);
 	hdr.ninvalmsgs = xactGetCommittedInvalidationMessages(&invalmsgs,
 														  &hdr.initfileinval);
 	hdr.gidlen = strlen(gxact->gid) + 1;	/* Include '\0' */
@@ -1101,6 +1111,18 @@ StartPrepare(GlobalTransaction gxact)
 		save_state_data(abortrels, hdr.nabortrels * sizeof(RelFileNode));
 		pfree(abortrels);
 	}
+	if (hdr.ncommitstats > 0)
+	{
+		save_state_data(commitstats,
+						hdr.ncommitstats * sizeof(xl_xact_stats_item));
+		pfree(commitstats);
+	}
+	if (hdr.nabortstats > 0)
+	{
+		save_state_data(abortstats,
+						hdr.nabortstats	* sizeof(xl_xact_stats_item));
+		pfree(abortstats);
+	}
 	if (hdr.ninvalmsgs > 0)
 	{
 		save_state_data(invalmsgs,
@@ -1472,6 +1494,8 @@ FinishPreparedTransaction(const char *gid, bool isCommit)
 	RelFileNode *abortrels;
 	RelFileNode *delrels;
 	int			ndelrels;
+	xl_xact_stats_item *commitstats;
+	xl_xact_stats_item *abortstats;
 	SharedInvalidationMessage *invalmsgs;
 
 	/*
@@ -1506,6 +1530,10 @@ FinishPreparedTransaction(const char *gid, bool isCommit)
 	bufptr += MAXALIGN(hdr->ncommitrels * sizeof(RelFileNode));
 	abortrels = (RelFileNode *) bufptr;
 	bufptr += MAXALIGN(hdr->nabortrels * sizeof(RelFileNode));
+	commitstats = (xl_xact_stats_item*) bufptr;
+	bufptr += MAXALIGN(hdr->ncommitstats * sizeof(xl_xact_stats_item));
+	abortstats = (xl_xact_stats_item*) bufptr;
+	bufptr += MAXALIGN(hdr->nabortstats * sizeof(xl_xact_stats_item));
 	invalmsgs = (SharedInvalidationMessage *) bufptr;
 	bufptr += MAXALIGN(hdr->ninvalmsgs * sizeof(SharedInvalidationMessage));
 
@@ -1527,12 +1555,16 @@ FinishPreparedTransaction(const char *gid, bool isCommit)
 		RecordTransactionCommitPrepared(xid,
 										hdr->nsubxacts, children,
 										hdr->ncommitrels, commitrels,
+										hdr->ncommitstats,
+										commitstats,
 										hdr->ninvalmsgs, invalmsgs,
 										hdr->initfileinval, gid);
 	else
 		RecordTransactionAbortPrepared(xid,
 									   hdr->nsubxacts, children,
 									   hdr->nabortrels, abortrels,
+									   hdr->nabortstats,
+									   abortstats,
 									   gid);
 
 	ProcArrayRemove(proc, latestXid);
@@ -1568,6 +1600,11 @@ FinishPreparedTransaction(const char *gid, bool isCommit)
 	/* Make sure files supposed to be dropped are dropped */
 	DropRelationFiles(delrels, ndelrels, false);
 
+	if (isCommit)
+		pgstat_execute_transactional_drops(hdr->ncommitstats, commitstats, false);
+	else
+		pgstat_execute_transactional_drops(hdr->nabortstats, abortstats, false);
+
 	/*
 	 * Handle cache invalidation messages.
 	 *
@@ -2066,6 +2103,8 @@ RecoverPreparedTransactions(void)
 		bufptr += MAXALIGN(hdr->nsubxacts * sizeof(TransactionId));
 		bufptr += MAXALIGN(hdr->ncommitrels * sizeof(RelFileNode));
 		bufptr += MAXALIGN(hdr->nabortrels * sizeof(RelFileNode));
+		bufptr += MAXALIGN(hdr->ncommitstats * sizeof(xl_xact_stats_item));
+		bufptr += MAXALIGN(hdr->nabortstats * sizeof(xl_xact_stats_item));
 		bufptr += MAXALIGN(hdr->ninvalmsgs * sizeof(SharedInvalidationMessage));
 
 		/*
@@ -2248,6 +2287,8 @@ RecordTransactionCommitPrepared(TransactionId xid,
 								TransactionId *children,
 								int nrels,
 								RelFileNode *rels,
+								int nstats,
+								xl_xact_stats_item *stats,
 								int ninvalmsgs,
 								SharedInvalidationMessage *invalmsgs,
 								bool initfileinval,
@@ -2277,6 +2318,7 @@ RecordTransactionCommitPrepared(TransactionId xid,
 	 */
 	recptr = XactLogCommitRecord(committs,
 								 nchildren, children, nrels, rels,
+								 nstats, stats,
 								 ninvalmsgs, invalmsgs,
 								 initfileinval,
 								 MyXactFlags | XACT_FLAGS_ACQUIREDACCESSEXCLUSIVELOCK,
@@ -2343,6 +2385,8 @@ RecordTransactionAbortPrepared(TransactionId xid,
 							   TransactionId *children,
 							   int nrels,
 							   RelFileNode *rels,
+							   int nstats,
+							   xl_xact_stats_item *stats,
 							   const char *gid)
 {
 	XLogRecPtr	recptr;
@@ -2373,6 +2417,7 @@ RecordTransactionAbortPrepared(TransactionId xid,
 	recptr = XactLogAbortRecord(GetCurrentTimestamp(),
 								nchildren, children,
 								nrels, rels,
+								nstats, stats,
 								MyXactFlags | XACT_FLAGS_ACQUIREDACCESSEXCLUSIVELOCK,
 								xid, gid);
 
diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index 3596a7d7345..c3e7f605765 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -1285,6 +1285,8 @@ RecordTransactionCommit(void)
 	RelFileNode *rels;
 	int			nchildren;
 	TransactionId *children;
+	int			ndroppedstats = 0;
+	xl_xact_stats_item *droppedstats = NULL;
 	int			nmsgs = 0;
 	SharedInvalidationMessage *invalMessages = NULL;
 	bool		RelcacheInitFileInval = false;
@@ -1303,6 +1305,7 @@ RecordTransactionCommit(void)
 	/* Get data needed for commit record */
 	nrels = smgrGetPendingDeletes(true, &rels);
 	nchildren = xactGetCommittedChildren(&children);
+	ndroppedstats = pgstat_get_transactional_drops(true, &droppedstats);
 	if (XLogStandbyInfoActive())
 		nmsgs = xactGetCommittedInvalidationMessages(&invalMessages,
 													 &RelcacheInitFileInval);
@@ -1317,10 +1320,12 @@ RecordTransactionCommit(void)
 		/*
 		 * We expect that every RelationDropStorage is followed by a catalog
 		 * update, and hence XID assignment, so we shouldn't get here with any
-		 * pending deletes.  Use a real test not just an Assert to check this,
-		 * since it's a bit fragile.
+		 * pending deletes. Same is true for dropping stats.
+		 *
+		 * Use a real test not just an Assert to check this, since it's a bit
+		 * fragile.
 		 */
-		if (nrels != 0)
+		if (nrels != 0 || ndroppedstats != 0)
 			elog(ERROR, "cannot commit a transaction that deleted files but has no xid");
 
 		/* Can't have child XIDs either; AssignTransactionId enforces this */
@@ -1395,6 +1400,7 @@ RecordTransactionCommit(void)
 
 		XactLogCommitRecord(xactStopTimestamp,
 							nchildren, children, nrels, rels,
+							ndroppedstats, droppedstats,
 							nmsgs, invalMessages,
 							RelcacheInitFileInval,
 							MyXactFlags,
@@ -1698,6 +1704,8 @@ RecordTransactionAbort(bool isSubXact)
 	TransactionId latestXid;
 	int			nrels;
 	RelFileNode *rels;
+	int			ndroppedstats = 0;
+	xl_xact_stats_item *droppedstats = NULL;
 	int			nchildren;
 	TransactionId *children;
 	TimestampTz xact_time;
@@ -1734,6 +1742,7 @@ RecordTransactionAbort(bool isSubXact)
 	/* Fetch the data we need for the abort record */
 	nrels = smgrGetPendingDeletes(false, &rels);
 	nchildren = xactGetCommittedChildren(&children);
+	ndroppedstats = pgstat_get_transactional_drops(false, &droppedstats);
 
 	/* XXX do we really need a critical section here? */
 	START_CRIT_SECTION();
@@ -1750,6 +1759,7 @@ RecordTransactionAbort(bool isSubXact)
 	XactLogAbortRecord(xact_time,
 					   nchildren, children,
 					   nrels, rels,
+					   ndroppedstats, droppedstats,
 					   MyXactFlags, InvalidTransactionId,
 					   NULL);
 
@@ -5573,6 +5583,7 @@ XLogRecPtr
 XactLogCommitRecord(TimestampTz commit_time,
 					int nsubxacts, TransactionId *subxacts,
 					int nrels, RelFileNode *rels,
+					int ndroppedstats, xl_xact_stats_item *droppedstats,
 					int nmsgs, SharedInvalidationMessage *msgs,
 					bool relcacheInval,
 					int xactflags, TransactionId twophase_xid,
@@ -5583,6 +5594,7 @@ XactLogCommitRecord(TimestampTz commit_time,
 	xl_xact_dbinfo xl_dbinfo;
 	xl_xact_subxacts xl_subxacts;
 	xl_xact_relfilenodes xl_relfilenodes;
+	xl_xact_stats_items xl_dropped_stats;
 	xl_xact_invals xl_invals;
 	xl_xact_twophase xl_twophase;
 	xl_xact_origin xl_origin;
@@ -5640,6 +5652,12 @@ XactLogCommitRecord(TimestampTz commit_time,
 		info |= XLR_SPECIAL_REL_UPDATE;
 	}
 
+	if (ndroppedstats > 0)
+	{
+		xl_xinfo.xinfo |= XACT_XINFO_HAS_DROPPED_STATS;
+		xl_dropped_stats.nitems = ndroppedstats;
+	}
+
 	if (nmsgs > 0)
 	{
 		xl_xinfo.xinfo |= XACT_XINFO_HAS_INVALS;
@@ -5696,6 +5714,14 @@ XactLogCommitRecord(TimestampTz commit_time,
 						 nrels * sizeof(RelFileNode));
 	}
 
+	if (xl_xinfo.xinfo & XACT_XINFO_HAS_DROPPED_STATS)
+	{
+		XLogRegisterData((char *) (&xl_dropped_stats),
+						 MinSizeOfXactStatsItems);
+		XLogRegisterData((char *) droppedstats,
+						 ndroppedstats * sizeof(xl_xact_stats_item));
+	}
+
 	if (xl_xinfo.xinfo & XACT_XINFO_HAS_INVALS)
 	{
 		XLogRegisterData((char *) (&xl_invals), MinSizeOfXactInvals);
@@ -5729,6 +5755,7 @@ XLogRecPtr
 XactLogAbortRecord(TimestampTz abort_time,
 				   int nsubxacts, TransactionId *subxacts,
 				   int nrels, RelFileNode *rels,
+				   int ndroppedstats, xl_xact_stats_item *droppedstats,
 				   int xactflags, TransactionId twophase_xid,
 				   const char *twophase_gid)
 {
@@ -5736,6 +5763,7 @@ XactLogAbortRecord(TimestampTz abort_time,
 	xl_xact_xinfo xl_xinfo;
 	xl_xact_subxacts xl_subxacts;
 	xl_xact_relfilenodes xl_relfilenodes;
+	xl_xact_stats_items xl_dropped_stats;
 	xl_xact_twophase xl_twophase;
 	xl_xact_dbinfo xl_dbinfo;
 	xl_xact_origin xl_origin;
@@ -5773,6 +5801,12 @@ XactLogAbortRecord(TimestampTz abort_time,
 		info |= XLR_SPECIAL_REL_UPDATE;
 	}
 
+	if (ndroppedstats > 0)
+	{
+		xl_xinfo.xinfo |= XACT_XINFO_HAS_DROPPED_STATS;
+		xl_dropped_stats.nitems = ndroppedstats;
+	}
+
 	if (TransactionIdIsValid(twophase_xid))
 	{
 		xl_xinfo.xinfo |= XACT_XINFO_HAS_TWOPHASE;
@@ -5834,6 +5868,14 @@ XactLogAbortRecord(TimestampTz abort_time,
 						 nrels * sizeof(RelFileNode));
 	}
 
+	if (xl_xinfo.xinfo & XACT_XINFO_HAS_DROPPED_STATS)
+	{
+		XLogRegisterData((char *) (&xl_dropped_stats),
+						 MinSizeOfXactStatsItems);
+		XLogRegisterData((char *) droppedstats,
+						 ndroppedstats * sizeof(xl_xact_stats_item));
+	}
+
 	if (xl_xinfo.xinfo & XACT_XINFO_HAS_TWOPHASE)
 	{
 		XLogRegisterData((char *) (&xl_twophase), sizeof(xl_xact_twophase));
@@ -5967,6 +6009,14 @@ xact_redo_commit(xl_xact_parsed_commit *parsed,
 		DropRelationFiles(parsed->xnodes, parsed->nrels, true);
 	}
 
+	if (parsed->nstats > 0)
+	{
+		/* see equivalent call for relations above */
+		XLogFlush(lsn);
+
+		pgstat_execute_transactional_drops(parsed->nstats, parsed->stats, true);
+	}
+
 	/*
 	 * We issue an XLogFlush() for the same reason we emit ForceSyncCommit()
 	 * in normal operation. For example, in CREATE DATABASE, we copy all files
@@ -6069,6 +6119,14 @@ xact_redo_abort(xl_xact_parsed_abort *parsed, TransactionId xid,
 
 		DropRelationFiles(parsed->xnodes, parsed->nrels, true);
 	}
+
+	if (parsed->nstats > 0)
+	{
+		/* see equivalent call for relations above */
+		XLogFlush(lsn);
+
+		pgstat_execute_transactional_drops(parsed->nstats, parsed->stats, true);
+	}
 }
 
 void
diff --git a/src/backend/catalog/heap.c b/src/backend/catalog/heap.c
index 6eb78a9c0ff..9b512ccd3c0 100644
--- a/src/backend/catalog/heap.c
+++ b/src/backend/catalog/heap.c
@@ -64,6 +64,7 @@
 #include "parser/parse_relation.h"
 #include "parser/parsetree.h"
 #include "partitioning/partdesc.h"
+#include "pgstat.h"
 #include "storage/lmgr.h"
 #include "storage/predicate.h"
 #include "utils/builtins.h"
@@ -1475,6 +1476,9 @@ heap_create_with_catalog(const char *relname,
 	if (oncommit != ONCOMMIT_NOOP)
 		register_on_commit_action(relid, oncommit);
 
+	/* ensure that stats are dropped if transaction aborts */
+	pgstat_create_relation(new_rel_desc);
+
 	/*
 	 * ok, the relation has been cataloged, so close our relations and return
 	 * the OID of the newly created relation.
@@ -1851,6 +1855,9 @@ heap_drop_with_catalog(Oid relid)
 	if (RELKIND_HAS_STORAGE(rel->rd_rel->relkind))
 		RelationDropStorage(rel);
 
+	/* ensure that stats are dropped if transaction commits */
+	pgstat_drop_relation(rel);
+
 	/*
 	 * Close relcache entry, but *keep* AccessExclusiveLock on the relation
 	 * until transaction commit.  This ensures no one else will try to do
diff --git a/src/backend/catalog/pg_proc.c b/src/backend/catalog/pg_proc.c
index ac8aacbd591..d82221fdb8f 100644
--- a/src/backend/catalog/pg_proc.c
+++ b/src/backend/catalog/pg_proc.c
@@ -35,6 +35,7 @@
 #include "parser/analyze.h"
 #include "parser/parse_coerce.h"
 #include "parser/parse_type.h"
+#include "pgstat.h"
 #include "rewrite/rewriteHandler.h"
 #include "tcop/pquery.h"
 #include "tcop/tcopprot.h"
@@ -709,6 +710,10 @@ ProcedureCreate(const char *procedureName,
 			AtEOXact_GUC(true, save_nestlevel);
 	}
 
+	/* ensure that stats are dropped if transaction commits */
+	if (!is_update)
+		pgstat_create_function(retval);
+
 	return myself;
 }
 
diff --git a/src/backend/commands/functioncmds.c b/src/backend/commands/functioncmds.c
index 25b75375a8e..91f02a7eb2c 100644
--- a/src/backend/commands/functioncmds.c
+++ b/src/backend/commands/functioncmds.c
@@ -1325,6 +1325,8 @@ RemoveFunctionById(Oid funcOid)
 
 	table_close(relation, RowExclusiveLock);
 
+	pgstat_drop_function(funcOid);
+
 	/*
 	 * If there's a pg_aggregate tuple, delete that too.
 	 */
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index 51505373ea4..83192dbd51f 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -738,6 +738,8 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
 
 	table_close(rel, RowExclusiveLock);
 
+	pgstat_create_subscription(subid);
+
 	if (opts.enabled)
 		ApplyLauncherWakeupAtCommit();
 
@@ -1592,7 +1594,7 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
 	 * gets lost.
 	 */
 	if (slotname)
-		pgstat_report_subscription_drop(subid);
+		pgstat_drop_subscription(subid);
 
 	table_close(rel, NoLock);
 }
diff --git a/src/backend/postmaster/pgstat.c b/src/backend/postmaster/pgstat.c
index d20b6058976..f44c6e4038e 100644
--- a/src/backend/postmaster/pgstat.c
+++ b/src/backend/postmaster/pgstat.c
@@ -862,7 +862,7 @@ pgstat_vacuum_stat(void)
 			CHECK_FOR_INTERRUPTS();
 
 			if (SearchNamedReplicationSlot(NameStr(slotentry->slotname), true) == NULL)
-				pgstat_report_replslot_drop(NameStr(slotentry->slotname));
+				pgstat_drop_replslot(NameStr(slotentry->slotname));
 		}
 	}
 
@@ -886,7 +886,7 @@ pgstat_vacuum_stat(void)
 			CHECK_FOR_INTERRUPTS();
 
 			if (hash_search(htab, (void *) &(subentry->subid), HASH_FIND, NULL) == NULL)
-				pgstat_report_subscription_drop(subentry->subid);
+				pgstat_drop_subscription(subentry->subid);
 		}
 
 		hash_destroy(htab);
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index ed4c8b3ad55..b02571e4dd8 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -356,7 +356,7 @@ ReplicationSlotCreate(const char *name, bool db_specific,
 	 * ReplicationSlotAllocationLock.
 	 */
 	if (SlotIsLogical(slot))
-		pgstat_report_replslot_create(NameStr(slot->data.name));
+		pgstat_create_replslot(NameStr(slot->data.name));
 
 	/*
 	 * Now that the slot has been marked as in_use and active, it's safe to
@@ -746,7 +746,7 @@ ReplicationSlotDropPtr(ReplicationSlot *slot)
 	 * doesn't seem worth doing as in practice this won't happen frequently.
 	 */
 	if (SlotIsLogical(slot))
-		pgstat_report_replslot_drop(NameStr(slot->data.name));
+		pgstat_drop_replslot(NameStr(slot->data.name));
 
 	/*
 	 * We release this at the very end, so that nobody starts trying to create
diff --git a/src/backend/storage/smgr/smgr.c b/src/backend/storage/smgr/smgr.c
index d71a557a352..2c7a2b28572 100644
--- a/src/backend/storage/smgr/smgr.c
+++ b/src/backend/storage/smgr/smgr.c
@@ -415,11 +415,6 @@ smgrdounlinkall(SMgrRelation *rels, int nrels, bool isRedo)
 			smgrsw[which].smgr_close(rels[i], forknum);
 	}
 
-	/*
-	 * It'd be nice to tell the stats collector to forget them immediately,
-	 * too. But we can't because we don't know the OIDs.
-	 */
-
 	/*
 	 * Send a shared-inval message to force other backends to close any
 	 * dangling smgr references they may have for these rels.  We should do
diff --git a/src/backend/utils/activity/pgstat_function.c b/src/backend/utils/activity/pgstat_function.c
index 93ec29757aa..ad9879afb2a 100644
--- a/src/backend/utils/activity/pgstat_function.c
+++ b/src/backend/utils/activity/pgstat_function.c
@@ -48,6 +48,28 @@ static HTAB *pgStatFunctions = NULL;
 static instr_time total_func_time;
 
 
+/*
+ * Ensure that stats are dropped if transaction aborts.
+ */
+void
+pgstat_create_function(Oid proid)
+{
+	pgstat_create_transactional(PGSTAT_KIND_FUNCTION,
+								MyDatabaseId,
+								proid);
+}
+
+/*
+ * Ensure that stats are dropped if transaction commits.
+ */
+void
+pgstat_drop_function(Oid proid)
+{
+	pgstat_drop_transactional(PGSTAT_KIND_FUNCTION,
+							  MyDatabaseId,
+							  proid);
+}
+
 /*
  * Initialize function call usage data.
  * Called by the executor before invoking a function.
diff --git a/src/backend/utils/activity/pgstat_relation.c b/src/backend/utils/activity/pgstat_relation.c
index b0ac406e5fd..53080eadb89 100644
--- a/src/backend/utils/activity/pgstat_relation.c
+++ b/src/backend/utils/activity/pgstat_relation.c
@@ -171,33 +171,26 @@ pgstat_relation_init(Relation rel)
 }
 
 /*
- * Tell the collector that we just dropped a relation.
- * (If the message gets lost, we will still clean the dead entry eventually
- * via future invocations of pgstat_vacuum_stat().)
- *
- * Currently not used for lack of any good place to call it; we rely
- * entirely on pgstat_vacuum_stat() to clean out stats for dead rels.
+ * Ensure that stats are dropped if transaction aborts.
  */
-#ifdef NOT_USED
 void
-pgstat_drop_relation(Oid relid)
+pgstat_create_relation(Relation rel)
 {
-	PgStat_MsgTabpurge msg;
-	int			len;
-
-	if (pgStatSock == PGINVALID_SOCKET)
-		return;
-
-	msg.m_tableid[0] = relid;
-	msg.m_nentries = 1;
-
-	len = offsetof(PgStat_MsgTabpurge, m_tableid[0]) + sizeof(Oid);
-
-	pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_TABPURGE);
-	msg.m_databaseid = MyDatabaseId;
-	pgstat_send(&msg, len);
+	pgstat_create_transactional(PGSTAT_KIND_RELATION,
+								rel->rd_rel->relisshared ? InvalidOid : MyDatabaseId,
+								RelationGetRelid(rel));
+}
+
+/*
+ * Ensure that stats are dropped if transaction commits.
+ */
+void
+pgstat_drop_relation(Relation rel)
+{
+	pgstat_drop_transactional(PGSTAT_KIND_RELATION,
+							  rel->rd_rel->relisshared ? InvalidOid : MyDatabaseId,
+							  RelationGetRelid(rel));
 }
-#endif							/* NOT_USED */
 
 /*
  * Called from autovacuum.c to report startup of an autovacuum process.
diff --git a/src/backend/utils/activity/pgstat_replslot.c b/src/backend/utils/activity/pgstat_replslot.c
index 1c197f79a90..8d64ecd8aaf 100644
--- a/src/backend/utils/activity/pgstat_replslot.c
+++ b/src/backend/utils/activity/pgstat_replslot.c
@@ -111,7 +111,7 @@ pgstat_report_replslot(const PgStat_StatReplSlotEntry *repSlotStat)
  * Report replication slot creation.
  */
 void
-pgstat_report_replslot_create(const char *slotname)
+pgstat_create_replslot(const char *slotname)
 {
 	PgStat_MsgReplSlot msg;
 
@@ -126,7 +126,7 @@ pgstat_report_replslot_create(const char *slotname)
  * Report replication slot drop.
  */
 void
-pgstat_report_replslot_drop(const char *slotname)
+pgstat_drop_replslot(const char *slotname)
 {
 	PgStat_MsgReplSlot msg;
 
diff --git a/src/backend/utils/activity/pgstat_subscription.c b/src/backend/utils/activity/pgstat_subscription.c
index 7e82bb16a19..d1d8c0ecfd2 100644
--- a/src/backend/utils/activity/pgstat_subscription.c
+++ b/src/backend/utils/activity/pgstat_subscription.c
@@ -70,14 +70,31 @@ pgstat_report_subscription_error(Oid subid, bool is_apply_error)
 }
 
 /*
- * Report dropping the subscription.
+ * Report creating the subscription.
+ *
+ * Ensures that stats are dropped if transaction rolls back.
  */
 void
-pgstat_report_subscription_drop(Oid subid)
+pgstat_create_subscription(Oid subid)
+{
+	pgstat_create_transactional(PGSTAT_KIND_SUBSCRIPTION,
+								InvalidOid, subid);
+}
+
+/*
+ * Report dropping the subscription.
+ *
+ * Ensures that stats are dropped if transaction commits.
+ */
+void
+pgstat_drop_subscription(Oid subid)
 {
 	PgStat_MsgSubscriptionDrop msg;
 
 	pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_SUBSCRIPTIONDROP);
 	msg.m_subid = subid;
 	pgstat_send(&msg, sizeof(PgStat_MsgSubscriptionDrop));
+
+	pgstat_drop_transactional(PGSTAT_KIND_SUBSCRIPTION,
+							  InvalidOid, subid);
 }
diff --git a/src/backend/utils/activity/pgstat_xact.c b/src/backend/utils/activity/pgstat_xact.c
index 17907e32789..5d3c184efb2 100644
--- a/src/backend/utils/activity/pgstat_xact.c
+++ b/src/backend/utils/activity/pgstat_xact.c
@@ -19,6 +19,18 @@
 #include "utils/pgstat_internal.h"
 
 
+typedef struct PgStat_PendingDroppedStatsItem
+{
+	xl_xact_stats_item item;
+	bool		is_create;
+	dlist_node	node;
+} PgStat_PendingDroppedStatsItem;
+
+
+static void AtEOXact_PgStat_DroppedStats(PgStat_SubXactStatus *xact_state, bool isCommit);
+static void AtEOSubXact_PgStat_DroppedStats(PgStat_SubXactStatus *xact_state,
+											bool isCommit, int nestDepth);
+
 static PgStat_SubXactStatus *pgStatXactStack = NULL;
 
 
@@ -40,6 +52,7 @@ AtEOXact_PgStat(bool isCommit, bool parallel)
 		Assert(xact_state->prev == NULL);
 
 		AtEOXact_PgStat_Relations(xact_state, isCommit);
+		AtEOXact_PgStat_DroppedStats(xact_state, isCommit);
 	}
 	pgStatXactStack = NULL;
 
@@ -47,6 +60,49 @@ AtEOXact_PgStat(bool isCommit, bool parallel)
 	pgstat_clear_snapshot();
 }
 
+/*
+ * When committing, drop stats for objects dropped in the transaction. When
+ * aborting, drop stats for objects created in the transaction.
+ */
+static void
+AtEOXact_PgStat_DroppedStats(PgStat_SubXactStatus *xact_state, bool isCommit)
+{
+	dlist_mutable_iter iter;
+
+	if (xact_state->pending_drops_count == 0)
+	{
+		Assert(dlist_is_empty(&xact_state->pending_drops));
+		return;
+	}
+
+	dlist_foreach_modify(iter, &xact_state->pending_drops)
+	{
+		PgStat_PendingDroppedStatsItem *pending =
+		dlist_container(PgStat_PendingDroppedStatsItem, node, iter.cur);
+
+		if (isCommit && !pending->is_create)
+		{
+			/*
+			 * Transaction that dropped an object committed. Drop the stats
+			 * too.
+			 */
+			/* will do work in subsequent commit */
+		}
+		else if (!isCommit && pending->is_create)
+		{
+			/*
+			 * Transaction that created an object aborted. Drop the stats
+			 * associated with the object.
+			 */
+			/* will do work in subsequent commit */
+		}
+
+		dlist_delete(&pending->node);
+		xact_state->pending_drops_count--;
+		pfree(pending);
+	}
+}
+
 /*
  * Called from access/transam/xact.c at subtransaction commit/abort.
  */
@@ -64,11 +120,63 @@ AtEOSubXact_PgStat(bool isCommit, int nestDepth)
 		pgStatXactStack = xact_state->prev;
 
 		AtEOSubXact_PgStat_Relations(xact_state, isCommit, nestDepth);
+		AtEOSubXact_PgStat_DroppedStats(xact_state, isCommit, nestDepth);
 
 		pfree(xact_state);
 	}
 }
 
+/*
+ * Like AtEOXact_PgStat_DroppedStats(), but for subtransactions.
+ */
+static void
+AtEOSubXact_PgStat_DroppedStats(PgStat_SubXactStatus *xact_state,
+								bool isCommit, int nestDepth)
+{
+	PgStat_SubXactStatus *parent_xact_state;
+	dlist_mutable_iter iter;
+
+	if (xact_state->pending_drops_count == 0)
+		return;
+
+	parent_xact_state = pgstat_xact_stack_level_get(nestDepth - 1);
+
+	dlist_foreach_modify(iter, &xact_state->pending_drops)
+	{
+		PgStat_PendingDroppedStatsItem *pending =
+		dlist_container(PgStat_PendingDroppedStatsItem, node, iter.cur);
+
+		dlist_delete(&pending->node);
+		xact_state->pending_drops_count--;
+
+		if (!isCommit && pending->is_create)
+		{
+			/*
+			 * Subtransaction creating a new stats object aborted. Drop the
+			 * stats object.
+			 */
+			/* will do work in subsequent commit */
+			pfree(pending);
+		}
+		else if (isCommit)
+		{
+			/*
+			 * Subtransaction dropping a stats object committed. Can't yet
+			 * remove the stats object, the surrounding transaction might
+			 * still abort. Pass it on to the parent.
+			 */
+			dlist_push_tail(&parent_xact_state->pending_drops, &pending->node);
+			parent_xact_state->pending_drops_count++;
+		}
+		else
+		{
+			pfree(pending);
+		}
+	}
+
+	Assert(xact_state->pending_drops_count == 0);
+}
+
 /*
  * Save the transactional stats state at 2PC transaction prepare.
  */
@@ -130,6 +238,8 @@ pgstat_xact_stack_level_get(int nest_level)
 		xact_state = (PgStat_SubXactStatus *)
 			MemoryContextAlloc(TopTransactionContext,
 							   sizeof(PgStat_SubXactStatus));
+		dlist_init(&xact_state->pending_drops);
+		xact_state->pending_drops_count = 0;
 		xact_state->nest_level = nest_level;
 		xact_state->prev = pgStatXactStack;
 		xact_state->first = NULL;
@@ -137,3 +247,113 @@ pgstat_xact_stack_level_get(int nest_level)
 	}
 	return xact_state;
 }
+
+/*
+ * Get stat items that need to be dropped at commit / abort.
+ *
+ * When committing, stats for objects that have been dropped in the
+ * transaction are returned. When aborting, stats for newly created objects is
+ * returned.
+ *
+ * Used by COMMIT / ABORT and 2PC PREPARE processing when building their
+ * respective WAL records, to ensure stats are dropped in case of a crash / on
+ * standbys.
+ */
+int
+pgstat_get_transactional_drops(bool isCommit, xl_xact_stats_item **items)
+{
+	PgStat_SubXactStatus *xact_state = pgStatXactStack;
+	int			nitems = 0;
+	dlist_iter	iter;
+
+	if (xact_state == NULL)
+		return 0;
+
+	/*
+	 * We expect to be called for subtransaction abort (which logs a WAL
+	 * record), but not for subtransaction commit (which doesn't).
+	 */
+	Assert(!isCommit || xact_state->nest_level == 1);
+	Assert(!isCommit || xact_state->prev == NULL);
+
+	*items = palloc(xact_state->pending_drops_count
+					* sizeof(PgStat_PendingDroppedStatsItem));
+
+	dlist_foreach(iter, &xact_state->pending_drops)
+	{
+		PgStat_PendingDroppedStatsItem *pending =
+		dlist_container(PgStat_PendingDroppedStatsItem, node, iter.cur);
+
+		if (isCommit && pending->is_create)
+			continue;
+		if (!isCommit && !pending->is_create)
+			continue;
+
+		Assert(nitems < xact_state->pending_drops_count);
+		(*items)[nitems++] = pending->item;
+	}
+
+	return nitems;
+}
+
+/*
+ * Execute scheduled drops post-commit. Called from xact_redo_commit() /
+ * xact_redo_abort() during recovery, and from FinishPreparedTransaction()
+ * during normal 2PC COMMIT/ABORT PREPARED processing.
+ */
+void
+pgstat_execute_transactional_drops(int ndrops, struct xl_xact_stats_item *items, bool is_redo)
+{
+	if (ndrops == 0)
+		return;
+
+	for (int i = 0; i < ndrops; i++)
+	{
+		/* will do work in subsequent commit */
+	}
+}
+
+static void
+create_drop_transactional_internal(PgStat_Kind kind, Oid dboid, Oid objoid, bool is_create)
+{
+	int			nest_level = GetCurrentTransactionNestLevel();
+	PgStat_SubXactStatus *xact_state;
+	PgStat_PendingDroppedStatsItem *drop = (PgStat_PendingDroppedStatsItem *)
+	MemoryContextAlloc(TopTransactionContext, sizeof(PgStat_PendingDroppedStatsItem));
+
+	xact_state = pgstat_xact_stack_level_get(nest_level);
+
+	drop->is_create = is_create;
+	drop->item.kind = kind;
+	drop->item.dboid = dboid;
+	drop->item.objoid = objoid;
+
+	dlist_push_tail(&xact_state->pending_drops, &drop->node);
+	xact_state->pending_drops_count++;
+}
+
+/*
+ * Create a stats entry for a newly created database object in a transactional
+ * manner.
+ *
+ * I.e. if the current (sub-)transaction aborts, the stats entry will also be
+ * dropped.
+ */
+void
+pgstat_create_transactional(PgStat_Kind kind, Oid dboid, Oid objoid)
+{
+	create_drop_transactional_internal(kind, dboid, objoid, /* create */ true);
+}
+
+/*
+ * Drop a stats entry for a just dropped database object in a transactional
+ * manner.
+ *
+ * I.e. if the current (sub-)transaction aborts, the stats entry will stay
+ * alive.
+ */
+void
+pgstat_drop_transactional(PgStat_Kind kind, Oid dboid, Oid objoid)
+{
+	create_drop_transactional_internal(kind, dboid, objoid, /* create */ false);
+}
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index 63988089505..dc38e16405d 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -1976,6 +1976,7 @@ PgStat_MsgTabstat
 PgStat_MsgTempFile
 PgStat_MsgVacuum
 PgStat_MsgWal
+PgStat_PendingDroppedStatsItem
 PgStat_SLRUStats
 PgStat_StatDBEntry
 PgStat_StatFuncEntry
@@ -3783,6 +3784,8 @@ xl_xact_parsed_commit
 xl_xact_parsed_prepare
 xl_xact_prepare
 xl_xact_relfilenodes
+xl_xact_stats_item
+xl_xact_stats_items
 xl_xact_subxacts
 xl_xact_twophase
 xl_xact_xinfo
-- 
2.35.1.677.gabf474a5dd

