From f3a1371656da372e26cb56bf543cbaa0c99720a8 Mon Sep 17 00:00:00 2001
From: Antonin Houska <ah@cybertec.at>
Date: Mon, 23 Mar 2026 12:50:15 +0100
Subject: [PATCH] Reproduce filtering issue.

---
 contrib/test_decoding/expected/filtering.out |  74 ++++++++++++++
 contrib/test_decoding/specs/filtering.spec   | 101 +++++++++++++++++++
 src/backend/executor/nodeModifyTable.c       |   2 +
 src/backend/replication/logical/snapbuild.c  |   3 +
 src/test/isolation/isolationtester.c         |   9 +-
 5 files changed, 188 insertions(+), 1 deletion(-)
 create mode 100644 contrib/test_decoding/expected/filtering.out
 create mode 100644 contrib/test_decoding/specs/filtering.spec

diff --git a/contrib/test_decoding/expected/filtering.out b/contrib/test_decoding/expected/filtering.out
new file mode 100644
index 00000000000..6ba9690509f
--- /dev/null
+++ b/contrib/test_decoding/expected/filtering.out
@@ -0,0 +1,74 @@
+Parsed test spec with 5 sessions
+
+starting permutation: s1_assign_xid s3_repack s2_assign_xid s1_rollback s4_insert s5_wakeup_snapbuild s2_rollback s5_wakeup_insert_speculative s4_insert_commit s5_wakeup_repack
+injection_points_attach
+-----------------------
+                       
+(1 row)
+
+injection_points_attach
+-----------------------
+                       
+(1 row)
+
+step s1_assign_xid: 
+    BEGIN;
+    CREATE TABLE c(i int);
+
+step s3_repack: 
+	REPACK (CONCURRENTLY) a;
+ <waiting ...>
+step s2_assign_xid: 
+    BEGIN;
+    CREATE TABLE d(i int);
+
+step s1_rollback: 
+    ROLLBACK;
+
+step s4_insert: 
+	BEGIN;
+	INSERT INTO t(i)
+	SELECT max(i) + 1 FROM t ON CONFLICT (i) DO UPDATE SET i=EXCLUDED.i;
+ <waiting ...>
+step s5_wakeup_snapbuild: 
+	SELECT injection_points_wakeup('snapbuild-full');
+
+injection_points_wakeup
+-----------------------
+                       
+(1 row)
+
+step s2_rollback: 
+    ROLLBACK;
+
+step s5_wakeup_insert_speculative: 
+	SELECT injection_points_wakeup('insert-speculative-before-confirm');
+
+injection_points_wakeup
+-----------------------
+                       
+(1 row)
+
+step s4_insert: <... completed>
+step s4_insert_commit: 
+	COMMIT;
+
+step s5_wakeup_repack: 
+	SELECT injection_points_wakeup('repack-concurrently-before-lock');
+
+injection_points_wakeup
+-----------------------
+                       
+(1 row)
+
+step s3_repack: <... completed>
+injection_points_detach
+-----------------------
+                       
+(1 row)
+
+injection_points_detach
+-----------------------
+                       
+(1 row)
+
diff --git a/contrib/test_decoding/specs/filtering.spec b/contrib/test_decoding/specs/filtering.spec
new file mode 100644
index 00000000000..a02c9f8facb
--- /dev/null
+++ b/contrib/test_decoding/specs/filtering.spec
@@ -0,0 +1,101 @@
+setup
+{
+	CREATE TABLE a(i int primary key, j int) WITH (autovacuum_enabled = off);
+	INSERT INTO a(i, j) VALUES (1, 1), (2, 2);
+	CREATE TABLE t(i int primary key);
+	INSERT INTO t(i) VALUES (1);
+	CREATE EXTENSION injection_points;
+}
+
+session s1
+step s1_assign_xid
+{
+    BEGIN;
+    CREATE TABLE c(i int);
+}
+step s1_rollback
+{
+    ROLLBACK;
+}
+
+session s2
+step s2_assign_xid
+{
+    BEGIN;
+    CREATE TABLE d(i int);
+}
+step s2_rollback
+{
+    ROLLBACK;
+}
+
+session s3
+setup
+{
+	SELECT injection_points_attach('snapbuild-full', 'wait');
+	SELECT injection_points_attach('repack-concurrently-before-lock', 'wait');
+}
+step s3_repack
+{
+	REPACK (CONCURRENTLY) a;
+}
+teardown
+{
+	SELECT injection_points_detach('repack-concurrently-before-lock');
+	SELECT injection_points_detach('snapbuild-full');
+}
+
+session s4
+setup
+{
+	SELECT injection_points_set_local();
+	SELECT injection_points_attach('insert-speculative-before-confirm', 'wait');
+}
+step s4_insert
+{
+	BEGIN;
+	INSERT INTO t(i)
+	SELECT max(i) + 1 FROM t ON CONFLICT (i) DO UPDATE SET i=EXCLUDED.i;
+}
+step s4_insert_commit
+{
+	COMMIT;
+}
+teardown
+{
+	SELECT injection_points_detach('insert-speculative-before-confirm');
+}
+
+session s5
+step s5_wakeup_snapbuild
+{
+	SELECT injection_points_wakeup('snapbuild-full');
+}
+step s5_wakeup_insert_speculative
+{
+	SELECT injection_points_wakeup('insert-speculative-before-confirm');
+}
+step s5_wakeup_repack
+{
+	SELECT injection_points_wakeup('repack-concurrently-before-lock');
+}
+
+permutation
+# Bring the snapshot builder to the FULL_SNAPSHOT state.
+s1_assign_xid
+s3_repack
+s2_assign_xid
+s1_rollback
+# Perform the speculative insert, but no confirmation so far. The snapshot
+# builder should decode it.
+s4_insert
+# Let the snapshout builder achieve CONSISTENT state and finish the setup.
+s5_wakeup_snapbuild
+s2_rollback
+# While REPACK is waiting on repack-concurrently-before-lock, let the insert
+# get confirmed. Relation filtering is now enabled.
+s5_wakeup_insert_speculative
+s4_insert_commit
+# REPACK should now decode the speculative insert and decode the speculative
+# insert (with the confirmation record filtered out).
+s5_wakeup_repack
diff --git a/src/backend/executor/nodeModifyTable.c b/src/backend/executor/nodeModifyTable.c
index 680c29f35d5..6d5482e5746 100644
--- a/src/backend/executor/nodeModifyTable.c
+++ b/src/backend/executor/nodeModifyTable.c
@@ -1232,6 +1232,8 @@ ExecInsert(ModifyTableContext *context,
 												   slot, arbiterIndexes,
 												   &specConflict);
 
