From 59d6c2ff336974d1671a85e5de0c5f73882c8480 Mon Sep 17 00:00:00 2001 From: bdrouvotAWS Date: Fri, 6 Aug 2021 09:41:36 +0000 Subject: [PATCH v23 2/5] Handle logical slot conflicts on standby. During WAL replay on standby, when a slot conflict is identified, invalidate such slots. Also do the same thing if wal_level on master is reduced to below logical and there are existing logical slots on standby. Introduce a new ProcSignalReason value for slot conflict recovery. Arrange for a new pg_stat_database_conflicts field: confl_logicalslot. Author: Andres Freund (in an older version), Amit Khandekar, Bertrand Drouvot Reviewed-By: Bertrand Drouvot, Andres Freund, Robert Haas, Fabrizio de Royes Mello --- doc/src/sgml/monitoring.sgml | 10 + src/backend/access/gist/gistxlog.c | 4 +- src/backend/access/hash/hash_xlog.c | 3 +- src/backend/access/heap/heapam.c | 10 +- src/backend/access/nbtree/nbtxlog.c | 4 +- src/backend/access/spgist/spgxlog.c | 1 + src/backend/access/transam/xlog.c | 13 ++ src/backend/catalog/system_views.sql | 3 +- src/backend/postmaster/pgstat.c | 20 ++ .../replication/logical/logicalfuncs.c | 7 +- src/backend/replication/slot.c | 209 ++++++++++++++++++ src/backend/replication/walsender.c | 8 + src/backend/storage/ipc/procarray.c | 4 + src/backend/storage/ipc/procsignal.c | 3 + src/backend/storage/ipc/standby.c | 11 +- src/backend/tcop/postgres.c | 22 ++ src/backend/utils/adt/pgstatfuncs.c | 16 ++ src/include/catalog/pg_proc.dat | 5 + src/include/pgstat.h | 2 + src/include/replication/slot.h | 3 + src/include/storage/procsignal.h | 1 + src/include/storage/standby.h | 4 +- src/test/regress/expected/rules.out | 3 +- 23 files changed, 351 insertions(+), 15 deletions(-) 4.1% src/backend/access/heap/ 4.7% src/backend/access/transam/ 5.1% src/backend/access/ 4.3% src/backend/postmaster/ 3.5% src/backend/replication/logical/ 52.2% src/backend/replication/ 5.6% src/backend/storage/ipc/ 7.1% src/backend/tcop/ 3.6% src/backend/ 6.3% src/include/ diff --git 
a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml index 74a58a916c..6072eee73e 100644 --- a/doc/src/sgml/monitoring.sgml +++ b/doc/src/sgml/monitoring.sgml @@ -4034,6 +4034,16 @@ SELECT pid, wait_event_type, wait_event FROM pg_stat_activity WHERE wait_event i deadlocks + + + + confl_logicalslot bigint + + + Number of queries in this database that have been canceled due to + logical slots + + diff --git a/src/backend/access/gist/gistxlog.c b/src/backend/access/gist/gistxlog.c index 46aee6f2a9..5963e639d8 100644 --- a/src/backend/access/gist/gistxlog.c +++ b/src/backend/access/gist/gistxlog.c @@ -195,7 +195,8 @@ gistRedoDeleteRecord(XLogReaderState *record) XLogRecGetBlockTag(record, 0, &rnode, NULL, NULL); - ResolveRecoveryConflictWithSnapshot(xldata->latestRemovedXid, rnode); + ResolveRecoveryConflictWithSnapshot(xldata->latestRemovedXid, + xldata->onCatalogTable, rnode); } if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO) @@ -395,6 +396,7 @@ gistRedoPageReuse(XLogReaderState *record) */ if (InHotStandby) ResolveRecoveryConflictWithSnapshotFullXid(xlrec->latestRemovedFullXid, + xlrec->onCatalogTable, xlrec->node); } diff --git a/src/backend/access/hash/hash_xlog.c b/src/backend/access/hash/hash_xlog.c index 27475fcbd6..e5c6124400 100644 --- a/src/backend/access/hash/hash_xlog.c +++ b/src/backend/access/hash/hash_xlog.c @@ -1002,7 +1002,8 @@ hash_xlog_vacuum_one_page(XLogReaderState *record) RelFileNode rnode; XLogRecGetBlockTag(record, 0, &rnode, NULL, NULL); - ResolveRecoveryConflictWithSnapshot(xldata->latestRemovedXid, rnode); + ResolveRecoveryConflictWithSnapshot(xldata->latestRemovedXid, + xldata->onCatalogTable, rnode); } action = XLogReadBufferForRedoExtended(record, 0, RBM_NORMAL, true, &buffer); diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c index 4019a2122e..75ca5f79f4 100644 --- a/src/backend/access/heap/heapam.c +++ b/src/backend/access/heap/heapam.c @@ -8419,7 +8419,8 @@ 
heap_xlog_prune(XLogReaderState *record) * no queries running for which the removed tuples are still visible. */ if (InHotStandby) - ResolveRecoveryConflictWithSnapshot(xlrec->latestRemovedXid, rnode); + ResolveRecoveryConflictWithSnapshot(xlrec->latestRemovedXid, + xlrec->onCatalogTable, rnode); /* * If we have a full-page image, restore it (using a cleanup lock) and @@ -8587,7 +8588,9 @@ heap_xlog_visible(XLogReaderState *record) * rather than killing the transaction outright. */ if (InHotStandby) - ResolveRecoveryConflictWithSnapshot(xlrec->cutoff_xid, rnode); + ResolveRecoveryConflictWithSnapshot(xlrec->cutoff_xid, + xlrec->onCatalogTable, + rnode); /* * Read the heap page, if it still exists. If the heap file has dropped or @@ -8724,7 +8727,8 @@ heap_xlog_freeze_page(XLogReaderState *record) TransactionIdRetreat(latestRemovedXid); XLogRecGetBlockTag(record, 0, &rnode, NULL, NULL); - ResolveRecoveryConflictWithSnapshot(latestRemovedXid, rnode); + ResolveRecoveryConflictWithSnapshot(latestRemovedXid, + xlrec->onCatalogTable, rnode); } if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO) diff --git a/src/backend/access/nbtree/nbtxlog.c b/src/backend/access/nbtree/nbtxlog.c index 786c08c0ce..41b7ec8e2d 100644 --- a/src/backend/access/nbtree/nbtxlog.c +++ b/src/backend/access/nbtree/nbtxlog.c @@ -668,7 +668,8 @@ btree_xlog_delete(XLogReaderState *record) XLogRecGetBlockTag(record, 0, &rnode, NULL, NULL); - ResolveRecoveryConflictWithSnapshot(xlrec->latestRemovedXid, rnode); + ResolveRecoveryConflictWithSnapshot(xlrec->latestRemovedXid, + xlrec->onCatalogTable, rnode); } /* @@ -1006,6 +1007,7 @@ btree_xlog_reuse_page(XLogReaderState *record) if (InHotStandby) ResolveRecoveryConflictWithSnapshotFullXid(xlrec->latestRemovedFullXid, + xlrec->onCatalogTable, xlrec->node); } diff --git a/src/backend/access/spgist/spgxlog.c b/src/backend/access/spgist/spgxlog.c index 3dfd2aa317..add4da4e74 100644 --- a/src/backend/access/spgist/spgxlog.c +++ 
b/src/backend/access/spgist/spgxlog.c @@ -881,6 +881,7 @@ spgRedoVacuumRedirect(XLogReaderState *record) XLogRecGetBlockTag(record, 0, &node, NULL, NULL); ResolveRecoveryConflictWithSnapshot(xldata->newestRedirectXid, + xldata->onCatalogTable, node); } } diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index d0ec6a834b..7b6d75cd4b 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -10391,6 +10391,19 @@ xlog_redo(XLogReaderState *record) /* Update our copy of the parameters in pg_control */ memcpy(&xlrec, XLogRecGetData(record), sizeof(xl_parameter_change)); + /* + * Invalidate logical slots if we are in hot standby and the primary does not + * have a WAL level sufficient for logical decoding. No need to search + * for potentially conflicting logical slots if the standby is running + * with wal_level lower than logical, because in that case, we would + * have either disallowed creation of logical slots or invalidated existing + * ones. 
+ */ + if (InRecovery && InHotStandby && + xlrec.wal_level < WAL_LEVEL_LOGICAL && + wal_level >= WAL_LEVEL_LOGICAL) + InvalidateConflictingLogicalReplicationSlots(InvalidOid,InvalidTransactionId); + LWLockAcquire(ControlFileLock, LW_EXCLUSIVE); ControlFile->MaxConnections = xlrec.MaxConnections; ControlFile->max_worker_processes = xlrec.max_worker_processes; diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql index 55f6e3711d..cd1fc88d17 100644 --- a/src/backend/catalog/system_views.sql +++ b/src/backend/catalog/system_views.sql @@ -1020,7 +1020,8 @@ CREATE VIEW pg_stat_database_conflicts AS pg_stat_get_db_conflict_lock(D.oid) AS confl_lock, pg_stat_get_db_conflict_snapshot(D.oid) AS confl_snapshot, pg_stat_get_db_conflict_bufferpin(D.oid) AS confl_bufferpin, - pg_stat_get_db_conflict_startup_deadlock(D.oid) AS confl_deadlock + pg_stat_get_db_conflict_startup_deadlock(D.oid) AS confl_deadlock, + pg_stat_get_db_conflict_logicalslot(D.oid) AS confl_logicalslot FROM pg_database D; CREATE VIEW pg_stat_user_functions AS diff --git a/src/backend/postmaster/pgstat.c b/src/backend/postmaster/pgstat.c index 56755cb92b..4fd2117443 100644 --- a/src/backend/postmaster/pgstat.c +++ b/src/backend/postmaster/pgstat.c @@ -1865,6 +1865,22 @@ pgstat_report_replslot_drop(const char *slotname) pgstat_send(&msg, sizeof(PgStat_MsgReplSlot)); } +/* ---------- + * pgstat_report_replslot_conflict() + * Tell the collector about a logical slot being conflicting + * with recovery. 
+ * ---------- + */ +void +pgstat_report_replslot_conflict(Oid dboid) +{ + PgStat_MsgRecoveryConflict msg; + pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_RECOVERYCONFLICT); + msg.m_databaseid = dboid; + msg.m_reason = PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT; + pgstat_send(&msg, sizeof(msg)); +} + /* ---------- * pgstat_ping() - * @@ -3558,6 +3574,7 @@ reset_dbentry_counters(PgStat_StatDBEntry *dbentry) dbentry->n_conflict_tablespace = 0; dbentry->n_conflict_lock = 0; dbentry->n_conflict_snapshot = 0; + dbentry->n_conflict_logicalslot = 0; dbentry->n_conflict_bufferpin = 0; dbentry->n_conflict_startup_deadlock = 0; dbentry->n_temp_files = 0; @@ -5507,6 +5524,9 @@ pgstat_recv_recoveryconflict(PgStat_MsgRecoveryConflict *msg, int len) case PROCSIG_RECOVERY_CONFLICT_SNAPSHOT: dbentry->n_conflict_snapshot++; break; + case PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT: + dbentry->n_conflict_logicalslot++; + break; case PROCSIG_RECOVERY_CONFLICT_BUFFERPIN: dbentry->n_conflict_bufferpin++; break; diff --git a/src/backend/replication/logical/logicalfuncs.c b/src/backend/replication/logical/logicalfuncs.c index 1f38c5b33e..1d9400ea63 100644 --- a/src/backend/replication/logical/logicalfuncs.c +++ b/src/backend/replication/logical/logicalfuncs.c @@ -241,11 +241,14 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin /* * After the sanity checks in CreateDecodingContext, make sure the - * restart_lsn is valid. Avoid "cannot get changes" wording in this + * restart_lsn is valid and at least one of xmin and catalog_xmin is valid. + * Avoid "cannot get changes" wording in this * errmsg because that'd be confusingly ambiguous about no changes * being available. 
+ */ - if (XLogRecPtrIsInvalid(MyReplicationSlot->data.restart_lsn)) + if (XLogRecPtrIsInvalid(MyReplicationSlot->data.restart_lsn) + || (!TransactionIdIsValid(MyReplicationSlot->data.xmin) + && !TransactionIdIsValid(MyReplicationSlot->data.catalog_xmin))) ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), errmsg("can no longer get changes from replication slot \"%s\"", diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c index 33e9acab4a..6e89bab255 100644 --- a/src/backend/replication/slot.c +++ b/src/backend/replication/slot.c @@ -1338,6 +1338,215 @@ restart: return invalidated; } +/* + * Helper for InvalidateConflictingLogicalReplicationSlots -- acquires the given slot + * and marks it invalid, if necessary and possible. + * + * Returns whether ReplicationSlotControlLock was released in the interim (and + * in that case we're not holding the lock at return, otherwise we are). + * + * This is inherently racy, because we release the LWLock + * for syscalls, so caller must restart if we return true. + */ +static bool +InvalidatePossiblyConflictingLogicalReplicationSlot(ReplicationSlot *s, TransactionId xid) +{ + int last_signaled_pid = 0; + bool released_lock = false; + + for (;;) + { + TransactionId slot_xmin; + TransactionId slot_catalog_xmin; + NameData slotname; + int active_pid = 0; + + Assert(LWLockHeldByMeInMode(ReplicationSlotControlLock, LW_SHARED)); + + if (!s->in_use) + { + if (released_lock) + LWLockRelease(ReplicationSlotControlLock); + break; + } + + /* + * Check if the slot needs to be invalidated. If it needs to be + * invalidated, and is not currently acquired, acquire it and mark it + * as having been invalidated. We do this with the spinlock held to + * avoid race conditions -- for example the xmin(s) could move forward, + * or the slot could be dropped. 
+ */ + SpinLockAcquire(&s->mutex); + + slot_xmin = s->data.xmin; + slot_catalog_xmin = s->data.catalog_xmin; + + /* + * If the slot is already invalid or is not conflicting, we don't need to + * do anything. + */ + + /* slot has been invalidated */ + if ((!TransactionIdIsValid(slot_xmin) && !TransactionIdIsValid(slot_catalog_xmin)) + || + /* + * we are not forcing for invalidation because the xid is valid + * and this is a non conflicting slot + */ + (TransactionIdIsValid(xid) && !( + (TransactionIdIsValid(slot_xmin) && TransactionIdPrecedesOrEquals(slot_xmin, xid)) + || + (TransactionIdIsValid(slot_catalog_xmin) && TransactionIdPrecedesOrEquals(slot_catalog_xmin, xid)) + )) + ) + { + SpinLockRelease(&s->mutex); + if (released_lock) + LWLockRelease(ReplicationSlotControlLock); + break; + } + + slotname = s->data.name; + active_pid = s->active_pid; + + /* + * If the slot can be acquired, do so and mark it invalidated + * immediately. Otherwise we'll signal the owning process, below, and + * retry. + */ + if (active_pid == 0) + { + MyReplicationSlot = s; + s->active_pid = MyProcPid; + s->data.xmin = InvalidTransactionId; + s->data.catalog_xmin = InvalidTransactionId; + } + + SpinLockRelease(&s->mutex); + + if (active_pid != 0) + { + /* + * Prepare the sleep on the slot's condition variable before + * releasing the lock, to close a possible race condition if the + * slot is released before the sleep below. + */ + + ConditionVariablePrepareToSleep(&s->active_cv); + + LWLockRelease(ReplicationSlotControlLock); + released_lock = true; + + /* + * Signal to terminate the process that owns the slot, if we + * haven't already signalled it. (Avoidance of repeated + * signalling is the only reason for there to be a loop in this + * routine; otherwise we could rely on caller's restart loop.) + * + * There is the race condition that other process may own the slot + * after its current owner process is terminated and before this + * process owns it. 
To handle that, we signal only if the PID of + * the owning process has changed from the previous time. (This + * logic assumes that the same PID is not reused very quickly.) + */ + if (last_signaled_pid != active_pid) + { + ereport(LOG, + (errmsg("terminating process %d because replication slot \"%s\" conflicts with recovery", + active_pid, NameStr(slotname)))); + + (void) kill(active_pid, SIGTERM); + last_signaled_pid = active_pid; + } + + /* Wait until the slot is released. */ + ConditionVariableSleep(&s->active_cv, + WAIT_EVENT_REPLICATION_SLOT_DROP); + + /* + * Re-acquire lock and start over; we expect to invalidate the + * slot next time (unless another process acquires the slot in the + * meantime). + */ + LWLockAcquire(ReplicationSlotControlLock, LW_SHARED); + continue; + } + else + { + /* + * We hold the slot now and have already invalidated it; flush it + * to ensure that state persists. + * + * Don't want to hold ReplicationSlotControlLock across file + * system operations, so release it now but be sure to tell caller + * to restart from scratch. + */ + LWLockRelease(ReplicationSlotControlLock); + released_lock = true; + + /* Make sure the invalidated state persists across server restart */ + ReplicationSlotMarkDirty(); + ReplicationSlotSave(); + ReplicationSlotRelease(); + pgstat_report_replslot_conflict(s->data.database); + + ereport(LOG, + (errmsg("invalidating slot \"%s\" because it conflicts with recovery", NameStr(slotname)))); + + /* done with this slot for now */ + break; + } + } + + Assert(!released_lock == LWLockHeldByMeInMode(ReplicationSlotControlLock, LW_SHARED)); + + return released_lock; +} + +/* + * Resolve recovery conflicts with logical slots. + * + * When xid is valid, it means that we are about to remove rows older than xid. + * Therefore we need to invalidate slots that depend on seeing those rows. + * When xid is invalid, invalidate all logical slots. 
This is required when the + * master wal_level is set back to replica, so existing logical slots need to + * be invalidated. + */ +void +InvalidateConflictingLogicalReplicationSlots(Oid dboid, TransactionId xid) +{ + + Assert(max_replication_slots >= 0); + + if (max_replication_slots == 0) + return; +restart: + LWLockAcquire(ReplicationSlotControlLock, LW_SHARED); + for (int i = 0; i < max_replication_slots; i++) + { + ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i]; + + if (!s->in_use) + continue; + + /* We are only dealing with *logical* slot conflicts. */ + if (!SlotIsLogical(s)) + continue; + + /* not our database and we don't want all the database, skip */ + if (s->data.database != dboid && TransactionIdIsValid(xid)) + continue; + + if (InvalidatePossiblyConflictingLogicalReplicationSlot(s, xid)) + { + /* if the lock was released, we need to restart from scratch */ + goto restart; + } + } + LWLockRelease(ReplicationSlotControlLock); +} + /* * Flush all replication slots to disk. 
* diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index 3ca2a11389..d0e247b104 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -1147,6 +1147,14 @@ StartLogicalReplication(StartReplicationCmd *cmd) ReplicationSlotAcquire(cmd->slotname, true); + if (!TransactionIdIsValid(MyReplicationSlot->data.xmin) + && !TransactionIdIsValid(MyReplicationSlot->data.catalog_xmin)) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("cannot read from logical replication slot \"%s\"", + cmd->slotname), + errdetail("This slot has been invalidated because it was conflicting with recovery."))); + if (XLogRecPtrIsInvalid(MyReplicationSlot->data.restart_lsn)) ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), diff --git a/src/backend/storage/ipc/procarray.c b/src/backend/storage/ipc/procarray.c index c7816fcfb3..0c555a390c 100644 --- a/src/backend/storage/ipc/procarray.c +++ b/src/backend/storage/ipc/procarray.c @@ -3443,6 +3443,10 @@ SignalVirtualTransaction(VirtualTransactionId vxid, ProcSignalReason sigmode, GET_VXID_FROM_PGPROC(procvxid, *proc); + /* + * Note: vxid.localTransactionId can be invalid, which means the + * request is to signal the pid that is not running a transaction. 
+ */ if (procvxid.backendId == vxid.backendId && procvxid.localTransactionId == vxid.localTransactionId) { diff --git a/src/backend/storage/ipc/procsignal.c b/src/backend/storage/ipc/procsignal.c index defb75aa26..315d5a1e33 100644 --- a/src/backend/storage/ipc/procsignal.c +++ b/src/backend/storage/ipc/procsignal.c @@ -673,6 +673,9 @@ procsignal_sigusr1_handler(SIGNAL_ARGS) if (CheckProcSignal(PROCSIG_RECOVERY_CONFLICT_SNAPSHOT)) RecoveryConflictInterrupt(PROCSIG_RECOVERY_CONFLICT_SNAPSHOT); + if (CheckProcSignal(PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT)) + RecoveryConflictInterrupt(PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT); + if (CheckProcSignal(PROCSIG_RECOVERY_CONFLICT_STARTUP_DEADLOCK)) RecoveryConflictInterrupt(PROCSIG_RECOVERY_CONFLICT_STARTUP_DEADLOCK); diff --git a/src/backend/storage/ipc/standby.c b/src/backend/storage/ipc/standby.c index 077251c1a6..28121f0658 100644 --- a/src/backend/storage/ipc/standby.c +++ b/src/backend/storage/ipc/standby.c @@ -34,6 +34,7 @@ #include "utils/ps_status.h" #include "utils/timeout.h" #include "utils/timestamp.h" +#include "replication/slot.h" /* User-settable GUC parameters */ int vacuum_defer_cleanup_age; @@ -440,7 +441,8 @@ ResolveRecoveryConflictWithVirtualXIDs(VirtualTransactionId *waitlist, } void -ResolveRecoveryConflictWithSnapshot(TransactionId latestRemovedXid, RelFileNode node) +ResolveRecoveryConflictWithSnapshot(TransactionId latestRemovedXid, + bool onCatalogTable, RelFileNode node) { VirtualTransactionId *backends; @@ -465,6 +467,9 @@ ResolveRecoveryConflictWithSnapshot(TransactionId latestRemovedXid, RelFileNode PROCSIG_RECOVERY_CONFLICT_SNAPSHOT, WAIT_EVENT_RECOVERY_CONFLICT_SNAPSHOT, true); + + if (onCatalogTable) + InvalidateConflictingLogicalReplicationSlots(node.dbNode, latestRemovedXid); } /* @@ -473,7 +478,7 @@ ResolveRecoveryConflictWithSnapshot(TransactionId latestRemovedXid, RelFileNode */ void ResolveRecoveryConflictWithSnapshotFullXid(FullTransactionId latestRemovedFullXid, - RelFileNode node) + 
bool onCatalogTable, RelFileNode node) { /* * ResolveRecoveryConflictWithSnapshot operates on 32-bit TransactionIds, @@ -491,7 +496,7 @@ ResolveRecoveryConflictWithSnapshotFullXid(FullTransactionId latestRemovedFullXi TransactionId latestRemovedXid; latestRemovedXid = XidFromFullTransactionId(latestRemovedFullXid); - ResolveRecoveryConflictWithSnapshot(latestRemovedXid, node); + ResolveRecoveryConflictWithSnapshot(latestRemovedXid, onCatalogTable, node); } } diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c index 58b5960e27..e4cb07abeb 100644 --- a/src/backend/tcop/postgres.c +++ b/src/backend/tcop/postgres.c @@ -2467,6 +2467,9 @@ errdetail_recovery_conflict(void) case PROCSIG_RECOVERY_CONFLICT_SNAPSHOT: errdetail("User query might have needed to see row versions that must be removed."); break; + case PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT: + errdetail("User was using the logical slot that must be dropped."); + break; case PROCSIG_RECOVERY_CONFLICT_STARTUP_DEADLOCK: errdetail("User transaction caused buffer deadlock with recovery."); break; @@ -3037,6 +3040,25 @@ RecoveryConflictInterrupt(ProcSignalReason reason) case PROCSIG_RECOVERY_CONFLICT_LOCK: case PROCSIG_RECOVERY_CONFLICT_TABLESPACE: case PROCSIG_RECOVERY_CONFLICT_SNAPSHOT: + case PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT: + /* + * For conflicts that require a logical slot to be invalidated, the + * requirement is for the signal receiver to release the slot, + * so that it could be invalidated by the signal sender. So for + * normal backends, the transaction should be aborted, just + * like for other recovery conflicts. But if it's walsender on + * standby, then it has to be killed so as to release an + * acquired logical slot. 
+ */ + if (am_cascading_walsender && + reason == PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT && + MyReplicationSlot && SlotIsLogical(MyReplicationSlot)) + { + RecoveryConflictPending = true; + QueryCancelPending = true; + InterruptPending = true; + break; + } /* * If we aren't in a transaction any longer then ignore. diff --git a/src/backend/utils/adt/pgstatfuncs.c b/src/backend/utils/adt/pgstatfuncs.c index ff5aedc99c..073b402bac 100644 --- a/src/backend/utils/adt/pgstatfuncs.c +++ b/src/backend/utils/adt/pgstatfuncs.c @@ -1497,6 +1497,21 @@ pg_stat_get_db_conflict_snapshot(PG_FUNCTION_ARGS) PG_RETURN_INT64(result); } +Datum +pg_stat_get_db_conflict_logicalslot(PG_FUNCTION_ARGS) +{ + Oid dbid = PG_GETARG_OID(0); + int64 result; + PgStat_StatDBEntry *dbentry; + + if ((dbentry = pgstat_fetch_stat_dbentry(dbid)) == NULL) + result = 0; + else + result = (int64) (dbentry->n_conflict_logicalslot); + + PG_RETURN_INT64(result); +} + Datum pg_stat_get_db_conflict_bufferpin(PG_FUNCTION_ARGS) { @@ -1540,6 +1555,7 @@ pg_stat_get_db_conflict_all(PG_FUNCTION_ARGS) result = (int64) (dbentry->n_conflict_tablespace + dbentry->n_conflict_lock + dbentry->n_conflict_snapshot + + dbentry->n_conflict_logicalslot + dbentry->n_conflict_bufferpin + dbentry->n_conflict_startup_deadlock); diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat index b603700ed9..94ce48eca7 100644 --- a/src/include/catalog/pg_proc.dat +++ b/src/include/catalog/pg_proc.dat @@ -5502,6 +5502,11 @@ proname => 'pg_stat_get_db_conflict_snapshot', provolatile => 's', proparallel => 'r', prorettype => 'int8', proargtypes => 'oid', prosrc => 'pg_stat_get_db_conflict_snapshot' }, +{ oid => '4544', + descr => 'statistics: recovery conflicts in database caused by logical replication slot', + proname => 'pg_stat_get_db_conflict_logicalslot', provolatile => 's', + proparallel => 'r', prorettype => 'int8', proargtypes => 'oid', + prosrc => 'pg_stat_get_db_conflict_logicalslot' }, { oid => '3068', descr => 
'statistics: recovery conflicts in database caused by shared buffer pin', proname => 'pg_stat_get_db_conflict_bufferpin', provolatile => 's', diff --git a/src/include/pgstat.h b/src/include/pgstat.h index 2068a68a5f..ead9390d06 100644 --- a/src/include/pgstat.h +++ b/src/include/pgstat.h @@ -755,6 +755,7 @@ typedef struct PgStat_StatDBEntry PgStat_Counter n_conflict_tablespace; PgStat_Counter n_conflict_lock; PgStat_Counter n_conflict_snapshot; + PgStat_Counter n_conflict_logicalslot; PgStat_Counter n_conflict_bufferpin; PgStat_Counter n_conflict_startup_deadlock; PgStat_Counter n_temp_files; @@ -1047,6 +1048,7 @@ extern void pgstat_report_checksum_failure(void); extern void pgstat_report_replslot(const PgStat_StatReplSlotEntry *repSlotStat); extern void pgstat_report_replslot_create(const char *slotname); extern void pgstat_report_replslot_drop(const char *slotname); +extern void pgstat_report_replslot_conflict(Oid dbOid); extern void pgstat_initialize(void); diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h index e32fb85db8..4779617cd7 100644 --- a/src/include/replication/slot.h +++ b/src/include/replication/slot.h @@ -214,6 +214,7 @@ extern XLogRecPtr ReplicationSlotsComputeLogicalRestartLSN(void); extern bool ReplicationSlotsCountDBSlots(Oid dboid, int *nslots, int *nactive); extern void ReplicationSlotsDropDBSlots(Oid dboid); extern bool InvalidateObsoleteReplicationSlots(XLogSegNo oldestSegno); +extern void InvalidateConflictingLogicalReplicationSlots(Oid dboid, TransactionId xid); extern ReplicationSlot *SearchNamedReplicationSlot(const char *name, bool need_lock); extern void ReplicationSlotNameForTablesync(Oid suboid, Oid relid, char *syncslotname, int szslot); extern void ReplicationSlotDropAtPubNode(WalReceiverConn *wrconn, char *slotname, bool missing_ok); @@ -223,4 +224,6 @@ extern void CheckPointReplicationSlots(void); extern void CheckSlotRequirements(void); +extern void ResolveRecoveryConflictWithLogicalSlots(Oid dboid, 
TransactionId xid, char *reason); + #endif /* SLOT_H */ diff --git a/src/include/storage/procsignal.h b/src/include/storage/procsignal.h index eec186be2e..f86b070dbc 100644 --- a/src/include/storage/procsignal.h +++ b/src/include/storage/procsignal.h @@ -41,6 +41,7 @@ typedef enum PROCSIG_RECOVERY_CONFLICT_TABLESPACE, PROCSIG_RECOVERY_CONFLICT_LOCK, PROCSIG_RECOVERY_CONFLICT_SNAPSHOT, + PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT, PROCSIG_RECOVERY_CONFLICT_BUFFERPIN, PROCSIG_RECOVERY_CONFLICT_STARTUP_DEADLOCK, diff --git a/src/include/storage/standby.h b/src/include/storage/standby.h index 38fd85a431..3ba1882216 100644 --- a/src/include/storage/standby.h +++ b/src/include/storage/standby.h @@ -30,9 +30,9 @@ extern void InitRecoveryTransactionEnvironment(void); extern void ShutdownRecoveryTransactionEnvironment(void); extern void ResolveRecoveryConflictWithSnapshot(TransactionId latestRemovedXid, - RelFileNode node); + bool onCatalogTable, RelFileNode node); extern void ResolveRecoveryConflictWithSnapshotFullXid(FullTransactionId latestRemovedFullXid, - RelFileNode node); + bool onCatalogTable, RelFileNode node); extern void ResolveRecoveryConflictWithTablespace(Oid tsid); extern void ResolveRecoveryConflictWithDatabase(Oid dbid); diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out index e5ab11275d..66f28b649d 100644 --- a/src/test/regress/expected/rules.out +++ b/src/test/regress/expected/rules.out @@ -1871,7 +1871,8 @@ pg_stat_database_conflicts| SELECT d.oid AS datid, pg_stat_get_db_conflict_lock(d.oid) AS confl_lock, pg_stat_get_db_conflict_snapshot(d.oid) AS confl_snapshot, pg_stat_get_db_conflict_bufferpin(d.oid) AS confl_bufferpin, - pg_stat_get_db_conflict_startup_deadlock(d.oid) AS confl_deadlock + pg_stat_get_db_conflict_startup_deadlock(d.oid) AS confl_deadlock, + pg_stat_get_db_conflict_logicalslot(d.oid) AS confl_logicalslot FROM pg_database d; pg_stat_gssapi| SELECT s.pid, s.gss_auth AS gss_authenticated, -- 2.18.4