From 95cda56b5e9d53232c9b5f95abe423e878b1fe78 Mon Sep 17 00:00:00 2001
From: Greg Burd <greg@burd.me>
Date: Fri, 27 Feb 2026 12:19:06 -0500
Subject: [PATCH v33 1/2] Add comprehensive tests for HOT updates and replica
 identity

Adds regression and isolation tests covering:
- HOT update decisions across various index types (B-tree, BRIN,
  partial, expression, multi-column, unique constraints)
- Replica identity key extraction for logical replication
  (DEFAULT, FULL, USING INDEX, NOTHING modes)
- Concurrent HOT update scenarios (locking, blocking, index scans,
  HOT chains, FOR UPDATE/KEY SHARE interactions)

Regression tests:
- hot_updates.sql: 10 scenarios testing HOT eligibility
- replica_identity_logging.sql: 11 scenarios verifying replica
  identity keys logged to WAL via test_decoding

Isolation tests:
- hot_updates_concurrent.spec: concurrent updates on same/different rows
- hot_updates_index_scan.spec: interactions with index scans and row locks
- hot_updates_chain.spec: HOT chain building and snapshot isolation
---
 .../isolation/expected/hot_updates_chain.out  | 144 ++++
 .../expected/hot_updates_concurrent.out       | 143 ++++
 .../expected/hot_updates_index_scan.out       | 126 +++
 src/test/isolation/isolation_schedule         |   3 +
 .../isolation/specs/hot_updates_chain.spec    | 110 +++
 .../specs/hot_updates_concurrent.spec         | 107 +++
 .../specs/hot_updates_index_scan.spec         |  91 +++
 src/test/regress/expected/hot_updates.out     | 725 ++++++++++++++++++
 .../expected/replica_identity_logging.out     | 396 ++++++++++
 src/test/regress/parallel_schedule            |   7 +
 src/test/regress/sql/hot_updates.sql          | 553 +++++++++++++
 .../regress/sql/replica_identity_logging.sql  | 349 +++++++++
 12 files changed, 2754 insertions(+)
 create mode 100644 src/test/isolation/expected/hot_updates_chain.out
 create mode 100644 src/test/isolation/expected/hot_updates_concurrent.out
 create mode 100644 src/test/isolation/expected/hot_updates_index_scan.out
 create mode 100644 src/test/isolation/specs/hot_updates_chain.spec
 create mode 100644 src/test/isolation/specs/hot_updates_concurrent.spec
 create mode 100644 src/test/isolation/specs/hot_updates_index_scan.spec
 create mode 100644 src/test/regress/expected/hot_updates.out
 create mode 100644 src/test/regress/expected/replica_identity_logging.out
 create mode 100644 src/test/regress/sql/hot_updates.sql
 create mode 100644 src/test/regress/sql/replica_identity_logging.sql

diff --git a/src/test/isolation/expected/hot_updates_chain.out b/src/test/isolation/expected/hot_updates_chain.out
new file mode 100644
index 00000000000..503252009ea
--- /dev/null
+++ b/src/test/isolation/expected/hot_updates_chain.out
@@ -0,0 +1,144 @@
+Parsed test spec with 5 sessions
+
+starting permutation: s1_begin s1_hot_update1 s1_hot_update2 s1_hot_update3 s1_commit s1_select s1_verify_hot
+step s1_begin: BEGIN;
+step s1_hot_update1: UPDATE hot_test SET non_indexed_col = 'update1' WHERE id = 1;
+step s1_hot_update2: UPDATE hot_test SET non_indexed_col = 'update2' WHERE id = 1;
+step s1_hot_update3: UPDATE hot_test SET non_indexed_col = 'update3' WHERE id = 1;
+step s1_commit: COMMIT;
+step s1_select: SELECT * FROM hot_test WHERE id = 1;
+id|indexed_col|non_indexed_col
+--+-----------+---------------
+ 1|        100|update3        
+(1 row)
+
+step s1_verify_hot: 
+    -- Check for HOT chain: LP_REDIRECT or tuple with t_ctid pointing to same page
+    SELECT COUNT(*) > 0 AS has_hot_chain
+    FROM heap_page_items(get_raw_page('hot_test', 0))
+    WHERE lp_flags = 2  -- LP_REDIRECT indicates HOT chain
+       OR (t_ctid IS NOT NULL
+           AND (t_ctid::text::point)[0]::int = 0  -- same page
+           AND t_ctid != ('(0,' || lp || ')')::tid);    -- different offset
+
+has_hot_chain
+-------------
+t            
+(1 row)
+
+
+starting permutation: s2_begin s2_select_before s1_begin s1_hot_update1 s1_hot_update2 s1_commit s2_select_after s2_commit
+step s2_begin: BEGIN ISOLATION LEVEL REPEATABLE READ;
+step s2_select_before: SELECT non_indexed_col FROM hot_test WHERE id = 1;
+non_indexed_col
+---------------
+initial        
+(1 row)
+
+step s1_begin: BEGIN;
+step s1_hot_update1: UPDATE hot_test SET non_indexed_col = 'update1' WHERE id = 1;
+step s1_hot_update2: UPDATE hot_test SET non_indexed_col = 'update2' WHERE id = 1;
+step s1_commit: COMMIT;
+step s2_select_after: SELECT non_indexed_col FROM hot_test WHERE id = 1;
+non_indexed_col
+---------------
+initial        
+(1 row)
+
+step s2_commit: COMMIT;
+
+starting permutation: s1_begin s1_hot_update1 s1_hot_update2 s1_commit s3_begin s3_non_hot_update s3_commit s1_select
+step s1_begin: BEGIN;
+step s1_hot_update1: UPDATE hot_test SET non_indexed_col = 'update1' WHERE id = 1;
+step s1_hot_update2: UPDATE hot_test SET non_indexed_col = 'update2' WHERE id = 1;
+step s1_commit: COMMIT;
+step s3_begin: BEGIN;
+step s3_non_hot_update: UPDATE hot_test SET indexed_col = 150 WHERE id = 1;
+step s3_commit: COMMIT;
+step s1_select: SELECT * FROM hot_test WHERE id = 1;
+id|indexed_col|non_indexed_col
+--+-----------+---------------
+ 1|        150|update2        
+(1 row)
+
+
+starting permutation: s1_begin s1_hot_update1 s1_commit s3_begin s3_non_hot_update s3_commit s4_begin s4_hot_after_non_hot s4_commit s4_select s4_verify_hot
+step s1_begin: BEGIN;
+step s1_hot_update1: UPDATE hot_test SET non_indexed_col = 'update1' WHERE id = 1;
+step s1_commit: COMMIT;
+step s3_begin: BEGIN;
+step s3_non_hot_update: UPDATE hot_test SET indexed_col = 150 WHERE id = 1;
+step s3_commit: COMMIT;
+step s4_begin: BEGIN;
+step s4_hot_after_non_hot: UPDATE hot_test SET non_indexed_col = 'after_non_hot' WHERE id = 1;
+step s4_commit: COMMIT;
+step s4_select: SELECT * FROM hot_test WHERE id = 1;
+id|indexed_col|non_indexed_col
+--+-----------+---------------
+ 1|        150|after_non_hot  
+(1 row)
+
+step s4_verify_hot: 
+    -- Check for new HOT chain after non-HOT update broke the previous chain
+    SELECT COUNT(*) > 0 AS has_hot_chain
+    FROM heap_page_items(get_raw_page('hot_test', 0))
+    WHERE lp_flags = 2
+       OR (t_ctid IS NOT NULL
+           AND (t_ctid::text::point)[0]::int = 0
+           AND t_ctid != ('(0,' || lp || ')')::tid);
+
+has_hot_chain
+-------------
+t            
+(1 row)
+
+
+starting permutation: s1_begin s1_hot_update1 s1_hot_update2 s5_begin s5_hot_update_row2_1 s5_hot_update_row2_2 s1_commit s5_commit s1_select s5_select s1_verify_hot s5_verify_hot
+step s1_begin: BEGIN;
+step s1_hot_update1: UPDATE hot_test SET non_indexed_col = 'update1' WHERE id = 1;
+step s1_hot_update2: UPDATE hot_test SET non_indexed_col = 'update2' WHERE id = 1;
+step s5_begin: BEGIN;
+step s5_hot_update_row2_1: UPDATE hot_test SET non_indexed_col = 'row2_update1' WHERE id = 2;
+step s5_hot_update_row2_2: UPDATE hot_test SET non_indexed_col = 'row2_update2' WHERE id = 2;
+step s1_commit: COMMIT;
+step s5_commit: COMMIT;
+step s1_select: SELECT * FROM hot_test WHERE id = 1;
+id|indexed_col|non_indexed_col
+--+-----------+---------------
+ 1|        100|update2        
+(1 row)
+
+step s5_select: SELECT * FROM hot_test WHERE id = 2;
+id|indexed_col|non_indexed_col
+--+-----------+---------------
+ 2|        200|row2_update2   
+(1 row)
+
+step s1_verify_hot: 
+    -- Check for HOT chain: LP_REDIRECT or tuple with t_ctid pointing to same page
+    SELECT COUNT(*) > 0 AS has_hot_chain
+    FROM heap_page_items(get_raw_page('hot_test', 0))
+    WHERE lp_flags = 2  -- LP_REDIRECT indicates HOT chain
+       OR (t_ctid IS NOT NULL
+           AND (t_ctid::text::point)[0]::int = 0  -- same page
+           AND t_ctid != ('(0,' || lp || ')')::tid);    -- different offset
+
+has_hot_chain
+-------------
+t            
+(1 row)
+
+step s5_verify_hot: 
+    -- Check for HOT chain on page 0
+    SELECT COUNT(*) > 0 AS has_hot_chain
+    FROM heap_page_items(get_raw_page('hot_test', 0))
+    WHERE lp_flags = 2
+       OR (t_ctid IS NOT NULL
+           AND (t_ctid::text::point)[0]::int = 0
+           AND t_ctid != ('(0,' || lp || ')')::tid);
+
+has_hot_chain
+-------------
+t            
+(1 row)
+
diff --git a/src/test/isolation/expected/hot_updates_concurrent.out b/src/test/isolation/expected/hot_updates_concurrent.out
new file mode 100644
index 00000000000..b1a8b0cb7b2
--- /dev/null
+++ b/src/test/isolation/expected/hot_updates_concurrent.out
@@ -0,0 +1,143 @@
+Parsed test spec with 4 sessions
+
+starting permutation: s1_begin s1_hot_update s2_begin s2_hot_update s1_commit s2_commit s1_select s2_select s2_verify_hot
+step s1_begin: BEGIN;
+step s1_hot_update: UPDATE hot_test SET non_indexed_col = 'updated_s1' WHERE id = 1;
+step s2_begin: BEGIN;
+step s2_hot_update: UPDATE hot_test SET non_indexed_col = 'updated_s2' WHERE id = 1; <waiting ...>
+step s1_commit: COMMIT;
+step s2_hot_update: <... completed>
+step s2_commit: COMMIT;
+step s1_select: SELECT * FROM hot_test WHERE id = 1;
+id|indexed_col|non_indexed_col
+--+-----------+---------------
+ 1|        100|updated_s2     
+(1 row)
+
+step s2_select: SELECT * FROM hot_test WHERE id = 1;
+id|indexed_col|non_indexed_col
+--+-----------+---------------
+ 1|        100|updated_s2     
+(1 row)
+
+step s2_verify_hot: 
+    -- Check for HOT chain: look for LP_REDIRECT (lp_flags=2) or tuple with t_ctid pointing to same page
+    SELECT COUNT(*) > 0 AS has_hot_chain
+    FROM heap_page_items(get_raw_page('hot_test', 0))
+    WHERE lp_flags = 2  -- LP_REDIRECT indicates HOT chain
+       OR (t_ctid IS NOT NULL
+           AND (t_ctid::text::point)[0]::int = 0  -- same page
+           AND t_ctid != ('(0,' || lp || ')')::tid);    -- different offset
+
+has_hot_chain
+-------------
+t            
+(1 row)
+
+
+starting permutation: s1_begin s1_hot_update s3_begin s3_non_hot_update s1_commit s3_commit s3_select s3_verify_index
+step s1_begin: BEGIN;
+step s1_hot_update: UPDATE hot_test SET non_indexed_col = 'updated_s1' WHERE id = 1;
+step s3_begin: BEGIN;
+step s3_non_hot_update: UPDATE hot_test SET indexed_col = 150 WHERE id = 1; <waiting ...>
+step s1_commit: COMMIT;
+step s3_non_hot_update: <... completed>
+step s3_commit: COMMIT;
+step s3_select: SELECT * FROM hot_test WHERE id = 1;
+id|indexed_col|non_indexed_col
+--+-----------+---------------
+ 1|        150|updated_s1     
+(1 row)
+
+step s3_verify_index: 
+    -- Verify index was updated (proves non-HOT)
+    SELECT COUNT(*) = 1 AS index_updated FROM hot_test WHERE indexed_col = 150;
+    SELECT COUNT(*) = 0 AS old_value_gone FROM hot_test WHERE indexed_col = 100;
+
+index_updated
+-------------
+t            
+(1 row)
+
+old_value_gone
+--------------
+t             
+(1 row)
+
+
+starting permutation: s3_begin s3_non_hot_update s1_begin s1_hot_update s3_commit s1_commit s1_select s1_verify_hot
+step s3_begin: BEGIN;
+step s3_non_hot_update: UPDATE hot_test SET indexed_col = 150 WHERE id = 1;
+step s1_begin: BEGIN;
+step s1_hot_update: UPDATE hot_test SET non_indexed_col = 'updated_s1' WHERE id = 1; <waiting ...>
+step s3_commit: COMMIT;
+step s1_hot_update: <... completed>
+step s1_commit: COMMIT;
+step s1_select: SELECT * FROM hot_test WHERE id = 1;
+id|indexed_col|non_indexed_col
+--+-----------+---------------
+ 1|        150|updated_s1     
+(1 row)
+
+step s1_verify_hot: 
+    -- Check for HOT chain: look for LP_REDIRECT (lp_flags=2) or tuple with t_ctid pointing to same page
+    SELECT COUNT(*) > 0 AS has_hot_chain
+    FROM heap_page_items(get_raw_page('hot_test', 0))
+    WHERE lp_flags = 2  -- LP_REDIRECT indicates HOT chain
+       OR (t_ctid IS NOT NULL
+           AND (t_ctid::text::point)[0]::int = 0  -- same page
+           AND t_ctid != ('(0,' || lp || ')')::tid);    -- different offset
+
+has_hot_chain
+-------------
+t            
+(1 row)
+
+
+starting permutation: s1_begin s1_hot_update s4_begin s4_hot_update_row2 s1_commit s4_commit s1_select s4_select s1_verify_hot s4_verify_hot
+step s1_begin: BEGIN;
+step s1_hot_update: UPDATE hot_test SET non_indexed_col = 'updated_s1' WHERE id = 1;
+step s4_begin: BEGIN;
+step s4_hot_update_row2: UPDATE hot_test SET non_indexed_col = 'updated_s4' WHERE id = 2;
+step s1_commit: COMMIT;
+step s4_commit: COMMIT;
+step s1_select: SELECT * FROM hot_test WHERE id = 1;
+id|indexed_col|non_indexed_col
+--+-----------+---------------
+ 1|        100|updated_s1     
+(1 row)
+
+step s4_select: SELECT * FROM hot_test WHERE id = 2;
+id|indexed_col|non_indexed_col
+--+-----------+---------------
+ 2|        200|updated_s4     
+(1 row)
+
+step s1_verify_hot: 
+    -- Check for HOT chain: look for LP_REDIRECT (lp_flags=2) or tuple with t_ctid pointing to same page
+    SELECT COUNT(*) > 0 AS has_hot_chain
+    FROM heap_page_items(get_raw_page('hot_test', 0))
+    WHERE lp_flags = 2  -- LP_REDIRECT indicates HOT chain
+       OR (t_ctid IS NOT NULL
+           AND (t_ctid::text::point)[0]::int = 0  -- same page
+           AND t_ctid != ('(0,' || lp || ')')::tid);    -- different offset
+
+has_hot_chain
+-------------
+t            
+(1 row)
+
+step s4_verify_hot: 
+    -- Check for HOT chain on page 0
+    SELECT COUNT(*) > 0 AS has_hot_chain
+    FROM heap_page_items(get_raw_page('hot_test', 0))
+    WHERE lp_flags = 2
+       OR (t_ctid IS NOT NULL
+           AND (t_ctid::text::point)[0]::int = 0
+           AND t_ctid != ('(0,' || lp || ')')::tid);
+
+has_hot_chain
+-------------
+t            
+(1 row)
+
diff --git a/src/test/isolation/expected/hot_updates_index_scan.out b/src/test/isolation/expected/hot_updates_index_scan.out
new file mode 100644
index 00000000000..d72322b2146
--- /dev/null
+++ b/src/test/isolation/expected/hot_updates_index_scan.out
@@ -0,0 +1,126 @@
+Parsed test spec with 4 sessions
+
+starting permutation: s1_begin s1_hot_update s2_begin s2_index_scan s1_commit s2_commit
+step s1_begin: BEGIN;
+step s1_hot_update: UPDATE hot_test SET non_indexed_col = 'hot_updated' WHERE id = 50;
+step s2_begin: BEGIN;
+step s2_index_scan: SELECT * FROM hot_test WHERE indexed_col = 500;
+id|indexed_col|non_indexed_col
+--+-----------+---------------
+50|        500|initial50      
+(1 row)
+
+step s1_commit: COMMIT;
+step s2_commit: COMMIT;
+
+starting permutation: s1_begin s1_non_hot_update s1_commit s2_begin s2_index_scan_new s2_commit s2_verify_index
+step s1_begin: BEGIN;
+step s1_non_hot_update: UPDATE hot_test SET indexed_col = 555 WHERE id = 50;
+step s1_commit: COMMIT;
+step s2_begin: BEGIN;
+step s2_index_scan_new: SELECT * FROM hot_test WHERE indexed_col = 555;
+id|indexed_col|non_indexed_col
+--+-----------+---------------
+50|        555|initial50      
+(1 row)
+
+step s2_commit: COMMIT;
+step s2_verify_index: 
+    -- After non-HOT update, verify index reflects the change
+    SELECT COUNT(*) = 1 AS found_new_value FROM hot_test WHERE indexed_col = 555;
+    SELECT COUNT(*) = 0 AS old_value_gone FROM hot_test WHERE indexed_col = 500;
+
+found_new_value
+---------------
+t              
+(1 row)
+
+old_value_gone
+--------------
+t             
+(1 row)
+
+
+starting permutation: s3_begin s3_select_for_update s1_begin s1_hot_update s3_commit s1_commit s1_verify_hot
+step s3_begin: BEGIN;
+step s3_select_for_update: SELECT * FROM hot_test WHERE id = 50 FOR UPDATE;
+id|indexed_col|non_indexed_col
+--+-----------+---------------
+50|        500|initial50      
+(1 row)
+
+step s1_begin: BEGIN;
+step s1_hot_update: UPDATE hot_test SET non_indexed_col = 'hot_updated' WHERE id = 50; <waiting ...>
+step s3_commit: COMMIT;
+step s1_hot_update: <... completed>
+step s1_commit: COMMIT;
+step s1_verify_hot: 
+    -- Verify HOT chain exists for row with id=50
+    SELECT EXISTS (
+        SELECT 1 FROM heap_page_items(get_raw_page('hot_test', 0))
+        WHERE lp_flags = 2
+           OR (t_ctid IS NOT NULL
+               AND (t_ctid::text::point)[0]::int = 0
+               AND t_ctid != ('(0,' || lp || ')')::tid)
+    ) AS has_hot_chain;
+
+has_hot_chain
+-------------
+t            
+(1 row)
+
+
+starting permutation: s1_begin s1_hot_update s3_begin s3_select_for_update s1_commit s3_commit
+step s1_begin: BEGIN;
+step s1_hot_update: UPDATE hot_test SET non_indexed_col = 'hot_updated' WHERE id = 50;
+step s3_begin: BEGIN;
+step s3_select_for_update: SELECT * FROM hot_test WHERE id = 50 FOR UPDATE; <waiting ...>
+step s1_commit: COMMIT;
+step s3_select_for_update: <... completed>
+id|indexed_col|non_indexed_col
+--+-----------+---------------
+50|        500|hot_updated    
+(1 row)
+
+step s3_commit: COMMIT;
+
+starting permutation: s4_begin s4_select_for_key_share s1_begin s1_hot_update s4_commit s1_commit s1_verify_hot
+step s4_begin: BEGIN;
+step s4_select_for_key_share: SELECT * FROM hot_test WHERE id = 50 FOR KEY SHARE;
+id|indexed_col|non_indexed_col
+--+-----------+---------------
+50|        500|initial50      
+(1 row)
+
+step s1_begin: BEGIN;
+step s1_hot_update: UPDATE hot_test SET non_indexed_col = 'hot_updated' WHERE id = 50;
+step s4_commit: COMMIT;
+step s1_commit: COMMIT;
+step s1_verify_hot: 
+    -- Verify HOT chain exists for row with id=50
+    SELECT EXISTS (
+        SELECT 1 FROM heap_page_items(get_raw_page('hot_test', 0))
+        WHERE lp_flags = 2
+           OR (t_ctid IS NOT NULL
+               AND (t_ctid::text::point)[0]::int = 0
+               AND t_ctid != ('(0,' || lp || ')')::tid)
+    ) AS has_hot_chain;
+
+has_hot_chain
+-------------
+t            
+(1 row)
+
+
+starting permutation: s4_begin s4_select_for_key_share s1_begin s1_non_hot_update s4_commit s1_commit
+step s4_begin: BEGIN;
+step s4_select_for_key_share: SELECT * FROM hot_test WHERE id = 50 FOR KEY SHARE;
+id|indexed_col|non_indexed_col
+--+-----------+---------------
+50|        500|initial50      
+(1 row)
+
+step s1_begin: BEGIN;
+step s1_non_hot_update: UPDATE hot_test SET indexed_col = 555 WHERE id = 50;
+step s4_commit: COMMIT;
+step s1_commit: COMMIT;
diff --git a/src/test/isolation/isolation_schedule b/src/test/isolation/isolation_schedule
index 4e466580cd4..46525b0a62a 100644
--- a/src/test/isolation/isolation_schedule
+++ b/src/test/isolation/isolation_schedule
@@ -19,6 +19,9 @@ test: multiple-row-versions
 test: index-only-scan
 test: index-only-bitmapscan
 test: predicate-lock-hot-tuple
