RE: Perform streaming logical transactions by background workers and parallel apply

From: "houzj(dot)fnst(at)fujitsu(dot)com" <houzj(dot)fnst(at)fujitsu(dot)com>
To: Peter Smith <smithpb2250(at)gmail(dot)com>
Cc: Amit Kapila <amit(dot)kapila16(at)gmail(dot)com>, Masahiko Sawada <sawada(dot)mshk(at)gmail(dot)com>, "wangw(dot)fnst(at)fujitsu(dot)com" <wangw(dot)fnst(at)fujitsu(dot)com>, Dilip Kumar <dilipbalaut(at)gmail(dot)com>, "shiy(dot)fnst(at)fujitsu(dot)com" <shiy(dot)fnst(at)fujitsu(dot)com>, PostgreSQL Hackers <pgsql-hackers(at)lists(dot)postgresql(dot)org>
Subject: RE: Perform streaming logical transactions by background workers and parallel apply
Date: 2022-12-07 03:01:35
Message-ID: OS0PR01MB57167C91F40088516C3C21D2941A9@OS0PR01MB5716.jpnprd01.prod.outlook.com
Lists: pgsql-hackers

On Tue, Dec 6, 2022 7:57 AM Peter Smith <smithpb2250(at)gmail(dot)com> wrote:
> Here are my review comments for patch v55-0002

Thanks for your comments.

> ======
>
> .../replication/logical/applyparallelworker.c
>
> 1. pa_can_start
>
> @@ -276,9 +278,9 @@ pa_can_start(TransactionId xid)
> /*
> * Don't start a new parallel worker if user has set skiplsn as it's
> * possible that user want to skip the streaming transaction. For
> - * streaming transaction, we need to spill the transaction to disk so that
> - * we can get the last LSN of the transaction to judge whether to skip
> - * before starting to apply the change.
> + * streaming transaction, we need to serialize the transaction to a file
> + * so that we can get the last LSN of the transaction to judge whether to
> + * skip before starting to apply the change.
> */
> if (!XLogRecPtrIsInvalid(MySubscription->skiplsn))
> return false;
>
> I think the wording change may belong in patch 0001 because it has
> nothing to do with partial serializing.

Changed.

> ~~~
>
> 2. pa_free_worker
>
> + /*
> + * Stop the worker if there are enough workers in the pool.
> + *
> + * XXX The worker is also stopped if the leader apply worker needed to
> + * serialize part of the transaction data due to a send timeout. This is
> + * because the message could be partially written to the queue due to send
> + * timeout and there is no way to clean the queue other than resending the
> + * message until it succeeds. To avoid complexity, we directly stop the
> + * worker in this case.
> + */
> + if (winfo->serialize_changes ||
> + napplyworkers > (max_parallel_apply_workers_per_subscription / 2))
>
> There is no need to say "due to send timeout" twice in two sentences.
>
> SUGGESTION
> XXX The worker is also stopped if the leader apply worker needed to
> serialize part of the transaction data due to a send timeout. This is
> because the message could be partially written to the queue but there
> is no way to clean the queue other than resending the message until it
> succeeds. Directly stopping the worker avoids needing this complexity.

Changed.

> 4.
>
> /*
> + * Replay the spooled messages in the parallel apply worker if the leader apply
> + * worker has finished serializing changes to the file.
> + */
> +static void
> +pa_spooled_messages(void)
>
> I'm not 100% sure of the logic, so IMO maybe the comment should say a
> bit more about how this works:
>
> Specifically, let's say there was some timeout and the LA needed to
> write the spool file, then let's say the PA timed out and found itself
> inside this function. Now, let's say the LA is still busy writing the
> file -- so what happens next?
>
> Does this function simply return, then the main PA loop waits again,
> then times out again, then PA finds itself back inside this
> function again... and that keeps happening over and over until
> eventually the spool file is found FS_READY? Some explanatory comments
> might help.

Slightly changed the logic and comment here.
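The retry behaviour asked about above can be sketched as follows. This models the flow described in the question, not necessarily the revised patch: the PA reads the fileset state under a lock and returns without doing anything while the leader is still writing, so its main loop waits and polls again. `SketchShared`, the `SKETCH_FS_*` states (including `SKETCH_FS_EMPTY` and the reset after replay) and the C11 `atomic_flag` spinlock standing in for `SpinLockAcquire`/`SpinLockRelease` are all assumptions for illustration.

```c
#include <assert.h>
#include <stdatomic.h>
#include <stdbool.h>

typedef enum SketchFileSetState
{
    SKETCH_FS_EMPTY,            /* assumption: no spooled changes pending */
    SKETCH_FS_BUSY,             /* leader is still serializing changes */
    SKETCH_FS_READY             /* leader is done; PA may read the file */
} SketchFileSetState;

typedef struct SketchShared
{
    atomic_flag mutex;          /* minimal spinlock standing in for slock_t */
    SketchFileSetState fileset_state;
} SketchShared;

static void
sketch_spin_acquire(atomic_flag *lock)
{
    while (atomic_flag_test_and_set(lock))
        ;                       /* busy-wait until the holder clears the flag */
}

static void
sketch_spin_release(atomic_flag *lock)
{
    atomic_flag_clear(lock);
}

/*
 * One attempt by the parallel apply worker to replay spooled changes.
 * Returns false while the leader is still writing (or nothing was spooled);
 * the caller's main loop then waits on the queue again and retries later.
 */
static bool
sketch_pa_spooled_messages(SketchShared *shared, int *nreplayed)
{
    SketchFileSetState state;

    sketch_spin_acquire(&shared->mutex);
    state = shared->fileset_state;
    sketch_spin_release(&shared->mutex);

    if (state != SKETCH_FS_READY)
        return false;

    (*nreplayed)++;             /* placeholder for reading and applying the file */

    /* Mark the file as consumed so the next poll does not replay it again. */
    sketch_spin_acquire(&shared->mutex);
    shared->fileset_state = SKETCH_FS_EMPTY;
    sketch_spin_release(&shared->mutex);
    return true;
}
```

So a PA that times out while the state is still BUSY simply goes around its loop again; only the first poll after the leader publishes READY does the replay.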

> ~
>
> 5.
>
> + /*
> + * Check if changes have been serialized to a file. if so, read and apply
> + * them.
> + */
> + SpinLockAcquire(&MyParallelShared->mutex);
> + fileset_state = MyParallelShared->fileset_state;
> + SpinLockRelease(&MyParallelShared->mutex);
>
> "if so" -> "If so"

Changed.

> ~~~
>
>
> 6. pa_send_data
>
> + *
> + * If the attempt to send data via shared memory times out, then we will switch
> + * to "PARTIAL_SERIALIZE mode" for the current transaction to prevent possible
> + * deadlocks with another parallel apply worker (refer to the comments atop
> + * applyparallelworker.c for details). This means that the current data and any
> + * subsequent data for this transaction will be serialized to a file.
> */
> void
> pa_send_data(ParallelApplyWorkerInfo *winfo, Size nbytes, const void *data)
>
> SUGGESTION (minor comment rearranging)
>
> If the attempt to send data via shared memory times out, then we will
> switch to "PARTIAL_SERIALIZE mode" for the current transaction -- this
> means that the current data and any subsequent data for this
> transaction will be serialized to a file. This is done to prevent
> possible deadlocks with another parallel apply worker (refer to the
> comments atop applyparallelworker.c for details).

Changed.
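A minimal sketch of the PARTIAL_SERIALIZE switch described in that comment: once a send fails, the current change and every later change of the same transaction go to the file, even if the queue drains. Everything here is hypothetical (`SketchWorker`, `sketch_try_send`, a full fixed-size array modelling a send that times out, an int array standing in for the spool file); in particular the real queue can be left with a partially written message, which this sketch does not model.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

#define SKETCH_QUEUE_CAP 4      /* assumption: tiny queue to force "timeouts" */
#define SKETCH_FILE_CAP 64

typedef struct SketchWorker
{
    bool        serialize_changes;  /* PARTIAL_SERIALIZE mode for this xact */
    int         queue[SKETCH_QUEUE_CAP];
    size_t      queue_len;
    int         file[SKETCH_FILE_CAP];  /* stands in for the spool file */
    size_t      file_len;
} SketchWorker;

/* Hypothetical send; returning false models a send that timed out. */
static bool
sketch_try_send(SketchWorker *w, int change)
{
    if (w->queue_len == SKETCH_QUEUE_CAP)
        return false;
    w->queue[w->queue_len++] = change;
    return true;
}

static void
sketch_send_data(SketchWorker *w, int change)
{
    /* Use the queue only until the first timeout for this transaction. */
    if (!w->serialize_changes && sketch_try_send(w, change))
        return;

    /* The current change and all subsequent ones are serialized. */
    w->serialize_changes = true;
    assert(w->file_len < SKETCH_FILE_CAP);
    w->file[w->file_len++] = change;
}
```

Sending changes 1..6 leaves 1..4 in the queue and 5, 6 in the file; a later change 7 still goes to the file even after the queue has been emptied, which keeps the order the PA sees consistent.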

> ~
>
> 7.
>
> + /*
> + * Take the stream lock to make sure that the parallel apply worker
> + * will wait for the leader to release the stream lock until the
> + * end of the transaction.
> + */
> + pa_lock_stream(winfo->shared->xid, AccessExclusiveLock);
>
> The comment doesn't sound right.
>
> "until the end" -> "at the end" (??)

I think it means "the PA will wait ... until the end of the transaction".

> ~~~
>
> 8. pa_stream_abort
>
> @@ -1374,6 +1470,7 @@ pa_stream_abort(LogicalRepStreamAbortData *abort_data)
> RollbackToSavepoint(spname);
> CommitTransactionCommand();
> subxactlist = list_truncate(subxactlist, i + 1);
> +
> break;
> }
> }
> Spurious whitespace unrelated to this patch?

Changed.

> ======
>
> src/backend/replication/logical/worker.c
>
> 9. handle_streamed_transaction
>
> /*
> + * The parallel apply worker needs the xid in this message to decide
> + * whether to define a savepoint, so save the original message that has not
> + * moved the cursor after the xid. We will serailize this message to a file
> + * in PARTIAL_SERIALIZE mode.
> + */
> + original_msg = *s;
>
> "serailize" -> "serialize"

Changed.
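Why `original_msg = *s` is enough: `StringInfoData` carries its own read `cursor` next to the `data` pointer, so a struct copy shares the buffer but keeps an independent cursor. The sketch below uses a hypothetical simplified struct (`maxlen` omitted) and a hypothetical `sketch_getmsgint32` that ignores network byte order; it is only meant to show that consuming from `s` leaves the saved copy positioned at the start.

```c
#include <assert.h>
#include <stdint.h>
#include <string.h>

/* Simplified stand-in for StringInfoData (maxlen omitted). */
typedef struct SketchStringInfo
{
    char       *data;
    int         len;
    int         cursor;
} SketchStringInfo;

/* Consume a 4-byte value at the cursor and advance it (byte order ignored). */
static uint32_t
sketch_getmsgint32(SketchStringInfo *s)
{
    uint32_t    v;

    assert(s->cursor + 4 <= s->len);
    memcpy(&v, s->data + s->cursor, sizeof(v));
    s->cursor += 4;
    return v;
}
```

Copying the struct before the first read means the copy can later be written to the file (or sent to the PA) with the xid still unread, while the leader keeps parsing through `s`.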

> ~~~
>
> 10. apply_handle_stream_prepare
>
> @@ -1245,6 +1265,7 @@ apply_handle_stream_prepare(StringInfo s)
> LogicalRepPreparedTxnData prepare_data;
> ParallelApplyWorkerInfo *winfo;
> TransApplyAction apply_action;
> + StringInfoData original_msg = *s;
>
> Should this include a longer explanation of why this copy is needed
> (same as was done in handle_streamed_transaction)?

Added the below comment atop this variable.
```
Save the message before it is consumed.
```

> ~
>
> 11.
>
> case TRANS_PARALLEL_APPLY:
> +
> + /*
> + * Close the file before committing if the parallel apply worker
> + * is applying spooled messages.
> + */
> + if (stream_fd)
> + stream_close_file();
>
> 11a.
>
> This comment seems worded backwards.
>
> SUGGESTION
> If the parallel apply worker is applying spooled messages then close
> the file before committing.

Changed.

> ~
>
> 11b.
>
> I'm confused - isn't there code doing exactly this (close file before
> commit) already in the apply_handle_stream_commit
> TRANS_PARALLEL_APPLY?

I think this is a typo.
Changed the action in the comment (committing -> preparing).

> ~
>
> 13.
>
> + serialize_stream_start(stream_xid, false);
> + stream_write_change(LOGICAL_REP_MSG_STREAM_START, &original_msg);
>
> - end_replication_step();
> break;
>
> A spurious blank line is left before the break;

Changed.

> ~~~
>
> 14. serialize_stream_stop
>
> + /* We must be in a valid transaction state */
> + Assert(IsTransactionState());
>
> The comment seems redundant. The code says the same.

Changed.

> ~
>
> 17.
>
> + /*
> + * No need to output the DEBUG message here in the parallel apply
> + * worker as similar messages will be output when handling STREAM_STOP
> + * message.
> + */
> + if (!am_parallel_apply_worker() && nchanges % 1000 == 0)
> elog(DEBUG1, "replayed %d changes from file \"%s\"",
> nchanges, path);
>
> Instead of saying what you are not doing ("No need to ... in the parallel
> apply worker") wouldn't it make more sense to reverse it and say what
> you are doing ("Only log DEBUG messages for the leader apply worker
> because ...") and then the condition also becomes positive:
>
> if (am_leader_apply_worker())
> {
> ...
> }

Removed this condition according to Amit's comment.

> ~
>
> 18.
>
> + if (am_parallel_apply_worker() &&
> + MyParallelShared->xact_state == PARALLEL_TRANS_FINISHED)
> + goto done;
> +
> + /*
> + * No need to output the DEBUG message here in the parallel apply
> + * worker as similar messages will be output when handling STREAM_STOP
> + * message.
> + */
> + if (!am_parallel_apply_worker() && nchanges % 1000 == 0)
> elog(DEBUG1, "replayed %d changes from file \"%s\"",
> nchanges, path);
> }
>
> - BufFileClose(fd);
> -
> + stream_close_file();
> pfree(buffer);
> pfree(s2.data);
>
> +done:
> elog(DEBUG1, "replayed %d (all) changes from file \"%s\"",
> nchanges, path);
>
> Shouldn't that "done:" label be *above* the pfree's. Otherwise, those
> are going to be skipped over by the "goto done;".

After reconsidering, I think there is no need to 'pfree' these two variables here,
because they are allocated in the top-level transaction's context and will be freed very soon.
So, I just removed these pfree() calls.

> ======
>
> src/include/replication/worker_internal.h
>
> 21. PartialFileSetState
>
>
> + * State of fileset in leader apply worker.
> + *
> + * FS_BUSY means that the leader is serializing changes to the file. FS_READY
> + * means that the leader has serialized all changes to the file and the file is
> + * ready to be read by a parallel apply worker.
> + */
> +typedef enum PartialFileSetState
>
> "ready to be read" sounded a bit strange.
>
> SUGGESTION
> ... to the file so it is now OK for a parallel apply worker to read it.

Changed.

Best regards,
Hou zj