From 0fe0ea265c830125b6f9e12cd1cdebf773b44354 Mon Sep 17 00:00:00 2001 From: bdrouvotAWS Date: Tue, 6 Apr 2021 12:03:32 +0000 Subject: [PATCH v15 3/5] Handle logical slot conflicts on standby. During WAL replay on standby, when slot conflict is identified, drop such slots. Also do the same thing if wal_level on master is reduced to below logical and there are existing logical slots on standby. Introduce a new ProcSignalReason value for slot conflict recovery. Arrange for a new pg_stat_get_activity field: confl_logicalslot. Amit Khandekar, reviewed by Andres Freund. --- doc/src/sgml/monitoring.sgml | 10 ++ src/backend/access/gist/gistxlog.c | 4 +- src/backend/access/hash/hash_xlog.c | 3 +- src/backend/access/heap/heapam.c | 13 +- src/backend/access/nbtree/nbtxlog.c | 4 +- src/backend/access/spgist/spgxlog.c | 1 + src/backend/access/transam/xlog.c | 14 ++ src/backend/catalog/system_views.sql | 3 +- src/backend/postmaster/pgstat.c | 22 ++++ src/backend/replication/slot.c | 183 +++++++++++++++++++++++++++ src/backend/storage/ipc/procarray.c | 4 + src/backend/storage/ipc/procsignal.c | 3 + src/backend/storage/ipc/standby.c | 11 +- src/backend/tcop/postgres.c | 22 ++++ src/backend/utils/adt/pgstatfuncs.c | 16 +++ src/include/catalog/pg_proc.dat | 5 + src/include/pgstat.h | 2 + src/include/replication/slot.h | 2 + src/include/storage/procsignal.h | 1 + src/include/storage/standby.h | 4 +- src/test/regress/expected/rules.out | 3 +- 21 files changed, 316 insertions(+), 14 deletions(-) 6.3% src/backend/access/heap/ 6.0% src/backend/access/transam/ 5.8% src/backend/access/ 4.8% src/backend/postmaster/ 48.3% src/backend/replication/ 6.4% src/backend/storage/ipc/ 8.0% src/backend/tcop/ 3.1% src/backend/utils/adt/ 6.3% src/include/ diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml index 56018745c8..850cc97b1b 100644 --- a/doc/src/sgml/monitoring.sgml +++ b/doc/src/sgml/monitoring.sgml @@ -3985,6 +3985,16 @@ SELECT pid, wait_event_type, wait_event FROM pg_stat_activity WHERE wait_event i deadlocks + + + + confl_logicalslot bigint + + + Number of queries in this database that have been canceled due to + logical slots + + diff --git a/src/backend/access/gist/gistxlog.c b/src/backend/access/gist/gistxlog.c index 97d814b927..b6c0d8b290 100644 --- a/src/backend/access/gist/gistxlog.c +++ b/src/backend/access/gist/gistxlog.c @@ -195,7 +195,8 @@ gistRedoDeleteRecord(XLogReaderState *record) XLogRecGetBlockTag(record, 0, &rnode, NULL, NULL); - ResolveRecoveryConflictWithSnapshot(xldata->latestRemovedXid, rnode); + ResolveRecoveryConflictWithSnapshot(xldata->latestRemovedXid, + xldata->onCatalogTable, rnode); } if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO) @@ -395,6 +396,7 @@ gistRedoPageReuse(XLogReaderState *record) */ if (InHotStandby) ResolveRecoveryConflictWithSnapshotFullXid(xlrec->latestRemovedFullXid, + xlrec->onCatalogTable, xlrec->node); } diff --git a/src/backend/access/hash/hash_xlog.c b/src/backend/access/hash/hash_xlog.c index 02d9e6cdfd..b1ff596eb7 100644 --- a/src/backend/access/hash/hash_xlog.c +++ b/src/backend/access/hash/hash_xlog.c @@ -1002,7 +1002,8 @@ hash_xlog_vacuum_one_page(XLogReaderState *record) RelFileNode rnode; XLogRecGetBlockTag(record, 0, &rnode, NULL, NULL); - ResolveRecoveryConflictWithSnapshot(xldata->latestRemovedXid, rnode); + ResolveRecoveryConflictWithSnapshot(xldata->latestRemovedXid, + xldata->onCatalogTable, rnode); } action = XLogReadBufferForRedoExtended(record, 0, RBM_NORMAL, true, &buffer); diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c index ce6b66aa58..ca769b7862 100644 --- a/src/backend/access/heap/heapam.c +++ b/src/backend/access/heap/heapam.c @@ -8522,7 +8522,8 @@ heap_xlog_cleanup_info(XLogReaderState *record) xl_heap_cleanup_info *xlrec = (xl_heap_cleanup_info *) XLogRecGetData(record); if (InHotStandby) - ResolveRecoveryConflictWithSnapshot(xlrec->latestRemovedXid, xlrec->node); + ResolveRecoveryConflictWithSnapshot(xlrec->latestRemovedXid, + xlrec->onCatalogTable, xlrec->node); /* * Actual operation is a no-op. Record type exists to provide a means for @@ -8558,7 +8559,8 @@ heap_xlog_clean(XLogReaderState *record) * latestRemovedXid is invalid, skip conflict processing. */ if (InHotStandby && TransactionIdIsValid(xlrec->latestRemovedXid)) - ResolveRecoveryConflictWithSnapshot(xlrec->latestRemovedXid, rnode); + ResolveRecoveryConflictWithSnapshot(xlrec->latestRemovedXid, + xlrec->onCatalogTable, rnode); /* * If we have a full-page image, restore it (using a cleanup lock) and @@ -8654,7 +8656,9 @@ heap_xlog_visible(XLogReaderState *record) * rather than killing the transaction outright. */ if (InHotStandby) - ResolveRecoveryConflictWithSnapshot(xlrec->cutoff_xid, rnode); + ResolveRecoveryConflictWithSnapshot(xlrec->cutoff_xid, + xlrec->onCatalogTable, + rnode); /* * Read the heap page, if it still exists. If the heap file has dropped or @@ -8791,7 +8795,8 @@ heap_xlog_freeze_page(XLogReaderState *record) TransactionIdRetreat(latestRemovedXid); XLogRecGetBlockTag(record, 0, &rnode, NULL, NULL); - ResolveRecoveryConflictWithSnapshot(latestRemovedXid, rnode); + ResolveRecoveryConflictWithSnapshot(latestRemovedXid, + xlrec->onCatalogTable, rnode); } if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO) diff --git a/src/backend/access/nbtree/nbtxlog.c b/src/backend/access/nbtree/nbtxlog.c index 1779b6ba47..36ee313428 100644 --- a/src/backend/access/nbtree/nbtxlog.c +++ b/src/backend/access/nbtree/nbtxlog.c @@ -669,7 +669,8 @@ btree_xlog_delete(XLogReaderState *record) XLogRecGetBlockTag(record, 0, &rnode, NULL, NULL); - ResolveRecoveryConflictWithSnapshot(xlrec->latestRemovedXid, rnode); + ResolveRecoveryConflictWithSnapshot(xlrec->latestRemovedXid, + xlrec->onCatalogTable, rnode); } /* @@ -1007,6 +1008,7 @@ btree_xlog_reuse_page(XLogReaderState *record) if (InHotStandby) ResolveRecoveryConflictWithSnapshotFullXid(xlrec->latestRemovedFullXid, + xlrec->onCatalogTable, xlrec->node); } diff --git a/src/backend/access/spgist/spgxlog.c b/src/backend/access/spgist/spgxlog.c index 3dfd2aa317..add4da4e74 100644 --- a/src/backend/access/spgist/spgxlog.c +++ b/src/backend/access/spgist/spgxlog.c @@ -881,6 +881,7 @@ spgRedoVacuumRedirect(XLogReaderState *record) XLogRecGetBlockTag(record, 0, &node, NULL, NULL); ResolveRecoveryConflictWithSnapshot(xldata->newestRedirectXid, + xldata->onCatalogTable, node); } } diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index 6a21cba362..c5b5d6b610 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -10373,6 +10373,20 @@ xlog_redo(XLogReaderState *record) /* Update our copy of the parameters in pg_control */ memcpy(&xlrec, XLogRecGetData(record), sizeof(xl_parameter_change)); + /* + * Drop logical slots if we are in hot standby and the primary does not + * have a WAL level sufficient for logical decoding. No need to search + * for potentially conflicting logically slots if standby is running + * with wal_level lower than logical, because in that case, we would + * have either disallowed creation of logical slots or dropped existing + * ones. + */ + if (InRecovery && InHotStandby && + xlrec.wal_level < WAL_LEVEL_LOGICAL && + wal_level >= WAL_LEVEL_LOGICAL) + ResolveRecoveryConflictWithLogicalSlots(InvalidOid, InvalidTransactionId, + gettext_noop("Logical decoding on standby requires wal_level >= logical on master.")); + LWLockAcquire(ControlFileLock, LW_EXCLUSIVE); ControlFile->MaxConnections = xlrec.MaxConnections; ControlFile->max_worker_processes = xlrec.max_worker_processes; diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql index 5f2541d316..374007df9d 100644 --- a/src/backend/catalog/system_views.sql +++ b/src/backend/catalog/system_views.sql @@ -1015,7 +1015,8 @@ CREATE VIEW pg_stat_database_conflicts AS pg_stat_get_db_conflict_lock(D.oid) AS confl_lock, pg_stat_get_db_conflict_snapshot(D.oid) AS confl_snapshot, pg_stat_get_db_conflict_bufferpin(D.oid) AS confl_bufferpin, - pg_stat_get_db_conflict_startup_deadlock(D.oid) AS confl_deadlock + pg_stat_get_db_conflict_startup_deadlock(D.oid) AS confl_deadlock, + pg_stat_get_db_conflict_logicalslot(D.oid) AS confl_logicalslot FROM pg_database D; CREATE VIEW pg_stat_user_functions AS diff --git a/src/backend/postmaster/pgstat.c b/src/backend/postmaster/pgstat.c index 5ba776e789..03c5dbea48 100644 --- a/src/backend/postmaster/pgstat.c +++ b/src/backend/postmaster/pgstat.c @@ -2928,6 +2928,24 @@ pgstat_send_archiver(const char *xlog, bool failed) pgstat_send(&msg, sizeof(msg)); } +/* ---------- + * pgstat_send_droplogicalslot() - + * + * Tell the collector about a logical slot being dropped + * due to conflict. + * ---------- + */ +void +pgstat_send_droplogicalslot(Oid dbOid) +{ + PgStat_MsgRecoveryConflict msg; + + pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_RECOVERYCONFLICT); + msg.m_databaseid = dbOid; + msg.m_reason = PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT; + pgstat_send(&msg, sizeof(msg)); +} + /* ---------- * pgstat_send_bgwriter() - * @@ -3402,6 +3420,7 @@ reset_dbentry_counters(PgStat_StatDBEntry *dbentry) dbentry->n_conflict_tablespace = 0; dbentry->n_conflict_lock = 0; dbentry->n_conflict_snapshot = 0; + dbentry->n_conflict_logicalslot = 0; dbentry->n_conflict_bufferpin = 0; dbentry->n_conflict_startup_deadlock = 0; dbentry->n_temp_files = 0; @@ -5290,6 +5309,9 @@ pgstat_recv_recoveryconflict(PgStat_MsgRecoveryConflict *msg, int len) case PROCSIG_RECOVERY_CONFLICT_SNAPSHOT: dbentry->n_conflict_snapshot++; break; + case PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT: + dbentry->n_conflict_logicalslot++; + break; case PROCSIG_RECOVERY_CONFLICT_BUFFERPIN: dbentry->n_conflict_bufferpin++; break; diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c index 2ec7127947..4945dd1a4f 100644 --- a/src/backend/replication/slot.c +++ b/src/backend/replication/slot.c @@ -46,6 +46,7 @@ #include "pgstat.h" #include "replication/slot.h" #include "storage/fd.h" +#include "storage/lock.h" #include "storage/proc.h" #include "storage/procarray.h" #include "utils/builtins.h" @@ -103,6 +104,7 @@ static int ReplicationSlotAcquireInternal(ReplicationSlot *slot, const char *name, SlotAcquireBehavior behavior); static void ReplicationSlotDropAcquired(void); static void ReplicationSlotDropPtr(ReplicationSlot *slot); +static void ReplicationSlotDropConflicting(ReplicationSlot *slot); /* internal persistency functions */ static void RestoreSlotFromDisk(const char *name); @@ -719,6 +721,70 @@ ReplicationSlotDropPtr(ReplicationSlot *slot) LWLockRelease(ReplicationSlotAllocationLock); } +/* + * Permanently drop a conflicting replication slot. If it's already active by + * another backend, send it a recovery conflict signal, and then try again. + */ +static void +ReplicationSlotDropConflicting(ReplicationSlot *slot) +{ + pid_t active_pid; + PGPROC *proc; + VirtualTransactionId vxid; + bool initially_not_active; + + ConditionVariablePrepareToSleep(&slot->active_cv); + initially_not_active = true; + while (1) + { + SpinLockAcquire(&slot->mutex); + active_pid = slot->active_pid; + if (active_pid == 0) + active_pid = slot->active_pid = MyProcPid; + SpinLockRelease(&slot->mutex); + + /* Drop the acquired slot, unless it is acquired by another backend */ + if (active_pid == MyProcPid) + { + elog(DEBUG1, "acquired conflicting slot, now dropping it"); + if (initially_not_active) + pgstat_send_droplogicalslot(slot->data.database); + ReplicationSlotDropPtr(slot); + break; + } + + /* slot was active */ + initially_not_active = false; + + /* Send the other backend, a conflict recovery signal */ + SetInvalidVirtualTransactionId(vxid); + LWLockAcquire(ProcArrayLock, LW_SHARED); + proc = BackendPidGetProcWithLock(active_pid); + if (proc) + GET_VXID_FROM_PGPROC(vxid, *proc); + LWLockRelease(ProcArrayLock); + + /* + * If coincidently that process finished, some other backend may + * acquire the slot again. So start over again. + * Note: Even if vxid.localTransactionId is invalid, we need to cancel + * that backend, because there is no other way to make it release the + * slot. So don't bother to validate vxid.localTransactionId. + */ + if (vxid.backendId == InvalidBackendId) + continue; + + elog(DEBUG1, "cancelling pid %d (backendId: %d) for releasing slot", + active_pid, vxid.backendId); + + CancelVirtualTransaction(vxid, PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT); + ConditionVariableSleep(&slot->active_cv, + WAIT_EVENT_REPLICATION_SLOT_DROP); + } + + ConditionVariableCancelSleep(); +} + /* * Serialize the currently acquired slot's state from memory to disk, thereby * guaranteeing the current state will survive a crash. @@ -1173,6 +1239,123 @@ ReplicationSlotReserveWal(void) } } +/* + * Resolve recovery conflicts with logical slots. + * + * When xid is valid, it means that rows older than xid might have been + * removed. Therefore we need to drop slots that depend on seeing those rows. + * When xid is invalid, drop all logical slots. This is required when the + * master wal_level is set back to replica, so existing logical slots need to + * be dropped. Also, when xid is invalid, a common 'conflict_reason' is + * provided for the error detail; otherwise it is NULL, in which case it is + * constructed out of the xid value. + */ +void +ResolveRecoveryConflictWithLogicalSlots(Oid dboid, TransactionId xid, + char *conflict_reason) +{ + int i; + bool found_conflict = false; + + Assert(max_replication_slots >= 0); + + if (max_replication_slots == 0) + return; + +restart: + if (found_conflict) + { + CHECK_FOR_INTERRUPTS(); + found_conflict = false; + } + + LWLockAcquire(ReplicationSlotControlLock, LW_SHARED); + for (i = 0; i < max_replication_slots; i++) + { + ReplicationSlot *s; + + s = &ReplicationSlotCtl->replication_slots[i]; + + /* cannot change while ReplicationSlotCtlLock is held */ + if (!s->in_use) + continue; + + /* We are only dealing with *logical* slot conflicts. */ + if (!SlotIsLogical(s)) + continue; + + /* Invalid xid means caller is asking to drop all logical slots */ + if (!TransactionIdIsValid(xid)) + found_conflict = true; + else + { + TransactionId slot_xmin; + TransactionId slot_catalog_xmin; + StringInfoData conflict_str, conflict_xmins; + char *conflict_sentence = + gettext_noop("Slot conflicted with xid horizon which was being increased to"); + + /* not our database, skip */ + if (s->data.database != InvalidOid && s->data.database != dboid) + continue; + + SpinLockAcquire(&s->mutex); + slot_xmin = s->data.xmin; + slot_catalog_xmin = s->data.catalog_xmin; + SpinLockRelease(&s->mutex); + + /* + * Build the conflict_str which will look like : + * "Slot conflicted with xid horizon which was being increased + * to 9012 (slot xmin: 1234, slot catalog_xmin: 5678)." + */ + initStringInfo(&conflict_xmins); + if (TransactionIdIsValid(slot_xmin) && + TransactionIdPrecedesOrEquals(slot_xmin, xid)) + { + appendStringInfo(&conflict_xmins, "slot xmin: %d", slot_xmin); + } + if (TransactionIdIsValid(slot_catalog_xmin) && + TransactionIdPrecedesOrEquals(slot_catalog_xmin, xid)) + appendStringInfo(&conflict_xmins, "%sslot catalog_xmin: %d", + conflict_xmins.len > 0 ? ", " : "", + slot_catalog_xmin); + + if (conflict_xmins.len > 0) + { + initStringInfo(&conflict_str); + appendStringInfo(&conflict_str, "%s %d (%s).", + conflict_sentence, xid, conflict_xmins.data); + found_conflict = true; + conflict_reason = conflict_str.data; + } + } + + if (found_conflict) + { + NameData slotname; + + SpinLockAcquire(&s->mutex); + slotname = s->data.name; + SpinLockRelease(&s->mutex); + + /* ReplicationSlotDropConflicting() will acquire the lock below */ + LWLockRelease(ReplicationSlotControlLock); + + ReplicationSlotDropConflicting(s); + + ereport(LOG, + (errmsg("dropped conflicting slot %s", NameStr(slotname)), + errdetail("%s", conflict_reason))); + + /* We released the lock above; so re-scan the slots. */ + goto restart; + } + } + + LWLockRelease(ReplicationSlotControlLock); +} + /* * Mark any slot that points to an LSN older than the given segment * as invalid; it requires WAL that's about to be removed. diff --git a/src/backend/storage/ipc/procarray.c b/src/backend/storage/ipc/procarray.c index e113a85aed..9054715e82 100644 --- a/src/backend/storage/ipc/procarray.c +++ b/src/backend/storage/ipc/procarray.c @@ -3372,6 +3372,10 @@ SignalVirtualTransaction(VirtualTransactionId vxid, ProcSignalReason sigmode, GET_VXID_FROM_PGPROC(procvxid, *proc); + /* + * Note: vxid.localTransactionId can be invalid, which means the + * request is to signal the pid that is not running a transaction. + */ if (procvxid.backendId == vxid.backendId && procvxid.localTransactionId == vxid.localTransactionId) { diff --git a/src/backend/storage/ipc/procsignal.c b/src/backend/storage/ipc/procsignal.c index eac6895141..a3fa6bdc01 100644 --- a/src/backend/storage/ipc/procsignal.c +++ b/src/backend/storage/ipc/procsignal.c @@ -673,6 +673,9 @@ procsignal_sigusr1_handler(SIGNAL_ARGS) if (CheckProcSignal(PROCSIG_RECOVERY_CONFLICT_SNAPSHOT)) RecoveryConflictInterrupt(PROCSIG_RECOVERY_CONFLICT_SNAPSHOT); + if (CheckProcSignal(PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT)) + RecoveryConflictInterrupt(PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT); + if (CheckProcSignal(PROCSIG_RECOVERY_CONFLICT_STARTUP_DEADLOCK)) RecoveryConflictInterrupt(PROCSIG_RECOVERY_CONFLICT_STARTUP_DEADLOCK); diff --git a/src/backend/storage/ipc/standby.c b/src/backend/storage/ipc/standby.c index 1465ee44a1..d155a1de20 100644 --- a/src/backend/storage/ipc/standby.c +++ b/src/backend/storage/ipc/standby.c @@ -23,6 +23,7 @@ #include "access/xloginsert.h" #include "miscadmin.h" #include "pgstat.h" +#include "replication/slot.h" #include "storage/bufmgr.h" #include "storage/lmgr.h" #include "storage/proc.h" @@ -440,7 +441,8 @@ ResolveRecoveryConflictWithVirtualXIDs(VirtualTransactionId *waitlist, } void -ResolveRecoveryConflictWithSnapshot(TransactionId latestRemovedXid, RelFileNode node) +ResolveRecoveryConflictWithSnapshot(TransactionId latestRemovedXid, + bool onCatalogTable, RelFileNode node) { VirtualTransactionId *backends; @@ -465,6 +467,9 @@ ResolveRecoveryConflictWithSnapshot(TransactionId latestRemovedXid, RelFileNode PROCSIG_RECOVERY_CONFLICT_SNAPSHOT, WAIT_EVENT_RECOVERY_CONFLICT_SNAPSHOT, true); + + if (onCatalogTable) + ResolveRecoveryConflictWithLogicalSlots(node.dbNode, latestRemovedXid, NULL); } /* @@ -473,7 +478,7 @@ ResolveRecoveryConflictWithSnapshot(TransactionId latestRemovedXid, RelFileNode */ void ResolveRecoveryConflictWithSnapshotFullXid(FullTransactionId latestRemovedFullXid, - RelFileNode node) + bool onCatalogTable, RelFileNode node) { /* * ResolveRecoveryConflictWithSnapshot operates on 32-bit TransactionIds, @@ -491,7 +496,7 @@ ResolveRecoveryConflictWithSnapshotFullXid(FullTransactionId latestRemovedFullXi TransactionId latestRemovedXid; latestRemovedXid = XidFromFullTransactionId(latestRemovedFullXid); - ResolveRecoveryConflictWithSnapshot(latestRemovedXid, node); + ResolveRecoveryConflictWithSnapshot(latestRemovedXid, onCatalogTable, node); } } diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c index 330ec5b028..c5edb7f4f7 100644 --- a/src/backend/tcop/postgres.c +++ b/src/backend/tcop/postgres.c @@ -2441,6 +2441,9 @@ errdetail_recovery_conflict(void) case PROCSIG_RECOVERY_CONFLICT_SNAPSHOT: errdetail("User query might have needed to see row versions that must be removed."); break; + case PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT: + errdetail("User was using the logical slot that must be dropped."); + break; case PROCSIG_RECOVERY_CONFLICT_STARTUP_DEADLOCK: errdetail("User transaction caused buffer deadlock with recovery."); break; @@ -3011,6 +3014,25 @@ RecoveryConflictInterrupt(ProcSignalReason reason) case PROCSIG_RECOVERY_CONFLICT_LOCK: case PROCSIG_RECOVERY_CONFLICT_TABLESPACE: case PROCSIG_RECOVERY_CONFLICT_SNAPSHOT: + case PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT: + /* + * For conflicts that require a logical slot to be dropped, the + * requirement is for the signal receiver to release the slot, + * so that it could be dropped by the signal sender. So for + * normal backends, the transaction should be aborted, just + * like for other recovery conflicts. But if it's walsender on + * standby, then it has to be killed so as to release an + * acquired logical slot. + */ + if (am_cascading_walsender && + reason == PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT && + MyReplicationSlot && SlotIsLogical(MyReplicationSlot)) + { + RecoveryConflictPending = true; + QueryCancelPending = true; + InterruptPending = true; + break; + } /* * If we aren't in a transaction any longer then ignore. diff --git a/src/backend/utils/adt/pgstatfuncs.c b/src/backend/utils/adt/pgstatfuncs.c index 9ffbca685c..fe7f3105f2 100644 --- a/src/backend/utils/adt/pgstatfuncs.c +++ b/src/backend/utils/adt/pgstatfuncs.c @@ -1494,6 +1494,21 @@ pg_stat_get_db_conflict_snapshot(PG_FUNCTION_ARGS) PG_RETURN_INT64(result); } +Datum +pg_stat_get_db_conflict_logicalslot(PG_FUNCTION_ARGS) +{ + Oid dbid = PG_GETARG_OID(0); + int64 result; + PgStat_StatDBEntry *dbentry; + + if ((dbentry = pgstat_fetch_stat_dbentry(dbid)) == NULL) + result = 0; + else + result = (int64) (dbentry->n_conflict_logicalslot); + + PG_RETURN_INT64(result); +} + Datum pg_stat_get_db_conflict_bufferpin(PG_FUNCTION_ARGS) { @@ -1537,6 +1552,7 @@ pg_stat_get_db_conflict_all(PG_FUNCTION_ARGS) result = (int64) (dbentry->n_conflict_tablespace + dbentry->n_conflict_lock + dbentry->n_conflict_snapshot + + dbentry->n_conflict_logicalslot + dbentry->n_conflict_bufferpin + dbentry->n_conflict_startup_deadlock); diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat index 4309fa40dd..3afe542864 100644 --- a/src/include/catalog/pg_proc.dat +++ b/src/include/catalog/pg_proc.dat @@ -5432,6 +5432,11 @@ proname => 'pg_stat_get_db_conflict_snapshot', provolatile => 's', proparallel => 'r', prorettype => 'int8', proargtypes => 'oid', prosrc => 'pg_stat_get_db_conflict_snapshot' }, +{ oid => '4544', + descr => 'statistics: recovery conflicts in database caused by logical replication slot', + proname => 'pg_stat_get_db_conflict_logicalslot', provolatile => 's', + proparallel => 'r', prorettype => 'int8', proargtypes => 'oid', + prosrc => 'pg_stat_get_db_conflict_logicalslot' }, { oid => '3068', descr => 'statistics: recovery conflicts in database caused by shared buffer pin', proname => 'pg_stat_get_db_conflict_bufferpin', provolatile => 's', diff --git a/src/include/pgstat.h b/src/include/pgstat.h index 7cd137506e..19d13caab5 100644 --- a/src/include/pgstat.h +++ b/src/include/pgstat.h @@ -719,6 +719,7 @@ typedef struct PgStat_StatDBEntry PgStat_Counter n_conflict_tablespace; PgStat_Counter n_conflict_lock; PgStat_Counter n_conflict_snapshot; + PgStat_Counter n_conflict_logicalslot; PgStat_Counter n_conflict_bufferpin; PgStat_Counter n_conflict_startup_deadlock; PgStat_Counter n_temp_files; @@ -1064,6 +1065,7 @@ extern void pgstat_twophase_postabort(TransactionId xid, uint16 info, void *recdata, uint32 len); extern void pgstat_send_archiver(const char *xlog, bool failed); +extern void pgstat_send_droplogicalslot(Oid dbOid); extern void pgstat_send_bgwriter(void); extern void pgstat_report_wal(void); extern bool pgstat_send_wal(bool force); diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h index 1ad5e6c50d..b6e5ffff79 100644 --- a/src/include/replication/slot.h +++ b/src/include/replication/slot.h @@ -232,4 +232,6 @@ extern void CheckPointReplicationSlots(void); extern void CheckSlotRequirements(void); +extern void ResolveRecoveryConflictWithLogicalSlots(Oid dboid, TransactionId xid, char *reason); + #endif /* SLOT_H */ diff --git a/src/include/storage/procsignal.h b/src/include/storage/procsignal.h index eec186be2e..f86b070dbc 100644 --- a/src/include/storage/procsignal.h +++ b/src/include/storage/procsignal.h @@ -41,6 +41,7 @@ typedef enum PROCSIG_RECOVERY_CONFLICT_TABLESPACE, PROCSIG_RECOVERY_CONFLICT_LOCK, PROCSIG_RECOVERY_CONFLICT_SNAPSHOT, + PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT, PROCSIG_RECOVERY_CONFLICT_BUFFERPIN, PROCSIG_RECOVERY_CONFLICT_STARTUP_DEADLOCK, diff --git a/src/include/storage/standby.h b/src/include/storage/standby.h index 38fd85a431..3ba1882216 100644 --- a/src/include/storage/standby.h +++ b/src/include/storage/standby.h @@ -30,9 +30,9 @@ extern void InitRecoveryTransactionEnvironment(void); extern void ShutdownRecoveryTransactionEnvironment(void); extern void ResolveRecoveryConflictWithSnapshot(TransactionId latestRemovedXid, - RelFileNode node); + bool onCatalogTable, RelFileNode node); extern void ResolveRecoveryConflictWithSnapshotFullXid(FullTransactionId latestRemovedFullXid, - RelFileNode node); + bool onCatalogTable, RelFileNode node); extern void ResolveRecoveryConflictWithTablespace(Oid tsid); extern void ResolveRecoveryConflictWithDatabase(Oid dbid); diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out index 9b59a7b4a5..5913dc96b9 100644 --- a/src/test/regress/expected/rules.out +++ b/src/test/regress/expected/rules.out @@ -1870,7 +1870,8 @@ pg_stat_database_conflicts| SELECT d.oid AS datid, pg_stat_get_db_conflict_lock(d.oid) AS confl_lock, pg_stat_get_db_conflict_snapshot(d.oid) AS confl_snapshot, pg_stat_get_db_conflict_bufferpin(d.oid) AS confl_bufferpin, - pg_stat_get_db_conflict_startup_deadlock(d.oid) AS confl_deadlock + pg_stat_get_db_conflict_startup_deadlock(d.oid) AS confl_deadlock, + pg_stat_get_db_conflict_logicalslot(d.oid) AS confl_logicalslot FROM pg_database d; pg_stat_gssapi| SELECT s.pid, s.gss_auth AS gss_authenticated, -- 2.18.4