RE: Time delayed LR (WAS Re: logical replication restrictions)

From: "Takamichi Osumi (Fujitsu)" <osumi(dot)takamichi(at)fujitsu(dot)com>
To: 'Peter Smith' <smithpb2250(at)gmail(dot)com>
Cc: vignesh C <vignesh21(at)gmail(dot)com>, Kyotaro Horiguchi <horikyota(dot)ntt(at)gmail(dot)com>, "Hayato Kuroda (Fujitsu)" <kuroda(dot)hayato(at)fujitsu(dot)com>, "shveta(dot)malik(at)gmail(dot)com" <shveta(dot)malik(at)gmail(dot)com>, "dilipbalaut(at)gmail(dot)com" <dilipbalaut(at)gmail(dot)com>, "amit(dot)kapila16(at)gmail(dot)com" <amit(dot)kapila16(at)gmail(dot)com>, "euler(at)eulerto(dot)com" <euler(at)eulerto(dot)com>, "m(dot)melihmutlu(at)gmail(dot)com" <m(dot)melihmutlu(at)gmail(dot)com>, "andres(at)anarazel(dot)de" <andres(at)anarazel(dot)de>, "marcos(at)f10(dot)com(dot)br" <marcos(at)f10(dot)com(dot)br>, "pgsql-hackers(at)postgresql(dot)org" <pgsql-hackers(at)postgresql(dot)org>
Subject: RE: Time delayed LR (WAS Re: logical replication restrictions)
Date: 2023-01-19 07:12:14
Message-ID: TYCPR01MB8373447440202B248BB63805EDC49@TYCPR01MB8373.jpnprd01.prod.outlook.com
Lists: pgsql-hackers

On Wednesday, January 18, 2023 4:06 PM Peter Smith <smithpb2250(at)gmail(dot)com> wrote:
> Here are my review comments for the latest patch v16-0001. (excluding the
> test code)
Hi, thank you for your review!

> ======
>
> General
>
> 1.
>
> Since the value of min_apply_delay cannot be < 0, I was thinking probably it
> should have been declared everywhere in this patch as a
> uint64 instead of an int64, right?
No, we won't be able to adopt this idea.

It seems that we cannot use an unsigned integer type for a catalog column.
So we can't apply it to the pg_subscription.h definitions, nor, similarly,
to the variables that are stored in the catalog through Int64GetDatum
and passed as its argument.

In addition, a value of type Interval can be negative,
so we cannot change the int64 variable that receives
the return value of interval2ms().

> ======
>
> Commit message
>
> 2.
>
> If the subscription sets min_apply_delay parameter, the logical replication
> worker will delay the transaction commit for min_apply_delay milliseconds.
>
> ~
>
> IMO there should be another sentence before this just to say that a new
> parameter is being added:
>
> e.g.
> This patch implements a new subscription parameter called
> 'min_apply_delay'.
Added.

> ======
>
> doc/src/sgml/config.sgml
>
> 3.
>
> + <para>
> + For time-delayed logical replication, the apply worker sends a Standby
> + Status Update message to the corresponding publisher per the indicated
> + time of this parameter. Therefore, if this parameter is longer than
> + <literal>wal_sender_timeout</literal> on the publisher, then the
> + walsender doesn't get any update message during the delay and repeatedly
> + terminates due to the timeout errors. Hence, make sure this parameter is
> + shorter than the <literal>wal_sender_timeout</literal> of the publisher.
> + If this parameter is set to zero with time-delayed replication, the
> + apply worker doesn't send any feedback messages during the
> + <literal>min_apply_delay</literal>.
> + </para>
>
>
> This paragraph seemed confusing. I think it needs to be reworded to change all
> of the "this parameter" references because there are at least 3 different
> parameters mentioned in this paragraph. e.g. maybe just change them to
> explicitly name the parameter you are talking about.
>
> I also think it needs to mention the ‘min_apply_delay’ subscription parameter
> up-front and then refer to it appropriately.
>
> The end result might be something like I wrote below (this is just my guess;
> probably you can word it better).
>
> SUGGESTION
> For time-delayed logical replication (i.e. when the subscription is created with
> parameter min_apply_delay > 0), the apply worker sends a Standby Status
> Update message to the publisher with a period of wal_receiver_status_interval.
> Make sure to set wal_receiver_status_interval less than the
> wal_sender_timeout on the publisher, otherwise, the walsender will repeatedly
> terminate due to the timeout errors. If wal_receiver_status_interval is set to zero,
> the apply worker doesn't send any feedback messages during the subscriber’s
> min_apply_delay period.
Applied. Also, I added a reference to the min_apply_delay parameter
at the end of this description.
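
The relationship between the three settings in the suggested wording can be sketched as follows. This is a deliberately simplified model, not code from the patch: the function name is hypothetical, all values are assumed to be in milliseconds, and it ignores any keepalive exchange initiated by the walsender. It only encodes the rule that the publisher must hear from the subscriber more often than wal_sender_timeout while the apply is being delayed.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/*
 * Hypothetical helper: returns true when the walsender could time out
 * while the apply worker waits.  A value of 0 means "disabled" for each
 * of the three settings.
 */
bool
sketch_walsender_may_time_out(int64_t min_apply_delay_ms,
							  int64_t status_interval_ms,
							  int64_t wal_sender_timeout_ms)
{
	int64_t		silent_ms;

	assert(min_apply_delay_ms >= 0);
	assert(status_interval_ms >= 0);
	assert(wal_sender_timeout_ms >= 0);

	/* No delay, or timeout disabled on the publisher: nothing to check. */
	if (min_apply_delay_ms == 0 || wal_sender_timeout_ms == 0)
		return false;

	/*
	 * Longest stretch without feedback: the whole delay if no status
	 * updates are sent (or the interval is longer than the delay),
	 * otherwise one status interval.
	 */
	if (status_interval_ms == 0 || status_interval_ms > min_apply_delay_ms)
		silent_ms = min_apply_delay_ms;
	else
		silent_ms = status_interval_ms;

	return silent_ms >= wal_sender_timeout_ms;
}
```

For example, under this model a 120 s delay with a 10 s status interval and a 60 s timeout is safe, while the same delay with the status interval set to zero is not.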

> ======
>
> doc/src/sgml/ref/create_subscription.sgml
>
> 4.
>
> + <para>
> + By default, the subscriber applies changes as soon as possible. As
> + with the physical replication feature
> + (<xref linkend="guc-recovery-min-apply-delay"/>), it can be useful to
> + have a time-delayed logical replica. This parameter lets the user to
> + delay the application of changes by a specified amount of time. If this
> + value is specified without units, it is taken as milliseconds. The
> + default is zero(no delay).
> + </para>
>
> 4a.
> As with the physical replication feature (recovery_min_apply_delay), it can be
> useful to have a time-delayed logical replica.
>
> IMO not sure that the above sentence is necessary. It seems only to be saying
> that this parameter can be useful. Why do we need to say that?
Removed the sentence.

> ~
>
> 4b.
> "This parameter lets the user to delay" -> "This parameter lets the user delay"
> OR
> "This parameter lets the user to delay" -> "This parameter allows the user to
> delay"
Fixed.


> ~
>
> 4c.
> "If this value is specified without units" -> "If the value is specified without
> units"
Fixed.

> ~
>
> 4d.
> "zero(no delay)." -> "zero (no delay)."
Fixed.

> ----
>
> 5.
>
> + <para>
> + The delay occurs only on WAL records for transaction begins and after
> + the initial table synchronization. It is possible that the
> + replication delay between publisher and subscriber exceeds the value
> + of this parameter, in which case no delay is added. Note that the
> + delay is calculated between the WAL time stamp as written on
> + publisher and the current time on the subscriber. Time spent in logical
> + decoding and in transferring the transaction may reduce the actual wait
> + time. If the system clocks on publisher and subscriber are not
> + synchronized, this may lead to apply changes earlier than expected,
> + but this is not a major issue because this parameter is typically much
> + larger than the time deviations between servers. Note that if this
> + parameter is set to a long delay, the replication will stop if the
> + replication slot falls behind the current LSN by more than
> + <link linkend="guc-max-slot-wal-keep-size"><literal>max_slot_wal_keep_size</literal></link>.
> + </para>
>
> I think the first part can be reworded slightly. See what you think about the
> suggestion below.
>
> SUGGESTION
> Any delay occurs only on WAL records for transaction begins after all initial
> table synchronization has finished. The delay is calculated between the WAL
> timestamp as written on the publisher and the current time on the subscriber.
> Any overhead of time spent in logical decoding and in transferring the
> transaction may reduce the actual wait time.
> It is also possible that the overhead already exceeds the requested
> 'min_apply_delay' value, in which case no additional wait is necessary. If the
> system clocks...
Addressed.
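
The calculation described in the suggested wording can be sketched like this. It is a minimal illustration under stated assumptions, not the patch's code: the function name is hypothetical and timestamps are plain millisecond counters rather than TimestampTz values.

```c
#include <assert.h>
#include <stdint.h>

/*
 * Hypothetical helper: remaining wait before a transaction may be applied.
 * commit_ts_ms is the WAL timestamp written on the publisher, now_ms is
 * the current time on the subscriber.  Returns 0 when the time already
 * spent in decoding and transfer meets or exceeds min_apply_delay.
 */
int64_t
sketch_remaining_delay_ms(int64_t commit_ts_ms,
						  int64_t now_ms,
						  int64_t min_apply_delay_ms)
{
	int64_t		apply_at_ms;
	int64_t		remaining_ms;

	assert(min_apply_delay_ms >= 0);

	apply_at_ms = commit_ts_ms + min_apply_delay_ms;
	remaining_ms = apply_at_ms - now_ms;

	return remaining_ms > 0 ? remaining_ms : 0;
}
```

With a 500 ms delay, a transaction stamped at 1000 and seen by the subscriber at 1200 waits another 300 ms; one seen at 2000 is applied immediately. A subscriber clock that runs ahead of the publisher shortens the wait in the same way, which is the "earlier than expected" case in the documentation text.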

> ----
>
> 6.
>
> + <para>
> + Setting streaming to <literal>parallel</literal> mode and <literal>min_apply_delay</literal>
> + simultaneously is not supported.
> + </para>
>
> SUGGESTION
> A non-zero min_apply_delay parameter is not allowed when streaming in
> parallel mode.
Applied.

> ======
>
> src/backend/commands/subscriptioncmds.c
>
> 7. parse_subscription_options
>
> @@ -404,6 +445,17 @@ parse_subscription_options(ParseState *pstate, List
> *stmt_options,
> "slot_name = NONE", "create_slot = false")));
> }
> }
> +
> + /* Test the combination of streaming mode and min_apply_delay */
> + if (IsSet(supported_opts, SUBOPT_MIN_APPLY_DELAY) &&
> + opts->min_apply_delay > 0)
> + {
> + if (opts->streaming == LOGICALREP_STREAM_PARALLEL)
> + ereport(ERROR,
> + errcode(ERRCODE_SYNTAX_ERROR),
> + errmsg("%s and %s are mutually exclusive options",
> + "min_apply_delay > 0", "streaming = parallel"));
> + }
>
> SUGGESTION (comment)
> The combination of parallel streaming mode and min_apply_delay is not
> allowed.
Fixed.

> ~~~
>
> 8. AlterSubscription (general)
>
> I observed during testing there are 3 different errors….
>
> At subscription CREATE time you can get this error:
> ERROR: min_apply_delay > 0 and streaming = parallel are mutually exclusive
> options
>
> If you try to ALTER the min_apply_delay when already streaming = parallel you
> can get this error:
> ERROR: cannot enable min_apply_delay for subscription in streaming =
> parallel mode
>
> If you try to ALTER the streaming to be parallel if there is already a
> min_apply_delay > 0 then you can get this error:
> ERROR: cannot enable streaming = parallel mode for subscription with
> min_apply_delay
Yes. This is because of the existing error message styles
in parse_subscription_options and AlterSubscription.

The former consistently uses "mutually exclusive" messages,
while the latter uses "cannot enable ..." ones.
> ~
>
> IMO there is no need to have 3 different error message texts. I think all these
> cases are explained by just the first text (ERROR:
> min_apply_delay > 0 and streaming = parallel are mutually exclusive
> options)
So, we followed these existing formats.

> ~~~
>
> 9. AlterSubscription
>
> @@ -1098,6 +1152,18 @@ AlterSubscription(ParseState *pstate,
> AlterSubscriptionStmt *stmt,
>
> if (IsSet(opts.specified_opts, SUBOPT_STREAMING))
> {
> + /*
> + * Test the combination of streaming mode and
> + * min_apply_delay
> + */
> + if (opts.streaming == LOGICALREP_STREAM_PARALLEL)
> + if ((IsSet(opts.specified_opts, SUBOPT_MIN_APPLY_DELAY) && opts.min_apply_delay > 0) ||
> + (!IsSet(opts.specified_opts, SUBOPT_MIN_APPLY_DELAY) && sub->minapplydelay > 0))
> + ereport(ERROR,
> + errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
> + errmsg("cannot enable %s mode for subscription with %s",
> + "streaming = parallel", "min_apply_delay"));
> +
>
> 9a.
> SUGGESTION (comment)
> The combination of parallel streaming mode and min_apply_delay is not
> allowed.
Fixed.

> ~
>
> 9b.
> (see AlterSubscription general review comment #8 above) Here you can use the
> same comment error message that says min_apply_delay > 0 and streaming =
> parallel are mutually exclusive options.
As described above, we followed the current style in the existing functions.

> ~~~
>
> 10. AlterSubscription
>
> @@ -1111,6 +1177,25 @@ AlterSubscription(ParseState *pstate,
> AlterSubscriptionStmt *stmt,
> = true;
> }
>
> + if (IsSet(opts.specified_opts, SUBOPT_MIN_APPLY_DELAY))
> + {
> + /*
> + * Test the combination of streaming mode and
> + * min_apply_delay
> + */
> + if (opts.min_apply_delay > 0)
> + if ((IsSet(opts.specified_opts, SUBOPT_STREAMING) && opts.streaming == LOGICALREP_STREAM_PARALLEL) ||
> + (!IsSet(opts.specified_opts, SUBOPT_STREAMING) && sub->stream == LOGICALREP_STREAM_PARALLEL))
> + ereport(ERROR,
> + errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
> + errmsg("cannot enable %s for subscription in %s mode",
> + "min_apply_delay", "streaming = parallel"));
> +
> + values[Anum_pg_subscription_subminapplydelay - 1] =
> + Int64GetDatum(opts.min_apply_delay);
> + replaces[Anum_pg_subscription_subminapplydelay - 1] = true;
> + }
>
> 10a.
> SUGGESTION (comment)
> The combination of parallel streaming mode and min_apply_delay is not
> allowed.
Fixed.

> ~
>
> 10b.
> (see AlterSubscription general review comment #8 above) Here you can use the
> same comment error message that says min_apply_delay > 0 and streaming =
> parallel are mutually exclusive options.
Same as 9b.

> ======
>
> .../replication/logical/applyparallelworker.c
>
> 11.
>
> @@ -704,7 +704,8 @@ pa_process_spooled_messages_if_required(void)
> {
> apply_spooled_messages(&MyParallelShared->fileset,
> MyParallelShared->xid,
> - InvalidXLogRecPtr);
> + InvalidXLogRecPtr,
> + 0);
>
> IMO this passing of 0 is a bit strange because it is currently acting like a dummy
> value since the apply_spooled_messages will never make use of the 'finish_ts'
> anyway (since this call is from a parallel apply worker).
>
> I think a better way to code this might be to pass the 0 (same as you are doing
> here) but inside the apply_spooled_messages change the code:
>
> FROM
> if (!am_parallel_apply_worker())
> maybe_delay_apply(finish_ts);
>
> TO
> if (finish_ts)
> maybe_delay_apply(finish_ts);
>
> That does 2 things.
> - It makes the passed-in 0 have some meaning
> - It simplifies the apply_spooled_messages code
Adopted.
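
The adopted shape can be sketched in isolation as below. This is a simplified stand-in, not the v17 code: the function names and the call counter are hypothetical, the timestamp is a plain integer, and the real functions take more arguments. The point is only that a zero finish_ts now means "no delay applies to this caller", so the function no longer needs to ask which kind of worker it is running in.

```c
#include <assert.h>
#include <stdint.h>

/* Counts how often the delay routine was entered (for illustration only). */
int			sketch_delay_calls = 0;

/* Stand-in for maybe_delay_apply(): expects a real finish timestamp. */
void
sketch_maybe_delay_apply(int64_t finish_ts)
{
	assert(finish_ts != 0);
	sketch_delay_calls++;
}

/* Stand-in for apply_spooled_messages(): 0 acts as "do not delay". */
void
sketch_apply_spooled_messages(int64_t finish_ts)
{
	if (finish_ts)
		sketch_maybe_delay_apply(finish_ts);

	/* ... the spooled changes would be replayed here ... */
}
```

A parallel apply worker would call the function with 0, and the leader apply worker with the commit or prepare time of the streamed transaction.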

> ======
>
> src/backend/replication/logical/worker.c
>
> 12.
>
> @@ -318,6 +318,17 @@ static List *on_commit_wakeup_workers_subids = NIL;
> bool in_remote_transaction = false;
> static XLogRecPtr remote_final_lsn = InvalidXLogRecPtr;
>
> +/*
> + * In order to avoid walsender's timeout during time-delayed replication,
> + * it's necessary to keep sending feedback messages during the delay from the
> + * worker process. Meanwhile, the feature delays the apply before starting the
> + * transaction and thus we don't write WALs for the suspended changes during
> + * the wait. Hence, in the case the worker process sends a feedback message
> + * during the delay, we should not make positions of the flushed and apply LSN
> + * overwritten by the last received latest LSN. See send_feedback() for details.
> + */
> +static XLogRecPtr last_received = InvalidXLogRecPtr;
>
> 12a.
> Suggest a small change to the first sentence of the comment.
>
> BEFORE
> In order to avoid walsender's timeout during time-delayed replication, it's
> necessary to keep sending feedback messages during the delay from the
> worker process.
>
> AFTER
> In order to avoid walsender timeout for time-delayed replication the worker
> process keeps sending feedback messages during the delay period.
Fixed.

> ~
>
> 12b.
> "Hence, in the case" -> "When"
Fixed.


> ~~~
>
> 13. forward declare
>
> -static void send_feedback(XLogRecPtr recvpos, bool force, bool requestReply);
> +static void send_feedback(XLogRecPtr recvpos, bool force, bool requestReply,
> + bool in_delaying_apply);
>
> Change the param name:
>
> "in_delaying_apply" -> "in_delayed_apply" (??)
Changed. The original intention of the "in_" prefix was
to align the variable name with other variables
such as "in_remote_transaction" and
"in_streamed_transaction", which indicate the current status
of the transaction. So, unless a better name is proposed,
we can keep the prefix.

> ~~~
>
> 14. maybe_delay_apply
>
> + /* Nothing to do if no delay set */
> + if (MySubscription->minapplydelay <= 0)
> + return;
>
> IIUC min_apply_delay cannot be < 0 so this condition could simply be:
>
> if (!MySubscription->minapplydelay)
> return;
Fixed.

> ~~~
>
> 15. maybe_delay_apply
>
> + /*
> + * The min_apply_delay parameter is ignored until all tablesync workers
> + * have reached READY state. If we allow the delay during the catchup
> + * phase, once we reach the limit of tablesync workers, it will impose a
> + * delay for each subsequent worker. It means it will take a long time to
> + * finish the initial table synchronization.
> + */
> + if (!AllTablesyncsReady())
> + return;
>
> SUGGESTION (slight rewording)
> The min_apply_delay parameter is ignored until all tablesync workers have
> reached READY state. This is because if we allowed the delay during the
> catchup phase, then once we reached the limit of tablesync workers it would
> impose a delay for each subsequent worker. That would cause initial table
> synchronization completion to take a long time.
Fixed.

> ~~~
>
> 16. maybe_delay_apply
>
> + while (true)
> + {
> + long diffms;
> +
> + ResetLatch(MyLatch);
> +
> + CHECK_FOR_INTERRUPTS();
>
> IMO there should be some small explanatory comment here at the top of the
> while loop.
Added.

> ~~~
>
> 17. apply_spooled_messages
>
> @@ -2024,6 +2141,21 @@ apply_spooled_messages(FileSet *stream_fileset,
> TransactionId xid,
> int fileno;
> off_t offset;
>
> + /*
> + * Should we delay the current transaction?
> + *
> + * Unlike the regular (non-streamed) cases, the delay is applied in a
> + * STREAM COMMIT/STREAM PREPARE message for streamed transactions. The
> + * STREAM START message does not contain a commit/prepare time (it will be
> + * available when the in-progress transaction finishes). Hence, it's not
> + * appropriate to apply a delay at that time.
> + *
> + * It's not allowed to execute time-delayed replication with parallel
> + * apply feature.
> + */
> + if (!am_parallel_apply_worker())
> + maybe_delay_apply(finish_ts);
>
> That whole comment part "Unlike the regular (non-streamed) cases"
> seems misplaced here. Perhaps this part of the comment is better put into
> the function header where the meaning of 'finish_ts' is explained?
Moved it to the header comment for maybe_delay_apply.

> ~~~
>
> 18. apply_spooled_messages
>
> + * It's not allowed to execute time-delayed replication with parallel
> + * apply feature.
> + */
> + if (!am_parallel_apply_worker())
> + maybe_delay_apply(finish_ts);
>
> As was mentioned in comment #11 above this code could be changed like
>
> if (finish_ts)
> maybe_delay_apply(finish_ts);
> then you don't even need to make mention of "parallel apply" at all here.
>
> OTOH if you want to still have the parallel apply comment then maybe reword it
> like this:
> "It is not allowed to combine time-delayed replication with the parallel apply
> feature."
Changed, and the comment no longer mentions the parallel apply feature.

> ~~~
>
> 19. apply_spooled_messages
>
> If you chose not to do my suggestion from comment #11, then there are
> 2 identical conditions (!am_parallel_apply_worker()); In this case, I was
> wondering if it would be better to refactor to use a single condition instead.
I applied comment #11. Now the conditions are no longer identical.

> ~~~
>
> 20. send_feedback
> (same as comment #13)
>
> Maybe change the new param name to “in_delayed_apply”?
Changed.

> ~~~
>
> 21.
>
> @@ -3737,8 +3869,15 @@ send_feedback(XLogRecPtr recvpos, bool force, bool requestReply)
> /*
> * No outstanding transactions to flush, we can report the latest received
> * position. This is important for synchronous replication.
> + *
> + * During the delay of time-delayed replication, do not tell the publisher
> + * that the received latest LSN is already applied and flushed at this
> + * stage, since we don't apply the transaction yet. If we do so, it leads
> + * to a wrong assumption of logical replication progress on the publisher
> + * side. Here, we just send a feedback message to avoid publisher's
> + * timeout during the delay.
> */
>
> Minor rewording of the comment
>
> SUGGESTION
> If the subscriber side apply is delayed (because of time-delayed
> replication) then do not tell the publisher that the received latest LSN is already
> applied and flushed, otherwise, it leads to the publisher side making a wrong
> assumption of logical replication progress. Instead, we just send a feedback
> message to avoid a publisher timeout during the delay.
Adopted.
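
The behaviour this comment describes can be sketched as follows. This is a loose, standalone model rather than the actual send_feedback() logic: the struct, the function, and its parameters are hypothetical, and LSNs are plain 64-bit integers. It shows only the rule that the flush and apply positions are advanced to the received position when nothing is pending and the worker is not inside a delayed apply.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

typedef uint64_t SketchLSN;

/* Positions reported to the publisher in one feedback message. */
typedef struct SketchFeedback
{
	SketchLSN	write;
	SketchLSN	flush;
	SketchLSN	apply;
} SketchFeedback;

/* Hypothetical model of choosing the positions to report. */
SketchFeedback
sketch_build_feedback(SketchLSN recvpos,
					  SketchLSN last_flushed,
					  SketchLSN last_applied,
					  bool have_pending_txes,
					  bool in_delayed_apply)
{
	SketchFeedback fb;

	assert(last_flushed <= recvpos);
	assert(last_applied <= recvpos);

	fb.write = recvpos;
	fb.flush = last_flushed;
	fb.apply = last_applied;

	/*
	 * Report everything received as flushed and applied only when there
	 * is nothing outstanding and no transaction is being held back.
	 */
	if (!have_pending_txes && !in_delayed_apply)
	{
		fb.flush = recvpos;
		fb.apply = recvpos;
	}

	return fb;
}
```

During the delay the message is still sent, so the walsender sees activity, but the publisher's view of flushed and applied progress stays at the last positions that were really reached.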

> ======
>
>
> src/bin/pg_dump/pg_dump.c
>
> 22.
>
> @@ -4546,9 +4547,14 @@ getSubscriptions(Archive *fout)
> LOGICALREP_TWOPHASE_STATE_DISABLED);
>
> if (fout->remoteVersion >= 160000)
> - appendPQExpBufferStr(query, " s.suborigin\n");
> + appendPQExpBufferStr(query,
> + " s.suborigin,\n"
> + " s.subminapplydelay\n");
> else
> - appendPQExpBuffer(query, " '%s' AS suborigin\n", LOGICALREP_ORIGIN_ANY);
> + {
> + appendPQExpBuffer(query, " '%s' AS suborigin,\n",
> + LOGICALREP_ORIGIN_ANY);
> + appendPQExpBufferStr(query, " 0 AS subminapplydelay\n");
> + }
>
> Can't those appends in the else part be combined into a single
> appendPQExpBuffer call?
>
> appendPQExpBuffer(query,
> " '%s' AS suborigin,\n"
> " 0 AS subminapplydelay\n",
> LOGICALREP_ORIGIN_ANY);
Adopted.

> ======
>
> src/include/catalog/pg_subscription.h
>
> 23.
>
> @@ -70,6 +70,8 @@ CATALOG(pg_subscription,6100,SubscriptionRelationId)
> BKI_SHARED_RELATION BKI_ROW
> XLogRecPtr subskiplsn; /* All changes finished at this LSN are
> * skipped */
>
> + int64 subminapplydelay; /* Replication apply delay */
> +
> NameData subname; /* Name of the subscription */
>
> Oid subowner BKI_LOOKUP(pg_authid); /* Owner of the subscription */
>
> SUGGESTION (for comment)
> Replication apply delay (ms)
Fixed.

> ~~
>
> 24.
>
> @@ -120,6 +122,7 @@ typedef struct Subscription
> * in */
> XLogRecPtr skiplsn; /* All changes finished at this LSN are
> * skipped */
> + int64 minapplydelay; /* Replication apply delay */
>
> SUGGESTION (for comment)
> Replication apply delay (ms)
Fixed.

Kindly have a look at the latest v17 patch in [1].

[1] - https://www.postgresql.org/message-id/TYCPR01MB8373F5162C7A0E6224670CF0EDC49%40TYCPR01MB8373.jpnprd01.prod.outlook.com

Best Regards,
Takamichi Osumi
