From 13cdb2984f3983f3577b35807b35d9c7a5a1c699 Mon Sep 17 00:00:00 2001
From: Andrey Borodin <amborodin@acm.org>
Date: Tue, 17 Feb 2026 12:05:51 +0500
Subject: [PATCH v5b 1/6] amcheck: add indexallkeysmatch verification for
 B-Tree indexes

Add a new "indexallkeysmatch" option to bt_index_check() and
bt_index_parent_check() that verifies each index tuple points to a
heap tuple with the same key.  This is the reverse of the existing
"heapallindexed" check, which verifies every heap tuple is present
in the index.

The implementation uses a Bloom filter to amortize random heap
lookups.  A sequential heap scan first fingerprints all visible
(key, tid) pairs.  During the index scan, each leaf tuple (including
posting list entries) is probed against this filter.  Only when the
filter says "not present" do we perform an actual heap fetch and
key comparison via FormIndexDatum, reporting corruption if the keys
differ.

This check detects corruption where an index entry stores a
different key than the heap tuple it points to -- a scenario that
the existing heapallindexed and structural checks cannot catch,
particularly when it manifests within posting lists before it
develops into a detectable ordering violation.

Bump amcheck extension version to 1.6.
---
 contrib/amcheck/Makefile                      |   4 +-
 contrib/amcheck/amcheck--1.5--1.6.sql         |  21 ++
 contrib/amcheck/amcheck.control               |   2 +-
 contrib/amcheck/expected/check_btree.out      |  20 ++
 contrib/amcheck/meson.build                   |   2 +
 contrib/amcheck/sql/check_btree.sql           |   7 +
 .../t/007_verify_nbtree_indexallkeysmatch.pl  | 111 +++++++
 contrib/amcheck/verify_nbtree.c               | 310 +++++++++++++++---
 8 files changed, 437 insertions(+), 40 deletions(-)
 create mode 100644 contrib/amcheck/amcheck--1.5--1.6.sql
 create mode 100644 contrib/amcheck/t/007_verify_nbtree_indexallkeysmatch.pl

diff --git a/contrib/amcheck/Makefile b/contrib/amcheck/Makefile
index 1b7a63cbaa4..2c23109a200 100644
--- a/contrib/amcheck/Makefile
+++ b/contrib/amcheck/Makefile
@@ -10,12 +10,12 @@ OBJS = \
 
 EXTENSION = amcheck
 DATA = amcheck--1.2--1.3.sql amcheck--1.1--1.2.sql amcheck--1.0--1.1.sql amcheck--1.0.sql \
-		amcheck--1.3--1.4.sql amcheck--1.4--1.5.sql
+		amcheck--1.3--1.4.sql amcheck--1.4--1.5.sql amcheck--1.5--1.6.sql
 PGFILEDESC = "amcheck - function for verifying relation integrity"
 
 REGRESS = check check_btree check_gin check_heap
 
-EXTRA_INSTALL = contrib/pg_walinspect
+EXTRA_INSTALL = contrib/pg_walinspect contrib/pg_surgery
 TAP_TESTS = 1
 
 ifdef USE_PGXS
diff --git a/contrib/amcheck/amcheck--1.5--1.6.sql b/contrib/amcheck/amcheck--1.5--1.6.sql
new file mode 100644
index 00000000000..ac01f429d7d
--- /dev/null
+++ b/contrib/amcheck/amcheck--1.5--1.6.sql
@@ -0,0 +1,21 @@
+/* contrib/amcheck/amcheck--1.5--1.6.sql */
+
+-- complain if script is sourced in psql, rather than via CREATE EXTENSION
+\echo Use "ALTER EXTENSION amcheck UPDATE TO '1.6'" to load this file. \quit
+
+-- Add indexallkeysmatch parameter to bt_index_check and bt_index_parent_check
+CREATE FUNCTION bt_index_check(index regclass,
+    heapallindexed boolean, checkunique boolean, indexallkeysmatch boolean)
+RETURNS VOID
+AS 'MODULE_PATHNAME', 'bt_index_check'
+LANGUAGE C STRICT PARALLEL RESTRICTED;
+
+CREATE FUNCTION bt_index_parent_check(index regclass,
+    heapallindexed boolean, rootdescend boolean, checkunique boolean,
+    indexallkeysmatch boolean)
+RETURNS VOID
+AS 'MODULE_PATHNAME', 'bt_index_parent_check'
+LANGUAGE C STRICT PARALLEL RESTRICTED;
+
+REVOKE ALL ON FUNCTION bt_index_check(regclass, boolean, boolean, boolean) FROM PUBLIC;
+REVOKE ALL ON FUNCTION bt_index_parent_check(regclass, boolean, boolean, boolean, boolean) FROM PUBLIC;
diff --git a/contrib/amcheck/amcheck.control b/contrib/amcheck/amcheck.control
index c8ba6d7c9bc..2f329ef2cf4 100644
--- a/contrib/amcheck/amcheck.control
+++ b/contrib/amcheck/amcheck.control
@@ -1,5 +1,5 @@
 # amcheck extension
 comment = 'functions for verifying relation integrity'
