From 673c54267064e17a35a1907644bf6859759827b0 Mon Sep 17 00:00:00 2001 From: Ashutosh Sharma Date: Thu, 4 Jun 2026 07:09:08 +0000 Subject: [PATCH 2/3] Add ANY N semantics to synchronized_standby_slots Extend synchronized_standby_slots with quorum syntax for logical failover slot synchronization: - ANY N (slot1, slot2, ...) Plain-list semantics are preserved as-is: - slot1, slot2 continues to mean all listed slots are required Implementation details: - Reuse syncrep parser infrastructure in the GUC check hook and map parsed output into synchronized_standby_slots semantics. - Consume SYNC_REP_DEFAULT from the preparatory parser refactor to distinguish plain-list syntax from explicit parser modes. - In StandbySlotsHaveCaughtup(), enforce mode-specific behavior for: - existing all-listed-slots semantics (plain list) - quorum N-of-M behavior (ANY N) - Validation rejects configurations where N exceeds the number of listed slots. - Ignore duplicate synchronized_standby_slots entries, preserving the first occurrence and applying semantics to the resulting unique list. - Clarify synchronized_standby_slots comments and lagging restart_lsn reporting to match the implemented behavior. Tests and docs: - Add recovery coverage for plain-list behavior and ANY quorum behavior, including lagging-slot and validation-error scenarios. - Add duplicate-entry recovery coverage for synchronized_standby_slots. - Document ANY syntax and clarify plain-list behavior for this GUC. - Document that duplicate slot names are ignored and counted only once. Author: Satya Narlapuram Author: Ashutosh Sharma Reviewed-by: Shveta Malik Reviewed-by: Ajin Cherian Reviewed-by: Hou, Zhijie Reviewed-by: Dilip Kumar Reviewed-by: Surya Poondla Reviewed-by: Japin Li --- doc/src/sgml/config.sgml | 81 ++- src/backend/replication/slot.c | 496 ++++++++++++++---- .../053_synchronized_standby_slots_quorum.pl | 378 +++++++++++++ 3 files changed, 830 insertions(+), 125 deletions(-) create mode 100644 src/test/recovery/t/053_synchronized_standby_slots_quorum.pl diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml index cebae4b6d1b..a03ae94a12f 100644 --- a/doc/src/sgml/config.sgml +++ b/doc/src/sgml/config.sgml @@ -5190,17 +5190,78 @@ ANY num_sync ( num_sync ( slot_name [, ...] ) +slot_name [, ...] + + where num_sync is + the number of physical replication slots that must confirm WAL + receipt before logical decoding proceeds, + and slot_name + is the name of a physical replication slot. + num_sync + must be an integer value greater than zero and must not exceed the + number of listed slots. + Other forms supported by + , such as priority + syntax, are not supported. + + + If the same physical replication slot name appears more than once, + duplicate entries are ignored and only the first occurrence is used. + The semantics of synchronized_standby_slots are + therefore based on the unique set of listed slot names, preserving the + original order of first occurrence. In particular, + num_sync must not exceed + the number of unique listed slots. + + + A plain comma-separated list without a keyword specifies that + all listed physical slots must confirm WAL + receipt. This differs from + where a simple list means FIRST 1. For + synchronized_standby_slots, requiring all slots + provides safer failover semantics by default. + + + The keyword ANY, coupled with + num_sync, specifies + quorum-based semantics. Logical decoding proceeds once at least + num_sync of the listed + slots have caught up. Missing, logical, and invalidated slots are + skipped when determining candidates. Lagging slots (inactive or + active) simply do not count toward the required number until they + catch up. + If fewer than num_sync + slots have caught up at a given moment, logical decoding waits until + that threshold is reached. + i.e., there is no priority ordering. + For example, a setting of ANY 1 (sb1_slot, sb2_slot) + allows logical decoding to proceed as soon as either physical slot has + confirmed WAL receipt. If none of the slots are available or have + caught up, logical decoding waits until at least one slot meets the + required condition. This is useful in conjunction with + quorum-based synchronous replication + (synchronous_standby_names = 'ANY ...'), so that + logical decoding availability matches the commit durability guarantee. + + + ANY is case-insensitive. + + + The use of synchronized_standby_slots guarantees + that logical replication failover slots do not consume changes until those changes are received - and flushed to corresponding physical standbys. If a + and flushed to the required physical standbys. If a logical replication connection is meant to switch to a physical standby after the standby is promoted, the physical replication slot for the standby should be listed here. Note that logical replication will not - proceed if the slots specified in the - synchronized_standby_slots do not exist or are invalidated. + proceed if the required number of physical slots specified in + synchronized_standby_slots do not exist or are + invalidated. Additionally, the replication management functions pg_replication_slot_advance, @@ -5208,9 +5269,9 @@ ANY num_sync ( pg_logical_slot_peek_changes, - when used with logical failover slots, will block until all - physical slots specified in synchronized_standby_slots have - confirmed WAL receipt. + when used with logical failover slots, will block until the required + physical slots specified in synchronized_standby_slots + have confirmed WAL receipt. The standbys corresponding to the physical replication slots in diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c index d7fb9f5a67f..b89becaf6ba 100644 --- a/src/backend/replication/slot.c +++ b/src/backend/replication/slot.c @@ -50,6 +50,7 @@ #include "replication/logicallauncher.h" #include "replication/slotsync.h" #include "replication/slot.h" +#include "replication/syncrep.h" #include "replication/walsender_private.h" #include "storage/fd.h" #include "storage/ipc.h" @@ -91,11 +92,19 @@ typedef struct ReplicationSlotOnDisk * Note: this must be a flat representation that can be held in a single chunk * of guc_malloc'd memory, so that it can be stored as the "extra" data for the * synchronized_standby_slots GUC. + * + * The layout mirrors SyncRepConfigData so that the same quorum and priority + * semantics can be expressed. The syncrep_method field uses the + * SYNC_REP_DEFAULT, SYNC_REP_PRIORITY, and SYNC_REP_QUORUM constants from + * syncrep.h. */ typedef struct { - /* Number of slot names in the slot_names[] */ - int nslotnames; + int config_size; /* total size of this struct, in bytes */ + int num_sync; /* number of slots that must confirm WAL + * receipt before logical decoding proceeds */ + uint8 syncrep_method; /* SYNC_REP_* method */ + int nslotnames; /* number of slot names that follow */ /* * slot_names contains 'nslotnames' consecutive null-terminated C strings. @@ -103,6 +112,28 @@ typedef struct char slot_names[FLEXIBLE_ARRAY_MEMBER]; } SyncStandbySlotsConfigData; +/* + * State of a replication slot specified in synchronized_standby_slots GUC. + */ +typedef enum +{ + SS_SLOT_NOT_FOUND, /* slot does not exist */ + SS_SLOT_LOGICAL, /* slot is logical, not physical */ + SS_SLOT_INVALIDATED, /* slot has been invalidated */ + SS_SLOT_INACTIVE_LAGGING, /* slot is inactive and behind wait_for_lsn */ + SS_SLOT_ACTIVE_LAGGING, /* slot is active and behind wait_for_lsn */ +} SyncStandbySlotsState; + +/* + * Information about a synchronized standby slot's state. + */ +typedef struct +{ + const char *slot_name; /* name of the slot */ + SyncStandbySlotsState state; /* state of the slot */ + XLogRecPtr restart_lsn; /* current restart_lsn (valid for lagging states) */ +} SyncStandbySlotsStateInfo; + /* * Lookup table for slot invalidation causes. */ @@ -2963,94 +2994,190 @@ GetSlotInvalidationCauseName(ReplicationSlotInvalidationCause cause) } /* - * A helper function to validate slots specified in GUC synchronized_standby_slots. + * Remove duplicate member names from a flat SyncRepConfigData in place. * - * The rawname will be parsed, and the result will be saved into *elemlist. + * The first occurrence of each name is kept and input order is preserved. */ -static bool -validate_sync_standby_slots(char *rawname, List **elemlist) +static void +CompactSyncRepConfigMemberNames(SyncRepConfigData *config) { - /* Verify syntax and parse string into a list of identifiers */ - if (!SplitIdentifierString(rawname, ',', elemlist)) - { - GUC_check_errdetail("List syntax is invalid."); - return false; - } + char *src_name; + char *dst_name; + int unique_members = 0; + Size unique_size = offsetof(SyncRepConfigData, member_names); + + src_name = config->member_names; + dst_name = config->member_names; - /* Iterate the list to validate each slot name */ - foreach_ptr(char, name, *elemlist) + for (int i = 0; i < config->nmembers; i++) { - int err_code; - char *err_msg = NULL; - char *err_hint = NULL; + char *existing_name; + size_t name_size; + bool duplicate = false; - if (!ReplicationSlotValidateNameInternal(name, false, &err_code, - &err_msg, &err_hint)) + name_size = strlen(src_name) + 1; + existing_name = config->member_names; + + for (int j = 0; j < unique_members; j++) { - GUC_check_errcode(err_code); - GUC_check_errdetail("%s", err_msg); - if (err_hint != NULL) - GUC_check_errhint("%s", err_hint); - return false; + if (strcmp(existing_name, src_name) == 0) + { + duplicate = true; + break; + } + + existing_name += strlen(existing_name) + 1; } + + if (!duplicate) + { + if (dst_name != src_name) + memmove(dst_name, src_name, name_size); + + dst_name += name_size; + unique_members++; + unique_size += name_size; + } + + src_name += name_size; } - return true; + config->nmembers = unique_members; + config->config_size = (int) unique_size; } /* * GUC check_hook for synchronized_standby_slots + * + * This reuses the syncrep_yyparse/syncrep_scanner infrastructure that is + * also used for synchronous_standby_names, and accepts these forms: + * + * slot1, slot2 -- wait for ALL listed slots + * ANY N (slot1, slot2, ...) -- wait for any N-of-M (quorum) + * + * Note: Simple list syntax is interpreted as "wait for ALL" for this GUC, + * unlike synchronous_standby_names where it means "FIRST 1". + * + * After parsing, we validate every name as a legal replication slot name, + * omit duplicate entries while preserving first-occurrence order, and then + * apply the resulting unique list to the configured semantics. */ bool check_synchronized_standby_slots(char **newval, void **extra, GucSource source) { - char *rawname; - char *ptr; - List *elemlist; - int size; - bool ok; - SyncStandbySlotsConfigData *config; - - if ((*newval)[0] == '\0') - return true; + if (*newval != NULL && (*newval)[0] != '\0') + { + yyscan_t scanner; + int parse_rc; + SyncStandbySlotsConfigData *config; + const char *mname; + + /* Result of parsing is returned in one of these two variables */ + SyncRepConfigData *syncrep_parse_result = NULL; + char *syncrep_parse_error_msg = NULL; + + /* Parse the synchronized standby slots configuration */ + syncrep_scanner_init(*newval, &scanner); + parse_rc = syncrep_yyparse(&syncrep_parse_result, + &syncrep_parse_error_msg, + scanner); + syncrep_scanner_finish(scanner); + + if (parse_rc != 0 || syncrep_parse_result == NULL) + { + GUC_check_errcode(ERRCODE_SYNTAX_ERROR); + if (syncrep_parse_error_msg) + GUC_check_errdetail("%s", syncrep_parse_error_msg); + else + GUC_check_errdetail("\"%s\" parser failed.", + "synchronized_standby_slots"); + return false; + } - /* Need a modifiable copy of the GUC string */ - rawname = pstrdup(*newval); + if (syncrep_parse_result->syncrep_method == SYNC_REP_PRIORITY) + { + GUC_check_errcode(ERRCODE_INVALID_PARAMETER_VALUE); + GUC_check_errmsg("priority syntax is not supported for parameter \"%s\"", + "synchronized_standby_slots"); + return false; + } - /* Now verify if the specified slots exist and have correct type */ - ok = validate_sync_standby_slots(rawname, &elemlist); + if (syncrep_parse_result->num_sync <= 0) + { + GUC_check_errmsg("number of synchronized standby slots (%d) must be greater than zero", + syncrep_parse_result->num_sync); + return false; + } - if (!ok || elemlist == NIL) - { - pfree(rawname); - list_free(elemlist); - return ok; - } + /* validate every member name as a slot name */ + mname = syncrep_parse_result->member_names; - /* Compute the size required for the SyncStandbySlotsConfigData struct */ - size = offsetof(SyncStandbySlotsConfigData, slot_names); - foreach_ptr(char, slot_name, elemlist) - size += strlen(slot_name) + 1; + for (int i = 0; i < syncrep_parse_result->nmembers; i++) + { + int err_code; + char *err_msg = NULL; + char *err_hint = NULL; - /* GUC extra value must be guc_malloc'd, not palloc'd */ - config = (SyncStandbySlotsConfigData *) guc_malloc(LOG, size); - if (!config) - return false; + if (!ReplicationSlotValidateNameInternal(mname, false, &err_code, + &err_msg, &err_hint)) + { + GUC_check_errcode(err_code); + GUC_check_errdetail("%s", err_msg); + if (err_hint != NULL) + GUC_check_errhint("%s", err_hint); + return false; + } - /* Transform the data into SyncStandbySlotsConfigData */ - config->nslotnames = list_length(elemlist); + mname += strlen(mname) + 1; + } - ptr = config->slot_names; - foreach_ptr(char, slot_name, elemlist) - { - strcpy(ptr, slot_name); - ptr += strlen(slot_name) + 1; - } + /* Omit duplicate slot names so one slot is considered only once. */ + CompactSyncRepConfigMemberNames(syncrep_parse_result); - *extra = config; + /* + * For synchronized_standby_slots, a comma-separated list means all + * listed slots are required. The syncrep parser preserves this shape + * as SYNC_REP_DEFAULT, so map num_sync to nmembers to enforce all-mode + * semantics after removing duplicate names. + */ + if (syncrep_parse_result->syncrep_method == SYNC_REP_DEFAULT) + syncrep_parse_result->num_sync = syncrep_parse_result->nmembers; + + /* Reject num_sync > nmembers after duplicates have been omitted. */ + if (syncrep_parse_result->num_sync > syncrep_parse_result->nmembers) + { + GUC_check_errmsg("number of synchronized standby slots (%d) must not exceed the number of unique listed slots (%d)", + syncrep_parse_result->num_sync, + syncrep_parse_result->nmembers); + return false; + } + + /* + * Build SyncStandbySlotsConfigData from the parsed SyncRepConfigData. + * Since the structures have identical layout, we can use the same + * config_size. + */ + config = (SyncStandbySlotsConfigData *) + guc_malloc(LOG, syncrep_parse_result->config_size); + if (!config) + return false; + + config->config_size = syncrep_parse_result->config_size; + config->num_sync = syncrep_parse_result->num_sync; + config->syncrep_method = syncrep_parse_result->syncrep_method; + config->nslotnames = syncrep_parse_result->nmembers; + + /* Copy all slot names in one operation */ + memcpy(config->slot_names, + syncrep_parse_result->member_names, + syncrep_parse_result->config_size - + offsetof(SyncRepConfigData, member_names)); + + *extra = config; + } + else + *extra = NULL; - pfree(rawname); - list_free(elemlist); return true; } @@ -3099,18 +3226,117 @@ SlotExistsInSyncStandbySlots(const char *slot_name) } /* - * Return true if the slots specified in synchronized_standby_slots have caught up to - * the given WAL location, false otherwise. + * Report problem states for synchronized standby slots that prevented the + * catch-up requirement from being met. + */ +static void +ReportUnavailableSyncStandbySlots(SyncStandbySlotsStateInfo *slot_states, + int num_slot_states, int elevel, + XLogRecPtr wait_for_lsn) +{ + for (int i = 0; i < num_slot_states; i++) + { + const char *slot_name = slot_states[i].slot_name; + + switch (slot_states[i].state) + { + case SS_SLOT_NOT_FOUND: + ereport(elevel, + errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("replication slot \"%s\" specified in parameter \"%s\" does not exist", + slot_name, "synchronized_standby_slots"), + errdetail("Logical replication is waiting on the standby associated with replication slot \"%s\".", + slot_name), + errhint("Create the replication slot \"%s\" or amend parameter \"%s\".", + slot_name, "synchronized_standby_slots")); + break; + + case SS_SLOT_LOGICAL: + ereport(elevel, + errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("cannot specify logical replication slot \"%s\" in parameter \"%s\"", + slot_name, "synchronized_standby_slots"), + errdetail("Logical replication is waiting for correction on replication slot \"%s\".", + slot_name), + errhint("Remove the logical replication slot \"%s\" from parameter \"%s\".", + slot_name, "synchronized_standby_slots")); + break; + + case SS_SLOT_INVALIDATED: + ereport(elevel, + errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("physical replication slot \"%s\" specified in parameter \"%s\" has been invalidated", + slot_name, "synchronized_standby_slots"), + errdetail("Logical replication is waiting on the standby associated with replication slot \"%s\".", + slot_name), + errhint("Drop and recreate the replication slot \"%s\", or amend parameter \"%s\".", + slot_name, "synchronized_standby_slots")); + break; + + case SS_SLOT_INACTIVE_LAGGING: + ereport(elevel, + errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("replication slot \"%s\" specified in parameter \"%s\" does not have active_pid", + slot_name, "synchronized_standby_slots"), + errdetail("Logical replication is waiting on the standby associated with replication slot \"%s\".", + slot_name), + errhint("Start the standby associated with the replication slot \"%s\", or amend parameter \"%s\".", + slot_name, "synchronized_standby_slots")); + break; + + case SS_SLOT_ACTIVE_LAGGING: + if (!XLogRecPtrIsValid(slot_states[i].restart_lsn)) + ereport(DEBUG1, + errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("replication slot \"%s\" specified in parameter \"%s\" has not caught up", + slot_name, "synchronized_standby_slots"), + errdetail("The slot's restart_lsn is not yet set; required LSN is %X/%X.", + LSN_FORMAT_ARGS(wait_for_lsn))); + else + ereport(DEBUG1, + errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("replication slot \"%s\" specified in parameter \"%s\" has not caught up", + slot_name, "synchronized_standby_slots"), + errdetail("The slot's restart_lsn %X/%X is behind the required %X/%X.", + LSN_FORMAT_ARGS(slot_states[i].restart_lsn), + LSN_FORMAT_ARGS(wait_for_lsn))); + break; + + default: + /* Should not happen */ + Assert(false); + break; + } + } +} + +/* + * Return true if the required standby slots have caught up to the given WAL + * location, false otherwise. + * + * The behavior depends on the synchronized_standby_slots configuration: * - * The elevel parameter specifies the error level used for logging messages - * related to slots that do not exist, are invalidated, or are inactive. + * Simple list (e.g., "slot1, slot2"): + * ALL slots must have caught up. Returns false otherwise. + * + * ANY N (e.g., "ANY 2 (slot1, slot2, slot3)"): + * Wait for any N eligible slots. Skips missing, invalid, logical, and + * lagging slots (inactive or active) to find N slots that have caught up. + * + * The elevel parameter specifies the error level used for reporting issues + * related to the slots specified in synchronized_standby_slots when the + * catch-up requirement is not met. */ bool StandbySlotsHaveCaughtup(XLogRecPtr wait_for_lsn, int elevel) { const char *name; int caught_up_slot_num = 0; + int required; XLogRecPtr min_restart_lsn = InvalidXLogRecPtr; + bool wait_for_all; + SyncStandbySlotsStateInfo *slot_states; + int num_slot_states = 0; /* * Don't need to wait for the standbys to catch up if there is no value in @@ -3134,12 +3360,44 @@ StandbySlotsHaveCaughtup(XLogRecPtr wait_for_lsn, int elevel) ss_oldest_flush_lsn >= wait_for_lsn) return true; + /* + * Determine how many slots are required and whether we're in "wait for + * ALL" mode versus "wait for N-of-M" mode. + * + * wait_for_all = true means we need ALL slots to be ready (simple + * list syntax like "slot1, slot2"). In this mode, we stop checking + * on the first slot that is missing/invalid/logical, or the first slot + * that is lagging (inactive or active). + * + * wait_for_all = false means we select N from M candidates (ANY N syntax). + * In this mode, slots already caught up are counted even if inactive, and + * lagging slots are skipped until enough slots have caught up. + * Duplicate configured slot names do not appear here because the check hook + * compacts them out of the parsed configuration. + */ + required = synchronized_standby_slots_config->num_sync; + wait_for_all = (required == synchronized_standby_slots_config->nslotnames); + + /* + * Allocate array to track slot states. Size it to the total number of + * configured slots since in the worst case all could have problem states. + */ + slot_states = palloc_array(SyncStandbySlotsStateInfo, + synchronized_standby_slots_config->nslotnames); + /* * To prevent concurrent slot dropping and creation while filtering the * slots, take the ReplicationSlotControlLock outside of the loop. */ LWLockAcquire(ReplicationSlotControlLock, LW_SHARED); + /* + * Iterate through configured slots, checking their state and tracking + * how many have caught up. Problem states are recorded for deferred + * reporting: missing/logical/invalidated slots, and lagging slots + * (inactive or active). Messages are only emitted if the catch-up + * requirement isn't met. + */ name = synchronized_standby_slots_config->slot_names; for (int i = 0; i < synchronized_standby_slots_config->nslotnames; i++) { @@ -3150,35 +3408,28 @@ StandbySlotsHaveCaughtup(XLogRecPtr wait_for_lsn, int elevel) slot = SearchNamedReplicationSlot(name, false); - /* - * If a slot name provided in synchronized_standby_slots does not - * exist, report a message and exit the loop. - */ if (!slot) { - ereport(elevel, - errcode(ERRCODE_INVALID_PARAMETER_VALUE), - errmsg("replication slot \"%s\" specified in parameter \"%s\" does not exist", - name, "synchronized_standby_slots"), - errdetail("Logical replication is waiting on the standby associated with replication slot \"%s\".", - name), - errhint("Create the replication slot \"%s\" or amend parameter \"%s\".", - name, "synchronized_standby_slots")); - break; + /* Record Slot State */ + slot_states[num_slot_states].slot_name = name; + slot_states[num_slot_states].state = SS_SLOT_NOT_FOUND; + num_slot_states++; + + if (wait_for_all) + break; + goto next_slot; } - /* Same as above: if a slot is not physical, exit the loop. */ if (SlotIsLogical(slot)) { - ereport(elevel, - errcode(ERRCODE_INVALID_PARAMETER_VALUE), - errmsg("cannot specify logical replication slot \"%s\" in parameter \"%s\"", - name, "synchronized_standby_slots"), - errdetail("Logical replication is waiting for correction on replication slot \"%s\".", - name), - errhint("Remove the logical replication slot \"%s\" from parameter \"%s\".", - name, "synchronized_standby_slots")); - break; + /* Record Slot State */ + slot_states[num_slot_states].slot_name = name; + slot_states[num_slot_states].state = SS_SLOT_LOGICAL; + num_slot_states++; + + if (wait_for_all) + break; + goto next_slot; } SpinLockAcquire(&slot->mutex); @@ -3189,33 +3440,34 @@ StandbySlotsHaveCaughtup(XLogRecPtr wait_for_lsn, int elevel) if (invalidated) { - /* Specified physical slot has been invalidated */ - ereport(elevel, - errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), - errmsg("physical replication slot \"%s\" specified in parameter \"%s\" has been invalidated", - name, "synchronized_standby_slots"), - errdetail("Logical replication is waiting on the standby associated with replication slot \"%s\".", - name), - errhint("Drop and recreate the replication slot \"%s\", or amend parameter \"%s\".", - name, "synchronized_standby_slots")); - break; + /* Record Slot State */ + slot_states[num_slot_states].slot_name = name; + slot_states[num_slot_states].state = SS_SLOT_INVALIDATED; + num_slot_states++; + + if (wait_for_all) + break; + goto next_slot; } if (!XLogRecPtrIsValid(restart_lsn) || restart_lsn < wait_for_lsn) { - /* Log a message if no active_pid for this physical slot */ - if (inactive) - ereport(elevel, - errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), - errmsg("replication slot \"%s\" specified in parameter \"%s\" does not have active_pid", - name, "synchronized_standby_slots"), - errdetail("Logical replication is waiting on the standby associated with replication slot \"%s\".", - name), - errhint("Start the standby associated with the replication slot \"%s\", or amend parameter \"%s\".", - name, "synchronized_standby_slots")); + /* + * If a slot is inactive and lagging, report it as inactive. + * If it is active and lagging, report it as lagging. + * + * In ALL mode: must wait for it. + * In ANY N (quorum) mode: skip and use another slot. + */ + slot_states[num_slot_states].slot_name = name; + slot_states[num_slot_states].state = + inactive ? SS_SLOT_INACTIVE_LAGGING : SS_SLOT_ACTIVE_LAGGING; + slot_states[num_slot_states].restart_lsn = restart_lsn; + num_slot_states++; - /* Continue if the current slot hasn't caught up. */ - break; + if (wait_for_all) + break; + goto next_slot; } Assert(restart_lsn >= wait_for_lsn); @@ -3226,17 +3478,30 @@ StandbySlotsHaveCaughtup(XLogRecPtr wait_for_lsn, int elevel) caught_up_slot_num++; + /* Stop processing if the required number of slots have caught up. */ + if (caught_up_slot_num >= required) + break; + +next_slot: name += strlen(name) + 1; } LWLockRelease(ReplicationSlotControlLock); /* - * Return false if not all the standbys have caught up to the specified - * WAL location. + * If the required number of slots have not caught up, report any + * recorded problem states and return false. + * + * We only emit messages when the requirement is not met to avoid + * misleading messages in quorum/priority mode where other slots may + * have satisfied the condition despite some slots having issues. */ - if (caught_up_slot_num != synchronized_standby_slots_config->nslotnames) + if (caught_up_slot_num < required) + { + ReportUnavailableSyncStandbySlots(slot_states, num_slot_states, elevel, wait_for_lsn); + pfree(slot_states); return false; + } /* The ss_oldest_flush_lsn must not retreat. */ Assert(!XLogRecPtrIsValid(ss_oldest_flush_lsn) || @@ -3244,6 +3509,7 @@ StandbySlotsHaveCaughtup(XLogRecPtr wait_for_lsn, int elevel) ss_oldest_flush_lsn = min_restart_lsn; + pfree(slot_states); return true; } diff --git a/src/test/recovery/t/053_synchronized_standby_slots_quorum.pl b/src/test/recovery/t/053_synchronized_standby_slots_quorum.pl new file mode 100644 index 00000000000..760f10f38a0 --- /dev/null +++ b/src/test/recovery/t/053_synchronized_standby_slots_quorum.pl @@ -0,0 +1,378 @@ + +# Copyright (c) 2024-2026, PostgreSQL Global Development Group + +# Test synchronized_standby_slots with different syntax modes: +# - Plain list (ALL mode): slot1, slot2 +# - ANY N (quorum mode): ANY N (slot1, slot2, ...) +# +# Setup: a 3-node cluster with one primary, two physical standbys, and a +# logical decoding client using a failover-enabled slot. +# +# | ----> standby1 (primary_slot_name = sb1_slot) +# primary ------| +# | ----> standby2 (primary_slot_name = sb2_slot) +# +# synchronous_standby_names = 'ANY 1 (standby1, standby2)' +# +# Test scenarios: +# +# A) Plain list 'sb1_slot, sb2_slot' (ALL mode) +# - Works when all slots are available +# - Blocks immediately if ANY slot is unavailable +# +# B) ANY N (sb1_slot, sb2_slot, ...) (quorum mode) +# - Proceeds when at least N slots have caught up +# - Skips missing/invalid/logical slots and lagging slots (inactive or active) +# to find N caught-up slots +# +use strict; +use warnings FATAL => 'all'; +use PostgreSQL::Test::Cluster; +use PostgreSQL::Test::Utils; +use Test::More; + +# --------------------------------------------------------------------------- +# 1. Create a primary with logical replication level, autovacuum off +# --------------------------------------------------------------------------- +my $primary = PostgreSQL::Test::Cluster->new('primary'); +$primary->init(allows_streaming => 'logical'); +$primary->append_conf( + 'postgresql.conf', qq{ +autovacuum = off +}); +$primary->start; + +# Physical replication slots for the two standbys +$primary->safe_psql('postgres', + "SELECT pg_create_physical_replication_slot('sb1_slot');"); +$primary->safe_psql('postgres', + "SELECT pg_create_physical_replication_slot('sb2_slot');"); + +# --------------------------------------------------------------------------- +# 2. Create standby1 and standby2 from a fresh backup +# --------------------------------------------------------------------------- +my $backup_name = 'base_backup'; +$primary->backup($backup_name); + +my $connstr = $primary->connstr; + +my $standby1 = PostgreSQL::Test::Cluster->new('standby1'); +$standby1->init_from_backup( + $primary, $backup_name, + has_streaming => 1, + has_restoring => 1); +$standby1->append_conf( + 'postgresql.conf', qq( +hot_standby_feedback = on +primary_slot_name = 'sb1_slot' +primary_conninfo = '$connstr dbname=postgres' +)); + +my $standby2 = PostgreSQL::Test::Cluster->new('standby2'); +$standby2->init_from_backup( + $primary, $backup_name, + has_streaming => 1, + has_restoring => 1); +$standby2->append_conf( + 'postgresql.conf', qq( +hot_standby_feedback = on +primary_slot_name = 'sb2_slot' +primary_conninfo = '$connstr dbname=postgres' +)); + +$standby1->start; +$standby2->start; + +$primary->wait_for_replay_catchup($standby1); +$primary->wait_for_replay_catchup($standby2); + +# --------------------------------------------------------------------------- +# 3. Create a logical failover slot on the primary +# --------------------------------------------------------------------------- +$primary->safe_psql('postgres', + "SELECT pg_create_logical_replication_slot('logical_failover', 'test_decoding', false, false, true);" +); + +# --------------------------------------------------------------------------- +# 4. Configure quorum sync rep with ALL-mode synchronized_standby_slots +# --------------------------------------------------------------------------- +$primary->append_conf( + 'postgresql.conf', qq{ +synchronous_standby_names = 'ANY 1 (standby1, standby2)' +synchronized_standby_slots = 'sb1_slot, sb2_slot' +}); +$primary->reload; + +$primary->wait_for_replay_catchup($standby1); +$primary->wait_for_replay_catchup($standby2); + +# --------------------------------------------------------------------------- +# 5. Confirm that quorum sync rep is active for both standbys +# --------------------------------------------------------------------------- +is( $primary->safe_psql( + 'postgres', + q{SELECT count(*) FROM pg_stat_replication WHERE sync_state = 'quorum';} + ), + '2', + 'both standbys are in quorum sync state'); + +################################################## +# PART A: Plain list (ALL mode) blocks when any slot is unavailable +################################################## + +$standby1->stop; + +# Commit succeeds since standby2 satisfies the quorum. +my $emit_lsn = $primary->safe_psql('postgres', + "SELECT pg_logical_emit_message(true, 'qtest', 'all_mode_blocks');" +); +like($emit_lsn, qr/^[0-9A-F]+\/[0-9A-F]+$/, + 'synchronous commit succeeds with quorum (standby2 alive)'); + +$primary->wait_for_replay_catchup($standby2); + +my $log_offset = -s $primary->logfile; + +my $bg = $primary->background_psql( + 'postgres', + on_error_stop => 0, + timeout => $PostgreSQL::Test::Utils::timeout_default); + +$bg->query_until( + qr/decode_start/, q( + \echo decode_start + SELECT pg_logical_slot_peek_changes('logical_failover', NULL, NULL); +)); + +# Wait for the primary to log a warning about sb1_slot not being active. +$primary->wait_for_log( + qr/replication slot \"sb1_slot\" specified in parameter "synchronized_standby_slots" does not have active_pid/, + $log_offset); + +pass('plain list (ALL mode): logical decoding blocked by unavailable sb1_slot'); + +# Unblock by clearing synchronized_standby_slots. +$primary->adjust_conf('postgresql.conf', 'synchronized_standby_slots', "''"); +$primary->reload; +$bg->quit; + +# Consume the change so the slot is clean for the next test. +$primary->safe_psql('postgres', + q{SELECT pg_logical_slot_get_changes('logical_failover', NULL, NULL);}); + +################################################## +# PART B: ANY mode (quorum) — logical decoding proceeds with N-of-M slots +################################################## + +# Switch synchronized_standby_slots to quorum mode: need only 1 of 2 slots. +$primary->adjust_conf('postgresql.conf', 'synchronized_standby_slots', + "'ANY 1 (sb1_slot, sb2_slot)'"); +$primary->reload; + +# standby1 is still down; standby2 is up. + +# Emit another transactional message — commits via quorum. +$primary->safe_psql('postgres', + "SELECT pg_logical_emit_message(true, 'qtest', 'quorum_mode_works');" +); +$primary->wait_for_replay_catchup($standby2); + +# In quorum mode, logical decoding should NOT block because sb2_slot has +# caught up and 1-of-2 is sufficient. +my $decoded = $primary->safe_psql('postgres', + q{SELECT count(*) FROM pg_logical_slot_get_changes('logical_failover', NULL, NULL) + WHERE data LIKE '%quorum_mode_works%';}); +is($decoded, '1', + 'ANY mode: logical decoding proceeds with only sb2_slot caught up'); + +################################################## +# PART C: Re-check plain list (ALL mode) works when both standbys are up +################################################## + +# Bring standby1 back. +$standby1->start; +$primary->wait_for_replay_catchup($standby1); + +# Switch to plain list (ALL mode) with both slots. +$primary->adjust_conf('postgresql.conf', 'synchronized_standby_slots', + "'sb1_slot, sb2_slot'"); +$primary->reload; + +$primary->safe_psql('postgres', + "SELECT pg_logical_emit_message(true, 'qtest', 'both_caught_up');" +); +$primary->wait_for_replay_catchup($standby1); +$primary->wait_for_replay_catchup($standby2); + +my $decoded_bc = $primary->safe_psql('postgres', + q{SELECT count(*) FROM pg_logical_slot_get_changes('logical_failover', NULL, NULL) + WHERE data LIKE '%both_caught_up%';}); +is($decoded_bc, '1', + 'plain list: works when all standbys are up'); + +################################################## +# PART D: ANY 2 waits on an active lagging slot +################################################## + +# Stop standby1 so sb1_slot can be controlled by a raw replication connection +# that keeps the slot active while lagging. +$standby1->stop; + +my $old_lsn = $primary->safe_psql('postgres', + "SELECT pg_current_wal_lsn();"); + +$primary->adjust_conf('postgresql.conf', 'synchronized_standby_slots', + "'ANY 2 (sb1_slot, sb2_slot)'"); +$primary->reload; + +my $any2_lag_lsn = $primary->safe_psql('postgres', + "SELECT pg_logical_emit_message(true, 'qtest', 'any_2_lagging_blocks');" +); +$primary->wait_for_replay_catchup($standby2); + +my $repl_any2 = $primary->background_psql( + 'postgres', + replication => 'database', + on_error_stop => 0, + timeout => $PostgreSQL::Test::Utils::timeout_default); + +$repl_any2->query_until( + qr/^$/, + "START_REPLICATION SLOT sb1_slot PHYSICAL $old_lsn;\n"); + +$primary->poll_query_until('postgres', q{ + SELECT active_pid IS NOT NULL + FROM pg_replication_slots + WHERE slot_name = 'sb1_slot' +}) or die "replication connection did not activate sb1_slot"; + +my $bg_any2 = $primary->background_psql( + 'postgres', + on_error_stop => 0, + timeout => $PostgreSQL::Test::Utils::timeout_default); + +$bg_any2->query_until( + qr/decode_start/, q( + \echo decode_start + SELECT pg_logical_slot_peek_changes('logical_failover', NULL, NULL); +)); + +ok( $primary->poll_query_until( + 'postgres', q{ +SELECT EXISTS ( + SELECT 1 + FROM pg_stat_activity + WHERE wait_event = 'WaitForStandbyConfirmation' + AND query LIKE '%pg_logical_slot_peek_changes(''logical_failover''%' +); +}), + 'ANY 2: decoding waits when only one slot has caught up'); + +$primary->adjust_conf('postgresql.conf', 'synchronized_standby_slots', "''"); +$primary->reload; +$bg_any2->quit; +$repl_any2->quit; + +# Consume the change for the next test. +$primary->safe_psql('postgres', + q{SELECT pg_logical_slot_get_changes('logical_failover', NULL, NULL);}); + +# Bring standby1 back up for the remaining tests. +$standby1->start; +$primary->wait_for_replay_catchup($standby1); + + +################################################## +# PART E: Duplicate entries are ignored for quorum counting +################################################## + +# Stop standby2 so only sb1_slot can catch up. +$standby2->stop; + +$primary->adjust_conf('postgresql.conf', 'synchronized_standby_slots', + "'ANY 2 (sb1_slot, sb1_slot, sb2_slot)'"); +$primary->reload; + +$primary->safe_psql('postgres', + "SELECT pg_logical_emit_message(true, 'qtest', 'duplicate_entries_ignored');" +); +$primary->wait_for_replay_catchup($standby1); + +my $bg_dup = $primary->background_psql( + 'postgres', + on_error_stop => 0, + timeout => $PostgreSQL::Test::Utils::timeout_default); + +$bg_dup->query_until( + qr/decode_start/, q( + \echo decode_start + SELECT pg_logical_slot_peek_changes('logical_failover', NULL, NULL); +)); + +ok( $primary->poll_query_until( + 'postgres', q{ +SELECT EXISTS ( + SELECT 1 + FROM pg_stat_activity + WHERE wait_event = 'WaitForStandbyConfirmation' + AND query LIKE '%pg_logical_slot_peek_changes(''logical_failover''%' +); +}), + 'duplicate entries are ignored when counting quorum slots'); + +$primary->adjust_conf('postgresql.conf', 'synchronized_standby_slots', "''"); +$primary->reload; +$bg_dup->quit; + +# Consume the change for the next test. +$primary->safe_psql('postgres', + q{SELECT pg_logical_slot_get_changes('logical_failover', NULL, NULL);}); + +# Bring standby2 back up for validation tests. +$standby2->start; +$primary->wait_for_replay_catchup($standby2); + + +################################################## +# PART F: Verify GUC validation rejects bad values +################################################## + +my ($result, $stdout, $stderr); + +# N exceeds number of listed slots +($result, $stdout, $stderr) = $primary->psql('postgres', + "ALTER SYSTEM SET synchronized_standby_slots = 'ANY 3 (sb1_slot, sb2_slot)';"); +like($stderr, qr/ERROR/, + 'GUC rejects ANY N when N > number of listed slots'); + +# Missing closing parenthesis +($result, $stdout, $stderr) = $primary->psql('postgres', + "ALTER SYSTEM SET synchronized_standby_slots = 'ANY 1 (sb1_slot, sb2_slot';"); +like($stderr, qr/ERROR/, + 'GUC rejects malformed ANY syntax'); + +# Priority syntax is not supported by synchronized_standby_slots yet +($result, $stdout, $stderr) = $primary->psql('postgres', + "ALTER SYSTEM SET synchronized_standby_slots = 'FIRST 1 (sb1_slot, sb2_slot)';"); +like($stderr, qr/priority syntax is not supported/, + 'GUC rejects FIRST syntax'); + +# Legacy priority syntax is not supported by synchronized_standby_slots yet +($result, $stdout, $stderr) = $primary->psql('postgres', + "ALTER SYSTEM SET synchronized_standby_slots = '1 (sb1_slot, sb2_slot)';"); +like($stderr, qr/priority syntax is not supported/, + 'GUC rejects legacy priority syntax'); + +# Invalid slot name +($result, $stdout, $stderr) = $primary->psql('postgres', + "ALTER SYSTEM SET synchronized_standby_slots = 'ANY 1 (INVALID_UPPER)';"); +like($stderr, qr/ERROR/, + 'GUC rejects invalid slot name in ANY syntax'); + +# --------------------------------------------------------------------------- +# Cleanup +# --------------------------------------------------------------------------- +$primary->safe_psql('postgres', + "SELECT pg_drop_replication_slot('logical_failover');"); + +done_testing(); -- 2.43.0