Re: Perform streaming logical transactions by background workers and parallel apply

From: Amit Kapila <amit(dot)kapila16(at)gmail(dot)com>
To: Peter Smith <smithpb2250(at)gmail(dot)com>
Cc: "houzj(dot)fnst(at)fujitsu(dot)com" <houzj(dot)fnst(at)fujitsu(dot)com>, "wangw(dot)fnst(at)fujitsu(dot)com" <wangw(dot)fnst(at)fujitsu(dot)com>, Masahiko Sawada <sawada(dot)mshk(at)gmail(dot)com>, "shiy(dot)fnst(at)fujitsu(dot)com" <shiy(dot)fnst(at)fujitsu(dot)com>, PostgreSQL Hackers <pgsql-hackers(at)lists(dot)postgresql(dot)org>
Subject: Re: Perform streaming logical transactions by background workers and parallel apply
Date: 2022-08-18 08:57:11
Message-ID: CAA4eK1KpuQAk_fiqVXy16WkDrKPBwA9E61VpvLfkse-o31NNVA@mail.gmail.com
Lists: pgsql-hackers

On Thu, Aug 18, 2022 at 11:59 AM Peter Smith <smithpb2250(at)gmail(dot)com> wrote:
>
> Here are my review comments for patch v21-0001:
>
> 4. Commit message
>
> In addition, the patch extends the logical replication STREAM_ABORT message so
> that abort_time and abort_lsn can also be sent, which can be used to update the
> replication origin in the apply background worker when the streaming transaction is
> aborted.
>
> 4a.
> Should this para also mention something about the introduction of
> protocol version 4?
>
> 4b.
> Should this para also mention that these extensions are not strictly
> mandatory for the parallel streaming to still work?
>

Without parallel streaming/apply, we don't need to send these extra
fields at all. So, I don't think it would be correct to say that.
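To make the point concrete, here is a minimal standalone sketch of how a STREAM_ABORT writer could append abort_lsn and abort_time only when parallel apply is in use. It assumes a layout of a one-byte message type, 32-bit xid and subxid, then 64-bit abort_lsn and abort_time in network byte order; the names write_stream_abort, should_send_abort_info, put_be and the constants are hypothetical, not the patch's actual code.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical constants; the 'A' type byte and version 4 are assumptions. */
#define PROTO_STREAM_PARALLEL_VERSION_NUM 4
#define MSG_STREAM_ABORT 'A'

/* Append v as an nbytes-wide big-endian integer to buf at *off. */
static void
put_be(uint8_t *buf, size_t cap, size_t *off, uint64_t v, int nbytes)
{
    assert(*off + (size_t) nbytes <= cap);
    for (int i = nbytes - 1; i >= 0; i--)
        buf[(*off)++] = (uint8_t) (v >> (8 * i));
}

/*
 * Serialize a STREAM_ABORT message (assumed layout).  abort_lsn and
 * abort_time are appended only when send_abort_info is true.
 * Returns the number of bytes written.
 */
static size_t
write_stream_abort(uint8_t *buf, size_t cap, uint32_t xid, uint32_t subxid,
                   uint64_t abort_lsn, int64_t abort_time, bool send_abort_info)
{
    size_t off = 0;

    put_be(buf, cap, &off, MSG_STREAM_ABORT, 1);
    put_be(buf, cap, &off, xid, 4);
    put_be(buf, cap, &off, subxid, 4);
    if (send_abort_info)
    {
        put_be(buf, cap, &off, abort_lsn, 8);
        put_be(buf, cap, &off, (uint64_t) abort_time, 8);
    }
    return off;
}

/* Extra fields are only needed when the subscriber uses parallel apply. */
static bool
should_send_abort_info(int proto_version, bool parallel_streaming)
{
    return parallel_streaming &&
        proto_version >= PROTO_STREAM_PARALLEL_VERSION_NUM;
}
```

With plain streaming the message stays at 9 bytes; only a parallel-apply session pays for the extra 16 bytes.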

>
> 46. src/backend/replication/logical/worker.c - apply_error_callback
>
> + if (errarg->remote_attnum < 0)
> + {
> + if (XLogRecPtrIsInvalid(errarg->finish_lsn))
> + errcontext("processing remote data for replication origin \"%s\" during \"%s\" for replication target relation \"%s.%s\" in transaction %u",
> + errarg->origin_name,
> + logicalrep_message_type(errarg->command),
> + errarg->rel->remoterel.nspname,
> + errarg->rel->remoterel.relname,
> + errarg->remote_xid);
> + else
> + errcontext("processing remote data for replication origin \"%s\" during \"%s\" for replication target relation \"%s.%s\" in transaction %u finished at %X/%X",
> + errarg->origin_name,
> + logicalrep_message_type(errarg->command),
> + errarg->rel->remoterel.nspname,
> + errarg->rel->remoterel.relname,
> + errarg->remote_xid,
> + LSN_FORMAT_ARGS(errarg->finish_lsn));
> + }
> + else
> + {
> + if (XLogRecPtrIsInvalid(errarg->finish_lsn))
> + errcontext("processing remote data for replication origin \"%s\" during \"%s\" for replication target relation \"%s.%s\" column \"%s\" in transaction %u",
> + errarg->origin_name,
> + logicalrep_message_type(errarg->command),
> + errarg->rel->remoterel.nspname,
> + errarg->rel->remoterel.relname,
> + errarg->rel->remoterel.attnames[errarg->remote_attnum],
> + errarg->remote_xid);
> + else
> + errcontext("processing remote data for replication origin \"%s\" during \"%s\" for replication target relation \"%s.%s\" column \"%s\" in transaction %u finished at %X/%X",
> + errarg->origin_name,
> + logicalrep_message_type(errarg->command),
> + errarg->rel->remoterel.nspname,
> + errarg->rel->remoterel.relname,
> + errarg->rel->remoterel.attnames[errarg->remote_attnum],
> + errarg->remote_xid,
> + LSN_FORMAT_ARGS(errarg->finish_lsn));
> + }
> + }
>
> Hou-san had asked me [3] (comment #14) how the above code could be
> shortened. Below is one idea, but maybe you won't like it ;-)
>
> #define MSG_O_T_S_R "processing remote data for replication origin \"%s\" during \"%s\" for replication target relation \"%s.%s\" "
> #define O_T_S_R\
> errarg->origin_name,\
> logicalrep_message_type(errarg->command),\
> errarg->rel->remoterel.nspname,\
> errarg->rel->remoterel.relname
>
> if (errarg->remote_attnum < 0)
> {
> if (XLogRecPtrIsInvalid(errarg->finish_lsn))
> errcontext(MSG_O_T_S_R "in transaction %u",
> O_T_S_R,
> errarg->remote_xid);
> else
> errcontext(MSG_O_T_S_R "in transaction %u finished at %X/%X",
> O_T_S_R,
> errarg->remote_xid,
> LSN_FORMAT_ARGS(errarg->finish_lsn));
> }
> else
> {
> if (XLogRecPtrIsInvalid(errarg->finish_lsn))
> errcontext(MSG_O_T_S_R "column \"%s\" in transaction %u",
> O_T_S_R,
> errarg->rel->remoterel.attnames[errarg->remote_attnum],
> errarg->remote_xid);
> else
> errcontext(MSG_O_T_S_R "column \"%s\" in transaction %u finished at %X/%X",
> O_T_S_R,
> errarg->rel->remoterel.attnames[errarg->remote_attnum],
> errarg->remote_xid,
> LSN_FORMAT_ARGS(errarg->finish_lsn));
> }
> #undef O_T_S_R
> #undef MSG_O_T_S_R
>
> ======
>

I don't like this much. I think this reduces readability.
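For reference, a standalone sketch of the four context-message variants, keeping the explicit branches rather than a shared macro. It renders with snprintf() instead of errcontext(); ErrArg is a hypothetical flattened stand-in for the patch's error-callback argument (a NULL attname plays the role of remote_attnum < 0), and LSN_ARGS mirrors PostgreSQL's LSN_FORMAT_ARGS, which prints the upper and lower 32 bits as %X/%X.

```c
#include <assert.h>
#include <stdint.h>
#include <stdio.h>
#include <string.h>

typedef uint64_t XLogRecPtr;                 /* 64-bit WAL position */
#define InvalidXLogRecPtr ((XLogRecPtr) 0)

/* Same split as LSN_FORMAT_ARGS: upper and lower 32 bits for %X/%X. */
#define LSN_ARGS(lsn) (unsigned) ((lsn) >> 32), (unsigned) ((lsn) & 0xFFFFFFFFu)

/* Hypothetical, flattened stand-in for the error-callback argument. */
typedef struct ErrArg
{
    const char *origin_name;
    const char *command;        /* e.g. what logicalrep_message_type() returns */
    const char *nspname;
    const char *relname;
    const char *attname;        /* NULL: no column is being processed */
    uint32_t    remote_xid;
    XLogRecPtr  finish_lsn;
} ErrArg;

/* Render the context line into out; returns snprintf()'s result. */
static int
format_context(char *out, size_t cap, const ErrArg *e)
{
    assert(out != NULL && cap > 0);

    if (e->attname == NULL)
    {
        if (e->finish_lsn == InvalidXLogRecPtr)
            return snprintf(out, cap,
                            "processing remote data for replication origin \"%s\" during \"%s\" "
                            "for replication target relation \"%s.%s\" in transaction %u",
                            e->origin_name, e->command, e->nspname, e->relname,
                            (unsigned) e->remote_xid);
        return snprintf(out, cap,
                        "processing remote data for replication origin \"%s\" during \"%s\" "
                        "for replication target relation \"%s.%s\" in transaction %u finished at %X/%X",
                        e->origin_name, e->command, e->nspname, e->relname,
                        (unsigned) e->remote_xid, LSN_ARGS(e->finish_lsn));
    }
    if (e->finish_lsn == InvalidXLogRecPtr)
        return snprintf(out, cap,
                        "processing remote data for replication origin \"%s\" during \"%s\" "
                        "for replication target relation \"%s.%s\" column \"%s\" in transaction %u",
                        e->origin_name, e->command, e->nspname, e->relname,
                        e->attname, (unsigned) e->remote_xid);
    return snprintf(out, cap,
                    "processing remote data for replication origin \"%s\" during \"%s\" "
                    "for replication target relation \"%s.%s\" column \"%s\" in transaction %u finished at %X/%X",
                    e->origin_name, e->command, e->nspname, e->relname,
                    e->attname, (unsigned) e->remote_xid, LSN_ARGS(e->finish_lsn));
}
```

For example, a finish_lsn of ((XLogRecPtr) 1 << 32) | 0x16B3748 is rendered as "finished at 1/16B3748".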

> 47. src/include/replication/logicalproto.h
>
> @@ -32,12 +32,17 @@
> *
> * LOGICALREP_PROTO_TWOPHASE_VERSION_NUM is the minimum protocol version with
> * support for two-phase commit decoding (at prepare time). Introduced in PG15.
> + *
> + * LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM is the minimum protocol version
> + * with support for streaming large transactions using apply background
> + * workers. Introduced in PG16.
> */
> #define LOGICALREP_PROTO_MIN_VERSION_NUM 1
> #define LOGICALREP_PROTO_VERSION_NUM 1
> #define LOGICALREP_PROTO_STREAM_VERSION_NUM 2
> #define LOGICALREP_PROTO_TWOPHASE_VERSION_NUM 3
> -#define LOGICALREP_PROTO_MAX_VERSION_NUM LOGICALREP_PROTO_TWOPHASE_VERSION_NUM
> +#define LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM 4
> +#define LOGICALREP_PROTO_MAX_VERSION_NUM LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM
>
> 47a.
> I don't think that comment is strictly true. IIUC the new protocol
> version 4 currently only affects the *extra* STREAM_ABORT members,
> but in fact streaming=parallel is still functional without using
> those extra members, isn't it? So maybe this description needs to be
> modified a bit to be more accurate?
>

The reason for sending these extra abort members is to ensure that,
after the transaction is aborted, if the subscriber/apply worker
restarts, it doesn't need to request the transaction again. Do you
have suggestions for improving this comment?

>
> 52.
>
> +/* Apply background worker setup and interactions */
> +extern ApplyBgworkerInfo *apply_bgworker_start(TransactionId xid);
> +extern ApplyBgworkerInfo *apply_bgworker_find(TransactionId xid);
> +extern void apply_bgworker_wait_for(ApplyBgworkerInfo *wstate,
> + ApplyBgworkerStatus wait_for_status);
> +extern void apply_bgworker_send_data(ApplyBgworkerInfo *wstate, Size nbytes,
> + const void *data);
> +extern void apply_bgworker_free(ApplyBgworkerInfo *wstate);
> +extern void apply_bgworker_check_status(void);
> +extern void apply_bgworker_set_status(ApplyBgworkerStatus status);
> +extern void apply_bgworker_subxact_info_add(TransactionId current_xid);
> +extern void apply_bgworker_savepoint_name(Oid suboid, Oid relid,
> + char *spname, int szsp);
>
> This big block of similarly named externs might as well be in
> alphabetical order instead of an apparently random one.
>

I think it is better to group them by related functionality (where
they are not already grouped that way) than to use alphabetical order.

--
With Regards,
Amit Kapila.
