Re: [PATCH] Reuse Workers and Replication Slots during Logical Replication

From: vignesh C <vignesh21(at)gmail(dot)com>
To: "Hayato Kuroda (Fujitsu)" <kuroda(dot)hayato(at)fujitsu(dot)com>
Cc: Amit Kapila <amit(dot)kapila16(at)gmail(dot)com>, Melih Mutlu <m(dot)melihmutlu(at)gmail(dot)com>, Peter Smith <smithpb2250(at)gmail(dot)com>, Melanie Plageman <melanieplageman(at)gmail(dot)com>, "Wei Wang (Fujitsu)" <wangw(dot)fnst(at)fujitsu(dot)com>, "Yu Shi (Fujitsu)" <shiy(dot)fnst(at)fujitsu(dot)com>, pgsql-hackers <pgsql-hackers(at)postgresql(dot)org>, shveta malik <shveta(dot)malik(at)gmail(dot)com>
Subject: Re: [PATCH] Reuse Workers and Replication Slots during Logical Replication
Date: 2023-07-03 11:49:50
Message-ID: CALDaNm2mFqHrFYHoBKu1_muWJcozEcNfjtNx5uTsZkun0PF74g@mail.gmail.com
Lists: pgsql-hackers

On Wed, 28 Jun 2023 at 12:02, Hayato Kuroda (Fujitsu)
<kuroda(dot)hayato(at)fujitsu(dot)com> wrote:
>
> Dear Amit,
>
> > > > This actually makes sense. I quickly try to do that without adding any
> > > > new replication message. As you would expect, it did not work.
> > > > I don't really know what's needed to make a connection to last for
> > > > more than one iteration. Need to look into this. Happy to hear any
> > > > suggestions and thoughts.
> > >
> >
> > It is not clear to me what exactly you tried here which didn't work.
> > Can you please explain a bit more?
>
> Just to confirm, this is not my part. Melih can answer this...
>
> > > I have analyzed how we handle this. Please see attached the patch (0003) which
> > > allows reusing connection.
> > >
> >
> > Why did you change the application name during the connection?
>
> It was because the lifetime of tablesync worker is longer than slots's one and
> tablesync worker creates temporary replication slots many times, per the target
> relation. The name of each slots has relid, so I thought that it was not suitable.
> But in the later patch the tablesync worker tries to reuse the slot during the
> synchronization, so in this case the application_name should be same as slotname.
>
> I added comment in 0003, and new file 0006 file to use slot name as application_name
> again. Note again that the separation was just for specifying changes, Melih can
> include them to one part of files if needed.

Few comments:
1) Should these error messages say "Could not create a snapshot by
replication slot":
+ if (!pubnames_str)
+ ereport(ERROR,
+ (errcode(ERRCODE_OUT_OF_MEMORY), /* likely guess */
+ errmsg("could not start WAL streaming: %s",
+ pchomp(PQerrorMessage(conn->streamConn)))));
+ pubnames_literal = PQescapeLiteral(conn->streamConn, pubnames_str,
+ strlen(pubnames_str));
+ if (!pubnames_literal)
+ ereport(ERROR,
+ (errcode(ERRCODE_OUT_OF_MEMORY), /* likely guess */
+ errmsg("could not start WAL streaming: %s",
+ pchomp(PQerrorMessage(conn->streamConn)))));
+ appendStringInfo(&cmd, ", publication_names %s", pubnames_literal);
+ PQfreemem(pubnames_literal);
+ pfree(pubnames_str);

2) These checks are present in CreateReplicationSlot too, can we have
a common function to check these for both CreateReplicationSlot and
CreateReplicationSnapshot:
+ if (!IsTransactionBlock())
+ ereport(ERROR,
+ (errmsg("%s must be called inside a transaction",
+ "CREATE_REPLICATION_SNAPSHOT ...")));
+
+ if (XactIsoLevel != XACT_REPEATABLE_READ)
+ ereport(ERROR,
+ (errmsg("%s must be called in REPEATABLE READ isolation mode transaction",
+ "CREATE_REPLICATION_SNAPSHOT ...")));
+
+ if (!XactReadOnly)
+ ereport(ERROR,
+ (errmsg("%s must be called in a read only transaction",
+ "CREATE_REPLICATION_SNAPSHOT ...")));
+
+ if (FirstSnapshotSet)
+ ereport(ERROR,
+ (errmsg("%s must be called before any query",
+ "CREATE_REPLICATION_SNAPSHOT ...")));
+
+ if (IsSubTransaction())
+ ereport(ERROR,
+ (errmsg("%s must not be called in a subtransaction",
+ "CREATE_REPLICATION_SNAPSHOT ...")));

3) We can probably add the function header comment now:
+/*
+ * TODO
+ */
+static void
+libpqrcv_slot_snapshot(WalReceiverConn *conn,
+ char *slotname,
+ const WalRcvStreamOptions *options,
+ XLogRecPtr *lsn)

4) Either the relation name or the relid should be sufficient here, no
need to print both:
StartTransactionCommand();
+ ereport(LOG,
+ (errmsg("%s for subscription \"%s\" has moved to sync table \"%s\" with relid %u.",
+ get_worker_name(),
+ MySubscription->name,
+ get_rel_name(MyLogicalRepWorker->relid),
+ MyLogicalRepWorker->relid)));
+ CommitTransactionCommand();

5) Why is this check of logicalrep_worker_find required? Will it not be
sufficient to pick the relations that are in SUBREL_STATE_INIT state?
+ /*
+ * Pick the table for the next run if it is not already picked up
+ * by another worker.
+ *
+ * Take exclusive lock to prevent any other sync worker from picking
+ * the same table.
+ */
+ LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
+ if (rstate->state != SUBREL_STATE_SYNCDONE &&
+ !logicalrep_worker_find(MySubscription->oid, rstate->relid, false))
+ {
+ /* Update worker state for the next table */

6) This comment was dropped while refactoring:
- /* Build logical replication streaming options. */
- options.logical = true;
- options.startpoint = origin_startpos;
- options.slotname = myslotname;

7) We could keep twophase and origin in the same order as earlier, so
that it is easy to verify during review that the existing code is kept
as is:
+ options->proto.logical.publication_names = MySubscription->publications;
+ options->proto.logical.binary = MySubscription->binary;
+ options->proto.logical.twophase = false;
+ options->proto.logical.origin = pstrdup(MySubscription->origin);
+
+ /*
+ * Assign the appropriate option value for streaming option according to
+ * the 'streaming' mode and the publisher's ability to support that mode.
+ */
+ if (server_version >= 160000 &&

8) There are a few indentation issues; we could run pgindent once:
8.a)
+ /* Sync worker has completed synchronization of the current table. */
+ MyLogicalRepWorker->is_sync_completed = true;
+
+ ereport(LOG,
+ (errmsg("logical replication table synchronization worker for subscription \"%s\", relation \"%s\" with relid %u has finished",
+ MySubscription->name,
+ get_rel_name(MyLogicalRepWorker->relid),
+ MyLogicalRepWorker->relid)));
+ CommitTransactionCommand();

8.b)
+ ereport(DEBUG2,
+ (errmsg("process_syncing_tables_for_sync: updated originname: %s, slotname: %s, state: %c for relation \"%u\" in subscription \"%u\".",
+ "NULL", "NULL", MyLogicalRepWorker->relstate,
+ MyLogicalRepWorker->relid, MyLogicalRepWorker->subid)));
+ CommitTransactionCommand();
+ pgstat_report_stat(false);
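
To illustrate comment 2, a shared precondition check could look roughly
like the sketch below. It is a standalone illustration, not patch code:
XactState, IsoLevel and check_snapshot_export_preconditions() are
hypothetical names, the struct stands in for the backend calls shown in
the quoted hunk (IsTransactionBlock(), XactIsoLevel, XactReadOnly,
FirstSnapshotSet, IsSubTransaction()), and a real helper would presumably
ereport(ERROR, ...) rather than return a string.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdio.h>

/* Hypothetical stand-in for the isolation levels the backend tracks. */
typedef enum
{
    ISO_READ_COMMITTED,
    ISO_REPEATABLE_READ,
    ISO_SERIALIZABLE
} IsoLevel;

/* Hypothetical snapshot of the transaction state the checks look at. */
typedef struct XactState
{
    bool     in_transaction_block;
    IsoLevel iso_level;
    bool     read_only;
    bool     first_snapshot_set;
    bool     in_subtransaction;
} XactState;

/*
 * Shared check for commands that use the transaction's snapshot.
 * Returns NULL if all preconditions hold; otherwise formats
 * "<cmdname> <reason>" into buf and returns buf. The checks run in the
 * same order as in the quoted hunk, and cmdname is the only part that
 * differs between callers.
 */
static const char *
check_snapshot_export_preconditions(const XactState *xact,
                                    const char *cmdname,
                                    char *buf, size_t buflen)
{
    const char *reason = NULL;

    assert(xact != NULL && cmdname != NULL && buf != NULL);

    if (!xact->in_transaction_block)
        reason = "must be called inside a transaction";
    else if (xact->iso_level != ISO_REPEATABLE_READ)
        reason = "must be called in REPEATABLE READ isolation mode transaction";
    else if (!xact->read_only)
        reason = "must be called in a read only transaction";
    else if (xact->first_snapshot_set)
        reason = "must be called before any query";
    else if (xact->in_subtransaction)
        reason = "must not be called in a subtransaction";

    if (reason == NULL)
        return NULL;

    snprintf(buf, buflen, "%s %s", cmdname, reason);
    return buf;
}
```

Both callers would then pass only their own command string, for example
"CREATE_REPLICATION_SNAPSHOT ...", and the five checks would live in one
place.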

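For comment 5, the two candidate conditions can be put side by side in a
small standalone sketch. Nothing here is patch code: RelState,
WorkerSlot, worker_exists_for() and pick_next_table() are hypothetical,
the worker array stands in for whatever logicalrep_worker_find() searches,
and the caller is assumed to already hold the equivalent of
LogicalRepWorkerLock. The state letters are assumed to match
pg_subscription_rel.srsubstate.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

typedef unsigned int Oid;

/* State letters assumed to mirror pg_subscription_rel.srsubstate. */
#define SUBREL_STATE_INIT     'i'
#define SUBREL_STATE_SYNCDONE 's'

/* Hypothetical per-table sync state. */
typedef struct RelState
{
    Oid  relid;
    char state;
} RelState;

/* Hypothetical stand-in for one entry of the shared worker array. */
typedef struct WorkerSlot
{
    bool in_use;
    Oid  relid;
} WorkerSlot;

/* Mock of the worker lookup: is any worker already assigned to relid? */
static bool
worker_exists_for(const WorkerSlot *workers, size_t nworkers, Oid relid)
{
    assert(workers != NULL || nworkers == 0);

    for (size_t i = 0; i < nworkers; i++)
    {
        if (workers[i].in_use && workers[i].relid == relid)
            return true;
    }
    return false;
}

/*
 * Return the index of the next table to sync, or -1 if there is none.
 * init_only = false: condition from the quoted hunk
 *                    (not SYNCDONE and no worker assigned).
 * init_only = true:  alternative raised in the review
 *                    (state is INIT, no worker lookup).
 */
static int
pick_next_table(const RelState *rels, size_t nrels,
                const WorkerSlot *workers, size_t nworkers,
                bool init_only)
{
    for (size_t i = 0; i < nrels; i++)
    {
        bool candidate;

        if (init_only)
            candidate = (rels[i].state == SUBREL_STATE_INIT);
        else
            candidate = (rels[i].state != SUBREL_STATE_SYNCDONE &&
                         !worker_exists_for(workers, nworkers, rels[i].relid));

        if (candidate)
            return (int) i;
    }
    return -1;
}
```

The two conditions differ only when a table is still in INIT state while
a worker is already registered for it. Whether that situation can arise
in the patch is exactly what the question asks.
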
Regards,
Vignesh
