RE: Perform streaming logical transactions by background workers and parallel apply

From: "houzj(dot)fnst(at)fujitsu(dot)com" <houzj(dot)fnst(at)fujitsu(dot)com>
To: Amit Kapila <amit(dot)kapila16(at)gmail(dot)com>
Cc: Masahiko Sawada <sawada(dot)mshk(at)gmail(dot)com>, "wangw(dot)fnst(at)fujitsu(dot)com" <wangw(dot)fnst(at)fujitsu(dot)com>, Peter Smith <smithpb2250(at)gmail(dot)com>, Dilip Kumar <dilipbalaut(at)gmail(dot)com>, "shiy(dot)fnst(at)fujitsu(dot)com" <shiy(dot)fnst(at)fujitsu(dot)com>, PostgreSQL Hackers <pgsql-hackers(at)lists(dot)postgresql(dot)org>
Subject: RE: Perform streaming logical transactions by background workers and parallel apply
Date: 2022-11-22 02:00:05
Message-ID: OS0PR01MB5716997A7115715F9E4EE520940D9@OS0PR01MB5716.jpnprd01.prod.outlook.com
Lists: pgsql-hackers

On Monday, November 21, 2022 8:34 PM houzj(dot)fnst(at)fujitsu(dot)com <houzj(dot)fnst(at)fujitsu(dot)com> wrote:
>
> On Saturday, November 19, 2022 6:49 PM Amit Kapila
> <amit(dot)kapila16(at)gmail(dot)com> wrote:
> >
> > On Fri, Nov 18, 2022 at 7:56 AM Amit Kapila <amit(dot)kapila16(at)gmail(dot)com> wrote:
> > >
> > > On Wed, Nov 16, 2022 at 1:50 PM houzj(dot)fnst(at)fujitsu(dot)com
> > > <houzj(dot)fnst(at)fujitsu(dot)com> wrote:
> > > >
> > > > On Tuesday, November 15, 2022 7:58 PM houzj(dot)fnst(at)fujitsu(dot)com
> > > > <houzj(dot)fnst(at)fujitsu(dot)com> wrote:
> > > >
> > > > I noticed that I didn't add CHECK_FOR_INTERRUPTS while retrying
> > > > to send a message.
> > > > So, attached is the new version, which adds that. Also attached is the
> > > > 0004 patch, which restarts logical replication with parallel apply
> > > > temporarily disabled if a transaction fails to apply in a parallel
> > > > apply worker.
> > > >
> > >
> > > Few comments on v48-0001
>
> Thanks for the comments!
>
> > > ======================
> > >
> >
> > I have made quite a few changes in the comments, added some new
> > comments, and made other cosmetic changes in the attached patch. This is
> > atop v48-0001*.
> > If these look okay to you, please include them in the next version.
> > Apart from these, I have a few more comments on
> > v48-0001*
>
> Thanks, I have checked and merged them.
>
> > 1.
> > +static bool
> > +pa_can_start(TransactionId xid)
> > +{
> > + if (!TransactionIdIsValid(xid))
> > + return false;
> >
> > The caller (see caller of pa_start_worker) already has a check that
> > xid passed here is valid, so I think this should be an Assert unless I
> > am missing something in which case it is better to add a comment here.
>
> Changed to an Assert().
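
To illustrate the distinction discussed in comment 1, here is a minimal standalone sketch of a precondition that the caller already guarantees being expressed as an assertion rather than a runtime branch. It is not the patch's code: the function name can_start_parallel_apply and the flag parallel_apply_enabled are hypothetical, the TransactionId definitions are local stand-ins that mirror the PostgreSQL ones, and plain C assert() stands in for PostgreSQL's Assert().

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Local stand-ins mirroring the PostgreSQL definitions. */
typedef uint32_t TransactionId;
#define InvalidTransactionId ((TransactionId) 0)
#define TransactionIdIsValid(xid) ((xid) != InvalidTransactionId)

/* Hypothetical switch standing in for the real eligibility conditions. */
static bool parallel_apply_enabled = true;

/*
 * The caller has already verified that xid is valid, so an invalid xid
 * here would be a programming error. Assert it instead of silently
 * returning false.
 */
static bool
can_start_parallel_apply(TransactionId xid)
{
	assert(TransactionIdIsValid(xid));

	return parallel_apply_enabled;
}
```

With this shape, a caller that violates the contract fails loudly in assert-enabled builds, while the function's return value reflects only the real eligibility conditions.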
>
> > 2. Will it be better to rename pa_start_worker() as
> > pa_allocate_worker() because it sometimes gets the worker from the
> > pool and also allocates the hash entry for worker info? That will even
> > match the corresponding pa_free_worker().
>
> Agreed and changed.
>
> > 3.
> > +pa_start_subtrans(TransactionId current_xid, TransactionId top_xid)
> > {
> > ...
> > +
> > + oldctx = MemoryContextSwitchTo(ApplyContext);
> > + subxactlist = lappend_xid(subxactlist, current_xid);
> > + MemoryContextSwitchTo(oldctx);
> > ...
> >
> > Why do we need to allocate this list in a permanent context? IIUC, we
> > need to use this to maintain subxacts so that it can be later used to
> > find the given subxact at the time of rollback to savepoint in the
> > current in-progress transaction, so why do we need it beyond the
> > transaction being applied? If there is a reason for the same, it would
> > be better to add some comments for the same.
>
> I think you are right, I changed to use TopTransactionContext here.
>
> > 4.
> > +pa_stream_abort(LogicalRepStreamAbortData *abort_data)
> > {
> > ...
> > +
> > + for (i = list_length(subxactlist) - 1; i >= 0; i--)
> > + {
> > + TransactionId xid_tmp = lfirst_xid(list_nth_cell(subxactlist, i));
> > +
> > + if (xid_tmp == subxid)
> > + {
> > + found = true;
> > + break;
> > + }
> > + }
> > +
> > + if (found)
> > + {
> > + RollbackToSavepoint(spname);
> > + CommitTransactionCommand();
> > + subxactlist = list_truncate(subxactlist, i + 1);
> > + }
> >
> > I was thinking whether we can have an Assert(false) for the not found
> > case but it seems if all the changes of a subxact have been skipped
> > then probably subxid corresponding to "rollback to savepoint" won't be
> > found in subxactlist and we don't need to do anything for it. If that
> > is the case, then probably adding a comment for it would be a good
> > idea, otherwise, we can probably have
> > Assert(false) in the else case.
>
> Yes, we might not find the xid for an empty subtransaction. I added some
> comments here for the same.
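
The behavior agreed on in comment 4 can be sketched in standalone C as follows. This is only an illustration under stated assumptions, not the patch's implementation: SubxactStack, subxact_push and subxact_rollback_to are hypothetical names, and a fixed-size array stands in for the PostgreSQL List used by subxactlist. The sketch searches from the newest entry to the oldest, keeps the matching subxact itself (as list_truncate(subxactlist, i + 1) does), and treats "not found" as a no-op, which is the case of a subtransaction whose changes were all skipped.

```c
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

typedef uint32_t TransactionId;	/* local stand-in for the PostgreSQL typedef */

#define SUBXACT_STACK_MAX 64

/* Hypothetical fixed-size stand-in for the patch's subxactlist. */
typedef struct SubxactStack
{
	TransactionId xids[SUBXACT_STACK_MAX];
	size_t		count;
} SubxactStack;

/* Record a newly started subtransaction; returns false if the stack is full. */
static bool
subxact_push(SubxactStack *stack, TransactionId xid)
{
	if (stack->count >= SUBXACT_STACK_MAX)
		return false;
	stack->xids[stack->count++] = xid;
	return true;
}

/*
 * Handle "rollback to savepoint" for subxid.
 *
 * Search from newest to oldest. If subxid is found at index i, forget every
 * later subxact but keep subxid itself, and return true so the caller can
 * perform the actual rollback. If it is not found, the subtransaction was
 * empty (nothing was applied for it), so leave the stack untouched and
 * return false.
 */
static bool
subxact_rollback_to(SubxactStack *stack, TransactionId subxid)
{
	for (size_t i = stack->count; i-- > 0;)
	{
		if (stack->xids[i] == subxid)
		{
			stack->count = i + 1;
			return true;
		}
	}
	return false;
}
```

Returning false instead of asserting in the not-found case reflects the point made above: an empty subtransaction legitimately has no entry, so there is nothing to roll back.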
>
> Apart from the above, I also addressed the comments in [1] and fixed a bug
> where the parallel apply worker could exit silently without the leader
> detecting it. In the latest patch, the parallel apply worker sends a
> notify ('X') message to the leader so that the leader can detect the exit.
>
> Here is the new version of the patch.

I noticed that I missed a header file, which caused CFbot to complain.
Attached is a new version of the patch set that fixes that.

Best regards,
Hou zj

Attachment Content-Type Size
v50-0004-Retry-to-apply-streaming-xact-only-in-apply-work.patch application/octet-stream 20.6 KB
v50-0001-Perform-streaming-logical-transactions-by-parall.patch application/octet-stream 186.1 KB
v50-0002-Serialize-partial-changes-to-disk-if-the-shm_mq-.patch application/octet-stream 35.6 KB
v50-0003-Test-streaming-parallel-option-in-tap-test.patch application/octet-stream 80.1 KB