+test: hot_updates_concurrent
+test: hot_updates_index_scan
+test: hot_updates_chain
 test: update-conflict-out
 test: deadlock-simple
 test: deadlock-hard
diff --git a/src/test/isolation/specs/hot_updates_chain.spec b/src/test/isolation/specs/hot_updates_chain.spec
new file mode 100644
index 00000000000..85cd2176133
--- /dev/null
+++ b/src/test/isolation/specs/hot_updates_chain.spec
@@ -0,0 +1,110 @@
+# Test HOT update chains and their interaction with VACUUM and page pruning
+#
+# This test verifies that HOT update chains are correctly maintained when
+# multiple HOT updates occur on the same row, and that VACUUM correctly
+# handles HOT chains.
+
+setup
+{
+    CREATE EXTENSION IF NOT EXISTS pageinspect;
+
+    CREATE TABLE hot_test (
+        id int PRIMARY KEY,
+        indexed_col int,
+        non_indexed_col text
+    );
+
+    CREATE INDEX hot_test_indexed_idx ON hot_test(indexed_col);
+
+    INSERT INTO hot_test VALUES (1, 100, 'initial');
+    INSERT INTO hot_test VALUES (2, 200, 'initial');
+}
+
+teardown
+{
+    DROP TABLE hot_test;
+    DROP EXTENSION pageinspect;
+}
+
+# Session 1: Create HOT chain with multiple updates
+session s1
+step s1_begin { BEGIN; }
+step s1_hot_update1 { UPDATE hot_test SET non_indexed_col = 'update1' WHERE id = 1; }
+step s1_hot_update2 { UPDATE hot_test SET non_indexed_col = 'update2' WHERE id = 1; }
+step s1_hot_update3 { UPDATE hot_test SET non_indexed_col = 'update3' WHERE id = 1; }
+step s1_commit { COMMIT; }
+step s1_select { SELECT * FROM hot_test WHERE id = 1; }
+step s1_verify_hot {
+    -- Check for HOT chain: LP_REDIRECT or tuple with t_ctid pointing to same page
+    SELECT COUNT(*) > 0 AS has_hot_chain
+    FROM heap_page_items(get_raw_page('hot_test', 0))
+    WHERE lp_flags = 2  -- LP_REDIRECT indicates HOT chain
+       OR (t_ctid IS NOT NULL
+           AND (t_ctid::text::point)[0]::int = 0  -- same page
+           AND t_ctid != ('(0,' || lp || ')')::tid);    -- different offset
+}
+
+# Session 2: Read while HOT chain is being built
+session s2
+step s2_begin { BEGIN ISOLATION LEVEL REPEATABLE READ; }
+step s2_select_before { SELECT non_indexed_col FROM hot_test WHERE id = 1; }
+step s2_select_after { SELECT non_indexed_col FROM hot_test WHERE id = 1; }
+step s2_commit { COMMIT; }
+
+# Session 3: Break HOT chain with non-HOT update
+session s3
+step s3_begin { BEGIN; }
+step s3_non_hot_update { UPDATE hot_test SET indexed_col = 150 WHERE id = 1; }
+step s3_commit { COMMIT; }
+
+# Session 4: Try to build HOT chain after non-HOT update
+session s4
+step s4_begin { BEGIN; }
+step s4_hot_after_non_hot { UPDATE hot_test SET non_indexed_col = 'after_non_hot' WHERE id = 1; }
+step s4_commit { COMMIT; }
+step s4_select { SELECT * FROM hot_test WHERE id = 1; }
+step s4_verify_hot {
+    -- Check for new HOT chain after non-HOT update broke the previous chain
+    SELECT COUNT(*) > 0 AS has_hot_chain
+    FROM heap_page_items(get_raw_page('hot_test', 0))
+    WHERE lp_flags = 2
+       OR (t_ctid IS NOT NULL
+           AND (t_ctid::text::point)[0]::int = 0
+           AND t_ctid != ('(0,' || lp || ')')::tid);
+}
+
+# Session 5: Multiple sessions building separate HOT chains on different rows
+session s5
+step s5_begin { BEGIN; }
+step s5_hot_update_row2_1 { UPDATE hot_test SET non_indexed_col = 'row2_update1' WHERE id = 2; }
+step s5_hot_update_row2_2 { UPDATE hot_test SET non_indexed_col = 'row2_update2' WHERE id = 2; }
+step s5_commit { COMMIT; }
+step s5_select { SELECT * FROM hot_test WHERE id = 2; }
+step s5_verify_hot {
+    -- Check for HOT chain on page 0
+    SELECT COUNT(*) > 0 AS has_hot_chain
+    FROM heap_page_items(get_raw_page('hot_test', 0))
+    WHERE lp_flags = 2
+       OR (t_ctid IS NOT NULL
+           AND (t_ctid::text::point)[0]::int = 0
+           AND t_ctid != ('(0,' || lp || ')')::tid);
+}
+
+# Build HOT chain within single transaction
+# All updates should form a HOT chain
+permutation s1_begin s1_hot_update1 s1_hot_update2 s1_hot_update3 s1_commit s1_select s1_verify_hot
+
+# REPEATABLE READ should see consistent snapshot across HOT chain updates
+# Session 2 starts before updates, should see 'initial' throughout
+permutation s2_begin s2_select_before s1_begin s1_hot_update1 s1_hot_update2 s1_commit s2_select_after s2_commit
+
+# HOT chain followed by non-HOT update
+# Non-HOT update breaks the HOT chain
+permutation s1_begin s1_hot_update1 s1_hot_update2 s1_commit s3_begin s3_non_hot_update s3_commit s1_select
+
+# HOT update after non-HOT update can start new HOT chain
+# After breaking chain with indexed column update, new HOT updates can start fresh chain
+permutation s1_begin s1_hot_update1 s1_commit s3_begin s3_non_hot_update s3_commit s4_begin s4_hot_after_non_hot s4_commit s4_select s4_verify_hot
+
+# Multiple sessions building separate HOT chains on different rows
+permutation s1_begin s1_hot_update1 s1_hot_update2 s5_begin s5_hot_update_row2_1 s5_hot_update_row2_2 s1_commit s5_commit s1_select s5_select s1_verify_hot s5_verify_hot
diff --git a/src/test/isolation/specs/hot_updates_concurrent.spec b/src/test/isolation/specs/hot_updates_concurrent.spec
new file mode 100644
index 00000000000..eac78d62ac5
--- /dev/null
+++ b/src/test/isolation/specs/hot_updates_concurrent.spec
@@ -0,0 +1,107 @@
+# Test concurrent HOT updates and validate HOT chains
+#
+# This test verifies that HOT updates work correctly when multiple sessions
+# are updating the same table concurrently, and validates that HOT chains
+# are actually created using heap_page_items().
+
+setup
+{
+    CREATE EXTENSION IF NOT EXISTS pageinspect;
+
+    CREATE TABLE hot_test (
+        id int PRIMARY KEY,
+        indexed_col int,
+        non_indexed_col text
+    );
+
+    CREATE INDEX hot_test_indexed_idx ON hot_test(indexed_col);
+
+    INSERT INTO hot_test VALUES (1, 100, 'initial1');
+    INSERT INTO hot_test VALUES (2, 200, 'initial2');
+    INSERT INTO hot_test VALUES (3, 300, 'initial3');
+}
+
+teardown
+{
+    DROP TABLE hot_test;
+    DROP EXTENSION pageinspect;
+}
+
+# Session 1: HOT update (modify non-indexed column)
+session s1
+step s1_begin { BEGIN; }
+step s1_hot_update { UPDATE hot_test SET non_indexed_col = 'updated_s1' WHERE id = 1; }
+step s1_commit { COMMIT; }
+step s1_select { SELECT * FROM hot_test WHERE id = 1; }
+step s1_verify_hot {
+    -- Check for HOT chain: look for LP_REDIRECT (lp_flags=2) or tuple with t_ctid pointing to same page
+    SELECT COUNT(*) > 0 AS has_hot_chain
+    FROM heap_page_items(get_raw_page('hot_test', 0))
+    WHERE lp_flags = 2  -- LP_REDIRECT indicates HOT chain
+       OR (t_ctid IS NOT NULL
+           AND (t_ctid::text::point)[0]::int = 0  -- same page
+           AND t_ctid != ('(0,' || lp || ')')::tid);    -- different offset
+}
+
+# Session 2: HOT update (modify non-indexed column on same row)
+session s2
+step s2_begin { BEGIN; }
+step s2_hot_update { UPDATE hot_test SET non_indexed_col = 'updated_s2' WHERE id = 1; }
+step s2_commit { COMMIT; }
+step s2_select { SELECT * FROM hot_test WHERE id = 1; }
+step s2_verify_hot {
+    -- Check for HOT chain: look for LP_REDIRECT (lp_flags=2) or tuple with t_ctid pointing to same page
+    SELECT COUNT(*) > 0 AS has_hot_chain
+    FROM heap_page_items(get_raw_page('hot_test', 0))
+    WHERE lp_flags = 2  -- LP_REDIRECT indicates HOT chain
+       OR (t_ctid IS NOT NULL
+           AND (t_ctid::text::point)[0]::int = 0  -- same page
+           AND t_ctid != ('(0,' || lp || ')')::tid);    -- different offset
+}
+
+# Session 3: Non-HOT update (modify indexed column)
+session s3
+step s3_begin { BEGIN; }
+step s3_non_hot_update { UPDATE hot_test SET indexed_col = 150 WHERE id = 1; }
+step s3_commit { COMMIT; }
+step s3_select { SELECT * FROM hot_test WHERE id = 1; }
+step s3_verify_index {
+    -- Verify index was updated (proves non-HOT)
+    SELECT COUNT(*) = 1 AS index_updated FROM hot_test WHERE indexed_col = 150;
+    SELECT COUNT(*) = 0 AS old_value_gone FROM hot_test WHERE indexed_col = 100;
+}
+
+# Session 4: Concurrent HOT updates on different rows
+session s4
+step s4_begin { BEGIN; }
+step s4_hot_update_row2 { UPDATE hot_test SET non_indexed_col = 'updated_s4' WHERE id = 2; }
+step s4_commit { COMMIT; }
+step s4_select { SELECT * FROM hot_test WHERE id = 2; }
+step s4_verify_hot {
+    -- Check for HOT chain on page 0
+    SELECT COUNT(*) > 0 AS has_hot_chain
+    FROM heap_page_items(get_raw_page('hot_test', 0))
+    WHERE lp_flags = 2
+       OR (t_ctid IS NOT NULL
+           AND (t_ctid::text::point)[0]::int = 0
+           AND t_ctid != ('(0,' || lp || ')')::tid);
+}
+
+# Two sessions both doing HOT updates on same row
+# Second session should block until first commits
+# Both should create HOT chains
+permutation s1_begin s1_hot_update s2_begin s2_hot_update s1_commit s2_commit s1_select s2_select s2_verify_hot
+
+# HOT update followed by non-HOT update
+# Non-HOT update should wait for HOT update to commit
+# First update is HOT, second is non-HOT (index updated)
+permutation s1_begin s1_hot_update s3_begin s3_non_hot_update s1_commit s3_commit s3_select s3_verify_index
+
+# Non-HOT update followed by HOT update
+# HOT update should wait for non-HOT update to commit
+# First update is non-HOT (index), second is HOT
+permutation s3_begin s3_non_hot_update s1_begin s1_hot_update s3_commit s1_commit s1_select s1_verify_hot
+
+# Concurrent HOT updates on different rows (should not block)
+# Both sessions should be able to create HOT chains independently
+permutation s1_begin s1_hot_update s4_begin s4_hot_update_row2 s1_commit s4_commit s1_select s4_select s1_verify_hot s4_verify_hot
diff --git a/src/test/isolation/specs/hot_updates_index_scan.spec b/src/test/isolation/specs/hot_updates_index_scan.spec
new file mode 100644
index 00000000000..39db07cc80f
--- /dev/null
+++ b/src/test/isolation/specs/hot_updates_index_scan.spec
@@ -0,0 +1,91 @@
+# Test HOT updates interaction with index scans and SELECT FOR UPDATE
+#
+# This test verifies that HOT updates are correctly handled when concurrent
+# sessions are performing index scans, using SELECT FOR UPDATE, and validates
+# HOT chains using heap_page_items().
+
+setup
+{
+    CREATE EXTENSION IF NOT EXISTS pageinspect;
+
+    CREATE TABLE hot_test (
+        id int PRIMARY KEY,
+        indexed_col int,
+        non_indexed_col text
+    );
+
+    CREATE INDEX hot_test_indexed_idx ON hot_test(indexed_col);
+
+    INSERT INTO hot_test SELECT i, i * 10, 'initial' || i FROM generate_series(1, 100) i;
+}
+
+teardown
+{
+    DROP TABLE hot_test;
+    DROP EXTENSION pageinspect;
+}
+
+# Session 1: Perform HOT update
+session s1
+step s1_begin { BEGIN; }
+step s1_hot_update { UPDATE hot_test SET non_indexed_col = 'hot_updated' WHERE id = 50; }
+step s1_non_hot_update { UPDATE hot_test SET indexed_col = 555 WHERE id = 50; }
+step s1_commit { COMMIT; }
+step s1_verify_hot {
+    -- Verify HOT chain exists for row with id=50
+    SELECT EXISTS (
+        SELECT 1 FROM heap_page_items(get_raw_page('hot_test', 0))
+        WHERE lp_flags = 2
+           OR (t_ctid IS NOT NULL
+               AND (t_ctid::text::point)[0]::int = 0
+               AND t_ctid != ('(0,' || lp || ')')::tid)
+    ) AS has_hot_chain;
+}
+
+# Session 2: Index scan while HOT update in progress
+session s2
+step s2_begin { BEGIN; }
+step s2_index_scan { SELECT * FROM hot_test WHERE indexed_col = 500; }
+step s2_index_scan_new { SELECT * FROM hot_test WHERE indexed_col = 555; }
+step s2_commit { COMMIT; }
+step s2_verify_index {
+    -- After non-HOT update, verify index reflects the change
+    SELECT COUNT(*) = 1 AS found_new_value FROM hot_test WHERE indexed_col = 555;
+    SELECT COUNT(*) = 0 AS old_value_gone FROM hot_test WHERE indexed_col = 500;
+}
+
+# Session 3: SELECT FOR UPDATE
+session s3
+step s3_begin { BEGIN; }
+step s3_select_for_update { SELECT * FROM hot_test WHERE id = 50 FOR UPDATE; }
+step s3_commit { COMMIT; }
+
+# Session 4: SELECT FOR KEY SHARE (should not block HOT update of non-key column)
+session s4
+step s4_begin { BEGIN; }
+step s4_select_for_key_share { SELECT * FROM hot_test WHERE id = 50 FOR KEY SHARE; }
+step s4_commit { COMMIT; }
+
+# Index scan should see consistent snapshot during HOT update
+# Index scan starts before HOT update commits
+permutation s1_begin s1_hot_update s2_begin s2_index_scan s1_commit s2_commit
+
+# Index scan after non-HOT update should see new index entry
+# Index scan starts after non-HOT update commits
+permutation s1_begin s1_non_hot_update s1_commit s2_begin s2_index_scan_new s2_commit s2_verify_index
+
+# SELECT FOR UPDATE blocks HOT update
+# FOR UPDATE should block the UPDATE until SELECT commits
+permutation s3_begin s3_select_for_update s1_begin s1_hot_update s3_commit s1_commit s1_verify_hot
+
+# HOT update blocks SELECT FOR UPDATE
+# SELECT FOR UPDATE should wait for HOT update to commit
+permutation s1_begin s1_hot_update s3_begin s3_select_for_update s1_commit s3_commit
+
+# SELECT FOR KEY SHARE should not block HOT update (non-key column)
+# HOT update of non-indexed column should not conflict with FOR KEY SHARE
+permutation s4_begin s4_select_for_key_share s1_begin s1_hot_update s4_commit s1_commit s1_verify_hot
+
+# Non-HOT update (key column) should block after FOR KEY SHARE
+# Non-HOT update of indexed column should wait for FOR KEY SHARE
+permutation s4_begin s4_select_for_key_share s1_begin s1_non_hot_update s4_commit s1_commit
diff --git a/src/test/regress/expected/hot_updates.out b/src/test/regress/expected/hot_updates.out
new file mode 100644
index 00000000000..04fb86755db
--- /dev/null
+++ b/src/test/regress/expected/hot_updates.out
@@ -0,0 +1,725 @@
+--
+-- HOT_UPDATES
+-- Test Heap-Only Tuple (HOT) update decisions
+--
+-- This test systematically verifies that HOT updates are used when appropriate
+-- and avoided when necessary (e.g., when indexed columns are modified).
+--
+-- We use multiple validation methods:
+-- 1. Index verification (index still works = proves no index update for HOT)
+-- 2. Statistics functions (pg_stat_get_tuples_hot_updated)
+-- 3. pageinspect extension for HOT chain examination
+--
+-- Load required extensions
+CREATE EXTENSION IF NOT EXISTS pageinspect;
+-- Clean up from prior runs
+DROP TABLE IF EXISTS hot_test CASCADE;
+NOTICE:  table "hot_test" does not exist, skipping
+-- Function to get HOT update count
+CREATE OR REPLACE FUNCTION get_hot_count(rel_name text)
+RETURNS TABLE (
+    updates BIGINT,
+    hot BIGINT
+) AS $$
+DECLARE
+  rel_oid oid;
+BEGIN
+  rel_oid := rel_name::regclass::oid;
+  PERFORM pg_stat_force_next_flush();
+
+  updates := COALESCE(pg_stat_get_tuples_updated(rel_oid), 0) +
+             COALESCE(pg_stat_get_xact_tuples_updated(rel_oid), 0);
+  hot := COALESCE(pg_stat_get_tuples_hot_updated(rel_oid), 0) +
+         COALESCE(pg_stat_get_xact_tuples_hot_updated(rel_oid), 0);
+
+  RETURN NEXT;
+END;
+$$ LANGUAGE plpgsql;
+-- Check if a tuple is part of a HOT chain (has a predecessor on same page)
+CREATE OR REPLACE FUNCTION has_hot_chain(rel_name text, target_ctid tid)
+RETURNS boolean AS $$
+DECLARE
+  block_num int;
+  page_item record;
+BEGIN
+  block_num := (target_ctid::text::point)[0]::int;
+
+  -- Look for a different tuple on the same page that points to our target tuple
+  FOR page_item IN
+    SELECT lp, lp_flags, t_ctid
+    FROM heap_page_items(get_raw_page(rel_name, block_num))
+    WHERE lp_flags = 1
+      AND t_ctid IS NOT NULL
+      AND t_ctid = target_ctid
+      AND ('(' || block_num::text || ',' || lp::text || ')')::tid != target_ctid
+  LOOP
+    RETURN true;
+  END LOOP;
+
+  RETURN false;
+END;
+$$ LANGUAGE plpgsql;
+-- Print the HOT chain starting from a given tuple
+CREATE OR REPLACE FUNCTION print_hot_chain(rel_name text, start_ctid tid)
+RETURNS TABLE(chain_position int, ctid tid, lp_flags text, t_ctid tid, chain_end boolean) AS
+$$
+#variable_conflict use_column
+DECLARE
+  block_num int;
+  line_ptr int;
+  current_ctid tid := start_ctid;
+  next_ctid tid;
+  position int := 0;
+  max_iterations int := 100;
+  page_item record;
+  found_predecessor boolean := false;
+  flags_name text;
+BEGIN
+  block_num := (start_ctid::text::point)[0]::int;
+
+  -- Find the predecessor (old tuple pointing to our start_ctid)
+  FOR page_item IN
+    SELECT lp, lp_flags, t_ctid
+    FROM heap_page_items(get_raw_page(rel_name, block_num))
+    WHERE lp_flags = 1
+      AND t_ctid = start_ctid
+  LOOP
+    current_ctid := ('(' || block_num::text || ',' || page_item.lp::text || ')')::tid;
+    found_predecessor := true;
+    EXIT;
+  END LOOP;
+
+  -- If no predecessor found, start with the given ctid
+  IF NOT found_predecessor THEN
+    current_ctid := start_ctid;
+  END IF;
+
+  -- Follow the chain forward
+  WHILE position < max_iterations LOOP
+    line_ptr := (current_ctid::text::point)[1]::int;
+
+    FOR page_item IN
+      SELECT lp, lp_flags, t_ctid
+      FROM heap_page_items(get_raw_page(rel_name, block_num))
+      WHERE lp = line_ptr
+    LOOP
+      -- Map lp_flags to names
+      flags_name := CASE page_item.lp_flags
+        WHEN 0 THEN 'unused (0)'
+        WHEN 1 THEN 'normal (1)'
+        WHEN 2 THEN 'redirect (2)'
+        WHEN 3 THEN 'dead (3)'
+        ELSE 'unknown (' || page_item.lp_flags::text || ')'
+      END;
+
+      RETURN QUERY SELECT
+        position,
+        current_ctid,
+        flags_name,
+        page_item.t_ctid,
+        (page_item.t_ctid IS NULL OR page_item.t_ctid = current_ctid)::boolean
+      ;
+
+      IF page_item.t_ctid IS NULL OR page_item.t_ctid = current_ctid THEN
+        RETURN;
+      END IF;
+
+      next_ctid := page_item.t_ctid;
+
+      IF (next_ctid::text::point)[0]::int != block_num THEN
+        RETURN;
+      END IF;
+
+      current_ctid := next_ctid;
+      position := position + 1;
+    END LOOP;
+
+    IF position = 0 THEN
+      RETURN;
+    END IF;
+  END LOOP;
+END;
+$$ LANGUAGE plpgsql;
+-- Trigger page pruning via table scan
+CREATE OR REPLACE FUNCTION heap_prune_page(rel_name text, target_ctid tid)
+RETURNS void AS $$
+DECLARE
+  block_num int;
+BEGIN
+  -- Extract block number from ctid
+  block_num := (target_ctid::text::point)[0]::int;
+
+  -- Scan only the specific page to trigger pruning on that page
+  EXECUTE 'SELECT COUNT(*) FROM ' || quote_ident(rel_name) ||
+           ' WHERE ctid >= (' || block_num || ',0) AND ctid < (' || (block_num + 1) || ',0)';
+END;
+$$ LANGUAGE plpgsql;
+-- Basic HOT update (update non-indexed column)
+CREATE TABLE hot_test (
+    id int PRIMARY KEY,
+    indexed_col int,
+    non_indexed_col text
+) USING heap;
+CREATE INDEX hot_test_indexed_idx ON hot_test(indexed_col);
+INSERT INTO hot_test VALUES (1, 100, 'initial');
+INSERT INTO hot_test VALUES (2, 200, 'initial');
+INSERT INTO hot_test VALUES (3, 300, 'initial');
+-- Get baseline and initial ctid
+WITH initial_state AS (
+  SELECT ctid FROM hot_test WHERE id = 1
+)
+SELECT
+  'Initial State' AS phase,
+  initial_state.ctid,
+  (get_hot_count('hot_test')).updates,
+  (get_hot_count('hot_test')).hot
+FROM initial_state;
+     phase     | ctid  | updates | hot 
+---------------+-------+---------+-----
+ Initial State | (0,1) |       0 |   0
+(1 row)
+
+-- Should be HOT updates (only non-indexed column modified)
+UPDATE hot_test SET non_indexed_col = 'updated1' WHERE id = 1;
+UPDATE hot_test SET non_indexed_col = 'updated2' WHERE id = 2;
+UPDATE hot_test SET non_indexed_col = 'updated3' WHERE id = 3;
+-- Verify HOT updates occurred
+SELECT
+  'After Updates' AS phase,
+  (get_hot_count('hot_test')).updates,
+  (get_hot_count('hot_test')).hot;
+     phase     | updates | hot 
+---------------+---------+-----
+ After Updates |       3 |   3
+(1 row)
+
+-- Dump the HOT chain before pruning
+WITH current_tuple AS (
+  SELECT ctid FROM hot_test WHERE id = 1
+)
+SELECT
+  'Before VACUUM' AS phase,
+  has_hot_chain('hot_test', current_tuple.ctid) AS has_chain,
+  chain_position,
+  print_hot_chain.ctid,
+  lp_flags,
+  t_ctid
+FROM current_tuple,
+LATERAL print_hot_chain('hot_test', current_tuple.ctid);
+     phase     | has_chain | chain_position | ctid  |  lp_flags  | t_ctid 
+---------------+-----------+----------------+-------+------------+--------
+ Before VACUUM | t         |              0 | (0,1) | normal (1) | (0,4)
+ Before VACUUM | t         |              1 | (0,4) | normal (1) | (0,4)
+(2 rows)
+
+SET SESSION enable_seqscan = OFF;
+SET SESSION enable_bitmapscan = OFF;
+-- Verify indexes still work
+EXPLAIN (COSTS OFF) SELECT id, indexed_col FROM hot_test WHERE indexed_col = 100;
+                    QUERY PLAN                     
+---------------------------------------------------
+ Index Scan using hot_test_indexed_idx on hot_test
+   Index Cond: (indexed_col = 100)
+(2 rows)
+
+SELECT id, indexed_col FROM hot_test WHERE indexed_col = 100;
+ id | indexed_col 
+----+-------------
+  1 |         100
+(1 row)
+
+EXPLAIN (COSTS OFF) SELECT id, indexed_col FROM hot_test WHERE indexed_col = 200;
+                    QUERY PLAN                     
+---------------------------------------------------
+ Index Scan using hot_test_indexed_idx on hot_test
+   Index Cond: (indexed_col = 200)
+(2 rows)
+
+SELECT id, indexed_col FROM hot_test WHERE indexed_col = 200;
+ id | indexed_col 
+----+-------------
+  2 |         200
+(1 row)
+
+-- Vacuum the relation, expect the HOT chain to collapse
+VACUUM hot_test;
+-- Show that there is no chain after vacuum
+WITH current_tuple AS (
+  SELECT ctid FROM hot_test WHERE id = 1
+)
+SELECT
+  'After VACUUM' AS phase,
+  has_hot_chain('hot_test', current_tuple.ctid) AS has_chain,
+  chain_position,
+  print_hot_chain.ctid,
+  lp_flags,
+  t_ctid
+FROM current_tuple,
+LATERAL print_hot_chain('hot_test', current_tuple.ctid);
+    phase     | has_chain | chain_position | ctid  |  lp_flags  | t_ctid 
+--------------+-----------+----------------+-------+------------+--------
+ After VACUUM | f         |              0 | (0,4) | normal (1) | (0,4)
+(1 row)
+
+-- Non-HOT update (update indexed column)
+UPDATE hot_test SET indexed_col = 150 WHERE id = 1;
+SELECT get_hot_count('hot_test');
+ get_hot_count 
+---------------
+ (4,3)
+(1 row)
+
+-- Verify index was updated (new value findable)
+EXPLAIN (COSTS OFF) SELECT id, indexed_col FROM hot_test WHERE indexed_col = 150;
+                    QUERY PLAN                     
+---------------------------------------------------
+ Index Scan using hot_test_indexed_idx on hot_test
+   Index Cond: (indexed_col = 150)
+(2 rows)
+
+SELECT id, indexed_col FROM hot_test WHERE indexed_col = 150;
+ id | indexed_col 
+----+-------------
+  1 |         150
+(1 row)
+
+-- Verify old value no longer in index
+EXPLAIN (COSTS OFF) SELECT id FROM hot_test WHERE indexed_col = 100;
+                    QUERY PLAN                     
+---------------------------------------------------
+ Index Scan using hot_test_indexed_idx on hot_test
+   Index Cond: (indexed_col = 100)
+(2 rows)
+
+SELECT id FROM hot_test WHERE indexed_col = 100;
+ id 
+----
+(0 rows)
+
+SET SESSION enable_seqscan = ON;
+SET SESSION enable_bitmapscan = ON;
+-- All-or-none property: updating one indexed column requires ALL index updates
+DROP TABLE hot_test;
+CREATE TABLE hot_test (
+    id int PRIMARY KEY,
+    col_a int,
+    col_b int,
+    col_c int,
+    non_indexed text
+) USING heap;
+CREATE INDEX hot_test_a_idx ON hot_test(col_a);
+CREATE INDEX hot_test_b_idx ON hot_test(col_b);
+CREATE INDEX hot_test_c_idx ON hot_test(col_c);
+INSERT INTO hot_test VALUES (1, 10, 20, 30, 'initial');
+-- Update only col_a - should NOT be HOT because an indexed column changed
+-- This means ALL indexes must be updated (all-or-none property)
+UPDATE hot_test SET col_a = 15 WHERE id = 1;
+SELECT get_hot_count('hot_test');
+ get_hot_count 
+---------------
+ (1,0)
+(1 row)
+
+-- Verify all three indexes still work correctly
+SELECT id, col_a FROM hot_test WHERE col_a = 15;  -- updated index
+ id | col_a 
+----+-------
+  1 |    15
+(1 row)
+
+SELECT id, col_b FROM hot_test WHERE col_b = 20;  -- unchanged index
+ id | col_b 
+----+-------
+  1 |    20
+(1 row)
+
+SELECT id, col_c FROM hot_test WHERE col_c = 30;  -- unchanged index
+ id | col_c 
+----+-------
+  1 |    30
+(1 row)
+
+-- Now update only non-indexed column - should be HOT
+UPDATE hot_test SET non_indexed = 'updated';
+SELECT get_hot_count('hot_test');
+ get_hot_count 
+---------------
+ (2,1)
+(1 row)
+
+-- Verify all indexes still work
+SELECT id FROM hot_test WHERE col_a = 15 AND col_b = 20 AND col_c = 30;
+ id 
+----
+  1
+(1 row)
+
+-- Partial index: both old and new outside predicate (conservative = non-HOT)
+DROP TABLE hot_test;
+CREATE TABLE hot_test (
+    id int PRIMARY KEY,
+    status text,
+    data text
+) USING heap;
+-- Partial index only covers status = 'active'
+CREATE INDEX hot_test_active_idx ON hot_test(status) WHERE status = 'active';
+INSERT INTO hot_test VALUES (1, 'active', 'data1');
+INSERT INTO hot_test VALUES (2, 'inactive', 'data2');
+INSERT INTO hot_test VALUES (3, 'deleted', 'data3');
+-- Update non-indexed column on 'active' row (in predicate, status unchanged)
+-- Should be HOT
+UPDATE hot_test SET data = 'updated1' WHERE id = 1;
+SELECT get_hot_count('hot_test');
+ get_hot_count 
+---------------
+ (1,1)
+(1 row)
+
+-- Update non-indexed column on 'inactive' row (outside predicate)
+-- Should be HOT
+UPDATE hot_test SET data = 'updated2' WHERE id = 2;
+SELECT get_hot_count('hot_test');
+ get_hot_count 
+---------------
+ (2,2)
+(1 row)
+
+-- Update status from 'inactive' to 'deleted' (both outside predicate)
+-- PostgreSQL is conservative: heap insert happens before predicate check
+-- So this is NON-HOT even though both values are outside predicate
+UPDATE hot_test SET status = 'deleted' WHERE id = 2;
+SELECT get_hot_count('hot_test');
+ get_hot_count 
+---------------
+ (3,2)
+(1 row)
+
+-- Verify index still works for 'active' rows
+SELECT id, status FROM hot_test WHERE status = 'active';
+ id | status 
+----+--------
+  1 | active
+(1 row)
+
+-- Expression index with JSONB
+DROP TABLE hot_test;
+CREATE TABLE hot_test (
+    id int PRIMARY KEY,
+    metadata jsonb
+) USING heap;
+-- Index on JSONB expression
+CREATE INDEX hot_test_user_id_idx ON hot_test((metadata->>'user_id'));
+CREATE INDEX hot_test_status_idx ON hot_test((metadata->>'status'));
+INSERT INTO hot_test VALUES (1, '{"user_id": "123", "status": "active"}'::jsonb);
+-- Update JSONB field used in expression index to the same value,
+-- this will be HOT because the entire JSONB field is observed to
+-- be unchanged.
+UPDATE hot_test SET metadata = jsonb_set(metadata, '{user_id}', '"123"')
+WHERE id = 1;
+SELECT get_hot_count('hot_test');
+ get_hot_count 
+---------------
+ (1,1)
+(1 row)
+
+-- Update JSONB field that is no used in any index to some new value, this
+-- will prevent a HOT update despite not changing what is used when forming
+-- the index key, this is counter intuitive and causes index bloat as well
+-- as slows down updates on JSONB data as any change will trigger all
+-- indexes to be updated.
+UPDATE hot_test SET metadata = jsonb_set(metadata, '{food}', '"apple"')
+WHERE id = 1;
+SELECT get_hot_count('hot_test');
+ get_hot_count 
+---------------
+ (2,1)
+(1 row)
+
+-- Use a few different methods for mutating JSONB data, but don't modify
+-- indexed portions of the document.  None of these will be HOT.
+UPDATE hot_test SET metadata = jsonb_set(
+  jsonb_set(metadata, '{food}', '"pear"'),
+  '{timestamp}',
+  to_jsonb(now())
+)
+WHERE id = 1;
+SELECT get_hot_count('hot_test');
+ get_hot_count 
+---------------
+ (3,1)
+(1 row)
+
+UPDATE hot_test
+SET metadata = metadata || '{"user_id": "123", "timestamp": "2024-01-01"}'::jsonb
+WHERE id = 1;
+SELECT get_hot_count('hot_test');
+ get_hot_count 
+---------------
+ (4,1)
+(1 row)
+
+UPDATE hot_test SET metadata =
+  jsonb_set(
+    jsonb_set(metadata, '{user_id}', '"123"'),
+    '{fruit}',
+    '"plumb"'
+  );
+SELECT get_hot_count('hot_test');
+ get_hot_count 
+---------------
+ (5,1)
+(1 row)
+
+UPDATE hot_test SET metadata = metadata || jsonb_build_object(
+  'user_id', '123',
+  'timestamp', now(),
+  'fruit', 'honeydew'
+);
+SELECT get_hot_count('hot_test');
+ get_hot_count 
+---------------
+ (6,1)
+(1 row)
+
+-- Only BRIN (summarizing) indexes on non-PK columns
+DROP TABLE hot_test;
+CREATE TABLE hot_test (
+    id int PRIMARY KEY,
+    ts timestamp,
+    value int,
+    brin_col int
+) USING heap;
+CREATE INDEX hot_test_ts_brin ON hot_test USING brin(ts);
+CREATE INDEX hot_test_brin_col_brin ON hot_test USING brin(brin_col);
+INSERT INTO hot_test VALUES (1, '2024-01-01', 100, 1000);
+-- Update both BRIN columns - should still be HOT (only summarizing indexes)
+UPDATE hot_test SET ts = '2024-01-02', brin_col = 2000 WHERE id = 1;
+SELECT get_hot_count('hot_test');
+ get_hot_count 
+---------------
+ (1,1)
+(1 row)
+
+-- Verify BRIN indexes work
+SELECT id FROM hot_test WHERE ts >= '2024-01-02';
+ id 
+----
+  1
+(1 row)
+
+SELECT id FROM hot_test WHERE brin_col >= 2000;
+ id 
+----
+  1
+(1 row)
+
+-- Update non-indexed column - should also be HOT
+UPDATE hot_test SET value = 200 WHERE id = 1;
+SELECT get_hot_count('hot_test');
+ get_hot_count 
+---------------
+ (2,2)
+(1 row)
+
+-- TOAST and HOT: TOASTed columns can participate in HOT
+DROP TABLE hot_test;
+CREATE TABLE hot_test (
+    id int PRIMARY KEY,
+    indexed_col int,
+    large_text text,
+    small_text text
+) USING heap;
+CREATE INDEX hot_test_idx ON hot_test(indexed_col);
+-- Insert row with TOASTed column (> 2KB)
+INSERT INTO hot_test VALUES (1, 100, repeat('x', 3000), 'small');
+-- Update non-indexed, non-TOASTed column - should be HOT
+UPDATE hot_test SET small_text = 'updated';
+SELECT get_hot_count('hot_test');
+ get_hot_count 
+---------------
+ (1,1)
+(1 row)
+
+-- Update TOASTed column - should be HOT if indexed column unchanged
+UPDATE hot_test SET large_text = repeat('y', 3000);
+SELECT get_hot_count('hot_test');
+ get_hot_count 
+---------------
+ (2,2)
+(1 row)
+
+-- Verify index still works
+SELECT id FROM hot_test WHERE indexed_col = 100;
+ id 
+----
+  1
+(1 row)
+
+-- Update indexed column - should NOT be HOT
+UPDATE hot_test SET indexed_col = 200;
+SELECT get_hot_count('hot_test');
+ get_hot_count 
+---------------
+ (3,2)
+(1 row)
+
+-- Verify index was updated
+SELECT id FROM hot_test WHERE indexed_col = 200;
+ id 
+----
+  1
+(1 row)
+
+-- Unique constraint (unique index) behaves like regular index
+DROP TABLE hot_test;
+CREATE TABLE hot_test (
+    id int PRIMARY KEY,
+    unique_col int UNIQUE,
+    data text
+) USING heap;
+INSERT INTO hot_test VALUES (1, 100, 'data1');
+INSERT INTO hot_test VALUES (2, 200, 'data2');
+-- Update data (non-indexed) - should be HOT
+UPDATE hot_test SET data = 'updated';
+SELECT get_hot_count('hot_test');
+ get_hot_count 
+---------------
+ (2,2)
+(1 row)
+
+-- Verify unique constraint still enforced
+SELECT id, unique_col, data FROM hot_test ORDER BY id;
+ id | unique_col |  data   
+----+------------+---------
+  1 |        100 | updated
+  2 |        200 | updated
+(2 rows)
+
+-- This should fail (unique violation)
+UPDATE hot_test SET unique_col = 100 WHERE id = 2;
+ERROR:  duplicate key value violates unique constraint "hot_test_unique_col_key"
+DETAIL:  Key (unique_col)=(100) already exists.
+-- Multi-column index: any column change = non-HOT
+DROP TABLE hot_test;
+CREATE TABLE hot_test (
+    id int PRIMARY KEY,
+    col_a int,
+    col_b int,
+    col_c int,
+    data text
+) USING heap;
+CREATE INDEX hot_test_ab_idx ON hot_test(col_a, col_b);
+INSERT INTO hot_test VALUES (1, 10, 20, 30, 'data');
+-- Update col_a (part of multi-column index) - should NOT be HOT
+UPDATE hot_test SET col_a = 15;
+SELECT get_hot_count('hot_test');
+ get_hot_count 
+---------------
+ (1,0)
+(1 row)
+
+-- Reset
+UPDATE hot_test SET col_a = 10;
+-- Update col_b (part of multi-column index) - should NOT be HOT
+UPDATE hot_test SET col_b = 25;
+SELECT get_hot_count('hot_test');
+ get_hot_count 
+---------------
+ (3,0)
+(1 row)
+
+-- Reset
+UPDATE hot_test SET col_b = 20;
+SELECT get_hot_count('hot_test');
+ get_hot_count 
+---------------
+ (4,0)
+(1 row)
+
+-- Update col_c (not indexed) - should be HOT
+UPDATE hot_test SET col_c = 35;
+-- Update data (not indexed) - should be HOT
+UPDATE hot_test SET data = 'updated';
+SELECT get_hot_count('hot_test');
+ get_hot_count 
+---------------
+ (6,2)
+(1 row)
+
+-- Verify multi-column index works
+SELECT id FROM hot_test WHERE col_a = 10 AND col_b = 20;
+ id 
+----
+  1
+(1 row)
+
+-- Partitioned tables: HOT works within partitions
+DROP TABLE IF EXISTS hot_test_partitioned CASCADE;
+NOTICE:  table "hot_test_partitioned" does not exist, skipping
+CREATE TABLE hot_test_partitioned (
+    id int,
+    partition_key int,
+    indexed_col int,
+    data text,
+    PRIMARY KEY (id, partition_key)
+) PARTITION BY RANGE (partition_key) USING heap;
+CREATE TABLE hot_test_part1 PARTITION OF hot_test_partitioned
+    FOR VALUES FROM (1) TO (100) USING heap;
+CREATE TABLE hot_test_part2 PARTITION OF hot_test_partitioned
+    FOR VALUES FROM (100) TO (200) USING heap;
+CREATE INDEX hot_test_part_idx ON hot_test_partitioned(indexed_col);
+INSERT INTO hot_test_partitioned VALUES (1, 50, 100, 'initial1');
+INSERT INTO hot_test_partitioned VALUES (2, 150, 200, 'initial2');
+-- Update in partition 1 (non-indexed column) - should be HOT
+UPDATE hot_test_partitioned SET data = 'updated1' WHERE id = 1;
+-- Update in partition 2 (non-indexed column) - should be HOT
+UPDATE hot_test_partitioned SET data = 'updated2' WHERE id = 2;
+SELECT get_hot_count('hot_test_part1');
+ get_hot_count 
+---------------
+ (1,1)
+(1 row)
+
+SELECT get_hot_count('hot_test_part2');
+ get_hot_count 
+---------------
+ (1,1)
+(1 row)
+
+-- Verify indexes work on partitions
+SELECT id FROM hot_test_partitioned WHERE indexed_col = 100;
+ id 
+----
+  1
+(1 row)
+
+SELECT id FROM hot_test_partitioned WHERE indexed_col = 200;
+ id 
+----
+  2
+(1 row)
+
+-- Update indexed column in partition - should NOT be HOT
+UPDATE hot_test_partitioned SET indexed_col = 150 WHERE id = 1;
+SELECT get_hot_count('hot_test_part1');
+ get_hot_count 
+---------------
+ (2,1)
+(1 row)
+
+-- Verify index was updated
+SELECT id FROM hot_test_partitioned WHERE indexed_col = 150;
+ id 
+----
+  1
+(1 row)
+
+-- Cleanup
+DROP TABLE IF EXISTS hot_test;
+DROP TABLE IF EXISTS hot_test_partitioned CASCADE;
+DROP FUNCTION IF EXISTS has_hot_chain(text, tid);
+DROP FUNCTION IF EXISTS print_hot_chain(text, tid);
+DROP FUNCTION IF EXISTS heap_prune(text);
+NOTICE:  function heap_prune(text) does not exist, skipping
+DROP FUNCTION IF EXISTS get_hot_count(text);
+DROP EXTENSION pageinspect;
diff --git a/src/test/regress/expected/replica_identity_logging.out b/src/test/regress/expected/replica_identity_logging.out
new file mode 100644
index 00000000000..2096510b924
--- /dev/null
+++ b/src/test/regress/expected/replica_identity_logging.out
@@ -0,0 +1,396 @@
+--
+-- REPLICA_IDENTITY_LOGGING
+-- Test that replica identity keys are correctly extracted and logged for logical replication
+--
+-- This test verifies that the correct old key columns are included in WAL records
+-- for logical replication, based on the table's replica identity setting.
+--
+-- Clean up from prior runs
+DROP TABLE IF EXISTS repid_test CASCADE;
+NOTICE:  table "repid_test" does not exist, skipping
+-- Drop replication slot if it exists from prior run
+SELECT pg_drop_replication_slot('repid_test_slot') FROM pg_replication_slots WHERE slot_name = 'repid_test_slot';
+ pg_drop_replication_slot 
+--------------------------
+(0 rows)
+
+-- Enable logical decoding to verify what gets logged
+SELECT 'init' FROM pg_create_logical_replication_slot('repid_test_slot', 'test_decoding');
+ ?column? 
+----------
+ init
+(1 row)
+
+-- REPLICA IDENTITY DEFAULT (primary key columns only)
+CREATE TABLE repid_test (
+    id int PRIMARY KEY,
+    indexed_col int,
+    data text
+);
+CREATE INDEX repid_test_idx ON repid_test(indexed_col);
+INSERT INTO repid_test VALUES (1, 100, 'initial');
+INSERT INTO repid_test VALUES (2, 200, 'initial');
+-- Advance slot to skip inserts (filter out transaction boundaries for stability)
+SELECT data FROM pg_logical_slot_get_changes('repid_test_slot', NULL, NULL)
+WHERE data NOT LIKE 'BEGIN %' AND data NOT LIKE 'COMMIT %';
+                                             data                                             
+----------------------------------------------------------------------------------------------
+ table public.repid_test: INSERT: id[integer]:1 indexed_col[integer]:100 data[text]:'initial'
+ table public.repid_test: INSERT: id[integer]:2 indexed_col[integer]:200 data[text]:'initial'
+(2 rows)
+
+-- Update non-key column - should log only id in old key
+UPDATE repid_test SET data = 'updated' WHERE id = 1;
+-- Update indexed non-key column - should still log only id in old key
+UPDATE repid_test SET indexed_col = 150 WHERE id = 2;
+-- Check logical decoding output - should see old key with only id
+SELECT data FROM pg_logical_slot_get_changes('repid_test_slot', NULL, NULL)
+WHERE data LIKE '%UPDATE%';
+                                             data                                             
+----------------------------------------------------------------------------------------------
+ table public.repid_test: UPDATE: id[integer]:1 indexed_col[integer]:100 data[text]:'updated'
+ table public.repid_test: UPDATE: id[integer]:2 indexed_col[integer]:150 data[text]:'initial'
+(2 rows)
+
+-- REPLICA IDENTITY FULL (all columns in old key)
+DROP TABLE repid_test;
+CREATE TABLE repid_test (
+    id int PRIMARY KEY,
+    indexed_col int,
+    data text
+);
+ALTER TABLE repid_test REPLICA IDENTITY FULL;
+CREATE INDEX repid_test_idx ON repid_test(indexed_col);
+INSERT INTO repid_test VALUES (1, 100, 'initial');
+-- Advance slot past insert (filter out transaction boundaries for stability)
+SELECT data FROM pg_logical_slot_get_changes('repid_test_slot', NULL, NULL)
+WHERE data NOT LIKE 'BEGIN %' AND data NOT LIKE 'COMMIT %';
+                                             data                                             
+----------------------------------------------------------------------------------------------
+ table public.repid_test: INSERT: id[integer]:1 indexed_col[integer]:100 data[text]:'initial'
+(1 row)
+
+-- Update any column - should log ALL columns in old key
+UPDATE repid_test SET data = 'updated' WHERE id = 1;
+-- Check logical decoding output - should see old key with all columns
+SELECT data FROM pg_logical_slot_get_changes('repid_test_slot', NULL, NULL)
+WHERE data LIKE '%UPDATE%';
+                                                                                     data                                                                                     
+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
+ table public.repid_test: UPDATE: old-key: id[integer]:1 indexed_col[integer]:100 data[text]:'initial' new-tuple: id[integer]:1 indexed_col[integer]:100 data[text]:'updated'
+(1 row)
+
+-- REPLICA IDENTITY USING INDEX (index columns only)
+DROP TABLE repid_test;
+CREATE TABLE repid_test (
+    id int,
+    unique_col int UNIQUE NOT NULL,
+    data text
+);
+ALTER TABLE repid_test REPLICA IDENTITY USING INDEX repid_test_unique_col_key;
+INSERT INTO repid_test VALUES (1, 100, 'initial');
+INSERT INTO repid_test VALUES (2, 200, 'initial');
+-- Advance slot past inserts (filter out transaction boundaries for stability)
+SELECT data FROM pg_logical_slot_get_changes('repid_test_slot', NULL, NULL)
+WHERE data NOT LIKE 'BEGIN %' AND data NOT LIKE 'COMMIT %';
+                                            data                                             
+---------------------------------------------------------------------------------------------
+ table public.repid_test: INSERT: id[integer]:1 unique_col[integer]:100 data[text]:'initial'
+ table public.repid_test: INSERT: id[integer]:2 unique_col[integer]:200 data[text]:'initial'
+(2 rows)
+
+-- Update non-indexed column - should log only unique_col in old key
+UPDATE repid_test SET data = 'updated' WHERE unique_col = 100;
+-- Update id (not in replica identity index) - should still log only unique_col
+UPDATE repid_test SET id = 10 WHERE unique_col = 200;
+-- Check logical decoding output - should see old key with only unique_col
+SELECT data FROM pg_logical_slot_get_changes('repid_test_slot', NULL, NULL)
+WHERE data LIKE '%UPDATE%';
+                                             data                                             
+----------------------------------------------------------------------------------------------
+ table public.repid_test: UPDATE: id[integer]:1 unique_col[integer]:100 data[text]:'updated'
+ table public.repid_test: UPDATE: id[integer]:10 unique_col[integer]:200 data[text]:'initial'
+(2 rows)
+
+-- REPLICA IDENTITY NOTHING (no old key logged)
+DROP TABLE repid_test;
+CREATE TABLE repid_test (
+    id int PRIMARY KEY,
+    data text
+);
+ALTER TABLE repid_test REPLICA IDENTITY NOTHING;
+INSERT INTO repid_test VALUES (1, 'initial');
+-- Advance slot past insert (filter out transaction boundaries for stability)
+SELECT data FROM pg_logical_slot_get_changes('repid_test_slot', NULL, NULL)
+WHERE data NOT LIKE 'BEGIN %' AND data NOT LIKE 'COMMIT %';
+                                data                                 
+---------------------------------------------------------------------
+ table public.repid_test: INSERT: id[integer]:1 data[text]:'initial'
+(1 row)
+
+-- Update - should log no old key
+UPDATE repid_test SET data = 'updated' WHERE id = 1;
+-- Check logical decoding output - should see update with no old key
+SELECT data FROM pg_logical_slot_get_changes('repid_test_slot', NULL, NULL)
+WHERE data LIKE '%UPDATE%';
+                                data                                 
+---------------------------------------------------------------------
+ table public.repid_test: UPDATE: id[integer]:1 data[text]:'updated'
+(1 row)
+
+-- Multi-column index replica identity
+DROP TABLE repid_test;
+CREATE TABLE repid_test (
+    id int,
+    col_a int NOT NULL,
+    col_b int NOT NULL,
+    col_c int,
+    data text,
+    UNIQUE (col_a, col_b)
+);
+ALTER TABLE repid_test REPLICA IDENTITY USING INDEX repid_test_col_a_col_b_key;
+INSERT INTO repid_test VALUES (1, 10, 20, 30, 'initial');
+-- Advance slot past insert (filter out transaction boundaries for stability)
+SELECT data FROM pg_logical_slot_get_changes('repid_test_slot', NULL, NULL)
+WHERE data NOT LIKE 'BEGIN %' AND data NOT LIKE 'COMMIT %';
+                                                           data                                                            
+---------------------------------------------------------------------------------------------------------------------------
+ table public.repid_test: INSERT: id[integer]:1 col_a[integer]:10 col_b[integer]:20 col_c[integer]:30 data[text]:'initial'
+(1 row)
+
+-- Update non-indexed columns - should log col_a and col_b in old key
+UPDATE repid_test SET data = 'updated', col_c = 35 WHERE col_a = 10 AND col_b = 20;
+-- Check logical decoding output - should see old key with col_a and col_b
+SELECT data FROM pg_logical_slot_get_changes('repid_test_slot', NULL, NULL)
+WHERE data LIKE '%UPDATE%';
+                                                           data                                                            
+---------------------------------------------------------------------------------------------------------------------------
+ table public.repid_test: UPDATE: id[integer]:1 col_a[integer]:10 col_b[integer]:20 col_c[integer]:35 data[text]:'updated'
+(1 row)
+
+-- TOAST/external columns in replica identity
+DROP TABLE repid_test;
+CREATE TABLE repid_test (
+    id int PRIMARY KEY,
+    large_text text,
+    data text
+);
+-- REPLICA IDENTITY FULL includes toasted columns
+ALTER TABLE repid_test REPLICA IDENTITY FULL;
+-- Insert a large value (large enough to show the concept without excessive output)
+INSERT INTO repid_test VALUES (1, repeat('x', 100), 'initial');
+-- Advance slot past insert (filter out transaction boundaries for stability)
+SELECT data FROM pg_logical_slot_get_changes('repid_test_slot', NULL, NULL)
+WHERE data NOT LIKE 'BEGIN %' AND data NOT LIKE 'COMMIT %';
+                                                                                            data                                                                                             
+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
+ table public.repid_test: INSERT: id[integer]:1 large_text[text]:'xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx' data[text]:'initial'
+(1 row)
+
+-- Update small column - should still log large_text column in old key
+UPDATE repid_test SET data = 'updated' WHERE id = 1;
+-- Check logical decoding output - verify both old and new values are logged
+-- Just check that UPDATE happened and includes both large_text and data columns
+SELECT COUNT(*) as update_count FROM pg_logical_slot_get_changes('repid_test_slot', NULL, NULL)
+WHERE data LIKE '%UPDATE%' AND data LIKE '%large_text%' AND data LIKE '%old-key%';
+ update_count 
+--------------
+            1
+(1 row)
+
+-- Test TOAST columns with REPLICA IDENTITY USING INDEX
+DROP TABLE repid_test;
+CREATE TABLE repid_test (
+    id int PRIMARY KEY,
+    indexed_large_text text NOT NULL,
+    data text
+);
+-- Create unique index on the large text column
+CREATE UNIQUE INDEX repid_test_large_idx ON repid_test(indexed_large_text);
+-- Set replica identity to use the index (not FULL)
+ALTER TABLE repid_test REPLICA IDENTITY USING INDEX repid_test_large_idx;
+-- Insert a large value (large enough to be TOASTed)
+INSERT INTO repid_test VALUES (1, repeat('x', 100000), 'initial');
+-- Advance slot past inserts
+SELECT COUNT(*) FROM pg_logical_slot_get_changes('repid_test_slot', NULL, NULL)
+WHERE data NOT LIKE 'BEGIN %' AND data NOT LIKE 'COMMIT %';
+ count 
+-------
+     1
+(1 row)
+
+-- Update non-indexed column - should still log indexed_large_text in old key
+-- despite being unmodified because it is TOASTed and in the replica key
+UPDATE repid_test SET data = 'updated' WHERE id = 1;
+-- Verify TOASTed indexed column part of the relica identity is logged in old key
+SELECT COUNT(*) AS toasted_index_logged FROM pg_logical_slot_get_changes('repid_test_slot', NULL, NULL)
+WHERE data LIKE '%UPDATE%' AND data LIKE '%indexed_large_text%';
+ toasted_index_logged 
+----------------------
+                    1
+(1 row)
+
+-- Dropped columns and replica identity
+DROP TABLE repid_test;
+CREATE TABLE repid_test (
+    id int PRIMARY KEY,
+    dropped_col int,
+    kept_col int,
+    data text
+);
+ALTER TABLE repid_test REPLICA IDENTITY FULL;
+INSERT INTO repid_test VALUES (1, 999, 100, 'initial');
+-- Drop a column
+ALTER TABLE repid_test DROP COLUMN dropped_col;
+-- Advance slot past insert and DDL (filter out transaction boundaries for stability)
+SELECT data FROM pg_logical_slot_get_changes('repid_test_slot', NULL, NULL)
+WHERE data NOT LIKE 'BEGIN %' AND data NOT LIKE 'COMMIT %';
+                                                        data                                                        
+--------------------------------------------------------------------------------------------------------------------
+ table public.repid_test: INSERT: id[integer]:1 dropped_col[integer]:999 kept_col[integer]:100 data[text]:'initial'
+(1 row)
+
+-- Update - old key should handle dropped column
+UPDATE repid_test SET data = 'updated' WHERE id = 1;
+-- Check logical decoding output
+SELECT data FROM pg_logical_slot_get_changes('repid_test_slot', NULL, NULL)
+WHERE data LIKE '%UPDATE%';
+                                                                                  data                                                                                  
+------------------------------------------------------------------------------------------------------------------------------------------------------------------------
+ table public.repid_test: UPDATE: old-key: id[integer]:1 kept_col[integer]:100 data[text]:'initial' new-tuple: id[integer]:1 kept_col[integer]:100 data[text]:'updated'
+(1 row)
+
+-- DEFAULT replica identity with composite primary key
+DROP TABLE repid_test;
+CREATE TABLE repid_test (
+    id_a int,
+    id_b int,
+    indexed_col int,
+    data text,
+    PRIMARY KEY (id_a, id_b)
+);
+CREATE INDEX repid_test_idx ON repid_test(indexed_col);
+INSERT INTO repid_test VALUES (1, 10, 100, 'initial');
+-- Advance slot past insert (filter out transaction boundaries for stability)
+SELECT data FROM pg_logical_slot_get_changes('repid_test_slot', NULL, NULL)
+WHERE data NOT LIKE 'BEGIN %' AND data NOT LIKE 'COMMIT %';
+                                                      data                                                       
+-----------------------------------------------------------------------------------------------------------------
+ table public.repid_test: INSERT: id_a[integer]:1 id_b[integer]:10 indexed_col[integer]:100 data[text]:'initial'
+(1 row)
+
+-- Update non-key columns - should log both id_a and id_b in old key
+UPDATE repid_test SET data = 'updated', indexed_col = 150 WHERE id_a = 1 AND id_b = 10;
+-- Check logical decoding output - should see old key with both primary key columns
+SELECT data FROM pg_logical_slot_get_changes('repid_test_slot', NULL, NULL)
+WHERE data LIKE '%UPDATE%';
+                                                      data                                                       
+-----------------------------------------------------------------------------------------------------------------
+ table public.repid_test: UPDATE: id_a[integer]:1 id_b[integer]:10 indexed_col[integer]:150 data[text]:'updated'
+(1 row)
+
+-- Expression index and replica identity
+DROP TABLE repid_test;
+CREATE TABLE repid_test (
+    id int PRIMARY KEY,
+    email text NOT NULL,
+    data text
+);
+-- Create unique expression index
+CREATE UNIQUE INDEX repid_test_lower_email_idx ON repid_test(lower(email));
+-- Cannot use expression index for replica identity (should fail)
+-- PostgreSQL requires the index to be on simple column references
+-- This should produce an error
+DO $$
+BEGIN
+    ALTER TABLE repid_test REPLICA IDENTITY USING INDEX repid_test_lower_email_idx;
+    RAISE EXCEPTION 'Should have failed - expression indexes cannot be used for replica identity';
+EXCEPTION
+    WHEN feature_not_supported THEN
+        RAISE NOTICE 'Correctly rejected expression index for replica identity';
+END$$;
+NOTICE:  Correctly rejected expression index for replica identity
+-- Use FULL instead
+ALTER TABLE repid_test REPLICA IDENTITY FULL;
+INSERT INTO repid_test VALUES (1, 'user@example.com', 'initial');
+-- Advance slot past insert (filter out transaction boundaries for stability)
+SELECT data FROM pg_logical_slot_get_changes('repid_test_slot', NULL, NULL)
+WHERE data NOT LIKE 'BEGIN %' AND data NOT LIKE 'COMMIT %';
+                                                data                                                
+----------------------------------------------------------------------------------------------------
+ table public.repid_test: INSERT: id[integer]:1 email[text]:'user@example.com' data[text]:'initial'
+(1 row)
+
+-- Update - should log all columns in old key
+UPDATE repid_test SET data = 'updated' WHERE id = 1;
+-- Check logical decoding output
+SELECT data FROM pg_logical_slot_get_changes('repid_test_slot', NULL, NULL)
+WHERE data LIKE '%UPDATE%';
+                                                                                           data                                                                                           
+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
+ table public.repid_test: UPDATE: old-key: id[integer]:1 email[text]:'user@example.com' data[text]:'initial' new-tuple: id[integer]:1 email[text]:'user@example.com' data[text]:'updated'
+(1 row)
+
+-- NULL values in replica identity columns
+DROP TABLE repid_test;
+CREATE TABLE repid_test (
+    id int PRIMARY KEY,
+    nullable_col int,
+    data text
+);
+ALTER TABLE repid_test REPLICA IDENTITY FULL;
+INSERT INTO repid_test VALUES (1, NULL, 'initial');
+-- Advance slot past insert (filter out transaction boundaries for stability)
+SELECT data FROM pg_logical_slot_get_changes('repid_test_slot', NULL, NULL)
+WHERE data NOT LIKE 'BEGIN %' AND data NOT LIKE 'COMMIT %';
+                                              data                                              
+------------------------------------------------------------------------------------------------
+ table public.repid_test: INSERT: id[integer]:1 nullable_col[integer]:null data[text]:'initial'
+(1 row)
+
+-- Update - old key should include NULL value
+UPDATE repid_test SET data = 'updated' WHERE id = 1;
+-- Check logical decoding output - should see old key with NULL
+SELECT data FROM pg_logical_slot_get_changes('repid_test_slot', NULL, NULL)
+WHERE data LIKE '%UPDATE%';
+                                                                         data                                                                          
+-------------------------------------------------------------------------------------------------------------------------------------------------------
+ table public.repid_test: UPDATE: old-key: id[integer]:1 data[text]:'initial' new-tuple: id[integer]:1 nullable_col[integer]:null data[text]:'updated'
+(1 row)
+
+-- Generated columns and replica identity
+DROP TABLE repid_test;
+CREATE TABLE repid_test (
+    id int PRIMARY KEY,
+    base_col int,
+    generated_col int GENERATED ALWAYS AS (base_col * 2) STORED,
+    data text
+);
+ALTER TABLE repid_test REPLICA IDENTITY FULL;
+INSERT INTO repid_test (id, base_col, data) VALUES (1, 50, 'initial');
+-- Advance slot past insert (filter out transaction boundaries for stability)
+SELECT data FROM pg_logical_slot_get_changes('repid_test_slot', NULL, NULL)
+WHERE data NOT LIKE 'BEGIN %' AND data NOT LIKE 'COMMIT %';
+                                                        data                                                         
+---------------------------------------------------------------------------------------------------------------------
+ table public.repid_test: INSERT: id[integer]:1 base_col[integer]:50 generated_col[integer]:100 data[text]:'initial'
+(1 row)
+
+-- Update base_col - generated_col will change automatically
+UPDATE repid_test SET base_col = 60 WHERE id = 1;
+-- Check logical decoding output - should include old generated_col value
+SELECT data FROM pg_logical_slot_get_changes('repid_test_slot', NULL, NULL)
+WHERE data LIKE '%UPDATE%';
+                                                                                                            data                                                                                                            
+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
+ table public.repid_test: UPDATE: old-key: id[integer]:1 base_col[integer]:50 generated_col[integer]:100 data[text]:'initial' new-tuple: id[integer]:1 base_col[integer]:60 generated_col[integer]:120 data[text]:'initial'
+(1 row)
+
+-- Cleanup
+SELECT pg_drop_replication_slot('repid_test_slot');
+ pg_drop_replication_slot 
+--------------------------
+ 
+(1 row)
+
+DROP TABLE repid_test;
diff --git a/src/test/regress/parallel_schedule b/src/test/regress/parallel_schedule
index 549e9b2d7be..01ed43eba18 100644
--- a/src/test/regress/parallel_schedule
+++ b/src/test/regress/parallel_schedule
@@ -137,6 +137,13 @@ test: event_trigger_login
 # this test also uses event triggers, so likewise run it by itself
 test: fast_default
 
