RE: Perform streaming logical transactions by background workers and parallel apply

From: "houzj(dot)fnst(at)fujitsu(dot)com" <houzj(dot)fnst(at)fujitsu(dot)com>
To: Peter Smith <smithpb2250(at)gmail(dot)com>, "wangw(dot)fnst(at)fujitsu(dot)com" <wangw(dot)fnst(at)fujitsu(dot)com>
Cc: Amit Kapila <amit(dot)kapila16(at)gmail(dot)com>, Dilip Kumar <dilipbalaut(at)gmail(dot)com>, Masahiko Sawada <sawada(dot)mshk(at)gmail(dot)com>, "shiy(dot)fnst(at)fujitsu(dot)com" <shiy(dot)fnst(at)fujitsu(dot)com>, PostgreSQL Hackers <pgsql-hackers(at)lists(dot)postgresql(dot)org>
Subject: RE: Perform streaming logical transactions by background workers and parallel apply
Date: 2022-08-11 08:06:07
Message-ID: OS0PR01MB57169AEA399C6DC370EAF23B94649@OS0PR01MB5716.jpnprd01.prod.outlook.com
Lists: pgsql-hackers

On Wednesday, August 10, 2022 5:40 PM Peter Smith <smithpb2250(at)gmail(dot)com> wrote:
>
> Here are some review comments for the patch v20-0001:
> ======
>
> 1. doc/src/sgml/catalogs.sgml
>
> + <literal>p</literal> = apply changes directly using a background
> + worker, if available, otherwise, it behaves the same as 't'
>
> The different char values 'f','t','p' are separated by comma (,) in
> the list, which is normal for the pgdocs AFAIK. However, because of
> this I don't think it is a good idea to use those other commas within
> the description for 'p', I suggest you remove those ones to avoid
> ambiguity with the separators.

Changed.

> ======
>
> 2. doc/src/sgml/protocol.sgml
>
> @@ -3096,7 +3096,7 @@ psql "dbname=postgres replication=database" -c "IDENTIFY_SYSTEM;"
> <listitem>
> <para>
> Protocol version. Currently versions <literal>1</literal>,
> <literal>2</literal>,
> - and <literal>3</literal> are supported.
> + <literal>3</literal> and <literal>4</literal> are supported.
> </para>
>
> Put a comma after the penultimate value like it had before.
>

Changed.

> ======
>
> 3. src/backend/replication/logical/applybgworker.c - <general>
>
> There are multiple function comments and other code comments in this
> file that are missing a terminating period (.)
>
> ======
>

Changed.

> 4. src/backend/replication/logical/applybgworker.c - apply_bgworker_start
>
> +/*
> + * Try to get a free apply background worker.
> + *
> + * If there is at least one worker in the free list, then take one. Otherwise,
> + * try to start a new apply background worker. If successful, cache it in
> + * ApplyBgworkersHash keyed by the specified xid.
> + */
> +ApplyBgworkerState *
> +apply_bgworker_start(TransactionId xid)
>
> SUGGESTION (for function comment)
> Return the apply background worker that will be used for the specified xid.
>
> If an apply background worker is found in the free list then re-use
> it, otherwise start a fresh one. Cache the worker in ApplyBgworkersHash
> keyed by the specified xid.
>
> ~~~
>

Changed.

> 5.
>
> + /* Try to get a free apply background worker */
> + if (list_length(ApplyBgworkersFreeList) > 0)
>
> if (list_length(ApplyBgworkersFreeList) > 0)
>
> AFAIK a non-empty list is guaranteed to be not NIL, and an empty list
> is guaranteed to be NIL. So if you want, the above can simply be
> written as:
>
> if (ApplyBgworkersFreeList)
>

Both ways are fine to me, so I kept the current style.
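
For anyone comparing the two forms, here is a minimal standalone sketch of why they are equivalent. It does not use the real pg_list.h; DemoList, DEMO_NIL, demo_list_length and demo_lappend_int are hypothetical stand-ins that only keep the invariant mentioned above (an empty list is always NIL, a non-empty list never is).

```c
#include <assert.h>
#include <stdlib.h>

/* Hypothetical stand-in for PostgreSQL's List; not the real pg_list.h. */
typedef struct DemoList
{
    int  length;    /* always > 0 for an allocated list */
    int  capacity;
    int *elements;
} DemoList;

#define DEMO_NIL ((DemoList *) NULL)

/* Length of a list; the empty list is represented as DEMO_NIL. */
static int
demo_list_length(const DemoList *list)
{
    return list ? list->length : 0;
}

/* Append a value, allocating the list only when the first element arrives. */
static DemoList *
demo_lappend_int(DemoList *list, int value)
{
    if (list == DEMO_NIL)
    {
        list = malloc(sizeof(DemoList));
        assert(list != NULL);
        list->length = 0;
        list->capacity = 4;
        list->elements = malloc(list->capacity * sizeof(int));
        assert(list->elements != NULL);
    }
    else if (list->length == list->capacity)
    {
        list->capacity *= 2;
        list->elements = realloc(list->elements,
                                 list->capacity * sizeof(int));
        assert(list->elements != NULL);
    }
    list->elements[list->length++] = value;
    return list;
}

/* Style kept in the patch: compare the length with zero. */
static int
demo_has_free_worker_by_length(const DemoList *free_list)
{
    return demo_list_length(free_list) > 0;
}

/* Style suggested in the review: test the pointer itself. */
static int
demo_has_free_worker_by_pointer(const DemoList *free_list)
{
    return free_list != DEMO_NIL;
}
```

Both helpers return the same answer for every list built through demo_lappend_int, so the choice between them is purely a matter of style.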

> ~~~
>
> 6. src/backend/replication/logical/applybgworker.c - apply_bgworker_find
>
> +/*
> + * Try to look up worker assigned before (see function apply_bgworker_get_free)
> + * inside ApplyBgworkersHash for requested xid.
> + */
> +ApplyBgworkerState *
> +apply_bgworker_find(TransactionId xid)
>
> SUGGESTION (for function comment)
> Find the worker previously assigned/cached for this xid. (see function
> apply_bgworker_start)
>

Changed.

> ~~~
>
> 7.
>
> + Assert(status == APPLY_BGWORKER_BUSY);
> +
> + return entry->wstate;
> + }
> + else
> + return NULL;
>
> IMO here it is better to just remove that 'else' and unconditionally
> return NULL at the end of this function.
>

Changed.
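
A minimal standalone sketch of the resulting shape, for illustration only. The real function looks the xid up in ApplyBgworkersHash; here a plain array stands in for the hash table, and all Demo* names are hypothetical.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical stand-ins for the patch's types. */
typedef enum DemoStatus
{
    DEMO_BGWORKER_BUSY,
    DEMO_BGWORKER_FINISHED
} DemoStatus;

typedef struct DemoWorkerState
{
    DemoStatus status;
} DemoWorkerState;

typedef struct DemoEntry
{
    unsigned int     xid;     /* transaction the worker is assigned to */
    DemoWorkerState *wstate;
} DemoEntry;

/*
 * Find the worker previously assigned to this xid.  A linear scan replaces
 * the hash lookup to keep the sketch self-contained.
 */
static DemoWorkerState *
demo_bgworker_find(DemoEntry *entries, size_t nentries, unsigned int xid)
{
    for (size_t i = 0; i < nentries; i++)
    {
        if (entries[i].xid == xid)
        {
            assert(entries[i].wstate->status == DEMO_BGWORKER_BUSY);
            return entries[i].wstate;
        }
    }

    /* No 'else' branch: falling through means nothing was found. */
    return NULL;
}
```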

> ~~~
>
> 8. src/backend/replication/logical/applybgworker.c -
> apply_bgworker_subxact_info_add
>
> + * Inside apply background worker we can figure out that new subtransaction was
> + * started if new change arrived with different xid. In that case we can define
> + * named savepoint, so that we were able to commit/rollback it separately
> + * later.
> + * Special case is if the first change comes from subtransaction, then
> + * we check that current_xid differs from stream_xid.
> + */
> +void
> +apply_bgworker_subxact_info_add(TransactionId current_xid)
>
> It is not quite English. Can you improve it a bit?
>
> SUGGESTION (maybe like this?)
> The apply background worker can figure out if a new subtransaction was
> started by checking if the new change arrived with different xid. In
> that case define a named savepoint, so that we are able to
> commit/rollback it separately later. A special case is when the first
> change comes from subtransaction – this is determined by checking if
> the current_xid differs from stream_xid.
>

Changed.

> ======
>
> 9. src/backend/replication/logical/launcher.c -
> WaitForReplicationWorkerAttach
>
> + *
> + * Return false if the attach fails. Otherwise return true.
> */
> -static void
> +static bool
> WaitForReplicationWorkerAttach(LogicalRepWorker *worker,
>
> Why not just say "Return whether the attach was successful."
>

Changed.

> ~~~
>
> 10. src/backend/replication/logical/launcher.c - logicalrep_worker_stop
>
> + /* Found the main worker, then try to stop it. */
> + if (worker)
> + logicalrep_worker_stop_internal(worker);
>
> IMO the comment is kind of pointless because it only says what the
> code is clearly doing. If you really wanted to reinforce this worker
> is a main apply worker then you can do that with code like:
>
> if (worker)
> {
> Assert(!worker->subworker);
> logicalrep_worker_stop_internal(worker);
> }
>

Changed.

> ~~~
>
> 11. src/backend/replication/logical/launcher.c - logicalrep_worker_detach
>
> @@ -599,6 +632,29 @@ logicalrep_worker_attach(int slot)
> static void
> logicalrep_worker_detach(void)
> {
> + /*
> + * This is the main apply worker, stop all the apply background workers we
> + * started before.
> + */
> + if (!MyLogicalRepWorker->subworker)
>
> SUGGESTION (for comment)
> This is the main apply worker. Stop all apply background workers
> previously started from here.
>

Changed.

> ~~~
>
> 12. src/backend/replication/logical/launcher.c -
> logicalrep_apply_bgworker_count
>
> +/*
> + * Count the number of registered (not necessarily running) apply background
> + * workers for a subscription.
> + */
> +int
> +logicalrep_apply_bgworker_count(Oid subid)
>
> SUGGESTION
> Count the number of registered (but not necessarily running) apply
> background workers for a subscription.
>

Changed.

> ~~~
>
> 13.
>
> + /* Search for attached worker for a given subscription id. */
> + for (i = 0; i < max_logical_replication_workers; i++)
>
> SUGGESTION
> Scan all attached apply background workers, only counting those which
> have the given subscription id.
>

Changed.
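
As an illustration of what the reworded comment describes, here is a standalone sketch of such a counting loop. DemoWorker, its in_use field and demo_apply_bgworker_count are hypothetical stand-ins (only the subworker flag and the subscription id appear in the quoted hunks), and whatever locking the real function needs is left out.

```c
#include <assert.h>
#include <stdbool.h>

typedef unsigned int DemoOid;   /* hypothetical stand-in for Oid */

/* Hypothetical, reduced version of a worker slot. */
typedef struct DemoWorker
{
    bool    in_use;     /* slot is registered (assumed field) */
    bool    subworker;  /* true for an apply background worker */
    DemoOid subid;      /* subscription the worker belongs to */
} DemoWorker;

/*
 * Scan all worker slots, counting only apply background workers that
 * belong to the given subscription.
 */
static int
demo_apply_bgworker_count(const DemoWorker *workers, int nworkers,
                          DemoOid subid)
{
    int count = 0;

    assert(nworkers >= 0);

    for (int i = 0; i < nworkers; i++)
    {
        const DemoWorker *w = &workers[i];

        if (w->in_use && w->subworker && w->subid == subid)
            count++;
    }

    return count;
}
```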

> ======
>
> 14. src/backend/replication/logical/worker.c - apply_error_callback
>
> + {
> + if (errarg->remote_attnum < 0)
> + {
> + if (XLogRecPtrIsInvalid(errarg->finish_lsn))
> + errcontext("processing remote data for replication origin \"%s\" during \"%s\" for replication target relation \"%s.%s\" in transaction %u",
> + errarg->origin_name,
> + logicalrep_message_type(errarg->command),
> + errarg->rel->remoterel.nspname,
> + errarg->rel->remoterel.relname,
> + errarg->remote_xid);
> + else
> + errcontext("processing remote data for replication origin \"%s\" during \"%s\" for replication target relation \"%s.%s\" in transaction %u finished at %X/%X",
> + errarg->origin_name,
> + logicalrep_message_type(errarg->command),
> + errarg->rel->remoterel.nspname,
> + errarg->rel->remoterel.relname,
> + errarg->remote_xid,
> + LSN_FORMAT_ARGS(errarg->finish_lsn));
> + }
> + else
> + {
> + if (XLogRecPtrIsInvalid(errarg->finish_lsn))
> + errcontext("processing remote data for replication origin \"%s\" during \"%s\" for replication target relation \"%s.%s\" column \"%s\" in transaction %u",
> + errarg->origin_name,
> + logicalrep_message_type(errarg->command),
> + errarg->rel->remoterel.nspname,
> + errarg->rel->remoterel.relname,
> + errarg->rel->remoterel.attnames[errarg->remote_attnum],
> + errarg->remote_xid);
> + else
> + errcontext("processing remote data for replication origin \"%s\" during \"%s\" for replication target relation \"%s.%s\" column \"%s\" in transaction %u finished at %X/%X",
> + errarg->origin_name,
> + logicalrep_message_type(errarg->command),
> + errarg->rel->remoterel.nspname,
> + errarg->rel->remoterel.relname,
> + errarg->rel->remoterel.attnames[errarg->remote_attnum],
> + errarg->remote_xid,
> + LSN_FORMAT_ARGS(errarg->finish_lsn));
> + }
> + }
>
> There is quite a lot of common code here:
>
> "processing remote data for replication origin \"%s\" during \"%s\"
> for replication target relation \"%s.%s\"
>
> errarg->origin_name,
> logicalrep_message_type(errarg->command),
> errarg->rel->remoterel.nspname,
> errarg->rel->remoterel.relname,
>
> Is it worth trying to extract that common part to keep this code
> shorter? E.g. It could be easily done just with some #defines
>

I am not sure whether we have a clean way to change this. Any suggestions?
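
To make the #define idea concrete, here is a rough standalone sketch of what it might look like. It is not taken from the patch: snprintf stands in for errcontext, and DemoErrArg, DEMO_CTX_COMMON_FMT, DEMO_CTX_COMMON_ARGS and demo_format_context are hypothetical names. A NULL attname plays the role of remote_attnum < 0, and a flag plays the role of XLogRecPtrIsInvalid(finish_lsn).

```c
#include <assert.h>
#include <stdio.h>

/* Common prefix of all four messages, shared through the preprocessor. */
#define DEMO_CTX_COMMON_FMT \
    "processing remote data for replication origin \"%s\" during \"%s\" " \
    "for replication target relation \"%s.%s\""

/* Arguments that match DEMO_CTX_COMMON_FMT, in order. */
#define DEMO_CTX_COMMON_ARGS(a) \
    (a)->origin_name, (a)->command_name, (a)->nspname, (a)->relname

/* Hypothetical, flattened version of the error callback argument. */
typedef struct DemoErrArg
{
    const char  *origin_name;
    const char  *command_name;
    const char  *nspname;
    const char  *relname;
    const char  *attname;        /* NULL when no column is involved */
    unsigned int remote_xid;
    int          has_finish_lsn; /* 0 when the finish LSN is invalid */
    unsigned int lsn_hi;
    unsigned int lsn_lo;
} DemoErrArg;

/* Write the context line into buf; returns what snprintf returns. */
static int
demo_format_context(char *buf, size_t buflen, const DemoErrArg *a)
{
    assert(buf != NULL && a != NULL);

    if (a->attname == NULL)
    {
        if (!a->has_finish_lsn)
            return snprintf(buf, buflen,
                            DEMO_CTX_COMMON_FMT " in transaction %u",
                            DEMO_CTX_COMMON_ARGS(a), a->remote_xid);

        return snprintf(buf, buflen,
                        DEMO_CTX_COMMON_FMT
                        " in transaction %u finished at %X/%X",
                        DEMO_CTX_COMMON_ARGS(a), a->remote_xid,
                        a->lsn_hi, a->lsn_lo);
    }

    if (!a->has_finish_lsn)
        return snprintf(buf, buflen,
                        DEMO_CTX_COMMON_FMT
                        " column \"%s\" in transaction %u",
                        DEMO_CTX_COMMON_ARGS(a), a->attname,
                        a->remote_xid);

    return snprintf(buf, buflen,
                    DEMO_CTX_COMMON_FMT
                    " column \"%s\" in transaction %u finished at %X/%X",
                    DEMO_CTX_COMMON_ARGS(a), a->attname, a->remote_xid,
                    a->lsn_hi, a->lsn_lo);
}
```

One caveat with this shape: the full message text is no longer visible as a single string literal at each call site, which may interfere with message translation, so it would need checking before being applied to the real errcontext calls.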

> ======
>
> 15. src/include/replication/worker_internal.h
>
> + /* proto version of publisher. */
> + uint32 proto_version;
>
> SUGGESTION
> Protocol version of publisher
>
> ~~~
>

Changed.

> 16.
>
> + /* id of apply background worker */
> + uint32 worker_id;
>
> Uppercase comment
>

Changed.

>
> 17.
>
> +/*
> + * Struct for maintaining an apply background worker.
> + */
> +typedef struct ApplyBgworkerState
>
> I'm not sure what this comment means. Perhaps there are some words missing?
>

I renamed the struct to ApplyBgworkerInfo, which sounds better to me, and changed the comments.

Best regards,
Hou zj