-default_version = '1.5'
+default_version = '1.6'
 module_pathname = '$libdir/amcheck'
 relocatable = true
diff --git a/contrib/amcheck/expected/check_btree.out b/contrib/amcheck/expected/check_btree.out
index 6558f2c5a4f..d4da803b355 100644
--- a/contrib/amcheck/expected/check_btree.out
+++ b/contrib/amcheck/expected/check_btree.out
@@ -232,6 +232,26 @@ SELECT bt_index_parent_check('bttest_b_idx', heapallindexed => true, rootdescend
  
 (1 row)
 
+-- indexallkeysmatch: verify each index tuple points to heap tuple with same key
+SELECT bt_index_check('bttest_a_idx', heapallindexed => false, checkunique => false, indexallkeysmatch => true);
+ bt_index_check 
+----------------
+ 
+(1 row)
+
+SELECT bt_index_parent_check('bttest_a_idx', heapallindexed => false, rootdescend => false, checkunique => false, indexallkeysmatch => true);
+ bt_index_parent_check 
+-----------------------
+ 
+(1 row)
+
+-- indexallkeysmatch on expression index (exercises FormIndexDatum/ii_ExpressionsState path)
+SELECT bt_index_check('bttest_a_expr_idx', heapallindexed => false, checkunique => false, indexallkeysmatch => true);
+ bt_index_check 
+----------------
+ 
+(1 row)
+
 -- Check that null values in an unique index are not treated as equal
 CREATE TABLE bttest_unique_nulls (a serial, b int, c int UNIQUE);
 INSERT INTO bttest_unique_nulls VALUES (generate_series(1, 10000), 2, default);
diff --git a/contrib/amcheck/meson.build b/contrib/amcheck/meson.build
index d5137ef691d..220b1ce1d59 100644
--- a/contrib/amcheck/meson.build
+++ b/contrib/amcheck/meson.build
@@ -27,6 +27,7 @@ install_data(
   'amcheck--1.2--1.3.sql',
   'amcheck--1.3--1.4.sql',
   'amcheck--1.4--1.5.sql',
+  'amcheck--1.5--1.6.sql',
   kwargs: contrib_data_args,
 )
 
@@ -50,6 +51,7 @@ tests += {
       't/004_verify_nbtree_unique.pl',
       't/005_pitr.pl',
       't/006_verify_gin.pl',
+      't/007_verify_nbtree_indexallkeysmatch.pl',
     ],
   },
 }
