Re: Perform streaming logical transactions by background workers and parallel apply

From: Peter Smith <smithpb2250(at)gmail(dot)com>
To: "houzj(dot)fnst(at)fujitsu(dot)com" <houzj(dot)fnst(at)fujitsu(dot)com>
Cc: Amit Kapila <amit(dot)kapila16(at)gmail(dot)com>, Masahiko Sawada <sawada(dot)mshk(at)gmail(dot)com>, "wangw(dot)fnst(at)fujitsu(dot)com" <wangw(dot)fnst(at)fujitsu(dot)com>, Dilip Kumar <dilipbalaut(at)gmail(dot)com>, "shiy(dot)fnst(at)fujitsu(dot)com" <shiy(dot)fnst(at)fujitsu(dot)com>, PostgreSQL Hackers <pgsql-hackers(at)lists(dot)postgresql(dot)org>
Subject: Re: Perform streaming logical transactions by background workers and parallel apply
Date: 2022-11-22 05:19:45
Message-ID: CAHut+PtdHV-sMqczE495bZonAXWP8ba+axvb8h14H8wkc5fxUw@mail.gmail.com
Lists: pgsql-hackers

Thanks for addressing my review comments on v47-0001.

Here are my review comments for v49-0001.

======

src/backend/replication/logical/applyparallelworker.c

1. GENERAL - NULL checks

There is inconsistent NULL checking in the patch.

Sometimes it is like (!winfo)
Sometimes explicit NULL checks like (winfo->mq_handle != NULL)

(That is just one example -- there are differences in many places)

It would be better to use a consistent style everywhere.
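
For example (hypothetical, just to illustrate settling on one consistent
choice), the terse form everywhere:

    if (!winfo)
        return;
    if (!winfo->mq_handle)
        return;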

~

2. GENERAL - Error message worker name

2a.
In worker.c all the messages are now "logical replication apply
worker" or "logical replication parallel apply worker" etc., but in
applyparallelworker.c the "logical replication" part is sometimes
missing. IMO all the messages in this patch should be consistently
worded.

I've reported some of these in the comments below, but please search
the whole patch for any I might have missed.

2b.
Consider whether all of these ought to be calling get_worker_name(),
which is currently static in worker.c. Doing that means any future
changes to get_worker_name() won't cause more inconsistencies.
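
For example, a rough sketch only (the exact body of get_worker_name()
in the patch may differ):

/* worker.c: remove 'static' and declare it in worker_internal.h */
const char *
get_worker_name(void)
{
    if (am_tablesync_worker())
        return _("logical replication table synchronization worker");
    else if (am_parallel_apply_worker())
        return _("logical replication parallel apply worker");
    else
        return _("logical replication apply worker");
}

Then applyparallelworker.c could report errors like (hypothetical usage):

ereport(ERROR,
        (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
         errmsg("%s exited abnormally", get_worker_name())));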

~~~

3. File header comment

+ * IDENTIFICATION src/backend/replication/logical/applyparallelworker.c

The word "IDENTIFICATION" should be on a separate line (for
consistency with every other PG source file)
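
i.e. like:

 * IDENTIFICATION
 *    src/backend/replication/logical/applyparallelworker.c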

~

4.

+ * In order for lmgr to detect this, we have LA acquire a session lock on the
+ * remote transaction (by pa_lock_stream()) and have PA wait on the lock before
+ * trying to receive messages. In other words, LA acquires the lock before
+ * sending STREAM_STOP and releases it if already acquired before sending
+ * STREAM_START, STREAM_ABORT(for toplevel transaction), STREAM_PREPARE and
+ * STREAM_COMMIT. For PA, it always needs to acquire the lock after processing
+ * STREAM_STOP and STREAM_ABORT(for subtransaction) and then release
+ * immediately after acquiring it. That way, when PA is waiting for LA, we can
+ * have a wait-edge from PA to LA in lmgr, which will make a deadlock in lmgr
+ * like:

Missing spaces before '(' deliberate?

~~~

5. globals

+/*
+ * Is there a message sent by parallel apply worker which the leader apply
+ * worker need to receive?
+ */
+volatile sig_atomic_t ParallelApplyMessagePending = false;

SUGGESTION
Is there a message sent by a parallel apply worker that the leader
apply worker needs to receive?

~~~

6. pa_get_available_worker

+/*
+ * get an available parallel apply worker from the worker pool.
+ */
+static ParallelApplyWorkerInfo *
+pa_get_available_worker(void)

Uppercase the comment ("get" -> "Get").

~

7.

+ /*
+ * We first try to free the worker to improve our chances of getting
+ * the worker. Normally, we free the worker after ensuring that the
+ * transaction is committed by parallel worker but for rollbacks, we
+ * don't wait for the transaction to finish so can't free the worker
+ * information immediately.
+ */

7a.
"We first try to free the worker to improve our chances of getting the worker."

SUGGESTION
We first try to free the worker to improve our chances of finding one
that is not in use.

~

7b.
"parallel worker" -> "the parallel worker"

~~~

8. pa_allocate_worker

+ /* Try to get a free parallel apply worker. */
+ winfo = pa_get_available_worker();
+

SUGGESTION
First, try to get a parallel apply worker from the pool.

~~~

9. pa_free_worker

+ * This removes the parallel apply worker entry from the hash table so that it
+ * can't be used. This either stops the worker and free the corresponding info,
+ * if there are enough workers in the pool or just marks it available for
+ * reuse.

BEFORE
This either stops the worker and free the corresponding info, if there
are enough workers in the pool or just marks it available for reuse.

SUGGESTION
If there are enough workers in the pool it stops the worker and frees
the corresponding info, otherwise it just marks the worker as
available for reuse.

~

10.

+ /* Free the corresponding info if the worker exited cleanly. */
+ if (winfo->error_mq_handle == NULL)
+ {
+ pa_free_worker_info(winfo);
+ return true;
+ }

Is it correct that this bypasses the removal from the hash table?

~

11.

+
+ /* Worker is already available for reuse. */
+ if (!winfo->in_use)
+ return false;

Should this quick-exit check for in_use come first?
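
e.g. if so, a sketch of the reordering (reusing the patch's own
fragments):

    /* Worker is already available for reuse. */
    if (!winfo->in_use)
        return false;

    /* Free the corresponding info if the worker exited cleanly. */
    if (winfo->error_mq_handle == NULL)
    {
        pa_free_worker_info(winfo);
        return true;
    }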

~~

12. HandleParallelApplyMessage

+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("parallel apply worker exited abnormally"),
+ errcontext("%s", edata.context)));

Maybe "parallel apply worker" -> "logical replication parallel apply
worker" (for consistency with the other error messages)

~

13.

+ default:
+ elog(ERROR, "unrecognized message type received from parallel apply worker: %c (message length %d bytes)",
+ msgtype, msg->len);
+ }

ditto #12 above.

~

14.

+ case 'X': /* Terminate, indicating clean exit. */
+ {
+ shm_mq_detach(winfo->error_mq_handle);
+ winfo->error_mq_handle = NULL;
+ break;
+ }
+ default:

No need for the { } here.
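
i.e. just:

    case 'X':   /* Terminate, indicating clean exit. */
        shm_mq_detach(winfo->error_mq_handle);
        winfo->error_mq_handle = NULL;
        break;
    default: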

~~~

15. HandleParallelApplyMessages

+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("lost connection to the parallel apply worker")));
+ }

"parallel apply worker" -> "logical replication parallel apply worker"

~~~

16. pa_init_and_launch_worker

+ /* Setup shared memory. */
+ if (!pa_setup_dsm(winfo))
+ {
+ MemoryContextSwitchTo(oldcontext);
+ pfree(winfo);
+ return NULL;
+ }

Wouldn't it be better to do the pfree before switching back to the oldcontext?

~~~

17. pa_send_data

+ /* Wait before retrying. */
+ rc = WaitLatch(MyLatch,
+ WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
+ SHM_SEND_RETRY_INTERVAL_MS,
+ WAIT_EVENT_LOGICAL_PARALLEL_APPLY_STATE_CHANGE);
+
+ if (rc & WL_LATCH_SET)
+ {
+ ResetLatch(MyLatch);
+ CHECK_FOR_INTERRUPTS();
+ }

Instead of CHECK_FOR_INTERRUPTS, should this be calling your new
function ProcessParallelApplyInterrupts?
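
i.e. something like (assuming ProcessParallelApplyInterrupts() is the
no-argument function added by this patch):

    if (rc & WL_LATCH_SET)
    {
        ResetLatch(MyLatch);
        ProcessParallelApplyInterrupts();
    }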

~

18.

+ if (startTime == 0)
+ startTime = GetCurrentTimestamp();
+ else if (TimestampDifferenceExceeds(startTime, GetCurrentTimestamp(),
+ SHM_SEND_TIMEOUT_MS))
+ ereport(ERROR,
+ (errcode(ERRCODE_CONNECTION_FAILURE),
+ errmsg("terminating logical replication parallel apply worker due to timeout")));

I'd previously commented that the timeout calculation seemed wrong.
Hou-san replied [1,#9] "start counting from the first failure looks
fine to me.", but I am not so sure. Since startTime only gets set
after the first 1s retry interval has already elapsed, a 10s timeout
will actually fire ~11s after the function is called, not ~10s. I know
it's pedantic, but where's the harm in making the calculation right
instead of just nearly right?

IMO probably an easy fix for this is like:

#define SHM_SEND_RETRY_INTERVAL_MS 1000
#define SHM_SEND_TIMEOUT_MS (10000 - SHM_SEND_RETRY_INTERVAL_MS)
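
A sketch of the timing with those values (assuming startTime is only
set after the first 1s wait, per "start counting from the first
failure"):

    t = 0s     first shm_mq_send() fails; WaitLatch for 1s
    t = 1s     startTime is set
    t = 10s    TimestampDifferenceExceeds(startTime, now, 9000) fires

i.e. the ERROR is raised ~10s after pa_send_data() was called, as a
user would expect.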

~~~

19. pa_wait_for_xact_state

+ /* An interrupt may have occurred while we were waiting. */
+ CHECK_FOR_INTERRUPTS();

Instead of CHECK_FOR_INTERRUPTS, should this be calling your new
function ProcessParallelApplyInterrupts?

~~~

20. pa_savepoint_name

+static void
+pa_savepoint_name(Oid suboid, TransactionId xid, char *spname,
+ Size szsp)

Unnecessary wrapping?
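
i.e. it fits fine on one line:

static void
pa_savepoint_name(Oid suboid, TransactionId xid, char *spname, Size szsp)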

======

src/backend/replication/logical/origin.c

21. replorigin_session_setup

+ * However, we do allow multiple processes to point to the same origin slot
+ * if requested by the caller by passing PID of the process that has already
+ * acquired it. This is to allow using the same origin by multiple parallel
+ * apply processes the provided they maintain commit order, for example, by
+ * allowing only one process to commit at a time.

21a.
I thought the comment should mention this is optional and the special
value acquired_by=0 means don't do this.

~

21b.
"the provided they" ?? typo?

======

src/backend/replication/logical/tablesync.c

22. process_syncing_tables

process_syncing_tables(XLogRecPtr current_lsn)
{
+ /*
+ * Skip for parallel apply workers as they don't operate on tables that
+ * are not in ready state. See pa_can_start() and
+ * should_apply_changes_for_rel().
+ */
+ if (am_parallel_apply_worker())
+ return;

SUGGESTION (remove the double negative)
Skip for parallel apply workers because they only operate on tables
that are in a READY state. See pa_can_start() and
should_apply_changes_for_rel().

======

src/backend/replication/logical/worker.c

23. apply_handle_stream_stop

Previously I suggested that this lock/unlock seems too tricky and
needed a comment. The reply [1,#12] was that this is already described
atop applyparallelworker.c. OK, but in that case maybe the comment
here can just refer to that explanation:

SUGGESTION
Refer to the comments atop applyparallelworker.c for what this lock
and immediate unlock is doing.

~~~

24. apply_handle_stream_abort

+ if (pg_atomic_sub_fetch_u32(&(MyParallelShared->pending_stream_count), 1) == 0)
+ {
+ pa_lock_stream(MyParallelShared->xid, AccessShareLock);
+ pa_unlock_stream(MyParallelShared->xid, AccessShareLock);
+ }

ditto comment #23

~~~

25. apply_worker_clean_exit

+void
+apply_worker_clean_exit(void)
+{
+ /* Notify the leader apply worker that we have exited cleanly. */
+ if (am_parallel_apply_worker())
+ pq_putmessage('X', NULL, 0);
+
+ proc_exit(0);
+}

Somehow it doesn't seem right that the PA worker sending 'X' is here
in worker.c, while the LA worker receipt of this 'X' is in the other
applyparallelworker.c module. Maybe that other function
HandleParallelApplyMessage should also be here in worker.c?

======

src/backend/utils/misc/guc_tables.c

26.

@@ -2957,6 +2957,18 @@ struct config_int ConfigureNamesInt[] =
NULL,
},
&max_sync_workers_per_subscription,
+ 2, 0, MAX_PARALLEL_WORKER_LIMIT,
+ NULL, NULL, NULL
+ },
+
+ {
+ {"max_parallel_apply_workers_per_subscription",
+ PGC_SIGHUP,
+ REPLICATION_SUBSCRIBERS,
+ gettext_noop("Maximum number of parallel apply workers per subscription."),
+ NULL,
+ },
+ &max_parallel_apply_workers_per_subscription,
2, 0, MAX_BACKENDS,
NULL, NULL, NULL

Is this correct? Did you mean to change
max_sync_workers_per_subscription? My first impression is that there
has been some mixup between MAX_PARALLEL_WORKER_LIMIT and
MAX_BACKENDS, or that this change was accidentally made to the wrong
GUC.

======

src/include/replication/worker_internal.h

27. ParallelApplyWorkerShared

+ /*
+ * Indicates whether there are pending streaming blocks in the queue. The
+ * parallel apply worker will check it before starting to wait.
+ */
+ pg_atomic_uint32 pending_stream_count;

A better name might be 'n_pending_stream_blocks'.

~

28. function names

extern void logicalrep_worker_stop(Oid subid, Oid relid);
+extern void logicalrep_parallel_apply_worker_stop(int slot_no, uint16 generation);
extern void logicalrep_worker_wakeup(Oid subid, Oid relid);
extern void logicalrep_worker_wakeup_ptr(LogicalRepWorker *worker);

extern int logicalrep_sync_worker_count(Oid subid);
+extern int logicalrep_parallel_apply_worker_count(Oid subid);

Would it be better to call those new functions using similar shorter
names as done elsewhere?

logicalrep_parallel_apply_worker_stop -> logicalrep_pa_worker_stop
logicalrep_parallel_apply_worker_count -> logicalrep_pa_worker_count

------
[1] Hou-san's reply to my review of v47-0001.
https://www.postgresql.org/message-id/OS0PR01MB571680391393F3CB63469F3E940A9%40OS0PR01MB5716.jpnprd01.prod.outlook.com

Kind Regards,
Peter Smith.
Fujitsu Australia
