RE: Slow catchup of 2PC (twophase) transactions on replica in LR

From: "Hayato Kuroda (Fujitsu)" <kuroda(dot)hayato(at)fujitsu(dot)com>
To: 'Peter Smith' <smithpb2250(at)gmail(dot)com>
Cc: "pgsql-hackers(at)lists(dot)postgresql(dot)org" <pgsql-hackers(at)lists(dot)postgresql(dot)org>, Amit Kapila <amit(dot)kapila16(at)gmail(dot)com>, Ajin Cherian <itsajin(at)gmail(dot)com>, Давыдов Виталий <v(dot)davydov(at)postgrespro(dot)ru>
Subject: RE: Slow catchup of 2PC (twophase) transactions on replica in LR
Date: 2024-05-08 08:26:42
Message-ID: OSBPR01MB2552521ABDBC2F995FBACDF7F5E52@OSBPR01MB2552.jpnprd01.prod.outlook.com
Views: Raw Message | Whole Thread | Download mbox | Resend email
Thread:
Lists: pgsql-hackers

Dear Peter,

Thanks for reviewing! Here are updated patches.
I updated patches only for HEAD.

> ======
> Commit message
>
> 1.
> This patch allows user to alter two_phase option
>
> /allows user/allows the user/
>
> /to alter two_phase option/to alter the 'two_phase' option/

Fixed.

> ======
> doc/src/sgml/ref/alter_subscription.sgml
>
> 2.
> <literal>two_phase</literal> can be altered only for disabled subscription.
>
> SUGGEST
> The <literal>two_phase</literal> parameter can only be altered when
> the subscription is disabled.

Fixed.

> ======
> src/backend/access/transam/twophase.c
>
> 3. checkGid
> +
> +/*
> + * checkGid
> + */
> +static bool
> +checkGid(char *gid, Oid subid)
> +{
> + int ret;
> + Oid subid_written,
> + xid;
> +
> + ret = sscanf(gid, "pg_gid_%u_%u", &subid_written, &xid);
> +
> + if (ret != 2 || subid != subid_written)
> + return false;
> +
> + return true;
> +}
>
> 3a.
> The function comment should give more explanation of what it does. I
> think this function is the counterpart of the TwoPhaseTransactionGid()
> function of worker.c so the comment can say that too.

Comments were updated.

> 3b.
> Indeed, perhaps the function name should be similar to
> TwoPhaseTransactionGid. e.g. call it like
> IsTwoPhaseTransactionGidForSubid?

Replaced to IsTwoPhaseTransactionGidForSubid().

> 3c.
> Probably 'xid' should be TransactionId instead of Oid.

Right, fixed.

> 3d.
> Why not have a single return?
>
> SUGGESTION
> return (ret == 2 && subid = subid_written);

Fixed.

> 3e.
> I am wondering if the existing TwoPhaseTransactionGid function
> currently in worker.c should be moved here because IMO these 2
> functions belong together and twophase.c seems the right place to put
> them.

+1, moved.

> ~~~
>
> 4.
> +LookupGXactBySubid(Oid subid)
> +{
> + bool found = false;
> +
> + LWLockAcquire(TwoPhaseStateLock, LW_SHARED);
> + for (int i = 0; i < TwoPhaseState->numPrepXacts; i++)
> + {
> + GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
> +
> + /* Ignore not-yet-valid GIDs. */
> + if (gxact->valid && checkGid(gxact->gid, subid))
> + {
> + found = true;
> + break;
> + }
> + }
> + LWLockRelease(TwoPhaseStateLock);
> + return found;
> +}
>
> AFAIK the gxact also has the 'xid' available, so won't it be better to
> pass BOTH the 'xid' and 'subid' to the checkGid so you can do a full
> comparison instead of comparing only the subid part of the gid?

IIUC, the xid written in the gxact means the transaction id on the subscriber,
but formatted GID has xid on the publisher. So the value cannot be used.

> ======
> src/backend/commands/subscriptioncmds.c
>
> 5. AlterSubscription
>
> + /* XXX */
> + if (IsSet(opts.specified_opts, SUBOPT_TWOPHASE_COMMIT))
> + {
>
> The "XXX" comment looks like it is meant to say something more...

This flag was used only for me, removed.

> ~~~
>
> 6.
> + /*
> + * two_phase can be only changed for disabled
> + * subscriptions
> + */
> + if (form->subenabled)
> + ereport(ERROR,
> + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
> + errmsg("cannot set %s for enabled subscription",
> + "two_phase")));
>
> 6a.
> Should this have a more comprehensive comment giving the reason like
> the 'failover' option has?

Modified, but it is almost the same as failover's one.