diff --git a/contrib/amcheck/sql/check_btree.sql b/contrib/amcheck/sql/check_btree.sql
index 171f7f691ec..bc5a76c0b59 100644
--- a/contrib/amcheck/sql/check_btree.sql
+++ b/contrib/amcheck/sql/check_btree.sql
@@ -148,6 +148,13 @@ SELECT bt_index_check('bttest_b_idx', heapallindexed => false, checkunique => tr
 SELECT bt_index_parent_check('bttest_a_idx', heapallindexed => true, rootdescend => true, checkunique => true);
 SELECT bt_index_parent_check('bttest_b_idx', heapallindexed => true, rootdescend => false, checkunique => true);
 
+-- indexallkeysmatch: verify each index tuple points to heap tuple with same key
+SELECT bt_index_check('bttest_a_idx', heapallindexed => false, checkunique => false, indexallkeysmatch => true);
+SELECT bt_index_parent_check('bttest_a_idx', heapallindexed => false, rootdescend => false, checkunique => false, indexallkeysmatch => true);
+
+-- indexallkeysmatch on expression index (exercises FormIndexDatum/ii_ExpressionsState path)
+SELECT bt_index_check('bttest_a_expr_idx', heapallindexed => false, checkunique => false, indexallkeysmatch => true);
+
 -- Check that null values in an unique index are not treated as equal
 CREATE TABLE bttest_unique_nulls (a serial, b int, c int UNIQUE);
 INSERT INTO bttest_unique_nulls VALUES (generate_series(1, 10000), 2, default);
diff --git a/contrib/amcheck/t/007_verify_nbtree_indexallkeysmatch.pl b/contrib/amcheck/t/007_verify_nbtree_indexallkeysmatch.pl
new file mode 100644
index 00000000000..caf2cbd93e7
--- /dev/null
+++ b/contrib/amcheck/t/007_verify_nbtree_indexallkeysmatch.pl
@@ -0,0 +1,111 @@
+
+# Copyright (c) 2026, PostgreSQL Global Development Group
+#
+# Test indexallkeysmatch verification: each index tuple must point to a heap
+# tuple with the same key.
+
+use strict;
+use warnings FATAL => 'all';
+
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+my $node = PostgreSQL::Test::Cluster->new('test');
+$node->init;
+$node->append_conf('postgresql.conf', 'autovacuum=off');
+$node->start;
+
+$node->safe_psql('postgres', q(CREATE EXTENSION amcheck));
+
+#
+# Test 1: indexallkeysmatch on uncorrupted table (plain column)
+#
+$node->safe_psql('postgres', q(
+	CREATE TABLE idxall_plain (id int);
+	INSERT INTO idxall_plain SELECT generate_series(1, 1000);
+	CREATE INDEX idxall_plain_idx ON idxall_plain (id);
+));
+
+my $result = $node->safe_psql('postgres',
+	q(SELECT bt_index_check('idxall_plain_idx', false, false, true)));
+is($result, '', 'indexallkeysmatch passes on uncorrupted plain table');
+
+#
+# Test 2: indexallkeysmatch on uncorrupted table with deduplication
+#
+$node->safe_psql('postgres', q(
+	CREATE TABLE idxall_dedup (id int);
+	INSERT INTO idxall_dedup SELECT i % 10 FROM generate_series(1, 1000) i;
+	CREATE INDEX idxall_dedup_idx ON idxall_dedup (id)
+		WITH (deduplicate_items = on);
+));
+
+$result = $node->safe_psql('postgres',
+	q(SELECT bt_index_check('idxall_dedup_idx', false, false, true)));
+is($result, '', 'indexallkeysmatch passes on uncorrupted table with deduplication');
+
+#
+# Test 3: indexallkeysmatch on uncorrupted expression index
+#
+$node->safe_psql('postgres', q(
+	CREATE FUNCTION idxall_func(int) RETURNS int LANGUAGE sql IMMUTABLE AS
+	$$ SELECT $1; $$;
+
+	CREATE TABLE idxall_expr (id int);
+	INSERT INTO idxall_expr SELECT generate_series(1, 500);
+	CREATE INDEX idxall_expr_idx ON idxall_expr (idxall_func(id));
+));
+
+$result = $node->safe_psql('postgres',
+	q(SELECT bt_index_check('idxall_expr_idx', false, false, true)));
+is($result, '', 'indexallkeysmatch passes on uncorrupted expression index');
+
+#
+# Test 4: Detect corruption -- swap expression function so heap re-evaluation
+# produces different keys than what the index stores.
+#
+# The index was built with idxall_func(x) = x.  Now change it to return x+1.
+# This simulates the corruption scenario: index says key=42 for a TID, but
+# re-reading the heap and re-evaluating gives key=43.
+#
+$node->safe_psql('postgres', q(
+	CREATE OR REPLACE FUNCTION idxall_func(int) RETURNS int LANGUAGE sql IMMUTABLE AS
+	$$ SELECT $1 + 1; $$;
+));
+
+my ($stdout, $stderr);
+($result, $stdout, $stderr) = $node->psql('postgres',
+	q(SELECT bt_index_check('idxall_expr_idx', false, false, true)));
+like($stderr,
+	qr/index tuple in index "idxall_expr_idx" does not match heap tuple/,
+	'detected index-heap key mismatch via expression function swap');
+
+#
+# Test 5: Restore function and verify no corruption reported
+#
+$node->safe_psql('postgres', q(
+	CREATE OR REPLACE FUNCTION idxall_func(int) RETURNS int LANGUAGE sql IMMUTABLE AS
+	$$ SELECT $1; $$;
+));
+
+$result = $node->safe_psql('postgres',
+	q(SELECT bt_index_check('idxall_expr_idx', false, false, true)));
+is($result, '', 'indexallkeysmatch passes after restoring correct function');
+
+#
+# Test 6: indexallkeysmatch with bt_index_parent_check
+#
+$node->safe_psql('postgres', q(
+	CREATE OR REPLACE FUNCTION idxall_func(int) RETURNS int LANGUAGE sql IMMUTABLE AS
+	$$ SELECT $1 + 1; $$;
+));
+
+($result, $stdout, $stderr) = $node->psql('postgres',
+	q(SELECT bt_index_parent_check('idxall_expr_idx', false, false, false, true)));
+like($stderr,
+	qr/index tuple in index "idxall_expr_idx" does not match heap tuple/,
+	'bt_index_parent_check also detects index-heap key mismatch');
+
+$node->stop;
+done_testing();
diff --git a/contrib/amcheck/verify_nbtree.c b/contrib/amcheck/verify_nbtree.c
index b74ab5f7a05..dff8bd44471 100644
--- a/contrib/amcheck/verify_nbtree.c
+++ b/contrib/amcheck/verify_nbtree.c
@@ -13,6 +13,11 @@
  * verify its structure.  A heap scan later uses Bloom filter probes to verify
  * that every visible heap tuple has a matching index tuple.
  *
+ * When heap-to-index verification (indexallkeysmatch) is requested, a Bloom
+ * filter fingerprints (key,tid) from a heap scan first.  The index scan then
+ * probes this filter; when the probe fails, a heap lookup verifies that the
+ * index tuple points to a heap tuple with the same key.
+ *
  *
  * Copyright (c) 2017-2026, PostgreSQL Global Development Group
  *
@@ -30,8 +35,9 @@
 #include "access/tableam.h"
 #include "access/transam.h"
 #include "access/xact.h"
-#include "verify_common.h"
 #include "catalog/index.h"
+#include "executor/executor.h"
+#include "verify_common.h"
 #include "catalog/pg_am.h"
 #include "catalog/pg_opfamily_d.h"
 #include "common/pg_prng.h"
@@ -82,6 +88,8 @@ typedef struct BtreeCheckState
 	bool		readonly;
 	/* Also verifying heap has no unindexed tuples? */
 	bool		heapallindexed;
+	/* Also verifying each index tuple points to heap tuple with same key? */
+	bool		indexallkeysmatch;
 	/* Also making sure non-pivot tuples can be found by new search? */
 	bool		rootdescend;
 	/* Also check uniqueness constraint if index is unique */
@@ -132,6 +140,15 @@ typedef struct BtreeCheckState
 	bloom_filter *filter;
 	/* Debug counter */
 	int64		heaptuplespresent;
+
+	/*
+	 * Mutable state, for optional indexallkeysmatch verification:
+	 */
+
+	/* Bloom filter fingerprints heap (key,tid) pairs */
+	bloom_filter *heapfilter;
+	/* Debug counter for index tuples verified */
+	int64		indextuplesverified;
 } BtreeCheckState;
 
 /*
@@ -169,6 +186,7 @@ typedef struct BTCallbackState
 {
 	bool		parentcheck;
 	bool		heapallindexed;
+	bool		indexallkeysmatch;
 	bool		rootdescend;
 	bool		checkunique;
 } BTCallbackState;
@@ -180,7 +198,7 @@ static void bt_index_check_callback(Relation indrel, Relation heaprel,
 									void *state, bool readonly);
 static void bt_check_every_level(Relation rel, Relation heaprel,
 								 bool heapkeyspace, bool readonly, bool heapallindexed,
-								 bool rootdescend, bool checkunique);
+								 bool indexallkeysmatch, bool rootdescend, bool checkunique);
 static BtreeLevel bt_check_level_from_leftmost(BtreeCheckState *state,
 											   BtreeLevel level);
 static bool bt_leftmost_ignoring_half_dead(BtreeCheckState *state,
@@ -212,6 +230,13 @@ static void bt_downlink_missing_check(BtreeCheckState *state, bool rightsplit,
 static void bt_tuple_present_callback(Relation index, ItemPointer tid,
 									  Datum *values, bool *isnull,
 									  bool tupleIsAlive, void *checkstate);
+static void bt_heap_fingerprint_callback(Relation index, ItemPointer tid,
+										  Datum *values, bool *isnull,
+										  bool tupleIsAlive, void *checkstate);
+static void bt_verify_index_tuple_points_to_heap(BtreeCheckState *state,
+												  IndexTuple itup,
+												  BlockNumber targetblock,
+												  OffsetNumber offset);
 static IndexTuple bt_normalize_tuple(BtreeCheckState *state,
 									 IndexTuple itup);
 static inline IndexTuple bt_posting_plain_tuple(IndexTuple itup, int n);
@@ -240,13 +265,15 @@ static inline ItemPointer BTreeTupleGetHeapTIDCareful(BtreeCheckState *state,
 static inline ItemPointer BTreeTupleGetPointsToTID(IndexTuple itup);
 
 /*
- * bt_index_check(index regclass, heapallindexed boolean, checkunique boolean)
+ * bt_index_check(index regclass, heapallindexed boolean, checkunique boolean, indexallkeysmatch boolean)
  *
  * Verify integrity of B-Tree index.
  *
  * Acquires AccessShareLock on heap & index relations.  Does not consider
  * invariants that exist between parent/child pages.  Optionally verifies
- * that heap does not contain any unindexed or incorrectly indexed tuples.
+ * that heap does not contain any unindexed or incorrectly indexed tuples
+ * (heapallindexed), or that each index tuple points to a heap tuple with
+ * the same key (indexallkeysmatch).
  */
 Datum
 bt_index_check(PG_FUNCTION_ARGS)
