Re: Perform streaming logical transactions by background workers and parallel apply

From: Peter Smith <smithpb2250(at)gmail(dot)com>
To: "wangw(dot)fnst(at)fujitsu(dot)com" <wangw(dot)fnst(at)fujitsu(dot)com>
Cc: Amit Kapila <amit(dot)kapila16(at)gmail(dot)com>, "houzj(dot)fnst(at)fujitsu(dot)com" <houzj(dot)fnst(at)fujitsu(dot)com>, Dilip Kumar <dilipbalaut(at)gmail(dot)com>, Masahiko Sawada <sawada(dot)mshk(at)gmail(dot)com>, "shiy(dot)fnst(at)fujitsu(dot)com" <shiy(dot)fnst(at)fujitsu(dot)com>, PostgreSQL Hackers <pgsql-hackers(at)lists(dot)postgresql(dot)org>
Subject: Re: Perform streaming logical transactions by background workers and parallel apply
Date: 2022-09-21 09:25:16
Message-ID: CAHut+PuVjRgGr4saN7qwq0oB8DANHVR7UfDiciB1Q3cYN54F6A@mail.gmail.com
Lists: pgsql-hackers

Here are some review comments for patch v30-0001.

======

1. Commit message

In addition, the patch extends the logical replication STREAM_ABORT message so
that abort_time and abort_lsn can also be sent which can be used to update the
replication origin in parallel apply worker when the streaming transaction is
aborted. Because this message extension is needed to support parallel
streaming, meaning that parallel streaming is not supported for publications on
servers < PG16.

"meaning that parallel streaming is not supported" -> "parallel
streaming is not supported"

======

2. doc/src/sgml/logical-replication.sgml

@@ -1611,8 +1622,12 @@ CONTEXT: processing remote data for
replication origin "pg_16395" during "INSER
to the subscriber, plus some reserve for table synchronization.
<varname>max_logical_replication_workers</varname> must be set to at least
the number of subscriptions, again plus some reserve for the table
- synchronization. Additionally the <varname>max_worker_processes</varname>
- may need to be adjusted to accommodate for replication workers, at least
+ synchronization. In addition, if the subscription parameter
+ <literal>streaming</literal> is set to <literal>parallel</literal>, please
+ increase <literal>max_logical_replication_workers</literal> according to
+ the desired number of parallel apply workers. Additionally the
+ <varname>max_worker_processes</varname> may need to be adjusted to
+ accommodate for replication workers, at least
(<varname>max_logical_replication_workers</varname>
+ <literal>1</literal>). Note that some extensions and parallel queries
also take worker slots from <varname>max_worker_processes</varname>.

IMO it looks a bit strange to have "In addition" followed by "Additionally".

Also, "to accommodate for replication workers" seems like a typo; "accommodate" should not be followed by "for" (but that part is not caused by your patch).

BEFORE
In addition, if the subscription parameter streaming is set to
parallel, please increase max_logical_replication_workers according to
the desired number of parallel apply workers.

AFTER (???)
If the subscription parameter streaming is set to parallel,
max_logical_replication_workers should be increased according to the
desired number of parallel apply workers.
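
While looking at this, I wondered if the docs would benefit from a small
worked example. Something like the following (values purely illustrative,
not recommendations) for a subscriber with a couple of subscriptions using
streaming = parallel:

```
# postgresql.conf on the subscriber (illustrative values only)
max_logical_replication_workers = 8   # subscriptions + desired parallel apply workers + reserve
max_worker_processes = 12             # at least max_logical_replication_workers + 1
```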

======

3. .../replication/logical/applyparallelworker.c - parallel_apply_can_start

+/*
+ * Returns true, if it is allowed to start a parallel apply worker, false,
+ * otherwise.
+ */
+static bool
+parallel_apply_can_start(TransactionId xid)

Seems a slightly complicated comment for a simple boolean function.

SUGGESTION
Returns true/false if it is OK to start a parallel apply worker.

======

4. .../replication/logical/applyparallelworker.c - parallel_apply_free_worker

+ winfo->in_use = false;
+
+ /* Are there enough workers in the pool? */
+ if (napplyworkers > (max_parallel_apply_workers_per_subscription / 2))
+ {

I felt the comment/logic about "enough" needs a bit more description. At a
minimum it should refer the reader to the more detailed explanation atop
worker.c.
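
To be clear about what I think the condition is doing, here is a standalone
restatement (the function and parameter roles are my reading of the hunk,
not code from the patch):

```c
#include <stdbool.h>

/*
 * Hypothetical restatement of the pool-size check: returns true when the
 * number of workers currently tracked for this subscription already
 * exceeds half of max_parallel_apply_workers_per_subscription, i.e. the
 * pool is considered "full enough" that the finished worker need not be
 * retained for re-use.
 */
static bool
pool_is_full(int napplyworkers, int max_parallel_apply_workers_per_subscription)
{
	return napplyworkers > (max_parallel_apply_workers_per_subscription / 2);
}
```

Whatever the exact retention policy is, that is the kind of detail the
comment (or a pointer to worker.c) should spell out.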

======

5. .../replication/logical/applyparallelworker.c - parallel_apply_setup_dsm

+ /*
+ * Estimate how much shared memory we need.
+ *
+ * Because the TOC machinery may choose to insert padding of oddly-sized
+ * requests, we must estimate each chunk separately.
+ *
+ * We need one key to register the location of the header, and we need two
+ * other keys to track of the locations of the message queue and the error
+ * message queue.
+ */

"track of" -> "keep track of" ?

======

6. src/backend/replication/logical/launcher.c - logicalrep_worker_detach

logicalrep_worker_detach(void)
{
+ /* Stop the parallel apply workers. */
+ if (!am_parallel_apply_worker() && !am_tablesync_worker())
+ {
+ List *workers;
+ ListCell *lc;

The condition is not very obvious. This is why I previously suggested
adding another macro/function like 'isLeaderApplyWorker'. In the absence
of that, I think the comment needs to be more descriptive.

SUGGESTION
If this is the leader apply worker then stop the parallel apply workers.

======

7. src/backend/replication/logical/proto.c - logicalrep_read_stream_abort

void
logicalrep_write_stream_abort(StringInfo out, TransactionId xid,
- TransactionId subxid)
+ TransactionId subxid, XLogRecPtr abort_lsn,
+ TimestampTz abort_time, bool abort_info)
{
pq_sendbyte(out, LOGICAL_REP_MSG_STREAM_ABORT);

@@ -1175,19 +1179,40 @@ logicalrep_write_stream_abort(StringInfo out,
TransactionId xid,
/* transaction ID */
pq_sendint32(out, xid);
pq_sendint32(out, subxid);
+
+ if (abort_info)
+ {
+ pq_sendint64(out, abort_lsn);
+ pq_sendint64(out, abort_time);
+ }

The new param name 'abort_info' seems misleading.

Maybe a name like 'write_abort_info' is better?

~~~

8. src/backend/replication/logical/proto.c - logicalrep_read_stream_abort

+logicalrep_read_stream_abort(StringInfo in,
+ LogicalRepStreamAbortData *abort_data,
+ bool read_abort_lsn)
{
- Assert(xid && subxid);
+ Assert(abort_data);
+
+ abort_data->xid = pq_getmsgint(in, 4);
+ abort_data->subxid = pq_getmsgint(in, 4);

- *xid = pq_getmsgint(in, 4);
- *subxid = pq_getmsgint(in, 4);
+ if (read_abort_lsn)
+ {
+ abort_data->abort_lsn = pq_getmsgint64(in);
+ abort_data->abort_time = pq_getmsgint64(in);
+ }

This name 'read_abort_lsn' is inconsistent with the 'abort_info' of
logicalrep_write_stream_abort.

I suggest changing these to 'read_abort_info'/'write_abort_info'.
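
To illustrate the symmetry I have in mind, here is a self-contained sketch
(simplified stand-in buffer code, not the real pq_send*/pq_getmsg* API;
the struct and function names here are hypothetical):

```c
#include <stdbool.h>
#include <stdint.h>
#include <string.h>

/* Hypothetical stand-in for LogicalRepStreamAbortData in this sketch. */
typedef struct
{
	uint32_t	xid;
	uint32_t	subxid;
	uint64_t	abort_lsn;
	uint64_t	abort_time;
} StreamAbortData;

/* Writer takes 'write_abort_info'; returns the number of bytes written. */
static size_t
write_stream_abort(uint8_t *out, const StreamAbortData *d, bool write_abort_info)
{
	size_t		off = 0;

	memcpy(out + off, &d->xid, 4); off += 4;
	memcpy(out + off, &d->subxid, 4); off += 4;
	if (write_abort_info)
	{
		memcpy(out + off, &d->abort_lsn, 8); off += 8;
		memcpy(out + off, &d->abort_time, 8); off += 8;
	}
	return off;
}

/* Reader takes the matching 'read_abort_info'. */
static void
read_stream_abort(const uint8_t *in, StreamAbortData *d, bool read_abort_info)
{
	size_t		off = 0;

	memcpy(&d->xid, in + off, 4); off += 4;
	memcpy(&d->subxid, in + off, 4); off += 4;
	if (read_abort_info)
	{
		memcpy(&d->abort_lsn, in + off, 8); off += 8;
		memcpy(&d->abort_time, in + off, 8); off += 8;
	}
}
```

With both ends named read_abort_info/write_abort_info it is immediately
obvious that the optional fields on the wire are governed by the same flag
on both sides.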

======

9. src/backend/replication/logical/worker.c - file header comment

+ * information is added to the ParallelApplyWorkersList. Once the worker
+ * finishes applying the transaction, we mark it available for use. Now,
+ * before starting a new worker to apply the streaming transaction, we check
+ * the list and use any worker, if available. Note that we maintain a maximum

9a.
"available for use." -> "available for re-use."

~

9b.
"we check the list and use any worker, if available" -> "we check the
list for any available worker"

~~~

10. src/backend/replication/logical/worker.c - handle_streamed_transaction

+ /* write the change to the current file */
+ stream_write_change(action, s);
+ return true;

Uppercase the comment.

~~~

11. src/backend/replication/logical/worker.c - apply_handle_stream_abort

+static void
+apply_handle_stream_abort(StringInfo s)
+{
+ TransactionId xid;
+ TransactionId subxid;
+ LogicalRepStreamAbortData abort_data;
+ bool read_abort_lsn = false;
+ ParallelApplyWorkerInfo *winfo = NULL;
+ TransApplyAction apply_action;

The variable name 'read_abort_lsn' ought to be changed to match the
parameter name consistently.

======

12. src/backend/replication/pgoutput/pgoutput.c - pgoutput_stream_abort

@@ -1843,6 +1850,8 @@ pgoutput_stream_abort(struct LogicalDecodingContext *ctx,
XLogRecPtr abort_lsn)
{
ReorderBufferTXN *toptxn;
+ PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
+ bool abort_info = (data->streaming == SUBSTREAM_PARALLEL);

The variable name 'abort_info' ought to be changed to 'write_abort_info'
(as suggested above) to match the parameter name consistently.

======

13. src/include/replication/worker_internal.h

+ /*
+ * Indicates whether the worker is available to be used for parallel apply
+ * transaction?
+ */
+ bool in_use;

This comment seems backward for this member's name.

SUGGESTION (something like...)
Indicates whether this ParallelApplyWorkerInfo is currently being used
by a parallel apply worker processing a transaction. (If this flag is
false then it means the ParallelApplyWorkerInfo is available for
re-use by another parallel apply worker.)

------
Kind Regards,
Peter Smith.
Fujitsu Australia
