From ff74347f19fab702531dc58b620080cce21fdd7d Mon Sep 17 00:00:00 2001 From: bdrouvotAWS Date: Thu, 4 Feb 2021 16:24:35 +0000 Subject: [PATCH v9 3/5] Handle logical slot conflicts on standby. During WAL replay on standby, when slot conflict is identified, drop such slots. Also do the same thing if wal_level on master is reduced to below logical and there are existing logical slots on standby. Introduce a new ProcSignalReason value for slot conflict recovery. Arrange for a new pg_stat_get_activity field: confl_logicalslot. Amit Khandekar, reviewed by Andres Freund. --- doc/src/sgml/monitoring.sgml | 10 ++ src/backend/access/gist/gistxlog.c | 4 +- src/backend/access/hash/hash_xlog.c | 3 +- src/backend/access/heap/heapam.c | 13 ++- src/backend/access/nbtree/nbtxlog.c | 4 +- src/backend/access/spgist/spgxlog.c | 1 + src/backend/access/transam/xlog.c | 14 +++ src/backend/catalog/system_views.sql | 1 + src/backend/postmaster/pgstat.c | 4 + src/backend/replication/slot.c | 188 +++++++++++++++++++++++++++++++++++ src/backend/storage/ipc/procarray.c | 4 + src/backend/storage/ipc/procsignal.c | 3 + src/backend/storage/ipc/standby.c | 7 +- src/backend/tcop/postgres.c | 22 ++++ src/backend/utils/adt/pgstatfuncs.c | 16 +++ src/include/catalog/pg_proc.dat | 5 + src/include/pgstat.h | 1 + src/include/replication/slot.h | 2 + src/include/storage/procsignal.h | 1 + src/include/storage/standby.h | 2 +- src/test/regress/expected/rules.out | 1 + 21 files changed, 297 insertions(+), 9 deletions(-) 3.0% doc/src/sgml/ 6.8% src/backend/access/heap/ 6.4% src/backend/access/transam/ 6.2% src/backend/access/ 52.7% src/backend/replication/ 4.6% src/backend/storage/ipc/ 8.5% src/backend/tcop/ 3.3% src/backend/utils/adt/ 5.4% src/include/ diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml index c602ee4427..5fede5684c 100644 --- a/doc/src/sgml/monitoring.sgml +++ b/doc/src/sgml/monitoring.sgml @@ -3923,6 +3923,16 @@ SELECT pid, wait_event_type, wait_event FROM pg_stat_activity WHERE wait_event i deadlocks + + + + confl_logicalslot bigint + + + Number of queries in this database that have been canceled due to + logical slots + + diff --git a/src/backend/access/gist/gistxlog.c b/src/backend/access/gist/gistxlog.c index 82271ef8a9..0ff503bfb6 100644 --- a/src/backend/access/gist/gistxlog.c +++ b/src/backend/access/gist/gistxlog.c @@ -195,7 +195,8 @@ gistRedoDeleteRecord(XLogReaderState *record) XLogRecGetBlockTag(record, 0, &rnode, NULL, NULL); - ResolveRecoveryConflictWithSnapshot(xldata->latestRemovedXid, rnode); + ResolveRecoveryConflictWithSnapshot(xldata->latestRemovedXid, + xldata->onCatalogTable, rnode); } if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO) @@ -413,6 +414,7 @@ gistRedoPageReuse(XLogReaderState *record) latestRemovedXid = XidFromFullTransactionId(latestRemovedFullXid); ResolveRecoveryConflictWithSnapshot(latestRemovedXid, + xlrec->onCatalogTable, xlrec->node); } } diff --git a/src/backend/access/hash/hash_xlog.c b/src/backend/access/hash/hash_xlog.c index 02d9e6cdfd..b1ff596eb7 100644 --- a/src/backend/access/hash/hash_xlog.c +++ b/src/backend/access/hash/hash_xlog.c @@ -1002,7 +1002,8 @@ hash_xlog_vacuum_one_page(XLogReaderState *record) RelFileNode rnode; XLogRecGetBlockTag(record, 0, &rnode, NULL, NULL); - ResolveRecoveryConflictWithSnapshot(xldata->latestRemovedXid, rnode); + ResolveRecoveryConflictWithSnapshot(xldata->latestRemovedXid, + xldata->onCatalogTable, rnode); } action = XLogReadBufferForRedoExtended(record, 0, RBM_NORMAL, true, &buffer); diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c index 027c047904..b6d8effc03 100644 --- a/src/backend/access/heap/heapam.c +++ b/src/backend/access/heap/heapam.c @@ -8367,7 +8367,8 @@ heap_xlog_cleanup_info(XLogReaderState *record) xl_heap_cleanup_info *xlrec = (xl_heap_cleanup_info *) XLogRecGetData(record); if (InHotStandby) - ResolveRecoveryConflictWithSnapshot(xlrec->latestRemovedXid, xlrec->node); + ResolveRecoveryConflictWithSnapshot(xlrec->latestRemovedXid, + xlrec->onCatalogTable, xlrec->node); /* * Actual operation is a no-op. Record type exists to provide a means for @@ -8403,7 +8404,8 @@ heap_xlog_clean(XLogReaderState *record) * latestRemovedXid is invalid, skip conflict processing. */ if (InHotStandby && TransactionIdIsValid(xlrec->latestRemovedXid)) - ResolveRecoveryConflictWithSnapshot(xlrec->latestRemovedXid, rnode); + ResolveRecoveryConflictWithSnapshot(xlrec->latestRemovedXid, + xlrec->onCatalogTable, rnode); /* * If we have a full-page image, restore it (using a cleanup lock) and @@ -8499,7 +8501,9 @@ heap_xlog_visible(XLogReaderState *record) * rather than killing the transaction outright. */ if (InHotStandby) - ResolveRecoveryConflictWithSnapshot(xlrec->cutoff_xid, rnode); + ResolveRecoveryConflictWithSnapshot(xlrec->cutoff_xid, + xlrec->onCatalogTable, + rnode); /* * Read the heap page, if it still exists. If the heap file has dropped or @@ -8636,7 +8640,8 @@ heap_xlog_freeze_page(XLogReaderState *record) TransactionIdRetreat(latestRemovedXid); XLogRecGetBlockTag(record, 0, &rnode, NULL, NULL); - ResolveRecoveryConflictWithSnapshot(latestRemovedXid, rnode); + ResolveRecoveryConflictWithSnapshot(latestRemovedXid, + xlrec->onCatalogTable, rnode); } if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO) diff --git a/src/backend/access/nbtree/nbtxlog.c b/src/backend/access/nbtree/nbtxlog.c index c1d578cc01..2a61e02348 100644 --- a/src/backend/access/nbtree/nbtxlog.c +++ b/src/backend/access/nbtree/nbtxlog.c @@ -669,7 +669,8 @@ btree_xlog_delete(XLogReaderState *record) XLogRecGetBlockTag(record, 0, &rnode, NULL, NULL); - ResolveRecoveryConflictWithSnapshot(xlrec->latestRemovedXid, rnode); + ResolveRecoveryConflictWithSnapshot(xlrec->latestRemovedXid, + xlrec->onCatalogTable, rnode); } /* @@ -981,6 +982,7 @@ btree_xlog_reuse_page(XLogReaderState *record) if (InHotStandby) { ResolveRecoveryConflictWithSnapshot(xlrec->latestRemovedXid, + xlrec->onCatalogTable, xlrec->node); } } diff --git a/src/backend/access/spgist/spgxlog.c b/src/backend/access/spgist/spgxlog.c index d40c7b5877..1a5c8959a1 100644 --- a/src/backend/access/spgist/spgxlog.c +++ b/src/backend/access/spgist/spgxlog.c @@ -881,6 +881,7 @@ spgRedoVacuumRedirect(XLogReaderState *record) XLogRecGetBlockTag(record, 0, &node, NULL, NULL); ResolveRecoveryConflictWithSnapshot(xldata->newestRedirectXid, + xldata->onCatalogTable, node); } } diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index 856b8412e7..1696557cfb 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -10309,6 +10309,20 @@ xlog_redo(XLogReaderState *record) /* Update our copy of the parameters in pg_control */ memcpy(&xlrec, XLogRecGetData(record), sizeof(xl_parameter_change)); + /* + * Drop logical slots if we are in hot standby and the primary does not + * have a WAL level sufficient for logical decoding. No need to search + * for potentially conflicting logically slots if standby is running + * with wal_level lower than logical, because in that case, we would + * have either disallowed creation of logical slots or dropped existing + * ones. + */ + if (InRecovery && InHotStandby && + xlrec.wal_level < WAL_LEVEL_LOGICAL && + wal_level >= WAL_LEVEL_LOGICAL) + ResolveRecoveryConflictWithLogicalSlots(InvalidOid, InvalidTransactionId, + gettext_noop("Logical decoding on standby requires wal_level >= logical on master.")); + LWLockAcquire(ControlFileLock, LW_EXCLUSIVE); ControlFile->MaxConnections = xlrec.MaxConnections; ControlFile->max_worker_processes = xlrec.max_worker_processes; diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql index fa58afd9d7..d8f606ed49 100644 --- a/src/backend/catalog/system_views.sql +++ b/src/backend/catalog/system_views.sql @@ -945,6 +945,7 @@ CREATE VIEW pg_stat_database_conflicts AS pg_stat_get_db_conflict_tablespace(D.oid) AS confl_tablespace, pg_stat_get_db_conflict_lock(D.oid) AS confl_lock, pg_stat_get_db_conflict_snapshot(D.oid) AS confl_snapshot, + pg_stat_get_db_conflict_logicalslot(D.oid) AS confl_logicalslot, pg_stat_get_db_conflict_bufferpin(D.oid) AS confl_bufferpin, pg_stat_get_db_conflict_startup_deadlock(D.oid) AS confl_deadlock FROM pg_database D; diff --git a/src/backend/postmaster/pgstat.c b/src/backend/postmaster/pgstat.c index f75b52719d..cfc078873d 100644 --- a/src/backend/postmaster/pgstat.c +++ b/src/backend/postmaster/pgstat.c @@ -5068,6 +5068,7 @@ reset_dbentry_counters(PgStat_StatDBEntry *dbentry) dbentry->n_conflict_tablespace = 0; dbentry->n_conflict_lock = 0; dbentry->n_conflict_snapshot = 0; + dbentry->n_conflict_logicalslot = 0; dbentry->n_conflict_bufferpin = 0; dbentry->n_conflict_startup_deadlock = 0; dbentry->n_temp_files = 0; @@ -6943,6 +6944,9 @@ pgstat_recv_recoveryconflict(PgStat_MsgRecoveryConflict *msg, int len) case PROCSIG_RECOVERY_CONFLICT_SNAPSHOT: dbentry->n_conflict_snapshot++; break; + case PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT: + dbentry->n_conflict_logicalslot++; + break; case PROCSIG_RECOVERY_CONFLICT_BUFFERPIN: dbentry->n_conflict_bufferpin++; break; diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c index 8b02d2f437..47be24b131 100644 --- a/src/backend/replication/slot.c +++ b/src/backend/replication/slot.c @@ -46,6 +46,7 @@ #include "pgstat.h" #include "replication/slot.h" #include "storage/fd.h" +#include "storage/lock.h" #include "storage/proc.h" #include "storage/procarray.h" #include "utils/builtins.h" @@ -103,6 +104,7 @@ static int ReplicationSlotAcquireInternal(ReplicationSlot *slot, const char *name, SlotAcquireBehavior behavior); static void ReplicationSlotDropAcquired(void); static void ReplicationSlotDropPtr(ReplicationSlot *slot); +static void ReplicationSlotDropConflicting(ReplicationSlot *slot); /* internal persistency functions */ static void RestoreSlotFromDisk(const char *name); @@ -711,6 +713,64 @@ ReplicationSlotDropPtr(ReplicationSlot *slot) LWLockRelease(ReplicationSlotAllocationLock); } +/* + * Permanently drop a conflicting replication slot. If it's already active by + * another backend, send it a recovery conflict signal, and then try again. + */ +static void +ReplicationSlotDropConflicting(ReplicationSlot *slot) +{ + pid_t active_pid; + PGPROC *proc; + VirtualTransactionId vxid; + + ConditionVariablePrepareToSleep(&slot->active_cv); + while (1) + { + SpinLockAcquire(&slot->mutex); + active_pid = slot->active_pid; + if (active_pid == 0) + active_pid = slot->active_pid = MyProcPid; + SpinLockRelease(&slot->mutex); + + /* Drop the acquired slot, unless it is acquired by another backend */ + if (active_pid == MyProcPid) + { + elog(DEBUG1, "acquired conflicting slot, now dropping it"); + ReplicationSlotDropPtr(slot); + break; + } + + /* Send the other backend, a conflict recovery signal */ + + SetInvalidVirtualTransactionId(vxid); + LWLockAcquire(ProcArrayLock, LW_SHARED); + proc = BackendPidGetProcWithLock(active_pid); + if (proc) + GET_VXID_FROM_PGPROC(vxid, *proc); + LWLockRelease(ProcArrayLock); + + /* + * If coincidently that process finished, some other backend may + * acquire the slot again. So start over again. + * Note: Even if vxid.localTransactionId is invalid, we need to cancel + * that backend, because there is no other way to make it release the + * slot. So don't bother to validate vxid.localTransactionId. + */ + if (vxid.backendId == InvalidBackendId) + continue; + + elog(DEBUG1, "cancelling pid %d (backendId: %d) for releasing slot", + active_pid, vxid.backendId); + + CancelVirtualTransaction(vxid, PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT); + ConditionVariableSleep(&slot->active_cv, + WAIT_EVENT_REPLICATION_SLOT_DROP); + } + + ConditionVariableCancelSleep(); +} + /* * Serialize the currently acquired slot's state from memory to disk, thereby * guaranteeing the current state will survive a crash. @@ -1141,12 +1201,25 @@ ReplicationSlotReserveWal(void) { XLogRecPtr flushptr; + /* start at current insert position */ + restart_lsn = GetXLogInsertRecPtr(); + SpinLockAcquire(&slot->mutex); + slot->data.restart_lsn = restart_lsn; + SpinLockRelease(&slot->mutex); + /* make sure we have enough information to start */ flushptr = LogStandbySnapshot(); /* and make sure it's fsynced to disk */ XLogFlush(flushptr); } + else + { + restart_lsn = GetRedoRecPtr(); + SpinLockAcquire(&slot->mutex); + slot->data.restart_lsn = restart_lsn; + SpinLockRelease(&slot->mutex); + } /* prevent WAL removal as fast as possible */ ReplicationSlotsComputeRequiredLSN(); @@ -1164,6 +1237,121 @@ ReplicationSlotReserveWal(void) } } +/* + * Resolve recovery conflicts with logical slots. + * + * When xid is valid, it means that rows older than xid might have been + * removed. Therefore we need to drop slots that depend on seeing those rows. + * When xid is invalid, drop all logical slots. This is required when the + * master wal_level is set back to replica, so existing logical slots need to + * be dropped. Also, when xid is invalid, a common 'conflict_reason' is + * provided for the error detail; otherwise it is NULL, in which case it is + * constructed out of the xid value. + */ +void +ResolveRecoveryConflictWithLogicalSlots(Oid dboid, TransactionId xid, + char *conflict_reason) +{ + int i; + bool found_conflict = false; + + if (max_replication_slots <= 0) + return; + +restart: + if (found_conflict) + { + CHECK_FOR_INTERRUPTS(); + found_conflict = false; + } + + LWLockAcquire(ReplicationSlotControlLock, LW_SHARED); + for (i = 0; i < max_replication_slots; i++) + { + ReplicationSlot *s; + + s = &ReplicationSlotCtl->replication_slots[i]; + + /* cannot change while ReplicationSlotCtlLock is held */ + if (!s->in_use) + continue; + + /* We are only dealing with *logical* slot conflicts. */ + if (!SlotIsLogical(s)) + continue; + + /* Invalid xid means caller is asking to drop all logical slots */ + if (!TransactionIdIsValid(xid)) + found_conflict = true; + else + { + TransactionId slot_xmin; + TransactionId slot_catalog_xmin; + StringInfoData conflict_str, conflict_xmins; + char *conflict_sentence = + gettext_noop("Slot conflicted with xid horizon which was being increased to"); + + /* not our database, skip */ + if (s->data.database != InvalidOid && s->data.database != dboid) + continue; + + SpinLockAcquire(&s->mutex); + slot_xmin = s->data.xmin; + slot_catalog_xmin = s->data.catalog_xmin; + SpinLockRelease(&s->mutex); + + /* + * Build the conflict_str which will look like : + * "Slot conflicted with xid horizon which was being increased + * to 9012 (slot xmin: 1234, slot catalog_xmin: 5678)." + */ + initStringInfo(&conflict_xmins); + if (TransactionIdIsValid(slot_xmin) && + TransactionIdPrecedesOrEquals(slot_xmin, xid)) + { + appendStringInfo(&conflict_xmins, "slot xmin: %d", slot_xmin); + } + if (TransactionIdIsValid(slot_catalog_xmin) && + TransactionIdPrecedesOrEquals(slot_catalog_xmin, xid)) + appendStringInfo(&conflict_xmins, "%sslot catalog_xmin: %d", + conflict_xmins.len > 0 ? ", " : "", + slot_catalog_xmin); + + if (conflict_xmins.len > 0) + { + initStringInfo(&conflict_str); + appendStringInfo(&conflict_str, "%s %d (%s).", + conflict_sentence, xid, conflict_xmins.data); + found_conflict = true; + conflict_reason = conflict_str.data; + } + } + + if (found_conflict) + { + NameData slotname; + + SpinLockAcquire(&s->mutex); + slotname = s->data.name; + SpinLockRelease(&s->mutex); + + /* ReplicationSlotDropConflicting() will acquire the lock below */ + LWLockRelease(ReplicationSlotControlLock); + + ReplicationSlotDropConflicting(s); + + ereport(LOG, + (errmsg("dropped conflicting slot %s", NameStr(slotname)), + errdetail("%s", conflict_reason))); + + /* We released the lock above; so re-scan the slots. */ + goto restart; + } + } + + LWLockRelease(ReplicationSlotControlLock); +} + /* * Mark any slot that points to an LSN older than the given segment * as invalid; it requires WAL that's about to be removed. diff --git a/src/backend/storage/ipc/procarray.c b/src/backend/storage/ipc/procarray.c index cf12eda504..3439026eb1 100644 --- a/src/backend/storage/ipc/procarray.c +++ b/src/backend/storage/ipc/procarray.c @@ -3346,6 +3346,10 @@ SignalVirtualTransaction(VirtualTransactionId vxid, ProcSignalReason sigmode, GET_VXID_FROM_PGPROC(procvxid, *proc); + /* + * Note: vxid.localTransactionId can be invalid, which means the + * request is to signal the pid that is not running a transaction. + */ if (procvxid.backendId == vxid.backendId && procvxid.localTransactionId == vxid.localTransactionId) { diff --git a/src/backend/storage/ipc/procsignal.c b/src/backend/storage/ipc/procsignal.c index c43cdd685b..4740e8352a 100644 --- a/src/backend/storage/ipc/procsignal.c +++ b/src/backend/storage/ipc/procsignal.c @@ -680,6 +680,9 @@ procsignal_sigusr1_handler(SIGNAL_ARGS) if (CheckProcSignal(PROCSIG_RECOVERY_CONFLICT_SNAPSHOT)) RecoveryConflictInterrupt(PROCSIG_RECOVERY_CONFLICT_SNAPSHOT); + if (CheckProcSignal(PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT)) + RecoveryConflictInterrupt(PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT); + if (CheckProcSignal(PROCSIG_RECOVERY_CONFLICT_STARTUP_DEADLOCK)) RecoveryConflictInterrupt(PROCSIG_RECOVERY_CONFLICT_STARTUP_DEADLOCK); diff --git a/src/backend/storage/ipc/standby.c b/src/backend/storage/ipc/standby.c index 39a30c00f7..0087b2180e 100644 --- a/src/backend/storage/ipc/standby.c +++ b/src/backend/storage/ipc/standby.c @@ -23,6 +23,7 @@ #include "access/xloginsert.h" #include "miscadmin.h" #include "pgstat.h" +#include "replication/slot.h" #include "storage/bufmgr.h" #include "storage/lmgr.h" #include "storage/proc.h" @@ -425,7 +426,8 @@ ResolveRecoveryConflictWithVirtualXIDs(VirtualTransactionId *waitlist, } void -ResolveRecoveryConflictWithSnapshot(TransactionId latestRemovedXid, RelFileNode node) +ResolveRecoveryConflictWithSnapshot(TransactionId latestRemovedXid, + bool onCatalogTable, RelFileNode node) { VirtualTransactionId *backends; @@ -450,6 +452,9 @@ ResolveRecoveryConflictWithSnapshot(TransactionId latestRemovedXid, RelFileNode PROCSIG_RECOVERY_CONFLICT_SNAPSHOT, WAIT_EVENT_RECOVERY_CONFLICT_SNAPSHOT, true); + + if (onCatalogTable) + ResolveRecoveryConflictWithLogicalSlots(node.dbNode, latestRemovedXid, NULL); } void diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c index cb5a96117f..8aeeddf0a3 100644 --- a/src/backend/tcop/postgres.c +++ b/src/backend/tcop/postgres.c @@ -2466,6 +2466,9 @@ errdetail_recovery_conflict(void) case PROCSIG_RECOVERY_CONFLICT_SNAPSHOT: errdetail("User query might have needed to see row versions that must be removed."); break; + case PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT: + errdetail("User was using the logical slot that must be dropped."); + break; case PROCSIG_RECOVERY_CONFLICT_STARTUP_DEADLOCK: errdetail("User transaction caused buffer deadlock with recovery."); break; @@ -2979,6 +2982,25 @@ RecoveryConflictInterrupt(ProcSignalReason reason) case PROCSIG_RECOVERY_CONFLICT_LOCK: case PROCSIG_RECOVERY_CONFLICT_TABLESPACE: case PROCSIG_RECOVERY_CONFLICT_SNAPSHOT: + case PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT: + /* + * For conflicts that require a logical slot to be dropped, the + * requirement is for the signal receiver to release the slot, + * so that it could be dropped by the signal sender. So for + * normal backends, the transaction should be aborted, just + * like for other recovery conflicts. But if it's walsender on + * standby, then it has to be killed so as to release an + * acquired logical slot. + */ + if (am_cascading_walsender && + reason == PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT && + MyReplicationSlot && SlotIsLogical(MyReplicationSlot)) + { + RecoveryConflictPending = true; + QueryCancelPending = true; + InterruptPending = true; + break; + } /* * If we aren't in a transaction any longer then ignore. diff --git a/src/backend/utils/adt/pgstatfuncs.c b/src/backend/utils/adt/pgstatfuncs.c index 62bff52638..76ba80f5ca 100644 --- a/src/backend/utils/adt/pgstatfuncs.c +++ b/src/backend/utils/adt/pgstatfuncs.c @@ -1496,6 +1496,21 @@ pg_stat_get_db_conflict_snapshot(PG_FUNCTION_ARGS) PG_RETURN_INT64(result); } +Datum +pg_stat_get_db_conflict_logicalslot(PG_FUNCTION_ARGS) +{ + Oid dbid = PG_GETARG_OID(0); + int64 result; + PgStat_StatDBEntry *dbentry; + + if ((dbentry = pgstat_fetch_stat_dbentry(dbid)) == NULL) + result = 0; + else + result = (int64) (dbentry->n_conflict_logicalslot); + + PG_RETURN_INT64(result); +} + Datum pg_stat_get_db_conflict_bufferpin(PG_FUNCTION_ARGS) { @@ -1539,6 +1554,7 @@ pg_stat_get_db_conflict_all(PG_FUNCTION_ARGS) result = (int64) (dbentry->n_conflict_tablespace + dbentry->n_conflict_lock + dbentry->n_conflict_snapshot + + dbentry->n_conflict_logicalslot + dbentry->n_conflict_bufferpin + dbentry->n_conflict_startup_deadlock); diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat index 4e0c9be58c..eede0dad5b 100644 --- a/src/include/catalog/pg_proc.dat +++ b/src/include/catalog/pg_proc.dat @@ -5399,6 +5399,11 @@ proname => 'pg_stat_get_db_conflict_snapshot', provolatile => 's', proparallel => 'r', prorettype => 'int8', proargtypes => 'oid', prosrc => 'pg_stat_get_db_conflict_snapshot' }, +{ oid => '4543', + descr => 'statistics: recovery conflicts in database caused by logical replication slot', + proname => 'pg_stat_get_db_conflict_logicalslot', provolatile => 's', + proparallel => 'r', prorettype => 'int8', proargtypes => 'oid', + prosrc => 'pg_stat_get_db_conflict_logicalslot' }, { oid => '3068', descr => 'statistics: recovery conflicts in database caused by shared buffer pin', proname => 'pg_stat_get_db_conflict_bufferpin', provolatile => 's', diff --git a/src/include/pgstat.h b/src/include/pgstat.h index 724068cf87..eb3efa3bbe 100644 --- a/src/include/pgstat.h +++ b/src/include/pgstat.h @@ -714,6 +714,7 @@ typedef struct PgStat_StatDBEntry PgStat_Counter n_conflict_tablespace; PgStat_Counter n_conflict_lock; PgStat_Counter n_conflict_snapshot; + PgStat_Counter n_conflict_logicalslot; PgStat_Counter n_conflict_bufferpin; PgStat_Counter n_conflict_startup_deadlock; PgStat_Counter n_temp_files; diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h index 53f636c56f..6f65116a64 100644 --- a/src/include/replication/slot.h +++ b/src/include/replication/slot.h @@ -217,4 +217,6 @@ extern void CheckPointReplicationSlots(void); extern void CheckSlotRequirements(void); +extern void ResolveRecoveryConflictWithLogicalSlots(Oid dboid, TransactionId xid, char *reason); + #endif /* SLOT_H */ diff --git a/src/include/storage/procsignal.h b/src/include/storage/procsignal.h index 4ae7dc33b8..27035b075b 100644 --- a/src/include/storage/procsignal.h +++ b/src/include/storage/procsignal.h @@ -40,6 +40,7 @@ typedef enum PROCSIG_RECOVERY_CONFLICT_TABLESPACE, PROCSIG_RECOVERY_CONFLICT_LOCK, PROCSIG_RECOVERY_CONFLICT_SNAPSHOT, + PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT, PROCSIG_RECOVERY_CONFLICT_BUFFERPIN, PROCSIG_RECOVERY_CONFLICT_STARTUP_DEADLOCK, diff --git a/src/include/storage/standby.h b/src/include/storage/standby.h index 94d33851d0..802482e1cf 100644 --- a/src/include/storage/standby.h +++ b/src/include/storage/standby.h @@ -30,7 +30,7 @@ extern void InitRecoveryTransactionEnvironment(void); extern void ShutdownRecoveryTransactionEnvironment(void); extern void ResolveRecoveryConflictWithSnapshot(TransactionId latestRemovedXid, - RelFileNode node); + bool onCatalogTable, RelFileNode node); extern void ResolveRecoveryConflictWithTablespace(Oid tsid); extern void ResolveRecoveryConflictWithDatabase(Oid dbid); diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out index 6173473de9..fb02e7be52 100644 --- a/src/test/regress/expected/rules.out +++ b/src/test/regress/expected/rules.out @@ -1867,6 +1867,7 @@ pg_stat_database_conflicts| SELECT d.oid AS datid, pg_stat_get_db_conflict_tablespace(d.oid) AS confl_tablespace, pg_stat_get_db_conflict_lock(d.oid) AS confl_lock, pg_stat_get_db_conflict_snapshot(d.oid) AS confl_snapshot, + pg_stat_get_db_conflict_logicalslot(d.oid) AS confl_logicalslot, pg_stat_get_db_conflict_bufferpin(d.oid) AS confl_bufferpin, pg_stat_get_db_conflict_startup_deadlock(d.oid) AS confl_deadlock FROM pg_database d; -- 2.14.5