@@ -255,6 +282,7 @@ bt_index_check(PG_FUNCTION_ARGS)
 	BTCallbackState args;
 
 	args.heapallindexed = false;
+	args.indexallkeysmatch = false;
 	args.rootdescend = false;
 	args.parentcheck = false;
 	args.checkunique = false;
@@ -263,6 +291,8 @@ bt_index_check(PG_FUNCTION_ARGS)
 		args.heapallindexed = PG_GETARG_BOOL(1);
 	if (PG_NARGS() >= 3)
 		args.checkunique = PG_GETARG_BOOL(2);
+	if (PG_NARGS() >= 4)
+		args.indexallkeysmatch = PG_GETARG_BOOL(3);
 
 	amcheck_lock_relation_and_check(indrelid, BTREE_AM_OID,
 									bt_index_check_callback,
@@ -272,13 +302,15 @@ bt_index_check(PG_FUNCTION_ARGS)
 }
 
 /*
- * bt_index_parent_check(index regclass, heapallindexed boolean, rootdescend boolean, checkunique boolean)
+ * bt_index_parent_check(index regclass, heapallindexed boolean, rootdescend boolean, checkunique boolean, indexallkeysmatch boolean)
  *
  * Verify integrity of B-Tree index.
  *
  * Acquires ShareLock on heap & index relations.  Verifies that downlinks in
  * parent pages are valid lower bounds on child pages.  Optionally verifies
- * that heap does not contain any unindexed or incorrectly indexed tuples.
+ * that heap does not contain any unindexed or incorrectly indexed tuples
+ * (heapallindexed), or that each index tuple points to a heap tuple with
+ * the same key (indexallkeysmatch).
  */
 Datum
 bt_index_parent_check(PG_FUNCTION_ARGS)