> 6b.
> Maybe this should include a "translator" comment to say don't
> translate the option name.

Hmm, but other parts in AlterSubscription() does not have.
For now, I kept current style.

> ~~~
>
> 7.
> + /* Check whether the number of prepared transactions */
> + if (!opts.twophase &&
> + form->subtwophasestate == LOGICALREP_TWOPHASE_STATE_ENABLED
> &&
> + LookupGXactBySubid(subid))
> + ereport(ERROR,
> + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
> + errmsg("cannot disable two_phase when uncommitted prepared
> transactions present")));
> +
>
> 7a.
> The first comment seems to be an incomplete sentence. I think it
> should say something a bit like:
> two_phase cannot be disabled if there are any uncommitted prepared
> transactions present.

Modified, but this part would be replaced by upcoming patches.

> 7b.
> Also, if ereport occurs what is the user supposed to do about it?
> Shouldn't the ereport include some errhint with appropriate advice?

The hint was added, but this part would be replaced by upcoming patches.

> ~~~
>
> 8.
> + /*
> + * The changed failover option of the slot can't be rolled
> + * back.
> + */
> + PreventInTransactionBlock(isTopLevel, "ALTER SUBSCRIPTION ... SET
> (two_phase)");
> +
> + /* Change system catalog acoordingly */
> + values[Anum_pg_subscription_subtwophasestate - 1] =
> + CharGetDatum(opts.twophase ?
> + LOGICALREP_TWOPHASE_STATE_PENDING :
> + LOGICALREP_TWOPHASE_STATE_DISABLED);
> + replaces[Anum_pg_subscription_subtwophasestate - 1] = true;
> + }
>
> Typo I think: /failover option/two_phase option/

Right, fixed.

> ======
> .../libpqwalreceiver/libpqwalreceiver.c
>
> 9.
> static void
> libpqrcv_alter_slot(WalReceiverConn *conn, const char *slotname,
> - bool failover)
> + bool two_phase, bool failover)
>
> Same comment as mentioned elsewhere (#15), IMO the new 'two_phase'
> parameter should be last.

Fixed. Also, some ordering of declarations and if-blocks were also changed.
In later part, I did not reply similar comments but I addressed all of them.

> ======
> src/backend/replication/logical/launcher.c
>
> 10.
> +/*
> + * Stop all the subscription workers.
> + */
> +void
> +logicalrep_workers_stop(Oid subid)
> +{
> + List *subworkers;
> + ListCell *lc;
> +
> + LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
> + subworkers = logicalrep_workers_find(subid, false);
> + LWLockRelease(LogicalRepWorkerLock);
> + foreach(lc, subworkers)
> + {
> + LogicalRepWorker *w = (LogicalRepWorker *) lfirst(lc);
> +
> + logicalrep_worker_stop(w->subid, w->relid);
> + }
> + list_free(subworkers);
> +}
>
> I was confused by the logicalrep_workers_find(subid, false). IIUC the
> 'false' means everything (instead of 'only_running') but then I don't
> know why we want to "stop" anything that is NOT running. OTOH I see
> that this code was extracted from where it was previously inlined in
> subscriptioncmds.c, so maybe the 'false' is necessary for another
> reason? At least maybe some explanatory comment is needed for why you
> are passing this flag as false?

Sorry, let me give time for more investigation around here. For now,
I added "XXX" mark.
I think it is listed just in case, but there may be a timing issue.

> ======
> src/backend/replication/logical/worker.c
>
> 11.
> - /* two-phase should not be altered */
> + /* two-phase should not be altered while the worker exists */
> Assert(newsub->twophasestate == MySubscription->twophasestate);
> /should not/cannot/

Fixed.

> ~~~
>
> 13.
> + if (MyReplicationSlot->data.two_phase != two_phase)
> + {
> + SpinLockAcquire(&MyReplicationSlot->mutex);
> + MyReplicationSlot->data.two_phase = two_phase;
> + SpinLockRelease(&MyReplicationSlot->mutex);
> +
> + update_slot = true;
> + }
> +
> +
> if (MyReplicationSlot->data.failover != failover)
> {
> SpinLockAcquire(&MyReplicationSlot->mutex);
> MyReplicationSlot->data.failover = failover;
> SpinLockRelease(&MyReplicationSlot->mutex);
>
> + update_slot = true;
> + }
>
> 13a.
> Doesn't it make more sense for the whole check/set to be "atomic",
> i.e. put the mutex also around the check?
>
> SUGGEST
> SpinLockAcquire(&MyReplicationSlot->mutex);
> if (MyReplicationSlot->data.two_phase != two_phase)
> {
> MyReplicationSlot->data.two_phase = two_phase;
> update_slot = true;
> }
> SpinLockRelease(&MyReplicationSlot->mutex);
>
> ~
>
> Also, (if you agree with the above) why not include both checks
> (two_phase and failover) within the same mutex instead of
> acquiring/releasing it twice:
>
> SUGGEST
> SpinLockAcquire(&MyReplicationSlot->mutex);
> if (MyReplicationSlot->data.two_phase != two_phase)
> {
> MyReplicationSlot->data.two_phase = two_phase;
> update_slot = true;
> }
> if (MyReplicationSlot->data.failover != failover)
> {
> MyReplicationSlot->data.failover = failover;
> update_slot = true;
> }
> SpinLockAcquire(&MyReplicationSlot->mutex);

Hmm. According to comments atop ReplicationSlot, backends which own the slot do
not have to set mutex for reading attributes. Concurrent backends, which do not
acquire the slot, must set the mutex lock before the read. Based on the manner,
I want to keep current style.

```
* - Individual fields are protected by mutex where only the backend owning
* the slot is authorized to update the fields from its own slot. The
* backend owning the slot does not need to take this lock when reading its
* own fields, while concurrent backends not owning this slot should take the
* lock when reading this slot's data.
*/
typedef struct ReplicationSlot
```

> 13b.
> There are double blank lines after the first if-block.

Removed.

> ======
> src/test/regress/sql/subscription.sql
>
> 16.
> I know you do this already in the TAP test, but doesn't the test case
> to demonstrate that 'two-phase' option can be altered when the
> subscription is disabled actually belong here in the regression
> instead?

Actually it cannot be done at main regression test. Because altering two_phase
requires the connection between pub/sub, but it is not established in subscription.sql
file. Succeeded case for altering failover has not been tested neither, and
I think they have same reason.

> src/test/subscription/t/021_twophase.pl
>
> 17.
> +# Disable the subscription and alter it to two_phase = false,
> +# verify that the altered subscription reflects the two_phase option.
>
> /verify/then verify/

Fixed.

> 18.
> +# Now do a prepare on publisher and make sure that it is not replicated.
> +$node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub");
> +$node_publisher->safe_psql(
> + 'postgres', qq{
> + BEGIN;
> + INSERT INTO tab_copy VALUES (100);
> + PREPARE TRANSACTION 'newgid';
> + });
> +
>
> 18a.
> /on publisher/on the publisher/

Fixed.

> 18b.
> What is that "DROP SUBSCRIPTION tap_sub" doing here? It seems
> misplaced under this comment.

The subscription must be dropped because it also prepares a transaction.
Moved before the test case and added comments.

> 19.
> +# Make sure that there is 0 prepared transaction on the subscriber
> +$result = $node_subscriber->safe_psql('postgres',
> + "SELECT count(*) FROM pg_prepared_xacts;");
> +is($result, qq(0), 'transaction is prepared on subscriber');
>
> 19a.
> SUGGESTION
> Make sure there are no prepared transactions on the subscriber

Fixed.

> 19b.
> /'transaction is prepared on subscriber'/'should be no prepared
> transactions on subscriber'/

Replaced/

> 20.
> +# Made sure that the commited transaction is replicated.
>
> /Made sure/Make sure/
>
> /commited/committed/

Fixed.

> 21.
> +# Make sure that the two-phase is enabled on the subscriber
> +$result = $node_subscriber->safe_psql('postgres',
> + "SELECT subtwophasestate FROM pg_subscription WHERE subname =
> 'tap_sub_copy';"
> +);
> +is($result, qq(e), 'two-phase is disabled');
>
> The 'two-phase is disabled' is the identical message used in the
> opposite case earlier, so something is amiss. Maybe this one should
> say 'two-phase should be enabled' and the earlier counterpart should
> say 'two-phase should be disabled'.

Both of them were fixed.

Best Regards,
Hayato Kuroda
FUJITSU LIMITED
https://www.fujitsu.com/

Attachment Content-Type Size
v7-0001-Allow-altering-of-two_phase-option-of-a-SUBSCRIPT.patch application/octet-stream 25.5 KB
v7-0002-Alter-slot-option-two_phase-only-when-altering-tr.patch application/octet-stream 8.7 KB
v7-0003-Abort-prepared-transactions-while-altering-two_ph.patch application/octet-stream 7.4 KB
v7-0004-Add-force_alter-option.patch application/octet-stream 8.2 KB

In response to

Responses

Browse pgsql-hackers by date

  From Date Subject
Next Message Richard Guo 2024-05-08 09:01:51 Re: A problem about partitionwise join
Previous Message Peter Eisentraut 2024-05-08 08:09:46 Re: add --no-sync to pg_upgrade's calls to pg_dump and pg_dumpall