Re: Exit walsender before confirming remote flush in logical replication

From: Peter Smith <smithpb2250(at)gmail(dot)com>
To: "Hayato Kuroda (Fujitsu)" <kuroda(dot)hayato(at)fujitsu(dot)com>
Cc: "Takamichi Osumi (Fujitsu)" <osumi(dot)takamichi(at)fujitsu(dot)com>, "pgsql-hackers(at)postgresql(dot)org" <pgsql-hackers(at)postgresql(dot)org>, "sawada(dot)mshk(at)gmail(dot)com" <sawada(dot)mshk(at)gmail(dot)com>, "michael(at)paquier(dot)xyz" <michael(at)paquier(dot)xyz>, "peter(dot)eisentraut(at)enterprisedb(dot)com" <peter(dot)eisentraut(at)enterprisedb(dot)com>, "dilipbalaut(at)gmail(dot)com" <dilipbalaut(at)gmail(dot)com>, "andres(at)anarazel(dot)de" <andres(at)anarazel(dot)de>, "amit(dot)kapila16(at)gmail(dot)com" <amit(dot)kapila16(at)gmail(dot)com>, Kyotaro Horiguchi <horikyota(dot)ntt(at)gmail(dot)com>
Subject: Re: Exit walsender before confirming remote flush in logical replication
Date: 2023-02-10 07:10:53
Message-ID: CAHut+PtDhN6JBHJmmeU4U-0hWf106AV39bRjcS7UuqTBtdXQCg@mail.gmail.com
Lists: pgsql-hackers

Here are my review comments for the v6-0002 patch.

======
Commit Message

1.
This commit extends START_REPLICATION to accept SHUTDOWN_MODE term. Currently,
it works well only for logical replication.

~

1a.
"to accept SHUTDOWN_MODE term" --> "to include a SHUTDOWN_MODE clause."

~

1b.
"it works well only for..." --> do you mean "it is currently
implemented only for..."?

~~~

2.
When 'wait_flush', which is the default, is specified, the walsender will wait
for all the sent WALs to be flushed on the subscriber side, before exiting the
process. 'immediate' will exit without confirming the remote flush. This may
break the consistency between publisher and subscriber, but it may be useful
for a system that has a high-latency network to reduce the amount of time for
shutdown. This may be useful to shut down the publisher even when the
worker is stuck.

~

SUGGESTION
The shutdown modes are:

1) 'wait_flush' (the default). In this mode, the walsender will wait
for all the sent WALs to be flushed on the subscriber side, before
exiting the process.

2) 'immediate'. In this mode, the walsender will exit without
confirming the remote flush. This may break the consistency between
publisher and subscriber. This mode might be useful for a system that
has a high-latency network (to reduce the amount of time for
shutdown), or to allow the shutdown of the publisher even when the
worker is stuck.
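To make the difference between the two modes concrete, here is a minimal standalone sketch of the exit condition. It is not the patch's code: the enum is a stand-in for WalSndShutdownMode, LSNs are modelled as plain uint64_t values, and walsender_can_exit() is a hypothetical helper. The 'wait_flush' branch checks both the flush location and the output buffer, as described by the WalSndDone comment quoted in comment 11 below.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Stand-in for the patch's WalSndShutdownMode (typo from 9b fixed). */
typedef enum
{
    WALSND_SHUTDOWN_MODE_WAIT_FLUSH,    /* the default */
    WALSND_SHUTDOWN_MODE_IMMEDIATE
} WalSndShutdownMode;

/*
 * Hypothetical helper: may the walsender exit now?
 *   sent_upto      - LSN of the last WAL sent to the subscriber
 *   remote_flushed - LSN the subscriber has reported as flushed
 *   output_pending - true if the send buffer still holds unsent data
 */
static bool
walsender_can_exit(WalSndShutdownMode mode, uint64_t sent_upto,
                   uint64_t remote_flushed, bool output_pending)
{
    /* The subscriber cannot report flushing WAL it was never sent. */
    assert(remote_flushed <= sent_upto);

    /* 'immediate': neither the flush location nor the buffer is checked. */
    if (mode == WALSND_SHUTDOWN_MODE_IMMEDIATE)
        return true;

    /* 'wait_flush': all sent WAL flushed remotely and nothing left queued. */
    return remote_flushed == sent_upto && !output_pending;
}
```

In the stuck-worker case the send buffer stays full, so only the 'immediate' branch can ever return true.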

======
doc/src/sgml/protocol.sgml

3.
+ <varlistentry>
+ <term><literal>SHUTDOWN_MODE { 'wait_flush' | 'immediate' }</literal></term>
+ <listitem>
+ <para>
+ Decides the behavior of the walsender process at shutdown. If the
+ shutdown mode is <literal>'wait_flush'</literal>, which is the
+ default, the walsender waits for all the sent WALs to be flushed
+ on the subscriber side. If it is <literal>'immediate'</literal>,
+ the walsender exits without confirming the remote flush.
+ </para>
+ </listitem>
+ </varlistentry>

The synopsis said:
[ SHUTDOWN_MODE shutdown_mode ]

But then the 'shutdown_mode' term was never mentioned again (??).
Instead it says:
SHUTDOWN_MODE { 'wait_flush' | 'immediate' }

IMO the detailed explanation should not say SHUTDOWN_MODE again. It
should be written more like this:

SUGGESTION
shutdown_mode

Determines the behavior of the walsender process at shutdown. If
shutdown_mode is 'wait_flush', the walsender waits for all the sent
WALs to be flushed on the subscriber side. This is the default when
SHUTDOWN_MODE is not specified.

If shutdown_mode is 'immediate', the walsender exits without
confirming the remote flush.

======
.../libpqwalreceiver/libpqwalreceiver.c

4.
+ /* Add SHUTDOWN_MODE option if needed */
+ if (options->shutdown_mode &&
+ PQserverVersion(conn->streamConn) >= 160000)
+ appendStringInfo(&cmd, " SHUTDOWN_MODE '%s'",
+ options->shutdown_mode);

Maybe you can expand on the meaning of "if needed".

SUGGESTION
Add SHUTDOWN_MODE clause if needed (i.e. if not using the default shutdown_mode)
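A minimal sketch of what "if needed" amounts to, assuming a plain fixed-size buffer instead of the backend's StringInfo/appendStringInfo(). append_shutdown_mode() is a hypothetical name, and the command text used to exercise it is illustrative only. A NULL shutdown_mode means the default was chosen, so no clause is sent; a server older than 16 (PQserverVersion() < 160000) never gets the clause either.

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

/*
 * Hypothetical sketch: append " SHUTDOWN_MODE '<mode>'" to the
 * NUL-terminated command in buf, but only if a non-default mode was
 * requested (shutdown_mode != NULL) and the server is v16 or later.
 * Returns 0 if nothing was appended, otherwise snprintf()'s result
 * (a value >= bufsize - strlen(buf) would mean truncation).
 */
static int
append_shutdown_mode(char *buf, size_t bufsize, const char *shutdown_mode,
                     int server_version)
{
    size_t      used;

    assert(buf != NULL && bufsize > 0);
    used = strlen(buf);
    assert(used < bufsize);

    /* Default mode, or a pre-16 server: leave the command unchanged. */
    if (shutdown_mode == NULL || server_version < 160000)
        return 0;

    return snprintf(buf + used, bufsize - used, " SHUTDOWN_MODE '%s'",
                    shutdown_mode);
}
```

The version check is what keeps pre-16 publishers from receiving syntax they cannot parse; the NULL check keeps the default case identical to the command sent today.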

======
src/backend/replication/logical/worker.c

5. maybe_reread_subscription

+ *
+ * minapplydelay affects SHUTDOWN_MODE option. 'immediate' shutdown mode
+ * will be specified if it is set to non-zero, otherwise default mode will
+ * be set.

Reword this comment slightly and give a reference to ApplyWorkerMain.

SUGGESTION
Time-delayed logical replication affects the SHUTDOWN_MODE clause. The
'immediate' shutdown mode will be specified if min_apply_delay is
non-zero, otherwise the default shutdown mode will be used. See
ApplyWorkerMain.
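For illustration, the mapping described by this comment (plus the tablesync restriction from comment 6 below) might look like the following. This is a hypothetical sketch: requested_shutdown_mode() is not a function in the patch, and min_apply_delay is assumed to be an int in milliseconds. NULL means "omit the clause", i.e. the walsender uses the default 'wait_flush'.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

static const char shutdown_mode_immediate[] = "immediate";

/*
 * Hypothetical sketch: which SHUTDOWN_MODE string should the worker send?
 * NULL means "omit the clause", so the walsender uses 'wait_flush'.
 */
static const char *
requested_shutdown_mode(bool is_tablesync_worker, int min_apply_delay_ms)
{
    assert(min_apply_delay_ms >= 0);

    /* Time-delayed replication does not support tablesync workers. */
    if (is_tablesync_worker)
        return NULL;

    return (min_apply_delay_ms != 0) ? shutdown_mode_immediate : NULL;
}
```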

~~~

6. ApplyWorkerMain
+ /*
+ * time-delayed logical replication does not support tablesync
+ * workers, so only the leader apply worker can request walsenders to
+ * exit before confirming remote flush.
+ */

"time-delayed" --> "Time-delayed"

======
src/backend/replication/repl_gram.y

7.
@@ -91,6 +92,7 @@ Node *replication_parse_result;
%type <boolval> opt_temporary
%type <list> create_slot_options create_slot_legacy_opt_list
%type <defelt> create_slot_legacy_opt
+%type <str> opt_shutdown_mode

The tab alignment does not seem quite right, but I am not 100% sure.

~~~

8.
@@ -270,20 +272,22 @@ start_replication:
cmd->slotname = $2;
cmd->startpoint = $4;
cmd->timeline = $5;
+ cmd->shutdownmode = NULL;
$$ = (Node *) cmd;
}

It seemed a bit inconsistent. E.g. the cmd->options member was not set
for physical replication, so why then set this member?

Alternatively, maybe you should set cmd->options = NULL here as well?

======
src/backend/replication/walsender.c

9.
+/* Indicator for specifying the shutdown mode */
+typedef enum
+{
+ WALSND_SHUTDOWN_MODE_WAIT_FLUSH = 0,
+ WALSND_SHUTDOWN_MODE_IMMIDEATE
+} WalSndShutdownMode;

~

9a.
"Indicator for specifying" (??). How about just saying: "Shutdown modes"

~

9b.
Typo: WALSND_SHUTDOWN_MODE_IMMIDEATE ==> WALSND_SHUTDOWN_MODE_IMMEDIATE

~

9c.
AFAICT the fact that the first enum value is assigned 0 is not really
of importance. If that is correct, then IMO maybe it's better to
remove the "= 0" because the explicit assignment made me expect that
it had special meaning, and then it was confusing when I could not
find a reason.
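To back up 9c: C itself guarantees that the first enumerator is 0 and that each following one is the previous value plus 1, and a file-scope static with no initializer is zero-initialised. So removing "= 0" changes nothing, even if some code relied on a static with no explicit initializer defaulting to 'wait_flush'. The static variable below is an assumption about how walsender.c declares shutdown_mode.

```c
#include <assert.h>   /* static_assert (C11) */

/* Shutdown modes */
typedef enum
{
    WALSND_SHUTDOWN_MODE_WAIT_FLUSH,    /* implicitly 0 */
    WALSND_SHUTDOWN_MODE_IMMEDIATE      /* implicitly 1 */
} WalSndShutdownMode;

/* Both values are guaranteed by the C standard without any "= 0". */
static_assert(WALSND_SHUTDOWN_MODE_WAIT_FLUSH == 0, "first enumerator is 0");
static_assert(WALSND_SHUTDOWN_MODE_IMMEDIATE == 1, "next enumerator is +1");

/*
 * Assumed declaration: a file-scope static with no initializer is
 * zero-initialised, i.e. it starts out as WALSND_SHUTDOWN_MODE_WAIT_FLUSH.
 */
static WalSndShutdownMode shutdown_mode;
```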

~~~

10. ProcessPendingWrites

+ /*
+ * In this function, there is a possibility that the walsender is
+ * stuck. It is caused when the opposite worker is stuck and then the
+ * send-buffer of the walsender becomes full. Therefore, we must add
+ * an additional path for shutdown for immediate shutdown mode.
+ */
+ if (shutdown_mode == WALSND_SHUTDOWN_MODE_IMMIDEATE &&
+ got_STOPPING)
+ WalSndDone(XLogSendLogical);

10a.
Can this comment say something like "receiving worker" instead of
"opposite worker"?

SUGGESTION
This can happen when the receiving worker is stuck, and then the
send-buffer of the walsender...

~

10b.
IMO it makes more sense to check this the other way around. E.g. we
don't care what the shutdown_mode value is unless got_STOPPING is
true.

SUGGESTION
if (got_STOPPING && (shutdown_mode == WALSND_SHUTDOWN_MODE_IMMEDIATE))

~~~

11. WalSndDone

+ * If we are in the immediate shutdown mode, flush location and output
+ * buffer is not checked. This may break the consistency between nodes,
+ * but it may be useful for the system that has high-latency network to
+ * reduce the amount of time for shutdown.

Add some quotes for the mode.

SUGGESTION
'immediate' shutdown mode

~~~

12.
+/*
+ * Check options for walsender itself and set flags accordingly.
+ *
+ * Currently only one option is accepted.
+ */
+static void
+CheckWalSndOptions(const StartReplicationCmd *cmd)
+{
+ if (cmd->shutdownmode)
+ ParseShutdownMode(cmd->shutdownmode);
+}
+
+/*
+ * Parse given shutdown mode.
+ *
+ * Currently two values are accepted - "wait_flush" and "immediate"
+ */
+static void
+ParseShutdownMode(char *shutdownmode)
+{
+ if (pg_strcasecmp(shutdownmode, "wait_flush") == 0)
+ shutdown_mode = WALSND_SHUTDOWN_MODE_WAIT_FLUSH;
+ else if (pg_strcasecmp(shutdownmode, "immediate") == 0)
+ shutdown_mode = WALSND_SHUTDOWN_MODE_IMMIDEATE;
+ else
+ ereport(ERROR,
+ errcode(ERRCODE_SYNTAX_ERROR),
+ errmsg("invalid value for shutdown mode: \"%s\"", shutdownmode),
+ errhint("Available values: wait_flush, immediate."));
+}

IMO the ParseShutdownMode function seems unnecessary because it's not
really "parsing" anything and it is only called in one place. I
suggest wrapping everything into the CheckWalSndOptions function. The
end result is still only a simple function:

SUGGESTION

static void
CheckWalSndOptions(const StartReplicationCmd *cmd)
{
    if (cmd->shutdownmode)
    {
        char       *mode = cmd->shutdownmode;

        if (pg_strcasecmp(mode, "wait_flush") == 0)
            shutdown_mode = WALSND_SHUTDOWN_MODE_WAIT_FLUSH;
        else if (pg_strcasecmp(mode, "immediate") == 0)
            shutdown_mode = WALSND_SHUTDOWN_MODE_IMMEDIATE;
        else
            ereport(ERROR,
                    errcode(ERRCODE_SYNTAX_ERROR),
                    errmsg("invalid value for shutdown mode: \"%s\"", mode),
                    errhint("Available values: wait_flush, immediate."));
    }
}
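FWIW, here is a standalone approximation of the suggested function that compiles outside the backend. Assumptions: POSIX strcasecmp() stands in for pg_strcasecmp(), StartReplicationCmd is reduced to the one new member, and the function returns false where the backend version would ereport(ERROR).

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <strings.h>    /* POSIX strcasecmp(), standing in for pg_strcasecmp() */

typedef enum
{
    WALSND_SHUTDOWN_MODE_WAIT_FLUSH,
    WALSND_SHUTDOWN_MODE_IMMEDIATE
} WalSndShutdownMode;

/* Simplified stand-in for StartReplicationCmd: only the new member. */
typedef struct
{
    char       *shutdownmode;   /* NULL if no SHUTDOWN_MODE clause given */
} StartReplicationCmd;

static WalSndShutdownMode shutdown_mode = WALSND_SHUTDOWN_MODE_WAIT_FLUSH;

/*
 * Set shutdown_mode from the command. Returns false for an unrecognized
 * value (the backend version reports an ERROR with a hint instead).
 */
static bool
CheckWalSndOptions(const StartReplicationCmd *cmd)
{
    assert(cmd != NULL);

    if (cmd->shutdownmode)
    {
        const char *mode = cmd->shutdownmode;

        if (strcasecmp(mode, "wait_flush") == 0)
            shutdown_mode = WALSND_SHUTDOWN_MODE_WAIT_FLUSH;
        else if (strcasecmp(mode, "immediate") == 0)
            shutdown_mode = WALSND_SHUTDOWN_MODE_IMMEDIATE;
        else
            return false;
    }
    return true;
}
```

Note that the comparison is case-insensitive, so 'IMMEDIATE' is accepted too, same as with pg_strcasecmp().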

======
src/include/replication/walreceiver.h

13.
@@ -170,6 +170,7 @@ typedef struct
* false if physical stream. */
char *slotname; /* Name of the replication slot or NULL. */
XLogRecPtr startpoint; /* LSN of starting point. */
+ char *shutdown_mode; /* Name of specified shutdown name */

union
{
~

13a.
Typo (shutdown name?)

SUGGESTION
/* The specified shutdown mode string, or NULL. */

~

13b.
Because they have the same member names I kept confusing this option
shutdown_mode with the other enum also called shutdown_mode.

I wonder if it is possible to call this one something like
'shutdown_mode_str' to make reading the code easier?

~

13c.
Is this member in the right place? AFAIK this is not even implemented
for physical replication. E.g. why isn't this new member part of the
'logical' sub-structure in the union?
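A simplified sketch of what 13b and 13c could look like together. This is not the real walreceiver.h: apart from logical, slotname, startpoint and the union, the members are reduced or assumed (including the union name proto and the physical member startpointTLI), the typedefs are local stand-ins, and shutdown_mode_of() is a hypothetical accessor.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

typedef uint64_t XLogRecPtr;    /* local stand-in */
typedef uint32_t TimeLineID;    /* local stand-in */

typedef struct
{
    bool        logical;        /* true if logical stream, false if physical */
    char       *slotname;       /* Name of the replication slot or NULL. */
    XLogRecPtr  startpoint;     /* LSN of starting point. */

    union
    {
        struct
        {
            TimeLineID  startpointTLI;  /* assumed physical-only member */
        }           physical;
        struct
        {
            char       *shutdown_mode_str;  /* The specified shutdown mode
                                             * string, or NULL. */
        }           logical;
    }           proto;
} WalRcvStreamOptions;

/* Hypothetical accessor: only logical streams carry a shutdown mode. */
static const char *
shutdown_mode_of(const WalRcvStreamOptions *opts)
{
    assert(opts != NULL);
    return opts->logical ? opts->proto.logical.shutdown_mode_str : NULL;
}
```

With the member in the logical sub-structure, physical replication code cannot set it by accident, and the _str suffix keeps it visually distinct from the walsender's enum variable.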

======
src/test/subscription/t/001_rep_changes.pl

14.
-# Set min_apply_delay parameter to 3 seconds
+# Check restart on changing min_apply_delay to 3 seconds
my $delay = 3;
$node_subscriber->safe_psql('postgres',
"ALTER SUBSCRIPTION tap_sub_renamed SET (min_apply_delay = '${delay}s')");
+$node_publisher->poll_query_until('postgres',
+ "SELECT pid != $oldpid FROM pg_stat_replication WHERE application_name = 'tap_sub_renamed' AND state = 'streaming';"
+ )
+ or die
+ "Timed out while waiting for the walsender to restart after changing min_apply_delay to non-zero value";

IIUC this test is for verifying that a new walsender worker was
started if the delay was changed from 0 to non-zero. I think it is
testing your new logic in maybe_reread_subscription.

Probably more complete testing also needs to check the other scenarios:
* change min_apply_delay from one non-zero value to another non-zero
  value --> verify a new walsender is NOT started.
* change min_apply_delay from non-zero to zero --> verify a new
  walsender IS started.
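The expected outcomes of these scenarios reduce to a simple rule, sketched below under the assumption that the walsender only needs restarting when the mode derived from min_apply_delay changes (the mode is fixed by START_REPLICATION, so a different mode needs a new walsender). needs_walsender_restart() is hypothetical and only models the decision, not maybe_reread_subscription() itself; delays are assumed to be int milliseconds.

```c
#include <assert.h>
#include <stdbool.h>

/*
 * Hypothetical model: restart only when min_apply_delay crosses between
 * zero ('wait_flush') and non-zero ('immediate').
 */
static bool
needs_walsender_restart(int old_delay_ms, int new_delay_ms)
{
    assert(old_delay_ms >= 0 && new_delay_ms >= 0);

    /* Compare the derived modes, not the raw delay values. */
    return (old_delay_ms != 0) != (new_delay_ms != 0);
}
```

Each assertion in a TAP test would then correspond to one row of this rule: 0 to 3s restarts, 3s to 5s does not, 3s to 0 restarts.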

------
Kind Regards,
Peter Smith.
Fujitsu Australia
