Re: Perform streaming logical transactions by background workers and parallel apply

From: Peter Smith <smithpb2250(at)gmail(dot)com>
To: PostgreSQL Hackers <pgsql-hackers(at)lists(dot)postgresql(dot)org>
Cc: Amit Kapila <amit(dot)kapila16(at)gmail(dot)com>, Masahiko Sawada <sawada(dot)mshk(at)gmail(dot)com>, "wangw(dot)fnst(at)fujitsu(dot)com" <wangw(dot)fnst(at)fujitsu(dot)com>, Dilip Kumar <dilipbalaut(at)gmail(dot)com>, "shiy(dot)fnst(at)fujitsu(dot)com" <shiy(dot)fnst(at)fujitsu(dot)com>, "houzj(dot)fnst(at)fujitsu(dot)com" <houzj(dot)fnst(at)fujitsu(dot)com>
Subject: Re: Perform streaming logical transactions by background workers and parallel apply
Date: 2022-12-05 23:56:48
Message-ID: CAHut+Pv0WP_2DqHorm0JbO7tUyvJVE+Y4e9urDiOegn6qhq==A@mail.gmail.com
Lists: pgsql-hackers

Here are my review comments for patch v55-0002.

======

.../replication/logical/applyparallelworker.c

1. pa_can_start

@@ -276,9 +278,9 @@ pa_can_start(TransactionId xid)
/*
* Don't start a new parallel worker if user has set skiplsn as it's
* possible that user want to skip the streaming transaction. For
- * streaming transaction, we need to spill the transaction to disk so that
- * we can get the last LSN of the transaction to judge whether to skip
- * before starting to apply the change.
+ * streaming transaction, we need to serialize the transaction to a file
+ * so that we can get the last LSN of the transaction to judge whether to
+ * skip before starting to apply the change.
*/
if (!XLogRecPtrIsInvalid(MySubscription->skiplsn))
return false;

I think the wording change may belong in patch 0001 because it has
nothing to do with partial serialization.

~~~

2. pa_free_worker

+ /*
+ * Stop the worker if there are enough workers in the pool.
+ *
+ * XXX The worker is also stopped if the leader apply worker needed to
+ * serialize part of the transaction data due to a send timeout. This is
+ * because the message could be partially written to the queue due to send
+ * timeout and there is no way to clean the queue other than resending the
+ * message until it succeeds. To avoid complexity, we directly stop the
+ * worker in this case.
+ */
+ if (winfo->serialize_changes ||
+ napplyworkers > (max_parallel_apply_workers_per_subscription / 2))

There is no need to say "due to send timeout" twice in two consecutive
sentences.

SUGGESTION
XXX The worker is also stopped if the leader apply worker needed to
serialize part of the transaction data due to a send timeout. This is
because the message could be partially written to the queue but there
is no way to clean the queue other than resending the message until it
succeeds. Directly stopping the worker avoids needing this complexity.
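
FWIW, here is a rough standalone sketch of the stop decision as I read
it from the quoted hunk. The struct WorkerInfoSketch and the function
should_stop_worker() are hypothetical stand-ins (not patch code); only
the two-part condition is copied from the patch.

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical stand-in for ParallelApplyWorkerInfo; only the field used here. */
typedef struct WorkerInfoSketch
{
    bool serialize_changes; /* leader switched to PARTIAL_SERIALIZE for this xact */
} WorkerInfoSketch;

/*
 * Mirrors the quoted condition: stop the worker if part of the transaction
 * was serialized (the queue may hold a partially written message), or if the
 * pool already holds more than half of the per-subscription limit.
 */
static bool
should_stop_worker(const WorkerInfoSketch *winfo, int napplyworkers,
                   int max_parallel_apply_workers_per_subscription)
{
    return winfo->serialize_changes ||
        napplyworkers > (max_parallel_apply_workers_per_subscription / 2);
}
```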

~~~

3. pa_spooled_messages

Previously I suggested this function name should be changed, but that
was rejected (see [1] #6a).

> 6a.
> IMO a better name for this function would be
> pa_apply_spooled_messages();
Not sure about this.

~

FYI, the reason for the previous suggestion is that there is no verb
in the current function name, so the reader is left wondering:
pa_spooled_messages... what?

It means the caller has to have extra comments like:
/* Check if changes have been serialized to a file. */
pa_spooled_messages();

OTOH, if the function was called something better -- e.g.
pa_check_for_spooled_messages() or similar -- then it would be
self-explanatory.

~

4.

/*
+ * Replay the spooled messages in the parallel apply worker if the leader apply
+ * worker has finished serializing changes to the file.
+ */
+static void
+pa_spooled_messages(void)

I'm not 100% sure of the logic, so IMO the comment should say a bit
more about how this works.

Specifically, let's say there was some timeout and the LA (leader apply
worker) needed to write the spool file, and then the PA (parallel apply
worker) timed out and found itself inside this function. Now, let's say
the LA is still busy writing the file -- so what happens next?

Does this function simply return, then the main PA loop waits again,
then times out again, then the PA finds itself back inside this
function... and that keeps happening over and over until eventually
the spool file is found FS_READY? Some explanatory comments might
help.
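
To make the question concrete, here is a single-threaded toy model of
what I *think* happens, assuming the function simply returns while the
fileset is still FS_BUSY and the PA main loop retries after its next
wait timeout. All names here (FileSetStateSketch,
check_for_spooled_messages, simulate_pa_loop, the tick counter) are
hypothetical; if the real behaviour differs, that is exactly what the
comment should say.

```c
#include <assert.h>
#include <stdbool.h>

/* States named after the PartialFileSetState values quoted in #21. */
typedef enum
{
    FS_BUSY,  /* leader is still serializing changes to the file */
    FS_READY  /* leader has finished; the PA may now read the file */
} FileSetStateSketch;

/* Hypothetical shared state; the real code reads it under a spinlock. */
static FileSetStateSketch shared_fileset_state = FS_BUSY;
static int applied_files = 0;

/* One visit to the check: return early if the leader is not done yet. */
static bool
check_for_spooled_messages(void)
{
    if (shared_fileset_state != FS_READY)
        return false;
    applied_files++; /* stands in for applying the spooled messages */
    return true;
}

/*
 * Simulate the PA main loop: each iteration is one wait timeout, and the
 * leader finishes writing after 'leader_ticks' timeouts.  Returns how many
 * timeouts the PA went through before it applied the file.
 */
static int
simulate_pa_loop(int leader_ticks)
{
    int timeouts = 0;

    shared_fileset_state = FS_BUSY;
    applied_files = 0;
    for (;;)
    {
        timeouts++;                          /* PA wait timed out */
        if (timeouts > leader_ticks)
            shared_fileset_state = FS_READY; /* leader is done writing */
        if (check_for_spooled_messages())
            return timeouts;
    }
}
```

e.g. simulate_pa_loop(3) returns 4: three visits return early while the
leader is still writing, and the fourth finds the file FS_READY.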

~

5.

+ /*
+ * Check if changes have been serialized to a file. if so, read and apply
+ * them.
+ */
+ SpinLockAcquire(&MyParallelShared->mutex);
+ fileset_state = MyParallelShared->fileset_state;
+ SpinLockRelease(&MyParallelShared->mutex);

"if so" -> "If so"

~~~

6. pa_send_data

+ *
+ * If the attempt to send data via shared memory times out, then we will switch
+ * to "PARTIAL_SERIALIZE mode" for the current transaction to prevent possible
+ * deadlocks with another parallel apply worker (refer to the comments atop
+ * applyparallelworker.c for details). This means that the current data and any
+ * subsequent data for this transaction will be serialized to a file.
*/
void
pa_send_data(ParallelApplyWorkerInfo *winfo, Size nbytes, const void *data)

SUGGESTION (minor comment rearranging)

If the attempt to send data via shared memory times out, then we will
switch to "PARTIAL_SERIALIZE mode" for the current transaction -- this
means that the current data and any subsequent data for this
transaction will be serialized to a file. This is done to prevent
possible deadlocks with another parallel apply worker (refer to the
comments atop applyparallelworker.c for details).
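
For reference, a toy model of the mode switch described above.
SendStateSketch, try_queue_send() and send_data_sketch() are
hypothetical; the timeout is simulated by a flag, and the sketch ignores
the partially-written-message case mentioned in #2.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical per-transaction state, loosely modelled on the patch's winfo. */
typedef struct
{
    bool   serialize_changes; /* true once in PARTIAL_SERIALIZE mode */
    size_t bytes_to_queue;    /* data sent via shared memory */
    size_t bytes_to_file;     /* data serialized to the spool file */
} SendStateSketch;

/* Hypothetical shared-memory send; 'timed_out' simulates a full queue. */
static bool
try_queue_send(SendStateSketch *st, size_t nbytes, bool timed_out)
{
    if (timed_out)
        return false;
    st->bytes_to_queue += nbytes;
    return true;
}

static void
send_data_sketch(SendStateSketch *st, size_t nbytes, bool timed_out)
{
    /* Once switched, the queue is never tried again for this transaction. */
    if (!st->serialize_changes && try_queue_send(st, nbytes, timed_out))
        return;

    st->serialize_changes = true; /* switch to PARTIAL_SERIALIZE mode */
    st->bytes_to_file += nbytes;  /* the current data and everything after it */
}
```

Sending 10 bytes OK, then 20 bytes with a timeout, then 30 more bytes
leaves 10 bytes in the queue and 50 in the file: once the switch
happens, all subsequent data for the transaction is serialized too.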

~

7.

+ /*
+ * Take the stream lock to make sure that the parallel apply worker
+ * will wait for the leader to release the stream lock until the
+ * end of the transaction.
+ */
+ pa_lock_stream(winfo->shared->xid, AccessExclusiveLock);

The comment doesn't sound right.

"until the end" -> "at the end" (??)

~~~

8. pa_stream_abort

@@ -1374,6 +1470,7 @@ pa_stream_abort(LogicalRepStreamAbortData *abort_data)
RollbackToSavepoint(spname);
CommitTransactionCommand();
subxactlist = list_truncate(subxactlist, i + 1);
+
break;
}
}
Spurious whitespace unrelated to this patch?

======

src/backend/replication/logical/worker.c

9. handle_streamed_transaction

/*
+ * The parallel apply worker needs the xid in this message to decide
+ * whether to define a savepoint, so save the original message that has not
+ * moved the cursor after the xid. We will serailize this message to a file
+ * in PARTIAL_SERIALIZE mode.
+ */
+ original_msg = *s;

"serailize" -> "serialize"
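
FWIW, the reason the copy works (and what a longer comment could spell
out) is that `original_msg = *s` is a shallow struct copy: it snapshots
the cursor but shares the data buffer. A simplified model, where
MsgSketch and read_xid() are hypothetical stand-ins for StringInfoData
and the pq_getmsg*-style readers:

```c
#include <assert.h>
#include <stdint.h>
#include <string.h>

/* Simplified stand-in for PostgreSQL's StringInfoData (data/len/cursor). */
typedef struct
{
    const char *data;
    int         len;
    int         cursor;
} MsgSketch;

/* Hypothetical reader: consumes a 4-byte xid and advances the cursor. */
static uint32_t
read_xid(MsgSketch *s)
{
    uint32_t xid;

    memcpy(&xid, s->data + s->cursor, sizeof(xid));
    s->cursor += (int) sizeof(xid);
    return xid;
}
```

After `MsgSketch original_msg = s;` and a read_xid(&s) call, s.cursor has
moved past the xid while original_msg.cursor is still 0, and both still
point at the same bytes, so the saved copy can be written out whole.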

~~~

10. apply_handle_stream_prepare

@@ -1245,6 +1265,7 @@ apply_handle_stream_prepare(StringInfo s)
LogicalRepPreparedTxnData prepare_data;
ParallelApplyWorkerInfo *winfo;
TransApplyAction apply_action;
+ StringInfoData original_msg = *s;

Should this include a longer explanation of why this copy is needed
(same as was done in handle_streamed_transaction)?

~

11.

case TRANS_PARALLEL_APPLY:
+
+ /*
+ * Close the file before committing if the parallel apply worker
+ * is applying spooled messages.
+ */
+ if (stream_fd)
+ stream_close_file();

11a.

This comment seems worded backwards.

SUGGESTION
If the parallel apply worker is applying spooled messages then close
the file before committing.

~

11b.

I'm confused - isn't there already code doing exactly this (closing the
file before commit) in the TRANS_PARALLEL_APPLY case of
apply_handle_stream_commit?

~~~

12. apply_handle_stream_start

@@ -1383,6 +1493,7 @@ apply_handle_stream_start(StringInfo s)
bool first_segment;
ParallelApplyWorkerInfo *winfo;
TransApplyAction apply_action;
+ StringInfoData original_msg = *s;

Should this include a longer explanation of why this copy is needed
(same as was done in handle_streamed_transaction)?

~

13.

+ serialize_stream_start(stream_xid, false);
+ stream_write_change(LOGICAL_REP_MSG_STREAM_START, &original_msg);

- end_replication_step();
break;

A spurious blank line is left before the break;

~~~

14. serialize_stream_stop

+ /* We must be in a valid transaction state */
+ Assert(IsTransactionState());

The comment seems redundant. The code says the same.

~~~

15. apply_handle_stream_abort

@@ -1676,6 +1794,7 @@ apply_handle_stream_abort(StringInfo s)
LogicalRepStreamAbortData abort_data;
ParallelApplyWorkerInfo *winfo;
TransApplyAction apply_action;
+ StringInfoData original_msg = *s;
bool toplevel_xact;

Should this include a longer explanation of why this copy is needed
(same as was done in handle_streamed_transaction)?

~~~

16. apply_spooled_messages

+ stream_fd = BufFileOpenFileSet(stream_fileset, path, O_RDONLY, false);

Something still seems a bit odd about this to me (previously also
mentioned in review [1] #29) but I cannot quite put my finger on it...

AFAIK the 'stream_fd' is the global the LA is using to remember the
single stream spool file; it corresponds to the LogicalRepWorker's
'stream_fileset'. So using that same global on the PA side seemed
strange to me. The fileset at PA comes from a different place
(MyParallelShared->fileset).

Basically, I felt that whenever you are using 'stream_fd' and
'stream_fileset' etc., it should be safe to assume you are looking
at worker.c from the leader apply worker's POV. Otherwise, IMO it
should just use some fd/fileset passed around as parameters. Sure, there
might be a few places like stream_close_file (etc.) which need some
small refactoring to take a parameter instead of always using
'stream_fd', but IMO the end result will be tidier.
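
Roughly what I had in mind, as a hypothetical sketch: the close helper
takes the handle to operate on, so the LA can keep passing its global
while the PA passes its own local handle. FILE * stands in for
BufFile * here, and close_stream_file_sketch() is not an existing
function.

```c
#include <assert.h>
#include <stdio.h>

/*
 * Hypothetical refactoring: close whichever handle the caller passes in,
 * instead of always operating on a global.
 */
static void
close_stream_file_sketch(FILE **fdp)
{
    if (*fdp != NULL)
    {
        fclose(*fdp);
        *fdp = NULL; /* caller's variable no longer refers to a closed file */
    }
}
```

The LA would call close_stream_file_sketch(&stream_fd) with its global,
and the PA would pass the handle it opened from
MyParallelShared->fileset, so neither side touches the other's state.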

~

17.

+ /*
+ * No need to output the DEBUG message here in the parallel apply
+ * worker as similar messages will be output when handling STREAM_STOP
+ * message.
+ */
+ if (!am_parallel_apply_worker() && nchanges % 1000 == 0)
elog(DEBUG1, "replayed %d changes from file \"%s\"",
nchanges, path);

Instead of saying what you are not doing ("No need to output the DEBUG
message here in the parallel apply worker"), wouldn't it make more sense
to reverse it and say what you are doing ("Only log DEBUG messages for
the leader apply worker because ...")? Then the condition also becomes
positive:

if (am_leader_apply_worker())
{
...
}
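
i.e. something like the following standalone sketch, assuming the only
two callers are the leader apply worker and a parallel apply worker
(am_leader_apply_worker_sketch() and the debug_lines counter are
hypothetical stand-ins for the real check and for elog(DEBUG1, ...)):

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical flag standing in for the real "am I the leader?" check. */
static bool is_leader = true;
static int  debug_lines = 0;

static bool
am_leader_apply_worker_sketch(void)
{
    return is_leader;
}

/*
 * Positive form of the condition: only the leader logs progress, because
 * the parallel apply worker reports similar information at STREAM_STOP.
 */
static void
maybe_log_progress(int nchanges)
{
    if (am_leader_apply_worker_sketch() && nchanges % 1000 == 0)
        debug_lines++; /* stands in for elog(DEBUG1, ...) */
}
```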

~

18.

+ if (am_parallel_apply_worker() &&
+ MyParallelShared->xact_state == PARALLEL_TRANS_FINISHED)
+ goto done;
+
+ /*
+ * No need to output the DEBUG message here in the parallel apply
+ * worker as similar messages will be output when handling STREAM_STOP
+ * message.
+ */
+ if (!am_parallel_apply_worker() && nchanges % 1000 == 0)
elog(DEBUG1, "replayed %d changes from file \"%s\"",
nchanges, path);
}

- BufFileClose(fd);
-
+ stream_close_file();
pfree(buffer);
pfree(s2.data);

+done:
elog(DEBUG1, "replayed %d (all) changes from file \"%s\"",
nchanges, path);

Shouldn't that "done:" label be *above* the pfree calls? Otherwise,
those are going to be skipped over by the "goto done;".
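
A minimal standalone illustration of the label placement I mean, with
hypothetical tracked allocations standing in for buffer and s2.data. It
only models the two pfree calls; whether stream_close_file() should also
sit below the label depends on whether the file is already closed on
that path, which the quoted hunk does not show.

```c
#include <assert.h>
#include <stdlib.h>

static int live_allocations = 0; /* buffers allocated but not yet freed */

static void *
alloc_tracked(size_t n)
{
    live_allocations++;
    return malloc(n);
}

static void
free_tracked(void *p)
{
    live_allocations--;
    free(p);
}

/*
 * Shape of the replay loop with the label placed *above* the cleanup, so
 * the early-exit path still releases both buffers.
 */
static int
replay_sketch(int nchanges_available, int finish_after)
{
    char *buffer = alloc_tracked(64);
    char *s2_data = alloc_tracked(64);
    int   nchanges = 0;

    while (nchanges < nchanges_available)
    {
        nchanges++;
        if (nchanges == finish_after)
            goto done; /* e.g. the transaction has already finished */
    }

done:
    free_tracked(buffer);
    free_tracked(s2_data);
    return nchanges;
}
```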

~~~

19. apply_handle_stream_commit

@@ -1898,6 +2072,7 @@ apply_handle_stream_commit(StringInfo s)
LogicalRepCommitData commit_data;
ParallelApplyWorkerInfo *winfo;
TransApplyAction apply_action;
+ StringInfoData original_msg = *s;

Should this include a longer explanation of why this copy is needed
(same as was done in handle_streamed_transaction)?

~

20.

+ /*
+ * Close the file before committing if the parallel apply worker
+ * is applying spooled messages.
+ */
+ if (stream_fd)
+ stream_close_file();

(same as previous review comment - see #11)

This comment seems worded backwards.

SUGGESTION
If the parallel apply worker is applying spooled messages then close
the file before committing.

======

src/include/replication/worker_internal.h

21. PartialFileSetState

+ * State of fileset in leader apply worker.
+ *
+ * FS_BUSY means that the leader is serializing changes to the file. FS_READY
+ * means that the leader has serialized all changes to the file and the file is
+ * ready to be read by a parallel apply worker.
+ */
+typedef enum PartialFileSetState

"ready to be read" sounded a bit strange.

SUGGESTION
... to the file so it is now OK for a parallel apply worker to read it.

------
[1] Houz's reply to my review of v51-0002 --
https://www.postgresql.org/message-id/OS0PR01MB5716350729D8C67AA8CE333194129%40OS0PR01MB5716.jpnprd01.prod.outlook.com

Kind Regards,
Peter Smith.
Fujitsu Australia
