diff --git a/src/backend/replication/syncrep.c b/src/backend/replication/syncrep.c index ffd5b31eb2..6add59f18c 100644 --- a/src/backend/replication/syncrep.c +++ b/src/backend/replication/syncrep.c @@ -108,14 +108,20 @@ static bool SyncRepGetSyncRecPtr(XLogRecPtr *writePtr, static void SyncRepGetOldestSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr, XLogRecPtr *applyPtr, - List *sync_standbys); + List *sync_standbys, + SyncRepStandbyData *sdata); static void SyncRepGetNthLatestSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr, XLogRecPtr *applyPtr, - List *sync_standbys, uint8 nth); + List *sync_standbys, + SyncRepStandbyData *sdata, uint8 nth); static int SyncRepGetStandbyPriority(void); -static List *SyncRepGetSyncStandbysPriority(bool *am_sync); -static List *SyncRepGetSyncStandbysQuorum(bool *am_sync); +static List *SyncRepGetSyncStandbysInternal(bool *am_sync, + SyncRepStandbyData *sdata); +static List *SyncRepGetSyncStandbysPriority(bool *am_sync, + SyncRepStandbyData *sdata); +static List *SyncRepGetSyncStandbysQuorum(bool *am_sync, + SyncRepStandbyData *sdata); static int cmp_lsn(const void *a, const void *b); #ifdef USE_ASSERT_CHECKING @@ -537,6 +543,7 @@ SyncRepGetSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr, XLogRecPtr *applyPtr, bool *am_sync) { List *sync_standbys; + static SyncRepStandbyData *sdata = NULL; Assert(LWLockHeldByMe(SyncRepLock)); @@ -545,8 +552,12 @@ SyncRepGetSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr, *applyPtr = InvalidXLogRecPtr; *am_sync = false; + if (sdata == NULL) + sdata = (SyncRepStandbyData *) + palloc(max_wal_senders * sizeof(SyncRepStandbyData)); + /* Get standbys that are considered as synchronous at this moment */ - sync_standbys = SyncRepGetSyncStandbys(am_sync); + sync_standbys = SyncRepGetSyncStandbysInternal(am_sync, sdata); /* * Quick exit if we are not managing a sync standby or there are not @@ -576,12 +587,13 @@ SyncRepGetSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr, if (SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY) { SyncRepGetOldestSyncRecPtr(writePtr, flushPtr, applyPtr, - sync_standbys); + sync_standbys, sdata); } else { SyncRepGetNthLatestSyncRecPtr(writePtr, flushPtr, applyPtr, - sync_standbys, SyncRepConfig->num_sync); + sync_standbys, sdata, + SyncRepConfig->num_sync); } list_free(sync_standbys); @@ -593,7 +605,8 @@ SyncRepGetSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr, */ static void SyncRepGetOldestSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr, - XLogRecPtr *applyPtr, List *sync_standbys) + XLogRecPtr *applyPtr, List *sync_standbys, + SyncRepStandbyData *sdata) { ListCell *cell; @@ -603,23 +616,14 @@ SyncRepGetOldestSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr, */ foreach(cell, sync_standbys) { - WalSnd *walsnd = &WalSndCtl->walsnds[lfirst_int(cell)]; - XLogRecPtr write; - XLogRecPtr flush; - XLogRecPtr apply; + SyncRepStandbyData *sby = &(sdata[lfirst_int(cell)]); - SpinLockAcquire(&walsnd->mutex); - write = walsnd->write; - flush = walsnd->flush; - apply = walsnd->apply; - SpinLockRelease(&walsnd->mutex); - - if (XLogRecPtrIsInvalid(*writePtr) || *writePtr > write) - *writePtr = write; - if (XLogRecPtrIsInvalid(*flushPtr) || *flushPtr > flush) - *flushPtr = flush; - if (XLogRecPtrIsInvalid(*applyPtr) || *applyPtr > apply) - *applyPtr = apply; + if (XLogRecPtrIsInvalid(*writePtr) || *writePtr > sby->write) + *writePtr = sby->write; + if (XLogRecPtrIsInvalid(*flushPtr) || *flushPtr > sby->flush) + *flushPtr = sby->flush; + if (XLogRecPtrIsInvalid(*applyPtr) || *applyPtr > sby->apply) + *applyPtr = sby->apply; } } @@ -629,7 +633,8 @@ SyncRepGetOldestSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr, */ static void SyncRepGetNthLatestSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr, - XLogRecPtr *applyPtr, List *sync_standbys, uint8 nth) + XLogRecPtr *applyPtr, List *sync_standbys, + SyncRepStandbyData *sdata, uint8 nth) { ListCell *cell; XLogRecPtr *write_array; @@ -645,13 +650,11 @@ SyncRepGetNthLatestSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr, foreach(cell, sync_standbys) { - WalSnd *walsnd = &WalSndCtl->walsnds[lfirst_int(cell)]; + SyncRepStandbyData *sby = &(sdata[lfirst_int(cell)]); - SpinLockAcquire(&walsnd->mutex); - write_array[i] = walsnd->write; - flush_array[i] = walsnd->flush; - apply_array[i] = walsnd->apply; - SpinLockRelease(&walsnd->mutex); + write_array[i] = sby->write; + flush_array[i] = sby->flush; + apply_array[i] = sby->apply; i++; } @@ -688,16 +691,22 @@ cmp_lsn(const void *a, const void *b) return 1; } -/* - * Return the list of sync standbys, or NIL if no sync standby is connected. - * - * The caller must hold SyncRepLock. - * - * On return, *am_sync is set to true if this walsender is connecting to - * sync standby. Otherwise it's set to false. - */ List * SyncRepGetSyncStandbys(bool *am_sync) +{ + return SyncRepGetSyncStandbysInternal(am_sync, NULL); +} + +/* + * Return the list of sync standbys, or NIL if no sync standby is connected. + * + * The caller must hold SyncRepLock. + * + * On return, *am_sync is set to true if this walsender is connecting to + * sync standby. Otherwise it's set to false. + */ +static List * +SyncRepGetSyncStandbysInternal(bool *am_sync, SyncRepStandbyData *sdata) { Assert(LWLockHeldByMe(SyncRepLock)); @@ -710,8 +719,8 @@ SyncRepGetSyncStandbys(bool *am_sync) return NIL; return (SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY) ? - SyncRepGetSyncStandbysPriority(am_sync) : - SyncRepGetSyncStandbysQuorum(am_sync); + SyncRepGetSyncStandbysPriority(am_sync, sdata) : + SyncRepGetSyncStandbysQuorum(am_sync, sdata); } /* @@ -725,7 +734,7 @@ SyncRepGetSyncStandbys(bool *am_sync) * sync standby. Otherwise it's set to false. */ static List * -SyncRepGetSyncStandbysQuorum(bool *am_sync) +SyncRepGetSyncStandbysQuorum(bool *am_sync, SyncRepStandbyData *sdata) { List *result = NIL; int i; @@ -736,7 +745,9 @@ SyncRepGetSyncStandbysQuorum(bool *am_sync) for (i = 0; i < max_wal_senders; i++) { + XLogRecPtr write; XLogRecPtr flush; + XLogRecPtr apply; WalSndState state; int pid; @@ -744,7 +755,9 @@ SyncRepGetSyncStandbysQuorum(bool *am_sync) SpinLockAcquire(&walsnd->mutex); pid = walsnd->pid; + write = walsnd->write; flush = walsnd->flush; + apply = walsnd->apply; state = walsnd->state; SpinLockRelease(&walsnd->mutex); @@ -772,6 +785,12 @@ SyncRepGetSyncStandbysQuorum(bool *am_sync) result = lappend_int(result, i); if (am_sync != NULL && walsnd == MyWalSnd) *am_sync = true; + if (sdata != NULL) + { + sdata[i].write = write; + sdata[i].flush = flush; + sdata[i].apply = apply; + } } return result; @@ -791,12 +810,12 @@ SyncRepGetSyncStandbysQuorum(bool *am_sync) * sync standby. Otherwise it's set to false. */ static List * -SyncRepGetSyncStandbysPriority(bool *am_sync) +SyncRepGetSyncStandbysPriority(bool *am_sync, SyncRepStandbyData *sdata) { List *result = NIL; List *pending = NIL; - int lowest_priority; - int next_highest_priority; + int lowest_priority = -1; + int next_highest_priority = -1; int this_priority; int priority; int i; @@ -806,9 +825,6 @@ SyncRepGetSyncStandbysPriority(bool *am_sync) Assert(SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY); - lowest_priority = SyncRepConfig->nmembers; - next_highest_priority = lowest_priority + 1; - /* * Find the sync standbys which have the highest priority (i.e, 1). Also * store all the other potential sync standbys into the pending list, in @@ -816,7 +832,9 @@ SyncRepGetSyncStandbysPriority(bool *am_sync) */ for (i = 0; i < max_wal_senders; i++) { + XLogRecPtr write; XLogRecPtr flush; + XLogRecPtr apply; WalSndState state; int pid; @@ -824,7 +842,9 @@ SyncRepGetSyncStandbysPriority(bool *am_sync) SpinLockAcquire(&walsnd->mutex); pid = walsnd->pid; + write = walsnd->write; flush = walsnd->flush; + apply = walsnd->apply; state = walsnd->state; SpinLockRelease(&walsnd->mutex); @@ -856,6 +876,12 @@ SyncRepGetSyncStandbysPriority(bool *am_sync) result = lappend_int(result, i); if (am_sync != NULL && walsnd == MyWalSnd) *am_sync = true; + if (sdata != NULL) + { + sdata[i].write = write; + sdata[i].flush = flush; + sdata[i].apply = apply; + } if (list_length(result) == SyncRepConfig->num_sync) { list_free(pending); @@ -867,16 +893,27 @@ SyncRepGetSyncStandbysPriority(bool *am_sync) pending = lappend_int(pending, i); if (am_sync != NULL && walsnd == MyWalSnd) am_in_pending = true; + if (sdata != NULL) + { + sdata[i].write = write; + sdata[i].flush = flush; + sdata[i].apply = apply; + } /* - * Track the highest priority among the standbys in the pending - * list, in order to use it as the starting priority for later - * scan of the list. This is useful to find quickly the sync - * standbys from the pending list later because we can skip - * unnecessary scans for the unused priorities. + * Track the highest and lowest priorities among the standbys + * in the pending list, in order to use them as the starting and + * ending priorities for later scan of the list. This is useful to + * find quickly the sync standbys from the pending list later + * because we can skip unnecessary scans for the unused + * priorities. */ - if (this_priority < next_highest_priority) + if (next_highest_priority == -1 || + this_priority < next_highest_priority) next_highest_priority = this_priority; + if (lowest_priority == -1 || + lowest_priority < this_priority) + lowest_priority = this_priority; } } diff --git a/src/include/replication/syncrep.h b/src/include/replication/syncrep.h index c5f0e91aad..fd97eee206 100644 --- a/src/include/replication/syncrep.h +++ b/src/include/replication/syncrep.h @@ -36,6 +36,18 @@ #define SYNC_REP_PRIORITY 0 #define SYNC_REP_QUORUM 1 +/* + * SyncRepGetSyncStandbys returns an array of these structs, + * one per synchronous walsender. + */ +typedef struct SyncRepStandbyData +{ + /* Copies of relevant fields from WalSnd shared-memory struct */ + XLogRecPtr write; + XLogRecPtr flush; + XLogRecPtr apply; +} SyncRepStandbyData; + /* * Struct for the configuration of synchronous replication. *