@@ -287,6 +319,7 @@ bt_index_parent_check(PG_FUNCTION_ARGS)
 	BTCallbackState args;
 
 	args.heapallindexed = false;
+	args.indexallkeysmatch = false;
 	args.rootdescend = false;
 	args.parentcheck = true;
 	args.checkunique = false;
@@ -297,6 +330,8 @@ bt_index_parent_check(PG_FUNCTION_ARGS)
 		args.rootdescend = PG_GETARG_BOOL(2);
 	if (PG_NARGS() >= 4)
 		args.checkunique = PG_GETARG_BOOL(3);
+	if (PG_NARGS() >= 5)
+		args.indexallkeysmatch = PG_GETARG_BOOL(4);
 
 	amcheck_lock_relation_and_check(indrelid, BTREE_AM_OID,
 									bt_index_check_callback,
@@ -348,7 +383,8 @@ bt_index_check_callback(Relation indrel, Relation heaprel, void *state, bool rea
 
 	/* Check index, possibly against table it is an index on */
 	bt_check_every_level(indrel, heaprel, heapkeyspace, readonly,
-						 args->heapallindexed, args->rootdescend, args->checkunique);
+						 args->heapallindexed, args->indexallkeysmatch,
+						 args->rootdescend, args->checkunique);
 }
 
 /*
@@ -376,8 +412,8 @@ bt_index_check_callback(Relation indrel, Relation heaprel, void *state, bool rea
  */
 static void
 bt_check_every_level(Relation rel, Relation heaprel, bool heapkeyspace,
-					 bool readonly, bool heapallindexed, bool rootdescend,
-					 bool checkunique)
+					 bool readonly, bool heapallindexed, bool indexallkeysmatch,
+					 bool rootdescend, bool checkunique)
 {
 	BtreeCheckState *state;
 	Page		metapage;
@@ -407,38 +443,17 @@ bt_check_every_level(Relation rel, Relation heaprel, bool heapkeyspace,
 	state->heapkeyspace = heapkeyspace;
 	state->readonly = readonly;
 	state->heapallindexed = heapallindexed;
+	state->indexallkeysmatch = indexallkeysmatch;
 	state->rootdescend = rootdescend;
 	state->checkunique = checkunique;
 	state->snapshot = InvalidSnapshot;
 
-	if (state->heapallindexed)
+	if (state->heapallindexed || state->indexallkeysmatch)
 	{
-		int64		total_pages;
-		int64		total_elems;
-		uint64		seed;
-
 		/*
-		 * Size Bloom filter based on estimated number of tuples in index,
-		 * while conservatively assuming that each block must contain at least
-		 * MaxTIDsPerBTreePage / 3 "plain" tuples -- see
-		 * bt_posting_plain_tuple() for definition, and details of how posting
-		 * list tuples are handled.
-		 */
-		total_pages = RelationGetNumberOfBlocks(rel);
-		total_elems = Max(total_pages * (MaxTIDsPerBTreePage / 3),
-						  (int64) state->rel->rd_rel->reltuples);
-		/* Generate a random seed to avoid repetition */
-		seed = pg_prng_uint64(&pg_global_prng_state);
-		/* Create Bloom filter to fingerprint index */
-		state->filter = bloom_create(total_elems, maintenance_work_mem, seed);
-		state->heaptuplespresent = 0;
-
-		/*
-		 * Register our own snapshot for heapallindexed, rather than asking
-		 * table_index_build_scan() to do this for us later.  This needs to
-		 * happen before index fingerprinting begins, so we can later be
-		 * certain that index fingerprinting should have reached all tuples
-		 * returned by table_index_build_scan().
+		 * Register our own snapshot for heapallindexed/indexallkeysmatch, rather
+		 * than asking table_index_build_scan() to do this for us later.  This
+		 * needs to happen before fingerprinting begins.
 		 */
 		state->snapshot = RegisterSnapshot(GetTransactionSnapshot());
 
@@ -463,16 +478,55 @@ bt_check_every_level(Relation rel, Relation heaprel, bool heapkeyspace,
 						   RelationGetRelationName(rel)));
 	}
 
+	if (state->heapallindexed)
+	{
+		int64		total_pages;
+		int64		total_elems;
+		uint64		seed;
+
+		/*
+		 * Size Bloom filter based on estimated number of tuples in index,
+		 * while conservatively assuming that each block must contain at least
+		 * MaxTIDsPerBTreePage / 3 "plain" tuples -- see
+		 * bt_posting_plain_tuple() for definition, and details of how posting
+		 * list tuples are handled.
+		 */
+		total_pages = RelationGetNumberOfBlocks(rel);
+		total_elems = Max(total_pages * (MaxTIDsPerBTreePage / 3),
+						  (int64) state->rel->rd_rel->reltuples);
+		seed = pg_prng_uint64(&pg_global_prng_state);
+		state->filter = bloom_create(total_elems, maintenance_work_mem, seed);
+		state->heaptuplespresent = 0;
+	}
+
+	if (state->indexallkeysmatch)
+	{
+		int64		total_pages;
+		int64		total_elems;
+		uint64		seed;
+
+		/*
+		 * Size Bloom filter based on estimated number of heap tuples.
+		 */
+		total_pages = RelationGetNumberOfBlocks(heaprel);
+		total_elems = Max(total_pages * MaxHeapTuplesPerPage,
+						  (int64) heaprel->rd_rel->reltuples);
+		seed = pg_prng_uint64(&pg_global_prng_state);
+		state->heapfilter = bloom_create(total_elems, maintenance_work_mem, seed);
+		state->indextuplesverified = 0;
+	}
+
 	/*
 	 * We need a snapshot to check the uniqueness of the index.  For better
 	 * performance, take it once per index check.  If one was already taken
 	 * above, use that.
 	 */
-	if (state->checkunique)
+	if (state->checkunique || state->indexallkeysmatch)
 	{
 		state->indexinfo = BuildIndexInfo(state->rel);
 
-		if (state->indexinfo->ii_Unique && state->snapshot == InvalidSnapshot)
+		if (state->checkunique && state->indexinfo->ii_Unique &&
+			state->snapshot == InvalidSnapshot)
 			state->snapshot = RegisterSnapshot(GetTransactionSnapshot());
 	}
 
@@ -490,6 +544,36 @@ bt_check_every_level(Relation rel, Relation heaprel, bool heapkeyspace,
 												 ALLOCSET_DEFAULT_SIZES);
 	state->checkstrategy = GetAccessStrategy(BAS_BULKREAD);
 
+	/*
+	 * When indexallkeysmatch, fingerprint heap first so we can verify each index
+	 * tuple points to a heap tuple with the same key during the index scan.
+	 */
+	if (state->indexallkeysmatch)
+	{
+		IndexInfo  *indexinfo = BuildIndexInfo(state->rel);
+		TableScanDesc scan;
+
+		scan = table_beginscan_strat(state->heaprel,
+									 state->snapshot,
+									 0, NULL, true, true);
+		indexinfo->ii_Concurrent = true;
+		indexinfo->ii_Unique = false;
+		indexinfo->ii_ExclusionOps = NULL;
+		indexinfo->ii_ExclusionProcs = NULL;
+		indexinfo->ii_ExclusionStrats = NULL;
+
+		elog(DEBUG1, "fingerprinting heap \"%s\" for index \"%s\" verification",
+			 RelationGetRelationName(state->heaprel),
+			 RelationGetRelationName(state->rel));
+
+		table_index_build_scan(state->heaprel, state->rel, indexinfo, true, false,
+							   bt_heap_fingerprint_callback, state, scan);
+
+		ereport(DEBUG1,
+				(errmsg_internal("finished heap fingerprint with bitset %.2f%% set",
+								100.0 * bloom_prop_bits_set(state->heapfilter))));
+	}
+
 	/* Get true root block from meta-page */
 	metapage = palloc_btree_page(state, BTREE_METAPAGE);
 	metad = BTPageGetMeta(metapage);
@@ -596,6 +680,14 @@ bt_check_every_level(Relation rel, Relation heaprel, bool heapkeyspace,
 		bloom_free(state->filter);
 	}
 
+	if (state->indexallkeysmatch)
+	{
+		ereport(DEBUG1,
+				(errmsg_internal("finished verifying " INT64_FORMAT " index tuples point to matching heap tuples",
+								 state->indextuplesverified)));
+		bloom_free(state->heapfilter);
+	}
+
 	/* Be tidy: */
 	if (state->snapshot != InvalidSnapshot)
 		UnregisterSnapshot(state->snapshot);
@@ -1516,6 +1608,28 @@ bt_target_page_check(BtreeCheckState *state)
 			}
 		}
 
