Re: [PATCH] Reuse Workers and Replication Slots during Logical Replication

From: Amit Kapila <amit(dot)kapila16(at)gmail(dot)com>
To: Peter Smith <smithpb2250(at)gmail(dot)com>
Cc: Melih Mutlu <m(dot)melihmutlu(at)gmail(dot)com>, "Hayato Kuroda (Fujitsu)" <kuroda(dot)hayato(at)fujitsu(dot)com>, Melanie Plageman <melanieplageman(at)gmail(dot)com>, "Wei Wang (Fujitsu)" <wangw(dot)fnst(at)fujitsu(dot)com>, "Yu Shi (Fujitsu)" <shiy(dot)fnst(at)fujitsu(dot)com>, pgsql-hackers <pgsql-hackers(at)postgresql(dot)org>, shveta malik <shveta(dot)malik(at)gmail(dot)com>
Subject: Re: [PATCH] Reuse Workers and Replication Slots during Logical Replication
Date: 2023-07-21 07:24:21
Message-ID: CAA4eK1LLuPdB4jSTsmk_TBJEQAXWneQk8f4fsRbrGRRo2txahQ@mail.gmail.com
Lists: pgsql-hackers

On Fri, Jul 21, 2023 at 12:05 PM Peter Smith <smithpb2250(at)gmail(dot)com> wrote:
>
> On Fri, Jul 21, 2023 at 3:39 PM Amit Kapila <amit(dot)kapila16(at)gmail(dot)com> wrote:
> >
> > On Fri, Jul 21, 2023 at 7:30 AM Peter Smith <smithpb2250(at)gmail(dot)com> wrote:
> > >
> > > ~~~
> > >
> > > 2. StartLogRepWorker
> > >
> > > /* Common function to start the leader apply or tablesync worker. */
> > > void
> > > StartLogRepWorker(int worker_slot)
> > > {
> > > /* Attach to slot */
> > > logicalrep_worker_attach(worker_slot);
> > >
> > > /* Setup signal handling */
> > > pqsignal(SIGHUP, SignalHandlerForConfigReload);
> > > pqsignal(SIGTERM, die);
> > > BackgroundWorkerUnblockSignals();
> > >
> > > /*
> > > * We don't currently need any ResourceOwner in a walreceiver process, but
> > > * if we did, we could call CreateAuxProcessResourceOwner here.
> > > */
> > >
> > > /* Initialise stats to a sanish value */
> > > MyLogicalRepWorker->last_send_time = MyLogicalRepWorker->last_recv_time =
> > > MyLogicalRepWorker->reply_time = GetCurrentTimestamp();
> > >
> > > /* Load the libpq-specific functions */
> > > load_file("libpqwalreceiver", false);
> > >
> > > InitializeLogRepWorker();
> > >
> > > /* Connect to the origin and start the replication. */
> > > elog(DEBUG1, "connecting to publisher using connection string \"%s\"",
> > > MySubscription->conninfo);
> > >
> > > /*
> > > * Setup callback for syscache so that we know when something changes in
> > > * the subscription relation state.
> > > */
> > > CacheRegisterSyscacheCallback(SUBSCRIPTIONRELMAP,
> > > invalidate_syncing_table_states,
> > > (Datum) 0);
> > > }
> > >
> > > ~
> > >
> > > 2a.
> > > The function name seems a bit misleading because it is not really
> > > "starting" anything here - it is just more "initialization" code,
> > > right? Nor is it common to all kinds of LogRepWorker. Maybe the
> > > function could be named something else like 'InitApplyOrSyncWorker()'.
> > > -- see also #2c
> > >
> >
> > How about SetupLogRepWorker?
>
> The name is better than StartXXX, but still, SetupXXX seems a synonym
> of InitXXX. That is why I thought it is a bit awkward having 2
> functions with effectively the same name and the same
> initialization/setup purpose (the only difference is one function
> excludes parallel workers, and the other function is common to all
> workers).
>

I can't think of a better way. We can probably name it
SetupApplyOrSyncWorker or something like that if you find that better.

> > The other thing I noticed is that we
> > don't seem to be consistent in naming functions in these files. For
> > example, shall we make all exposed functions follow camel case (like
> > InitializeLogRepWorker) and static functions follow _ style (like
> > run_apply_worker) or the other possibility is to use _ style for all
> > functions except may be the entry functions like ApplyWorkerMain()? I
> > don't know if there is already a pattern but if not then let's form it
> > now, so that code looks consistent.
> >
>
> +1 for using some consistent rule, but I think this may result in
> *many* changes, so it would be safer to itemize all the changes first,
> just to make sure everybody is OK with it first before updating
> everything.
>

Fair enough. We can do that as a first patch and then work on the
refactoring patch, to avoid introducing more inconsistencies, or we can
do the refactoring patch first but make all the new function names
follow the _ style.

Apart from this, a few more comments on 0001:
1.
+run_apply_worker(WalRcvStreamOptions *options,
+ char *slotname,
+ char *originname,
+ int originname_size,
+ XLogRecPtr *origin_startpos)

The caller neither uses nor passes the value of origin_startpos. So,
isn't it better to make origin_startpos local to run_apply_worker()?
The same seems to be true for some of the other parameters: slotname,
originname, and originname_size. Is there a reason to keep these as
arguments to this function?
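
To illustrate the suggestion, here is a minimal standalone sketch, not
the patch code. The typedefs, NAMEDATALEN value, setup_origin() helper,
slot name, and start position are all hypothetical stand-ins so that the
example compiles on its own; only the shape of run_apply_worker() is the
point. The values the caller never reads become locals:

```c
#include <assert.h>
#include <stdint.h>
#include <stdio.h>

/* Hypothetical stand-ins for the PostgreSQL definitions. */
typedef uint64_t XLogRecPtr;
#define NAMEDATALEN 64

typedef struct WalRcvStreamOptions
{
	const char *slotname;
	XLogRecPtr	startpoint;
} WalRcvStreamOptions;

/* Hypothetical helper standing in for the replication origin setup. */
static XLogRecPtr
setup_origin(char *originname, size_t originname_size)
{
	snprintf(originname, originname_size, "pg_%u", 16384u);
	return (XLogRecPtr) 0x1000;	/* made-up start position */
}

/*
 * Only 'options' stays as an argument. slotname, originname and
 * origin_startpos are needed only inside this function, so they are
 * locals here instead of parameters.
 */
static void
run_apply_worker(WalRcvStreamOptions *options)
{
	const char *slotname = "sub_slot";	/* placeholder slot name */
	char		originname[NAMEDATALEN];
	XLogRecPtr	origin_startpos;

	assert(options != NULL);

	origin_startpos = setup_origin(originname, sizeof(originname));

	options->slotname = slotname;
	options->startpoint = origin_startpos;
}
```

With this shape the caller only has to provide the options struct, and
the same treatment would apply to run_tablesync_worker().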

2.
+static void
+run_tablesync_worker(WalRcvStreamOptions *options,
+ char *slotname,
+ char *originname,
+ int originname_size,
+ XLogRecPtr *origin_startpos)

The comments in the previous point seem to apply to this as well.

3.
+ set_stream_options(options, slotname, origin_startpos);
+
+ walrcv_startstreaming(LogRepWorkerWalRcvConn, options);
+
+ if (MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_PENDING &&
+ AllTablesyncsReady())

This last check is done both in set_stream_options() and here. I don't
see any reason for the two places to give different answers, but before
the patch we did not rely on the assumption that this check always gives
the same answer, and it could differ because of AllTablesyncsReady().
Can we move this check outside set_stream_options()?
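
A rough standalone sketch of what I mean, with the check evaluated once
in the caller. Everything here is a simplified stand-in rather than the
patch code: the StreamOptions struct, the twophasestate variable, the
stubbed AllTablesyncsReady(), the constant's value, and the
start_apply_streaming() name are assumptions made so the example
compiles by itself.

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical stand-ins for the real definitions. */
#define LOGICALREP_TWOPHASE_STATE_PENDING 'p'

typedef struct StreamOptions
{
	bool		twophase;
} StreamOptions;

static char twophasestate = LOGICALREP_TWOPHASE_STATE_PENDING;
static int	ready_calls = 0;	/* counts how often the check runs */

/* Stub: in the real system the answer can change between two calls. */
static bool
AllTablesyncsReady(void)
{
	ready_calls++;
	return true;
}

/* No two_phase decision in here any more, just a default. */
static void
set_stream_options(StreamOptions *options)
{
	options->twophase = false;
}

/* Hypothetical caller: the check is evaluated exactly once. */
static bool
start_apply_streaming(StreamOptions *options)
{
	bool		enable_twophase;

	set_stream_options(options);

	enable_twophase =
		(twophasestate == LOGICALREP_TWOPHASE_STATE_PENDING &&
		 AllTablesyncsReady());

	if (enable_twophase)
		options->twophase = true;

	/* walrcv_startstreaming() would be called here with 'options'. */

	/* Follow-up work after streaming starts reuses the same answer. */
	return enable_twophase;
}
```

This way the option passed to the publisher and the follow-up action in
the caller cannot disagree, because both depend on a single evaluation.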

--
With Regards,
Amit Kapila.
