Re: Minimal logical decoding on standbys

From: "Drouvot, Bertrand" <bertranddrouvot(dot)pg(at)gmail(dot)com>
To: Andres Freund <andres(at)anarazel(dot)de>
Cc: Jeff Davis <pgsql(at)j-davis(dot)com>, Robert Haas <robertmhaas(at)gmail(dot)com>, Thomas Munro <thomas(dot)munro(at)gmail(dot)com>, Alvaro Herrera <alvherre(at)2ndquadrant(dot)com>, Ibrar Ahmed <ibrar(dot)ahmad(at)gmail(dot)com>, Amit Khandekar <amitdkhan(dot)pg(at)gmail(dot)com>, fabriziomello(at)gmail(dot)com, tushar <tushar(dot)ahuja(at)enterprisedb(dot)com>, Rahila Syed <rahila(dot)syed(at)2ndquadrant(dot)com>, pgsql-hackers <pgsql-hackers(at)postgresql(dot)org>, Melanie Plageman <melanieplageman(at)gmail(dot)com>
Subject: Re: Minimal logical decoding on standbys
Date: 2023-03-30 16:23:41
Message-ID: da7184cf-c7b9-c333-801e-0e7507a23ddf@gmail.com
Lists: pgsql-hackers

Hi,

On 3/30/23 9:04 AM, Andres Freund wrote:
> Hi,
>
> On 2023-03-04 12:19:57 +0100, Drouvot, Bertrand wrote:
>> Subject: [PATCH v52 1/6] Add info in WAL records in preparation for logical
>> slot conflict handling.
>
> This is a very nice commit message.

Thanks! Melanie and Robert provided great feedback and input that helped make it
what it is now.

> I think this commit is ready to go. Unless somebody thinks differently, I
> think I might push it tomorrow.

Great! Once done, I'll submit a new patch so that GlobalVisTestFor() can make
use of the heap relation in vacuumRedirectAndPlaceholder() (which will be possible
once 0001 is committed).

>
>> Subject: [PATCH v52 2/6] Handle logical slot conflicts on standby.
>
>
>> @@ -6807,7 +6808,8 @@ CreateCheckPoint(int flags)
>> */
>> XLByteToSeg(RedoRecPtr, _logSegNo, wal_segment_size);
>> KeepLogSeg(recptr, &_logSegNo);
>> - if (InvalidateObsoleteReplicationSlots(_logSegNo))
>> + InvalidateObsoleteReplicationSlots(_logSegNo, &invalidated, InvalidOid, NULL);
>> + if (invalidated)
>> {
>> /*
>> * Some slots have been invalidated; recalculate the old-segment
>
> I don't really understand why you changed InvalidateObsoleteReplicationSlots
> to return void instead of bool, and then added an output boolean argument via
> a pointer?
>
>

I gave it a second thought and it looks like I overcomplicated that part. I removed the
pointer parameter in the attached V53 (and it now returns bool as before).
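
As an illustration of the simpler interface, here is a minimal standalone sketch (not the patch code). DemoSlot and demo_invalidate_obsolete_slots() are hypothetical stand-ins, and treating an LSN of 0 as "invalid" is an assumption made only for this example:

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical, simplified stand-in for a replication slot. */
typedef struct DemoSlot
{
    uint64_t restart_lsn;   /* 0 means "invalid" in this sketch */
} DemoSlot;

/*
 * Invalidate every slot whose restart_lsn is older than oldest_lsn.
 * Returns true if at least one slot was invalidated, so the caller can
 * branch on the result directly instead of passing a bool pointer.
 */
bool
demo_invalidate_obsolete_slots(DemoSlot *slots, size_t nslots,
                               uint64_t oldest_lsn)
{
    bool invalidated = false;

    assert(slots != NULL || nslots == 0);

    for (size_t i = 0; i < nslots; i++)
    {
        if (slots[i].restart_lsn != 0 && slots[i].restart_lsn < oldest_lsn)
        {
            slots[i].restart_lsn = 0;   /* mark the slot as invalidated */
            invalidated = true;
        }
    }
    return invalidated;
}
```

With this shape the call site reads as a plain `if (demo_invalidate_obsolete_slots(...))`, which is the pattern the original CreateCheckPoint() code used.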

>
>> @@ -7964,6 +7968,22 @@ xlog_redo(XLogReaderState *record)
>> /* Update our copy of the parameters in pg_control */
>> memcpy(&xlrec, XLogRecGetData(record), sizeof(xl_parameter_change));
>>
>> + /*
>> + * Invalidate logical slots if we are in hot standby and the primary does not
>> + * have a WAL level sufficient for logical decoding. No need to search
>> + * for potentially conflicting logically slots if standby is running
>> + * with wal_level lower than logical, because in that case, we would
>> + * have either disallowed creation of logical slots or invalidated existing
>> + * ones.
>> + */
>> + if (InRecovery && InHotStandby &&
>> + xlrec.wal_level < WAL_LEVEL_LOGICAL &&
>> + wal_level >= WAL_LEVEL_LOGICAL)
>> + {
>> + TransactionId ConflictHorizon = InvalidTransactionId;
>> + InvalidateObsoleteReplicationSlots(InvalidXLogRecPtr, NULL, InvalidOid, &ConflictHorizon);
>> + }
>> +
>
> Are there races around changing wal_level?
>

Hmm, not that I can think of right now. Do you have one or more in mind?

>
>> @@ -855,8 +855,10 @@ ReplicationSlotsComputeRequiredXmin(bool already_locked)
>> SpinLockAcquire(&s->mutex);
>> effective_xmin = s->effective_xmin;
>> effective_catalog_xmin = s->effective_catalog_xmin;
>> - invalidated = (!XLogRecPtrIsInvalid(s->data.invalidated_at) &&
>> - XLogRecPtrIsInvalid(s->data.restart_lsn));
>> + invalidated = ((!XLogRecPtrIsInvalid(s->data.invalidated_at) &&
>> + XLogRecPtrIsInvalid(s->data.restart_lsn))
>> + || (!TransactionIdIsValid(s->data.xmin) &&
>> + !TransactionIdIsValid(s->data.catalog_xmin)));
>> SpinLockRelease(&s->mutex);
>>
>> /* invalidated slots need not apply */
>
> I still would like a wrapper function to determine whether a slot has been
> invalidated. This is too complicated to be repeated in other places.
>
>

Agreed, so I added ObsoleteSlotIsInvalid() and SlotIsInvalid() in the attached V53.

ObsoleteSlotIsInvalid() could also be done in a dedicated patch outside this patch series, though.
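
A rough standalone sketch of what such wrappers could look like, mirroring the condition from the hunk quoted above. The DemoSlotData struct and the demo_* names are hypothetical simplifications, and using 0 as the "invalid" value for LSNs and xids is an assumption of this example, not the definitions from the patch:

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical, simplified subset of a slot's persistent data. */
typedef struct DemoSlotData
{
    uint64_t invalidated_at;    /* 0 = never invalidated */
    uint64_t restart_lsn;       /* 0 = invalid */
    uint32_t xmin;              /* 0 = invalid */
    uint32_t catalog_xmin;      /* 0 = invalid */
} DemoSlotData;

/* Obsolete-LSN case: invalidated_at is set and restart_lsn was cleared. */
bool
demo_obsolete_slot_is_invalid(const DemoSlotData *d)
{
    assert(d != NULL);
    return d->invalidated_at != 0 && d->restart_lsn == 0;
}

/* Conflict case: both xmin and catalog_xmin were cleared. */
bool
demo_logical_slot_is_invalid(const DemoSlotData *d)
{
    assert(d != NULL);
    return d->xmin == 0 && d->catalog_xmin == 0;
}

/* Single entry point, so callers never repeat the compound condition. */
bool
demo_slot_is_invalid(const DemoSlotData *d)
{
    return demo_obsolete_slot_is_invalid(d) ||
           demo_logical_slot_is_invalid(d);
}
```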

>> @@ -1224,20 +1226,21 @@ ReplicationSlotReserveWal(void)
>> }
>>
>> /*
>> - * Helper for InvalidateObsoleteReplicationSlots -- acquires the given slot
>> - * and mark it invalid, if necessary and possible.
>> + * Helper for InvalidateObsoleteReplicationSlots
>> + *
>> + * Acquires the given slot and mark it invalid, if necessary and possible.
>> *
>> * Returns whether ReplicationSlotControlLock was released in the interim (and
>> * in that case we're not holding the lock at return, otherwise we are).
>> *
>> - * Sets *invalidated true if the slot was invalidated. (Untouched otherwise.)
>> + * Sets *invalidated true if an obsolete slot was invalidated. (Untouched otherwise.)
>> *
>> * This is inherently racy, because we release the LWLock
>> * for syscalls, so caller must restart if we return true.
>> */
>> static bool
>> -InvalidatePossiblyObsoleteSlot(ReplicationSlot *s, XLogRecPtr oldestLSN,
>> - bool *invalidated)
>> +InvalidatePossiblyObsoleteOrConflictingLogicalSlot(ReplicationSlot *s, XLogRecPtr oldestLSN,
>> + bool *invalidated, TransactionId *xid)
>
> This is too long a name. I'd probably just leave it at the old name.
>
>

Done in V53 attached.

>
>> @@ -1261,18 +1267,33 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlot *s, XLogRecPtr oldestLSN,
>> * Check if the slot needs to be invalidated. If it needs to be
>> * invalidated, and is not currently acquired, acquire it and mark it
>> * as having been invalidated. We do this with the spinlock held to
>> - * avoid race conditions -- for example the restart_lsn could move
>> - * forward, or the slot could be dropped.
>> + * avoid race conditions -- for example the restart_lsn (or the
>> + * xmin(s) could) move forward or the slot could be dropped.
>> */
>> SpinLockAcquire(&s->mutex);
>>
>> restart_lsn = s->data.restart_lsn;
>> + slot_xmin = s->data.xmin;
>> + slot_catalog_xmin = s->data.catalog_xmin;
>> +
>> + /* slot has been invalidated (logical decoding conflict case) */
>> + if ((xid &&
>> + ((LogicalReplicationSlotIsInvalid(s))
>> + ||
>>
>
> Uh, huh?
>
> That's very odd formatting.
>
>> /*
>> - * If the slot is already invalid or is fresh enough, we don't need to
>> - * do anything.
>> + * We are not forcing for invalidation because the xid is valid and
>> + * this is a non conflicting slot.
>> */
>> - if (XLogRecPtrIsInvalid(restart_lsn) || restart_lsn >= oldestLSN)
>> + (TransactionIdIsValid(*xid) && !(
>> + (TransactionIdIsValid(slot_xmin) && TransactionIdPrecedesOrEquals(slot_xmin, *xid))
>> + ||
>> + (TransactionIdIsValid(slot_catalog_xmin) && TransactionIdPrecedesOrEquals(slot_catalog_xmin, *xid))
>> + ))
>> + ))
>> + ||
>> + /* slot has been invalidated (obsolete LSN case) */
>> + (!xid && (XLogRecPtrIsInvalid(restart_lsn) || restart_lsn >= oldestLSN)))
>> {
>> SpinLockRelease(&s->mutex);
>> if (released_lock)
>
>
> This needs some cleanup.

I added a new macro, LogicalReplicationSlotXidsConflict(), and reformatted the code a bit.
I also ran pgindent on it; I hope it's cleaner now.
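
For readers following along, the xid part of the quoted condition can be sketched as a standalone predicate. This is a hedged illustration, not the macro from V53: DemoXid, demo_xid_precedes_or_equals() and demo_slot_xids_conflict() are hypothetical names, and the wraparound-aware comparison is a simplified stand-in for TransactionIdPrecedesOrEquals():

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

typedef uint32_t DemoXid;

#define DEMO_INVALID_XID        ((DemoXid) 0)
#define DEMO_FIRST_NORMAL_XID   ((DemoXid) 3)

/*
 * Wraparound-aware "a <= b" for 32-bit xids. Special (non-normal) xids
 * are compared as plain unsigned values.
 */
bool
demo_xid_precedes_or_equals(DemoXid a, DemoXid b)
{
    if (a < DEMO_FIRST_NORMAL_XID || b < DEMO_FIRST_NORMAL_XID)
        return a <= b;
    return (int32_t) (a - b) <= 0;
}

/*
 * A slot conflicts with the horizon if either of its valid xmins
 * precedes or equals that horizon. The caller must pass a valid horizon.
 */
bool
demo_slot_xids_conflict(DemoXid slot_xmin, DemoXid slot_catalog_xmin,
                        DemoXid horizon)
{
    assert(horizon != DEMO_INVALID_XID);

    return (slot_xmin != DEMO_INVALID_XID &&
            demo_xid_precedes_or_equals(slot_xmin, horizon)) ||
           (slot_catalog_xmin != DEMO_INVALID_XID &&
            demo_xid_precedes_or_equals(slot_catalog_xmin, horizon));
}
```

Naming the predicate keeps the surrounding if() down to a choice between "conflict case" and "obsolete LSN case", which is the readability concern raised in the review.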

>
>
>> @@ -1292,9 +1313,16 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlot *s, XLogRecPtr oldestLSN,
>> {
>> MyReplicationSlot = s;
>> s->active_pid = MyProcPid;
>> - s->data.invalidated_at = restart_lsn;
>> - s->data.restart_lsn = InvalidXLogRecPtr;
>> -
>> + if (xid)
>> + {
>> + s->data.xmin = InvalidTransactionId;
>> + s->data.catalog_xmin = InvalidTransactionId;
>> + }
>> + else
>> + {
>> + s->data.invalidated_at = restart_lsn;
>> + s->data.restart_lsn = InvalidXLogRecPtr;
>> + }
>> /* Let caller know */
>> *invalidated = true;
>> }
>> @@ -1327,15 +1355,39 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlot *s, XLogRecPtr oldestLSN,
>> */
>> if (last_signaled_pid != active_pid)
>> {
>> - ereport(LOG,
>> - errmsg("terminating process %d to release replication slot \"%s\"",
>> - active_pid, NameStr(slotname)),
>> - errdetail("The slot's restart_lsn %X/%X exceeds the limit by %llu bytes.",
>> - LSN_FORMAT_ARGS(restart_lsn),
>> - (unsigned long long) (oldestLSN - restart_lsn)),
>> - errhint("You might need to increase max_slot_wal_keep_size."));
>> + if (xid)
>> + {
>> + if (TransactionIdIsValid(*xid))
>> + {
>> + ereport(LOG,
>> + errmsg("terminating process %d because replication slot \"%s\" conflicts with recovery",
>> + active_pid, NameStr(slotname)),
>> + errdetail("The slot conflicted with xid horizon %u.",
>> + *xid));
>> + }
>> + else
>> + {
>> + ereport(LOG,
>> + errmsg("terminating process %d because replication slot \"%s\" conflicts with recovery",
>> + active_pid, NameStr(slotname)),
>> + errdetail("Logical decoding on standby requires wal_level to be at least logical on the primary server"));
>> + }
>> +
>> + (void) SendProcSignal(active_pid, PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT, InvalidBackendId);
>> + }
>> + else
>> + {
>> + ereport(LOG,
>> + errmsg("terminating process %d to release replication slot \"%s\"",
>> + active_pid, NameStr(slotname)),
>> + errdetail("The slot's restart_lsn %X/%X exceeds the limit by %llu bytes.",
>> + LSN_FORMAT_ARGS(restart_lsn),
>> + (unsigned long long) (oldestLSN - restart_lsn)),
>> + errhint("You might need to increase max_slot_wal_keep_size."));
>> +
>> + (void) kill(active_pid, SIGTERM);
>
> I think it ought to be possible to deduplicate this a fair bit. For one, two of
> the errmsg()s above are identical. But I think this could be consolidated
> further, e.g. by using the same message style for the three cases, and passing
> in a separately translated reason for the termination?
>

Deduplication done in V53, so that there is a single ereport() call.
I'm not sure the translation is handled correctly the way I did it; please advise if that's not right.
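
The general idea of "one message, separately chosen reason" can be sketched outside of PostgreSQL as follows. This is only an assumption-laden illustration: DemoInvalidationCause and demo_format_termination() are hypothetical, plain snprintf() stands in for ereport(), the first detail string is shortened, and real code would need the detail strings marked for translation:

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <stdio.h>

/* Hypothetical reasons for terminating the process that holds a slot. */
typedef enum DemoInvalidationCause
{
    DEMO_CAUSE_WAL_REMOVED,
    DEMO_CAUSE_XID_HORIZON,
    DEMO_CAUSE_WAL_LEVEL
} DemoInvalidationCause;

/*
 * Build the log line in one place: the main message is shared, only the
 * detail text depends on the cause. Returns snprintf()'s result.
 */
int
demo_format_termination(char *buf, size_t buflen, int pid,
                        const char *slotname,
                        DemoInvalidationCause cause, uint32_t xid)
{
    char detail[128] = "";

    assert(buf != NULL && slotname != NULL);

    switch (cause)
    {
        case DEMO_CAUSE_WAL_REMOVED:
            snprintf(detail, sizeof(detail),
                     "The slot's restart_lsn exceeds the WAL retention limit.");
            break;
        case DEMO_CAUSE_XID_HORIZON:
            snprintf(detail, sizeof(detail),
                     "The slot conflicted with xid horizon %u.",
                     (unsigned int) xid);
            break;
        case DEMO_CAUSE_WAL_LEVEL:
            snprintf(detail, sizeof(detail),
                     "Logical decoding on standby requires wal_level to be at least logical on the primary server.");
            break;
    }

    return snprintf(buf, buflen,
                    "terminating process %d to release replication slot \"%s\": %s",
                    pid, slotname, detail);
}
```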

>
>> + }
>>
>> - (void) kill(active_pid, SIGTERM);
>> last_signaled_pid = active_pid;
>> }
>>
>> @@ -1369,13 +1421,33 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlot *s, XLogRecPtr oldestLSN,
>> ReplicationSlotSave();
>> ReplicationSlotRelease();
>>
>> - ereport(LOG,
>> - errmsg("invalidating obsolete replication slot \"%s\"",
>> - NameStr(slotname)),
>> - errdetail("The slot's restart_lsn %X/%X exceeds the limit by %llu bytes.",
>> - LSN_FORMAT_ARGS(restart_lsn),
>> - (unsigned long long) (oldestLSN - restart_lsn)),
>> - errhint("You might need to increase max_slot_wal_keep_size."));
>> + if (xid)
>> + {
>> + pgstat_drop_replslot(s);
>
> Why is this done here now?
>
>

Oops, moved above the if() in V53.

>> + if (TransactionIdIsValid(*xid))
>> + {
>> + ereport(LOG,
>> + errmsg("invalidating slot \"%s\" because it conflicts with recovery", NameStr(slotname)),
>> + errdetail("The slot conflicted with xid horizon %u.", *xid));
>> + }
>> + else
>> + {
>> + ereport(LOG,
>> + errmsg("invalidating slot \"%s\" because it conflicts with recovery", NameStr(slotname)),
>> + errdetail("Logical decoding on standby requires wal_level to be at least logical on the primary server"));
>> + }
>> + }
>> + else
>> + {
>> + ereport(LOG,
>> + errmsg("invalidating obsolete replication slot \"%s\"",
>> + NameStr(slotname)),
>> + errdetail("The slot's restart_lsn %X/%X exceeds the limit by %llu bytes.",
>> + LSN_FORMAT_ARGS(restart_lsn),
>> + (unsigned long long) (oldestLSN - restart_lsn)),
>> + errhint("You might need to increase max_slot_wal_keep_size."));
>> + }
>>
>
> I don't like all these repeated elogs...

Deduplication done in V53, so that there is a single ereport() call.
I'm not sure the translation is handled correctly the way I did it; please advise if that's not right.

>
>
>
>> @@ -3057,6 +3060,27 @@ RecoveryConflictInterrupt(ProcSignalReason reason)
>> case PROCSIG_RECOVERY_CONFLICT_LOCK:
>> case PROCSIG_RECOVERY_CONFLICT_TABLESPACE:
>> case PROCSIG_RECOVERY_CONFLICT_SNAPSHOT:
>> + case PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT:
>> +
>> + /*
>> + * For conflicts that require a logical slot to be
>> + * invalidated, the requirement is for the signal receiver to
>> + * release the slot, so that it could be invalidated by the
>> + * signal sender. So for normal backends, the transaction
>> + * should be aborted, just like for other recovery conflicts.
>> + * But if it's walsender on standby, we don't want to go
>> + * through the following IsTransactionOrTransactionBlock()
>> + * check, so break here.
>> + */
>> + if (am_cascading_walsender &&
>> + reason == PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT &&
>> + MyReplicationSlot && SlotIsLogical(MyReplicationSlot))
>> + {
>> + RecoveryConflictPending = true;
>> + QueryCancelPending = true;
>> + InterruptPending = true;
>> + break;
>> + }
>>
>> /*
>> * If we aren't in a transaction any longer then ignore.
>
> I can't see any reason for this to be mixed into the same case "body" as LOCK
> etc?
>

Oh right, nice catch. I don't know how it ended up done that way. Fixed in V53.
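
One possible shape for keeping the logical-slot conflict in its own case body is sketched below. It is a standalone illustration under assumptions, not necessarily what V53 does: DemoBackend, DemoConflictReason and demo_recovery_conflict() are hypothetical, and a single conflict_pending flag stands in for the RecoveryConflictPending, QueryCancelPending and InterruptPending flags:

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

typedef enum DemoConflictReason
{
    DEMO_CONFLICT_LOCK,
    DEMO_CONFLICT_SNAPSHOT,
    DEMO_CONFLICT_LOGICALSLOT
} DemoConflictReason;

/* Hypothetical, simplified view of the signalled backend. */
typedef struct DemoBackend
{
    bool in_transaction;
    bool walsender_holds_logical_slot;
    bool conflict_pending;
} DemoBackend;

void
demo_recovery_conflict(DemoBackend *b, DemoConflictReason reason)
{
    assert(b != NULL);

    switch (reason)
    {
        case DEMO_CONFLICT_LOGICALSLOT:
            /*
             * Dedicated body: a walsender holding a logical slot must
             * react even when it is not inside a transaction.
             */
            if (b->walsender_holds_logical_slot || b->in_transaction)
                b->conflict_pending = true;
            break;

        case DEMO_CONFLICT_LOCK:
        case DEMO_CONFLICT_SNAPSHOT:
            /* Other conflicts are ignored outside of a transaction. */
            if (b->in_transaction)
                b->conflict_pending = true;
            break;
    }
}
```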

>
>> diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
>> index 38c6f18886..290d4b45f4 100644
>> --- a/src/backend/replication/slot.c
>> +++ b/src/backend/replication/slot.c
>> @@ -51,6 +51,7 @@
>> #include "storage/proc.h"
>> #include "storage/procarray.h"
>> #include "utils/builtins.h"
>> +#include "access/xlogrecovery.h"
>
> Add new includes in the "alphabetically" right place...

Fixed in 0003 in V53 and the other places (aka other sub-patches) where it was needed.

Regards,

--
Bertrand Drouvot
PostgreSQL Contributors Team
RDS Open Source Databases
Amazon Web Services: https://aws.amazon.com

Attachment                                                       Content-Type  Size
v53-0006-Doc-changes-describing-details-about-logical-dec.patch  text/plain    2.2 KB
v53-0005-New-TAP-test-for-logical-decoding-on-standby.patch      text/plain    32.9 KB
v53-0004-Fixing-Walsender-corner-case-with-logical-decodi.patch  text/plain    7.7 KB
v53-0003-Allow-logical-decoding-on-standby.patch                 text/plain    11.8 KB
v53-0002-Handle-logical-slot-conflicts-on-standby.patch          text/plain    37.2 KB
v53-0001-Add-info-in-WAL-records-in-preparation-for-logic.patch  text/plain    76.2 KB
