RE: Perform streaming logical transactions by background workers and parallel apply

From: "kuroda(dot)hayato(at)fujitsu(dot)com" <kuroda(dot)hayato(at)fujitsu(dot)com>
To: "wangw(dot)fnst(at)fujitsu(dot)com" <wangw(dot)fnst(at)fujitsu(dot)com>
Cc: Amit Kapila <amit(dot)kapila16(at)gmail(dot)com>, "houzj(dot)fnst(at)fujitsu(dot)com" <houzj(dot)fnst(at)fujitsu(dot)com>, Dilip Kumar <dilipbalaut(at)gmail(dot)com>, Masahiko Sawada <sawada(dot)mshk(at)gmail(dot)com>, "shiy(dot)fnst(at)fujitsu(dot)com" <shiy(dot)fnst(at)fujitsu(dot)com>, Peter Smith <smithpb2250(at)gmail(dot)com>, PostgreSQL Hackers <pgsql-hackers(at)lists(dot)postgresql(dot)org>
Subject: RE: Perform streaming logical transactions by background workers and parallel apply
Date: 2022-09-22 08:07:49
Message-ID: TYAPR01MB5866727DCFC8B47FAC46ABF6F54E9@TYAPR01MB5866.jpnprd01.prod.outlook.com
Lists: pgsql-hackers

Dear Wang,

Thanks for updating the patch! The following are my comments on v33-0001.

===
libpqwalreceiver.c

01. inclusion

```
+#include "catalog/pg_subscription.h"
```

We don't need to include it, because the parameters are analyzed by the caller.

===
launcher.c

02. logicalrep_worker_launch()

```
+ /*
+ * Return silently if the number of parallel apply workers reached the
+ * limit per subscription.
+ */
+ if (is_subworker && nparallelapplyworkers >= max_parallel_apply_workers_per_subscription)
```

a.
It might be helpful to output a debug message here.

b.
The if statement exceeds 80 characters. You can break the line before "nparallelapplyworkers >= ...".
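For illustration, a standalone sketch covering both points: the condition is wrapped before `nparallelapplyworkers >= ...`, and a debug message is emitted when the limit is hit. Here `fprintf(stderr, ...)` stands in for the backend's `elog(DEBUG1, ...)`, the counters are passed in as parameters, and the function name and message wording are only suggestions, not the patch's code.

```c
#include <stdbool.h>
#include <stdio.h>

/*
 * Sketch of the limit check in logicalrep_worker_launch().  Returns true
 * when no new parallel apply worker should be launched; fprintf(stderr)
 * stands in for elog(DEBUG1, ...).
 */
bool
parallel_apply_worker_limit_reached(bool is_subworker,
                                    int nparallelapplyworkers,
                                    int max_parallel_apply_workers_per_subscription)
{
    if (is_subworker &&
        nparallelapplyworkers >= max_parallel_apply_workers_per_subscription)
    {
        fprintf(stderr,
                "DEBUG:  parallel apply worker limit (%d) reached, "
                "not launching a new worker\n",
                max_parallel_apply_workers_per_subscription);
        return true;
    }
    return false;
}
```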

===
applyparallelworker.c

03. declaration

```
+/*
+ * Is there a message pending in parallel apply worker which we need to
+ * receive?
+ */
+volatile bool ParallelApplyMessagePending = false;
```

I checked other flags that are set by signal handlers, and their datatype seems to be sig_atomic_t.
Is there any reason you use a plain bool? If not, it should be changed.
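As a minimal standalone sketch of why sig_atomic_t matters: the C standard only guarantees that a signal handler may safely assign to a static object declared `volatile sig_atomic_t`. The handler below just sets the flag, and the interrupted code reads it afterwards. `SIGUSR1` assumes a POSIX system, and the handler and helper names are hypothetical.

```c
#include <signal.h>

/* Flag written by the signal handler and read by the interrupted code. */
volatile sig_atomic_t ParallelApplyMessagePending = 0;

static void
handle_parallel_apply_signal(int signo)
{
    (void) signo;
    /* Only set the flag; the real processing happens outside the handler. */
    ParallelApplyMessagePending = 1;
}

/*
 * Install the handler, send ourselves SIGUSR1 and return the flag's value
 * (raise() does not return until the handler has run), or -1 on failure.
 */
int
simulate_parallel_apply_signal(void)
{
    ParallelApplyMessagePending = 0;
    if (signal(SIGUSR1, handle_parallel_apply_signal) == SIG_ERR)
        return -1;
    raise(SIGUSR1);
    return ParallelApplyMessagePending;
}
```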

04. HandleParallelApplyMessages()

```
+ if (winfo->error_mq_handle == NULL)
+ continue;
```

a.
I was not sure when the cell should be cleaned up. Currently we clean up ParallelApplyWorkersList only in parallel_apply_start_worker(),
but we also have chances to remove such a cell in HandleParallelApplyMessages() or HandleParallelApplyMessage(). What do you think?

b.
Even if we keep this, a comment should be added, like "exited worker, skipped".
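To illustrate (a), a standalone sketch of dropping exited workers from the list instead of skipping them on every pass. `WorkerInfo` is a hypothetical stand-in for the patch's worker info, and a plain singly linked list replaces the backend's List; in the backend, foreach_delete_current() on ParallelApplyWorkersList would be the natural equivalent.

```c
#include <stdlib.h>

/* Hypothetical stand-in: error_mq_handle is NULL once the worker has exited. */
typedef struct WorkerInfo
{
    void       *error_mq_handle;
    struct WorkerInfo *next;
} WorkerInfo;

/*
 * Unlink and free every worker whose error queue is gone, so later loops
 * (e.g. in HandleParallelApplyMessages()) no longer need to skip it.
 * Returns the new list head.
 */
WorkerInfo *
remove_exited_workers(WorkerInfo *head)
{
    WorkerInfo **link = &head;

    while (*link != NULL)
    {
        WorkerInfo *cur = *link;

        if (cur->error_mq_handle == NULL)
        {
            /* Exited worker: unlink and free its cell. */
            *link = cur->next;
            free(cur);
        }
        else
            link = &cur->next;
    }
    return head;
}
```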

```
+ else
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("lost connection to the leader apply worker")));
```

c.
This function is called on the leader apply worker, so the message should be "lost connection to the parallel apply worker".

05. parallel_apply_setup_worker()

```
+ if (launched)
+ {
+ ParallelApplyWorkersList = lappend(ParallelApplyWorkersList, winfo);
+ }
```

The braces {} should be removed, since the body is a single statement.

06. parallel_apply_wait_for_xact_finish()

```
+ /* If any workers have died, we have failed. */
```

This function checks only a single parallel apply worker, so shouldn't the comment be "If the worker has..."?

===
worker.c

07. handle_streamed_transaction()

```
+ * For non-streamed transactions, returns false;
```

"returns false;" -> "returns false"

apply_handle_commit_prepared(), apply_handle_abort_prepared()

These functions are not expected to be called by a parallel apply worker,
so I think an Assert() should be added.
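A minimal standalone sketch of the suggested check, with the standard assert() standing in for the backend's Assert() (both compile to nothing in non-assert builds). `am_parallel_apply_worker()`, its flag, and the counter are hypothetical stand-ins rather than the patch's code; apply_handle_abort_prepared() would get the same line.

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical stand-in: in this sketch the worker type is a plain flag. */
static bool is_parallel_apply_worker = false;

static bool
am_parallel_apply_worker(void)
{
    return is_parallel_apply_worker;
}

/* Counts handled messages, only so the sketch has an observable effect. */
int ncommit_prepared_handled = 0;

/* Sketch of the guard at the top of apply_handle_commit_prepared(). */
void
apply_handle_commit_prepared_sketch(void)
{
    /* Only the leader apply worker is expected to reach this handler. */
    assert(!am_parallel_apply_worker());

    /* ... the real handler would read the message and commit here ... */
    ncommit_prepared_handled++;
}
```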

08. UpdateWorkerStats()

```
-static void
+void
UpdateWorkerStats(XLogRecPtr last_lsn, TimestampTz send_time, bool reply)
```

This function is called only in worker.c, so it should remain static.

09. subscription_change_cb()

```
-static void
+void
subscription_change_cb(Datum arg, int cacheid, uint32 hashvalue)
```

This function is called only in worker.c, so it should remain static.

10. InitializeApplyWorker()

```
+/*
+ * Initialize the database connection, in-memory subscription and necessary
+ * config options.
+ */
void
-ApplyWorkerMain(Datum main_arg)
+InitializeApplyWorker(void)
```

A comment should be added noting that this is the common part shared by the leader and parallel apply workers.
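For illustration, one possible wording of the header comment together with the two callers, as a standalone sketch. The `ApplyWorkerState` struct, the `state` parameter, and the `_sketch` entry points are hypothetical; the real InitializeApplyWorker() takes no arguments and does the actual setup.

```c
#include <stdbool.h>

/* Hypothetical record of what the shared initialization has done. */
typedef struct ApplyWorkerState
{
    bool        db_connected;
    bool        subscription_loaded;
    bool        config_set;
} ApplyWorkerState;

/*
 * Initialize the database connection, in-memory subscription and necessary
 * config options.
 *
 * This is the common startup code of both the leader apply worker and
 * parallel apply workers, so changes here affect both kinds of worker.
 */
void
InitializeApplyWorker_sketch(ApplyWorkerState *state)
{
    state->db_connected = true;        /* stands in for connecting to the DB */
    state->subscription_loaded = true; /* stands in for loading the subscription */
    state->config_set = true;          /* stands in for setting config options */
}

void
ApplyWorkerMain_sketch(ApplyWorkerState *state)
{
    InitializeApplyWorker_sketch(state);
    /* leader-specific setup would follow */
}

void
ParallelApplyWorkerMain_sketch(ApplyWorkerState *state)
{
    InitializeApplyWorker_sketch(state);
    /* parallel-worker-specific setup would follow */
}
```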

===
logicalrepworker.h

11. declaration

```
extern PGDLLIMPORT volatile bool ParallelApplyMessagePending;
```

Please refer to comment 03 above.

===
guc_tables.c

12. ConfigureNamesInt

```
+ {
+ {"max_parallel_apply_workers_per_subscription",
+ PGC_SIGHUP,
+ REPLICATION_SUBSCRIBERS,
+ gettext_noop("Maximum number of parallel apply workers per subscription."),
+ NULL,
+ },
+ &max_parallel_apply_workers_per_subscription,
+ 2, 0, MAX_BACKENDS,
+ NULL, NULL, NULL
+ },
```

This parameter can be changed by pg_ctl reload, so the following corner case may occur.
Should we add an assign hook to handle this, or can we ignore it?

1. Set max_parallel_apply_workers_per_subscription to 4.
2. Start replicating two streaming transactions.
3. Commit the transactions.
=== Two parallel apply workers remain ===
4. Change max_parallel_apply_workers_per_subscription to 3.
5. We expected only one worker to remain, but two parallel apply workers remained.
They will not be stopped until another streamed transaction is started and committed.
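For illustration, a standalone sketch of one possible approach: the hook, which has the GUC int assign-hook shape `(int newval, void *extra)`, only records that the limit was lowered, since assign hooks must not fail; the leader then stops surplus idle workers from its main loop. The flag, `trim_idle_parallel_workers()`, and the idle-worker counter are hypothetical. How many idle workers to keep for a given setting is left as the `keep` parameter, because that policy belongs to the patch (in the scenario above, a setting of 3 should leave one).

```c
#include <stdbool.h>

/* GUC variable and a hypothetical "limit lowered" flag. */
int max_parallel_apply_workers_per_subscription = 4;
static bool parallel_apply_limit_lowered = false;

/* Assign hook: runs before the new value is stored, so compare with the old one. */
void
assign_max_parallel_apply_workers_per_subscription(int newval, void *extra)
{
    (void) extra;
    if (newval < max_parallel_apply_workers_per_subscription)
        parallel_apply_limit_lowered = true;
}

/*
 * Called from the leader's main loop: if the limit was lowered, stop idle
 * workers until at most `keep` remain.  Returns how many were stopped.
 */
int
trim_idle_parallel_workers(int *nidle, int keep)
{
    int         nstopped = 0;

    if (!parallel_apply_limit_lowered)
        return 0;
    parallel_apply_limit_lowered = false;

    while (*nidle > keep)
    {
        (*nidle)--;             /* stands in for stopping one idle worker */
        nstopped++;
    }
    return nstopped;
}
```

In the scenario above, the leader would then stop the surplus worker on its next pass through the loop instead of waiting for the next streamed transaction. Comparing the value before and after ProcessConfigFile() in the leader's loop might achieve the same without a hook.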

Best Regards,
Hayato Kuroda
FUJITSU LIMITED