Re: Synchronizing slots from primary to standby

From: Ajin Cherian <itsajin(at)gmail(dot)com>
To: Peter Smith <smithpb2250(at)gmail(dot)com>
Cc: shveta malik <shveta(dot)malik(at)gmail(dot)com>, Amit Kapila <amit(dot)kapila16(at)gmail(dot)com>, "Hayato Kuroda (Fujitsu)" <kuroda(dot)hayato(at)fujitsu(dot)com>, "Drouvot, Bertrand" <bertranddrouvot(dot)pg(at)gmail(dot)com>, Bharath Rupireddy <bharath(dot)rupireddyforpostgres(at)gmail(dot)com>, Peter Eisentraut <peter(dot)eisentraut(at)enterprisedb(dot)com>, Bruce Momjian <bruce(at)momjian(dot)us>, Ashutosh Sharma <ashu(dot)coek88(at)gmail(dot)com>, Andres Freund <andres(at)anarazel(dot)de>, pgsql-hackers <pgsql-hackers(at)postgresql(dot)org>, Alvaro Herrera <alvherre(at)alvh(dot)no-ip(dot)org>
Subject: Re: Synchronizing slots from primary to standby
Date: 2023-10-05 08:54:14
Message-ID: CAFPTHDbMB42KwhCVZQP0Jr+rTyL1raPMyffuk14ei6bphtWSyg@mail.gmail.com
Lists: pgsql-hackers

On Wed, Sep 27, 2023 at 2:37 PM Peter Smith <smithpb2250(at)gmail(dot)com> wrote:
>
> Here are some more review comments for the patch v19-0002.
>
> This is a WIP.... these review comments are all for the file slotsync.c
>
> ======
> src/backend/replication/logical/slotsync.c
>
> 1. wait_for_primary_slot_catchup
>
> + WalRcvExecResult *res;
> + TupleTableSlot *slot;
> + Oid slotRow[1] = {LSNOID};
> + StringInfoData cmd;
> + bool isnull;
> + XLogRecPtr restart_lsn;
> +
> + for (;;)
> + {
> + int rc;
>
> I could not recognize a reason why 'rc' is declared within the loop,
> but none of the other local variables are. Personally, I'd declare all
> variables at the deepest scope (e.g. inside the for loop).
>

Fixed.

> ~~~
>
> 2. get_local_synced_slot_names
>
> +/*
> + * Get list of local logical slot names which are synchronized from
> + * primary and belongs to one of the DBs passed in.
> + */
> +static List *
> +get_local_synced_slot_names(Oid *dbids)
> +{
>
> IIUC, this function gets called only from the drop_obsolete_slots()
> function. But I thought this list of local slot names (i.e. for the
> dbids that this worker is handling) would be something that perhaps
> could be initialized one time for the worker, instead of it being
> re-calculated every single time the slots processing/dropping happens.
> Isn't the current code expending too much effort recalculating over
> and over but giving back the same list every time?
>

This is done because the launcher can change the dblist at any time, so
each worker has to recalculate the list of slots specific to its own
dblist.

> ~~~
>
> 3. get_local_synced_slot_names
>
> + for (int i = 0; i < max_replication_slots; i++)
> + {
> + ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
> +
> + /* Check if it is logical synchronized slot */
> + if (s->in_use && SlotIsLogical(s) && s->data.synced)
> + {
> + for (int j = 0; j < MySlotSyncWorker->dbcount; j++)
> + {
>
> Loop variables are not declared in the common PG code way.
>

fixed.

> ~~~
>
> 4. slot_exists_locally
>
> +static bool
> +slot_exists_locally(List *remote_slots, ReplicationSlot *local_slot,
> + bool *locally_invalidated)
> +{
> + ListCell *cell;
> +
> + foreach(cell, remote_slots)
> + {
> + RemoteSlot *remote_slot = (RemoteSlot *) lfirst(cell);
> +
> + if (strcmp(remote_slot->name, NameStr(local_slot->data.name)) == 0)
> + {
> + /*
> + * if remote slot is marked as non-conflicting (i.e. not
> + * invalidated) but local slot is marked as invalidated, then set
> + * the bool.
> + */
> + if (!remote_slot->conflicting &&
> + SlotIsLogical(local_slot) &&
> + local_slot->data.invalidated != RS_INVAL_NONE)
> + *locally_invalidated = true;
> +
> + return true;
> + }
> + }
> +
> + return false;
> +}
>
> Why is there a SlotIsLogical(local_slot) check buried in this
> function? How is slot_exists_locally() getting called with a
> non-logical local_slot? Shouldn't that have been screened out long
> before here?
>

Removed that because it is redundant.

> ~~~
>
> 5. use_slot_in_query
>
> +static bool
> +use_slot_in_query(char *slot_name, Oid *dbids)
>
> There are multiple non-standard for-loop variable declarations in this function.
>

fixed.

> ~~~
>
> 6. compute_naptime
>
> + * The first slot managed by each worker is chosen for monitoring purpose.
> + * If the lsn of that slot changes during each sync-check time, then the
> + * nap time is kept at regular value of WORKER_DEFAULT_NAPTIME_MS.
> + * When no lsn change is observed for WORKER_INACTIVITY_THRESHOLD_MS
> + * time, then the nap time is increased to WORKER_INACTIVITY_NAPTIME_MS.
> + * This nap time is brought back to WORKER_DEFAULT_NAPTIME_MS as soon as
> + * lsn change is observed.
>
> 6a.
> /regular value/the regular value/
>
> /for WORKER_INACTIVITY_THRESHOLD_MS time/within the threshold period
> (WORKER_INACTIVITY_THRESHOLD_MS)/
>

Fixed.

> ~
>
> 6b.
> /as soon as lsn change is observed./as soon as another lsn change is observed./
>

fixed.

> ~~~
>
> 7.
> + * The caller is supposed to ignore return-value of 0. The 0 value is returned
> + * for the slots other that slot being monitored.
> + */
> +static long
> +compute_naptime(RemoteSlot *remote_slot)
>
> This rule about returning 0 seemed hacky to me. IMO it would be
> a better API to pass long *naptime (which this function either updates
> or doesn't update, depending on this being the "monitored" slot).
> Knowing the current naptime is also useful to improve the function
> logic (see the next review comment below).
>
> Also, since this function is really only toggling naptime between 2
> values, it would be helpful to assert that
>
> Assert(*naptime == WORKER_DEFAULT_NAPTIME_MS || *naptime ==
> WORKER_INACTIVITY_NAPTIME_MS);
>

fixed.

> ~~~
>
> 8.
> + if (NameStr(MySlotSyncWorker->monitoring_info.slot_name)[0] == '\0')
> + {
> + /*
> + * First time, just update the name and lsn and return regular
> + * nap time. Start comparison from next time onward.
> + */
> + strcpy(NameStr(MySlotSyncWorker->monitoring_info.slot_name),
> + remote_slot->name);
>
> I wasn't sure why it was necessary to identify the "monitoring" slot
> by name. Why doesn't the compute_naptime just get called only for the
> 1st slot found in the tuple loop instead of all the strcmp business
> trying to match monitor names?
>
> And, if the monitored slot gets "dropped", then so what; next time
> another slot will be the first tuple so will automatically take its
> place, right?
>

Yes, that is correct. Fixed as commented.

> ~~~
>
> 9.
> + /*
> + * If new received lsn (remote one) is different from what we have in
> + * our local slot, then update last_update_time.
> + */
> + if (MySlotSyncWorker->monitoring_info.confirmed_lsn !=
> + remote_slot->confirmed_lsn)
> + MySlotSyncWorker->monitoring_info.last_update_time = now;
> +
> + MySlotSyncWorker->monitoring_info.confirmed_lsn =
> + remote_slot->confirmed_lsn;
>
> Doesn't it make more sense to also put that 'confirmed_lsn' assignment
> under the same condition? e.g. No need to overwrite the same value
> again.
>

Fixed.

> ~~~
>
> 10.
> + /* If the inactivity time reaches the threshold, increase nap time */
> + if (TimestampDifferenceExceeds(MySlotSyncWorker->monitoring_info.last_update_time,
> + now, WORKER_INACTIVITY_THRESHOLD_MS))
> + return WORKER_INACTIVITY_NAPTIME_MS;
> + else
> + return WORKER_DEFAULT_NAPTIME_MS;
> + }
>
> Somehow this feels overcomplicated to me.
>
> In reality, the naptime is only toggling between 2 values (DEFAULT and
> INACTIVITY) so we should never need to be testing
> TimestampDifferenceExceeds again and again on subsequent calls (there
> might be 1000s of them)
>
> Once naptime is WORKER_INACTIVITY_NAPTIME_MS we know to reset it back
> to WORKER_DEFAULT_NAPTIME_MS only if
> (MySlotSyncWorker->monitoring_info.confirmed_lsn !=
> remote_slot->confirmed_lsn) is detected.
>
> Basically, I think the algorithm should be like the code below:
>
> TimestampTz now = GetCurrentTimestamp();
>
> if (MySlotSyncWorker->monitoring_info.confirmed_lsn !=
> remote_slot->confirmed_lsn)
> {
> MySlotSyncWorker->monitoring_info.last_update_time = now;
> MySlotSyncWorker->monitoring_info.confirmed_lsn = remote_slot->confirmed_lsn;
>
> /* Something changed; reset naptime to default. */
> *naptime = WORKER_DEFAULT_NAPTIME_MS;
> }
> else
> {
> if (*naptime == WORKER_DEFAULT_NAPTIME_MS)
> {
> /* If the inactivity time reaches the threshold, increase nap time. */
> if (TimestampDifferenceExceeds(MySlotSyncWorker->monitoring_info.last_update_time,
> now, WORKER_INACTIVITY_THRESHOLD_MS))
> *naptime = WORKER_INACTIVITY_NAPTIME_MS;
> }
> }
>

Fixed as suggested.
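
For readers following along without the patch, here is a minimal standalone sketch of the toggle logic discussed in comments 7 and 10. It is an illustration only, not the code in v21: the MonitoringInfo struct, the compute_naptime_sketch name, the explicit now_ms parameter and the three constant values are all assumptions made so that the example compiles on its own. The patch itself works with TimestampTz, GetCurrentTimestamp() and TimestampDifferenceExceeds().

```c
#include <assert.h>
#include <stdint.h>

/* Placeholder values for illustration only; the patch defines its own. */
#define WORKER_DEFAULT_NAPTIME_MS       10L
#define WORKER_INACTIVITY_NAPTIME_MS    10000L
#define WORKER_INACTIVITY_THRESHOLD_MS  10000L

/* Hypothetical, simplified stand-in for MySlotSyncWorker->monitoring_info. */
typedef struct MonitoringInfo
{
    uint64_t    confirmed_lsn;          /* last confirmed LSN seen remotely */
    int64_t     last_update_time_ms;    /* when that LSN last changed */
} MonitoringInfo;

/*
 * Update *naptime in place for the monitored slot.
 *
 * now_ms is passed in explicitly (an assumption of this sketch) so the
 * behaviour can be exercised without a real clock.
 */
static void
compute_naptime_sketch(MonitoringInfo *info, uint64_t remote_confirmed_lsn,
                       int64_t now_ms, long *naptime)
{
    /* The nap time only ever toggles between these two values. */
    assert(*naptime == WORKER_DEFAULT_NAPTIME_MS ||
           *naptime == WORKER_INACTIVITY_NAPTIME_MS);

    if (info->confirmed_lsn != remote_confirmed_lsn)
    {
        /* Something changed; remember it and reset nap time to default. */
        info->last_update_time_ms = now_ms;
        info->confirmed_lsn = remote_confirmed_lsn;
        *naptime = WORKER_DEFAULT_NAPTIME_MS;
    }
    else if (*naptime == WORKER_DEFAULT_NAPTIME_MS &&
             now_ms - info->last_update_time_ms >= WORKER_INACTIVITY_THRESHOLD_MS)
    {
        /* No change for the whole threshold period; nap longer. */
        *naptime = WORKER_INACTIVITY_NAPTIME_MS;
    }
}
```

Once the nap time has been raised, the elapsed-time test is skipped until the LSN changes again, which is the point of the simplification.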

> ~~~
>
> 11. get_remote_invalidation_cause
>
> +/*
> + * Get Remote Slot's invalidation cause.
> + *
> + * This gets invalidation cause of remote slot.
> + */
> +static ReplicationSlotInvalidationCause
> +get_remote_invalidation_cause(WalReceiverConn *wrconn, char *slot_name)
> +{
>
> Isn't that function comment just repeating itself?
>

Fixed.

> ~~~
>
> 12.
> + initStringInfo(&cmd);
> + appendStringInfo(&cmd,
> + "select pg_get_slot_invalidation_cause(%s)",
> + quote_literal_cstr(slot_name));
>
> Use uppercase "SELECT" for consistency with other SQL.
>

Fixed.

> ~~~
>
> 13.
> + /* Make things live outside TX context */
> + MemoryContextSwitchTo(oldctx);
> +
> + initStringInfo(&cmd);
> + appendStringInfo(&cmd,
> + "select pg_get_slot_invalidation_cause(%s)",
> + quote_literal_cstr(slot_name));
> + res = walrcv_exec(wrconn, cmd.data, 1, slotRow);
> + pfree(cmd.data);
> +
> + CommitTransactionCommand();
> +
> + /* Switch to oldctx we saved */
> + MemoryContextSwitchTo(oldctx);
>
> There are 2x MemoryContextSwitchTo(oldctx) here. Is that deliberate?
>

Yes, that is required, as both starting and committing a transaction
could change the current memory context.
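
To illustrate why the context is restored twice, here is a toy model. None of these functions are PostgreSQL APIs: ToyContext, toy_switch_to, toy_start_transaction and toy_commit_transaction are hypothetical stand-ins that only mimic the one behaviour relevant here, namely that both starting and committing a transaction may leave a different context current.

```c
#include <assert.h>

/* Toy model only: these are NOT the PostgreSQL types or functions. */
typedef enum
{
    CTX_CALLER,         /* the context the caller was in */
    CTX_TRANSACTION,    /* assumed to become current on transaction start */
    CTX_TOP             /* assumed to become current on transaction commit */
} ToyContext;

static ToyContext current_ctx = CTX_CALLER;

/* Switch the current context and return the previous one. */
static ToyContext
toy_switch_to(ToyContext ctx)
{
    ToyContext  old = current_ctx;

    current_ctx = ctx;
    return old;
}

static void
toy_start_transaction(void)
{
    current_ctx = CTX_TRANSACTION;  /* side effect: context changes */
}

static void
toy_commit_transaction(void)
{
    current_ctx = CTX_TOP;          /* side effect: context changes again */
}

/* Returns the context in which the "result" would have been allocated. */
static ToyContext
toy_fetch_in_caller_context(void)
{
    ToyContext  oldctx = current_ctx;
    ToyContext  alloc_ctx;

    toy_start_transaction();
    toy_switch_to(oldctx);      /* 1st switch: result must outlive the TX */

    alloc_ctx = current_ctx;    /* the query result would be built here */
    assert(alloc_ctx == oldctx);

    toy_commit_transaction();
    toy_switch_to(oldctx);      /* 2nd switch: undo what commit changed */

    return alloc_ctx;
}
```

Dropping either switch in this model leaves the function allocating in, or returning with, a context other than the caller's.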

> ~~~
>
> 14.
> + if (res->status != WALRCV_OK_TUPLES)
> + ereport(ERROR,
> + (errmsg("could not fetch invalidation cuase for slot \"%s\" from"
> + " primary: %s", slot_name, res->err)));
>
> typo /cuase/cause/
>

fixed.

> ~~~
>
> 15.
> + slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
> + if (!tuplestore_gettupleslot(res->tuplestore, true, false, slot))
> + ereport(ERROR,
> + (errmsg("slot \"%s\" disapeared from the primary",
> + slot_name)));
>
> typo /disapeared/disappeared/
>
> ~~~
>
>
> 16. drop_obsolete_slots
>
> +/*
> + * Drop obsolete slots
> + *
> + * Drop the slots which no longer need to be synced i.e. these either
> + * do not exist on primary or are no longer part of synchronize_slot_names.
> + *
> + * Also drop the slots which are valid on primary and got invalidated
> + * on standby due to conflict (say required rows removed on primary).
> + * The assumption is, these will get recreated in next sync-cycle and
> + * it is okay to drop and recreate such slots as long as these are not
> + * consumable on standby (which is the case currently).
> + */
>
> /which no/that no/
>
> /which are/that are/
>
> /these will/that these will/
>
> /and got invalidated/that got invalidated/
>

Fixed.

> ~~~
>
> 17.
> + /* If this slot is being monitored, clean-up the monitoring info */
> + if (strcmp(NameStr(local_slot->data.name),
> + NameStr(MySlotSyncWorker->monitoring_info.slot_name)) == 0)
> + {
> + MemSet(NameStr(MySlotSyncWorker->monitoring_info.slot_name), 0, NAMEDATALEN);
> + MySlotSyncWorker->monitoring_info.confirmed_lsn = 0;
> + MySlotSyncWorker->monitoring_info.last_update_time = 0;
> + }
>
> Maybe it is better to assign InvalidXLogRecPtr instead of 0 to the cleared lsn.
>

Removed this as the slot_name is no longer required in this structure.

> ~
>
> Alternatively, consider just zapping the entire monitoring_info
> structure in one go:
> MemSet(&MySlotSyncWorker->monitoring_info, 0,
> sizeof(MySlotSyncWorker->monitoring_info));
>

Code removed.

> ~~~
>
> 18. construct_slot_query (calling use_slot_in_query)
>
> This separation of functions (use_slot_in_query /
> construct_slot_query) seems awkward to me. The use_slot_in_query()
> function is only called by construct_slot_query(). I felt it might be
> simpler to keep all the logic within construct_slot_query().
>
> Furthermore, it seemed strange to iterate all the DBs (to populate the
> "WHERE database IN" clause) and then iterate all the DBs multiple
> times again in use_slot_in_query (looking for slots to populate the
> "AND slot_name IN (" clause).
>
> Maybe I misunderstand the reason for this structuring, but IMO it
> would be simpler code to keep all the logic in construct_slot_query()
> like:
>
> a. Initialize with empty dblist, empty slotlist.
> b. Iterate all dbids
> - constructing the dblist as you go
> - constructing the slot list as you go (if synchronize_slot_names is
> not "" or "*")
> c. Finally, build the query: basic + dblist-clause + optional slotlist-clause
>

I feel this would make it more complicated: to get the dbid of a slot we
need to search the hash, which requires locking, so I am keeping that
separate.

> ~~~
>
> 19. construct_slot_query
>
> Why does this function return a boolean? I only see it returns true,
> but never false.
>

Fixed.

> ~~~
>
> 20.
> + {
> + ListCell *lc;
> + bool first_slot = true;
> +
> +
> + foreach(lc, sync_slot_names_list)
>
> Unnecessary blank line.
>
> ~~~
>
> 21. synchronize_one_slot
>
> +/*
> + * Synchronize single slot to given position.
> + *
> + * This creates new slot if there is no existing one and updates the
> + * metadata of existing slots as per the data received from the primary.
> + */
> +static void
> +synchronize_one_slot(WalReceiverConn *wrconn, RemoteSlot *remote_slot)
>
> /creates new slot/creates a new slot/
>
> /metadata of existing slots/metadata of the slot/
>
> ~~~
>
> 22
>
> + /* Search for the named slot and mark it active if we find it. */
> + LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
> + for (int i = 0; i < max_replication_slots; i++)
> + {
> + ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
> +
> + if (!s->in_use)
> + continue;
> +
> + if (strcmp(NameStr(s->data.name), remote_slot->name) == 0)
> + {
> + found = true;
> + break;
> + }
> + }
> + LWLockRelease(ReplicationSlotControlLock);
>
> 22a.
> "and mark it active if we find it." -- What code here is marking
> anything active?
>
> ~
>
> 22b.
> Uncommon style of loop variable declaration
>

Fixed all above.

> ~
>
> 22c.
> IMO it is over-complicated code; e.g. same loop can be written like this:
>
> SUGGESTION
> for (i = 0; i < max_replication_slots && !found; i++)
> {
> ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
>
> if (s->in_use)
> found = (strcmp(NameStr(s->data.name), remote_slot->name) == 0);
> }
>

Fixed as suggested.
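
A standalone sketch of the simplified search loop from 22c, for illustration. ToySlot and toy_slot_exists are hypothetical names, not the patch's code; the real loop walks ReplicationSlotCtl->replication_slots while holding ReplicationSlotControlLock, which this sketch leaves out.

```c
#include <assert.h>
#include <stdbool.h>
#include <string.h>

/* Hypothetical, simplified stand-in for ReplicationSlot. */
typedef struct ToySlot
{
    bool        in_use;
    char        name[64];
} ToySlot;

/*
 * Return true if an in-use slot with the given name exists.
 * The loop condition itself stops the scan once a match is found,
 * so no explicit continue or break is needed.
 */
static bool
toy_slot_exists(const ToySlot *slots, int nslots, const char *name)
{
    bool        found = false;
    int         i;

    assert(slots != NULL || nslots == 0);

    for (i = 0; i < nslots && !found; i++)
    {
        if (slots[i].in_use)
            found = (strcmp(slots[i].name, name) == 0);
    }

    return found;
}
```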

> ~~~
>
> 23. synchronize_slots
>
> + /* Construct query to get slots info from the primary */
> + initStringInfo(&s);
> + if (!construct_slot_query(&s, dbids))
> + {
> + pfree(s.data);
> + CommitTransactionCommand();
> + LWLockRelease(SlotSyncWorkerLock);
> + return naptime;
> + }
>
> As noted elsewhere, it seems construct_slot_query() will never return
> false and so this block of code is unreachable.
>

Removed this code.

> ~~~
>
> 24.
> + /* Create list of remote slot names to be used by drop_obsolete_slots */
> + remote_slot_list = lappend(remote_slot_list, remote_slot);
>
> This is a list of slots, not just slot names.
>

Fixed.

> ~~~
>
> 25.
> + /*
> + * Update nap time in case of non-zero value returned. The zero value
> + * is returned if remote_slot is not the one being monitored.
> + */
> + value = compute_naptime(remote_slot);
> + if (value)
> + naptime = value;
>
> If the compute_naptime API is changed as suggested in a prior review
> comment then this can be simplified to something like:
>
> SUGGESTION:
> /* Update nap time as required depending on slot activity. */
> compute_naptime(remote_slot, &naptime);
>

Fixed.

> ~~~
>
> 26.
> + /*
> + * Drop local slots which no longer need to be synced i.e. these either do
> + * not exist on primary or are no longer part of synchronize_slot_names.
> + */
> + drop_obsolete_slots(dbids, remote_slot_list);
>
> /which no longer/that no longer/
>
> I thought it might be better to omit the "i.e." part. Just leave it to
> the function-header of drop_obsolete_slots for a detailed explanation
> about *which* slots are candidates for dropping.
>

Fixed.

> ~
>
> 27.
> + /* We are done, free remot_slot_list elements */
> + foreach(cell, remote_slot_list)
> + {
> + RemoteSlot *remote_slot = (RemoteSlot *) lfirst(cell);
> +
> + pfree(remote_slot);
> + }
>
> 27a.
> /remot_slot_list/remote_slot_list/
>

Fixed.

> ~
>
> 27b.
> Isn't this just the same as the one-liner:
>
> list_free_deep(remote_slot_list);
>
> ~~~
>
> 28.
> +/*
> + * Initialize the list from raw synchronize_slot_names and cache it, in order
> + * to avoid parsing it repeatedly. Done at slot-sync worker startup and after
> + * each SIGHUP.
> + */
> +static void
> +SlotSyncInitSlotNamesList()
> +{
> + char *rawname;
> +
> + if (strcmp(synchronize_slot_names, "") != 0 &&
> + strcmp(synchronize_slot_names, "*") != 0)
> + {
> + rawname = pstrdup(synchronize_slot_names);
> + SplitIdentifierString(rawname, ',', &sync_slot_names_list);
> + }
> +}
>
> 28a.
> Why this static function name is camel-case, unlike all the others?
>

Fixed.

> ~
>
> 28b.
> What about when the sync_slot_names_list changes from value to "" or
> "*". Shouldn't this function be setting sync_slot_names_list = NIL for
> that scenario?
>

I modified this logic to free sync_slot_names_list prior to setting
and initializing it to NIL.
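
As a rough illustration of the reset-then-rebuild behaviour described for 28b, here is a self-contained sketch. NameList, name_list_reset and init_slot_names_list_sketch are hypothetical stand-ins for the PostgreSQL List and the patch's function, and the comma splitting is deliberately naive (unlike SplitIdentifierString, it does no whitespace trimming or quoting).

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical stand-in for the cached sync_slot_names_list. */
typedef struct NameList
{
    char      **names;
    size_t      count;
} NameList;

/* Free every element and leave the list empty (the analogue of NIL). */
static void
name_list_reset(NameList *list)
{
    for (size_t i = 0; i < list->count; i++)
        free(list->names[i]);
    free(list->names);
    list->names = NULL;
    list->count = 0;
}

/* Copy len bytes starting at start into a new NUL-terminated string. */
static char *
copy_range(const char *start, size_t len)
{
    char       *s = malloc(len + 1);

    if (s == NULL)
        abort();
    memcpy(s, start, len);
    s[len] = '\0';
    return s;
}

/*
 * Rebuild the cached list from the raw setting. The old list is always
 * freed first, so switching to "" or "*" leaves an empty list instead
 * of stale names.
 */
static void
init_slot_names_list_sketch(NameList *list, const char *raw)
{
    const char *p = raw;

    assert(list != NULL && raw != NULL);

    name_list_reset(list);

    if (strcmp(raw, "") == 0 || strcmp(raw, "*") == 0)
        return;

    for (;;)
    {
        const char *comma = strchr(p, ',');
        size_t      len = comma ? (size_t) (comma - p) : strlen(p);
        char      **grown = realloc(list->names,
                                    (list->count + 1) * sizeof(char *));

        if (grown == NULL)
            abort();
        list->names = grown;
        list->names[list->count++] = copy_range(p, len);

        if (comma == NULL)
            break;
        p = comma + 1;
    }
}
```
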
> ~~~
>
> 29. remote_connect
>
> +/*
> + * Connect to remote (primary) server.
> + *
> + * This uses primary_conninfo in order to connect to primary. For slot-sync
> + * to work, primary_conninfo is expected to have dbname as well.
> + */
> +static WalReceiverConn *
> +remote_connect()
>
> 29a.
> I felt it might be more helpful to say "GUC primary_conninfo" instead
> of just 'primary_conninfo' the first time this is mentioned.
>

fixed.

> ~
>
> 29b.
> /connect to primary/connect to the primary/
>
> ~
>
> 29c.
> /is expected to have/is required to specify/
>
> ~~~
>
> 30. reconnect_if_needed
>
> +/*
> + * Reconnect to remote (primary) server if PrimaryConnInfo got changed.
> + */
> +static WalReceiverConn *
> +reconnect_if_needed(WalReceiverConn *wrconn_prev, char *conninfo_prev)
>
> /got changed/has changed/
>
> ~~~
>
> 31.
> +static WalReceiverConn *
> +reconnect_if_needed(WalReceiverConn *wrconn_prev, char *conninfo_prev)
> +{
> + WalReceiverConn *wrconn = NULL;
> +
> + /* If no change in PrimaryConnInfo, return previous connection itself */
> + if (strcmp(conninfo_prev, PrimaryConnInfo) == 0)
> + return wrconn_prev;
> +
> + walrcv_disconnect(wrconn);
> + wrconn = remote_connect();
> + return wrconn;
> +}
>
> /return previous/return the previous/
>
> Disconnect NULL is a bug isn't it? Don't you mean to disconnect 'wrconn_prev'?
>

Fixed
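
For illustration, a standalone sketch of the corrected flow for 31, in which the previous connection (not a NULL pointer) is the one that gets disconnected. ToyConn, toy_connect and toy_disconnect are hypothetical stand-ins for WalReceiverConn, remote_connect() and walrcv_disconnect(), and the current conninfo is passed as a parameter here rather than read from the PrimaryConnInfo global.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical stand-in for WalReceiverConn. */
typedef struct ToyConn
{
    char        conninfo[128];
} ToyConn;

/* Counts disconnects so the behaviour can be checked. */
static int  toy_disconnect_calls = 0;

static ToyConn *
toy_connect(const char *conninfo)
{
    ToyConn    *conn = calloc(1, sizeof(ToyConn));

    if (conn == NULL)
        abort();
    /* calloc zeroed the buffer, so the copy stays NUL-terminated. */
    strncpy(conn->conninfo, conninfo, sizeof(conn->conninfo) - 1);
    return conn;
}

static void
toy_disconnect(ToyConn *conn)
{
    assert(conn != NULL);   /* passing NULL here was the reported bug */
    toy_disconnect_calls++;
    free(conn);
}

/*
 * Return the previous connection if the conninfo is unchanged;
 * otherwise close the previous connection and open a new one.
 */
static ToyConn *
reconnect_if_needed_sketch(ToyConn *conn_prev, const char *conninfo_prev,
                           const char *conninfo_now)
{
    if (strcmp(conninfo_prev, conninfo_now) == 0)
        return conn_prev;

    toy_disconnect(conn_prev);
    return toy_connect(conninfo_now);
}
```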

> ~~~
>
> 32. slotsync_worker_detach
>
> +/*
> + * Detach the worker from DSM and update 'proc' and 'in_use'.
> + * Logical replication launcher will come to know using these
> + * that the worker has shutdown.
> + */
> +static void
> +slotsync_worker_detach(int code, Datum arg)
> +{
> + dsa_detach((dsa_area *) DatumGetPointer(arg));
> + LWLockAcquire(SlotSyncWorkerLock, LW_EXCLUSIVE);
> + MySlotSyncWorker->hdr.in_use = false;
> + MySlotSyncWorker->hdr.proc = NULL;
> + LWLockRelease(SlotSyncWorkerLock);
> +}
>
> I expected this function to be in the same module as
> slotsync_worker_attach. It seems a bit strange to have them separated.
>

Both are now part of the launcher.c file.

> ~~~
>
> 33. ReplSlotSyncMain
>
> + ereport(ERROR,
> + (errmsg("The dbname not specified in primary_conninfo, skipping"
> + " slots synchronization"),
> + errhint("Specify dbname in primary_conninfo for slots"
> + " synchronization to proceed")));
>
> /not specified in/was not specified in/
>
> /slots synchronization/slot synchronization/ (??) -- there are multiple of these
>
> ~
>
> 34.
> + /*
> + * Connect to the database specified by user in PrimaryConnInfo. We need
> + * database connection for walrcv_exec to work. Please see comments atop
> + * libpqrcv_exec.
> + */
>
> /database connection/a database connection/
>
> ~~~
>
> 35.
> + /* Reconnect if primary_conninfo got changed */
> + if (config_reloaded)
> + wrconn = reconnect_if_needed(wrconn, conninfo_prev);
>
> SUGGESTION
> Reconnect if GUC primary_conninfo has changed.
>
> ~
>
> 36.
> + /*
> + * The slot-sync worker must not get here because it will only stop when
> + * it receives a SIGINT from the logical replication launcher, or when
> + * there is an error. None of these cases will allow the code to reach
> + * here.
> + */
> + Assert(false);
>
> 36a.
> /must not/cannot/
>
> 36b.
> "None of these cases will allow the code to reach here." <-- redundant sentence
>

Fixed all above.

This patch-set also fixes the crash reported by Kuroda-san, thanks to
Shveta for that fix.

regards,
Ajin Cherian
Fujitsu Australia

Attachment Content-Type Size
v21-0001-Allow-logical-walsenders-to-wait-for-physical-st.patch application/octet-stream 25.9 KB
v21-0002-Add-logical-slot-sync-capability-to-physical-sta.patch application/octet-stream 107.0 KB
