RE: Exit walsender before confirming remote flush in logical replication

From: "Hayato Kuroda (Fujitsu)" <kuroda(dot)hayato(at)fujitsu(dot)com>
To: 'Peter Smith' <smithpb2250(at)gmail(dot)com>
Cc: "Takamichi Osumi (Fujitsu)" <osumi(dot)takamichi(at)fujitsu(dot)com>, "pgsql-hackers(at)postgresql(dot)org" <pgsql-hackers(at)postgresql(dot)org>, "sawada(dot)mshk(at)gmail(dot)com" <sawada(dot)mshk(at)gmail(dot)com>, "michael(at)paquier(dot)xyz" <michael(at)paquier(dot)xyz>, "peter(dot)eisentraut(at)enterprisedb(dot)com" <peter(dot)eisentraut(at)enterprisedb(dot)com>, "dilipbalaut(at)gmail(dot)com" <dilipbalaut(at)gmail(dot)com>, "andres(at)anarazel(dot)de" <andres(at)anarazel(dot)de>, "amit(dot)kapila16(at)gmail(dot)com" <amit(dot)kapila16(at)gmail(dot)com>, Kyotaro Horiguchi <horikyota(dot)ntt(at)gmail(dot)com>
Subject: RE: Exit walsender before confirming remote flush in logical replication
Date: 2023-02-10 11:54:01
Message-ID: TYAPR01MB5866BE7EA26E20A6D19B2297F5DE9@TYAPR01MB5866.jpnprd01.prod.outlook.com
Lists: pgsql-hackers

Dear Peter,

Thank you for reviewing! Please see the attached new version.

> ======
> Commit Message
>
> 1.
> This commit extends START_REPLICATION to accept SHUTDOWN_MODE term.
> Currently,
> it works well only for logical replication.
>
> ~
>
> 1a.
> "to accept SHUTDOWN term" --> "to include a SHUTDOWN_MODE clause."

Fixed.

> 1b.
> "it works well only for..." --> do you mean "it is currently
> implemented only for..."

Fixed.

> 2.
> When 'wait_flush', which is the default, is specified, the walsender will wait
> for all the sent WALs to be flushed on the subscriber side, before exiting the
> process. 'immediate' will exit without confirming the remote flush. This may
> break the consistency between publisher and subscriber, but it may be useful
> for a system that has a high-latency network to reduce the amount of time for
> shutdown. This may be useful to shut down the publisher even when the
> worker is stuck.
>
> ~
>
> SUGGESTION
> The shutdown modes are:
>
> 1) 'wait_flush' (the default). In this mode, the walsender will wait
> for all the sent WALs to be flushed on the subscriber side, before
> exiting the process.
>
> 2) 'immediate'. In this mode, the walsender will exit without
> confirming the remote flush. This may break the consistency between
> publisher and subscriber. This mode might be useful for a system that
> has a high-latency network (to reduce the amount of time for
> shutdown), or to allow the shutdown of the publisher even when the
> worker is stuck.
>
> ======
> doc/src/sgml/protocol.sgml
>
> 3.
> + <varlistentry>
> + <term><literal>SHUTDOWN_MODE { 'wait_flush' | 'immediate' }</literal></term>
> + <listitem>
> + <para>
> + Decides the behavior of the walsender process at shutdown. If the
> + shutdown mode is <literal>'wait_flush'</literal>, which is the
> + default, the walsender waits for all the sent WALs to be flushed
> + on the subscriber side. If it is <literal>'immediate'</literal>,
> + the walsender exits without confirming the remote flush.
> + </para>
> + </listitem>
> + </varlistentry>
>
> The synopsis said:
> [ SHUTDOWN_MODE shutdown_mode ]
>
> But then the 'shutdown_mode' term was never mentioned again (??).
> Instead it says:
> SHUTDOWN_MODE { 'wait_flush' | 'immediate' }
>
> IMO the detailed explanation should not say SHUTDOWN_MODE again. It
> should be written more like this:
>
> SUGGESTION
> shutdown_mode
>
> Determines the behavior of the walsender process at shutdown. If
> shutdown_mode is 'wait_flush', the walsender waits for all the sent
> WALs to be flushed on the subscriber side. This is the default when
> SHUTDOWN_MODE is not specified.
>
> If shutdown_mode is 'immediate', the walsender exits without
> confirming the remote flush.

Fixed.

> .../libpqwalreceiver/libpqwalreceiver.c
>
> 4.
> + /* Add SHUTDOWN_MODE option if needed */
> + if (options->shutdown_mode &&
> + PQserverVersion(conn->streamConn) >= 160000)
> + appendStringInfo(&cmd, " SHUTDOWN_MODE '%s'",
> + options->shutdown_mode);
>
> Maybe you can expand on the meaning of "if needed".
>
> SUGGESTION
> Add SHUTDOWN_MODE clause if needed (i.e. if not using the default
> shutdown_mode)

Fixed, though not exactly as you suggested.
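
For readers following along, here is a minimal standalone sketch of the rule discussed in item 4. It is not the patch code: build_start_replication() and the command prefix used in the usage note are made up for illustration, and snprintf() stands in for appendStringInfo(). Only the 160000 version check and the clause text come from the quoted hunk.

```c
#include <stdio.h>
#include <stddef.h>

/* Oldest server version that gets the clause, as in the quoted hunk. */
#define SHUTDOWN_MODE_MIN_VERSION 160000

/*
 * Hypothetical stand-in for the appendStringInfo() call in the patch.
 * Copies "base" into buf and appends the SHUTDOWN_MODE clause only when a
 * mode string was given (i.e. not the default) and the server is new enough.
 * Returns the snprintf() result.
 */
int
build_start_replication(char *buf, size_t buflen, const char *base,
                        const char *shutdown_mode_str, int server_version)
{
    if (shutdown_mode_str != NULL &&
        server_version >= SHUTDOWN_MODE_MIN_VERSION)
        return snprintf(buf, buflen, "%s SHUTDOWN_MODE '%s'",
                        base, shutdown_mode_str);

    /* Default mode or older server: send the command unchanged. */
    return snprintf(buf, buflen, "%s", base);
}
```

Calling it with a NULL mode string, or with a server version below 160000, leaves the base command untouched, which is the "if needed" part of the comment.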

> src/backend/replication/logical/worker.c
>
> 5. maybe_reread_subscription
>
> + *
> + * minapplydelay affects SHUTDOWN_MODE option. 'immediate' shutdown mode
> + * will be specified if it is set to non-zero, otherwise default mode will
> + * be set.
>
> Reworded this comment slightly and gave a reference to ApplyWorkerMain.
>
> SUGGESTION
> Time-delayed logical replication affects the SHUTDOWN_MODE clause. The
> 'immediate' shutdown mode will be specified if min_apply_delay is
> non-zero, otherwise the default shutdown mode will be used. See
> ApplyWorkerMain.

Fixed.

> 6. ApplyWorkerMain
> + /*
> + * time-delayed logical replication does not support tablesync
> + * workers, so only the leader apply worker can request walsenders to
> + * exit before confirming remote flush.
> + */
>
> "time-delayed" --> "Time-delayed"

Fixed.
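
Taken together, items 5 and 6 describe a small decision rule, which can be sketched roughly as below. This is only an illustration under assumptions: choose_shutdown_mode_str() and its parameters are hypothetical names, not functions from the patch.

```c
#include <stdbool.h>
#include <stddef.h>

/*
 * Hypothetical helper: pick the shutdown mode string a worker would put in
 * START_REPLICATION. Returning NULL means "do not send the clause", so the
 * walsender falls back to the default 'wait_flush' mode.
 */
const char *
choose_shutdown_mode_str(bool is_leader_apply_worker, int min_apply_delay_ms)
{
    /*
     * Time-delayed replication does not apply to tablesync workers, so only
     * the leader apply worker ever asks for 'immediate'.
     */
    if (is_leader_apply_worker && min_apply_delay_ms != 0)
        return "immediate";

    return NULL;
}
```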

> src/backend/replication/repl_gram.y
>
> 7.
> @@ -91,6 +92,7 @@ Node *replication_parse_result;
> %type <boolval> opt_temporary
> %type <list> create_slot_options create_slot_legacy_opt_list
> %type <defelt> create_slot_legacy_opt
> +%type <str> opt_shutdown_mode
>
> The tab alignment seemed not quite right. Not 100% sure.

Fixed accordingly.

> 8.
> @@ -270,20 +272,22 @@ start_replication:
> cmd->slotname = $2;
> cmd->startpoint = $4;
> cmd->timeline = $5;
> + cmd->shutdownmode = NULL;
> $$ = (Node *) cmd;
> }
>
> It seemed a bit inconsistent. E.g. the cmd->options member was not set
> for physical replication, so why then set this member?
>
> Alternatively, maybe should set cmd->options = NULL here as well?

Removed. I checked the makeNode() macro and found that it calls palloc0fast(),
so the node is zero-filled and we do not have to initialize unused members.
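
The point about zero-filled nodes can be illustrated outside the backend with calloc(). This is a sketch under assumptions: DemoStartReplicationCmd and demo_make_cmd() are hypothetical stand-ins, and calloc() only plays the role that the zeroing allocator plays behind makeNode().

```c
#include <stdlib.h>

/* Hypothetical, cut-down command node; member names follow the quoted hunk. */
typedef struct DemoStartReplicationCmd
{
    char *slotname;
    char *shutdownmode;
    void *options;
} DemoStartReplicationCmd;

/*
 * Stand-in for makeNode(): hand back zero-filled memory (NULL on failure).
 * On the platforms this sketch assumes, an all-zero pointer is a NULL
 * pointer, so members that are never assigned read back as NULL.
 */
DemoStartReplicationCmd *
demo_make_cmd(void)
{
    return calloc(1, sizeof(DemoStartReplicationCmd));
}
```

With an allocator like this, an explicit "cmd->shutdownmode = NULL" in the physical replication rule would be redundant, the same as for cmd->options.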

> src/backend/replication/walsender.c
>
> 9.
> +/* Indicator for specifying the shutdown mode */
> +typedef enum
> +{
> + WALSND_SHUTDOWN_MODE_WAIT_FLUSH = 0,
> + WALSND_SHUTDOWN_MODE_IMMIDEATE
> +} WalSndShutdownMode;
>
> ~
>
> 9a.
> "Indicator for specifying" (??). How about just saying: "Shutdown modes"

Fixed.

> 9b.
> Typo: WALSND_SHUTDOWN_MODE_IMMIDEATE ==>
> WALSND_SHUTDOWN_MODE_IMMEDIATE

Replaced.

> 9c.
> AFAICT the fact that the first enum value is assigned 0 is not really
> of importance. If that is correct, then IMO maybe it's better to
> remove the "= 0" because the explicit assignment made me expect that
> it had special meaning, and then it was confusing when I could not
> find a reason.

Removed. It was added so that the initialization could be skipped in a previous
version, but it is no longer needed.

> 10. ProcessPendingWrites
>
> + /*
> + * In this function, there is a possibility that the walsender is
> + * stuck. It is caused when the opposite worker is stuck and then the
> + * send-buffer of the walsender becomes full. Therefore, we must add
> + * an additional path for shutdown for immediate shutdown mode.
> + */
> + if (shutdown_mode == WALSND_SHUTDOWN_MODE_IMMIDEATE &&
> + got_STOPPING)
> + WalSndDone(XLogSendLogical);
>
> 10a.
> Can this comment say something like "receiving worker" instead of
> "opposite worker"?
>
> SUGGESTION
> This can happen when the receiving worker is stuck, and then the
> send-buffer of the walsender...

Changed.

> 10b.
> IMO it makes more sense to check this around the other way. E.g. we
> don't care what is the shutdown_mode value unless got_STOPPING is
> true.
>
> SUGGESTION
> if (got_STOPPING && (shutdown_mode == WALSND_SHUTDOWN_MODE_IMMEDIATE))

Changed.

> 11. WalSndDone
>
> + * If we are in the immediate shutdown mode, flush location and output
> + * buffer is not checked. This may break the consistency between nodes,
> + * but it may be useful for the system that has high-latency network to
> + * reduce the amount of time for shutdown.
>
> Add some quotes for the mode.
>
> SUGGESTION
> 'immediate' shutdown mode

Changed.

> 12.
> +/*
> + * Check options for walsender itself and set flags accordingly.
> + *
> + * Currently only one option is accepted.
> + */
> +static void
> +CheckWalSndOptions(const StartReplicationCmd *cmd)
> +{
> + if (cmd->shutdownmode)
> + ParseShutdownMode(cmd->shutdownmode);
> +}
> +
> +/*
> + * Parse given shutdown mode.
> + *
> + * Currently two values are accepted - "wait_flush" and "immediate"
> + */
> +static void
> +ParseShutdownMode(char *shutdownmode)
> +{
> + if (pg_strcasecmp(shutdownmode, "wait_flush") == 0)
> + shutdown_mode = WALSND_SHUTDOWN_MODE_WAIT_FLUSH;
> + else if (pg_strcasecmp(shutdownmode, "immediate") == 0)
> + shutdown_mode = WALSND_SHUTDOWN_MODE_IMMIDEATE;
> + else
> + ereport(ERROR,
> + errcode(ERRCODE_SYNTAX_ERROR),
> + errmsg("invalid value for shutdown mode: \"%s\"", shutdownmode),
> + errhint("Available values: wait_flush, immediate."));
> +}
>
> IMO the ParseShutdownMode function seems unnecessary because it's not
> really "parsing" anything and it is only called in one place. I
> suggest wrapping everything into the CheckWalSndOptions function. The
> end result is still only a simple function:
>
> SUGGESTION
>
> static void
> CheckWalSndOptions(const StartReplicationCmd *cmd)
> {
> if (cmd->shutdownmode)
> {
> char *mode = cmd->shutdownmode;
>
> if (pg_strcasecmp(mode, "wait_flush") == 0)
> shutdown_mode = WALSND_SHUTDOWN_MODE_WAIT_FLUSH;
> else if (pg_strcasecmp(mode, "immediate") == 0)
> shutdown_mode = WALSND_SHUTDOWN_MODE_IMMEDIATE;
>
> else
> ereport(ERROR,
> errcode(ERRCODE_SYNTAX_ERROR),
> errmsg("invalid value for shutdown mode: \"%s\"", mode),
> errhint("Available values: wait_flush, immediate."));
> }
> }

Removed.
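
As a rough standalone illustration of items 9 to 12, the mode check and the early-exit condition could look like the sketch below. The assumptions: demo_check_shutdown_mode() and demo_exit_without_flush() are hypothetical names, POSIX strcasecmp() replaces pg_strcasecmp(), and a false return replaces the ereport(ERROR, ...) call, so this is not the code in the patch.

```c
#include <stdbool.h>
#include <stddef.h>
#include <strings.h>            /* POSIX strcasecmp(), in place of pg_strcasecmp() */

/* Shutdown modes */
typedef enum
{
    WALSND_SHUTDOWN_MODE_WAIT_FLUSH,
    WALSND_SHUTDOWN_MODE_IMMEDIATE
} WalSndShutdownMode;

/*
 * Hypothetical variant of the option check: map the mode string to the enum.
 * A NULL string keeps the default. Returns false for an unknown value, where
 * the patch would raise an error instead.
 */
bool
demo_check_shutdown_mode(const char *mode_str, WalSndShutdownMode *mode)
{
    *mode = WALSND_SHUTDOWN_MODE_WAIT_FLUSH;

    if (mode_str == NULL || strcasecmp(mode_str, "wait_flush") == 0)
        return true;

    if (strcasecmp(mode_str, "immediate") == 0)
    {
        *mode = WALSND_SHUTDOWN_MODE_IMMEDIATE;
        return true;
    }

    return false;               /* invalid value for shutdown mode */
}

/*
 * Early-exit test in the order suggested in item 10b: the mode is only
 * looked at once a stop has been requested.
 */
bool
demo_exit_without_flush(bool got_stopping, WalSndShutdownMode mode)
{
    return got_stopping && (mode == WALSND_SHUTDOWN_MODE_IMMEDIATE);
}
```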

> ======
> src/include/replication/walreceiver.h
>
> 13.
> @@ -170,6 +170,7 @@ typedef struct
> * false if physical stream. */
> char *slotname; /* Name of the replication slot or NULL. */
> XLogRecPtr startpoint; /* LSN of starting point. */
> + char *shutdown_mode; /* Name of specified shutdown name */
>
> union
> {
> ~
>
> 13a.
> Typo (shutdown name?)
>
> SUGGESTION
> /* The specified shutdown mode string, or NULL. */

Fixed.

> 13b.
> Because they have the same member names I kept confusing this option
> shutdown_mode with the other enum also called shutdown_mode.
>
> I wonder if is it possible to call this one something like
> 'shutdown_mode_str' to make reading the code easier?

Changed.

> 13c.
> Is this member in the right place? AFAIK this is not even implemented
> for physical replication. e.g. Why isn't this new member part of the
> 'logical' sub-structure in the union?

I had left it there for future extensibility, but that seems unnecessary. Moved.

> ======
> src/test/subscription/t/001_rep_changes.pl
>
> 14.
> -# Set min_apply_delay parameter to 3 seconds
> +# Check restart on changing min_apply_delay to 3 seconds
> my $delay = 3;
> $node_subscriber->safe_psql('postgres',
> "ALTER SUBSCRIPTION tap_sub_renamed SET (min_apply_delay = '${delay}s')");
> +$node_publisher->poll_query_until('postgres',
> + "SELECT pid != $oldpid FROM pg_stat_replication WHERE application_name = 'tap_sub_renamed' AND state = 'streaming';"
> + )
> + or die
> + "Timed out while waiting for the walsender to restart after changing min_apply_delay to non-zero value";
>
> IIUC this test is for verifying that a new walsender worker was
> started if the delay was changed from 0 to non-zero. E.g. I think it
> is for testing your new logic in maybe_reread_subscription.
>
> Probably more complete testing also needs to check the other scenarios:
> * min_apply_delay from one non-zero value to another non-zero value
> --> verify a new worker is NOT started.
> * change min_apply_delay from non-zero to zero --> verify a new worker
> IS started

Hmm. These tests do not improve the coverage, so I am not sure whether we should
add them. Moreover, IIUC we do not have a good way to verify that the worker does
not restart: even if the old pid still appears in pg_stat_replication, the
walsender may exit after that check. So for now I added only the case that
changes min_apply_delay from non-zero to zero.
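
For reference, the three scenarios in the review (zero to non-zero, non-zero to another non-zero value, non-zero to zero) boil down to a single predicate. The sketch below is only my reading of those scenarios; demo_walsender_restart_needed() is a hypothetical name and does not appear in the patch.

```c
#include <stdbool.h>

/*
 * Hypothetical predicate: a restart is expected only when min_apply_delay
 * crosses between zero and non-zero, because that is when the requested
 * shutdown mode changes between the default and 'immediate'.
 */
bool
demo_walsender_restart_needed(int old_delay_ms, int new_delay_ms)
{
    return (old_delay_ms != 0) != (new_delay_ms != 0);
}
```

The middle scenario (non-zero to another non-zero value) is the one that returns false, and it is also the one that is hard to verify from the TAP test, since it requires showing that no restart happens.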

Best Regards,
Hayato Kuroda
FUJITSU LIMITED

Attachment                                                        Content-Type              Size
v7-0001-Time-delayed-logical-replication-subscriber.patch        application/octet-stream  77.6 KB
v7-0002-Extend-START_REPLICATION-command-to-accept-walsen.patch  application/octet-stream  15.7 KB
