RE: Perform streaming logical transactions by background workers and parallel apply

From: "kuroda(dot)hayato(at)fujitsu(dot)com" <kuroda(dot)hayato(at)fujitsu(dot)com>
To: "houzj(dot)fnst(at)fujitsu(dot)com" <houzj(dot)fnst(at)fujitsu(dot)com>
Cc: 'Amit Kapila' <amit(dot)kapila16(at)gmail(dot)com>, 'Dilip Kumar' <dilipbalaut(at)gmail(dot)com>, 'Masahiko Sawada' <sawada(dot)mshk(at)gmail(dot)com>, "shiy(dot)fnst(at)fujitsu(dot)com" <shiy(dot)fnst(at)fujitsu(dot)com>, 'Peter Smith' <smithpb2250(at)gmail(dot)com>, 'PostgreSQL Hackers' <pgsql-hackers(at)lists(dot)postgresql(dot)org>, "wangw(dot)fnst(at)fujitsu(dot)com" <wangw(dot)fnst(at)fujitsu(dot)com>
Subject: RE: Perform streaming logical transactions by background workers and parallel apply
Date: 2022-10-06 12:39:35
Message-ID: TYAPR01MB5866C7ED7046BBFC1B8FDD26F55C9@TYAPR01MB5866.jpnprd01.prod.outlook.com
Lists: pgsql-hackers

Dear Hou,

Here are my comments on v35-0001.

01. catalog.sgml

```
+ Controls how to handle the streaming of in-progress transactions:
+ <literal>f</literal> = disallow streaming of in-progress transactions,
+ <literal>t</literal> = spill the changes of in-progress transactions to
+ disk and apply at once after the transaction is committed on the
+ publisher,
+ <literal>p</literal> = apply changes directly using a parallel apply
+ worker if available (same as 't' if no worker is available)
```

I'm not sure why 't' means "spill the changes to disk". Is it for compatibility?
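
As a rough illustration of the three catalog values described above, here is a minimal standalone sketch of how the character could map to an apply mode, including the fallback of 'p' to the 't' behaviour. All names (DemoStreamMode, demo_stream_mode_from_char, demo_effective_mode) are hypothetical and are not taken from the patch.

```c
#include <assert.h>

/* Hypothetical modes; the patch may name and represent these differently. */
typedef enum DemoStreamMode
{
    DEMO_STREAM_OFF,        /* 'f': streaming of in-progress xacts disallowed */
    DEMO_STREAM_SPILL,      /* 't': spill to disk, apply after commit */
    DEMO_STREAM_PARALLEL,   /* 'p': apply directly in a parallel apply worker */
    DEMO_STREAM_INVALID     /* any other character */
} DemoStreamMode;

/* Translate the catalog character into a mode. */
static DemoStreamMode
demo_stream_mode_from_char(char c)
{
    switch (c)
    {
        case 'f':
            return DEMO_STREAM_OFF;
        case 't':
            return DEMO_STREAM_SPILL;
        case 'p':
            return DEMO_STREAM_PARALLEL;
        default:
            return DEMO_STREAM_INVALID;
    }
}

/* 'p' behaves like 't' when no parallel apply worker is available. */
static DemoStreamMode
demo_effective_mode(char c, int worker_available)
{
    DemoStreamMode mode = demo_stream_mode_from_char(c);

    if (mode == DEMO_STREAM_PARALLEL && !worker_available)
        return DEMO_STREAM_SPILL;
    return mode;
}
```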

~~~
02. applyparallelworker.c - parallel_apply_stream_abort

The argument abort_data is not modified in the function, so the "const" qualifier could be added.
(Other functions should also be checked...)
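
A minimal standalone sketch of the idea, assuming a simplified stand-in struct (DemoStreamAbortData and demo_is_toplevel_abort are hypothetical names, not the patch's definitions): a pointer-to-const parameter documents that the function only reads the data, and the compiler rejects accidental writes.

```c
#include <assert.h>
#include <stdint.h>

/* Hypothetical stand-in for the abort data; the fields are assumptions. */
typedef struct DemoStreamAbortData
{
    uint32_t    xid;        /* top-level transaction id */
    uint32_t    subxid;     /* (sub)transaction being aborted */
} DemoStreamAbortData;

/*
 * The parameter is a pointer to const, so the function can only read it.
 * A statement such as "abort_data->xid = 0;" here would fail to compile.
 */
static int
demo_is_toplevel_abort(const DemoStreamAbortData *abort_data)
{
    /* Equal ids are treated here as an abort of the whole transaction. */
    return abort_data->xid == abort_data->subxid;
}
```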

~~~
03. applyparallelworker.c - parallel_apply_find_worker

```
+ ParallelApplyWorkerEntry *entry = NULL;
```

This may not have to be initialized here.

~~~
04. applyparallelworker.c - HandleParallelApplyMessages

```
+ static MemoryContext hpm_context = NULL;
```

I think "hpm" stands for "handle parallel message"; since the function handles parallel apply messages, it should be "hpam".

~~~
05. launcher.c - logicalrep_worker_launch()

```
if (is_subworker)
    snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication parallel worker");
else
    snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication worker");
```

I'm not sure why there are only two bgw_type values even though there are three types of apply workers. Is it for compatibility?

~~~
06. launcher.c - logicalrep_worker_stop_by_slot

An assertion like Assert(slot_no >= 0 && slot_no < max_logical_replication_workers) should be added at the top of this function.
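
A minimal standalone sketch of such a bounds check, using the standard assert() in place of PostgreSQL's Assert() macro. DEMO_MAX_WORKERS, DemoWorker, and demo_worker_by_slot are hypothetical stand-ins for max_logical_replication_workers and the worker array.

```c
#include <assert.h>
#include <stddef.h>

/* Stand-in for max_logical_replication_workers (an assumption). */
#define DEMO_MAX_WORKERS 4

typedef struct DemoWorker
{
    int         in_use;
} DemoWorker;

static DemoWorker demo_workers[DEMO_MAX_WORKERS];

/* Look up a worker by slot number, rejecting out-of-range slots early. */
static DemoWorker *
demo_worker_by_slot(int slot_no)
{
    /* Catch a bad index before it is used to address the array. */
    assert(slot_no >= 0 && slot_no < DEMO_MAX_WORKERS);

    return &demo_workers[slot_no];
}
```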

~~~
07. launcher.c - logicalrep_worker_stop_internal

```
+/*
+ * Workhorse for logicalrep_worker_stop(), logicalrep_worker_detach() and
+ * logicalrep_worker_stop_by_slot(). Stop the worker and wait for it to die.
+ */
+static void
+logicalrep_worker_stop_internal(LogicalRepWorker *worker)
```

I think logicalrep_worker_stop_internal() may not be a "workhorse" for logicalrep_worker_detach(). In that function, the internal function is called for parallel apply workers, and it is not the main part of the detach logic.

~~~
08. worker.c - handle_streamed_transaction()

```
+ TransactionId current_xid = InvalidTransactionId;
```

This initialization is not needed. The variable is not used in non-streaming mode, and otherwise it is assigned before it is used.

~~~
09. worker.c - handle_streamed_transaction()

```
+ case TRANS_PARALLEL_APPLY:
+ /* Define a savepoint for a subxact if needed. */
+ parallel_apply_start_subtrans(current_xid, stream_xid);
+ return false;
```

Based on the other case blocks, Assert(am_parallel_apply_worker()) could be added at the top of this part.
The same suggestion applies to the other switch-case statements.
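
A minimal standalone sketch of the pattern, with each case block asserting the worker type it expects on entry. The enum, the flag, and the return values are hypothetical simplifications and are not the patch's code; the standard assert() stands in for Assert().

```c
#include <assert.h>

/* Hypothetical, reduced set of apply actions. */
typedef enum DemoApplyAction
{
    DEMO_TRANS_LEADER_SERIALIZE,
    DEMO_TRANS_PARALLEL_APPLY
} DemoApplyAction;

/* Stand-in for the process-local "am I a parallel apply worker" state. */
static int demo_in_parallel_worker = 0;

static int
demo_am_parallel_apply_worker(void)
{
    return demo_in_parallel_worker;
}

/* Returns 1 if the change was consumed here, 0 if the caller applies it. */
static int
demo_handle_streamed_change(DemoApplyAction action)
{
    switch (action)
    {
        case DEMO_TRANS_LEADER_SERIALIZE:
            /* Assumed to run only in the leader. */
            assert(!demo_am_parallel_apply_worker());
            return 1;

        case DEMO_TRANS_PARALLEL_APPLY:
            /* The assertion suggested in the comment above. */
            assert(demo_am_parallel_apply_worker());
            return 0;
    }

    return 0;
}
```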

~~~
10. worker.c - apply_handle_stream_start

```
+ *
+ * XXX We can avoid sending pair of the START/STOP messages to the parallel
+ * worker because unlike apply worker it will process only one
+ * transaction-at-a-time. However, it is not clear whether that is worth the
+ * effort because it is sent after logical_decoding_work_mem changes.
```

I can understand that the START message is not needed, but is STOP really removable? If the leader does not send STOP to its child, does it lose the chance to change the worker state to IDLE_IN_TRANSACTION?

~~~
11. worker.c - apply_handle_stream_start

Currently the number of received chunks is not counted, but it could be if a variable "nchunks" were defined and incremented in apply_handle_stream_start(). This information may be useful for determining an appropriate logical_decoding_work_mem for a workload. What do you think?
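
A minimal standalone sketch of the counter, assuming a simple running total kept by the worker. DemoStreamStats and demo_on_stream_start are hypothetical names; where the counter would live and how it would be reported are left open.

```c
#include <assert.h>
#include <stdint.h>

/* Hypothetical per-worker statistics holder. */
typedef struct DemoStreamStats
{
    uint64_t    nchunks;    /* number of streamed chunks received so far */
} DemoStreamStats;

static DemoStreamStats demo_stats = {0};

/*
 * Stand-in for the start-of-chunk handler: every STREAM START message
 * begins one chunk, so incrementing here counts the chunks received.
 */
static void
demo_on_stream_start(void)
{
    demo_stats.nchunks++;
}
```

A high chunk count for a workload would suggest that transactions exceed logical_decoding_work_mem often.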

~~~
12. worker.c - get_transaction_apply_action

The braces {} are not needed.

Best Regards,
Hayato Kuroda
FUJITSU LIMITED
