RE: Perform streaming logical transactions by background workers and parallel apply

From: "houzj(dot)fnst(at)fujitsu(dot)com" <houzj(dot)fnst(at)fujitsu(dot)com>
To: Amit Kapila <amit(dot)kapila16(at)gmail(dot)com>, "wangw(dot)fnst(at)fujitsu(dot)com" <wangw(dot)fnst(at)fujitsu(dot)com>
Cc: Peter Smith <smithpb2250(at)gmail(dot)com>, Dilip Kumar <dilipbalaut(at)gmail(dot)com>, Masahiko Sawada <sawada(dot)mshk(at)gmail(dot)com>, "shiy(dot)fnst(at)fujitsu(dot)com" <shiy(dot)fnst(at)fujitsu(dot)com>, PostgreSQL Hackers <pgsql-hackers(at)lists(dot)postgresql(dot)org>
Subject: RE: Perform streaming logical transactions by background workers and parallel apply
Date: 2022-09-27 12:27:18
Message-ID: OS0PR01MB5716A8B3EDCABAFC3031769B94559@OS0PR01MB5716.jpnprd01.prod.outlook.com
Lists: pgsql-hackers

On Monday, September 26, 2022 6:58 PM Amit Kapila <amit(dot)kapila16(at)gmail(dot)com> wrote:
>
> On Mon, Sep 26, 2022 at 8:41 AM wangw(dot)fnst(at)fujitsu(dot)com
> <wangw(dot)fnst(at)fujitsu(dot)com> wrote:
> >
> > On Thu, Sep 22, 2022 at 6:12 PM Amit Kapila <amit(dot)kapila16(at)gmail(dot)com> wrote:
> >
> > > 3.
> > > ApplyWorkerMain()
> > > {
> > > ...
> > > ...
> > > +
> > > + if (server_version >= 160000 &&
> > > + MySubscription->stream == SUBSTREAM_PARALLEL)
> > > + options.proto.logical.streaming = pstrdup("parallel");
> > >
> > > After deciding here whether the parallel streaming mode is enabled
> > > or not, we recheck the same thing in apply_handle_stream_abort() and
> > > parallel_apply_can_start(). In parallel_apply_can_start(), we do it
> > > via two different checks. How about storing this information say in
> > > structure MyLogicalRepWorker in ApplyWorkerMain() and then use it at
> > > other places?
> >
> > Improved as suggested.
> > Added a new flag "in_parallel_apply" to structure MyLogicalRepWorker.
> >
>
> Can we name the variable in_parallel_apply as parallel_apply and set it in
> logicalrep_worker_launch() instead of in ParallelApplyWorkerMain()?

Changed.

> Few other comments:
> ==================
> 1.
> + if (is_subworker &&
> + nparallelapplyworkers >= max_parallel_apply_workers_per_subscription)
> + {
> + LWLockRelease(LogicalRepWorkerLock);
> +
> + ereport(DEBUG1,
> + (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
> + errmsg("out of parallel apply workers"),
> + errhint("You might need to increase max_parallel_apply_workers_per_subscription.")));
>
> I think it is better to keep the level of this as LOG. Similar messages at other
> places use WARNING or LOG. Here, I prefer LOG because the system can still
> proceed without blocking anything.

Changed.

> 2.
> +/* Reset replication origin tracking. */
> +void
> +parallel_apply_replorigin_reset(void)
> +{
> + bool started_tx = false;
> +
> + /* This function might be called inside or outside of transaction. */
> + if (!IsTransactionState())
> + {
> + StartTransactionCommand();
> + started_tx = true;
> + }
>
> Why do we need a transaction in this function?

I don't think we need it, so I have removed it in the new version of the patch.

> 3. Few suggestions to improve in the patch:
> diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
> index 1623c9e2fa..d9c519dfab 100644
> --- a/src/backend/replication/logical/worker.c
> +++ b/src/backend/replication/logical/worker.c
> @@ -1264,6 +1264,10 @@ apply_handle_stream_prepare(StringInfo s)
> case TRANS_LEADER_SEND_TO_PARALLEL:
> Assert(winfo);
>
> + /*
> + * The origin can be active only in one process. See
> + * apply_handle_stream_commit.
> + */
> parallel_apply_replorigin_reset();
>
> /* Send STREAM PREPARE message to the parallel apply worker. */
> @@ -1623,12 +1627,7 @@ apply_handle_stream_abort(StringInfo s)
> (errcode(ERRCODE_PROTOCOL_VIOLATION),
> errmsg_internal("STREAM ABORT message without STREAM STOP")));
>
> - /*
> - * Check whether the publisher sends abort_lsn and abort_time.
> - *
> - * Note that the parallel apply worker is only started when the publisher
> - * sends abort_lsn and abort_time.
> - */
> + /* We receive abort information only when we can apply in parallel. */
> if (MyLogicalRepWorker->in_parallel_apply)
> read_abort_info = true;
>
> @@ -1656,7 +1655,13 @@ apply_handle_stream_abort(StringInfo s)
> Assert(winfo);
>
> if (subxid == xid)
> + {
> + /*
> + * The origin can be active only in one process. See
> + * apply_handle_stream_commit.
> + */
> parallel_apply_replorigin_reset();
> + }
>
> /* Send STREAM ABORT message to the parallel apply worker. */
> parallel_apply_send_data(winfo, s->len, s->data);
> @@ -1858,6 +1863,12 @@ apply_handle_stream_commit(StringInfo s)
> case TRANS_LEADER_SEND_TO_PARALLEL:
> Assert(winfo);
>
> + /*
> + * We need to reset the replication origin before sending the commit
> + * message and set it up again after confirming that parallel worker
> + * has processed the message. This is required because origin can be
> + * active only in one process at-a-time.
> + */
> parallel_apply_replorigin_reset();
>
> /* Send STREAM COMMIT message to the parallel apply worker. */
> diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h
> index 4cbfb43492..2bd9664f86 100644
> --- a/src/include/replication/worker_internal.h
> +++ b/src/include/replication/worker_internal.h
> @@ -70,11 +70,7 @@ typedef struct LogicalRepWorker
> */
> pid_t apply_leader_pid;
>
> - /*
> - * Indicates whether to use parallel apply workers.
> - *
> - * Determined based on streaming parameter and publisher version.
> - */
> + /* Indicates whether apply can be performed parallelly. */
> bool in_parallel_apply;
>

Merged, thanks.

Best regards,
Hou zj
