Re: Minimal logical decoding on standbys

From: Melanie Plageman <melanieplageman(at)gmail(dot)com>
To: Andres Freund <andres(at)anarazel(dot)de>
Cc: "Drouvot, Bertrand" <bertranddrouvot(dot)pg(at)gmail(dot)com>, Amit Kapila <amit(dot)kapila16(at)gmail(dot)com>, Alvaro Herrera <alvherre(at)alvh(dot)no-ip(dot)org>, Jeff Davis <pgsql(at)j-davis(dot)com>, Robert Haas <robertmhaas(at)gmail(dot)com>, Thomas Munro <thomas(dot)munro(at)gmail(dot)com>, Ibrar Ahmed <ibrar(dot)ahmad(at)gmail(dot)com>, Amit Khandekar <amitdkhan(dot)pg(at)gmail(dot)com>, fabriziomello(at)gmail(dot)com, tushar <tushar(dot)ahuja(at)enterprisedb(dot)com>, Rahila Syed <rahila(dot)syed(at)2ndquadrant(dot)com>, pgsql-hackers <pgsql-hackers(at)postgresql(dot)org>
Subject: Re: Minimal logical decoding on standbys
Date: 2023-04-07 22:32:04
Message-ID: 20230407223204.vj5t6ligooo6xa6e@liskov
Lists: pgsql-hackers

Code review only of 0001-0005.

I noticed you had two patches numbered 0008, btw.

On Fri, Apr 07, 2023 at 11:12:26AM -0700, Andres Freund wrote:
> Hi,
>
> On 2023-04-07 08:47:57 -0700, Andres Freund wrote:
> > Integrated all of these.
>
> From 0e038eb5dfddec500fbf4625775d1fa508a208f6 Mon Sep 17 00:00:00 2001
> From: Andres Freund <andres(at)anarazel(dot)de>
> Date: Thu, 6 Apr 2023 20:00:07 -0700
> Subject: [PATCH va67 1/9] Replace a replication slot's invalidated_at LSN with
> an enum
>
> diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h
> index 8872c80cdfe..ebcb637baed 100644
> --- a/src/include/replication/slot.h
> +++ b/src/include/replication/slot.h
> @@ -37,6 +37,17 @@ typedef enum ReplicationSlotPersistency
> RS_TEMPORARY
> } ReplicationSlotPersistency;
>
> +/*
> + * Slots can be invalidated, e.g. due to max_slot_wal_keep_size. If so, the
> + * 'invalidated' field is set to a value other than _NONE.
> + */
> +typedef enum ReplicationSlotInvalidationCause
> +{
> + RS_INVAL_NONE,
> + /* required WAL has been removed */

I just wonder if RS_INVAL_WAL is too generic. Something like
RS_INVAL_WAL_MISSING or similar may be better, since it seems there are
other invalidation causes that may be related to WAL.

> + RS_INVAL_WAL,
> +} ReplicationSlotInvalidationCause;
> +

0002 LGTM

> From 52c25cc15abc4470d19e305d245b9362e6b8d6a3 Mon Sep 17 00:00:00 2001
> From: Andres Freund <andres(at)anarazel(dot)de>
> Date: Fri, 7 Apr 2023 09:32:48 -0700
> Subject: [PATCH va67 3/9] Support invalidating replication slots due to
> horizon and wal_level
> MIME-Version: 1.0
> Content-Type: text/plain; charset=UTF-8
> Content-Transfer-Encoding: 8bit
>
> Needed for supporting logical decoding on a standby. The new invalidation
> methods will be used in a subsequent commit.
>

You are probably aware, but applying each of 0003 and 0004 gives me two
warnings:

warning: 1 line adds whitespace errors.
Warning: commit message did not conform to UTF-8.
You may want to amend it after fixing the message, or set the config
variable i18n.commitEncoding to the encoding your project uses.

> diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
> index df23b7ed31e..c2a9accebf6 100644
> --- a/src/backend/replication/slot.c
> +++ b/src/backend/replication/slot.c
> @@ -1241,8 +1241,58 @@ ReplicationSlotReserveWal(void)
> }
>
> /*
> - * Helper for InvalidateObsoleteReplicationSlots -- acquires the given slot
> - * and mark it invalid, if necessary and possible.
> + * Report that replication slot needs to be invalidated
> + */
> +static void
> +ReportSlotInvalidation(ReplicationSlotInvalidationCause cause,
> + bool terminating,
> + int pid,
> + NameData slotname,
> + XLogRecPtr restart_lsn,
> + XLogRecPtr oldestLSN,
> + TransactionId snapshotConflictHorizon)
> +{
> + StringInfoData err_detail;
> + bool hint = false;
> +
> + initStringInfo(&err_detail);
> +
> + switch (cause)
> + {
> + case RS_INVAL_WAL:
> + hint = true;
> + appendStringInfo(&err_detail, _("The slot's restart_lsn %X/%X exceeds the limit by %llu bytes."),
> + LSN_FORMAT_ARGS(restart_lsn),

I'm not sure what the below cast is meant to do. If you are trying to
protect against overflow/underflow, I think you'd need to cast before
doing the subtraction.

> + (unsigned long long) (oldestLSN - restart_lsn));
> + break;
> + case RS_INVAL_HORIZON:
> + appendStringInfo(&err_detail, _("The slot conflicted with xid horizon %u."),
> + snapshotConflictHorizon);
> + break;
> +
> + case RS_INVAL_WAL_LEVEL:
> + appendStringInfo(&err_detail, _("Logical decoding on standby requires wal_level to be at least logical on the primary server"));
> + break;
> + case RS_INVAL_NONE:
> + pg_unreachable();
> + }
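
For reference, a standalone sketch of the arithmetic behind the cast above, assuming XLogRecPtr is a 64-bit unsigned integer as in PostgreSQL's xlogdefs.h. wal_excess_bytes() and format_wal_detail() are hypothetical helpers, and the (unsigned int) casts stand in for LSN_FORMAT_ARGS. The point it illustrates: the difference is computed in uint64 before the cast runs, so the cast only changes the argument's type to match %llu; what rules out wraparound is the restart_lsn < oldestLSN precondition, not the cast.

```c
#include <assert.h>
#include <stdint.h>
#include <stdio.h>

/* As in PostgreSQL, an LSN is a 64-bit unsigned byte position in the WAL. */
typedef uint64_t XLogRecPtr;

/*
 * Hypothetical helper: bytes by which restart_lsn lags oldestLSN. The
 * subtraction happens in uint64 arithmetic first, so casting the result
 * afterwards could not undo a wraparound; the precondition prevents one.
 */
static unsigned long long
wal_excess_bytes(XLogRecPtr restart_lsn, XLogRecPtr oldestLSN)
{
    /* the RS_INVAL_WAL branch only sets conflict when this holds */
    assert(restart_lsn < oldestLSN);

    /* the cast changes only the type, so it matches %llu even where
     * uint64_t is 'unsigned long' */
    return (unsigned long long) (oldestLSN - restart_lsn);
}

/* Hypothetical helper: formats the detail line; %X/%X is high/low 32 bits. */
static int
format_wal_detail(char *buf, size_t len, XLogRecPtr restart_lsn,
                  XLogRecPtr oldestLSN)
{
    return snprintf(buf, len,
                    "The slot's restart_lsn %X/%X exceeds the limit by %llu bytes.",
                    (unsigned int) (restart_lsn >> 32),
                    (unsigned int) restart_lsn,
                    wal_excess_bytes(restart_lsn, oldestLSN));
}
```

With restart_lsn 1/28 and oldestLSN 1/100 this yields "The slot's restart_lsn 1/28 exceeds the limit by 216 bytes."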

This ereport is quite hard to read. Is there any way to simplify the
ternaries without undue duplication?

> + ereport(LOG,
> + terminating ?
> + errmsg("terminating process %d to release replication slot \"%s\"",
> + pid, NameStr(slotname)) :
> + errmsg("invalidating obsolete replication slot \"%s\"",
> + NameStr(slotname)),
> + errdetail_internal("%s", err_detail.data),
> + hint ? errhint("You might need to increase max_slot_wal_keep_size.") : 0);
> +
> + pfree(err_detail.data);
> +}
> +
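
One possible shape for the ereport() above, sketched as standalone C rather than against PostgreSQL's elog API: pick the primary message with a plain if/else first, then emit message, detail and optional hint at a single point. report_slot_invalidation() and WAL_KEEP_HINT are hypothetical names, and the output buffer stands in for the log. In the backend the errmsg() format strings would still need to stay literals so they remain translatable, so this only illustrates the control flow.

```c
#include <stdbool.h>
#include <stdio.h>
#include <string.h>

#define WAL_KEEP_HINT "You might need to increase max_slot_wal_keep_size."

/*
 * Hypothetical sketch: choose the message up front, then write message,
 * detail and (optionally) hint in one place instead of nesting ternaries.
 */
static void
report_slot_invalidation(char *out, size_t outlen, bool terminating, int pid,
                         const char *slotname, const char *detail, bool hint)
{
    char        msg[256];
    size_t      used;

    if (outlen == 0)
        return;                 /* nothing can be written */

    if (terminating)
        snprintf(msg, sizeof(msg),
                 "terminating process %d to release replication slot \"%s\"",
                 pid, slotname);
    else
        snprintf(msg, sizeof(msg),
                 "invalidating obsolete replication slot \"%s\"", slotname);

    /* single emission point, standing in for the one ereport(LOG, ...) */
    snprintf(out, outlen, "LOG: %s\nDETAIL: %s", msg, detail);

    /* the hint is appended only for causes that ask for it */
    used = strlen(out);
    if (hint && used < outlen)
        snprintf(out + used, outlen - used, "\nHINT: %s", WAL_KEEP_HINT);
}
```
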
> +/*
> + * Helper for InvalidateObsoleteReplicationSlots
> + *
> + * Acquires the given slot and mark it invalid, if necessary and possible.
> *
> * Returns whether ReplicationSlotControlLock was released in the interim (and
> * in that case we're not holding the lock at return, otherwise we are).
> @@ -1253,7 +1303,10 @@ ReplicationSlotReserveWal(void)
> * for syscalls, so caller must restart if we return true.
> */
> static bool
> -InvalidatePossiblyObsoleteSlot(ReplicationSlot *s, XLogRecPtr oldestLSN,
> +InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause,
> + ReplicationSlot *s,
> + XLogRecPtr oldestLSN,
> + Oid dboid, TransactionId snapshotConflictHorizon,
> bool *invalidated)
> {
> int last_signaled_pid = 0;
> @@ -1264,6 +1317,7 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlot *s, XLogRecPtr oldestLSN,
> XLogRecPtr restart_lsn;
> NameData slotname;
> int active_pid = 0;
> + ReplicationSlotInvalidationCause conflict = RS_INVAL_NONE;
>
> Assert(LWLockHeldByMeInMode(ReplicationSlotControlLock, LW_SHARED));
>
> @@ -1286,10 +1340,45 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlot *s, XLogRecPtr oldestLSN,
> restart_lsn = s->data.restart_lsn;
>
> /*
> - * If the slot is already invalid or is fresh enough, we don't need to
> - * do anything.
> + * If the slot is already invalid or is a non conflicting slot, we
> + * don't need to do anything.
> */
> - if (XLogRecPtrIsInvalid(restart_lsn) || restart_lsn >= oldestLSN)
> + if (s->data.invalidated == RS_INVAL_NONE)
> + {
> + switch (cause)
> + {
> + case RS_INVAL_WAL:
> + if (s->data.restart_lsn != InvalidXLogRecPtr &&
> + s->data.restart_lsn < oldestLSN)
> + conflict = cause;
> + break;

Should the below be an error, i.e. a physical slot being checked with
the RS_INVAL_HORIZON invalidation cause?

> + case RS_INVAL_HORIZON:
> + if (!SlotIsLogical(s))
> + break;
> + /* invalid DB oid signals a shared relation */
> + if (dboid != InvalidOid && dboid != s->data.database)
> + break;
> + if (TransactionIdIsValid(s->effective_xmin) &&
> + TransactionIdPrecedesOrEquals(s->effective_xmin,
> + snapshotConflictHorizon))
> + conflict = cause;
> + else if (TransactionIdIsValid(s->effective_catalog_xmin) &&
> + TransactionIdPrecedesOrEquals(s->effective_catalog_xmin,
> + snapshotConflictHorizon))
> + conflict = cause;
> + break;
> + case RS_INVAL_WAL_LEVEL:
> + if (SlotIsLogical(s))
> + conflict = cause;
> + break;

Having all three of default, pg_unreachable(), and break seems a bit like
overkill. Perhaps remove the break?

> + default:
> + pg_unreachable();
> + break;
> + }
> + }
> +
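
To make the physical-slot question and the default/break point concrete, here is a standalone C model of the conflict check in the switch above. SlotSketch, slot_conflict() and xid_precedes_or_equals() are hypothetical names; the struct keeps only the fields the switch reads, the "already invalidated" guard is left out, and each case returns instead of assigning conflict. The xid comparison follows the modulo-2^32 rule of TransactionIdPrecedesOrEquals(), and abort() stands in for pg_unreachable(), with no break after it.

```c
#include <stdbool.h>
#include <stdint.h>
#include <stdlib.h>

typedef uint32_t TransactionId;
typedef uint64_t XLogRecPtr;
typedef uint32_t Oid;

#define InvalidTransactionId     ((TransactionId) 0)
#define FirstNormalTransactionId ((TransactionId) 3)
#define InvalidOid               ((Oid) 0)
#define InvalidXLogRecPtr        ((XLogRecPtr) 0)

typedef enum ReplicationSlotInvalidationCause
{
    RS_INVAL_NONE,
    RS_INVAL_WAL,
    RS_INVAL_HORIZON,
    RS_INVAL_WAL_LEVEL,
} ReplicationSlotInvalidationCause;

/* Hypothetical stand-in holding only the slot fields the switch reads. */
typedef struct SlotSketch
{
    bool          is_logical;
    Oid           database;
    XLogRecPtr    restart_lsn;
    TransactionId effective_xmin;
    TransactionId effective_catalog_xmin;
} SlotSketch;

/* Same modulo-2^32 rule as TransactionIdPrecedesOrEquals() for normal xids. */
static bool
xid_precedes_or_equals(TransactionId id1, TransactionId id2)
{
    if (id1 < FirstNormalTransactionId || id2 < FirstNormalTransactionId)
        return id1 <= id2;
    return (int32_t) (id1 - id2) <= 0;
}

/* Returns the cause if the slot conflicts, otherwise RS_INVAL_NONE. */
static ReplicationSlotInvalidationCause
slot_conflict(ReplicationSlotInvalidationCause cause, const SlotSketch *s,
              XLogRecPtr oldestLSN, Oid dboid, TransactionId horizon)
{
    switch (cause)
    {
        case RS_INVAL_WAL:
            if (s->restart_lsn != InvalidXLogRecPtr && s->restart_lsn < oldestLSN)
                return cause;
            return RS_INVAL_NONE;
        case RS_INVAL_HORIZON:
            /* the open question: skip physical slots silently, or error out? */
            if (!s->is_logical)
                return RS_INVAL_NONE;
            /* InvalidOid signals a shared relation, relevant to every db */
            if (dboid != InvalidOid && dboid != s->database)
                return RS_INVAL_NONE;
            if (s->effective_xmin != InvalidTransactionId &&
                xid_precedes_or_equals(s->effective_xmin, horizon))
                return cause;
            if (s->effective_catalog_xmin != InvalidTransactionId &&
                xid_precedes_or_equals(s->effective_catalog_xmin, horizon))
                return cause;
            return RS_INVAL_NONE;
        case RS_INVAL_WAL_LEVEL:
            return s->is_logical ? cause : RS_INVAL_NONE;
        default:
            /* stands in for pg_unreachable(); nothing needs to follow it */
            abort();
    }
}
```
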

> @@ -1390,14 +1476,11 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlot *s, XLogRecPtr oldestLSN,
> ReplicationSlotMarkDirty();
> ReplicationSlotSave();
> ReplicationSlotRelease();
> + pgstat_drop_replslot(s);
>
> - ereport(LOG,
> - errmsg("invalidating obsolete replication slot \"%s\"",
> - NameStr(slotname)),
> - errdetail("The slot's restart_lsn %X/%X exceeds the limit by %llu bytes.",
> - LSN_FORMAT_ARGS(restart_lsn),
> - (unsigned long long) (oldestLSN - restart_lsn)),
> - errhint("You might need to increase max_slot_wal_keep_size."));
> + ReportSlotInvalidation(conflict, false, active_pid,
> + slotname, restart_lsn,
> + oldestLSN, snapshotConflictHorizon);
>
> /* done with this slot for now */
> break;
> @@ -1410,19 +1493,33 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlot *s, XLogRecPtr oldestLSN,
> }
>
> /*
> - * Mark any slot that points to an LSN older than the given segment
> - * as invalid; it requires WAL that's about to be removed.
> + * Invalidate slots that require resources about to be removed.
> *
> * Returns true when any slot have got invalidated.
> *
> + * Whether a slot needs to be invalidated depends on the cause. A slot is
> + * removed if it:
> + * - RS_INVAL_WAL: requires a LSN older than the given segment
> + * - RS_INVAL_HORIZON: requires a snapshot <= the given horizon, in the given db
> + *   dboid may be InvalidOid for shared relations

the comma above reduces readability

is this what you mean?

RS_INVAL_HORIZON: requires a snapshot <= the given horizon in the given
db; dboid may be InvalidOid for shared relations

> From 311a1d8f9c2d1acf0c22e091d53f7a533073c8b7 Mon Sep 17 00:00:00 2001
> From: Andres Freund <andres(at)anarazel(dot)de>
> Date: Fri, 7 Apr 2023 09:56:02 -0700
> Subject: [PATCH va67 4/9] Handle logical slot conflicts on standby
> MIME-Version: 1.0
> Content-Type: text/plain; charset=UTF-8
> Content-Transfer-Encoding: 8bit
>
> During WAL replay on standby, when slot conflict is identified, invalidate
> such slots. Also do the same thing if wal_level on the primary server is
> reduced to below logical and there are existing logical slots on
> standby. Introduce a new ProcSignalReason value for slot conflict recovery.
>
> Author: "Drouvot, Bertrand" <bertranddrouvot(dot)pg(at)gmail(dot)com>
> Author: Andres Freund <andres(at)anarazel(dot)de>
> Author: Amit Khandekar <amitdkhan(dot)pg(at)gmail(dot)com> (in an older version)
> Reviewed-by: "Drouvot, Bertrand" <bertranddrouvot(dot)pg(at)gmail(dot)com>
> Reviewed-by: Andres Freund <andres(at)anarazel(dot)de>
> Reviewed-by: Robert Haas <robertmhaas(at)gmail(dot)com>
> Reviewed-by: Fabrzio de Royes Mello <fabriziomello(at)gmail(dot)com>
> Reviewed-by: Bharath Rupireddy <bharath(dot)rupireddyforpostgres(at)gmail(dot)com>
> Reviewed-by: Amit Kapila <amit(dot)kapila16(at)gmail(dot)com>
> Reviewed-by: Alvaro Herrera <alvherre(at)alvh(dot)no-ip(dot)org>
> Discussion: https://postgr.es/m/20230407075009.igg7be27ha2htkbt@awork3.anarazel.de
> ---
> src/include/storage/procsignal.h | 1 +
> src/include/storage/standby.h | 2 ++
> src/backend/access/gist/gistxlog.c | 2 ++
> src/backend/access/hash/hash_xlog.c | 1 +
> src/backend/access/heap/heapam.c | 3 +++
> src/backend/access/nbtree/nbtxlog.c | 2 ++
> src/backend/access/spgist/spgxlog.c | 1 +
> src/backend/access/transam/xlog.c | 15 +++++++++++++++
> src/backend/replication/slot.c | 8 +++++++-
> src/backend/storage/ipc/procsignal.c | 3 +++
> src/backend/storage/ipc/standby.c | 20 +++++++++++++++++++-
> src/backend/tcop/postgres.c | 9 +++++++++
> 12 files changed, 65 insertions(+), 2 deletions(-)
>
> diff --git a/src/include/storage/procsignal.h b/src/include/storage/procsignal.h
> index 905af2231ba..2f52100b009 100644
> --- a/src/include/storage/procsignal.h
> +++ b/src/include/storage/procsignal.h
> @@ -42,6 +42,7 @@ typedef enum
> PROCSIG_RECOVERY_CONFLICT_TABLESPACE,
> PROCSIG_RECOVERY_CONFLICT_LOCK,
> PROCSIG_RECOVERY_CONFLICT_SNAPSHOT,
> + PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT,
> PROCSIG_RECOVERY_CONFLICT_BUFFERPIN,
> PROCSIG_RECOVERY_CONFLICT_STARTUP_DEADLOCK,
>
> diff --git a/src/include/storage/standby.h b/src/include/storage/standby.h
> index 2effdea126f..41f4dc372e6 100644
> --- a/src/include/storage/standby.h
> +++ b/src/include/storage/standby.h
> @@ -30,8 +30,10 @@ extern void InitRecoveryTransactionEnvironment(void);
> extern void ShutdownRecoveryTransactionEnvironment(void);
>
> extern void ResolveRecoveryConflictWithSnapshot(TransactionId snapshotConflictHorizon,
> + bool isCatalogRel,
> RelFileLocator locator);
> extern void ResolveRecoveryConflictWithSnapshotFullXid(FullTransactionId snapshotConflictHorizon,
> + bool isCatalogRel,
> RelFileLocator locator);
> extern void ResolveRecoveryConflictWithTablespace(Oid tsid);
> extern void ResolveRecoveryConflictWithDatabase(Oid dbid);
> diff --git a/src/backend/access/gist/gistxlog.c b/src/backend/access/gist/gistxlog.c
> index b7678f3c144..9a86fb3feff 100644
> --- a/src/backend/access/gist/gistxlog.c
> +++ b/src/backend/access/gist/gistxlog.c
> @@ -197,6 +197,7 @@ gistRedoDeleteRecord(XLogReaderState *record)
> XLogRecGetBlockTag(record, 0, &rlocator, NULL, NULL);
>
> ResolveRecoveryConflictWithSnapshot(xldata->snapshotConflictHorizon,
> + xldata->isCatalogRel,
> rlocator);
> }
>
> @@ -390,6 +391,7 @@ gistRedoPageReuse(XLogReaderState *record)
> */
> if (InHotStandby)
> ResolveRecoveryConflictWithSnapshotFullXid(xlrec->snapshotConflictHorizon,
> + xlrec->isCatalogRel,
> xlrec->locator);
> }
>
> diff --git a/src/backend/access/hash/hash_xlog.c b/src/backend/access/hash/hash_xlog.c
> index f2dd9be8d3f..e8e06c62a95 100644
> --- a/src/backend/access/hash/hash_xlog.c
> +++ b/src/backend/access/hash/hash_xlog.c
> @@ -1003,6 +1003,7 @@ hash_xlog_vacuum_one_page(XLogReaderState *record)
>
> XLogRecGetBlockTag(record, 0, &rlocator, NULL, NULL);
> ResolveRecoveryConflictWithSnapshot(xldata->snapshotConflictHorizon,
> + xldata->isCatalogRel,
> rlocator);
> }
>
> diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
> index 8b13e3f8925..f389ceee1ea 100644
> --- a/src/backend/access/heap/heapam.c
> +++ b/src/backend/access/heap/heapam.c
> @@ -8769,6 +8769,7 @@ heap_xlog_prune(XLogReaderState *record)
> */
> if (InHotStandby)
> ResolveRecoveryConflictWithSnapshot(xlrec->snapshotConflictHorizon,
> + xlrec->isCatalogRel,
> rlocator);
>
> /*
> @@ -8940,6 +8941,7 @@ heap_xlog_visible(XLogReaderState *record)
> */
> if (InHotStandby)
> ResolveRecoveryConflictWithSnapshot(xlrec->snapshotConflictHorizon,
> + xlrec->flags & VISIBILITYMAP_XLOG_CATALOG_REL,
> rlocator);
>
> /*
> @@ -9061,6 +9063,7 @@ heap_xlog_freeze_page(XLogReaderState *record)
>
> XLogRecGetBlockTag(record, 0, &rlocator, NULL, NULL);
> ResolveRecoveryConflictWithSnapshot(xlrec->snapshotConflictHorizon,
> + xlrec->isCatalogRel,
> rlocator);
> }
>
> diff --git a/src/backend/access/nbtree/nbtxlog.c b/src/backend/access/nbtree/nbtxlog.c
> index 414ca4f6deb..c87e46ed66e 100644
> --- a/src/backend/access/nbtree/nbtxlog.c
> +++ b/src/backend/access/nbtree/nbtxlog.c
> @@ -669,6 +669,7 @@ btree_xlog_delete(XLogReaderState *record)
> XLogRecGetBlockTag(record, 0, &rlocator, NULL, NULL);
>
> ResolveRecoveryConflictWithSnapshot(xlrec->snapshotConflictHorizon,
> + xlrec->isCatalogRel,
> rlocator);
> }
>
> @@ -1007,6 +1008,7 @@ btree_xlog_reuse_page(XLogReaderState *record)
>
> if (InHotStandby)
> ResolveRecoveryConflictWithSnapshotFullXid(xlrec->snapshotConflictHorizon,
> + xlrec->isCatalogRel,
> xlrec->locator);
> }
>
> diff --git a/src/backend/access/spgist/spgxlog.c b/src/backend/access/spgist/spgxlog.c
> index b071b59c8ac..459ac929ba5 100644
> --- a/src/backend/access/spgist/spgxlog.c
> +++ b/src/backend/access/spgist/spgxlog.c
> @@ -879,6 +879,7 @@ spgRedoVacuumRedirect(XLogReaderState *record)
>
> XLogRecGetBlockTag(record, 0, &locator, NULL, NULL);
> ResolveRecoveryConflictWithSnapshot(xldata->snapshotConflictHorizon,
> + xldata->isCatalogRel,
> locator);
> }
>
> diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
> index 1485e8f9ca9..5227fc675c8 100644
> --- a/src/backend/access/transam/xlog.c
> +++ b/src/backend/access/transam/xlog.c
> @@ -7965,6 +7965,21 @@ xlog_redo(XLogReaderState *record)
> /* Update our copy of the parameters in pg_control */
> memcpy(&xlrec, XLogRecGetData(record), sizeof(xl_parameter_change));
>
> + /*
> + * Invalidate logical slots if we are in hot standby and the primary
> + * does not have a WAL level sufficient for logical decoding. No need
> + * to search for potentially conflicting logically slots if standby is
> + * running with wal_level lower than logical, because in that case, we
> + * would have either disallowed creation of logical slots or
> + * invalidated existing ones.
> + */
> + if (InRecovery && InHotStandby &&
> + xlrec.wal_level < WAL_LEVEL_LOGICAL &&
> + wal_level >= WAL_LEVEL_LOGICAL)
> + InvalidateObsoleteReplicationSlots(RS_INVAL_WAL_LEVEL,
> + 0, InvalidOid,
> + InvalidTransactionId);
> +
> LWLockAcquire(ControlFileLock, LW_EXCLUSIVE);
> ControlFile->MaxConnections = xlrec.MaxConnections;
> ControlFile->max_worker_processes = xlrec.max_worker_processes;
> diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
> index c2a9accebf6..1b1b51e21ed 100644
> --- a/src/backend/replication/slot.c
> +++ b/src/backend/replication/slot.c
> @@ -1443,7 +1443,13 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause,
> slotname, restart_lsn,
> oldestLSN, snapshotConflictHorizon);
>
> - (void) kill(active_pid, SIGTERM);
> + if (MyBackendType == B_STARTUP)

Is SendProcSignal() marked warn_unused_result or something? I don't see
other callers that ignore its return value casting the call to void.

> + (void) SendProcSignal(active_pid,
> + PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT,
> + InvalidBackendId);
> + else
> + (void) kill(active_pid, SIGTERM);
> +
> last_signaled_pid = active_pid;
> }

> diff --git a/src/backend/storage/ipc/standby.c b/src/backend/storage/ipc/standby.c
> index 9f56b4e95cf..3b5d654347e 100644
> --- a/src/backend/storage/ipc/standby.c
> +++ b/src/backend/storage/ipc/standby.c
> @@ -24,6 +24,7 @@
> #include "access/xlogutils.h"
> #include "miscadmin.h"
> #include "pgstat.h"
> +#include "replication/slot.h"
> #include "storage/bufmgr.h"
> #include "storage/lmgr.h"
> #include "storage/proc.h"
> @@ -466,6 +467,7 @@ ResolveRecoveryConflictWithVirtualXIDs(VirtualTransactionId *waitlist,
> */
> void
> ResolveRecoveryConflictWithSnapshot(TransactionId snapshotConflictHorizon,
> + bool isCatalogRel,
> RelFileLocator locator)
> {
> VirtualTransactionId *backends;
> @@ -491,6 +493,16 @@ ResolveRecoveryConflictWithSnapshot(TransactionId snapshotConflictHorizon,
> PROCSIG_RECOVERY_CONFLICT_SNAPSHOT,
> WAIT_EVENT_RECOVERY_CONFLICT_SNAPSHOT,
> true);
> +
> + /*
> + * Note that WaitExceedsMaxStandbyDelay() is not taken into account here
> + * (as opposed to ResolveRecoveryConflictWithVirtualXIDs() above). That
> + * seems OK, given that this kind of conflict should not normally be

do you mean "when using a physical replication slot"?

> + * reached, e.g. by using a physical replication slot.
> + */
> + if (wal_level >= WAL_LEVEL_LOGICAL && isCatalogRel)
> + InvalidateObsoleteReplicationSlots(RS_INVAL_HORIZON, 0, locator.dbOid,
> + snapshotConflictHorizon);
> }
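
A standalone model of the two standby-side triggers quoted in 0004: the parameter-change check added to xlog_redo() and the catalog-relation check added to ResolveRecoveryConflictWithSnapshot(). The helper names are hypothetical, and the WalLevel enum is a stand-in that only preserves PostgreSQL's minimal < replica < logical ordering.

```c
#include <stdbool.h>

/* Stand-in preserving PostgreSQL's ordering: minimal < replica < logical. */
typedef enum WalLevel
{
    WAL_LEVEL_MINIMAL,
    WAL_LEVEL_REPLICA,
    WAL_LEVEL_LOGICAL,
} WalLevel;

/*
 * Hypothetical helper mirroring the xlog_redo() condition: on a parameter
 * change record, logical slots are invalidated only if the primary dropped
 * below logical while this hot standby itself runs with wal_level logical.
 */
static bool
parameter_change_invalidates(bool in_recovery, bool in_hot_standby,
                             WalLevel primary_level, WalLevel standby_level)
{
    return in_recovery && in_hot_standby &&
        primary_level < WAL_LEVEL_LOGICAL &&
        standby_level >= WAL_LEVEL_LOGICAL;
}

/*
 * Hypothetical helper mirroring the ResolveRecoveryConflictWithSnapshot()
 * condition: only conflicts on catalog relations are checked against slots.
 */
static bool
snapshot_conflict_checks_slots(WalLevel standby_level, bool is_catalog_rel)
{
    return standby_level >= WAL_LEVEL_LOGICAL && is_catalog_rel;
}
```
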

0005 LGTM

- Melanie
