Re: Perform streaming logical transactions by background workers and parallel apply

From: Amit Kapila <amit(dot)kapila16(at)gmail(dot)com>
To: "houzj(dot)fnst(at)fujitsu(dot)com" <houzj(dot)fnst(at)fujitsu(dot)com>
Cc: "wangw(dot)fnst(at)fujitsu(dot)com" <wangw(dot)fnst(at)fujitsu(dot)com>, Peter Smith <smithpb2250(at)gmail(dot)com>, Dilip Kumar <dilipbalaut(at)gmail(dot)com>, Masahiko Sawada <sawada(dot)mshk(at)gmail(dot)com>, "shiy(dot)fnst(at)fujitsu(dot)com" <shiy(dot)fnst(at)fujitsu(dot)com>, PostgreSQL Hackers <pgsql-hackers(at)lists(dot)postgresql(dot)org>
Subject: Re: Perform streaming logical transactions by background workers and parallel apply
Date: 2022-09-08 06:51:49
Message-ID: CAA4eK1Lu-6oXMk7ZaGYLwm3CRLBuzueGbasyHnNpJxu6Mq3mmg@mail.gmail.com
Lists: pgsql-hackers

On Mon, Sep 5, 2022 at 6:34 PM houzj(dot)fnst(at)fujitsu(dot)com
<houzj(dot)fnst(at)fujitsu(dot)com> wrote:
>
> Attach the correct patch set this time.
>

A few comments on v28-0001*:
=======================
1.
+ /* Whether the worker is processing a transaction. */
+ bool in_use;

I think this same comment applies to the in_parallel_apply_xact flag as
well. How about: "Indicates whether the worker is available to be used
for a parallel apply transaction."?

2.
+ /*
+ * Set this flag in the leader instead of the parallel apply worker to
+ * avoid the race condition where the leader has already started waiting
+ * for the parallel apply worker to finish processing the transaction(set
+ * the in_parallel_apply_xact to false) while the child process has not yet
+ * processed the first STREAM_START and has not set the
+ * in_parallel_apply_xact to true.

I think part of this comment "(set the in_parallel_apply_xact to
false)" is not necessary. It will be clear without that.

3.
+ /* Create entry for requested transaction. */
+ entry = hash_search(ParallelApplyWorkersHash, &xid, HASH_ENTER, &found);
+ if (found)
+ elog(ERROR, "hash table corrupted");
...
...
+ hash_search(ParallelApplyWorkersHash, &xid, HASH_REMOVE, NULL);

It would be better to have a similar elog for the HASH_REMOVE case as
well; we normally seem to have such an elog for HASH_REMOVE.
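For illustration, a minimal standalone sketch of the suggested pattern. In the backend this would presumably be `if (hash_search(ParallelApplyWorkersHash, &xid, HASH_REMOVE, NULL) == NULL) elog(ERROR, "hash table corrupted");`, since hash_search() returns NULL for HASH_REMOVE when no entry exists. Below, a tiny array-based table stands in for ParallelApplyWorkersHash and a hypothetical report_corruption() stands in for elog(ERROR); all names here are made up for the sketch.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>

/*
 * Standalone sketch, not PostgreSQL code.  A fixed-size array stands in for
 * ParallelApplyWorkersHash; report_corruption() stands in for
 * elog(ERROR, "hash table corrupted").  All names are hypothetical.
 */
typedef uint32_t TransactionId;

#define TABLE_SIZE 8

typedef struct WorkerEntry
{
    TransactionId xid;
    bool        used;
} WorkerEntry;

static WorkerEntry table[TABLE_SIZE];
static int  corruption_reports = 0;

static void
report_corruption(void)
{
    /* In the backend, elog(ERROR, ...) would not return here. */
    corruption_reports++;
    fprintf(stderr, "hash table corrupted\n");
}

static WorkerEntry *
lookup(TransactionId xid)
{
    for (int i = 0; i < TABLE_SIZE; i++)
        if (table[i].used && table[i].xid == xid)
            return &table[i];
    return NULL;
}

/* Mirrors hash_search(..., HASH_ENTER, &found) plus the "if (found)" check. */
static WorkerEntry *
entry_enter(TransactionId xid)
{
    assert(xid != 0);           /* InvalidTransactionId is 0 */
    if (lookup(xid) != NULL)
    {
        report_corruption();
        return NULL;
    }
    for (int i = 0; i < TABLE_SIZE; i++)
    {
        if (!table[i].used)
        {
            table[i].used = true;
            table[i].xid = xid;
            return &table[i];
        }
    }
    return NULL;                /* table full; out of scope for the sketch */
}

/* Mirrors hash_search(..., HASH_REMOVE, NULL) with the suggested check. */
static bool
entry_remove(TransactionId xid)
{
    WorkerEntry *entry = lookup(xid);

    if (entry == NULL)
    {
        report_corruption();
        return false;
    }
    entry->used = false;
    return true;
}
```

Removing an xid that was never entered (or was already removed) is then reported instead of being silently ignored, matching the symmetry with the HASH_ENTER check.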

4.
* Parallel apply is not supported when subscribing to a publisher which
+ * cannot provide the abort_time, abort_lsn and the column information used
+ * to verify the parallel apply safety.

In this comment, which column information are you referring to?

5.
+ /*
+ * Set in_parallel_apply_xact to true again as we only aborted the
+ * subtransaction and the top transaction is still in progress. No
+ * need to lock here because currently only the apply leader are
+ * accessing this flag.
+ */
+ winfo->shared->in_parallel_apply_xact = true;

This theory sounds good to me, but I think it is better to update/read
this flag under a spinlock, as the patch does in a few other places.
That will make the code easier to follow without having to worry too
much about such special cases. There are also a few asserts that read
this flag without the lock; it would be better to change those as well.
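A minimal standalone sketch of reading and writing the flag only under the lock, assuming a C11 atomic_flag spinlock as a stand-in for slock_t with SpinLockAcquire()/SpinLockRelease(); the struct and helper names are hypothetical, not the patch's.

```c
#include <assert.h>
#include <stdatomic.h>
#include <stdbool.h>

/*
 * Standalone sketch: a C11 atomic_flag spinlock stands in for PostgreSQL's
 * slock_t + SpinLockAcquire()/SpinLockRelease().  Names are hypothetical.
 */
typedef struct ParallelApplySharedSketch
{
    atomic_flag mutex;
    bool        in_parallel_apply_xact;
} ParallelApplySharedSketch;

static void
shared_init(ParallelApplySharedSketch *shared)
{
    atomic_flag_clear(&shared->mutex);
    shared->in_parallel_apply_xact = false;
}

static void
spin_acquire(atomic_flag *lock)
{
    while (atomic_flag_test_and_set_explicit(lock, memory_order_acquire))
        ;                       /* busy-wait, as a spinlock does */
}

static void
spin_release(atomic_flag *lock)
{
    atomic_flag_clear_explicit(lock, memory_order_release);
}

/* Every write takes the lock, even where only the leader writes today. */
static void
set_in_xact(ParallelApplySharedSketch *shared, bool value)
{
    spin_acquire(&shared->mutex);
    shared->in_parallel_apply_xact = value;
    spin_release(&shared->mutex);
}

/* Every read, including those feeding assertions, takes the lock too. */
static bool
get_in_xact(ParallelApplySharedSketch *shared)
{
    bool        value;

    spin_acquire(&shared->mutex);
    value = shared->in_parallel_apply_xact;
    spin_release(&shared->mutex);
    return value;
}

/* The quoted subtransaction-abort path, rewritten to use the helpers. */
static void
after_subxact_abort(ParallelApplySharedSketch *shared)
{
    set_in_xact(shared, true);
    assert(get_in_xact(shared));
}
```

Routing all accesses through one pair of helpers means no call site needs its own argument for why a lock-free access is safe.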

6.
+ * LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM is the minimum protocol version
+ * with support for streaming large transactions using parallel apply
+ * workers. Introduced in PG16.

How about changing it to something like:
"LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM is the minimum protocol
version where we support applying large streaming transactions in
parallel. Introduced in PG16."

7.
+ PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
+ bool write_abort_lsn = (data->protocol_version >=
+ LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM);

/*
* The abort should happen outside streaming block, even for streamed
@@ -1856,7 +1859,8 @@ pgoutput_stream_abort(struct LogicalDecodingContext *ctx,
Assert(rbtxn_is_streamed(toptxn));

OutputPluginPrepareWrite(ctx, true);
- logicalrep_write_stream_abort(ctx->out, toptxn->xid, txn->xid);
+ logicalrep_write_stream_abort(ctx->out, toptxn->xid, txn, abort_lsn,
+ write_abort_lsn);

I think we need to send the additional information if the client has
used the parallel streaming option. Also, let's keep sending subxid as
we were doing previously and add the additional parameters required.
It may be better to rename write_abort_lsn to abort_info.

8.
+ /*
+ * Check whether the publisher sends abort_lsn and abort_time.
+ *
+ * Note that the paralle apply worker is only started when the publisher
+ * sends abort_lsn and abort_time.
+ */
+ if (am_parallel_apply_worker() ||
+ walrcv_server_version(LogRepWorkerWalRcvConn) >= 160000)
+ read_abort_lsn = true;
+
+ logicalrep_read_stream_abort(s, &abort_data, read_abort_lsn);

This check should match the check on the write side, where we also
check the protocol version. There is also a typo in the comment
(s/paralle/parallel/).
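A rough standalone sketch of the read side deriving abort_info from the same predicate as the writer, so the two cannot disagree. The names, the version constant's value, and the explicit length check used to surface a mismatch are assumptions for illustration; the real reader consumes a StringInfo with the pq_getmsg*() functions.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Assumed to mirror LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM. */
#define PROTO_STREAM_PARALLEL_VERSION_NUM 4

typedef struct StreamAbortData
{
    uint32_t    xid;
    uint32_t    subxid;
    uint64_t    abort_lsn;
    int64_t     abort_time;
} StreamAbortData;

/* One predicate, used by both the write side and the read side. */
static bool
abort_info_expected(uint32_t protocol_version, bool parallel_streaming)
{
    return protocol_version >= PROTO_STREAM_PARALLEL_VERSION_NUM &&
        parallel_streaming;
}

/* Decode an nbytes-wide big-endian integer. */
static uint64_t
get_be(const uint8_t *p, int nbytes)
{
    uint64_t    v = 0;

    for (int i = 0; i < nbytes; i++)
        v = (v << 8) | p[i];
    return v;
}

/*
 * Parse a stream-abort body (after the type byte).  Returns false when the
 * length does not match what abort_info says to expect, i.e. the symptom of
 * the reader and writer using different checks.
 */
static bool
read_stream_abort(const uint8_t *body, size_t len, bool abort_info,
                  StreamAbortData *out)
{
    size_t      expected = abort_info ? 24 : 8;

    if (len != expected)
        return false;
    out->xid = (uint32_t) get_be(body, 4);
    out->subxid = (uint32_t) get_be(body + 4, 4);
    out->abort_lsn = 0;
    out->abort_time = 0;
    if (abort_info)
    {
        out->abort_lsn = get_be(body + 8, 8);
        out->abort_time = (int64_t) get_be(body + 16, 8);
    }
    return true;
}
```

If the subscriber instead decided based on the server version alone while the publisher decided based on the negotiated protocol version, the subscriber could expect fields the publisher never sent; sharing one predicate avoids that.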

--
With Regards,
Amit Kapila.
