RE: Perform streaming logical transactions by background workers and parallel apply

From: "wangw(dot)fnst(at)fujitsu(dot)com" <wangw(dot)fnst(at)fujitsu(dot)com>
To: Peter Smith <smithpb2250(at)gmail(dot)com>
Cc: Amit Kapila <amit(dot)kapila16(at)gmail(dot)com>, "houzj(dot)fnst(at)fujitsu(dot)com" <houzj(dot)fnst(at)fujitsu(dot)com>, Dilip Kumar <dilipbalaut(at)gmail(dot)com>, Masahiko Sawada <sawada(dot)mshk(at)gmail(dot)com>, "shiy(dot)fnst(at)fujitsu(dot)com" <shiy(dot)fnst(at)fujitsu(dot)com>, PostgreSQL Hackers <pgsql-hackers(at)lists(dot)postgresql(dot)org>
Subject: RE: Perform streaming logical transactions by background workers and parallel apply
Date: 2022-09-22 03:29:13
Message-ID: OS3PR01MB627567CF4B96A23DAFAF77269E4E9@OS3PR01MB6275.jpnprd01.prod.outlook.com
Lists: pgsql-hackers

On Wed, Sep 21, 2022 at 17:25 Peter Smith <smithpb2250(at)gmail(dot)com> wrote:
> Here are some review comments for patch v30-0001.

Thanks for your comments.

> ======
>
> 1. Commit message
>
> In addition, the patch extends the logical replication STREAM_ABORT message
> so
> that abort_time and abort_lsn can also be sent which can be used to update the
> replication origin in parallel apply worker when the streaming transaction is
> aborted. Because this message extension is needed to support parallel
> streaming, meaning that parallel streaming is not supported for publications on
> servers < PG16.
>
> "meaning that parallel streaming is not supported" -> "parallel
> streaming is not supported"

Improved as suggested.
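
For readers following the thread, the extension boils down to two optional
fields at the end of the STREAM_ABORT message. A rough sketch of the shape,
mirroring the LogicalRepStreamAbortData fields used in the hunks quoted later
in this review (the real definition lives in the patch and may carry more
detail):

#include "postgres.h"
#include "access/xlogdefs.h"		/* XLogRecPtr */
#include "datatype/timestamp.h"		/* TimestampTz */

typedef struct LogicalRepStreamAbortData
{
	TransactionId xid;			/* toplevel transaction of the stream */
	TransactionId subxid;		/* (sub)transaction being aborted */

	/*
	 * Only sent when the subscriber requested parallel streaming; the
	 * parallel apply worker uses these to update the replication origin
	 * when the streamed transaction is aborted.
	 */
	XLogRecPtr	abort_lsn;
	TimestampTz abort_time;
} LogicalRepStreamAbortData;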

> ======
>
> 2. doc/src/sgml/logical-replication.sgml
>
> @@ -1611,8 +1622,12 @@ CONTEXT: processing remote data for
> replication origin "pg_16395" during "INSER
> to the subscriber, plus some reserve for table synchronization.
> <varname>max_logical_replication_workers</varname> must be set to at
> least
> the number of subscriptions, again plus some reserve for the table
> - synchronization. Additionally the
> <varname>max_worker_processes</varname>
> - may need to be adjusted to accommodate for replication workers, at least
> + synchronization. In addition, if the subscription parameter
> + <literal>streaming</literal> is set to <literal>parallel</literal>, please
> + increase <literal>max_logical_replication_workers</literal> according to
> + the desired number of parallel apply workers. Additionally the
> + <varname>max_worker_processes</varname> may need to be adjusted to
> + accommodate for replication workers, at least
> (<varname>max_logical_replication_workers</varname>
> + <literal>1</literal>). Note that some extensions and parallel queries
> also take worker slots from <varname>max_worker_processes</varname>.
>
> IMO it looks a bit strange to have "In addition" followed by "Additionally".
>
> Also, "to accommodate for replication workers" seems like a typo (but
> it is not caused by your patch)
>
> BEFORE
> In addition, if the subscription parameter streaming is set to
> parallel, please increase max_logical_replication_workers according to
> the desired number of parallel apply workers.
>
> AFTER (???)
> If the subscription parameter streaming is set to parallel,
> max_logical_replication_workers should be increased according to the
> desired number of parallel apply workers.

=> Reword
Improved as suggested.

=> typo?
Sorry, I am not sure what you mean. Do you mean
s/replication workers/workers for subscriptions/ or something else?
Since it is not caused by this patch, I think we should improve it in a
separate thread.

> ======
>
> 4. .../replication/logical/applyparallelworker.c - parallel_apply_free_worker
>
> + winfo->in_use = false;
> +
> + /* Are there enough workers in the pool? */
> + if (napplyworkers > (max_parallel_apply_workers_per_subscription / 2))
> + {
>
> I felt the comment/logic about "enough" needs a bit more description.
> At least it should say to refer to the more detailed explanation atop
> worker.c

Added a related comment atop this function.
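
To make the retained-worker policy easier to follow here, the idea is roughly
the following (winfo, napplyworkers and the halving policy come from the
quoted hunk; the function body itself is only an illustration, not the patch
code):

static void
parallel_apply_free_worker_sketch(ParallelApplyWorkerInfo *winfo,
								  int napplyworkers)
{
	/* The worker has finished applying its streaming transaction. */
	winfo->in_use = false;

	/*
	 * Keep at most half of max_parallel_apply_workers_per_subscription idle
	 * in the pool. If more than that are already cached, stop this worker
	 * instead of retaining it; otherwise leave it idle so a later streaming
	 * transaction can reuse it without paying the startup cost again.
	 */
	if (napplyworkers > (max_parallel_apply_workers_per_subscription / 2))
	{
		/* stop the worker and remove it from ParallelApplyWorkersList */
	}
}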

> ======
>
> 5. .../replication/logical/applyparallelworker.c - parallel_apply_setup_dsm
>
> + /*
> + * Estimate how much shared memory we need.
> + *
> + * Because the TOC machinery may choose to insert padding of oddly-sized
> + * requests, we must estimate each chunk separately.
> + *
> + * We need one key to register the location of the header, and we need two
> + * other keys to track of the locations of the message queue and the error
> + * message queue.
> + */
>
> "track of" -> "keep track of" ?

Improved.

> ======
>
> 6. src/backend/replication/logical/launcher.c - logicalrep_worker_detach
>
> logicalrep_worker_detach(void)
> {
> + /* Stop the parallel apply workers. */
> + if (!am_parallel_apply_worker() && !am_tablesync_worker())
> + {
> + List *workers;
> + ListCell *lc;
>
> The condition is not very obvious. This is why I previously suggested
> adding another macro/function like 'isLeaderApplyWorker'. In the
> absence of that, then I think the comment needs to be more
> descriptive.
>
> SUGGESTION
> If this is the leader apply worker then stop the parallel apply workers.

Added a new function am_leader_apply_worker.
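
For reference, the helper amounts to something like this (a sketch; the exact
definition in v33 may differ, and am_parallel_apply_worker()/
am_tablesync_worker() are the existing checks from the quoted code):

static inline bool
am_leader_apply_worker(void)
{
	/* The leader apply worker is neither a tablesync nor a parallel apply worker. */
	return (!am_parallel_apply_worker() && !am_tablesync_worker());
}

With that, the condition in logicalrep_worker_detach() reads simply as
"if (am_leader_apply_worker())".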

> ======
>
> 7. src/backend/replication/logical/proto.c - logicalrep_read_stream_abort
>
> void
> logicalrep_write_stream_abort(StringInfo out, TransactionId xid,
> - TransactionId subxid)
> + TransactionId subxid, XLogRecPtr abort_lsn,
> + TimestampTz abort_time, bool abort_info)
> {
> pq_sendbyte(out, LOGICAL_REP_MSG_STREAM_ABORT);
>
> @@ -1175,19 +1179,40 @@ logicalrep_write_stream_abort(StringInfo out,
> TransactionId xid,
> /* transaction ID */
> pq_sendint32(out, xid);
> pq_sendint32(out, subxid);
> +
> + if (abort_info)
> + {
> + pq_sendint64(out, abort_lsn);
> + pq_sendint64(out, abort_time);
> + }
>
>
> The new param name 'abort_info' seems misleading.
>
> Maybe a name like 'write_abort_info' is better?

Improved as suggested.

> ~~~
>
> 8. src/backend/replication/logical/proto.c - logicalrep_read_stream_abort
>
> +logicalrep_read_stream_abort(StringInfo in,
> + LogicalRepStreamAbortData *abort_data,
> + bool read_abort_lsn)
> {
> - Assert(xid && subxid);
> + Assert(abort_data);
> +
> + abort_data->xid = pq_getmsgint(in, 4);
> + abort_data->subxid = pq_getmsgint(in, 4);
>
> - *xid = pq_getmsgint(in, 4);
> - *subxid = pq_getmsgint(in, 4);
> + if (read_abort_lsn)
> + {
> + abort_data->abort_lsn = pq_getmsgint64(in);
> + abort_data->abort_time = pq_getmsgint64(in);
> + }
>
> This name 'read_abort_lsn' is inconsistent with the 'abort_info' of
> the logicalrep_write_stream_abort.
>
> I suggest changing these to 'read_abort_info/write_abort_info'

Improved as suggested.

> ======
>
> 9. src/backend/replication/logical/worker.c - file header comment
>
> + * information is added to the ParallelApplyWorkersList. Once the worker
> + * finishes applying the transaction, we mark it available for use. Now,
> + * before starting a new worker to apply the streaming transaction, we check
> + * the list and use any worker, if available. Note that we maintain a maximum
>
> 9a.
> "available for use." -> "available for re-use."
>
> ~
>
> 9b.
> "we check the list and use any worker, if available" -> "we check the
> list for any available worker"

Improved as suggested.

> ~~~
>
> 10. src/backend/replication/logical/worker.c - handle_streamed_transaction
>
> + /* write the change to the current file */
> + stream_write_change(action, s);
> + return true;
>
> Uppercase the comment.

Improved as suggested.

> ~~~
>
> 11. src/backend/replication/logical/worker.c - apply_handle_stream_abort
>
> +static void
> +apply_handle_stream_abort(StringInfo s)
> +{
> + TransactionId xid;
> + TransactionId subxid;
> + LogicalRepStreamAbortData abort_data;
> + bool read_abort_lsn = false;
> + ParallelApplyWorkerInfo *winfo = NULL;
> + TransApplyAction apply_action;
>
> The variable 'read_abort_lsn' name ought to be changed to be
> consistent with the parameter name.

Improved as suggested.

> ======
>
> 12. src/backend/replication/pgoutput/pgoutput.c - pgoutput_stream_abort
>
> @@ -1843,6 +1850,8 @@ pgoutput_stream_abort(struct
> LogicalDecodingContext *ctx,
> XLogRecPtr abort_lsn)
> {
> ReorderBufferTXN *toptxn;
> + PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
> + bool abort_info = (data->streaming == SUBSTREAM_PARALLEL);
>
> The variable 'abort_info' name ought to be changed to
> 'write_abort_info' (as suggested above) to be consistent with the
> parameter name.

Improved as suggested.
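
To keep the naming symmetry from #7, #8 and #12 visible in one place, both
ends now look roughly like this (how each side derives its flag is up to the
patch; the fragment below is illustrative only, with xid/subxid/abort_lsn/
abort_time standing in for the actual arguments):

	bool		write_abort_info = (data->streaming == SUBSTREAM_PARALLEL);
	bool		read_abort_info;

	/*
	 * Sender (pgoutput): include abort_lsn and abort_time only when the
	 * subscriber asked for parallel streaming.
	 */
	logicalrep_write_stream_abort(ctx->out, xid, subxid,
								  abort_lsn, abort_time, write_abort_info);

	/*
	 * Receiver (apply worker): the mirror-image decision; however the patch
	 * derives it, read_abort_info must match what the publisher wrote so the
	 * reader knows whether the two extra fields are present on the wire.
	 */
	logicalrep_read_stream_abort(s, &abort_data, read_abort_info);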

Attached the new patch set.

Regards,
Wang wei

Attachment Content-Type Size
v33-0001-Perform-streaming-logical-transactions-by-parall.patch application/octet-stream 138.9 KB
v33-0002-Test-streaming-parallel-option-in-tap-test.patch application/octet-stream 74.5 KB
v33-0003-Add-some-checks-before-using-parallel-apply-work.patch application/octet-stream 49.8 KB
v33-0004-Retry-to-apply-streaming-xact-only-in-apply-work.patch application/octet-stream 60.2 KB
v33-0005-Add-a-main_worker_pid-to-pg_stat_subscription.patch application/octet-stream 6.3 KB
