RE: Optionally automatically disable logical replication subscriptions on error

From: "osumi(dot)takamichi(at)fujitsu(dot)com" <osumi(dot)takamichi(at)fujitsu(dot)com>
To: 'Peter Smith' <smithpb2250(at)gmail(dot)com>
Cc: Amit Kapila <amit(dot)kapila16(at)gmail(dot)com>, "tanghy(dot)fnst(at)fujitsu(dot)com" <tanghy(dot)fnst(at)fujitsu(dot)com>, "wangw(dot)fnst(at)fujitsu(dot)com" <wangw(dot)fnst(at)fujitsu(dot)com>, vignesh C <vignesh21(at)gmail(dot)com>, Greg Nancarrow <gregn4422(at)gmail(dot)com>, Masahiko Sawada <sawada(dot)mshk(at)gmail(dot)com>, Mark Dilger <mark(dot)dilger(at)enterprisedb(dot)com>, "Smith, Peter" <peters(at)fast(dot)au(dot)fujitsu(dot)com>, PostgreSQL Hackers <pgsql-hackers(at)lists(dot)postgresql(dot)org>
Subject: RE: Optionally automatically disable logical replication subscriptions on error
Date: 2022-02-21 00:25:22
Message-ID: TYCPR01MB837326C879A9CDE756073A7FED3A9@TYCPR01MB8373.jpnprd01.prod.outlook.com
Lists: pgsql-hackers

On Friday, February 18, 2022 3:27 PM Peter Smith <smithpb2250(at)gmail(dot)com> wrote:
> Hi. Below are my code review comments for v18.
Thank you for your review!

> ==========
>
> 1. Commit Message - wording
>
> BEFORE
> To partially remedy the situation, adding a new subscription_parameter named
> 'disable_on_error'.
>
> AFTER
> To partially remedy the situation, this patch adds a new
> subscription_parameter named 'disable_on_error'.
Fixed.

> ~~~
>
> 2. Commit message - wording
>
> BEFORE
> Require to bump catalog version.
>
> AFTER
> A catalog version bump is required.
Fixed.

> ~~~
>
> 3. doc/src/sgml/ref/alter_subscription.sgml - whitespace
>
> @@ -201,8 +201,8 @@ ALTER SUBSCRIPTION <replaceable
> class="parameter">name</replaceable> RENAME TO <
> information. The parameters that can be altered
> are <literal>slot_name</literal>,
> <literal>synchronous_commit</literal>,
> - <literal>binary</literal>, and
> - <literal>streaming</literal>.
> + <literal>binary</literal>,<literal>streaming</literal>, and
> + <literal>disable_on_error</literal>.
> </para>
>
> There is a missing space before <literal>streaming</literal>.
Fixed.

> ~~~
>
> 4. src/backend/replication/logical/worker.c - WorkerErrorRecovery
>
> @@ -2802,6 +2803,89 @@ LogicalRepApplyLoop(XLogRecPtr
> last_received) }
>
> /*
> + * Worker error recovery processing, in preparation for disabling the
> + * subscription.
> + */
> +static void
> +WorkerErrorRecovery(void)
>
> I was wondering about the need for this to be a separate function? It is only
> called immediately before calling 'DisableSubscriptionOnError'
> so would it maybe be better just to put this code inside
> DisableSubscriptionOnError with the appropriate comments?
I preferred to have a separate function dedicated to error handling,
because, from the caller's side, it is then apparent that error recovery
is done when we catch an error. But the function name
"DisableSubscriptionOnError" by itself already conveys that we do
something on error, so it is okay to have the error recovery processing
in this function.

So, I removed the function and fixed some related comments.

> ~~~
>
> 5. src/backend/replication/logical/worker.c - DisableSubscriptionOnError
>
> + /*
> + * We would not be here unless this subscription's disableonerr field was
> + * true when our worker began applying changes, but check whether that
> + * field has changed in the interim.
> + */
>
> Apparently, this function might just do nothing if it detects some situation
> where the flag was changed somehow, but I'm not 100% sure that the callers
> are properly catering for when nothing happens.
>
> IMO it would be better if this function would return true/false to mean "did
> disable subscription happen or not?" because that will give the calling code the
> chance to check the function return and do the right thing - e.g. if the caller first
> thought it should be disabled but then it turned out it did NOT disable...
I don't think we need to do anything more.
After this function returns, both the table sync worker and the apply
worker just exit. IMO, the callers don't need to do any additional work
for a subscription that has already been disabled.
It is sufficient for DisableSubscriptionOnError either to fulfill its
purpose or to confirm that it has already been fulfilled.

> ~~~
>
> 6. src/backend/replication/logical/worker.c - LogicalRepHandleTableSync
> name
>
> +/*
> + * Execute the initial sync with error handling. Disable the subscription,
> + * if it's required.
> + */
> +static void
> +LogicalRepHandleTableSync(XLogRecPtr *origin_startpos,
> + char **myslotname, MemoryContext cctx)
>
> I felt that it is a bit overkill to put a "LogicalRep" prefix here because it is a static
> function.
>
> IMO this function should be renamed as 'SyncTableStartWrapper' because that
> describes better what it is doing.
Makes sense. Fixed.

> ~~~
>
> 7. src/backend/replication/logical/worker.c - LogicalRepHandleTableSync
> Assert
>
> Even though we can know this to be true because of where it is called from, I
> think the readability of the function will be improved if you add an assertion at
> the top:
>
> Assert(am_tablesync_worker());
Fixed.

> And then, because the function is clearly for Tablesync worker only there is no
> need to keep mentioning that in the subsequent comments...
>
> e.g.1
> /* This is table synchronization worker, call initial sync. */
> AFTER:
> /* Call initial sync. */
Fixed.

> e.g.2
> /*
> * Report the table sync error. There is no corresponding message type
> * for table synchronization.
> */
> AFTER
> /*
> * Report the error. There is no corresponding message type for table
> * synchronization.
> */
Agreed. Fixed.

> ~~~
>
> 8. src/backend/replication/logical/worker.c - LogicalRepHandleTableSync
> unnecessarily complex
>
> +static void
> +LogicalRepHandleTableSync(XLogRecPtr *origin_startpos,
> + char **myslotname, MemoryContext cctx)
> +{
> + char *syncslotname;
> + bool error_recovery_done = false;
>
> IMO this logic is way more complex than it needed to be. IIUC that
> 'error_recovery_done' and various conditions can be removed, and the whole
> thing be simplified quite a lot.
>
> I re-wrote this function as a POC. Please see the attached file [2].
> All the tests are still passing OK.
>
> (Perhaps the scenario for my comment #5 above still needs to be addressed?)
Removed the 'error_recovery_done' flag and fixed.


> ~~~
>
> 9. src/backend/replication/logical/worker.c -
> LogicalRepHandleApplyMessages name
>
> +/*
> + * Run the apply loop with error handling. Disable the subscription,
> + * if necessary.
> + */
> +static void
> +LogicalRepHandleApplyMessages(XLogRecPtr origin_startpos,
> + MemoryContext cctx)
>
> I felt that it is a bit overkill to put a "LogicalRep" prefix here because it is a static
> function.
>
> IMO this function should be renamed as 'ApplyLoopWrapper' because that
> describes better what it is doing.
Fixed.

> ~~~
>
> 10. src/backend/replication/logical/worker.c -
> LogicalRepHandleApplyMessages unnecessarily complex
>
> +static void
> +LogicalRepHandleApplyMessages(XLogRecPtr origin_startpos,
> + MemoryContext cctx)
> +{
> + bool error_recovery_done = false;
>
> IMO this logic is way more complex than it needed to be. IIUC that
> 'error_recovery_done' and various conditions can be removed, and the whole
> thing be simplified quite a lot.
>
> I re-wrote this function as a POC. Please see the attached file [2].
> All the tests are still passing OK.
>
> (Perhaps the scenario for my comment #5 above still needs to be addressed?)
Fixed.
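
For readers following comments 8 and 10, the general shape of a wrapper that needs no 'error_recovery_done' flag can be sketched as below. This is an assumption-laden illustration, not the v19 code and not the reviewer's POC: plain setjmp/longjmp stands in for the backend's error handling, and every mock_* name is hypothetical.

```c
#include <setjmp.h>
#include <stdbool.h>

/* Hypothetical stand-ins for the error-handling state and the catalog. */
static jmp_buf mock_error_jump;
static bool mock_subscription_enabled = true;

/* Hypothetical apply loop: "raises an error" by jumping to the handler. */
static void
mock_apply_loop(bool fail)
{
    if (fail)
        longjmp(mock_error_jump, 1);
}

/*
 * Run the loop with error handling. Returns true if the loop finished
 * normally and false if an error was caught. The error path is a single
 * branch, so no extra bookkeeping flag is needed.
 */
static bool
mock_apply_loop_wrapper(bool fail, bool disable_on_error)
{
    if (setjmp(mock_error_jump) == 0)
    {
        mock_apply_loop(fail);
        return true;
    }

    /* We get here only after an error in the loop. */
    if (disable_on_error)
        mock_subscription_enabled = false;

    return false;
}
```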

> ~~~
>
> 11. src/bin/pg_dump/pg_dump.c - dumpSubscription
>
> @@ -4441,6 +4451,9 @@ dumpSubscription(Archive *fout, const
> SubscriptionInfo *subinfo)
> if (strcmp(subinfo->subtwophasestate, two_phase_disabled) != 0)
> appendPQExpBufferStr(query, ", two_phase = on");
>
> + if (strcmp(subinfo->subdisableonerr, "f") != 0)
> + appendPQExpBufferStr(query, ", disable_on_error = on");
> +
>
> I felt saying disable_on_err is "true" would look more natural than saying it is
> "on".
Fixed.
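
As a standalone illustration of the dump logic discussed in comment 11, the sketch below appends the option text to a buffer when the dumped value is not "f". It uses only the C standard library in place of PQExpBuffer, and mock_append_disable_on_error is a hypothetical helper, not a pg_dump function.

```c
#include <stdio.h>
#include <string.h>

/*
 * Append ", disable_on_error = true" to an option list when the catalog
 * value (dumped as the text "t" or "f") is not "f".
 * buf must hold a NUL-terminated string; bufsize is its total capacity.
 */
static void
mock_append_disable_on_error(char *buf, size_t bufsize,
                             const char *subdisableonerr)
{
    if (strcmp(subdisableonerr, "f") != 0)
    {
        size_t used = strlen(buf);

        /* snprintf never writes past the remaining capacity. */
        snprintf(buf + used, bufsize - used, ", disable_on_error = true");
    }
}
```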

> ~~~
>
> 12. src/bin/psql/describe.c - describeSubscriptions typo
>
> @@ -6096,11 +6096,13 @@ describeSubscriptions(const char *pattern, bool
> verbose)
> gettext_noop("Binary"),
> gettext_noop("Streaming"));
>
> - /* Two_phase is only supported in v15 and higher */
> + /* Two_phase and disable_on_error is only supported in v15 and higher */
>
> Typo
>
> "is only" --> "are only"
Fixed.

> ~~~
>
> 13. src/include/catalog/pg_subscription.h - comments
>
> @@ -103,6 +106,9 @@ typedef struct Subscription
> * binary format */
> bool stream; /* Allow streaming in-progress transactions. */
> char twophasestate; /* Allow streaming two-phase transactions */
> + bool disableonerr; /* Indicates if the subscription should be
> + * automatically disabled when subscription
> + * workers detect any errors. */
>
> It's not usual to have a full stop here.
> Maybe not needed to repeat the word "subscription".
> IMO, generally, it all can be simplified a bit.
>
> BEFORE
> Indicates if the subscription should be automatically disabled when
> subscription workers detect any errors.
>
> AFTER
> Indicates if the subscription should be automatically disabled if a worker error
> occurs
Fixed.

> ~~~
>
> 14. src/test/regress/sql/subscription.sql - missing test case.
>
> The "conflicting options" error from the below code is not currently being
> tested.
>
> @@ -249,6 +253,15 @@ parse_subscription_options(ParseState *pstate, List
> *stmt_options,
> opts->specified_opts |= SUBOPT_TWOPHASE_COMMIT;
> opts->twophase = defGetBoolean(defel);
> }
> + else if (IsSet(supported_opts, SUBOPT_DISABLE_ON_ERR) &&
> + strcmp(defel->defname, "disable_on_error") == 0)
> + {
> + if (IsSet(opts->specified_opts, SUBOPT_DISABLE_ON_ERR))
> + errorConflictingDefElem(defel, pstate);
We don't have this kind of test for the other options either.
So, leaving it out keeps this option aligned with them.
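
For context, the duplicate-option check quoted above relies on a bitmask of already-specified options. A minimal sketch of that technique follows. The bit value, the MockIsSet macro and the mock_* names are assumptions made for illustration, and the real code raises an error where this sketch returns false.

```c
#include <stdbool.h>
#include <string.h>

#define MOCK_SUBOPT_DISABLE_ON_ERR 0x01     /* hypothetical bit value */
#define MockIsSet(val, bits) (((val) & (bits)) == (bits))

/* Hypothetical stand-in for the parsed subscription options. */
typedef struct MockSubOpts
{
    unsigned int specified_opts;    /* bitmask of options seen so far */
    bool disableonerr;
} MockSubOpts;

/*
 * Record one option. Returns false if the option was already specified
 * (the "conflicting options" case), true otherwise.
 */
static bool
mock_parse_option(MockSubOpts *opts, const char *name, bool value)
{
    if (strcmp(name, "disable_on_error") == 0)
    {
        if (MockIsSet(opts->specified_opts, MOCK_SUBOPT_DISABLE_ON_ERR))
            return false;   /* option given twice: conflict */

        opts->specified_opts |= MOCK_SUBOPT_DISABLE_ON_ERR;
        opts->disableonerr = value;
    }
    return true;
}
```

Specifying the option a second time is rejected and leaves the first value in place.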

> ~~~
>
> 15. src/test/subscription/t/028_disable_on_error.pl - 028 clash
>
> Just a heads-up that this 028 is going to clash with the Row-Filter patch 028
> which has been announced to be pushed soon, so be prepared to change this
> number again shortly :)
Thank you for letting me know.

> ~~~
>
> 16. src/test/subscription/t/028_disable_on_error.pl - done_testing
>
> AFAIK there is a new style now for the TAP tests where they use
> "done_testing();" instead of saying up-front how many tests there are.
> See here [1].
Fixed.

> ~~~
>
> 17. src/test/subscription/t/028_disable_on_error.pl - more comments
>
> +# Create an additional unique index in schema s1 on the subscriber only. When
> +# we create subscriptions, below, this should cause subscription "s1" on the
> +# subscriber to fail during initial synchronization and to get automatically
> +# disabled.
>
> I felt it could be made a bit more obvious upfront in a comment that 2 pairs of
> pub/sub will be created, and their names will be the same as the
> schemas:
> e.g.
> Publisher "s1" --> Subscriber "s1"
> Publisher "s2" --> Subscriber "s2"
Comments are fixed.

> ~~~
>
> 18. src/test/subscription/t/028_disable_on_error.pl - ALTER tests?
>
> The tests here are only using the hardwired 'disable_on_error' options set at
> CREATE SUBSCRIPTION time. There are no TAP tests for changing the
> disable_on_error using ALTER SUBSCRIPTION.
>
> Should there be?
I don't think so. Toggling the flag 'disable_on_error' is already tested
in the subscription.sql file. Both new paths for table sync and apply
worker to disable on error are already covered.

FYI: I skipped one change from worker.c.peter.txt
concerning the "enabled" flag, which is independent of
the disable_on_error option.

Kindly have a look at the attached v19.

Best Regards,
Takamichi Osumi

Attachment: v19-0001-Optionally-disable-subscriptions-on-error.patch (application/octet-stream, 51.2 KB)
