RE: Perform streaming logical transactions by background workers and parallel apply

From: "houzj(dot)fnst(at)fujitsu(dot)com" <houzj(dot)fnst(at)fujitsu(dot)com>
To: Peter Smith <smithpb2250(at)gmail(dot)com>
Cc: Amit Kapila <amit(dot)kapila16(at)gmail(dot)com>, Masahiko Sawada <sawada(dot)mshk(at)gmail(dot)com>, "wangw(dot)fnst(at)fujitsu(dot)com" <wangw(dot)fnst(at)fujitsu(dot)com>, Dilip Kumar <dilipbalaut(at)gmail(dot)com>, "shiy(dot)fnst(at)fujitsu(dot)com" <shiy(dot)fnst(at)fujitsu(dot)com>, PostgreSQL Hackers <pgsql-hackers(at)lists(dot)postgresql(dot)org>
Subject: RE: Perform streaming logical transactions by background workers and parallel apply
Date: 2022-11-22 12:42:24
Message-ID: OS0PR01MB5716339FF7CB759E751492CB940D9@OS0PR01MB5716.jpnprd01.prod.outlook.com
Lists: pgsql-hackers

On Tues, November 22, 2022 13:20 PM Peter Smith <smithpb2250(at)gmail(dot)com> wrote:
> Thanks for addressing my review comments on v47-0001.
>
> Here are my review comments for v49-0001.

Thanks for your comments.

> ======
>
> src/backend/replication/logical/applyparallelworker.c
>
> 1. GENERAL - NULL checks
>
> There is inconsistent NULL checking in the patch.
>
> Sometimes it is like (!winfo)
> Sometimes explicit NULL checks like (winfo->mq_handle != NULL)
>
> (That is just one example -- there are differences in many places)
>
> It would be better to use a consistent style everywhere.

Changed.

> ~
>
> 2. GENERAL - Error message worker name
>
> 2a.
> In worker.c all the messages are now "logical replication apply
> worker" or "logical replication parallel apply worker" etc, but in the
> applyparallel.c sometimes the "logical replication" part is missing.
> IMO all the messages in this patch should be consistently worded.
>
> I've reported some of them in the following comment below, but please
> search the whole patch for any I might have missed.

Renamed LA and PA in the messages as follows:
```
LA -> logical replication apply worker
PA -> logical replication parallel apply worker
```

> 2b.
> Consider if maybe all of these ought to be calling get_worker_name()
> which is currently static in worker.c. Doing this means any future
> changes to get_worker_name won't cause more inconsistencies.

Most of the error messages in applyparallelxx.c can only use "xx parallel worker",
so I think it's fine not to call get_worker_name().

> ~~~
>
> 3. File header comment
>
> + * IDENTIFICATION
> + src/backend/replication/logical/applyparallelworker.c
>
> The word "IDENTIFICATION" should be on a separate line (for
> consistency with every other PG source file)

Fixed.

> ~
>
> 4.
>
> + * In order for lmgr to detect this, we have LA acquire a session lock on the
> + * remote transaction (by pa_lock_stream()) and have PA wait on the lock before
> + * trying to receive messages. In other words, LA acquires the lock before
> + * sending STREAM_STOP and releases it if already acquired before sending
> + * STREAM_START, STREAM_ABORT(for toplevel transaction), STREAM_PREPARE and
> + * STREAM_COMMIT. For PA, it always needs to acquire the lock after processing
> + * STREAM_STOP and STREAM_ABORT(for subtransaction) and then release
> + * immediately after acquiring it. That way, when PA is waiting for LA, we can
> + * have a wait-edge from PA to LA in lmgr, which will make a deadlock in lmgr
> + * like:
>
> Missing spaces before '(' deliberate?

Added.

> ~~~
>
> 5. globals
>
> +/*
> + * Is there a message sent by parallel apply worker which the leader apply
> + * worker need to receive?
> + */
> +volatile sig_atomic_t ParallelApplyMessagePending = false;
>
> SUGGESTION
> Is there a message sent by a parallel apply worker that the leader
> apply worker needs to receive?

Changed.

> ~~~
>
> 6. pa_get_available_worker
>
> +/*
> + * get an available parallel apply worker from the worker pool.
> + */
> +static ParallelApplyWorkerInfo *
> +pa_get_available_worker(void)
>
> Uppercase comment

Changed.

> ~
>
> 7.
>
> + /*
> + * We first try to free the worker to improve our chances of getting
> + * the worker. Normally, we free the worker after ensuring that the
> + * transaction is committed by parallel worker but for rollbacks, we
> + * don't wait for the transaction to finish so can't free the worker
> + * information immediately.
> + */
>
> 7a.
> "We first try to free the worker to improve our chances of getting the worker."
>
> SUGGESTION
> We first try to free the worker to improve our chances of finding one
> that is not in use.
>
> ~
>
> 7b.
> "parallel worker" -> "the parallel worker"

Changed.

> ~~~
>
> 8. pa_allocate_worker
>
> + /* Try to get a free parallel apply worker. */
> + winfo = pa_get_available_worker();
> +
>
> SUGGESTION
> First, try to get a parallel apply worker from the pool.

Changed.

> ~~~
>
> 9. pa_free_worker
>
> + * This removes the parallel apply worker entry from the hash table so that it
> + * can't be used. This either stops the worker and free the corresponding info,
> + * if there are enough workers in the pool or just marks it available for
> + * reuse.
>
> BEFORE
> This either stops the worker and free the corresponding info, if there
> are enough workers in the pool or just marks it available for reuse.
>
> SUGGESTION
> If there are enough workers in the pool it stops the worker and frees
> the corresponding info, otherwise it just marks the worker as
> available for reuse.

Changed.
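
For illustration only (this is not code from the patch), the reworded rule can be sketched as a small standalone C model. All names here (`DemoWorker`, `DemoPool`, `keep_limit`, `demo_free_worker`) are hypothetical, and the meaning of "enough workers" is an assumption: the sketch treats it as "more workers than a configurable limit". The hash table removal mentioned in the comment is not modelled.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdlib.h>

#define POOL_CAPACITY 8

/* Hypothetical stand-in for the per-worker info kept by the leader. */
typedef struct DemoWorker
{
    bool in_use;
} DemoWorker;

/* Hypothetical worker pool; keep_limit is an assumed threshold. */
typedef struct DemoPool
{
    DemoWorker *slots[POOL_CAPACITY];
    int         nworkers;
    int         keep_limit;
} DemoPool;

/* Add a busy worker to the pool and return its slot index. */
static int
demo_add_worker(DemoPool *pool)
{
    DemoWorker *w;

    assert(pool->nworkers < POOL_CAPACITY);
    w = calloc(1, sizeof(*w));
    if (w == NULL)
        abort();
    w->in_use = true;
    pool->slots[pool->nworkers] = w;
    return pool->nworkers++;
}

/*
 * If the pool holds more workers than keep_limit, stop the worker and free
 * its info (returns true); otherwise just mark it available for reuse
 * (returns false).
 */
static bool
demo_free_worker(DemoPool *pool, int idx)
{
    DemoWorker *w;

    assert(idx >= 0 && idx < pool->nworkers);
    w = pool->slots[idx];

    if (pool->nworkers > pool->keep_limit)
    {
        /* Fill the hole with the last entry, then release the info. */
        pool->slots[idx] = pool->slots[pool->nworkers - 1];
        pool->slots[pool->nworkers - 1] = NULL;
        pool->nworkers--;
        free(w);
        return true;
    }

    w->in_use = false;
    return false;
}
```

With `keep_limit = 1` and two workers in the pool, freeing one of them releases it, while freeing the remaining one only clears its `in_use` flag.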

> ~
>
> 10.
>
> + /* Free the corresponding info if the worker exited cleanly. */
> + if (winfo->error_mq_handle == NULL)
> + {
> + pa_free_worker_info(winfo);
> + return true;
> + }
>
> Is it correct that this bypasses the removal from the hash table?

I thought about this again. It seems unnecessary to free the information here, as
we don't expect the worker to stop unless the leader asks it to stop.
So I have removed this for now and will reconsider it in the next version.

> ~
>
> 14.
>
> + case 'X': /* Terminate, indicating clean exit. */
> + {
> + shm_mq_detach(winfo->error_mq_handle);
> + winfo->error_mq_handle = NULL;
> + break;
> + }
> + default:
>
>
> No need for the { } here.

Changed.

> ~~~
>
> 16. pa_init_and_launch_worker
>
> + /* Setup shared memory. */
> + if (!pa_setup_dsm(winfo))
> + {
> + MemoryContextSwitchTo(oldcontext);
> + pfree(winfo);
> + return NULL;
> + }
>
>
> Wouldn't it be better to do the pfree before switching back to the oldcontext?

I think either style seems fine.

> ~~~
>
> 17. pa_send_data
>
> + /* Wait before retrying. */
> + rc = WaitLatch(MyLatch,
> + WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
> + SHM_SEND_RETRY_INTERVAL_MS,
> + WAIT_EVENT_LOGICAL_PARALLEL_APPLY_STATE_CHANGE);
> +
> + if (rc & WL_LATCH_SET)
> + {
> + ResetLatch(MyLatch);
> + CHECK_FOR_INTERRUPTS();
> + }
>
>
> Instead of CHECK_FOR_INTERRUPTS, should this be calling your new
> function ProcessParallelApplyInterrupts?

I think ProcessParallelApplyInterrupts is intended to be invoked only in the main
loop (LogicalParallelApplyLoop), to make the parallel apply worker exit cleanly.

> ~
>
> 18.
>
> + if (startTime == 0)
> + startTime = GetCurrentTimestamp();
> + else if (TimestampDifferenceExceeds(startTime,
> + GetCurrentTimestamp(),
> + SHM_SEND_TIMEOUT_MS))
> + ereport(ERROR,
> + (errcode(ERRCODE_CONNECTION_FAILURE),
> + errmsg("terminating logical replication parallel apply worker due to timeout")));
>
>
> I'd previously commented that the timeout calculation seemed wrong.
> Hou-san replied [1,#9] "start counting from the first failure looks
> fine to me." but I am not so sure - e.g. If the timeout is 10s then I
> expect it to fail ~10s after the function is called, not 11s after. I
> know it's pedantic, but where's the harm in making the calculation
> right instead of just nearly right?
>
> IMO probably an easy fix for this is like:
>
> #define SHM_SEND_RETRY_INTERVAL_MS 1000
> #define SHM_SEND_TIMEOUT_MS (10000 - SHM_SEND_RETRY_INTERVAL_MS)

OK, I moved the setting of startTime to before the WaitLatch call.
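
As a rough illustration of the point in comment 18 (a standalone sketch with a simulated clock, not the patch code), the following compares recording the start time after the first wait with recording it before the first wait. `fake_now_ms`, `fake_wait` and `elapsed_until_timeout` are hypothetical names; the interval values are the 1s and 10s figures mentioned in the review, and the send attempt is assumed to fail every time.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

#define SHM_SEND_RETRY_INTERVAL_MS 1000
#define SHM_SEND_TIMEOUT_MS        10000

/* Simulated clock in milliseconds; only fake_wait() advances it. */
static int64_t fake_now_ms = 0;

/* Stand-in for a WaitLatch() call that always sleeps the full interval. */
static void
fake_wait(int64_t ms)
{
    assert(ms > 0);
    fake_now_ms += ms;
}

/*
 * Retry a send that never succeeds and return how many milliseconds have
 * passed since the call when the timeout fires.  start_before_wait selects
 * where the start time is recorded.
 */
static int64_t
elapsed_until_timeout(bool start_before_wait)
{
    int64_t entry = fake_now_ms;
    int64_t start = -1;         /* -1 means "not recorded yet" */

    for (;;)
    {
        /* The failing send attempt would happen here. */

        /* New placement: record the start before the first wait. */
        if (start_before_wait && start < 0)
            start = fake_now_ms;

        fake_wait(SHM_SEND_RETRY_INTERVAL_MS);

        /* Old placement: the first pass after the wait records the start. */
        if (start < 0)
            start = fake_now_ms;
        else if (fake_now_ms - start >= SHM_SEND_TIMEOUT_MS)
            return fake_now_ms - entry;
    }
}
```

In this model the old placement reports the timeout 11000 ms after the call, and the new placement reports it after 10000 ms, which matches the "10s, not 11s" expectation in the review comment.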

> ~~~
>
> 20. pa_savepoint_name
>
> +static void
> +pa_savepoint_name(Oid suboid, TransactionId xid, char *spname,
> + Size szsp)
>
> Unnecessary wrapping?

Changed.

> ======
>
> src/backend/replication/logical/origin.c
>
> 21. replorigin_session_setup
>
> + * However, we do allow multiple processes to point to the same origin slot
> + * if requested by the caller by passing PID of the process that has already
> + * acquired it. This is to allow using the same origin by multiple parallel
> + * apply processes the provided they maintain commit order, for example, by
> + * allowing only one process to commit at a time.
>
> 21a.
> I thought the comment should mention this is optional and the special
> value acquired_by=0 means don't do this.

Added.
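
To make the acquired_by convention concrete, here is a simplified standalone model (not the actual replorigin_session_setup code). `DemoOriginSlot` and `demo_session_setup` are hypothetical names, and returning false stands in for whatever error the real function raises. The only behaviour taken from the discussion is that acquired_by = 0 means "do not share", while a non-zero value names the process that already holds the slot.

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical, simplified origin slot: PID of the owner, 0 if free. */
typedef struct DemoOriginSlot
{
    int acquired_by;
} DemoOriginSlot;

/*
 * Attach the calling process (my_pid) to the slot.
 *
 * acquired_by == 0: ordinary exclusive use; fails if the slot is taken.
 * acquired_by != 0: share the slot with the process that already holds it;
 *                   fails unless that process is the current owner.
 */
static bool
demo_session_setup(DemoOriginSlot *slot, int my_pid, int acquired_by)
{
    assert(my_pid > 0);

    if (acquired_by == 0)
    {
        if (slot->acquired_by != 0)
            return false;       /* already active for another process */
        slot->acquired_by = my_pid;
        return true;
    }

    /* Sharing requested: the named process must be the current owner. */
    return slot->acquired_by == acquired_by;
}
```

In this model a leader with PID 100 first acquires the slot with acquired_by = 0, and a parallel worker then attaches to the same slot by passing 100.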

> ~
>
> 21b.
> "the provided they" ?? typo?

Changed.

> ======
>
> src/backend/replication/logical/tablesync.c
>
> 22. process_syncing_tables
>
> process_syncing_tables(XLogRecPtr current_lsn)
> {
> + /*
> + * Skip for parallel apply workers as they don't operate on tables that
> + * are not in ready state. See pa_can_start() and
> + * should_apply_changes_for_rel().
> + */
> + if (am_parallel_apply_worker())
> + return;
>
> SUGGESTION (remove the double negative)
> Skip for parallel apply workers because they only operate on tables
> that are in a READY state. See pa_can_start() and
> should_apply_changes_for_rel().

Changed.

> ======
>
> src/backend/replication/logical/worker.c
>
> 23. apply_handle_stream_stop
>
>
> Previously I suggested that this lock/unlock seems too tricky and
> needed a comment. The reply [1,#12] was that this is already described
> atop parallelapplyworker.c. OK, but in that case maybe here the
> comment can just refer to that explanation:
>
> SUGGESTION
> Refer to the comments atop applyparallelworker.c for what this lock
> and immediate unlock is doing.
>
> ~~~
>
> 24. apply_handle_stream_abort
>
> + if (pg_atomic_sub_fetch_u32(&(MyParallelShared->pending_stream_count), 1) == 0)
> + {
> + pa_lock_stream(MyParallelShared->xid, AccessShareLock);
> + pa_unlock_stream(MyParallelShared->xid, AccessShareLock);
> + }
>
> ditto comment #23

I feel the comments are better placed atop the definitions of the pa_lock_xxx
functions, so I added them there. Readers can check them when reading the lock
functions.
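
The decrement-then-wait pattern quoted in comment 24 can be sketched in portable C11 as follows. This is only an illustration: `leader_queue_block`, `worker_finish_block` and `wait_on_stream_lock` are hypothetical names, the lock and immediate unlock is replaced by a counter, and the leader incrementing the count for each queued block is an assumption. Note that C11 `atomic_fetch_sub` returns the old value, whereas `pg_atomic_sub_fetch_u32` returns the new one, hence the explicit `- 1`.

```c
#include <assert.h>
#include <stdatomic.h>
#include <stdbool.h>

/* Hypothetical stand-in for the shared pending_stream_count field. */
static atomic_uint pending_stream_count = 0;

/* Counts how often the worker would lock and immediately unlock. */
static int lock_unlock_calls = 0;

/* Placeholder for pa_lock_stream() followed by pa_unlock_stream(). */
static void
wait_on_stream_lock(void)
{
    lock_unlock_calls++;
}

/* Leader side (assumed): one more streamed block has been queued. */
static void
leader_queue_block(void)
{
    atomic_fetch_add(&pending_stream_count, 1);
}

/*
 * Worker side: called after finishing one streamed block.  Returns true if
 * no further blocks were pending, so the worker went on to wait on the lock.
 */
static bool
worker_finish_block(void)
{
    unsigned int remaining;

    assert(atomic_load(&pending_stream_count) > 0);

    /* fetch_sub returns the previous value; subtract 1 to get the new one. */
    remaining = atomic_fetch_sub(&pending_stream_count, 1) - 1;

    if (remaining == 0)
    {
        wait_on_stream_lock();
        return true;
    }
    return false;
}
```

With two blocks queued, the first call to `worker_finish_block()` returns without waiting, and only the second call reaches the lock and unlock step.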

> ~~~
>
> 25. apply_worker_clean_exit
>
> +void
> +apply_worker_clean_exit(void)
> +{
> + /* Notify the leader apply worker that we have exited cleanly. */
> + if (am_parallel_apply_worker())
> + pq_putmessage('X', NULL, 0);
> +
> + proc_exit(0);
> +}
>
> Somehow it doesn't seem right that the PA worker sending 'X' is here
> in worker.c, while the LA worker receipt of this 'X' is in the other
> applyparallelworker.c module. Maybe that other function
> HandleParallelApplyMessage should also be here in worker.c?

The function apply_worker_clean_exit is widely used in worker.c and is common
to both the leader and parallel apply workers, so I put it in worker.c. But
HandleParallelApplyMessage is a function only for the parallel apply worker,
so I think it is better to keep it in applyparallelworker.c.

> ======
>
> src/backend/utils/misc/guc_tables.c
>
> 26.
>
> @@ -2957,6 +2957,18 @@ struct config_int ConfigureNamesInt[] =
> NULL,
> },
> &max_sync_workers_per_subscription,
> + 2, 0, MAX_PARALLEL_WORKER_LIMIT,
> + NULL, NULL, NULL
> + },
> +
> + {
> + {"max_parallel_apply_workers_per_subscription",
> + PGC_SIGHUP,
> + REPLICATION_SUBSCRIBERS,
> + gettext_noop("Maximum number of parallel apply workers per subscription."),
> + NULL,
> + },
> + &max_parallel_apply_workers_per_subscription,
> 2, 0, MAX_BACKENDS,
> NULL, NULL, NULL
>
> Is this correct? Did you mean to change
> max_sync_workers_per_subscription, My 1st impression is that there has
> been some mixup with the MAX_PARALLEL_WORKER_LIMIT and MAX_BACKENDS or
> that this change was accidentally made to the wrong GUC.

Fixed.

> ======
>
> src/include/replication/worker_internal.h
>
> 27. ParallelApplyWorkerShared
>
> + /*
> + * Indicates whether there are pending streaming blocks in the queue. The
> + * parallel apply worker will check it before starting to wait.
> + */
> + pg_atomic_uint32 pending_stream_count;
>
> A better name might be 'n_pending_stream_blocks'.

I am not sure the suggested name is better, so I didn't change this.

> ~
>
> 28. function names
>
> extern void logicalrep_worker_stop(Oid subid, Oid relid);
> +extern void logicalrep_parallel_apply_worker_stop(int slot_no, uint16 generation);
> extern void logicalrep_worker_wakeup(Oid subid, Oid relid);
> extern void logicalrep_worker_wakeup_ptr(LogicalRepWorker *worker);
>
> extern int logicalrep_sync_worker_count(Oid subid);
> +extern int logicalrep_parallel_apply_worker_count(Oid subid);
>
> Would it be better to call those new functions using similar shorter
> names as done elsewhere?
>
> logicalrep_parallel_apply_worker_stop -> logicalrep_pa_worker_stop
> logicalrep_parallel_apply_worker_count -> logicalrep_pa_worker_count

Changed.

Attached is the new version of the patch set, which also fixes an invalid shared
memory access bug in the 0002 patch that Kuroda-san reported off-list.

Best regards,
Hou zj

Attachment Content-Type Size
v51-0004-Retry-to-apply-streaming-xact-only-in-apply-work.patch application/octet-stream 20.6 KB
v51-0001-Perform-streaming-logical-transactions-by-parall.patch application/octet-stream 186.4 KB
v51-0002-Serialize-partial-changes-to-disk-if-the-shm_mq-.patch application/octet-stream 35.6 KB
v51-0003-Test-streaming-parallel-option-in-tap-test.patch application/octet-stream 80.1 KB
