Re: Minimal logical decoding on standbys

From: Andres Freund <andres(at)anarazel(dot)de>
To: "Drouvot, Bertrand" <bertranddrouvot(dot)pg(at)gmail(dot)com>
Cc: Jeff Davis <pgsql(at)j-davis(dot)com>, Robert Haas <robertmhaas(at)gmail(dot)com>, Thomas Munro <thomas(dot)munro(at)gmail(dot)com>, Alvaro Herrera <alvherre(at)2ndquadrant(dot)com>, Ibrar Ahmed <ibrar(dot)ahmad(at)gmail(dot)com>, Amit Khandekar <amitdkhan(dot)pg(at)gmail(dot)com>, fabriziomello(at)gmail(dot)com, tushar <tushar(dot)ahuja(at)enterprisedb(dot)com>, Rahila Syed <rahila(dot)syed(at)2ndquadrant(dot)com>, pgsql-hackers <pgsql-hackers(at)postgresql(dot)org>
Subject: Re: Minimal logical decoding on standbys
Date: 2023-03-30 07:04:31
Message-ID: 20230330070431.tojeodaeo4zmpofu@awork3.anarazel.de
Lists: pgsql-hackers

Hi,

On 2023-03-04 12:19:57 +0100, Drouvot, Bertrand wrote:
> Subject: [PATCH v52 1/6] Add info in WAL records in preparation for logical
> slot conflict handling.
>
> Overall design:
>
> 1. We want to enable logical decoding on standbys, but replay of WAL
> from the primary might remove data that is needed by logical decoding,
> causing error(s) on the standby. To prevent those errors, a new replication
> conflict scenario needs to be addressed (just as hot standby does).
>
> 2. Our chosen strategy for dealing with this type of replication slot
> is to invalidate logical slots for which needed data has been removed.
>
> 3. To do this we need the latestRemovedXid for each change, just as we
> do for physical replication conflicts, but we also need to know
> whether any particular change was to data that logical replication
> might access. That way, during WAL replay, we know when there is a risk of
> conflict and, if so, whether there is a conflict.
>
> 4. We can't rely on the standby's relcache entries for this purpose in
> any way, because the startup process can't access catalog contents.
>
> 5. Therefore every WAL record that potentially removes data from the
> index or heap must carry a flag indicating whether or not it is one
> that might be accessed during logical decoding.
>
> Why do we need this for logical decoding on standby?
>
> First, let's forget about logical decoding on standby and recall that
> on a primary database, any catalog rows that may be needed by a logical
> decoding replication slot are not removed.
>
> This is done thanks to the catalog_xmin associated with the logical
> replication slot.
>
> But, with logical decoding on standby, in the following cases:
>
> - hot_standby_feedback is off
> - hot_standby_feedback is on but there is no physical slot between
> the primary and the standby. Then, hot_standby_feedback will work,
> but only while the connection is alive (for example a node restart
> would break it)
>
> Then, the primary may delete system catalog rows that could be needed
> by the logical decoding on the standby (as it does not know about the
> catalog_xmin on the standby).
>
> So, it’s mandatory to identify those rows and invalidate the slots
> that may need them if any. Identifying those rows is the purpose of
> this commit.

This is a very nice commit message.
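To make the catalog_xmin reasoning above concrete, here is a minimal, self-contained sketch of the per-slot test that the xid-horizon conflict boils down to, mirroring the condition used later in the 2/6 hunk: a slot conflicts if its xmin or catalog_xmin is valid and precedes or equals the record's snapshotConflictHorizon. The names SlotHorizons, slot_conflicts_with_horizon and xid_precedes_or_equals are hypothetical; the latter is only a simplified stand-in for the wraparound-aware TransactionIdPrecedesOrEquals().

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

typedef uint32_t TransactionId;

#define InvalidTransactionId     ((TransactionId) 0)
#define FirstNormalTransactionId ((TransactionId) 3)

/* Simplified stand-in for TransactionIdPrecedesOrEquals(): modulo-2^32
 * comparison for normal xids, plain unsigned comparison otherwise. */
static bool
xid_precedes_or_equals(TransactionId id1, TransactionId id2)
{
    if (id1 < FirstNormalTransactionId || id2 < FirstNormalTransactionId)
        return id1 <= id2;
    return (int32_t) (id1 - id2) <= 0;
}

/* Hypothetical per-slot state: only the two horizons the check inspects. */
typedef struct SlotHorizons
{
    TransactionId xmin;
    TransactionId catalog_xmin;
} SlotHorizons;

/*
 * True if a record that removed rows up to 'horizon' may have removed rows
 * this slot still needs, i.e. either valid slot horizon is <= 'horizon'.
 */
static bool
slot_conflicts_with_horizon(const SlotHorizons *slot, TransactionId horizon)
{
    assert(horizon != InvalidTransactionId);
    return (slot->xmin != InvalidTransactionId &&
            xid_precedes_or_equals(slot->xmin, horizon)) ||
           (slot->catalog_xmin != InvalidTransactionId &&
            xid_precedes_or_equals(slot->catalog_xmin, horizon));
}
```

For example, a logical slot with catalog_xmin 1000 conflicts with horizons 1000 and 1500 but not with 999, and a slot with no horizons set never conflicts.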

> Implementation:
>
> When a WAL replay on standby indicates that a catalog table tuple is
> to be deleted by an xid that is greater than a logical slot's
> catalog_xmin, then that means the slot's catalog_xmin conflicts with
> the xid, and we need to handle the conflict. While subsequent commits
> will do the actual conflict handling, this commit adds a new field
> isCatalogRel in such WAL records (and a new bit set in the
> xl_heap_visible flags field), that is true for catalog tables, so as to
> arrange for conflict handling.
>
> The affected WAL records are the ones that already contain the
> snapshotConflictHorizon field, namely:
>
> - gistxlogDelete
> - gistxlogPageReuse
> - xl_hash_vacuum_one_page
> - xl_heap_prune
> - xl_heap_freeze_page
> - xl_heap_visible
> - xl_btree_reuse_page
> - xl_btree_delete
> - spgxlogVacuumRedirect
>
> Due to this new field being added, xl_hash_vacuum_one_page and
> gistxlogDelete now contain the offsets to be deleted as a
> FLEXIBLE_ARRAY_MEMBER. This is needed to ensure correct alignment.
> It's not needed for the other structs where isCatalogRel has
> been added.
>
> Author: Andres Freund (in an older version), Amit Khandekar, Bertrand
> Drouvot

I think you're first author on this one by now.

I think this commit is ready to go. Unless somebody thinks differently, I
think I might push it tomorrow.
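The alignment point in the commit message can be illustrated with a small sketch. The struct below is hypothetical, only modelled on a record that carries a conflict horizon, a tuple count, the new isCatalogRel flag and an array of uint16 offsets. If the offsets were appended by hand right after isCatalogRel, the header size on typical platforms (4-byte TransactionId, 1-byte bool) would be 7, an odd address for a uint16 array; declaring the array as a flexible array member (which FLEXIBLE_ARRAY_MEMBER expands to on C99 compilers) lets the compiler pad it to a correctly aligned offset.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

typedef uint32_t TransactionId;
typedef uint16_t OffsetNumber;

/* Hypothetical WAL record layout, for illustration only. */
typedef struct demo_vacuum_one_page
{
    TransactionId snapshotConflictHorizon;
    uint16_t      ntuples;
    bool          isCatalogRel;   /* the newly added field */
    OffsetNumber  offsets[];      /* C99 flexible array member */
} demo_vacuum_one_page;

/* Header size if offsets were appended by hand after the last scalar field. */
#define DEMO_NAIVE_HEADER_SIZE \
    (offsetof(demo_vacuum_one_page, isCatalogRel) + sizeof(bool))

/* Header size when the compiler places the array: always suitably aligned. */
#define DEMO_HEADER_SIZE offsetof(demo_vacuum_one_page, offsets)

/* Total bytes needed for a record carrying 'ntuples' offsets. */
static size_t
demo_record_size(uint16_t ntuples)
{
    return DEMO_HEADER_SIZE + (size_t) ntuples * sizeof(OffsetNumber);
}
```

Redo code can then read rec->offsets directly instead of computing the array address from a hand-written size constant.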

> Subject: [PATCH v52 2/6] Handle logical slot conflicts on standby.

> @@ -6807,7 +6808,8 @@ CreateCheckPoint(int flags)
> */
> XLByteToSeg(RedoRecPtr, _logSegNo, wal_segment_size);
> KeepLogSeg(recptr, &_logSegNo);
> - if (InvalidateObsoleteReplicationSlots(_logSegNo))
> + InvalidateObsoleteReplicationSlots(_logSegNo, &invalidated, InvalidOid, NULL);
> + if (invalidated)
> {
> /*
> * Some slots have been invalidated; recalculate the old-segment

I don't really understand why you changed InvalidateObsoleteReplicationSlots
to return void instead of bool, and then added an output boolean argument via
a pointer?
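For comparison, a minimal sketch of keeping the boolean return value, so callers such as CreateCheckPoint() can keep the original `if (InvalidateObsoleteReplicationSlots(...))` shape. DemoSlot and demo_invalidate_slots are hypothetical and reduce the real per-slot checks to a single `obsolete` flag.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical, minimal slot model: only what this sketch needs. */
typedef struct DemoSlot
{
    bool in_use;
    bool obsolete;      /* stands in for the LSN or xid checks */
    bool invalidated;
} DemoSlot;

/*
 * Invalidate every in-use, obsolete slot; return true if at least one
 * slot was invalidated by this call (no output parameter needed).
 */
static bool
demo_invalidate_slots(DemoSlot *slots, size_t nslots)
{
    bool invalidated = false;

    assert(slots != NULL || nslots == 0);
    for (size_t i = 0; i < nslots; i++)
    {
        DemoSlot *s = &slots[i];

        if (!s->in_use || s->invalidated || !s->obsolete)
            continue;
        s->invalidated = true;
        invalidated = true;
    }
    return invalidated;
}
```

A second call over the same slots returns false, because nothing new was invalidated.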

> @@ -7964,6 +7968,22 @@ xlog_redo(XLogReaderState *record)
> /* Update our copy of the parameters in pg_control */
> memcpy(&xlrec, XLogRecGetData(record), sizeof(xl_parameter_change));
>
> + /*
> + * Invalidate logical slots if we are in hot standby and the primary does not
> + * have a WAL level sufficient for logical decoding. No need to search
> + * for potentially conflicting logical slots if standby is running
> + * with wal_level lower than logical, because in that case, we would
> + * have either disallowed creation of logical slots or invalidated existing
> + * ones.
> + */
> + if (InRecovery && InHotStandby &&
> + xlrec.wal_level < WAL_LEVEL_LOGICAL &&
> + wal_level >= WAL_LEVEL_LOGICAL)
> + {
> + TransactionId ConflictHorizon = InvalidTransactionId;
> + InvalidateObsoleteReplicationSlots(InvalidXLogRecPtr, NULL, InvalidOid, &ConflictHorizon);
> + }
> +

Are there races around changing wal_level?

> @@ -855,8 +855,10 @@ ReplicationSlotsComputeRequiredXmin(bool already_locked)
> SpinLockAcquire(&s->mutex);
> effective_xmin = s->effective_xmin;
> effective_catalog_xmin = s->effective_catalog_xmin;
> - invalidated = (!XLogRecPtrIsInvalid(s->data.invalidated_at) &&
> - XLogRecPtrIsInvalid(s->data.restart_lsn));
> + invalidated = ((!XLogRecPtrIsInvalid(s->data.invalidated_at) &&
> + XLogRecPtrIsInvalid(s->data.restart_lsn))
> + || (!TransactionIdIsValid(s->data.xmin) &&
> + !TransactionIdIsValid(s->data.catalog_xmin)));
> SpinLockRelease(&s->mutex);
>
> /* invalidated slots need not apply */

I still would like a wrapper function to determine whether a slot has been
invalidated. This is too complicated to be repeated in other places.

> @@ -1224,20 +1226,21 @@ ReplicationSlotReserveWal(void)
> }
>
> /*
> - * Helper for InvalidateObsoleteReplicationSlots -- acquires the given slot
> - * and mark it invalid, if necessary and possible.
> + * Helper for InvalidateObsoleteReplicationSlots
> + *
> + * Acquires the given slot and mark it invalid, if necessary and possible.
> *
> * Returns whether ReplicationSlotControlLock was released in the interim (and
> * in that case we're not holding the lock at return, otherwise we are).
> *
> - * Sets *invalidated true if the slot was invalidated. (Untouched otherwise.)
> + * Sets *invalidated true if an obsolete slot was invalidated. (Untouched otherwise.)
> *
> * This is inherently racy, because we release the LWLock
> * for syscalls, so caller must restart if we return true.
> */
> static bool
> -InvalidatePossiblyObsoleteSlot(ReplicationSlot *s, XLogRecPtr oldestLSN,
> - bool *invalidated)
> +InvalidatePossiblyObsoleteOrConflictingLogicalSlot(ReplicationSlot *s, XLogRecPtr oldestLSN,
> + bool *invalidated, TransactionId *xid)

This is too long a name. I'd probably just leave it at the old name.

> @@ -1261,18 +1267,33 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlot *s, XLogRecPtr oldestLSN,
> * Check if the slot needs to be invalidated. If it needs to be
> * invalidated, and is not currently acquired, acquire it and mark it
> * as having been invalidated. We do this with the spinlock held to
> - * avoid race conditions -- for example the restart_lsn could move
> - * forward, or the slot could be dropped.
> + * avoid race conditions -- for example the restart_lsn (or the
> + * xmin(s)) could move forward or the slot could be dropped.
> */
> SpinLockAcquire(&s->mutex);
>
> restart_lsn = s->data.restart_lsn;
> + slot_xmin = s->data.xmin;
> + slot_catalog_xmin = s->data.catalog_xmin;
> +
> + /* slot has been invalidated (logical decoding conflict case) */
> + if ((xid &&
> + ((LogicalReplicationSlotIsInvalid(s))
> + ||
>

Uh, huh?

That's very odd formatting.

> /*
> - * If the slot is already invalid or is fresh enough, we don't need to
> - * do anything.
> + * We are not forcing for invalidation because the xid is valid and
> + * this is a non conflicting slot.
> */
> - if (XLogRecPtrIsInvalid(restart_lsn) || restart_lsn >= oldestLSN)
> + (TransactionIdIsValid(*xid) && !(
> + (TransactionIdIsValid(slot_xmin) && TransactionIdPrecedesOrEquals(slot_xmin, *xid))
> + ||
> + (TransactionIdIsValid(slot_catalog_xmin) && TransactionIdPrecedesOrEquals(slot_catalog_xmin, *xid))
> + ))
> + ))
> + ||
> + /* slot has been invalidated (obsolete LSN case) */
> + (!xid && (XLogRecPtrIsInvalid(restart_lsn) || restart_lsn >= oldestLSN)))
> {
> SpinLockRelease(&s->mutex);
> if (released_lock)

This needs some cleanup.

> @@ -1292,9 +1313,16 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlot *s, XLogRecPtr oldestLSN,
> {
> MyReplicationSlot = s;
> s->active_pid = MyProcPid;
> - s->data.invalidated_at = restart_lsn;
> - s->data.restart_lsn = InvalidXLogRecPtr;
> -
> + if (xid)
> + {
> + s->data.xmin = InvalidTransactionId;
> + s->data.catalog_xmin = InvalidTransactionId;
> + }
> + else
> + {
> + s->data.invalidated_at = restart_lsn;
> + s->data.restart_lsn = InvalidXLogRecPtr;
> + }
> /* Let caller know */
> *invalidated = true;
> }
> @@ -1327,15 +1355,39 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlot *s, XLogRecPtr oldestLSN,
> */
> if (last_signaled_pid != active_pid)
> {
> - ereport(LOG,
> - errmsg("terminating process %d to release replication slot \"%s\"",
> - active_pid, NameStr(slotname)),
> - errdetail("The slot's restart_lsn %X/%X exceeds the limit by %llu bytes.",
> - LSN_FORMAT_ARGS(restart_lsn),
> - (unsigned long long) (oldestLSN - restart_lsn)),
> - errhint("You might need to increase max_slot_wal_keep_size."));
> + if (xid)
> + {
> + if (TransactionIdIsValid(*xid))
> + {
> + ereport(LOG,
> + errmsg("terminating process %d because replication slot \"%s\" conflicts with recovery",
> + active_pid, NameStr(slotname)),
> + errdetail("The slot conflicted with xid horizon %u.",
> + *xid));
> + }
> + else
> + {
> + ereport(LOG,
> + errmsg("terminating process %d because replication slot \"%s\" conflicts with recovery",
> + active_pid, NameStr(slotname)),
> + errdetail("Logical decoding on standby requires wal_level to be at least logical on the primary server"));
> + }
> +
> + (void) SendProcSignal(active_pid, PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT, InvalidBackendId);
> + }
> + else
> + {
> + ereport(LOG,
> + errmsg("terminating process %d to release replication slot \"%s\"",
> + active_pid, NameStr(slotname)),
> + errdetail("The slot's restart_lsn %X/%X exceeds the limit by %llu bytes.",
> + LSN_FORMAT_ARGS(restart_lsn),
> + (unsigned long long) (oldestLSN - restart_lsn)),
> + errhint("You might need to increase max_slot_wal_keep_size."));
> +
> + (void) kill(active_pid, SIGTERM);

I think it ought to be possible to deduplicate this a fair bit. For one, two of
the errmsg()s above are identical. But I think this could be consolidated
further, e.g. by using the same message style for the three cases, and passing
in a separately translated reason for the termination?

> + }
>
> - (void) kill(active_pid, SIGTERM);
> last_signaled_pid = active_pid;
> }
>
> @@ -1369,13 +1421,33 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlot *s, XLogRecPtr oldestLSN,
> ReplicationSlotSave();
> ReplicationSlotRelease();
>
> - ereport(LOG,
> - errmsg("invalidating obsolete replication slot \"%s\"",
> - NameStr(slotname)),
> - errdetail("The slot's restart_lsn %X/%X exceeds the limit by %llu bytes.",
> - LSN_FORMAT_ARGS(restart_lsn),
> - (unsigned long long) (oldestLSN - restart_lsn)),
> - errhint("You might need to increase max_slot_wal_keep_size."));
> + if (xid)
> + {
> + pgstat_drop_replslot(s);

Why is this done here now?

> + if (TransactionIdIsValid(*xid))
> + {
> + ereport(LOG,
> + errmsg("invalidating slot \"%s\" because it conflicts with recovery", NameStr(slotname)),
> + errdetail("The slot conflicted with xid horizon %u.", *xid));
> + }
> + else
> + {
> + ereport(LOG,
> + errmsg("invalidating slot \"%s\" because it conflicts with recovery", NameStr(slotname)),
> + errdetail("Logical decoding on standby requires wal_level to be at least logical on the primary server"));
> + }
> + }
> + else
> + {
> + ereport(LOG,
> + errmsg("invalidating obsolete replication slot \"%s\"",
> + NameStr(slotname)),
> + errdetail("The slot's restart_lsn %X/%X exceeds the limit by %llu bytes.",
> + LSN_FORMAT_ARGS(restart_lsn),
> + (unsigned long long) (oldestLSN - restart_lsn)),
> + errhint("You might need to increase max_slot_wal_keep_size."));
> + }
>

I don't like all these repeated elogs...
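A sketch of the consolidation suggested earlier: one message per action (terminating or invalidating) shared by all causes, with only the detail line depending on the cause. Plain snprintf() stands in for ereport(); in the backend the detail strings would be translated separately. DemoCause, demo_format_message and demo_format_detail are hypothetical names.

```c
#include <assert.h>
#include <inttypes.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>
#include <string.h>

typedef uint32_t TransactionId;
typedef uint64_t XLogRecPtr;

typedef enum DemoCause
{
    DEMO_CAUSE_WAL_REMOVED,
    DEMO_CAUSE_XID_HORIZON,
    DEMO_CAUSE_WAL_LEVEL
} DemoCause;

/* The primary message: identical for every cause. */
static void
demo_format_message(char *buf, size_t len, bool terminating,
                    int pid, const char *slotname)
{
    if (terminating)
        snprintf(buf, len,
                 "terminating process %d to release replication slot \"%s\"",
                 pid, slotname);
    else
        snprintf(buf, len, "invalidating replication slot \"%s\"", slotname);
}

/* The only cause-specific part: the detail line. */
static void
demo_format_detail(char *buf, size_t len, DemoCause cause,
                   XLogRecPtr restart_lsn, XLogRecPtr oldestLSN,
                   TransactionId horizon)
{
    assert(len > 0);
    buf[0] = '\0';
    switch (cause)
    {
        case DEMO_CAUSE_WAL_REMOVED:
            snprintf(buf, len,
                     "The slot's restart_lsn %" PRIX32 "/%" PRIX32
                     " exceeds the limit by %" PRIu64 " bytes.",
                     (uint32_t) (restart_lsn >> 32), (uint32_t) restart_lsn,
                     oldestLSN - restart_lsn);
            break;
        case DEMO_CAUSE_XID_HORIZON:
            snprintf(buf, len, "The slot conflicted with xid horizon %" PRIu32 ".",
                     horizon);
            break;
        case DEMO_CAUSE_WAL_LEVEL:
            snprintf(buf, len, "Logical decoding on standby requires wal_level "
                     "to be at least logical on the primary server.");
            break;
    }
}
```

With this split, the termination and invalidation paths each need a single ereport() call instead of one per cause.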

> @@ -3057,6 +3060,27 @@ RecoveryConflictInterrupt(ProcSignalReason reason)
> case PROCSIG_RECOVERY_CONFLICT_LOCK:
> case PROCSIG_RECOVERY_CONFLICT_TABLESPACE:
> case PROCSIG_RECOVERY_CONFLICT_SNAPSHOT:
> + case PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT:
> +
> + /*
> + * For conflicts that require a logical slot to be
> + * invalidated, the requirement is for the signal receiver to
> + * release the slot, so that it could be invalidated by the
> + * signal sender. So for normal backends, the transaction
> + * should be aborted, just like for other recovery conflicts.
> + * But if it's walsender on standby, we don't want to go
> + * through the following IsTransactionOrTransactionBlock()
> + * check, so break here.
> + */
> + if (am_cascading_walsender &&
> + reason == PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT &&
> + MyReplicationSlot && SlotIsLogical(MyReplicationSlot))
> + {
> + RecoveryConflictPending = true;
> + QueryCancelPending = true;
> + InterruptPending = true;
> + break;
> + }
>
> /*
> * If we aren't in a transaction any longer then ignore.

I can't see any reason for this to be mixed into the same case "body" as LOCK
etc?

> diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
> index 38c6f18886..290d4b45f4 100644
> --- a/src/backend/replication/slot.c
> +++ b/src/backend/replication/slot.c
> @@ -51,6 +51,7 @@
> #include "storage/proc.h"
> #include "storage/procarray.h"
> #include "utils/builtins.h"
> +#include "access/xlogrecovery.h"

Add new includes in the "alphabetically" right place...

Greetings,

Andres Freund
