From bf2dd159b3de2f44cdb248b658f4c518ead98476 Mon Sep 17 00:00:00 2001 From: Shveta Malik Date: Fri, 5 Jun 2026 15:28:45 +0530 Subject: [PATCH] top-up changes --- doc/src/sgml/config.sgml | 32 +++--- src/backend/replication/slot.c | 106 ++++++++++-------- .../053_synchronized_standby_slots_quorum.pl | 16 --- 3 files changed, 80 insertions(+), 74 deletions(-) diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml index 7d8f4b75717..44276fe2bc0 100644 --- a/doc/src/sgml/config.sgml +++ b/doc/src/sgml/config.sgml @@ -5225,19 +5225,6 @@ ANY num_sync ( num_sync must not exceed - the number of unique listed slots. - A plain comma-separated list without a keyword specifies that all listed physical slots must confirm WAL @@ -5268,6 +5255,25 @@ ANY num_sync ( num_sync must not exceed + the number of unique listed slots. Such a configuration results in an + error to prevent indefinite waits in WAL sender processes due to a + misconfigured synchronized_standby_slots setting. + FIRST and ANY are case-insensitive. If these keywords are used as the name of a replication slot, diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c index fb45a1cd581..1e2e631d6a4 100644 --- a/src/backend/replication/slot.c +++ b/src/backend/replication/slot.c @@ -103,7 +103,7 @@ typedef struct int config_size; /* total size of this struct, in bytes */ int num_sync; /* number of slots that must confirm WAL * receipt before logical decoding proceeds */ - uint8 syncrep_method; /* SYNC_REP_* method */ + uint8 syncrep_method; /* SYNC_REP_* method */ int nslotnames; /* number of slot names that follow */ /* @@ -117,12 +117,12 @@ typedef struct */ typedef enum { - SS_SLOT_NOT_FOUND, /* slot does not exist */ - SS_SLOT_LOGICAL, /* slot is logical, not physical */ - SS_SLOT_INVALIDATED, /* slot has been invalidated */ + SS_SLOT_NOT_FOUND, /* slot does not exist */ + SS_SLOT_LOGICAL, /* slot is logical, not physical */ + SS_SLOT_INVALIDATED, /* slot has been invalidated */ SS_SLOT_INACTIVE_LAGGING, /* slot is inactive and behind wait_for_lsn */ - SS_SLOT_ACTIVE_LAGGING, /* slot is active and behind wait_for_lsn */ -} SyncStandbySlotsState; + SS_SLOT_ACTIVE_LAGGING, /* slot is active and behind wait_for_lsn */ +} SyncStandbySlotsState; /* * Information about a synchronized standby slot's state. @@ -131,8 +131,9 @@ typedef struct { const char *slot_name; /* name of the slot */ SyncStandbySlotsState state; /* state of the slot */ - XLogRecPtr restart_lsn; /* current restart_lsn (valid for lagging states) */ -} SyncStandbySlotsStateInfo; + XLogRecPtr restart_lsn; /* current restart_lsn (valid for lagging + * states) */ +} SyncStandbySlotsStateInfo; /* * Lookup table for slot invalidation causes. @@ -2994,16 +2995,20 @@ GetSlotInvalidationCauseName(ReplicationSlotInvalidationCause cause) } /* - * Remove duplicate member names from a flat SyncRepConfigData in place. + * Remove duplicate member names from a SyncRepConfigData object. * - * The first occurrence of each name is kept and input order is preserved. + * The member_names array of SyncRepConfigData is compacted in place so + * that only the first occurrence of each member name is retained. The + * original ordering of retained names is preserved, and nmembers and + * config_size are updated to describe only the compacted portion of + * the array. */ static void CompactSyncRepConfigMemberNames(SyncRepConfigData *config) { char *src_name; char *dst_name; - int unique_members = 0; + int nunique_members = 0; Size unique_size = offsetof(SyncRepConfigData, member_names); src_name = config->member_names; @@ -3011,38 +3016,47 @@ CompactSyncRepConfigMemberNames(SyncRepConfigData *config) for (int i = 0; i < config->nmembers; i++) { - char *existing_name; + char *unique_name; size_t name_size; bool duplicate = false; name_size = strlen(src_name) + 1; - existing_name = config->member_names; - for (int j = 0; j < unique_members; j++) + /* + * Check whether src_name matches any previously retained unique name. + * Only the first nunique_members entries in member_names need to be + * examined for this. + */ + unique_name = config->member_names; + for (int j = 0; j < nunique_members; j++) { - if (strcmp(existing_name, src_name) == 0) + if (strcmp(unique_name, src_name) == 0) { duplicate = true; break; } - existing_name += strlen(existing_name) + 1; + unique_name += strlen(unique_name) + 1; } if (!duplicate) { + /* + * This src_name is a new unique name. Copy it immediately after the + * unique names retained so far. + */ if (dst_name != src_name) memmove(dst_name, src_name, name_size); dst_name += name_size; - unique_members++; + nunique_members++; unique_size += name_size; } src_name += name_size; } - config->nmembers = unique_members; + config->nmembers = nunique_members; config->config_size = (int) unique_size; } @@ -3125,14 +3139,14 @@ check_synchronized_standby_slots(char **newval, void **extra, GucSource source) mname += strlen(mname) + 1; } - /* Omit duplicate slot names so one slot is considered only once. */ + /* Omit duplicate slot names to ensure each slot is considered only once. */ CompactSyncRepConfigMemberNames(syncrep_parse_result); /* * For synchronized_standby_slots, a comma-separated list means all * listed slots are required. The syncrep parser preserves this shape - * as SYNC_REP_DEFAULT, so map num_sync to nmembers to enforce all-mode - * semantics after removing duplicate names. + * as SYNC_REP_DEFAULT, so map num_sync to nmembers to enforce + * all-mode semantics after removing duplicate names. */ if (syncrep_parse_result->syncrep_method == SYNC_REP_DEFAULT) syncrep_parse_result->num_sync = syncrep_parse_result->nmembers; @@ -3224,7 +3238,7 @@ SlotExistsInSyncStandbySlots(const char *slot_name) * catch-up requirement from being met. */ static void -ReportUnavailableSyncStandbySlots(SyncStandbySlotsStateInfo *slot_states, +ReportUnavailableSyncStandbySlots(SyncStandbySlotsStateInfo * slot_states, int num_slot_states, int elevel, XLogRecPtr wait_for_lsn) { @@ -3285,15 +3299,15 @@ ReportUnavailableSyncStandbySlots(SyncStandbySlotsStateInfo *slot_states, errmsg("replication slot \"%s\" specified in parameter \"%s\" has not caught up", slot_name, "synchronized_standby_slots"), errdetail("The slot's restart_lsn is not yet set; required LSN is %X/%X.", - LSN_FORMAT_ARGS(wait_for_lsn))); + LSN_FORMAT_ARGS(wait_for_lsn))); else ereport(DEBUG1, errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), errmsg("replication slot \"%s\" specified in parameter \"%s\" has not caught up", slot_name, "synchronized_standby_slots"), errdetail("The slot's restart_lsn %X/%X is behind the required %X/%X.", - LSN_FORMAT_ARGS(slot_states[i].restart_lsn), - LSN_FORMAT_ARGS(wait_for_lsn))); + LSN_FORMAT_ARGS(slot_states[i].restart_lsn), + LSN_FORMAT_ARGS(wait_for_lsn))); break; default: @@ -3364,14 +3378,14 @@ StandbySlotsHaveCaughtup(XLogRecPtr wait_for_lsn, int elevel) * Determine how many slots are required and whether we're in "wait for * ALL" mode versus "wait for N-of-M" mode. * - * wait_for_all = true means we need ALL slots to be ready (simple - * list syntax like "slot1, slot2"). In this mode, we stop checking - * on the first slot that is missing/invalid/logical, or the first slot - * that is lagging (inactive or active). + * wait_for_all = true means we need ALL slots to be ready (simple list + * syntax like "slot1, slot2"). In this mode, we stop checking on the + * first slot that is missing/invalid/logical, or the first slot that is + * lagging (inactive or active). * * wait_for_all = false means we select N from M candidates (FIRST N or - * ANY N syntax). In this mode, slots already caught up are counted even if - * inactive. In FIRST N mode, we skip missing/invalid/logical slots and + * ANY N syntax). In this mode, slots already caught up are counted even + * if inactive. In FIRST N mode, we skip missing/invalid/logical slots and * lagging inactive slots, but wait for an active lagging slot with higher * priority. In ANY N mode, we skip lagging slots (inactive or active) to * find any N that have caught up. Duplicate configured slot names do not @@ -3387,7 +3401,7 @@ StandbySlotsHaveCaughtup(XLogRecPtr wait_for_lsn, int elevel) * configured slots since in the worst case all could have problem states. */ slot_states = palloc_array(SyncStandbySlotsStateInfo, - synchronized_standby_slots_config->nslotnames); + synchronized_standby_slots_config->nslotnames); /* * To prevent concurrent slot dropping and creation while filtering the @@ -3396,8 +3410,8 @@ StandbySlotsHaveCaughtup(XLogRecPtr wait_for_lsn, int elevel) LWLockAcquire(ReplicationSlotControlLock, LW_SHARED); /* - * Iterate through configured slots, checking their state and tracking - * how many have caught up. Problem states are recorded for deferred + * Iterate through configured slots, checking their state and tracking how + * many have caught up. Problem states are recorded for deferred * reporting: missing/logical/invalidated slots, and lagging slots * (inactive or active). Messages are only emitted if the catch-up * requirement isn't met. @@ -3457,13 +3471,12 @@ StandbySlotsHaveCaughtup(XLogRecPtr wait_for_lsn, int elevel) if (!XLogRecPtrIsValid(restart_lsn) || restart_lsn < wait_for_lsn) { /* - * If a slot is inactive and lagging, report it as inactive. - * If it is active and lagging, report it as lagging. + * If a slot is inactive and lagging, report it as inactive. If it + * is active and lagging, report it as lagging. * - * In ALL mode: must wait for it. - * In FIRST N (priority) mode: lagging active slots block, while - * inactive slots can be skipped. - * In ANY N (quorum) mode: skip and use another slot. + * In ALL mode: must wait for it. In FIRST N (priority) mode: + * lagging active slots block, while inactive slots can be + * skipped. In ANY N (quorum) mode: skip and use another slot. */ slot_states[num_slot_states].slot_name = name; slot_states[num_slot_states].state = @@ -3497,12 +3510,12 @@ next_slot: LWLockRelease(ReplicationSlotControlLock); /* - * If the required number of slots have not caught up, report any - * recorded problem states and return false. + * If the required number of slots have not caught up, report any recorded + * problem states and return false. * * We only emit messages when the requirement is not met to avoid - * misleading messages in quorum/priority mode where other slots may - * have satisfied the condition despite some slots having issues. + * misleading messages in quorum/priority mode where other slots may have + * satisfied the condition despite some slots having issues. */ if (caught_up_slot_num < required) { @@ -3550,7 +3563,10 @@ WaitForStandbyConfirmation(XLogRecPtr wait_for_lsn) ProcessConfigFile(PGC_SIGHUP); } - /* Exit once the configured synchronized_standby_slots requirement is met. */ + /* + * Exit once the configured synchronized_standby_slots requirement is + * met. + */ if (StandbySlotsHaveCaughtup(wait_for_lsn, WARNING)) break; diff --git a/src/test/recovery/t/053_synchronized_standby_slots_quorum.pl b/src/test/recovery/t/053_synchronized_standby_slots_quorum.pl index 67a5b1d9657..be54b788807 100644 --- a/src/test/recovery/t/053_synchronized_standby_slots_quorum.pl +++ b/src/test/recovery/t/053_synchronized_standby_slots_quorum.pl @@ -15,22 +15,6 @@ # # synchronous_standby_names = 'ANY 1 (standby1, standby2)' # -# Test scenarios: -# -# A) Plain list 'sb1_slot, sb2_slot' (ALL mode) -# - Works when all slots are available -# - Blocks immediately if ANY slot is unavailable -# -# B) ANY N (sb1_slot, sb2_slot, ...) (quorum mode) -# - Proceeds when at least N slots have caught up -# - Skips missing/invalid/logical slots and lagging slots (inactive or active) -# to find N caught-up slots -# -# C) FIRST N (sb1_slot, sb2_slot) (priority mode) -# - Selects first N slots in priority order (list order) -# - Skips missing/invalid/logical slots and inactive lagging slots, -# but waits for active lagging slots -# - FIRST 1 works with one slot down (unlike plain list) use strict; use warnings FATAL => 'all'; -- 2.34.1