+# ----------
+# HOT updates and replica identity logging tests
+# Run these sequentially to avoid logical replication slot interference
+# ----------
+test: hot_updates
+test: replica_identity_logging
+
 # run tablespace test at the end because it drops the tablespace created during
 # setup that other tests may use.
 test: tablespace
diff --git a/src/test/regress/sql/hot_updates.sql b/src/test/regress/sql/hot_updates.sql
new file mode 100644
index 00000000000..7030f4fc6db
--- /dev/null
+++ b/src/test/regress/sql/hot_updates.sql
@@ -0,0 +1,553 @@
+--
+-- HOT_UPDATES
+-- Test Heap-Only Tuple (HOT) update decisions
+--
+-- This test systematically verifies that HOT updates are used when appropriate
+-- and avoided when necessary (e.g., when indexed columns are modified).
+--
+-- We use multiple validation methods:
+-- 1. Index verification (index still works = proves no index update for HOT)
+-- 2. Statistics functions (pg_stat_get_tuples_hot_updated)
+-- 3. pageinspect extension for HOT chain examination
+--
+
+-- Load required extensions
+CREATE EXTENSION IF NOT EXISTS pageinspect;
+
+-- Clean up from prior runs
+DROP TABLE IF EXISTS hot_test CASCADE;
+
+-- Function to get HOT update count
+CREATE OR REPLACE FUNCTION get_hot_count(rel_name text)
+RETURNS TABLE (
+    updates BIGINT,
+    hot BIGINT
+) AS $$
+DECLARE
+  rel_oid oid;
+BEGIN
+  rel_oid := rel_name::regclass::oid;
+  PERFORM pg_stat_force_next_flush();
+
+  updates := COALESCE(pg_stat_get_tuples_updated(rel_oid), 0) +
+             COALESCE(pg_stat_get_xact_tuples_updated(rel_oid), 0);
+  hot := COALESCE(pg_stat_get_tuples_hot_updated(rel_oid), 0) +
+         COALESCE(pg_stat_get_xact_tuples_hot_updated(rel_oid), 0);
+
+  RETURN NEXT;
+END;
+$$ LANGUAGE plpgsql;
+
+-- Check if a tuple is part of a HOT chain (has a predecessor on same page)
+CREATE OR REPLACE FUNCTION has_hot_chain(rel_name text, target_ctid tid)
+RETURNS boolean AS $$
+DECLARE
+  block_num int;
+  page_item record;
+BEGIN
+  block_num := (target_ctid::text::point)[0]::int;
+
+  -- Look for a different tuple on the same page that points to our target tuple
+  FOR page_item IN
+    SELECT lp, lp_flags, t_ctid
+    FROM heap_page_items(get_raw_page(rel_name, block_num))
+    WHERE lp_flags = 1
+      AND t_ctid IS NOT NULL
+      AND t_ctid = target_ctid
+      AND ('(' || block_num::text || ',' || lp::text || ')')::tid != target_ctid
+  LOOP
+    RETURN true;
+  END LOOP;
+
+  RETURN false;
+END;
+$$ LANGUAGE plpgsql;
+
+-- Print the HOT chain starting from a given tuple
+CREATE OR REPLACE FUNCTION print_hot_chain(rel_name text, start_ctid tid)
+RETURNS TABLE(chain_position int, ctid tid, lp_flags text, t_ctid tid, chain_end boolean) AS
+$$
+#variable_conflict use_column
+DECLARE
+  block_num int;
+  line_ptr int;
+  current_ctid tid := start_ctid;
+  next_ctid tid;
+  position int := 0;
+  max_iterations int := 100;
+  page_item record;
+  found_predecessor boolean := false;
+  flags_name text;
+BEGIN
+  block_num := (start_ctid::text::point)[0]::int;
+
+  -- Find the predecessor (old tuple pointing to our start_ctid)
+  FOR page_item IN
+    SELECT lp, lp_flags, t_ctid
+    FROM heap_page_items(get_raw_page(rel_name, block_num))
+    WHERE lp_flags = 1
+      AND t_ctid = start_ctid
+  LOOP
+    current_ctid := ('(' || block_num::text || ',' || page_item.lp::text || ')')::tid;
+    found_predecessor := true;
+    EXIT;
+  END LOOP;
+
+  -- If no predecessor found, start with the given ctid
+  IF NOT found_predecessor THEN
+    current_ctid := start_ctid;
+  END IF;
+
+  -- Follow the chain forward
+  WHILE position < max_iterations LOOP
+    line_ptr := (current_ctid::text::point)[1]::int;
+
+    FOR page_item IN
+      SELECT lp, lp_flags, t_ctid
+      FROM heap_page_items(get_raw_page(rel_name, block_num))
+      WHERE lp = line_ptr
+    LOOP
+      -- Map lp_flags to names
+      flags_name := CASE page_item.lp_flags
+        WHEN 0 THEN 'unused (0)'
+        WHEN 1 THEN 'normal (1)'
+        WHEN 2 THEN 'redirect (2)'
+        WHEN 3 THEN 'dead (3)'
+        ELSE 'unknown (' || page_item.lp_flags::text || ')'
+      END;
+
+      RETURN QUERY SELECT
+        position,
+        current_ctid,
+        flags_name,
+        page_item.t_ctid,
+        (page_item.t_ctid IS NULL OR page_item.t_ctid = current_ctid)::boolean
+      ;
+
+      IF page_item.t_ctid IS NULL OR page_item.t_ctid = current_ctid THEN
+        RETURN;
+      END IF;
+
+      next_ctid := page_item.t_ctid;
+
+      IF (next_ctid::text::point)[0]::int != block_num THEN
+        RETURN;
+      END IF;
+
+      current_ctid := next_ctid;
+      position := position + 1;
+    END LOOP;
+
+    IF position = 0 THEN
+      RETURN;
+    END IF;
+  END LOOP;
+END;
+$$ LANGUAGE plpgsql;
+
+-- Trigger page pruning via table scan
+CREATE OR REPLACE FUNCTION heap_prune_page(rel_name text, target_ctid tid)
+RETURNS void AS $$
+DECLARE
+  block_num int;
+BEGIN
+  -- Extract block number from ctid
+  block_num := (target_ctid::text::point)[0]::int;
+
+  -- Scan only the specific page to trigger pruning on that page
+  EXECUTE 'SELECT COUNT(*) FROM ' || quote_ident(rel_name) ||
+           ' WHERE ctid >= (' || block_num || ',0) AND ctid < (' || (block_num + 1) || ',0)';
+END;
+$$ LANGUAGE plpgsql;
+
+-- Basic HOT update (update non-indexed column)
+CREATE TABLE hot_test (
+    id int PRIMARY KEY,
+    indexed_col int,
+    non_indexed_col text
+) USING heap;
+
+CREATE INDEX hot_test_indexed_idx ON hot_test(indexed_col);
+
+INSERT INTO hot_test VALUES (1, 100, 'initial');
+INSERT INTO hot_test VALUES (2, 200, 'initial');
+INSERT INTO hot_test VALUES (3, 300, 'initial');
+
+-- Get baseline and initial ctid
+WITH initial_state AS (
+  SELECT ctid FROM hot_test WHERE id = 1
+)
+SELECT
+  'Initial State' AS phase,
+  initial_state.ctid,
+  (get_hot_count('hot_test')).updates,
+  (get_hot_count('hot_test')).hot
+FROM initial_state;
+
+-- Should be HOT updates (only non-indexed column modified)
+UPDATE hot_test SET non_indexed_col = 'updated1' WHERE id = 1;
+UPDATE hot_test SET non_indexed_col = 'updated2' WHERE id = 2;
+UPDATE hot_test SET non_indexed_col = 'updated3' WHERE id = 3;
+
+-- Verify HOT updates occurred
+SELECT
+  'After Updates' AS phase,
+  (get_hot_count('hot_test')).updates,
+  (get_hot_count('hot_test')).hot;
+
+-- Dump the HOT chain before pruning
+WITH current_tuple AS (
+  SELECT ctid FROM hot_test WHERE id = 1
+)
+SELECT
+  'Before VACUUM' AS phase,
+  has_hot_chain('hot_test', current_tuple.ctid) AS has_chain,
+  chain_position,
+  print_hot_chain.ctid,
+  lp_flags,
+  t_ctid
+FROM current_tuple,
+LATERAL print_hot_chain('hot_test', current_tuple.ctid);
+
+SET SESSION enable_seqscan = OFF;
+SET SESSION enable_bitmapscan = OFF;
+
+-- Verify indexes still work
+EXPLAIN (COSTS OFF) SELECT id, indexed_col FROM hot_test WHERE indexed_col = 100;
+SELECT id, indexed_col FROM hot_test WHERE indexed_col = 100;
+
+EXPLAIN (COSTS OFF) SELECT id, indexed_col FROM hot_test WHERE indexed_col = 200;
+SELECT id, indexed_col FROM hot_test WHERE indexed_col = 200;
+
+-- Vacuum the relation, expect the HOT chain to collapse
+VACUUM hot_test;
+
+-- Show that there is no chain after vacuum
+WITH current_tuple AS (
+  SELECT ctid FROM hot_test WHERE id = 1
+)
+SELECT
+  'After VACUUM' AS phase,
+  has_hot_chain('hot_test', current_tuple.ctid) AS has_chain,
+  chain_position,
+  print_hot_chain.ctid,
+  lp_flags,
+  t_ctid
+FROM current_tuple,
+LATERAL print_hot_chain('hot_test', current_tuple.ctid);
+
+-- Non-HOT update (update indexed column)
+UPDATE hot_test SET indexed_col = 150 WHERE id = 1;
+SELECT get_hot_count('hot_test');
+
+-- Verify index was updated (new value findable)
+EXPLAIN (COSTS OFF) SELECT id, indexed_col FROM hot_test WHERE indexed_col = 150;
+SELECT id, indexed_col FROM hot_test WHERE indexed_col = 150;
+
+-- Verify old value no longer in index
+EXPLAIN (COSTS OFF) SELECT id FROM hot_test WHERE indexed_col = 100;
+SELECT id FROM hot_test WHERE indexed_col = 100;
+
+SET SESSION enable_seqscan = ON;
+SET SESSION enable_bitmapscan = ON;
+
+-- All-or-none property: updating one indexed column requires ALL index updates
+DROP TABLE hot_test;
+
+CREATE TABLE hot_test (
+    id int PRIMARY KEY,
+    col_a int,
+    col_b int,
+    col_c int,
+    non_indexed text
+) USING heap;
+
+CREATE INDEX hot_test_a_idx ON hot_test(col_a);
+CREATE INDEX hot_test_b_idx ON hot_test(col_b);
+CREATE INDEX hot_test_c_idx ON hot_test(col_c);
+
+INSERT INTO hot_test VALUES (1, 10, 20, 30, 'initial');
+
+-- Update only col_a - should NOT be HOT because an indexed column changed
+-- This means ALL indexes must be updated (all-or-none property)
+UPDATE hot_test SET col_a = 15 WHERE id = 1;
+SELECT get_hot_count('hot_test');
+
+-- Verify all three indexes still work correctly
+SELECT id, col_a FROM hot_test WHERE col_a = 15;  -- updated index
+SELECT id, col_b FROM hot_test WHERE col_b = 20;  -- unchanged index
+SELECT id, col_c FROM hot_test WHERE col_c = 30;  -- unchanged index
+
+-- Now update only non-indexed column - should be HOT
+UPDATE hot_test SET non_indexed = 'updated';
+SELECT get_hot_count('hot_test');
+
+-- Verify all indexes still work
+SELECT id FROM hot_test WHERE col_a = 15 AND col_b = 20 AND col_c = 30;
+
+-- Partial index: both old and new outside predicate (conservative = non-HOT)
+DROP TABLE hot_test;
+
+CREATE TABLE hot_test (
+    id int PRIMARY KEY,
+    status text,
+    data text
+) USING heap;
+
+-- Partial index only covers status = 'active'
+CREATE INDEX hot_test_active_idx ON hot_test(status) WHERE status = 'active';
+
+INSERT INTO hot_test VALUES (1, 'active', 'data1');
+INSERT INTO hot_test VALUES (2, 'inactive', 'data2');
+INSERT INTO hot_test VALUES (3, 'deleted', 'data3');
+
+-- Update non-indexed column on 'active' row (in predicate, status unchanged)
+-- Should be HOT
+UPDATE hot_test SET data = 'updated1' WHERE id = 1;
+SELECT get_hot_count('hot_test');
+
+-- Update non-indexed column on 'inactive' row (outside predicate)
+-- Should be HOT
+UPDATE hot_test SET data = 'updated2' WHERE id = 2;
+SELECT get_hot_count('hot_test');
+
+-- Update status from 'inactive' to 'deleted' (both outside predicate)
+-- PostgreSQL is conservative: heap insert happens before predicate check
+-- So this is NON-HOT even though both values are outside predicate
+UPDATE hot_test SET status = 'deleted' WHERE id = 2;
+SELECT get_hot_count('hot_test');
+
+-- Verify index still works for 'active' rows
+SELECT id, status FROM hot_test WHERE status = 'active';
+
+-- Expression index with JSONB
+DROP TABLE hot_test;
+
+CREATE TABLE hot_test (
+    id int PRIMARY KEY,
+    metadata jsonb
+) USING heap;
+
+-- Index on JSONB expression
+CREATE INDEX hot_test_user_id_idx ON hot_test((metadata->>'user_id'));
+CREATE INDEX hot_test_status_idx ON hot_test((metadata->>'status'));
+
+INSERT INTO hot_test VALUES (1, '{"user_id": "123", "status": "active"}'::jsonb);
+
+-- Update JSONB field used in expression index to the same value,
+-- this will be HOT because the entire JSONB field is observed to
+-- be unchanged.
+UPDATE hot_test SET metadata = jsonb_set(metadata, '{user_id}', '"123"')
+WHERE id = 1;
+SELECT get_hot_count('hot_test');
+
+-- Update JSONB field that is no used in any index to some new value, this
+-- will prevent a HOT update despite not changing what is used when forming
+-- the index key, this is counter intuitive and causes index bloat as well
+-- as slows down updates on JSONB data as any change will trigger all
+-- indexes to be updated.
+UPDATE hot_test SET metadata = jsonb_set(metadata, '{food}', '"apple"')
+WHERE id = 1;
+SELECT get_hot_count('hot_test');
+
+-- Use a few different methods for mutating JSONB data, but don't modify
+-- indexed portions of the document.  None of these will be HOT.
+UPDATE hot_test SET metadata = jsonb_set(
+  jsonb_set(metadata, '{food}', '"pear"'),
+  '{timestamp}',
+  to_jsonb(now())
+)
+WHERE id = 1;
+SELECT get_hot_count('hot_test');
+
+UPDATE hot_test
+SET metadata = metadata || '{"user_id": "123", "timestamp": "2024-01-01"}'::jsonb
+WHERE id = 1;
+SELECT get_hot_count('hot_test');
+
+UPDATE hot_test SET metadata =
+  jsonb_set(
+    jsonb_set(metadata, '{user_id}', '"123"'),
+    '{fruit}',
+    '"plumb"'
+  );
+SELECT get_hot_count('hot_test');
+
+UPDATE hot_test SET metadata = metadata || jsonb_build_object(
+  'user_id', '123',
+  'timestamp', now(),
+  'fruit', 'honeydew'
+);
+SELECT get_hot_count('hot_test');
+
+-- Only BRIN (summarizing) indexes on non-PK columns
+DROP TABLE hot_test;
+
+CREATE TABLE hot_test (
+    id int PRIMARY KEY,
+    ts timestamp,
+    value int,
+    brin_col int
+) USING heap;
+
+CREATE INDEX hot_test_ts_brin ON hot_test USING brin(ts);
+CREATE INDEX hot_test_brin_col_brin ON hot_test USING brin(brin_col);
+
+INSERT INTO hot_test VALUES (1, '2024-01-01', 100, 1000);
+
+-- Update both BRIN columns - should still be HOT (only summarizing indexes)
+UPDATE hot_test SET ts = '2024-01-02', brin_col = 2000 WHERE id = 1;
+SELECT get_hot_count('hot_test');
+
+-- Verify BRIN indexes work
+SELECT id FROM hot_test WHERE ts >= '2024-01-02';
+SELECT id FROM hot_test WHERE brin_col >= 2000;
+
+-- Update non-indexed column - should also be HOT
+UPDATE hot_test SET value = 200 WHERE id = 1;
+SELECT get_hot_count('hot_test');
+
+-- TOAST and HOT: TOASTed columns can participate in HOT
+DROP TABLE hot_test;
+
+CREATE TABLE hot_test (
+    id int PRIMARY KEY,
+    indexed_col int,
+    large_text text,
+    small_text text
+) USING heap;
+
+CREATE INDEX hot_test_idx ON hot_test(indexed_col);
+
+-- Insert row with TOASTed column (> 2KB)
+INSERT INTO hot_test VALUES (1, 100, repeat('x', 3000), 'small');
+
+-- Update non-indexed, non-TOASTed column - should be HOT
+UPDATE hot_test SET small_text = 'updated';
+SELECT get_hot_count('hot_test');
+
+-- Update TOASTed column - should be HOT if indexed column unchanged
+UPDATE hot_test SET large_text = repeat('y', 3000);
+SELECT get_hot_count('hot_test');
+
+-- Verify index still works
+SELECT id FROM hot_test WHERE indexed_col = 100;
+
+-- Update indexed column - should NOT be HOT
+UPDATE hot_test SET indexed_col = 200;
+SELECT get_hot_count('hot_test');
+
+-- Verify index was updated
+SELECT id FROM hot_test WHERE indexed_col = 200;
+
+-- Unique constraint (unique index) behaves like regular index
+DROP TABLE hot_test;
+
+CREATE TABLE hot_test (
+    id int PRIMARY KEY,
+    unique_col int UNIQUE,
+    data text
+) USING heap;
+
+INSERT INTO hot_test VALUES (1, 100, 'data1');
+INSERT INTO hot_test VALUES (2, 200, 'data2');
+
+-- Update data (non-indexed) - should be HOT
+UPDATE hot_test SET data = 'updated';
+SELECT get_hot_count('hot_test');
+
+-- Verify unique constraint still enforced
+SELECT id, unique_col, data FROM hot_test ORDER BY id;
+
+-- This should fail (unique violation)
+UPDATE hot_test SET unique_col = 100 WHERE id = 2;
+
+-- Multi-column index: any column change = non-HOT
+DROP TABLE hot_test;
+
+CREATE TABLE hot_test (
+    id int PRIMARY KEY,
+    col_a int,
+    col_b int,
+    col_c int,
+    data text
+) USING heap;
+
+CREATE INDEX hot_test_ab_idx ON hot_test(col_a, col_b);
+
+INSERT INTO hot_test VALUES (1, 10, 20, 30, 'data');
+
+-- Update col_a (part of multi-column index) - should NOT be HOT
+UPDATE hot_test SET col_a = 15;
+SELECT get_hot_count('hot_test');
+
+-- Reset
+UPDATE hot_test SET col_a = 10;
+
+-- Update col_b (part of multi-column index) - should NOT be HOT
+UPDATE hot_test SET col_b = 25;
+SELECT get_hot_count('hot_test');
+
+-- Reset
+UPDATE hot_test SET col_b = 20;
+SELECT get_hot_count('hot_test');
+
+-- Update col_c (not indexed) - should be HOT
+UPDATE hot_test SET col_c = 35;
+
+-- Update data (not indexed) - should be HOT
+UPDATE hot_test SET data = 'updated';
+SELECT get_hot_count('hot_test');
+
+-- Verify multi-column index works
+SELECT id FROM hot_test WHERE col_a = 10 AND col_b = 20;
+
+-- Partitioned tables: HOT works within partitions
+DROP TABLE IF EXISTS hot_test_partitioned CASCADE;
+
+CREATE TABLE hot_test_partitioned (
+    id int,
+    partition_key int,
+    indexed_col int,
+    data text,
+    PRIMARY KEY (id, partition_key)
+) PARTITION BY RANGE (partition_key) USING heap;
+
+CREATE TABLE hot_test_part1 PARTITION OF hot_test_partitioned
+    FOR VALUES FROM (1) TO (100) USING heap;
+CREATE TABLE hot_test_part2 PARTITION OF hot_test_partitioned
+    FOR VALUES FROM (100) TO (200) USING heap;
+
+CREATE INDEX hot_test_part_idx ON hot_test_partitioned(indexed_col);
+
+INSERT INTO hot_test_partitioned VALUES (1, 50, 100, 'initial1');
+INSERT INTO hot_test_partitioned VALUES (2, 150, 200, 'initial2');
+
+-- Update in partition 1 (non-indexed column) - should be HOT
+UPDATE hot_test_partitioned SET data = 'updated1' WHERE id = 1;
+
+-- Update in partition 2 (non-indexed column) - should be HOT
+UPDATE hot_test_partitioned SET data = 'updated2' WHERE id = 2;
+
+SELECT get_hot_count('hot_test_part1');
+SELECT get_hot_count('hot_test_part2');
+
+-- Verify indexes work on partitions
+SELECT id FROM hot_test_partitioned WHERE indexed_col = 100;
+SELECT id FROM hot_test_partitioned WHERE indexed_col = 200;
+
+-- Update indexed column in partition - should NOT be HOT
+UPDATE hot_test_partitioned SET indexed_col = 150 WHERE id = 1;
+SELECT get_hot_count('hot_test_part1');
+
+-- Verify index was updated
+SELECT id FROM hot_test_partitioned WHERE indexed_col = 150;
+
+-- Cleanup
+DROP TABLE IF EXISTS hot_test;
+DROP TABLE IF EXISTS hot_test_partitioned CASCADE;
+DROP FUNCTION IF EXISTS has_hot_chain(text, tid);
+DROP FUNCTION IF EXISTS print_hot_chain(text, tid);
+DROP FUNCTION IF EXISTS heap_prune(text);
+DROP FUNCTION IF EXISTS get_hot_count(text);
+DROP EXTENSION pageinspect;
diff --git a/src/test/regress/sql/replica_identity_logging.sql b/src/test/regress/sql/replica_identity_logging.sql
new file mode 100644
index 00000000000..4c45e76e15d
--- /dev/null
+++ b/src/test/regress/sql/replica_identity_logging.sql
@@ -0,0 +1,349 @@
+--
+-- REPLICA_IDENTITY_LOGGING
+-- Test that replica identity keys are correctly extracted and logged for logical replication
+--
+-- This test verifies that the correct old key columns are included in WAL records
+-- for logical replication, based on the table's replica identity setting.
+--
+
+-- Clean up from prior runs
+DROP TABLE IF EXISTS repid_test CASCADE;
+
+-- Drop replication slot if it exists from prior run
+SELECT pg_drop_replication_slot('repid_test_slot') FROM pg_replication_slots WHERE slot_name = 'repid_test_slot';
+
+-- Enable logical decoding to verify what gets logged
+SELECT 'init' FROM pg_create_logical_replication_slot('repid_test_slot', 'test_decoding');
+
+-- REPLICA IDENTITY DEFAULT (primary key columns only)
+CREATE TABLE repid_test (
+    id int PRIMARY KEY,
+    indexed_col int,
+    data text
+);
+
+CREATE INDEX repid_test_idx ON repid_test(indexed_col);
+
+INSERT INTO repid_test VALUES (1, 100, 'initial');
+INSERT INTO repid_test VALUES (2, 200, 'initial');
+
+-- Advance slot to skip inserts (filter out transaction boundaries for stability)
+SELECT data FROM pg_logical_slot_get_changes('repid_test_slot', NULL, NULL)
+WHERE data NOT LIKE 'BEGIN %' AND data NOT LIKE 'COMMIT %';
+
+-- Update non-key column - should log only id in old key
+UPDATE repid_test SET data = 'updated' WHERE id = 1;
+
+-- Update indexed non-key column - should still log only id in old key
+UPDATE repid_test SET indexed_col = 150 WHERE id = 2;
+
+-- Check logical decoding output - should see old key with only id
+SELECT data FROM pg_logical_slot_get_changes('repid_test_slot', NULL, NULL)
+WHERE data LIKE '%UPDATE%';
+
+-- REPLICA IDENTITY FULL (all columns in old key)
+DROP TABLE repid_test;
+
+CREATE TABLE repid_test (
+    id int PRIMARY KEY,
+    indexed_col int,
+    data text
+);
+
+ALTER TABLE repid_test REPLICA IDENTITY FULL;
+
+CREATE INDEX repid_test_idx ON repid_test(indexed_col);
+
+INSERT INTO repid_test VALUES (1, 100, 'initial');
+
+-- Advance slot past insert (filter out transaction boundaries for stability)
+SELECT data FROM pg_logical_slot_get_changes('repid_test_slot', NULL, NULL)
+WHERE data NOT LIKE 'BEGIN %' AND data NOT LIKE 'COMMIT %';
+
+-- Update any column - should log ALL columns in old key
+UPDATE repid_test SET data = 'updated' WHERE id = 1;
+
+-- Check logical decoding output - should see old key with all columns
+SELECT data FROM pg_logical_slot_get_changes('repid_test_slot', NULL, NULL)
+WHERE data LIKE '%UPDATE%';
+
+-- REPLICA IDENTITY USING INDEX (index columns only)
+DROP TABLE repid_test;
+
+CREATE TABLE repid_test (
+    id int,
+    unique_col int UNIQUE NOT NULL,
+    data text
+);
+
+ALTER TABLE repid_test REPLICA IDENTITY USING INDEX repid_test_unique_col_key;
+
+INSERT INTO repid_test VALUES (1, 100, 'initial');
+INSERT INTO repid_test VALUES (2, 200, 'initial');
+
+-- Advance slot past inserts (filter out transaction boundaries for stability)
+SELECT data FROM pg_logical_slot_get_changes('repid_test_slot', NULL, NULL)
+WHERE data NOT LIKE 'BEGIN %' AND data NOT LIKE 'COMMIT %';
+
+-- Update non-indexed column - should log only unique_col in old key
+UPDATE repid_test SET data = 'updated' WHERE unique_col = 100;
+
+-- Update id (not in replica identity index) - should still log only unique_col
+UPDATE repid_test SET id = 10 WHERE unique_col = 200;
+
+-- Check logical decoding output - should see old key with only unique_col
+SELECT data FROM pg_logical_slot_get_changes('repid_test_slot', NULL, NULL)
+WHERE data LIKE '%UPDATE%';
+
+-- REPLICA IDENTITY NOTHING (no old key logged)
+DROP TABLE repid_test;
+
+CREATE TABLE repid_test (
+    id int PRIMARY KEY,
+    data text
+);
+
+ALTER TABLE repid_test REPLICA IDENTITY NOTHING;
+
+INSERT INTO repid_test VALUES (1, 'initial');
+
+-- Advance slot past insert (filter out transaction boundaries for stability)
+SELECT data FROM pg_logical_slot_get_changes('repid_test_slot', NULL, NULL)
+WHERE data NOT LIKE 'BEGIN %' AND data NOT LIKE 'COMMIT %';
+
+-- Update - should log no old key
+UPDATE repid_test SET data = 'updated' WHERE id = 1;
+
+-- Check logical decoding output - should see update with no old key
+SELECT data FROM pg_logical_slot_get_changes('repid_test_slot', NULL, NULL)
+WHERE data LIKE '%UPDATE%';
+
+-- Multi-column index replica identity
+DROP TABLE repid_test;
+
+CREATE TABLE repid_test (
+    id int,
+    col_a int NOT NULL,
+    col_b int NOT NULL,
+    col_c int,
+    data text,
+    UNIQUE (col_a, col_b)
+);
+
+ALTER TABLE repid_test REPLICA IDENTITY USING INDEX repid_test_col_a_col_b_key;
+
+INSERT INTO repid_test VALUES (1, 10, 20, 30, 'initial');
+
+-- Advance slot past insert (filter out transaction boundaries for stability)
+SELECT data FROM pg_logical_slot_get_changes('repid_test_slot', NULL, NULL)
+WHERE data NOT LIKE 'BEGIN %' AND data NOT LIKE 'COMMIT %';
+
+-- Update non-indexed columns - should log col_a and col_b in old key
+UPDATE repid_test SET data = 'updated', col_c = 35 WHERE col_a = 10 AND col_b = 20;
+
+-- Check logical decoding output - should see old key with col_a and col_b
+SELECT data FROM pg_logical_slot_get_changes('repid_test_slot', NULL, NULL)
+WHERE data LIKE '%UPDATE%';
+
+-- TOAST/external columns in replica identity
+DROP TABLE repid_test;
+
+CREATE TABLE repid_test (
+    id int PRIMARY KEY,
+    large_text text,
+    data text
+);
+
+-- REPLICA IDENTITY FULL includes toasted columns
+ALTER TABLE repid_test REPLICA IDENTITY FULL;
+
+-- Insert a large value (large enough to show the concept without excessive output)
+INSERT INTO repid_test VALUES (1, repeat('x', 100), 'initial');
+
+-- Advance slot past insert (filter out transaction boundaries for stability)
+SELECT data FROM pg_logical_slot_get_changes('repid_test_slot', NULL, NULL)
+WHERE data NOT LIKE 'BEGIN %' AND data NOT LIKE 'COMMIT %';
+
+-- Update small column - should still log large_text column in old key
+UPDATE repid_test SET data = 'updated' WHERE id = 1;
+
+-- Check logical decoding output - verify both old and new values are logged
+-- Just check that UPDATE happened and includes both large_text and data columns
+SELECT COUNT(*) as update_count FROM pg_logical_slot_get_changes('repid_test_slot', NULL, NULL)
+WHERE data LIKE '%UPDATE%' AND data LIKE '%large_text%' AND data LIKE '%old-key%';
+
+-- Test TOAST columns with REPLICA IDENTITY USING INDEX
+DROP TABLE repid_test;
+
+CREATE TABLE repid_test (
+    id int PRIMARY KEY,
+    indexed_large_text text NOT NULL,
+    data text
+);
+
+-- Create unique index on the large text column
+CREATE UNIQUE INDEX repid_test_large_idx ON repid_test(indexed_large_text);
+
+-- Set replica identity to use the index (not FULL)
+ALTER TABLE repid_test REPLICA IDENTITY USING INDEX repid_test_large_idx;
+
+-- Insert a large value (large enough to be TOASTed)
+INSERT INTO repid_test VALUES (1, repeat('x', 100000), 'initial');
+
+-- Advance slot past inserts
+SELECT COUNT(*) FROM pg_logical_slot_get_changes('repid_test_slot', NULL, NULL)
+WHERE data NOT LIKE 'BEGIN %' AND data NOT LIKE 'COMMIT %';
+
+-- Update non-indexed column - should still log indexed_large_text in old key
+-- despite being unmodified because it is TOASTed and in the replica key
+UPDATE repid_test SET data = 'updated' WHERE id = 1;
+
+-- Verify TOASTed indexed column part of the relica identity is logged in old key
+SELECT COUNT(*) AS toasted_index_logged FROM pg_logical_slot_get_changes('repid_test_slot', NULL, NULL)
+WHERE data LIKE '%UPDATE%' AND data LIKE '%indexed_large_text%';
+-- Dropped columns and replica identity
+DROP TABLE repid_test;
+
+CREATE TABLE repid_test (
+    id int PRIMARY KEY,
+    dropped_col int,
+    kept_col int,
+    data text
+);
+
+ALTER TABLE repid_test REPLICA IDENTITY FULL;
+
+INSERT INTO repid_test VALUES (1, 999, 100, 'initial');
+
+-- Drop a column
+ALTER TABLE repid_test DROP COLUMN dropped_col;
+
+-- Advance slot past insert and DDL (filter out transaction boundaries for stability)
+SELECT data FROM pg_logical_slot_get_changes('repid_test_slot', NULL, NULL)
+WHERE data NOT LIKE 'BEGIN %' AND data NOT LIKE 'COMMIT %';
+
+-- Update - old key should handle dropped column
+UPDATE repid_test SET data = 'updated' WHERE id = 1;
+
+-- Check logical decoding output
+SELECT data FROM pg_logical_slot_get_changes('repid_test_slot', NULL, NULL)
+WHERE data LIKE '%UPDATE%';
+
+-- DEFAULT replica identity with composite primary key
+DROP TABLE repid_test;
+
+CREATE TABLE repid_test (
+    id_a int,
+    id_b int,
+    indexed_col int,
+    data text,
+    PRIMARY KEY (id_a, id_b)
+);
+
+CREATE INDEX repid_test_idx ON repid_test(indexed_col);
+
+INSERT INTO repid_test VALUES (1, 10, 100, 'initial');
+
+-- Advance slot past insert (filter out transaction boundaries for stability)
+SELECT data FROM pg_logical_slot_get_changes('repid_test_slot', NULL, NULL)
+WHERE data NOT LIKE 'BEGIN %' AND data NOT LIKE 'COMMIT %';
+
+-- Update non-key columns - should log both id_a and id_b in old key
+UPDATE repid_test SET data = 'updated', indexed_col = 150 WHERE id_a = 1 AND id_b = 10;
+
+-- Check logical decoding output - should see old key with both primary key columns
+SELECT data FROM pg_logical_slot_get_changes('repid_test_slot', NULL, NULL)
+WHERE data LIKE '%UPDATE%';
+
+-- Expression index and replica identity
+DROP TABLE repid_test;
+
+CREATE TABLE repid_test (
+    id int PRIMARY KEY,
+    email text NOT NULL,
+    data text
+);
+
+-- Create unique expression index
+CREATE UNIQUE INDEX repid_test_lower_email_idx ON repid_test(lower(email));
+
+-- Cannot use expression index for replica identity (should fail)
+-- PostgreSQL requires the index to be on simple column references
+-- This should produce an error
+DO $$
+BEGIN
+    ALTER TABLE repid_test REPLICA IDENTITY USING INDEX repid_test_lower_email_idx;
+    RAISE EXCEPTION 'Should have failed - expression indexes cannot be used for replica identity';
+EXCEPTION
+    WHEN feature_not_supported THEN
+        RAISE NOTICE 'Correctly rejected expression index for replica identity';
+END$$;
+
+-- Use FULL instead
+ALTER TABLE repid_test REPLICA IDENTITY FULL;
+
+INSERT INTO repid_test VALUES (1, 'user@example.com', 'initial');
+
+-- Advance slot past insert (filter out transaction boundaries for stability)
+SELECT data FROM pg_logical_slot_get_changes('repid_test_slot', NULL, NULL)
+WHERE data NOT LIKE 'BEGIN %' AND data NOT LIKE 'COMMIT %';
+
+-- Update - should log all columns in old key
+UPDATE repid_test SET data = 'updated' WHERE id = 1;
+
+-- Check logical decoding output
+SELECT data FROM pg_logical_slot_get_changes('repid_test_slot', NULL, NULL)
+WHERE data LIKE '%UPDATE%';
+
+-- NULL values in replica identity columns
+DROP TABLE repid_test;
+
+CREATE TABLE repid_test (
+    id int PRIMARY KEY,
+    nullable_col int,
+    data text
+);
+
+ALTER TABLE repid_test REPLICA IDENTITY FULL;
+
+INSERT INTO repid_test VALUES (1, NULL, 'initial');
+
+-- Advance slot past insert (filter out transaction boundaries for stability)
+SELECT data FROM pg_logical_slot_get_changes('repid_test_slot', NULL, NULL)
+WHERE data NOT LIKE 'BEGIN %' AND data NOT LIKE 'COMMIT %';
+
+-- Update - old key should include NULL value
+UPDATE repid_test SET data = 'updated' WHERE id = 1;
+
+-- Check logical decoding output - should see old key with NULL
+SELECT data FROM pg_logical_slot_get_changes('repid_test_slot', NULL, NULL)
+WHERE data LIKE '%UPDATE%';
+
+-- Generated columns and replica identity
+DROP TABLE repid_test;
+
+CREATE TABLE repid_test (
+    id int PRIMARY KEY,
+    base_col int,
+    generated_col int GENERATED ALWAYS AS (base_col * 2) STORED,
+    data text
+);
+
+ALTER TABLE repid_test REPLICA IDENTITY FULL;
+
+INSERT INTO repid_test (id, base_col, data) VALUES (1, 50, 'initial');
+
+-- Advance slot past insert (filter out transaction boundaries for stability)
+SELECT data FROM pg_logical_slot_get_changes('repid_test_slot', NULL, NULL)
+WHERE data NOT LIKE 'BEGIN %' AND data NOT LIKE 'COMMIT %';
+
+-- Update base_col - generated_col will change automatically
+UPDATE repid_test SET base_col = 60 WHERE id = 1;
+
+-- Check logical decoding output - should include old generated_col value
+SELECT data FROM pg_logical_slot_get_changes('repid_test_slot', NULL, NULL)
+WHERE data LIKE '%UPDATE%';
+
+-- Cleanup
+SELECT pg_drop_replication_slot('repid_test_slot');
+DROP TABLE repid_test;
-- 
2.51.2

