Re: Perform streaming logical transactions by background workers and parallel apply

From: Masahiko Sawada <sawada(dot)mshk(at)gmail(dot)com>
To: "houzj(dot)fnst(at)fujitsu(dot)com" <houzj(dot)fnst(at)fujitsu(dot)com>
Cc: Amit Kapila <amit(dot)kapila16(at)gmail(dot)com>, "wangw(dot)fnst(at)fujitsu(dot)com" <wangw(dot)fnst(at)fujitsu(dot)com>, Peter Smith <smithpb2250(at)gmail(dot)com>, Dilip Kumar <dilipbalaut(at)gmail(dot)com>, "shiy(dot)fnst(at)fujitsu(dot)com" <shiy(dot)fnst(at)fujitsu(dot)com>, PostgreSQL Hackers <pgsql-hackers(at)lists(dot)postgresql(dot)org>
Subject: Re: Perform streaming logical transactions by background workers and parallel apply
Date: 2022-12-16 07:08:23
Message-ID: CAD21AoA-i_Gr1Quo2JN56ORafL=phqD=5hUvbA40EHMRZjArQA@mail.gmail.com
Lists: pgsql-hackers

On Thu, Dec 15, 2022 at 12:28 PM houzj(dot)fnst(at)fujitsu(dot)com
<houzj(dot)fnst(at)fujitsu(dot)com> wrote:
>
> On Wednesday, December 14, 2022 2:49 PM Amit Kapila <amit(dot)kapila16(at)gmail(dot)com> wrote:
>
> >
> > On Wed, Dec 14, 2022 at 9:50 AM houzj(dot)fnst(at)fujitsu(dot)com
> > <houzj(dot)fnst(at)fujitsu(dot)com> wrote:
> > >
> > > On Tuesday, December 13, 2022 11:25 PM Masahiko Sawada <sawada(dot)mshk(at)gmail(dot)com> wrote:
> > > >
> > > > Here are comments on v59 0001, 0002 patches:
> > >
> > > Thanks for the comments!
> > >
> > > > +void
> > > > +pa_increment_stream_block(ParallelApplyWorkerShared *wshared)
> > > > +{
> > > > + while (1)
> > > > + {
> > > > + SpinLockAcquire(&wshared->mutex);
> > > > +
> > > > + /*
> > > > + * Don't try to increment the count if the parallel apply worker is
> > > > + * taking the stream lock. Otherwise, there would be a race condition
> > > > + * that the parallel apply worker checks there is no pending streaming
> > > > + * block and before it actually starts waiting on a lock, the leader
> > > > + * sends another streaming block and take the stream lock again. In
> > > > + * this case, the parallel apply worker will start waiting for the next
> > > > + * streaming block whereas there is actually a pending streaming block
> > > > + * available.
> > > > + */
> > > > + if (!wshared->pa_wait_for_stream)
> > > > + {
> > > > + wshared->pending_stream_count++;
> > > > + SpinLockRelease(&wshared->mutex);
> > > > + break;
> > > > + }
> > > > +
> > > > + SpinLockRelease(&wshared->mutex);
> > > > + }
> > > > +}
> > > >
> > > > I think we should add an assertion to check that we don't hold the stream lock.
> > > >
> > > > I think that waiting for pa_wait_for_stream to become false in a busy
> > > > loop is not a good idea. It's not interruptible, and there is no
> > > > guarantee that we can break out of this loop in a short time. For
> > > > instance, if PA (the parallel apply worker) executes
> > > > pa_decr_and_wait_stream_block() a bit earlier than LA (the leader apply
> > > > worker) executes pa_increment_stream_block(), LA has to wait in a busy
> > > > loop for PA to acquire and release the stream lock. It should not take
> > > > long in normal cases, but how long LA needs to wait depends on PA, and
> > > > it could be long. Also, what if PA raises an error in pa_lock_stream()
> > > > for some reason? I think LA won't be able to detect the failure.
> > > >
> > > > I think we should at least make it interruptible and maybe add some
> > > > sleep. Or perhaps we can use a condition variable for this case.
> > >
> >
> > Or we can drop this while (true) logic altogether for the first version and add a
> > comment explaining this race. Anyway, it will probably be resolved after a
> > restart. We can always change this part of the code later if it really turns out
> > to be problematic.
>
> Agreed, and reverted this part.
>
> >
> > > Thanks for the analysis, I will research this part.
> > >
> > > > ---
> > > > In worker.c, we have the following common pattern:
> > > >
> > > > case TRANS_LEADER_PARTIAL_SERIALIZE:
> > > > write change to the file;
> > > > do some work;
> > > > break;
> > > >
> > > > case TRANS_LEADER_SEND_TO_PARALLEL:
> > > > pa_send_data();
> > > >
> > > > if (winfo->serialize_changes)
> > > > {
> > > > do some work required after writing changes to the file.
> > > > }
> > > > :
> > > > break;
> > > >
> > > > IIUC there are two different paths for partial serialization: (a)
> > > > where apply_action is TRANS_LEADER_PARTIAL_SERIALIZE, and (b) where
> > > > apply_action is TRANS_LEADER_SEND_TO_PARALLEL and
> > > > winfo->serialize_changes became true. And we need to keep what we do
> > > > in (a) and (b) in sync. Rather than having two different paths for the
> > > > same case, how about falling through to TRANS_LEADER_PARTIAL_SERIALIZE
> > > > when we could not send the changes? That is, pa_send_data() just
> > > > returns false when the timeout is exceeded and we need to switch to
> > > > serializing changes; otherwise it returns true. If it returns false, we
> > > > prepare for switching to serializing changes (e.g., initializing the
> > > > fileset) and fall through to the TRANS_LEADER_PARTIAL_SERIALIZE case.
> > > > The code would look like:
> > > >
> > > > case TRANS_LEADER_SEND_TO_PARALLEL:
> > > > ret = pa_send_data();
> > > >
> > > > if (ret)
> > > > {
> > > > do work for sending changes to PA.
> > > > break;
> > > > }
> > > >
> > > > /* prepare for switching to serialize changes */
> > > > winfo->serialize_changes = true;
> > > > initialize fileset;
> > > > acquire stream lock if necessary;
> > > >
> > > > /* FALLTHROUGH */
> > > > case TRANS_LEADER_PARTIAL_SERIALIZE:
> > > > do work for serializing changes;
> > > > break;
> > >
> > > I think the suggestion is to extract the code that switches to
> > > serialize mode out of pa_send_data(), and then we would need to add that
> > > logic to every function that calls pa_send_data(). I am not sure it
> > > looks better, as it might introduce more code in each handling
> > > function.
> > >
> >
> > How about extracting the common code from apply_handle_stream_commit
> > and apply_handle_stream_prepare to a separate function say
> > pa_xact_finish_common()? I see there is a lot of common code (unlock the
> > stream, wait for the finish, store flush location, free worker
> > info) in both the functions for TRANS_LEADER_PARTIAL_SERIALIZE and
> > TRANS_LEADER_SEND_TO_PARALLEL cases.
>
> Agreed, changed. I also addressed Sawada-san's comment by extracting the
> code that switches to serialize mode out of pa_send_data().
>
> > >
> > > > ---
> > > > void
> > > > pa_lock_stream(TransactionId xid, LOCKMODE lockmode)
> > > > {
> > > >     LockApplyTransactionForSession(MyLogicalRepWorker->subid, xid,
> > > >                                    PARALLEL_APPLY_LOCK_STREAM,
> > > >                                    lockmode);
> > > > }
> > > >
> > > > I think that since we don't need to let the caller specify an arbitrary
> > > > lock mode, and need only shared and exclusive modes, we can make it
> > > > simpler by having a boolean argument, say shared, instead of lockmode.
> > >
> > > I personally think passing the lockmode would make the code more clear
> > > than passing a Boolean value.
> > >
> >
> > +1.
> >
> > I have made a few changes in the newly added comments and function name in
> > the attached patch. Kindly include this if you find the changes okay.
>
> Thanks, I have checked and merged it.
>
> Attached is the new version of the patch set, which addresses all comments so far.
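
For reference, the fall-through arrangement suggested earlier in the thread for TRANS_LEADER_SEND_TO_PARALLEL can be sketched as a small standalone C program. Apart from the two TRANS_LEADER_* action names and serialize_changes, every name here is hypothetical: send_data() stands in for a pa_send_data() that only reports a timeout instead of switching modes itself, and two counters replace the real shm_mq send and fileset write. This is an illustration, not the patch's code.

```c
#include <assert.h>
#include <stdbool.h>

/* Subset of the apply actions discussed in the thread. */
typedef enum TransApplyAction
{
    TRANS_LEADER_SEND_TO_PARALLEL,
    TRANS_LEADER_PARTIAL_SERIALIZE,
} TransApplyAction;

/* Hypothetical worker state: only what the sketch needs. */
typedef struct WorkerInfo
{
    bool serialize_changes;     /* set once we switch to spooling */
    bool send_would_time_out;   /* knob to simulate a send timeout */
    int  sent;                  /* changes delivered to the parallel worker */
    int  spooled;               /* changes written to the fileset */
} WorkerInfo;

/* Hypothetical stand-in for pa_send_data(): reports failure, nothing more. */
static bool
send_data(WorkerInfo *winfo)
{
    if (winfo->send_would_time_out)
        return false;
    winfo->sent++;
    return true;
}

static void
handle_change(TransApplyAction action, WorkerInfo *winfo)
{
    switch (action)
    {
        case TRANS_LEADER_SEND_TO_PARALLEL:
            if (send_data(winfo))
                break;          /* delivered; nothing else to do */

            /* Prepare for switching to serializing changes. */
            winfo->serialize_changes = true;
            /* (fileset initialization and stream locking would go here) */

            /* FALLTHROUGH */
        case TRANS_LEADER_PARTIAL_SERIALIZE:
            winfo->spooled++;   /* stands in for writing the change to a file */
            break;
    }
}
```

With the mode switch in one place, path (b) cannot drift out of sync with path (a). The sketch does not model how later changes of the same transaction end up in the TRANS_LEADER_PARTIAL_SERIALIZE case; presumably the action selection checks winfo->serialize_changes.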

Thank you for updating the patches! Here are some minor comments:

@@ -100,7 +100,6 @@ static void check_duplicates_in_publist(List *publist, Datum *datums);
static List *merge_publications(List *oldpublist, List *newpublist,
bool addpub, const char *subname);
static void ReportSlotConnectionError(List *rstates, Oid subid, char *slotname, char *err);

-
/*
* Common option parsing function for CREATE and ALTER SUBSCRIPTION commands.
*

Unnecessary line removal.

---
+ * Swtich to PARTIAL_SERIALIZE mode for the current transaction -- this means

typo

s/Swtich/Switch/

---
+pa_has_spooled_message_pending()
+{
+ PartialFileSetState fileset_state;
+
+ fileset_state = pa_get_fileset_state();
+
+ if (fileset_state != FS_UNKNOWN)
+ return true;
+ else
+ return false;
+}

I think we can simply do:

return (fileset_state != FS_UNKNOWN);

Or do we need this function at all? I think we can do the following in
LogicalParallelApplyLoop():

else if (shmq_res == SHM_MQ_WOULD_BLOCK)
{
    /* Check if changes have been serialized to a file. */
    if (pa_get_fileset_state() != FS_UNKNOWN)
    {
        pa_spooled_messages();
    }
}

Also, I think the name FS_UNKNOWN doesn't convey anything. It sounds as
if we don't expect this state, but that's not true. How about
FS_INITIAL or FS_EMPTY? Either would be more understandable.
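
A minimal sketch combining both suggestions, assuming the rename to FS_EMPTY. The other enum members and the static variable standing in for the shared-memory state are hypothetical, since the patch's full definitions are not quoted here.

```c
#include <assert.h>
#include <stdbool.h>

typedef enum PartialFileSetState
{
    FS_EMPTY,       /* proposed name for FS_UNKNOWN: nothing serialized yet */
    FS_BUSY,        /* hypothetical: leader is still writing the file */
    FS_READY,       /* hypothetical: file is complete and can be applied */
} PartialFileSetState;

/* Hypothetical stand-in for the state the patch keeps in shared memory. */
static PartialFileSetState shared_fileset_state = FS_EMPTY;

static PartialFileSetState
pa_get_fileset_state(void)
{
    return shared_fileset_state;
}

static bool
pa_has_spooled_message_pending(void)
{
    /* The if/else returning true or false reduces to one comparison. */
    return pa_get_fileset_state() != FS_EMPTY;
}
```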

---
+/*
+ * Wait until the parallel apply worker's transaction finishes.
+ */
+void
+pa_wait_for_xact_finish(ParallelApplyWorkerInfo *winfo)

I think we no longer need to expose pa_wait_for_xact_finish().

---
+ active_workers = list_copy(ParallelApplyWorkerPool);
+
+ foreach(lc, active_workers)
+ {
+ int slot_no;
+ uint16 generation;
+ ParallelApplyWorkerInfo *winfo = (ParallelApplyWorkerInfo *) lfirst(lc);
+
+ LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
+ napplyworkers = logicalrep_pa_worker_count(MyLogicalRepWorker->subid);
+ LWLockRelease(LogicalRepWorkerLock);
+
+ if (napplyworkers <= max_parallel_apply_workers_per_subscription / 2)
+ return;
+

Calling logicalrep_pa_worker_count() under LogicalRepWorkerLock for every
worker in the loop doesn't seem efficient to me. I think we can get the
number of workers once at the top of this function and return if it's
already within the pool size limit (max_parallel_apply_workers_per_subscription / 2).
Otherwise, we attempt to stop the extra workers.
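
A standalone sketch of that shape, with hypothetical names throughout (PoolWorker, stop_extra_workers(), and an array in place of the List). It counts once, returns early, and then decrements a local count as workers are stopped, on the assumption that a count which goes slightly stale while the loop runs is acceptable for this pool-trimming heuristic.

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical pool entry; the patch keeps ParallelApplyWorkerInfo in a List. */
typedef struct PoolWorker
{
    bool in_use;    /* currently applying a streamed transaction */
    bool stopped;   /* we have asked this worker to exit */
} PoolWorker;

/*
 * Stop idle workers until at most max_per_sub / 2 remain.  The count is
 * taken once up front (in the backend, a single logicalrep_pa_worker_count()
 * call under LogicalRepWorkerLock) and then decremented locally.
 * Returns the number of workers stopped.
 */
static int
stop_extra_workers(PoolWorker *pool, int npool, int max_per_sub)
{
    int napplyworkers = 0;
    int nstopped = 0;

    for (int i = 0; i < npool; i++)
        if (!pool[i].stopped)
            napplyworkers++;

    /* Early exit: already within the pool size limit. */
    if (napplyworkers <= max_per_sub / 2)
        return 0;

    for (int i = 0; i < npool && napplyworkers > max_per_sub / 2; i++)
    {
        if (pool[i].stopped || pool[i].in_use)
            continue;           /* never stop a busy worker */
        pool[i].stopped = true;
        napplyworkers--;
        nstopped++;
    }

    return nstopped;
}
```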

---
+bool
+pa_free_worker(ParallelApplyWorkerInfo *winfo, TransactionId xid)
+{

Is there any reason why this function has the XID as a separate
argument? It seems to me that since we always call this function with
'winfo' and 'winfo->shared->xid', we can remove xid from the function
argument.

---
+ /* Initialize shared memory area. */
+ SpinLockAcquire(&winfo->shared->mutex);
+ winfo->shared->xact_state = PARALLEL_TRANS_UNKNOWN;
+ winfo->shared->xid = xid;
+ SpinLockRelease(&winfo->shared->mutex);

It's not a problem in practice, but is there any reason why some fields of
ParallelApplyWorkerInfo are initialized in pa_setup_dsm() whereas others
are initialized here?
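
One possible arrangement is to set every per-transaction field in a single helper that both the DSM setup path and the per-transaction path call. The sketch below is hypothetical: WorkerShared, the atomic_bool spinlock standing in for slock_t, and the PARALLEL_TRANS_FINISHED state are illustrations, not the patch's definitions.

```c
#include <assert.h>
#include <stdatomic.h>
#include <stdbool.h>
#include <stdint.h>

typedef uint32_t TransactionId;             /* PostgreSQL XIDs are 32-bit */
#define InvalidTransactionId ((TransactionId) 0)

/* PARALLEL_TRANS_UNKNOWN appears in the patch; FINISHED is hypothetical. */
typedef enum ParallelTransState
{
    PARALLEL_TRANS_UNKNOWN,
    PARALLEL_TRANS_FINISHED,
} ParallelTransState;

/* Hypothetical stand-in for the shared area behind winfo->shared. */
typedef struct WorkerShared
{
    atomic_bool        mutex;       /* stand-in for slock_t */
    ParallelTransState xact_state;
    TransactionId      xid;
} WorkerShared;

static void
spin_acquire(atomic_bool *lock)
{
    while (atomic_exchange(lock, true))
        ;                           /* busy-wait, like SpinLockAcquire() */
}

static void
spin_release(atomic_bool *lock)
{
    atomic_store(lock, false);
}

/* The only place that sets per-transaction fields. */
static void
shared_reset_for_xact(WorkerShared *shared, TransactionId xid)
{
    spin_acquire(&shared->mutex);
    shared->xact_state = PARALLEL_TRANS_UNKNOWN;
    shared->xid = xid;
    spin_release(&shared->mutex);
}

/* DSM setup path: initialize the lock, then reuse the same helper. */
static void
shared_setup(WorkerShared *shared)
{
    atomic_init(&shared->mutex, false);     /* like SpinLockInit() */
    shared_reset_for_xact(shared, InvalidTransactionId);
}
```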

Regards,

--
Masahiko Sawada
Amazon Web Services: https://aws.amazon.com
