RE: Perform streaming logical transactions by background workers and parallel apply

From: "wangw(dot)fnst(at)fujitsu(dot)com" <wangw(dot)fnst(at)fujitsu(dot)com>
To: Amit Kapila <amit(dot)kapila16(at)gmail(dot)com>
Cc: "houzj(dot)fnst(at)fujitsu(dot)com" <houzj(dot)fnst(at)fujitsu(dot)com>, Peter Smith <smithpb2250(at)gmail(dot)com>, Dilip Kumar <dilipbalaut(at)gmail(dot)com>, Masahiko Sawada <sawada(dot)mshk(at)gmail(dot)com>, "shiy(dot)fnst(at)fujitsu(dot)com" <shiy(dot)fnst(at)fujitsu(dot)com>, PostgreSQL Hackers <pgsql-hackers(at)lists(dot)postgresql(dot)org>
Subject: RE: Perform streaming logical transactions by background workers and parallel apply
Date: 2022-09-19 03:25:31
Message-ID: OS3PR01MB6275EFC4B707650DAB9392859E4D9@OS3PR01MB6275.jpnprd01.prod.outlook.com
Lists: pgsql-hackers

On Thu, Sep 15, 2022 at 19:40 Amit Kapila <amit(dot)kapila16(at)gmail(dot)com> wrote:
> On Thu, Sep 15, 2022 at 10:45 AM wangw(dot)fnst(at)fujitsu(dot)com
> <wangw(dot)fnst(at)fujitsu(dot)com> wrote:
> >
> > Attach the new patch set.
> >
>
> Review of v29-0001*

Thanks for your comments and patch!

> ==================
> 1.
> +parallel_apply_find_worker(TransactionId xid)
> {
> ...
> + entry = hash_search(ParallelApplyWorkersHash, &xid, HASH_FIND, &found);
> + if (found)
> + {
> + /* If any workers (or the postmaster) have died, we have failed. */
> + if (entry->winfo->error_mq_handle == NULL)
> + ereport(ERROR,
> + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
> + errmsg("lost connection to parallel apply worker")));
> ...
> }
>
> I think the above comment is incorrect because if the postmaster would
> have died then you wouldn't have found the entry in the hash table.
> How about something like: "We can't proceed if the parallel streaming
> worker has already exited."

Fixed.

> 2.
> +/*
> + * Find the previously assigned worker for the given transaction, if any.
> + */
> +ParallelApplyWorkerInfo *
> +parallel_apply_find_worker(TransactionId xid)
>
> No need to use word 'previously' in the above sentence.

Improved.

> 3.
> + * We need one key to register the location of the header, and we need
> + * another key to track the location of the message queue.
> + */
> + shm_toc_initialize_estimator(&e);
> + shm_toc_estimate_chunk(&e, sizeof(ParallelApplyWorkerShared));
> + shm_toc_estimate_chunk(&e, queue_size);
> + shm_toc_estimate_chunk(&e, error_queue_size);
> +
> + shm_toc_estimate_keys(&e, 3);
>
> Overall, three keys are used but the comment indicates two. You forgot
> to mention about error_queue.

Fixed.

> 4.
> + if (launched)
> + ParallelApplyWorkersList = lappend(ParallelApplyWorkersList, winfo);
> + else
> + {
> + shm_mq_detach(winfo->mq_handle);
> + shm_mq_detach(winfo->error_mq_handle);
> + dsm_detach(winfo->dsm_seg);
> + pfree(winfo);
> +
> + winfo = NULL;
> + }
>
> A. The code used in the else part to free worker info is the same as
> what is used in parallel_apply_free_worker. Can we move this to a
> separate function say parallel_apply_free_worker_info()?
> B. I think it will be better if you use {} for if branch to make it
> look consistent with else branch.

Improved.

> 5.
> + * case define a named savepoint, so that we are able to commit/rollback it
> + * separately later.
> + */
> +void
> +parallel_apply_subxact_info_add(TransactionId current_xid)
>
> I don't see the need of commit in the above message. So, we can
> slightly modify it to: "... so that we are able to rollback to it
> separately later."

Improved.

> 6.
> + for (i = list_length(subxactlist) - 1; i >= 0; i--)
> + {
> + xid = list_nth_xid(subxactlist, i);
> ...
> ...
>
> +/*
> + * Return the TransactionId value contained in the n'th element of the
> + * specified list.
> + */
> +static inline TransactionId
> +list_nth_xid(const List *list, int n)
> +{
> + Assert(IsA(list, XidList));
> + return lfirst_xid(list_nth_cell(list, n));
> +}
>
> I am not really sure that we need a new list function to use for this
> place. Can't we directly use lfirst_xid(list_nth_cell) instead?

Improved.

> 7.
> +void
> +parallel_apply_replorigin_setup(void)
> +{
> + RepOriginId originid;
> + char originname[NAMEDATALEN];
> + bool started_tx = false;
> +
> + /* This function might be called inside or outside of transaction. */
> + if (!IsTransactionState())
> + {
> + StartTransactionCommand();
> + started_tx = true;
> + }
>
> Is there a place in the patch where this function will be called
> without having an active transaction state? If so, then this coding is
> fine but if not, then I suggest keeping an assert for transaction
> state here. The same thing applies to
> parallel_apply_replorigin_reset() as well.

When parallel apply is in use, only the parallel apply worker is inside a
transaction; the leader apply worker is not. So when
parallel_apply_replorigin_setup() is invoked in the leader apply worker, the
function needs to start a transaction itself.
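
To illustrate why the check is kept, here is a minimal, self-contained sketch of
the "start a transaction only if the caller has none" pattern. It is not the
patch code: the stub_* functions and stub_in_transaction are hypothetical
stand-ins for the real transaction API, and committing at the end when
started_tx is set is an assumption, since the quoted hunk does not show that
part.

```c
#include <stdbool.h>

/* Hypothetical stand-ins for the transaction API; not PostgreSQL code. */
static bool stub_in_transaction = false;

static bool stub_is_transaction_state(void) { return stub_in_transaction; }
static void stub_start_transaction(void)    { stub_in_transaction = true; }
static void stub_commit_transaction(void)   { stub_in_transaction = false; }

/* Counts how often the (placeholder) setup work ran. */
static int sketch_setup_calls = 0;

/*
 * Sketch of a setup routine that may be called inside or outside of a
 * transaction. Returns true if it had to start its own transaction.
 */
static bool
sketch_replorigin_setup(void)
{
    bool started_tx = false;

    /* Leader-like caller: no transaction yet, so open one locally. */
    if (!stub_is_transaction_state())
    {
        stub_start_transaction();
        started_tx = true;
    }

    /* Placeholder for the work that requires a transaction. */
    sketch_setup_calls++;

    /* Assumption: close only the transaction that was opened here. */
    if (started_tx)
        stub_commit_transaction();

    return started_tx;
}
```

A caller that is already in a transaction (like the parallel apply worker) is
left in that transaction, while a caller without one (like the leader) gets a
short-lived transaction that is closed again before returning.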

> 8.
> + *
> + * If write_abort_lsn is true, send the abort_lsn and abort_time fields,
> + * otherwise don't.
> */
> void
> logicalrep_write_stream_abort(StringInfo out, TransactionId xid,
> - TransactionId subxid)
> + TransactionId subxid, XLogRecPtr abort_lsn,
> + TimestampTz abort_time, bool abort_info)
>
> In the comment, the name of the variable needs to be updated.

Fixed.

> 9.
> +TransactionId stream_xid = InvalidTransactionId;
>
> -static TransactionId stream_xid = InvalidTransactionId;
> ...
> ...
> +void
> +parallel_apply_subxact_info_add(TransactionId current_xid)
> +{
> + if (current_xid != stream_xid &&
> + !list_member_xid(subxactlist, current_xid))
>
> It seems you have changed the scope of stream_xid to use it in
> parallel_apply_subxact_info_add(). Won't it be better to pass it as a
> parameter (say top_xid)?

Improved.

> 10.
> --- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
> +++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
> @@ -20,6 +20,7 @@
> #include <sys/time.h>
>
> #include "access/xlog.h"
> +#include "catalog/pg_subscription.h"
> #include "catalog/pg_type.h"
> #include "common/connect.h"
> #include "funcapi.h"
> @@ -443,9 +444,14 @@ libpqrcv_startstreaming(WalReceiverConn *conn,
> appendStringInfo(&cmd, "proto_version '%u'",
> options->proto.logical.proto_version);
>
> - if (options->proto.logical.streaming &&
> - PQserverVersion(conn->streamConn) >= 140000)
> - appendStringInfoString(&cmd, ", streaming 'on'");
> + if (options->proto.logical.streaming != SUBSTREAM_OFF)
> + {
> + if (PQserverVersion(conn->streamConn) >= 160000 &&
> + options->proto.logical.streaming == SUBSTREAM_PARALLEL)
> + appendStringInfoString(&cmd, ", streaming 'parallel'");
> + else if (PQserverVersion(conn->streamConn) >= 140000)
> + appendStringInfoString(&cmd, ", streaming 'on'");
> + }
>
> It doesn't seem like a good idea to expose subscription options here.
> Can we think of having char *streaming_option instead of the current
> streaming parameter which is filled by the caller and used here
> directly?

Improved.
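
For readers following along, a rough sketch of the suggested split is shown
below. It is not the v30 code: SketchStreamMode, sketch_choose_streaming_option()
and sketch_append_streaming() are hypothetical names, and placing the version
checks on the caller side is an assumption. The point is only that the
walreceiver-side function sees a plain string and no subscription-level
constants.

```c
#include <stddef.h>
#include <stdio.h>

/* Hypothetical stand-in for the subscription-level streaming setting. */
typedef enum
{
    SKETCH_STREAM_OFF,
    SKETCH_STREAM_ON,
    SKETCH_STREAM_PARALLEL
} SketchStreamMode;

/*
 * Caller side: translate the subscription setting into the option string,
 * using the same version thresholds as the quoted hunk. Returns NULL when
 * no streaming option should be sent.
 */
static const char *
sketch_choose_streaming_option(SketchStreamMode mode, int server_version)
{
    if (mode == SKETCH_STREAM_OFF)
        return NULL;
    if (mode == SKETCH_STREAM_PARALLEL && server_version >= 160000)
        return "parallel";
    if (server_version >= 140000)
        return "on";
    return NULL;
}

/*
 * Receiver side: append the option if the caller supplied one. Nothing
 * here depends on the subscription catalog definitions.
 */
static void
sketch_append_streaming(char *buf, size_t buflen, const char *streaming_option)
{
    if (streaming_option != NULL)
        snprintf(buf, buflen, ", streaming '%s'", streaming_option);
    else if (buflen > 0)
        buf[0] = '\0';
}
```

With this shape, a subscription set to parallel against a pre-16 publisher
falls back to "on" in the caller, and the receiver just appends whatever string
it was given.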

> 11. The error message used in pgoutput_startup() seems to be better
> than the current messages used in that function but it is better to be
> consistent with other messages. There is a discussion in the email
> thread [1] on improving those messages, so kindly suggest there.

Okay, I will try to modify the two messages and share them in the thread you
mentioned.

> 12. In addition to the above, I have changed/added a few comments in
> the attached patch.

Improved as suggested.

Regards,
Wang wei

Attachment Content-Type Size
v30-0001-Perform-streaming-logical-transactions-by-parall.patch application/octet-stream 136.8 KB
v30-0002-Test-streaming-parallel-option-in-tap-test.patch application/octet-stream 74.5 KB
v30-0003-Add-some-checks-before-using-parallel-apply-work.patch application/octet-stream 49.9 KB
v30-0004-Retry-to-apply-streaming-xact-only-in-apply-work.patch application/octet-stream 60.2 KB
v30-0005-Add-a-main_worker_pid-to-pg_stat_subscription.patch application/octet-stream 7.7 KB
