Re: Minimal logical decoding on standbys

From: Andres Freund <andres(at)anarazel(dot)de>
To: "Drouvot, Bertrand" <bertranddrouvot(dot)pg(at)gmail(dot)com>
Cc: Amit Kapila <amit(dot)kapila16(at)gmail(dot)com>, Alvaro Herrera <alvherre(at)alvh(dot)no-ip(dot)org>, Jeff Davis <pgsql(at)j-davis(dot)com>, Robert Haas <robertmhaas(at)gmail(dot)com>, Thomas Munro <thomas(dot)munro(at)gmail(dot)com>, Ibrar Ahmed <ibrar(dot)ahmad(at)gmail(dot)com>, Amit Khandekar <amitdkhan(dot)pg(at)gmail(dot)com>, fabriziomello(at)gmail(dot)com, tushar <tushar(dot)ahuja(at)enterprisedb(dot)com>, Rahila Syed <rahila(dot)syed(at)2ndquadrant(dot)com>, pgsql-hackers <pgsql-hackers(at)postgresql(dot)org>
Subject: Re: Minimal logical decoding on standbys
Date: 2023-04-05 18:28:35
Message-ID: 20230405182835.ehufe4fj2zx3pjix@awork3.anarazel.de
Lists: pgsql-hackers

Hi,

On 2023-04-05 17:56:14 +0200, Drouvot, Bertrand wrote:

> @@ -7963,6 +7963,23 @@ xlog_redo(XLogReaderState *record)
> /* Update our copy of the parameters in pg_control */
> memcpy(&xlrec, XLogRecGetData(record), sizeof(xl_parameter_change));
>
> + /*
> + * Invalidate logical slots if we are in hot standby and the primary
> + * does not have a WAL level sufficient for logical decoding. No need
> + * to search for potentially conflicting logically slots if standby is
> + * running with wal_level lower than logical, because in that case, we
> + * would have either disallowed creation of logical slots or
> + * invalidated existing ones.
> + */
> + if (InRecovery && InHotStandby &&
> + xlrec.wal_level < WAL_LEVEL_LOGICAL &&
> + wal_level >= WAL_LEVEL_LOGICAL)
> + {
> + TransactionId ConflictHorizon = InvalidTransactionId;
> +
> + InvalidateObsoleteReplicationSlots(InvalidXLogRecPtr, InvalidOid, &ConflictHorizon);
> + }

I mentioned this before, but I still don't understand why
InvalidateObsoleteReplicationSlots() accepts ConflictHorizon as a
pointer. It's not even modified, as far as I can see?

> /*
> * Report shared-memory space needed by ReplicationSlotsShmemInit.
> */
> @@ -855,8 +862,7 @@ ReplicationSlotsComputeRequiredXmin(bool already_locked)
> SpinLockAcquire(&s->mutex);
> effective_xmin = s->effective_xmin;
> effective_catalog_xmin = s->effective_catalog_xmin;
> - invalidated = (!XLogRecPtrIsInvalid(s->data.invalidated_at) &&
> - XLogRecPtrIsInvalid(s->data.restart_lsn));
> + invalidated = ObsoleteSlotIsInvalid(s, true) || LogicalReplicationSlotIsInvalid(s);
> SpinLockRelease(&s->mutex);

I don't understand why we need to have two different functions for this.

> /* invalidated slots need not apply */
> @@ -1225,28 +1231,92 @@ ReplicationSlotReserveWal(void)
> }
> }
>
> +
> +/*
> + * Report terminating or conflicting message.
> + *
> + * For both, logical conflict on standby and obsolete slot are handled.
> + */
> +static void
> +ReportTerminationInvalidation(bool terminating, bool check_on_xid, int pid,
> + NameData slotname, TransactionId *xid,
> + XLogRecPtr restart_lsn, XLogRecPtr oldestLSN)
> +{
> + StringInfoData err_msg;
> + StringInfoData err_detail;
> + bool hint = false;
> +
> + initStringInfo(&err_detail);
> +
> + if (check_on_xid)
> + {
> + if (!terminating)
> + {
> + initStringInfo(&err_msg);
> + appendStringInfo(&err_msg, _("invalidating replication slot \"%s\" because it conflicts with recovery"),
> + NameStr(slotname));

I still don't think the main error message should differ between invalidating
a slot due to recovery and due to max_slot_wal_keep_size.

> +
> /*
> - * Helper for InvalidateObsoleteReplicationSlots -- acquires the given slot
> - * and mark it invalid, if necessary and possible.
> + * Helper for InvalidateObsoleteReplicationSlots
> + *
> + * Acquires the given slot and mark it invalid, if necessary and possible.
> *
> * Returns whether ReplicationSlotControlLock was released in the interim (and
> * in that case we're not holding the lock at return, otherwise we are).
> *
> - * Sets *invalidated true if the slot was invalidated. (Untouched otherwise.)
> + * Sets *invalidated true if an obsolete slot was invalidated. (Untouched otherwise.)

What's the point of making this specific to "obsolete slots"?

> * This is inherently racy, because we release the LWLock
> * for syscalls, so caller must restart if we return true.
> */
> static bool
> InvalidatePossiblyObsoleteSlot(ReplicationSlot *s, XLogRecPtr oldestLSN,
> - bool *invalidated)
> + bool *invalidated, TransactionId *xid)
> {
> int last_signaled_pid = 0;
> bool released_lock = false;
> + bool check_on_xid;
> +
> + check_on_xid = xid ? true : false;
>
> for (;;)
> {
> XLogRecPtr restart_lsn;
> +
> NameData slotname;
> int active_pid = 0;
>
> @@ -1263,19 +1333,20 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlot *s, XLogRecPtr oldestLSN,
> * Check if the slot needs to be invalidated. If it needs to be
> * invalidated, and is not currently acquired, acquire it and mark it
> * as having been invalidated. We do this with the spinlock held to
> - * avoid race conditions -- for example the restart_lsn could move
> - * forward, or the slot could be dropped.
> + * avoid race conditions -- for example the restart_lsn (or the
> + * xmin(s) could) move forward or the slot could be dropped.
> */
> SpinLockAcquire(&s->mutex);
>
> restart_lsn = s->data.restart_lsn;
>
> /*
> - * If the slot is already invalid or is fresh enough, we don't need to
> - * do anything.
> + * If the slot is already invalid or is a non conflicting slot, we
> + * don't need to do anything.
> */
> - if (XLogRecPtrIsInvalid(restart_lsn) || restart_lsn >= oldestLSN)
> + if (DoNotInvalidateSlot(s, xid, &oldestLSN))

DoNotInvalidateSlot() seems odd to me, and makes the code harder to
understand. I'd make it something like:

    if (!SlotIsInvalid(s) && (
            LogicalSlotConflictsWith(s, xid) ||
            SlotConflictsWithLSN(s, lsn)))
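
A minimal, self-contained sketch of that shape, using simplified stand-in types rather than the real ReplicationSlot definitions. SketchSlot, its `invalidated` flag, and SlotNeedsInvalidation() are hypothetical names for illustration only. The xid comparison is a reduced stand-in for TransactionIdPrecedesOrEquals(), and the sketch does not cover the "invalidate all logical slots" case used for the wal_level change. Everything is passed by value:

```c
#include <stdbool.h>
#include <stdint.h>

/* Simplified stand-ins; the real PostgreSQL definitions differ. */
typedef uint64_t XLogRecPtr;
typedef uint32_t TransactionId;
#define InvalidXLogRecPtr    ((XLogRecPtr) 0)
#define InvalidTransactionId ((TransactionId) 0)

/* Hypothetical, reduced view of a replication slot. */
typedef struct SketchSlot
{
    bool          invalidated;      /* assumption: one flag for "already invalid" */
    bool          is_logical;
    XLogRecPtr    restart_lsn;
    TransactionId effective_xmin;
    TransactionId catalog_xmin;
} SketchSlot;

/* Reduced stand-in for TransactionIdPrecedesOrEquals(): modulo-2^32 compare. */
static bool
xid_precedes_or_equals(TransactionId a, TransactionId b)
{
    return (int32_t) (a - b) <= 0;
}

static bool
SlotIsInvalid(const SketchSlot *s)
{
    return s->invalidated;
}

/* True if a logical slot still needs rows removed up to 'xid'. */
static bool
LogicalSlotConflictsWith(const SketchSlot *s, TransactionId xid)
{
    if (!s->is_logical || xid == InvalidTransactionId)
        return false;
    if (s->effective_xmin != InvalidTransactionId &&
        xid_precedes_or_equals(s->effective_xmin, xid))
        return true;
    return s->catalog_xmin != InvalidTransactionId &&
        xid_precedes_or_equals(s->catalog_xmin, xid);
}

/* True if the slot still needs WAL older than 'lsn'. */
static bool
SlotConflictsWithLSN(const SketchSlot *s, XLogRecPtr lsn)
{
    return lsn != InvalidXLogRecPtr &&
        s->restart_lsn != InvalidXLogRecPtr &&
        s->restart_lsn < lsn;
}

/* The positive-sense condition suggested above. */
static bool
SlotNeedsInvalidation(const SketchSlot *s, TransactionId xid, XLogRecPtr lsn)
{
    return !SlotIsInvalid(s) &&
        (LogicalSlotConflictsWith(s, xid) ||
         SlotConflictsWithLSN(s, lsn));
}
```

A caller would pass InvalidTransactionId or InvalidXLogRecPtr for whichever resource it is not checking, so neither helper needs a pointer argument to signal "not applicable".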

> /*
> - * Mark any slot that points to an LSN older than the given segment
> - * as invalid; it requires WAL that's about to be removed.
> + * Invalidate Obsolete slots or resolve recovery conflicts with logical slots.

I don't like that this spreads "obsolete slots" around further - it's very
unspecific. A logical slot that needs to be removed due to an xid conflict is
just as obsolete as one that needs to be removed due to max_slot_wal_keep_size.

I'd rephrase this to be about required resources getting removed or such; one
case of that is WAL, another is xids.

> restart:
> LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
> @@ -1414,21 +1505,35 @@ restart:
> if (!s->in_use)
> continue;
>
> - if (InvalidatePossiblyObsoleteSlot(s, oldestLSN, &invalidated))
> + if (xid)
> {
> - /* if the lock was released, start from scratch */
> - goto restart;
> + /* we are only dealing with *logical* slot conflicts */
> + if (!SlotIsLogical(s))
> + continue;
> +
> + /*
> + * not the database of interest and we don't want all the
> + * database, skip
> + */
> + if (s->data.database != dboid && TransactionIdIsValid(*xid))
> + continue;

ISTM that this should be in InvalidatePossiblyObsoleteSlot().

> /*
> - * If any slots have been invalidated, recalculate the resource limits.
> + * If any slots have been invalidated, recalculate the required xmin and
> + * the required lsn (if appropriate).
> */
> if (invalidated)
> {
> ReplicationSlotsComputeRequiredXmin(false);
> - ReplicationSlotsComputeRequiredLSN();
> + if (!xid)
> + ReplicationSlotsComputeRequiredLSN();
> }

Why make this conditional? If we invalidated a logical slot, we also don't
require as much WAL anymore, no?

> @@ -491,6 +493,9 @@ ResolveRecoveryConflictWithSnapshot(TransactionId snapshotConflictHorizon,
> PROCSIG_RECOVERY_CONFLICT_SNAPSHOT,
> WAIT_EVENT_RECOVERY_CONFLICT_SNAPSHOT,
> true);
> +
> + if (wal_level >= WAL_LEVEL_LOGICAL && isCatalogRel)
> + InvalidateObsoleteReplicationSlots(InvalidXLogRecPtr, locator.dbOid, &snapshotConflictHorizon);
> }

Hm. Is there a reason for doing this before resolving conflicts with existing
sessions?

Another issue: ResolveRecoveryConflictWithVirtualXIDs() takes
WaitExceedsMaxStandbyDelay() into account, but
InvalidateObsoleteReplicationSlots() does not. I think that's ok, because the
setup should prevent this case from being reached in normal paths, but at
least there should be a comment documenting this.

> +static inline bool
> +LogicalReplicationSlotXidsConflict(ReplicationSlot *s, TransactionId xid)
> +{
> + TransactionId slot_effective_xmin;
> + TransactionId slot_catalog_xmin;
> +
> + slot_effective_xmin = s->effective_xmin;
> + slot_catalog_xmin = s->data.catalog_xmin;
> +
> + return (((TransactionIdIsValid(slot_effective_xmin) && TransactionIdPrecedesOrEquals(slot_effective_xmin, xid)) ||
> + (TransactionIdIsValid(slot_catalog_xmin) && TransactionIdPrecedesOrEquals(slot_catalog_xmin, xid))));
> +}

return -ETOOMANYPARENS
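
For illustration, one way the same check could read with fewer parentheses, as a standalone sketch. The helper names (xid_is_valid, xid_precedes_or_equals, xmin_conflicts, slot_xids_conflict) are hypothetical stand-ins, not PostgreSQL functions, and the comparison only approximates TransactionIdPrecedesOrEquals() for normal xids:

```c
#include <stdbool.h>
#include <stdint.h>

typedef uint32_t TransactionId;
#define InvalidTransactionId ((TransactionId) 0)

/* Stand-in for TransactionIdIsValid(). */
static inline bool
xid_is_valid(TransactionId xid)
{
    return xid != InvalidTransactionId;
}

/*
 * Simplified stand-in for TransactionIdPrecedesOrEquals(): compares
 * modulo 2^32 so that wraparound is handled; special xids are ignored.
 */
static inline bool
xid_precedes_or_equals(TransactionId a, TransactionId b)
{
    return (int32_t) (a - b) <= 0;
}

/* One xmin conflicts if it is set and not newer than the horizon. */
static inline bool
xmin_conflicts(TransactionId xmin, TransactionId horizon)
{
    return xid_is_valid(xmin) && xid_precedes_or_equals(xmin, horizon);
}

/* Same logic as the quoted function, without the nested parentheses. */
static inline bool
slot_xids_conflict(TransactionId effective_xmin,
                   TransactionId catalog_xmin,
                   TransactionId horizon)
{
    return xmin_conflicts(effective_xmin, horizon) ||
        xmin_conflicts(catalog_xmin, horizon);
}
```

Factoring the repeated "valid and precedes-or-equals" test into one helper is only one option; dropping the redundant outer parentheses in the original return statement would address the comment as well.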

> +static inline bool
> +SlotIsFreshEnough(ReplicationSlot *s, XLogRecPtr oldestLSN)
> +{
> + return (s->data.restart_lsn >= oldestLSN);
> +}
> +
> +static inline bool
> +LogicalSlotIsNotConflicting(ReplicationSlot *s, TransactionId *xid)
> +{
> + return (TransactionIdIsValid(*xid) && !LogicalReplicationSlotXidsConflict(s, *xid));
> +}
> +
> +static inline bool
> +DoNotInvalidateSlot(ReplicationSlot *s, TransactionId *xid, XLogRecPtr *oldestLSN)
> +{
> + if (xid)
> + return (LogicalReplicationSlotIsInvalid(s) || LogicalSlotIsNotConflicting(s, xid));
> + else
> + return (ObsoleteSlotIsInvalid(s, false) || SlotIsFreshEnough(s, *oldestLSN));
> +
> +}

See above for some more comments. But please don't accept stuff via pointer if
you don't have a reason for it. There's no reason for it for xid and oldestLSN
afaict.

> diff --git a/src/backend/access/transam/xlogrecovery.c b/src/backend/access/transam/xlogrecovery.c
> index dbe9394762..186e4ef600 100644
> --- a/src/backend/access/transam/xlogrecovery.c
> +++ b/src/backend/access/transam/xlogrecovery.c
> @@ -1935,6 +1935,30 @@ ApplyWalRecord(XLogReaderState *xlogreader, XLogRecord *record, TimeLineID *repl
> XLogRecoveryCtl->lastReplayedTLI = *replayTLI;
> SpinLockRelease(&XLogRecoveryCtl->info_lck);
>
> + /*
> + * Wakeup walsenders:
> + *
> + * On the standby, the WAL is flushed first (which will only wake up
> + * physical walsenders) and then applied, which will only wake up logical
> + * walsenders.
> + * Indeed, logical walsenders on standby can't decode and send data until
> + * it's been applied.
> + *
> + * Physical walsenders don't need to be waked up during replay unless

s/waked/woken/

> + * cascading replication is allowed and time line change occured (so that
> + * they can notice that they are on a new time line).
> + *
> + * That's why the wake up conditions are for:
> + *
> + * - physical walsenders in case of new time line and cascade
> + * replication is allowed.
> + * - logical walsenders in case of new time line or recovery is in progress
> + * (logical decoding on standby).
> + */
> + WalSndWakeup(switchedTLI && AllowCascadeReplication(),
> + switchedTLI || RecoveryInProgress());

I don't think it's possible to get here without RecoveryInProgress() being
true. So we don't need that condition.

> @@ -1010,7 +1010,7 @@ XLogWalRcvFlush(bool dying, TimeLineID tli)
> /* Signal the startup process and walsender that new WAL has arrived */
> WakeupRecovery();
> if (AllowCascadeReplication())
> - WalSndWakeup();
> + WalSndWakeup(true, !RecoveryInProgress());

Same comment as earlier.

> /* Report XLOG streaming progress in PS display */
> if (update_process_title)
> diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
> index 2d908d1de2..5c68ebb79e 100644
> --- a/src/backend/replication/walsender.c
> +++ b/src/backend/replication/walsender.c
> @@ -2628,6 +2628,23 @@ InitWalSenderSlot(void)
> walsnd->sync_standby_priority = 0;
> walsnd->latch = &MyProc->procLatch;
> walsnd->replyTime = 0;
> +
> + /*
> + * The kind assignment is done here and not in StartReplication()
> + * and StartLogicalReplication(). Indeed, the logical walsender
> + * needs to read WAL records (like snapshot of running
> + * transactions) during the slot creation. So it needs to be woken
> + * up based on its kind.
> + *
> + * The kind assignment could also be done in StartReplication(),
> + * StartLogicalReplication() and CREATE_REPLICATION_SLOT but it
> + * seems better to set it on one place.
> + */

Doesn't that mean we'll wake up logical walsenders even if they're doing
normal query processing?

> + if (MyDatabaseId == InvalidOid)
> + walsnd->kind = REPLICATION_KIND_PHYSICAL;
> + else
> + walsnd->kind = REPLICATION_KIND_LOGICAL;
> +
> SpinLockRelease(&walsnd->mutex);
> /* don't need the lock anymore */
> MyWalSnd = (WalSnd *) walsnd;
> @@ -3310,30 +3327,39 @@ WalSndShmemInit(void)
> }
>
> /*
> - * Wake up all walsenders
> + * Wake up physical, logical or both walsenders kind
> + *
> + * The distinction between physical and logical walsenders is done, because:
> + * - physical walsenders can't send data until it's been flushed
> + * - logical walsenders on standby can't decode and send data until it's been
> + * applied
> + *
> + * For cascading replication we need to wake up physical
> + * walsenders separately from logical walsenders (see the comment before calling
> + * WalSndWakeup() in ApplyWalRecord() for more details).
> *
> * This will be called inside critical sections, so throwing an error is not
> * advisable.
> */
> void
> -WalSndWakeup(void)
> +WalSndWakeup(bool physical, bool logical)
> {
> int i;
>
> for (i = 0; i < max_wal_senders; i++)
> {
> Latch *latch;
> + ReplicationKind kind;
> WalSnd *walsnd = &WalSndCtl->walsnds[i];
>
> - /*
> - * Get latch pointer with spinlock held, for the unlikely case that
> - * pointer reads aren't atomic (as they're 8 bytes).
> - */
> + /* get latch pointer and kind with spinlock helds */
> SpinLockAcquire(&walsnd->mutex);
> latch = walsnd->latch;
> + kind = walsnd->kind;
> SpinLockRelease(&walsnd->mutex);
>
> - if (latch != NULL)
> + if (latch != NULL && ((physical && kind == REPLICATION_KIND_PHYSICAL) ||
> + (logical && kind == REPLICATION_KIND_LOGICAL)))
> SetLatch(latch);
> }
> }

I'd consider rewriting this to something like:

    if (latch == NULL)
        continue;

    if ((physical && kind == REPLICATION_KIND_PHYSICAL) ||
        (logical && kind == REPLICATION_KIND_LOGICAL))
        SetLatch(latch);
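
Put into a complete loop, that could look roughly like the sketch below. It is standalone and uses reduced stand-in types: the Latch and WalSnd structs here are hypothetical simplifications, set_latch_sketch() stands in for SetLatch(), the array is passed explicitly instead of coming from WalSndCtl, and the spinlock around the reads is omitted.

```c
#include <stdbool.h>
#include <stddef.h>

/* Reduced stand-ins for the real walsender types. */
typedef enum
{
    REPLICATION_KIND_PHYSICAL,
    REPLICATION_KIND_LOGICAL
} ReplicationKind;

typedef struct Latch
{
    bool is_set;
} Latch;

typedef struct WalSnd
{
    Latch          *latch;  /* NULL if the entry is unused */
    ReplicationKind kind;
} WalSnd;

/* Stand-in for SetLatch(). */
static void
set_latch_sketch(Latch *latch)
{
    latch->is_set = true;
}

/* Wake up physical walsenders, logical walsenders, or both. */
static void
wal_snd_wakeup_sketch(WalSnd *walsnds, int nwalsnds,
                      bool physical, bool logical)
{
    for (int i = 0; i < nwalsnds; i++)
    {
        /* The real code reads both fields with walsnd->mutex held. */
        Latch          *latch = walsnds[i].latch;
        ReplicationKind kind = walsnds[i].kind;

        if (latch == NULL)
            continue;

        if ((physical && kind == REPLICATION_KIND_PHYSICAL) ||
            (logical && kind == REPLICATION_KIND_LOGICAL))
            set_latch_sketch(latch);
    }
}
```

Splitting out the NULL check keeps the kind test on its own, which is the readability gain being suggested; behavior is the same as the single combined condition in the patch.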

Greetings,

Andres Freund
