RE: Perform streaming logical transactions by background workers and parallel apply

From: "wangw(dot)fnst(at)fujitsu(dot)com" <wangw(dot)fnst(at)fujitsu(dot)com>
To: Amit Kapila <amit(dot)kapila16(at)gmail(dot)com>, "houzj(dot)fnst(at)fujitsu(dot)com" <houzj(dot)fnst(at)fujitsu(dot)com>
Cc: Peter Smith <smithpb2250(at)gmail(dot)com>, Dilip Kumar <dilipbalaut(at)gmail(dot)com>, Masahiko Sawada <sawada(dot)mshk(at)gmail(dot)com>, "shiy(dot)fnst(at)fujitsu(dot)com" <shiy(dot)fnst(at)fujitsu(dot)com>, PostgreSQL Hackers <pgsql-hackers(at)lists(dot)postgresql(dot)org>
Subject: RE: Perform streaming logical transactions by background workers and parallel apply
Date: 2022-09-15 05:15:24
Message-ID: OS3PR01MB6275F145878B4A44586C46CE9E499@OS3PR01MB6275.jpnprd01.prod.outlook.com
Lists: pgsql-hackers

On Thu, Sep 8, 2022 at 2:52 PM Amit Kapila <amit(dot)kapila16(at)gmail(dot)com> wrote:
> On Mon, Sep 5, 2022 at 6:34 PM houzj(dot)fnst(at)fujitsu(dot)com
> <houzj(dot)fnst(at)fujitsu(dot)com> wrote:
> >
> > Attach the correct patch set this time.
> >
>
> Few comments on v28-0001*:

Thanks for your comments.

> 1.
> + /* Whether the worker is processing a transaction. */
> + bool in_use;
>
> I think this same comment applies to in_parallel_apply_xact flag as
> well. How about: "Indicates whether the worker is available to be used
> for parallel apply transaction?"?
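A minimal sketch of how the two flags could be kept apart, assuming a hypothetical fixed-size pool (`ParallelApplyWorkerSlot`, `POOL_SIZE`, `pa_find_free_worker` and `pa_release_worker` are illustrative names, not the patch's). Here `in_use` says whether a slot is already assigned and therefore unavailable for a new parallel apply transaction, while `in_parallel_apply_xact` says whether the assigned worker is still processing its transaction.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

typedef uint32_t TransactionId;

#define POOL_SIZE 4             /* hypothetical pool size */

typedef struct ParallelApplyWorkerSlot
{
    /* Whether the slot is assigned, i.e. not available for a new xact. */
    bool          in_use;
    /* Whether the worker is still processing its streamed transaction. */
    bool          in_parallel_apply_xact;
    TransactionId xid;
} ParallelApplyWorkerSlot;

static ParallelApplyWorkerSlot pool[POOL_SIZE];

/* Claim a slot that is available for a new transaction, or return NULL. */
static ParallelApplyWorkerSlot *
pa_find_free_worker(TransactionId xid)
{
    for (size_t i = 0; i < POOL_SIZE; i++)
    {
        if (!pool[i].in_use)
        {
            pool[i].in_use = true;
            pool[i].xid = xid;
            /* The leader marks the transaction as in progress up front. */
            pool[i].in_parallel_apply_xact = true;
            return &pool[i];
        }
    }
    return NULL;
}

/* Return a slot to the pool once its transaction has finished. */
static void
pa_release_worker(ParallelApplyWorkerSlot *slot)
{
    assert(slot->in_use);
    assert(!slot->in_parallel_apply_xact);
    slot->in_use = false;
}
```

With this split, a slot can be `in_use` while its worker is idle between transactions, which is exactly why one comment cannot describe both flags.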
>
> 2.
> + /*
> + * Set this flag in the leader instead of the parallel apply worker to
> + * avoid the race condition where the leader has already started waiting
> + * for the parallel apply worker to finish processing the transaction(set
> + * the in_parallel_apply_xact to false) while the child process has not yet
> + * processed the first STREAM_START and has not set the
> + * in_parallel_apply_xact to true.
>
> I think part of this comment "(set the in_parallel_apply_xact to
> false)" is not necessary. It will be clear without that.
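The race described in this comment can be modelled with threads. A minimal sketch, assuming POSIX threads with a mutex and condition variable in place of the patch's shared memory and wait mechanism (`SharedState`, `worker_main` and `leader_apply_in_parallel` are hypothetical names): the leader sets `in_parallel_apply_xact` before starting the worker, so its wait loop cannot observe `false` before the worker has processed anything. If the worker set the flag itself on its first STREAM_START, the leader could check first, see `false`, and stop waiting immediately.

```c
#include <assert.h>
#include <pthread.h>
#include <stdbool.h>

/* Hypothetical state shared by the leader and one parallel apply worker. */
typedef struct
{
    pthread_mutex_t mutex;
    pthread_cond_t  cond;
    bool            in_parallel_apply_xact;
    int             xacts_applied;
} SharedState;

static void *
worker_main(void *arg)
{
    SharedState *st = arg;

    pthread_mutex_lock(&st->mutex);
    /* The leader already set the flag before this thread existed. */
    assert(st->in_parallel_apply_xact);
    /* ... a real worker would process STREAM_START and the changes here ... */
    st->xacts_applied++;
    /* Transaction finished: clear the flag and wake the waiting leader. */
    st->in_parallel_apply_xact = false;
    pthread_cond_signal(&st->cond);
    pthread_mutex_unlock(&st->mutex);
    return NULL;
}

/* Leader side: returns the number of transactions applied so far, or -1. */
static int
leader_apply_in_parallel(SharedState *st)
{
    pthread_t worker;
    int       applied;

    /* Set the flag in the leader, before the worker is started. */
    pthread_mutex_lock(&st->mutex);
    st->in_parallel_apply_xact = true;
    pthread_mutex_unlock(&st->mutex);

    if (pthread_create(&worker, NULL, worker_main, st) != 0)
    {
        pthread_mutex_lock(&st->mutex);
        st->in_parallel_apply_xact = false;
        pthread_mutex_unlock(&st->mutex);
        return -1;
    }

    /*
     * Wait for the worker to finish. Because the flag was already true,
     * this loop cannot exit early even if the worker has not run yet.
     */
    pthread_mutex_lock(&st->mutex);
    while (st->in_parallel_apply_xact)
        pthread_cond_wait(&st->cond, &st->mutex);
    applied = st->xacts_applied;
    pthread_mutex_unlock(&st->mutex);

    pthread_join(worker, NULL);
    return applied;
}
```

Build with `-pthread`.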
>
> 3.
> + /* Create entry for requested transaction. */
> + entry = hash_search(ParallelApplyWorkersHash, &xid, HASH_ENTER, &found);
> + if (found)
> + elog(ERROR, "hash table corrupted");
> ...
> ...
> + hash_search(ParallelApplyWorkersHash, &xid, HASH_REMOVE, NULL);
>
> It is better to have a similar elog for HASH_REMOVE case as well. We
> normally seem to have such elog for HASH_REMOVE.
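The symmetry requested here can be sketched without dynahash. In PostgreSQL, `hash_search()` with `HASH_REMOVE` returns NULL when the key is absent, which is what such an elog would test. In this hypothetical stand-in (`XidTable`, `xid_table_enter`, `xid_table_remove`, `register_xact` and `unregister_xact` are illustrative), a linear-scan table replaces the hash table, and both the enter and the remove paths treat an unexpected result as corruption; real backend code would call `elog(ERROR, "hash table corrupted")` instead of printing and returning -1.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>
#include <stdio.h>

typedef uint32_t TransactionId;

#define MAX_ENTRIES 8           /* hypothetical capacity */

typedef struct
{
    bool          used;
    TransactionId xid;
} XidEntry;

typedef struct
{
    XidEntry entries[MAX_ENTRIES];
} XidTable;

static XidEntry *
xid_table_find(XidTable *t, TransactionId xid)
{
    for (size_t i = 0; i < MAX_ENTRIES; i++)
        if (t->entries[i].used && t->entries[i].xid == xid)
            return &t->entries[i];
    return NULL;
}

/* Like HASH_ENTER: *found reports whether the key already existed. */
static XidEntry *
xid_table_enter(XidTable *t, TransactionId xid, bool *found)
{
    XidEntry *e;

    assert(found != NULL);
    e = xid_table_find(t, xid);
    *found = (e != NULL);
    if (e != NULL)
        return e;
    for (size_t i = 0; i < MAX_ENTRIES; i++)
    {
        if (!t->entries[i].used)
        {
            t->entries[i].used = true;
            t->entries[i].xid = xid;
            return &t->entries[i];
        }
    }
    return NULL;                /* table full */
}

/* Like HASH_REMOVE: returns false if the key was not present. */
static bool
xid_table_remove(XidTable *t, TransactionId xid)
{
    XidEntry *e = xid_table_find(t, xid);

    if (e == NULL)
        return false;
    e->used = false;
    return true;
}

/* Enter path: a pre-existing entry means the table is inconsistent. */
static int
register_xact(XidTable *t, TransactionId xid)
{
    bool found;

    if (xid_table_enter(t, xid, &found) == NULL)
    {
        fprintf(stderr, "out of table slots\n");
        return -1;
    }
    if (found)
    {
        fprintf(stderr, "hash table corrupted\n");
        return -1;
    }
    return 0;
}

/* Remove path: a missing entry is reported the same way. */
static int
unregister_xact(XidTable *t, TransactionId xid)
{
    if (!xid_table_remove(t, xid))
    {
        fprintf(stderr, "hash table corrupted\n");
        return -1;
    }
    return 0;
}
```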
>
> 4.
> * Parallel apply is not supported when subscribing to a publisher which
> + * cannot provide the abort_time, abort_lsn and the column information used
> + * to verify the parallel apply safety.
>
>
> In this comment, which column information are you referring to?
>
> 5.
> + /*
> + * Set in_parallel_apply_xact to true again as we only aborted the
> + * subtransaction and the top transaction is still in progress. No
> + * need to lock here because currently only the apply leader are
> + * accessing this flag.
> + */
> + winfo->shared->in_parallel_apply_xact = true;
>
> This theory sounds good to me but I think it is better to update/read
> this flag under spinlock as the patch is doing at a few other places.
> I think that will make the code easier to follow without worrying too
> much about such special cases. There are a few asserts as well which
> read this without lock, it would be better to change those as well.
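A minimal sketch of accessing the flag only under the spinlock, assuming C11 `atomic_flag` as a stand-in for PostgreSQL's `slock_t` with `SpinLockAcquire()`/`SpinLockRelease()` (`ParallelApplyShared`, `spin_acquire`, `spin_release`, `pa_set_xact_state`, `pa_get_xact_state` and `pa_handle_subxact_abort` are hypothetical). Routing every read and write through two accessors covers the subtransaction-abort case and the asserts alike, so no call site has to argue why an unlocked access is safe.

```c
#include <assert.h>
#include <stdatomic.h>
#include <stdbool.h>

/* Hypothetical stand-in for PostgreSQL's slock_t. */
typedef atomic_flag slock_t;

static void
spin_acquire(slock_t *lock)
{
    /* Busy-wait until this caller flips the flag from clear to set. */
    while (atomic_flag_test_and_set_explicit(lock, memory_order_acquire))
        ;
}

static void
spin_release(slock_t *lock)
{
    atomic_flag_clear_explicit(lock, memory_order_release);
}

typedef struct
{
    slock_t mutex;
    bool    in_parallel_apply_xact;
} ParallelApplyShared;

static void
pa_shared_init(ParallelApplyShared *shared)
{
    atomic_flag_clear(&shared->mutex);
    shared->in_parallel_apply_xact = false;
}

static void
pa_set_xact_state(ParallelApplyShared *shared, bool in_xact)
{
    spin_acquire(&shared->mutex);
    shared->in_parallel_apply_xact = in_xact;
    spin_release(&shared->mutex);
}

static bool
pa_get_xact_state(ParallelApplyShared *shared)
{
    bool in_xact;

    spin_acquire(&shared->mutex);
    in_xact = shared->in_parallel_apply_xact;
    spin_release(&shared->mutex);
    return in_xact;
}

/* Only a subtransaction was aborted: the top transaction is still running. */
static void
pa_handle_subxact_abort(ParallelApplyShared *shared)
{
    pa_set_xact_state(shared, true);
    /* Even the assertion reads the flag through the locked accessor. */
    assert(pa_get_xact_state(shared));
}
```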
>
> 6.
> + * LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM is the minimum protocol version
> + * with support for streaming large transactions using parallel apply
> + * workers. Introduced in PG16.
>
> How about changing it to something like:
> "LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM is the minimum protocol
> version where we support applying large streaming transactions in
> parallel. Introduced in PG16."
>
> 7.
> + PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
> + bool write_abort_lsn = (data->protocol_version >=
> + LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM);
>
> /*
> * The abort should happen outside streaming block, even for streamed
> @@ -1856,7 +1859,8 @@ pgoutput_stream_abort(struct LogicalDecodingContext *ctx,
> Assert(rbtxn_is_streamed(toptxn));
>
> OutputPluginPrepareWrite(ctx, true);
> - logicalrep_write_stream_abort(ctx->out, toptxn->xid, txn->xid);
> + logicalrep_write_stream_abort(ctx->out, toptxn->xid, txn, abort_lsn,
> + write_abort_lsn);
>
> I think we need to send additional information if the client has used
> the parallel streaming option. Also, let's keep sending subxid as we
> were doing previously and add additional parameters required. It may
> be better to name write_abort_lsn as abort_info.
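A standalone sketch of the suggested writer shape: `subxid` is still sent, and the extra fields are written only when `abort_info` is true, which here requires both the parallel streaming option and a protocol version of at least `LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM`. The constant's value (4), the message-type byte, the byte layout and the helpers (`OutBuf`, `put_uint`, `want_abort_info`, `write_stream_abort`) are assumptions for illustration, not the patch's code; integers are written big-endian, as `pq_sendint32()`/`pq_sendint64()` do.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

typedef uint32_t TransactionId;
typedef uint64_t XLogRecPtr;
typedef int64_t  TimestampTz;

/* Assumed value of the protocol constant; verify against the headers. */
#define LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM 4

/* Hypothetical fixed-size output buffer standing in for a StringInfo. */
typedef struct
{
    uint8_t data[64];
    size_t  len;
} OutBuf;

/* Append an nbytes-wide integer in network (big-endian) byte order. */
static void
put_uint(OutBuf *out, uint64_t value, int nbytes)
{
    for (int i = nbytes - 1; i >= 0; i--)
        out->data[out->len++] = (uint8_t) (value >> (8 * i));
}

/* Send the extra fields only if the subscriber asked for parallel
 * streaming and the negotiated protocol version can carry them. */
static bool
want_abort_info(uint32_t protocol_version, bool streaming_parallel)
{
    return streaming_parallel &&
        protocol_version >= LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM;
}

static void
write_stream_abort(OutBuf *out, TransactionId xid, TransactionId subxid,
                   XLogRecPtr abort_lsn, TimestampTz abort_time,
                   bool abort_info)
{
    /* Worst case: 1 type byte + two 4-byte xids + two 8-byte fields. */
    assert(out->len + 25 <= sizeof(out->data));

    out->data[out->len++] = 'A';    /* assumed STREAM ABORT message type */
    put_uint(out, xid, 4);
    put_uint(out, subxid, 4);       /* subxid is still sent, as before */
    if (abort_info)
    {
        put_uint(out, abort_lsn, 8);
        put_uint(out, (uint64_t) abort_time, 8);
    }
}
```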
>
> 8.
> + /*
> + * Check whether the publisher sends abort_lsn and abort_time.
> + *
> + * Note that the paralle apply worker is only started when the publisher
> + * sends abort_lsn and abort_time.
> + */
> + if (am_parallel_apply_worker() ||
> + walrcv_server_version(LogRepWorkerWalRcvConn) >= 160000)
> + read_abort_lsn = true;
> +
> + logicalrep_read_stream_abort(s, &abort_data, read_abort_lsn);
>
> This check should match with the check for the write operation where
> we are checking the protocol version as well. There is a typo as well
> in the comments (/paralle/parallel).
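On the reading side, the decision to read the extra fields can be derived from the same inputs a writer would use, the negotiated protocol version and the streaming option, rather than from `walrcv_server_version()`. A minimal sketch, assuming the constant's value is 4, integers are big-endian, and the message-type byte has already been consumed by the caller (`InBuf`, `get_uint`, `expect_abort_info`, `read_stream_abort` and this `StreamAbortData` layout are hypothetical):

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

typedef uint32_t TransactionId;
typedef uint64_t XLogRecPtr;
typedef int64_t  TimestampTz;

/* Assumed value of the protocol constant; verify against the headers. */
#define LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM 4

typedef struct
{
    TransactionId xid;
    TransactionId subxid;
    XLogRecPtr    abort_lsn;
    TimestampTz   abort_time;
} StreamAbortData;

/* Hypothetical read cursor over a received message body. */
typedef struct
{
    const uint8_t *data;
    size_t         len;
    size_t         cursor;
} InBuf;

/* Read an nbytes-wide big-endian integer; false if the message is short. */
static bool
get_uint(InBuf *in, int nbytes, uint64_t *value)
{
    uint64_t v = 0;

    assert(in->cursor <= in->len);
    if (in->len - in->cursor < (size_t) nbytes)
        return false;
    for (int i = 0; i < nbytes; i++)
        v = (v << 8) | in->data[in->cursor++];
    *value = v;
    return true;
}

/* Same inputs as the write-side decision: protocol version + option. */
static bool
expect_abort_info(uint32_t protocol_version, bool streaming_parallel)
{
    return streaming_parallel &&
        protocol_version >= LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM;
}

/* Parse the body of a stream abort message; false on malformed input. */
static bool
read_stream_abort(InBuf *in, StreamAbortData *abort_data, bool read_abort_info)
{
    uint64_t v;

    if (!get_uint(in, 4, &v))
        return false;
    abort_data->xid = (TransactionId) v;
    if (!get_uint(in, 4, &v))
        return false;
    abort_data->subxid = (TransactionId) v;

    if (read_abort_info)
    {
        if (!get_uint(in, 8, &v))
            return false;
        abort_data->abort_lsn = (XLogRecPtr) v;
        if (!get_uint(in, 8, &v))
            return false;
        abort_data->abort_time = (TimestampTz) v;
    }
    else
    {
        /* Nothing was sent; leave the optional fields zeroed. */
        abort_data->abort_lsn = 0;
        abort_data->abort_time = 0;
    }
    return true;
}
```

If the two sides used different conditions, one would read eight-byte fields the other never wrote, which is the mismatch this comment asks to avoid.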

Improved as suggested.

Attached is the new patch set.

Regards,
Wang wei

Attachment Content-Type Size
v29-0001-Perform-streaming-logical-transactions-by-parall.patch application/octet-stream 137.0 KB
v29-0002-Test-streaming-parallel-option-in-tap-test.patch application/octet-stream 74.5 KB
v29-0003-Add-some-checks-before-using-parallel-apply-work.patch application/octet-stream 49.8 KB
v29-0004-Retry-to-apply-streaming-xact-only-in-apply-work.patch application/octet-stream 60.1 KB
v29-0005-Add-a-main_worker_pid-to-pg_stat_subscription.patch application/octet-stream 7.7 KB
