Re: Perform streaming logical transactions by background workers and parallel apply

From: Amit Kapila <amit(dot)kapila16(at)gmail(dot)com>
To: "houzj(dot)fnst(at)fujitsu(dot)com" <houzj(dot)fnst(at)fujitsu(dot)com>
Cc: "wangw(dot)fnst(at)fujitsu(dot)com" <wangw(dot)fnst(at)fujitsu(dot)com>, Peter Smith <smithpb2250(at)gmail(dot)com>, Dilip Kumar <dilipbalaut(at)gmail(dot)com>, Masahiko Sawada <sawada(dot)mshk(at)gmail(dot)com>, "shiy(dot)fnst(at)fujitsu(dot)com" <shiy(dot)fnst(at)fujitsu(dot)com>, PostgreSQL Hackers <pgsql-hackers(at)lists(dot)postgresql(dot)org>
Subject: Re: Perform streaming logical transactions by background workers and parallel apply
Date: 2022-09-02 06:09:53
Message-ID: CAA4eK1J2UAXsLTbSx1s407W9rHh2RdfF99atMr1khEHou6SgQw@mail.gmail.com
Views: Raw Message | Whole Thread | Download mbox | Resend email
Thread:
Lists: pgsql-hackers

On Thu, Sep 1, 2022 at 4:53 PM houzj(dot)fnst(at)fujitsu(dot)com
<houzj(dot)fnst(at)fujitsu(dot)com> wrote:
>

Review of v27-0001*:
================
1. I feel the usage of in_remote_transaction and in_use flags is
slightly complex. IIUC, the patch uses in_use flag to ensure commit
ordering by waiting for it to become false before proceeding in
transaction finish commands in leader apply worker. If so, I think it
is better to name it in_parallel_apply_xact and set it to true only
when we start applying xact in parallel apply worker and set it to
false when we finish the xact in parallel apply worker. It can be
initialized to false while setting up DSM. Also, accordingly change
the function parallel_apply_wait_for_free() to
parallel_apply_wait_for_xact_finish and parallel_apply_set_idle to
parallel_apply_set_xact_finish. We can change the name of the
in_remote_transaction flag to in_use.

Please explain about these flags in the struct where they are declared.

2. The worker_id in ParallelApplyWorkerShared struct could have wrong
information after the worker is reused from the pool. Because we could
have removed some other worker from the ParallelApplyWorkersList which
will make the value of worker_id wrong. For error/debug messages, we
can probably use LSN if available or can oid of subscription if
required. I thought of using xid as well but I think it is better to
avoid that in messages as it can wraparound. See, if the patch uses
xid in other messages, it is better to either use it along with LSN or
try to use only LSN.

3.
elog(ERROR, "[Parallel Apply Worker #%u] unexpected message \"%c\"",
+ shared->worker_id, c);

Also, I am not sure whether the above style (use of []) of messages is
good. Did you follow the usage from some other place?

4.
apply_handle_stream_stop(StringInfo s)
{
...
+ if (apply_action == TA_APPLY_IN_PARALLEL_WORKER)
+ {
+ elog(DEBUG1, "[Parallel Apply Worker #%u] ended processing streaming chunk, "
+ "waiting on shm_mq_receive", MyParallelShared->worker_id);
...

I don't understand the relevance of "waiting on shm_mq_receive" in the
above message because AFAICS, here we are not waiting on any receive
call.

5. I suggest you please go through all the ERROR/LOG/DEBUG messages in
the patch and try to improve them based on the above comments.

6.
+ * The dynamic shared memory segment will contain (1) a shm_mq that can be used
+ * to send errors (and other messages reported via elog/ereport) from the
+ * parallel apply worker to leader apply worker (2) another shm_mq that can be
+ * used to send changes in the transaction from leader apply worker to parallel
+ * apply worker

Here, it would be better to switch (1) and (2). I feel it is better to
explain first about how the main apply information is exchanged among
workers.

7.
+ /* Try to get a free parallel apply worker. */
+ foreach(lc, ParallelApplyWorkersList)
+ {
+ ParallelApplyWorkerInfo *tmp_winfo;
+
+ tmp_winfo = (ParallelApplyWorkerInfo *) lfirst(lc);
+
+ if (tmp_winfo->error_mq_handle == NULL)
+ {
+ /*
+ * Release the worker information and try next one if the parallel
+ * apply worker exited cleanly.
+ */
+ ParallelApplyWorkersList =
foreach_delete_current(ParallelApplyWorkersList, lc);
+ shm_mq_detach(tmp_winfo->mq_handle);
+ dsm_detach(tmp_winfo->dsm_seg);
+ pfree(tmp_winfo);
+
+ continue;
+ }
+
+ if (!tmp_winfo->in_remote_transaction)
+ {
+ winfo = tmp_winfo;
+ break;
+ }
+ }

Can we write it as if ... else if? If so, then we don't need to
continue in the first loop. And, can we add some more comments to
explain these cases?

--
With Regards,
Amit Kapila.

In response to

Responses

Browse pgsql-hackers by date

  From Date Subject
Next Message Thomas Munro 2022-09-02 06:20:42 Re: pg15b3: recovery fails with wal prefetch enabled
Previous Message Dilip Kumar 2022-09-02 05:57:23 Re: test_decoding assertion failure for the loss of top-sub transaction relationship