RE: Perform streaming logical transactions by background workers and parallel apply

From: "houzj(dot)fnst(at)fujitsu(dot)com" <houzj(dot)fnst(at)fujitsu(dot)com>
To: Masahiko Sawada <sawada(dot)mshk(at)gmail(dot)com>
Cc: Amit Kapila <amit(dot)kapila16(at)gmail(dot)com>, "wangw(dot)fnst(at)fujitsu(dot)com" <wangw(dot)fnst(at)fujitsu(dot)com>, Peter Smith <smithpb2250(at)gmail(dot)com>, Dilip Kumar <dilipbalaut(at)gmail(dot)com>, "shiy(dot)fnst(at)fujitsu(dot)com" <shiy(dot)fnst(at)fujitsu(dot)com>, PostgreSQL Hackers <pgsql-hackers(at)lists(dot)postgresql(dot)org>
Subject: RE: Perform streaming logical transactions by background workers and parallel apply
Date: 2022-11-21 12:34:35
Message-ID: OS0PR01MB57163ABCD8680F43975DA7B8940A9@OS0PR01MB5716.jpnprd01.prod.outlook.com
Lists: pgsql-hackers

On Friday, November 18, 2022 8:36 AM Masahiko Sawada <sawada(dot)mshk(at)gmail(dot)com> wrote:
>
> Here are review comments on v47-0001 and v47-0002 patches:

Thanks for the comments!

> When the parallel apply worker exited, I got the following server log.
> I think this log is not appropriate since the worker was not terminated by
> administrator command but exited by itself. Also, probably it should exit with
> exit code 0?
>
> FATAL: terminating logical replication worker due to administrator command
> LOG: background worker "logical replication parallel worker" (PID
> 3594918) exited with exit code 1

Changed it to report a LOG message and exit with code 0.

> ---
> /*
> * Stop the worker if there are enough workers in the pool or the leader
> * apply worker serialized part of the transaction data to a file due to
> * send timeout.
> */
> if (winfo->serialize_changes ||
> napplyworkers > (max_parallel_apply_workers_per_subscription / 2))
>
> Why do we need to stop the worker if the leader serializes changes?

Because a partially sent message might be left in the shared memory queue after a send
timeout. The leader would then need to either re-send the same message until it succeeds
or detach from the memory queue. To keep the logic simple, the patch directly stops the
worker in this case.

> ---
> + /*
> + * Release all session level locks that could be held in parallel apply
> + * mode.
> + */
> + LockReleaseAll(DEFAULT_LOCKMETHOD, true);
> +
>
> I think we call LockReleaseAll() at the process exit (in ProcKill()), but do we
> really need to do LockReleaseAll() here too?

If we don't release the locks before ProcKill, we might trip an Assert check at
the beginning of ProcKill which is used to ensure all the locks have been released.
And it seems ProcKill doesn't release session-level locks after the assert
check. So I think we'd better release them here.

> ---
>
> + elog(ERROR, "could not find replication state slot for replication"
> + "origin with OID %u which was acquired by %d", node, acquired_by);
>
> Let's not break the error log message in the middle so that the user can search
> the message by grep easily.

Changed.

> ---
> + {
> + {"max_parallel_apply_workers_per_subscription",
> + PGC_SIGHUP,
> + REPLICATION_SUBSCRIBERS,
> + gettext_noop("Maximum number of parallel
> apply workers per subscription."),
> + NULL,
> + },
> + &max_parallel_apply_workers_per_subscription,
> + 2, 0, MAX_BACKENDS,
> + NULL, NULL, NULL
> + },
> +
>
> I think we should use MAX_PARALLEL_WORKER_LIMIT as the max value instead.
> MAX_BACKENDS is too high.

Changed.

> ---
> + /*
> + * Indicates whether there are pending messages in the queue.
> The parallel
> + * apply worker will check it before starting to wait.
> + */
> + pg_atomic_uint32 pending_message_count;
>
> The "pending messages" sounds like individual logical replication messages
> such as LOGICAL_REP_MSG_INSERT. But IIUC what this value actually shows is
> how many streamed chunks are pending to process, right?

Yes, I have renamed it.

> ---
> When the parallel apply worker raises an error, I got the same error twice from
> the leader worker and parallel worker as follows. Can we suppress either one?
>
> 2022-11-17 17:30:23.490 JST [3814552] LOG: logical replication parallel apply
> worker for subscription "test_sub1" has started
> 2022-11-17 17:30:23.490 JST [3814552] ERROR: duplicate key value violates
> unique constraint "test1_c_idx"
> 2022-11-17 17:30:23.490 JST [3814552] DETAIL: Key (c)=(1) already exists.
> 2022-11-17 17:30:23.490 JST [3814552] CONTEXT: processing remote data for
> replication origin "pg_16390" during message type "INSERT" for replication
> target relation "public.test1" in transaction 731
> 2022-11-17 17:30:23.490 JST [3814550] ERROR: duplicate key value violates
> unique constraint "test1_c_idx"
> 2022-11-17 17:30:23.490 JST [3814550] DETAIL: Key (c)=(1) already exists.
> 2022-11-17 17:30:23.490 JST [3814550] CONTEXT: processing remote data for
> replication origin "pg_16390" during message type "INSERT" for replication
> target relation "public.test1" in transaction 731
> parallel apply worker

This seems similar to the behavior of parallel query, which also reports the same
error twice. But I agree it might be better for the leader to report something
different, so I changed the error message reported by the leader in the new
version of the patch.

Best regards,
Hou zj
