Re: Perform streaming logical transactions by background workers and parallel apply

From: Peter Smith <smithpb2250(at)gmail(dot)com>
To: "wangw(dot)fnst(at)fujitsu(dot)com" <wangw(dot)fnst(at)fujitsu(dot)com>
Cc: Amit Kapila <amit(dot)kapila16(at)gmail(dot)com>, Masahiko Sawada <sawada(dot)mshk(at)gmail(dot)com>, "shiy(dot)fnst(at)fujitsu(dot)com" <shiy(dot)fnst(at)fujitsu(dot)com>, "houzj(dot)fnst(at)fujitsu(dot)com" <houzj(dot)fnst(at)fujitsu(dot)com>, PostgreSQL Hackers <pgsql-hackers(at)lists(dot)postgresql(dot)org>
Subject: Re: Perform streaming logical transactions by background workers and parallel apply
Date: 2022-07-13 04:33:18
Message-ID: CAHut+PvN7fwtUE=bidzrsOUXSt+JpnkJztZ-Jn5t86moofaZ6g@mail.gmail.com
Lists: pgsql-hackers

Below are my review comments for the v16* patch set:

========
v16-0001
========

1.0 <general>

There are places (comments, docs, errmsgs, etc) in the patch referring
to "parallel mode". I think every one of those references should be
found and renamed to "parallel streaming mode" or "streaming=parallel"
or at the very least make sure that "streaming" is in the same
sentence. IMO it's too vague just saying "parallel" without also
saying the context is for the "streaming" parameter.

I have commented on some of those examples below, but please search
everything anyway (including the docs) to catch the ones I haven't
explicitly mentioned.

======

1.1 src/backend/commands/subscriptioncmds.c

+defGetStreamingMode(DefElem *def)
+{
+ /*
+ * If no value given, assume "true" is meant.
+ */

Please fix this comment to be identical to the one in this pushed patch [1].

======

1.2 .../replication/logical/applybgworker.c - apply_bgworker_start

+ if (list_length(ApplyWorkersFreeList) > 0)
+ {
+ wstate = (ApplyBgworkerState *) llast(ApplyWorkersFreeList);
+ ApplyWorkersFreeList = list_delete_last(ApplyWorkersFreeList);
+ Assert(wstate->pstate->status == APPLY_BGWORKER_FINISHED);
+ }

The Assert that the entries in the free-list are FINISHED seems like
unnecessary checking. IIUC, code is already doing the Assert that
entries are FINISHED before allowing them into the free-list in the
first place.

~~~

1.3 .../replication/logical/applybgworker.c - apply_bgworker_find

+ if (found)
+ {
+ char status = entry->wstate->pstate->status;
+
+ /* If any workers (or the postmaster) have died, we have failed. */
+ if (status == APPLY_BGWORKER_EXIT)
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("background worker %u failed to apply transaction %u",
+ entry->wstate->pstate->n,
+ entry->wstate->pstate->stream_xid)));
+
+ Assert(status == APPLY_BGWORKER_BUSY);
+
+ return entry->wstate;
+ }

Why not remove that Assert but change the condition to be:

if (status != APPLY_BGWORKER_BUSY)
ereport(...)
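
A rough standalone sketch of what I mean (this is not the patch code).
The status names follow the patch but their values here are made up, and
check_worker_status() is a hypothetical helper that returns false where
the real code would call ereport(ERROR, ...):

```c
#include <stdbool.h>

/* Names follow the patch; the character values are assumed for illustration. */
#define APPLY_BGWORKER_BUSY     'b'
#define APPLY_BGWORKER_FINISHED 'f'
#define APPLY_BGWORKER_EXIT     'e'

/*
 * Hypothetical helper: returns true if a worker found in the hash table is
 * usable. Returning false stands in for ereport(ERROR, ...) in the real code.
 */
bool
check_worker_status(char status)
{
    /*
     * Any state other than BUSY is reported as a failure, so the separate
     * Assert(status == APPLY_BGWORKER_BUSY) is no longer needed.
     */
    if (status != APPLY_BGWORKER_BUSY)
        return false;

    return true;
}
```

With a single condition, an unexpected state is reported in production
builds too, instead of being caught only in assert-enabled builds.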

======

1.4 src/backend/replication/logical/proto.c - logicalrep_write_stream_abort

@@ -1163,31 +1163,56 @@ logicalrep_read_stream_commit(StringInfo in,
LogicalRepCommitData *commit_data)
/*
* Write STREAM ABORT to the output stream. Note that xid and subxid will be
* same for the top-level transaction abort.
+ *
+ * If write_abort_lsn is true, send the abort_lsn and abort_time fields.
+ * Otherwise not.
*/

"Otherwise not." -> ", otherwise don't."

~~~

1.5 src/backend/replication/logical/proto.c - logicalrep_read_stream_abort

+ *
+ * If read_abort_lsn is true, try to read the abort_lsn and abort_time fields.
+ * Otherwise not.
*/
void
-logicalrep_read_stream_abort(StringInfo in, TransactionId *xid,
- TransactionId *subxid)
+logicalrep_read_stream_abort(StringInfo in,
+ LogicalRepStreamAbortData *abort_data,
+ bool read_abort_lsn)

"Otherwise not." -> ", otherwise don't."

======

1.6 src/backend/replication/logical/worker.c - file comment

+ * If streaming = parallel, We assign a new apply background worker (if
+ * available) as soon as the xact's first stream is received. The main apply

"We" -> "we" ... or maybe better just remove it completely.

~~~

1.7 src/backend/replication/logical/worker.c - apply_handle_stream_prepare

+ /*
+ * After sending the data to the apply background worker, wait for
+ * that worker to finish. This is necessary to maintain commit
+ * order which avoids failures due to transaction dependencies and
+ * deadlocks.
+ */
+ apply_bgworker_send_data(wstate, s->len, s->data);
+ apply_bgworker_wait_for(wstate, APPLY_BGWORKER_FINISHED);
+ apply_bgworker_free(wstate);

The comment should be changed to how you had suggested [2], so that it
will be formatted the same way as a couple of other similar comments.

~~~

1.8 src/backend/replication/logical/worker.c - apply_handle_stream_abort

+ /* Check whether the publisher sends abort_lsn and abort_time. */
+ if (am_apply_bgworker())
+ read_abort_lsn = MyParallelState->server_version >= 160000;

This is handling decisions about read/write of the protocol bytes. I
feel it would be better to check the server *protocol* version (not
the server postgres version) to make this decision, e.g. this code
should use the new macro you introduced so it ends up looking much
like how the pgoutput_stream_abort code does it.
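
Something along these lines, as a standalone sketch only. The macro name
is the one from the patch, but the value 4 is an assumption for
illustration, and should_read_abort_lsn() and its proto_version argument
are hypothetical (I have not checked where the apply background worker
would get the negotiated protocol version from):

```c
#include <stdbool.h>
#include <stdint.h>

/* Value assumed for illustration; the real number is defined by the patch. */
#define LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM 4

/*
 * Hypothetical helper: decide whether STREAM ABORT carries abort_lsn and
 * abort_time, based on the protocol version rather than the server version.
 */
bool
should_read_abort_lsn(uint32_t proto_version)
{
    return proto_version >= LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM;
}
```

That keeps the reader and the writer keyed on the same value, so the two
sides cannot disagree about whether the extra fields are present.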

~~~

1.9 src/backend/replication/logical/worker.c - store_flush_position

@@ -2636,6 +2999,10 @@ store_flush_position(XLogRecPtr remote_lsn)
{
FlushPosition *flushpos;

+ /* We only need to collect the LSN in main apply worker */
+ if (am_apply_bgworker())
+ return;
+

SUGGESTION
/* Skip if not the main apply worker */

======

1.10 src/backend/replication/pgoutput/pgoutput.c

@@ -1820,6 +1820,8 @@ pgoutput_stream_abort(struct LogicalDecodingContext *ctx,
XLogRecPtr abort_lsn)
{
ReorderBufferTXN *toptxn;
+ bool write_abort_lsn = false;
+ PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;

/*
* The abort should happen outside streaming block, even for streamed
@@ -1832,8 +1834,13 @@ pgoutput_stream_abort(struct LogicalDecodingContext *ctx,

Assert(rbtxn_is_streamed(toptxn));

+ /* We only send abort_lsn and abort_time if the subscriber needs them. */
+ if (data->protocol_version >= LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM)
+ write_abort_lsn = true;
+

IMO it's simpler to remove the declaration default assignment, and
instead this code can be written as:

write_abort_lsn = data->protocol_version >=
LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM;

======

1.11 src/include/replication/logicalproto.h

+ *
+ * LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM is the minimum protocol version
+ * with support for streaming large transactions in apply background worker.
+ * Introduced in PG16.

"in apply background worker" -> "using apply background workers"

~~~

1.12

+extern void logicalrep_read_stream_abort(StringInfo in,
+ LogicalRepStreamAbortData *abort_data,
+ bool include_abort_lsn);

I think the "include_abort_lsn" is now renamed to "read_abort_lsn".

========
v16-0002
========

No comments.

========
v16-0003
========

3.0 <general>

Same comment about "parallel mode" as in comment #1.0

======

3.1 doc/src/sgml/ref/create_subscription.sgml

+ the publisher-side; 2) there cannot be any non-immutable functions
+ in the subscriber-side replicated table.

The functions are not table data so maybe it's better to say
"functions in the ..." -> "functions used by the ...". If you change
this then there are equivalent comments and commit messages that
should change to match it.

======

3.2 .../replication/logical/applybgworker.c - apply_bgworker_relation_check

+ ereport(ERROR,
+ (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ errmsg("cannot replicate target relation \"%s.%s\" in parallel "
+ "mode", rel->remoterel.nspname, rel->remoterel.relname),
+ errdetail("The unique column on subscriber is not the unique "
+ "column on publisher or there is at least one "
+ "non-immutable function."),
+ errhint("Please change the streaming option to 'on' instead of
'parallel'.")));

3.2a
SUGGESTED errmsg
"cannot replicate target relation \"%s.%s\" using subscription
parameter streaming=parallel"

3.2b
SUGGESTED errhint
"Please change to use subscription parameter streaming=on"

3.3
The errcode seems the wrong one. Perhaps it should be
ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE.

======

3.4 src/backend/replication/logical/proto.c - logicalrep_write_attrs

In [3] you wrote:
I think the file relcache.c should contain cache-build operations, and the code
I added doesn't have this operation. So I didn't change.

But I only gave relcache.c as an example. It can also be a new static
function in this same file, but anyway I still think this big slab of
code might be better if not done inline in logicalrep_write_attrs.

~~~

3.5 src/backend/replication/logical/proto.c - logicalrep_read_attrs

@@ -1012,11 +1062,14 @@ logicalrep_read_attrs(StringInfo in,
LogicalRepRelation *rel)
{
uint8 flags;

- /* Check for replica identity column */
+ /* Check for replica identity and unique column */
flags = pq_getmsgbyte(in);
- if (flags & LOGICALREP_IS_REPLICA_IDENTITY)
+ if (flags & ATTR_IS_REPLICA_IDENTITY)
attkeys = bms_add_member(attkeys, i);

+ if (flags & ATTR_IS_UNIQUE)
+ attunique = bms_add_member(attunique, i);

The code comment really applies to all 3 statements so maybe better
not to have the blank line here.

======

3.6 src/backend/replication/logical/relation.c - logicalrep_rel_mark_parallel

3.6.a
+ /* Fast path if we marked 'parallel' flag. */
+ if (entry->parallel != PARALLEL_APPLY_UNKNOWN)
+ return;

SUGGESTED
Fast path if 'parallel' flag is already known.

~

3.6.b
+ /* Initialize the flag. */
+ entry->parallel = PARALLEL_APPLY_SAFE;

I think it makes more sense if assigning SAFE is the very *last* thing
this function does – not the first thing.

~

3.6.c
+ /*
+ * First, we check if the unique column in the relation on the
+ * subscriber-side is also the unique column on the publisher-side.
+ */

"First, we check..." -> "First, check..."

~

3.6.d
+ /*
+ * Then, We check if there is any non-immutable function in the local
+ * table. Look for functions in the following places:

"Then, We check..." -> "Then, check"

~~~

3.7 src/backend/replication/logical/relation.c - logicalrep_rel_mark_parallel

From [3] you wrote:
Personally, I do not like to use the `goto` syntax if it is not necessary,
because the `goto` syntax will forcibly change the flow of code execution.

Yes, but OTOH readability is a major consideration too, and in this
function by simply saying goto parallel_unsafe; you can have 3 returns
instead of 7 returns, and it will take ~10 lines less code to do the
same functionality.
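
A simplified standalone sketch of the shape I have in mind. It keeps the
patch's RelParallel enum and 'parallel' member, but RelEntry and its two
boolean members are hypothetical stand-ins for the real relation entry
and for the results of the real checks:

```c
#include <stdbool.h>

typedef enum RelParallel
{
    PARALLEL_APPLY_UNKNOWN = 0,
    PARALLEL_APPLY_SAFE,
    PARALLEL_APPLY_UNSAFE
} RelParallel;

/* Hypothetical stand-in for the relation map entry. */
typedef struct RelEntry
{
    RelParallel parallel;
    bool        unique_cols_match;      /* outcome of the unique column check */
    bool        has_non_immutable_func; /* outcome of the function checks */
} RelEntry;

void
mark_parallel(RelEntry *entry)
{
    /* Fast path if 'parallel' flag is already known. */
    if (entry->parallel != PARALLEL_APPLY_UNKNOWN)
        return;

    /* First, check the unique columns. */
    if (!entry->unique_cols_match)
        goto parallel_unsafe;

    /* Then, check for non-immutable functions. */
    if (entry->has_non_immutable_func)
        goto parallel_unsafe;

    /* Every check passed, so assigning SAFE is the very last step. */
    entry->parallel = PARALLEL_APPLY_SAFE;
    return;

parallel_unsafe:
    entry->parallel = PARALLEL_APPLY_UNSAFE;
}
```

Each additional check then costs only one "goto parallel_unsafe;" rather
than another assignment plus return, and this also covers #3.6.b.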

======

3.8 src/include/replication/logicalrelation.h

+/*
+ * States to determine if changes on one relation can be applied by an apply
+ * background worker.
+ */
+typedef enum RelParallel
+{
+ PARALLEL_APPLY_UNKNOWN = 0, /* unknown */
+ PARALLEL_APPLY_SAFE, /* Can apply changes in an apply background
+ worker */
+ PARALLEL_APPLY_UNSAFE /* Can not apply changes in an apply background
+ worker */
+} RelParallel;

3.8a
"can be applied by an apply background worker." -> "can be applied
using an apply background worker."

~

3.8b
The enum is described, and IMO the enum values are self-explanatory
now. So commenting them individually is not adding any useful
information. I think those comments can be removed.

~

3.8c
The RelParallel name does not have much meaning to it - there is
nothing really about that name that says it is related to validation
states. Maybe "ParallelSafety" or "ParallelApplySafety" or something
similar?

~~~

3.9 src/include/replication/logicalrelation.h

+ RelParallel parallel; /* Can apply changes in an apply
+ background worker? */

This comment is like #3.8c.

IMO the member name 'parallel' doesn't really have enough meaning.
What about something like 'parallel_apply', or 'parallel_ok', or
'parallel_safe', or something similar.

======

3.10 .../subscription/t/032_streaming_apply.pl

In [3] you wrote:
Since it takes almost no time, I think a more detailed confirmation is fine.

Yes, but I think a confirmation is a confirmation regardless - the
test will either pass/fail and this additional code won't change the
result. e.g. Maybe the extra code does not hurt much, but AFAIK having
a "detailed confirmation" doesn't really achieve anything useful
either. I previously suggested removing it simply because it means
less test code to maintain.

========
v16-0004
========

4.0 <general>

Same comment about "parallel mode" as in comment #1.0

======

4.1 Commit message

If the user sets the subscription_parameter "streaming" to "parallel", when
applying a streaming transaction, we will try to apply this transaction in
apply background worker. However, when the changes in this transaction cannot
be applied in apply background worker, the background worker will exit with an
error. In this case, we can retry applying this streaming transaction in "on"
mode. In this way, we may avoid blocking logical replication here.

So we introduce field "subretry" in catalog "pg_subscription". When the
subscriber exit with an error, we will try to set this flag to true, and when
the transaction is applied successfully, we will try to set this flag to false.

Then when we try to apply a streaming transaction in apply background worker,
we can see if this transaction has failed before based on the "subretry" field.

~

I reworded above to remove most of the "we" this and "we" that...

SUGGESTION
When the subscription parameter is set streaming=parallel, the logic
tries to apply the streaming transaction using an apply background
worker. If this fails the background worker exits with an error.

In this case, retry applying the streaming transaction using the
normal streaming=on mode. This is done to avoid getting caught in a
loop of the same retry errors.

A new flag field "subretry" has been introduced to catalog
"pg_subscription". If the subscriber exits with an error, this flag
will be set true, and whenever the transaction is applied
successfully, this flag is reset false. Now, when deciding how to
apply a streaming transaction, the logic can know if this transaction
has previously failed or not (by checking the "subretry" field).
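
As I understand the described behaviour, the decision reduces to
something like the standalone sketch below. The StreamingMode enum and
effective_streaming_mode() are hypothetical names used only for
illustration; they are not taken from the patch:

```c
#include <stdbool.h>

/* Hypothetical representation of the "streaming" subscription parameter. */
typedef enum StreamingMode
{
    STREAMING_OFF,
    STREAMING_ON,
    STREAMING_PARALLEL
} StreamingMode;

/*
 * Hypothetical helper: pick the mode actually used for the next streamed
 * transaction, given the configured mode and the "subretry" flag.
 */
StreamingMode
effective_streaming_mode(StreamingMode configured, bool subretry)
{
    /* A previous failure means streaming=parallel is ignored for the retry. */
    if (configured == STREAMING_PARALLEL && subretry)
        return STREAMING_ON;

    return configured;
}
```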

======

4.2 doc/src/sgml/catalogs.sgml

+ <para>
+ True if the previous apply change failed and a retry was required.
+ </para></entry>

"was" required? "will be required"? It is a bit vague what tense to use...

SUGGESTION 1
True if the previous apply change failed, necessitating a retry

SUGGESTION 2
True if the previous apply change failed

======

4.3 doc/src/sgml/ref/create_subscription.sgml

+ <literal>parallel</literal> mode is disregarded when retrying;
+ instead the transaction will be applied using <literal>on</literal>
+ mode.

"on mode" etc sounds strange.

SUGGESTION
During the retry the streaming=parallel mode is ignored. The retried
transaction will be applied using streaming=on mode.

======

4.4 src/backend/replication/logical/worker.c - set_subscription_retry

+ if (MySubscription->retry == retry ||
+ am_apply_bgworker())
+ return;
+

Somehow I feel that this quick exit condition is not quite what it
seems. IIUC the purpose of this is really to avoid doing the tuple
updates if it is not necessary to do them. So if retry was already set
true then there is no need to update the tuple to true again. Likewise,
if retry was already set false then there is no need to update the tuple
to false. But I just don't see how the (hypothetical) code below can work
as expected, because where is the code updating the value of
MySubscription->retry ???

set_subscription_retry(true);
set_subscription_retry(true);

I think at least there needs to be some detailed comments explaining
what this quick exit is really doing because my guess is that
currently it is not quite working as expected.
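
To show the concern in isolation, here is a standalone model (not the
patch code). FakeSubscription, my_subscription and catalog_updates are
hypothetical stand-ins for MySubscription and for the pg_subscription
tuple update. The model deliberately ignores any place where the real
cached value might get refreshed (for example when the subscription is
re-read), which may be exactly what makes the real code work:

```c
#include <stdbool.h>

/* Hypothetical stand-in for the cached subscription struct. */
typedef struct FakeSubscription
{
    bool        retry;
} FakeSubscription;

FakeSubscription my_subscription = {false};
int         catalog_updates = 0;    /* counts simulated tuple updates */

/* Quick exit compares against a cached value that is never refreshed here. */
void
set_retry_without_cache_update(bool retry)
{
    if (my_subscription.retry == retry)
        return;

    catalog_updates++;              /* stands in for the catalog tuple update */
}

/* Same logic, but the cached value is kept in sync after the update. */
void
set_retry_with_cache_update(bool retry)
{
    if (my_subscription.retry == retry)
        return;

    catalog_updates++;
    my_subscription.retry = retry;
}
```

In this model, two consecutive calls with true perform two updates in
the first variant and only one in the second, which is the behaviour I
would have expected the quick exit to give.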

~~~

4.5

+ /* reset subretry */

The comment should start with an uppercase letter.

------
[1] https://github.com/postgres/postgres/commit/8445f5a21d40b969673ca03918c74b4fbc882bf4
[2] https://www.postgresql.org/message-id/OS3PR01MB62755C6C9A75EB09F7218B589E839%40OS3PR01MB6275.jpnprd01.prod.outlook.com
[3] https://www.postgresql.org/message-id/OS3PR01MB6275120502A4730AB9932FCA9E839%40OS3PR01MB6275.jpnprd01.prod.outlook.com

Kind Regards,
Peter Smith.
Fujitsu Australia
