RE: Perform streaming logical transactions by background workers and parallel apply

From: "wangw(dot)fnst(at)fujitsu(dot)com" <wangw(dot)fnst(at)fujitsu(dot)com>
To: Peter Smith <smithpb2250(at)gmail(dot)com>, "houzj(dot)fnst(at)fujitsu(dot)com" <houzj(dot)fnst(at)fujitsu(dot)com>
Cc: Amit Kapila <amit(dot)kapila16(at)gmail(dot)com>, Dilip Kumar <dilipbalaut(at)gmail(dot)com>, Masahiko Sawada <sawada(dot)mshk(at)gmail(dot)com>, "shiy(dot)fnst(at)fujitsu(dot)com" <shiy(dot)fnst(at)fujitsu(dot)com>, PostgreSQL Hackers <pgsql-hackers(at)lists(dot)postgresql(dot)org>
Subject: RE: Perform streaming logical transactions by background workers and parallel apply
Date: 2022-09-15 05:18:28
Message-ID: OS3PR01MB6275321CB63F469A3F03DC039E499@OS3PR01MB6275.jpnprd01.prod.outlook.com
Lists: pgsql-hackers

On Fri, Sep 9, 2022 at 15:02 PM Peter Smith <smithpb2250(at)gmail(dot)com> wrote:
> Here are my review comments for the v28-0001 patch:
>
> (There may be some overlap with other people's review comments and/or
> some fixes already made).

Thanks for your comments.

> 5. src/backend/libpq/pqmq.c
>
> + {
> +     if (IsParallelWorker())
> +         SendProcSignal(pq_mq_parallel_leader_pid,
> +                        PROCSIG_PARALLEL_MESSAGE,
> +                        pq_mq_parallel_leader_backend_id);
> +     else
> +     {
> +         Assert(IsLogicalParallelApplyWorker());
> +         SendProcSignal(pq_mq_parallel_leader_pid,
> +                        PROCSIG_PARALLEL_APPLY_MESSAGE,
> +                        pq_mq_parallel_leader_backend_id);
> +     }
> + }
>
> This code can be simplified if you want to. For example,
>
> {
>     ProcSignalReason reason;
>
>     Assert(IsParallelWorker() || IsLogicalParallelApplyWorker());
>     reason = IsParallelWorker() ? PROCSIG_PARALLEL_MESSAGE :
>         PROCSIG_PARALLEL_APPLY_MESSAGE;
>     SendProcSignal(pq_mq_parallel_leader_pid, reason,
>                    pq_mq_parallel_leader_backend_id);
> }

Not sure this would be better.
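
For comparison, the two shapes can be checked against each other outside the tree. The following is only a minimal standalone sketch, not PostgreSQL code: the reduced enum, the `WorkerKind` type and both `reason_*` helpers are stand-ins invented here, and only the two `PROCSIG_*` names come from the patch.

```c
#include <assert.h>
#include <stdbool.h>

/* Stand-in for ProcSignalReason, reduced to the two values discussed. */
typedef enum
{
	PROCSIG_PARALLEL_MESSAGE,
	PROCSIG_PARALLEL_APPLY_MESSAGE
} ProcSignalReason;

/* Hypothetical worker kinds; the patch asks IsParallelWorker() etc. */
typedef enum
{
	WORKER_PARALLEL_QUERY,
	WORKER_PARALLEL_APPLY
} WorkerKind;

/* Shape used in the patch: branch first, assert in the else arm. */
ProcSignalReason
reason_branching(WorkerKind kind)
{
	if (kind == WORKER_PARALLEL_QUERY)
		return PROCSIG_PARALLEL_MESSAGE;
	else
	{
		assert(kind == WORKER_PARALLEL_APPLY);
		return PROCSIG_PARALLEL_APPLY_MESSAGE;
	}
}

/* Shape suggested in the review: assert up front, then pick with ?: */
ProcSignalReason
reason_ternary(WorkerKind kind)
{
	assert(kind == WORKER_PARALLEL_QUERY || kind == WORKER_PARALLEL_APPLY);
	return (kind == WORKER_PARALLEL_QUERY) ? PROCSIG_PARALLEL_MESSAGE :
		PROCSIG_PARALLEL_APPLY_MESSAGE;
}
```

Both helpers return the same reason for either worker kind, so the choice between them is one of readability only.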

> 14.
>
> + /* Failed to start a new parallel apply worker. */
> + if (winfo == NULL)
> + return;
>
> There seem to be quite a lot of places (like this example) where
> something may go wrong and the behaviour apparently will just silently
> fall-back to using the non-parallel streaming. Maybe that is OK, but I
> am just wondering how can the user ever know this has happened? Maybe
> the docs can mention that this could happen and give some description
> of what processes users can look for (or some other strategy) so they
> can just confirm that the parallel streaming is really working like
> they assume it to be?

I think users could check the view pg_stat_subscription to see whether a
parallel apply worker has been started.
BTW, we have documented the case where no parallel workers are available.

> 17. src/backend/replication/logical/applyparallelworker.c -
> parallel_apply_free_worker
>
> +/*
> + * Remove the parallel apply worker entry from the hash table. And stop the
> + * worker if there are enough workers in the pool.
> + */
> +void
> +parallel_apply_free_worker(ParallelApplyWorkerInfo *winfo, TransactionId xid)
>
> I think the reason for doing the "enough workers in the pool" logic
> needs some more explanation.

Because the worker process would otherwise keep running, we stop it to avoid
wasting resources.
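
To illustrate the idea, here is a minimal standalone sketch of such a "keep some, stop the rest" decision. It is not the patch code: the `WorkerPool` struct, both function names, and in particular the "more than half of the maximum" threshold are assumptions made for this example.

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical pool bookkeeping; field names are illustrative only. */
typedef struct
{
	int			nworkers;		/* workers currently alive in the pool */
	int			max_workers;	/* configured upper bound for the pool */
} WorkerPool;

/*
 * Should a worker that just finished its transaction be stopped?
 * The threshold (more than half of the maximum) is an assumption.
 */
bool
pool_should_stop_worker(const WorkerPool *pool)
{
	return pool->nworkers > pool->max_workers / 2;
}

/*
 * Release a worker back to the pool.  Returns true if the worker was
 * stopped, false if it is kept idle for reuse by a later transaction.
 */
bool
pool_release_worker(WorkerPool *pool)
{
	if (pool_should_stop_worker(pool))
	{
		pool->nworkers--;		/* the worker exits and frees its slot */
		return true;
	}
	return false;				/* the worker stays idle in the pool */
}
```

With a maximum of 4 and 3 live workers, the first release stops a worker and the second keeps one idle, which saves the start-up cost for the next streamed transaction without leaving every worker running.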

> 19. src/backend/replication/logical/applyparallelworker.c -
> LogicalParallelApplyLoop
>
> + ApplyMessageContext = AllocSetContextCreate(ApplyContext,
> + "ApplyMessageContext",
> + ALLOCSET_DEFAULT_SIZES);
>
> Should the name of this context be "ParallelApplyMessageContext"?

I think it is okay to use "ApplyMessageContext" here just like "ApplyContext".
I will change this if more people have the same idea as you.

> 20. src/backend/replication/logical/applyparallelworker.c -
> HandleParallelApplyMessage
>
> + default:
> + {
> + elog(ERROR, "unrecognized message type received from parallel apply worker: %c (message length %d bytes)",
> + msgtype, msg->len);
> + }
>
> "received from" -> "received by"
>
> ~~~
>
>
> 21. src/backend/replication/logical/applyparallelworker.c -
> HandleParallelApplyMessages
>
> +/*
> + * Handle any queued protocol messages received from parallel apply workers.
> + */
> +void
> +HandleParallelApplyMessages(void)
>
> 21a.
> "received from" -> "received by"
>
> ~
>
> 21b.
> I wonder if this comment should give some credit to the function in
> parallel.c - because this seems almost a copy of all that code.

Since the message is sent from the parallel apply worker to the main apply
worker, I think "from" looks a little better.

> 27. src/backend/replication/logical/launcher.c - logicalrep_worker_detach
>
> + /*
> + * This is the leader apply worker; stop all the parallel apply workers
> + * previously started from here.
> + */
> + if (!isParallelApplyWorker(MyLogicalRepWorker))
>
> 27a.
> The comment does not match the code. If this *is* the leader apply
> worker then why do we have the condition to check that?
>
> Maybe only needs a comment update like
>
> SUGGESTION
> If this is the leader apply worker then stop all the parallel...
>
> ~
>
> 27b.
> Code seems also assuming it cannot be a tablesync worker but it is not
> checking that. I am wondering if it will be better to have yet another
> macro/inline to do isLeaderApplyWorker() that will make sure this
> really is the leader apply worker. (This review comment suggestion is
> repeated later below).

=>27a.
Improved as suggested.

=>27b.
Changed the if-statement to
`if (!am_parallel_apply_worker() && !am_tablesync_worker())`.
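
The intent of that condition can be sketched standalone as follows. This is only an illustration under assumptions: `WorkerSlot` is a reduced, hypothetical stand-in for LogicalRepWorker, the `is_*` helpers are invented names, and treating a valid `relid` as the tablesync test is an assumption of this sketch. Only `apply_leader_pid` and its InvalidPid convention come from the patch.

```c
#include <assert.h>
#include <stdbool.h>

typedef unsigned int Oid;

#define InvalidOid	((Oid) 0)
#define InvalidPid	(-1)

/* Reduced, hypothetical stand-in for LogicalRepWorker. */
typedef struct
{
	Oid			relid;				/* table being synced, else InvalidOid */
	int			apply_leader_pid;	/* leader PID for a parallel apply
									 * worker, else InvalidPid */
} WorkerSlot;

/* Assumed test: a tablesync worker has a valid relation to sync. */
bool
is_tablesync(const WorkerSlot *w)
{
	return w->relid != InvalidOid;
}

/* A parallel apply worker records the PID of its leader. */
bool
is_parallel_apply(const WorkerSlot *w)
{
	return w->apply_leader_pid != InvalidPid;
}

/* The leader apply worker is whatever is neither of the above. */
bool
is_leader_apply(const WorkerSlot *w)
{
	return !is_parallel_apply(w) && !is_tablesync(w);
}
```

Checking only `!is_parallel_apply(w)` would also accept a tablesync worker, which is the gap pointed out in 27b.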

> 42. src/backend/replication/logical/worker.c - InitializeApplyWorker
>
> +/*
> + * Initialize the database connection, in-memory subscription and necessary
> + * config options.
> + */
>
> I still think this should mention that this is common initialization
> code for "both leader apply workers, and parallel apply workers"

I'm not sure about this. I will change this if more people have the same idea
as you.

> 44. src/backend/replication/logical/worker.c - IsLogicalParallelApplyWorker
>
> +/*
> + * Is current process a logical replication parallel apply worker?
> + */
> +bool
> +IsLogicalParallelApplyWorker(void)
> +{
> + return am_parallel_apply_worker();
> +}
> +
>
> It seems a bit strange to have this function
> IsLogicalParallelApplyWorker, and also am_parallel_apply_worker()
> which are basically identical except one of them is static and one is
> not.
>
> I wonder if there should be just one function. And if you really do
> need 2 names for consistency then you can just define a synonym like
>
> #define am_parallel_apply_worker IsLogicalParallelApplyWorker

I am not sure whether this would be better, but I can change it if more people
prefer that.

> 49. src/include/replication/worker_internal.h
>
> @@ -60,6 +64,12 @@ typedef struct LogicalRepWorker
> */
> FileSet *stream_fileset;
>
> + /*
> + * PID of leader apply worker if this slot is used for a parallel apply
> + * worker, InvalidPid otherwise.
> + */
> + pid_t apply_leader_pid;
> +
> /* Stats. */
> XLogRecPtr last_lsn;
> TimestampTz last_send_time;
>
> Whitespace indent of the new member ok?

I will run pgindent later.

The rest of the comments are changed as suggested.

The new patches were attached in [1].

[1] - https://www.postgresql.org/message-id/OS3PR01MB6275F145878B4A44586C46CE9E499%40OS3PR01MB6275.jpnprd01.prod.outlook.com

Regards,
Wang wei
