Re: Synchronizing slots from primary to standby

From: Peter Smith <smithpb2250(at)gmail(dot)com>
To: shveta malik <shveta(dot)malik(at)gmail(dot)com>
Cc: "Hayato Kuroda (Fujitsu)" <kuroda(dot)hayato(at)fujitsu(dot)com>, "Drouvot, Bertrand" <bertranddrouvot(dot)pg(at)gmail(dot)com>, Amit Kapila <amit(dot)kapila16(at)gmail(dot)com>, Bharath Rupireddy <bharath(dot)rupireddyforpostgres(at)gmail(dot)com>, Peter Eisentraut <peter(dot)eisentraut(at)enterprisedb(dot)com>, Bruce Momjian <bruce(at)momjian(dot)us>, Ashutosh Sharma <ashu(dot)coek88(at)gmail(dot)com>, Andres Freund <andres(at)anarazel(dot)de>, pgsql-hackers <pgsql-hackers(at)postgresql(dot)org>, Ajin Cherian <itsajin(at)gmail(dot)com>, Alvaro Herrera <alvherre(at)alvh(dot)no-ip(dot)org>
Subject: Re: Synchronizing slots from primary to standby
Date: 2023-09-15 08:51:44
Message-ID: CAHut+PvX1q8CyO+QHUTnn_aPg3PM9eDUOdQGyWdKFKVoDfo67g@mail.gmail.com

Hi. Here are some review comments for v17-0002.

This is a WIP and a long way from complete, but I wanted to send what
I have so far (while it is still current with your latest posted
patches).

======
1. GENERAL - loop variable declaration

There are some code examples like the one below where the loop variable is
declared within the for statement. AFAIK this style of declaration is
atypical for the PG source.

+ /* Find unused worker slot. */
+ for (int i = 0; i < max_slotsync_workers; i++)

Search/Replace.
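
For reference, the more typical style would be something like (sketch only):

int i;

/* Find unused worker slot. */
for (i = 0; i < max_slotsync_workers; i++)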

~~~

2. GENERAL - from primary

There are multiple examples in messages and comments that say "from
primary". I felt most would be better to say "from the primary".

Search/Replace.

~~~

3. GENERAL - pgindent

There are lots of examples of function arguments like "* worker"
(with space) which changed to "*worker" (without space) in v16 and
then changed back to "* worker" with space in v17.

Can all these toggles be cleaned up by running pgindent?

======
Commit message.

4.
This patch attempts to implement logical replication slots synchronization
from primary server to physical standby so that logical subscribers are not
blocked after failover. Now-on, all the logical replication slots created on
primary (assuming configurations are appropriate) are automatically created
on physical standbys and are synced periodically. This has been acheived
by starting slot-sync worker(s) on standby server which pings primary at
regular intervals to get the logical slots information and create/update the
slots locally.

SUGGESTION (just minor rewording)
This patch implements synchronization of logical replication slots
from the primary server to the physical standby so that logical
subscribers are not blocked after failover. All the logical
replication slots on the primary (assuming configurations are
appropriate) are automatically created on the physical standbys and
are synced periodically. Slot-sync worker(s) on the standby server
ping the primary at regular intervals to get the necessary logical
slot information and create/update the slots locally.

~

5.
For max number of slot-sync workers on standby, new GUC max_slotsync_workers
has been added, default value and max value is kept at 2 and 50 respectively.
This parameter can only be set at server start.

5a.
SUGGESTION (minor rewording)
A new GUC 'max_slotsync_workers' defines the maximum number of
slot-sync workers on the standby: default value = 2, max value = 50.
This parameter can only be set at server start.

~

5b.
Actually, I think mentioning the values 2 and 50 here might be too
much detail, but I left it anyway. Consider removing that.

~~~

6.
Now replication launcher on physical standby queries primary to get list
of dbids which belong to slots mentioned in GUC 'synchronize_slot_names'.
Once it gets the dbids, if dbids < max_slotsync_workers, it starts only
that many workers and if dbids > max_slotsync_workers, it starts
max_slotsync_workers and divides the work equally among them. Each worker
is then responsible to keep on syncing the concerned logical slots belonging
to the DBs assigned to it.

~

6a.
SUGGESTION (first sentence)
Now the replication launcher on the physical standby queries the primary
to get the list of dbids that belong to the...

~

6b.
"concerned" ??

~~~

7.
Let us say slots mentioned in 'synchronize_slot_names' on primary belongs to
4 DBs and say 'max_slotsync_workers' is 4, then a new worker will be launched
for each db. If a new logical slot with a different DB is found by replication
launcher, it will assign this new db to the worker handling the minimum number
of dbs currently (or first worker in case of equal count).

~

/Let us say/For example, let's say/

~~~

8.
The naptime of worker is tuned as per the activity on primary. Each worker
starts with naptime of 10ms and if no activity is observed on primary for some
time, then naptime is increased to 10sec. And if activity is observed again,
naptime is reduced back to 10ms. Each worker does it by choosing one slot
(first one assigned to it) for monitoring purpose. If there is no change
in lsn of that slot for say over 10 sync-checks, naptime is increased to 10sec
and as soon as a change is observed, naptime is reduced back to 10ms.

~

/as per the activity on primary/according to the activity on the primary/

/is observed on primary/is observed on the primary/

/Each worker does it by choosing one slot/Each worker uses one slot/

~~~

9.
If there is any change in synchronize_slot_names, then the slots which are no
longer part of it or the ones which no longer exist on primary will be dropped
by slot-sync workers on physical standbys.

~

9a.
/on primary/on the primary/

/which no longer exist/that no longer exist/

~

9b.
I didn't really understand why this says "or the ones which no longer
exist". IIUC (from prior paragraph) such slots would already be
invalidated/removed by the sync-slot worker in due course -- i.e. we
don't need to wait for some change to the 'synchronize_slot_names'
list to trigger that deletion, right?

======
doc/src/sgml/config.sgml

10.
+ <varlistentry id="guc-max-slotsync-workers"
xreflabel="max_slotsync_workers">
+ <term><varname>max_slotsync_workers</varname> (<type>integer</type>)
+ <indexterm>
+ <primary><varname>max_slotsync_workers</varname> configuration
parameter</primary>
+ </indexterm>
+ </term>
+ <listitem>
+ <para>
+ Specifies maximum number of slot synchronization workers.
+ </para>
+ <para>
+ Slot synchronization workers are taken from the pool defined by
+ <varname>max_worker_processes</varname>.
+ </para>
+ <para>
+ The default value is 2. This parameter can only be set at server
+ start.
+ </para>
+ </listitem>
+ </varlistentry>

This looks OK, but IMO there also needs to be some larger description (here
or elsewhere?) about this feature more generally. Otherwise, why would
the user change 'max_slotsync_workers' when there is nothing to say what
"slot synchronization workers" are for?

======
src/backend/postmaster/bgworker.c

11.
{
"ApplyWorkerMain", ApplyWorkerMain
},
+ {
+ "ReplSlotSyncMain", ReplSlotSyncMain
+ },
{
"ParallelApplyWorkerMain", ParallelApplyWorkerMain
},

~

I thought this entry point name/function should include the word
"Worker", the same as the others do.

======
.../libpqwalreceiver/libpqwalreceiver.c

12.
+/*
+ * Get DB info for logical slots
+ *
+ * It gets the DBIDs for slot_names from primary. The list obatined has no
+ * duplicacy of DBIds.
+ */
+static List *
+libpqrcv_get_dbinfo_for_logical_slots(WalReceiverConn *conn,
+ const char *slot_names)

12a.
typo /obatined/

SUGGESTION
The returned list has no duplicates.

~

12b.
I did not recognise any part of the function logic ensuring no
duplicates are returned. IIUC it is actually within the logic of
LIST_DBID_FOR_LOGICAL_SLOTS that this is handled, so maybe the comment
can mention that.
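
Perhaps something like:

SUGGESTION
The returned list has no duplicates; the de-duplication is handled by the
LIST_DBID_FOR_LOGICAL_SLOTS command on the primary.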

~~~

13. libpqrcv_get_dbinfo_for_logical_slots

+ if (PQnfields(res) != 1)
+ {
+ int nfields = PQnfields(res);
+
+ PQclear(res);
+ ereport(ERROR,
+ (errmsg("invalid response from primary server"),
+ errdetail("Could not get list of slots: got %d fields, "
+ "expected %d fields.",
+ nfields, 1)));
+ }

Something does not seem right about this message. The plurality of the
"expected" part is wrong, and if the expected value can only ever be 1, why
use a substitution for it at all?
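
Perhaps just hardcode the expected count, e.g. something like (untested):

errdetail("Could not get list of slots: got %d fields, expected 1 field.",
          nfields)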

======
src/backend/replication/logical/Makefile
OK

======
src/backend/replication/logical/launcher.c

14. slot_sync_worker_stop

+static void
+slot_sync_worker_stop(SlotSyncWorker *worker)
+{
+
+ Assert(LWLockHeldByMeInMode(SlotSyncWorkerLock, LW_SHARED));

...

+ LWLockAcquire(SlotSyncWorkerLock, LW_SHARED);
+ }
+
+}

Unnecessary whitespace at the top and bottom of this function.

~~~

15. slot_sync_worker_launch_or_reuse

+ /* Find unused worker slot. */
+ for (int i = 0; i < max_slotsync_workers; i++)

loop variable declaration.

~~~

16. slot_sync_worker_launch_or_reuse

+ if (!worker)
+ {
+ for (int i = 0; i < max_slotsync_workers; i++)

loop variable declaration.

~~~

17. slot_sync_remove_obsolete_dbs

+ /* Traverse slot-sync-workers to validate the DBs */
+ for (int widx = 0; widx < max_slotsync_workers; widx++)
+ {

loop variable declaration.

~

18.
+ for (int dbidx = 0; dbidx < worker->dbcount;)
+ {

loop variable declaration

~

19.
+ for (int i = dbidx; i < worker->dbcount; i++)
+ {

loop variable declaration

~

20.
+ /* If dbcount for any worker has become 0, shut it down */
+ for (int widx = 0; widx < max_slotsync_workers; widx++)
+ {

loop variable declaration

~

21.
+ }
+
+}
+

Unnecessary whitespace at the end of the function body.

~~~

22. ApplyLauncherStartSubs

+static void
+ApplyLauncherStartSubs(long *wait_time)
+{

Missing function comment.

======
.../replication/logical/logicalfuncs.c
OK

======
src/backend/replication/logical/meson.build
OK

======
src/backend/replication/logical/slotsync.c

23.
+/*-------------------------------------------------------------------------
+ * slotsync.c
+ * PostgreSQL worker for synchronizing slots to a standby from primary
+ *
+ * Copyright (c) 2016-2018, PostgreSQL Global Development Group
+ *

Wrong copyright date?

~~~

24.
+ * This file contains the code for slot-sync worker on physical standby that
+ * fetches the logical replication slots information from primary server
+ * (PrimaryConnInfo) and creates the slots on standby and synchronizes them
+ * periodically. It synchronizes only the slots configured in
+ * 'synchronize_slot_names'.

SUGGESTION
This file contains the code for slot-sync workers on physical standby
to fetch logical replication slot information from the primary server
(PrimaryConnInfo), create the slots on the standby, and synchronize
them periodically. Slot-sync workers only synchronize slots configured
in 'synchronize_slot_names'.

~~~

25.
+ * It takes a nap of WORKER_DEFAULT_NAPTIME before every next synchronization.
+ * If there is no acitivity observed on primary for sometime, it increases the
+ * naptime to WORKER_INACTIVITY_NAPTIME and as soon as any activity
is observed,
+ * it brings back the naptime to default value.

SUGGESTION (2nd sentence)
If there is no activity observed on the primary for some time, the
naptime is increased to WORKER_INACTIVITY_NAPTIME, but if any activity
is observed, the naptime reverts to the default value.

~~~

26.
+typedef struct RemoteSlot
+{
+ char *name;
+ char *plugin;
+ char *database;
+ bool two_phase;
+ bool conflicting;
+ XLogRecPtr restart_lsn;
+ XLogRecPtr confirmed_lsn;
+ TransactionId catalog_xmin;
+
+ /* RS_INVAL_NONE if valid, or the reason of invalidation */
+ ReplicationSlotInvalidationCause invalidated;
+} RemoteSlot;

This deserves at least a struct-level comment.
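
e.g. perhaps something like this (the wording is only my guess at the intent):

/*
 * Information about a remote logical slot, as fetched from the primary
 * server, used to create/synchronize the corresponding slot on the standby.
 */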

~~~

27.
+/*
+ * Inactivity Threshold Count before increasing naptime of worker.
+ *
+ * If the lsn of slot being monitored did not change for these many times,
+ * then increase naptime of current worker from WORKER_DEFAULT_NAPTIME to
+ * WORKER_INACTIVITY_NAPTIME.
+ */
+#define WORKER_INACTIVITY_THRESHOLD 10

I felt this constant would be better expressed as a time interval
instead of a magic number. The loop count can easily be derived in the
code logic anyway. E.g. here the comment would be "If the lsn of the slot
being monitored did not change for XXXms then...".
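
Something like the following is roughly what I mean (the names/values are
only a sketch, and it assumes the naptime constants are in milliseconds):

/*
 * Inactivity threshold (in ms) before increasing naptime of worker.
 *
 * If the lsn of the slot being monitored did not change for this long, then
 * increase the naptime of the current worker from WORKER_DEFAULT_NAPTIME to
 * WORKER_INACTIVITY_NAPTIME.
 */
#define WORKER_INACTIVITY_THRESHOLD_MS 100

and then derive the count in the loop:

if (++inactivity_count * WORKER_DEFAULT_NAPTIME >= WORKER_INACTIVITY_THRESHOLD_MS)
    naptime = WORKER_INACTIVITY_NAPTIME;

(100ms matches the current behaviour of 10 checks at the 10ms default naptime)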

~~~

28. wait_for_primary_slot_catchup
+/*
+ * Wait for remote slot to pass localy reserved position.
+ */
+static void
+wait_for_primary_slot_catchup(WalReceiverConn *wrconn, char *slot_name,
+ XLogRecPtr min_lsn)

/localy/locally/

~~~

29. wait_for_primary_slot_catchup

+ ereport(ERROR,
+ (errmsg("slot \"%s\" disapeared from primary",
+ slot_name)));

/disapeared/disappeared/

~~~

30. ReplSlotSyncMain

+ if (!dsa)
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("could not map dynamic shared memory "
+ "segment for slot-sync worker")));
+
+
+ /* Primary initialization is complete. Now, attach to our slot. */

Unnecessary double blank line.

======
src/backend/replication/logical/tablesync.c
OK

======
src/backend/replication/repl_gram.y
OK

======
src/backend/replication/repl_scanner.l
OK

======
src/backend/replication/slot.c

======
src/backend/replication/slotfuncs.c

31.
+/*
+ * SQL function for getting invalidation cause of a slot.
+ *
+ * Returns ReplicationSlotInvalidationCause enum value for valid slot_name;
+ * returns NULL if slot with given name is not found.
+ */
+Datum
+pg_get_invalidation_cause(PG_FUNCTION_ARGS)
+{
+ Name name = PG_GETARG_NAME(0);
+ ReplicationSlotInvalidationCause cause;
+ int slotno;
+
+ LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
+ for (slotno = 0; slotno < max_replication_slots; slotno++)
+ {
+ ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[slotno];
+ if (strcmp(NameStr(s->data.name), NameStr(*name)) == 0)
+ {
+ cause = s->data.invalidated;
+ PG_RETURN_INT16(cause);
+ }
+ }
+ LWLockRelease(ReplicationSlotControlLock);
+
+ PG_RETURN_NULL();
+}

31a.
There seems to be no check whether the slot actually is invalidated. I guess
in that case the function just returns the enum value RS_INVAL_NONE, but
should that be mentioned in the function header comment?

~

31b.
It seems a poor choice of function name -- it does not even have the word
"slot" in it (??).

~

31c.
IMO it is better to have a blank line after the declaration in the loop.

~

31d.
Might be simpler just to remove that 'cause' variable. It's not doing much.

======
src/backend/replication/walsender.c

32. ListSlotDatabaseOIDs

+/*
+ * Handle the LIST_SLOT_DATABASE_OIDS command.
+ */
+static void
+ListSlotDatabaseOIDs(ListDBForLogicalSlotsCmd *cmd)

32a.
The function-level comment seems too terse. Just saying "handle the
command" does not describe what this function is actually doing and
how it does it.

~

32b.
Is "LIST_SLOT_DATABASE_OIDS" even the correct name? I don't see that
anywhere else in this patch.

AFAICT it should be "LIST_DBID_FOR_LOGICAL_SLOTS".
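
Putting 32a and 32b together, perhaps the comment could be something like
(adjust as appropriate):

/*
 * Handle the LIST_DBID_FOR_LOGICAL_SLOTS command.
 *
 * Returns the (unique) database OIDs of the logical slots named in the
 * command.
 */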

~

33. ListSlotDatabaseOIDs - comments

The comments in the body of this function are inconsistent about beginning
with uppercase/lowercase.

~

34. ListSlotDatabaseOIDs - sorting/logic

Maybe explain better the reason for having the qsort and other logic.

TBH, I was not sure of the necessity for the names lists and the
sorting and bsearch logic. AFAICT these are all *only* used to check
for uniqueness and existence of the slot name. So I was wondering if a
hashmap keyed by the slot name might be more appropriate and also
simpler than all this list sorting/searching.
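
e.g. an (untested) sketch of the kind of thing I mean, using a dynahash
keyed by the slot name -- the table/entry names here are hypothetical:

/* requires utils/hsearch.h */
typedef struct SlotNameEntry
{
    char    slotname[NAMEDATALEN];  /* hash key */
} SlotNameEntry;

HASHCTL     ctl = {0};
HTAB       *seen_names;
bool        found;

ctl.keysize = NAMEDATALEN;
ctl.entrysize = sizeof(SlotNameEntry);
seen_names = hash_create("slot name dedup", 32, &ctl,
                         HASH_ELEM | HASH_STRINGS);

/* ... for each slot name from synchronize_slot_names ... */
(void) hash_search(seen_names, slot_name, HASH_ENTER, &found);
if (found)
    continue;               /* duplicate slot name; skip it */

/* ... */
hash_destroy(seen_names);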

~~~

35. ListSlotDatabaseOIDs

+ for (int slotno = 0; slotno < max_replication_slots; slotno++)
+ {

loop variable declaration

======
src/backend/storage/lmgr/lwlock.c
OK

======
src/backend/storage/lmgr/lwlocknames.txt
OK

======
.../utils/activity/wait_event_names.txt
TODO

======
src/backend/utils/misc/guc_tables.c
OK

======
src/backend/utils/misc/postgresql.conf.sample

36.
# primary to streaming replication standby server
+#max_slotsync_workers = 2 # max number of slot synchronization
workers on a standby

IMO it is better to say "maximum" instead of "max" in the comment.

(make sure the GUC description text is identical)

======
src/include/catalog/pg_proc.dat

37.
+{ oid => '6312', descr => 'get invalidate cause of a replication slot',
+ proname => 'pg_get_invalidation_cause', provolatile => 's',
proisstrict => 't',
+ prorettype => 'int2', proargtypes => 'name',
+ prosrc => 'pg_get_invalidation_cause' },

37a.
SUGGESTION (descr)
what caused the replication slot to become invalid

~

37b.
'pg_get_invalidation_cause' seemed like a poor function name because
it doesn't have any context -- not even the word "slot" in it.
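
e.g. (using a hypothetical alternative name, just for illustration):

{ oid => '6312', descr => 'what caused the replication slot to become invalid',
  proname => 'pg_get_slot_invalidation_cause', provolatile => 's', proisstrict => 't',
  prorettype => 'int2', proargtypes => 'name',
  prosrc => 'pg_get_slot_invalidation_cause' },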

======
src/include/commands/subscriptioncmds.h
OK

======
src/include/nodes/replnodes.h
OK

======
src/include/postmaster/bgworker_internals.h

38.
#define MAX_PARALLEL_WORKER_LIMIT 1024
+#define MAX_SLOT_SYNC_WORKER_LIMIT 50

Consider SLOTSYNC instead of SLOT_SYNC for consistency with other
names of this worker.

======
OK

======
src/include/replication/logicalworker.h

39.
extern void ApplyWorkerMain(Datum main_arg);
extern void ParallelApplyWorkerMain(Datum main_arg);
extern void TablesyncWorkerMain(Datum main_arg);
+extern void ReplSlotSyncMain(Datum main_arg);

The name is not consistent with others nearby. At least it should
include the word "Worker" like everything else does.

======
src/include/replication/slot.h
OK

======
src/include/replication/walreceiver.h

40.
+/*
+ * Slot's DBid related data
+ */
+typedef struct WalRcvRepSlotDbData
+{
+ Oid database; /* Slot's DBid received from remote */
+ TimestampTz last_sync_time; /* The last time we tried to launch sync
+ * worker for above Dbid */
+} WalRcvRepSlotDbData;
+

Is that comment about field 'last_sync_time' correct? I thought this
field is the last time the slot was synced -- not the last time the
worker was launched.

======
src/include/replication/worker_internal.h

41.
- /* User to use for connection (will be same as owner of subscription). */
+ /* User to use for connection (will be same as owner of subscription
+ * in case of LogicalRep worker). */
Oid userid;
+} WorkerHeader;

41a.

This is not the normal style for a multi-line comment.
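
e.g. the usual style is more like:

/*
 * User to use for connection (will be same as owner of subscription in
 * case of LogicalRep worker).
 */
Oid         userid;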

~

41b.
I wondered if the name "WorkerHeader" is just a bit *too* generic and
might cause future trouble because of the vague name.

~~~

42.
+typedef struct LogicalRepWorker
+{
+ WorkerHeader header;
+
+ /* What type of worker is this? */
+ LogicalRepWorkerType type;

/* Subscription id for the worker. */
Oid subid;
@@ -77,7 +84,7 @@ typedef struct LogicalRepWorker
* would be created for each transaction which will be deleted after the
* transaction is finished.
*/
- FileSet *stream_fileset;
+ struct FileSet *stream_fileset;

/*
* PID of leader apply worker if this slot is used for a parallel apply
@@ -96,6 +103,32 @@ typedef struct LogicalRepWorker
TimestampTz reply_time;
} LogicalRepWorker;

42a.
I suggest having some struct-level comments.

~

42b.
The field name 'header' is propagated all over the place. So, IMO
calling it 'hdr' instead of 'header' might be slightly less intrusive.
I think there are lots of precedents for calling headers 'hdr'.

~

42c.
Why was the FileSet field changed to struct FileSet? Aren't the
struct/typedef defined in the same place?

~~~

43.
+typedef struct SlotSyncWorker
+{
+ WorkerHeader header;
+
+ /* The slot in worker pool to which it is attached */
+ int slot;
+
+ /* Count of Database ids it manages */
+ uint32 dbcount;
+
+ /* DSA for dbids */
+ dsa_area *dbids_dsa;
+
+ /* dsa_pointer for database ids it manages */
+ dsa_pointer dbids_dp;
+
+ /* Info about slot being monitored for worker's naptime purpose */
+ struct SlotSyncWorkerWatchSlot
+ {
+ NameData slot_name;
+ XLogRecPtr confirmed_lsn;
+ int inactivity_count;
+ } monitoring_info;
+
+} SlotSyncWorker;

43a.
I suggest having some struct-level comments.

~

43b.
IMO it will avoid ambiguities to be more explicit in the comments
instead of just saying "it" everywhere.

+ /* The slot in worker pool to which it is attached */
+ /* Count of Database ids it manages */
+ /* dsa_pointer for database ids it manages */
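
e.g. maybe like this (the wording is just a sketch):

/* The slot in the worker pool to which this worker is attached */
/* Count of database ids this worker manages */
/* dsa_pointer for the database ids this worker manages */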

~

43c.
There is inconsistent wording and case in these comments. Just pick
one term to use everywhere.

"Database ids"
"database ids"
"dbids"

~~~

44. GENERAL - restructuring of common structs in worker_internal.h

The field name 'header' is propagated all over the place. It is OK,
and I guess there is no choice, but IMO calling it 'hdr' instead of
'header' might be slightly less intrusive. I think there are lots of
precedents for calling headers 'hdr'.

======
src/include/storage/lwlock.h

======
src/tools/pgindent/typedefs.list

45.
Missing the typedef WorkerHeader?

======
Kind Regards,
Peter Smith.
Fujitsu Australia
