Re: Skipping logical replication transactions on subscriber side

From: Amit Kapila <amit(dot)kapila16(at)gmail(dot)com>
To: Masahiko Sawada <sawada(dot)mshk(at)gmail(dot)com>
Cc: vignesh C <vignesh21(at)gmail(dot)com>, Greg Nancarrow <gregn4422(at)gmail(dot)com>, Peter Eisentraut <peter(dot)eisentraut(at)enterprisedb(dot)com>, "houzj(dot)fnst(at)fujitsu(dot)com" <houzj(dot)fnst(at)fujitsu(dot)com>, "tanghy(dot)fnst(at)fujitsu(dot)com" <tanghy(dot)fnst(at)fujitsu(dot)com>, "osumi(dot)takamichi(at)fujitsu(dot)com" <osumi(dot)takamichi(at)fujitsu(dot)com>, Alexey Lesovsky <lesovsky(at)gmail(dot)com>, PostgreSQL-development <pgsql-hackers(at)postgresql(dot)org>
Subject: Re: Skipping logical replication transactions on subscriber side
Date: 2022-01-15 10:23:49
Message-ID: CAA4eK1KT0C2TDL++os03EpY_5v2STjUBmF8WJYX8zT36MJ5SfQ@mail.gmail.com
Views: Raw Message | Whole Thread | Download mbox | Resend email
Thread:
Lists: pgsql-hackers

On Fri, Jan 14, 2022 at 7:49 AM Masahiko Sawada <sawada(dot)mshk(at)gmail(dot)com> wrote:
>
> I agree with all the comments above. I've attached an updated patch.
>

Review comments
================
1.
+
+ <para>
+ In this case, you need to consider changing the data on the
subscriber so that it

The starting of this sentence doesn't make sense to me. How about
changing it like: "To resolve conflicts, you need to ...

2.
+ <structname>pg_subscription</structname>.<structfield>subskipxid</structfield>)
+ is cleared. See <xref linkend="logical-replication-conflicts"/> for
+ the details of logical replication conflicts.
+ </para>
+
+ <para>
+ <replaceable>skip_option</replaceable> specifies options for
this operation.
+ The supported option is:
+
+ <variablelist>
+ <varlistentry>
+ <term><literal>xid</literal> (<type>xid</type>)</term>
+ <listitem>
+ <para>
+ Specifies the ID of the transaction whose changes are to be skipped
+ by the logical replication worker. Setting
<literal>NONE</literal> resets
+ the transaction ID.
+ </para>

Empty spaces after line finish are inconsistent. I personally use a
single space before a new line but I see that others use two spaces
and the nearby documentation also uses two spaces in this regard so I
am fine either way but let's be consistent.

3.
+ case ALTER_SUBSCRIPTION_SKIP:
+ {
+ if (!superuser())
+ ereport(ERROR,
+ (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
+ errmsg("must be superuser to skip transaction")));
+
+ parse_subscription_options(pstate, stmt->options, SUBOPT_XID, &opts);
+
+ if (IsSet(opts.specified_opts, SUBOPT_XID))
..
..

Is there a case when the above 'if (IsSet(..' won't be true? If not,
then probably there should be Assert instead of 'if'.

4.
+static TransactionId skipping_xid = InvalidTransactionId;

I find this variable name bit odd. Can we name it skip_xid?

5.
+ * skipping_xid is valid if we are skipping all data modification changes
+ * (INSERT, UPDATE, etc.) of the specified transaction at
MySubscription->skipxid.
+ * Once we start skipping changes, we don't stop it until we skip all changes

I think it would be better to write the first line of comment as: "We
enable skipping all data modification changes (INSERT, UPDATE, etc.)
for the subscription if the user has specified skip_xid. Once we ..."

6.
+static void
+maybe_start_skipping_changes(TransactionId xid)
+{
+ Assert(!is_skipping_changes());
+ Assert(!in_remote_transaction);
+ Assert(!in_streamed_transaction);
+
+ /* Make sure subscription cache is up-to-date */
+ maybe_reread_subscription();

Why do we need to update the cache here by calling
maybe_reread_subscription() and at other places in the patch? It is
sufficient to get the skip_xid value at the start of the worker via
GetSubscription().

7. In maybe_reread_subscription(), isn't there a need to check whether
skip_xid is changed where we exit and launch the worker and compare
other subscription parameters?

8.
+static void
+clear_subscription_skip_xid(TransactionId xid, XLogRecPtr origin_lsn,
+ TimestampTz origin_timestamp)
+{
+ Relation rel;
+ Form_pg_subscription subform;
+ HeapTuple tup;
+ bool nulls[Natts_pg_subscription];
+ bool replaces[Natts_pg_subscription];
+ Datum values[Natts_pg_subscription];
+
+ memset(values, 0, sizeof(values));
+ memset(nulls, false, sizeof(nulls));
+ memset(replaces, false, sizeof(replaces));
+
+ if (!IsTransactionState())
+ StartTransactionCommand();
+
+ LockSharedObject(SubscriptionRelationId, MySubscription->oid, 0,
+ AccessShareLock);

It is important to add a comment as to why we need a lock here.

9.
+ * needs to be set subskipxid again. We can reduce the possibility by
+ * logging a replication origin WAL record to advance the origin LSN
+ * instead but it doesn't seem to be worth since it's a very minor case.

You can also add here that there is no way to advance origin_timestamp
so that would be inconsistent.

10.
+clear_subscription_skip_xid(TransactionId xid, XLogRecPtr origin_lsn,
+ TimestampTz origin_timestamp)
{
..
..
+ if (!IsTransactionState())
+ StartTransactionCommand();
..
..
+ CommitTransactionCommand();
..
}

The transaction should be committed in this function if it is started
here otherwise it should be the responsibility of the caller to commit
it.

--
With Regards,
Amit Kapila.

In response to

Responses

Browse pgsql-hackers by date

  From Date Subject
Next Message Amit Kapila 2022-01-15 10:28:21 Re: Skipping logical replication transactions on subscriber side
Previous Message Denis Hirn 2022-01-15 10:01:09 Re: [PATCH] Allow multiple recursive self-references