From 8569bc9dcca5986fe86ff8343cdb4c582ef2ce3b Mon Sep 17 00:00:00 2001 From: Ayush Tiwari Date: Tue, 28 Apr 2026 16:37:15 +0530 Subject: [PATCH v5] Fix stale relation close in sequence synchronization copy_sequences() processes publisher sequence data one result row at a time, but kept the Relation pointer used for the local sequence at batch scope. If pg_get_sequence_data() returned NULL sequence data for a later row, get_and_validate_seq_info() could return COPYSEQ_SKIPPED before assigning a new relation to that output argument. The caller could then close the stale Relation pointer left from the previous row, tripping the relcache refcount assertion in assert-enabled builds. This can happen due to concurrent drops or insufficient privileges, since pg_get_sequence_data() returns NULL data in those cases. Keep the caller-side Relation pointer local to each publisher result row, so early return paths cannot reuse a relation from a previous row. Report publisher-side sequences that are not returned as missing or inaccessible, rather than only missing, since the same condition can also be caused by privileges. Add a subscription test that reuses the existing subscription and verifies insufficient privileges on one published sequence do not disrupt the subscriber. --- .../replication/logical/sequencesync.c | 52 +++++++++---------- src/test/subscription/t/036_sequences.pl | 41 ++++++++++++++- 2 files changed, 66 insertions(+), 27 deletions(-) diff --git a/src/backend/replication/logical/sequencesync.c b/src/backend/replication/logical/sequencesync.c index ec7e76abf93..7293cbcde3e 100644 --- a/src/backend/replication/logical/sequencesync.c +++ b/src/backend/replication/logical/sequencesync.c @@ -167,17 +167,17 @@ get_sequences_string(List *seqindexes, StringInfo buf) * the publisher and subscriber. Emits warnings for: * a) mismatched definitions or concurrent rename * b) insufficient privileges - * c) missing sequences on the subscriber + * c) missing or inaccessible sequences on the publisher * Then raises an ERROR to indicate synchronization failure. */ static void report_sequence_errors(List *mismatched_seqs_idx, List *insuffperm_seqs_idx, - List *missing_seqs_idx) + List *unavailable_seqs_idx) { StringInfoData seqstr; /* Quick exit if there are no errors to report */ - if (!mismatched_seqs_idx && !insuffperm_seqs_idx && !missing_seqs_idx) + if (!mismatched_seqs_idx && !insuffperm_seqs_idx && !unavailable_seqs_idx) return; initStringInfo(&seqstr); @@ -204,14 +204,14 @@ report_sequence_errors(List *mismatched_seqs_idx, List *insuffperm_seqs_idx, seqstr.data)); } - if (missing_seqs_idx) + if (unavailable_seqs_idx) { - get_sequences_string(missing_seqs_idx, &seqstr); + get_sequences_string(unavailable_seqs_idx, &seqstr); ereport(WARNING, errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), - errmsg_plural("missing sequence on publisher (%s)", - "missing sequences on publisher (%s)", - list_length(missing_seqs_idx), + errmsg_plural("missing or inaccessible sequence on publisher (%s)", + "missing or inaccessible sequences on publisher (%s)", + list_length(unavailable_seqs_idx), seqstr.data)); } @@ -254,8 +254,8 @@ get_and_validate_seq_info(TupleTableSlot *slot, Relation *sequence_rel, (LogicalRepSequenceInfo *) list_nth(seqinfos, *seqidx); /* - * last_value can be NULL if the sequence was dropped concurrently (see - * pg_get_sequence_data()). + * The sequence data can be NULL due to insufficient privileges or if the + * sequence was dropped concurrently (see pg_get_sequence_data()). */ datum = slot_getattr(slot, ++col, &isnull); if (isnull) @@ -386,7 +386,7 @@ copy_sequences(WalReceiverConn *conn) int cur_batch_base_index = 0; int n_seqinfos = list_length(seqinfos); List *mismatched_seqs_idx = NIL; - List *missing_seqs_idx = NIL; + List *unavailable_seqs_idx = NIL; List *insuffperm_seqs_idx = NIL; StringInfoData seqstr; StringInfoData cmd; @@ -410,8 +410,7 @@ copy_sequences(WalReceiverConn *conn) int batch_mismatched_count = 0; int batch_skipped_count = 0; int batch_insuffperm_count = 0; - int batch_missing_count; - Relation sequence_rel = NULL; + int batch_unavailable_count; WalRcvExecResult *res; TupleTableSlot *slot; @@ -494,6 +493,7 @@ copy_sequences(WalReceiverConn *conn) { CopySeqResult sync_status; LogicalRepSequenceInfo *seqinfo; + Relation sequence_rel = NULL; int seqidx; CHECK_FOR_INTERRUPTS(); @@ -573,45 +573,45 @@ copy_sequences(WalReceiverConn *conn) resetStringInfo(&seqstr); resetStringInfo(&cmd); - batch_missing_count = batch_size - (batch_succeeded_count + - batch_mismatched_count + - batch_insuffperm_count + - batch_skipped_count); + batch_unavailable_count = batch_size - (batch_succeeded_count + + batch_mismatched_count + + batch_insuffperm_count + + batch_skipped_count); elog(DEBUG1, - "logical replication sequence synchronization for subscription \"%s\" - batch #%d = %d attempted, %d succeeded, %d mismatched, %d insufficient permission, %d missing from publisher, %d skipped", + "logical replication sequence synchronization for subscription \"%s\" - batch #%d = %d attempted, %d succeeded, %d mismatched, %d insufficient permission, %d missing or inaccessible on publisher, %d skipped", MySubscription->name, (cur_batch_base_index / MAX_SEQUENCES_SYNC_PER_BATCH) + 1, batch_size, batch_succeeded_count, batch_mismatched_count, - batch_insuffperm_count, batch_missing_count, batch_skipped_count); + batch_insuffperm_count, batch_unavailable_count, batch_skipped_count); /* Commit this batch, and prepare for next batch */ CommitTransactionCommand(); - if (batch_missing_count) + if (batch_unavailable_count) { for (int idx = cur_batch_base_index; idx < cur_batch_base_index + batch_size; idx++) { LogicalRepSequenceInfo *seqinfo = (LogicalRepSequenceInfo *) list_nth(seqinfos, idx); - /* If the sequence was not found on publisher, record it */ + /* If the sequence was not returned by the publisher, record it */ if (!seqinfo->found_on_pub) - missing_seqs_idx = lappend_int(missing_seqs_idx, idx); + unavailable_seqs_idx = lappend_int(unavailable_seqs_idx, idx); } } /* * cur_batch_base_index is not incremented sequentially because some - * sequences may be missing, and the number of fetched rows may not - * match the batch size. + * sequences may be unavailable on the publisher, and the number of + * fetched rows may not match the batch size. */ cur_batch_base_index += batch_size; } - /* Report mismatches, permission issues, or missing sequences */ + /* Report mismatches, permission issues, or unavailable sequences */ report_sequence_errors(mismatched_seqs_idx, insuffperm_seqs_idx, - missing_seqs_idx); + unavailable_seqs_idx); } /* diff --git a/src/test/subscription/t/036_sequences.pl b/src/test/subscription/t/036_sequences.pl index 471780a3585..513f219503b 100644 --- a/src/test/subscription/t/036_sequences.pl +++ b/src/test/subscription/t/036_sequences.pl @@ -218,7 +218,46 @@ $node_subscriber->wait_for_log( $node_publisher->safe_psql('postgres', qq(DROP SEQUENCE regress_s4;)); $node_subscriber->wait_for_log( - qr/WARNING: ( [A-Z0-9]+:)? missing sequence on publisher \("public.regress_s4"\)/, + qr/WARNING: ( [A-Z0-9]+:)? missing or inaccessible sequence on publisher \("public.regress_s4"\)/, + $log_offset); + +# Recreate regress_s4 so later tests that reuse the subscription do not keep +# reporting the intentionally-missing sequence from the previous test. +$node_publisher->safe_psql( + 'postgres', qq( + CREATE SEQUENCE regress_s4 START 10 INCREMENT 2; +)); + +########## +# Ensure that insufficient privileges on the publisher for a sequence do not +# disrupt the subscriber. The subscriber should log a warning and continue +# retrying. +########## + +$node_publisher->safe_psql( + 'postgres', qq( + CREATE ROLE regress_seq_repl LOGIN REPLICATION; + GRANT USAGE ON SCHEMA public TO regress_seq_repl; + GRANT SELECT ON ALL SEQUENCES IN SCHEMA public TO regress_seq_repl; + REVOKE ALL ON SEQUENCE regress_s2 FROM regress_seq_repl; +)); + +my $publisher_limited_connstr = + $node_publisher->connstr . ' dbname=postgres user=regress_seq_repl'; +$log_offset = -s $node_subscriber->logfile; + +$node_subscriber->safe_psql( + 'postgres', + "ALTER SUBSCRIPTION regress_seq_sub CONNECTION '$publisher_limited_connstr'" +); + +$node_subscriber->safe_psql( + 'postgres', + "ALTER SUBSCRIPTION regress_seq_sub REFRESH SEQUENCES" +); + +$node_subscriber->wait_for_log( + qr/WARNING: ( [A-Z0-9]+:)? missing or inaccessible sequence on publisher \("public.regress_s2"\)/, $log_offset); done_testing(); -- 2.34.1