Re: Perform streaming logical transactions by background workers and parallel apply

From: Amit Kapila <amit(dot)kapila16(at)gmail(dot)com>
To: "wangw(dot)fnst(at)fujitsu(dot)com" <wangw(dot)fnst(at)fujitsu(dot)com>
Cc: Peter Smith <smithpb2250(at)gmail(dot)com>, "houzj(dot)fnst(at)fujitsu(dot)com" <houzj(dot)fnst(at)fujitsu(dot)com>, Dilip Kumar <dilipbalaut(at)gmail(dot)com>, Masahiko Sawada <sawada(dot)mshk(at)gmail(dot)com>, "shiy(dot)fnst(at)fujitsu(dot)com" <shiy(dot)fnst(at)fujitsu(dot)com>, PostgreSQL Hackers <pgsql-hackers(at)lists(dot)postgresql(dot)org>
Subject: Re: Perform streaming logical transactions by background workers and parallel apply
Date: 2022-08-19 08:48:43
Message-ID: CAA4eK1++B2PFQiin2ZGMs_tszsQ=NJeKuj18-M4PEgckfEYqNA@mail.gmail.com
Lists: pgsql-hackers

On Thu, Aug 18, 2022 at 5:14 PM Amit Kapila <amit(dot)kapila16(at)gmail(dot)com> wrote:
>
> On Wed, Aug 17, 2022 at 11:58 AM wangw(dot)fnst(at)fujitsu(dot)com
> <wangw(dot)fnst(at)fujitsu(dot)com> wrote:
> >
> > Attach the new patches.
> >
>
> Few comments on v23-0001
> =======================
>

Some more comments on v23-0001
============================
1.
static bool
handle_streamed_transaction(LogicalRepMsgType action, StringInfo s)
{
...
- /* not in streaming mode */
- if (!in_streamed_transaction)
+ /* Not in streaming mode and not in apply background worker. */
+ if (!(in_streamed_transaction || am_apply_bgworker()))
return false;

This check appears a bit strange because, in the bgworker,
in_streamed_transaction should ideally be false. I think we should set
in_streamed_transaction to true in apply_handle_stream_start() only
when we are going to write to a file. Is there a reason for not doing
so?
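Here is a toy model of the suggestion in point 1. It assumes the forwarding decision can be made from state other than in_streamed_transaction (the real code would presumably use stream_apply_worker). StreamRole, model_stream_start() and model_handle_streamed_change() are hypothetical stand-ins for apply_handle_stream_start() and handle_streamed_transaction(). The model ignores any other bookkeeping those functions do.

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical roles a subscriber process can have for a streamed xact. */
typedef enum
{
    ROLE_BGWORKER,              /* apply background worker applying changes */
    ROLE_MAIN_FORWARDING,       /* main apply worker sending changes to a bgworker */
    ROLE_MAIN_SERIALIZING       /* main apply worker spilling changes to a file */
} StreamRole;

static StreamRole role = ROLE_BGWORKER;
static bool in_streamed_transaction = false;

/* Toy stand-in for apply_handle_stream_start() under the suggested rule. */
static void
model_stream_start(StreamRole r)
{
    role = r;
    /* The flag is set only when changes are going to be written to a file. */
    in_streamed_transaction = (r == ROLE_MAIN_SERIALIZING);
}

/*
 * Toy stand-in for handle_streamed_transaction(): true means the change was
 * consumed (forwarded or serialized), false means the caller applies it now.
 */
static bool
model_handle_streamed_change(void)
{
    if (role == ROLE_MAIN_FORWARDING)
        return true;            /* would be passed to the bgworker */
    if (!in_streamed_transaction)
        return false;           /* e.g. inside the bgworker: apply directly */
    return true;                /* would be appended to the stream file */
}
```

With this rule, the early return no longer needs an am_apply_bgworker() test, because in_streamed_transaction is simply false inside the bgworker.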

2.
+ {
+ /* This is the main apply worker. */
+ ApplyBgworkerInfo *wstate = apply_bgworker_find(xid);
+
+ /*
+ * Check if we are processing this transaction using an apply
+ * background worker and if so, send the changes to that worker.
+ */
+ if (wstate)
+ {
+ /* Send STREAM ABORT message to the apply background worker. */
+ apply_bgworker_send_data(wstate, s->len, s->data);

Why does the patch need to separately fetch ApplyBgworkerInfo in some
places, whereas in other places it directly uses stream_apply_worker to
pass the data to the bgworker?

3. Why don't apply_handle_stream_abort() and apply_handle_stream_prepare()
use apply_bgworker_active() to identify whether they need to send the
information to the bgworker?
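Points 2 and 3 both come down to how a handler finds the bgworker for an xid. The following is a minimal sketch of routing every stream handler through one lookup. It assumes a worker can be found by its top-level xid. ToyBgworker, toy_worker_for_xid() and toy_route_message() are hypothetical and only loosely mirror ApplyBgworkerInfo, apply_bgworker_find() and stream_apply_worker.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

typedef uint32_t TransactionId; /* toy typedef for this sketch */

/* Toy stand-in for ApplyBgworkerInfo: just the xid a worker is applying. */
typedef struct ToyBgworker
{
    TransactionId xid;
    int         in_use;
    int         msgs_received;
} ToyBgworker;

#define MAX_TOY_WORKERS 4

static ToyBgworker workers[MAX_TOY_WORKERS];

/* Cached worker for the stream currently open (like stream_apply_worker). */
static ToyBgworker *current_stream_worker = NULL;

/*
 * Single lookup used by every stream handler: prefer the cached worker when
 * it matches, otherwise search by xid. NULL means "handle in this process".
 */
static ToyBgworker *
toy_worker_for_xid(TransactionId xid)
{
    if (current_stream_worker != NULL && current_stream_worker->xid == xid)
        return current_stream_worker;

    for (int i = 0; i < MAX_TOY_WORKERS; i++)
        if (workers[i].in_use && workers[i].xid == xid)
            return &workers[i];

    return NULL;
}

/* Every handler (start, change, abort, prepare) routes the same way. */
static int
toy_route_message(TransactionId xid)
{
    ToyBgworker *w = toy_worker_for_xid(xid);

    if (w == NULL)
        return 0;               /* handle locally, e.g. via the stream file */
    w->msgs_received++;         /* stands in for apply_bgworker_send_data() */
    return 1;
}
```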

4. In apply_handle_stream_prepare(), apply_handle_stream_abort(), and
some other similar functions, the patch handles three cases: (a) apply
background worker, (b) sending data to the bgworker, and (c) handling a
streamed transaction in the apply worker. I think the code will look
better if you move the respective code for each of the three cases into
a separate function. Of course, if the code for a particular case is
small, we don't need to move it to a separate function.
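This is a sketch of the split suggested in point 4, using stream abort as the example. StreamCase and the three helper functions are hypothetical names, and the helper bodies only record which path ran.

```c
#include <assert.h>

/* The three cases from point 4 (names are hypothetical). */
typedef enum
{
    CASE_IN_BGWORKER,           /* (a) we are the apply background worker */
    CASE_SEND_TO_BGWORKER,      /* (b) main apply worker forwards to a bgworker */
    CASE_SERIALIZED             /* (c) main apply worker uses the spilled file */
} StreamCase;

/* Records which helper ran, so the dispatch can be checked. */
static int last_handled = -1;

/* One small helper per case; real bodies would hold the per-case logic. */
static void
stream_abort_in_bgworker(void)
{
    last_handled = CASE_IN_BGWORKER;
}

static void
stream_abort_send_to_bgworker(void)
{
    last_handled = CASE_SEND_TO_BGWORKER;
}

static void
stream_abort_serialized(void)
{
    last_handled = CASE_SERIALIZED;
}

/* Possible shape of apply_handle_stream_abort() after the refactor. */
static void
toy_handle_stream_abort(StreamCase c)
{
    switch (c)
    {
        case CASE_IN_BGWORKER:
            stream_abort_in_bgworker();
            break;
        case CASE_SEND_TO_BGWORKER:
            stream_abort_send_to_bgworker();
            break;
        case CASE_SERIALIZED:
            stream_abort_serialized();
            break;
    }
}
```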

5.
@@ -1088,24 +1177,78 @@ apply_handle_stream_prepare(StringInfo s)
{
...
+ in_remote_transaction = false;
+
+ /* Unlink the files with serialized changes and subxact info. */
+ stream_cleanup_files(MyLogicalRepWorker->subid, prepare_data.xid);
+ }
+ }

in_remote_transaction = false;
...

We don't need to set in_remote_transaction to false in multiple places.
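Here is a minimal sketch of point 5. It assumes both branches end the remote transaction. toy_stream_prepare() and files_cleaned are hypothetical, and comments mark where the real forwarding, replay and stream_cleanup_files() calls would sit.

```c
#include <assert.h>
#include <stdbool.h>

static bool in_remote_transaction = true;
static int  files_cleaned = 0;

static void
toy_stream_prepare(bool via_bgworker)
{
    if (via_bgworker)
    {
        /* ... forward STREAM PREPARE to the apply background worker ... */
    }
    else
    {
        /* ... replay the spilled changes and prepare the transaction ... */
        files_cleaned++;        /* stands in for stream_cleanup_files() */
    }

    /* Reset once, after all branches, instead of inside each branch. */
    in_remote_transaction = false;
}
```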

6.
@@ -1177,36 +1311,93 @@ apply_handle_stream_start(StringInfo s)
{
...
...
+ if (am_apply_bgworker())
{
- MemoryContext oldctx;
-
- oldctx = MemoryContextSwitchTo(ApplyContext);
+ /*
+ * Make sure the handle apply_dispatch methods are aware we're in a
+ * remote transaction.
+ */
+ in_remote_transaction = true;

- MyLogicalRepWorker->stream_fileset = palloc(sizeof(FileSet));
- FileSetInit(MyLogicalRepWorker->stream_fileset);
+ /* Begin the transaction. */
+ AcceptInvalidationMessages();
+ maybe_reread_subscription();

- MemoryContextSwitchTo(oldctx);
+ StartTransactionCommand();
+ BeginTransactionBlock();
+ CommitTransactionCommand();
}
...

Why do we need to start a transaction here? Why can't it be done via
begin_replication_step() when the first operation is applied? Is it
because we may need to define a savepoint in the bgworker and we don't
have that information beforehand? If so, can't it still be handled by
begin_replication_step(), either by explicitly passing the information
or by checking it there and then starting a transaction block? In any
case, please add a few comments to explain why this separate handling
is required for the bgworker.
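This is a toy model of the alternative asked about in point 6. It assumes the caller can tell begin_replication_step() whether a transaction block is needed. toy_begin_replication_step() and the two flags are hypothetical, and the real StartTransactionCommand() and BeginTransactionBlock() calls appear only in comments.

```c
#include <assert.h>
#include <stdbool.h>

static bool xact_started = false;   /* StartTransactionCommand() done */
static bool in_xact_block = false;  /* BeginTransactionBlock() done */

/*
 * Hypothetical variant of begin_replication_step(): the caller says whether
 * a transaction block is needed (e.g. so that a bgworker can later define
 * savepoints), and both are opened lazily on the first applied change.
 */
static void
toy_begin_replication_step(bool need_xact_block)
{
    if (!xact_started)
    {
        xact_started = true;            /* StartTransactionCommand() */
        if (need_xact_block)
            in_xact_block = true;       /* BeginTransactionBlock() */
    }
}
```

The point of this design is that STREAM START itself opens nothing. The transaction, and the block if one is needed, appears with the first applied change.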

7. If we are already setting the bgworker status to APPLY_BGWORKER_BUSY
in apply_bgworker_setup_dsm(), why do we need to set it again in
apply_bgworker_start()?

8. It is not clear to me how the APPLY_BGWORKER_EXIT status is used. Is it
required for the cases where a bgworker exits due to some error and
the apply worker uses it to detect that and exit? How would other
bgworkers notice this; is it done via
apply_bgworker_check_status()?
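The following is a toy model of one possible status lifecycle behind points 7 and 8. BUSY is written once during setup, a failing bgworker writes EXIT, and the main apply worker polls for it. The enum, ToyShared and all toy_* functions are hypothetical. A real shared-memory field would need proper synchronization, which this single-threaded model omits. The model also does not answer how the other bgworkers learn of the exit.

```c
#include <assert.h>
#include <stdbool.h>

typedef enum
{
    TOY_STATUS_UNSET,
    TOY_STATUS_BUSY,            /* plays the role of APPLY_BGWORKER_BUSY */
    TOY_STATUS_EXIT             /* plays the role of APPLY_BGWORKER_EXIT */
} ToyStatus;

/* Stand-in for the status field in a worker's shared memory segment. */
typedef struct
{
    ToyStatus   status;
} ToyShared;

#define NUM_TOY_WORKERS 3

static ToyShared shared[NUM_TOY_WORKERS];

/* Like apply_bgworker_setup_dsm(): the only place BUSY is assigned. */
static void
toy_setup_dsm(int i)
{
    shared[i].status = TOY_STATUS_BUSY;
}

/* Like apply_bgworker_start(): launches only, expects BUSY already set. */
static void
toy_start(int i)
{
    assert(shared[i].status == TOY_STATUS_BUSY);
}

/* Called by a bgworker that is exiting because of an error. */
static void
toy_worker_error_exit(int i)
{
    shared[i].status = TOY_STATUS_EXIT;
}

/* Like apply_bgworker_check_status(): false if any worker has exited. */
static bool
toy_check_status(void)
{
    for (int i = 0; i < NUM_TOY_WORKERS; i++)
        if (shared[i].status == TOY_STATUS_EXIT)
            return false;
    return true;
}
```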

--
With Regards,
Amit Kapila.
