Re: [PATCH] Reuse Workers and Replication Slots during Logical Replication

From: Peter Smith <smithpb2250(at)gmail(dot)com>
To: "Hayato Kuroda (Fujitsu)" <kuroda(dot)hayato(at)fujitsu(dot)com>
Cc: Melih Mutlu <m(dot)melihmutlu(at)gmail(dot)com>, Melanie Plageman <melanieplageman(at)gmail(dot)com>, "Wei Wang (Fujitsu)" <wangw(dot)fnst(at)fujitsu(dot)com>, "Yu Shi (Fujitsu)" <shiy(dot)fnst(at)fujitsu(dot)com>, Amit Kapila <amit(dot)kapila16(at)gmail(dot)com>, pgsql-hackers <pgsql-hackers(at)postgresql(dot)org>, shveta malik <shveta(dot)malik(at)gmail(dot)com>
Subject: Re: [PATCH] Reuse Workers and Replication Slots during Logical Replication
Date: 2023-07-07 09:37:54
Message-ID: CAHut+PtXUg33utEmrh=_=6_78JU20Ms1RBscusgBio2DLags-A@mail.gmail.com
Lists: pgsql-hackers

Hi. Here are some review comments for the patch v16-0001

======
Commit message.

1.
Also; most of the code shared by both worker types are already combined
in LogicalRepApplyLoop(). There is no need to combine the rest in
ApplyWorkerMain() anymore.

~

/are already/is already/

/Also;/Also,/

~~~

2.
This commit introduces TablesyncWorkerMain() as a new entry point for
tablesync workers and separates both type of workers from each other.
This aims to increase code readability and help to maintain logical
replication workers separately.

2a.
/This commit/This patch/

~

2b.
"and separates both type of workers from each other"

Maybe that part can all be removed. The following sentence says the
same again anyhow.

======
src/backend/replication/logical/worker.c

3.
static void stream_write_change(char action, StringInfo s);
static void stream_open_and_write_change(TransactionId xid, char
action, StringInfo s);
static void stream_close_file(void);
+static void set_stream_options(WalRcvStreamOptions *options,
+ char *slotname,
+ XLogRecPtr *origin_startpos);

~

Maybe a blank line is needed here, because this static declaration
should not be grouped with the functions listed under the "Serialize and
deserialize changes for a toplevel transaction." comment.

~~~

4. set_stream_options

+ /* set_stream_options
+ * Set logical replication streaming options.
+ *
+ * This function sets streaming options including replication slot name and
+ * origin start position. Workers need these options for logical replication.
+ */
+static void
+set_stream_options(WalRcvStreamOptions *options,

The indentation is not right for this function comment.

~~~

5. set_stream_options

+ /*
+ * Even when the two_phase mode is requested by the user, it remains as
+ * the tri-state PENDING until all tablesyncs have reached READY state.
+ * Only then, can it become ENABLED.
+ *
+ * Note: If the subscription has no tables then leave the state as
+ * PENDING, which allows ALTER SUBSCRIPTION ... REFRESH PUBLICATION to
+ * work.
+ */
+ if (MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_PENDING &&
+ AllTablesyncsReady())
+ options->proto.logical.twophase = true;
+}

This part of the refactoring seems questionable...

IIUC this new function was extracted from code originally in function
ApplyWorkerMain().

But in that original code, this fragment above was guarded by the condition
if (!am_tablesync_worker())

But now where is that condition? e.g. What is stopping a tablesync
worker from getting into this code that it previously would not have
executed?
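
To make the concern concrete, here is a minimal standalone sketch (not the
patch code) of the logic with the old guard kept in place. Everything except
the names am_tablesync_worker(), AllTablesyncsReady() and the
LOGICALREP_TWOPHASE_STATE_* constants is a hypothetical stand-in: the
StreamOptionsSketch struct, the mock_* variables and the twophase_for()
helper exist only so the fragment compiles outside the PostgreSQL tree.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Stand-ins for the real catalog constants; values are illustrative only. */
#define LOGICALREP_TWOPHASE_STATE_DISABLED 'd'
#define LOGICALREP_TWOPHASE_STATE_PENDING  'p'
#define LOGICALREP_TWOPHASE_STATE_ENABLED  'e'

/* Hypothetical reduction of WalRcvStreamOptions to the one field of interest. */
typedef struct StreamOptionsSketch
{
	bool		twophase;
} StreamOptionsSketch;

/* Hypothetical mock state replacing MyLogicalRepWorker / MySubscription. */
static bool mock_is_tablesync;
static bool mock_all_ready;
static char mock_twophasestate;

static bool
am_tablesync_worker(void)
{
	return mock_is_tablesync;
}

static bool
AllTablesyncsReady(void)
{
	return mock_all_ready;
}

/*
 * Sketch of the two_phase part of set_stream_options(), assuming the
 * guard from the original ApplyWorkerMain() code is carried over.
 */
static void
set_twophase_option(StreamOptionsSketch *options)
{
	assert(options != NULL);

	options->twophase = false;

	/* Only the apply worker may request two_phase streaming. */
	if (am_tablesync_worker())
		return;

	if (mock_twophasestate == LOGICALREP_TWOPHASE_STATE_PENDING &&
		AllTablesyncsReady())
		options->twophase = true;
}

/* Hypothetical helper: set the mock state, then report the resulting option. */
static bool
twophase_for(bool is_tablesync, bool all_ready, char state)
{
	StreamOptionsSketch options;

	mock_is_tablesync = is_tablesync;
	mock_all_ready = all_ready;
	mock_twophasestate = state;
	set_twophase_option(&options);
	return options.twophase;
}
```

With the guard, a tablesync worker never sets the option even when the state
is PENDING and all tablesyncs are READY. Without it, that same case would set
twophase to true, which is the behaviour change I am asking about.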

~~~

6.
AbortOutOfAnyTransaction();
- pgstat_report_subscription_error(MySubscription->oid, !am_tablesync_worker());
+ pgstat_report_subscription_error(MySubscription->oid,
+ !am_tablesync_worker());

Does this change have anything to do with this patch? Is it a quirk of
running pgindent?

~~~

7. run_tablesync_worker

Since the stated intent of the patch is the separation of apply and
tablesync workers then shouldn't this function belong in the
tablesync.c file?

~~~

8. run_tablesync_worker

+ * Runs the tablesync worker.
+ * It starts syncing tables. After a successful sync, sets streaming options
+ * and starts streaming to catchup.
+ */
+static void
+run_tablesync_worker(WalRcvStreamOptions *options,

Nicer to have a blank line after the first sentence of that function comment?

~~~

9. run_apply_worker

+/*
+ * Runs the leader apply worker.
+ * It sets up replication origin, streaming options and then starts streaming.
+ */
+static void
+run_apply_worker(WalRcvStreamOptions *options,

Nicer to have a blank line after the first sentence of that function comment?

~~~

10. InitializeLogRepWorker

+/*
+ * Common initialization for logical replication workers; leader apply worker,
+ * parallel apply worker and tablesync worker.
  *
  * Initialize the database connection, in-memory subscription and necessary
  * config options.
  */
void
-InitializeApplyWorker(void)
+InitializeLogRepWorker(void)

typo:

/workers;/workers:/

~~~

11. TablesyncWorkerMain

Since the stated intent of the patch is the separation of apply and
tablesync workers then shouldn't this function belong in the
tablesync.c file?

======
src/include/replication/worker_internal.h

12.
#define isParallelApplyWorker(worker) ((worker)->leader_pid != InvalidPid)

+extern void finish_sync_worker(void);

~

I think the macro isParallelApplyWorker is associated with the am_XXX
inline functions that follow it, so it doesn’t seem best to jam this
extern declaration in between them.

------
Kind Regards,
Peter Smith.
Fujitsu Australia
