Re: Perform streaming logical transactions by background workers and parallel apply

From: Amit Kapila <amit(dot)kapila16(at)gmail(dot)com>
To: "houzj(dot)fnst(at)fujitsu(dot)com" <houzj(dot)fnst(at)fujitsu(dot)com>
Cc: Masahiko Sawada <sawada(dot)mshk(at)gmail(dot)com>, "wangw(dot)fnst(at)fujitsu(dot)com" <wangw(dot)fnst(at)fujitsu(dot)com>, Peter Smith <smithpb2250(at)gmail(dot)com>, Dilip Kumar <dilipbalaut(at)gmail(dot)com>, "shiy(dot)fnst(at)fujitsu(dot)com" <shiy(dot)fnst(at)fujitsu(dot)com>, PostgreSQL Hackers <pgsql-hackers(at)lists(dot)postgresql(dot)org>
Subject: Re: Perform streaming logical transactions by background workers and parallel apply
Date: 2022-11-18 02:26:45
Message-ID: CAA4eK1KWgReYbpwEMh1H1ohHoYirv4Aa=6v13MutCF9NvHTc5A@mail.gmail.com
Lists: pgsql-hackers

On Wed, Nov 16, 2022 at 1:50 PM houzj(dot)fnst(at)fujitsu(dot)com
<houzj(dot)fnst(at)fujitsu(dot)com> wrote:
>
> On Tuesday, November 15, 2022 7:58 PM houzj(dot)fnst(at)fujitsu(dot)com <houzj(dot)fnst(at)fujitsu(dot)com> wrote:
>
> I noticed that I didn't add CHECK_FOR_INTERRUPTS while retrying send message.
> So, attach the new version which adds that. Also attach the 0004 patch that
> restarts logical replication with temporarily disabling the parallel apply if
> failed to apply a transaction in parallel apply worker.
>

A few comments on v48-0001
======================
1. The variable name pending_message_count suggests a count of
pending messages, but it normally counts pending start/stop streams,
except probably in the rollback to savepoint case. Shall we name it
pending_stream_count and change the comments accordingly?

2. The variable name abort_toplevel_transaction seems unnecessarily
long. Shall we rename it to toplevel_xact or something like that?

3.
+    /*
+     * Increment the number of messages waiting to be processed by
+     * parallel apply worker.
+     */
+    if (!abort_toplevel_transaction)
+        pg_atomic_add_fetch_u32(&(winfo->shared->pending_message_count), 1);
+    else
+        pa_unlock_stream(xid, AccessExclusiveLock);

It would be better to explain here why different actions are required
for a subtransaction and a top-level transaction, rather than keeping
the current comment.
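
For reference, a minimal standalone sketch of the add-and-fetch counter
pattern that the hunk above relies on, written with C11 atomics rather
than PostgreSQL's pg_atomic API. The SharedState struct and the
leader_note_pending()/worker_note_done() helpers are hypothetical names
that are not part of the patch, and the worker-side decrement is only an
assumption about how such a counter would be consumed.

```c
#include <assert.h>
#include <stdatomic.h>
#include <stdint.h>

/* Hypothetical shared state; stands in for winfo->shared in the patch. */
typedef struct SharedState
{
    _Atomic uint32_t pending_stream_count;
} SharedState;

/* Initialize the counter before any worker can see it. */
static void
shared_init(SharedState *shared)
{
    atomic_init(&shared->pending_stream_count, 0);
}

/*
 * Leader side: record one more pending stream and return the new value,
 * i.e. add-and-fetch semantics built on top of fetch-and-add.
 */
static uint32_t
leader_note_pending(SharedState *shared)
{
    return atomic_fetch_add(&shared->pending_stream_count, 1) + 1;
}

/*
 * Worker side (assumed): record that one pending stream has been
 * processed and return how many are still outstanding.
 */
static uint32_t
worker_note_done(SharedState *shared)
{
    uint32_t    old = atomic_fetch_sub(&shared->pending_stream_count, 1);

    assert(old > 0);            /* never decrement below zero */
    return old - 1;
}
```

Returning the post-update value lets the caller decide in one step
whether anything is still outstanding, without a second read of the
shared counter.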

4.
+
+    if (abort_toplevel_transaction)
+    {
+        (void) pa_free_worker(winfo, xid);
+    }

{} is not required here.

5.
/*
+ * Although the lock can be automatically released during transaction
+ * rollback, but we still release the lock here as we may not in a
+ * transaction.
+ */
+ pa_unlock_transaction(xid, AccessShareLock);
+

It would be better to explain in which case (I think it is for empty
xacts) releasing it explicitly is useful.

6.
+ *
+ * XXX We can avoid sending pairs of the START/STOP messages to the parallel
+ * worker because unlike apply worker it will process only one transaction at a
+ * time. However, it is not clear whether any optimization is worthwhile
+ * because these messages are sent only when the logical_decoding_work_mem
+ * threshold is exceeded.
*/
static void
apply_handle_stream_start(StringInfo s)

I think this comment is no longer valid, as we now need to wait for
the next stream at the stream_stop message and also need to acquire
the lock at the stream_start message. So, I think it is better to
remove it, unless I am missing something.

7. I am able to compile applyparallelworker.c after commenting out a
few of the header includes. Please check whether those are really
required.
#include "libpq/pqformat.h"
#include "libpq/pqmq.h"
//#include "mb/pg_wchar.h"
#include "pgstat.h"
#include "postmaster/interrupt.h"
#include "replication/logicallauncher.h"
//#include "replication/logicalworker.h"
#include "replication/origin.h"
//#include "replication/walreceiver.h"
#include "replication/worker_internal.h"
#include "storage/ipc.h"
#include "storage/lmgr.h"
//#include "storage/procarray.h"
#include "tcop/tcopprot.h"
#include "utils/inval.h"
#include "utils/memutils.h"
//#include "utils/resowner.h"
#include "utils/syscache.h"

8.
+/*
+ * Is there a message sent by parallel apply worker which we need to receive?
+ */
+volatile sig_atomic_t ParallelApplyMessagePending = false;

This comment and variable are placed in applyparallelworker.c, so it
is not clear who 'we' refers to in the above sentence. I think you
need to say leader apply worker instead.
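
To illustrate the kind of flag being discussed, here is a generic,
standalone sketch of the "signal handler sets a flag, the main loop
consumes it" pattern. It is not code from the patch: message_pending
stands in for ParallelApplyMessagePending, the helper names are
hypothetical, and SIGINT is an arbitrary choice made only to keep the
example within ISO C.

```c
#include <assert.h>
#include <signal.h>
#include <stdbool.h>

/* Hypothetical stand-in for ParallelApplyMessagePending. */
static volatile sig_atomic_t message_pending = false;

/* Signal handler: only sets the flag, the real work is deferred. */
static void
handle_message_signal(int signo)
{
    (void) signo;
    message_pending = true;
}

/*
 * Called by the (hypothetical) leader at a safe point in its main loop.
 * Returns true if a message was pending, clearing the flag first so a
 * later notification is not lost.
 */
static bool
leader_consume_pending(void)
{
    if (!message_pending)
        return false;
    message_pending = false;
    /* ... the leader would read the worker's message here ... */
    return true;
}

/* Install the handler, simulate a notification, then consume it. */
static bool
demo_notify_and_consume(void)
{
    if (signal(SIGINT, handle_message_signal) == SIG_ERR)
        return false;
    raise(SIGINT);              /* handler runs before raise() returns */
    assert(message_pending);
    return leader_consume_pending();
}
```

Because the flag is only ever read by the process that receives the
signal, a comment on it can name that reader directly.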

9.
+static ParallelApplyWorkerInfo *pa_get_free_worker(void);

Would it be better to name this function pa_get_available_worker()?

--
With Regards,
Amit Kapila.
