Re: Logical Replication of sequences

From: Chao Li <li(dot)evan(dot)chao(at)gmail(dot)com>
To: vignesh C <vignesh21(at)gmail(dot)com>
Cc: Amit Kapila <amit(dot)kapila16(at)gmail(dot)com>, Masahiko Sawada <sawada(dot)mshk(at)gmail(dot)com>, "Hayato Kuroda (Fujitsu)" <kuroda(dot)hayato(at)fujitsu(dot)com>, "Zhijie Hou (Fujitsu)" <houzj(dot)fnst(at)fujitsu(dot)com>, Dilip Kumar <dilipbalaut(at)gmail(dot)com>, Peter Smith <smithpb2250(at)gmail(dot)com>, shveta malik <shveta(dot)malik(at)gmail(dot)com>, Shlok Kyal <shlok(dot)kyal(dot)oss(at)gmail(dot)com>, Nisha Moond <nisha(dot)moond412(at)gmail(dot)com>, Peter Eisentraut <peter(at)eisentraut(dot)org>, PostgreSQL Hackers <pgsql-hackers(at)lists(dot)postgresql(dot)org>, Euler Taveira <euler(at)eulerto(dot)com>, Michael Paquier <michael(at)paquier(dot)xyz>, "Jonathan S(dot) Katz" <jkatz(at)postgresql(dot)org>
Subject: Re: Logical Replication of sequences
Date: 2025-10-27 09:11:46
Message-ID: BABE39BA-6893-4D65-8432-5523960FFC2B@gmail.com
Lists: pgsql-hackers


> On Oct 24, 2025, at 23:22, vignesh C <vignesh21(at)gmail(dot)com> wrote:
>
> Regards,
> Vignesh
> <v20251024-0001-Rename-sync_error_count-to-tbl_sync_error_.patch><v20251024-0002-Add-worker-type-argument-to-logicalrep_wor.patch><v20251024-0003-New-worker-for-sequence-synchronization-du.patch><v20251024-0004-Documentation-for-sequence-synchronization.patch>

The changes in 0001 are straightforward and look good. I haven’t reviewed 0004 yet. I have a few comments on 0002 and 0003.

1 - 0002
```
* We are only interested in the leader apply worker or table sync worker.
+ * For apply workers, the relid should be set to InvalidOid, as they manage
+ * changes across all tables and sequences. For table sync workers, the relid
+ * should be set to the OID of the relation being synchronized.
*/
LogicalRepWorker *
-logicalrep_worker_find(Oid subid, Oid relid, bool only_running)
+logicalrep_worker_find(Oid subid, Oid relid, LogicalRepWorkerType wtype,
+ bool only_running)
{
int i;
LogicalRepWorker *res = NULL;

Assert(LWLockHeldByMe(LogicalRepWorkerLock));
```

The comment says that “for apply workers, the relid should be set to InvalidOid”, so would it be worth adding an assert for that?
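For illustration, here is a minimal sketch of the invariant such an assert could enforce. The type and macro definitions are simplified stand-ins for the PostgreSQL ones so the snippet compiles on its own, and worker_find_args_valid() is a hypothetical helper; inside logicalrep_worker_find() itself this would more likely collapse to a single Assert().

```c
#include <assert.h>
#include <stdbool.h>

/* Simplified stand-ins for the PostgreSQL definitions (assumption). */
typedef unsigned int Oid;
#define InvalidOid ((Oid) 0)
#define OidIsValid(objectId) ((bool) ((objectId) != InvalidOid))

typedef enum LogicalRepWorkerType
{
	WORKERTYPE_UNKNOWN = 0,
	WORKERTYPE_TABLESYNC,
	WORKERTYPE_SEQUENCESYNC,
	WORKERTYPE_APPLY
} LogicalRepWorkerType;

/*
 * Hypothetical helper: returns true when relid/wtype follow the contract in
 * the logicalrep_worker_find() header comment.  In the real function this
 * might simply be:
 *     Assert(wtype != WORKERTYPE_APPLY || !OidIsValid(relid));
 */
static bool
worker_find_args_valid(Oid relid, LogicalRepWorkerType wtype)
{
	/* Apply workers handle all tables and sequences, so no relid. */
	if (wtype == WORKERTYPE_APPLY)
		return !OidIsValid(relid);

	/* Table sync workers must name the relation being synchronized. */
	if (wtype == WORKERTYPE_TABLESYNC)
		return OidIsValid(relid);

	/* The header comment says nothing about other types; accept them. */
	return true;
}
```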

2 - 0002
```
- /* Search for attached worker for a given subscription id. */
+ /* Search for the attached worker matching the specified criteria. */
for (i = 0; i < max_logical_replication_workers; i++)
{
```

Minor issue with the comment:

* We are not searching for a specific worker, so “the” should be “a”.
* “attached” is confusing. In the old comment, “attached” was tied to “a given subscription id”, but now, attached to what?

So my suggested revision is:

“/* Search for a logical replication worker matching the specified criteria */”

3 - 0002
```
/*
* Stop the logical replication worker for subid/relid, if any.
+ *
+ * Similar to logicalrep_worker_find, relid should be set to a valid OID only
+ * for table sync workers.
*/
void
-logicalrep_worker_stop(Oid subid, Oid relid)
+logicalrep_worker_stop(Oid subid, Oid relid, LogicalRepWorkerType wtype)
```

The comment should be updated: subid/relid => subid/relid/wtype.

4 - 0002
```
@@ -477,7 +477,8 @@ ProcessSyncingTablesForApply(XLogRecPtr current_lsn)
LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);

syncworker = logicalrep_worker_find(MyLogicalRepWorker->subid,
- rstate->relid, false);
+ rstate->relid,
+ WORKERTYPE_TABLESYNC, true);
```

Why was only_running changed from false to true? This commit adds a new worker type, but it isn’t meant to change the existing logic.
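To make the concern concrete, here is a simplified, hypothetical model of the lookup. As I understand the existing code, only_running = true skips slots that are in use but whose process has not attached yet (w->proc is still NULL); the running flag below stands in for that, and WorkerSlot/find_worker() are illustrative names, not the patch’s. With the change, a tablesync worker that has been launched but has not yet started would no longer be found here.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Simplified stand-ins for the PostgreSQL definitions (assumption). */
typedef unsigned int Oid;
#define InvalidOid ((Oid) 0)

typedef enum LogicalRepWorkerType
{
	WORKERTYPE_UNKNOWN = 0,
	WORKERTYPE_TABLESYNC,
	WORKERTYPE_SEQUENCESYNC,
	WORKERTYPE_APPLY
} LogicalRepWorkerType;

/* Hypothetical reduced worker slot; "running" models w->proc != NULL. */
typedef struct WorkerSlot
{
	bool		in_use;
	bool		running;
	Oid			subid;
	Oid			relid;
	LogicalRepWorkerType type;
} WorkerSlot;

#define MAX_WORKERS 4

static WorkerSlot slots[MAX_WORKERS];

/* Return the first slot matching all criteria, or NULL. */
static WorkerSlot *
find_worker(Oid subid, Oid relid, LogicalRepWorkerType wtype,
			bool only_running)
{
	for (int i = 0; i < MAX_WORKERS; i++)
	{
		WorkerSlot *w = &slots[i];

		/* With only_running, skip slots whose process has not attached. */
		if (w->in_use && w->subid == subid && w->relid == relid &&
			w->type == wtype && (!only_running || w->running))
			return w;
	}
	return NULL;
}
```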

5 - 0003
```
+/*
+ * Reset the last_seqsync_start_time of the sequencesync worker in the
+ * subscription's apply worker.
+ */
+void
+logicalrep_reset_seqsync_start_time(void)
+{
+ LogicalRepWorker *worker;
+
+ LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
+
+ /*
+ * Set the last_seqsync_start_time for the sequence worker in the apply
+ * worker instead of the sequence sync worker, as the sequence sync worker
+ * has finished and is about to exit.
+ */
+ worker = logicalrep_worker_find(MyLogicalRepWorker->subid, InvalidOid,
+ WORKERTYPE_APPLY, true);
+ if (worker)
+ worker->last_seqsync_start_time = 0;
+
+ LWLockRelease(LogicalRepWorkerLock);
+}
```

Two comments on this new function:

* The function comment and the in-code comment are redundant. I suggest moving the in-code comment into the function comment.
* Why is LW_SHARED used? We are writing worker->last_seqsync_start_time, so shouldn’t LW_EXCLUSIVE be used?
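A sketch of how the function might look with both suggestions applied. A POSIX rwlock stands in for LogicalRepWorkerLock (the write lock playing the role of LW_EXCLUSIVE), and ApplyWorkerState is a hypothetical reduced slot; in the patch, the logicalrep_worker_find() lookup would stay inside the locked section.

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <pthread.h>
#include <stddef.h>
#include <stdint.h>

typedef int64_t TimestampTz;	/* stand-in for PostgreSQL's TimestampTz */

/* Hypothetical reduced view of the apply worker's slot. */
typedef struct ApplyWorkerState
{
	TimestampTz last_seqsync_start_time;
} ApplyWorkerState;

/* rwlock standing in for LogicalRepWorkerLock (assumption). */
static pthread_rwlock_t worker_lock = PTHREAD_RWLOCK_INITIALIZER;

/*
 * Reset last_seqsync_start_time in the subscription's apply worker.  The
 * field is reset in the apply worker rather than in the sequencesync
 * worker, because the sequencesync worker has finished and is about to
 * exit.
 */
static void
reset_seqsync_start_time(ApplyWorkerState *apply_worker)
{
	/* We modify shared state, so take the exclusive (write) lock. */
	pthread_rwlock_wrlock(&worker_lock);

	/* In the patch, logicalrep_worker_find() would be called here. */
	if (apply_worker != NULL)
		apply_worker->last_seqsync_start_time = 0;

	pthread_rwlock_unlock(&worker_lock);
}
```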

6 - 0003
```
+ /*
+ * Count running sync workers for this subscription, while we have the
+ * lock.
+ */
+ nsyncworkers = logicalrep_sync_worker_count(MyLogicalRepWorker->subid);
+ LWLockRelease(LogicalRepWorkerLock);
+
+ launch_sync_worker(nsyncworkers, InvalidOid,
+ &MyLogicalRepWorker->last_seqsync_start_time);
```

I think there could be a race condition here. The lock is acquired in LW_SHARED mode, so multiple callers may see the same nsyncworkers. Each then launches a sync worker based on that count, which may already be stale, because another worker might be started between LWLockRelease() and launch_sync_worker().

But if that is not the case, i.e., only one caller ever calls ProcessSyncingSequencesForApply(), then why is the lock needed?
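For reference, a generic way to avoid this kind of check-then-act race is to count and reserve the slot inside the same exclusive critical section. This is only a sketch of the pattern, with a mutex standing in for LogicalRepWorkerLock and hypothetical counters in place of the worker array; it is not how the patch is structured.

```c
#include <assert.h>
#include <pthread.h>
#include <stdbool.h>

/* Hypothetical stand-in for max_sync_workers_per_subscription. */
static int max_sync_workers = 2;

/* Hypothetical count of sync worker slots reserved for the subscription. */
static int nsyncworkers = 0;

/* Exclusive lock standing in for LogicalRepWorkerLock (assumption). */
static pthread_mutex_t worker_lock = PTHREAD_MUTEX_INITIALIZER;

/*
 * Check for a free slot and reserve it in one critical section, so two
 * callers cannot both observe the same count and both launch a worker.
 */
static bool
try_reserve_sync_worker(void)
{
	bool		reserved = false;

	pthread_mutex_lock(&worker_lock);
	if (nsyncworkers < max_sync_workers)
	{
		nsyncworkers++;
		reserved = true;
	}
	pthread_mutex_unlock(&worker_lock);

	return reserved;
}
```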

7 - 0003
```
+ if (insuffperm_seqs->len)
+ {
+ appendStringInfo(combined_error_detail, "Insufficient permission for sequence(s): (%s)",
+ insuffperm_seqs->data);
+ appendStringInfoString(combined_error_hint, "Grant permissions for the sequence(s).");
+ }
```

“Grant permissions” is vague. Should it say “Grant UPDATE privilege” instead?

8 - 0003
```
+ appendStringInfoString(combined_error_hint, " For mismatched sequences, alter or re-create local sequences to have matching parameters as publishers.");
```

“To have matching parameters as publishers” is grammatically awkward. Maybe revise it to “to match the publisher’s parameters”.

9 - 0003
```
+ /*
+ * current_indexes is not incremented sequentially because some
+ * sequences may be missing, and the number of fetched rows may not
+ * match the batch size. The `hash_search` with HASH_REMOVE takes care
+ * of the count.
+ */
```

Typo: current_indexes => current_index

10 - 0003
```
- /* Find the leader apply worker and signal it. */
- logicalrep_worker_wakeup(MyLogicalRepWorker->subid, InvalidOid);
+ /*
+ * This is a clean exit of the sequencesync worker; reset the
+ * last_seqsync_start_time.
+ */
+ if (wtype == WORKERTYPE_SEQUENCESYNC)
+ logicalrep_reset_seqsync_start_time();
+ else
+ /* Find the leader apply worker and signal it. */
+ logicalrep_worker_wakeup(MyLogicalRepWorker->subid, InvalidOid);
```

The comment “This is a clean exit of the sequencesync worker” applies only to the “if” branch, so I suggest moving it inside the “if”. Also, the “This is a clean exit of the sequencesync worker” part is not needed; keep the comment consistent with the one in the “else” branch.
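Roughly what I have in mind, with hypothetical stubs in place of the real functions (they only record which path ran) and finish_sync_worker_exit() as a placeholder name for the enclosing function:

```c
#include <assert.h>
#include <string.h>

typedef unsigned int Oid;		/* stand-in for PostgreSQL's Oid */
#define InvalidOid ((Oid) 0)

typedef enum LogicalRepWorkerType
{
	WORKERTYPE_TABLESYNC,
	WORKERTYPE_SEQUENCESYNC
} LogicalRepWorkerType;

/* Hypothetical stubs: record which branch was taken. */
static const char *last_action = "";

static void
logicalrep_reset_seqsync_start_time(void)
{
	last_action = "reset";
}

static void
logicalrep_worker_wakeup(Oid subid, Oid relid)
{
	(void) subid;
	(void) relid;
	last_action = "wakeup";
}

/* Placeholder name for the function containing the quoted hunk. */
static void
finish_sync_worker_exit(Oid subid, LogicalRepWorkerType wtype)
{
	if (wtype == WORKERTYPE_SEQUENCESYNC)
	{
		/* Reset the last_seqsync_start_time. */
		logicalrep_reset_seqsync_start_time();
	}
	else
	{
		/* Find the leader apply worker and signal it. */
		logicalrep_worker_wakeup(subid, InvalidOid);
	}
}
```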

11 - 0003
```
+void
+launch_sync_worker(int nsyncworkers, Oid relid, TimestampTz *last_start_time)
+{
+ /* If there is a free sync worker slot, start a new sync worker */
+ if (nsyncworkers < max_sync_workers_per_subscription)
+ {
```

The entire function body is under an “if”, so we could use “if (!…) return;” instead, which saves a level of indentation.
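Something like the following, where the GUC is a stand-in variable, the launched counter only exists so the sketch is observable, and the rest of the body is elided:

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

typedef unsigned int Oid;		/* stand-in for PostgreSQL's Oid */
typedef int64_t TimestampTz;	/* stand-in for PostgreSQL's TimestampTz */

/* Stand-in for the max_sync_workers_per_subscription GUC (assumption). */
static int max_sync_workers_per_subscription = 2;

/* Hypothetical counter standing in for the real launch logic. */
static int launched = 0;

static void
launch_sync_worker(int nsyncworkers, Oid relid, TimestampTz *last_start_time)
{
	/* Nothing to do if there is no free sync worker slot. */
	if (nsyncworkers >= max_sync_workers_per_subscription)
		return;

	/* ... the rest of the body, one indentation level shallower ... */
	(void) relid;
	(void) last_start_time;
	launched++;
}
```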

Best regards,

Chao Li (Evan)
HighGo Software Co., Ltd.
https://www.highgo.com/
