diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index a4153518fd..8dd8d40c07 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -6818,7 +6818,7 @@ CreateCheckPoint(int flags) XLByteToSeg(RedoRecPtr, _logSegNo, wal_segment_size); KeepLogSeg(recptr, &_logSegNo); if (InvalidateObsoleteReplicationSlots(_logSegNo, InvalidOid, - InvalidTransactionId)) + InvalidTransactionId, RS_INVAL_WAL)) { /* * Some slots have been invalidated; recalculate the old-segment @@ -7263,7 +7263,7 @@ CreateRestartPoint(int flags) endptr = (receivePtr < replayPtr) ? replayPtr : receivePtr; KeepLogSeg(endptr, &_logSegNo); if (InvalidateObsoleteReplicationSlots(_logSegNo, InvalidOid, - InvalidTransactionId)) + InvalidTransactionId, RS_INVAL_WAL)) { /* * Some slots have been invalidated; recalculate the old-segment @@ -7988,7 +7988,8 @@ xlog_redo(XLogReaderState *record) xlrec.wal_level < WAL_LEVEL_LOGICAL && wal_level >= WAL_LEVEL_LOGICAL) InvalidateObsoleteReplicationSlots(0, InvalidOid, - InvalidTransactionId); + InvalidTransactionId, + RS_INVAL_WAL_LEVEL); LWLockAcquire(ControlFileLock, LW_EXCLUSIVE); ControlFile->MaxConnections = xlrec.MaxConnections; diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c index 411f6c0149..3c527742de 100644 --- a/src/backend/replication/slot.c +++ b/src/backend/replication/slot.c @@ -1322,12 +1322,13 @@ LogicalReplicationSlotXidsConflict(ReplicationSlot *s, Oid dboid, TransactionId */ static bool InvalidatePossiblyObsoleteSlot(ReplicationSlot *s, XLogRecPtr oldestLSN, - Oid dboid, TransactionId xid, bool *invalidated) + Oid dboid, TransactionId xid, bool *invalidated, + ReplicationSlotInvalidationCause cause) { int last_signaled_pid = 0; bool released_lock = false; bool invalidate_all_logical = !TransactionIdIsValid(xid) && - oldestLSN == InvalidXLogRecPtr; + oldestLSN == InvalidXLogRecPtr; for (;;) @@ -1363,14 +1364,30 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlot *s, XLogRecPtr oldestLSN, */ if (s->data.invalidated == RS_INVAL_NONE) { - if (oldestLSN != InvalidXLogRecPtr && s->data.restart_lsn != InvalidXLogRecPtr && - s->data.restart_lsn < oldestLSN) - conflict = RS_INVAL_WAL; - if (TransactionIdIsValid(xid) && SlotIsLogical(s) && - LogicalReplicationSlotXidsConflict(s, dboid, xid)) - conflict = RS_INVAL_XID; - else if (invalidate_all_logical && SlotIsLogical(s)) - conflict = RS_INVAL_WAL_LEVEL; + switch (cause) + { + case RS_INVAL_WAL: + if (oldestLSN != InvalidXLogRecPtr && s->data.restart_lsn != InvalidXLogRecPtr && + s->data.restart_lsn < oldestLSN) + conflict = cause; + break; + case RS_INVAL_XID: + if (TransactionIdIsValid(xid) && SlotIsLogical(s) && + LogicalReplicationSlotXidsConflict(s, dboid, xid)) + conflict = cause; + break; + case RS_INVAL_WAL_LEVEL: + if (invalidate_all_logical && SlotIsLogical(s)) + conflict = cause; + break; + default: + break; + + /* + * this runs as part of checkpoint, so avoid raising + * errors if possible + */ + } } /* if there's no conflict, we're done */ @@ -1511,16 +1528,12 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlot *s, XLogRecPtr oldestLSN, * be invalidated. Note that WaitExceedsMaxStandbyDelay() is not taken into * account here (as opposed to ResolveRecoveryConflictWithVirtualXIDs()): XXXX * - * - * XXX: Should we have the caller pass in a specific - * ReplicationSlotInvalidationCause that we should search for? That'd likely - * make some things a bit neater. - * * NB - this runs as part of checkpoint, so avoid raising errors if possible. */ bool InvalidateObsoleteReplicationSlots(XLogSegNo oldestSegno, Oid dboid, - TransactionId xid) + TransactionId snapshotConflictHorizon, + ReplicationSlotInvalidationCause cause) { XLogRecPtr oldestLSN; bool invalidated = false; @@ -1539,7 +1552,9 @@ restart: if (!s->in_use) continue; - if (InvalidatePossiblyObsoleteSlot(s, oldestLSN, dboid, xid, &invalidated)) + if (InvalidatePossiblyObsoleteSlot(s, oldestLSN, dboid, + snapshotConflictHorizon, &invalidated, + cause)) { /* if the lock was released, start from scratch */ goto restart; diff --git a/src/backend/storage/ipc/standby.c b/src/backend/storage/ipc/standby.c index ce5842b0db..77166127f2 100644 --- a/src/backend/storage/ipc/standby.c +++ b/src/backend/storage/ipc/standby.c @@ -496,7 +496,7 @@ ResolveRecoveryConflictWithSnapshot(TransactionId snapshotConflictHorizon, if (wal_level >= WAL_LEVEL_LOGICAL && isCatalogRel) InvalidateObsoleteReplicationSlots(0, locator.dbOid, - snapshotConflictHorizon); + snapshotConflictHorizon, RS_INVAL_XID); } /* diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h index 35cafe94bc..1430c7e908 100644 --- a/src/include/replication/slot.h +++ b/src/include/replication/slot.h @@ -231,7 +231,8 @@ extern XLogRecPtr ReplicationSlotsComputeLogicalRestartLSN(void); extern bool ReplicationSlotsCountDBSlots(Oid dboid, int *nslots, int *nactive); extern void ReplicationSlotsDropDBSlots(Oid dboid); extern bool InvalidateObsoleteReplicationSlots(XLogSegNo oldestSegno, Oid dboid, - TransactionId xid); + TransactionId snapshotConflictHorizon, + ReplicationSlotInvalidationCause cause); extern ReplicationSlot *SearchNamedReplicationSlot(const char *name, bool need_lock); extern int ReplicationSlotIndex(ReplicationSlot *slot); extern bool ReplicationSlotName(int index, Name name);