Re: [HACKERS] logical decoding of two-phase transactions

From: vignesh C <vignesh21(at)gmail(dot)com>
To: Peter Smith <smithpb2250(at)gmail(dot)com>
Cc: Amit Kapila <amit(dot)kapila16(at)gmail(dot)com>, Ajin Cherian <itsajin(at)gmail(dot)com>, PostgreSQL Hackers <pgsql-hackers(at)postgresql(dot)org>
Subject: Re: [HACKERS] logical decoding of two-phase transactions
Date: 2021-04-26 11:22:24
Message-ID: CALDaNm1BxeN7NP9MjK4VRatjcXqWEO6K_35aVN0um+5AFdv-+A@mail.gmail.com
Views: Raw Message | Whole Thread | Download mbox | Resend email
Thread:
Lists: pgsql-hackers

On Wed, Apr 21, 2021 at 12:13 PM Peter Smith <smithpb2250(at)gmail(dot)com> wrote:
>
> On Tue, Apr 20, 2021 at 3:45 PM Peter Smith <smithpb2250(at)gmail(dot)com> wrote:
> >
> > Please find attached the latest patch set v73`*
> >
> > Differences from v72* are:
> >
> > * Rebased to HEAD @ today (required because v72-0001 no longer applied cleanly)
> >
> > * Minor documentation correction for protocol messages for Commit Prepared ('K')
> >
> > * Non-functional code tidy (mostly proto.c) to reduce overloading
> > different meanings to same member names for prepare/commit times.
>
>
> Please find attached a re-posting of patch set v73*
>
> This is the same as yesterday's v73 but with a contrib module compile
> error fixed.

Thanks for the updated patch, few comments:
1) Should "final_lsn not set in begin message" be "prepare_lsn not set
in begin message"
+logicalrep_read_begin_prepare(StringInfo in,
LogicalRepPreparedTxnData *begin_data)
+{
+ /* read fields */
+ begin_data->prepare_lsn = pq_getmsgint64(in);
+ if (begin_data->prepare_lsn == InvalidXLogRecPtr)
+ elog(ERROR, "final_lsn not set in begin message");

2) Should "These commands" be "ALTER SUBSCRIPTION ... REFRESH
PUBLICATION and ALTER SUBSCRIPTION ... SET/ADD PUBLICATION ..." as
copy_data cannot be specified with alter subscription .. drop
publication.
+ These commands also cannot be executed with <literal>copy_data =
true</literal>
+ when the subscription has <literal>two_phase</literal> commit enabled. See
+ column <literal>subtwophasestate</literal> of
+ <xref linkend="catalog-pg-subscription"/> to know the actual
two-phase state.

3) <term>Byte1('A')</term> should be <term>Byte1('r')</term> as we
have defined LOGICAL_REP_MSG_ROLLBACK_PREPARED as r.
+<term>Rollback Prepared</term>
+<listitem>
+<para>
+
+<variablelist>
+
+<varlistentry>
+<term>Byte1('A')</term>
+<listitem><para>
+ Identifies this message as the rollback of a
two-phase transaction message.
+</para></listitem>
+</varlistentry>

4) Should "Check if the prepared transaction with the given GID and
lsn is around." be
"Check if the prepared transaction with the given GID, lsn & timestamp
is around."
+/*
+ * LookupGXact
+ * Check if the prepared transaction with the given GID
and lsn is around.
+ *
+ * Note that we always compare with the LSN where prepare ends because that is
+ * what is stored as origin_lsn in the 2PC file.
+ *
+ * This function is primarily used to check if the prepared transaction
+ * received from the upstream (remote node) already exists. Checking only GID
+ * is not sufficient because a different prepared xact with the same GID can
+ * exist on the same node. So, we are ensuring to match origin_lsn and
+ * origin_timestamp of prepared xact to avoid the possibility of a match of
+ * prepared xact from two different nodes.
+ */

5) Should we change "The LSN of the prepare." to "The LSN of the begin prepare."
+<term>Begin Prepare</term>
+<listitem>
+<para>
+
+<variablelist>
+
+<varlistentry>
+<term>Byte1('b')</term>
+<listitem><para>
+ Identifies this message as the beginning of a
two-phase transaction message.
+</para></listitem>
+</varlistentry>
+
+<varlistentry>
+<term>Int64</term>
+<listitem><para>
+ The LSN of the prepare.
+</para></listitem>
+</varlistentry>

6) Similarly in cases of "Commit Prepared" and "Rollback Prepared"

7) No need to initialize has_subrels as we will always assign the
value returned by HeapTupleIsValid
+HasSubscriptionRelations(Oid subid)
+{
+ Relation rel;
+ int nkeys = 0;
+ ScanKeyData skey[2];
+ SysScanDesc scan;
+ bool has_subrels = false;
+
+ rel = table_open(SubscriptionRelRelationId, AccessShareLock);

8) We could include errhint, like errhint("Option \"two_phase\"
specified more than once") to specify a more informative error
message.
+ else if (strcmp(defel->defname, "two_phase") == 0)
+ {
+ if (two_phase_option_given)
+ ereport(ERROR,
+ (errcode(ERRCODE_SYNTAX_ERROR),
+ errmsg("conflicting
or redundant options")));
+ two_phase_option_given = true;
+
+ data->two_phase = defGetBoolean(defel);
+ }

9) We have a lot of function parameters for
parse_subscription_options, should we change it to struct?
@@ -69,7 +69,8 @@ parse_subscription_options(List *options,
char **synchronous_commit,
bool *refresh,
bool *binary_given,
bool *binary,
- bool
*streaming_given, bool *streaming)
+ bool
*streaming_given, bool *streaming,
+ bool
*twophase_given, bool *twophase)

10) Should we change " errhint("Use ALTER SUBSCRIPTION ...SET
PUBLICATION with refresh = false, or with copy_data = false, or use
DROP/CREATE SUBSCRIPTION.")" to "errhint("Use ALTER SUBSCRIPTION
...SET/ADD PUBLICATION with refresh = false, or with copy_data =
false.")" as we don't support copy_data in ALTER subscription ... DROP
publication.
+ /*
+ * See
ALTER_SUBSCRIPTION_REFRESH for details why this is
+ * not allowed.
+ */
+ if (sub->twophasestate ==
LOGICALREP_TWOPHASE_STATE_ENABLED && copy_data)
+ ereport(ERROR,
+
(errcode(ERRCODE_SYNTAX_ERROR),
+
errmsg("ALTER SUBSCRIPTION with refresh and copy_data is not allowed
when two_phase is enabled"),
+
errhint("Use ALTER SUBSCRIPTION ...SET PUBLICATION with refresh =
false, or with copy_data = false"
+
", or use DROP/CREATE SUBSCRIPTION.")));

11) Should 14000 be 15000 as this feature will be committed in PG15
+ if (options->proto.logical.twophase &&
+ PQserverVersion(conn->streamConn) >= 140000)
+ appendStringInfoString(&cmd, ", two_phase 'on'");

12) should we change "begin message" to "begin prepare message"
+ if (begin_data->prepare_lsn == InvalidXLogRecPtr)
+ elog(ERROR, "final_lsn not set in begin message");
+ begin_data->end_lsn = pq_getmsgint64(in);
+ if (begin_data->end_lsn == InvalidXLogRecPtr)
+ elog(ERROR, "end_lsn not set in begin message");

13) should we change "commit prepare message" to "commit prepared message"
+ if (flags != 0)
+ elog(ERROR, "unrecognized flags %u in commit prepare
message", flags);
+
+ /* read fields */
+ prepare_data->commit_lsn = pq_getmsgint64(in);
+ if (prepare_data->commit_lsn == InvalidXLogRecPtr)
+ elog(ERROR, "commit_lsn is not set in commit prepared message");
+ prepare_data->end_lsn = pq_getmsgint64(in);
+ if (prepare_data->end_lsn == InvalidXLogRecPtr)
+ elog(ERROR, "end_lsn is not set in commit prepared message");
+ prepare_data->commit_time = pq_getmsgint64(in);

14) should we change "commit prepared message" to "rollback prepared message"
+void
+logicalrep_read_rollback_prepared(StringInfo in,
+
LogicalRepRollbackPreparedTxnData *rollback_data)
+{
+ /* read flags */
+ uint8 flags = pq_getmsgbyte(in);
+
+ if (flags != 0)
+ elog(ERROR, "unrecognized flags %u in rollback prepare
message", flags);
+
+ /* read fields */
+ rollback_data->prepare_end_lsn = pq_getmsgint64(in);
+ if (rollback_data->prepare_end_lsn == InvalidXLogRecPtr)
+ elog(ERROR, "prepare_end_lsn is not set in commit
prepared message");
+ rollback_data->rollback_end_lsn = pq_getmsgint64(in);
+ if (rollback_data->rollback_end_lsn == InvalidXLogRecPtr)
+ elog(ERROR, "rollback_end_lsn is not set in commit
prepared message");
+ rollback_data->prepare_time = pq_getmsgint64(in);
+ rollback_data->rollback_time = pq_getmsgint64(in);
+ rollback_data->xid = pq_getmsgint(in, 4);
+
+ /* read gid (copy it into a pre-allocated buffer) */
+ strcpy(rollback_data->gid, pq_getmsgstring(in));
+}

15) We can include check pg_stat_replication_slots to verify if
statistics is getting updated.
+$node_publisher->safe_psql('postgres', "
+ BEGIN;
+ INSERT INTO tab_full VALUES (11);
+ PREPARE TRANSACTION 'test_prepared_tab_full';");
+
+$node_publisher->wait_for_catchup($appname);
+
+# check that transaction is in prepared state on subscriber
+my $result = $node_subscriber->safe_psql('postgres', "SELECT count(*)
FROM pg_prepared_xacts;");
+is($result, qq(1), 'transaction is prepared on subscriber');
+
+# check that 2PC gets committed on subscriber
+$node_publisher->safe_psql('postgres', "COMMIT PREPARED
'test_prepared_tab_full';");
+
+$node_publisher->wait_for_catchup($appname);

Regards,
Vignesh

In response to

Responses

Browse pgsql-hackers by date

  From Date Subject
Next Message houzj.fnst@fujitsu.com 2021-04-26 11:26:29 RE: Parallel INSERT SELECT take 2
Previous Message Arne Roland 2021-04-26 11:00:11 Re: PATCH: generate fractional cheapest paths in generate_orderedappend_path