Re: Perform streaming logical transactions by background workers and parallel apply

From: Amit Kapila <amit(dot)kapila16(at)gmail(dot)com>
To: "wangw(dot)fnst(at)fujitsu(dot)com" <wangw(dot)fnst(at)fujitsu(dot)com>
Cc: Peter Smith <smithpb2250(at)gmail(dot)com>, "houzj(dot)fnst(at)fujitsu(dot)com" <houzj(dot)fnst(at)fujitsu(dot)com>, Dilip Kumar <dilipbalaut(at)gmail(dot)com>, Masahiko Sawada <sawada(dot)mshk(at)gmail(dot)com>, "shiy(dot)fnst(at)fujitsu(dot)com" <shiy(dot)fnst(at)fujitsu(dot)com>, PostgreSQL Hackers <pgsql-hackers(at)lists(dot)postgresql(dot)org>
Subject: Re: Perform streaming logical transactions by background workers and parallel apply
Date: 2022-08-18 11:44:59
Message-ID: CAA4eK1+_oHZHoDooAR7QcYD2CeTUWNSwkqVcLWC2iQijAJC4Cg@mail.gmail.com
Lists: pgsql-hackers

On Wed, Aug 17, 2022 at 11:58 AM wangw(dot)fnst(at)fujitsu(dot)com
<wangw(dot)fnst(at)fujitsu(dot)com> wrote:
>
> Attach the new patches.
>

Few comments on v23-0001
=======================
1.
+ /*
+ * Attach to the dynamic shared memory segment for the parallel query, and
+ * find its table of contents.
+ *
+ * Note: at this point, we have not created any ResourceOwner in this
+ * process. This will result in our DSM mapping surviving until process
+ * exit, which is fine. If there were a ResourceOwner, it would acquire
+ * ownership of the mapping, but we have no need for that.
+ */

In the first sentence, instead of "parallel query", you need to say
"parallel apply". I think we don't need to repeat the entire note that we
already have in ParallelWorkerMain. You can say something like: "Like
parallel query, we don't need a resource owner by this time. See
ParallelWorkerMain."
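
So the revised comment might read (just a sketch, adjust as you see fit):

/*
 * Attach to the dynamic shared memory segment for the parallel apply, and
 * find its table of contents.
 *
 * Like parallel query, we don't need a resource owner by this time. See
 * ParallelWorkerMain.
 */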

2.
+/*
+ * There are three fields in message: start_lsn, end_lsn and send_time. Because
+ * we have updated these statistics in apply worker, we could ignore these
+ * fields in apply background worker. (see function LogicalRepApplyLoop).
+ */
+#define SIZE_STATS_MESSAGE (2*sizeof(XLogRecPtr)+sizeof(TimestampTz))

The first sentence in the above comment isn't clear about which
message it is talking about. I think it is about any message received
by this apply bgworker; if so, can we change it to: "There are three
fields in each message received by the apply worker: start_lsn,
end_lsn, and send_time."?
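
So the whole thing might become:

/*
 * There are three fields in each message received by the apply worker:
 * start_lsn, end_lsn, and send_time. Because we have already updated these
 * statistics in the apply worker, we can ignore these fields in the apply
 * background worker (see function LogicalRepApplyLoop).
 */
#define SIZE_STATS_MESSAGE (2 * sizeof(XLogRecPtr) + sizeof(TimestampTz))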

3.
+/*
+ * Return the apply background worker that will be used for the specified xid.
+ *
+ * If an apply background worker is found in the free list then re-use it,
+ * otherwise start a fresh one. Cache the worker ApplyBgworkersHash keyed by
+ * the specified xid.
+ */
+ApplyBgworkerInfo *
+apply_bgworker_start(TransactionId xid)

The first sentence should say apply background worker info. Can we
change the cache-related sentence in the above comment to "Cache the
worker info in ApplyBgworkersHash keyed by the specified xid."?
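
i.e. something like:

/*
 * Return the apply background worker info that will be used for the
 * specified xid.
 *
 * If an apply background worker is found in the free list then re-use it,
 * otherwise start a fresh one. Cache the worker info in ApplyBgworkersHash
 * keyed by the specified xid.
 */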

4.
/*
+ * We use first byte of message for additional communication between
+ * main Logical replication worker and apply background workers, so if
+ * it differs from 'w', then process it first.
+ */
+ c = pq_getmsgbyte(&s);
+ switch (c)
+ {
+ /* End message of streaming chunk */
+ case LOGICAL_REP_MSG_STREAM_STOP:
+ elog(DEBUG1, "[Apply BGW #%u] ended processing streaming chunk, "
+ "waiting on shm_mq_receive", shared->worker_id);
+

Why do we need special handling of the LOGICAL_REP_MSG_STREAM_STOP
message here? Instead, why not let it be handled via the
apply_dispatch path? You will still require some special handling for
the apply bgworker case, but I see other messages already have
similar handling.
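
To illustrate, the receive path could then become something like the
below (a rough, untested sketch; it assumes the leader forwards the
complete 'w' message for stream-stop as well, and that the individual
handlers can detect that they are running inside an apply bgworker
wherever special treatment is needed):

c = pq_getmsgbyte(&s);
if (c != 'w')
    elog(ERROR, "unexpected message \"%c\"", c);

/* Ignore the statistics fields that the apply worker has already updated. */
s.cursor += SIZE_STATS_MESSAGE;

/*
 * LOGICAL_REP_MSG_STREAM_STOP, like every other message, now reaches
 * its handler via the normal dispatch path.
 */
apply_dispatch(&s);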

5.
+ /*
+ * Now, we have initialized DSM. Attach to slot.
+ */
+ logicalrep_worker_attach(worker_slot);

Can we change this comment to something like: "Primary initialization
is complete. Now, we can attach to our slot."? IIRC, we have done it
after initialization to avoid some race condition between the leader
apply worker and this parallel apply worker. If so, can we explain
the same in the comments?
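
i.e. something like the below, with the actual race spelled out once
confirmed:

/*
 * Primary initialization is complete. Now, we can attach to our slot.
 *
 * XXX Explain the race between the leader apply worker and this worker
 * that attaching only after initialization avoids.
 */
logicalrep_worker_attach(worker_slot);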

6.
+/*
+ * Set up a dynamic shared memory segment.
+ *
+ * We set up a control region that contains a ApplyBgworkerShared,
+ * plus one region per message queue. There are as many message queues as
+ * the number of workers.
+ */
+static bool
+apply_bgworker_setup_dsm(ApplyBgworkerInfo *wstate)

I think the part of the comment "There are as many message queues as
the number of workers." doesn't fit atop this function, as the
function itself has nothing to do with the number of workers. It
would be a good idea to describe everything that is communicated via
the DSM in the description of apply bg workers that you have written
in worker.c.
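
For this function itself, the comment could simply be trimmed to:

/*
 * Set up a dynamic shared memory segment.
 *
 * We set up a control region that contains an ApplyBgworkerShared,
 * plus one region for the message queue.
 */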

7.
+ /* Check if there are free worker slot(s). */
+ LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
+ napplyworkers = logicalrep_apply_bgworker_count(MyLogicalRepWorker->subid);
+ LWLockRelease(LogicalRepWorkerLock);
+
+ if (napplyworkers >= max_apply_bgworkers_per_subscription)
+ return NULL;

Won't it be better to check this restriction in
logicalrep_worker_launch() as we do for tablesync workers? That way
all similar restrictions will be in one place.
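
For example, next to the existing nsyncworkers check in
logicalrep_worker_launch(), we could have something like the below (a
rough sketch; the exact condition for detecting that we are launching
an apply bgworker depends on the patch):

napplyworkers = logicalrep_apply_bgworker_count(subid);

/* If we have reached the apply bgworker limit, silently give up. */
if (napplyworkers >= max_apply_bgworkers_per_subscription)
{
    LWLockRelease(LogicalRepWorkerLock);
    return;
}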

8.
+ if (rel->state != SUBREL_STATE_READY)
+ ereport(ERROR,
+ (errmsg("logical replication apply workers for subscription \"%s\" will restart",
+ MySubscription->name),
+ errdetail("Cannot handle streamed replication transaction by apply "
+ "background workers until all tables are synchronized")));

errdetail messages always end with a full stop.
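
i.e.:

errdetail("Cannot handle streamed replication transaction by apply "
          "background workers until all tables are synchronized.")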

--
With Regards,
Amit Kapila.
