RE: Perform streaming logical transactions by background workers and parallel apply

From: "wangw(dot)fnst(at)fujitsu(dot)com" <wangw(dot)fnst(at)fujitsu(dot)com>
To: Amit Kapila <amit(dot)kapila16(at)gmail(dot)com>
Cc: Masahiko Sawada <sawada(dot)mshk(at)gmail(dot)com>, Peter Smith <smithpb2250(at)gmail(dot)com>, "shiy(dot)fnst(at)fujitsu(dot)com" <shiy(dot)fnst(at)fujitsu(dot)com>, "houzj(dot)fnst(at)fujitsu(dot)com" <houzj(dot)fnst(at)fujitsu(dot)com>, PostgreSQL Hackers <pgsql-hackers(at)lists(dot)postgresql(dot)org>
Subject: RE: Perform streaming logical transactions by background workers and parallel apply
Date: 2022-06-23 07:21:43
Message-ID: OS3PR01MB6275F61CC74C5F3E47C7DD229EB59@OS3PR01MB6275.jpnprd01.prod.outlook.com
Lists: pgsql-hackers

On Mon, Jun 20, 2022 at 11:00 AM Amit Kapila <amit(dot)kapila16(at)gmail(dot)com> wrote:
> I have improved the comments in this and other related sections of the
> patch. See attached.
Thanks for your comments and patch!
Improved the comments as you suggested.

> > > 3.
> > > +
> > > + <para>
> > > + Setting streaming mode to <literal>apply</literal> could export invalid LSN
> > > + as finish LSN of failed transaction. Changing the streaming mode and making
> > > + the same conflict writes the finish LSN of the failed transaction in the
> > > + server log if required.
> > > + </para>
> > >
> > > How will the user identify that this is an invalid LSN value and she
> > > shouldn't use it to SKIP the transaction? Can we change the second
> > > sentence to: "User should change the streaming mode to 'on' if they
> > > would instead wish to see the finish LSN on error. Users can use
> > > finish LSN to SKIP applying the transaction." I think we can give
> > > reference to docs where the SKIP feature is explained.
> > Improved the sentence as suggested.
> >
>
> You haven't answered first part of the comment: "How will the user
> identify that this is an invalid LSN value and she shouldn't use it to
> SKIP the transaction?". Have you checked what value it displays? For
> example, in one of the case in apply_error_callback as shown in below
> code, we don't even display finish LSN if it is invalid.
> else if (XLogRecPtrIsInvalid(errarg->finish_lsn))
> errcontext("processing remote data for replication origin \"%s\" during \"%s\" in transaction %u",
> errarg->origin_name,
> logicalrep_message_type(errarg->command),
> errarg->remote_xid);
Sorry, I missed this part in my previous reply.
The invalid LSN value here is InvalidXLogRecPtr, which is displayed as 0/0.
Here is an example:
```
2022-06-23 14:30:11.343 CST [822333] logical replication worker CONTEXT: processing remote data for replication origin "pg_16389" during "INSERT" for replication target relation "public.tab" in transaction 727 finished at 0/0
```
So I tried to improve the sentence in the documentation by changing it from
```
Setting streaming mode to <literal>apply</literal> could export invalid LSN as
finish LSN of failed transaction.
```
to
```
Setting streaming mode to <literal>apply</literal> could export invalid LSN
(0/0) as finish LSN of failed transaction.
```
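For reference, an XLogRecPtr is a 64-bit WAL position printed as its high and low 32-bit halves in hex, separated by a slash, which is why InvalidXLogRecPtr appears as 0/0 in the log above. The sketch below illustrates that formatting and the kind of branch apply_error_callback uses to omit the finish LSN when it is invalid. The typedef, the macros and both functions are simplified stand-ins written for this illustration (assumptions), not the server's own definitions.

```c
#include <assert.h>
#include <inttypes.h>
#include <stdint.h>
#include <stdio.h>
#include <string.h>

/* Simplified stand-ins for PostgreSQL's definitions (assumption). */
typedef uint64_t XLogRecPtr;
#define InvalidXLogRecPtr ((XLogRecPtr) 0)
#define XLogRecPtrIsInvalid(r) ((r) == InvalidXLogRecPtr)

/* Print an LSN as "high/low" in hex; InvalidXLogRecPtr becomes "0/0". */
static void
format_lsn(char *buf, size_t len, XLogRecPtr lsn)
{
    snprintf(buf, len, "%" PRIX32 "/%" PRIX32,
             (uint32_t) (lsn >> 32), (uint32_t) lsn);
}

/*
 * Hypothetical helper mirroring the two branches quoted above: append
 * "finished at ..." only when the finish LSN is valid.
 */
static void
format_finish_context(char *buf, size_t len, uint32_t xid, XLogRecPtr finish_lsn)
{
    char lsn[32];

    if (XLogRecPtrIsInvalid(finish_lsn))
    {
        snprintf(buf, len, "in transaction %" PRIu32, xid);
        return;
    }
    format_lsn(lsn, sizeof(lsn), finish_lsn);
    snprintf(buf, len, "in transaction %" PRIu32 " finished at %s", xid, lsn);
}
```

With the invalid-LSN branch the message simply has no "finished at" part, so a user cannot mistake 0/0 for an LSN to pass to SKIP.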

I also improved the patches as you suggested in [1]:
> 1.
> +/*
> + * Count the number of registered (not necessarily running) apply background
> + * worker for a subscription.
> + */
>
> /worker/workers
Improved as suggested.

> 2.
> +static void
> +apply_bgworker_setup_dsm(ApplyBgworkerState *wstate)
> +{
> ...
> ...
> + int64 queue_size = 160000000; /* 16 MB for now */
>
> I think it would be better to use define for this rather than a
> hard-coded value.
Improved as suggested.
Added a macro like this:
```
/* Queue size of the DSM segment, 16 MB for now. */
#define DSM_QUEUE_SIZE (16 * 1024 * 1024)
```
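On the arithmetic behind the macro: the literal 160000000 in the reviewed hunk is 160,000,000 bytes, about 152.6 MiB, roughly ten times the "16 MB" its comment describes; 16 MiB is 16 * 1024 * 1024 = 16,777,216 bytes. Spelling the size as a product and checking it at compile time, as in this sketch, keeps the value and the comment in sync. `DSM_QUEUE_SIZE_16MB`, `REVIEWED_QUEUE_LITERAL` and `bytes_to_mib` are hypothetical names used only here.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical: 16 MiB written as a product so it matches its comment. */
#define DSM_QUEUE_SIZE_16MB ((size_t) 16 * 1024 * 1024)

/* The literal used in the reviewed hunk. */
#define REVIEWED_QUEUE_LITERAL ((size_t) 160000000)

/* 16 MiB is exactly 16,777,216 bytes. */
_Static_assert(DSM_QUEUE_SIZE_16MB == 16777216, "16 MiB");

/* The reviewed literal is ten times 16,000,000 bytes. */
_Static_assert(REVIEWED_QUEUE_LITERAL / 16000000 == 10, "literal is 160 MB");

/* Size in whole MiB, rounded down. */
static size_t
bytes_to_mib(size_t bytes)
{
    return bytes / (1024 * 1024);
}
```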

> 3.
> +/*
> + * Status for apply background worker.
> + */
> +typedef enum ApplyBgworkerStatus
> +{
> + APPLY_BGWORKER_ATTACHED = 0,
> + APPLY_BGWORKER_READY,
> + APPLY_BGWORKER_BUSY,
> + APPLY_BGWORKER_FINISHED,
> + APPLY_BGWORKER_EXIT
> +} ApplyBgworkerStatus;
>
> It would be better if you can add comments to explain each of these states.
Improved as suggested.
Added comments as follows:
```
typedef enum ApplyBgworkerStatus
{
	APPLY_BGWORKER_BUSY = 0,	/* assigned to a transaction */
	APPLY_BGWORKER_FINISHED,	/* transaction is completed */
	APPLY_BGWORKER_EXIT			/* exit */
} ApplyBgworkerStatus;
```
In addition, after addressing point #7 as you suggested, I removed
"APPLY_BGWORKER_ATTACHED". I also removed "APPLY_BGWORKER_READY" in v12.

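With only three states left, a name lookup like the following sketch is all a debug or log message would need. The enum mirrors the v13 states listed in this reply; `apply_bgworker_status_name` and the returned strings are hypothetical, not part of the patch.

```c
#include <assert.h>
#include <string.h>

/* The three v13 states described in this reply. */
typedef enum ApplyBgworkerStatus
{
    APPLY_BGWORKER_BUSY = 0,    /* assigned to a transaction */
    APPLY_BGWORKER_FINISHED,    /* transaction is completed */
    APPLY_BGWORKER_EXIT         /* exit */
} ApplyBgworkerStatus;

/* Hypothetical helper: readable name for log or debug messages. */
static const char *
apply_bgworker_status_name(ApplyBgworkerStatus status)
{
    switch (status)
    {
        case APPLY_BGWORKER_BUSY:
            return "busy";
        case APPLY_BGWORKER_FINISHED:
            return "finished";
        case APPLY_BGWORKER_EXIT:
            return "exit";
    }
    return "unknown";           /* out-of-range value */
}
```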
> 4.
> + /* Set up one message queue per worker, plus one. */
> + mq = shm_mq_create(shm_toc_allocate(toc, (Size) queue_size),
> + (Size) queue_size);
> + shm_toc_insert(toc, APPLY_BGWORKER_KEY_MQ, mq);
> + shm_mq_set_sender(mq, MyProc);
>
>
> I don't understand the meaning of 'plus one' in the above comment as
> the patch seems to be setting up just one queue here?
Yes, you are right. Improved as below:
```
/* Set up message queue for the worker. */
```

> 5.
> +
> + /* Attach the queues. */
> + wstate->mq_handle = shm_mq_attach(mq, seg, NULL);
>
> Similar to above. If there is only one queue then the comment should
> say queue instead of queues.
Improved as suggested.
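To illustrate the single-queue arrangement: each apply bgworker has exactly one queue, with the leader apply worker as the only sender and the bgworker as the only receiver (shm_mq itself is a single-reader, single-writer queue). The sketch below models that shape with a plain in-process ring buffer. It is an analogy, not shm_mq: there is no shared memory, no blocking and no message framing, and `ByteQueue`, `queue_send` and `queue_receive` are hypothetical names.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <string.h>

#define QUEUE_CAPACITY 8        /* tiny, for illustration only */

/* One single-sender, single-receiver byte queue (analogy for one shm_mq). */
typedef struct ByteQueue
{
    unsigned char data[QUEUE_CAPACITY];
    size_t      head;           /* index of the next byte to read */
    size_t      count;          /* number of bytes currently stored */
} ByteQueue;

/* Sender side: append len bytes, or write nothing if they do not fit. */
static bool
queue_send(ByteQueue *q, const void *buf, size_t len)
{
    const unsigned char *src = buf;

    if (len > QUEUE_CAPACITY - q->count)
        return false;
    for (size_t i = 0; i < len; i++)
        q->data[(q->head + q->count + i) % QUEUE_CAPACITY] = src[i];
    q->count += len;
    return true;
}

/* Receiver side: read up to len bytes; returns how many were read. */
static size_t
queue_receive(ByteQueue *q, void *buf, size_t len)
{
    unsigned char *dst = buf;
    size_t      n = len < q->count ? len : q->count;

    for (size_t i = 0; i < n; i++)
        dst[i] = q->data[(q->head + i) % QUEUE_CAPACITY];
    q->head = (q->head + n) % QUEUE_CAPACITY;
    q->count -= n;
    return n;
}
```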

> 6.
> snprintf(bgw.bgw_name, BGW_MAXLEN,
> "logical replication worker for subscription %u", subid);
> + else
> + snprintf(bgw.bgw_name, BGW_MAXLEN,
> + "logical replication background apply worker for subscription %u ", subid);
>
> No need for extra space after %u in the above code.
Improved as suggested.
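A quick check of the corrected format string: the sketch formats both naming branches into a buffer of BGW_MAXLEN bytes and confirms the background apply name has no trailing space. BGW_MAXLEN is redefined locally as 96 on the assumption that it matches bgworker.h, the `is_background_apply` flag stands in for whatever condition the patch actually tests, and `format_bgw_name` is a hypothetical helper.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdio.h>
#include <string.h>

#define BGW_MAXLEN 96           /* assumption: mirrors bgworker.h */

typedef unsigned int Oid;

/* Hypothetical helper reproducing the two naming branches in the hunk. */
static void
format_bgw_name(char *name, bool is_background_apply, Oid subid)
{
    if (!is_background_apply)
        snprintf(name, BGW_MAXLEN,
                 "logical replication worker for subscription %u", subid);
    else
        snprintf(name, BGW_MAXLEN,
                 "logical replication background apply worker for subscription %u",
                 subid);
}
```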

> 7.
> + launched = logicalrep_worker_launch(MyLogicalRepWorker->dbid,
> + MySubscription->oid,
> + MySubscription->name,
> + MyLogicalRepWorker->userid,
> + InvalidOid,
> + dsm_segment_handle(wstate->dsm_seg));
> +
> + if (launched)
> + {
> + /* Wait for worker to attach. */
> + apply_bgworker_wait_for(wstate, APPLY_BGWORKER_ATTACHED);
>
> In logicalrep_worker_launch(), we already seem to be waiting for
> workers to attach via WaitForReplicationWorkerAttach(), so it is not
> clear to me why we need to wait again? If there is a genuine reason
> then it is better to add some comments to explain it. I think in some
> way, we need to know if the worker is successfully attached and we may
> not get that via WaitForReplicationWorkerAttach, so there needs to be
> some way to know that but this doesn't sound like a very good idea. If
> that understanding is correct then can we think of a better way?
Improved the related logic.
In the previous version, we waited again here so that the apply bgworker could
attach to the memory queue; WaitForReplicationWorkerAttach cannot wait for that.
To improve this, the apply bgworker now invokes logicalrep_worker_attach after
attaching to the memory queue instead of before.
Also, to detect whether the worker has died due to an error or some other
reason, I modified logicalrep_worker_launch and WaitForReplicationWorkerAttach.
Now we can tell from the return value of logicalrep_worker_launch whether the
worker started successfully or died.
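The control flow described above, where the launch path reports whether the worker attached or died, can be modeled without a server as below. This is a simplified sketch: the states, `wait_for_attach`, and the scripted polling callback are hypothetical; the real logicalrep_worker_launch and WaitForReplicationWorkerAttach wait on a latch and consult the background worker handle rather than polling a callback.

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical worker states as seen by the launching process. */
typedef enum WorkerState
{
    WORKER_STARTING,            /* launched, not yet attached */
    WORKER_ATTACHED,            /* attached its queue, then registered itself */
    WORKER_DIED                 /* exited (e.g. on error) before attaching */
} WorkerState;

/* Callback reporting the worker's current state. */
typedef WorkerState (*poll_fn) (void *arg);

/*
 * Wait until the worker either attaches or dies, giving up after max_polls.
 * Returns true only if the worker attached.
 */
static bool
wait_for_attach(poll_fn poll, void *arg, int max_polls)
{
    for (int i = 0; i < max_polls; i++)
    {
        WorkerState s = poll(arg);

        if (s == WORKER_ATTACHED)
            return true;
        if (s == WORKER_DIED)
            return false;
        /* Real code would sleep on a latch here instead of spinning. */
    }
    return false;
}

/* Test double: replays a fixed sequence of states, repeating the last one. */
typedef struct StateScript
{
    const WorkerState *states;
    int         len;
    int         pos;
} StateScript;

static WorkerState
scripted_poll(void *arg)
{
    StateScript *script = arg;
    WorkerState s = script->states[script->pos];

    if (script->pos < script->len - 1)
        script->pos++;
    return s;
}
```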

> 8. I think we can simplify apply_bgworker_find_or_start by having
> separate APIs for find and start. Most of the places need to use find
> API except for the first stream. If we do that then I think you don't
> need to make a hash entry unless we established ApplyBgworkerState
> which currently looks odd as you need to remove the entry if we fail
> to allocate the state.
Improved as suggested.
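A minimal sketch of the split suggested in point 8, assuming a small fixed-size table keyed by remote xid in place of the real hash table: "find" is a pure lookup used for every stream message after the first, and "start" sets up the worker state first and only then publishes the entry, so a failed start leaves nothing to remove. All names (`WorkerTable`, `bgworker_find`, `bgworker_start`, the `launch_ok` flag) are hypothetical.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>
#include <stdlib.h>

#define MAX_ENTRIES 4

typedef uint32_t TransactionId;

/* Hypothetical per-worker state (stands in for ApplyBgworkerState). */
typedef struct WorkerState
{
    TransactionId xid;
} WorkerState;

/* Hypothetical fixed-size table standing in for the xid -> worker hash. */
typedef struct WorkerTable
{
    TransactionId xids[MAX_ENTRIES];
    WorkerState *states[MAX_ENTRIES];   /* NULL means the slot is free */
} WorkerTable;

/* Lookup only: used for all stream messages after the first one. */
static WorkerState *
bgworker_find(WorkerTable *t, TransactionId xid)
{
    for (int i = 0; i < MAX_ENTRIES; i++)
        if (t->states[i] != NULL && t->xids[i] == xid)
            return t->states[i];
    return NULL;
}

/*
 * Used only at the first stream of a transaction.  The state is fully set
 * up before an entry is published, so on failure nothing needs removing.
 */
static WorkerState *
bgworker_start(WorkerTable *t, TransactionId xid, bool launch_ok)
{
    int         slot = -1;
    WorkerState *ws;

    for (int i = 0; i < MAX_ENTRIES && slot < 0; i++)
        if (t->states[i] == NULL)
            slot = i;
    if (slot < 0 || !launch_ok)
        return NULL;            /* no free slot, or the launch failed */

    ws = malloc(sizeof(WorkerState));
    if (ws == NULL)
        return NULL;
    ws->xid = xid;

    /* Publish the entry only after the state is established. */
    t->xids[slot] = xid;
    t->states[slot] = ws;
    return ws;
}
```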

> 9.
> + /*
> + * TO IMPROVE: Do we need to display the apply background worker's
> + * information in pg_stat_replication ?
> + */
> + UpdateWorkerStats(last_received, send_time, false);
>
> In this do you mean to say pg_stat_subscription? If so, then to decide
> whether we need to update stats here we should see what additional
> information we can update here which is not possible via the main
> apply worker?
Yes, it should be pg_stat_subscription. I don't think we need to update these
statistics here: the messages received by LogicalApplyBgwLoop in the apply
bgworker have already been handled by LogicalRepApplyLoop in the main apply
worker, which has already updated these statistics (see LogicalRepApplyLoop).

> 10.
> ApplyBgworkerMain
> {
> ...
> + /* Load the subscription into persistent memory context. */
> + ApplyContext = AllocSetContextCreate(TopMemoryContext,
> ...
>
> This comment seems to be copied from ApplyWorkerMain but doesn't apply
> here.
Yes, you are right. Improved as below:
```
/* Init the memory context for the apply background worker to work in. */
```

In addition, I made the following improvements:
a.
In apply_handle_stream_abort, I had forgotten to change the second argument
passed to set_apply_error_context_xact, so I changed it from
"InvalidXLogRecPtr" to "abort_lsn".
b.
Renamed the function "canstartapplybgworker" to "apply_bgworker_can_start".
c.
Detach the DSM segment if we fail to launch an apply bgworker (see function
apply_bgworker_setup).
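Item c follows the usual "undo what you set up if a later step fails" pattern. A self-contained sketch of that shape, with malloc/free standing in for creating and detaching the DSM segment; `Segment`, `setup_segment`, `detach_segment` and `bgworker_setup` are hypothetical names, and the launch result is passed in as a flag.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdlib.h>

/* Hypothetical stand-in for a DSM segment. */
typedef struct Segment
{
    size_t      size;
} Segment;

static int  live_segments = 0;  /* segments created but not yet detached */

static Segment *
setup_segment(size_t size)
{
    Segment    *seg = malloc(sizeof(Segment));

    if (seg != NULL)
    {
        seg->size = size;
        live_segments++;
    }
    return seg;
}

static void
detach_segment(Segment *seg)
{
    free(seg);
    live_segments--;
}

/*
 * Set up the segment, then try to launch the worker.  If the launch fails,
 * detach the segment before returning so it is not leaked.
 */
static Segment *
bgworker_setup(size_t size, bool launch_ok)
{
    Segment    *seg = setup_segment(size);

    if (seg == NULL)
        return NULL;
    if (!launch_ok)
    {
        detach_segment(seg);
        return NULL;
    }
    return seg;
}
```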

BTW, I deleted the temporary patch 0003 (from v12) and rebased the patches
because of commits 26b3455afa and ac0e2d387a in HEAD.
I am now working on the improvements suggested by Peter-san in [3] and will
send new patches soon.

Attached are the new patches.

[1] - https://www.postgresql.org/message-id/CAA4eK1%2BQQHGb0afmM_Cf2qu%3DUJoCnvs3VcZ%2B1xTiySx205fU1w%40mail.gmail.com
[2] - https://www.postgresql.org/message-id/OS3PR01MB6275208A2F8ED832710F65E09EA49%40OS3PR01MB6275.jpnprd01.prod.outlook.com
[3] - https://www.postgresql.org/message-id/CAHut%2BPtu_eWOVWAKrwkUFdTAh_r-RZsbDFkFmKwEAmxws%3DSh5w%40mail.gmail.com

Regards,
Wang wei

Attachment Content-Type Size
v13-0001-Perform-streaming-logical-transactions-by-backgr.patch application/octet-stream 100.3 KB
v13-0002-Test-streaming-apply-option-in-tap-test.patch application/octet-stream 68.9 KB
v13-0003-Add-some-checks-before-using-apply-background-wo.patch application/octet-stream 35.7 KB
v13-0004-Retry-to-apply-streaming-xact-only-in-apply-work.patch application/octet-stream 25.5 KB
