Re: Perform streaming logical transactions by background workers and parallel apply

From: Masahiko Sawada <sawada(dot)mshk(at)gmail(dot)com>
To: "houzj(dot)fnst(at)fujitsu(dot)com" <houzj(dot)fnst(at)fujitsu(dot)com>
Cc: Amit Kapila <amit(dot)kapila16(at)gmail(dot)com>, "wangw(dot)fnst(at)fujitsu(dot)com" <wangw(dot)fnst(at)fujitsu(dot)com>, Peter Smith <smithpb2250(at)gmail(dot)com>, Dilip Kumar <dilipbalaut(at)gmail(dot)com>, "shiy(dot)fnst(at)fujitsu(dot)com" <shiy(dot)fnst(at)fujitsu(dot)com>, PostgreSQL Hackers <pgsql-hackers(at)lists(dot)postgresql(dot)org>
Subject: Re: Perform streaming logical transactions by background workers and parallel apply
Date: 2022-11-07 10:17:32
Message-ID: CAD21AoDGLd9h8_DQm8Of5fvAM+MDYzLtFYmEaTSibkg=iwWX7A@mail.gmail.com
Views: Raw Message | Whole Thread | Download mbox | Resend email
Thread:
Lists: pgsql-hackers

On Thu, Nov 3, 2022 at 10:06 PM houzj(dot)fnst(at)fujitsu(dot)com
<houzj(dot)fnst(at)fujitsu(dot)com> wrote:
>
> On Wednesday, November 2, 2022 10:50 AM Masahiko Sawada <sawada(dot)mshk(at)gmail(dot)com> wrote:
> >
> > On Mon, Oct 24, 2022 at 8:42 PM Masahiko Sawada
> > <sawada(dot)mshk(at)gmail(dot)com> wrote:
> > >
> > > On Wed, Oct 12, 2022 at 3:04 PM Amit Kapila <amit(dot)kapila16(at)gmail(dot)com>
> > wrote:
> > > >
> > > > On Tue, Oct 11, 2022 at 5:52 AM Masahiko Sawada
> > <sawada(dot)mshk(at)gmail(dot)com> wrote:
> > > > >
> > > > > On Fri, Oct 7, 2022 at 2:00 PM Amit Kapila <amit(dot)kapila16(at)gmail(dot)com>
> > wrote:
> > > > > >
> > > > > > About your point that having different partition structures for
> > > > > > publisher and subscriber, I don't know how common it will be once we
> > > > > > have DDL replication. Also, the default value of
> > > > > > publish_via_partition_root is false which doesn't seem to indicate
> > > > > > that this is a quite common case.
> > > > >
> > > > > So how can we consider these concurrent issues that could happen only
> > > > > when streaming = 'parallel'? Can we restrict some use cases to avoid
> > > > > the problem or can we have a safeguard against these conflicts?
> > > > >
> > > >
> > > > Yeah, right now the strategy is to disallow parallel apply for such
> > > > cases as you can see in *0003* patch.
> > >
> > > Tightening the restrictions could work in some cases but there might
> > > still be coner cases and it could reduce the usability. I'm not really
> > > sure that we can ensure such a deadlock won't happen with the current
> > > restrictions. I think we need something safeguard just in case. For
> > > example, if the leader apply worker is waiting for a lock acquired by
> > > its parallel worker, it cancels the parallel worker's transaction,
> > > commits its transaction, and restarts logical replication. Or the
> > > leader can log the deadlock to let the user know.
> > >
> >
> > As another direction, we could make the parallel apply feature robust
> > if we can detect deadlocks that happen among the leader worker and
> > parallel workers. I'd like to summarize the idea discussed off-list
> > (with Amit, Hou-San, and Kuroda-San) for discussion. The basic idea is
> > that when the leader worker or parallel worker needs to wait for
> > something (eg. transaction completion, messages) we use lmgr
> > functionality so that we can create wait-for edges and detect
> > deadlocks in lmgr.
> >
> > For example, a scenario where a deadlock occurs is the following:
> >
> > [Publisher]
> > create table tab1(a int);
> > create publication pub for table tab1;
> >
> > [Subcriber]
> > creat table tab1(a int primary key);
> > create subscription sub connection 'port=10000 dbname=postgres'
> > publication pub with (streaming = parallel);
> >
> > TX1:
> > BEGIN;
> > INSERT INTO tab1 SELECT i FROM generate_series(1, 5000) s(i); -- streamed
> > Tx2:
> > BEGIN;
> > INSERT INTO tab1 SELECT i FROM generate_series(1, 5000) s(i); -- streamed
> > COMMIT;
> > COMMIT;
> >
> > Suppose a parallel apply worker (PA-1) is executing TX-1 and the
> > leader apply worker (LA) is executing TX-2 concurrently on the
> > subscriber. Now, LA is waiting for PA-1 because of the unique key of
> > tab1 while PA-1 is waiting for LA to send further messages. There is a
> > deadlock between PA-1 and LA but lmgr cannot detect it.
> >
> > One idea to resolve this issue is that we have LA acquire a session
> > lock on a shared object (by LockSharedObjectForSession()) and have
> > PA-1 wait on the lock before trying to receive messages. IOW, LA
> > acquires the lock before sending STREAM_STOP and releases it if
> > already acquired before sending STREAM_START, STREAM_PREPARE and
> > STREAM_COMMIT. For PA-1, it always needs to acquire the lock after
> > processing STREAM_STOP and then release immediately after acquiring
> > it. That way, when PA-1 is waiting for LA, we can have a wait-edge
> > from PA-1 to LA in lmgr, which will make a deadlock in lmgr like:
> >
> > LA (waiting to acquire lock) -> PA-1 (waiting to acquire the shared
> > object) -> LA
> >
> > We would need the shared objects per parallel apply worker.
> >
> > After detecting a deadlock, we can restart logical replication with
> > temporarily disabling the parallel apply, which is done by 0005 patch.
> >
> > Another scenario is similar to the previous case but TX-1 and TX-2 are
> > executed by two parallel apply workers (PA-1 and PA-2 respectively).
> > In this scenario, PA-2 is waiting for PA-1 to complete its transaction
> > while PA-1 is waiting for subsequent input from LA. Also, LA is
> > waiting for PA-2 to complete its transaction in order to preserve the
> > commit order. There is a deadlock among three processes but it cannot
> > be detected in lmgr because the fact that LA is waiting for PA-2 to
> > complete its transaction doesn't appear in lmgr (see
> > parallel_apply_wait_for_xact_finish()). To fix it, we can use
> > XactLockTableWait() instead.
> >
> > However, since XactLockTableWait() considers PREPARED TRANSACTION as
> > still in progress, probably we need a similar trick as above in case
> > where a transaction is prepared. For example, suppose that TX-2 was
> > prepared instead of committed in the above scenario, PA-2 acquires
> > another shared lock at START_STREAM and releases it at
> > STREAM_COMMIT/PREPARE. LA can wait on the lock.
> >
> > Yet another scenario where LA has to wait is the case where the shm_mq
> > buffer is full. In the above scenario (ie. PA-1 and PA-2 are executing
> > transactions concurrently), if the shm_mq buffer between LA and PA-2
> > is full, LA has to wait to send messages, and this wait doesn't appear
> > in lmgr. To fix it, probably we have to use non-blocking write and
> > wait with a timeout. If timeout is exceeded, the LA will write to file
> > and indicate PA-2 that it needs to read file for remaining messages.
> > Then LA will start waiting for commit which will detect deadlock if
> > any.
> >
> > If we can detect deadlocks by having such a functionality or some
> > other way then we don't need to tighten the restrictions of subscribed
> > tables' schemas etc.
>
> Thanks for the analysis and summary !
>
> I tried to implement the above idea and here is the patch set. I have done some
> basic tests for the new codes and it work fine.

