Re: Perform streaming logical transactions by background workers and parallel apply

From: Peter Smith <smithpb2250(at)gmail(dot)com>
To: "wangw(dot)fnst(at)fujitsu(dot)com" <wangw(dot)fnst(at)fujitsu(dot)com>
Cc: Amit Kapila <amit(dot)kapila16(at)gmail(dot)com>, Masahiko Sawada <sawada(dot)mshk(at)gmail(dot)com>, "shiy(dot)fnst(at)fujitsu(dot)com" <shiy(dot)fnst(at)fujitsu(dot)com>, "houzj(dot)fnst(at)fujitsu(dot)com" <houzj(dot)fnst(at)fujitsu(dot)com>, PostgreSQL Hackers <pgsql-hackers(at)lists(dot)postgresql(dot)org>
Subject: Re: Perform streaming logical transactions by background workers and parallel apply
Date: 2022-07-01 06:43:21
Message-ID: CAHut+Pu5ah+NhYMfb7KTbkYTxnuGY2j-7vT0G1MZZsS4SKiHew@mail.gmail.com
Lists: pgsql-hackers

Below are some review comments for patches v14-0001 and v14-0002:

========
v14-0001
========

1.1 Commit message

For now, 'parallel' means the streaming will be applied
via a apply background worker if available. 'on' means the streaming
transaction will be spilled to disk. By the way, we do not change the default
behaviour.

SUGGESTION (minor tweaks)
The parameter value 'parallel' means the streaming will be applied via
an apply background worker, if available. The parameter value 'on'
means the streaming transaction will be spilled to disk. The default
value is 'off' (same as current behaviour).

======

1.2 doc/src/sgml/protocol.sgml - Protocol constants

Previously I wrote that since there are protocol changes here,
shouldn’t there also be some corresponding LOGICALREP_PROTO_XXX
constants and special checking added in the worker.c?

But you said [1 comment #6] you think it is OK because...

IMO, I still disagree with the reply. The fact is that the protocol
*has* been changed, so IIUC that is precisely the reason for having
those protocol constants.

e.g. I am guessing you might assign the new one somewhere here:
--
server_version = walrcv_server_version(LogRepWorkerWalRcvConn);
options.proto.logical.proto_version =
    server_version >= 150000 ? LOGICALREP_PROTO_TWOPHASE_VERSION_NUM :
    server_version >= 140000 ? LOGICALREP_PROTO_STREAM_VERSION_NUM :
    LOGICALREP_PROTO_VERSION_NUM;
--

And then later you would refer to this new protocol version (instead
of the server version) when calling to the apply_handle_stream_abort
function.
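
A rough, standalone sketch of the idea follows. It is not the patch's code: the constant name LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM, its value 4, the 160000 cutoff, and both helper functions are assumptions made up for illustration. Only the three existing constants are taken from logicalproto.h.

```c
#include <stdbool.h>

/* Existing protocol versions (values as in logicalproto.h). */
#define LOGICALREP_PROTO_VERSION_NUM 1
#define LOGICALREP_PROTO_STREAM_VERSION_NUM 2
#define LOGICALREP_PROTO_TWOPHASE_VERSION_NUM 3

/* ASSUMPTION: hypothetical new constant covering the stream-abort changes. */
#define LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM 4

/* Hypothetical helper: pick the protocol version from the publisher version. */
static int
choose_proto_version(int server_version)
{
    return server_version >= 160000 ? LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM :
        server_version >= 150000 ? LOGICALREP_PROTO_TWOPHASE_VERSION_NUM :
        server_version >= 140000 ? LOGICALREP_PROTO_STREAM_VERSION_NUM :
        LOGICALREP_PROTO_VERSION_NUM;
}

/*
 * Hypothetical helper: later code asks about the negotiated protocol
 * version only, never about the server version.
 */
static bool
stream_abort_has_lsn(int proto_version)
{
    return proto_version >= LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM;
}
```

With something like this, apply_handle_stream_abort would only need the negotiated protocol version to know which fields to expect.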

======

1.3 doc/src/sgml/ref/create_subscription.sgml

+ <para>
+ If set to <literal>on</literal>, the changes of transaction are
+ written to temporary files and then applied at once after the
+ transaction is committed on the publisher.
+ </para>

Previously I suggested changing some text but it was rejected [1
comment #8] because you said there may be *multiple* files, not just
one. That is fair enough, but there were some other changes to that
suggested text unrelated to the number of files.

SUGGESTION #2
If set to on, the incoming changes are written to temporary files and
then applied only after the transaction is committed on the publisher.

~~~

1.4

+ <para>
+ If set to <literal>parallel</literal>, incoming changes are directly
+ applied via one of the apply background workers, if available. If no
+ background worker is free to handle streaming transaction then the
+ changes are written to a file and applied after the transaction is
+ committed. Note that if an error happens when applying changes in a
+ background worker, the finish LSN of the remote transaction might
+ not be reported in the server log.
</para>

Should this also say "written to temporary files" instead of "written
to a file"?

======

1.5 src/backend/commands/subscriptioncmds.c

+ /*
+ * If no parameter given, assume "true" is meant.
+ */

Previously I suggested an update for this comment, but it was rejected
[1 comment #12] saying you wanted consistency with defGetBoolean.

Sure, that is one point of view. Another one is that "two wrongs don't
make a right". IIUC that comment as it currently stands is incorrect
because in this case there *is* a parameter given - it is just the
parameter *value* that is missing. Maybe see what other people think?
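
To make the distinction concrete, here is a simplified standalone sketch. It is not the patch's parsing function: parse_streaming_value is a hypothetical name, and the matching is deliberately reduced to a few lowercase spellings. The point is only that a NULL value means "the parameter was given without a value", which is what the comment could say.

```c
#include <stddef.h>
#include <string.h>

#define SUBSTREAM_OFF 'f'
#define SUBSTREAM_ON 't'
#define SUBSTREAM_PARALLEL 'p'

/*
 * Hypothetical, simplified parser. "value" is NULL when the user wrote the
 * parameter name alone, with no value. Returns 0 for an unrecognized value.
 */
static char
parse_streaming_value(const char *value)
{
    /* The parameter was given but its value was omitted: assume "true". */
    if (value == NULL)
        return SUBSTREAM_ON;

    if (strcmp(value, "parallel") == 0)
        return SUBSTREAM_PARALLEL;
    if (strcmp(value, "true") == 0 || strcmp(value, "on") == 0)
        return SUBSTREAM_ON;
    if (strcmp(value, "false") == 0 || strcmp(value, "off") == 0)
        return SUBSTREAM_OFF;

    return 0;
}
```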

======

1.6. src/backend/replication/logical/Makefile

It seems to me like these files were intended to be listed in
alphabetical order, so you should move this new file accordingly.

======

1.7 .../replication/logical/applybgworker.c

+/* queue size of DSM, 16 MB for now. */
+#define DSM_QUEUE_SIZE 160000000

The comment should start uppercase.

~~~

1.8 .../replication/logical/applybgworker.c - apply_bgworker_can_start

Maybe this is just my opinion but it sounds a bit strange to over-use
"we" in all the comments.

1.8.a
+/*
+ * Confirm if we can try to start a new apply background worker.
+ */
+static bool
+apply_bgworker_can_start(TransactionId xid)

SUGGESTION
Check if starting a new apply background worker is allowed.

1.8.b
+ /*
+ * We don't start new background worker if we are not in streaming parallel
+ * mode.
+ */

SUGGESTION
Don't start a new background worker if not in streaming parallel mode.

1.8.c
+ /*
+ * We don't start new background worker if user has set skiplsn as it's
+ * possible that user want to skip the streaming transaction. For
+ * streaming transaction, we need to spill the transaction to disk so that
+ * we can get the last LSN of the transaction to judge whether to skip
+ * before starting to apply the change.
+ */

SUGGESTION
Don't start a new background worker if...

~~~

1.9 .../replication/logical/applybgworker.c - apply_bgworker_start

+/*
+ * Try to start worker inside ApplyWorkersHash for requested xid.
+ */
+ApplyBgworkerState *
+apply_bgworker_start(TransactionId xid)

The comment seems not quite right.

SUGGESTION
Try to start an apply background worker and, if successful, cache it
in ApplyWorkersHash keyed by the specified xid.

~~~

1.10 .../replication/logical/applybgworker.c - apply_bgworker_find

+ /*
+ * Find entry for requested transaction.
+ */
+ entry = hash_search(ApplyWorkersHash, &xid, HASH_FIND, &found);
+ if (found)
+ {
+ entry->wstate->pstate->status = APPLY_BGWORKER_BUSY;
+ return entry->wstate;
+ }
+ else
+ return NULL;
+}

IMO it is an unexpected side-effect for a function called "find" to
also modify the thing that it found. Setting BUSY should either be
done by the caller, or else this function should be renamed to make it
obvious that it does more than just "finding" something.

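The two options can be sketched as below. This is a standalone toy, not the patch's code: WorkerEntry, the array lookup, and both function names are stand-ins I made up in place of the ApplyWorkersHash lookup.

```c
#include <stddef.h>

/* Stand-in types; the real entries live in ApplyWorkersHash. */
typedef enum
{
    APPLY_BGWORKER_FINISHED,
    APPLY_BGWORKER_BUSY
} WorkerStatus;

typedef struct
{
    unsigned int xid;
    WorkerStatus status;
} WorkerEntry;

/* Option 1: a pure lookup. Returns the entry for xid, or NULL. */
static WorkerEntry *
worker_find(WorkerEntry *entries, size_t n, unsigned int xid)
{
    for (size_t i = 0; i < n; i++)
    {
        if (entries[i].xid == xid)
            return &entries[i];
    }
    return NULL;
}

/* Option 2: keep the side effect, but say so in the function name. */
static WorkerEntry *
worker_find_and_mark_busy(WorkerEntry *entries, size_t n, unsigned int xid)
{
    WorkerEntry *entry = worker_find(entries, n, xid);

    if (entry != NULL)
        entry->status = APPLY_BGWORKER_BUSY;
    return entry;
}
```
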
~~~

1.11 .../replication/logical/applybgworker.c - LogicalApplyBgwLoop

+ /*
+ * Push apply error context callback. Fields will be filled applying
+ * applying a change.
+ */

Typo: "applying applying"

~~~

1.12 .../replication/logical/applybgworker.c - apply_bgworker_setup

+ if (launched)
+ ApplyWorkersList = lappend(ApplyWorkersList, wstate);
+ else
+ {
+ shm_mq_detach(wstate->mq_handle);
+ dsm_detach(wstate->dsm_seg);
+ pfree(wstate);
+
+ wstate->mq_handle = NULL;
+ wstate->dsm_seg = NULL;
+ wstate = NULL;
+ }

I am not sure what those first 2 NULL assignments are trying to
achieve. Nothing AFAICT. In any case, it looks like a bug to dereference
'wstate' after you already pfree-d it in the line above.
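
Something along these lines would avoid touching freed memory. It is a standalone sketch only: WorkerState, fake_detach, and worker_state_discard are hypothetical stand-ins for ApplyBgworkerState, shm_mq_detach/dsm_detach, and the cleanup branch, and free() stands in for pfree().

```c
#include <stdlib.h>

/* Stand-in for ApplyBgworkerState. */
typedef struct
{
    void *mq_handle;
    void *dsm_seg;
} WorkerState;

/* Stand-in for shm_mq_detach() / dsm_detach(); just counts the calls. */
static int detach_calls = 0;

static void
fake_detach(void *handle)
{
    (void) handle;
    detach_calls++;
}

/*
 * Release a worker state that failed to launch. Always returns NULL so the
 * caller can write: wstate = worker_state_discard(wstate);
 */
static WorkerState *
worker_state_discard(WorkerState *wstate)
{
    fake_detach(wstate->mq_handle);
    fake_detach(wstate->dsm_seg);
    free(wstate);

    /* No member assignments here: wstate must not be dereferenced now. */
    return NULL;
}
```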

~~~

1.13 .../replication/logical/applybgworker.c - apply_bgworker_check_status

+ * Exit if any relation is not in the READY state and if any worker is handling
+ * the streaming transaction at the same time. Because for streaming
+ * transactions that is being applied in apply background worker, we cannot
+ * decide whether to apply the change for a relation that is not in the READY
+ * state (see should_apply_changes_for_rel) as we won't know remote_final_lsn
+ * by that time.
+ */
+void
+apply_bgworker_check_status(void)

Somehow, I felt that this "Exit if..." comment really belonged at the
appropriate place in the function body, instead of in the function
header.

======

1.14 src/backend/replication/logical/launcher.c - WaitForReplicationWorkerAttach

@@ -151,8 +153,10 @@ get_subscription_list(void)
*
* This is only needed for cleaning up the shared memory in case the worker
* fails to attach.
+ *
+ * Returns false if the attach fails. Otherwise return true.
*/
-static void
+static bool
WaitForReplicationWorkerAttach(LogicalRepWorker *worker,

Comment should say either "Return" or "returns"; not one of each.

~~~

1.15. src/backend/replication/logical/launcher.c -
WaitForReplicationWorkerAttach

+ return worker->in_use ? true : false;

Same as just:
return worker->in_use;

~~~

1.16. src/backend/replication/logical/launcher.c - logicalrep_worker_launch

+ bool is_subworker = (subworker_dsm != DSM_HANDLE_INVALID);
+
+ /* We don't support table sync in subworker */
+ Assert(!(is_subworker && OidIsValid(relid)));

I'm not sure the comment is good. It sounds like it is something that
might be possible but is just currently "not supported". In fact, I
thought this is really just a sanity check because the combination of
those params is just plain wrong, isn't it? Maybe a better comment is
just:
/* Sanity check */

======

1.17 src/backend/replication/logical/proto.c

+ /*
+ * If the version of the publisher is lower than the version of the
+ * subscriber, it may not support sending these two fields. So these
+ * fields are only taken if they are included.
+ */
+ if (include_abort_lsn)

1.17a
I thought that the comment about "versions of publishers lower than
version of subscribers..." is bogus. Perhaps you have in mind just
thinking about versions prior to PG16 but that is not what the comment
is saying. E.g. sometime in the future, the publisher may be PG18 and
the subscriber might be PG25. So that might work fine (even though the
publisher is a lower version), but this comment will be completely
misleading. BTW this is another reason I think code needs to be using
protocol versions (not server versions). [See other comment #1.2]

1.17b.
Anyway, I felt that any comment describing the meaning of the
'include_abort_lsn' param would be better in the function header
comment, instead of in the function body.

======

1.18 src/backend/replication/logical/worker.c - file header comment

+ * 1) Separate background workers
+ *
+ * Assign a new apply background worker (if available) as soon as the xact's...

Somehow this long comment did not ever mention that this mode is
selected by the user using the 'streaming=parallel'. I thought
probably it should say that somewhere here.

~~~

1.19. src/backend/replication/logical/worker.c -

ApplyErrorCallbackArg apply_error_callback_arg =
{
.command = 0,
.rel = NULL,
.remote_attnum = -1,
.remote_xid = InvalidTransactionId,
.finish_lsn = InvalidXLogRecPtr,
.origin_name = NULL,
};

I still thought that the above initialization deserves some sort of
comment, even if you don't want to use the comment text previously
suggested [1 comment #41]

~~~

1.20 src/backend/replication/logical/worker.c -

@@ -251,27 +258,38 @@ static MemoryContext LogicalStreamingContext = NULL;
WalReceiverConn *LogRepWorkerWalRcvConn = NULL;

Subscription *MySubscription = NULL;
-static bool MySubscriptionValid = false;
+bool MySubscriptionValid = false;

bool in_remote_transaction = false;
static XLogRecPtr remote_final_lsn = InvalidXLogRecPtr;

/* fields valid only when processing streamed transaction */
-static bool in_streamed_transaction = false;
+bool in_streamed_transaction = false;

The tab alignment here looks wrong. IMO it's not worth trying to align
these at all. I think the tabs are leftover from before when the vars
used to be static.

~~~

1.21 src/backend/replication/logical/worker.c - apply_bgworker_active

+/* Check if we are applying the transaction in apply background worker */
+#define apply_bgworker_active() (in_streamed_transaction && stream_apply_worker != NULL)

Sorry [1 comment #42b], I had meant to write "in apply background
worker" -> "in an apply background worker".

~~~

1.22 src/backend/replication/logical/worker.c - skip_xact_finish_lsn

/*
* We enable skipping all data modification changes (INSERT, UPDATE, etc.) for
* the subscription if the remote transaction's finish LSN matches the subskiplsn.
* Once we start skipping changes, we don't stop it until we skip all changes of
* the transaction even if pg_subscription is updated and MySubscription->skiplsn
- * gets changed or reset during that. Also, in streaming transaction cases, we
- * don't skip receiving and spooling the changes since we decide whether or not
+ * gets changed or reset during that. Also, in streaming transaction cases (streaming = on),
+ * we don't skip receiving and spooling the changes since we decide whether or not
* to skip applying the changes when starting to apply changes. The subskiplsn is
* cleared after successfully skipping the transaction or applying non-empty
* transaction. The latter prevents the mistakenly specified subskiplsn from
- * being left.
+ * being left. Note that we cannot skip the streaming transaction in parallel
+ * mode, because we cannot get the finish LSN before applying the changes.
*/

"in parallel mode, because" -> "in 'streaming = parallel' mode, because"

~~~

1.23 src/backend/replication/logical/worker.c - handle_streamed_transaction

1.23a
/*
- * Handle streamed transactions.
+ * Handle streamed transactions for both main apply worker and apply background
+ * worker.

SUGGESTION
Handle streamed transactions for both the main apply worker and the
apply background workers.

1.23b
+ * In streaming case (receiving a block of streamed transaction), for
+ * SUBSTREAM_ON mode, we simply redirect it to a file for the proper toplevel
+ * transaction, and for SUBSTREAM_PARALLEL mode, we send the changes to
+ * background apply worker (LOGICAL_REP_MSG_RELATION or LOGICAL_REP_MSG_TYPE
+ * changes will also be applied in main apply worker).

"background apply worker" -> "apply background workers"

Also, I think you don't need to say "we" everywhere:
"we simply redirect it" -> "simply redirect it"
"we send the changes" -> "send the changes"

1.23c.
+ * But there are two exceptions: If we apply streamed transaction in main apply
+ * worker with parallel mode, it will return false when we address
+ * LOGICAL_REP_MSG_RELATION or LOGICAL_REP_MSG_TYPE changes.

SUGGESTION
Exception: When parallel mode is applying streamed transaction in the
main apply worker, (e.g. when addressing
LOGICAL_REP_MSG_RELATION or LOGICAL_REP_MSG_TYPE changes), then return false.

~~~

1.24 src/backend/replication/logical/worker.c - handle_streamed_transaction

1.24a.
/* not in streaming mode */
- if (!in_streamed_transaction)
+ if (!(in_streamed_transaction || am_apply_bgworker()))
return false;

Uppercase comment

1.24b
+ /* define a savepoint for a subxact if needed. */
+ apply_bgworker_subxact_info_add(current_xid);

Uppercase comment

~~~

1.25 src/backend/replication/logical/worker.c - handle_streamed_transaction

+ /*
+ * This is the main apply worker, and there is an apply background
+ * worker. So we apply the changes of this transaction in an apply
+ * background worker. Pass the data to the worker.
+ */

SUGGESTION (to be more consistent with the next comment)
This is the main apply worker, but there is an apply background
worker, so apply the changes of this transaction in that background
worker. Pass the data to the worker.

~~~

1.26 src/backend/replication/logical/worker.c - handle_streamed_transaction

+ /*
+ * This is the main apply worker, but there is no apply background
+ * worker. So we write to temporary files and apply when the final
+ * commit arrives.

SUGGESTION
This is the main apply worker, but there is no apply background
worker, so write to temporary files and apply when the final commit
arrives.

~~~

1.27 src/backend/replication/logical/worker.c - apply_handle_stream_prepare

+ /*
+ * Check if we are processing this transaction in an apply background
+ * worker.
+ */

SUGGESTION:
Check if we are processing this transaction in an apply background
worker and if so, send the changes to that worker.

~~~

1.28 src/backend/replication/logical/worker.c - apply_handle_stream_prepare

+ if (wstate)
+ {
+ apply_bgworker_send_data(wstate, s->len, s->data);
+
+ /*
+ * Wait for apply background worker to finish. This is required to
+ * maintain commit order which avoids failures due to transaction
+ * dependencies and deadlocks.
+ */
+ apply_bgworker_wait_for(wstate, APPLY_BGWORKER_FINISHED);
+ apply_bgworker_free(wstate);

I think maybe the comment can be changed slightly, and then it can
move up one line to the top of this code block (above the 3
statements). I think it will become more readable.

SUGGESTION
After sending the data to the apply background worker, wait for that
worker to finish. This is necessary to maintain commit order which
avoids failures due to transaction dependencies and deadlocks.

~~~

1.29 src/backend/replication/logical/worker.c - apply_handle_stream_start

+ /*
+ * If no worker is available for the first stream start, we start to
+ * serialize all the changes of the transaction.
+ */
+ else
+ {

1.29a.
I felt that this comment should be INSIDE the else { block to be more readable.

1.29b.
The comment can also be simplified a bit
SUGGESTION:
Since no apply background worker is available for the first stream
start, serialize all the changes of the transaction.

~~~

1.30 src/backend/replication/logical/worker.c - apply_handle_stream_start

+ /* if this is not the first segment, open existing subxact file */
+ if (!first_segment)
+ subxact_info_read(MyLogicalRepWorker->subid, stream_xid);

Uppercase comment

~~~

1.31. src/backend/replication/logical/worker.c - apply_handle_stream_stop

+ if (apply_bgworker_active())
+ {
+ char action = LOGICAL_REP_MSG_STREAM_STOP;

Are all the tabs before the variable needed?

~~~

1.32. src/backend/replication/logical/worker.c - apply_handle_stream_abort

+ /* Check whether the publisher sends abort_lsn and abort_time. */
+ if (am_apply_bgworker())
+ include_abort_lsn = MyParallelState->server_version >= 150000;

Previously I already reported about this [1 comment #50]

I just do not trust this code to do the correct thing. E.g. what if
streaming=parallel but all bgworkers are exhausted? Then IIUC
am_apply_bgworker() will not be true. But then with both PG15 servers
for pub/sub you will WRITE something but then you will not READ it.
Won't the stream IO get out of step and everything fall apart?

Perhaps the include_abort_lsn assignment should be unconditionally
set, and I think this should be a protocol version check instead of a
server version check, shouldn’t it? (See my earlier comment 1.2.)
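
A toy illustration of the out-of-step concern follows. It is not PostgreSQL code: the StreamAbort struct and both functions are made up, abort_time is omitted, and the bytes are copied in native order rather than through the real pq_send*/pq_get* routines. It only shows that the reader consumes the same number of bytes as the writer produced when both use the same include_abort_lsn value.

```c
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

/* Simplified stand-in for a stream abort message. */
typedef struct
{
    uint32_t xid;
    uint32_t subxid;
    uint64_t abort_lsn;     /* only on the wire if include_abort_lsn */
} StreamAbort;

/* Serialize the message; returns the number of bytes written. */
static size_t
write_stream_abort(unsigned char *buf, const StreamAbort *msg,
                   bool include_abort_lsn)
{
    size_t off = 0;

    memcpy(buf + off, &msg->xid, sizeof(msg->xid));
    off += sizeof(msg->xid);
    memcpy(buf + off, &msg->subxid, sizeof(msg->subxid));
    off += sizeof(msg->subxid);
    if (include_abort_lsn)
    {
        memcpy(buf + off, &msg->abort_lsn, sizeof(msg->abort_lsn));
        off += sizeof(msg->abort_lsn);
    }
    return off;
}

/* Deserialize the message; returns the number of bytes consumed. */
static size_t
read_stream_abort(const unsigned char *buf, StreamAbort *msg,
                  bool include_abort_lsn)
{
    size_t off = 0;

    memcpy(&msg->xid, buf + off, sizeof(msg->xid));
    off += sizeof(msg->xid);
    memcpy(&msg->subxid, buf + off, sizeof(msg->subxid));
    off += sizeof(msg->subxid);
    msg->abort_lsn = 0;
    if (include_abort_lsn)
    {
        memcpy(&msg->abort_lsn, buf + off, sizeof(msg->abort_lsn));
        off += sizeof(msg->abort_lsn);
    }
    return off;
}
```

If the writer passes true and the reader passes false, the reader stops 8 bytes early and the next message would be parsed from the middle of this one.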

~~~

1.32 src/backend/replication/logical/worker.c - apply_handle_stream_abort

BTW, I think the PG16devel is now stamped in the GitHub HEAD so
perhaps all of your 150000 checks should be now changed to say 160000?

~~~

1.33 src/backend/replication/logical/worker.c - apply_handle_stream_abort

+ /*
+ * We are in main apply worker and the transaction has been serialized
+ * to file.
+ */
+ else
+ serialize_stream_abort(xid, subxid);

I think this will be more readable if written like:

else
{
/* put comment here... */
serialize_stream_abort(xid, subxid);
}

~~~

1.34 src/backend/replication/logical/worker.c - apply_dispatch

-
/*
* Logical replication protocol message dispatcher.
*/
-static void
+void
apply_dispatch(StringInfo s)

Maybe removing the whitespace is not really needed as part of this patch?

======

1.35 src/include/catalog/pg_subscription.h

+/* Disallow streaming in-progress transactions */
+#define SUBSTREAM_OFF 'f'
+
+/*
+ * Streaming transactions are written to a temporary file and applied only
+ * after the transaction is committed on upstream.
+ */
+#define SUBSTREAM_ON 't'
+
+/* Streaming transactions are applied immediately via a background worker */
+#define SUBSTREAM_PARALLEL 'p'
+

1.35a
Should all these "Streaming transactions" be called "Streaming
in-progress transactions"?

1.35b.
Either align the values or don’t. Currently, they seem half-aligned.

1.35c.
SUGGESTION (modify the 1st comment to be more consistent with the others)
Streaming in-progress transactions are disallowed.

======

1.36 src/include/replication/worker_internal.h

extern int logicalrep_sync_worker_count(Oid subid);
+extern int logicalrep_apply_background_worker_count(Oid subid);

Just wondering if this should be called
"logicalrep_apply_bgworker_count(Oid subid);" for consistency with the
other function naming.

========
v14-0002
========

2.1 Commit message

Change all TAP tests using the SUBSCRIPTION "streaming" option, so they
now test both 'on' and 'parallel' values.

"option" -> "parameter"

------
[1] https://www.postgresql.org/message-id/OS3PR01MB6275DCCDF35B3BBD52CA02CC9EB89%40OS3PR01MB6275.jpnprd01.prod.outlook.com

Kind Regards,
Peter Smith.
Fujitsu Australia
