RE: Perform streaming logical transactions by background workers and parallel apply

From: "houzj(dot)fnst(at)fujitsu(dot)com" <houzj(dot)fnst(at)fujitsu(dot)com>
To: Amit Kapila <amit(dot)kapila16(at)gmail(dot)com>, "wangw(dot)fnst(at)fujitsu(dot)com" <wangw(dot)fnst(at)fujitsu(dot)com>
Cc: Peter Smith <smithpb2250(at)gmail(dot)com>, Dilip Kumar <dilipbalaut(at)gmail(dot)com>, Masahiko Sawada <sawada(dot)mshk(at)gmail(dot)com>, "shiy(dot)fnst(at)fujitsu(dot)com" <shiy(dot)fnst(at)fujitsu(dot)com>, PostgreSQL Hackers <pgsql-hackers(at)lists(dot)postgresql(dot)org>
Subject: RE: Perform streaming logical transactions by background workers and parallel apply
Date: 2022-08-24 13:47:15
Message-ID: OS0PR01MB57166F15F4542FAE69A74EC094739@OS0PR01MB5716.jpnprd01.prod.outlook.com
Lists: pgsql-hackers

On Friday, August 19, 2022 4:49 PM Amit Kapila <amit(dot)kapila16(at)gmail(dot)com>
>
> On Thu, Aug 18, 2022 at 5:14 PM Amit Kapila <amit(dot)kapila16(at)gmail(dot)com>
> wrote:
> >
> > On Wed, Aug 17, 2022 at 11:58 AM wangw(dot)fnst(at)fujitsu(dot)com
> > <wangw(dot)fnst(at)fujitsu(dot)com> wrote:
> > >
> > > Attach the new patches.
> > >
> >
> > Few comments on v23-0001
> > =======================
> >
>
> Some more comments on v23-0001
> ============================
> 1.
> static bool
> handle_streamed_transaction(LogicalRepMsgType action, StringInfo s)
> {
> ...
> - /* not in streaming mode */
> - if (!in_streamed_transaction)
> + /* Not in streaming mode and not in apply background worker. */
> + if (!(in_streamed_transaction || am_apply_bgworker()))
> return false;
>
> This check appears a bit strange because ideally in bgworker
> in_streamed_transaction should be false. I think we should set
> in_streamed_transaction to true in apply_handle_stream_start() only when we
> are going to write to file. Is there a reason for not doing the same?

No, I removed this.
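To illustrate the suggestion in item 1, here is a minimal sketch in a much-simplified model. All names below are hypothetical, not the patch's code. in_streamed_transaction is set only when the leader spills changes to a file, and a separate pointer marks the "forward to a bgworker" case. Inside a bgworker neither is ever set, so the am_apply_bgworker() special case in the check is not needed.

```c
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical outcome of dispatching one change during a streamed xact. */
typedef enum ChangeAction
{
    CHANGE_APPLY_NOW,        /* apply immediately (the bgworker's only path) */
    CHANGE_SEND_TO_BGWORKER, /* leader forwards the change to a bgworker */
    CHANGE_SERIALIZE         /* leader spills the change to a file */
} ChangeAction;

/* Simplified, hypothetical stand-ins for worker-local state. */
static bool in_streamed_transaction = false; /* true only while spilling to a file */
static void *stream_bgworker = NULL;         /* set between STREAM START/STOP when forwarding */

/* STREAM START in the leader: use a free bgworker if there is one,
 * otherwise fall back to serializing the changes to a file. */
static void
sketch_stream_start(void *free_bgworker)
{
    if (free_bgworker != NULL)
        stream_bgworker = free_bgworker;
    else
        in_streamed_transaction = true;
}

/* STREAM STOP in the leader: both modes end here. */
static void
sketch_stream_stop(void)
{
    stream_bgworker = NULL;
    in_streamed_transaction = false;
}

/* Decide what to do with one change. A bgworker never sets either flag,
 * so its changes always fall through to the "apply now" path. */
static ChangeAction
sketch_handle_streamed_change(void)
{
    if (stream_bgworker != NULL)
        return CHANGE_SEND_TO_BGWORKER;
    if (in_streamed_transaction)
        return CHANGE_SERIALIZE;
    return CHANGE_APPLY_NOW;
}
```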

> 2.
> + {
> + /* This is the main apply worker. */
> + ApplyBgworkerInfo *wstate = apply_bgworker_find(xid);
> +
> + /*
> + * Check if we are processing this transaction using an apply
> + * background worker and if so, send the changes to that worker.
> + */
> + if (wstate)
> + {
> + /* Send STREAM ABORT message to the apply background worker. */
> + apply_bgworker_send_data(wstate, s->len, s->data);
>
> Why, in some places, does the patch need to separately fetch ApplyBgworkerInfo,
> whereas in other places it directly uses stream_apply_worker to pass the data
> to the bgworker?
>
> 3. Why don't apply_handle_stream_abort() and apply_handle_stream_prepare()
> use apply_bgworker_active() to identify whether they need to send the
> information to the bgworker?

I think stream_apply_worker is only valid between STREAM_START and STREAM_END,
but that isn't clear from the code. So I added some comments and slightly refactored
the code.
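A minimal sketch of that distinction, assuming a simplified fixed-size pool (SketchBgworker, sketch_find() and the other names are hypothetical). The cached stream_worker pointer is only meaningful between STREAM START and STREAM STOP. STREAM ABORT and STREAM PREPARE can arrive outside that window, so they look the worker up by xid, similar in spirit to the apply_bgworker_find() call in the quoted code.

```c
#include <stddef.h>
#include <stdint.h>

typedef uint32_t TransactionId;

/* Hypothetical, simplified pool slot; not the patch's ApplyBgworkerInfo. */
typedef struct SketchBgworker
{
    TransactionId xid;   /* toplevel xid being applied, 0 if the slot is free */
    int           nmsgs; /* change messages forwarded so far */
} SketchBgworker;

#define SKETCH_POOL_SIZE 4

static SketchBgworker pool[SKETCH_POOL_SIZE];

/* Cached pointer: valid only between STREAM START and STREAM STOP. */
static SketchBgworker *stream_worker = NULL;

/* Lookup by xid; usable at any time, including outside a stream chunk. */
static SketchBgworker *
sketch_find(TransactionId xid)
{
    if (xid == 0)
        return NULL;            /* 0 marks a free slot in this sketch */
    for (int i = 0; i < SKETCH_POOL_SIZE; i++)
        if (pool[i].xid == xid)
            return &pool[i];
    return NULL;
}

/* STREAM START: reuse the worker already assigned to this xid, or claim a
 * free slot for the first chunk (the pool is assumed not to be full). */
static void
sketch_stream_start(TransactionId xid)
{
    stream_worker = sketch_find(xid);
    for (int i = 0; stream_worker == NULL && i < SKETCH_POOL_SIZE; i++)
        if (pool[i].xid == 0)
        {
            pool[i].xid = xid;
            stream_worker = &pool[i];
        }
}

/* Changes inside the STREAM START/STOP window use the cached pointer. */
static void
sketch_stream_change(void)
{
    if (stream_worker != NULL)
        stream_worker->nmsgs++;
}

/* STREAM STOP: the cached pointer is no longer meaningful. */
static void
sketch_stream_stop(void)
{
    stream_worker = NULL;
}

/* Toplevel STREAM ABORT arrives outside the window, so it must look the
 * worker up. Returns 1 if a bgworker handled it, 0 for the file path. */
static int
sketch_stream_abort(TransactionId xid)
{
    SketchBgworker *w = sketch_find(xid);

    if (w == NULL)
        return 0;
    /* Real code would send the abort message here; we just free the slot. */
    w->xid = 0;
    w->nmsgs = 0;
    return 1;
}
```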

> 4. In apply_handle_stream_prepare(), apply_handle_stream_abort(), and some
> other similar functions, the patch handles three cases (a) apply background
> worker, (b) sending data to bgworker, (c) handling for streamed transaction in
> apply worker. I think the code will look better if you move the respective code
> for all three cases into separate functions. Of course, if the code to deal with each
> of the cases is small, then we don't need to move it to a separate function.

Refactored and simplified.
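As a rough illustration of the structure requested in item 4, here is a sketch with hypothetical helper names, where each case is reduced to a stub that only records which path was taken. The top-level handler only chooses the path; the work for each case lives in its own helper.

```c
#include <stdbool.h>

/* Hypothetical labels for the three cases named in item 4. */
typedef enum StreamAbortPath
{
    PATH_BGWORKER_APPLY,   /* (a) we are the apply background worker */
    PATH_SEND_TO_BGWORKER, /* (b) leader forwards the message to a bgworker */
    PATH_LEADER_FILES      /* (c) leader handles the serialized transaction */
} StreamAbortPath;

static StreamAbortPath last_path;

/* (a) Real code would roll back to the subxact savepoint or abort the xact. */
static void
abort_in_bgworker(void)
{
    last_path = PATH_BGWORKER_APPLY;
}

/* (b) Real code would pass the raw message to the responsible bgworker. */
static void
abort_send_to_bgworker(void)
{
    last_path = PATH_SEND_TO_BGWORKER;
}

/* (c) Real code would discard the spilled changes from the stream files. */
static void
abort_from_files(void)
{
    last_path = PATH_LEADER_FILES;
}

/* Top-level handler: only dispatches to one of the three helpers. */
static void
sketch_handle_stream_abort(bool am_bgworker, bool have_bgworker_for_xid)
{
    if (am_bgworker)
        abort_in_bgworker();
    else if (have_bgworker_for_xid)
        abort_send_to_bgworker();
    else
        abort_from_files();
}
```

The same shape would apply to apply_handle_stream_prepare() and the other handlers that distinguish these three cases.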

> 5.
> @@ -1088,24 +1177,78 @@ apply_handle_stream_prepare(StringInfo s)
> {
> ...
> + in_remote_transaction = false;
> +
> + /* Unlink the files with serialized changes and subxact info. */
> + stream_cleanup_files(MyLogicalRepWorker->subid, prepare_data.xid);
> + }
> + }
>
> in_remote_transaction = false;
> ...
>
> We don't need to set in_remote_transaction to false in multiple places.

Removed.

> 6.
> @@ -1177,36 +1311,93 @@ apply_handle_stream_start(StringInfo s)
> {
> ...
> ...
> + if (am_apply_bgworker())
> {
> - MemoryContext oldctx;
> -
> - oldctx = MemoryContextSwitchTo(ApplyContext);
> + /*
> + * Make sure the handle apply_dispatch methods are aware we're in a
> + * remote transaction.
> + */
> + in_remote_transaction = true;
>
> - MyLogicalRepWorker->stream_fileset = palloc(sizeof(FileSet));
> - FileSetInit(MyLogicalRepWorker->stream_fileset);
> + /* Begin the transaction. */
> + AcceptInvalidationMessages();
> + maybe_reread_subscription();
>
> - MemoryContextSwitchTo(oldctx);
> + StartTransactionCommand();
> + BeginTransactionBlock();
> + CommitTransactionCommand();
> }
> ...
>
> Why do we need to start a transaction here? Why can't it be done via
> begin_replication_step() when the first operation is applied? Is it because we may
> need to define a savepoint in the bgworker and we don't have that information
> beforehand? If so, can't it still be handled by
> begin_replication_step(), either by explicitly passing the information or by
> checking it there and then starting a transaction block? In any case, please add
> a few comments to explain why this separate handling is required for the
> bgworker.

The transaction block is used to define the savepoint, so I moved this
code to the place where the savepoint is defined, which looks better now.
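A minimal sketch of the reworked flow, assuming that the bgworker needs a transaction block only to define savepoints for streamed subtransactions. The xact.c calls (StartTransactionCommand(), BeginTransactionBlock(), CommitTransactionCommand(), DefineSavepoint()) are only named in comments; the state variables and sketch_define_savepoint() are hypothetical.

```c
#include <stdbool.h>
#include <stdint.h>

typedef uint32_t TransactionId;

#define SKETCH_MAX_SAVEPOINTS 16

/* Hypothetical bgworker-local state standing in for the real xact state. */
static bool in_xact_block = false;    /* transaction block already begun? */
static int  nblock_starts = 0;        /* how many times a block was begun */
static TransactionId savepoints[SKETCH_MAX_SAVEPOINTS];
static int  nsavepoints = 0;

/* Called before applying a change of current_xid. The transaction block is
 * begun lazily here, the only place that needs it, not at STREAM START. */
static void
sketch_define_savepoint(TransactionId toplevel_xid, TransactionId current_xid)
{
    /* Changes of the toplevel transaction itself need no savepoint. */
    if (current_xid == toplevel_xid)
        return;

    /* A savepoint for this subtransaction already exists. */
    for (int i = 0; i < nsavepoints; i++)
        if (savepoints[i] == current_xid)
            return;

    if (!in_xact_block)
    {
        /* Real code: StartTransactionCommand(); BeginTransactionBlock();
         * CommitTransactionCommand(); */
        in_xact_block = true;
        nblock_starts++;
    }

    /* Real code: DefineSavepoint() with a name derived from current_xid. */
    if (nsavepoints < SKETCH_MAX_SAVEPOINTS)
        savepoints[nsavepoints++] = current_xid;
}
```

With this shape, a streamed transaction without subtransactions never begins an explicit transaction block at all.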

> 7. When we are already setting bgworker status as APPLY_BGWORKER_BUSY in
> apply_bgworker_setup_dsm() then why do we need to set it again in
> apply_bgworker_start()?

Removed.

> 8. It is not clear to me how the APPLY_BGWORKER_EXIT status is used. Is it required
> for the case where a bgworker exits due to some error and the apply worker
> uses it to detect that and exit? How would other bgworkers notice this? Is it
> done via apply_bgworker_check_status()?

It was used to detect an unexpected exit of the bgworker. I have changed the design
so that it is now similar to what we have in parallel query.

Attached is the new version of the patch set (v24), which addresses the above comments.
Besides, I added some logic that tries to stop the bgworker at transaction end
if there are enough workers in the pool.
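The pool policy described above could look roughly like the following sketch. The threshold SKETCH_MAX_IDLE_WORKERS, the counters and both functions are hypothetical; this message does not say how "enough workers" is defined in v24.

```c
#include <stdbool.h>

/* Hypothetical limit on idle workers kept for reuse; v24 may differ. */
#define SKETCH_MAX_IDLE_WORKERS 2

static int n_idle_workers = 0; /* workers waiting in the pool */
static int n_stopped = 0;      /* workers asked to exit at xact end */

/* Called in the leader when a bgworker finishes a streamed transaction.
 * Returns true if the worker is stopped, false if it is kept for reuse. */
static bool
sketch_release_worker_at_xact_end(void)
{
    if (n_idle_workers >= SKETCH_MAX_IDLE_WORKERS)
    {
        /* The pool already has enough idle workers: let this one exit. */
        n_stopped++;
        return true;
    }

    /* Otherwise keep it so the next streamed transaction can reuse it. */
    n_idle_workers++;
    return false;
}

/* Called when a new streamed transaction starts. Returns true if an idle
 * worker was reused, false if the caller would have to launch a new one. */
static bool
sketch_acquire_idle_worker(void)
{
    if (n_idle_workers == 0)
        return false;
    n_idle_workers--;
    return true;
}
```

Keeping a few idle workers avoids relaunching a process for every streamed transaction, while the cap stops unused workers from holding resources indefinitely.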

Best regards,
Hou zj

Attachment Content-Type Size
v24-0004-Retry-to-apply-streaming-xact-only-in-apply-work.patch application/octet-stream 31.3 KB
v24-0001-Perform-streaming-logical-transactions-by-backgr.patch application/octet-stream 125.4 KB
v24-0002-Test-streaming-parallel-option-in-tap-test.patch application/octet-stream 69.1 KB
v24-0003-Add-some-checks-before-using-apply-background-wo.patch application/octet-stream 55.7 KB
v24-0005-Add-a-main_worker_pid-to-pg_stat_subscription.patch application/octet-stream 7.7 KB
