From 67ce45537f9766da93de828a40bb46dee75dd846 Mon Sep 17 00:00:00 2001 From: bdrouvotAWS Date: Tue, 7 Feb 2023 08:57:56 +0000 Subject: [PATCH v63 1/6] Handle logical slot conflicts on standby. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit During WAL replay on standby, when slot conflict is identified, invalidate such slots. Also do the same thing if wal_level on the primary server is reduced to below logical and there are existing logical slots on standby. Introduce a new ProcSignalReason value for slot conflict recovery. Author: Andres Freund (in an older version), Amit Khandekar, Bertrand Drouvot Reviewed-By: Bertrand Drouvot, Andres Freund, Robert Haas, Fabrizio de Royes Mello, Bharath Rupireddy, Amit Kapila, Álvaro Herrera --- src/backend/access/gist/gistxlog.c | 2 + src/backend/access/hash/hash_xlog.c | 1 + src/backend/access/heap/heapam.c | 3 + src/backend/access/nbtree/nbtxlog.c | 2 + src/backend/access/spgist/spgxlog.c | 1 + src/backend/access/transam/xlog.c | 20 ++- .../replication/logical/logicalfuncs.c | 13 +- src/backend/replication/slot.c | 170 +++++++++++++----- src/backend/replication/slotfuncs.c | 3 +- src/backend/replication/walsender.c | 7 + src/backend/storage/ipc/procsignal.c | 3 + src/backend/storage/ipc/standby.c | 14 +- src/backend/tcop/postgres.c | 18 ++ src/include/replication/slot.h | 61 ++++++- src/include/storage/procsignal.h | 1 + src/include/storage/standby.h | 2 + 16 files changed, 270 insertions(+), 51 deletions(-) 7.5% src/backend/access/transam/ 5.6% src/backend/replication/logical/ 58.7% src/backend/replication/ 5.2% src/backend/storage/ipc/ 4.8% src/backend/tcop/ 14.2% src/include/replication/ 3.5% src/ diff --git a/src/backend/access/gist/gistxlog.c b/src/backend/access/gist/gistxlog.c index b7678f3c14..9a86fb3fef 100644 --- a/src/backend/access/gist/gistxlog.c +++ b/src/backend/access/gist/gistxlog.c @@ -197,6 +197,7 @@ gistRedoDeleteRecord(XLogReaderState *record) XLogRecGetBlockTag(record, 0, &rlocator, NULL, NULL); ResolveRecoveryConflictWithSnapshot(xldata->snapshotConflictHorizon, + xldata->isCatalogRel, rlocator); } @@ -390,6 +391,7 @@ gistRedoPageReuse(XLogReaderState *record) */ if (InHotStandby) ResolveRecoveryConflictWithSnapshotFullXid(xlrec->snapshotConflictHorizon, + xlrec->isCatalogRel, xlrec->locator); } diff --git a/src/backend/access/hash/hash_xlog.c b/src/backend/access/hash/hash_xlog.c index f2dd9be8d3..e8e06c62a9 100644 --- a/src/backend/access/hash/hash_xlog.c +++ b/src/backend/access/hash/hash_xlog.c @@ -1003,6 +1003,7 @@ hash_xlog_vacuum_one_page(XLogReaderState *record) XLogRecGetBlockTag(record, 0, &rlocator, NULL, NULL); ResolveRecoveryConflictWithSnapshot(xldata->snapshotConflictHorizon, + xldata->isCatalogRel, rlocator); } diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c index 8b13e3f892..f389ceee1e 100644 --- a/src/backend/access/heap/heapam.c +++ b/src/backend/access/heap/heapam.c @@ -8769,6 +8769,7 @@ heap_xlog_prune(XLogReaderState *record) */ if (InHotStandby) ResolveRecoveryConflictWithSnapshot(xlrec->snapshotConflictHorizon, + xlrec->isCatalogRel, rlocator); /* @@ -8940,6 +8941,7 @@ heap_xlog_visible(XLogReaderState *record) */ if (InHotStandby) ResolveRecoveryConflictWithSnapshot(xlrec->snapshotConflictHorizon, + xlrec->flags & VISIBILITYMAP_XLOG_CATALOG_REL, rlocator); /* @@ -9061,6 +9063,7 @@ heap_xlog_freeze_page(XLogReaderState *record) XLogRecGetBlockTag(record, 0, &rlocator, NULL, NULL); ResolveRecoveryConflictWithSnapshot(xlrec->snapshotConflictHorizon, + xlrec->isCatalogRel, rlocator); } diff --git a/src/backend/access/nbtree/nbtxlog.c b/src/backend/access/nbtree/nbtxlog.c index 414ca4f6de..c87e46ed66 100644 --- a/src/backend/access/nbtree/nbtxlog.c +++ b/src/backend/access/nbtree/nbtxlog.c @@ -669,6 +669,7 @@ btree_xlog_delete(XLogReaderState *record) XLogRecGetBlockTag(record, 0, &rlocator, NULL, NULL); ResolveRecoveryConflictWithSnapshot(xlrec->snapshotConflictHorizon, + xlrec->isCatalogRel, rlocator); } @@ -1007,6 +1008,7 @@ btree_xlog_reuse_page(XLogReaderState *record) if (InHotStandby) ResolveRecoveryConflictWithSnapshotFullXid(xlrec->snapshotConflictHorizon, + xlrec->isCatalogRel, xlrec->locator); } diff --git a/src/backend/access/spgist/spgxlog.c b/src/backend/access/spgist/spgxlog.c index b071b59c8a..459ac929ba 100644 --- a/src/backend/access/spgist/spgxlog.c +++ b/src/backend/access/spgist/spgxlog.c @@ -879,6 +879,7 @@ spgRedoVacuumRedirect(XLogReaderState *record) XLogRecGetBlockTag(record, 0, &locator, NULL, NULL); ResolveRecoveryConflictWithSnapshot(xldata->snapshotConflictHorizon, + xldata->isCatalogRel, locator); } diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index 46821ad605..713b61a9bf 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -6806,7 +6806,8 @@ CreateCheckPoint(int flags) */ XLByteToSeg(RedoRecPtr, _logSegNo, wal_segment_size); KeepLogSeg(recptr, &_logSegNo); - if (InvalidateObsoleteReplicationSlots(_logSegNo)) + if (InvalidateObsoleteReplicationSlots(_logSegNo, InvalidOid, + InvalidTransactionId, false)) { /* * Some slots have been invalidated; recalculate the old-segment @@ -7250,7 +7251,8 @@ CreateRestartPoint(int flags) replayPtr = GetXLogReplayRecPtr(&replayTLI); endptr = (receivePtr < replayPtr) ? replayPtr : receivePtr; KeepLogSeg(endptr, &_logSegNo); - if (InvalidateObsoleteReplicationSlots(_logSegNo)) + if (InvalidateObsoleteReplicationSlots(_logSegNo, InvalidOid, + InvalidTransactionId, false)) { /* * Some slots have been invalidated; recalculate the old-segment @@ -7963,6 +7965,20 @@ xlog_redo(XLogReaderState *record) /* Update our copy of the parameters in pg_control */ memcpy(&xlrec, XLogRecGetData(record), sizeof(xl_parameter_change)); + /* + * Invalidate logical slots if we are in hot standby and the primary + * does not have a WAL level sufficient for logical decoding. No need + * to search for potentially conflicting logically slots if standby is + * running with wal_level lower than logical, because in that case, we + * would have either disallowed creation of logical slots or + * invalidated existing ones. + */ + if (InRecovery && InHotStandby && + xlrec.wal_level < WAL_LEVEL_LOGICAL && + wal_level >= WAL_LEVEL_LOGICAL) + InvalidateObsoleteReplicationSlots(0, InvalidOid, + InvalidTransactionId, true); + LWLockAcquire(ControlFileLock, LW_EXCLUSIVE); ControlFile->MaxConnections = xlrec.MaxConnections; ControlFile->max_worker_processes = xlrec.max_worker_processes; diff --git a/src/backend/replication/logical/logicalfuncs.c b/src/backend/replication/logical/logicalfuncs.c index fa1b641a2b..575a047e53 100644 --- a/src/backend/replication/logical/logicalfuncs.c +++ b/src/backend/replication/logical/logicalfuncs.c @@ -216,9 +216,9 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin /* * After the sanity checks in CreateDecodingContext, make sure the - * restart_lsn is valid. Avoid "cannot get changes" wording in this - * errmsg because that'd be confusingly ambiguous about no changes - * being available. + * restart_lsn is valid or both effective_xmin and catalog_xmin are + * valid. Avoid "cannot get changes" wording in this errmsg because + * that'd be confusingly ambiguous about no changes being available. */ if (XLogRecPtrIsInvalid(MyReplicationSlot->data.restart_lsn)) ereport(ERROR, @@ -227,6 +227,13 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin NameStr(*name)), errdetail("This slot has never previously reserved WAL, or it has been invalidated."))); + if (LogicalReplicationSlotIsInvalid(MyReplicationSlot)) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("cannot read from logical replication slot \"%s\"", + NameStr(*name)), + errdetail("This slot has been invalidated because it was conflicting with recovery."))); + MemoryContextSwitchTo(oldcontext); /* diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c index 2293c0c6fc..376d453374 100644 --- a/src/backend/replication/slot.c +++ b/src/backend/replication/slot.c @@ -110,6 +110,13 @@ static void RestoreSlotFromDisk(const char *name); static void CreateSlotOnDisk(ReplicationSlot *slot); static void SaveSlotToPath(ReplicationSlot *slot, const char *dir, int elevel); +/* to report termination/invalidation */ +static void ReportTerminationInvalidation(bool terminating, bool check_on_xid, + int pid, NameData slotname, + TransactionId xid, + XLogRecPtr restart_lsn, + XLogRecPtr oldestLSN); + /* * Report shared-memory space needed by ReplicationSlotsShmemInit. */ @@ -855,8 +862,7 @@ ReplicationSlotsComputeRequiredXmin(bool already_locked) SpinLockAcquire(&s->mutex); effective_xmin = s->effective_xmin; effective_catalog_xmin = s->effective_catalog_xmin; - invalidated = (!XLogRecPtrIsInvalid(s->data.invalidated_at) && - XLogRecPtrIsInvalid(s->data.restart_lsn)); + invalidated = ObsoleteSlotIsInvalid(s, true) || LogicalReplicationSlotIsInvalid(s); SpinLockRelease(&s->mutex); /* invalidated slots need not apply */ @@ -1225,28 +1231,85 @@ ReplicationSlotReserveWal(void) } } + +/* + * Report terminating or conflicting message. + * + * For both, logical conflict on standby and obsolete slot are handled. + */ +static void +ReportTerminationInvalidation(bool terminating, bool check_on_xid, int pid, + NameData slotname, TransactionId xid, + XLogRecPtr restart_lsn, XLogRecPtr oldestLSN) +{ + StringInfoData err_detail; + bool hint = false; + + initStringInfo(&err_detail); + + if (check_on_xid) + { + if (TransactionIdIsValid(xid)) + appendStringInfo(&err_detail, _("The slot conflicted with xid horizon %u."), xid); + else + appendStringInfo(&err_detail, _("Logical decoding on standby requires wal_level to be at least logical on the primary server")); + } + else + { + appendStringInfo(&err_detail, _("The slot's restart_lsn %X/%X exceeds the limit by %llu bytes."), + LSN_FORMAT_ARGS(restart_lsn), + (unsigned long long) (oldestLSN - restart_lsn)); + + hint = true; + } + + ereport(LOG, + terminating ? errmsg("terminating process %d to release replication slot \"%s\"", pid, NameStr(slotname)) : + errmsg("invalidating obsolete replication slot \"%s\"", NameStr(slotname)), + errdetail_internal("%s", err_detail.data), + hint ? errhint("You might need to increase max_slot_wal_keep_size.") : 0); + + pfree(err_detail.data); +} + /* - * Helper for InvalidateObsoleteReplicationSlots -- acquires the given slot - * and mark it invalid, if necessary and possible. + * Helper for InvalidateObsoleteReplicationSlots + * + * Acquires the given slot and mark it invalid, if necessary and possible. * * Returns whether ReplicationSlotControlLock was released in the interim (and * in that case we're not holding the lock at return, otherwise we are). * - * Sets *invalidated true if the slot was invalidated. (Untouched otherwise.) + * Sets *invalidated true if an obsolete slot was invalidated. (Untouched otherwise.) * * This is inherently racy, because we release the LWLock * for syscalls, so caller must restart if we return true. */ static bool InvalidatePossiblyObsoleteSlot(ReplicationSlot *s, XLogRecPtr oldestLSN, - bool *invalidated) + Oid dboid, bool *invalidated, TransactionId xid, + bool check_on_xid) { int last_signaled_pid = 0; bool released_lock = false; + if (check_on_xid) + { + /* we are only dealing with *logical* slot conflicts */ + if (!SlotIsLogical(s)) + return false; + + /* + * not the database of interest and we don't want all the + * database, skip + */ + if (s->data.database != dboid && TransactionIdIsValid(xid)) + return false; + } for (;;) { XLogRecPtr restart_lsn; + NameData slotname; int active_pid = 0; @@ -1263,19 +1326,20 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlot *s, XLogRecPtr oldestLSN, * Check if the slot needs to be invalidated. If it needs to be * invalidated, and is not currently acquired, acquire it and mark it * as having been invalidated. We do this with the spinlock held to - * avoid race conditions -- for example the restart_lsn could move - * forward, or the slot could be dropped. + * avoid race conditions -- for example the restart_lsn (or the + * xmin(s) could) move forward or the slot could be dropped. */ SpinLockAcquire(&s->mutex); restart_lsn = s->data.restart_lsn; /* - * If the slot is already invalid or is fresh enough, we don't need to - * do anything. + * If the slot is already invalid or is a non conflicting slot, we + * don't need to do anything. */ - if (XLogRecPtrIsInvalid(restart_lsn) || restart_lsn >= oldestLSN) + if (DoNotInvalidateSlot(s, xid, oldestLSN, check_on_xid)) { + /* then, we are not forcing for invalidation */ SpinLockRelease(&s->mutex); if (released_lock) LWLockRelease(ReplicationSlotControlLock); @@ -1294,9 +1358,16 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlot *s, XLogRecPtr oldestLSN, { MyReplicationSlot = s; s->active_pid = MyProcPid; - s->data.invalidated_at = restart_lsn; - s->data.restart_lsn = InvalidXLogRecPtr; - + if (check_on_xid) + { + s->effective_xmin = InvalidTransactionId; + s->data.catalog_xmin = InvalidTransactionId; + } + else + { + s->data.invalidated_at = restart_lsn; + s->data.restart_lsn = InvalidXLogRecPtr; + } /* Let caller know */ *invalidated = true; } @@ -1329,15 +1400,15 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlot *s, XLogRecPtr oldestLSN, */ if (last_signaled_pid != active_pid) { - ereport(LOG, - errmsg("terminating process %d to release replication slot \"%s\"", - active_pid, NameStr(slotname)), - errdetail("The slot's restart_lsn %X/%X exceeds the limit by %llu bytes.", - LSN_FORMAT_ARGS(restart_lsn), - (unsigned long long) (oldestLSN - restart_lsn)), - errhint("You might need to increase max_slot_wal_keep_size.")); - - (void) kill(active_pid, SIGTERM); + ReportTerminationInvalidation(true, check_on_xid, active_pid, + slotname, xid, restart_lsn, + oldestLSN); + + if (check_on_xid) + (void) SendProcSignal(active_pid, PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT, InvalidBackendId); + else + (void) kill(active_pid, SIGTERM); + last_signaled_pid = active_pid; } @@ -1370,14 +1441,11 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlot *s, XLogRecPtr oldestLSN, ReplicationSlotMarkDirty(); ReplicationSlotSave(); ReplicationSlotRelease(); + pgstat_drop_replslot(s); - ereport(LOG, - errmsg("invalidating obsolete replication slot \"%s\"", - NameStr(slotname)), - errdetail("The slot's restart_lsn %X/%X exceeds the limit by %llu bytes.", - LSN_FORMAT_ARGS(restart_lsn), - (unsigned long long) (oldestLSN - restart_lsn)), - errhint("You might need to increase max_slot_wal_keep_size.")); + ReportTerminationInvalidation(false, check_on_xid, active_pid, + slotname, xid, restart_lsn, + oldestLSN); /* done with this slot for now */ break; @@ -1390,20 +1458,39 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlot *s, XLogRecPtr oldestLSN, } /* - * Mark any slot that points to an LSN older than the given segment - * as invalid; it requires WAL that's about to be removed. + * Invalidate Obsolete slots. + * + * WAL case (aka check_on_xid is false): + * + * Mark any slot that points to an LSN older than the given segment + * as invalid; it requires WAL that's about to be removed. + * invalidated is set to true when any slot have got invalidated. * - * Returns true when any slot have got invalidated. + * Xid case (aka check_on_xid is true): + * + * When xid is valid, it means that we are about to remove rows older than xid. + * Therefore we need to invalidate slots that depend on seeing those rows. + * When xid is invalid, invalidate all logical slots. This is required when the + * master wal_level is set back to replica, so existing logical slots need to + * be invalidated. Note that WaitExceedsMaxStandbyDelay() is not taken into + * account here (as opposed to ResolveRecoveryConflictWithVirtualXIDs()): XXXX * - * NB - this runs as part of checkpoint, so avoid raising errors if possible. */ bool -InvalidateObsoleteReplicationSlots(XLogSegNo oldestSegno) +InvalidateObsoleteReplicationSlots(XLogSegNo oldestSegno, Oid dboid, + TransactionId xid, bool check_on_xid) { - XLogRecPtr oldestLSN; + + XLogRecPtr oldestLSN = InvalidXLogRecPtr; bool invalidated = false; - XLogSegNoOffsetToRecPtr(oldestSegno, 0, wal_segment_size, oldestLSN); + Assert(max_replication_slots >= 0); + + if (max_replication_slots == 0) + return invalidated; + + if (!check_on_xid) + XLogSegNoOffsetToRecPtr(oldestSegno, 0, wal_segment_size, oldestLSN); restart: LWLockAcquire(ReplicationSlotControlLock, LW_SHARED); @@ -1414,16 +1501,15 @@ restart: if (!s->in_use) continue; - if (InvalidatePossiblyObsoleteSlot(s, oldestLSN, &invalidated)) - { - /* if the lock was released, start from scratch */ + if (InvalidatePossiblyObsoleteSlot(s, oldestLSN, dboid, &invalidated, xid, check_on_xid)) goto restart; - } } + LWLockRelease(ReplicationSlotControlLock); /* - * If any slots have been invalidated, recalculate the resource limits. + * If any slots have been invalidated, recalculate the required xmin and + * the required lsn (if appropriate). */ if (invalidated) { diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c index 2f3c964824..015d276fd9 100644 --- a/src/backend/replication/slotfuncs.c +++ b/src/backend/replication/slotfuncs.c @@ -319,8 +319,7 @@ pg_get_replication_slots(PG_FUNCTION_ARGS) * certain that the slot has been invalidated. Otherwise, test * availability from restart_lsn. */ - if (XLogRecPtrIsInvalid(slot_contents.data.restart_lsn) && - !XLogRecPtrIsInvalid(slot_contents.data.invalidated_at)) + if (ObsoleteSlotIsInvalid(slot, true)) walstate = WALAVAIL_REMOVED; else walstate = GetWALAvailability(slot_contents.data.restart_lsn); diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index 75e8363e24..b686691ca2 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -1253,6 +1253,13 @@ StartLogicalReplication(StartReplicationCmd *cmd) ReplicationSlotAcquire(cmd->slotname, true); + if (LogicalReplicationSlotIsInvalid(MyReplicationSlot)) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("cannot read from logical replication slot \"%s\"", + cmd->slotname), + errdetail("This slot has been invalidated because it was conflicting with recovery."))); + if (XLogRecPtrIsInvalid(MyReplicationSlot->data.restart_lsn)) ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), diff --git a/src/backend/storage/ipc/procsignal.c b/src/backend/storage/ipc/procsignal.c index 395b2cf690..c85cb5cc18 100644 --- a/src/backend/storage/ipc/procsignal.c +++ b/src/backend/storage/ipc/procsignal.c @@ -673,6 +673,9 @@ procsignal_sigusr1_handler(SIGNAL_ARGS) if (CheckProcSignal(PROCSIG_RECOVERY_CONFLICT_SNAPSHOT)) RecoveryConflictInterrupt(PROCSIG_RECOVERY_CONFLICT_SNAPSHOT); + if (CheckProcSignal(PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT)) + RecoveryConflictInterrupt(PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT); + if (CheckProcSignal(PROCSIG_RECOVERY_CONFLICT_STARTUP_DEADLOCK)) RecoveryConflictInterrupt(PROCSIG_RECOVERY_CONFLICT_STARTUP_DEADLOCK); diff --git a/src/backend/storage/ipc/standby.c b/src/backend/storage/ipc/standby.c index 9f56b4e95c..a23220cae7 100644 --- a/src/backend/storage/ipc/standby.c +++ b/src/backend/storage/ipc/standby.c @@ -24,6 +24,7 @@ #include "access/xlogutils.h" #include "miscadmin.h" #include "pgstat.h" +#include "replication/slot.h" #include "storage/bufmgr.h" #include "storage/lmgr.h" #include "storage/proc.h" @@ -466,6 +467,7 @@ ResolveRecoveryConflictWithVirtualXIDs(VirtualTransactionId *waitlist, */ void ResolveRecoveryConflictWithSnapshot(TransactionId snapshotConflictHorizon, + bool isCatalogRel, RelFileLocator locator) { VirtualTransactionId *backends; @@ -491,6 +493,10 @@ ResolveRecoveryConflictWithSnapshot(TransactionId snapshotConflictHorizon, PROCSIG_RECOVERY_CONFLICT_SNAPSHOT, WAIT_EVENT_RECOVERY_CONFLICT_SNAPSHOT, true); + + if (wal_level >= WAL_LEVEL_LOGICAL && isCatalogRel) + InvalidateObsoleteReplicationSlots(0, locator.dbOid, + snapshotConflictHorizon, true); } /* @@ -499,6 +505,7 @@ ResolveRecoveryConflictWithSnapshot(TransactionId snapshotConflictHorizon, */ void ResolveRecoveryConflictWithSnapshotFullXid(FullTransactionId snapshotConflictHorizon, + bool isCatalogRel, RelFileLocator locator) { /* @@ -517,7 +524,9 @@ ResolveRecoveryConflictWithSnapshotFullXid(FullTransactionId snapshotConflictHor TransactionId truncated; truncated = XidFromFullTransactionId(snapshotConflictHorizon); - ResolveRecoveryConflictWithSnapshot(truncated, locator); + ResolveRecoveryConflictWithSnapshot(truncated, + isCatalogRel, + locator); } } @@ -1478,6 +1487,9 @@ get_recovery_conflict_desc(ProcSignalReason reason) case PROCSIG_RECOVERY_CONFLICT_SNAPSHOT: reasonDesc = _("recovery conflict on snapshot"); break; + case PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT: + reasonDesc = _("recovery conflict on replication slot"); + break; case PROCSIG_RECOVERY_CONFLICT_STARTUP_DEADLOCK: reasonDesc = _("recovery conflict on buffer deadlock"); break; diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c index a10ecbaf50..4ec64b0a4a 100644 --- a/src/backend/tcop/postgres.c +++ b/src/backend/tcop/postgres.c @@ -2526,6 +2526,9 @@ errdetail_recovery_conflict(void) case PROCSIG_RECOVERY_CONFLICT_SNAPSHOT: errdetail("User query might have needed to see row versions that must be removed."); break; + case PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT: + errdetail("User was using the logical slot that must be dropped."); + break; case PROCSIG_RECOVERY_CONFLICT_STARTUP_DEADLOCK: errdetail("User transaction caused buffer deadlock with recovery."); break; @@ -3143,6 +3146,21 @@ RecoveryConflictInterrupt(ProcSignalReason reason) InterruptPending = true; break; + case PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT: + /* This signal is only used for logical sloti, sanity check */ + Assert(MyReplicationSlot && SlotIsLogical(MyReplicationSlot)); + RecoveryConflictPending = true; + QueryCancelPending = true; + InterruptPending = true; + + /* + * Normal backends should exit, so that the startup process + * can mark the slot invalid. + */ + if (!am_cascading_walsender) + ProcDiePending = true; + break; + default: elog(FATAL, "unrecognized conflict mode: %d", (int) reason); diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h index 8872c80cdf..f2838022e5 100644 --- a/src/include/replication/slot.h +++ b/src/include/replication/slot.h @@ -168,6 +168,64 @@ typedef struct ReplicationSlot #define SlotIsPhysical(slot) ((slot)->data.database == InvalidOid) #define SlotIsLogical(slot) ((slot)->data.database != InvalidOid) +static inline bool +ObsoleteSlotIsInvalid(ReplicationSlot *s, bool check_invalidated_at) +{ + if (check_invalidated_at) + return (!XLogRecPtrIsInvalid(s->data.invalidated_at) && + XLogRecPtrIsInvalid(s->data.restart_lsn)); + else + return (XLogRecPtrIsInvalid(s->data.restart_lsn)); +} + +static inline bool +LogicalReplicationSlotIsInvalid(ReplicationSlot *s) +{ + return (!TransactionIdIsValid(s->effective_xmin) && + !TransactionIdIsValid(s->data.catalog_xmin)); +} + +static inline bool +TransactionIdIsValidPrecedesOrEquals(TransactionId xid1, TransactionId xid2) +{ + return (TransactionIdIsValid(xid1) && TransactionIdPrecedesOrEquals(xid1, xid2)); +} + +static inline bool +LogicalReplicationSlotXidsConflict(ReplicationSlot *s, TransactionId xid) +{ + TransactionId slot_effective_xmin; + TransactionId slot_catalog_xmin; + + slot_effective_xmin = s->effective_xmin; + slot_catalog_xmin = s->data.catalog_xmin; + + return (TransactionIdIsValidPrecedesOrEquals(slot_effective_xmin, xid) || + TransactionIdIsValidPrecedesOrEquals(slot_catalog_xmin, xid)); +} + +static inline bool +SlotIsFreshEnough(ReplicationSlot *s, XLogRecPtr oldestLSN) +{ + return (s->data.restart_lsn >= oldestLSN); +} + +static inline bool +LogicalSlotIsNotConflicting(ReplicationSlot *s, TransactionId xid) +{ + return (TransactionIdIsValid(xid) && !LogicalReplicationSlotXidsConflict(s, xid)); +} + +static inline bool +DoNotInvalidateSlot(ReplicationSlot *s, TransactionId xid, XLogRecPtr oldestLSN, bool check_on_xid) +{ + if (check_on_xid) + return (LogicalReplicationSlotIsInvalid(s) || LogicalSlotIsNotConflicting(s, xid)); + else + return (ObsoleteSlotIsInvalid(s, false) || SlotIsFreshEnough(s, oldestLSN)); + +} + /* * Shared memory control area for all of replication slots. */ @@ -215,7 +273,8 @@ extern void ReplicationSlotsComputeRequiredLSN(void); extern XLogRecPtr ReplicationSlotsComputeLogicalRestartLSN(void); extern bool ReplicationSlotsCountDBSlots(Oid dboid, int *nslots, int *nactive); extern void ReplicationSlotsDropDBSlots(Oid dboid); -extern bool InvalidateObsoleteReplicationSlots(XLogSegNo oldestSegno); +extern bool InvalidateObsoleteReplicationSlots(XLogSegNo oldestSegno, Oid dboid, + TransactionId xid, bool check_on_xid); extern ReplicationSlot *SearchNamedReplicationSlot(const char *name, bool need_lock); extern int ReplicationSlotIndex(ReplicationSlot *slot); extern bool ReplicationSlotName(int index, Name name); diff --git a/src/include/storage/procsignal.h b/src/include/storage/procsignal.h index 905af2231b..2f52100b00 100644 --- a/src/include/storage/procsignal.h +++ b/src/include/storage/procsignal.h @@ -42,6 +42,7 @@ typedef enum PROCSIG_RECOVERY_CONFLICT_TABLESPACE, PROCSIG_RECOVERY_CONFLICT_LOCK, PROCSIG_RECOVERY_CONFLICT_SNAPSHOT, + PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT, PROCSIG_RECOVERY_CONFLICT_BUFFERPIN, PROCSIG_RECOVERY_CONFLICT_STARTUP_DEADLOCK, diff --git a/src/include/storage/standby.h b/src/include/storage/standby.h index 2effdea126..41f4dc372e 100644 --- a/src/include/storage/standby.h +++ b/src/include/storage/standby.h @@ -30,8 +30,10 @@ extern void InitRecoveryTransactionEnvironment(void); extern void ShutdownRecoveryTransactionEnvironment(void); extern void ResolveRecoveryConflictWithSnapshot(TransactionId snapshotConflictHorizon, + bool isCatalogRel, RelFileLocator locator); extern void ResolveRecoveryConflictWithSnapshotFullXid(FullTransactionId snapshotConflictHorizon, + bool isCatalogRel, RelFileLocator locator); extern void ResolveRecoveryConflictWithTablespace(Oid tsid); extern void ResolveRecoveryConflictWithDatabase(Oid dbid); -- 2.34.1