Re: Perform streaming logical transactions by background workers and parallel apply

From: Peter Smith <smithpb2250(at)gmail(dot)com>
To: "wangw(dot)fnst(at)fujitsu(dot)com" <wangw(dot)fnst(at)fujitsu(dot)com>
Cc: Amit Kapila <amit(dot)kapila16(at)gmail(dot)com>, "houzj(dot)fnst(at)fujitsu(dot)com" <houzj(dot)fnst(at)fujitsu(dot)com>, Dilip Kumar <dilipbalaut(at)gmail(dot)com>, Masahiko Sawada <sawada(dot)mshk(at)gmail(dot)com>, "shiy(dot)fnst(at)fujitsu(dot)com" <shiy(dot)fnst(at)fujitsu(dot)com>, PostgreSQL Hackers <pgsql-hackers(at)lists(dot)postgresql(dot)org>
Subject: Re: Perform streaming logical transactions by background workers and parallel apply
Date: 2022-08-10 09:40:26
Message-ID: CAHut+PttbA_iwSh3Fi+xQtxe5hDxL9Ksun8ktSJ9_ndW1p=rhg@mail.gmail.com
Lists: pgsql-hackers

Here are some review comments for the patch v20-0001:

======

1. doc/src/sgml/catalogs.sgml

+ <literal>p</literal> = apply changes directly using a background
+ worker, if available, otherwise, it behaves the same as 't'

The different char values 'f', 't', 'p' are separated by commas (,) in
the list, which is normal for the pgdocs AFAIK. Because of this, I don't
think it is a good idea to use other commas within the description for
'p'. I suggest removing them to avoid ambiguity with the separators.

======

2. doc/src/sgml/protocol.sgml

@@ -3096,7 +3096,7 @@ psql "dbname=postgres replication=database" -c "IDENTIFY_SYSTEM;"
<listitem>
<para>
Protocol version. Currently versions <literal>1</literal>,
<literal>2</literal>,
- and <literal>3</literal> are supported.
+ <literal>3</literal> and <literal>4</literal> are supported.
</para>

Put a comma after the penultimate value (<literal>3</literal>), as it had before.

======

3. src/backend/replication/logical/applybgworker.c - <general>

There are multiple function comments and other code comments in this
file that are missing a terminating period (.).

======

4. src/backend/replication/logical/applybgworker.c - apply_bgworker_start

+/*
+ * Try to get a free apply background worker.
+ *
+ * If there is at least one worker in the free list, then take one. Otherwise,
+ * try to start a new apply background worker. If successful, cache it in
+ * ApplyBgworkersHash keyed by the specified xid.
+ */
+ApplyBgworkerState *
+apply_bgworker_start(TransactionId xid)

SUGGESTION (for function comment)
Return the apply background worker that will be used for the specified xid.

If an apply background worker is found in the free list then re-use
it, otherwise start a fresh one. Cache the worker in ApplyBgworkersHash
keyed by the specified xid.
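
To make the suggested wording concrete, here is a minimal stand-alone sketch of the "re-use a free worker, otherwise start a fresh one" flow. It is not the patch's code: Worker, workers[], worker_start and worker_release are hypothetical names, and a fixed array stands in for both the free list and ApplyBgworkersHash.

```c
#include <stddef.h>
#include <stdint.h>

/* Everything here is a hypothetical stand-in, not the patch's definitions. */
#define MAX_WORKERS 4

typedef uint32_t TransactionId;

typedef struct Worker
{
    int           started;  /* has this slot ever been launched? */
    int           busy;     /* currently assigned to a transaction? */
    TransactionId xid;      /* xid being served, 0 when idle */
} Worker;

static Worker workers[MAX_WORKERS];

/* Return the worker that will be used for xid, or NULL if none is available. */
static Worker *
worker_start(TransactionId xid)
{
    Worker *fresh = NULL;

    for (int i = 0; i < MAX_WORKERS; i++)
    {
        Worker *w = &workers[i];

        /* An idle, already-started worker plays the role of the free list. */
        if (w->started && !w->busy)
        {
            w->busy = 1;
            w->xid = xid;
            return w;
        }

        /* Remember the first never-started slot in case nothing is free. */
        if (!w->started && fresh == NULL)
            fresh = w;
    }

    if (fresh == NULL)
        return NULL;        /* no free worker and no slot left */

    fresh->started = 1;
    fresh->busy = 1;
    fresh->xid = xid;       /* "cache" the assignment keyed by xid */
    return fresh;
}

/* Hand a worker back so that a later worker_start() can re-use it. */
static void
worker_release(Worker *w)
{
    w->busy = 0;
    w->xid = 0;
}
```

In this sketch a released worker is always preferred over starting a new one, which is the behaviour the suggested comment describes.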

~~~

5.

+ /* Try to get a free apply background worker */
+ if (list_length(ApplyBgworkersFreeList) > 0)

if (list_length(ApplyBgworkersFreeList) > 0)

AFAIK a non-empty list is guaranteed to be not NIL, and an empty list
is guaranteed to be NIL. So, if you want to, the above can simply be
written as:

if (ApplyBgworkersFreeList)
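
For anyone who wants to convince themselves, below is a small stand-alone sketch of why the two tests agree. The List struct is a deliberately simplified stand-in (the real one is in pg_list.h and has more fields), while NIL and list_length follow the same shape as the real definitions. The two has_free_worker_* helpers are hypothetical and exist only for the comparison.

```c
#include <stddef.h>

/* Simplified stand-in for PostgreSQL's List; see pg_list.h for the real one. */
typedef struct List
{
    int length;             /* number of elements; never 0 for a valid list */
} List;

#define NIL ((List *) NULL)

/* Same shape as the real list_length(): NIL has length 0. */
static inline int
list_length(const List *l)
{
    return l ? l->length : 0;
}

/* The test as written in the patch. */
static int
has_free_worker_by_length(const List *freelist)
{
    return list_length(freelist) > 0;
}

/* The shorter test; some code spells this "freelist != NIL" instead. */
static int
has_free_worker_by_nil(const List *freelist)
{
    return freelist != NIL;
}
```

The equivalence relies on the invariant stated above, that an empty list is always represented as NIL.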

~~~

6. src/backend/replication/logical/applybgworker.c - apply_bgworker_find

+/*
+ * Try to look up worker assigned before (see function apply_bgworker_get_free)
+ * inside ApplyBgworkersHash for requested xid.
+ */
+ApplyBgworkerState *
+apply_bgworker_find(TransactionId xid)

SUGGESTION (for function comment)
Find the worker previously assigned/cached for this xid. (see function
apply_bgworker_start)

~~~

7.

+ Assert(status == APPLY_BGWORKER_BUSY);
+
+ return entry->wstate;
+ }
+ else
+ return NULL;

IMO here it is better to just remove that 'else' and unconditionally
return NULL at the end of this function.
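
As an illustration of the shape I mean, here is a stand-alone sketch. Entry, entries[], lookup and worker_find are hypothetical names, and a linear scan over a small array stands in for the hash lookup.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

typedef uint32_t TransactionId;

/* Hypothetical stand-ins for the worker state and the hash entry. */
typedef struct WorkerState
{
    int id;
} WorkerState;

typedef struct Entry
{
    TransactionId xid;
    WorkerState  *wstate;   /* NULL means the slot is unused */
} Entry;

#define NENTRIES 8
static Entry entries[NENTRIES];     /* stand-in for the hash table */

/* Linear scan standing in for a hash lookup keyed by xid. */
static Entry *
lookup(TransactionId xid)
{
    for (int i = 0; i < NENTRIES; i++)
    {
        if (entries[i].wstate != NULL && entries[i].xid == xid)
            return &entries[i];
    }
    return NULL;
}

static WorkerState *
worker_find(TransactionId xid)
{
    Entry *entry = lookup(xid);

    if (entry != NULL)
    {
        /* Sanity check, playing the role of the Assert in the patch. */
        assert(entry->wstate != NULL);

        return entry->wstate;
    }

    /* No 'else' needed: reaching this point means nothing was found. */
    return NULL;
}
```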

~~~

8. src/backend/replication/logical/applybgworker.c - apply_bgworker_subxact_info_add

+ * Inside apply background worker we can figure out that new subtransaction was
+ * started if new change arrived with different xid. In that case we can define
+ * named savepoint, so that we were able to commit/rollback it separately
+ * later.
+ * Special case is if the first change comes from subtransaction, then
+ * we check that current_xid differs from stream_xid.
+ */
+void
+apply_bgworker_subxact_info_add(TransactionId current_xid)

It is not quite English. Can you improve it a bit?

SUGGESTION (maybe like this?)
The apply background worker can figure out if a new subtransaction was
started by checking if the new change arrived with a different xid. In
that case define a named savepoint, so that we are able to
commit/rollback it separately later. A special case is when the first
change comes from a subtransaction; this is determined by checking if
the current_xid differs from stream_xid.
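
To check that the reworded comment matches the intended logic, here is a rough stand-alone sketch of it. This is only my reading of the comment, not the patch's implementation: subxact_info_add, the fixed-size subxacts[] array and the savepoint name format are all hypothetical.

```c
#include <stdint.h>
#include <stdio.h>

typedef uint32_t TransactionId;

#define MAX_SUBXACTS 16

/* Hypothetical state: top-level xid of the stream and subxacts seen so far. */
static TransactionId stream_xid;
static TransactionId subxacts[MAX_SUBXACTS];
static int           nsubxacts;

/*
 * Returns 1 and writes a savepoint name into 'name' when current_xid is a
 * subtransaction not seen before, 0 when no savepoint is needed, and -1 when
 * this sketch's fixed-size array is full.
 */
static int
subxact_info_add(TransactionId current_xid, char *name, size_t namelen)
{
    /* Same xid as the stream: the change belongs to the top-level xact. */
    if (current_xid == stream_xid)
        return 0;

    /* A savepoint was already defined for this subtransaction. */
    for (int i = 0; i < nsubxacts; i++)
    {
        if (subxacts[i] == current_xid)
            return 0;
    }

    if (nsubxacts == MAX_SUBXACTS)
        return -1;

    subxacts[nsubxacts++] = current_xid;

    /* The name format is made up for this example. */
    snprintf(name, namelen, "savepoint_for_xid_%u", (unsigned) current_xid);
    return 1;
}
```

Note that the "first change comes from a subtransaction" case needs no separate branch here, because the comparison against stream_xid already covers it.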

======

9. src/backend/replication/logical/launcher.c - WaitForReplicationWorkerAttach

+ *
+ * Return false if the attach fails. Otherwise return true.
*/
-static void
+static bool
WaitForReplicationWorkerAttach(LogicalRepWorker *worker,

Why not just say "Return whether the attach was successful."?

~~~

10. src/backend/replication/logical/launcher.c - logicalrep_worker_stop

+ /* Found the main worker, then try to stop it. */
+ if (worker)
+ logicalrep_worker_stop_internal(worker);

IMO the comment is kind of pointless because it only says what the
code is clearly doing. If you really want to reinforce that this worker
is the main apply worker, then you can do that with code like:

if (worker)
{
    Assert(!worker->subworker);
    logicalrep_worker_stop_internal(worker);
}

~~~

11. src/backend/replication/logical/launcher.c - logicalrep_worker_detach

@@ -599,6 +632,29 @@ logicalrep_worker_attach(int slot)
static void
logicalrep_worker_detach(void)
{
+ /*
+ * This is the main apply worker, stop all the apply background workers we
+ * started before.
+ */
+ if (!MyLogicalRepWorker->subworker)

SUGGESTION (for comment)
This is the main apply worker. Stop all apply background workers
previously started from here.

~~~

12. src/backend/replication/logical/launcher.c - logicalrep_apply_bgworker_count

+/*
+ * Count the number of registered (not necessarily running) apply background
+ * workers for a subscription.
+ */
+int
+logicalrep_apply_bgworker_count(Oid subid)

SUGGESTION
Count the number of registered (but not necessarily running) apply
background workers for a subscription.

~~~

13.

+ /* Search for attached worker for a given subscription id. */
+ for (i = 0; i < max_logical_replication_workers; i++)

SUGGESTION
Scan all attached apply background workers, only counting those which
have the given subscription id.

======

14. src/backend/replication/logical/worker.c - apply_error_callback

+ {
+ if (errarg->remote_attnum < 0)
+ {
+ if (XLogRecPtrIsInvalid(errarg->finish_lsn))
+ errcontext("processing remote data for replication origin \"%s\" during \"%s\" for replication target relation \"%s.%s\" in transaction %u",
+ errarg->origin_name,
+ logicalrep_message_type(errarg->command),
+ errarg->rel->remoterel.nspname,
+ errarg->rel->remoterel.relname,
+ errarg->remote_xid);
+ else
+ errcontext("processing remote data for replication origin \"%s\" during \"%s\" for replication target relation \"%s.%s\" in transaction %u finished at %X/%X",
+ errarg->origin_name,
+ logicalrep_message_type(errarg->command),
+ errarg->rel->remoterel.nspname,
+ errarg->rel->remoterel.relname,
+ errarg->remote_xid,
+ LSN_FORMAT_ARGS(errarg->finish_lsn));
+ }
+ else
+ {
+ if (XLogRecPtrIsInvalid(errarg->finish_lsn))
+ errcontext("processing remote data for replication origin \"%s\" during \"%s\" for replication target relation \"%s.%s\" column \"%s\" in transaction %u",
+ errarg->origin_name,
+ logicalrep_message_type(errarg->command),
+ errarg->rel->remoterel.nspname,
+ errarg->rel->remoterel.relname,
+ errarg->rel->remoterel.attnames[errarg->remote_attnum],
+ errarg->remote_xid);
+ else
+ errcontext("processing remote data for replication origin \"%s\" during \"%s\" for replication target relation \"%s.%s\" column \"%s\" in transaction %u finished at %X/%X",
+ errarg->origin_name,
+ logicalrep_message_type(errarg->command),
+ errarg->rel->remoterel.nspname,
+ errarg->rel->remoterel.relname,
+ errarg->rel->remoterel.attnames[errarg->remote_attnum],
+ errarg->remote_xid,
+ LSN_FORMAT_ARGS(errarg->finish_lsn));
+ }
+ }

There is quite a lot of common code here:

"processing remote data for replication origin \"%s\" during \"%s\"
for replication target relation \"%s.%s\""

errarg->origin_name,
logicalrep_message_type(errarg->command),
errarg->rel->remoterel.nspname,
errarg->rel->remoterel.relname,

Is it worth trying to extract that common part to keep this code
shorter? E.g., it could easily be done with a few #defines.
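
Something like the following stand-alone sketch is what I had in mind. It uses snprintf in place of errcontext so it can be compiled on its own; the ErrArg struct, both macro names and format_context are hypothetical, and command_name stands in for the result of logicalrep_message_type(errarg->command).

```c
#include <stdint.h>
#include <stdio.h>

/* Hypothetical, flattened stand-in for the callback's error argument. */
typedef struct ErrArg
{
    const char *origin_name;
    const char *command_name;   /* stands in for logicalrep_message_type() */
    const char *nspname;
    const char *relname;
    uint32_t    remote_xid;
} ErrArg;

/* Common leading part of the format string (macro name is made up). */
#define ERRCTX_COMMON_FMT \
    "processing remote data for replication origin \"%s\" during \"%s\" " \
    "for replication target relation \"%s.%s\""

/* Matching arguments for the common part, in the same order. */
#define ERRCTX_COMMON_ARGS(e) \
    (e)->origin_name, (e)->command_name, (e)->nspname, (e)->relname

/* One of the four variants; the others would append their own suffix. */
static int
format_context(char *buf, size_t len, const ErrArg *e)
{
    return snprintf(buf, len,
                    ERRCTX_COMMON_FMT " in transaction %u",
                    ERRCTX_COMMON_ARGS(e),
                    (unsigned) e->remote_xid);
}
```

One caveat I have not checked: building the message from macro pieces may not play well with message translation, so it might be a reason to keep the four full strings after all.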

======

15. src/include/replication/worker_internal.h

+ /* proto version of publisher. */
+ uint32 proto_version;

SUGGESTION
Protocol version of publisher

~~~

16.

+ /* id of apply background worker */
+ uint32 worker_id;

Start the comment with an uppercase letter.

~~~

17.

+/*
+ * Struct for maintaining an apply background worker.
+ */
+typedef struct ApplyBgworkerState

I'm not sure what this comment means. Perhaps there are some words missing?

------
Kind Regards,
Peter Smith.
Fujitsu Australia
