RE: Perform streaming logical transactions by background workers and parallel apply

From: "houzj(dot)fnst(at)fujitsu(dot)com" <houzj(dot)fnst(at)fujitsu(dot)com>
To: Masahiko Sawada <sawada(dot)mshk(at)gmail(dot)com>
Cc: Amit Kapila <amit(dot)kapila16(at)gmail(dot)com>, "wangw(dot)fnst(at)fujitsu(dot)com" <wangw(dot)fnst(at)fujitsu(dot)com>, Peter Smith <smithpb2250(at)gmail(dot)com>, Dilip Kumar <dilipbalaut(at)gmail(dot)com>, "shiy(dot)fnst(at)fujitsu(dot)com" <shiy(dot)fnst(at)fujitsu(dot)com>, PostgreSQL Hackers <pgsql-hackers(at)lists(dot)postgresql(dot)org>
Subject: RE: Perform streaming logical transactions by background workers and parallel apply
Date: 2022-12-16 09:17:37
Message-ID: OS0PR01MB5716E94C5E503CC53177A22994E69@OS0PR01MB5716.jpnprd01.prod.outlook.com
Lists: pgsql-hackers

On Friday, December 16, 2022 3:08 PM Masahiko Sawada <sawada(dot)mshk(at)gmail(dot)com> wrote:
>
>
> Here are some minor comments:

Thanks for the comments!

> ---
> +pa_has_spooled_message_pending()
> +{
> + PartialFileSetState fileset_state;
> +
> + fileset_state = pa_get_fileset_state();
> +
> + if (fileset_state != FS_UNKNOWN)
> + return true;
> + else
> + return false;
> +}
>
> I think we can simply do:
>
> return (fileset_state != FS_UNKNOWN);

Will change.
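
A minimal standalone sketch of what the simplified function could look like. Only FS_UNKNOWN and the two function names come from the quoted diff; the second enum member and the stubbed getter are hypothetical placeholders so the snippet compiles outside the tree.

```c
#include <stdbool.h>

/* Stand-in for the patch's enum; only FS_UNKNOWN appears in the quoted diff. */
typedef enum PartialFileSetState
{
    FS_UNKNOWN,
    FS_OTHER_HYPOTHETICAL       /* placeholder for any non-unknown state */
} PartialFileSetState;

/* Hypothetical stub; the body of the real getter is not shown in this thread. */
static PartialFileSetState stub_fileset_state = FS_UNKNOWN;

static PartialFileSetState
pa_get_fileset_state(void)
{
    return stub_fileset_state;
}

/* Same behavior as the quoted if/else, collapsed into one return. */
static bool
pa_has_spooled_message_pending(void)
{
    PartialFileSetState fileset_state;

    fileset_state = pa_get_fileset_state();

    return (fileset_state != FS_UNKNOWN);
}
```

Keeping the wrapper means the caller only sees a boolean and never the file set state itself.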

>
> Or do we need this function in the first place? I think we can do in
> LogicalParallelApplyLoop() like:

I intended not to expose the file state in the main loop, so it may be better
to keep this function.

> ---
> + active_workers = list_copy(ParallelApplyWorkerPool);
> +
> + foreach(lc, active_workers)
> + {
> + int slot_no;
> + uint16 generation;
> + ParallelApplyWorkerInfo *winfo =
> +     (ParallelApplyWorkerInfo *) lfirst(lc);
> +
> + LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
> + napplyworkers =
> +     logicalrep_pa_worker_count(MyLogicalRepWorker->subid);
> + LWLockRelease(LogicalRepWorkerLock);
> +
> + if (napplyworkers <=
> +     max_parallel_apply_workers_per_subscription / 2)
> + return;
> +
>
> Calling logicalrep_pa_worker_count() with lwlock for each worker seems
> not efficient to me. I think we can get the number of workers once at
> the top of this function and return if it's already lower than the
> maximum pool size. Otherwise, we attempt to stop extra workers.

How about directly checking the length of the worker pool list here, which
seems simpler and doesn't need a lock?
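
A rough standalone sketch of that check, assuming the threshold stays at half of max_parallel_apply_workers_per_subscription as in the quoted diff. The helper name is hypothetical, and pool_length stands in for list_length(ParallelApplyWorkerPool); the GUC value below is illustrative only.

```c
#include <stdbool.h>

/* Mirrors the GUC named in the quoted diff; the value is illustrative only. */
static int max_parallel_apply_workers_per_subscription = 2;

/*
 * Hypothetical helper: returns true when the local pool holds more workers
 * than half of the configured maximum, i.e. when the quoted early return
 * would NOT be taken. pool_length stands in for
 * list_length(ParallelApplyWorkerPool), which is backend-local and so needs
 * no LWLock.
 */
static bool
pa_pool_has_extra_workers(int pool_length)
{
    return pool_length > (max_parallel_apply_workers_per_subscription / 2);
}
```

The check could then run once before the loop, rather than taking LogicalRepWorkerLock for every worker in the list.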

> ---
> +bool
> +pa_free_worker(ParallelApplyWorkerInfo *winfo, TransactionId xid)
> +{
>
>
> Is there any reason why this function has the XID as a separate
> argument? It seems to me that since we always call this function with
> 'winfo' and 'winfo->shared->xid', we can remove xid from the function
> argument.
>
> ---
> + /* Initialize shared memory area. */
> + SpinLockAcquire(&winfo->shared->mutex);
> + winfo->shared->xact_state = PARALLEL_TRANS_UNKNOWN;
> + winfo->shared->xid = xid;
> + SpinLockRelease(&winfo->shared->mutex);
>
> It's practically no problem but is there any reason why some fields of
> ParallelApplyWorkerInfo are initialized in pa_setup_dsm() whereas some
> fields are done here?

We could be reusing an old worker from the pool here, in which case we need to
update these fields with the new streaming transaction's information.
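
A standalone sketch of that per-transaction reset, under stated assumptions: the struct and enum names, the extra enum member, and the helper name are hypothetical stand-ins, and a C11 atomic_flag replaces the backend spinlock so the snippet compiles outside the tree. Only the field names (mutex, xact_state, xid) and PARALLEL_TRANS_UNKNOWN come from the quoted diff.

```c
#include <stdatomic.h>
#include <stdint.h>

typedef uint32_t TransactionId;

/* Stand-in enum; only PARALLEL_TRANS_UNKNOWN appears in the quoted diff. */
typedef enum ParallelTransState
{
    PARALLEL_TRANS_UNKNOWN,
    PARALLEL_TRANS_OTHER_HYPOTHETICAL   /* placeholder for a stale state */
} ParallelTransState;

/* Stand-in for the shared area reached through winfo->shared. */
typedef struct ParallelApplyWorkerShared
{
    atomic_flag mutex;          /* replaces the backend spinlock here */
    ParallelTransState xact_state;
    TransactionId xid;
} ParallelApplyWorkerShared;

/*
 * Hypothetical helper: overwrite the per-transaction fields each time a
 * worker (new or reused from the pool) is assigned a streaming transaction.
 */
static void
pa_reset_shared_for_xact(ParallelApplyWorkerShared *shared, TransactionId xid)
{
    /* Spin until the flag is acquired. */
    while (atomic_flag_test_and_set_explicit(&shared->mutex,
                                             memory_order_acquire))
        ;

    shared->xact_state = PARALLEL_TRANS_UNKNOWN;
    shared->xid = xid;

    atomic_flag_clear_explicit(&shared->mutex, memory_order_release);
}
```

Fields that stay fixed for the worker's lifetime can be set once at setup, while these two have to be rewritten on every assignment because a pooled worker still carries the previous transaction's values.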

I will address the other comments, except the ones above that are still being discussed.

Best regards,
Hou zj
