Re: Perform streaming logical transactions by background workers and parallel apply

From: Amit Kapila <amit(dot)kapila16(at)gmail(dot)com>
To: "houzj(dot)fnst(at)fujitsu(dot)com" <houzj(dot)fnst(at)fujitsu(dot)com>
Cc: Masahiko Sawada <sawada(dot)mshk(at)gmail(dot)com>, "wangw(dot)fnst(at)fujitsu(dot)com" <wangw(dot)fnst(at)fujitsu(dot)com>, Peter Smith <smithpb2250(at)gmail(dot)com>, Dilip Kumar <dilipbalaut(at)gmail(dot)com>, "shiy(dot)fnst(at)fujitsu(dot)com" <shiy(dot)fnst(at)fujitsu(dot)com>, PostgreSQL Hackers <pgsql-hackers(at)lists(dot)postgresql(dot)org>
Subject: Re: Perform streaming logical transactions by background workers and parallel apply
Date: 2022-11-04 08:06:42
Message-ID: CAA4eK1+UK0eN9hqU1JqY5WR5-YNbh6_2t8Zvd3bXpViQSE2+Rw@mail.gmail.com
Lists: pgsql-hackers

On Thu, Nov 3, 2022 at 6:36 PM houzj(dot)fnst(at)fujitsu(dot)com
<houzj(dot)fnst(at)fujitsu(dot)com> wrote:
>
> Thanks for the analysis and summary !
>
> I tried to implement the above idea and here is the patch set.
>

A few comments on v42-0001
==========================
1.
+ /*
+ * Set the xact_state flag in the leader instead of the
+ * parallel apply worker to avoid the race condition where the leader has
+ * already started waiting for the parallel apply worker to finish
+ * processing the transaction while the child process has not yet
+ * processed the first STREAM_START and has not set the
+ * xact_state to true.
+ */
+ SpinLockAcquire(&winfo->shared->mutex);
+ winfo->shared->xact_state = PARALLEL_TRANS_UNKNOWN;

The comment and the code for xact_state don't match: the comment talks
about setting xact_state to true, but the code assigns
PARALLEL_TRANS_UNKNOWN.

2.
+ * progress. This could happend as we don't wait for transaction rollback
+ * to finish.
+ */

Typo. /happend/happen

3.
+/* Helper function to release a lock with lockid */
+void
+parallel_apply_lock(uint16 lockid)
...
...
+/* Helper function to take a lock with lockid */
+void
+parallel_apply_unlock(uint16 lockid)

Here, the comments seem to be reversed.

4.
+parallel_apply_lock(uint16 lockid)
+{
+ MemoryContext oldcontext;
+
+ if (list_member_int(ParallelApplyLockids, lockid))
+ return;
+
+ LockSharedObjectForSession(SubscriptionRelationId, MySubscription->oid,
+ lockid, am_leader_apply_worker() ?
+ AccessExclusiveLock:
+ AccessShareLock);

This appears odd to me because it rules out the parallel apply worker
ever acquiring this lock in exclusive mode. I think it would be better
to pass lock_mode as one of the parameters of this API.
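
To illustrate the suggestion, here is a minimal standalone sketch, not
the patch's code. The SketchLockMode enum and sketch_lock_shared_object()
are hypothetical stand-ins for LOCKMODE and LockSharedObjectForSession()
so that it compiles without the server, and the ParallelApplyLockids
bookkeeping is left out. The only point is that the caller chooses the
mode rather than the function deriving it from the worker type.

```c
#include <assert.h>
#include <stdint.h>

/* Stand-in for LOCKMODE; only the two modes the patch uses are modelled. */
typedef enum SketchLockMode
{
    SKETCH_ACCESS_SHARE = 1,
    SKETCH_ACCESS_EXCLUSIVE = 8
} SketchLockMode;

/* Remember the last request so the sketch can be checked without a server. */
static uint16_t last_lockid;
static SketchLockMode last_mode;

/* Hypothetical stub standing in for LockSharedObjectForSession(). */
static void
sketch_lock_shared_object(uint16_t lockid, SketchLockMode mode)
{
    last_lockid = lockid;
    last_mode = mode;
}

/* The caller, not the worker type, decides which lock mode to request. */
void
sketch_parallel_apply_lock(uint16_t lockid, SketchLockMode mode)
{
    assert(mode == SKETCH_ACCESS_SHARE || mode == SKETCH_ACCESS_EXCLUSIVE);
    sketch_lock_shared_object(lockid, mode);
}
```

With such a signature the leader would pass the exclusive mode and the
parallel apply worker the share mode at today's call sites, while a
future caller in the worker could still ask for the exclusive mode.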

5.
+ * Inintialize fileset if not yet and open the file.
+ */
+void
+serialize_stream_start(TransactionId xid, bool first_segment)

Typo. /Inintialize/Initialize

6.
parallel_apply_setup_dsm()
{
...
+ shared->xact_state = false;

xact_state should be set with one of the values of ParallelTransState.
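
A small standalone sketch of what I mean. Only the names
ParallelTransState, PARALLEL_TRANS_UNKNOWN and PARALLEL_TRANS_STARTED
come from the patch; the order of the enumerators, the SketchShared
struct and sketch_setup_shared() are assumptions made so that it
compiles on its own.

```c
#include <assert.h>

/*
 * Assumed shape of the enum: the patch may define more states and a
 * different order; only these two names appear in the quoted hunks.
 */
typedef enum ParallelTransState
{
    PARALLEL_TRANS_UNKNOWN,
    PARALLEL_TRANS_STARTED
} ParallelTransState;

/* Hypothetical cut-down version of the shared struct. */
typedef struct SketchShared
{
    ParallelTransState xact_state;
} SketchShared;

/* Initialize with an enum value instead of the boolean 'false'. */
void
sketch_setup_shared(SketchShared *shared)
{
    assert(shared != 0);
    shared->xact_state = PARALLEL_TRANS_UNKNOWN;
}
```

Assigning false compiles only because it converts to the integer 0; it
says nothing about which state is intended and silently depends on the
enumerator order.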

7.
/*
+ * Don't use SharedFileSet here because the fileset is shared by the leader
+ * worker and the fileset in leader need to survive after releasing the
+ * shared memory

This comment seems a bit unclear to me. Should there be an 'and'
between 'leader' and 'worker'? If so, then the following 'and' won't
make sense.
8.
+apply_handle_stream_stop(StringInfo s)
{
...
+ case TRANS_PARALLEL_APPLY:
+
+ /*
+ * If there is no message left, wait for the leader to release the
+ * lock and send more messages.
+ */
+ if (pg_atomic_sub_fetch_u32(&(MyParallelShared->left_message), 1) == 0)
+ parallel_apply_lock(MyParallelShared->stream_lock_id);

As per Sawada-San's email [1], this lock should be released
immediately after we acquire it. If we do so, then we don't need to
unlock separately in apply_handle_stream_start() in the below code and
at similar places in stream_prepare, stream_commit, and stream_abort.
Is there a reason for doing it differently?

apply_handle_stream_start(StringInfo s)
{
...
+ case TRANS_PARALLEL_APPLY:
...
+ /*
+ * Unlock the shared object lock so that the leader apply worker
+ * can continue to send changes.
+ */
+ parallel_apply_unlock(MyParallelShared->stream_lock_id);

9.
+parallel_apply_spooled_messages(void)
{
...
+ if (fileset_valid)
+ {
+ in_streamed_transaction = false;
+
+ parallel_apply_lock(MyParallelShared->transaction_lock_id);

Is there a reason to acquire this lock here if the parallel apply
worker will acquire it at stream_start?

10.
+ winfo->shared->stream_lock_id = parallel_apply_get_unique_id();
+ winfo->shared->transaction_lock_id = parallel_apply_get_unique_id();

Why can't we use xid (remote_xid) for one of these and local_xid (the
one generated by the parallel apply worker) for the other? I was a bit
worried about the local_xid because it will be generated only after
applying the first message, but the patch already seems to be waiting
for it in parallel_apply_wait_for_xact_finish, as seen in the code below.

+void
+parallel_apply_wait_for_xact_finish(ParallelApplyWorkerShared *wshared)
+{
+ /*
+ * Wait until the parallel apply worker handles the first message and
+ * set the flag to true.
+ */
+ parallel_apply_wait_for_in_xact(wshared, PARALLEL_TRANS_STARTED);
+
+ /* Wait for the transaction lock to be released. */
+ parallel_apply_lock(wshared->transaction_lock_id);

[1] - https://www.postgresql.org/message-id/CAD21AoCWovvhGBD2uKcQqbk6px6apswuBrs6dR9%2BWhP1j2LdsQ%40mail.gmail.com

--
With Regards,
Amit Kapila.
