RE: Perform streaming logical transactions by background workers and parallel apply

From: "houzj(dot)fnst(at)fujitsu(dot)com" <houzj(dot)fnst(at)fujitsu(dot)com>
To: Peter Smith <smithpb2250(at)gmail(dot)com>
Cc: Amit Kapila <amit(dot)kapila16(at)gmail(dot)com>, Masahiko Sawada <sawada(dot)mshk(at)gmail(dot)com>, "wangw(dot)fnst(at)fujitsu(dot)com" <wangw(dot)fnst(at)fujitsu(dot)com>, Dilip Kumar <dilipbalaut(at)gmail(dot)com>, "shiy(dot)fnst(at)fujitsu(dot)com" <shiy(dot)fnst(at)fujitsu(dot)com>, PostgreSQL Hackers <pgsql-hackers(at)lists(dot)postgresql(dot)org>
Subject: RE: Perform streaming logical transactions by background workers and parallel apply
Date: 2022-11-29 05:11:34
Message-ID: OS0PR01MB5716350729D8C67AA8CE333194129@OS0PR01MB5716.jpnprd01.prod.outlook.com
Lists: pgsql-hackers

On Mon, November 28, 2022 at 15:19, Peter Smith <smithpb2250(at)gmail(dot)com> wrote:
> Here are some review comments for patch v51-0002

Thanks for your comments!

> ======
>
> 1.
>
> GENERAL - terminology: spool/serialize and data/changes/message
>
> The terminology seems to be used at random. IMO it might be worthwhile
> rechecking at least that terms are used consistently in all the
> comments. e.g "serialize message data to disk" ... and later ...
> "apply the spooled messages".
>
> Also for places where it says "Write the message to file" maybe
> consider using consistent terminology like "serialize the message to a
> file".
>
> Also, try to standardize the way things are described by using
> consistent (if they really are the same) terminology for "writing
> data" VS "writing data" VS "writing messages" etc. It is confusing
> trying to know if the different wording has some intended meaning or
> is it just random.

I changed some of them, but I think there are some things left, which I will recheck in the next version.
Also, I think we'd better not change comments that refer to existing comments, functions, or variables.
For example, it's fine for comments that refer to apply_spooled_messages to use "spool" and "message".

> ======
>
> Commit message
>
> 2.
> When the leader apply worker times out while sending a message to the
> parallel apply worker. Instead of erroring out, switch to partial
> serialize mode and let the leader serialize all remaining changes to
> the file and notify the parallel apply workers to read and apply them at the end of the transaction.
>
> ~
>
> The first sentence seems incomplete
>
> SUGGESTION.
> In patch 0001 if the leader apply worker times out while attempting to
> send a message to the parallel apply worker it results in an ERROR.
>
> This patch (0002) modifies that behaviour, so instead of erroring it
> will switch to "partial serialize" mode - in this mode the leader
> serializes all remaining changes to a file and notifies the parallel
> apply workers to read and apply them at the end of the transaction.
>
> ~~~
>
> 3.
>
> This patch 0002 is called “Serialize partial changes to disk if the
> shm_mq buffer is full”, but the commit message is saying nothing about
> the buffer filling up. I think the Commit message should be mentioning
> something that makes the commit patch name more relevant. Otherwise
> change the patch name.

Changed.

> ======
>
> .../replication/logical/applyparallelworker.c
>
> 4. File header comment
>
> + * timeout is exceeded, the LA will write to file and indicate PA-2 that it
> + * needs to read file for remaining messages. Then LA will start waiting for
> + * commit which will detect deadlock if any. (See pa_send_data() and typedef
> + * enum TransApplyAction)
>
> "needs to read file for remaining messages" -> "needs to read that
> file for the remaining messages"

Changed.

> ~~~
>
> 5. pa_free_worker
>
> + /*
> + * Stop the worker if there are enough workers in the pool.
> + *
> + * XXX we also need to stop the worker if the leader apply worker
> + * serialized part of the transaction data to a file due to send timeout.
> + * This is because the message could be partially written to the queue due
> + * to send timeout and there is no way to clean the queue other than
> + * resending the message until it succeeds. To avoid complexity, we
> + * directly stop the worker in this case.
> + */
> + if (winfo->serialize_changes ||
> + napplyworkers > (max_parallel_apply_workers_per_subscription / 2))
>
> 5a.
>
> + * XXX we also need to stop the worker if the leader apply worker
> + * serialized part of the transaction data to a file due to send timeout.
>
> SUGGESTION
> XXX The worker is also stopped if the leader apply worker needed to
> serialize part of the transaction data due to a send timeout.
>
> ~
>
> 5b.
>
> + /* Unlink the files with serialized changes. */
> + if (winfo->serialize_changes)
> + stream_cleanup_files(MyLogicalRepWorker->subid, winfo->shared->xid);
>
> A better comment might be
>
> SUGGESTION
> Unlink any files that were needed to serialize partial changes.

Changed.

> ~~~
>
> 6. pa_spooled_messages
>
> /*
> * Replay the spooled messages in the parallel apply worker if leader
> apply
> * worker has finished serializing changes to the file.
> */
> static void
> pa_spooled_messages(void)
>
> 6a.
> IMO a better name for this function would be
> pa_apply_spooled_messages();

Not sure about this.

> ~
>
> 6b.
> "if leader apply" -> "if the leader apply"

Changed.

> ~
>
> 7.
>
> + /*
> + * Acquire the stream lock if the leader apply worker is serializing
> + * changes to the file, because the parallel apply worker will no longer
> + * have a chance to receive a STREAM_STOP and acquire the lock until the
> + * leader serialize all changes to the file.
> + */
> + if (fileset_state == LEADER_FILESET_BUSY)
> + {
> + pa_lock_stream(MyParallelShared->xid, AccessShareLock);
> + pa_unlock_stream(MyParallelShared->xid, AccessShareLock);
> + }
>
> SUGGESTION (rearranged comment - please check, I am not sure if I got
> this right)
>
> If the leader apply worker is still (busy) serializing partial changes
> then the parallel apply worker acquires the stream lock now.
> Otherwise, it would not have a chance to receive a STREAM_STOP (and
> acquire the stream lock) until the leader had serialized all changes.

Changed.

> ~~~
>
> 8. pa_send_data
>
> + *
> + * When sending data times out, data will be serialized to disk. And the
> + * current streaming transaction will enter PARTIAL_SERIALIZE mode, which means
> + * that subsequent data will also be serialized to disk.
> */
> void
> pa_send_data(ParallelApplyWorkerInfo *winfo, Size nbytes, const void *data)
>
> SUGGESTION (minor comment change)
>
> If the attempt to send data via shared memory times out, then we will
> switch to "PARTIAL_SERIALIZE mode" for the current transaction. This
> means that the current data and any subsequent data for this
> transaction will be serialized to disk.

Changed.
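
To make the new flow easier to review, here is a rough sketch of the fallback
logic (simplified; the helper name and SHM_SEND_TIMEOUT_MS are illustrative,
and winfo->mq_handle is the queue handle from patch 0001):

    static void
    pa_send_data_sketch(ParallelApplyWorkerInfo *winfo, Size nbytes,
                        const void *data)
    {
        TimestampTz start_time = GetCurrentTimestamp();

        for (;;)
        {
            /* Non-blocking attempt to send to the parallel apply worker. */
            shm_mq_result result = shm_mq_send(winfo->mq_handle, nbytes,
                                               data, true, true);

            if (result == SHM_MQ_SUCCESS)
                return;
            if (result == SHM_MQ_DETACHED)
                ereport(ERROR,
                        (errmsg("parallel apply worker exited unexpectedly")));

            /* SHM_MQ_WOULD_BLOCK: check whether the send timeout expired. */
            if (TimestampDifferenceExceeds(start_time, GetCurrentTimestamp(),
                                           SHM_SEND_TIMEOUT_MS))
            {
                /*
                 * The parallel apply worker might be stuck, so switch to
                 * PARTIAL_SERIALIZE mode; the caller will serialize this
                 * and all subsequent data to a file instead.
                 */
                winfo->serialize_changes = true;
                return;
            }

            /* Wait a little before retrying. */
            (void) WaitLatch(MyLatch,
                             WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
                             10L, WAIT_EVENT_MQ_SEND);
            ResetLatch(MyLatch);
        }
    }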

> ~
>
> 9.
>
> Assert(!IsTransactionState());
> + Assert(!winfo->serialize_changes);
>
> How about also asserting that this must be the LA worker?

Not sure about this as I think the parallel apply worker won't have a winfo.

> ~
>
> 10.
>
> + /*
> + * The parallel apply worker might be stuck for some reason, so
> + * stop sending data to parallel worker and start to serialize
> + * data to files.
> + */
> + winfo->serialize_changes = true;
>
> SUGGESTION (minor reword)
> The parallel apply worker might be stuck for some reason, so stop
> sending data directly to it and start to serialize data to files
> instead.

Changed.

> ~
>
> 11.
> + /* Skip first byte and statistics fields. */
> + msg.cursor += SIZE_STATS_MESSAGE + 1;
>
> IMO it would be better for the comment order and the code calculation
> order to be the same.
>
> SUGGESTION
> /* Skip first byte and statistics fields. */
> msg.cursor += 1 + SIZE_STATS_MESSAGE;

Changed.

> ~
>
> 12. pa_stream_abort
>
> + /*
> + * If the parallel apply worker is applying the spooled
> + * messages, we save the current file position and close the
> + * file to prevent the file from being accidentally closed on
> + * rollback.
> + */
> + if (stream_fd)
> + {
> + BufFileTell(stream_fd, &fileno, &offset);
> + BufFileClose(stream_fd);
> + reopen_stream_fd = true;
> + }
> +
> RollbackToSavepoint(spname);
> CommitTransactionCommand();
> subxactlist = list_truncate(subxactlist, i + 1);
> +
> + /*
> + * Reopen the file and set the file position to the saved
> + * position.
> + */
> + if (reopen_stream_fd)
>
> It seems a bit vague to just refer to "close the file" and "reopen the
> file" in these comments. IMO it would be better to call this file by a
> name like "the message spool file" or similar. Please check all other
> similar comments.

Changed.
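
For reference, with the renamed comments the shape of this code is roughly as
below (the reopen branch is a sketch; 'fileset' and 'path' here stand for
however the worker reaches the message spool file):

    if (stream_fd)
    {
        /*
         * Save the current position in the message spool file and close
         * it, to prevent it from being closed by the rollback below.
         */
        BufFileTell(stream_fd, &fileno, &offset);
        BufFileClose(stream_fd);
        reopen_stream_fd = true;
    }

    RollbackToSavepoint(spname);
    CommitTransactionCommand();
    subxactlist = list_truncate(subxactlist, i + 1);

    if (reopen_stream_fd)
    {
        /* Reopen the message spool file and restore the read position. */
        stream_fd = BufFileOpenFileSet(fileset, path, O_RDONLY, false);
        if (BufFileSeek(stream_fd, fileno, offset, SEEK_SET) != 0)
            ereport(ERROR,
                    (errmsg("could not seek in message spool file")));
    }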

> ~~~
>
> 13. pa_set_fileset_state
>
> /*
> + * Set the fileset_state flag for the given parallel apply worker. The
> + * stream_fileset of the leader apply worker will be written into the shared
> + * memory if the fileset_state is LEADER_FILESET_ACCESSIBLE.
> + */
> +void
> +pa_set_fileset_state(ParallelApplyWorkerShared *wshared,
> + LeaderFileSetState fileset_state)
> +{
>
> 13a.
>
> It is an enum -- not a "flag", so:
>
> "fileset_state flag" -> "fileste state"

Changed.

> ~~
>
> 13b.
>
> It seemed strange to me that the comment/code says this state is only
> written to shm when it is "ACCESSIBLE".... IIUC this same filestate
> lingers around to be reused for other workers so I expected the state
> should *always* be written whenever the LA changes it. (I mean even if
> the PA is not needing to look at this member, I still think it should
> have the current/correct value in it).

I think we always change the state.
Or do you mean that the fileset (not the state) is only written when it is ACCESSIBLE?
The fileset cannot be used before it's READY, so I don't write the fileset into
shared memory before that.
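
To be precise, the setter looks roughly like this (a sketch, already using the
renamed states from comment #33 below; the mutex field comes from patch 0001):

    void
    pa_set_fileset_state(ParallelApplyWorkerShared *wshared,
                         PartialFileSetState fileset_state)
    {
        SpinLockAcquire(&wshared->mutex);
        wshared->fileset_state = fileset_state;

        /* Share the fileset only once it is complete and safe to use. */
        if (fileset_state == FS_READY)
            wshared->fileset = *MyLogicalRepWorker->stream_fileset;
        SpinLockRelease(&wshared->mutex);
    }

So the state itself is always updated; it is only the FileSet contents that
are copied into shared memory at READY time.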

> ======
>
> src/backend/replication/logical/worker.c
>
> 14. TRANS_LEADER_SEND_TO_PARALLEL
>
> + * TRANS_LEADER_PARTIAL_SERIALIZE:
> + * The action means that we are in the leader apply worker and have sent some
> + * changes to the parallel apply worker, but the remaining changes need to be
> + * serialized to disk due to timeout while sending data, and the parallel apply
> + * worker will apply these changes when the final commit arrives.
> + *
> + * One might think we can use LEADER_SERIALIZE directly. But in partial
> + * serialize mode, in addition to serializing changes to file, the leader
> + * worker needs to write the STREAM_XXX message to disk, and needs to wait for
> + * parallel apply worker to finish the transaction when processing the
> + * transaction finish command. So a new action was introduced to make the logic
> + * clearer.
> + *
> * TRANS_LEADER_SEND_TO_PARALLEL:
>
>
> SUGGESTION (Minor wording changes)
> The action means that we are in the leader apply worker and have sent
> some changes directly to the parallel apply worker, due to timeout
> while sending data the remaining changes need to be serialized to
> disk. The parallel apply worker will apply these serialized changes
> when the final commit arrives.
>
> LEADER_SERIALIZE could not be used for this case because, in addition
> to serializing changes, the leader worker also needs to write the
> STREAM_XXX message to disk, and wait for the parallel apply worker to
> finish the transaction when processing the transaction finish command.
> So this new action was introduced to make the logic clearer.

Changed.

> ~
>
> 15.
> /* Actions for streaming transactions. */
> TRANS_LEADER_SERIALIZE,
> + TRANS_LEADER_PARTIAL_SERIALIZE,
> TRANS_LEADER_SEND_TO_PARALLEL,
> TRANS_PARALLEL_APPLY
>
> Although it makes no difference I felt it would be better to put
> TRANS_LEADER_PARTIAL_SERIALIZE *after* TRANS_LEADER_SEND_TO_PARALLEL
> because that would be the order that these mode changes occur in the
> logic...

I thought it is fine as-is, since it follows LEADER_SERIALIZE, which is similar to
LEADER_PARTIAL_SERIALIZE.
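
That is, the enum keeps the two serialize actions adjacent (a sketch;
TRANS_LEADER_APPLY is from patch 0001):

    typedef enum TransApplyAction
    {
        TRANS_LEADER_APPLY,

        /* Actions for streaming transactions. */
        TRANS_LEADER_SERIALIZE,
        TRANS_LEADER_PARTIAL_SERIALIZE, /* next to LEADER_SERIALIZE */
        TRANS_LEADER_SEND_TO_PARALLEL,
        TRANS_PARALLEL_APPLY
    } TransApplyAction;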

> ~~~
>
> 16.
>
> @@ -375,7 +388,7 @@ typedef struct ApplySubXactData
> static ApplySubXactData subxact_data = {0, 0, InvalidTransactionId, NULL};
>
> static inline void subxact_filename(char *path, Oid subid, TransactionId xid);
> -static inline void changes_filename(char *path, Oid subid, TransactionId xid);
> +inline void changes_filename(char *path, Oid subid, TransactionId xid);
>
> IIUC (see [1]) when this function was made non-static the "inline"
> should have been put into the header file.

Changed this function from "inline void" to "void", as I am not sure whether it would be
better to put this function's definition in the header file.

> ~
>
> 17.
> @@ -388,10 +401,9 @@ static inline void cleanup_subxact_info(void);
> /*
> * Serialize and deserialize changes for a toplevel transaction.
> */
> -static void stream_cleanup_files(Oid subid, TransactionId xid);
> static void stream_open_file(Oid subid, TransactionId xid, bool first_segment);
> -static void stream_write_change(char action, StringInfo s);
> +static void stream_write_message(TransactionId xid, char action, StringInfo s);
> static void stream_close_file(void);
>
> 17a.
>
> I felt just saying "file/files" is too vague. All the references to
> the file should be consistent, so IMO everything would be better named
> like:
>
> "stream_cleanup_files" -> "stream_msg_spoolfile_cleanup()"
> "stream_open_file" -> "stream_msg_spoolfile_open()"
> "stream_close_file" -> "stream_msg_spoolfile_close()"
> "stream_write_message" -> "stream_msg_spoolfile_write_msg()"

Renamed the function stream_write_message to stream_open_and_write_change.

> ~
>
> 17b.
> IMO there is not enough distinction here between function names
> stream_write_message and stream_write_change. e.g. You cannot really
> tell from their names what might be the difference.

Changed the name.

> ~~~
>
> 18.
>
> @@ -586,6 +595,7 @@ handle_streamed_transaction(LogicalRepMsgType action, StringInfo s)
> TransactionId current_xid;
> ParallelApplyWorkerInfo *winfo;
> TransApplyAction apply_action;
> + StringInfoData original_msg;
>
> apply_action = get_transaction_apply_action(stream_xid, &winfo);
>
> @@ -595,6 +605,8 @@ handle_streamed_transaction(LogicalRepMsgType action, StringInfo s)
>
> Assert(TransactionIdIsValid(stream_xid));
>
> + original_msg = *s;
> +
> /*
> * We should have received XID of the subxact as the first part of the
> * message, so extract it.
> @@ -618,10 +630,14 @@ handle_streamed_transaction(LogicalRepMsgType action, StringInfo s)
> stream_write_change(action, s);
> return true;
>
> + case TRANS_LEADER_PARTIAL_SERIALIZE:
> case TRANS_LEADER_SEND_TO_PARALLEL:
> Assert(winfo);
>
> - pa_send_data(winfo, s->len, s->data);
> + if (apply_action == TRANS_LEADER_SEND_TO_PARALLEL)
> + pa_send_data(winfo, s->len, s->data);
> + else
> + stream_write_change(action, &original_msg);
>
> The original_msg is not used except for TRANS_LEADER_PARTIAL_SERIALIZE
> case so I think it should only be declared/assigned in the scope of
> that 'else'

The 'cursor' member of 's' is changed when pq_getmsgint() is invoked.
So 'original_msg' must be assigned before invoking pq_getmsgint().
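
To make the ordering constraint concrete (a sketch):

    original_msg = *s;          /* snapshot before any parsing */
    xid = pq_getmsgint(s, 4);   /* advances s->cursor */

    /* ... later, only in the TRANS_LEADER_PARTIAL_SERIALIZE branch ... */
    stream_write_change(action, &original_msg); /* full, unparsed message */

So declaring and assigning 'original_msg' inside the else branch would copy a
cursor position that pq_getmsgint() had already advanced.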

> ~
>
> 20.
>
> + /*
> + * Close the file before committing if the parallel apply is
> + * applying spooled changes.
> + */
> + if (stream_fd)
> + BufFileClose(stream_fd);
>
> I found this a bit confusing because there is already a
> stream_close_file() wrapper function which does almost the same as
> this. So either this code should be calling that function, or the
> comment here should be explaining why this code is NOT calling that
> function.

Changed.

> ~~~
>
> 21. serialize_stream_start
>
> +/*
> + * Initialize fileset (if not already done).
> + *
> + * Create a new file when first_segment is true, otherwise open the existing
> + * file.
> + */
> +void
> +serialize_stream_start(TransactionId xid, bool first_segment)
>
> IMO this function should be called stream_msg_spoolfile_init() or
> stream_msg_spoolfile_begin() to match the pattern for function names
> of the message spool file that I previously suggested. (see review
> comment #17a)

I am not sure that name is better. I will think this over and adjust it in the next version.

> ~
>
> 22.
>
> + /*
> + * Initialize the worker's stream_fileset if we haven't yet. This will be
> + * used for the entire duration of the worker so create it in a permanent
> + * context. We create this on the very first streaming message from any
> + * transaction and then use it for this and other streaming transactions.
> + * Now, we could create a fileset at the start of the worker as well but
> + * then we won't be sure that it will ever be used.
> + */
> + if (!MyLogicalRepWorker->stream_fileset)
>
> I assumed this is a typo "Now," --> "Note," ?

That wording comes from the existing comment; I am not sure whether it's a typo or not.

> ~
>
> 24.
>
> /*
> - * Start a transaction on stream start, this transaction will be
> - * committed on the stream stop unless it is a tablesync worker in
> - * which case it will be committed after processing all the
> - * messages. We need the transaction for handling the buffile,
> - * used for serializing the streaming data and subxact info.
> + * serialize_stream_start will start a transaction, this
> + * transaction will be committed on the stream stop unless it is a
> + * tablesync worker in which case it will be committed after
> + * processing all the messages. We need the transaction for
> + * handling the buffile, used for serializing the streaming data
> + * and subxact info.
> */
> - begin_replication_step();
> + serialize_stream_start(stream_xid, first_segment);
> + break;
>
> Make the comment a bit more natural.
>
> SUGGESTION
>
> Function serialize_stream_start starts a transaction. This transaction
> will be committed on the stream stop unless it is a tablesync worker
> in which case it will be committed after processing all the messages.
> We need this transaction for handling the BufFile, used for
> serializing the streaming data and subxact info.

Changed.

> ~
>
> 25.
>
> + case TRANS_LEADER_PARTIAL_SERIALIZE:
> /*
> - * Initialize the worker's stream_fileset if we haven't yet. This
> - * will be used for the entire duration of the worker so create it
> - * in a permanent context. We create this on the very first
> - * streaming message from any transaction and then use it for this
> - * and other streaming transactions. Now, we could create a
> - * fileset at the start of the worker as well but then we won't be
> - * sure that it will ever be used.
> + * The file should have been created when entering
> + * PARTIAL_SERIALIZE mode so no need to create it again. The
> + * transaction started in serialize_stream_start will be committed
> + * on the stream stop.
> */
> - if (!MyLogicalRepWorker->stream_fileset)
>
> BEFORE
> The file should have been created when entering PARTIAL_SERIALIZE mode
> so no need to create it again.
>
> SUGGESTION
> The message spool file was already created when entering
> PARTIAL_SERIALIZE mode.

Changed.

> ~~~
>
> 26. serialize_stream_stop
>
> /*
> + * Update the information about subxacts and close the file.
> + *
> + * This function should be called when the serialize_stream_start function has
> + * been called.
> + */
> +void
> +serialize_stream_stop(TransactionId xid)
>
> Maybe 2nd part of that comment should be something more like
>
> SUGGESTION
> This function ends what was started by the function serialize_stream_start().

I am thinking about a new function name and will adjust this in the next version.

> ~
>
> 27.
>
> + /*
> + * Close the file with serialized changes, and serialize information about
> + * subxacts for the toplevel transaction.
> + */
> + subxact_info_write(MyLogicalRepWorker->subid, xid);
> + stream_close_file();
>
> Should the comment and the code be in the same order?
>
> SUGGESTION
> Serialize information about subxacts for the toplevel transaction,
> then close the stream messages spool file.

Changed.

> ~~~
>
> 28. handle_stream_abort
>
> + case TRANS_LEADER_PARTIAL_SERIALIZE:
> + Assert(winfo);
> +
> + /*
> + * Parallel apply worker might have applied some changes, so write
> + * the STREAM_ABORT message so that the parallel apply worker can
> + * rollback the subtransaction if needed.
> + */
> + stream_write_message(xid, LOGICAL_REP_MSG_STREAM_ABORT,
> + &original_msg);
> +
>
> 28a.
> The original_msg is not used except for TRANS_LEADER_PARTIAL_SERIALIZE
> case so I think it should only be declared/assigned in the scope of
> that case.
>
> ~
>
> 28b.
> "so that the parallel apply worker can" -> "so that it can"

Changed.

> ~~~
>
> 29. apply_spooled_messages
>
> +void
> +apply_spooled_messages(FileSet *stream_fileset, TransactionId xid,
> + XLogRecPtr lsn)
> {
> StringInfoData s2;
> int nchanges;
> char path[MAXPGPATH];
> char *buffer = NULL;
> MemoryContext oldcxt;
> - BufFile *fd;
>
> - maybe_start_skipping_changes(lsn);
> + if (!am_parallel_apply_worker())
> + maybe_start_skipping_changes(lsn);
>
> /* Make sure we have an open transaction */
> begin_replication_step();
> @@ -1810,8 +1913,8 @@ apply_spooled_messages(TransactionId xid, XLogRecPtr lsn)
> changes_filename(path, MyLogicalRepWorker->subid, xid);
> elog(DEBUG1, "replaying changes from file \"%s\"", path);
>
> - fd = BufFileOpenFileSet(MyLogicalRepWorker->stream_fileset, path, O_RDONLY,
> - false);
> + stream_fd = BufFileOpenFileSet(stream_fileset, path, O_RDONLY, false);
> + stream_xid = xid;
>
> IMO it seems strange to me that the fileset is passed as a parameter
> but then the resulting fd is always assigned to a single global
> variable (regardless of what the fileset was passed).

I am not sure about this as we already have similar code in stream_open_file().

> ~
>
> 30.
>
> - BufFileClose(fd);
> -
> + BufFileClose(stream_fd);
> pfree(buffer);
> pfree(s2.data);
>
> +done:
> + stream_fd = NULL;
> + stream_xid = InvalidTransactionId;
> +
>
> This code fragment seems to be doing almost the same as what function
> stream_close_file() is doing. Should you just call that instead?

Changed.
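
(For reference, stream_close_file() in worker.c is roughly:

    static void
    stream_close_file(void)
    {
        Assert(stream_fd != NULL);

        BufFileClose(stream_fd);

        stream_xid = InvalidTransactionId;
        stream_fd = NULL;
    }

so calling it here avoids duplicating the reset of the two globals.)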

> ======
>
> src/include/replication/worker_internal.h
>
> 33. LeaderFileSetState
>
> +/* State of fileset in leader apply worker. */
> +typedef enum LeaderFileSetState
> +{
> + LEADER_FILESET_UNKNOWN,
> + LEADER_FILESET_BUSY,
> + LEADER_FILESET_ACCESSIBLE
> +} LeaderFileSetState;
>
> 33a.
>
> Missing from typedefs.list?
>
> ~
>
> 33b.
>
> I thought some more explanatory comments for the meaning of
> BUSY/ACCESSIBLE should be here.
>
> ~
>
> 33c.
>
> READY might be a better value than ACCESSIBLE
>
> ~
>
> 33d.
> I'm not sure what usefulness the "LEADER_" and "Leader" prefixes give
> here. Maybe a name like PartialFileSetState is more meaningful?
>
> e.g. like this?
>
> typedef enum PartialFileSetState
> {
> FS_UNKNOWN,
> FS_BUSY,
> FS_READY
> } PartialFileSetState;

Changed.

> ~~~
>
> 34. ParallelApplyWorkerShared
>
> + /*
> + * The leader apply worker will serialize changes to the file after
> + * entering PARTIAL_SERIALIZE mode and share the fileset with the parallel
> + * apply worker when processing the transaction finish command. And then
> + * the parallel apply worker will apply all the spooled messages.
> + *
> + * Don't use SharedFileSet here as we need the fileset to survive after
> + * releasing the shared memory so that the leader apply worker can re-use
> + * the fileset for next streaming transaction.
> + */
> + LeaderFileSetState fileset_state;
> + FileSet fileset;
>
> Minor rewording of that comment
>
> SUGGESTION
> After entering PARTIAL_SERIALIZE mode, the leader apply worker will
> serialize changes to the file, and share the fileset with the parallel
> apply worker when processing the transaction finish command. Then the
> parallel apply worker will apply all the spooled messages.
>
> FileSet is used here instead of SharedFileSet because we need it to
> survive after releasing the shared memory so that the leader apply
> worker can re-use the same fileset for the next streaming transaction.

Changed.

> ~~~
>
> 35. globals
>
> /*
> + * Indicates whether the leader apply worker needs to serialize the
> + * remaining changes to disk due to timeout when sending data to the
> + * parallel apply worker.
> + */
> + bool serialize_changes;
>
> 35a.
> I wonder if the comment would be better to also mention "via shared memory".
>
> SUGGESTION
>
> Indicates whether the leader apply worker needs to serialize the
> remaining changes to disk due to timeout when attempting to send data
> to the parallel apply worker via shared memory.

Changed.

Best regards,
Hou zj
