RE: Perform streaming logical transactions by background workers and parallel apply

From: "houzj(dot)fnst(at)fujitsu(dot)com" <houzj(dot)fnst(at)fujitsu(dot)com>
To: Amit Kapila <amit(dot)kapila16(at)gmail(dot)com>
Cc: Masahiko Sawada <sawada(dot)mshk(at)gmail(dot)com>, "wangw(dot)fnst(at)fujitsu(dot)com" <wangw(dot)fnst(at)fujitsu(dot)com>, Peter Smith <smithpb2250(at)gmail(dot)com>, Dilip Kumar <dilipbalaut(at)gmail(dot)com>, "shiy(dot)fnst(at)fujitsu(dot)com" <shiy(dot)fnst(at)fujitsu(dot)com>, PostgreSQL Hackers <pgsql-hackers(at)lists(dot)postgresql(dot)org>
Subject: RE: Perform streaming logical transactions by background workers and parallel apply
Date: 2022-12-15 03:28:25
Message-ID: OS0PR01MB57169DAA9A2A6E68EE5E05F094E19@OS0PR01MB5716.jpnprd01.prod.outlook.com
Lists: pgsql-hackers

On Wednesday, December 14, 2022 2:49 PM Amit Kapila <amit(dot)kapila16(at)gmail(dot)com> wrote:

>
> On Wed, Dec 14, 2022 at 9:50 AM houzj(dot)fnst(at)fujitsu(dot)com
> <houzj(dot)fnst(at)fujitsu(dot)com> wrote:
> >
> > On Tuesday, December 13, 2022 11:25 PM Masahiko Sawada
> <sawada(dot)mshk(at)gmail(dot)com> wrote:
> > >
> > > Here are comments on v59 0001, 0002 patches:
> >
> > Thanks for the comments!
> >
> > > +void
> > > +pa_increment_stream_block(ParallelApplyWorkerShared *wshared)
> > > +{
> > > +    while (1)
> > > +    {
> > > +        SpinLockAcquire(&wshared->mutex);
> > > +
> > > +        /*
> > > +         * Don't try to increment the count if the parallel apply worker is
> > > +         * taking the stream lock. Otherwise, there would be a race condition
> > > +         * that the parallel apply worker checks there is no pending streaming
> > > +         * block and before it actually starts waiting on a lock, the leader
> > > +         * sends another streaming block and take the stream lock again. In
> > > +         * this case, the parallel apply worker will start waiting for the next
> > > +         * streaming block whereas there is actually a pending streaming block
> > > +         * available.
> > > +         */
> > > +        if (!wshared->pa_wait_for_stream)
> > > +        {
> > > +            wshared->pending_stream_count++;
> > > +            SpinLockRelease(&wshared->mutex);
> > > +            break;
> > > +        }
> > > +
> > > +        SpinLockRelease(&wshared->mutex);
> > > +    }
> > > +}
> > >
> > > I think we should add an assertion to check that we don't hold the stream lock.
> > >
> > > I think that waiting for pa_wait_for_stream to be false in a busy
> > > loop is not a good idea. It's not interruptible and there is no
> > > guarantee that we can break from this loop in a short time. For
> > > instance, if PA executes
> > > pa_decr_and_wait_stream_block() a bit earlier than LA executes
> > > pa_increment_stream_block(), LA has to wait for PA to acquire and
> > > release the stream lock in a busy loop. It should not be long in
> > > normal cases but the duration LA needs to wait for PA depends on PA,
> > > which could be long. Also what if PA raises an error in
> > > pa_lock_stream() for some reason? I think LA won't be able to
> > > detect the failure.
> > >
> > > I think we should at least make it interruptible and maybe need to
> > > add some sleep. Or perhaps we can use the condition variable for this case.
> >
>
> Or we can leave out this while (true) logic altogether for the first version and have a
> comment to explain this race. Anyway, after restarting, it will probably be
> solved. We can always change this part of the code later if this really turns out
> to be problematic.

Agreed, and reverted this part.
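
For reference, had the loop been kept, the "interruptible, with some sleep" variant suggested above could look roughly like the standalone sketch below. It is not code from the patch: SketchShared, sketch_interrupt_pending and the max_attempts bound are hypothetical stand-ins, a C11 atomic_flag plays the role of the spinlock, and nanosleep() stands in for the wait that, in the server, would more likely be done with CHECK_FOR_INTERRUPTS() plus a latch or condition variable.

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <signal.h>
#include <stdatomic.h>
#include <stdbool.h>
#include <time.h>

/* Hypothetical stand-in for ParallelApplyWorkerShared; not the patch's struct. */
typedef struct SketchShared
{
    atomic_flag mutex;                  /* plays the role of the spinlock */
    bool        wait_for_stream;        /* worker is about to take the stream lock */
    int         pending_stream_count;   /* blocks sent but not yet consumed */
} SketchShared;

/* Stand-in for a pending interrupt; a signal handler could set this. */
static volatile sig_atomic_t sketch_interrupt_pending = 0;

static void
sketch_init(SketchShared *s)
{
    atomic_flag_clear(&s->mutex);
    s->wait_for_stream = false;
    s->pending_stream_count = 0;
}

static void
sketch_lock(SketchShared *s)
{
    while (atomic_flag_test_and_set(&s->mutex))
        ;                               /* spin; held only for a few instructions */
}

static void
sketch_unlock(SketchShared *s)
{
    atomic_flag_clear(&s->mutex);
}

/*
 * Try to bump the pending count. Returns true on success, and false if an
 * interrupt is pending or the worker kept the flag set for max_attempts
 * rounds, so the caller can report the failure instead of spinning forever.
 */
static bool
sketch_increment_stream_block(SketchShared *s, int max_attempts)
{
    const struct timespec pause = {0, 1000000L};    /* 1 ms between attempts */

    assert(max_attempts > 0);

    for (int attempt = 0; attempt < max_attempts; attempt++)
    {
        bool        done = false;

        sketch_lock(s);
        if (!s->wait_for_stream)
        {
            s->pending_stream_count++;
            done = true;
        }
        sketch_unlock(s);

        if (done)
            return true;
        if (sketch_interrupt_pending)
            return false;               /* give up promptly when interrupted */
        nanosleep(&pause, NULL);        /* back off instead of burning CPU */
    }
    return false;
}
```

The lock is still held only around the flag check and the increment; what changes is that the retry is bounded, checks for an interrupt, and sleeps between attempts.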

>
> > Thanks for the analysis, I will research this part.
> >
> > > ---
> > > In worker.c, we have the following common pattern:
> > >
> > > case TRANS_LEADER_PARTIAL_SERIALIZE:
> > >     write change to the file;
> > >     do some work;
> > >     break;
> > >
> > > case TRANS_LEADER_SEND_TO_PARALLEL:
> > >     pa_send_data();
> > >
> > >     if (winfo->serialize_changes)
> > >     {
> > >         do some work required after writing changes to the file.
> > >     }
> > >     :
> > >     break;
> > >
> > > IIUC there are two different paths for partial serialization: (a)
> > > where apply_action is TRANS_LEADER_PARTIAL_SERIALIZE, and (b) where
> > > apply_action is TRANS_LEADER_SEND_TO_PARALLEL and
> > > winfo->serialize_changes became true. And we need to match what we do
> > > in (a) and (b). Rather than having two different paths for the same
> > > case, how about falling through TRANS_LEADER_PARTIAL_SERIALIZE when
> > > we could not send the changes? That is, pa_send_data() just returns
> > > false when the timeout exceeds and we need to switch to serialize
> > > changes, otherwise returns true. If it returns false, we prepare for
> > > switching to serialize changes such as initializing fileset, and
> > > fall through TRANS_LEADER_PARTIAL_SERIALIZE case. The code would be
> > > like:
> > >
> > > case TRANS_LEADER_SEND_TO_PARALLEL:
> > >     ret = pa_send_data();
> > >
> > >     if (ret)
> > >     {
> > >         do work for sending changes to PA.
> > >         break;
> > >     }
> > >
> > >     /* prepare for switching to serialize changes */
> > >     winfo->serialize_changes = true;
> > >     initialize fileset;
> > >     acquire stream lock if necessary;
> > >
> > >     /* FALLTHROUGH */
> > > case TRANS_LEADER_PARTIAL_SERIALIZE:
> > >     do work for serializing changes;
> > >     break;
> >
> > I think that the suggestion is to extract the code that switches to
> > serialize mode out of pa_send_data(), and then we need to add that
> > logic in all the functions which call pa_send_data(). I am not sure if
> > it looks better, as it might introduce more code in each handling
> > function.
> >
>
> How about extracting the common code from apply_handle_stream_commit
> and apply_handle_stream_prepare to a separate function say
> pa_xact_finish_common()? I see there is a lot of common code (unlock the
> stream, wait for the finish, store flush location, free worker
> info) in both the functions for TRANS_LEADER_PARTIAL_SERIALIZE and
> TRANS_LEADER_SEND_TO_PARALLEL cases.

Agreed, changed. I also addressed Sawada-san's comment by extracting the
code that switches to serialize mode out of pa_send_data().
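
To illustrate the resulting control flow, here is a minimal standalone sketch of the fall-through pattern. It is not the code in the attached patches: every Sketch*/sketch_* name is hypothetical, a small counter stands in for the shared memory queue, and deriving the action from serialize_changes is my simplification for the example.

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical stand-ins for the two leader-side apply actions. */
typedef enum SketchApplyAction
{
    SKETCH_PARTIAL_SERIALIZE,
    SKETCH_SEND_TO_PARALLEL
} SketchApplyAction;

typedef struct SketchWorkerInfo
{
    bool        serialize_changes;  /* set once the leader switches to the file */
    int         queue_free;         /* free slots in the pretend message queue */
    int         sent;               /* changes handed to the parallel worker */
    int         serialized;         /* changes written to the pretend file */
} SketchWorkerInfo;

/* Returns false when the change cannot be sent (stand-in for the timeout). */
static bool
sketch_send_data(SketchWorkerInfo *winfo)
{
    if (winfo->queue_free == 0)
        return false;
    winfo->queue_free--;
    winfo->sent++;
    return true;
}

/* The switch-over logic, now living outside the send function. */
static void
sketch_switch_to_serialize(SketchWorkerInfo *winfo)
{
    winfo->serialize_changes = true;
    /* real code would also set up the file and take any needed lock here */
}

static SketchApplyAction
sketch_get_action(const SketchWorkerInfo *winfo)
{
    return winfo->serialize_changes ? SKETCH_PARTIAL_SERIALIZE
                                    : SKETCH_SEND_TO_PARALLEL;
}

static void
sketch_handle_change(SketchWorkerInfo *winfo)
{
    switch (sketch_get_action(winfo))
    {
        case SKETCH_SEND_TO_PARALLEL:
            if (sketch_send_data(winfo))
                break;              /* sent; nothing more to do */

            /* could not send: prepare, then serialize this change */
            sketch_switch_to_serialize(winfo);
            /* FALLTHROUGH */

        case SKETCH_PARTIAL_SERIALIZE:
            winfo->serialized++;    /* single code path for serializing */
            break;
    }
}
```

With this shape there is only one place that does the serializing work, whether the transaction started in partial-serialize mode or switched to it after a failed send.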

> >
> > > ---
> > > void
> > > pa_lock_stream(TransactionId xid, LOCKMODE lockmode)
> > > {
> > >     LockApplyTransactionForSession(MyLogicalRepWorker->subid, xid,
> > >                                    PARALLEL_APPLY_LOCK_STREAM,
> > >                                    lockmode);
> > > }
> > >
> > > I think since we don't need to let the caller specify the lock
> > > mode but need only shared and exclusive modes, we can make it simple
> > > by having a boolean argument, say shared, instead of lockmode.
> >
> > I personally think passing the lockmode would make the code clearer
> > than passing a Boolean value.
> >
>
> +1.
>
> I have made a few changes in the newly added comments and function name in
> the attached patch. Kindly include this if you find the changes okay.

Thanks, I have checked and merged it.

Attached is the new version of the patch set, which addresses all comments so far.

Best regards,
Hou zj

Attachment                                                       Content-Type              Size
v61-0007-Add-a-main_worker_pid-to-pg_stat_subscription.patch     application/octet-stream  8.7 KB
v61-0001-Perform-streaming-logical-transactions-by-parall.patch  application/octet-stream  193.0 KB
v61-0002-Serialize-partial-changes-to-a-file-when-the-att.patch  application/octet-stream  43.3 KB
v61-0003-Test-streaming-parallel-option-in-tap-test.patch        application/octet-stream  80.1 KB
v61-0004-Allow-streaming-every-change-without-waiting-til.patch  application/octet-stream  5.3 KB
v61-0005-Add-GUC-stream_serialize_threshold-and-test-seri.patch  application/octet-stream  14.0 KB
v61-0006-Retry-to-apply-streaming-xact-only-in-apply-work.patch  application/octet-stream  22.6 KB
