Re: Perform streaming logical transactions by background workers and parallel apply

From: Peter Smith <smithpb2250(at)gmail(dot)com>
To: "houzj(dot)fnst(at)fujitsu(dot)com" <houzj(dot)fnst(at)fujitsu(dot)com>
Cc: Amit Kapila <amit(dot)kapila16(at)gmail(dot)com>, "wangw(dot)fnst(at)fujitsu(dot)com" <wangw(dot)fnst(at)fujitsu(dot)com>, Dilip Kumar <dilipbalaut(at)gmail(dot)com>, Masahiko Sawada <sawada(dot)mshk(at)gmail(dot)com>, "shiy(dot)fnst(at)fujitsu(dot)com" <shiy(dot)fnst(at)fujitsu(dot)com>, PostgreSQL Hackers <pgsql-hackers(at)lists(dot)postgresql(dot)org>
Subject: Re: Perform streaming logical transactions by background workers and parallel apply
Date: 2022-10-18 02:35:47
Message-ID: CAHut+PsY0aevdVqeCUJOrRQMrwpg5Wz3-Mo+bU=mCxW2+9EBTg@mail.gmail.com
Lists: pgsql-hackers

Hi, here are my review comments for patch v38-0001.

======

.../replication/logical/applyparallelworker.c

1. parallel_apply_start_worker

+ /* Try to get a free parallel apply worker. */
+ foreach(lc, ParallelApplyWorkersList)
+ {
+ ParallelApplyWorkerInfo *tmp_winfo;
+
+ tmp_winfo = (ParallelApplyWorkerInfo *) lfirst(lc);
+
+ if (!tmp_winfo->in_use)
+ {
+ /* Found a worker that has not been assigned a transaction. */
+ winfo = tmp_winfo;
+ break;
+ }
+ }

The "Found a worker..." comment seems redundant because it's already
clear from the prior comment and the 'in_use' member what this code is
doing.

~~~

2. LogicalParallelApplyLoop

+ void *data;
+ Size len;
+ int c;
+ int rc;
+ StringInfoData s;
+ MemoryContext oldctx;

Several of these vars (like 'c', 'rc', 's') can be declared deeper -
e.g. only in the scope where they are actually used.

~~~

3.

+ /* Ensure we are reading the data into our memory context. */
+ oldctx = MemoryContextSwitchTo(ApplyMessageContext);

Doesn't something need to switch back to this 'oldctx' prior to
breaking out of the for(;;) loop?

~~~

4.

+ apply_dispatch(&s);
+
+ MemoryContextReset(ApplyMessageContext);

Isn't this broken now? Since you've removed the
MemoryContextSwitchTo(oldctx), the next iteration will switch to
ApplyMessageContext again, which will overwrite and lose knowledge of
the original 'oldctx' (??)
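
To illustrate the concern in #3 and #4, here is a minimal standalone
sketch. It does not use the PostgreSQL headers: ToyContext,
toy_switch_to and the two loop helpers are hypothetical stand-ins that
only mimic the "set the current context, return the previous one"
behaviour of MemoryContextSwitchTo. It is not the patch code.

```c
#include <assert.h>
#include <stddef.h>

/* Toy stand-ins (assumptions for illustration, NOT PostgreSQL types). */
typedef struct ToyContext
{
	const char *name;
} ToyContext;

static ToyContext toy_top = {"TopContext"};
static ToyContext toy_apply_message = {"ApplyMessageContext"};
static ToyContext *toy_current = &toy_top;

/* Mimics MemoryContextSwitchTo: install ctx, return the previous one. */
static ToyContext *
toy_switch_to(ToyContext *ctx)
{
	ToyContext *old = toy_current;

	toy_current = ctx;
	return old;
}

/* Loop that never switches back; returns the final value of oldctx. */
static ToyContext *
toy_loop_without_restore(int iterations)
{
	ToyContext *oldctx = NULL;

	toy_current = &toy_top;
	for (int i = 0; i < iterations; i++)
		oldctx = toy_switch_to(&toy_apply_message);

	return oldctx;
}

/* Loop that restores the saved context at the end of each iteration. */
static ToyContext *
toy_loop_with_restore(int iterations)
{
	ToyContext *oldctx = NULL;

	toy_current = &toy_top;
	for (int i = 0; i < iterations; i++)
	{
		oldctx = toy_switch_to(&toy_apply_message);
		/* ... read and apply one message here ... */
		toy_switch_to(oldctx);
	}

	return oldctx;
}
```

In this toy model, from the second iteration onwards the loop without
a restore saves ApplyMessageContext itself into 'oldctx', so the
original context can no longer be recovered from that variable.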

~~~

5.

Maybe this is a silly idea, I'm not sure. Because this is an infinite
loop, instead of the multiple calls to
MemoryContextReset(ApplyMessageContext) maybe there can be just a
single call to it immediately before you switch to that context in the
first place. The effect will be the same, won't it?

e.g.
+ /* Ensure we are reading the data into our memory context. */
+ MemoryContextReset(ApplyMessageContext); <=== THIS
+ oldctx = MemoryContextSwitchTo(ApplyMessageContext);
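
As a rough way to compare the two placements, here is a standalone
sketch using a toy byte counter. ToyArena, toy_alloc, toy_reset and
the two run functions are hypothetical names invented for this
example; they only approximate how a context grows and is reset, and
say nothing about the real loop's break/continue paths.

```c
#include <assert.h>
#include <stddef.h>

/* Toy arena (an assumption for illustration): tracks bytes in use. */
typedef struct ToyArena
{
	size_t		used;		/* bytes currently allocated */
	size_t		peak;		/* highest value 'used' ever reached */
} ToyArena;

static void
toy_alloc(ToyArena *a, size_t n)
{
	a->used += n;
	if (a->used > a->peak)
		a->peak = a->used;
}

static void
toy_reset(ToyArena *a)
{
	a->used = 0;
}

/* Variant A: a single reset at the top of every iteration. */
static ToyArena
toy_run_reset_at_top(const size_t *sizes, int n)
{
	ToyArena	a = {0, 0};

	for (int i = 0; i < n; i++)
	{
		toy_reset(&a);
		toy_alloc(&a, sizes[i]);	/* "receive and apply" one message */
	}
	return a;
}

/* Variant B: reset after each message has been handled. */
static ToyArena
toy_run_reset_after_use(const size_t *sizes, int n)
{
	ToyArena	a = {0, 0};

	for (int i = 0; i < n; i++)
	{
		toy_alloc(&a, sizes[i]);
		toy_reset(&a);
	}
	return a;
}
```

In this model both variants reach the same peak usage. The only
difference is that with the reset at the top, the last message's
allocations are still live when the loop ends.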

~~~

6.

The code logic has kept flip-flopping over several versions. I think if
you are going to check all the return values of shm_mq_receive then
using a switch(shmq_res) might be a better way than having multiple
if/else with some Asserts.
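
A rough standalone sketch of the switch-based shape follows. The
ToyMqResult values are stand-ins modelled on the three shm_mq_result
values (success, would-block, detached), and the ToyLoopAction values
are placeholders: which action the patch should take for each result
is an assumption here, not something taken from the patch.

```c
#include <assert.h>

/* Stand-ins modelled on shm_mq_result (assumption, not the real enum). */
typedef enum ToyMqResult
{
	TOY_MQ_SUCCESS,
	TOY_MQ_WOULD_BLOCK,
	TOY_MQ_DETACHED
} ToyMqResult;

/* Placeholder actions for the apply loop (hypothetical). */
typedef enum ToyLoopAction
{
	TOY_ACT_APPLY,				/* a message arrived: dispatch it */
	TOY_ACT_WAIT,				/* nothing yet: wait on the latch */
	TOY_ACT_EXIT,				/* sender went away: leave the loop */
	TOY_ACT_ERROR				/* unexpected result value */
} ToyLoopAction;

/* One case per result, with a single fall-through path for surprises. */
static ToyLoopAction
toy_next_action(ToyMqResult res)
{
	switch (res)
	{
		case TOY_MQ_SUCCESS:
			return TOY_ACT_APPLY;
		case TOY_MQ_WOULD_BLOCK:
			return TOY_ACT_WAIT;
		case TOY_MQ_DETACHED:
			return TOY_ACT_EXIT;
	}

	/* Only reached if res is not one of the known values. */
	return TOY_ACT_ERROR;
}
```

Leaving out a default label lets compilers that warn about unhandled
enum values flag a newly added result, while the trailing return still
covers an out-of-range value at run time.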

======

src/backend/replication/logical/launcher.c

7. logicalrep_worker_launch

Previously I'd suggested ([1] #12) that the process name should change
for consistency, and AFAIK Amit also said [2] that would be OK, but
this change is still not done in the current patch.

======

src/backend/replication/logical/worker.c

8. should_apply_changes_for_rel

* Should this worker apply changes for given relation.
*
* This is mainly needed for initial relation data sync as that runs in
* separate worker process running in parallel and we need some way to skip
* changes coming to the main apply worker during the sync of a table.

This existing comment refers to the "main apply worker". IMO it should
say "leader apply worker" to keep all the terminology consistent.

~~~

9. apply_handle_stream_start

+ *
+ * XXX We can avoid sending pairs of the START/STOP messages to the parallel
+ * worker because unlike apply worker it will process only one transaction at a
+ * time. However, it is not clear whether that is worth the effort because it
+ * is sent after logical_decoding_work_mem changes.
*/
static void
apply_handle_stream_start(StringInfo s)

As previously mentioned ([1] #13b) it's not obvious to me what that
last sentence means. e.g. "because it is sent" - what is "it"?

~~~

10. ApplyWorkerMain

else
{
/* This is main apply worker */
RepOriginId originid;
TimeLineID startpointTLI;
char *err;

Same as #8. IMO it should now say "leader apply worker" to keep all
the terminology consistent.

~~~

11.

+ /*
+ * Assign the appropriate streaming flag according to the 'streaming' mode
+ * and the publisher's ability to support that mode.
+ */

Maybe "streaming flag" -> "streaming string/flag". (sorry, it was my
bad suggestion last time)

~~~

12. get_transaction_apply_action

I still feel there should be some tablesync checks/comments in
this function, just for sanity, even if it works as-is now.

For example, are you saying ([3] #22b) that there might be rare cases
where a Tablesync would call parallel_apply_find_worker? That seems
strange, given that "for streaming transactions that are being applied
in the parallel ... we disallow applying changes on a table that is
not in the READY state".

------
[1] My v36 review -
https://www.postgresql.org/message-id/CAHut%2BPvxL8tJ2ZUpEjkbRFe6qKSH%2Br54BQ7wM8p%3D335tUbuXbg%40mail.gmail.com
[2] Amit's feedback for my v36 review -
https://www.postgresql.org/message-id/CAA4eK1%2BOyQ8-psruZZ0sYff5KactTHZneR-cfsHd%2Bn%2BN7khEKQ%40mail.gmail.com
[3] Hou's feedback for my v36 review -
https://www.postgresql.org/message-id/OS0PR01MB57162232BF51A09F4BD13C7594249%40OS0PR01MB5716.jpnprd01.prod.outlook.com

Kind Regards,
Peter Smith.
Fujitsu Australia
