Re: Perform streaming logical transactions by background workers and parallel apply

From: Peter Smith <smithpb2250(at)gmail(dot)com>
To: "houzj(dot)fnst(at)fujitsu(dot)com" <houzj(dot)fnst(at)fujitsu(dot)com>
Cc: Masahiko Sawada <sawada(dot)mshk(at)gmail(dot)com>, Amit Kapila <amit(dot)kapila16(at)gmail(dot)com>, "wangw(dot)fnst(at)fujitsu(dot)com" <wangw(dot)fnst(at)fujitsu(dot)com>, Dilip Kumar <dilipbalaut(at)gmail(dot)com>, "shiy(dot)fnst(at)fujitsu(dot)com" <shiy(dot)fnst(at)fujitsu(dot)com>, PostgreSQL Hackers <pgsql-hackers(at)lists(dot)postgresql(dot)org>
Subject: Re: Perform streaming logical transactions by background workers and parallel apply
Date: 2022-11-07 08:16:37
Message-ID: CAHut+PsAH23qziC6QZoj=9tPoA9emyEnRnb6VW0L2jQ0zJ+b9g@mail.gmail.com
Lists: pgsql-hackers

Here are my review comments for v42-0001

======

1. General.

Please take the time to process all new code comments using a
grammar/spelling checker (e.g. simply cut/paste them into MSWord or
Grammarly or any other tool of your choice as a quick double-check)
*before* posting the patches; too many of my review comments are about
code comments and it's taking a long time to keep cycling through
reporting/fixing/confirming comments for every patch version -
whereas it probably would take hardly any time to make the same
spelling/grammar corrections up-front.

======

.../replication/logical/applyparallelworker.c

2. ParallelApplyLockids

This seems like a bogus name. The code uses this to mean the subset of
lockED ids, not the list of all the lock ids.

OTOH, having another list of ALL lock-ids might be useful (for
detecting unique ids) if you are able to maintain such a list safely.

~~~

3. parallel_apply_can_start

+
+ if (switching_to_serialize)
+ return false;

This should have an explanatory comment.

~~~

4. parallel_apply_start_worker

+ /* Check if the transaction in that worker has been finished. */
+ xact_state = parallel_apply_get_xact_state(tmp_winfo->shared);
+ if (xact_state == PARALLEL_TRANS_FINISHED)

"has been finished." -> "has finished."

~~~

5.

+ /*
+ * Set the xact_state flag in the leader instead of the
+ * parallel apply worker to avoid the race condition where the leader has
+ * already started waiting for the parallel apply worker to finish
+ * processing the transaction while the child process has not yet
+ * processed the first STREAM_START and has not set the
+ * xact_state to true.
+ */
+ SpinLockAcquire(&winfo->shared->mutex);
+ winfo->shared->xact_state = PARALLEL_TRANS_UNKNOWN;
+ winfo->shared->xid = xid;
+ winfo->shared->fileset_valid = false;
+ winfo->shared->partial_sent_message = false;
+ SpinLockRelease(&winfo->shared->mutex);

This code comment is stale, because xact_state is no longer a "flag",
nor does "set the xact_state to true." make sense anymore.

~~~

6. parallel_apply_free_worker

+ /*
+ * Don't free the worker if the transaction in the worker is still in
+ * progress. This could happend as we don't wait for transaction rollback
+ * to finish.
+ */
+ if (parallel_apply_get_xact_state(winfo->shared) < PARALLEL_TRANS_FINISHED)
+ return;

6a.
typo "happend"

~

6b.
Saying "< PARALLEL_TRANS_FINISHED" seems kind of risky because it
assumes a specific ordering of those enums, which has never been
mentioned before. I think it will be safer to say "!=
PARALLEL_TRANS_FINISHED" instead. Alternatively, if the enum order is
important then it must be documented with the typedef so that nobody
changes it.
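
For illustration only, a minimal sketch of what documenting the order
in the typedef could look like. It assumes the states only move forward
as UNKNOWN -> STARTED -> FINISHED; PARALLEL_TRANS_UNKNOWN and
PARALLEL_TRANS_FINISHED appear in the patch, but PARALLEL_TRANS_STARTED
and xact_in_progress() are hypothetical names used only here:

```c
#include <assert.h>
#include <stdbool.h>

/*
 * Transaction state of a parallel apply worker.
 *
 * NOTE: The order of these values matters. Some callers compare states
 * with < or >=, relying on a transaction only ever moving forward through
 * UNKNOWN -> STARTED -> FINISHED. Do not reorder them without checking
 * those callers.
 */
typedef enum ParallelTransState
{
	PARALLEL_TRANS_UNKNOWN,		/* transaction not started yet */
	PARALLEL_TRANS_STARTED,		/* hypothetical: transaction in progress */
	PARALLEL_TRANS_FINISHED		/* transaction has finished */
} ParallelTransState;

/* Catch accidental reordering at compile time (C11). */
_Static_assert(PARALLEL_TRANS_UNKNOWN < PARALLEL_TRANS_STARTED &&
			   PARALLEL_TRANS_STARTED < PARALLEL_TRANS_FINISHED,
			   "ParallelTransState values must stay in lifecycle order");

/* The order-independent form of the "still in progress" test. */
static bool
xact_in_progress(ParallelTransState state)
{
	return state != PARALLEL_TRANS_FINISHED;
}
```

With only these three values the "!=" and "<" forms give the same
answer; the difference is that "!=" does not depend on where FINISHED
sits in the enum. PostgreSQL's StaticAssertDecl() could take the place
of _Static_assert.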

~~~

7.

+ ParallelApplyWorkersList = list_delete_ptr(ParallelApplyWorkersList,
+ winfo);

Unnecessary wrapping

~~~

8.

+ /*
+ * Resend the pending message to parallel apply worker to cleanup the
+ * queue. Note that parallel apply worker will just ignore this message
+ * as it has already handled this message while applying spooled
+ * messages.
+ */
+ result = shm_mq_send(winfo->mq_handle, strlen(winfo->pending_msg),
+ winfo->pending_msg, false, true);

If I understand this logic it seems a bit hacky. From the comment, it
seems you are resending a message that you know/expect to be ignored
simply to make it disappear. (??). Isn't there some other way to clear
the pending message without requiring a bogus send?

~~~

9. parallel_apply_spooled_messages

+
+static void
+parallel_apply_spooled_messages(void)

Missing function comment

~~~

10.

+parallel_apply_spooled_messages(void)
+{
+ bool fileset_valid = false;
+
+ /*
+ * Check if changes has been serialized to disk. if so, read and
+ * apply them.
+ */
+ SpinLockAcquire(&MyParallelShared->mutex);
+ fileset_valid = MyParallelShared->fileset_valid;
+ SpinLockRelease(&MyParallelShared->mutex);

The variable assignment in the declaration seems unnecessary.

~~~

11.

+ /*
+ * Check if changes has been serialized to disk. if so, read and
+ * apply them.
+ */
+ SpinLockAcquire(&MyParallelShared->mutex);
+ fileset_valid = MyParallelShared->fileset_valid;
+ SpinLockRelease(&MyParallelShared->mutex);

"has been" -> "have been"

~~~

12.

+ apply_spooled_messages(&MyParallelShared->fileset,
+ MyParallelShared->xid,
+ InvalidXLogRecPtr);
+ parallel_apply_set_fileset(MyParallelShared, false);

parallel_apply_set_fileset() is a confusing function name. IMO this
logic would be better split into 2 smaller functions:
- parallel_apply_set_fileset_valid()
- parallel_apply_set_fileset_invalid()
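
A minimal sketch of the split, assuming the existing function sets the
fileset_valid flag (and, when valid, copies the fileset) under the
shared mutex, as its header comment in #23 says. SharedSketch,
FileSetSketch, and the atomic_flag spinlock are hypothetical stand-ins
for ParallelApplyWorkerShared, FileSet, and
slock_t/SpinLockAcquire()/SpinLockRelease():

```c
#include <assert.h>
#include <stdatomic.h>
#include <stdbool.h>

/* Hypothetical stand-ins for slock_t and SpinLockAcquire/Release. */
typedef atomic_flag slock_t;

static void
spin_acquire(slock_t *lock)
{
	while (atomic_flag_test_and_set_explicit(lock, memory_order_acquire))
		;						/* busy-wait until the holder releases */
}

static void
spin_release(slock_t *lock)
{
	atomic_flag_clear_explicit(lock, memory_order_release);
}

/* Hypothetical stand-in for FileSet. */
typedef struct FileSetSketch
{
	int			creator_pid;
} FileSetSketch;

/* Hypothetical cut-down ParallelApplyWorkerShared. */
typedef struct SharedSketch
{
	slock_t		mutex;
	bool		fileset_valid;
	FileSetSketch fileset;
} SharedSketch;

static void
shared_sketch_init(SharedSketch *wshared)
{
	atomic_flag_clear(&wshared->mutex);
	wshared->fileset_valid = false;
}

/* Publish the fileset and mark it usable by the parallel apply worker. */
static void
parallel_apply_set_fileset_valid(SharedSketch *wshared,
								 const FileSetSketch *fileset)
{
	spin_acquire(&wshared->mutex);
	wshared->fileset = *fileset;
	wshared->fileset_valid = true;
	spin_release(&wshared->mutex);
}

/* Mark the fileset as no longer usable, e.g. after its changes are applied. */
static void
parallel_apply_set_fileset_invalid(SharedSketch *wshared)
{
	spin_acquire(&wshared->mutex);
	wshared->fileset_valid = false;
	spin_release(&wshared->mutex);
}
```

The call site quoted above would then read
parallel_apply_set_fileset_invalid(MyParallelShared) instead of
parallel_apply_set_fileset(MyParallelShared, false), so the meaning is
visible without looking up what "false" stands for.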

~~~

13. parallel_apply_get_unique_id

+/*
+ * Returns the unique id among all parallel apply workers in the subscriber.
+ */
+static uint16
+parallel_apply_get_unique_id()

The meaning of that comment and the purpose of this function are not
entirely clear... e.g. I had to read the code to figure out what the
comment is describing.

~~~

14.

The function seems to be written in a way that scans all known ids
looking for one that does not match. I wonder if it might be easier to
just assign some auto-incrementing static instead of always having to
scan for uniqueness. Since the pool of apply workers is limited, is
that kind of ID ever going to come close to running out?

Alternatively, see also comment #2 for a different way to know what
lockids are present.
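
A minimal sketch of the auto-incrementing alternative, assuming the ids
are only ever handed out by the leader apply worker (so a
process-local static is enough). parallel_apply_next_lock_id() and
next_lock_id are hypothetical names:

```c
#include <assert.h>
#include <stdint.h>

/* Next id to hand out; process-local, so only valid in a single leader. */
static uint16_t next_lock_id = 0;

/*
 * Hypothetical replacement for parallel_apply_get_unique_id(): no scan,
 * just take the next value. The counter wraps from UINT16_MAX back to 0.
 */
static uint16_t
parallel_apply_next_lock_id(void)
{
	return next_lock_id++;
}
```

After 65536 allocations the counter wraps, so uniqueness then depends on
no id from the previous cycle still being held. With a small pool of
workers that seems unlikely to matter, but it would need to be either
argued in a comment or checked.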

~~~

15.

winfo->shared->stream_lock_id = parallel_apply_get_unique_id();
winfo->shared->transaction_lock_id = parallel_apply_get_unique_id();

It somehow feels clunky to be calling this
parallel_apply_get_unique_id() like this to scan all the same things 2
times. If you are going to keep this scanning logic then at least the
function should be changed to return a PAIR of lock-ids so you only
need to do 1x scan instead of 2x scan.

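
A minimal sketch of a single-scan version that returns both ids at
once, assuming "unique" simply means "not currently used by any other
worker" and that the caller can collect the in-use ids into an array.
LockIdPair and parallel_apply_get_unique_id_pair() are hypothetical:

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical result type: both lock ids for one worker. */
typedef struct LockIdPair
{
	uint16_t	stream_lock_id;
	uint16_t	transaction_lock_id;
} LockIdPair;

/*
 * Pick the two lowest ids not present in used_ids[0 .. n_used - 1]. The
 * in-use ids are visited only once, by marking them in a bitmap.
 * Returns false if fewer than two ids are free.
 */
static bool
parallel_apply_get_unique_id_pair(const uint16_t *used_ids, size_t n_used,
								  LockIdPair *result)
{
	uint8_t		in_use[(UINT16_MAX + 1) / 8] = {0};	/* 8kB, 1 bit per id */
	uint16_t	found[2];
	int			nfound = 0;

	for (size_t i = 0; i < n_used; i++)
		in_use[used_ids[i] >> 3] |= (uint8_t) (1 << (used_ids[i] & 7));

	for (uint32_t id = 0; id <= UINT16_MAX && nfound < 2; id++)
	{
		if (!(in_use[id >> 3] & (1 << (id & 7))))
			found[nfound++] = (uint16_t) id;
	}

	if (nfound < 2)
		return false;

	result->stream_lock_id = found[0];
	result->transaction_lock_id = found[1];
	return true;
}
```

The two assignments quoted above would then become one call that fills
both winfo->shared->stream_lock_id and
winfo->shared->transaction_lock_id.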
~~~

16. parallel_apply_send_data

+/*
+ * Send the data to the specified parallel apply worker via shared-memory queue.
+ */
+void
+parallel_apply_send_data(ParallelApplyWorkerInfo *winfo, Size nbytes,
+ const void *data)

The function comment needs more detail to explain the purpose of the
function, and how the thresholds work.

~~~

17. parallel_apply_wait_for_xact_finish

+/*
+ * Wait until the parallel apply worker's transaction finishes.
+ */
+void
+parallel_apply_wait_for_xact_finish(ParallelApplyWorkerShared *wshared)

I think this comment needs lots more details because the
implementation seems to be doing a lot more than just waiting for the
state to become "finished" - e.g. it seems to be waiting for it to
transition through the other stages as well...

~~~

18.

The boolean flag was changed to enum states so all these comments
mentioning "flag" are stale and need to be reworded/rewritten.

18a.
+ /*
+ * Wait until the parallel apply worker handles the first message and
+ * set the flag to true.
+ */

Update this comment

~

18b.
+ /*
+ * Wait until the flag becomes false in case the lock was released because
+ * of failure while applying.
+ */

Update this comment

~~~

19. parallel_apply_wait_for_in_xact

+/*
+ * Wait until the parallel apply worker's xact_state flag becomes
+ * the same as in_xact.
+ */
+static void
+parallel_apply_wait_for_in_xact(ParallelApplyWorkerShared *wshared,
+ ParallelTransState xact_state)

SUGGESTION
Wait until the parallel apply worker's transaction state becomes the
same as in_xact.

~~~

20.

+ /* Stop if the flag becomes the same as in_xact. */
+ if (parallel_apply_get_xact_state(wshared) >= xact_state)
+ break;

20a.
"flag" -> "transaction state",

~

20b.
This code uses >= comparison which means a strict order of the enum
values is assumed. So this order MUST be documented in the enum
typedef.
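
A minimal sketch of the kind of wait loop described in #19/#20,
showing why the >= comparison needs the documented order: the waiter
must also stop if the worker has already moved past the state being
waited for. PARALLEL_TRANS_STARTED, WorkerSharedSketch and
wait_for_xact_state() are hypothetical, and a real implementation
should sleep instead of spinning:

```c
#include <assert.h>
#include <stdatomic.h>

/*
 * NOTE: values are in lifecycle order. Waiters compare with >=, so
 * reordering them would break wait_for_xact_state().
 */
typedef enum ParallelTransState
{
	PARALLEL_TRANS_UNKNOWN,
	PARALLEL_TRANS_STARTED,		/* hypothetical */
	PARALLEL_TRANS_FINISHED
} ParallelTransState;

/* Hypothetical stand-in for the xact_state member of the shared struct. */
typedef struct WorkerSharedSketch
{
	_Atomic int xact_state;
} WorkerSharedSketch;

/* Wait until the worker's transaction state reaches or passes target. */
static void
wait_for_xact_state(WorkerSharedSketch *wshared, ParallelTransState target)
{
	for (;;)
	{
		/* Stop if the transaction state has reached or passed the target. */
		if (atomic_load(&wshared->xact_state) >= (int) target)
			break;

		/* A real implementation should sleep here rather than busy-loop. */
	}
}
```

For example, if the leader waits for STARTED but the worker has
already reached FINISHED, an "==" test would never succeed, while ">="
returns at once.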

~~~

21. parallel_apply_set_xact_state

+/*
+ * Set the xact_state flag for the given parallel apply worker.
+ */
+void
+parallel_apply_set_xact_state(ParallelApplyWorkerShared *wshared,
+ ParallelTransState xact_state)

SUGGESTION
Set an enum indicating the transaction state for the given parallel
apply worker.

~~~

22. parallel_apply_get_xact_state

/*
* Get the xact_state flag for the given parallel apply worker.
*/
static ParallelTransState
parallel_apply_get_xact_state(ParallelApplyWorkerShared *wshared)

SUGGESTION
Get an enum indicating the transaction state for the given parallel
apply worker.

~~~

23. parallel_apply_set_fileset

+/*
+ * Set the fileset_valid flag and fileset for the given parallel apply worker.
+ */
+void
+parallel_apply_set_fileset(ParallelApplyWorkerShared *wshared, bool fileset_valid)

As mentioned elsewhere (#12 above) I think it would be better to split
this into 2 functions.

~~~

24. parallel_apply_lock/unlock

24a.
+/* Helper function to release a lock with lockid */
SUGGESTION
Helper function to release a lock identified by lockid.

~

24b.
+/* Helper function to take a lock with lockid */
SUGGESTION
Helper function to acquire a lock identified by lockid.

~

24c.
+/* Helper function to release a lock with lockid */
+void
+parallel_apply_lock(uint16 lockid)
...
+/* Helper function to take a lock with lockid */
+void
+parallel_apply_unlock(uint16 lockid)

Aren't those function comments around the wrong way?

======

src/backend/replication/logical/worker.c

25. File header comment

+ * The dynamic shared memory segment will contain (a) a shm_mq that can be used
+ * to send changes in the transaction from leader apply worker to parallel
+ * apply worker (b) another shm_mq that can be used to send errors (and other
+ * messages reported via elog/ereport) from the parallel apply worker to leader
+ * apply worker (c) necessary information to be shared among parallel apply
+ * workers and leader apply worker (i.e. the member in
+ * ParallelApplyWorkerShared).

"the member in ParallelApplyWorkerShared" -> "the members of
ParallelApplyWorkerShared"

~~~

26.

Shouldn't that comment have something to say about the
deadlock-detection design?

~~~

27. TransApplyAction

+typedef enum
{
- LogicalRepMsgType command; /* 0 if invalid */
- LogicalRepRelMapEntry *rel;
-
- /* Remote node information */
- int remote_attnum; /* -1 if invalid */
- TransactionId remote_xid;
- XLogRecPtr finish_lsn;
- char *origin_name;
-} ApplyErrorCallbackArg;
-
-static ApplyErrorCallbackArg apply_error_callback_arg =
+ /* The action for non-streaming transactions. */
+ TRANS_LEADER_APPLY,
+
+ /* Actions for streaming transactions. */
+ TRANS_LEADER_SERIALIZE,
+ TRANS_LEADER_PARTIAL_SERIALIZE,
+ TRANS_LEADER_SEND_TO_PARALLEL,
+ TRANS_PARALLEL_APPLY
+} TransApplyAction;

27a.
A new enum value TRANS_LEADER_PARTIAL_SERIALIZE was added, but the
explanatory comment for it is missing.

~

27b.
In fact, this new TRANS_LEADER_PARTIAL_SERIALIZE is used in many
places with no comments to explain what it is for.
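
For illustration, the kind of per-value comments that would help,
assuming TRANS_LEADER_PARTIAL_SERIALIZE means the leader apply worker
sent part of a streamed transaction to a parallel apply worker and then
switched to serializing the remaining changes to a file. That reading
is inferred from switching_to_serialize and partial_sent_message in
this patch and may not match the author's intent; the other
descriptions are likewise guesses from the names:

```c
#include <assert.h>

typedef enum
{
	/* Leader apply worker applies a non-streaming transaction directly. */
	TRANS_LEADER_APPLY,

	/* Actions for streaming transactions. */

	/* Leader apply worker serializes all changes of the xact to a file. */
	TRANS_LEADER_SERIALIZE,

	/*
	 * (Assumed meaning) Leader apply worker has already sent some changes
	 * to a parallel apply worker, and serializes the remaining changes to
	 * a file, which the parallel apply worker applies later.
	 */
	TRANS_LEADER_PARTIAL_SERIALIZE,

	/* Leader apply worker sends changes to a parallel apply worker. */
	TRANS_LEADER_SEND_TO_PARALLEL,

	/* Parallel apply worker applies the changes it received. */
	TRANS_PARALLEL_APPLY
} TransApplyAction;
```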

~~~

28. handle_streamed_transaction

static bool
handle_streamed_transaction(LogicalRepMsgType action, StringInfo s)
{
- TransactionId xid;
+ TransactionId current_xid;
+ ParallelApplyWorkerInfo *winfo;
+ TransApplyAction apply_action;
+ StringInfoData origin_msg;
+
+ apply_action = get_transaction_apply_action(stream_xid, &winfo);

/* not in streaming mode */
- if (!in_streamed_transaction)
+ if (apply_action == TRANS_LEADER_APPLY)
return false;

- Assert(stream_fd != NULL);
Assert(TransactionIdIsValid(stream_xid));

+ origin_msg = *s;

28a.
There are no comments explaining what this
TRANS_LEADER_PARTIAL_SERIALIZE is doing, so I cannot tell if
'origin_msg' is a meaningful name, or whether it is meant to say
'original_msg'.

~

28b.
Why not assign it at the declaration, the same as
apply_handle_stream_prepare does?

~~~

29. apply_handle_stream_prepare

+ case TRANS_LEADER_PARTIAL_SERIALIZE:

Seems like there is a missing explanation of what this partial
serialize logic is doing.

~~~

30.

+ case TRANS_PARALLEL_APPLY:
+ parallel_apply_replorigin_setup();
+
+ /* Unlock all the shared object lock at transaction end. */
+ parallel_apply_unlock(MyParallelShared->stream_lock_id);
+
+ if (stream_fd)
+ BufFileClose(stream_fd);

There should be some explanatory comment about what's going on here
with the stream_fd, e.g. how it gets to be non-NULL, and why you do not
set it back to NULL after the BufFileClose.

~~~

31.

/*
+ * Handle STREAM START message when the transaction was spilled to disk.
+ *
+ * Inintialize fileset if not yet and open the file.
+ */
+void
+serialize_stream_start(TransactionId xid, bool first_segment)

Typo "Inintialize" -> "Initialize"

The comment also seems to be missing some words.

SUGGESTION
Initialize fileset (if not already done), and open the file.

~~~

32. apply_handle_stream_start

- if (in_streamed_transaction)
+ if (!switching_to_serialize && in_streamed_transaction)
ereport(ERROR,
(errcode(ERRCODE_PROTOCOL_VIOLATION),
errmsg_internal("duplicate STREAM START message")));

Somehow, I think this condition seems more natural if written the
other way around:

SUGGESTION
if (in_streamed_transaction && !switching_to_serialize)

~~~

33.

+ /*
+ * Increment the number of message waiting to be processed by
+ * parallel apply worker.
+ */
+ pg_atomic_add_fetch_u32(&(winfo->shared->left_message), 1);

33a.
"of message" -> "of messages".

~

33b.
The extra &() parens are not useful.

This same syntax is repeated in all the calls to that atomic function
so please search/fix all the others too...

~

33c.
The member name 'left_message' does not seem like a very good name.
How about 'pending_message_count', 'n_unprocessed_messages',
'n_messages_remaining', or something else more informative?
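
A minimal sketch of #33b and #33c together, using C11 atomics as a
stand-in for pg_atomic_add_fetch_u32()/pg_atomic_sub_fetch_u32() (which
return the new value) and one possible descriptive member name;
add_fetch_u32(), sub_fetch_u32(), leader_message_sent() and
SharedCounterSketch are hypothetical:

```c
#include <assert.h>
#include <stdatomic.h>
#include <stdint.h>

/* Hypothetical cut-down shared struct using a more descriptive name. */
typedef struct SharedCounterSketch
{
	/*
	 * Number of messages sent by the leader apply worker that the parallel
	 * apply worker has not processed yet.
	 */
	_Atomic uint32_t n_pending_messages;
} SharedCounterSketch;

/* Like pg_atomic_add_fetch_u32(): add, then return the new value. */
static uint32_t
add_fetch_u32(_Atomic uint32_t *ptr, uint32_t add)
{
	return atomic_fetch_add(ptr, add) + add;
}

/* Like pg_atomic_sub_fetch_u32(): subtract, then return the new value. */
static uint32_t
sub_fetch_u32(_Atomic uint32_t *ptr, uint32_t sub)
{
	return atomic_fetch_sub(ptr, sub) - sub;
}

/* Leader side: one more message is waiting; note there are no &() parens. */
static void
leader_message_sent(SharedCounterSketch *shared)
{
	add_fetch_u32(&shared->n_pending_messages, 1);
}
```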

~~~

34. apply_handle_stream_abort

+static void
+apply_handle_stream_abort(StringInfo s)
+{
+ TransactionId xid;
+ TransactionId subxid;
+ LogicalRepStreamAbortData abort_data;
+ ParallelApplyWorkerInfo *winfo;
+ TransApplyAction apply_action;
+ StringInfoData origin_msg = *s;

I'm unsure about that 'origin_msg' variable. Should that be called
'original_msg'?

~~~

35.

+ if (subxid == xid)

There are multiple parts of this logic that are doing (subxid == xid),
so it might be better to assign that to a meaningful variable name
instead of the repeated comparisons.
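
A tiny sketch of the suggestion; toplevel_abort, StreamAbortKind and
classify_stream_abort() are hypothetical names, and TransactionId is a
stand-in for the PostgreSQL typedef:

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

typedef uint32_t TransactionId;		/* stand-in for the PostgreSQL typedef */

/* Hypothetical classification of a STREAM ABORT message. */
typedef enum StreamAbortKind
{
	STREAM_ABORT_TOPLEVEL,		/* the whole transaction is aborted */
	STREAM_ABORT_SUBXACT		/* only a subtransaction is rolled back */
} StreamAbortKind;

static StreamAbortKind
classify_stream_abort(TransactionId xid, TransactionId subxid)
{
	/* Name the condition once instead of repeating (subxid == xid). */
	bool		toplevel_abort = (subxid == xid);

	return toplevel_abort ? STREAM_ABORT_TOPLEVEL : STREAM_ABORT_SUBXACT;
}
```

Each later test can then read "if (toplevel_abort)" or
"if (!toplevel_abort)", which also documents the intent.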

~~~

36.

+ * The file was deleted if aborted the whole transaction, so
+ * create it again in this case.

English? Missing words?

~~~

37.

+ /*
+ * Increment the number of message waiting to be processed by
+ * parallel apply worker.
+ */

"message" -> "messages"

~~~

38.

+ /*
+ * If there is no message left, wait for the leader to release the
+ * lock and send more messages.
+ */
+ if (xid != subxid &&
+ pg_atomic_sub_fetch_u32(&(MyParallelShared->left_message), 1) == 0)
+ parallel_apply_lock(MyParallelShared->stream_lock_id);

The comment says "wait for the leader"... but it seems misleading -
there is no waiting happening here.

~~~

39. apply_spooled_messages

+
/*
* Common spoolfile processing.
*/
-static void
-apply_spooled_messages(TransactionId xid, XLogRecPtr lsn)
+void
+apply_spooled_messages(FileSet *stream_fileset, TransactionId xid,
+ XLogRecPtr lsn)

Spurious extra blank line above this function.

~~~

40.

- fd = BufFileOpenFileSet(MyLogicalRepWorker->stream_fileset, path, O_RDONLY,
+ fd = BufFileOpenFileSet(stream_fileset, path, O_RDONLY,
false);

Unnecessary wrapping.

~~~

41.

+ fd = BufFileOpenFileSet(stream_fileset, path, O_RDONLY,
false);
+ stream_fd = fd;

Is it still meaningful to have the local 'fd' variable? Might as well
just use 'stream_fd' instead now, right?

~~~

42.

+ /*
+ * Break the loop if parallel apply worker have finished applying the
+ * transaction. The parallel apply worker should have close the file
+ * before committing.
+ */

English?

"if parallel" -> "if the parallel"

"have finished" -> "has finished"

"should have close" -> "should have closed"

~~~

43. apply_handle_stream_commit

LogicalRepCommitData commit_data;
+ ParallelApplyWorkerInfo *winfo;
+ TransApplyAction apply_action;
+ StringInfoData origin_msg = *s

I'm unsure about that 'origin_msg' variable. Should that be called
'original_msg' ?

~~~

44. stream_write_message

+ * stream_write_message
+ * Serialize the message that are not in a streaming block to a file.
+ */
+static void
+stream_write_message(TransactionId xid, char action, StringInfo s,
+ bool create_file)

44a.
This logic seems new, but the function comment sounds strange
(English/typos?), and it does not give enough detail about what this
file is, when it is written, and for what purpose we are writing to it.

~

44b.
If this is always written to a file, then wouldn't a better function
name be something including the word "serialize" - e.g.
serialize_message()?

======

src/backend/replication/logical/launcher.c

45. logicalrep_worker_onexit

+ /*
+ * Release all the session level lock that could be held in parallel apply
+ * mode.
+ */
+ LockReleaseAll(DEFAULT_LOCKMETHOD, true);

"the session level lock" -> "session level locks"

======

src/include/replication/worker_internal.h

46. ParallelApplyWorkerShared

+ /*
+ * Flag used to ensure commit ordering.
+ *
+ * The parallel apply worker will set it to false after handling the
+ * transaction finish commands while the apply leader will wait for it to
+ * become false before proceeding in transaction finish commands (e.g.
+ * STREAM_COMMIT/STREAM_ABORT/STREAM_PREPARE).
+ */
+ ParallelTransState xact_state;

The comment has gone stale because this member is not a boolean flag
anymore, so saying "will set it to false" is wrong...

~~~

47.

+ /* Unique identifiers in the current subscription that used to lock. */
+ uint16 stream_lock_id;
+ uint16 transaction_lock_id;

Comment English?

~~~

48.

+ pg_atomic_uint32 left_message;

Needs explanatory comment.

~~~

49.

+ /* Whether there is partially sent message left in the queue. */
+ bool partial_sent_message;

Comment English?

~~~

50.

+ /*
+ * Don't use SharedFileSet here because the fileset is shared by the leader
+ * worker and the fileset in leader need to survive after releasing the
+ * shared memory so that the leader can re-use the fileset for next
+ * streaming transaction.
+ */
+ bool fileset_valid;
+ FileSet fileset;

The comment here seems to need some more work because it is saying
more about what it *isn't*, rather than what it *is*.

Something like:

The 'fileset' is used for....
The 'fileset' is only valid to use when the accompanying fileset_valid
flag is true...
NOTE - We cannot use a SharedFileSet here because....

Also, fix the typo "need to survive" -> "needs to survive".

Also, it may be better to refer to the "leader apply worker" by its
full name instead of just "leader".

~~~

51. typedef struct ParallelApplyWorkerInfo

+ bool serialize_changes;

Needs explanatory comment.

~~~

52.

+ /*
+ * Used to save the message that was only partially sent to parallel apply
+ * worker.
+ */
+ char *pending_msg;

Some information seems missing because this comment does not have
enough detail to know what it means - e.g. what is a partially sent
message?

------
Kind Regards,
Peter Smith.
Fujitsu Australia
