RE: Perform streaming logical transactions by background workers and parallel apply

From: "houzj(dot)fnst(at)fujitsu(dot)com" <houzj(dot)fnst(at)fujitsu(dot)com>
To: "kuroda(dot)hayato(at)fujitsu(dot)com" <kuroda(dot)hayato(at)fujitsu(dot)com>
Cc: 'Amit Kapila' <amit(dot)kapila16(at)gmail(dot)com>, 'Dilip Kumar' <dilipbalaut(at)gmail(dot)com>, 'Masahiko Sawada' <sawada(dot)mshk(at)gmail(dot)com>, "shiy(dot)fnst(at)fujitsu(dot)com" <shiy(dot)fnst(at)fujitsu(dot)com>, 'Peter Smith' <smithpb2250(at)gmail(dot)com>, 'PostgreSQL Hackers' <pgsql-hackers(at)lists(dot)postgresql(dot)org>, "wangw(dot)fnst(at)fujitsu(dot)com" <wangw(dot)fnst(at)fujitsu(dot)com>
Subject: RE: Perform streaming logical transactions by background workers and parallel apply
Date: 2022-10-07 06:15:06
Message-ID: OS0PR01MB5716D01B2A29D9F7749B5AA5945F9@OS0PR01MB5716.jpnprd01.prod.outlook.com
Lists: pgsql-hackers

On Thursday, October 6, 2022 8:40 PM Kuroda, Hayato/黒田 隼人 <kuroda(dot)hayato(at)fujitsu(dot)com> wrote:
>
> Dear Hou,
>
> I put comments for v35-0001.

Thanks for the comments.

> 01. catalog.sgml
>
> ```
> + Controls how to handle the streaming of in-progress transactions:
> + <literal>f</literal> = disallow streaming of in-progress transactions,
> + <literal>t</literal> = spill the changes of in-progress transactions to
> + disk and apply at once after the transaction is committed on the
> + publisher,
> + <literal>p</literal> = apply changes directly using a parallel apply
> + worker if available (same as 't' if no worker is available)
> ```
>
> I'm not sure why 't' means "spill the changes to file". Is it a compatibility issue?

Yes, it's for backward compatibility; I think it's better to stay consistent
with the values used in previous versions.

> ~~~
> 02. applyworker.c - parallel_apply_stream_abort
>
> The argument abort_data is not modified in the function. Maybe "const"
> modifier should be added.
> (Other functions should also be checked...)

I am not sure it is necessary to add const here, as I didn't find many
similar usages in the existing code.

> ~~~
> 03. applyparallelworker.c - parallel_apply_find_worker
>
> ```
> + ParallelApplyWorkerEntry *entry = NULL;
> ```
>
> This may not have to be initialized here.

Fixed.

> ~~~
> 04. applyparallelworker.c - HandleParallelApplyMessages
>
> ```
> + static MemoryContext hpm_context = NULL;
> ```
>
> I think "hpm" means "handle parallel message", so it should be "hpam".

Fixed.

> ~~~
> 05. launcher.c - logicalrep_worker_launch()
>
> ```
> if (is_subworker)
> 	snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication parallel worker");
> else
> 	snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication worker");
> ```
>
> I'm not sure why there are only bgw_type even if there are three types of apply
> workers. Is it for compatibility?

Yeah, it's for backward compatibility.

> ~~~
> 06. launcher.c - logicalrep_worker_stop_by_slot
>
> An assertion like Assert(slot_no >=0 && slot_no <
> max_logical_replication_workers) should be added at the top of this function.
>

Fixed.

> ~~~
> 07. launcher.c - logicalrep_worker_stop_internal
>
> ```
> +/*
> + * Workhorse for logicalrep_worker_stop(), logicalrep_worker_detach() and
> + * logicalrep_worker_stop_by_slot(). Stop the worker and wait for it to die.
> + */
> +static void
> +logicalrep_worker_stop_internal(LogicalRepWorker *worker)
> ```
>
> I think logicalrep_worker_stop_internal() may not be the "workhorse" for
> logicalrep_worker_detach(). In that function, the internal function is only
> called for parallel apply workers, and it is not the main part of the detach
> logic.
>
> ~~~
> 08. worker.c - handle_streamed_transaction()
>
> ```
> + TransactionId current_xid = InvalidTransactionId;
> ```
>
> This initialization is not needed. This is not used in non-streaming mode,
> otherwise it is substituted before used.

Fixed.

> ~~~
> 09. worker.c - handle_streamed_transaction()
>
> ```
> + case TRANS_PARALLEL_APPLY:
> + /* Define a savepoint for a subxact if needed. */
> + parallel_apply_start_subtrans(current_xid, stream_xid);
> + return false;
> ```
>
> Based on other case-block, Assert(am_parallel_apply_worker()) may be added
> at the top of this part.
> The same suggestion applies to other switch-case statements.

The apply_action here comes from the nearby get_transaction_apply_action()
call, which returns TRANS_PARALLEL_APPLY only when we are in a parallel
apply worker. So, I am not sure the assert is necessary.

> ~~~
> 10. worker.c - apply_handle_stream_start
>
> ```
> + *
> + * XXX We can avoid sending pair of the START/STOP messages to the parallel
> + * worker because unlike apply worker it will process only one
> + * transaction-at-a-time. However, it is not clear whether that is worth the
> + * effort because it is sent after logical_decoding_work_mem changes.
> ```
>
> I can understand that START message is not needed, but is STOP really
> removable? If leader does not send STOP to its child, does it lose a chance to
> change the worker-state to IDLE_IN_TRANSACTION?

Fixed.

> ~~~
> 11. worker.c - apply_handle_stream_start
>
> Currently the number of received chunks is not counted, but it could be if a
> variable "nchunks" were defined and incremented in apply_handle_stream_start().
> This info may be useful for determining an appropriate
> logical_decoding_work_mem for a workload. What do you think?

Since we don't have a similar DEBUG message for the "streaming=on" mode, I
feel we can leave this for now and add it later in a separate patch if needed.

> ~~~
> 12. worker.c - get_transaction_apply_action
>
> {} are not needed.

I am fine with either style here, so I didn’t change this.

Best regards,
Hou zj
