RE: [PATCH] Reuse Workers and Replication Slots during Logical Replication

From: "Hayato Kuroda (Fujitsu)" <kuroda(dot)hayato(at)fujitsu(dot)com>
To: 'Peter Smith' <smithpb2250(at)gmail(dot)com>
Cc: Melih Mutlu <m(dot)melihmutlu(at)gmail(dot)com>, Melanie Plageman <melanieplageman(at)gmail(dot)com>, "Wei Wang (Fujitsu)" <wangw(dot)fnst(at)fujitsu(dot)com>, "Yu Shi (Fujitsu)" <shiy(dot)fnst(at)fujitsu(dot)com>, Amit Kapila <amit(dot)kapila16(at)gmail(dot)com>, pgsql-hackers <pgsql-hackers(at)postgresql(dot)org>, shveta malik <shveta(dot)malik(at)gmail(dot)com>
Subject: RE: [PATCH] Reuse Workers and Replication Slots during Logical Replication
Date: 2023-07-13 04:12:49
Message-ID: TYAPR01MB58667CF499CA100B1454A489F537A@TYAPR01MB5866.jpnprd01.prod.outlook.com
Lists: pgsql-hackers

Dear Peter,

Thanks for reviewing! I'm not sure how this should be handled, but I modified only my part, 0003.
Please see the attached new patch set. The other patches were not changed.
(I attached patches up to 0005 just in case, but I did not look at 0004 and 0005.)

> ======
> 1. Commit Message.
>
> The patch description is missing.

Briefly added.

> 2. General.
>
> +LogicalRepSyncTableStart(XLogRecPtr *origin_startpos, int worker_slot)
>
> and
>
> +start_table_sync(XLogRecPtr *origin_startpos,
> + char **myslotname,
> + int worker_slot)
>
> and
>
> @@ -4548,12 +4552,13 @@ run_tablesync_worker(WalRcvStreamOptions
> *options,
> char *slotname,
> char *originname,
> int originname_size,
> - XLogRecPtr *origin_startpos)
> + XLogRecPtr *origin_startpos,
> + int worker_slot)
>
>
> It seems the worker_slot is being passed all over the place as an
> additional function argument so that it can be used to construct an
> application_name. Is it possible/better to introduce a new
> 'MyLogicalRepWorker' field for the 'worker_slot' so it does not have
> to be passed like this?

I'm not sure whether it should be, but I did so. What do you think?

> src/backend/replication/logical/tablesync.c
>
> 3.
> + /*
> + * Disconnect from publisher. Otherwise reused sync workers causes
> + * exceeding max_wal_senders.
> + */
> + if (LogRepWorkerWalRcvConn != NULL)
> + {
> + walrcv_disconnect(LogRepWorkerWalRcvConn);
> + LogRepWorkerWalRcvConn = NULL;
> + }
> +
>
> Why is this comment mentioning anything about "reused workers" at all?
> The worker process exits in this function, right?

I looked at that code again and found that this part is no longer needed.

It was originally added in 0002, because workers established new connections
without exiting, so walsenders could be left behind on the publisher. So this was
correct for the 0002 patch.
But now, in the 0003 patch, workers reuse connections, which means there is no need
to call walrcv_disconnect() explicitly. The disconnect happens when the process exits.
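
To illustrate the connection reuse described above, here is a minimal standalone sketch. It is not code from the patch: Conn, fake_connect(), sync_one_table() and sync_tables() are hypothetical stand-ins, and worker_conn only plays the role of LogRepWorkerWalRcvConn. The point is that one worker syncing several tables connects once, so no extra walsender is started per table.

```c
#include <stddef.h>

/* Hypothetical stand-in for the walreceiver connection object. */
typedef struct Conn
{
    int         id;
} Conn;

static Conn  the_conn;
static Conn *worker_conn = NULL;    /* plays the role of LogRepWorkerWalRcvConn */
static int   connect_calls = 0;     /* how many connections were opened */

/* Hypothetical connect routine; each call would start one walsender. */
static Conn *
fake_connect(void)
{
    connect_calls++;
    the_conn.id = connect_calls;
    return &the_conn;
}

/* Sync a single table, connecting only if no connection exists yet. */
static void
sync_one_table(void)
{
    if (worker_conn == NULL)
        worker_conn = fake_connect();

    /* ... the table copy would happen here, using worker_conn ... */
}

/* A reused worker handles several tables in turn over one connection. */
static int
sync_tables(int ntables)
{
    for (int i = 0; i < ntables; i++)
        sync_one_table();

    return connect_calls;
}
```

In this model the connection is never closed explicitly; as in the explanation above, it would go away when the process exits.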

> 4. LogicalRepSyncTableStart
>
> /*
> - * Here we use the slot name instead of the subscription name as the
> - * application_name, so that it is different from the leader apply worker,
> - * so that synchronous replication can distinguish them.
> + * Connect to publisher if not yet. The application_name must be also
> + * different from the leader apply worker because synchronous replication
> + * must distinguish them.
> */
>
> I felt all the details in the 2nd part of this comment belong inside
> the condition, not outside.
>
> SUGGESTION
> /* Connect to the publisher if haven't done so already. */

Changed.

> 5.
> + if (LogRepWorkerWalRcvConn == NULL)
> + {
> + char application_name[NAMEDATALEN];
> +
> + /*
> + * FIXME: set appropriate application_name. Previously, the slot name
> + * was used becasue the lifetime of the tablesync worker was same as
> + * that, but now the tablesync worker handles many slots during the
> + * synchronization so that it is not suitable. So what should be?
> + * Note that if the tablesync worker starts to reuse the replication
> + * slot during synchronization, we should use the slot name as
> + * application_name again.
> + */
> + snprintf(application_name, NAMEDATALEN, "pg_%u_sync_%i",
> + MySubscription->oid, worker_slot);
> + LogRepWorkerWalRcvConn =
> + walrcv_connect(MySubscription->conninfo, true,
> + must_use_password,
> + application_name, &err);
> + }
>
> 5a.
> /becasue/because/

Modified. Also, the comment was moved atop ApplicationNameForTablesync().
I was not sure when it can be removed, so I kept it.

>
> 5b.
> I am not sure about what name this should ideally use, but anyway for
> uniqueness doesn't it still need to include the GetSystemIdentifier()
> same as function ReplicationSlotNameForTablesync() was doing?
>
> Maybe this can use the same function ReplicationSlotNameForTablesync()
> can be used but just pass the worker_slot instead of the relid?

Good point. ApplicationNameForTablesync() was defined and used.
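
For reference, a minimal sketch of the kind of name such a function could build. This is an assumption, not the code in the attached patch: application_name_for_tablesync() and its signature are hypothetical, the format is only modeled on the "pg_%u_sync_%i" string quoted above with a system identifier appended, and sysid stands in for the value of GetSystemIdentifier().

```c
#include <assert.h>
#include <inttypes.h>
#include <stdint.h>
#include <stdio.h>
#include <string.h>

#define NAMEDATALEN 64          /* PostgreSQL's default name buffer size */

/*
 * Hypothetical stand-in for ApplicationNameForTablesync(). It combines the
 * subscription OID, the worker slot number and the system identifier so
 * that names stay distinct across workers and across clusters.
 */
static void
application_name_for_tablesync(uint32_t suboid, int worker_slot,
                               uint64_t sysid, char *name, size_t szname)
{
    int         n;

    n = snprintf(name, szname, "pg_%" PRIu32 "_sync_%d_%" PRIu64,
                 suboid, worker_slot, sysid);

    /* snprintf() truncates safely; a negative result means an encoding error */
    assert(n >= 0);
    (void) n;
}
```

With a NAMEDATALEN-sized buffer, the longest possible name (10 + 11 + 20 digits plus the fixed text) still fits without truncation.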

> src/backend/replication/logical/worker.c
>
> 6. LogicalRepApplyLoop
>
> /*
> * Init the ApplyMessageContext which we clean up after each replication
> - * protocol message.
> + * protocol message, if needed.
> */
> - ApplyMessageContext = AllocSetContextCreate(ApplyContext,
> - "ApplyMessageContext",
> - ALLOCSET_DEFAULT_SIZES);
> + if (!ApplyMessageContext)
> + ApplyMessageContext = AllocSetContextCreate(ApplyContext,
> + "ApplyMessageContext",
> +
>
> Maybe slightly reword the comment.
>
> BEFORE:
> Init the ApplyMessageContext which we clean up after each replication
> protocol message, if needed.
>
> AFTER:
> Init the ApplyMessageContext if needed. This context is cleaned up
> after each replication protocol message.

Changed.

> src/backend/replication/walsender.c
>
> 7.
> + /*
> + * Initialize the flag again because this streaming may be
> + * second time.
> + */
> + streamingDoneSending = streamingDoneReceiving = false;
>
> Isn't this only possible to be 2nd time because the "reuse tablesync
> worker" might re-issue a START_REPLICATION again to the same
> WALSender? So, should this flag reset ONLY be done for the logical
> replication ('else' part), otherwise it should be asserted false?
>
> e.g. Would it be better to be like this?
>
> if (cmd->kind == REPLICATION_KIND_PHYSICAL)
> {
> Assert(!streamingDoneSending && !streamingDoneReceiving)
> StartReplication(cmd);
> }
> else
> {
> /* Reset flags because reusing tablesync workers can mean this is the
> second time here. */
> streamingDoneSending = streamingDoneReceiving = false;
> StartLogicalReplication(cmd);
> }
>

It's OK to modify the comment. But after thinking about it more, I came to think that
the behavior of physical replication should not be changed.
So I accepted the comment only for the logical replication part.
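
A minimal standalone model of one reading of this decision, not the actual walsender.c change: the flags are reset only on the logical path, and the physical path gets neither a reset nor a new assertion. finish_streaming() and start_replication() are hypothetical helpers; only the flag names and the REPLICATION_KIND_PHYSICAL constant come from the quoted code.

```c
#include <stdbool.h>

typedef enum
{
    REPLICATION_KIND_PHYSICAL,
    REPLICATION_KIND_LOGICAL    /* assumed counterpart of the quoted constant */
} ReplicationKind;

static bool streamingDoneSending = false;
static bool streamingDoneReceiving = false;

/* Hypothetical helper: models the end of one streaming session. */
static void
finish_streaming(void)
{
    streamingDoneSending = streamingDoneReceiving = true;
}

/* Hypothetical helper: models handling of one START_REPLICATION command. */
static void
start_replication(ReplicationKind kind)
{
    if (kind == REPLICATION_KIND_LOGICAL)
    {
        /*
         * Reset the flags because a reused tablesync worker may start
         * streaming on the same walsender a second time.
         */
        streamingDoneSending = streamingDoneReceiving = false;
    }

    /* The physical path is deliberately left untouched in this model. */
}
```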

Best Regards,
Hayato Kuroda
FUJITSU LIMITED

Attachment Content-Type Size
v17-0001-Refactor-to-split-Apply-and-Tablesync-Workers.patch application/octet-stream 21.4 KB
v17-0002-Reuse-Tablesync-Workers.patch application/octet-stream 10.7 KB
v17-0003-Reuse-connection-when-tablesync-workers-change-t.patch application/octet-stream 7.2 KB
v17-0004-Add-replication-protocol-cmd-to-create-a-snapsho.patch application/octet-stream 21.1 KB
v17-0005-Reuse-Replication-Slot-and-Origin-in-Tablesync.patch application/octet-stream 55.0 KB
