Re: Time delayed LR (WAS Re: logical replication restrictions)

From: Peter Smith <smithpb2250(at)gmail(dot)com>
To: "Takamichi Osumi (Fujitsu)" <osumi(dot)takamichi(at)fujitsu(dot)com>
Cc: vignesh C <vignesh21(at)gmail(dot)com>, Kyotaro Horiguchi <horikyota(dot)ntt(at)gmail(dot)com>, "Hayato Kuroda (Fujitsu)" <kuroda(dot)hayato(at)fujitsu(dot)com>, "shveta(dot)malik(at)gmail(dot)com" <shveta(dot)malik(at)gmail(dot)com>, "dilipbalaut(at)gmail(dot)com" <dilipbalaut(at)gmail(dot)com>, "amit(dot)kapila16(at)gmail(dot)com" <amit(dot)kapila16(at)gmail(dot)com>, "euler(at)eulerto(dot)com" <euler(at)eulerto(dot)com>, "m(dot)melihmutlu(at)gmail(dot)com" <m(dot)melihmutlu(at)gmail(dot)com>, "andres(at)anarazel(dot)de" <andres(at)anarazel(dot)de>, "marcos(at)f10(dot)com(dot)br" <marcos(at)f10(dot)com(dot)br>, "pgsql-hackers(at)postgresql(dot)org" <pgsql-hackers(at)postgresql(dot)org>
Subject: Re: Time delayed LR (WAS Re: logical replication restrictions)
Date: 2023-01-18 07:06:17
Message-ID: CAHut+PsydEoqQaDv861qS_366hBovYAE8RRoDu3=+sBM=yo3Dg@mail.gmail.com
Lists: pgsql-hackers

Here are my review comments for the latest patch v16-0001 (excluding
the test code).

======

General

1.

Since the value of min_apply_delay cannot be < 0, I was thinking
probably it should have been declared everywhere in this patch as a
uint64 instead of an int64, right?

======

Commit message

2.

If the subscription sets min_apply_delay parameter, the logical
replication worker will delay the transaction commit for min_apply_delay
milliseconds.

~

IMO there should be another sentence before this just to say that a
new parameter is being added:

e.g.
This patch implements a new subscription parameter called 'min_apply_delay'.

======

doc/src/sgml/config.sgml

3.

+ <para>
+ For time-delayed logical replication, the apply worker sends a Standby
+ Status Update message to the corresponding publisher per the indicated
+ time of this parameter. Therefore, if this parameter is longer than
+ <literal>wal_sender_timeout</literal> on the publisher, then the
+ walsender doesn't get any update message during the delay and repeatedly
+ terminates due to the timeout errors. Hence, make sure this parameter is
+ shorter than the <literal>wal_sender_timeout</literal> of the publisher.
+ If this parameter is set to zero with time-delayed replication, the
+ apply worker doesn't send any feedback messages during the
+ <literal>min_apply_delay</literal>.
+ </para>

This paragraph seemed confusing. I think it needs to be reworded to
change all of the "this parameter" references because there are at
least 3 different parameters mentioned in this paragraph. e.g. maybe
just change them to explicitly name the parameter you are talking
about.

I also think it needs to mention the ‘min_apply_delay’ subscription
parameter up-front and then refer to it appropriately.

The end result might be something like I wrote below (this is just my
guess – probably you can word it better).

SUGGESTION
For time-delayed logical replication (i.e. when the subscription is
created with parameter min_apply_delay > 0), the apply worker sends a
Standby Status Update message to the publisher with a period of
wal_receiver_status_interval. Make sure to set
wal_receiver_status_interval less than the wal_sender_timeout on the
publisher, otherwise, the walsender will repeatedly terminate due to
the timeout errors. If wal_receiver_status_interval is set to zero,
the apply worker doesn't send any feedback messages during the
subscriber’s min_apply_delay period.
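
To make the relationship in the suggested wording concrete, here is a minimal C sketch of the check the documentation is asking the user to do by hand. The function name is hypothetical and nothing like it exists in the patch. It assumes wal_receiver_status_interval is given in seconds and wal_sender_timeout in milliseconds (their default units), and that a wal_sender_timeout of zero disables the timeout.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/*
 * Hypothetical helper (not part of the patch): returns true when a
 * delayed apply worker would send feedback often enough to keep the
 * publisher's walsender from timing out.
 *
 * status_interval_s : subscriber's wal_receiver_status_interval, seconds
 * sender_timeout_ms : publisher's wal_sender_timeout, milliseconds
 */
bool
feedback_interval_is_safe(int status_interval_s, int sender_timeout_ms)
{
    assert(status_interval_s >= 0 && sender_timeout_ms >= 0);

    /* Assumption: a timeout of zero means the walsender never times out. */
    if (sender_timeout_ms == 0)
        return true;

    /* An interval of zero means no feedback at all during the delay. */
    if (status_interval_s == 0)
        return false;

    /* Compare in milliseconds; widen first to avoid int overflow. */
    return (int64_t) status_interval_s * 1000 < (int64_t) sender_timeout_ms;
}
```

With the default values (10 seconds and 60000 milliseconds) the check passes; raising the interval to 60 seconds, or setting it to zero, makes it fail.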

======

doc/src/sgml/ref/create_subscription.sgml

4.

+ <para>
+ By default, the subscriber applies changes as soon as possible. As
+ with the physical replication feature
+ (<xref linkend="guc-recovery-min-apply-delay"/>), it can be useful to
+ have a time-delayed logical replica. This parameter lets the user to
+ delay the application of changes by a specified amount of time. If this
+ value is specified without units, it is taken as milliseconds. The
+ default is zero(no delay).
+ </para>

4a.
As with the physical replication feature (recovery_min_apply_delay),
it can be useful to have a time-delayed logical replica.

I'm not sure that the above sentence is necessary. It seems only to be
saying that this parameter can be useful. Why do we need to say that?

~

4b.
"This parameter lets the user to delay" -> "This parameter lets the user delay"
OR
"This parameter lets the user to delay" -> "This parameter allows the
user to delay"

~

4c.
"If this value is specified without units" -> "If the value is
specified without units"

~

4d.
"zero(no delay)." -> "zero (no delay)."

----

5.

+ <para>
+ The delay occurs only on WAL records for transaction begins and after
+ the initial table synchronization. It is possible that the
+ replication delay between publisher and subscriber exceeds the value
+ of this parameter, in which case no delay is added. Note that the
+ delay is calculated between the WAL time stamp as written on
+ publisher and the current time on the subscriber. Time spent in logical
+ decoding and in transferring the transaction may reduce the actual wait
+ time. If the system clocks on publisher and subscriber are not
+ synchronized, this may lead to apply changes earlier than expected,
+ but this is not a major issue because this parameter is typically much
+ larger than the time deviations between servers. Note that if this
+ parameter is set to a long delay, the replication will stop if the
+ replication slot falls behind the current LSN by more than
+ <link linkend="guc-max-slot-wal-keep-size"><literal>max_slot_wal_keep_size</literal></link>.
+ </para>

I think the first part can be reworded slightly. See what you think
about the suggestion below.

SUGGESTION
Any delay occurs only on WAL records for transaction begins after all
initial table synchronization has finished. The delay is calculated
between the WAL timestamp as written on the publisher and the current
time on the subscriber. Any overhead of time spent in logical decoding
and in transferring the transaction may reduce the actual wait time.
It is also possible that the overhead already exceeds the requested
'min_apply_delay' value, in which case no additional wait is
necessary. If the system clocks...
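
The arithmetic behind that wording can be sketched in a few lines of C. This is only an illustration under stated assumptions: the function name is hypothetical, all values are plain milliseconds since a common epoch rather than PostgreSQL's TimestampTz, and the real maybe_delay_apply() in the patch may differ.

```c
#include <assert.h>
#include <stdint.h>

/*
 * Hypothetical helper (not taken from the patch).
 *
 * commit_ts_ms       : WAL timestamp written by the publisher
 * now_ms             : current time on the subscriber
 * min_apply_delay_ms : the subscription's min_apply_delay
 *
 * Returns how long the apply worker still has to wait, or 0 when the
 * time already spent in decoding and transfer covers the whole delay.
 */
int64_t
remaining_delay_ms(int64_t commit_ts_ms, int64_t now_ms,
                   int64_t min_apply_delay_ms)
{
    int64_t apply_at_ms;
    int64_t wait_ms;

    assert(min_apply_delay_ms >= 0);

    apply_at_ms = commit_ts_ms + min_apply_delay_ms;
    wait_ms = apply_at_ms - now_ms;

    /* Overhead already exceeded the requested delay: no extra wait. */
    return wait_ms > 0 ? wait_ms : 0;
}
```

Because the two timestamps come from different machines, a subscriber clock that runs ahead of the publisher shortens the wait and one that runs behind lengthens it, which is the clock-skew caveat in the paragraph.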

----

6.

+ <para>
+ Setting streaming to <literal>parallel</literal> mode and <literal>min_apply_delay</literal>
+ simultaneously is not supported.
+ </para>

SUGGESTION
A non-zero min_apply_delay parameter is not allowed when streaming in
parallel mode.

======

src/backend/commands/subscriptioncmds.c

7. parse_subscription_options

@@ -404,6 +445,17 @@ parse_subscription_options(ParseState *pstate, List *stmt_options,
"slot_name = NONE", "create_slot = false")));
}
}
+
+ /* Test the combination of streaming mode and min_apply_delay */
+ if (IsSet(supported_opts, SUBOPT_MIN_APPLY_DELAY) &&
+ opts->min_apply_delay > 0)
+ {
+ if (opts->streaming == LOGICALREP_STREAM_PARALLEL)
+ ereport(ERROR,
+ errcode(ERRCODE_SYNTAX_ERROR),
+ errmsg("%s and %s are mutually exclusive options",
+ "min_apply_delay > 0", "streaming = parallel"));
+ }

SUGGESTION (comment)
The combination of parallel streaming mode and min_apply_delay is not allowed.

~~~

8. AlterSubscription (general)

I observed during testing that there are 3 different errors:

At subscription CREATE time you can get this error:
ERROR: min_apply_delay > 0 and streaming = parallel are mutually
exclusive options

If you try to ALTER the min_apply_delay when already streaming =
parallel you can get this error:
ERROR: cannot enable min_apply_delay for subscription in streaming =
parallel mode

If you try to ALTER the streaming to be parallel if there is already a
min_apply_delay > 0 then you can get this error:
ERROR: cannot enable streaming = parallel mode for subscription with
min_apply_delay

~

IMO there is no need to have 3 different error message texts. I think
all these cases are explained by just the first text (ERROR:
min_apply_delay > 0 and streaming = parallel are mutually exclusive
options)
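
One way to get a single message is to route all three code paths through one check that works on the effective values (the newly specified one if present, otherwise the stored one). The sketch below is only an illustration: the function name, the parameter list and the STREAM_PARALLEL stand-in are assumptions, and real code would call ereport() instead of returning a string.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Stand-in for LOGICALREP_STREAM_PARALLEL; the value is an assumption. */
#define STREAM_PARALLEL 'p'

/*
 * Hypothetical shared check (not part of the patch).  For CREATE, pass
 * cur_delay = 0 and a non-parallel cur_stream.  For ALTER, pass the values
 * currently stored for the subscription.
 *
 * Returns NULL when the combination is allowed, otherwise the one
 * common error text.
 */
const char *
check_delay_vs_streaming(bool delay_given, int64_t new_delay,
                         bool stream_given, char new_stream,
                         int64_t cur_delay, char cur_stream)
{
    int64_t delay = delay_given ? new_delay : cur_delay;
    char    stream = stream_given ? new_stream : cur_stream;

    assert(delay >= 0);

    if (delay > 0 && stream == STREAM_PARALLEL)
        return "min_apply_delay > 0 and streaming = parallel "
               "are mutually exclusive options";

    return NULL;
}
```

The three cases listed above then differ only in which arguments are marked as given, not in the message the user sees.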

~~~

9. AlterSubscription

@@ -1098,6 +1152,18 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,

if (IsSet(opts.specified_opts, SUBOPT_STREAMING))
{
+ /*
+ * Test the combination of streaming mode and
+ * min_apply_delay
+ */
+ if (opts.streaming == LOGICALREP_STREAM_PARALLEL)
+ if ((IsSet(opts.specified_opts, SUBOPT_MIN_APPLY_DELAY) && opts.min_apply_delay > 0) ||
+ (!IsSet(opts.specified_opts, SUBOPT_MIN_APPLY_DELAY) && sub->minapplydelay > 0))
+ ereport(ERROR,
+ errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("cannot enable %s mode for subscription with %s",
+ "streaming = parallel", "min_apply_delay"));
+

9a.
SUGGESTION (comment)
The combination of parallel streaming mode and min_apply_delay is not allowed.

~

9b.
(see AlterSubscription general review comment #8 above)
Here you can use the same common error message that says
min_apply_delay > 0 and streaming = parallel are mutually exclusive
options.

~~~

10. AlterSubscription

@@ -1111,6 +1177,25 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
= true;
}

+ if (IsSet(opts.specified_opts, SUBOPT_MIN_APPLY_DELAY))
+ {
+ /*
+ * Test the combination of streaming mode and
+ * min_apply_delay
+ */
+ if (opts.min_apply_delay > 0)
+ if ((IsSet(opts.specified_opts, SUBOPT_STREAMING) && opts.streaming == LOGICALREP_STREAM_PARALLEL) ||
+ (!IsSet(opts.specified_opts, SUBOPT_STREAMING) && sub->stream == LOGICALREP_STREAM_PARALLEL))
+ ereport(ERROR,
+ errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("cannot enable %s for subscription in %s mode",
+ "min_apply_delay", "streaming = parallel"));
+
+ values[Anum_pg_subscription_subminapplydelay - 1] =
+ Int64GetDatum(opts.min_apply_delay);
+ replaces[Anum_pg_subscription_subminapplydelay - 1] = true;
+ }

10a.
SUGGESTION (comment)
The combination of parallel streaming mode and min_apply_delay is not allowed.

~

10b.
(see AlterSubscription general review comment #8 above)
Here you can use the same common error message that says
min_apply_delay > 0 and streaming = parallel are mutually exclusive
options.

======

.../replication/logical/applyparallelworker.c

11.

@@ -704,7 +704,8 @@ pa_process_spooled_messages_if_required(void)
{
apply_spooled_messages(&MyParallelShared->fileset,
MyParallelShared->xid,
- InvalidXLogRecPtr);
+ InvalidXLogRecPtr,
+ 0);

IMO this passing of 0 is a bit strange because it is currently acting
like a dummy value since the apply_spooled_messages will never make
use of the 'finish_ts' anyway (since this call is from a parallel
apply worker).

I think a better way to code this might be to pass the 0 (same as you
are doing here) but inside the apply_spooled_messages change the code:

FROM
if (!am_parallel_apply_worker())
maybe_delay_apply(finish_ts);

TO
if (finish_ts)
maybe_delay_apply(finish_ts);

That does 2 things.
- It makes the passed-in 0 have some meaning
- It simplifies the apply_spooled_messages code
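
A stripped-down sketch of that idea follows, with stub functions standing in for the real ones. The `_sketch` suffix, the call counter and the TimestampTz typedef are illustration-only assumptions. Note that the approach treats 0 as "no timestamp", which assumes no real commit or prepare time is ever exactly 0.

```c
#include <assert.h>
#include <stdint.h>

typedef int64_t TimestampTz;    /* stand-in for the PostgreSQL typedef */

int delay_calls = 0;            /* illustration only: counts delay attempts */

/* Stub: the real function would sleep until the delay has elapsed. */
void
maybe_delay_apply(TimestampTz finish_ts)
{
    assert(finish_ts != 0);     /* callers filter out the sentinel */
    delay_calls++;
}

/*
 * Hypothetical shape of the suggested code.  A parallel apply worker
 * passes finish_ts = 0, so the zero itself says "never delay here" and
 * no am_parallel_apply_worker() test is needed.
 */
void
apply_spooled_messages_sketch(TimestampTz finish_ts)
{
    if (finish_ts)
        maybe_delay_apply(finish_ts);

    /* ... replay the spooled changes ... */
}
```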

======

src/backend/replication/logical/worker.c

12.

@@ -318,6 +318,17 @@ static List *on_commit_wakeup_workers_subids = NIL;
bool in_remote_transaction = false;
static XLogRecPtr remote_final_lsn = InvalidXLogRecPtr;

+/*
+ * In order to avoid walsender's timeout during time-delayed replication,
+ * it's necessary to keep sending feedback messages during the delay from the
+ * worker process. Meanwhile, the feature delays the apply before starting the
+ * transaction and thus we don't write WALs for the suspended changes during
+ * the wait. Hence, in the case the worker process sends a feedback message
+ * during the delay, we should not make positions of the flushed and apply LSN
+ * overwritten by the last received latest LSN. See send_feedback() for details.
+ */
+static XLogRecPtr last_received = InvalidXLogRecPtr;

12a.
Suggest a small change to the first sentence of the comment.

BEFORE
In order to avoid walsender's timeout during time-delayed replication,
it's necessary to keep sending feedback messages during the delay from
the worker process.

AFTER
In order to avoid walsender timeout for time-delayed replication the
worker process keeps sending feedback messages during the delay
period.

~

12b.
"Hence, in the case" -> "When"

~~~

13. forward declare

-static void send_feedback(XLogRecPtr recvpos, bool force, bool requestReply);
+static void send_feedback(XLogRecPtr recvpos, bool force, bool requestReply,
+ bool in_delaying_apply);

Change the param name:

"in_delaying_apply" -> "in_delayed_apply" (??)

~~~

14. maybe_delay_apply

+ /* Nothing to do if no delay set */
+ if (MySubscription->minapplydelay <= 0)
+ return;

IIUC min_apply_delay cannot be < 0 so this condition could simply be:

if (!MySubscription->minapplydelay)
return;

~~~

15. maybe_delay_apply

+ /*
+ * The min_apply_delay parameter is ignored until all tablesync workers
+ * have reached READY state. If we allow the delay during the catchup
+ * phase, once we reach the limit of tablesync workers, it will impose a
+ * delay for each subsequent worker. It means it will take a long time to
+ * finish the initial table synchronization.
+ */
+ if (!AllTablesyncsReady())
+ return;

SUGGESTION (slight rewording)
The min_apply_delay parameter is ignored until all tablesync workers
have reached READY state. This is because if we allowed the delay
during the catchup phase, then once we reached the limit of tablesync
workers it would impose a delay for each subsequent worker. That would
cause initial table synchronization completion to take a long time.

~~~

16. maybe_delay_apply

+ while (true)
+ {
+ long diffms;
+
+ ResetLatch(MyLatch);
+
+ CHECK_FOR_INTERRUPTS();

IMO there should be some small explanatory comment here at the top of
the while loop.

~~~

17. apply_spooled_messages

@@ -2024,6 +2141,21 @@ apply_spooled_messages(FileSet *stream_fileset, TransactionId xid,
int fileno;
off_t offset;

+ /*
+ * Should we delay the current transaction?
+ *
+ * Unlike the regular (non-streamed) cases, the delay is applied in a
+ * STREAM COMMIT/STREAM PREPARE message for streamed transactions. The
+ * STREAM START message does not contain a commit/prepare time (it will be
+ * available when the in-progress transaction finishes). Hence, it's not
+ * appropriate to apply a delay at that time.
+ *
+ * It's not allowed to execute time-delayed replication with parallel
+ * apply feature.
+ */
+ if (!am_parallel_apply_worker())
+ maybe_delay_apply(finish_ts);

That whole comment part "Unlike the regular (non-streamed) cases"
seems misplaced here. Perhaps this part of the comment is better put
into the function header where the meaning of 'finish_ts' is
explained?

~~~

18. apply_spooled_messages

+ * It's not allowed to execute time-delayed replication with parallel
+ * apply feature.
+ */
+ if (!am_parallel_apply_worker())
+ maybe_delay_apply(finish_ts);

As was mentioned in comment #11 above, this code could be changed like this:

if (finish_ts)
maybe_delay_apply(finish_ts);

Then you don't even need to make mention of "parallel apply" at all here.

OTOH if you want to still have the parallel apply comment then maybe
reword it like this:
"It is not allowed to combine time-delayed replication with the
parallel apply feature."

~~~

19. apply_spooled_messages

If you choose not to do my suggestion from comment #11, then there are
2 identical conditions (!am_parallel_apply_worker()). In this case, I
was wondering if it would be better to refactor to use a single
condition instead.

~~~

20. send_feedback
(same as comment #13)

Maybe change the new param name to “in_delayed_apply”?

~~~

21.

@@ -3737,8 +3869,15 @@ send_feedback(XLogRecPtr recvpos, bool force, bool requestReply)
/*
* No outstanding transactions to flush, we can report the latest received
* position. This is important for synchronous replication.
+ *
+ * During the delay of time-delayed replication, do not tell the publisher
+ * that the received latest LSN is already applied and flushed at this
+ * stage, since we don't apply the transaction yet. If we do so, it leads
+ * to a wrong assumption of logical replication progress on the publisher
+ * side. Here, we just send a feedback message to avoid publisher's
+ * timeout during the delay.
*/

Minor rewording of the comment

SUGGESTION
If the subscriber side apply is delayed (because of time-delayed
replication) then do not tell the publisher that the received latest
LSN is already applied and flushed, otherwise, it leads to the
publisher side making a wrong assumption of logical replication
progress. Instead, we just send a feedback message to avoid a
publisher timeout during the delay.
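
The rule in that comment can be pictured with the simplified sketch below. It is not a transcription of send_feedback(): the struct, the function name and the parameters are hypothetical, and it assumes the previously confirmed flush/apply positions never exceed the received position.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

typedef uint64_t XLogRecPtr;    /* stand-in for the PostgreSQL typedef */

typedef struct FeedbackPositions
{
    XLogRecPtr  write;          /* last received position */
    XLogRecPtr  flush;          /* position reported as flushed */
    XLogRecPtr  apply;          /* position reported as applied */
} FeedbackPositions;

/*
 * Hypothetical helper: decide which positions to report.
 *
 * With nothing pending and no delay in progress, the latest received
 * position can be reported as flushed and applied.  While the apply is
 * being delayed, the received transaction has not been applied yet, so
 * only the previously confirmed positions are reported.
 */
FeedbackPositions
choose_feedback_positions(XLogRecPtr recvpos,
                          XLogRecPtr last_flushed, XLogRecPtr last_applied,
                          bool have_pending_txes, bool in_delayed_apply)
{
    FeedbackPositions pos;

    assert(last_flushed <= recvpos && last_applied <= recvpos);

    pos.write = recvpos;

    if (!have_pending_txes && !in_delayed_apply)
    {
        pos.flush = recvpos;
        pos.apply = recvpos;
    }
    else
    {
        pos.flush = last_flushed;
        pos.apply = last_applied;
    }

    return pos;
}
```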

======

src/bin/pg_dump/pg_dump.c

22.

@@ -4546,9 +4547,14 @@ getSubscriptions(Archive *fout)
LOGICALREP_TWOPHASE_STATE_DISABLED);

if (fout->remoteVersion >= 160000)
- appendPQExpBufferStr(query, " s.suborigin\n");
+ appendPQExpBufferStr(query,
+ " s.suborigin,\n"
+ " s.subminapplydelay\n");
else
- appendPQExpBuffer(query, " '%s' AS suborigin\n", LOGICALREP_ORIGIN_ANY);
+ {
+ appendPQExpBuffer(query, " '%s' AS suborigin,\n", LOGICALREP_ORIGIN_ANY);
+ appendPQExpBufferStr(query, " 0 AS subminapplydelay\n");
+ }

Can’t those appends in the else part be combined into a single
appendPQExpBuffer?

appendPQExpBuffer(query,
" '%s' AS suborigin,\n"
" 0 AS subminapplydelay\n",
LOGICALREP_ORIGIN_ANY);
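
The combination works because adjacent C string literals are concatenated into one format string at compile time. The standalone sketch below shows the same pattern with snprintf in place of appendPQExpBuffer; the function name and the ORIGIN_ANY value are stand-ins chosen for illustration.

```c
#include <assert.h>
#include <stddef.h>
#include <stdio.h>

/* Stand-in for LOGICALREP_ORIGIN_ANY; the value is an assumption. */
#define ORIGIN_ANY "any"

/*
 * Hypothetical helper: writes both fallback columns with a single
 * formatted call.  The two adjacent literals form one format string,
 * and the comma after the second literal separates it from the
 * argument that fills in %s.
 */
int
build_fallback_columns(char *buf, size_t buflen)
{
    assert(buf != NULL && buflen > 0);

    return snprintf(buf, buflen,
                    " '%s' AS suborigin,\n"
                    " 0 AS subminapplydelay\n",
                    ORIGIN_ANY);
}
```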

======

src/include/catalog/pg_subscription.h

23.

@@ -70,6 +70,8 @@ CATALOG(pg_subscription,6100,SubscriptionRelationId) BKI_SHARED_RELATION BKI_ROW
XLogRecPtr subskiplsn; /* All changes finished at this LSN are
* skipped */

+ int64 subminapplydelay; /* Replication apply delay */
+
NameData subname; /* Name of the subscription */

Oid subowner BKI_LOOKUP(pg_authid); /* Owner of the subscription */

SUGGESTION (for comment)
Replication apply delay (ms)

~~~

24.

@@ -120,6 +122,7 @@ typedef struct Subscription
* in */
XLogRecPtr skiplsn; /* All changes finished at this LSN are
* skipped */
+ int64 minapplydelay; /* Replication apply delay */

SUGGESTION (for comment)
Replication apply delay (ms)

------
Kind Regards,
Peter Smith.
Fujitsu Australia
