Re: Perform streaming logical transactions by background workers and parallel apply

From: Amit Kapila <amit(dot)kapila16(at)gmail(dot)com>
To: "houzj(dot)fnst(at)fujitsu(dot)com" <houzj(dot)fnst(at)fujitsu(dot)com>
Cc: "wangw(dot)fnst(at)fujitsu(dot)com" <wangw(dot)fnst(at)fujitsu(dot)com>, Peter Smith <smithpb2250(at)gmail(dot)com>, Dilip Kumar <dilipbalaut(at)gmail(dot)com>, Masahiko Sawada <sawada(dot)mshk(at)gmail(dot)com>, "shiy(dot)fnst(at)fujitsu(dot)com" <shiy(dot)fnst(at)fujitsu(dot)com>, PostgreSQL Hackers <pgsql-hackers(at)lists(dot)postgresql(dot)org>
Subject: Re: Perform streaming logical transactions by background workers and parallel apply
Date: 2022-08-30 06:42:57
Message-ID: CAA4eK1+e8JsiC8uMZPU25xQRyxNvVS24M4=Zy-xD18jzX+vrmA@mail.gmail.com
Lists: pgsql-hackers

On Mon, Aug 29, 2022 at 5:01 PM houzj(dot)fnst(at)fujitsu(dot)com
<houzj(dot)fnst(at)fujitsu(dot)com> wrote:
>
> On Thursday, August 25, 2022 7:33 PM Amit Kapila <amit(dot)kapila16(at)gmail(dot)com> wrote:
> >
>
> > 11.
> > + /*
> > + * Attach to the message queue.
> > + */
> > + mq = shm_toc_lookup(toc, APPLY_BGWORKER_KEY_ERROR_QUEUE, false);
> > + shm_mq_set_sender(mq, MyProc);
> > + error_mqh = shm_mq_attach(mq, seg, NULL);
> > + pq_redirect_to_shm_mq(seg, error_mqh);
> > +
> > + /*
> > + * Now, we have initialized DSM. Attach to slot.
> > + */
> > + logicalrep_worker_attach(worker_slot);
> > + MyParallelShared->logicalrep_worker_generation =
> > MyLogicalRepWorker->generation;
> > + MyParallelShared->logicalrep_worker_slot_no = worker_slot;
> > +
> > + pq_set_parallel_leader(MyLogicalRepWorker->apply_leader_pid,
> > + InvalidBackendId);
> >
> > Is there a reason to set parallel_leader immediately after
> > pq_redirect_to_shm_mq() as we are doing in parallel.c?
>
> Moved the code.
>

Sorry if I was not clear, but what I wanted was something like the below:

diff --git a/src/backend/replication/logical/applyparallelworker.c
b/src/backend/replication/logical/applyparallelworker.c
index 832e99cd48..6646e00658 100644
--- a/src/backend/replication/logical/applyparallelworker.c
+++ b/src/backend/replication/logical/applyparallelworker.c
@@ -480,6 +480,9 @@ ApplyParallelWorkerMain(Datum main_arg)
mq = shm_toc_lookup(toc, PARALLEL_APPLY_KEY_ERROR_QUEUE, false);
shm_mq_set_sender(mq, MyProc);
error_mqh = shm_mq_attach(mq, seg, NULL);
+ pq_redirect_to_shm_mq(seg, error_mqh);
+ pq_set_parallel_leader(MyLogicalRepWorker->apply_leader_pid,
+ InvalidBackendId);

/*
* Primary initialization is complete. Now, we can attach to
our slot. This
@@ -490,10 +493,6 @@ ApplyParallelWorkerMain(Datum main_arg)
MyParallelShared->logicalrep_worker_generation =
MyLogicalRepWorker->generation;
MyParallelShared->logicalrep_worker_slot_no = worker_slot;

- pq_redirect_to_shm_mq(seg, error_mqh);
- pq_set_parallel_leader(MyLogicalRepWorker->apply_leader_pid,
- InvalidBackendId);
-
MyLogicalRepWorker->last_send_time =
MyLogicalRepWorker->last_recv_time =
MyLogicalRepWorker->reply_time = 0;

A few other comments on v25-0001*
============================
1.
+ {
+ {"max_apply_parallel_workers_per_subscription",
+ PGC_SIGHUP,
+ REPLICATION_SUBSCRIBERS,
+ gettext_noop("Maximum number of apply parallel workers per subscription."),
+ NULL,
+ },
+ &max_apply_parallel_workers_per_subscription,

Let's model this on max_parallel_workers_per_gather and name it
max_parallel_apply_workers_per_subscription.
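
To be clear, I mean something like the below (only the renamed parts
shown; the remaining fields stay as in the patch):

```c
	{
		{"max_parallel_apply_workers_per_subscription",
			PGC_SIGHUP,
			REPLICATION_SUBSCRIBERS,
			gettext_noop("Maximum number of parallel apply workers per subscription."),
			NULL,
		},
		&max_parallel_apply_workers_per_subscription,
```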

+typedef struct ApplyParallelWorkerEntry
+{
+ TransactionId xid; /* Hash key -- must be first */
+ ApplyParallelWorkerInfo *winfo;
+} ApplyParallelWorkerEntry;
+
+/* Apply parallel workers hash table (initialized on first use). */
+static HTAB *ApplyParallelWorkersHash = NULL;
+static List *ApplyParallelWorkersFreeList = NIL;
+static List *ApplyParallelWorkersList = NIL;

Similarly, for the above, let's name them as ParallelApply*. I think in
comments/doc changes it is better to refer to these as parallel apply
workers. We can keep the filename as it is.
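
For example, with just the renames applied to the quoted declarations:

```c
typedef struct ParallelApplyWorkerEntry
{
	TransactionId xid;			/* Hash key -- must be first */
	ParallelApplyWorkerInfo *winfo;
} ParallelApplyWorkerEntry;

/* Parallel apply workers hash table (initialized on first use). */
static HTAB *ParallelApplyWorkersHash = NULL;
static List *ParallelApplyWorkersFreeList = NIL;
static List *ParallelApplyWorkersList = NIL;
```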

2.
+ * If there are enough apply parallel workers(reache half of the
+ * max_apply_parallel_workers_per_subscription)

/reache/reached. There should be a space before (.

3.
+ * The dynamic shared memory segment will contain (1) a shm_mq that can be used
+ * to transport errors (and other messages reported via elog/ereport) from the
+ * apply parallel worker to leader apply worker (2) another shm_mq that can
+ * be used to transport changes in the transaction from leader apply worker to
+ * apply parallel worker (3) necessary information to be shared among apply
+ * parallel workers to leader apply worker

I think it is better to use send instead of transport in the above
paragraph. In (3), /apply parallel workers to leader apply
worker/apply parallel workers and leader apply worker
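
With both changes applied, the comment would read something like the
below (a sketch; the exact wording is up to you):

```c
/*
 * The dynamic shared memory segment will contain (1) a shm_mq that can be
 * used to send errors (and other messages reported via elog/ereport) from
 * the parallel apply worker to the leader apply worker, (2) another shm_mq
 * that can be used to send changes in the transaction from the leader apply
 * worker to the parallel apply worker, and (3) necessary information to be
 * shared among parallel apply workers and the leader apply worker.
 */
```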

4.
handle_streamed_transaction(LogicalRepMsgType action, StringInfo s)
{
...
...
+ else if (apply_action == TA_SEND_TO_PARALLEL_WORKER)
+ {
+ parallel_apply_send_data(winfo, s->len, s->data);

It is better to have an Assert for winfo being non-null here and in
other similar usages.
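
IOW, something like the below (sketch; the surrounding code is as
quoted above):

```c
	else if (apply_action == TA_SEND_TO_PARALLEL_WORKER)
	{
		/* A worker must have been assigned for this action. */
		Assert(winfo != NULL);
		parallel_apply_send_data(winfo, s->len, s->data);
```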

--
With Regards,
Amit Kapila.
