Re: [PATCH] Reuse Workers and Replication Slots during Logical Replication

From: Peter Smith <smithpb2250(at)gmail(dot)com>
To: "Hayato Kuroda (Fujitsu)" <kuroda(dot)hayato(at)fujitsu(dot)com>
Cc: Melih Mutlu <m(dot)melihmutlu(at)gmail(dot)com>, Melanie Plageman <melanieplageman(at)gmail(dot)com>, "Wei Wang (Fujitsu)" <wangw(dot)fnst(at)fujitsu(dot)com>, "Yu Shi (Fujitsu)" <shiy(dot)fnst(at)fujitsu(dot)com>, Amit Kapila <amit(dot)kapila16(at)gmail(dot)com>, pgsql-hackers <pgsql-hackers(at)postgresql(dot)org>, shveta malik <shveta(dot)malik(at)gmail(dot)com>
Subject: Re: [PATCH] Reuse Workers and Replication Slots during Logical Replication
Date: 2023-07-11 02:36:09
Message-ID: CAHut+PvkwVfmuMsHmZgA_ZAgMP977_gbRG5TNUg1aUGBMvwapg@mail.gmail.com

Here are some review comments for patch v16-00003

======
1. Commit Message.

The patch description is missing.

======
2. General.

+LogicalRepSyncTableStart(XLogRecPtr *origin_startpos, int worker_slot)

and

+start_table_sync(XLogRecPtr *origin_startpos,
+ char **myslotname,
+ int worker_slot)

and

@@ -4548,12 +4552,13 @@ run_tablesync_worker(WalRcvStreamOptions *options,
char *slotname,
char *originname,
int originname_size,
- XLogRecPtr *origin_startpos)
+ XLogRecPtr *origin_startpos,
+ int worker_slot)

It seems the worker_slot is being passed all over the place as an
additional function argument so that it can be used to construct an
application_name. Is it possible/better to introduce a new
'MyLogicalRepWorker' field for the 'worker_slot' so it does not have
to be passed like this?
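
For example, something like this (just a sketch; the exact field name
and placement are my assumptions):

/* src/include/replication/worker_internal.h */
typedef struct LogicalRepWorker
{
    /* ... existing fields ... */

    /*
     * Index of this worker's slot in the workers array, stored once at
     * startup so it does not need to be passed down through every
     * function that builds the application_name.
     */
    int         worker_slot;
} LogicalRepWorker;

Then the functions above could just refer to
MyLogicalRepWorker->worker_slot instead of taking an extra argument.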

======
src/backend/replication/logical/tablesync.c

3.
+ /*
+ * Disconnect from publisher. Otherwise reused sync workers causes
+ * exceeding max_wal_senders.
+ */
+ if (LogRepWorkerWalRcvConn != NULL)
+ {
+ walrcv_disconnect(LogRepWorkerWalRcvConn);
+ LogRepWorkerWalRcvConn = NULL;
+ }
+

Why does this comment mention "reused workers" at all? The worker
process exits in this function, right?

~~~

4. LogicalRepSyncTableStart

/*
- * Here we use the slot name instead of the subscription name as the
- * application_name, so that it is different from the leader apply worker,
- * so that synchronous replication can distinguish them.
+ * Connect to publisher if not yet. The application_name must be also
+ * different from the leader apply worker because synchronous replication
+ * must distinguish them.
*/

I felt all the details in the 2nd part of this comment belong inside
the condition, not outside.

SUGGESTION
/* Connect to the publisher if we haven't done so already. */

~~~

5.
+ if (LogRepWorkerWalRcvConn == NULL)
+ {
+ char application_name[NAMEDATALEN];
+
+ /*
+ * FIXME: set appropriate application_name. Previously, the slot name
+ * was used becasue the lifetime of the tablesync worker was same as
+ * that, but now the tablesync worker handles many slots during the
+ * synchronization so that it is not suitable. So what should be?
+ * Note that if the tablesync worker starts to reuse the replication
+ * slot during synchronization, we should use the slot name as
+ * application_name again.
+ */
+ snprintf(application_name, NAMEDATALEN, "pg_%u_sync_%i",
+ MySubscription->oid, worker_slot);
+ LogRepWorkerWalRcvConn =
+ walrcv_connect(MySubscription->conninfo, true,
+ must_use_password,
+ application_name, &err);
+ }

5a.
/becasue/because/

~

5b.
I am not sure what name this should ideally use, but for uniqueness
doesn't it still need to include GetSystemIdentifier(), the same as the
function ReplicationSlotNameForTablesync() was doing?

Maybe the same function ReplicationSlotNameForTablesync() can be used
here, just passing the worker_slot instead of the relid?
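
e.g. Something like this (just a sketch to show the idea; whether to
inline the convention like this or modify
ReplicationSlotNameForTablesync() itself is a separate question):

/*
 * Match the ReplicationSlotNameForTablesync() naming convention, but
 * use the worker_slot instead of the relid. GetSystemIdentifier()
 * keeps the name unique, as before.
 */
snprintf(application_name, NAMEDATALEN, "pg_%u_sync_%i_" UINT64_FORMAT,
         MySubscription->oid, worker_slot, GetSystemIdentifier());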

======
src/backend/replication/logical/worker.c

6. LogicalRepApplyLoop

/*
* Init the ApplyMessageContext which we clean up after each replication
- * protocol message.
+ * protocol message, if needed.
*/
- ApplyMessageContext = AllocSetContextCreate(ApplyContext,
- "ApplyMessageContext",
- ALLOCSET_DEFAULT_SIZES);
+ if (!ApplyMessageContext)
+ ApplyMessageContext = AllocSetContextCreate(ApplyContext,
+ "ApplyMessageContext",
+ ALLOCSET_DEFAULT_SIZES);

Maybe slightly reword the comment.

BEFORE:
Init the ApplyMessageContext which we clean up after each replication
protocol message, if needed.

AFTER:
Init the ApplyMessageContext if needed. This context is cleaned up
after each replication protocol message.
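
i.e. The block would then look like this:

/*
 * Init the ApplyMessageContext if needed. This context is cleaned up
 * after each replication protocol message.
 */
if (ApplyMessageContext == NULL)
    ApplyMessageContext = AllocSetContextCreate(ApplyContext,
                                                "ApplyMessageContext",
                                                ALLOCSET_DEFAULT_SIZES);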

======
src/backend/replication/walsender.c

7.
+ /*
+ * Initialize the flag again because this streaming may be
+ * second time.
+ */
+ streamingDoneSending = streamingDoneReceiving = false;

Isn't reaching here a 2nd time only possible because a reused tablesync
worker might re-issue START_REPLICATION to the same WALSender? So,
should this flag reset be done ONLY for logical replication (the 'else'
part), and otherwise be asserted false?

e.g. Would it be better to be like this?

if (cmd->kind == REPLICATION_KIND_PHYSICAL)
{
    Assert(!streamingDoneSending && !streamingDoneReceiving);
    StartReplication(cmd);
}
else
{
    /*
     * Reset the flags because reusing tablesync workers can mean this
     * is the second time here.
     */
    streamingDoneSending = streamingDoneReceiving = false;
    StartLogicalReplication(cmd);
}

------
Kind Regards,
Peter Smith.
Fujitsu Australia
