Re: Perform streaming logical transactions by background workers and parallel apply

From: Masahiko Sawada <sawada(dot)mshk(at)gmail(dot)com>
To: "houzj(dot)fnst(at)fujitsu(dot)com" <houzj(dot)fnst(at)fujitsu(dot)com>
Cc: Amit Kapila <amit(dot)kapila16(at)gmail(dot)com>, "wangw(dot)fnst(at)fujitsu(dot)com" <wangw(dot)fnst(at)fujitsu(dot)com>, Peter Smith <smithpb2250(at)gmail(dot)com>, Dilip Kumar <dilipbalaut(at)gmail(dot)com>, "shiy(dot)fnst(at)fujitsu(dot)com" <shiy(dot)fnst(at)fujitsu(dot)com>, PostgreSQL Hackers <pgsql-hackers(at)lists(dot)postgresql(dot)org>
Subject: Re: Perform streaming logical transactions by background workers and parallel apply
Date: 2022-12-07 11:51:15
Message-ID: CAD21AoCZ3i9w1Rz-81Lv1QB+JGP60Ypiom4+wM9eP3aQTx0STQ@mail.gmail.com
Views: Raw Message | Whole Thread | Download mbox | Resend email
Thread:
Lists: pgsql-hackers

On Mon, Dec 5, 2022 at 1:29 PM houzj(dot)fnst(at)fujitsu(dot)com
<houzj(dot)fnst(at)fujitsu(dot)com> wrote:
>
> On Sunday, December 4, 2022 7:17 PM houzj(dot)fnst(at)fujitsu(dot)com <houzj(dot)fnst(at)fujitsu(dot)com>
> >
> > Thursday, December 1, 2022 8:40 PM Amit Kapila <amit(dot)kapila16(at)gmail(dot)com>
> > wrote:
> > > Some other comments:
> > ...
> > Attach the new version patch set which addressed most of the comments
> > received so far except some comments being discussed[1].
> > [1] https://www.postgresql.org/message-id/OS0PR01MB57167BF64FC0891734C8E81A94149%40OS0PR01MB5716.jpnprd01.prod.outlook.com
>
> Attach a new version patch set which fixed a testcase failure on CFbot.

Here are some comments on v56 0001, 0002 patches. Please ignore
comments if you already incorporated them in v57.

+static void
+ProcessParallelApplyInterrupts(void)
+{
+ CHECK_FOR_INTERRUPTS();
+
+ if (ShutdownRequestPending)
+ {
+ ereport(LOG,
+ (errmsg("logical replication parallel
apply worker for subscrip
tion \"%s\" has finished",
+ MySubscription->name)));
+
+ apply_worker_clean_exit(false);
+ }
+
+ if (ConfigReloadPending)
+ {
+ ConfigReloadPending = false;
+ ProcessConfigFile(PGC_SIGHUP);
+ }
+}

I personally think that we don't need to have a function to do only
these few things.

---
+/* Disallow streaming in-progress transactions. */
+#define SUBSTREAM_OFF 'f'
+
+/*
+ * Streaming in-progress transactions are written to a temporary file and
+ * applied only after the transaction is committed on upstream.
+ */
+#define SUBSTREAM_ON 't'
+
+/*
+ * Streaming in-progress transactions are applied immediately via a parallel
+ * apply worker.
+ */
+#define SUBSTREAM_PARALLEL 'p'
+

While these names look good to me, we already have the following
existing values:

*/
#define LOGICALREP_TWOPHASE_STATE_DISABLED 'd'
#define LOGICALREP_TWOPHASE_STATE_PENDING 'p'
#define LOGICALREP_TWOPHASE_STATE_ENABLED 'e'

/*
* The subscription will request the publisher to
* have any origin.
*/
#define LOGICALREP_ORIGIN_NONE "none"

/*
* The subscription will request the publisher to
* of their origin.
*/
#define LOGICALREP_ORIGIN_ANY "any"

Should we change the names to something like LOGICALREP_STREAM_PARALLEL?

---
+ * The lock graph for the above example will look as follows:
+ * LA (waiting to acquire the lock on the unique index) -> PA (waiting to
+ * acquire the lock on the remote transaction) -> LA

and

+ * The lock graph for the above example will look as follows:
+ * LA (waiting to acquire the transaction lock) -> PA-2 (waiting to acquire the
+ * lock due to unique index constraint) -> PA-1 (waiting to acquire the stream
+ * lock) -> LA

"(waiting to acquire the lock on the remote transaction)" in the first
example and "(waiting to acquire the stream lock)" in the second
example is the same meaning, right? If so, I think we should use
either term for consistency.

---
+ bool write_abort_info = (data->streaming ==
SUBSTREAM_PARALLEL);

I think that instead of setting write_abort_info every time when
pgoutput_stream_abort() is called, we can set it once, probably in
PGOutputData, at startup.

---
server_version = walrcv_server_version(LogRepWorkerWalRcvConn);
options.proto.logical.proto_version =
+ server_version >= 160000 ?
LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM :
server_version >= 150000 ? LOGICALREP_PROTO_TWOPHASE_VERSION_NUM :
server_version >= 140000 ? LOGICALREP_PROTO_STREAM_VERSION_NUM :
LOGICALREP_PROTO_VERSION_NUM;

Instead of always using the new protocol version, I think we can use
LOGICALREP_PROTO_TWOPHASE_VERSION_NUM if the streaming is not
'parallel'. That way, we don't need to change protocl version check
logic in pgoutput.c and don't need to expose defGetStreamingMode().
What do you think?

---
When max_parallel_apply_workers_per_subscription is changed to a value
lower than the number of parallel worker running at that time, do we
need to stop extra workers?

---
If a value of max_parallel_apply_workers_per_subscription is not
sufficient, we get the LOG "out of parallel apply workers" every time
when the apply worker doesn't launch a worker. But do we really need
this log? It seems not consistent with
max_sync_workers_per_subscription behavior. I think we can check if
the number of running parallel workers is less than
max_parallel_apply_workers_per_subscription before calling
logicalrep_worker_launch(). What do you think?

---
+ if (server_version >= 160000 &&
+ MySubscription->stream == SUBSTREAM_PARALLEL)
+ {
+ options.proto.logical.streaming_str = pstrdup("parallel");
+ MyLogicalRepWorker->parallel_apply = true;
+ }
+ else if (server_version >= 140000 &&
+ MySubscription->stream != SUBSTREAM_OFF)
+ {
+ options.proto.logical.streaming_str = pstrdup("on");
+ MyLogicalRepWorker->parallel_apply = false;
+ }

I think we don't need to use pstrdup().

---
- BeginTransactionBlock();
- CommitTransactionCommand(); /* Completes the preceding Begin command. */
+ if (!IsTransactionBlock())
+ {
+ BeginTransactionBlock();
+ CommitTransactionCommand(); /* Completes the preceding
Begin command. */
+ }

Do we need this change? In my environment, 'make check-world' passes
without this change.

Regards,

--
Masahiko Sawada
Amazon Web Services: https://aws.amazon.com

In response to

Responses

Browse pgsql-hackers by date

  From Date Subject
Next Message Amit Langote 2022-12-07 11:54:36 moving extraUpdatedCols out of RangeTblEntry (into ModifyTable)
Previous Message Alvaro Herrera 2022-12-07 11:44:15 Re: ExecRTCheckPerms() and many prunable partitions