Thank you for updating the patches!

Here are comments on v42-0001:

We have the following three similar name functions regarding to
starting a new parallel apply worker:

parallel_apply_start_worker()
parallel_apply_setup_worker()
parallel_apply_setup_dsm()

It seems to me that we can somewhat merge them since
parallel_apply_setup_worker() and parallel_apply_setup_dsm() have only
one caller.

---
+/*
+ * Extract the streaming mode value from a DefElem. This is like
+ * defGetBoolean() but also accepts the special value of "parallel".
+ */
+char
+defGetStreamingMode(DefElem *def)

It's a bit unnatural to have this function in define.c since other
functions in this file for primitive data types. How about having it
in subscription.c?

---
/*
* Exit if any parameter that affects the remote connection
was changed.
- * The launcher will start a new worker.
+ * The launcher will start a new worker, but note that the
parallel apply
+ * worker may or may not restart depending on the value of
the streaming
+ * option and whether there will be a streaming transaction.

In which case does the parallel apply worker don't restart even if the
streaming option has been changed?

---
I think we should explain somewhere the idea of using locks for
synchronization between leader and worker. Maybe can we do that with
sample workload in new README file?

---
in parallel_apply_send_data():

+ result = shm_mq_send(winfo->mq_handle, nbytes, data,
true, true);
+
+ if (result == SHM_MQ_SUCCESS)
+ break;
+ else if (result == SHM_MQ_DETACHED)
+ ereport(ERROR,
+
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("could not send data
to shared-memory queue")))
+
+ Assert(result == SHM_MQ_WOULD_BLOCK);
+
+ if (++retry >= CHANGES_THRESHOLD)
+ {
+ MemoryContext oldcontext;
+ StringInfoData msg;
+ TimestampTz now = GetCurrentTimestamp();
+
+ if (startTime == 0)
+ startTime = now;
+
+ if (!TimestampDifferenceExceeds(startTime,
now, SHM_SEND_TIMEOUT_MS))
+ continue;

IIUC since the parallel worker retries to send data without waits the
'retry' will get larger than CHANGES_THRESHOLD in a very short time.
But the worker waits at least for SHM_SEND_TIMEOUT_MS to spool data
regardless of 'retry' count. Don't we need to nap somewhat and why do
we need CHANGES_THRESHOLD?

---
+/*
+ * Wait until the parallel apply worker's xact_state flag becomes
+ * the same as in_xact.
+ */
+static void
+parallel_apply_wait_for_in_xact(ParallelApplyWorkerShared *wshared,
+
ParallelTransState xact_state)
+{
+ for (;;)
+ {
+ /* Stop if the flag becomes the same as in_xact. */

What do you mean by 'in_xact' here?

---
I got the error "ERROR: invalid logical replication message type ""
with the following scenario:

1. Stop the PA by sending SIGSTOP signal.
2. Stream a large transaction so that the LA spools changes to the file for PA.
3. Resume the PA by sending SIGCONT signal.
4. Stream another large transaction.

---
* On publisher (with logical_decoding_work_mem = 64kB)
begin;
insert into t select generate_series(1, 1000);
rollback;
begin;
insert into t select generate_series(1, 1000);
rollback;

I got the following error:

ERROR: hash table corrupted
CONTEXT: processing remote data for replication origin "pg_16393"
during message type "STREAM START" in transaction 734

---
IIUC the changes for worker.c in 0001 patch includes both changes:

1. apply worker takes action based on the apply_action returned by
get_transaction_apply_action() per message (or streamed chunk).
2. apply worker supports handling parallel apply workers.

It seems to me that (1) is a rather refactoring patch, so probably we
can do that in a separate patch so that we can make the patches
smaller.

---
postgres(1:2831190)=# \dRs+ test_sub1
List of subscriptions
-[ RECORD 1 ]------+--------------------------
Name | test_sub1
Owner | masahiko
Enabled | t
Publication | {test_pub1}
Binary | f
Streaming | p
Two-phase commit | d
Disable on error | f
Origin | any
Synchronous commit | off
Conninfo | port=5551 dbname=postgres
Skip LSN | 0/0

It's better to show 'on', 'off' or 'streaming' rather than one character.

Regards,

--
Masahiko Sawada
Amazon Web Services: https://aws.amazon.com

In response to

Responses

Browse pgsql-hackers by date

  From Date Subject
Next Message Aleksander Alekseev 2022-11-07 10:35:18 Re: Pluggable toaster
Previous Message Aleksander Alekseev 2022-11-07 10:07:45 Re: Adding doubly linked list type which stores the number of items in the list