Re: Synchronizing slots from primary to standby

From: Peter Smith <smithpb2250(at)gmail(dot)com>
To: shveta malik <shveta(dot)malik(at)gmail(dot)com>
Cc: "Drouvot, Bertrand" <bertranddrouvot(dot)pg(at)gmail(dot)com>, Amit Kapila <amit(dot)kapila16(at)gmail(dot)com>, "Hayato Kuroda (Fujitsu)" <kuroda(dot)hayato(at)fujitsu(dot)com>, Bharath Rupireddy <bharath(dot)rupireddyforpostgres(at)gmail(dot)com>, Peter Eisentraut <peter(dot)eisentraut(at)enterprisedb(dot)com>, Bruce Momjian <bruce(at)momjian(dot)us>, Ashutosh Sharma <ashu(dot)coek88(at)gmail(dot)com>, Andres Freund <andres(at)anarazel(dot)de>, pgsql-hackers <pgsql-hackers(at)postgresql(dot)org>, Ajin Cherian <itsajin(at)gmail(dot)com>, Alvaro Herrera <alvherre(at)alvh(dot)no-ip(dot)org>
Subject: Re: Synchronizing slots from primary to standby
Date: 2023-10-24 07:32:04
Message-ID: CAHut+PspseC03Fhsi=OqOtksagspE+0MVOhrhhUb64cc_4SE1w@mail.gmail.com
Lists: pgsql-hackers

Here are some review comments for v24-0001

======
1. GENERAL - failover slots terminology

There is inconsistent terminology, such as below. Try to use the same
wording everywhere.
- failover logical slots
- failover slots
- logical failover slots
- logical replication failover slots
- etc.

These are in many places - comments, function names, constants etc.

~~~

2. GENERAL - THE

s/primary.../the primary.../
s/standby.../the standby.../

Missing "the" problems remain in multiple places in the patch.

~~~

3. GENERAL - messages

I searched all the ereports and elogs (the full list is below, for
reference only). There are many little quirks:

3a. Sometimes messages say "primary"; sometimes "primary server" etc.
Be consistent.

3b. /primary/the primary/

3c. Sometimes messages include an errcode and sometimes they do not. Is
that deliberate, or are there missing errcodes?

3d. At least one message has an unwanted trailing space.

3e. Sometimes errcode and/or errmsg are enclosed in extra parentheses;
sometimes not. AFAIK those parentheses are not necessary anymore.

3f. Inconsistent terminology "slot" V "failover slots" V "failover
logical slots" etc., as mentioned in the previous review comment #1.

3g. Some messages say "slot creation aborted"; others say "aborting
slot creation". Be consistent.

3h. s/lsn/LSN/

3i. s/move it backward/move it backwards/

3j. Some LOG messages start with uppercase; others with lowercase. Be consistent.

3k. typo: s/and and/and/

3l. "worker %d" V "worker%d"

~

Messages:

ereport(ERROR, (errmsg("could not receive failover slots dbinfo from the primary server: %s",
    pchomp(PQerrorMessage(conn->streamConn)))));
ereport(ERROR, (errmsg("invalid response from primary server"),
    errdetail("Could not get failover slots dbinfo: got %d fields, " "expected 1", nfields)));
ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR),
    errmsg("invalid connection string syntax: %s", errcopy)));
ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    errmsg("replication slot-sync worker slot %d is " "empty, cannot attach", slot)));
ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    errmsg("replication slot-sync worker slot %d is " "already used by another worker, cannot attach", slot)));
ereport(ERROR, (errmsg("could not connect to the primary server: %s", err)));
ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    errmsg("cannot use replication slot \"%s\" for logical decoding", NameStr(slot->data.name)),
    errdetail("This slot is being synced from the primary."),
    errhint("Specify another replication slot.")));
ereport(ERROR, (errmsg("could not fetch slot info for slot \"%s\" from" " the primary: %s",
    remote_slot->name, res->err)));
ereport(ERROR, (errmsg("could not fetch slot info for slot \"%s\" from" " the primary: %s",
    remote_slot->name, res->err)));
ereport(ERROR, (errmsg("could not fetch invalidation cause for slot \"%s\" from" " primary: %s",
    slot_name, res->err)));
ereport(ERROR, (errmsg("slot \"%s\" disappeared from the primary", slot_name)));
ereport(ERROR, (errmsg("could not fetch failover logical slots info from the primary: %s",
    res->err)));
ereport(ERROR, (errmsg("could not connect to the primary server: %s", err)));
ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    errmsg("could not map dynamic shared memory " "segment for slot-sync worker")));
ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    errmsg("cannot drop replication slot \"%s\"", name),
    errdetail("This slot is being synced from the primary.")));
ereport(ERROR, (errmsg("could not receive failover slots dbinfo from the primary server: %s",
    pchomp(PQerrorMessage(conn->streamConn)))));
ereport(ERROR, (errmsg("invalid response from primary server"),
    errdetail("Could not get failover slots dbinfo: got %d fields, " "expected 1", nfields)));
ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR),
    errmsg("invalid connection string syntax: %s", errcopy)));

ereport(WARNING, (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
    errmsg("out of background worker slots"),
    errhint("You might need to increase %s.", "max_worker_processes")));
ereport(WARNING, (errmsg("replication slot-sync worker failed to attach to " "worker-pool slot %d",
    worker_slot)));
ereport(WARNING, errmsg("skipping slots synchronization as primary_slot_name " "is not set."));
ereport(WARNING, errmsg("skipping slots synchronization as hot_standby_feedback " "is off."));
ereport(WARNING, errmsg("skipping slots synchronization as dbname is not " "specified in primary_conninfo."));
ereport(WARNING, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    errmsg("slot-sync wait for slot %s interrupted by promotion, " "slot creation aborted",
    remote_slot->name)));
ereport(WARNING, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    errmsg("slot-sync wait for slot %s interrupted by promotion, " "slot creation aborted",
    remote_slot->name)));
ereport(WARNING, (errmsg("slot \"%s\" disappeared from the primary, aborting" " slot creation",
    remote_slot->name)));
ereport(WARNING, (errmsg("slot \"%s\" invalidated on primary, aborting" " slot creation",
    remote_slot->name)));
ereport(WARNING, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    errmsg("slot-sync for slot \"%s\" interrupted by promotion, " "sync not possible",
    remote_slot->name)));
ereport(WARNING, errmsg("skipping sync of slot \"%s\" as the received slot-sync "
    "lsn %X/%X is ahead of the standby position %X/%X",
    remote_slot->name, LSN_FORMAT_ARGS(remote_slot->confirmed_lsn),
    LSN_FORMAT_ARGS(WalRcv->latestWalEnd)));
ereport(WARNING, errmsg("not synchronizing slot %s; synchronization would move" " it backward",
    remote_slot->name));

ereport(LOG, (errmsg("Dropped replication slot \"%s\" ", NameStr(local_slot->data.name))));
ereport(LOG, (errmsg("Added database %d to replication slot-sync " "worker %d; dbcount now: %d",
    dbid, worker_slot, worker->dbcount)));
ereport(LOG, (errmsg("Added database %d to replication slot-sync " "worker %d; dbcount now: %d",
    dbid, worker_slot, worker->dbcount)));
ereport(LOG, (errmsg("Stopping replication slot-sync worker %d", slot)));
ereport(LOG, (errmsg("removed database %d from replication slot-sync " "worker %d; dbcount now: %d",
    wdbid, worker->slot, worker->dbcount)));
ereport(LOG, errmsg("waiting for remote slot \"%s\" LSN (%X/%X) and catalog xmin"
    " (%u) to pass local slot LSN (%X/%X) and and catalog xmin (%u)",
    remote_slot->name,
    LSN_FORMAT_ARGS(remote_slot->restart_lsn), remote_slot->catalog_xmin,
    LSN_FORMAT_ARGS(MyReplicationSlot->data.restart_lsn),
    MyReplicationSlot->data.catalog_xmin));
ereport(LOG, errmsg("wait over for remote slot \"%s\" as its LSN (%X/%X)"
    " and catalog xmin (%u) has now passed local slot LSN"
    " (%X/%X) and catalog xmin (%u)",
    remote_slot->name,
    LSN_FORMAT_ARGS(new_restart_lsn), new_catalog_xmin,
    LSN_FORMAT_ARGS(MyReplicationSlot->data.restart_lsn),
    MyReplicationSlot->data.catalog_xmin));
ereport(LOG, errmsg("Replication slot-sync worker %d is shutting" " down on receiving SIGINT",
    MySlotSyncWorker->slot));
ereport(LOG, errmsg("Replication slot-sync worker %d started", worker_slot));

elog(DEBUG1, "allocated dsa for slot-sync worker for dbcount: %d",
    DB_PER_WORKER_ALLOC_INIT);
elog(DEBUG1, "logical replication launcher started");
elog(DEBUG2, "slot-sync worker%d's query:%s \n", MySlotSyncWorker->slot, s.data);

~~~

4. GENERAL - SlotSyncWorker loops

When iterating over slot-sync workers, the code sometimes looks like

+ for (int i = 0; i < max_slotsync_workers; i++)
+ {
+ SlotSyncWorker *w = &LogicalRepCtx->ss_workers[i];

and other times it looks like

+ for (int widx = 0; widx < max_slotsync_workers; widx++)
+ {
+ SlotSyncWorker *worker = &LogicalRepCtx->ss_workers[widx];

etc.

It would be better if such loops used the same loop variable and
SlotSyncWorker variable names; consistency will make the code easier
to read.
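For illustration only, here is a minimal standalone sketch of the `i` / `w` convention. Either pair of names is fine as long as every loop uses it. The `SlotSyncWorker` struct, the `ss_workers` array (standing in for `LogicalRepCtx->ss_workers`) and `slotsync_workers_in_use()` are simplified, hypothetical stand-ins, not the patch's real definitions.

```c
#include <assert.h>
#include <stdbool.h>

#define NUM_SS_WORKERS 4

/* Hypothetical, simplified stand-in for the patch's SlotSyncWorker. */
typedef struct SlotSyncWorker
{
    bool        in_use;
} SlotSyncWorker;

/* Hypothetical stand-ins for LogicalRepCtx->ss_workers and the GUC. */
static SlotSyncWorker ss_workers[NUM_SS_WORKERS];
static int  max_slotsync_workers = NUM_SS_WORKERS;

/* Count the in-use workers, using the same 'i' / 'w' names as every other loop. */
static int
slotsync_workers_in_use(void)
{
    int         count = 0;

    for (int i = 0; i < max_slotsync_workers; i++)
    {
        SlotSyncWorker *w = &ss_workers[i];

        if (w->in_use)
            count++;
    }

    assert(count <= max_slotsync_workers);
    return count;
}
```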

======
Commit message

5.
GUC 'enable_syncslot' enables a physical_satndby to synchronize logical
replication failover slots from the primary server.

s/physical_satndby/physical standby/

## I think this one is already fixed in the latest v25.

~~~

6.
The logical slots created by slot-sync workers on physical standbys are
not allowed to be consumed and dropped. Any attempt to perform logical decoding
on such slots will result in an error.

~

SUGGESTION
The logical slots created by slot-sync workers on physical standbys are
not allowed to be dropped or consumed. Any attempt to perform logical decoding
on such slots will result in an error.

======
doc/src/sgml/config.sgml

7.
+ <para>
+ Specify dbname in <varname>primary_conninfo</varname> string
+ to allow synchronization of slots from the primary to standby.
+ This will only be used for slot synchronization. It is ignored
+ for streaming.
</para>

Maybe better to use <literal> for dbname.

~~~

8.
+ </varlistentry>
+
+
</variablelist>

Extra blank line not needed.

======
.../libpqwalreceiver/libpqwalreceiver.c

9. libpqrcv_get_dbname_from_conninfo

+ for (opt = opts; opt->keyword != NULL; ++opt)
+ {
+ /* If multiple dbnames are used, then the last one will be returned */

s/are used/are specified/

======
src/backend/replication/logical/launcher.c

10. slotsync_worker_launch_or_reuse

+ MemoryContext oldcontext;
+ uint32 alloc_count = 0;
+ uint32 old_dbcnt = 0;
+ Oid *old_dbids = NULL;

No need to assign these in the declaration, because they get
unconditionally assigned before they are inspected anyhow.

~~~

11.
+ /* Prepare the new worker. */
+ worker->hdr.launch_time = GetCurrentTimestamp();
+ worker->hdr.in_use = true;
+
+ /*
+ * 'proc' and 'slot' will be assigned in ReplSlotSyncWorkerMain when we
+ * attach this worker to a particular worker-pool slot
+ */
+ worker->hdr.proc = NULL;
+ worker->slot = -1;
+
+ /* TODO: do we really need 'generation', analyse more here */
+ worker->hdr.generation++;
+
+ /* Initial DSA setup for dbids array to hold DB_PER_WORKER_ALLOC_INIT dbs */
+ handle = slotsync_dsa_setup(worker);

It is confusing for some of the worker members to be initialized here
and other worker members (like `dbcount`) to be initialized within the
function slotsync_dsa_setup(). It might be better to keep all the field
initialization together, e.g. combined in a new function
'slotsync_worker_setup()'.
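Here is a rough, standalone sketch of what such a combined `slotsync_worker_setup()` might look like. The structs are simplified stand-ins for the patch's worker types. `int64_t` replaces `TimestampTz`, `void *` replaces `PGPROC *`, and the timestamp is passed in rather than obtained from `GetCurrentTimestamp()`. Resetting `dbcount` to 0 is an assumption about what `slotsync_dsa_setup()` currently does. The DSA allocation itself would stay in `slotsync_dsa_setup()`.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Simplified stand-in for the patch's LogicalWorkerHeader. */
typedef struct LogicalWorkerHeader
{
    int64_t     launch_time;    /* stand-in for TimestampTz */
    bool        in_use;
    void       *proc;           /* stand-in for PGPROC * */
    uint16_t    generation;
} LogicalWorkerHeader;

/* Simplified stand-in for the patch's SlotSyncWorker. */
typedef struct SlotSyncWorker
{
    LogicalWorkerHeader hdr;
    int         slot;           /* worker-pool slot, assigned on attach */
    uint32_t    dbcount;        /* number of dbids in use */
} SlotSyncWorker;

/* Hypothetical helper: initialize every worker field in one place. */
static void
slotsync_worker_setup(SlotSyncWorker *worker, int64_t now)
{
    assert(worker != NULL);

    worker->hdr.launch_time = now;
    worker->hdr.in_use = true;

    /* 'proc' and 'slot' are assigned later, when the worker attaches to a pool slot */
    worker->hdr.proc = NULL;
    worker->slot = -1;

    worker->hdr.generation++;

    /* Assumed initial value; the dbids array itself is allocated elsewhere */
    worker->dbcount = 0;
}
```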

~~~

12.
+ /* Check if current DB is still present in remote-db-list */
+ foreach(lc, remote_dbs)
+ {
+ WalRcvFailoverSlotsData *failover_slot_data = lfirst(lc);
+
+ if (failover_slot_data->dboid == wdbid)
+ {
+ found = true;
+ break;
+ }
+ }
+
+ /* If not found, then delete this db from worker's db-list */
+ if (!found)
+ {
+ if (dbidx < (worker->dbcount - 1))
+ {
+ /* Shift the DBs and get rid of wdbid */
+ memmove(&dbids[dbidx], &dbids[dbidx + 1],
+ (worker->dbcount - dbidx - 1) * sizeof(Oid));
+ }
+
+ worker->dbcount--;
+
+ ereport(LOG,
+ (errmsg("removed database %d from replication slot-sync "
+ "worker %d; dbcount now: %d",
+ wdbid, worker->slot, worker->dbcount)));
+ }
+
+ /* Else move to next db-position */
+ else
+ {
+ dbidx++;
+ }

This code might be simpler if you just remove the whole "Else move..."
part and instead increment `dbidx` at the same time you set
`found = true`.

For example,

if (failover_slot_data->dboid == wdbid)
{
/* advance worker to next db-position */
found = true;
dbidx++;
break;
}
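To check that the simplification keeps the same behaviour, here is a standalone sketch of the whole pruning loop with the suggested change applied. It uses plain arrays and a hypothetical `prune_dbids()` function in place of the patch's `List`, `foreach` and `worker->dbcount`. `dbidx` only advances when the db is found. After a `memmove()`, the next db has already shifted into `dbids[dbidx]`, so that position must be examined again.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <string.h>

typedef unsigned int Oid;       /* stand-in for PostgreSQL's Oid */

/*
 * Remove from dbids[] every entry that is not in remote_dbs[], keeping the
 * survivors in order.  Returns the new dbcount.
 */
static int
prune_dbids(Oid *dbids, int dbcount, const Oid *remote_dbs, int nremote)
{
    int         dbidx = 0;

    while (dbidx < dbcount)
    {
        Oid         wdbid = dbids[dbidx];
        bool        found = false;

        /* Check if current DB is still present in remote-db-list */
        for (int i = 0; i < nremote; i++)
        {
            if (remote_dbs[i] == wdbid)
            {
                /* advance worker to next db-position */
                found = true;
                dbidx++;
                break;
            }
        }

        /* If not found, then delete this db from worker's db-list */
        if (!found)
        {
            if (dbidx < dbcount - 1)
                memmove(&dbids[dbidx], &dbids[dbidx + 1],
                        (size_t) (dbcount - dbidx - 1) * sizeof(Oid));
            dbcount--;
        }
    }

    assert(dbcount >= 0);
    return dbcount;
}
```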

~~~

13. slotsync_remote_connect

+/*
+ * Connect to the primary server for slotsync purpose and return the connection
+ * info.
+ */
+static WalReceiverConn *
+slotsync_remote_connect()
+{
+ WalReceiverConn *wrconn = NULL;
+ char *err;
+ char *dbname;

No need to assign NULL there. It will be overwritten before it is used.

~~~

14.
Ajin's previous explanation ([1] #27) of why some of the checks have
warnings and some do not was helpful; IMO it should be written as a
comment in this function.

+ /* The primary_slot_name is not set */
+ if (!WalRcv || WalRcv->slotname[0] == '\0')
+ {
+ ereport(WARNING,
+ errmsg("skipping slots synchronization as primary_slot_name "
+ "is not set."));
+ return NULL;
+ }
+
+ /* The hot_standby_feedback must be ON for slot-sync to work */
+ if (!hot_standby_feedback)
+ {
+ ereport(WARNING,
+ errmsg("skipping slots synchronization as hot_standby_feedback "
+ "is off."));
+ return NULL;
+ }
+
+ /* The dbname must be specified in primary_conninfo for slot-sync to work */
+ dbname = walrcv_get_dbname_from_conninfo(PrimaryConnInfo);
+ if (dbname == NULL)
+ {
+ ereport(WARNING,
+ errmsg("skipping slots synchronization as dbname is not "
+ "specified in primary_conninfo."));
+ return NULL;
+ }

Add a new comment above all those:

SUGGESTION
/*
 * Check that other GUC settings (primary_slot_name, hot_standby_feedback,
 * primary_conninfo) are compatible with slot synchronization.
 */

~~~

15. slotsync_configs_changed

+static bool
+slotsync_configs_changed()
+{
+ if ((EnableSyncSlotPreReload != enable_syncslot) ||
+ (HotStandbyFeedbackPreReload != hot_standby_feedback) ||
+ (strcmp(PrimaryConnInfoPreReload, PrimaryConnInfo) != 0) ||
+ (strcmp(PrimarySlotNamePreReload, WalRcv->slotname) != 0))
+ {
+ return true;
+ }
+
+ return false;
+}

Might as well write this as a single return. Also, IMO it is more
natural to write it as "if the <now_value> is different from <prev_value>"
instead of the other way around.

For example:

return (enable_syncslot != EnableSyncSlotPreReload) ||
    (hot_standby_feedback != HotStandbyFeedbackPreReload) ||
    (strcmp(PrimaryConnInfo, PrimaryConnInfoPreReload) != 0) ||
    (strcmp(WalRcv->slotname, PrimarySlotNamePreReload) != 0);
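Here is a standalone version of the suggested function. It uses simplified global stand-ins for the GUCs and their `*PreReload` copies; the hypothetical `walrcv_slotname` stands in for `WalRcv->slotname`. It also declares the empty parameter list as `(void)`, which is the usual PostgreSQL style.

```c
#include <assert.h>
#include <stdbool.h>
#include <string.h>

/* Simplified stand-ins for the current GUC values. */
static bool enable_syncslot;
static bool hot_standby_feedback;
static const char *PrimaryConnInfo = "";
static const char *walrcv_slotname = "";    /* stands in for WalRcv->slotname */

/* Simplified stand-ins for the values saved before the config reload. */
static bool EnableSyncSlotPreReload;
static bool HotStandbyFeedbackPreReload;
static const char *PrimaryConnInfoPreReload = "";
static const char *PrimarySlotNamePreReload = "";

/* Return true if any slot-sync related setting differs from its pre-reload value. */
static bool
slotsync_configs_changed(void)
{
    /* strcmp() requires non-NULL strings */
    assert(PrimaryConnInfo && PrimaryConnInfoPreReload &&
           walrcv_slotname && PrimarySlotNamePreReload);

    return (enable_syncslot != EnableSyncSlotPreReload) ||
        (hot_standby_feedback != HotStandbyFeedbackPreReload) ||
        (strcmp(PrimaryConnInfo, PrimaryConnInfoPreReload) != 0) ||
        (strcmp(walrcv_slotname, PrimarySlotNamePreReload) != 0);
}
```

The result is the same as the original if/return form; `||` simply stops at the first changed setting.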

~~~

16. slotsync_configs_changed

+ foreach(lc, slots_dbs)
+ {
+ WalRcvFailoverSlotsData *failover_slot_data = lfirst(lc);
+ SlotSyncWorker *w;
+
+ Assert(OidIsValid(failover_slot_data->dboid));
+
+ LWLockAcquire(SlotSyncWorkerLock, LW_SHARED);
+ w = slotsync_worker_find(failover_slot_data->dboid);
+ LWLockRelease(SlotSyncWorkerLock);
+
+ if (w != NULL)
+ continue; /* worker is running already */
+
+ /*
+ * If we failed to launch this slotsync worker, return and try
+ * launching the failed and remaining workers in next sync-cycle. But
+ * change launcher's wait time to minimum of
+ * wal_retrieve_retry_interval and default wait time to try next
+ * sync-cycle sooner.
+ */
+ if (!slotsync_worker_launch_or_reuse(failover_slot_data->dboid))
+ {
+ *wait_time = Min(*wait_time, wal_retrieve_retry_interval);
+ break;
+ }
+ }

Nit: IMO when the variable scope is small (when you can easily see the
declaration and every usage within a few lines), such long descriptive
names make the code *less* readable, not more.

SUGGESTION

s/failover_slot_data/slot_data/

OR

s/failover_slot_data/sdata/

======
src/backend/replication/logical/slotsync.c

17.
+ * This file contains the code for slot-sync workers on physical standby
+ * to fetch logical failover slots information from the primary server,
+ * create the slots on the standby and synchronize them periodically.

s/on physical standby/on the physical standby/

~~~

18. slot_exists_in_list

+ if (strcmp(remote_slot->name, NameStr(local_slot->data.name)) == 0)
+ {
+ /*
+ * if remote slot is marked as non-conflicting (i.e. not
+ * invalidated) but local slot is marked as invalidated, then set
+ * the bool.
+ */
+ if (!remote_slot->conflicting &&
+ local_slot->data.invalidated != RS_INVAL_NONE)
+ *locally_invalidated = true;
+
+ return true;
+ }

Isn't it better to *always* set that 'locally_invalidated' flag for a
found slot? Otherwise, you are assuming that the flag value was
initially false, but maybe it was not.

SUGGESTION
/*
* Is the remote slot marked as non-conflicting (i.e. not
* invalidated) when the local slot is marked as invalidated?
*/
*locally_invalidated =
!remote_slot->conflicting &&
(local_slot->data.invalidated != RS_INVAL_NONE);
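Here is a standalone sketch of the suggested change. It uses simplified, hypothetical stand-ins for the remote and local slot structs: an array replaces the patch's `List`, and only two invalidation causes are modelled. Because `*locally_invalidated` is assigned for every found slot, the caller's initial value no longer matters.

```c
#include <assert.h>
#include <stdbool.h>
#include <string.h>

/* Simplified stand-in; RS_INVAL_NONE means "not invalidated". */
typedef enum SlotInvalidationCause
{
    RS_INVAL_NONE,
    RS_INVAL_WAL_REMOVED
} SlotInvalidationCause;

/* Hypothetical, simplified remote and local slot descriptions. */
typedef struct RemoteSlot
{
    const char *name;
    bool        conflicting;
} RemoteSlot;

typedef struct LocalSlot
{
    const char *name;
    SlotInvalidationCause invalidated;
} LocalSlot;

/* Return true if local_slot is found in remote_slots[]. */
static bool
slot_exists_in_list(const LocalSlot *local_slot, const RemoteSlot *remote_slots,
                    int nremote, bool *locally_invalidated)
{
    assert(local_slot != NULL && locally_invalidated != NULL);

    for (int i = 0; i < nremote; i++)
    {
        const RemoteSlot *remote_slot = &remote_slots[i];

        if (strcmp(remote_slot->name, local_slot->name) == 0)
        {
            /*
             * Is the remote slot marked as non-conflicting (i.e. not
             * invalidated) when the local slot is marked as invalidated?
             */
            *locally_invalidated =
                !remote_slot->conflicting &&
                (local_slot->invalidated != RS_INVAL_NONE);
            return true;
        }
    }
    return false;
}
```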

~~~

19. get_remote_invalidation_cause

+ if (res->status != WALRCV_OK_TUPLES)
+ ereport(ERROR,
+ (errmsg("could not fetch invalidation cause for slot \"%s\" from"
+ " primary: %s", slot_name, res->err)));

(already mentioned in general review comment)

s/from primary/from the primary/

~~~

20.
+/*
+ * Drop obsolete slots
+ *
+ * Drop the slots that no longer need to be synced i.e. these either
+ * do not exist on primary or are no longer enabled as failover slots.

(??)

s/enabled as failover slots/designated as failover slots/

OR

s/enabled as failover slots/enabled for failover/

~~~

21. construct_slot_query

+static void
+construct_slot_query(StringInfo s, Oid *dbids)
+{
+ Assert(LWLockHeldByMeInMode(SlotSyncWorkerLock, LW_SHARED));
+
+ appendStringInfo(s,
+ "SELECT slot_name, plugin, confirmed_flush_lsn,"
+ " restart_lsn, catalog_xmin, two_phase, conflicting, "
+ " database FROM pg_catalog.pg_replication_slots"
+ " WHERE enable_failover=true and database IN ");

s/WHERE enable_failover=true and database IN/WHERE enable_failover AND
database IN/

### I noticed the code is a tiny bit different in v25, but the review
comment is still relevant.

~~~

22. synchronize_slots

+/*
+ * Synchronize slots.
+ *
+ * It gets the failover logical slots info from the primary server for the dbids
+ * managed by this worker and then updates the slots locally as per the info
+ * received. It creates the slots if not present on the standby.
+ *
+ * It returns nap time for the next sync-cycle.
+ */

The comment could be reworded so that it does not say "it" everywhere.

======
src/backend/replication/walsender.c

23.
+ /*
+ * Check if the database OID is already in the list, and if so, skip
+ * this slot.
+ */
+ if (list_member_oid(database_oids_list, dboid))
+ continue;

Simplify the comment

SUGGESTION
Skip this slot if the database OID is already in the list.
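For illustration, here is the same skip-duplicates pattern as a standalone sketch. Plain arrays and the hypothetical helper `oid_array_member()` stand in for the patch's `List` and `list_member_oid()`.

```c
#include <assert.h>
#include <stdbool.h>

typedef unsigned int Oid;       /* stand-in for PostgreSQL's Oid */

/* Hypothetical array-based stand-in for list_member_oid(). */
static bool
oid_array_member(const Oid *oids, int n, Oid oid)
{
    for (int i = 0; i < n; i++)
    {
        if (oids[i] == oid)
            return true;
    }
    return false;
}

/*
 * Collect the distinct database OIDs of the given slots into out[], in
 * first-seen order.  out[] must have room for nslots entries.
 */
static int
collect_slot_dboids(const Oid *slot_dboids, int nslots, Oid *out)
{
    int         nout = 0;

    for (int i = 0; i < nslots; i++)
    {
        Oid         dboid = slot_dboids[i];

        /* Skip this slot if the database OID is already in the list. */
        if (oid_array_member(out, nout, dboid))
            continue;

        out[nout++] = dboid;
    }

    assert(nout <= nslots);
    return nout;
}
```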

======
src/backend/utils/activity/wait_event_names.txt

24.
+REPL_SLOTSYNC_MAIN "Waiting in main loop of slot-sync worker."
+REPL_SLOTSYNC_PRIMARY_CATCHUP "Waiting for primary to catch-up, in slot-sync worker."

(this was already mentioned in the general review comment)

s/primary/the primary/

======
src/include/postmaster/bgworker_internals.h

25.
#define MAX_PARALLEL_WORKER_LIMIT 1024
+#define MAX_SLOTSYNC_WORKER_LIMIT 50

This constant does not seem to be used anywhere except in guc_tables.c,
where the GUC is defined. IMO you should make use of it in some
Assert or a message; otherwise, you might as well remove it and
hardwire the 50 in guc_tables.c directly.
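If the constant is kept, one possible use is a sanity check where the worker pool is sized. Here is a minimal standalone sketch. `slotsync_pool_size()` is hypothetical, `assert()` stands in for PostgreSQL's `Assert()`, and the GUC variable is a simplified stand-in.

```c
#include <assert.h>

#define MAX_SLOTSYNC_WORKER_LIMIT 50

/* Simplified stand-in for the max_slotsync_workers GUC variable. */
static int  max_slotsync_workers = 2;

/*
 * Hypothetical helper returning the number of worker slots to allocate.
 * The GUC's declared maximum should already guarantee the bound; the
 * assertion documents that and ties the constant to an actual use.
 */
static int
slotsync_pool_size(void)
{
    assert(max_slotsync_workers <= MAX_SLOTSYNC_WORKER_LIMIT);
    return max_slotsync_workers;
}
```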

======
src/include/replication/walreceiver.h

26. WalRcvFailoverSlotsData

+/*
+ * Failover logical slots dbids received from remote.
+ */
+typedef struct WalRcvFailoverSlotsData
+{
+ Oid dboid;
+} WalRcvFailoverSlotsData;
+

For now, the only data is `dboid`, but maybe one day there will be more
stuff, so make the struct comment more generic.

SUGGESTION
Failover logical slots data received from remote.

======
src/include/replication/worker_internal.h

27. LogicalRepWorkerType
+
+typedef struct LogicalRepWorker
+{
+ LogicalWorkerHeader hdr;
+
+ /* What type of worker is this? */
+ LogicalRepWorkerType type;
+

Maybe add some struct-level comments for this.

======
[1] https://www.postgresql.org/message-id/CAFPTHDaqn%2Bm47_vkAToQD6Pe8diut0F0g0bSr8PdcuW6cbSSkQ%40mail.gmail.com

Kind Regards,
Peter Smith.
Fujitsu Australia