+		/* Verify each index tuple points to heap tuple with same key */
+		if (state->indexallkeysmatch && P_ISLEAF(topaque) && !ItemIdIsDead(itemid))
+		{
+			if (BTreeTupleIsPosting(itup))
+			{
+				for (int i = 0; i < BTreeTupleGetNPosting(itup); i++)
+				{
+					IndexTuple	logtuple;
+
+					logtuple = bt_posting_plain_tuple(itup, i);
+					bt_verify_index_tuple_points_to_heap(state, logtuple,
+														  state->targetblock, offset);
+					pfree(logtuple);
+				}
+			}
+			else
+			{
+				bt_verify_index_tuple_points_to_heap(state, itup,
+													 state->targetblock, offset);
+			}
+		}
+
 		/*
 		 * * High key check *
 		 *
@@ -2812,6 +2926,128 @@ bt_tuple_present_callback(Relation index, ItemPointer tid, Datum *values,
 		pfree(norm);
 }
 
+/*
+ * Per-tuple callback from table_index_build_scan for indexallkeysmatch.  Add
+ * each visible heap tuple's (key, tid) to the Bloom filter for later probe
+ * during the index scan.
+ */
+static void
+bt_heap_fingerprint_callback(Relation index, ItemPointer tid, Datum *values,
+							  bool *isnull, bool tupleIsAlive, void *checkstate)
+{
+	BtreeCheckState *state = (BtreeCheckState *) checkstate;
+	IndexTuple	itup,
+				norm;
+
+	Assert(state->indexallkeysmatch);
+
+	itup = index_form_tuple(RelationGetDescr(index), values, isnull);
+	itup->t_tid = *tid;
+	norm = bt_normalize_tuple(state, itup);
+	bloom_add_element(state->heapfilter, (unsigned char *) norm,
+					  IndexTupleSize(norm));
+	pfree(itup);
+	if (norm != itup)
+		pfree(norm);
+}
+
+/*
+ * Verify that the index tuple points to a heap tuple with the same key.
+ * When the Bloom filter lacks the (key, tid), perform a heap lookup to confirm.
+ * Skip index tuples that point to dead heap tuples (not visible to snapshot).
+ */
+static void
+bt_verify_index_tuple_points_to_heap(BtreeCheckState *state, IndexTuple itup,
+									BlockNumber targetblock, OffsetNumber offset)
+{
+	ItemPointer tid = BTreeTupleGetHeapTID(itup);
+	IndexTuple	norm;
+	bool		in_filter;
+
+	Assert(state->indexallkeysmatch);
+
+	norm = bt_normalize_tuple(state, itup);
+	in_filter = !bloom_lacks_element(state->heapfilter, (unsigned char *) norm,
+									 IndexTupleSize(norm));
+	if (norm != itup)
+		pfree(norm);
+
+	if (in_filter)
+	{
+		/* Fingerprint contains only visible tuples, so this one is verified */
+		state->indextuplesverified++;
+		return;
+	}
+
+	/*
+	 * Bloom filter says (key, tid) not in heap.  Follow TID to verify; this
+	 * amortizes random heap lookups when the filter has false negatives, or
+	 * reports corruption when the index points to wrong heap tuple.  Skip
+	 * dead tuples (table_tuple_fetch_row_version returns false for them).
+	 */
+	{
+		TupleTableSlot *slot;
+		IndexTuple	heap_itup;
+		IndexTuple	heap_norm;
+		Datum		values[INDEX_MAX_KEYS];
+		bool		isnull[INDEX_MAX_KEYS];
+		IndexInfo  *indexinfo;
+		EState	   *estate;
+		bool		found;
+
+		slot = table_slot_create(state->heaprel, NULL);
+		found = table_tuple_fetch_row_version(state->heaprel, tid,
+											  state->snapshot, slot);
+		if (!found)
+		{
+			ExecDropSingleTupleTableSlot(slot);
+			return;			/* dead or non-existent heap tuple, skip */
+		}
+
+		indexinfo = state->indexinfo;
+		estate = CreateExecutorState();
+		GetPerTupleExprContext(estate)->ecxt_scantuple = slot;
+		FormIndexDatum(indexinfo, slot, estate, values, isnull);
+		FreeExecutorState(estate);
+
+		/* These may have been pointing to the now-gone estate */
+		indexinfo->ii_ExpressionsState = NIL;
+		indexinfo->ii_PredicateState = NULL;
+
+		heap_itup = index_form_tuple(RelationGetDescr(state->rel), values, isnull);
+		heap_itup->t_tid = *tid;
+		heap_norm = bt_normalize_tuple(state, heap_itup);
+
+		norm = bt_normalize_tuple(state, itup);
+		if (IndexTupleSize(heap_norm) != IndexTupleSize(norm) ||
+			memcmp(heap_norm, norm, IndexTupleSize(norm)) != 0)
+		{
+			ExecDropSingleTupleTableSlot(slot);
+			pfree(heap_itup);
+			if (heap_norm != heap_itup)
+				pfree(heap_norm);
+			if (norm != itup)
+				pfree(norm);
+			ereport(ERROR,
+					(errcode(ERRCODE_INDEX_CORRUPTED),
+					 errmsg("index tuple in index \"%s\" does not match heap tuple",
+							RelationGetRelationName(state->rel)),
+					 errdetail_internal("Index tid=(%u,%u) points to heap tid=(%u,%u) with different key.",
+									   targetblock, offset,
+									   ItemPointerGetBlockNumber(tid),
+									   ItemPointerGetOffsetNumber(tid))));
+		}
+
+		ExecDropSingleTupleTableSlot(slot);
+		pfree(heap_itup);
+		if (heap_norm != heap_itup)
+			pfree(heap_norm);
+		if (norm != itup)
+			pfree(norm);
+		state->indextuplesverified++;
+	}
+}
+
 /*
  * Normalize an index tuple for fingerprinting.
  *
-- 
2.43.0