+			INJECTION_POINT("insert-speculative-before-confirm", NULL);
+
 			/* adjust the tuple's state accordingly */
 			table_tuple_complete_speculative(resultRelationDesc, slot,
 											 specToken, !specConflict);
diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c
index fbdd4600a2b..883e5b6e18f 100644
--- a/src/backend/replication/logical/snapbuild.c
+++ b/src/backend/replication/logical/snapbuild.c
@@ -141,6 +141,7 @@
 #include "storage/procarray.h"
 #include "storage/standby.h"
 #include "utils/builtins.h"
+#include "utils/injection_point.h"
 #include "utils/memutils.h"
 #include "utils/snapmgr.h"
 #include "utils/snapshot.h"
@@ -1390,6 +1391,8 @@ SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *runn
 		builder->state = SNAPBUILD_FULL_SNAPSHOT;
 		builder->next_phase_at = running->nextXid;
 
+		INJECTION_POINT("snapbuild-full", NULL);
+
 		ereport(LOG,
 				errmsg("logical decoding found initial consistent point at %X/%08X",
 					   LSN_FORMAT_ARGS(lsn)),
diff --git a/src/test/isolation/isolationtester.c b/src/test/isolation/isolationtester.c
index 440c875b8ac..8f17ee412c9 100644
--- a/src/test/isolation/isolationtester.c
+++ b/src/test/isolation/isolationtester.c
@@ -216,15 +216,22 @@ main(int argc, char **argv)
 	 * exactly expect concurrent use of test tables.  However, autovacuum will
 	 * occasionally take AccessExclusiveLock to truncate a table, and we must
 	 * ignore that transient wait.
+	 *
+	 * If the session's backend is blocked, and if its background worker is
+	 * waiting on an injection point, we assume that the injection point is
+	 * the reason for the backend to be blocked. That's what we check in the
+	 * second query of the UNION. XXX Should we use a separate query for that?
 	 */
 	initPQExpBuffer(&wait_query);
 	appendPQExpBufferStr(&wait_query,
+						 "WITH blocking(res) AS ("
 						 "SELECT pg_catalog.pg_isolation_test_session_is_blocked($1, '{");
 	/* The spec syntax requires at least one session; assume that here. */
 	appendPQExpBufferStr(&wait_query, conns[1].backend_pid_str);
 	for (i = 2; i < nconns; i++)
 		appendPQExpBuffer(&wait_query, ",%s", conns[i].backend_pid_str);
-	appendPQExpBufferStr(&wait_query, "}')");
+	appendPQExpBufferStr(&wait_query, "}') UNION "
+						 "SELECT pg_catalog.pg_isolation_test_session_is_blocked(pid, '{}') FROM pg_stat_activity WHERE leader_pid=$1) SELECT bool_or(res) FROM blocking");
 
 	res = PQprepare(conns[0].conn, PREP_WAITING, wait_query.data, 0, NULL);
 	if (PQresultStatus(res) != PGRES_COMMAND_OK)
-- 
2.47.3

