Re: Perform streaming logical transactions by background workers and parallel apply

From: Amit Kapila <amit(dot)kapila16(at)gmail(dot)com>
To: "wangw(dot)fnst(at)fujitsu(dot)com" <wangw(dot)fnst(at)fujitsu(dot)com>
Cc: Masahiko Sawada <sawada(dot)mshk(at)gmail(dot)com>, Peter Smith <smithpb2250(at)gmail(dot)com>, "shiy(dot)fnst(at)fujitsu(dot)com" <shiy(dot)fnst(at)fujitsu(dot)com>, "houzj(dot)fnst(at)fujitsu(dot)com" <houzj(dot)fnst(at)fujitsu(dot)com>, PostgreSQL Hackers <pgsql-hackers(at)lists(dot)postgresql(dot)org>
Subject: Re: Perform streaming logical transactions by background workers and parallel apply
Date: 2022-06-17 08:57:45
Message-ID: CAA4eK1+QQHGb0afmM_Cf2qu=UJoCnvs3VcZ+1xTiySx205fU1w@mail.gmail.com
Lists: pgsql-hackers

On Fri, Jun 17, 2022 at 12:47 PM wangw(dot)fnst(at)fujitsu(dot)com
<wangw(dot)fnst(at)fujitsu(dot)com> wrote:
>
> Attach the new patches.
> Only changed patches 0001, 0004.
>

A few more comments on the previous version of the patch:
===========================================
1.
+/*
+ * Count the number of registered (not necessarily running) apply background
+ * worker for a subscription.
+ */

s/worker/workers/

2.
+static void
+apply_bgworker_setup_dsm(ApplyBgworkerState *wstate)
+{
...
...
+ int64 queue_size = 160000000; /* 16 MB for now */

I think it would be better to use a #define for this rather than a
hard-coded value.
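
A minimal sketch of what that could look like. The macro and helper
names here are hypothetical and not taken from the patch. Writing the
size as an expression also makes the intent checkable: the literal
160000000 in the hunk above is roughly 160 MB in decimal, which does not
match the "16 MB" comment next to it.

```c
#include <assert.h>
#include <stddef.h>

/*
 * Hypothetical name: size of the shared memory message queue used to
 * pass streamed changes to an apply background worker.
 */
#define APPLY_BGWORKER_DSM_QUEUE_SIZE	((size_t) 16 * 1024 * 1024)	/* 16 MB */

/* Compile-time check that the expression really is 16 MB. */
static_assert(APPLY_BGWORKER_DSM_QUEUE_SIZE == 16777216,
			  "queue size must be exactly 16 MB");

/* Hypothetical helper standing in for the place that sizes the queue. */
static size_t
apply_bgworker_queue_size(void)
{
	return APPLY_BGWORKER_DSM_QUEUE_SIZE;
}
```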

3.
+/*
+ * Status for apply background worker.
+ */
+typedef enum ApplyBgworkerStatus
+{
+ APPLY_BGWORKER_ATTACHED = 0,
+ APPLY_BGWORKER_READY,
+ APPLY_BGWORKER_BUSY,
+ APPLY_BGWORKER_FINISHED,
+ APPLY_BGWORKER_EXIT
+} ApplyBgworkerStatus;

It would be better if you could add comments to explain each of these
states.
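
As an illustration only, the comments might take the following shape.
The descriptions are guesses based purely on the state names, not on
what the patch actually does, so each one would need to be checked and
corrected against the real behavior.

```c
/*
 * Status of an apply background worker.
 *
 * NOTE: every description below is an assumption inferred from the
 * enumerator name; none of them is taken from the patch.
 */
typedef enum ApplyBgworkerStatus
{
	APPLY_BGWORKER_ATTACHED = 0,	/* assumed: worker has attached to the
									 * shared memory set up for it */
	APPLY_BGWORKER_READY,			/* assumed: worker is idle and can accept
									 * a streamed transaction */
	APPLY_BGWORKER_BUSY,			/* assumed: worker is applying changes of
									 * a streamed transaction */
	APPLY_BGWORKER_FINISHED,		/* assumed: worker has finished the
									 * transaction it was assigned */
	APPLY_BGWORKER_EXIT				/* assumed: worker is exiting or has
									 * exited */
} ApplyBgworkerStatus;
```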

4.
+ /* Set up one message queue per worker, plus one. */
+ mq = shm_mq_create(shm_toc_allocate(toc, (Size) queue_size),
+ (Size) queue_size);
+ shm_toc_insert(toc, APPLY_BGWORKER_KEY_MQ, mq);
+ shm_mq_set_sender(mq, MyProc);

I don't understand the meaning of 'plus one' in the above comment as
the patch seems to be setting up just one queue here?

5.
+
+ /* Attach the queues. */
+ wstate->mq_handle = shm_mq_attach(mq, seg, NULL);

Similar to the above. If there is only one queue, then the comment
should say "queue" instead of "queues".

6.
snprintf(bgw.bgw_name, BGW_MAXLEN,
"logical replication worker for subscription %u", subid);
+ else
+ snprintf(bgw.bgw_name, BGW_MAXLEN,
+ "logical replication background apply worker for subscription %u ", subid);

No need for the extra space after %u in the above code.

7.
+ launched = logicalrep_worker_launch(MyLogicalRepWorker->dbid,
+ MySubscription->oid,
+ MySubscription->name,
+ MyLogicalRepWorker->userid,
+ InvalidOid,
+ dsm_segment_handle(wstate->dsm_seg));
+
+ if (launched)
+ {
+ /* Wait for worker to attach. */
+ apply_bgworker_wait_for(wstate, APPLY_BGWORKER_ATTACHED);

In logicalrep_worker_launch(), we already seem to be waiting for the
worker to attach via WaitForReplicationWorkerAttach(), so it is not
clear to me why we need to wait again. If there is a genuine reason,
then it is better to add a comment explaining it. I think we need some
way to know whether the worker has attached successfully, and we may
not get that via WaitForReplicationWorkerAttach(), but waiting a second
time doesn't sound like a very good idea. If that understanding is
correct, then can we think of a better way?

8. I think we can simplify apply_bgworker_find_or_start by having
separate APIs for find and start. Most places only need the find API;
only the first stream needs start. If we do that, then I think you
don't need to make a hash entry until the ApplyBgworkerState has been
established. The current code looks odd because you need to remove the
entry if we fail to allocate the state.
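
A rough, standalone sketch of the split. It is not PostgreSQL code: a
small fixed array stands in for the hash table, malloc() stands in for
the real state setup, and the function names apply_bgworker_find and
apply_bgworker_start, the xid field, and MAX_APPLY_BGWORKERS are all
hypothetical. The point is only that start registers the entry after
the state has been set up, so a failure leaves nothing to undo.

```c
#include <stddef.h>
#include <stdint.h>
#include <stdlib.h>

typedef uint32_t TransactionId;		/* stand-in for PostgreSQL's type */

typedef struct ApplyBgworkerState
{
	TransactionId xid;				/* hypothetical field: assigned xact */
} ApplyBgworkerState;

#define MAX_APPLY_BGWORKERS 8		/* hypothetical limit for the sketch */

/* Stand-in for the hash table keyed by xid. */
static ApplyBgworkerState *workers[MAX_APPLY_BGWORKERS];

/* Look up the worker assigned to xid. Never creates an entry. */
static ApplyBgworkerState *
apply_bgworker_find(TransactionId xid)
{
	for (int i = 0; i < MAX_APPLY_BGWORKERS; i++)
	{
		if (workers[i] != NULL && workers[i]->xid == xid)
			return workers[i];
	}
	return NULL;
}

/*
 * Set up a worker for xid (called for the first stream only). The entry
 * is registered only once the state exists, so there is no entry to
 * remove on failure.
 */
static ApplyBgworkerState *
apply_bgworker_start(TransactionId xid)
{
	ApplyBgworkerState *wstate;
	int			slot = -1;

	for (int i = 0; i < MAX_APPLY_BGWORKERS; i++)
	{
		if (workers[i] == NULL)
		{
			slot = i;
			break;
		}
	}
	if (slot < 0)
		return NULL;			/* no free slot; nothing registered */

	wstate = malloc(sizeof(ApplyBgworkerState));
	if (wstate == NULL)
		return NULL;			/* setup failed; nothing registered */

	wstate->xid = xid;
	workers[slot] = wstate;		/* register only after success */
	return wstate;
}
```

With this shape, the callers handling later streams would only call the
find function and could treat a NULL result as "no worker assigned".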

9.
+ /*
+ * TO IMPROVE: Do we need to display the apply background worker's
+ * information in pg_stat_replication ?
+ */
+ UpdateWorkerStats(last_received, send_time, false);

Do you mean pg_stat_subscription here? If so, then to decide whether
we need to update stats here, we should see what additional
information we could report that is not already available via the main
apply worker.

10.
ApplyBgworkerMain
{
...
+ /* Load the subscription into persistent memory context. */
+ ApplyContext = AllocSetContextCreate(TopMemoryContext,
...

This comment seems to be copied from ApplyWorkerMain but doesn't apply here.

--
With Regards,
Amit Kapila.
