Re: Perform streaming logical transactions by background workers and parallel apply

From: Masahiko Sawada <sawada(dot)mshk(at)gmail(dot)com>
To: "houzj(dot)fnst(at)fujitsu(dot)com" <houzj(dot)fnst(at)fujitsu(dot)com>
Cc: Peter Smith <smithpb2250(at)gmail(dot)com>, shveta malik <shveta(dot)malik(at)gmail(dot)com>, "wangw(dot)fnst(at)fujitsu(dot)com" <wangw(dot)fnst(at)fujitsu(dot)com>, "shiy(dot)fnst(at)fujitsu(dot)com" <shiy(dot)fnst(at)fujitsu(dot)com>, PostgreSQL Hackers <pgsql-hackers(at)lists(dot)postgresql(dot)org>, Dilip Kumar <dilipbalaut(at)gmail(dot)com>, Amit Kapila <amit(dot)kapila16(at)gmail(dot)com>
Subject: Re: Perform streaming logical transactions by background workers and parallel apply
Date: 2023-01-17 06:46:14
Message-ID: CAD21AoBFHxiw3w0FvWAAuBR+_muWwrXs6Ns1uVH-C+aEBx2Mig@mail.gmail.com
Views: Raw Message | Whole Thread | Download mbox | Resend email
Thread:
Lists: pgsql-hackers

On Tue, Jan 17, 2023 at 12:37 PM houzj(dot)fnst(at)fujitsu(dot)com
<houzj(dot)fnst(at)fujitsu(dot)com> wrote:
>
> On Tuesday, January 17, 2023 11:32 AM Peter Smith <smithpb2250(at)gmail(dot)com> wrote:
> >
> > On Tue, Jan 17, 2023 at 1:21 PM houzj(dot)fnst(at)fujitsu(dot)com
> > <houzj(dot)fnst(at)fujitsu(dot)com> wrote:
> > >
> > > On Tuesday, January 17, 2023 5:43 AM Peter Smith
> > <smithpb2250(at)gmail(dot)com> wrote:
> > > >
> > > > On Mon, Jan 16, 2023 at 5:41 PM Amit Kapila
> > > > <amit(dot)kapila16(at)gmail(dot)com>
> > > > wrote:
> > > > >
> > > > > On Mon, Jan 16, 2023 at 10:24 AM Peter Smith
> > > > > <smithpb2250(at)gmail(dot)com>
> > > > wrote:
> > > > > >
> > > > > > 2.
> > > > > >
> > > > > > /*
> > > > > > + * Return the pid of the leader apply worker if the given pid
> > > > > > +is the pid of a
> > > > > > + * parallel apply worker, otherwise return InvalidPid.
> > > > > > + */
> > > > > > +pid_t
> > > > > > +GetLeaderApplyWorkerPid(pid_t pid) { int leader_pid =
> > > > > > +InvalidPid; int i;
> > > > > > +
> > > > > > + LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
> > > > > > +
> > > > > > + for (i = 0; i < max_logical_replication_workers; i++) {
> > > > > > + LogicalRepWorker *w = &LogicalRepCtx->workers[i];
> > > > > > +
> > > > > > + if (isParallelApplyWorker(w) && w->proc && pid ==
> > > > > > + w->proc->pid) { leader_pid = w->leader_pid; break; } }
> > > > > > +
> > > > > > + LWLockRelease(LogicalRepWorkerLock);
> > > > > > +
> > > > > > + return leader_pid;
> > > > > > +}
> > > > > >
> > > > > > 2a.
> > > > > > IIUC the IsParallelApplyWorker macro does nothing except check
> > > > > > that the leader_pid is not InvalidPid anyway, so AFAIK this
> > > > > > algorithm does not benefit from using this macro because we will
> > > > > > want to return InvalidPid anyway if the given pid matches.
> > > > > >
> > > > > > So the inner condition can just say:
> > > > > >
> > > > > > if (w->proc && w->proc->pid == pid) { leader_pid =
> > > > > > w->leader_pid; break; }
> > > > > >
> > > > >
> > > > > Yeah, this should also work but I feel the current one is explicit
> > > > > and more clear.
> > > >
> > > > OK.
> > > >
> > > > But, I have one last comment about this function -- I saw there are
> > > > already other functions that iterate max_logical_replication_workers
> > > > like this looking for things:
> > > > - logicalrep_worker_find
> > > > - logicalrep_workers_find
> > > > - logicalrep_worker_launch
> > > > - logicalrep_sync_worker_count
> > > >
> > > > So I felt this new function (currently called
> > > > GetLeaderApplyWorkerPid) ought to be named similarly to those ones.
> > > > e.g. call it something like "logicalrep_worker_find_pa_leader_pid".
> > > >
> > >
> > > I am not sure we can use the name, because currently all the API name
> > > in launcher that used by other module(not related to subscription) are
> > > like AxxBxx style(see the functions in logicallauncher.h).
> > > logicalrep_worker_xxx style functions are currently only declared in
> > > worker_internal.h.
> > >
> >
> > OK. I didn't know there was another header convention that you were following.
> > In that case, it is fine to leave the name as-is.
>
> Thanks for confirming!
>
> Attach the new version 0001 patch which addressed all other comments.
>

Thank you for updating the patch. Here is one comment:

@@ -426,14 +427,24 @@ pg_stat_get_activity(PG_FUNCTION_ARGS)

/*
* Show the leader only for active
parallel workers. This
- * leaves the field as NULL for the
leader of a parallel
- * group.
+ * leaves the field as NULL for the
leader of a parallel group
+ * or the leader of parallel apply workers.
*/
if (leader && leader->pid !=
beentry->st_procpid)
{
values[28] = Int32GetDatum(leader->pid);
nulls[28] = false;
}
+ else
+ {
+ int
leader_pid = GetLeaderApplyWorkerPid(beentry->st_procpid);
+
+ if (leader_pid != InvalidPid)
+ {
+ values[28] =
Int32GetDatum(leader_pid);
+ nulls[28] = false;
+ }
+ }
}

I'm slightly concerned that there could be overhead of executing
GetLeaderApplyWorkerPid () for every backend process except for
parallel query workers. The number of such backends could be large and
GetLeaderApplyWorkerPid() acquires the lwlock. For example, does it
make sense to check (st_backendType == B_BG_WORKER) before calling
GetLeaderApplyWorkerPid()? Or it might not be a problem since it's
LogicalRepWorkerLock which is not likely to be contended.

Regards,

--
Masahiko Sawada
Amazon Web Services: https://aws.amazon.com

In response to

Responses

Browse pgsql-hackers by date

  From Date Subject
Next Message Michael Paquier 2023-01-17 07:01:29 Re: Make use of assign_checkpoint_completion_target() to calculate CheckPointSegments correctly
Previous Message Richard Guo 2023-01-17 06:29:56 Re: Add proper planner support for ORDER BY / DISTINCT aggregates