Re: Synchronizing slots from primary to standby

From: Peter Smith <smithpb2250(at)gmail(dot)com>
To: shveta malik <shveta(dot)malik(at)gmail(dot)com>
Cc: Amit Kapila <amit(dot)kapila16(at)gmail(dot)com>, "Drouvot, Bertrand" <bertranddrouvot(dot)pg(at)gmail(dot)com>, "Zhijie Hou (Fujitsu)" <houzj(dot)fnst(at)fujitsu(dot)com>, "Hayato Kuroda (Fujitsu)" <kuroda(dot)hayato(at)fujitsu(dot)com>, Bharath Rupireddy <bharath(dot)rupireddyforpostgres(at)gmail(dot)com>, Peter Eisentraut <peter(dot)eisentraut(at)enterprisedb(dot)com>, Bruce Momjian <bruce(at)momjian(dot)us>, Ashutosh Sharma <ashu(dot)coek88(at)gmail(dot)com>, Andres Freund <andres(at)anarazel(dot)de>, pgsql-hackers <pgsql-hackers(at)postgresql(dot)org>, Ajin Cherian <itsajin(at)gmail(dot)com>, Alvaro Herrera <alvherre(at)alvh(dot)no-ip(dot)org>
Subject: Re: Synchronizing slots from primary to standby
Date: 2023-11-21 05:39:21
Message-ID: CAHut+Pv-yu71ogj_hRi6cCtmD55bsyw7XTxj1Nq8yVFKpY3NDQ@mail.gmail.com
Lists: pgsql-hackers

Here are some review comments for the patch v35-0001.

======
0. GENERAL documentation

I felt that the documentation gave details of the individual changes
(e.g. GUC 'standby_slot_names' and API, CREATE SUBSCRIPTION option,
and pg_replication_slots 'failover' attribute etc.) but there is
nothing that seemed to bring all these parts together to give users
examples of "when" and "how" to make all these parts work. I'm not sure
if some overview is missing from this patch 0001 or if you are
planning that extra documentation for subsequent patches.

======
Commit message

1.
A new property 'failover' is added at the slot level which
is persistent information which specifies that this logical slot
is enabled to be synced to the physical standbys so that logical
replication can be resumed after failover. It is always false
for physical slots.

~

SUGGESTION
A new property 'failover' is added at the slot level. This is
persistent information to indicate that this logical slot...

~~~

2.
Users can set it during the create subscription or during
pg_create_logical_replication_slot. Examples:

create subscription mysub connection '..' publication mypub
WITH (failover = true);

--last arg
SELECT * FROM pg_create_logical_replication_slot('myslot',
'pgoutput', false, true, true);

~

2a.
Add a blank line before this

~

2b.
Use uppercase for the SQL

~

2c.
SUGGESTION
Users can set this flag during CREATE SUBSCRIPTION or during
pg_create_logical_replication_slot API.

Ex1.
CREATE SUBSCRIPTION mysub CONNECTION '...' PUBLICATION mypub
WITH (failover = true);

Ex2. (failover is the last arg)
SELECT * FROM pg_create_logical_replication_slot('myslot',
'pgoutput', false, true, true);

~~~

3.
This 'failover' is displayed as part of pg_replication_slots
view.

~

SUGGESTION
The value of the 'failover' flag is displayed as part of
pg_replication_slots view.

~~~

4.
A new GUC standby_slot_names has been added. It is the list of
physical replication slots that logical replication with failover
enabled waits for. The intent of this wait is that no logical
replication subscribers (with failover=true) should go
ahead of physical replication standbys (corresponding to the
physical slots in standby_slot_names).

~

4a.
SUGGESTION
A new GUC standby_slot_names has been added. This is a list of
physical replication slots that logical replication with failover
enabled will wait for.

~

4b.
/no logical replication subscribers/no logical replication subscriptions/

~

4c.
/should go ahead of physical/should get ahead of physical/

======
contrib/test_decoding/sql/slot.sql

5.
+
+-- Test logical slots creation with 'failover'=true (last arg)
+SELECT 'init' FROM
pg_create_logical_replication_slot('failover_slot', 'test_decoding',
false, false, true);
+SELECT slot_name, slot_type, failover FROM pg_replication_slots;
+
+SELECT pg_drop_replication_slot('failover_slot');

How about a couple more simple tests:
a) pass false arg to confirm it is false in the view.
b) according to the docs this failover is optional, so try API without
passing it
c) create a physical slot to confirm it is false in the view.

======
doc/src/sgml/catalogs.sgml

6.
+ <row>
+ <entry role="catalog_table_entry"><para role="column_definition">
+ <structfield>subfailoverstate</structfield> <type>char</type>
+ </para>
+ <para>
+ State codes for failover mode:
+ <literal>d</literal> = disabled,
+ <literal>p</literal> = pending enablement,
+ <literal>e</literal> = enabled
+ </para></entry>
+ </row>
+

This attribute is very similar to the 'subtwophasestate' so IMO it
would be better to be adjacent to that one in the docs.

(probably this means putting it in the same order in the catalog also,
assuming that is allowed)

======
doc/src/sgml/config.sgml

7.
+ <para>
+ List of physical replication slots that logical replication slots with
+ failover enabled waits for. If a logical replication connection is
+ meant to switch to a physical standby after the standby is promoted,
+ the physical replication slot for the standby should be listed here.
+ </para>
+ <para>
+ The standbys corresponding to the physical replication slots in
+ <varname>standby_slot_names</varname> must enable
+ <varname>enable_syncslot</varname> for the standbys to receive
+ failover logical slots changes from the primary.
+ </para>

That sentence mentioning 'enable_syncslot' seems premature because
AFAIK that GUC is not introduced until patch 0002. So this part should
be moved into the 0002 patch.

======
doc/src/sgml/ref/alter_subscription.sgml

8.
These commands also cannot be executed when the subscription has
<link linkend="sql-createsubscription-params-with-two-phase"><literal>two_phase</literal></link>
- commit enabled, unless
+ commit enabled or
+ <link linkend="sql-createsubscription-params-with-failover"><literal>failover</literal></link>
+ enabled, unless
<link linkend="sql-createsubscription-params-with-copy-data"><literal>copy_data</literal></link>
is <literal>false</literal>. See column
<structfield>subtwophasestate</structfield>
- of <link linkend="catalog-pg-subscription"><structname>pg_subscription</structname></link>
+ and <structfield>subfailoverstate</structfield> of
+ <link linkend="catalog-pg-subscription"><structname>pg_subscription</structname></link>
to know the actual two-phase state.

I think the last sentence doesn't make sense anymore because it is no
longer talking about only two-phase state.

BEFORE
See column subtwophasestate and subfailoverstate of pg_subscription to
know the actual two-phase state.

SUGGESTION
See column subtwophasestate and subfailoverstate of pg_subscription to
know the actual states.

======
doc/src/sgml/ref/create_subscription.sgml

9.
+
+ <varlistentry id="sql-createsubscription-params-with-failover">
+ <term><literal>failover</literal> (<type>boolean</type>)</term>
+ <listitem>
+ <para>
+ Specifies whether the replication slot assocaited with the
subscription
+ is enabled to be synced to the physical standbys so that logical
+ replication can be resumed from the new primary after failover.
+ The default is <literal>false</literal>.
+ </para>
+
+ <para>
+ The implementation of failover requires that replication
+ has successfully finished the initial table synchronization
+ phase. So even when <literal>failover</literal> is enabled for a
+ subscription, the internal failover state remains
+ temporarily <quote>pending</quote> until the initialization phase
+ completes. See column <structfield>subfailoverstate</structfield>
+ of <link
linkend="catalog-pg-subscription"><structname>pg_subscription</structname></link>
+ to know the actual failover state.
+ </para>
+
+ </listitem>
+ </varlistentry>

9a.
/assocaited/associated/

~

9b.
Unnecessary blank line before </listitem>

======
src/backend/commands/subscriptioncmds.c

10.
#define SUBOPT_ORIGIN 0x00004000
+#define SUBOPT_FAILOVER 0x00008000

Bad indentation

~~~

11. CreateSubscription
+ /*
+ * If only the slot_name is specified, it is possible that the user intends to
+ * use an existing slot on the publisher, so here we enable failover for the
+ * slot if requested.
+ */
+ else if (opts.slot_name && failover_enabled)
+ {
+ walrcv_alter_slot(wrconn, opts.slot_name, opts.failover);
+ ereport(NOTICE,
+ (errmsg("enabled failover for replication slot \"%s\" on publisher",
+ opts.slot_name)));
+ }

11a.
How does this code ensure that *only* slot_name was set (e.g. the
comment says "only the slot_name is specified")?

~

11b.
Should 3rd arg to walrcv_alter_slot be 'failover_enabled', or maybe just 'true'?

~~~

12. AlterSubscription

+ if (sub->failoverstate == LOGICALREP_FAILOVER_STATE_ENABLED && opts.copy_data)
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("ALTER SUBSCRIPTION with refresh and copy_data is not allowed
when failover is enabled"),
+ errhint("Use ALTER SUBSCRIPTION ... SET PUBLICATION with refresh =
false, or with copy_data = false, or use DROP/CREATE
SUBSCRIPTION.")));

12a.
This should have a comment like what precedes the sub->twophasestate
error. Or maybe group them both and use the same common comment.

~

12b.
AFAIK when there are messages like this that differ only by
non-translatable things ("failover" option) then that non-translatable
thing should be extracted as a parameter so the messages are common.
And, don't forget to add a /* translator: %s is a subscription option
like 'failover' */ comment.

SUGGESTION like:
errmsg("ALTER SUBSCRIPTION with refresh and copy_data is not allowed
when %s is enabled", "two_phase")
errmsg("ALTER SUBSCRIPTION with refresh and copy_data is not allowed
when %s is enabled", "failover")
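
To illustrate the idea in #12b, here is a minimal standalone sketch (not PostgreSQL code). It assumes plain snprintf() as a stand-in for errmsg(), and the macro and function names are hypothetical. The point is only that one template serves both options, so translators see a single string.

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

/*
 * Hypothetical single template shared by every subscription option.
 * translator: %s is a subscription option like 'failover'
 */
#define REFRESH_NOT_ALLOWED_FMT \
	"ALTER SUBSCRIPTION with refresh and copy_data is not allowed " \
	"when %s is enabled"

/*
 * Format the common message for the given option name into buf.
 * snprintf() stands in for errmsg() in this sketch.
 */
int
format_refresh_not_allowed(char *buf, size_t buflen, const char *option)
{
	return snprintf(buf, buflen, REFRESH_NOT_ALLOWED_FMT, option);
}
```

Calling it with "two_phase" and then with "failover" produces the two messages from one template.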

~~~

13.
+ if (sub->failoverstate == LOGICALREP_FAILOVER_STATE_ENABLED && opts.copy_data)
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("ALTER SUBSCRIPTION with refresh and copy_data is not allowed
when failover is enabled"),
+ /* translator: %s is an SQL ALTER command */
+ errhint("Use %s with refresh = false, or with copy_data = false, or
use DROP/CREATE SUBSCRIPTION.",
+ isadd ?
+ "ALTER SUBSCRIPTION ... ADD PUBLICATION" :
+ "ALTER SUBSCRIPTION ... DROP PUBLICATION")));

Same comment as above #12b.

SUGGESTION like:
errmsg("ALTER SUBSCRIPTION with refresh and copy_data is not allowed
when %s is enabled", "two_phase")
errmsg("ALTER SUBSCRIPTION with refresh and copy_data is not allowed
when %s is enabled", "failover")

~~~

14.
+ /*
+ * See comments above for twophasestate, same holds true for
+ * 'failover'
+ */
+ if (sub->failoverstate == LOGICALREP_FAILOVER_STATE_ENABLED && opts.copy_data)
+ ereport(ERROR,
+ (errcode(ERRCODE_SYNTAX_ERROR),
+ errmsg("ALTER SUBSCRIPTION ... REFRESH with copy_data is not allowed
when failover is enabled"),
+ errhint("Use ALTER SUBSCRIPTION ... REFRESH with copy_data = false,
or use DROP/CREATE SUBSCRIPTION.")));

IMO this is another message where the option should be extracted to
make a common message for the translators. And don't forget to add a
/* translator: %s is a subscription option like 'failover' */ comment.

SUGGESTION like:
errmsg("ALTER SUBSCRIPTION ... REFRESH with copy_data is not allowed
when %s is enabled", "two_phase"),
errmsg("ALTER SUBSCRIPTION ... REFRESH with copy_data is not allowed
when %s is enabled", "failover"),

======
.../libpqwalreceiver/libpqwalreceiver.c

15. libpqrcv_create_slot

+ if (failover)
+ {
+ appendStringInfoString(&cmd, "FAILOVER");
+ if (use_new_options_syntax)
+ appendStringInfoString(&cmd, ", ");
+ else
+ appendStringInfoChar(&cmd, ' ');
+ }

15a.
Isn't failover a new option that is unsupported pre-PG17? Why is it
necessary to support an old-style syntax for something that was not
supported on old servers? (I'm confused).

~

15b.
Also IIRC, this FAILOVER wasn't listed in the old-style syntax of
doc/src/sgml/protocol.sgml. Was that deliberate?

======
.../replication/logical/logicalfuncs.c

16. pg_logical_slot_get_changes_guts

+ if (XLogRecPtrIsInvalid(upto_lsn))
+ wal_to_wait = end_of_wal;
+ else
+ wal_to_wait = Min(upto_lsn, end_of_wal);
+
+ /*
+ * Wait for specified streaming replication standby servers (if any)
+ * to confirm receipt of WAL upto wal_to_wait.
+ */
+ WalSndWaitForStandbyConfirmation(wal_to_wait);
+

16a.
/WAL upto wal_to_wait./WAL up to wal_to_wait./

~

16b.
Is there another name for this variable (wal_to_wait) that conveys
more meaning? Maybe 'wal_received_pos' or 'wait_for_wal_lsn' or
something better.

======
src/backend/replication/logical/tablesync.c

17. process_syncing_tables_for_apply

CommandCounterIncrement(); /* make updates visible */
if (AllTablesyncsReady())
{
+ char buf[100];
+
+ buf[0] = '\0';
+
+ if (MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_PENDING)
+ strcat(buf, "twophase");
+ if (MySubscription->failoverstate == LOGICALREP_FAILOVER_STATE_PENDING)
+ {
+ if (buf[0] != '\0')
+ strcat(buf, " and ");
+ strcat(buf, "failover");
+ }
+
ereport(LOG,
- (errmsg("logical replication apply worker for subscription \"%s\"
will restart so that two_phase can be enabled",
- MySubscription->name)));
+ (errmsg("logical replication apply worker for subscription \"%s\"
will restart so that %s can be enabled",
+ MySubscription->name, buf)));
should_exit = true;
}
~

IMO you cannot build up a log buffer using " and " like this because
the translation would be a mess. IIUC, you might have to do it the
long way with multiple errmsg.

SUGGESTION
twophase_pending = MySubscription->twophasestate ==
LOGICALREP_TWOPHASE_STATE_PENDING;
failover_pending = MySubscription->failoverstate ==
LOGICALREP_FAILOVER_STATE_PENDING;

if (twophase_pending || failover_pending)
ereport(LOG,
twophase_pending && failover_pending
/* translator: 'two_phase' or 'failover' are subscription options */
? errmsg("logical replication apply worker for subscription \"%s\"
will restart so that two_phase and failover can be enabled",
MySubscription->name)
: errmsg("logical replication apply worker for subscription \"%s\"
will restart so that %s can be enabled",
MySubscription->name,
twophase_pending ? "two_phase" : "failover"));
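
The same "pick one complete message" approach can be sketched as standalone C (not PostgreSQL code). It assumes snprintf() in place of errmsg(), and the function name is hypothetical. No sentence fragment such as " and " is ever glued together at run time; each branch uses a whole template.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdio.h>
#include <string.h>

/*
 * Build the restart message from complete templates only.
 * Returns the formatted length, or 0 (and an empty buffer) when neither
 * option is pending. buflen must be at least 1.
 */
int
format_restart_message(char *buf, size_t buflen, const char *subname,
					   bool twophase_pending, bool failover_pending)
{
	if (twophase_pending && failover_pending)
		return snprintf(buf, buflen,
						"logical replication apply worker for subscription "
						"\"%s\" will restart so that two_phase and failover "
						"can be enabled", subname);

	if (twophase_pending || failover_pending)
		return snprintf(buf, buflen,
						"logical replication apply worker for subscription "
						"\"%s\" will restart so that %s can be enabled",
						subname,
						twophase_pending ? "two_phase" : "failover");

	buf[0] = '\0';				/* nothing pending, nothing to report */
	return 0;
}
```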

~~~

18. UpdateTwoPhaseFailoverStates

-UpdateTwoPhaseState(Oid suboid, char new_state)
+UpdateTwoPhaseFailoverStates(Oid suboid,
+ bool update_twophase, char new_state_twophase,
+ bool update_failover, char new_state_failover)

Although this function is written to update to *any* specified state,
in practice it only ever seems to be called to update from PENDING to
ENABLED state and nothing else.

Therefore it can be simplified by not even passing those states, and
by changing the function name to something like
'EnableTwoPhaseFailoverTriState'.

======
src/backend/replication/logical/worker.c

19. File header comment

There is a lot of talk here about two_phase tri-state and the special
ALTER REFRESH considerations for the two-phase transactions. IIUC,
there should be lots of similar commentary for the failover tri-state
and ALTER REFRESH.

~~~

20.
* PENDING, which allows ALTER SUBSCRIPTION ... REFRESH PUBLICATION to
* work.
*/
- if (MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_PENDING &&
- AllTablesyncsReady())
+ twophase_pending = (MySubscription->twophasestate
+ == LOGICALREP_TWOPHASE_STATE_PENDING) ? true : false;
+ failover_pending = (MySubscription->failoverstate
+ == LOGICALREP_FAILOVER_STATE_PENDING) ? true : false;
+

The comment preceding this only talks about 'two_phase', so it
should be expanded to also mention 'failover'.

~~~

21. run_apply_worker

+ twophase_pending = (MySubscription->twophasestate
+ == LOGICALREP_TWOPHASE_STATE_PENDING) ? true : false;
+ failover_pending = (MySubscription->failoverstate
+ == LOGICALREP_FAILOVER_STATE_PENDING) ? true : false;

These ternaries are not necessary.

SUGGESTION (has the same meaning)
twophase_pending = (MySubscription->twophasestate ==
LOGICALREP_TWOPHASE_STATE_PENDING);
failover_pending = (MySubscription->failoverstate ==
LOGICALREP_FAILOVER_STATE_PENDING);

~~~

22.
- UpdateTwoPhaseState(MySubscription->oid, LOGICALREP_TWOPHASE_STATE_ENABLED);
- MySubscription->twophasestate = LOGICALREP_TWOPHASE_STATE_ENABLED;
+
+ /* Update twophase and/or failover */
+ if (twophase_pending || failover_pending)
+ UpdateTwoPhaseFailoverStates(MySubscription->oid,
+ twophase_pending,
+ LOGICALREP_TWOPHASE_STATE_ENABLED,
+ failover_pending,
+ LOGICALREP_FAILOVER_STATE_ENABLED);
+ if (twophase_pending)
+ MySubscription->twophasestate = LOGICALREP_TWOPHASE_STATE_ENABLED;
+
+ if (failover_pending)
+ MySubscription->failoverstate = LOGICALREP_FAILOVER_STATE_ENABLED;

There seem to be rather too many checks for 'twophase_pending' and 'failover_pending'.

With some refactoring this could be done with less code I think. For example,
1. Unconditionally call UpdateTwoPhaseFailoverStates() but just quick
return if nothing to do
2. Pass address of MySubscription->twophasestate/failoverstate, and
let function UpdateTwoPhaseFailoverStates() set those
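
A rough standalone sketch of that refactoring (not PostgreSQL code) is below. The state constants and the function name are hypothetical, and the catalog update is only indicated by a comment. The caller passes the addresses of both in-memory states and the function does the pending checks exactly once.

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical state codes mirroring 'd' / 'p' / 'e' from the patch. */
#define STATE_DISABLED	'd'
#define STATE_PENDING	'p'
#define STATE_ENABLED	'e'

/*
 * Move any PENDING state to ENABLED. Returns true if something changed.
 * Safe to call unconditionally: it returns quickly when nothing is pending.
 */
bool
enable_pending_states(char *twophasestate, char *failoverstate)
{
	bool		twophase_pending = (*twophasestate == STATE_PENDING);
	bool		failover_pending = (*failoverstate == STATE_PENDING);

	if (!twophase_pending && !failover_pending)
		return false;			/* quick return, nothing to do */

	/* (the real function would update the catalog row here) */

	if (twophase_pending)
		*twophasestate = STATE_ENABLED;
	if (failover_pending)
		*failoverstate = STATE_ENABLED;

	return true;
}
```

With this shape the call site shrinks to a single call that passes the two state fields by address.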

======
src/backend/replication/slot.c

23.
+char *standby_slot_names;
+static List *standby_slot_names_list = NIL;

Should there be a comment for the new GUC?

~~~

24. ReplicationSlotAlter
+/*
+ * Change the definition of the slot identified by the passed in name.
+ */
+void
+ReplicationSlotAlter(const char *name, bool failover)

/passed in/specified/

/the definition/the failover state/

~~~

25. validate_standby_slots
+
+/*
+ * A helper function to validate slots specified in standby_slot_names GUCs.
+ */
+static bool
+validate_standby_slots(char **newval)

/in standby_slot_names GUCs./in GUC standby_slot_names./

~~~

26. validate_standby_slots

+ /*
+ * Verify 'type' of slot now.
+ *
+ * Skip check if replication slots' data is not initialized yet i.e. we
+ * are in startup process.
+ */
+ if (!ReplicationSlotCtl)
+ return true;

26a.
This code seems to neglect doing memory cleanup.

+ pfree(rawname);
+ list_free(elemlist);

~

26b.
Indeed, most of this function's return points seem to be neglecting
some memory cleanup, so IMO it would be better to write this function
with some common goto labels that do all this common cleanup:

SUGGESTION
ret_standby_slot_names_ok:
pfree(rawname);
list_free(elemlist);
return true;

ret_standby_slot_names_ng:
pfree(rawname);
list_free(elemlist);
return false;
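
For illustration, here is a standalone C sketch of the common-cleanup idea (not PostgreSQL code). It uses malloc()/free() in place of palloc()/pfree(), a single cleanup label plus a result variable rather than the two labels above, and hypothetical validation rules and names. Every exit after the allocation goes through the same cleanup code.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdlib.h>
#include <string.h>

#define MAX_NAME_LEN 64			/* hypothetical limit for this sketch */

/*
 * Validate a comma-separated list of names. All exits after the
 * allocation funnel through one label so the copy is always freed.
 */
bool
validate_names(const char *names)
{
	bool		ok = false;
	size_t		len = strlen(names) + 1;
	char	   *rawname = malloc(len);
	char	   *tok;

	if (rawname == NULL)
		return false;			/* nothing allocated yet, nothing to free */
	memcpy(rawname, names, len);

	for (tok = strtok(rawname, ","); tok != NULL; tok = strtok(NULL, ","))
	{
		if (strcmp(tok, "*") == 0)
			goto cleanup;		/* wildcard is rejected */
		if (strlen(tok) >= MAX_NAME_LEN)
			goto cleanup;		/* name is too long */
	}
	ok = true;

cleanup:
	free(rawname);
	return ok;
}
```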

~~~

27. validate_standby_slots

+ if (SlotIsLogical(slot))
+ {
+ GUC_check_errdetail("cannot have logical replication slot \"%s\" "
+ "in this parameter", name);
+ list_free(elemlist);
+ return false;
+ }

IIUC, the GUC is for physical replication slots only, so somehow I
felt it was better to keep everything from that (physical)
perspective. YMMV.

SUGGESTION
if (!SlotIsPhysical(slot))
{
GUC_check_errdetail("\"%s\" is not a physical replication slot", name);
list_free(elemlist);
return false;
}

~~~

28. check_standby_slot_names

+bool
+check_standby_slot_names(char **newval, void **extra, GucSource source)
+{
+ if (strcmp(*newval, "") == 0)
+ return true;
+
+ /*
+ * "*" is not accepted as in that case primary will not be able to know
+ * for which all standbys to wait for. Even if we have physical-slots
+ * info, there is no way to confirm whether there is any standby
+ * configured for the known physical slots.
+ */
+ if (strcmp(*newval, "*") == 0)
+ {
+ GUC_check_errdetail("\"%s\" is not accepted for standby_slot_names",
+ *newval);
+ return false;
+ }
+
+ /* Now verify if the specified slots really exist and have correct type */
+ if (!validate_standby_slots(newval))
+ return false;
+
+ *extra = guc_strdup(ERROR, *newval);
+
+ return true;
+}

Is it really necessary to have a special test for the special value
"*" which you are going to reject? I don't see why this should be any
different from checking for other values like "." or "$" or "?" etc.
Why not just let validate_standby_slots() handle all of these?
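
As a sketch of why no special case is needed (standalone C, not PostgreSQL code): assuming slot names may contain only lower-case letters, digits, and underscores, as the documentation for replication slot names states, a generic character check already rejects "*" along with "." or "?". The function name here is hypothetical.

```c
#include <assert.h>
#include <stdbool.h>

/*
 * Return true if name is non-empty and consists only of lower-case
 * letters, digits, and underscores. "*" fails like any other bad name.
 */
bool
slot_name_chars_ok(const char *name)
{
	const char *cp;

	if (*name == '\0')
		return false;

	for (cp = name; *cp != '\0'; cp++)
	{
		if (!((*cp >= 'a' && *cp <= 'z') ||
			  (*cp >= '0' && *cp <= '9') ||
			  *cp == '_'))
			return false;
	}
	return true;
}
```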

~~~

29. assign_standby_slot_names

+ /* No value is specified for standby_slot_names. */
+ if (standby_slot_names_cpy == NULL)
+ return;

Is this possible? IIUC the check_standby_slot_names() did:
*extra = guc_strdup(ERROR, *newval);

Maybe this code also needs a similar elog and comment like already in
this function:
/* This should not happen if GUC checked check_standby_slot_names. */

~~~

30. assign_standby_slot_names

+ char *standby_slot_names_cpy = extra;

IIUC, the 'extra' was unconditionally guc_strdup()'ed in the check
hook, so should we also free it here before leaving this function?

~~~

31. GetStandbySlotList

+/*
+ * Return a copy of standby_slot_names_list if the copy flag is set to true,
+ * otherwise return the original list.
+ */
+List *
+GetStandbySlotList(bool copy)
+{
+ if (copy)
+ return list_copy(standby_slot_names_list);
+ else
+ return standby_slot_names_list;
+}

Why is this better than just exposing the standby_slot_names_list? The
caller can make a copy or not.
e.g. why is calling GetStandbySlotList(true) better than just doing
list_copy(standby_slot_names_list)?

======
src/backend/replication/walsender.c

32. parseCreateReplSlotOptions

static void WalSndSegmentOpen(XLogReaderState *state, XLogSegNo nextSegNo,
TimeLineID *tli_p);

-
/* Initialize walsender process before entering the main command loop */
void

~

Unnecessary changing of whitespace unrelated to this patch.

~~~

33. WalSndWakeupNeeded

+/*
+ * Does this Wal Sender need to wake up logical walsender.
+ *
+ * Check if the physical slot of this walsender is specified in
+ * standby_slot_names GUC.
+ */
+static bool
+WalSndWakeupNeeded()

/Wal Sender/physical walsender process/ (maybe??)

~~~

34. WalSndFilterStandbySlots

+ /* Log warning if no active_pid for this physical slot */
+ if (slot->active_pid == 0)
+ ereport(WARNING,

Other nearby code is guarding the slot in case it was NULL, so why not
here? Is it a potential NULL pointer dereference?

~~~

35.
+ /*
+ * If logical slot name is given in standby_slot_names, give WARNING
+ * and skip it. Since it is harmless, so WARNING should be enough, no
+ * need to error-out.
+ */
+ else if (SlotIsLogical(slot))
+ warningfmt = _("cannot have logical replication slot \"%s\" in
parameter \"%s\", ignoring");

Is this possible? Doesn't the function 'validate_standby_slots' called
by the GUC hook prevent specifying logical slots in the GUC? Maybe
this warning should be changed to Assert?

~~~

36.
+ /*
+ * Reaching here indicates that either the slot has passed the
+ * wait_for_lsn or there is an issue with the slot that requires a
+ * warning to be reported.
+ */
+ if (warningfmt)
+ ereport(WARNING, errmsg(warningfmt, name, "standby_slot_names"));
+
+ standby_slots_cpy = foreach_delete_current(standby_slots_cpy, lc);

If something was wrong with the slot that required a warning, is it
really OK to remove this slot from the list? This seems contrary to
the function comment which only talks about removing slots that have
caught up.

~~~

37. WalSndWaitForStandbyConfirmation

+/*
+ * Wait for physical standby to confirm receiving given lsn.
+ *
+ * Here logical walsender associated with failover logical slot waits
+ * for physical standbys corresponding to physical slots specified in
+ * standby_slot_names GUC.
+ */

/given/the given/

~~~

38. WalSndWaitForStandbyConfirmation

+ ConditionVariablePrepareToSleep(&WalSndCtl->wal_confirm_rcv_cv);
+
+ for (;;)

This ConditionVariablePrepareToSleep was already called in the
WalSndWait() function. Did it need to be called 2 times?

~~~

39.
+ WalSndFilterStandbySlots(wait_for_lsn, &standby_slots);
+
+ /* Exit if done waiting for every slot. */
+ if (standby_slots == NIL)
+ break;
+
+ CHECK_FOR_INTERRUPTS();
+
+ if (ConfigReloadPending)
+ {
+ ConfigReloadPending = false;
+ WalSndRereadConfigAndSlots(&standby_slots);
+ }

Shouldn't all the config reload stuff come first before the filter and
NIL check, just in case after the reload there is nothing to do?
Otherwise, it might cause unnecessary sleep.

~~~

40. WalSndWaitForWal

/*
* Wait till WAL < loc is flushed to disk so it can be safely sent to client.
*
- * Returns end LSN of flushed WAL. Normally this will be >= loc, but
- * if we detect a shutdown request (either from postmaster or client)
- * we will return early, so caller must always check.
+ * If the walsender holds a logical slot that has enabled failover, the
+ * function also waits for all the specified streaming replication standby
+ * servers to confirm receipt of WAL upto RecentFlushPtr.
+ *
+ * Returns end LSN of flushed WAL. Normally this will be >= loc, but if we
+ * detect a shutdown request (either from postmaster or client) we will return
+ * early, so caller must always check.
*/
static XLogRecPtr
WalSndWaitForWal(XLogRecPtr loc)

~

/upto/up to/

~~~

41.
/*
- * Fast path to avoid acquiring the spinlock in case we already know we
- * have enough WAL available. This is particularly interesting if we're
- * far behind.
+ * Check if all the standby servers have confirmed receipt of WAL upto
+ * RecentFlushPtr if we already know we have enough WAL available.
+ *
+ * Note that we cannot directly return without checking the status of
+ * standby servers because the standby_slot_names may have changed, which
+ * means there could be new standby slots in the list that have not yet
+ * caught up to the RecentFlushPtr.
*/
if (RecentFlushPtr != InvalidXLogRecPtr &&
loc <= RecentFlushPtr)
- return RecentFlushPtr;
+ {
+ WalSndFilterStandbySlots(RecentFlushPtr, &standby_slots);

41a.
/upto/up to/

~

41b.
IMO there is some missing information in this comment because it
wasn't clear to me that calling WalSndFilterStandbySlots was going to
side-effect that list to give it a different meaning. e.g. it seems it
no longer means "standby slots" but instead means something like
"standby slots that are not caught up". Perhaps that local variable
can have a name that helps to convey that better?

~~~

42.
+ /*
+ * Fast path to entering the loop in case we already know we have
+ * enough WAL available and all the standby servers has confirmed
+ * receipt of WAL upto RecentFlushPtr. This is particularly
+ * interesting if we're far behind.
+ */
+ if (standby_slots == NIL)
+ return RecentFlushPtr;

42a.
/has/have/

~

42b.
For entering what loop? There's no context for this comment. I assume
it means the loop that comes later in this function, but then isn't
this a typo? /Fast path to entering the loop/Fast path to avoid
entering the loop/. Alternatively, just don't even mention the loop -
just say "Quick return" etc.

~~~

43. WalSndWait

-WalSndWait(uint32 socket_events, long timeout, uint32 wait_event)
+WalSndWait(uint32 socket_events, long timeout, uint32 wait_event,
+ bool wait_for_standby)

Does this need the 'wait_for_standby' parameter? AFAICT this was only
set true when the event enum was
WAIT_EVENT_WAL_SENDER_WAIT_FOR_STANDBY_CONFIRMATION, so why do we need
an extra boolean to be passed when there is already enough information
in the event to know when it is waiting for standby?
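
A small standalone sketch of deriving the flag from the event (not PostgreSQL code) is below. The enum values, type names, and function name are all hypothetical stand-ins, and the selection logic simply mirrors the if/else chain in the patch. It does not answer whether both condition variables should be prepared.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical stand-ins for the wait events. */
enum
{
	EVENT_WAIT_FOR_WAL = 1,
	EVENT_WAIT_FOR_STANDBY_CONFIRMATION = 2
};

/* Hypothetical stand-ins for the walsender kind and the three CVs. */
typedef enum { KIND_PHYSICAL, KIND_LOGICAL } SenderKind;
typedef enum { CV_WAL_FLUSH, CV_WAL_REPLAY, CV_WAL_CONFIRM_RCV } WhichCV;

/*
 * Pick the condition variable to sleep on. The "wait for standby"
 * decision is computed from the event, so no extra parameter is needed.
 */
WhichCV
choose_condition_variable(uint32_t wait_event, SenderKind kind)
{
	bool		wait_for_standby =
		(wait_event == EVENT_WAIT_FOR_STANDBY_CONFIRMATION);

	if (wait_for_standby)
		return CV_WAL_CONFIRM_RCV;
	else if (kind == KIND_PHYSICAL)
		return CV_WAL_FLUSH;
	else
		return CV_WAL_REPLAY;
}
```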

~~~

44.
+ if (wait_for_standby)
+ ConditionVariablePrepareToSleep(&WalSndCtl->wal_confirm_rcv_cv);
+ else if (MyWalSnd->kind == REPLICATION_KIND_PHYSICAL)
ConditionVariablePrepareToSleep(&WalSndCtl->wal_flush_cv);
else if (MyWalSnd->kind == REPLICATION_KIND_LOGICAL)
ConditionVariablePrepareToSleep(&WalSndCtl->wal_replay_cv);
~

A walsender is either physical or logical, but here the
'wait_for_standby' flag overrides everything. Is it OK for this to be
if/else/else or should this code call for wal_confirm_rcv_cv AND the
other one?

e.g. The function comment for WalSndWaitForWal said "the function also waits..."

======
src/backend/utils/misc/guc_tables.c

45.
+ {"standby_slot_names", PGC_SIGHUP, REPLICATION_PRIMARY,
+ gettext_noop("List of streaming replication standby server slot "
+ "names that logical walsenders waits for."),

/walsenders waits for./walsender processes will wait for./

======
src/backend/utils/misc/postgresql.conf.sample

46.
+#standby_slot_names = '' # streaming replication standby server slot names that
+ # logical walsenders waits for

(same as the msg in guc_tables)

/walsenders waits for/walsender processes will wait for/

======
src/bin/pg_upgrade/info.c

47.
@@ -681,6 +681,7 @@ get_old_cluster_logical_slot_infos(DbInfo *dbinfo,
bool live_check)
int i_twophase;
int i_caught_up;
int i_invalid;
+ int i_failover;

~

IMO it would be better if all these were coded to use the same order
as the SQL -- so put each piece of 'failover' code immediately after the
corresponding 'two_phase' code.

~~~

48.
@@ -689,6 +690,7 @@ get_old_cluster_logical_slot_infos(DbInfo *dbinfo,
bool live_check)
i_twophase = PQfnumber(res, "two_phase");
i_caught_up = PQfnumber(res, "caught_up");
i_invalid = PQfnumber(res, "invalid");
+ i_failover = PQfnumber(res, "failover");

~

ditto #47.

~~~

49.
@@ -699,6 +701,7 @@ get_old_cluster_logical_slot_infos(DbInfo *dbinfo,
bool live_check)
curr->two_phase = (strcmp(PQgetvalue(res, slotnum, i_twophase), "t") == 0);
curr->caught_up = (strcmp(PQgetvalue(res, slotnum, i_caught_up), "t") == 0);
curr->invalid = (strcmp(PQgetvalue(res, slotnum, i_invalid), "t") == 0);
+ curr->failover = (strcmp(PQgetvalue(res, slotnum, i_failover), "t") == 0);

~

ditto #47.

======
src/bin/pg_upgrade/pg_upgrade.c

50.
+ if (GET_MAJOR_VERSION(new_cluster.major_version) >= 1700)
+ appendPQExpBuffer(query, ", false, %s, %s);",
+ slot_info->two_phase ? "true" : "false",
+ slot_info->failover ? "true" : "false");
+ else
+ appendPQExpBuffer(query, ", false, %s);",
+ slot_info->two_phase ? "true" : "false");

IMO this would be easier to read if it was written the other way around like

if (GET_MAJOR_VERSION(new_cluster.major_version) < 1700)
... old args
else
... new args
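
Spelled out as a standalone sketch (not pg_upgrade code), with snprintf() standing in for appendPQExpBuffer() and a hypothetical function name, the reversed check reads as "old servers first, everything newer in the else". The major_version parameter is assumed to already be in the same units the patch compares against 1700.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdio.h>
#include <string.h>

/*
 * Write the trailing slot-creation arguments into buf.
 * Older targets get the old argument list; newer ones also get failover.
 */
int
format_slot_args(char *buf, size_t buflen, int major_version,
				 bool two_phase, bool failover)
{
	if (major_version < 1700)
		return snprintf(buf, buflen, ", false, %s);",
						two_phase ? "true" : "false");
	else
		return snprintf(buf, buflen, ", false, %s, %s);",
						two_phase ? "true" : "false",
						failover ? "true" : "false");
}
```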

======
src/bin/pg_upgrade/pg_upgrade.h

51.
+ bool failover; /* is the slot designated to be synced
+ * to the physical standby */
} LogicalSlotInfo;

The comment is missing a question mark (?) which the others have.

======
src/bin/psql/describe.c

52.
", suborigin AS \"%s\"\n"
", subpasswordrequired AS \"%s\"\n"
- ", subrunasowner AS \"%s\"\n",
+ ", subrunasowner AS \"%s\"\n"
+ ", subfailoverstate AS \"%s\"\n",
gettext_noop("Origin"),
gettext_noop("Password required"),
- gettext_noop("Run as owner?"));
+ gettext_noop("Run as owner?"),
+ gettext_noop("Enable failover?"));

I didn't think "Enable failover?" should have a question mark. IMO
"Run as owner?" is the odd one out so should not have been copied.
Anyway, the subfailoverstate is a 'state' rather than a simple
boolean, so it should be more like subtwophasestate than anything
else.

======
src/bin/psql/tab-complete.c

53.
COMPLETE_WITH("binary", "connect", "copy_data", "create_slot",
"disable_on_error", "enabled", "origin",
"password_required", "run_as_owner", "slot_name",
- "streaming", "synchronous_commit", "two_phase");
+ "streaming", "synchronous_commit", "two_phase",
+ "failover");

All these tab completion options are supposed to be in alphabetical
order, so this 'failover' has been added in the wrong position.
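
The ordering can be checked mechanically. The standalone sketch below (not psql code, with a hypothetical function name) tests whether a list of option names is in ascending strcmp() order. With the list from the patch it reports unsorted, and with 'failover' placed between "enabled" and "origin" it reports sorted.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <string.h>

/* Return true if the n option names are in ascending strcmp() order. */
bool
options_are_sorted(const char *const *opts, size_t n)
{
	size_t		i;

	for (i = 1; i < n; i++)
	{
		if (strcmp(opts[i - 1], opts[i]) > 0)
			return false;
	}
	return true;
}
```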

======
src/include/catalog/pg_subscription.h

54.
/*
* two_phase tri-state values. See comments atop worker.c to know more about
* these states.
*/
#define LOGICALREP_TWOPHASE_STATE_DISABLED 'd'
#define LOGICALREP_TWOPHASE_STATE_PENDING 'p'
#define LOGICALREP_TWOPHASE_STATE_ENABLED 'e'

#define LOGICALREP_FAILOVER_STATE_DISABLED 'd'
#define LOGICALREP_FAILOVER_STATE_PENDING 'p'
#define LOGICALREP_FAILOVER_STATE_ENABLED 'e'

~

54a.
There should either be another comment (like the 'two_phase tri-state'
one) added for the FAILOVER states or that existing comment should be
expanded so that it also mentions the 'failover' tri-states.

~

54b.
Idea: If you are willing to change the constant names (not the values)
of the current tri-states then now both the 'two_phase' and 'failover'
could share them -- I also think this might give the ability to create
macros (if wanted) or to share more code instead of always handling
failover and two_phase separately.

SUGGESTION
#define LOGICALREP_TRISTATE_DISABLED 'd'
#define LOGICALREP_TRISTATE_PENDING 'p'
#define LOGICALREP_TRISTATE_ENABLED 'e'
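
As a rough standalone illustration of what sharing could buy (not PostgreSQL code): with common constants, one macro can test either option's state. The macro, struct, and function names below are hypothetical.

```c
#include <assert.h>
#include <stdbool.h>

#define LOGICALREP_TRISTATE_DISABLED 'd'
#define LOGICALREP_TRISTATE_PENDING 'p'
#define LOGICALREP_TRISTATE_ENABLED 'e'

/* Hypothetical helper macro, usable for either option's state. */
#define LOGICALREP_TRISTATE_IS_PENDING(state) \
	((state) == LOGICALREP_TRISTATE_PENDING)

/* Hypothetical stand-in for the two state fields of a subscription. */
typedef struct
{
	char		twophasestate;
	char		failoverstate;
} SubStates;

/* True if either option is still waiting to be enabled. */
bool
any_option_pending(const SubStates *sub)
{
	return LOGICALREP_TRISTATE_IS_PENDING(sub->twophasestate) ||
		LOGICALREP_TRISTATE_IS_PENDING(sub->failoverstate);
}
```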

~

54c.
The header comment at the top of worker.c should give more details
about the 'failover' tri-state. (also mentioned in another review
comment)

~~~

55. FormData_pg_subscription

+ char subfailoverstate; /* Enable Failover State */
+

/Enable Failover State/Failover state/

======
src/include/replication/slot.h

56.
+
+ /*
+ * Is this a failover slot (sync candidate for physical standbys)?
+ * Relevant for logical slots on the primary server.
+ */
+ bool failover;
} ReplicationSlotPersistentData;

~

/Relevant/Only relevant/

======
src/include/replication/walreceiver.h

57.
+#define walrcv_create_slot(conn, slotname, temporary, two_phase,
failover, snapshot_action, lsn) \
+ WalReceiverFunctions->walrcv_create_slot(conn, slotname, temporary,
two_phase, failover, snapshot_action, lsn)

Double whitespace after the 'failover' parameter?

======
src/include/replication/walsender_private.h

58.
ConditionVariable wal_flush_cv;
ConditionVariable wal_replay_cv;

+ ConditionVariable wal_confirm_rcv_cv;

Should this new field have a comment? Or should it be grouped with the
2 preceding fields (if that same group comment is valid for all of
them)?

======
Kind Regards,
Peter Smith.
Fujitsu Australia
