Re: Synchronizing slots from primary to standby

From: Amit Kapila <amit(dot)kapila16(at)gmail(dot)com>
To: "Drouvot, Bertrand" <bertranddrouvot(dot)pg(at)gmail(dot)com>
Cc: "Zhijie Hou (Fujitsu)" <houzj(dot)fnst(at)fujitsu(dot)com>, shveta malik <shveta(dot)malik(at)gmail(dot)com>, Peter Smith <smithpb2250(at)gmail(dot)com>, "Hayato Kuroda (Fujitsu)" <kuroda(dot)hayato(at)fujitsu(dot)com>, Bharath Rupireddy <bharath(dot)rupireddyforpostgres(at)gmail(dot)com>, Peter Eisentraut <peter(dot)eisentraut(at)enterprisedb(dot)com>, Bruce Momjian <bruce(at)momjian(dot)us>, Ashutosh Sharma <ashu(dot)coek88(at)gmail(dot)com>, Andres Freund <andres(at)anarazel(dot)de>, pgsql-hackers <pgsql-hackers(at)postgresql(dot)org>, Ajin Cherian <itsajin(at)gmail(dot)com>, Alvaro Herrera <alvherre(at)alvh(dot)no-ip(dot)org>
Subject: Re: Synchronizing slots from primary to standby
Date: 2023-11-20 10:58:01
Message-ID: CAA4eK1L8qPMGvomC154+jxqYxu3ieoV1STzboBEfLdVMmQcxCQ@mail.gmail.com
Lists: pgsql-hackers

On Sat, Nov 18, 2023 at 4:15 PM Amit Kapila <amit(dot)kapila16(at)gmail(dot)com> wrote:
>
> On Fri, Nov 17, 2023 at 5:18 PM Drouvot, Bertrand
> <bertranddrouvot(dot)pg(at)gmail(dot)com> wrote:
>
> More Review for v35-0002*
> ============================
>

More review of v35-0002*
====================
1.
+/*
+ * Helper function to check if local_slot is present in remote_slots list.
+ *
+ * It also checks if logical slot is locally invalidated i.e. invalidated on
+ * the standby but valid on the primary server. If found so, it sets
+ * locally_invalidated to true.
+ */
+static bool
+slot_exists_in_list(ReplicationSlot *local_slot, List *remote_slots,
+ bool *locally_invalidated)

The name of the function is a bit misleading because it checks not
only whether the slot exists in remote_slots but also its validity.
Would it be better to name it ValidateSyncSlot() or something along
those lines?
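To illustrate why a name like ValidateSyncSlot() fits better, here is a standalone sketch of the contract described in the patch's header comment: look the slot up by name and, if it is found, report whether it is invalidated locally but still valid on the primary. SketchSlot and validate_sync_slot() are hypothetical stand-ins, not the patch's types, and resetting *locally_invalidated to false on entry is an assumption of this sketch.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <string.h>

/* Hypothetical, simplified stand-in for a replication slot. */
typedef struct SketchSlot
{
    const char *name;
    bool        invalidated;    /* true if this copy of the slot is invalidated */
} SketchSlot;

/*
 * Returns true if local_slot is present in remote_slots[0..nremote-1].
 * When it is, *locally_invalidated is set to true if the slot is
 * invalidated on the standby but still valid on the primary.
 * The function both looks up and validates, hence the naming concern.
 */
static bool
validate_sync_slot(const SketchSlot *local_slot,
                   const SketchSlot *remote_slots, size_t nremote,
                   bool *locally_invalidated)
{
    assert(local_slot != NULL && locally_invalidated != NULL);

    *locally_invalidated = false;

    for (size_t i = 0; i < nremote; i++)
    {
        if (strcmp(remote_slots[i].name, local_slot->name) == 0)
        {
            /* Invalidated here but valid on the primary: locally invalidated. */
            *locally_invalidated = local_slot->invalidated &&
                !remote_slots[i].invalidated;
            return true;
        }
    }
    return false;
}
```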

2.
+static long
+synchronize_slots(WalReceiverConn *wrconn)
{
...
+ /* Construct query to get slots info from the primary server */
+ initStringInfo(&s);
+ construct_slot_query(&s);
...
+ if (remote_slot->conflicting)
+ remote_slot->invalidated = get_remote_invalidation_cause(wrconn,
+ remote_slot->name);
...

+static ReplicationSlotInvalidationCause
+get_remote_invalidation_cause(WalReceiverConn *wrconn, char *slot_name)
{
...
+ appendStringInfo(&cmd,
+ "SELECT pg_get_slot_invalidation_cause(%s)",
+ quote_literal_cstr(slot_name));
+ res = walrcv_exec(wrconn, cmd.data, 1, slotRow);

Do we really need a second query to get the invalidation cause? Can
we adjust the slot query to fetch it in one round trip? I think this
may not save much, because the patch does the second round trip only
for invalidated slots, but it still looks odd. So unless the query
becomes too complicated, we should try to achieve it in one round
trip.
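A rough sketch of the one-round-trip idea, assuming pg_get_slot_invalidation_cause() from the patch can simply be called per row inside the main slot query; the CASE mirrors the patch's "only for conflicting slots" logic. build_slot_query() is a hypothetical helper, and the rest of the query text is copied unchanged from construct_slot_query().

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

/*
 * Builds the slot query with the invalidation cause as an extra column,
 * so no second query per conflicting slot is needed.
 * Returns the query length, or -1 if buf was too small.
 */
static int
build_slot_query(char *buf, size_t buflen)
{
    int         n;

    assert(buf != NULL && buflen > 0);

    n = snprintf(buf, buflen,
                 "SELECT slot_name, plugin, confirmed_flush_lsn,"
                 " restart_lsn, catalog_xmin, two_phase, conflicting,"
                 " database,"
                 " CASE WHEN conflicting"
                 " THEN pg_get_slot_invalidation_cause(slot_name)"
                 " END AS invalidation_cause"
                 " FROM pg_catalog.pg_replication_slots"
                 " WHERE failover and sync_state != 'i'");

    /* snprintf returns the length it needed; >= buflen means truncation */
    return (n >= 0 && (size_t) n < buflen) ? n : -1;
}
```

If something like this were adopted, SLOTSYNC_COLUMN_COUNT and the slotRow type array would need one more entry, and get_remote_invalidation_cause() could presumably go away.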

3.
+static long
+synchronize_slots(WalReceiverConn *wrconn)
+{
...
...
+ /* The syscache access needs a transaction env. */
+ StartTransactionCommand();
+
+ /* Make things live outside TX context */
+ MemoryContextSwitchTo(oldctx);
+
+ /* Construct query to get slots info from the primary server */
+ initStringInfo(&s);
+ construct_slot_query(&s);
+
+ elog(DEBUG2, "slot-sync worker's query:%s \n", s.data);
+
+ /* Execute the query */
+ res = walrcv_exec(wrconn, s.data, SLOTSYNC_COLUMN_COUNT, slotRow);

It is okay to execute the above query outside the transaction
context, but I would like to know the reason for doing so. Do we want
to retain something beyond the transaction context, or is there some
other reason to do this outside the transaction context?

4.
+static void
+construct_slot_query(StringInfo s)
+{
+ /*
+ * Fetch data for logical failover slots with sync_state either as
+ * SYNCSLOT_STATE_NONE or SYNCSLOT_STATE_READY.
+ */
+ appendStringInfo(s,
+ "SELECT slot_name, plugin, confirmed_flush_lsn,"
+ " restart_lsn, catalog_xmin, two_phase, conflicting, "
+ " database FROM pg_catalog.pg_replication_slots"
+ " WHERE failover and sync_state != 'i'");
+}

Why would sync_state on the primary server ever be a valid value? I
thought it was set only on the physical standby. I think it is better
to explain in the above comment why the query uses the sync_state
and/or failover flag. The current comment is not of much use as it
just restates what is evident from the query.

5.
+ * This check should never pass as on the primary server, we have waited
+ * for the standby's confirmation before updating the logical slot. But to
+ * take care of any bug in that flow, we should retain this check.
+ */
+ if (remote_slot->confirmed_lsn > WalRcv->latestWalEnd)
+ {
+ elog(LOG, "skipping sync of slot \"%s\" as the received slot-sync "
+ "LSN %X/%X is ahead of the standby position %X/%X",
+ remote_slot->name,
+ LSN_FORMAT_ARGS(remote_slot->confirmed_lsn),
+ LSN_FORMAT_ARGS(WalRcv->latestWalEnd));
+

This should be elog(ERROR, ...), which is what we normally use for
such unexpected cases. Also, the last sentence of the comment ("But to
take care of any bug in that flow, we should retain this check.") is
not needed.

6.
+synchronize_one_slot(WalReceiverConn *wrconn, RemoteSlot *remote_slot,
+ bool *slot_updated)
{
...
+ if (remote_slot->restart_lsn < MyReplicationSlot->data.restart_lsn)
+ {
+ ereport(WARNING,
+ errmsg("not synchronizing slot %s; synchronization would"
+ " move it backwards", remote_slot->name));

I think the elevel here should be LOG because the user can't do much
about this. Also, do we use ';' in messages elsewhere? But when can we
hit this case? We can add some comments describing the scenario in
which this is possible. OTOH, if this is a can't-happen case and we
have kept it only to avoid any sort of inconsistency, then we can
probably use elog(ERROR, ...) with the appropriate LSN locations, so
that the problem can be debugged later.
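If it is indeed a can't-happen case, the point is that the ERROR should carry both LSNs. A standalone sketch, assuming an LSN is a 64-bit value printed as two 32-bit hex halves (the same split that LSN_FORMAT_ARGS() performs); restart_lsn_moves_backwards(), SKETCH_LSN_FORMAT_ARGS and the message text are hypothetical.

```c
#include <assert.h>
#include <inttypes.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>
#include <string.h>

typedef uint64_t XLogRecPtr;    /* a PostgreSQL LSN is a 64-bit integer */

/* Same split as LSN_FORMAT_ARGS(): high 32 bits, then low 32 bits. */
#define SKETCH_LSN_FORMAT_ARGS(lsn) \
    ((uint32_t) ((lsn) >> 32)), ((uint32_t) (lsn))

/*
 * Returns true if syncing would move restart_lsn backwards; in that case
 * errbuf receives a message with both LSNs, which is what an
 * elog(ERROR, ...) in the worker would need for later debugging.
 */
static bool
restart_lsn_moves_backwards(const char *slot_name,
                            XLogRecPtr remote_restart_lsn,
                            XLogRecPtr local_restart_lsn,
                            char *errbuf, size_t errlen)
{
    if (remote_restart_lsn >= local_restart_lsn)
        return false;

    snprintf(errbuf, errlen,
             "remote restart_lsn %" PRIX32 "/%" PRIX32 " of slot \"%s\""
             " is behind local restart_lsn %" PRIX32 "/%" PRIX32,
             SKETCH_LSN_FORMAT_ARGS(remote_restart_lsn), slot_name,
             SKETCH_LSN_FORMAT_ARGS(local_restart_lsn));
    return true;
}
```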

7.
+synchronize_one_slot(WalReceiverConn *wrconn, RemoteSlot *remote_slot,
+ bool *slot_updated)
{
...
+
+ StartTransactionCommand();
+
+ /* Make things live outside TX context */
+ MemoryContextSwitchTo(oldctx);
+
...

Similar to one of the previous comments, it is not clear to me why the
patch is doing a memory context switch here. Can we add a comment?

8.
+ /* User created slot with the same name exists, raise ERROR. */
+ else if (sync_state == SYNCSLOT_STATE_NONE)
+ {
+ ereport(ERROR,
+ errmsg("not synchronizing slot %s; it is a user created slot",
+ remote_slot->name));
+ }

Won't we need an errcode() in this error? Also, the message doesn't
seem to follow the usual message style of the code.

9.
+synchronize_one_slot(WalReceiverConn *wrconn, RemoteSlot *remote_slot,
+ bool *slot_updated)
{
...
+ else
+ {
+ TransactionId xmin_horizon = InvalidTransactionId;
+ ReplicationSlot *slot;
+
+ ReplicationSlotCreate(remote_slot->name, true, RS_EPHEMERAL,
+ remote_slot->two_phase, false);
+ slot = MyReplicationSlot;
+
+ SpinLockAcquire(&slot->mutex);
+ slot->data.database = get_database_oid(remote_slot->database, false);
+
+ /* Mark it as sync initiated by slot-sync worker */
+ slot->data.sync_state = SYNCSLOT_STATE_INITIATED;
+ slot->data.failover = true;
+
+ namestrcpy(&slot->data.plugin, remote_slot->plugin);
+ SpinLockRelease(&slot->mutex);
+
+ ReplicationSlotReserveWal();
+

How and when will this init state (SYNCSLOT_STATE_INITIATED) persist to disk?

10.
+ if (slot_updated)
+ SlotSyncWorker->last_update_time = now;
+
+ else if (TimestampDifferenceExceeds(SlotSyncWorker->last_update_time,
+ now, WORKER_INACTIVITY_THRESHOLD_MS))

The empty line between the if and the else if is not required.
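For reference, the intended layout of that hunk as a standalone sketch: timestamp_difference_exceeds() mimics the semantics of TimestampDifferenceExceeds() on microsecond timestamps, and slotsync_note_activity() is a hypothetical wrapper that just reports inactivity, since the body of the else-if branch is not shown in the quoted hunk.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

typedef int64_t TimestampTz;    /* microseconds, as in PostgreSQL */

/* True if at least msec milliseconds elapsed between start and stop. */
static bool
timestamp_difference_exceeds(TimestampTz start, TimestampTz stop, int msec)
{
    return (stop - start) >= (int64_t) msec * 1000;
}

/*
 * Records activity, or returns true once threshold_ms has passed since
 * the last update. Note: no blank line between the if and the else if.
 */
static bool
slotsync_note_activity(TimestampTz *last_update_time, TimestampTz now,
                       bool slot_updated, int threshold_ms)
{
    if (slot_updated)
        *last_update_time = now;
    else if (timestamp_difference_exceeds(*last_update_time, now, threshold_ms))
        return true;            /* inactive; caller decides what to do */

    return false;
}
```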

11.
+static WalReceiverConn *
+remote_connect()
+{
+ WalReceiverConn *wrconn = NULL;
+ char *err;
+
+ wrconn = walrcv_connect(PrimaryConnInfo, true, false, "slot-sync", &err);
+ if (wrconn == NULL)
+ ereport(ERROR,
+ (errmsg("could not connect to the primary server: %s", err)));

Let's use an appname similar to what we do for "walreceiver", as shown below:
/* Establish the connection to the primary for XLOG streaming */
wrconn = walrcv_connect(conninfo, false, false,
cluster_name[0] ? cluster_name : "walreceiver",
&err);
if (!wrconn)
ereport(ERROR,
(errcode(ERRCODE_CONNECTION_FAILURE),
errmsg("could not connect to the primary server: %s", err)));

Some proposals for the default appname: "slotsynchronizer", "slotsync
worker". Also, use the same error code as "walreceiver" does.
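A minimal sketch of the appname choice, following the walreceiver code quoted above; slotsync_appname() is a hypothetical helper, and "slotsynchronizer" is just one of the proposed defaults.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/*
 * Use cluster_name when it is set, otherwise a fixed default,
 * mirroring: cluster_name[0] ? cluster_name : "walreceiver".
 */
static const char *
slotsync_appname(const char *cluster_name)
{
    return (cluster_name != NULL && cluster_name[0] != '\0')
        ? cluster_name
        : "slotsynchronizer";
}
```

The result would then be passed as the appname argument of walrcv_connect(), with the connection failure reported using errcode(ERRCODE_CONNECTION_FAILURE) as walreceiver does.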

12. Do we need to handle the slotsync worker in GetBackendTypeDesc()?
Please check what value this patch displays for backend_type without
that.

13.
+/*
+ * Re-read the config file.
+ *
+ * If primary_conninfo has changed, reconnect to primary.
+ */
+static void
+slotsync_reread_config(WalReceiverConn **wrconn)
+{
+ char *conninfo = pstrdup(PrimaryConnInfo);
+
+ ConfigReloadPending = false;
+ ProcessConfigFile(PGC_SIGHUP);
+
+ /* Reconnect if GUC primary_conninfo got changed */
+ if (strcmp(conninfo, PrimaryConnInfo) != 0)
+ {
+ if (*wrconn)
+ walrcv_disconnect(*wrconn);
+
+ *wrconn = remote_connect();

I think we should exit the worker in this case and let it reconnect
when it is restarted. See the similar handling in
maybe_reread_subscription(). One effect of not doing so is that the
dbname the patch uses in ReplSlotSyncWorkerMain() will become
inconsistent.
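A minimal sketch of the decision, assuming the worker would log and exit rather than reconnect in place; slotsync_must_restart() is hypothetical, and the worker-side flow in the comment is only an outline, not the patch's code.

```c
#include <assert.h>
#include <stdbool.h>
#include <string.h>

/*
 * Returns true if primary_conninfo changed across a config reload, in
 * which case the worker should exit and be restarted, so that values
 * derived from the old conninfo (such as the dbname) cannot go stale.
 *
 * In the worker this would look roughly like:
 *     char *conninfo = pstrdup(PrimaryConnInfo);
 *     ConfigReloadPending = false;
 *     ProcessConfigFile(PGC_SIGHUP);
 *     if (strcmp(conninfo, PrimaryConnInfo) != 0)
 *     {
 *         ereport(LOG, ...);
 *         proc_exit(0);
 *     }
 */
static bool
slotsync_must_restart(const char *old_conninfo, const char *new_conninfo)
{
    assert(old_conninfo != NULL && new_conninfo != NULL);
    return strcmp(old_conninfo, new_conninfo) != 0;
}
```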

14.
+void
+ReplSlotSyncWorkerMain(Datum main_arg)
+{
...
...
+ /*
+ * If the standby has been promoted, skip the slot synchronization process.
+ *
+ * Although the startup process stops all the slot-sync workers on
+ * promotion, the launcher may not have realized the promotion and could
+ * start additional workers after that. Therefore, this check is still
+ * necessary to prevent these additional workers from running.
+ */
+ if (PromoteIsTriggered())
+ exit(0);
...
...
+ /* Check if got promoted */
+ if (!RecoveryInProgress())
+ {
+ /*
+ * Drop the slots for which sync is initiated but not yet
+ * completed i.e. they are still waiting for the primary server to
+ * catch up.
+ */
+ slotsync_drop_initiated_slots();
+ ereport(LOG,
+ errmsg("exiting slot-sync woker on promotion of standby"));

I think we should never reach this code when not in standby mode, so
it should be elog(ERROR, ...). Can you please explain why promotion
handling is required here?

15.
@@ -190,6 +190,8 @@ static const char *const BuiltinTrancheNames[] = {
"LogicalRepLauncherDSA",
/* LWTRANCHE_LAUNCHER_HASH: */
"LogicalRepLauncherHash",
+ /* LWTRANCHE_SLOTSYNC_DSA: */
+ "SlotSyncWorkerDSA",
};
...
...
+ LWTRANCHE_SLOTSYNC_DSA,
LWTRANCHE_FIRST_USER_DEFINED,
} BuiltinTrancheIds;

These are not used in the patch.

16.
+/* -------------------------------
+ * LIST_DBID_FOR_FAILOVER_SLOTS command
+ * -------------------------------
+ */
+typedef struct ListDBForFailoverSlotsCmd
+{
+ NodeTag type;
+ List *slot_names;
+} ListDBForFailoverSlotsCmd;

...

+/*
+ * Failover logical slots data received from remote.
+ */
+typedef struct WalRcvFailoverSlotsData
+{
+ Oid dboid;
+} WalRcvFailoverSlotsData;

These structures don't seem to be used in the current version of the patch.

17.
--- a/src/include/replication/slot.h
+++ b/src/include/replication/slot.h
@@ -15,7 +15,6 @@
#include "storage/lwlock.h"
#include "storage/shmem.h"
#include "storage/spin.h"
-#include "replication/walreceiver.h"
...
...
-extern void WaitForStandbyLSN(XLogRecPtr wait_for_lsn);
extern List *GetStandbySlotList(bool copy);

Why are the above two removed as part of this patch?

--
With Regards,
Amit Kapila.
