RE: Perform streaming logical transactions by background workers and parallel apply

From: "houzj(dot)fnst(at)fujitsu(dot)com" <houzj(dot)fnst(at)fujitsu(dot)com>
To: Amit Kapila <amit(dot)kapila16(at)gmail(dot)com>
Cc: "wangw(dot)fnst(at)fujitsu(dot)com" <wangw(dot)fnst(at)fujitsu(dot)com>, Peter Smith <smithpb2250(at)gmail(dot)com>, Dilip Kumar <dilipbalaut(at)gmail(dot)com>, Masahiko Sawada <sawada(dot)mshk(at)gmail(dot)com>, "shiy(dot)fnst(at)fujitsu(dot)com" <shiy(dot)fnst(at)fujitsu(dot)com>, PostgreSQL Hackers <pgsql-hackers(at)lists(dot)postgresql(dot)org>
Subject: RE: Perform streaming logical transactions by background workers and parallel apply
Date: 2022-09-05 12:40:34
Message-ID: OS0PR01MB57161DD88F755587E8FE2C3B947F9@OS0PR01MB5716.jpnprd01.prod.outlook.com
Lists: pgsql-hackers

On Friday, September 2, 2022 2:10 PM Amit Kapila <amit(dot)kapila16(at)gmail(dot)com> wrote:
>
> On Thu, Sep 1, 2022 at 4:53 PM houzj(dot)fnst(at)fujitsu(dot)com
> <houzj(dot)fnst(at)fujitsu(dot)com> wrote:
> >
>
> Review of v27-0001*:

Thanks for the comments.

> ================
> 1. I feel the usage of in_remote_transaction and in_use flags is slightly complex.
> IIUC, the patch uses in_use flag to ensure commit ordering by waiting for it to
> become false before proceeding in transaction finish commands in leader
> apply worker. If so, I think it is better to name it in_parallel_apply_xact and set it
> to true only when we start applying xact in parallel apply worker and set it to
> false when we finish the xact in parallel apply worker. It can be initialized to false
> while setting up DSM. Also, accordingly change the function
> parallel_apply_wait_for_free() to parallel_apply_wait_for_xact_finish and
> parallel_apply_set_idle to parallel_apply_set_xact_finish. We can change the
> name of the in_remote_transaction flag to in_use.

Agreed. One thing I found while addressing this is that there could be a race
condition if the flag is only set inside the parallel apply worker: the leader
could already have started waiting for the parallel apply worker to finish the
transaction (i.e., waiting for in_parallel_apply_xact to become false) while
the worker has not yet processed the first STREAM_START and therefore has not
set in_parallel_apply_xact to true.
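
One possible way to close that window is for the leader to set the flag itself
before handing the first STREAM_START over to the worker, so the flag can never
be observed as false before the worker has started. A minimal sketch (the mutex
field and the parallel_apply_send_data() helper are assumptions for
illustration, not necessarily the patch's exact code):

	/*
	 * Leader side: mark the transaction as being applied by the parallel
	 * apply worker *before* sending the first STREAM_START.  The worker
	 * only ever clears the flag when it finishes the transaction, so
	 * parallel_apply_wait_for_xact_finish() in the leader cannot return
	 * prematurely.
	 */
	SpinLockAcquire(&winfo->shared->mutex);
	winfo->shared->in_parallel_apply_xact = true;
	SpinLockRelease(&winfo->shared->mutex);

	parallel_apply_send_data(winfo, s->len, s->data);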

> Please explain about these flags in the struct where they are declared.
>
> 2. The worker_id in ParallelApplyWorkerShared struct could have wrong
> information after the worker is reused from the pool. Because we could have
> removed some other worker from the ParallelApplyWorkersList which will
> make the value of worker_id wrong. For error/debug messages, we can
> probably use LSN if available or can oid of subscription if required. I thought of
> using xid as well but I think it is better to avoid that in messages as it can
> wraparound. See, if the patch uses xid in other messages, it is better to either
> use it along with LSN or try to use only LSN.
> 3.
> elog(ERROR, "[Parallel Apply Worker #%u] unexpected message \"%c\"",
> + shared->worker_id, c);
>
> Also, I am not sure whether the above style (use of []) of messages is good. Did
> you follow the usage from some other place?
> 4.
> apply_handle_stream_stop(StringInfo s)
> {
> ...
> + if (apply_action == TA_APPLY_IN_PARALLEL_WORKER) { elog(DEBUG1,
> + "[Parallel Apply Worker #%u] ended processing streaming chunk, "
> + "waiting on shm_mq_receive", MyParallelShared->worker_id);
> ...
>
> I don't understand the relevance of "waiting on shm_mq_receive" in the
> above message because AFAICS, here we are not waiting on any receive
> call.
>
> 5. I suggest you please go through all the ERROR/LOG/DEBUG messages in
> the patch and try to improve them based on the above comments.

I removed the worker_id from the messages, and also removed or improved some
DEBUG/ERROR messages that I think were unclear or that have no similar message
in the existing code.
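
For example (illustrative wording only, not the exact message in the patch), a
message could identify the stream by an LSN instead of the worker_id:

	elog(DEBUG1, "finished processing the streaming chunk of transaction at %X/%X",
		 LSN_FORMAT_ARGS(stream_start_lsn));

where stream_start_lsn is whatever LSN is available at that point.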

> 6.
> + * The dynamic shared memory segment will contain (1) a shm_mq that can be
> used
> + * to send errors (and other messages reported via elog/ereport) from the
> + * parallel apply worker to leader apply worker (2) another shm_mq that can
> be
> + * used to send changes in the transaction from leader apply worker to parallel
> + * apply worker
>
> Here, it would be better to switch (1) and (2). I feel it is better to
> explain first about how the main apply information is exchanged among
> workers.

Exchanged.
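
For reference, a rough sketch of how the segment could be laid out with the
data queue described first (the keys, queue sizes, and magic number below are
illustrative assumptions, not the patch's actual values):

	shm_toc_estimator e;
	Size		segsize;
	dsm_segment *seg;
	shm_toc    *toc;
	shm_mq	   *mq;
	ParallelApplyWorkerShared *shared;

	shm_toc_initialize_estimator(&e);
	shm_toc_estimate_chunk(&e, sizeof(ParallelApplyWorkerShared));
	shm_toc_estimate_chunk(&e, DATA_QUEUE_SIZE);	/* leader -> worker changes */
	shm_toc_estimate_chunk(&e, ERROR_QUEUE_SIZE);	/* worker -> leader errors */
	shm_toc_estimate_keys(&e, 3);
	segsize = shm_toc_estimate(&e);

	seg = dsm_create(segsize, 0);
	toc = shm_toc_create(PARALLEL_APPLY_SHM_MAGIC, dsm_segment_address(seg),
						 segsize);

	/* fixed-size shared state (flags such as in_parallel_apply_xact) */
	shared = shm_toc_allocate(toc, sizeof(ParallelApplyWorkerShared));
	shm_toc_insert(toc, PARALLEL_APPLY_KEY_SHARED, shared);

	/* (1) queue carrying the transaction's changes from leader to worker */
	mq = shm_mq_create(shm_toc_allocate(toc, DATA_QUEUE_SIZE), DATA_QUEUE_SIZE);
	shm_toc_insert(toc, PARALLEL_APPLY_KEY_DATA_QUEUE, mq);
	shm_mq_set_sender(mq, MyProc);

	/* (2) queue carrying errors and other messages from worker to leader */
	mq = shm_mq_create(shm_toc_allocate(toc, ERROR_QUEUE_SIZE), ERROR_QUEUE_SIZE);
	shm_toc_insert(toc, PARALLEL_APPLY_KEY_ERROR_QUEUE, mq);
	shm_mq_set_receiver(mq, MyProc);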

> 7.
> + /* Try to get a free parallel apply worker. */
> + foreach(lc, ParallelApplyWorkersList)
> + {
> + ParallelApplyWorkerInfo *tmp_winfo;
> +
> + tmp_winfo = (ParallelApplyWorkerInfo *) lfirst(lc);
> +
> + if (tmp_winfo->error_mq_handle == NULL)
> + {
> + /*
> + * Release the worker information and try next one if the parallel
> + * apply worker exited cleanly.
> + */
> + ParallelApplyWorkersList =
> foreach_delete_current(ParallelApplyWorkersList, lc);
> + shm_mq_detach(tmp_winfo->mq_handle);
> + dsm_detach(tmp_winfo->dsm_seg);
> + pfree(tmp_winfo);
> +
> + continue;
> + }
> +
> + if (!tmp_winfo->in_remote_transaction)
> + {
> + winfo = tmp_winfo;
> + break;
> + }
> + }
>
> Can we write it as if ... else if? If so, then we don't need to
> continue in the first loop. And, can we add some more comments to
> explain these cases?

Changed.
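
For reference, the reorganized lookup now looks roughly like this (a sketch of
the structure rather than the exact hunk; the in_use name follows comment #1
above):

	/* Try to get a free parallel apply worker from the pool. */
	foreach(lc, ParallelApplyWorkersList)
	{
		ParallelApplyWorkerInfo *tmp_winfo = (ParallelApplyWorkerInfo *) lfirst(lc);

		if (tmp_winfo->error_mq_handle == NULL)
		{
			/*
			 * The parallel apply worker exited cleanly, so release its
			 * bookkeeping and keep looking.
			 */
			ParallelApplyWorkersList =
				foreach_delete_current(ParallelApplyWorkersList, lc);
			shm_mq_detach(tmp_winfo->mq_handle);
			dsm_detach(tmp_winfo->dsm_seg);
			pfree(tmp_winfo);
		}
		else if (!tmp_winfo->in_use)
		{
			/* The worker is alive and idle, so reuse it. */
			winfo = tmp_winfo;
			break;
		}
	}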

Attached is the new version of the patch set, which addresses the above
comments and also fixes another problem that occurs when subscribing to a
lower-version publisher.

Best regards,
Hou zj

Attachment Content-Type Size
v28-0005-Add-a-main_worker_pid-to-pg_stat_subscription.patch application/octet-stream 7.7 KB
v28-0001-Perform-streaming-logical-transactions-by-parall.patch application/octet-stream 128.6 KB
v28-0002-Test-streaming-parallel-option-in-tap-test.patch application/octet-stream 74.5 KB
v28-0003-Add-some-checks-before-using-parallel-apply-work.patch application/octet-stream 49.7 KB
v28-0004-Retry-to-apply-streaming-xact-only-in-apply-work.patch application/octet-stream 60.0 KB
