From: Masahiko Sawada <sawada(dot)mshk(at)gmail(dot)com>
To: "houzj(dot)fnst(at)fujitsu(dot)com" <houzj(dot)fnst(at)fujitsu(dot)com>
Cc: Amit Kapila <amit(dot)kapila16(at)gmail(dot)com>, "wangw(dot)fnst(at)fujitsu(dot)com" <wangw(dot)fnst(at)fujitsu(dot)com>, Peter Smith <smithpb2250(at)gmail(dot)com>, Dilip Kumar <dilipbalaut(at)gmail(dot)com>, "shiy(dot)fnst(at)fujitsu(dot)com" <shiy(dot)fnst(at)fujitsu(dot)com>, PostgreSQL Hackers <pgsql-hackers(at)lists(dot)postgresql(dot)org>
Subject: Re: Perform streaming logical transactions by background workers and parallel apply
Date: 2022-11-18 00:35:44
Message-ID: CAD21AoDScLvLT8JBfu5WaGCPQs_qhxsybMT+sMXJ=QrDMTyr9w@mail.gmail.com
Lists: pgsql-hackers

On Tue, Nov 15, 2022 at 8:57 PM houzj(dot)fnst(at)fujitsu(dot)com
<houzj(dot)fnst(at)fujitsu(dot)com> wrote:
>
> On Saturday, November 12, 2022 7:06 PM Amit Kapila <amit(dot)kapila16(at)gmail(dot)com>
> >
> > On Fri, Nov 11, 2022 at 2:12 PM houzj(dot)fnst(at)fujitsu(dot)com
> > <houzj(dot)fnst(at)fujitsu(dot)com> wrote:
> > >
> >
> > Few comments on v46-0001:
> > ======================
> >
>
> Thanks for the comments.
>
> > 1.
> > +static void
> > +apply_handle_stream_abort(StringInfo s)
> > {
> > ...
> > + /* Send STREAM ABORT message to the parallel apply worker. */
> > + parallel_apply_send_data(winfo, s->len, s->data);
> > +
> > + if (abort_toplevel_transaction)
> > + {
> > + parallel_apply_unlock_stream(xid, AccessExclusiveLock);
> >
> > Shouldn't we need to release this lock before sending the message as
> > we are doing for streap_prepare and stream_commit? If there is a
> > reason for doing it differently here then let's add some comments for
> > the same.
>
> Changed.
>
> > 2. It seems once the patch makes the file state as busy
> > (LEADER_FILESET_BUSY), it will only be accessible after the leader
> > apply worker receives a transaction end message like stream_commit. Is
> > my understanding correct? If yes, then why can't we make it accessible
> > after the stream_stop message? Are you worried about the concurrency
> > handling for reading and writing the file? If so, we can probably deal
> > with it via some lock for reading and writing to file for each change.
> > I think after this we may not need additional stream level lock/unlock
> > in parallel_apply_spooled_messages. I understand that you probably
> > want to keep the code simple so I am not suggesting changing it
> > immediately but just wanted to know whether you have considered
> > alternatives here.
>
> I thought about this, but it seems the current buffile design doesn't allow two
> processes to open the same buffile at the same time (refer to the comment atop
> of BufFileOpenFileSet()). This means the LA needs to make sure the PA has
> closed the buffile before writing more changes into it. We could let
> the LA wait for that, but it could cause another kind of deadlock. Suppose the
> PA opened the file and is blocked while applying the change it just read, and the
> LA starts to wait when trying to write the next set of streaming changes into the
> file because the file is still open in the PA. Then the lock edge is like:
>
> LA (waits for the file to be closed) -> PA1 (waits for a unique lock held in PA2) -> PA2
> (waits for the stream lock held in LA)
>
> We could introduce another lock for this, but that seems not great as we
> already have two kinds of locks here.
>
> Another solution, which doesn't need a new lock, could be to create a
> different filename for each streaming block so that the leader doesn't need to
> reopen the same file after writing changes into it, but that would largely
> increase the number of temp files and looks a bit hacky. Or we could let the PA
> open the file, then read and close the file for each change, but that would bring
> some overhead of opening and closing the file.
>
> Based on the above, how about keeping the current approach? (i.e., the PA
> will open the file only after the leader apply worker receives a transaction
> end message like stream_commit). Ideally, it will enter partial serialize mode
> only when a PA is blocked by a backend or another PA, which seems not that common.

+1. We can improve this area later in a separate patch.

Here are review comments on v47-0001 and v47-0002 patches:

When the parallel apply worker exited, I got the following server log.
I think this log is not appropriate since the worker was not
terminated by administrator command but exited by itself. Also, it
should probably exit with exit code 0?

FATAL: terminating logical replication worker due to administrator command
LOG: background worker "logical replication parallel worker" (PID 3594918) exited with exit code 1
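
As a minimal sketch of what I mean (assuming we add a dedicated
clean-exit path; the function name below is illustrative, not from the
patch), the worker could report a normal LOG message and call
proc_exit(0) so the postmaster doesn't log the exit as a failure:

static void
pa_shutdown_cleanly(void)
{
    /*
     * Hypothetical clean shutdown: a LOG-level message instead of the
     * FATAL used for administrator-initiated termination, and exit
     * code 0 so the postmaster reports a normal worker exit.
     */
    ereport(LOG,
            (errmsg("logical replication parallel apply worker exiting")));
    proc_exit(0);
}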

---
/*
 * Stop the worker if there are enough workers in the pool or the leader
 * apply worker serialized part of the transaction data to a file due to
 * send timeout.
 */
if (winfo->serialize_changes ||
    napplyworkers > (max_parallel_apply_workers_per_subscription / 2))

Why do we need to stop the worker if the leader serializes changes?

---
+	/*
+	 * Release all session level locks that could be held in parallel apply
+	 * mode.
+	 */
+	LockReleaseAll(DEFAULT_LOCKMETHOD, true);
+

I think we call LockReleaseAll() at process exit (in ProcKill()), so
do we really need to call LockReleaseAll() here too?

---

+		elog(ERROR, "could not find replication state slot for replication"
+			 "origin with OID %u which was acquired by %d", node, acquired_by);

Let's not break the error message string in the middle so that the user
can easily search for the message with grep.
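
For example, a possible rewrite keeping the format string whole (note
that the original is also missing a space between "replication" and
"origin"):

elog(ERROR,
     "could not find replication state slot for replication origin with OID %u which was acquired by %d",
     node, acquired_by);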

---
+	{
+		{"max_parallel_apply_workers_per_subscription",
+			PGC_SIGHUP,
+			REPLICATION_SUBSCRIBERS,
+			gettext_noop("Maximum number of parallel apply workers per subscription."),
+			NULL,
+		},
+		&max_parallel_apply_workers_per_subscription,
+		2, 0, MAX_BACKENDS,
+		NULL, NULL, NULL
+	},
+

I think we should use MAX_PARALLEL_WORKER_LIMIT as the max value
instead. MAX_BACKENDS is too high.
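
That is, just for illustration, the bounds line would become something
like:

+		&max_parallel_apply_workers_per_subscription,
+		2, 0, MAX_PARALLEL_WORKER_LIMIT,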

---
+	/*
+	 * Indicates whether there are pending messages in the queue. The
+	 * parallel apply worker will check it before starting to wait.
+	 */
+	pg_atomic_uint32	pending_message_count;

The "pending messages" sounds like individual logical replication
messages such as LOGICAL_REP_MSG_INSERT. But IIUC what this value
actually shows is how many streamed chunks are pending to process,
right?
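
If so, a name along these lines might read clearer (the field name
below is only an illustrative suggestion, not taken from the patch):

+	/*
+	 * Number of streamed chunks (stream_start .. stream_stop sequences)
+	 * written by the leader but not yet consumed by the parallel apply
+	 * worker.
+	 */
+	pg_atomic_uint32	pending_stream_count;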

---
The patch adds the new value "parallel" to the "streaming" option to
enable parallel apply. It fits so far, but I think the parallel apply
feature doesn't necessarily need to be tied to streaming replication.
For example, we might want to support parallel apply also for
non-streaming transactions in the future. It might be better to have
another option, say "parallel", to control parallel apply behavior.
The "parallel" option could be a boolean option, and setting parallel
= on would require streaming = on.

Another variant is to have a new subscription parameter, for example
"parallel_workers", that specifies the number of parallel workers.
That way, users could control the degree of parallelism per
subscription.

---
When the parallel apply worker raises an error, I got the same error
twice, once from the leader worker and once from the parallel worker,
as follows. Can we suppress either one?

2022-11-17 17:30:23.490 JST [3814552] LOG: logical replication parallel apply worker for subscription "test_sub1" has started
2022-11-17 17:30:23.490 JST [3814552] ERROR: duplicate key value violates unique constraint "test1_c_idx"
2022-11-17 17:30:23.490 JST [3814552] DETAIL: Key (c)=(1) already exists.
2022-11-17 17:30:23.490 JST [3814552] CONTEXT: processing remote data for replication origin "pg_16390" during message type "INSERT" for replication target relation "public.test1" in transaction 731
2022-11-17 17:30:23.490 JST [3814550] ERROR: duplicate key value violates unique constraint "test1_c_idx"
2022-11-17 17:30:23.490 JST [3814550] DETAIL: Key (c)=(1) already exists.
2022-11-17 17:30:23.490 JST [3814550] CONTEXT: processing remote data for replication origin "pg_16390" during message type "INSERT" for replication target relation "public.test1" in transaction 731 parallel apply worker

Regards,

--
Masahiko Sawada
Amazon Web Services: https://aws.amazon.com
