From e89e1bd7c5057c4eef69ac5fa7ffffc8e6dd13c6 Mon Sep 17 00:00:00 2001 From: Ashutosh Sharma Date: Mon, 15 Jun 2026 10:22:47 +0000 Subject: [PATCH 3/3] Add FIRST N and N (...) priority syntax to synchronized_standby_slots Extend synchronized_standby_slots to support explicit priority forms aligned with synchronous_standby_names. - FIRST N (slot1, slot2, ...) - N (slot1, slot2, ...) as shorthand for FIRST N Implementation details: - Use the SYNC_REP_DEFAULT parser distinction from the earlier refactor so plain-list syntax remains separate from priority syntax. - Extend StandbySlotsHaveCaughtup() priority handling. - Select slots in list order. - Skip missing, logical, invalidated, and inactive lagging slots. - Wait for active lagging higher-priority slots. - Clarify duplicate handling for priority syntax in the synchronized_standby_slots documentation. - Simplify caught-up comments and clarify standby confirmation wait comments to match the final control flow. Tests and docs: - Add coverage for FIRST behavior and shorthand N (...) behavior. - Add plain-list disambiguation with first-prefixed slot names. - Add FIRST duplicate-entry recovery coverage to show duplicates do not create extra priority positions. - Update docs for FIRST and shorthand priority syntax semantics. - Clarify that duplicate slot names are ignored in priority-based forms and preserve first-occurrence order. Author: Satya Narlapuram Author: Ashutosh Sharma Reviewed-by: Shveta Malik Reviewed-by: Ajin Cherian Reviewed-by: Hou, Zhijie Reviewed-by: Dilip Kumar Reviewed-by: Surya Poondla Reviewed-by: Japin Li Reviewed-by: Shlok Kyal --- doc/src/sgml/config.sgml | 45 ++- src/backend/replication/slot.c | 44 +-- .../053_synchronized_standby_slots_quorum.pl | 262 ++++++++++++++++-- 3 files changed, 304 insertions(+), 47 deletions(-) diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml index 1f176bd48f4..473f2641b90 100644 --- a/doc/src/sgml/config.sgml +++ b/doc/src/sgml/config.sgml @@ -5194,6 +5194,7 @@ ANY num_sync ( num_sync ( slot_name [, ...] ) ANY num_sync ( slot_name [, ...] ) slot_name [, ...] @@ -5205,9 +5206,24 @@ ANY num_sync ( num_sync must be an integer value greater than zero and must not exceed the number of listed slots. - Other forms supported by - , such as priority - syntax, are not supported. + + + The keyword FIRST, coupled with + num_sync, specifies + priority-based semantics. Logical decoding will wait for the first + num_sync available + physical slots in priority order (the order they appear in the list). + Missing, logical, or invalidated slots are skipped. Inactive slots are + skipped only while they are lagging. However, if a slot exists and is + valid and active but has not yet caught up, the system will wait for it + rather than skipping to lower-priority slots. If, after skipping + unusable slots, fewer than + num_sync usable slots + remain, logical decoding waits until enough slots become usable and + caught up, or until the configuration is changed. The keyword + FIRST is optional in this form, so + 2 (slot1, slot2, slot3) and + FIRST 2 (slot1, slot2, slot3) are equivalent. A plain comma-separated list without a keyword specifies that @@ -5244,19 +5260,26 @@ ANY num_sync ( num_sync must not exceed the number of unique listed slots. Such a configuration results in an error to prevent indefinite waits in WAL sender processes due to a misconfigured synchronized_standby_slots setting. - - ANY is case-insensitive. - + + FIRST and ANY are case-insensitive. + If these keywords are used as the name of a replication slot, + the slot_name must + be double-quoted. + The use of synchronized_standby_slots guarantees that logical replication diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c index e06c428d853..dde241eefc9 100644 --- a/src/backend/replication/slot.c +++ b/src/backend/replication/slot.c @@ -3068,6 +3068,8 @@ CompactSyncRepConfigMemberNames(SyncRepConfigData *config) * * slot1, slot2 -- wait for ALL listed slots * ANY N (slot1, slot2, ...) -- wait for any N-of-M (quorum) + * FIRST N (slot1, slot2, ...) -- wait for first N in priority order + * N (slot1, slot2, ...) -- shorthand for FIRST N * * Note: Simple list syntax is interpreted as "wait for ALL" for this GUC, * unlike synchronous_standby_names where it means "FIRST 1". @@ -3108,14 +3110,6 @@ check_synchronized_standby_slots(char **newval, void **extra, GucSource source) return false; } - if (syncrep_parse_result->syncrep_method == SYNC_REP_PRIORITY) - { - GUC_check_errcode(ERRCODE_INVALID_PARAMETER_VALUE); - GUC_check_errmsg("priority syntax is not supported for parameter \"%s\"", - "synchronized_standby_slots"); - return false; - } - if (syncrep_parse_result->num_sync <= 0) { GUC_check_errcode(ERRCODE_INVALID_PARAMETER_VALUE); @@ -3337,6 +3331,12 @@ ReportUnavailableSyncStandbySlots(SyncStandbySlotsStateInfo *slot_states, * Simple list (e.g., "slot1, slot2"): * ALL slots must have caught up. Returns false otherwise. * + * FIRST N (e.g., "FIRST 2 (slot1, slot2, slot3)"): + * Wait for the first N eligible slots in priority order. Skips missing, + * invalid, logical, and inactive-lagging slots to find N eligible slots. + * If an active slot is lagging, waits for it (does not skip to lower + * priority slots). + * * ANY N (e.g., "ANY 2 (slot1, slot2, slot3)"): * Wait for any N eligible slots. Skips missing, invalid, logical, and * lagging slots (inactive or active) to find N slots that have caught up. @@ -3387,11 +3387,14 @@ StandbySlotsHaveCaughtup(XLogRecPtr wait_for_lsn, int elevel) * first slot that is missing/invalid/logical, or the first slot that is * lagging (inactive or active). * - * wait_for_all = false means we select N from M candidates (ANY N syntax). - * In this mode, slots already caught up are counted even if inactive, and - * lagging slots are skipped until enough slots have caught up. - * Duplicate configured slot names do not appear here because the check hook - * compacts them out of the parsed configuration. + * wait_for_all = false means we select N from M candidates (FIRST N or + * ANY N syntax). In this mode, slots already caught up are counted even if + * inactive. In FIRST N mode, we skip missing/invalid/logical slots and + * lagging inactive slots, but wait for an active lagging slot with higher + * priority. In ANY N mode, we skip lagging slots (inactive or active) to + * find any N that have caught up. Duplicate configured slot names do not + * appear here because the check hook compacts them out of the parsed + * configuration. */ required = synchronized_standby_slots_config->num_sync; wait_for_all = (synchronized_standby_slots_config->syncrep_method == SYNC_REP_DEFAULT); @@ -3474,8 +3477,9 @@ StandbySlotsHaveCaughtup(XLogRecPtr wait_for_lsn, int elevel) * If a slot is inactive and lagging, report it as inactive. If it * is active and lagging, report it as lagging. * - * In ALL mode: must wait for it. In ANY N (quorum) mode: skip and - * use another slot. + * In ALL mode: must wait for it. In FIRST N (priority) mode: + * lagging active slots block, while inactive slots can be + * skipped. In ANY N (quorum) mode: skip and use another slot. */ slot_states[num_slot_states].slot_name = name; slot_states[num_slot_states].state = @@ -3483,7 +3487,9 @@ StandbySlotsHaveCaughtup(XLogRecPtr wait_for_lsn, int elevel) slot_states[num_slot_states].restart_lsn = restart_lsn; num_slot_states++; - if (wait_for_all) + if (wait_for_all || + (!inactive && + synchronized_standby_slots_config->syncrep_method == SYNC_REP_PRIORITY)) break; goto next_slot; } @@ -3496,7 +3502,7 @@ StandbySlotsHaveCaughtup(XLogRecPtr wait_for_lsn, int elevel) caught_up_slot_num++; - /* Stop processing if the required number of slots have caught up. */ + /* Stop once the required number of slots have caught up. */ if (caught_up_slot_num >= required) break; @@ -3511,8 +3517,8 @@ next_slot: * problem states and return false. * * We only emit messages when the requirement is not met to avoid - * misleading messages in quorum mode where other slots may have satisfied - * the condition despite some slots having issues. + * misleading messages in quorum/priority mode where other slots may have + * satisfied the condition despite some slots having issues. */ if (caught_up_slot_num < required) { diff --git a/src/test/recovery/t/053_synchronized_standby_slots_quorum.pl b/src/test/recovery/t/053_synchronized_standby_slots_quorum.pl index d387e0c7e7e..a4153a44b37 100644 --- a/src/test/recovery/t/053_synchronized_standby_slots_quorum.pl +++ b/src/test/recovery/t/053_synchronized_standby_slots_quorum.pl @@ -4,6 +4,7 @@ # Test synchronized_standby_slots with different syntax modes: # - Plain list (ALL mode): slot1, slot2 # - ANY N (quorum mode): ANY N (slot1, slot2, ...) +# - FIRST N (priority mode): FIRST N (slot1, slot2, ...) # # Setup: a 3-node cluster with one primary, two physical standbys, and a # logical decoding client using a failover-enabled slot. @@ -200,16 +201,168 @@ is($decoded_bc, '1', 'plain list: works when all standbys are up'); ################################################## -# PART D: ANY 2 waits on an active lagging slot +# PART D: Verify FIRST N priority semantics ################################################## -# Stop standby1 so sb1_slot can be controlled by a raw replication connection -# that keeps the slot active while lagging. +# FIRST N should: +# 1. Select first N slots in priority order (list order) +# 2. Skip missing/invalid/logical slots and inactive lagging slots to find +# N caught-up slots +# 3. Wait for active lagging slots (not skip to lower priority) + +# Test FIRST 2 (sb1_slot, sb2_slot) with both up; should wait for both. +$primary->adjust_conf('postgresql.conf', 'synchronized_standby_slots', + "'FIRST 2 (sb1_slot, sb2_slot)'"); +$primary->reload; + +$primary->safe_psql('postgres', + "SELECT pg_logical_emit_message(true, 'qtest', 'first_2_both_up');" +); +$primary->wait_for_replay_catchup($standby1); +$primary->wait_for_replay_catchup($standby2); + +my $decoded_e2 = $primary->safe_psql('postgres', + q{SELECT count(*) FROM pg_logical_slot_get_changes('logical_failover', NULL, NULL) + WHERE data LIKE '%first_2_both_up%';}); +is($decoded_e2, '1', + 'FIRST 2: decoding works when all required slots are up'); + +# Test FIRST 1 (sb1_slot, sb2_slot) with sb1_slot unavailable. $standby1->stop; +$primary->adjust_conf('postgresql.conf', 'synchronized_standby_slots', + "'FIRST 1 (sb1_slot, sb2_slot)'"); +$primary->reload; + +$primary->safe_psql('postgres', + "SELECT pg_logical_emit_message(true, 'qtest', 'first_1_skip_unavailable');" +); +$primary->wait_for_replay_catchup($standby2); + +# FIRST 1 should skip sb1_slot (unavailable) and use sb2_slot. +my $decoded_e1 = $primary->safe_psql('postgres', + q{SELECT count(*) FROM pg_logical_slot_get_changes('logical_failover', NULL, NULL) + WHERE data LIKE '%first_1_skip_unavailable%';}); +is($decoded_e1, '1', + 'FIRST 1: skips unavailable first slot, uses second slot'); + +# Test shorthand priority syntax: N (...) means FIRST N (...). +$primary->adjust_conf('postgresql.conf', 'synchronized_standby_slots', + "'1 (sb1_slot, sb2_slot)'"); +$primary->reload; + +$primary->safe_psql('postgres', + "SELECT pg_logical_emit_message(true, 'qtest', 'num_1_shorthand_priority');" +); +$primary->wait_for_replay_catchup($standby2); + +my $decoded_num1 = $primary->safe_psql('postgres', + q{SELECT count(*) FROM pg_logical_slot_get_changes('logical_failover', NULL, NULL) + WHERE data LIKE '%num_1_shorthand_priority%';}); +is($decoded_num1, '1', + '1 (...): shorthand priority syntax behaves like FIRST 1'); + +################################################## +# PART E: FIRST 1 and ANY 2 wait on an active lagging slot +################################################## + +# Bring standby1 back so sb1_slot is active and caught up. +$standby1->start; +$primary->wait_for_replay_catchup($standby1); + +# To test the active-but-lagging slot path deterministically, we open a raw +# replication connection to sb1_slot starting from a deliberately old LSN. +# psql in replication mode never sends Standby Status Update messages, so +# the walsender keeps sb1_slot's active_pid set but restart_lsn never +# advances. + +# Stop standby1 so its walsender releases sb1_slot, allowing our replication +# connection below to acquire it. +$standby1->stop; + +# Capture a safely old LSN to stream from, before the test WAL record. my $old_lsn = $primary->safe_psql('postgres', "SELECT pg_current_wal_lsn();"); +# FIRST 1 must wait for the highest-priority slot when it is active but lagging. +$primary->adjust_conf('postgresql.conf', 'synchronized_standby_slots', + "'FIRST 1 (sb1_slot, sb2_slot)'"); +$primary->reload; + +my $first_lag_lsn = $primary->safe_psql('postgres', + "SELECT pg_logical_emit_message(true, 'qtest', 'first_1_lagging_blocks');" +); +$primary->wait_for_replay_catchup($standby2); + +# Open a raw replication connection to sb1_slot starting from $old_lsn. +# This activates the slot (active_pid IS NOT NULL) while keeping restart_lsn +# frozen below $first_lag_lsn for the lifetime of the connection. +my $repl_first = $primary->background_psql( + 'postgres', + replication => 'database', + on_error_stop => 0, + timeout => $PostgreSQL::Test::Utils::timeout_default); + +$repl_first->query_until( + qr/^$/, + "START_REPLICATION SLOT sb1_slot PHYSICAL $old_lsn;\n"); + +# Wait until sb1_slot shows active_pid, confirming the walsender is live. +$primary->poll_query_until('postgres', q{ + SELECT active_pid IS NOT NULL + FROM pg_replication_slots + WHERE slot_name = 'sb1_slot' +}) or die "replication connection did not activate sb1_slot"; + +# sb1_slot is now active and its restart_lsn is behind $first_lag_lsn. +# Start logical decoding in the background; it must block. +my $bg_first = $primary->background_psql( + 'postgres', + on_error_stop => 0, + timeout => $PostgreSQL::Test::Utils::timeout_default); + +$bg_first->query_until( + qr/decode_start/, q( + \echo decode_start + SELECT pg_logical_slot_peek_changes('logical_failover', NULL, NULL); +)); + +ok( $primary->poll_query_until( + 'postgres', q{ +SELECT EXISTS ( + SELECT 1 + FROM pg_stat_activity + WHERE wait_event = 'WaitForStandbyConfirmation' + AND query LIKE '%pg_logical_slot_peek_changes(''logical_failover''%' +); +}), + 'FIRST 1: decoding waits for active lagging higher-priority slot'); + +$primary->adjust_conf('postgresql.conf', 'synchronized_standby_slots', "''"); +$primary->reload; +$bg_first->quit; +$repl_first->quit; + +# Ensure the previous replication connection has fully released sb1_slot +# before reusing it in the next subtest. +$primary->poll_query_until('postgres', q{ + SELECT active_pid IS NULL + FROM pg_replication_slots + WHERE slot_name = 'sb1_slot' +}) or die "replication connection did not release sb1_slot"; + +# Consume the change so the slot is clean for the next test. +$primary->safe_psql('postgres', + q{SELECT pg_logical_slot_get_changes('logical_failover', NULL, NULL);}); + +# ANY 2 must also wait when only one of two required slots has caught up. +# Reuse the same technique: open a raw replication connection to sb1_slot +# from $old_lsn so it is active but its restart_lsn stays behind the target. + +# Capture another old LSN baseline before the next test WAL record. +$old_lsn = $primary->safe_psql('postgres', + "SELECT pg_current_wal_lsn();"); + $primary->adjust_conf('postgresql.conf', 'synchronized_standby_slots', "'ANY 2 (sb1_slot, sb2_slot)'"); $primary->reload; @@ -272,7 +425,54 @@ $primary->wait_for_replay_catchup($standby1); ################################################## -# PART E: Duplicate entries are ignored for quorum counting +# PART F: Plain list with first-prefixed slot name still means ALL mode +################################################## + +# Create a slot name starting with "first_" for parser disambiguation checks. +$primary->safe_psql('postgres', + "SELECT pg_create_physical_replication_slot('first_slot');"); + +# If simple-list syntax starts with a slot name like "first_slot", it must +# still be treated as ALL mode (not as explicit FIRST N syntax). +$primary->adjust_conf('postgresql.conf', 'synchronized_standby_slots', + "'first_slot, sb2_slot'"); +$primary->reload; + +$primary->safe_psql('postgres', + "SELECT pg_logical_emit_message(true, 'qtest', 'first_prefix_all_mode_blocks');" +); +$primary->wait_for_replay_catchup($standby2); + +$log_offset = -s $primary->logfile; + +$bg = $primary->background_psql( + 'postgres', + on_error_stop => 0, + timeout => $PostgreSQL::Test::Utils::timeout_default); + +$bg->query_until( + qr/decode_start/, q( + \echo decode_start + SELECT pg_logical_slot_peek_changes('logical_failover', NULL, NULL); +)); + +# Plain list must require all listed slots; first_slot is intentionally inactive. +$primary->wait_for_log( + qr/replication slot \"first_slot\" specified in parameter "synchronized_standby_slots" does not have active_pid/, + $log_offset); + +pass('plain list with first-prefixed slot name blocks in ALL mode'); + +$primary->adjust_conf('postgresql.conf', 'synchronized_standby_slots', "''"); +$primary->reload; +$bg->quit; + +# Consume the change for the next test. +$primary->safe_psql('postgres', + q{SELECT pg_logical_slot_get_changes('logical_failover', NULL, NULL);}); + +################################################## +# PART G: Duplicate entries are ignored for quorum counting ################################################## # Stop standby2 so only sb1_slot can catch up. @@ -313,6 +513,46 @@ $primary->adjust_conf('postgresql.conf', 'synchronized_standby_slots', "''"); $primary->reload; $bg_dup->quit; +# Consume the change for the next test. +$primary->safe_psql('postgres', + q{SELECT pg_logical_slot_get_changes('logical_failover', NULL, NULL);}); + +# FIRST duplicates must also not create extra priority positions. +$primary->adjust_conf('postgresql.conf', 'synchronized_standby_slots', + "'FIRST 2 (sb1_slot, sb1_slot, sb2_slot)'"); +$primary->reload; + +$primary->safe_psql('postgres', + "SELECT pg_logical_emit_message(true, 'qtest', 'first_duplicate_entries_ignored');" +); +$primary->wait_for_replay_catchup($standby1); + +my $bg_first_dup = $primary->background_psql( + 'postgres', + on_error_stop => 0, + timeout => $PostgreSQL::Test::Utils::timeout_default); + +$bg_first_dup->query_until( + qr/decode_start/, q( + \echo decode_start + SELECT pg_logical_slot_peek_changes('logical_failover', NULL, NULL); +)); + +ok( $primary->poll_query_until( + 'postgres', q{ +SELECT EXISTS ( + SELECT 1 + FROM pg_stat_activity + WHERE wait_event = 'WaitForStandbyConfirmation' + AND query LIKE '%pg_logical_slot_peek_changes(''logical_failover''%' +); +}), + 'FIRST duplicates are ignored when counting priority slots'); + +$primary->adjust_conf('postgresql.conf', 'synchronized_standby_slots', "''"); +$primary->reload; +$bg_first_dup->quit; + # Consume the change for the next test. $primary->safe_psql('postgres', q{SELECT pg_logical_slot_get_changes('logical_failover', NULL, NULL);}); @@ -323,7 +563,7 @@ $primary->wait_for_replay_catchup($standby2); ################################################## -# PART F: Verify GUC validation rejects bad values +# PART H: Verify GUC validation rejects bad values ################################################## my ($result, $stdout, $stderr); @@ -340,18 +580,6 @@ like($stderr, qr/ERROR/, like($stderr, qr/ERROR/, 'GUC rejects malformed ANY syntax'); -# Priority syntax is not supported by synchronized_standby_slots yet -($result, $stdout, $stderr) = $primary->psql('postgres', - "ALTER SYSTEM SET synchronized_standby_slots = 'FIRST 1 (sb1_slot, sb2_slot)';"); -like($stderr, qr/priority syntax is not supported/, - 'GUC rejects FIRST syntax'); - -# Legacy priority syntax is not supported by synchronized_standby_slots yet -($result, $stdout, $stderr) = $primary->psql('postgres', - "ALTER SYSTEM SET synchronized_standby_slots = '1 (sb1_slot, sb2_slot)';"); -like($stderr, qr/priority syntax is not supported/, - 'GUC rejects legacy priority syntax'); - # Invalid slot name ($result, $stdout, $stderr) = $primary->psql('postgres', "ALTER SYSTEM SET synchronized_standby_slots = 'ANY 1 (INVALID_UPPER)';"); -- 2.43.0