Re: [HACKERS] logical decoding of two-phase transactions

From: Amit Kapila <amit(dot)kapila16(at)gmail(dot)com>
To: Ajin Cherian <itsajin(at)gmail(dot)com>
Cc: Peter Smith <smithpb2250(at)gmail(dot)com>, vignesh C <vignesh21(at)gmail(dot)com>, PostgreSQL Hackers <pgsql-hackers(at)postgresql(dot)org>
Subject: Re: [HACKERS] logical decoding of two-phase transactions
Date: 2021-03-16 11:33:05
Message-ID: CAA4eK1Kwah+MimFMR3jPY5cSqpGFVh5zfV2g4=gTphaPsacoLw@mail.gmail.com
Lists: pgsql-hackers

On Mon, Mar 15, 2021 at 6:14 PM Ajin Cherian <itsajin(at)gmail(dot)com> wrote:
>
> On Mon, Mar 15, 2021 at 2:04 PM Amit Kapila <amit(dot)kapila16(at)gmail(dot)com> wrote:
>>
>> I think something on these lines should be much
>> easier than the spool-file implementation unless we see any problem
>> with this idea.
>>
>
> Here's a new patch-set that implements this new solution proposed by Amit.
> Patchset-v60 implements:
>

I have reviewed the latest patch and below are my comments. Some of
these might overlap with Vignesh's, as I haven't looked at his comments
in detail.
Review comments
================
1.
+ * And we also skip the FinishPreparedTransaction if we're using the
+ * Prepare Spoolfile (using_psf) because in that case there is no matching
+ * PrepareTransactionBlock done yet.
+ */
+ if (LookupGXact(rollback_data.gid, rollback_data.prepare_end_lsn,
+ rollback_data.preparetime))

The above comment is not required.

2.
+ * While streaming and two_phase can theoretically be supported,
+ * the current implementation has some issues that could lead to a
+ * streaming prepared transaction to be incorrectly missed in the initial
+ * syncing phase. Hence, disabling this combination till that issue can
+ * be addressed.
+ */
+ if (twophase && *twophase_given && *twophase)

I don't think the above statement is correct as per the current patch.
We can write something like: "While streaming and two_phase can
theoretically be supported, it needs more analysis to allow them
together." or something along those lines.

3.
-
- walrcv_create_slot(wrconn, slotname, false,
+ /*
+ * Even if two_phase is set, don't create the slot with two-phase
+ * enabled. Will enable it once all the tables are synced and ready.
+ * This avoids race-conditions that might occur during initial table-sync.
+ */
+ walrcv_create_slot(wrconn, slotname, false, false,
CRS_NOEXPORT_SNAPSHOT, NULL);

Can we please explain a bit more about the race conditions due to which
we can enable two_phase only after the initial sync?

4.
@@ -648,7 +703,7 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data)
InvalidXLogRecPtr);
ereport(DEBUG1,
(errmsg_internal("table \"%s.%s\" added to subscription \"%s\"",
- rv->schemaname, rv->relname, sub->name)));
+ rv->schemaname, rv->relname, sub->name)));
..
..
@@ -722,9 +777,9 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data)

ereport(DEBUG1,
(errmsg_internal("table \"%s.%s\" removed from subscription \"%s\"",
- get_namespace_name(get_rel_namespace(relid)),
- get_rel_name(relid),
- sub->name)));
+ get_namespace_name(get_rel_namespace(relid)),
+ get_rel_name(relid),
+ sub->name)));

Is there any reason for the above changes w.r.t. this patch?

5.
+
+ /*
+ * The subscription two_phase commit implementation requires
+ * that replication has passed the initial table
+ * synchronization phase before the two_phase becomes properly
+ * enabled.
+ *
+ * But, having reached this two-phase commit "enabled" state we
+ * must not allow any subsequent table initialization to occur.
+ * So the ALTER SUBSCRIPTION ... REFRESH is disallowed when the
+ * the user had requested two_phase = on mode.

I suggest we expand the comments here to specify what problem can
happen if we allow subsequent table initialization after two_phase
is enabled for the subscription. Alternatively, you can point to the
comments atop worker.c.

6.
@@ -526,6 +527,20 @@ CreateDecodingContext(XLogRecPtr start_lsn,
start_lsn = slot->data.confirmed_flush;
}

+ /*
+ * If starting with two_phase enabled then set two_phase_at point.
+ * Also update the slot to be two_phase enabled and save the slot
+ * to disk.
+ */
+ if (two_phase)
+ {
+ slot->data.two_phase_at = start_lsn;
+ slot->data.two_phase = true;
+ ReplicationSlotMarkDirty();
+ ReplicationSlotSave();
+ }

Do we want to Assert that the two_phase variables are not already set,
as we don't want them to be reset?
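A minimal sketch of such an Assert, using simplified stand-ins: SlotTwoPhaseData and enable_two_phase_once are hypothetical names, and plain assert() replaces PostgreSQL's Assert(). Only the flag is checked here, because with the patch as posted two_phase_at is also written by DecodingContextFindStartpoint, so asserting that it is still unset could fire.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

typedef uint64_t XLogRecPtr;

#define InvalidXLogRecPtr ((XLogRecPtr) 0)

/* Simplified stand-in for the two_phase fields of a slot's persistent data. */
typedef struct SlotTwoPhaseData
{
	bool		two_phase;		/* is two-phase decoding enabled? */
	XLogRecPtr	two_phase_at;	/* LSN at which two_phase was enabled */
} SlotTwoPhaseData;

/*
 * Hypothetical helper mirroring the hunk in CreateDecodingContext(): enable
 * two_phase at start_lsn, asserting first that it is not already enabled so
 * that an existing two_phase_at is never silently overwritten.
 */
static void
enable_two_phase_once(SlotTwoPhaseData *data, XLogRecPtr start_lsn)
{
	assert(!data->two_phase);

	data->two_phase_at = start_lsn;
	data->two_phase = true;

	/* The real code then calls ReplicationSlotMarkDirty()/ReplicationSlotSave(). */
}
```

Calling enable_two_phase_once a second time on the same slot trips the assertion, which is exactly the "reset" case the question is about.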

7.
/*
- * We allow decoding of prepared transactions iff the two_phase option is
- * enabled at the time of slot creation.
+ * We allow decoding of prepared transactions if the two_phase option is
+ * enabled at the time of slot creation or at restart.
*/

In the above comments, there is no need to change iff to if. iff means
'if and only if' which makes sense in the above comment.

- ctx->twophase &= MyReplicationSlot->data.two_phase;
+ ctx->twophase = slot->data.two_phase || two_phase;

Why have you removed '&' from the above assignment? It is possible that
the plugin doesn't provide the two_phase APIs, in which case we can't
support two_phase even if the user asks for it. I think we probably
need to write it as: ctx->twophase &= (slot->data.two_phase ||
two_phase);
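To make the difference concrete, here is a small sketch of both assignments, assuming (as the question above implies) that ctx->twophase already reflects whether the output plugin provides the two-phase callbacks before this line runs. The function names are hypothetical.

```c
#include <assert.h>
#include <stdbool.h>

/*
 * Hypothetical helper mirroring the suggested assignment
 *     ctx->twophase &= (slot->data.two_phase || two_phase);
 * plugin_twophase plays the role of ctx->twophase before the assignment.
 */
static bool
suggested_twophase(bool plugin_twophase, bool slot_two_phase, bool requested)
{
	bool		twophase = plugin_twophase;

	/* Needs plugin support AND (slot already two_phase OR requested now). */
	twophase &= (slot_two_phase || requested);
	return twophase;
}

/* The assignment as posted: the plugin-support value is overwritten. */
static bool
posted_twophase(bool plugin_twophase, bool slot_two_phase, bool requested)
{
	(void) plugin_twophase;		/* ignored, so plugin support is lost */
	return slot_two_phase || requested;
}
```

With a plugin that lacks the two-phase callbacks, the posted form still yields true whenever the slot or the request asks for two_phase, while the '&=' form correctly yields false.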

8.
@@ -602,7 +617,7 @@ DecodingContextFindStartpoint(LogicalDecodingContext *ctx)

SpinLockAcquire(&slot->mutex);
slot->data.confirmed_flush = ctx->reader->EndRecPtr;
- slot->data.initial_consistent_point = ctx->reader->EndRecPtr;
+ slot->data.two_phase_at = ctx->reader->EndRecPtr;
SpinLockRelease(&slot->mutex);

I think we need to set two_phase_at only when the slot has two_phase
enabled. Previously, it was fine to set it unconditionally because it
was a generic initial consistent point for a slot, but after the
variable rename it doesn't seem to make sense to assign it unless
two_phase is enabled.
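A minimal sketch of the suggested condition. SlotData and record_start_point are hypothetical simplifications of the slot and of the DecodingContextFindStartpoint() hunk, and the spinlock is omitted.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

typedef uint64_t XLogRecPtr;

#define InvalidXLogRecPtr ((XLogRecPtr) 0)

/* Hypothetical, simplified view of the relevant slot fields. */
typedef struct SlotData
{
	bool		two_phase;
	XLogRecPtr	confirmed_flush;
	XLogRecPtr	two_phase_at;
} SlotData;

/*
 * confirmed_flush is always advanced to the start point, but two_phase_at
 * is only recorded for slots that actually have two_phase enabled.
 */
static void
record_start_point(SlotData *slot, XLogRecPtr end_rec_ptr)
{
	slot->confirmed_flush = end_rec_ptr;
	if (slot->two_phase)
		slot->two_phase_at = end_rec_ptr;
}
```

A slot created without two_phase then keeps two_phase_at invalid until two_phase is enabled later at streaming start.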

9.
* needs to be sent later along with commit prepared and they must be
* before this point.
*/
- XLogRecPtr initial_consistent_point;
+ XLogRecPtr two_phase_at;

I think the explanation of this variable also needs to be updated,
because now it can also be set later, when two_phase is first enabled
at the start of streaming.

10.
ReorderBufferFinishPrepared(ReorderBuffer *rb, TransactionId xid,
XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
- XLogRecPtr initial_consistent_point,
+ XLogRecPtr two_phase_at,
TimestampTz commit_time, RepOriginId origin_id,
XLogRecPtr origin_lsn, char *gid, bool is_commit)
{
@@ -2703,7 +2703,7 @@ ReorderBufferFinishPrepared(ReorderBuffer *rb, TransactionId xid,
* prepare if it was not decoded earlier. We don't need to decode the xact
* for aborts if it is not done already.
*/
- if ((txn->final_lsn < initial_consistent_point) && is_commit)
+ if ((txn->final_lsn < two_phase_at) && is_commit)

How can this change work? While decoding a prepare, the patch only
remembers the prepare info in DecodePrepare, whereas we would already
have skipped the prepare before reaching that point via FilterPrepare.
I think we need to remember the prepare info before calling
DecodePrepare. If you have not already tested this scenario, please
test it before posting the next version and also explain exactly how
you tested it.
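A simplified sketch of the ordering suggested above, under the assumption that the "prepare info" can be reduced to the prepare's final_lsn. TxnSketch, on_prepare and must_decode_prepare_at_finish are hypothetical names, not reorderbuffer.c or decode.c functions.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

typedef uint64_t XLogRecPtr;

#define InvalidXLogRecPtr ((XLogRecPtr) 0)

/* Hypothetical, much-simplified view of a reorder-buffer transaction. */
typedef struct TxnSketch
{
	XLogRecPtr	final_lsn;		/* LSN of the PREPARE, once remembered */
	bool		prepare_sent;	/* was the PREPARE decoded and sent? */
} TxnSketch;

/*
 * PREPARE handling: the prepare info is remembered before any skip can
 * cause an early return, so it is still available at COMMIT PREPARED.
 */
static void
on_prepare(TxnSketch *txn, XLogRecPtr prepare_lsn, bool skip_prepare)
{
	txn->final_lsn = prepare_lsn;	/* remember first */
	if (skip_prepare)
		return;					/* e.g. skipped by the filter */
	txn->prepare_sent = true;
}

/*
 * COMMIT PREPARED / ROLLBACK PREPARED: a prepare that lies before
 * two_phase_at was not sent, so it must be decoded now for a commit.
 * Aborts of such transactions need nothing.
 */
static bool
must_decode_prepare_at_finish(const TxnSketch *txn, XLogRecPtr two_phase_at,
							  bool is_commit)
{
	return txn->final_lsn < two_phase_at && is_commit;
}
```

If on_prepare returned before setting final_lsn, the comparison in must_decode_prepare_at_finish would have no remembered prepare to work with, which is the gap the comment above is pointing at.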

11.
+/*
+ * Are all tablesyncs READY?
+ */
+bool
+AllTablesyncsREADY(void)
+{
+ return !AnyTablesyncsNotREADY();
+}
+
+/*
+ * Are there any tablesyncs which are not yet READY?
+ */
+static bool
+AnyTablesyncsNotREADY(void)
+{

I don't think we need separate functions here.

12.
+/*
+ * Update the p_subscription two_phase tri-state of the current subscription.
+ */
+void
+UpdateTwoPhaseTriState(char new_tristate)

I would prefer not to include 'Tri' in the above function or variable
names. We might want to extend the states in future, and even without
that it would be better not to include 'tri' here.

13.
+void
+UpdateTwoPhaseTriState(char new_tristate)
{
..
+#if 1
+ /* This is just debugging, for confirmation the update worked. */
+ {
+ Subscription *new_s;
+
+ StartTransactionCommand();
+ new_s = GetSubscription(MySubscription->oid, false);
+ CommitTransactionCommand();
+ }
+#endif
..
}

Let's remove the debugging code from the main patch.

14.
/*
+ * Even when the two_phase mode is requested by the user, it remains as
+ * the tri-state PENDING until all tablesyncs have reached READY state.
+ * Only then, can it become properly ENABLED.
+ */
+ bool all_tables_ready = AllTablesyncsREADY();

+ if (MySubscription->twophase == LOGICALREP_TWOPHASE_STATE_PENDING && all_tables_ready)
+ {
+ /* Start streaming with two_phase enabled */
+ walrcv_startstreaming(wrconn, &options, true);
+ UpdateTwoPhaseTriState(LOGICALREP_TWOPHASE_STATE_ENABLED);
+ MySubscription->twophase = LOGICALREP_TWOPHASE_STATE_ENABLED;
+ }
+ else
+ {
+ walrcv_startstreaming(wrconn, &options, false);
+ }
+
+ ereport(LOG,
+ (errmsg("logical replication apply worker for subscription \"%s\" 2PC is %s.",
+ MySubscription->name,
+ MySubscription->twophase == LOGICALREP_TWOPHASE_STATE_DISABLED ? "DISABLED" :
+ MySubscription->twophase == LOGICALREP_TWOPHASE_STATE_PENDING ? "PENDING" :
+ MySubscription->twophase == LOGICALREP_TWOPHASE_STATE_ENABLED ? "ENABLED" :
+ "?")));

I think the two_phase code here is relevant only if we are talking to
a server of version >= 14. You can check that with
"walrcv_server_version(wrconn) >= 140000".

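A sketch of where the version check could sit in the decision above. start_streaming_with_two_phase is a hypothetical helper, and the TWOPHASE_STATE_* macros are placeholder stand-ins for the patch's LOGICALREP_TWOPHASE_STATE_* values, which are not shown in the quoted hunk. server_version is the value walrcv_server_version(wrconn) would report.

```c
#include <assert.h>
#include <stdbool.h>

/* Placeholder values standing in for LOGICALREP_TWOPHASE_STATE_* */
#define TWOPHASE_STATE_DISABLED 'd'
#define TWOPHASE_STATE_PENDING	'p'
#define TWOPHASE_STATE_ENABLED	'e'

/*
 * Should the apply worker pass two_phase = true to walrcv_startstreaming()?
 * Publishers older than v14 never get two_phase; otherwise this mirrors the
 * PENDING && all_tables_ready test in the quoted hunk.
 */
static bool
start_streaming_with_two_phase(int server_version, char twophase_state,
							   bool all_tables_ready)
{
	if (server_version < 140000)
		return false;

	return twophase_state == TWOPHASE_STATE_PENDING && all_tables_ready;
}
```

Keeping the version test first means a pre-14 publisher never sees the two_phase option, regardless of the subscription's state.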
15.
+static void
+FetchTableStates(bool *started_tx)
+{
+ *started_tx = false;
+
+ if (!table_states_valid)
+ {
+ MemoryContext oldctx;
+ List *rstates;
+ ListCell *lc;
+ SubscriptionRelState *rstate;
+
+
+ /* Clean the old lists. */
+ list_free_deep(table_states_all);
+ table_states_all = NIL;

The patch doesn't seem to be using table_states_all; it might be a
leftover from the previous version. Also, the logic in this function
can simply use GetSubscriptionNotReadyRelations, as the existing code
does.

16.
+static bool
+AnyTablesyncsNotREADY(void)
+{
+ bool found_busy = false;
+ bool started_tx = false;
+ int count = 0;
+ ListCell *lc;
+
+ /* We need up-to-date sync state info for subscription tables here. */
+ FetchTableStates(&started_tx);
+
+ /*
+ * Process all not-READY tables to see if any are also not-SYNCDONE
+ */
+ foreach(lc, table_states_not_ready)
+ {
+ SubscriptionRelState *rstate = (SubscriptionRelState *) lfirst(lc);
+
+ count++;
+ /*
+ * When the process_syncing_tables_for_apply changes the state from
+ * SYNCDONE to READY, that change is actually written directly into
+ * the list element of table_states_not_ready.
+ *
+ * So the "table_states_not_ready" list might end up having a READY
+ * state in it even though there was none when it was initially
+ * created. This is reason why we need to check for READY below.
+ */
+ if (rstate->state != SUBREL_STATE_READY)
+ {
+ found_busy = true;
+ break;
+ }
+ }

Do we really need this recheck in the for loop? How does it matter?
If it is not required, we can simply check whether the
table_states_not_ready list is empty.
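A sketch contrasting the two checks, with the list reduced to a plain array. RelStateSketch and both functions are hypothetical, and SUBREL_STATE_READY is given a placeholder value. A single function of this shape would also cover the earlier point about not needing separate AllTablesyncsREADY/AnyTablesyncsNotREADY functions.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

#define SUBREL_STATE_READY 'r'	/* placeholder value */

/* Hypothetical stand-in for one element of table_states_not_ready. */
typedef struct RelStateSketch
{
	char		state;
} RelStateSketch;

/* The patch's approach: scan and recheck every entry's current state. */
static bool
all_ready_by_scan(const RelStateSketch *not_ready, size_t n)
{
	for (size_t i = 0; i < n; i++)
	{
		if (not_ready[i].state != SUBREL_STATE_READY)
			return false;
	}
	return true;
}

/* The simpler check: an empty not-ready list means every table is READY. */
static bool
all_ready_by_emptiness(size_t n)
{
	return n == 0;
}
```

The two only disagree when an entry was flipped to READY in place and the list has not been refetched yet; the question above is whether that window matters.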

17.
+ ereport(LOG,
+ (errmsg("logical replication apply worker for subscription \"%s\" will restart so 2PC can be enabled",

In the above message, I think it is better to write two_phase instead of 2PC.

18.
+/* Has this prepared transaction been committed? */
+#define rbtxn_commit_prepared(txn) \
+( \
+ ((txn)->txn_flags & RBTXN_COMMIT_PREPARED) != 0 \
+)
+
+/* Has this prepared transaction been rollbacked? */
+#define rbtxn_rollback_prepared(txn) \
+( \
+ ((txn)->txn_flags & RBTXN_ROLLBACK_PREPARED) != 0 \
+)
+

Are these macros used anywhere? If not, please remove them.

--
With Regards,
Amit Kapila.
