Re: Synchronizing slots from primary to standby

From: shveta malik <shveta(dot)malik(at)gmail(dot)com>
To: Amit Kapila <amit(dot)kapila16(at)gmail(dot)com>
Cc: "Drouvot, Bertrand" <bertranddrouvot(dot)pg(at)gmail(dot)com>, "Zhijie Hou (Fujitsu)" <houzj(dot)fnst(at)fujitsu(dot)com>, Peter Smith <smithpb2250(at)gmail(dot)com>, "Hayato Kuroda (Fujitsu)" <kuroda(dot)hayato(at)fujitsu(dot)com>, Bharath Rupireddy <bharath(dot)rupireddyforpostgres(at)gmail(dot)com>, Peter Eisentraut <peter(dot)eisentraut(at)enterprisedb(dot)com>, Bruce Momjian <bruce(at)momjian(dot)us>, Ashutosh Sharma <ashu(dot)coek88(at)gmail(dot)com>, Andres Freund <andres(at)anarazel(dot)de>, pgsql-hackers <pgsql-hackers(at)postgresql(dot)org>, Ajin Cherian <itsajin(at)gmail(dot)com>, Alvaro Herrera <alvherre(at)alvh(dot)no-ip(dot)org>, shveta malik <shveta(dot)malik(at)gmail(dot)com>
Subject: Re: Synchronizing slots from primary to standby
Date: 2023-11-23 08:59:03
Message-ID: CAJpy0uD6dWUvBgy8MGdugf_Am4pLXTL_vqcwSeHO13v+Mzc9KA@mail.gmail.com
Lists: pgsql-hackers

On Mon, Nov 20, 2023 at 4:28 PM Amit Kapila <amit(dot)kapila16(at)gmail(dot)com> wrote:
>
> On Sat, Nov 18, 2023 at 4:15 PM Amit Kapila <amit(dot)kapila16(at)gmail(dot)com> wrote:
> >
> > On Fri, Nov 17, 2023 at 5:18 PM Drouvot, Bertrand
> > <bertranddrouvot(dot)pg(at)gmail(dot)com> wrote:
> >
> > More Review for v35-0002*
> > ============================
> >
>

Thanks for the feedback. Please find the patch attached and my comments inline.

> More review of v35-0002*
> ====================
> 1.
> +/*
> + * Helper function to check if local_slot is present in remote_slots list.
> + *
> + * It also checks if logical slot is locally invalidated i.e. invalidated on
> + * the standby but valid on the primary server. If found so, it sets
> + * locally_invalidated to true.
> + */
> +static bool
> +slot_exists_in_list(ReplicationSlot *local_slot, List *remote_slots,
> + bool *locally_invalidated)
>
> The name of the function is a bit misleading because it checks the
> validity of the slot not only whether it exists in remote_list. Would
> it be better to name it as ValidateSyncSlot() or something along those
> lines?
>

Sure, updated the name.
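For illustration only, here is a minimal standalone sketch of the two decisions the renamed function makes, as described in the quoted comment: whether the local slot still exists on the primary, and whether it is invalidated on the standby while still valid on the primary. The SlotInfo type and validate_sync_slot() are hypothetical simplifications, not the patch's ReplicationSlot/RemoteSlot code.

```c
#include <stdbool.h>
#include <stddef.h>
#include <string.h>

/* Hypothetical, simplified slot record (the patch uses ReplicationSlot / RemoteSlot). */
typedef struct SlotInfo
{
    const char *name;
    bool        invalidated;    /* true if the slot is invalidated on that node */
} SlotInfo;

/*
 * Return true if local_slot is still present in remote_slots.  When it is,
 * also report whether it is invalidated on the standby only, i.e. invalid
 * locally but still valid on the primary server.
 */
static bool
validate_sync_slot(const SlotInfo *local_slot,
                   const SlotInfo *remote_slots, size_t nremote,
                   bool *locally_invalidated)
{
    *locally_invalidated = false;

    for (size_t i = 0; i < nremote; i++)
    {
        if (strcmp(remote_slots[i].name, local_slot->name) == 0)
        {
            *locally_invalidated = local_slot->invalidated &&
                !remote_slots[i].invalidated;
            return true;
        }
    }

    /* Not found on the primary: the caller would drop the local copy. */
    return false;
}
```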

> 2.
> +static long
> +synchronize_slots(WalReceiverConn *wrconn)
> {
> ...
> + /* Construct query to get slots info from the primary server */
> + initStringInfo(&s);
> + construct_slot_query(&s);
> ...
> + if (remote_slot->conflicting)
> + remote_slot->invalidated = get_remote_invalidation_cause(wrconn,
> + remote_slot->name);
> ...
>
> +static ReplicationSlotInvalidationCause
> +get_remote_invalidation_cause(WalReceiverConn *wrconn, char *slot_name)
> {
> ...
> + appendStringInfo(&cmd,
> + "SELECT pg_get_slot_invalidation_cause(%s)",
> + quote_literal_cstr(slot_name));
> + res = walrcv_exec(wrconn, cmd.data, 1, slotRow);
>
> Do we really need to query a second time to get the invalidation
> cause? Can we adjust the slot_query to get it in one round trip? I
> think this may not optimize much because the patch uses a second round
> trip only for invalidated slots, but it still looks odd. So unless the
> query becomes too complicated, we should try to achieve it in one round
> trip.
>

Modified the query to fetch all the info at once.
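One way to fold the invalidation cause into the same round trip is to select it as an extra column. The sketch below is hypothetical: it assumes the patch's pg_get_slot_invalidation_cause() can be called per row in the same SELECT and returns a "no cause" value for slots that are not invalidated; it builds the string with plain snprintf() instead of StringInfo so it runs standalone.

```c
#include <stddef.h>
#include <stdio.h>

/*
 * Hypothetical single-round-trip construct_slot_query(): the invalidation
 * cause is an extra column of the same SELECT, so no second query is
 * needed for conflicting slots.
 */
static int
construct_slot_query(char *buf, size_t buflen)
{
    return snprintf(buf, buflen,
                    "SELECT slot_name, plugin, confirmed_flush_lsn,"
                    " restart_lsn, catalog_xmin, two_phase, conflicting,"
                    " database,"
                    " pg_get_slot_invalidation_cause(slot_name)"
                    " FROM pg_catalog.pg_replication_slots"
                    " WHERE failover AND sync_state != 'i'");
}
```

The extra column is then read alongside the others, so the per-slot walrcv_exec() call in get_remote_invalidation_cause() is no longer needed.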

> 3.
> +static long
> +synchronize_slots(WalReceiverConn *wrconn)
> +{
> ...
> ...
> + /* The syscache access needs a transaction env. */
> + StartTransactionCommand();
> +
> + /* Make things live outside TX context */
> + MemoryContextSwitchTo(oldctx);
> +
> + /* Construct query to get slots info from the primary server */
> + initStringInfo(&s);
> + construct_slot_query(&s);
> +
> + elog(DEBUG2, "slot-sync worker's query:%s \n", s.data);
> +
> + /* Execute the query */
> + res = walrcv_exec(wrconn, s.data, SLOTSYNC_COLUMN_COUNT, slotRow);
>
> It is okay to perform the above query execution outside the
> transaction context but I would like to know the reason for the same.
> Do we want to retain anything beyond the transaction context or is
> there some other reason to do this outside the transaction context?
>

Modified the comment to give the reason. We need to start a transaction
for syscache access. We can end it as soon as walrcv_exec() is over,
but the tuple results must still be accessible after that, so they
should not be allocated in TopTransactionContext.
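To make the lifetime argument concrete, here is a toy model. The Arena type is a hypothetical bump allocator standing in for a memory context; it is not PostgreSQL's MemoryContext API, and "lsub1_slot" is a made-up slot name. It only shows that data allocated in the transaction's context is gone after commit, while data placed in the caller's context survives.

```c
#include <stddef.h>
#include <stdlib.h>
#include <string.h>

/* Toy bump allocator standing in for a memory context (NOT the PG API). */
typedef struct Arena
{
    char   *base;
    size_t  used;
    size_t  size;
} Arena;

static int
arena_init(Arena *a, size_t size)
{
    a->base = calloc(1, size);
    a->used = 0;
    a->size = size;
    return a->base != NULL;
}

static char *
arena_strdup(Arena *a, const char *s)
{
    size_t  n = strlen(s) + 1;
    char   *p;

    if (a->used + n > a->size)
        return NULL;
    p = a->base + a->used;
    memcpy(p, s, n);
    a->used += n;
    return p;
}

/* "Commit": wipe the arena so stale pointers into it no longer see the data. */
static void
arena_reset(Arena *a)
{
    memset(a->base, 0, a->size);
    a->used = 0;
}

/*
 * Mimics the worker's flow: the transaction is needed only while the query
 * runs, but the result is read after commit, so it lives in the caller's
 * longer-lived arena rather than in the transaction arena.
 */
static char *
fetch_slot_name(Arena *caller_ctx, Arena *txn_ctx, char **txn_copy)
{
    char   *result;

    /* Scratch data that may legitimately die with the transaction. */
    *txn_copy = arena_strdup(txn_ctx, "lsub1_slot");

    /* Result data that is used after the commit. */
    result = arena_strdup(caller_ctx, "lsub1_slot");

    /* CommitTransactionCommand() analogue: the transaction arena is wiped. */
    arena_reset(txn_ctx);
    return result;
}
```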

> 4.
> +static void
> +construct_slot_query(StringInfo s)
> +{
> + /*
> + * Fetch data for logical failover slots with sync_state either as
> + * SYNCSLOT_STATE_NONE or SYNCSLOT_STATE_READY.
> + */
> + appendStringInfo(s,
> + "SELECT slot_name, plugin, confirmed_flush_lsn,"
> + " restart_lsn, catalog_xmin, two_phase, conflicting, "
> + " database FROM pg_catalog.pg_replication_slots"
> + " WHERE failover and sync_state != 'i'");
> +}
>
> Why would the sync_state on the primary server be any valid value? I
> thought it was set only on physical standby. I think it is better to
> mention the reason for using the sync state and or failover flag in
> the above comments. The current comment doesn't seem of much use as it
> just states what is evident from the query.

Updated the comment with the reason. It is mainly for a cascading
standby to fetch the correct slots.
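A sketch of what the WHERE clause "failover and sync_state != 'i'" selects, written as a C predicate. Only the 'i' (initiated) code appears in the quoted query; the 'n' and 'r' letters for SYNCSLOT_STATE_NONE and SYNCSLOT_STATE_READY are assumptions for illustration. On the primary, sync_state is presumably never set, so the failover flag alone decides; on a standby that feeds a cascading standby, slots whose sync has only been initiated are not yet usable and are skipped.

```c
#include <stdbool.h>

/* Assumed codes: only 'i' is taken from the quoted query. */
#define SYNCSLOT_STATE_NONE      'n'
#define SYNCSLOT_STATE_INITIATED 'i'
#define SYNCSLOT_STATE_READY     'r'

/* Mirror of "WHERE failover AND sync_state != 'i'". */
static bool
slot_should_be_fetched(bool failover, char sync_state)
{
    return failover && sync_state != SYNCSLOT_STATE_INITIATED;
}
```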

>
> 5.
> * This check should never pass as on the primary server, we have waited
> + * for the standby's confirmation before updating the logical slot. But to
> + * take care of any bug in that flow, we should retain this check.
> + */
> + if (remote_slot->confirmed_lsn > WalRcv->latestWalEnd)
> + {
> + elog(LOG, "skipping sync of slot \"%s\" as the received slot-sync "
> + "LSN %X/%X is ahead of the standby position %X/%X",
> + remote_slot->name,
> + LSN_FORMAT_ARGS(remote_slot->confirmed_lsn),
> + LSN_FORMAT_ARGS(WalRcv->latestWalEnd));
> +
>
> This should be elog(ERROR, ..). Normally, we use elog(ERROR, ...) for
> such unexpected cases. And, you don't need to explicitly mention the
> last sentence in the comment: "But to take care of any bug in that
> flow, we should retain this check.".
>

Sure, modified.
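For reference, a standalone sketch of the sanity check and of how an LSN is printed as %X/%X: LSN_FORMAT_ARGS splits the 64-bit value into its high and low 32-bit halves, as PostgreSQL's macro does. check_remote_lsn() is hypothetical; it reuses the quoted message text and fills a buffer where the real code would call elog(ERROR, ...).

```c
#include <inttypes.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>

typedef uint64_t XLogRecPtr;

/* Split an LSN into high/low halves for "%X/%X" output. */
#define LSN_FORMAT_ARGS(lsn) ((uint32_t) ((lsn) >> 32)), ((uint32_t) (lsn))

/*
 * Hypothetical check: returns false (and fills errbuf) when the primary
 * reports a confirmed_flush LSN the standby has not received yet.
 */
static bool
check_remote_lsn(const char *slot_name, XLogRecPtr confirmed_lsn,
                 XLogRecPtr latest_wal_end, char *errbuf, size_t buflen)
{
    if (confirmed_lsn <= latest_wal_end)
        return true;

    snprintf(errbuf, buflen,
             "skipping sync of slot \"%s\" as the received slot-sync "
             "LSN %" PRIX32 "/%" PRIX32
             " is ahead of the standby position %" PRIX32 "/%" PRIX32,
             slot_name,
             LSN_FORMAT_ARGS(confirmed_lsn),
             LSN_FORMAT_ARGS(latest_wal_end));
    return false;
}
```

For example, a confirmed LSN of 0x10000002A is printed as 1/2A.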

> 6.
> +synchronize_one_slot(WalReceiverConn *wrconn, RemoteSlot *remote_slot,
> + bool *slot_updated)
> {
> ...
> + if (remote_slot->restart_lsn < MyReplicationSlot->data.restart_lsn)
> + {
> + ereport(WARNING,
> + errmsg("not synchronizing slot %s; synchronization would"
> + " move it backwards", remote_slot->name));
>
> I think here elevel should be LOG because the user can't do much about
> this. Do we use ';' at other places in the message? But when can we
> hit this case? We can add some comments to state in which scenario
> this is possible. OTOH, if this is a sort of can't-happen case and we have
> kept it to avoid any sort of inconsistency, then we can probably use
> elog(ERROR, .. with appropriate LSN locations, so that later the
> problem could be debugged.
>

Converted to ERROR and updated the comment.

> 7.
> +synchronize_one_slot(WalReceiverConn *wrconn, RemoteSlot *remote_slot,
> + bool *slot_updated)
> {
> ...
> +
> + StartTransactionCommand();
> +
> + /* Make things live outside TX context */
> + MemoryContextSwitchTo(oldctx);
> +
> ...
>
> Similar to one of the previous comments, it is not clear to me why the
> patch is doing a memory context switch here. Can we add a comment?
>

I have removed the memory context switch here, as the results are all
consumed within the transaction, so in this particular case we do not
need to retain them after the transaction commits.

> 8.
> + /* User created slot with the same name exists, raise ERROR. */
> + else if (sync_state == SYNCSLOT_STATE_NONE)
> + {
> + ereport(ERROR,
> + errmsg("not synchronizing slot %s; it is a user created slot",
> + remote_slot->name));
> + }
>
> Won't we need error_code in this error? Also, the message doesn't seem
> to follow the code's usual style.

Modified. I have added errdetail as well, but I am not sure what we can
add as an errhint. Shall we add something like: "Try renaming the
existing slot."?

>
> 9.
> +synchronize_one_slot(WalReceiverConn *wrconn, RemoteSlot *remote_slot,
> + bool *slot_updated)
> {
> ...
> + else
> + {
> + TransactionId xmin_horizon = InvalidTransactionId;
> + ReplicationSlot *slot;
> +
> + ReplicationSlotCreate(remote_slot->name, true, RS_EPHEMERAL,
> + remote_slot->two_phase, false);
> + slot = MyReplicationSlot;
> +
> + SpinLockAcquire(&slot->mutex);
> + slot->data.database = get_database_oid(remote_slot->database, false);
> +
> + /* Mark it as sync initiated by slot-sync worker */
> + slot->data.sync_state = SYNCSLOT_STATE_INITIATED;
> + slot->data.failover = true;
> +
> + namestrcpy(&slot->data.plugin, remote_slot->plugin);
> + SpinLockRelease(&slot->mutex);
> +
> + ReplicationSlotReserveWal();
> +
>
> How and when will this init state (SYNCSLOT_STATE_INITIATED) persist to disk?

This happens inside wait_for_primary_and_sync(). I have reorganized the
code here (removing wait_for_primary_and_sync()) to make it more readable.

>
> 10.
> + if (slot_updated)
> + SlotSyncWorker->last_update_time = now;
> +
> + else if (TimestampDifferenceExceeds(SlotSyncWorker->last_update_time,
> + now, WORKER_INACTIVITY_THRESHOLD_MS))
>
> Empty line between if/else if is not required.
>

This blank line is added by pgindent. I am not sure how we can avoid it.

> 11.
> +static WalReceiverConn *
> +remote_connect()
> +{
> + WalReceiverConn *wrconn = NULL;
> + char *err;
> +
> + wrconn = walrcv_connect(PrimaryConnInfo, true, false, "slot-sync", &err);
> + if (wrconn == NULL)
> + ereport(ERROR,
> + (errmsg("could not connect to the primary server: %s", err)));
>
> Let's use appname similar to what we do for "walreceiver" as shown below:
> /* Establish the connection to the primary for XLOG streaming */
> wrconn = walrcv_connect(conninfo, false, false,
> cluster_name[0] ? cluster_name : "walreceiver",
> &err);
> if (!wrconn)
> ereport(ERROR,
> (errcode(ERRCODE_CONNECTION_FAILURE),
> errmsg("could not connect to the primary server: %s", err)));
>
> Some proposals for default appname "slotsynchronizer", "slotsync
> worker". Also, use the same error code as used by "walreceiver".

Modified.
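A minimal sketch of the suggested application_name choice, following the walreceiver pattern quoted above (cluster_name if set, otherwise a fixed default). slotsync_appname() is hypothetical, and "slotsync worker" is just one of the defaults proposed in the review; the name in the attached patch may differ. The connection-failure path would use ERRCODE_CONNECTION_FAILURE, as the walreceiver does.

```c
#include <stddef.h>

/* Prefer cluster_name, else fall back to an assumed default name. */
static const char *
slotsync_appname(const char *cluster_name)
{
    return (cluster_name != NULL && cluster_name[0] != '\0')
        ? cluster_name
        : "slotsync worker";
}
```

The result would be passed as the appname argument of walrcv_connect(), in place of the hard-coded "slot-sync".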

>
> 12. Do we need the handling of the slotsync worker in
> GetBackendTypeDesc()? Please check without that what value this patch
> displays for backend_type.

It currently displays 'slot sync worker'. This is the same description
the launcher uses when registering this worker (snprintf(bgw.bgw_type,
BGW_MAXLEN, "slot sync worker")).

postgres=# select backend_type from pg_stat_activity;
backend_type
------------------------------
logical replication launcher
slot sync worker
.......

For the slot sync worker and the logical replication launcher,
BackendType is B_BG_WORKER, so pg_stat_get_activity() displays the
backend_type given at background worker registration, and we get these
entries correctly. pg_stat_get_io() does not do the same; it displays
'background worker' as the description. I think the slot sync worker
and the logical replication launcher are counted among these entries:

postgres=# select backend_type from pg_stat_io;
backend_type
---------------------
autovacuum launcher
..
background worker
background worker
background worker
background worker
background worker
background writer
.....
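A toy model of the two lookup paths described above. The enum, BackendEntry and both functions are hypothetical simplifications, not PostgreSQL's code: the pg_stat_activity-style lookup returns the registered bgw_type for background workers, while the generic per-type description (which pg_stat_io uses, since it aggregates statistics per backend type) reports every background worker as "background worker".

```c
#include <stddef.h>

/* Trimmed-down, hypothetical model of backend types. */
typedef enum BackendType
{
    B_AUTOVAC_LAUNCHER,
    B_BG_WORKER,
    B_BG_WRITER
} BackendType;

typedef struct BackendEntry
{
    BackendType type;
    const char *bgw_type;       /* set at registration, e.g. "slot sync worker" */
} BackendEntry;

/* Generic per-type description, as shown in every pg_stat_io row. */
static const char *
backend_type_desc(BackendType type)
{
    switch (type)
    {
        case B_AUTOVAC_LAUNCHER:
            return "autovacuum launcher";
        case B_BG_WORKER:
            return "background worker";
        case B_BG_WRITER:
            return "background writer";
    }
    return "unknown";
}

/* pg_stat_activity-style lookup: background workers report bgw_type. */
static const char *
activity_backend_type(const BackendEntry *e)
{
    if (e->type == B_BG_WORKER && e->bgw_type != NULL)
        return e->bgw_type;
    return backend_type_desc(e->type);
}
```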

>
> 13.
> +/*
> + * Re-read the config file.
> + *
> + * If primary_conninfo has changed, reconnect to primary.
> + */
> +static void
> +slotsync_reread_config(WalReceiverConn **wrconn)
> +{
> + char *conninfo = pstrdup(PrimaryConnInfo);
> +
> + ConfigReloadPending = false;
> + ProcessConfigFile(PGC_SIGHUP);
> +
> + /* Reconnect if GUC primary_conninfo got changed */
> + if (strcmp(conninfo, PrimaryConnInfo) != 0)
> + {
> + if (*wrconn)
> + walrcv_disconnect(*wrconn);
> +
> + *wrconn = remote_connect();
>
> I think we should exit the worker in this case and allow it to
> reconnect. See the similar handling in maybe_reread_subscription().
> One effect of not doing so is that the dbname the patch has used in
> ReplSlotSyncWorkerMain() will become inconsistent.
>

Modified as suggested.
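A minimal sketch of the reload logic after this change, under stated assumptions: Config and reread_config() are hypothetical, the struct copy stands in for ProcessConfigFile(PGC_SIGHUP), and the memcpy stands in for the pstrdup() of the old value. Instead of reconnecting in place, the function only reports that primary_conninfo changed; the caller would then log and exit so the launcher restarts the worker, which also re-derives the dbname from the new value.

```c
#include <stdbool.h>
#include <string.h>

typedef struct Config
{
    char primary_conninfo[256];
} Config;

/* Returns true if the worker should exit so the launcher can restart it. */
static bool
reread_config(Config *live, const Config *reloaded)
{
    char old[sizeof(live->primary_conninfo)];

    /* Remember the value in effect before the reload. */
    memcpy(old, live->primary_conninfo, sizeof(old));

    /* ProcessConfigFile(PGC_SIGHUP) analogue. */
    *live = *reloaded;

    return strcmp(old, live->primary_conninfo) != 0;
}
```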

> 14.
> +void
> +ReplSlotSyncWorkerMain(Datum main_arg)
> +{
> ...
> ...
> + /*
> + * If the standby has been promoted, skip the slot synchronization process.
> + *
> + * Although the startup process stops all the slot-sync workers on
> + * promotion, the launcher may not have realized the promotion and could
> + * start additional workers after that. Therefore, this check is still
> + * necessary to prevent these additional workers from running.
> + */
> + if (PromoteIsTriggered())
> + exit(0);
> ...
> ...
> + /* Check if got promoted */
> + if (!RecoveryInProgress())
> + {
> + /*
> + * Drop the slots for which sync is initiated but not yet
> + * completed i.e. they are still waiting for the primary server to
> + * catch up.
> + */
> + slotsync_drop_initiated_slots();
> + ereport(LOG,
> + errmsg("exiting slot-sync worker on promotion of standby"));
>
> I think we should never reach this code in non-standby mode. It should
> elog(ERROR,.. Can you please explain why promotion handling is
> required here?

I will handle this in the next version. It needs some more thought,
especially on how 'PromoteIsTriggered' can be removed.

>
> 15.
> @@ -190,6 +190,8 @@ static const char *const BuiltinTrancheNames[] = {
> "LogicalRepLauncherDSA",
> /* LWTRANCHE_LAUNCHER_HASH: */
> "LogicalRepLauncherHash",
> + /* LWTRANCHE_SLOTSYNC_DSA: */
> + "SlotSyncWorkerDSA",
> };
> ...
> ...
> + LWTRANCHE_SLOTSYNC_DSA,
> LWTRANCHE_FIRST_USER_DEFINED,
> } BuiltinTrancheIds;
>
> These are not used in the patch.
>

Removed.

> 16.
> +/* -------------------------------
> + * LIST_DBID_FOR_FAILOVER_SLOTS command
> + * -------------------------------
> + */
> +typedef struct ListDBForFailoverSlotsCmd
> +{
> + NodeTag type;
> + List *slot_names;
> +} ListDBForFailoverSlotsCmd;
>
> ...
>
> +/*
> + * Failover logical slots data received from remote.
> + */
> +typedef struct WalRcvFailoverSlotsData
> +{
> + Oid dboid;
> +} WalRcvFailoverSlotsData;
>
> These structures don't seem to be used in the current version of the patch.

Removed.

>
> 17.
> --- a/src/include/replication/slot.h
> +++ b/src/include/replication/slot.h
> @@ -15,7 +15,6 @@
> #include "storage/lwlock.h"
> #include "storage/shmem.h"
> #include "storage/spin.h"
> -#include "replication/walreceiver.h"
> ...
> ...
> -extern void WaitForStandbyLSN(XLogRecPtr wait_for_lsn);
> extern List *GetStandbySlotList(bool copy);
>
> Why the above two are removed as part of this patch?

WaitForStandbyLSN() no longer exists, which is why its declaration was
removed. I think it should have been removed in patch 0001. I will make
this change in the next version, which will include the patch 0001 changes.

Regarding the header inclusion and the 'ReplicationSlotDropAtPubNode'
removal, I am not sure when those were removed, but my best guess is
that the header inclusion chain has changed a little in the patch.
tablesync.c uses ReplicationSlotDropAtPubNode, which is part of
subscriptioncmds.h. In our patch, tablesync.c includes
subscriptioncmds.h, so slot.h no longer needs to declare it extern for
tablesync.c. And once ReplicationSlotDropAtPubNode is gone from slot.h,
the walreceiver.h inclusion can also be removed, as it was needed only
for the 'WalReceiverConn' argument of ReplicationSlotDropAtPubNode.
Other header inclusions could be involved as well, but this seems to be
the primary reason.

> --
> With Regards,
> Amit Kapila.

Attachment Content-Type Size
v38-0003-Allow-slot-sync-worker-to-wait-for-the-cascading.patch application/octet-stream 8.0 KB
v38-0001-Allow-logical-walsenders-to-wait-for-the-physica.patch application/octet-stream 129.6 KB
v38-0002-Add-logical-slot-sync-capability-to-the-physical.patch application/octet-stream 101.6 KB
