Re: Perform streaming logical transactions by background workers and parallel apply

From: Masahiko Sawada <sawada(dot)mshk(at)gmail(dot)com>
To: Amit Kapila <amit(dot)kapila16(at)gmail(dot)com>
Cc: "houzj(dot)fnst(at)fujitsu(dot)com" <houzj(dot)fnst(at)fujitsu(dot)com>, Peter Smith <smithpb2250(at)gmail(dot)com>, "wangw(dot)fnst(at)fujitsu(dot)com" <wangw(dot)fnst(at)fujitsu(dot)com>, Dilip Kumar <dilipbalaut(at)gmail(dot)com>, "shiy(dot)fnst(at)fujitsu(dot)com" <shiy(dot)fnst(at)fujitsu(dot)com>, PostgreSQL Hackers <pgsql-hackers(at)lists(dot)postgresql(dot)org>
Subject: Re: Perform streaming logical transactions by background workers and parallel apply
Date: 2022-12-27 05:57:55
Message-ID: CAD21AoD=0nnMdJiNZhMcNaEBt4+qaCx05usLBx+Xy89P6ZxrWg@mail.gmail.com
Lists: pgsql-hackers

On Mon, Dec 26, 2022 at 10:29 PM Amit Kapila <amit(dot)kapila16(at)gmail(dot)com> wrote:
>
> On Mon, Dec 26, 2022 at 6:33 PM Masahiko Sawada <sawada(dot)mshk(at)gmail(dot)com> wrote:
> >
> > ---
> > + if (!pa_can_start(xid))
> > +     return;
> > +
> > + /* First time through, initialize parallel apply worker state hashtable. */
> > + if (!ParallelApplyTxnHash)
> > + {
> > +     HASHCTL ctl;
> > +
> > +     MemSet(&ctl, 0, sizeof(ctl));
> > +     ctl.keysize = sizeof(TransactionId);
> > +     ctl.entrysize = sizeof(ParallelApplyWorkerEntry);
> > +     ctl.hcxt = ApplyContext;
> > +
> > +     ParallelApplyTxnHash = hash_create("logical replication parallel apply workers hash",
> > +                                        16, &ctl,
> > +                                        HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
> > + }
> > +
> > + /*
> > +  * It's necessary to reread the subscription information before assigning
> > +  * the transaction to a parallel apply worker. Otherwise, the leader may
> > +  * not be able to reread the subscription information if streaming
> > +  * transactions keep coming and are handled by parallel apply workers.
> > +  */
> > + maybe_reread_subscription();
> >
> > pa_can_start() checks whether the skiplsn is an invalid LSN, and
> > then maybe_reread_subscription() could update the skiplsn to a valid
> > value. As the comments in pa_can_start() say, that won't work. I think
> > we should call maybe_reread_subscription() in
> > apply_handle_stream_start() before calling pa_allocate_worker().
> >
>
> But I think a similar thing can happen when we start the worker and
> then before the transaction ends, we do maybe_reread_subscription().

Where do we call maybe_reread_subscription() in this case? IIUC if the
leader sends all changes to the worker, there is no chance for the
leader to call maybe_reread_subscription() except when waiting for
input. On reflection, adding maybe_reread_subscription() to
apply_handle_stream_start() adds one extra call to it, so it's not
good. Alternatively, we can do that in pa_can_start() before checking
the skiplsn. I think we do a similar thing in AllTablesyncsReady() --
update the information before the check if necessary.
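
To make the ordering issue concrete, here is a toy model of the two
orderings. It is only a sketch and not code from the patch: every toy_*
name, the ToySubscription struct, and the "catalog" variable are
hypothetical stand-ins, and the real maybe_reread_subscription() and
pa_can_start() do much more than this.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Toy stand-ins (assumptions for illustration, not PostgreSQL definitions). */
typedef uint64_t ToyLsn;
#define TOY_INVALID_LSN ((ToyLsn) 0)

typedef struct ToySubscription
{
    ToyLsn      skiplsn;        /* cached copy used by the apply worker */
} ToySubscription;

static ToySubscription toy_cached_sub = {TOY_INVALID_LSN};
static ToyLsn toy_catalog_skiplsn = TOY_INVALID_LSN;  /* what the catalog says */
static bool toy_sub_valid = true;   /* false once the cache is stale */

/* Models a concurrent ALTER SUBSCRIPTION ... SKIP invalidating the cache. */
static void
toy_alter_subscription_skip(ToyLsn lsn)
{
    toy_catalog_skiplsn = lsn;
    toy_sub_valid = false;
}

/* Models the reread: refresh the cached copy only if it was invalidated. */
static void
toy_maybe_reread_subscription(void)
{
    if (toy_sub_valid)
        return;
    toy_cached_sub.skiplsn = toy_catalog_skiplsn;
    toy_sub_valid = true;
}

/* Ordering in the quoted hunk: check the cached skiplsn, reread afterwards. */
static bool
toy_pa_can_start_check_only(void)
{
    return toy_cached_sub.skiplsn == TOY_INVALID_LSN;
}

/* Proposed ordering: reread first, then check the (now fresh) skiplsn. */
static bool
toy_pa_can_start_reread_first(void)
{
    toy_maybe_reread_subscription();
    return toy_cached_sub.skiplsn == TOY_INVALID_LSN;
}

/* Runs both orderings against the same pending skiplsn update. */
static void
toy_compare_orderings(bool *stale_result, bool *fresh_result)
{
    toy_cached_sub.skiplsn = TOY_INVALID_LSN;
    toy_catalog_skiplsn = TOY_INVALID_LSN;
    toy_sub_valid = true;

    toy_alter_subscription_skip((ToyLsn) 0x1000);

    /* The stale cache still shows an invalid skiplsn, so a worker would start. */
    *stale_result = toy_pa_can_start_check_only();

    /* Rereading first picks up the new skiplsn, so no worker is started. */
    *fresh_result = toy_pa_can_start_reread_first();

    assert(toy_cached_sub.skiplsn == (ToyLsn) 0x1000);
}
```

In this model the check-only variant reports that a parallel apply
worker can start even though a skiplsn is already pending, while the
reread-first variant sees the new value and declines.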

> I think we should try to call maybe_reread_subscription() when we are
> reasonably sure that we are going to enter parallel mode, otherwise,
> anyway, it will be later called by the leader worker.

It isn't a big problem even if we update the skiplsn after launching a
worker, since we will skip the transaction the next time. But rereading
before the check would be more consistent with the current behavior. As
I mentioned above, doing it in pa_can_start() seems reasonable to me.
What do you think?

Regards,

--
Masahiko Sawada
Amazon Web Services: https://aws.amazon.com
