RE: Perform streaming logical transactions by background workers and parallel apply

From: "houzj(dot)fnst(at)fujitsu(dot)com" <houzj(dot)fnst(at)fujitsu(dot)com>
To: Amit Kapila <amit(dot)kapila16(at)gmail(dot)com>, "wangw(dot)fnst(at)fujitsu(dot)com" <wangw(dot)fnst(at)fujitsu(dot)com>
Cc: Peter Smith <smithpb2250(at)gmail(dot)com>, Dilip Kumar <dilipbalaut(at)gmail(dot)com>, Masahiko Sawada <sawada(dot)mshk(at)gmail(dot)com>, "shiy(dot)fnst(at)fujitsu(dot)com" <shiy(dot)fnst(at)fujitsu(dot)com>, PostgreSQL Hackers <pgsql-hackers(at)lists(dot)postgresql(dot)org>
Subject: RE: Perform streaming logical transactions by background workers and parallel apply
Date: 2022-09-27 12:26:44
Message-ID: OS0PR01MB571620741B90C769D2727D7F94559@OS0PR01MB5716.jpnprd01.prod.outlook.com
Lists: pgsql-hackers

On Saturday, September 24, 2022 7:40 PM Amit Kapila <amit(dot)kapila16(at)gmail(dot)com> wrote:
>
> On Thu, Sep 22, 2022 at 3:41 PM Amit Kapila <amit(dot)kapila16(at)gmail(dot)com>
> wrote:
> >
> > On Thu, Sep 22, 2022 at 8:59 AM wangw(dot)fnst(at)fujitsu(dot)com
> > <wangw(dot)fnst(at)fujitsu(dot)com> wrote:
> > >
> >
> > Few comments on v33-0001
> > =======================
> >
>
> Some more comments on v33-0001
> =============================
> 1.
> + /* Information from the corresponding LogicalRepWorker slot. */
> + uint16 logicalrep_worker_generation;
> +
> + int logicalrep_worker_slot_no;
> +} ParallelApplyWorkerShared;
>
> Both these variables are read/changed by leader/parallel workers without
> using any lock (mutex). It seems currently there is no problem because of the
> way the patch is using in_parallel_apply_xact but I think it won't be a good idea
> to rely on it. I suggest using mutex to operate on these variables and also check
> if the slot_no is in a valid range after reading it in parallel_apply_free_worker,
> otherwise error out using elog.

Changed.
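
To make the suggestion concrete, here is a minimal standalone sketch of the
idea: guard both fields with a lock and validate the slot number after reading
it. This is not the code from the patch. The struct, MAX_WORKER_SLOTS and the
shared_* helpers are hypothetical names for illustration only, and a C11
atomic_flag spinlock stands in for whatever lock primitive the patch actually
uses, so that the example compiles outside the server tree.

```c
#include <assert.h>
#include <stdatomic.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical upper bound on worker slots (assumption for this sketch). */
#define MAX_WORKER_SLOTS 4

/* Simplified stand-in for ParallelApplyWorkerShared. */
typedef struct SharedSketch
{
    atomic_flag mutex;      /* protects the two fields below */
    uint16_t    logicalrep_worker_generation;
    int         logicalrep_worker_slot_no;
} SharedSketch;

static void
shared_lock(SharedSketch *shared)
{
    /* Spin until the flag was previously clear. */
    while (atomic_flag_test_and_set_explicit(&shared->mutex,
                                             memory_order_acquire))
        ;
}

static void
shared_unlock(SharedSketch *shared)
{
    atomic_flag_clear_explicit(&shared->mutex, memory_order_release);
}

static void
shared_init(SharedSketch *shared)
{
    assert(shared != NULL);
    atomic_flag_clear(&shared->mutex);
    shared->logicalrep_worker_generation = 0;
    shared->logicalrep_worker_slot_no = -1;     /* no slot assigned yet */
}

/* Writer side: update both fields under the lock. */
static void
shared_set_worker(SharedSketch *shared, uint16_t generation, int slot_no)
{
    shared_lock(shared);
    shared->logicalrep_worker_generation = generation;
    shared->logicalrep_worker_slot_no = slot_no;
    shared_unlock(shared);
}

/*
 * Reader side: copy both fields under the lock, then range-check the slot.
 * Returns false where the real code would report an error via elog().
 */
static bool
shared_get_worker(SharedSketch *shared, uint16_t *generation, int *slot_no)
{
    uint16_t    gen;
    int         slot;

    shared_lock(shared);
    gen = shared->logicalrep_worker_generation;
    slot = shared->logicalrep_worker_slot_no;
    shared_unlock(shared);

    if (slot < 0 || slot >= MAX_WORKER_SLOTS)
        return false;

    *generation = gen;
    *slot_no = slot;
    return true;
}
```

Copying the values out while holding the lock and validating them afterwards
keeps the critical section short, which matters if the lock is a spinlock.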

> 2.
> static void
> apply_handle_stream_stop(StringInfo s)
> {
> - if (!in_streamed_transaction)
> + ParallelApplyWorkerInfo *winfo = NULL;
> + TransApplyAction apply_action;
> +
> + if (!am_parallel_apply_worker() &&
> + (!in_streamed_transaction && !stream_apply_worker))
> ereport(ERROR,
> (errcode(ERRCODE_PROTOCOL_VIOLATION),
> errmsg_internal("STREAM STOP message without STREAM START")));
>
> This check won't be able to detect missing stream start messages for parallel
> apply workers apart from the first pair of start/stop. I thought of adding
> in_remote_transaction check along with
> am_parallel_apply_worker() to detect the same but that also won't work
> because the parallel worker doesn't reset it at the stop message.
> Another possibility is to introduce yet another variable for this but that doesn't
> seem worth it. I would like to keep this check simple.
> Can you think of any better way?

I think we can reuse in_streamed_transaction in the parallel apply worker to
simplify the check there. I tried setting this flag in the parallel apply worker
when a stream starts and resetting it when the stream stops, so that we can check
it directly to detect a duplicate STREAM START message, a STREAM STOP without a
preceding STREAM START, and other related problems.
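
A rough standalone sketch of that flag handling follows. It is not the patch
code: the sketch_* functions and the StreamResult enum are hypothetical, and
protocol violations are returned as a value instead of being raised with
ereport(ERROR), so the example can run outside the server.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

typedef enum
{
    STREAM_OK,
    STREAM_PROTOCOL_VIOLATION
} StreamResult;

/* Set at STREAM START and reset at STREAM STOP. */
static bool in_streamed_transaction = false;

static StreamResult
sketch_handle_stream_start(void)
{
    /* A second START without an intervening STOP is a duplicate. */
    if (in_streamed_transaction)
        return STREAM_PROTOCOL_VIOLATION;

    in_streamed_transaction = true;
    return STREAM_OK;
}

static StreamResult
sketch_handle_stream_stop(void)
{
    /* A STOP with no matching START. */
    if (!in_streamed_transaction)
        return STREAM_PROTOCOL_VIOLATION;

    in_streamed_transaction = false;
    return STREAM_OK;
}

/*
 * Feed a message sequence ('S' = STREAM START, anything else = STREAM STOP)
 * through the handlers and count the violations detected.
 */
static int
sketch_count_violations(const char *msgs)
{
    int         violations = 0;

    assert(msgs != NULL);
    in_streamed_transaction = false;

    for (; *msgs != '\0'; msgs++)
    {
        StreamResult r = (*msgs == 'S') ? sketch_handle_stream_start()
                                        : sketch_handle_stream_stop();

        if (r == STREAM_PROTOCOL_VIOLATION)
            violations++;
    }
    return violations;
}
```

Because the flag is reset at every stop, a missing STREAM START is caught for
any start/stop pair, not only the first one. For example, the sequence "SEE"
(start, stop, stop) reports one violation at the second stop.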

> 3. I think we can skip sending start/stop messages from the leader to the
> parallel worker because unlike apply worker it will process only one
> transaction-at-a-time. However, it is not clear whether that is worth the effort
> because it is sent after logical_decoding_work_mem changes. For now, I have
> added a comment for this in the attached patch but let me know if I am missing
> something or if I am wrong.

I think the suggested comments look good.

> 4.
> postgres=# select pid, leader_pid, application_name, backend_type from
> pg_stat_activity;
> pid | leader_pid | application_name | backend_type
> -------+------------+------------------+------------------------------
> 27624 | | | logical replication launcher
> 17336 | | psql | client backend
> 26312 | | | logical replication worker
> 26376 | | psql | client backend
> 14004 | | | logical replication worker
>
> Here, the second worker entry is for the parallel worker. Isn't it better if we
> distinguish this by keeping type as a logical replication parallel worker? I think
> for this you need to change bgw_type in logicalrep_worker_launch().

Changed.
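
For illustration only, the idea amounts to choosing the type string based on
whether the worker being launched is a parallel apply worker. The sketch below
is standalone and is not the patch code: sketch_set_bgw_type,
SKETCH_BGW_TYPE_LEN and the boolean parameter are hypothetical, and the buffer
stands in for the bgw_type field that logicalrep_worker_launch() fills in.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdio.h>
#include <string.h>

/* Hypothetical buffer size for this sketch. */
#define SKETCH_BGW_TYPE_LEN 96

/* Fill in the worker type shown in pg_stat_activity.backend_type. */
static void
sketch_set_bgw_type(char *bgw_type, bool is_parallel_apply_worker)
{
    assert(bgw_type != NULL);

    if (is_parallel_apply_worker)
        snprintf(bgw_type, SKETCH_BGW_TYPE_LEN,
                 "logical replication parallel worker");
    else
        snprintf(bgw_type, SKETCH_BGW_TYPE_LEN,
                 "logical replication worker");
}
```

With this, the two worker rows in the pg_stat_activity output quoted above
would show different backend_type values.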

> 5. Can we name parallel_apply_subxact_info_add() as
> parallel_apply_start_subtrans()?
>
> Apart from the above, I have added/edited a few comments and made a few
> other cosmetic changes in the attached.

Changed.

Best regards,
Hou zj

Attachment Content-Type Size
v35-0003-Add-some-checks-before-using-parallel-apply-work.patch application/octet-stream 50.4 KB
v35-0004-Retry-to-apply-streaming-xact-only-in-apply-work.patch application/octet-stream 61.4 KB
v35-0005-Add-a-main_worker_pid-to-pg_stat_subscription.patch application/octet-stream 8.7 KB
v35-0001-Perform-streaming-logical-transactions-by-parall.patch application/octet-stream 136.4 KB
v35-0002-Test-streaming-parallel-option-in-tap-test.patch application/octet-stream 74.5 KB
