Re: Minimal logical decoding on standbys

From: "Drouvot, Bertrand" <bertranddrouvot(dot)pg(at)gmail(dot)com>
To: Andres Freund <andres(at)anarazel(dot)de>
Cc: Amit Kapila <amit(dot)kapila16(at)gmail(dot)com>, Alvaro Herrera <alvherre(at)alvh(dot)no-ip(dot)org>, Jeff Davis <pgsql(at)j-davis(dot)com>, Robert Haas <robertmhaas(at)gmail(dot)com>, Thomas Munro <thomas(dot)munro(at)gmail(dot)com>, Ibrar Ahmed <ibrar(dot)ahmad(at)gmail(dot)com>, Amit Khandekar <amitdkhan(dot)pg(at)gmail(dot)com>, fabriziomello(at)gmail(dot)com, tushar <tushar(dot)ahuja(at)enterprisedb(dot)com>, Rahila Syed <rahila(dot)syed(at)2ndquadrant(dot)com>, pgsql-hackers <pgsql-hackers(at)postgresql(dot)org>
Subject: Re: Minimal logical decoding on standbys
Date: 2023-04-06 13:00:57
Message-ID: b51f01b6-7ff4-fc5d-8068-6d94f9558235@gmail.com
Lists: pgsql-hackers

Hi,

On 4/5/23 8:28 PM, Andres Freund wrote:
> Hi,
>
> On 2023-04-05 17:56:14 +0200, Drouvot, Bertrand wrote:
>
>> @@ -7963,6 +7963,23 @@ xlog_redo(XLogReaderState *record)
>> /* Update our copy of the parameters in pg_control */
>> memcpy(&xlrec, XLogRecGetData(record), sizeof(xl_parameter_change));
>>
>> + /*
>> + * Invalidate logical slots if we are in hot standby and the primary
>> + * does not have a WAL level sufficient for logical decoding. No need
>> + * to search for potentially conflicting logically slots if standby is
>> + * running with wal_level lower than logical, because in that case, we
>> + * would have either disallowed creation of logical slots or
>> + * invalidated existing ones.
>> + */
>> + if (InRecovery && InHotStandby &&
>> + xlrec.wal_level < WAL_LEVEL_LOGICAL &&
>> + wal_level >= WAL_LEVEL_LOGICAL)
>> + {
>> + TransactionId ConflictHorizon = InvalidTransactionId;
>> +
>> + InvalidateObsoleteReplicationSlots(InvalidXLogRecPtr, InvalidOid, &ConflictHorizon);
>> + }
>
> I mentioned this before,

Sorry, I probably missed it.

> but I still don't understand why
> InvalidateObsoleteReplicationSlots() accepts ConflictHorizon as a
> pointer. It's not even modified, as far as I can see?
>

The initial goal was to be able to check whether the xid pointer was NULL and,
if it was not, whether *xid was a valid xid or not. So basically being able to
distinguish 3 cases with the same parameter.

That's how we decided whether or not we are in the "wal_level < logical on primary" conflict case in ReportTerminationInvalidation().

I agree that passing a pointer is not the best approach (as there is a "risk" of modifying the value it points to),
so I'm adding an extra bool to InvalidateObsoleteReplicationSlots() in the attached V62 instead.
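
To make the three cases concrete, here is a minimal standalone sketch. It is not the V62 code: SlotConflictCase, classify_by_pointer() and classify_by_flag() are hypothetical names, and TransactionId is mocked as a plain uint32_t so that the example compiles outside the PostgreSQL tree. It only shows that a bool plus a by-value xid can carry the same information as the pointer did.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Mocked stand-ins for the PostgreSQL definitions (assumption of this sketch). */
typedef uint32_t TransactionId;
#define InvalidTransactionId ((TransactionId) 0)
#define TransactionIdIsValid(xid) ((xid) != InvalidTransactionId)

/* Hypothetical names for the three situations the parameter has to encode. */
typedef enum
{
	CASE_WAL_REMOVAL,			/* no xid check: the slot needs WAL being removed */
	CASE_PRIMARY_WAL_LEVEL,		/* xid check requested, but the xid is invalid */
	CASE_XID_CONFLICT			/* xid check requested with a valid horizon */
} SlotConflictCase;

/* Pointer variant: NULL, pointer to an invalid xid, pointer to a valid xid. */
static SlotConflictCase
classify_by_pointer(const TransactionId *xid)
{
	if (xid == NULL)
		return CASE_WAL_REMOVAL;
	return TransactionIdIsValid(*xid) ? CASE_XID_CONFLICT : CASE_PRIMARY_WAL_LEVEL;
}

/* Flag variant: the same three cases, with the xid passed by value. */
static SlotConflictCase
classify_by_flag(bool check_on_xid, TransactionId xid)
{
	if (!check_on_xid)
		return CASE_WAL_REMOVAL;
	return TransactionIdIsValid(xid) ? CASE_XID_CONFLICT : CASE_PRIMARY_WAL_LEVEL;
}
```

With the flag variant the callee can no longer modify the caller's horizon by accident, which is the "risk" mentioned above.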

I'm also replacing InvalidXLogRecPtr with 0, as it sounds odd to use the "InvalidXLogRecPtr"
name for an XLogSegNo.
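
For readers less familiar with the two types: an XLogRecPtr is a byte position in the WAL stream, while an XLogSegNo is the index of a WAL segment file. The sketch below is only an illustration with mocked typedefs and a hypothetical helper name (lsn_to_segno); in the PostgreSQL tree the conversion is done by the XLByteToSeg() macro.

```c
#include <assert.h>
#include <stdint.h>

/* Mocked typedefs so the sketch compiles on its own. */
typedef uint64_t XLogRecPtr;	/* byte position in the WAL stream */
typedef uint64_t XLogSegNo;		/* index of a WAL segment file */

/* Hypothetical helper: number of the segment that contains the given LSN. */
static XLogSegNo
lsn_to_segno(XLogRecPtr lsn, uint64_t wal_segment_size)
{
	assert(wal_segment_size > 0);
	return lsn / wal_segment_size;
}
```

Both types are 64-bit integers, so mixing them up compiles fine, which is why the naming is the only thing that signals the unit.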

>
>> /*
>> * Report shared-memory space needed by ReplicationSlotsShmemInit.
>> */
>> @@ -855,8 +862,7 @@ ReplicationSlotsComputeRequiredXmin(bool already_locked)
>> SpinLockAcquire(&s->mutex);
>> effective_xmin = s->effective_xmin;
>> effective_catalog_xmin = s->effective_catalog_xmin;
>> - invalidated = (!XLogRecPtrIsInvalid(s->data.invalidated_at) &&
>> - XLogRecPtrIsInvalid(s->data.restart_lsn));
>> + invalidated = ObsoleteSlotIsInvalid(s, true) || LogicalReplicationSlotIsInvalid(s);
>> SpinLockRelease(&s->mutex);
>
> I don't understand why we need to have two different functions for this.
>

LogicalReplicationSlotIsInvalid() was created so that we can report an error message different
from ".....because it exceeded the maximum reserved size" in StartLogicalReplication()
and from "This slot has never previously reserved WAL" in pg_logical_slot_get_changes_guts().

So basically it is there to distinguish the conflict case from the max_slot_wal_keep_size related messages.

>
>> /* invalidated slots need not apply */
>> @@ -1225,28 +1231,92 @@ ReplicationSlotReserveWal(void)
>> }
>> }
>>
>> +
>> +/*
>> + * Report terminating or conflicting message.
>> + *
>> + * For both, logical conflict on standby and obsolete slot are handled.
>> + */
>> +static void
>> +ReportTerminationInvalidation(bool terminating, bool check_on_xid, int pid,
>> + NameData slotname, TransactionId *xid,
>> + XLogRecPtr restart_lsn, XLogRecPtr oldestLSN)
>> +{
>> + StringInfoData err_msg;
>> + StringInfoData err_detail;
>> + bool hint = false;
>> +
>> + initStringInfo(&err_detail);
>> +
>> + if (check_on_xid)
>> + {
>> + if (!terminating)
>> + {
>> + initStringInfo(&err_msg);
>> + appendStringInfo(&err_msg, _("invalidating replication slot \"%s\" because it conflicts with recovery"),
>> + NameStr(slotname));
>
> I still don't think the main error message should differ between invalidating
> a slot due recovery and max_slot_wal_keep_size.

Okay. I gave it a second thought and I agree that "obsolete" also makes
sense for the xid conflict case. So, done that way in V62.

>
>> +
>> /*
>> - * Helper for InvalidateObsoleteReplicationSlots -- acquires the given slot
>> - * and mark it invalid, if necessary and possible.
>> + * Helper for InvalidateObsoleteReplicationSlots
>> + *
>> + * Acquires the given slot and mark it invalid, if necessary and possible.
>> *
>> * Returns whether ReplicationSlotControlLock was released in the interim (and
>> * in that case we're not holding the lock at return, otherwise we are).
>> *
>> - * Sets *invalidated true if the slot was invalidated. (Untouched otherwise.)
>> + * Sets *invalidated true if an obsolete slot was invalidated. (Untouched otherwise.)
>
> What's the point of making this specific to "obsolete slots"?

There is none. It probably comes from a previous version/experiment.
Removed in V62, thanks!

>
>
>> * This is inherently racy, because we release the LWLock
>> * for syscalls, so caller must restart if we return true.
>> */
>> static bool
>> InvalidatePossiblyObsoleteSlot(ReplicationSlot *s, XLogRecPtr oldestLSN,
>> - bool *invalidated)
>> + bool *invalidated, TransactionId *xid)
>> {
>> int last_signaled_pid = 0;
>> bool released_lock = false;
>> + bool check_on_xid;
>> +
>> + check_on_xid = xid ? true : false;
>>
>> for (;;)
>> {
>> XLogRecPtr restart_lsn;
>> +
>> NameData slotname;
>> int active_pid = 0;
>>
>> @@ -1263,19 +1333,20 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlot *s, XLogRecPtr oldestLSN,
>> * Check if the slot needs to be invalidated. If it needs to be
>> * invalidated, and is not currently acquired, acquire it and mark it
>> * as having been invalidated. We do this with the spinlock held to
>> - * avoid race conditions -- for example the restart_lsn could move
>> - * forward, or the slot could be dropped.
>> + * avoid race conditions -- for example the restart_lsn (or the
>> + * xmin(s) could) move forward or the slot could be dropped.
>> */
>> SpinLockAcquire(&s->mutex);
>>
>> restart_lsn = s->data.restart_lsn;
>>
>> /*
>> - * If the slot is already invalid or is fresh enough, we don't need to
>> - * do anything.
>> + * If the slot is already invalid or is a non conflicting slot, we
>> + * don't need to do anything.
>> */
>> - if (XLogRecPtrIsInvalid(restart_lsn) || restart_lsn >= oldestLSN)
>> + if (DoNotInvalidateSlot(s, xid, &oldestLSN))
>
> DoNotInvalidateSlot() seems odd to me, and makes the code harder to
> understand. I'd make it something like:
>
> if (!SlotIsInvalid(s) && (
> LogicalSlotConflictsWith(s, xid) ||
> SlotConflictsWithLSN(s, lsn)))
>

I think that's a matter of taste (having a single function was suggested
by Amit up-thread).

I think I prefer having one single function, as it seems easier to me to
understand whether we want to check on the xid or not.

>
>> /*
>> - * Mark any slot that points to an LSN older than the given segment
>> - * as invalid; it requires WAL that's about to be removed.
>> + * Invalidate Obsolete slots or resolve recovery conflicts with logical slots.
>
> I don't like that this spreads "obsolete slots" around further - it's very
> unspecific. A logical slot that needs to be removed due to an xid conflict is
> just as obsolete as one that needs to be removed due to max_slot_wal_keep_size.
>
> I'd rephrase this to be about required resources getting removed or such, one
> case of that is WAL another case is xids.
>

Agree. Re-worded in V62.

>> restart:
>> LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
>> @@ -1414,21 +1505,35 @@ restart:
>> if (!s->in_use)
>> continue;
>>
>> - if (InvalidatePossiblyObsoleteSlot(s, oldestLSN, &invalidated))
>> + if (xid)
>> {
>> - /* if the lock was released, start from scratch */
>> - goto restart;
>> + /* we are only dealing with *logical* slot conflicts */
>> + if (!SlotIsLogical(s))
>> + continue;
>> +
>> + /*
>> + * not the database of interest and we don't want all the
>> + * database, skip
>> + */
>> + if (s->data.database != dboid && TransactionIdIsValid(*xid))
>> + continue;
>
> ISTM that this should be in InvalidatePossiblyObsoleteSlot().
>

Agree, done in V62.

>
>> /*
>> - * If any slots have been invalidated, recalculate the resource limits.
>> + * If any slots have been invalidated, recalculate the required xmin and
>> + * the required lsn (if appropriate).
>> */
>> if (invalidated)
>> {
>> ReplicationSlotsComputeRequiredXmin(false);
>> - ReplicationSlotsComputeRequiredLSN();
>> + if (!xid)
>> + ReplicationSlotsComputeRequiredLSN();
>> }
>
> Why make this conditional? If we invalidated a logical slot, we also don't
> require as much WAL anymore, no?
>

Agree, done in V62.

>
>> @@ -491,6 +493,9 @@ ResolveRecoveryConflictWithSnapshot(TransactionId snapshotConflictHorizon,
>> PROCSIG_RECOVERY_CONFLICT_SNAPSHOT,
>> WAIT_EVENT_RECOVERY_CONFLICT_SNAPSHOT,
>> true);
>> +
>> + if (wal_level >= WAL_LEVEL_LOGICAL && isCatalogRel)
>> + InvalidateObsoleteReplicationSlots(InvalidXLogRecPtr, locator.dbOid, &snapshotConflictHorizon);
>> }
>
> Hm. Is there a reason for doing this before resolving conflicts with existing
> sessions?
>

Do you mean you'd prefer to call InvalidateObsoleteReplicationSlots() before ResolveRecoveryConflictWithVirtualXIDs()?

>
> Another issue: ResolveRecoveryConflictWithVirtualXIDs() takes
> WaitExceedsMaxStandbyDelay() into account, but
> InvalidateObsoleteReplicationSlots() does not.

Hmm, good point.

> I think that's ok, because the
> setup should prevent this case from being reached in normal paths, but at
> least there should be a comment documenting this.
>

I started to add the comment to InvalidateObsoleteReplicationSlots(), but I'm not
sure what you mean by "the setup should prevent this case from being reached in normal paths"
(so I left "XXXX" in the comment for now).

Did you mean that hot_standby_feedback and a physical slot between the primary and the standby should be in place?
Could you please elaborate?

>
>
>> +static inline bool
>> +LogicalReplicationSlotXidsConflict(ReplicationSlot *s, TransactionId xid)
>> +{
>> + TransactionId slot_effective_xmin;
>> + TransactionId slot_catalog_xmin;
>> +
>> + slot_effective_xmin = s->effective_xmin;
>> + slot_catalog_xmin = s->data.catalog_xmin;
>> +
>> + return (((TransactionIdIsValid(slot_effective_xmin) && TransactionIdPrecedesOrEquals(slot_effective_xmin, xid)) ||
>> + (TransactionIdIsValid(slot_catalog_xmin) && TransactionIdPrecedesOrEquals(slot_catalog_xmin, xid))));
>> +}
>
> return -ETOOMANYPARENS
>

I tried to make it better in V62.
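
As an illustration of one way to flatten such a condition (not necessarily the form used in V62), here is a standalone sketch with one early return per horizon. MockSlot, xid_precedes_or_equals() and slot_xids_conflict() are hypothetical names; the comparison is a simplified version of the wraparound-aware check and assumes both xids are normal xids.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Mocked stand-ins for the PostgreSQL definitions (assumption of this sketch). */
typedef uint32_t TransactionId;
#define InvalidTransactionId ((TransactionId) 0)
#define TransactionIdIsValid(xid) ((xid) != InvalidTransactionId)

/* Simplified wraparound-aware comparison; assumes both xids are normal xids. */
static bool
xid_precedes_or_equals(TransactionId a, TransactionId b)
{
	return (int32_t) (a - b) <= 0;
}

/* Hypothetical, trimmed-down slot carrying only the two horizons. */
typedef struct MockSlot
{
	TransactionId effective_xmin;
	TransactionId catalog_xmin;
} MockSlot;

/* True if either valid horizon of the slot is at or before the given xid. */
static bool
slot_xids_conflict(const MockSlot *s, TransactionId horizon)
{
	if (TransactionIdIsValid(s->effective_xmin) &&
		xid_precedes_or_equals(s->effective_xmin, horizon))
		return true;

	if (TransactionIdIsValid(s->catalog_xmin) &&
		xid_precedes_or_equals(s->catalog_xmin, horizon))
		return true;

	return false;
}
```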

>
>> +static inline bool
>> +SlotIsFreshEnough(ReplicationSlot *s, XLogRecPtr oldestLSN)
>> +{
>> + return (s->data.restart_lsn >= oldestLSN);
>> +}
>> +
>> +static inline bool
>> +LogicalSlotIsNotConflicting(ReplicationSlot *s, TransactionId *xid)
>> +{
>> + return (TransactionIdIsValid(*xid) && !LogicalReplicationSlotXidsConflict(s, *xid));
>> +}
>> +
>> +static inline bool
>> +DoNotInvalidateSlot(ReplicationSlot *s, TransactionId *xid, XLogRecPtr *oldestLSN)
>> +{
>> + if (xid)
>> + return (LogicalReplicationSlotIsInvalid(s) || LogicalSlotIsNotConflicting(s, xid));
>> + else
>> + return (ObsoleteSlotIsInvalid(s, false) || SlotIsFreshEnough(s, *oldestLSN));
>> +
>> +}
>
> See above for some more comments. But please don't accept stuff via pointer if
> you don't have a reason for it. There's no reason for it for xid and oldestLSN
> afaict.

Agree that there is no reason for oldestLSN. Changing in V62.
As for the xid, I explained above why I used a pointer, but I found a way to remove the need
for it in V62 (also explained above).

>
>
>> diff --git a/src/backend/access/transam/xlogrecovery.c b/src/backend/access/transam/xlogrecovery.c
>> index dbe9394762..186e4ef600 100644
>> --- a/src/backend/access/transam/xlogrecovery.c
>> +++ b/src/backend/access/transam/xlogrecovery.c
>> @@ -1935,6 +1935,30 @@ ApplyWalRecord(XLogReaderState *xlogreader, XLogRecord *record, TimeLineID *repl
>> XLogRecoveryCtl->lastReplayedTLI = *replayTLI;
>> SpinLockRelease(&XLogRecoveryCtl->info_lck);
>>
>> + /*
>> + * Wakeup walsenders:
>> + *
>> + * On the standby, the WAL is flushed first (which will only wake up
>> + * physical walsenders) and then applied, which will only wake up logical
>> + * walsenders.
>> + * Indeed, logical walsenders on standby can't decode and send data until
>> + * it's been applied.
>> + *
>> + * Physical walsenders don't need to be waked up during replay unless
>
> s/waked/woken/

Thanks, fixed.

>> + * cascading replication is allowed and time line change occured (so that
>> + * they can notice that they are on a new time line).
>> + *
>> + * That's why the wake up conditions are for:
>> + *
>> + * - physical walsenders in case of new time line and cascade
>> + * replication is allowed.
>> + * - logical walsenders in case of new time line or recovery is in progress
>> + * (logical decoding on standby).
>> + */
>> + WalSndWakeup(switchedTLI && AllowCascadeReplication(),
>> + switchedTLI || RecoveryInProgress());
>
> I don't think it's possible to get here without RecoveryInProgress() being
> true. So we don't need that condition.

Right, so I'm using "true" instead, as we don't want to rely only on a timeline change
to wake up a logical walsender.

>
>
>> @@ -1010,7 +1010,7 @@ XLogWalRcvFlush(bool dying, TimeLineID tli)
>> /* Signal the startup process and walsender that new WAL has arrived */
>> WakeupRecovery();
>> if (AllowCascadeReplication())
>> - WalSndWakeup();
>> + WalSndWakeup(true, !RecoveryInProgress());
>
> Same comment as earlier.

done.

>
>
>> /* Report XLOG streaming progress in PS display */
>> if (update_process_title)
>> diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
>> index 2d908d1de2..5c68ebb79e 100644
>> --- a/src/backend/replication/walsender.c
>> +++ b/src/backend/replication/walsender.c
>> @@ -2628,6 +2628,23 @@ InitWalSenderSlot(void)
>> walsnd->sync_standby_priority = 0;
>> walsnd->latch = &MyProc->procLatch;
>> walsnd->replyTime = 0;
>> +
>> + /*
>> + * The kind assignment is done here and not in StartReplication()
>> + * and StartLogicalReplication(). Indeed, the logical walsender
>> + * needs to read WAL records (like snapshot of running
>> + * transactions) during the slot creation. So it needs to be woken
>> + * up based on its kind.
>> + *
>> + * The kind assignment could also be done in StartReplication(),
>> + * StartLogicalReplication() and CREATE_REPLICATION_SLOT but it
>> + * seems better to set it on one place.
>> + */
>
> Doesn't that mean we'll wake up logical walsenders even if they're doing
> normal query processing?
>

I'm not following what you mean here.

>
>> + if (MyDatabaseId == InvalidOid)
>> + walsnd->kind = REPLICATION_KIND_PHYSICAL;
>> + else
>> + walsnd->kind = REPLICATION_KIND_LOGICAL;
>> +
>> SpinLockRelease(&walsnd->mutex);
>> /* don't need the lock anymore */
>> MyWalSnd = (WalSnd *) walsnd;
>> @@ -3310,30 +3327,39 @@ WalSndShmemInit(void)
>> }
>>
>> /*
>> - * Wake up all walsenders
>> + * Wake up physical, logical or both walsenders kind
>> + *
>> + * The distinction between physical and logical walsenders is done, because:
>> + * - physical walsenders can't send data until it's been flushed
>> + * - logical walsenders on standby can't decode and send data until it's been
>> + * applied
>> + *
>> + * For cascading replication we need to wake up physical
>> + * walsenders separately from logical walsenders (see the comment before calling
>> + * WalSndWakeup() in ApplyWalRecord() for more details).
>> *
>> * This will be called inside critical sections, so throwing an error is not
>> * advisable.
>> */
>> void
>> -WalSndWakeup(void)
>> +WalSndWakeup(bool physical, bool logical)
>> {
>> int i;
>>
>> for (i = 0; i < max_wal_senders; i++)
>> {
>> Latch *latch;
>> + ReplicationKind kind;
>> WalSnd *walsnd = &WalSndCtl->walsnds[i];
>>
>> - /*
>> - * Get latch pointer with spinlock held, for the unlikely case that
>> - * pointer reads aren't atomic (as they're 8 bytes).
>> - */
>> + /* get latch pointer and kind with spinlock helds */
>> SpinLockAcquire(&walsnd->mutex);
>> latch = walsnd->latch;
>> + kind = walsnd->kind;
>> SpinLockRelease(&walsnd->mutex);
>>
>> - if (latch != NULL)
>> + if (latch != NULL && ((physical && kind == REPLICATION_KIND_PHYSICAL) ||
>> + (logical && kind == REPLICATION_KIND_LOGICAL)))
>> SetLatch(latch);
>> }
>> }
>
> I'd consider rewriting this to something like:
>
> if (latch == NULL)
> continue;
>
> if ((physical && kind == REPLICATION_KIND_PHYSICAL) ||
>     (logical && kind == REPLICATION_KIND_LOGICAL))
>     SetLatch(latch);
>

Yeah, that's better. Done.
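
For reference, the shape of the loop with the early "continue" can be sketched outside the tree as below. This is only a mock: MockLatch, MockWalSnd and wake_walsenders() are hypothetical names, the spinlock around the reads is left out, and the function returns the number of latches set purely so that the filtering can be checked.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

typedef enum
{
	REPLICATION_KIND_PHYSICAL,
	REPLICATION_KIND_LOGICAL
} ReplicationKind;

/* Mock latch: only records whether it has been set. */
typedef struct MockLatch
{
	bool		is_set;
} MockLatch;

/* Mock walsender entry; latch is NULL when the entry is unused. */
typedef struct MockWalSnd
{
	MockLatch  *latch;
	ReplicationKind kind;
} MockWalSnd;

/* Set the latch of every walsender whose kind was requested; return the count. */
static int
wake_walsenders(MockWalSnd *walsnds, int n, bool physical, bool logical)
{
	int			woken = 0;

	for (int i = 0; i < n; i++)
	{
		/* The real code reads both fields with the walsender's spinlock held. */
		MockLatch  *latch = walsnds[i].latch;
		ReplicationKind kind = walsnds[i].kind;

		if (latch == NULL)
			continue;

		if ((physical && kind == REPLICATION_KIND_PHYSICAL) ||
			(logical && kind == REPLICATION_KIND_LOGICAL))
		{
			latch->is_set = true;
			woken++;
		}
	}

	return woken;
}
```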

Regards,

--
Bertrand Drouvot
PostgreSQL Contributors Team
RDS Open Source Databases
Amazon Web Services: https://aws.amazon.com

Attachment Content-Type Size
v62-0006-Doc-changes-describing-details-about-logical-dec.patch text/plain 2.7 KB
v62-0005-New-TAP-test-for-logical-decoding-on-standby.patch text/plain 32.7 KB
v62-0004-For-cascading-replication-wake-up-physical-walse.patch text/plain 9.6 KB
v62-0003-Allow-logical-decoding-on-standby.patch text/plain 12.3 KB
v62-0002-Arrange-for-a-new-pg_stat_database_conflicts-and.patch text/plain 10.4 KB
v62-0001-Handle-logical-slot-conflicts-on-standby.patch text/plain 27.5 KB
