Re: Skipping logical replication transactions on subscriber side

From: Masahiko Sawada <sawada(dot)mshk(at)gmail(dot)com>
To: Amit Kapila <amit(dot)kapila16(at)gmail(dot)com>
Cc: vignesh C <vignesh21(at)gmail(dot)com>, Greg Nancarrow <gregn4422(at)gmail(dot)com>, Peter Eisentraut <peter(dot)eisentraut(at)enterprisedb(dot)com>, "houzj(dot)fnst(at)fujitsu(dot)com" <houzj(dot)fnst(at)fujitsu(dot)com>, "tanghy(dot)fnst(at)fujitsu(dot)com" <tanghy(dot)fnst(at)fujitsu(dot)com>, "osumi(dot)takamichi(at)fujitsu(dot)com" <osumi(dot)takamichi(at)fujitsu(dot)com>, Alexey Lesovsky <lesovsky(at)gmail(dot)com>, PostgreSQL-development <pgsql-hackers(at)postgresql(dot)org>
Subject: Re: Skipping logical replication transactions on subscriber side
Date: 2022-01-17 04:18:49
Message-ID: CAD21AoCd3Y2-b67+pVrzrdteUmup1XG6JeHYOa5dGjh8qZ3VuQ@mail.gmail.com
Lists: pgsql-hackers

On Sat, Jan 15, 2022 at 7:24 PM Amit Kapila <amit(dot)kapila16(at)gmail(dot)com> wrote:
>
> On Fri, Jan 14, 2022 at 7:49 AM Masahiko Sawada <sawada(dot)mshk(at)gmail(dot)com> wrote:
> >
> > I agree with all the comments above. I've attached an updated patch.
> >
>
> Review comments
> ================

Thank you for the comments!

> 1.
> +
> + <para>
> + In this case, you need to consider changing the data on the
> subscriber so that it
>
> The starting of this sentence doesn't make sense to me. How about
> changing it like: "To resolve conflicts, you need to ...
>

Fixed.

> 2.
> + <structname>pg_subscription</structname>.<structfield>subskipxid</structfield>)
> + is cleared. See <xref linkend="logical-replication-conflicts"/> for
> + the details of logical replication conflicts.
> + </para>
> +
> + <para>
> + <replaceable>skip_option</replaceable> specifies options for
> this operation.
> + The supported option is:
> +
> + <variablelist>
> + <varlistentry>
> + <term><literal>xid</literal> (<type>xid</type>)</term>
> + <listitem>
> + <para>
> + Specifies the ID of the transaction whose changes are to be skipped
> + by the logical replication worker. Setting
> <literal>NONE</literal> resets
> + the transaction ID.
> + </para>
>
> Empty spaces after line finish are inconsistent. I personally use a
> single space before a new line but I see that others use two spaces
> and the nearby documentation also uses two spaces in this regard so I
> am fine either way but let's be consistent.

Fixed.

>
> 3.
> + case ALTER_SUBSCRIPTION_SKIP:
> + {
> + if (!superuser())
> + ereport(ERROR,
> + (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
> + errmsg("must be superuser to skip transaction")));
> +
> + parse_subscription_options(pstate, stmt->options, SUBOPT_XID, &opts);
> +
> + if (IsSet(opts.specified_opts, SUBOPT_XID))
> ..
> ..
>
> Is there a case when the above 'if (IsSet(..' won't be true? If not,
> then probably there should be Assert instead of 'if'.
>

Fixed.

> 4.
> +static TransactionId skipping_xid = InvalidTransactionId;
>
> I find this variable name bit odd. Can we name it skip_xid?
>

Okay, renamed.

> 5.
> + * skipping_xid is valid if we are skipping all data modification changes
> + * (INSERT, UPDATE, etc.) of the specified transaction at
> MySubscription->skipxid.
> + * Once we start skipping changes, we don't stop it until we skip all changes
>
> I think it would be better to write the first line of comment as: "We
> enable skipping all data modification changes (INSERT, UPDATE, etc.)
> for the subscription if the user has specified skip_xid. Once we ..."
>

Changed.

> 6.
> +static void
> +maybe_start_skipping_changes(TransactionId xid)
> +{
> + Assert(!is_skipping_changes());
> + Assert(!in_remote_transaction);
> + Assert(!in_streamed_transaction);
> +
> + /* Make sure subscription cache is up-to-date */
> + maybe_reread_subscription();
>
> Why do we need to update the cache here by calling
> maybe_reread_subscription() and at other places in the patch? It is
> sufficient to get the skip_xid value at the start of the worker via
> GetSubscription().

MySubscription could be out-of-date after a user changes the catalog.
In non-skipping change cases, we check it when starting the
transaction in begin_replication_step() which is called, e.g., when
applying an insert change. But I think we need to make sure it’s
up-to-date at the beginning of applying changes, that is, before
starting a transaction. Otherwise, we may end up skipping the
transaction based on an out-of-date subscription cache.

The reason for calling maybe_reread_subscription() in both
apply_handle_commit_prepared() and apply_handle_rollback_prepared() is
the same: MySubscription could be out-of-date when applying
commit-prepared or rollback-prepared, since we have not called
begin_replication_step() to open a new transaction.
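
To illustrate the stale-cache concern, here is a toy sketch. It is not
the patch code: every toy_* name, the Xid typedef, and the
invalidation flag are hypothetical stand-ins for MySubscription, the
catalog row, and maybe_reread_subscription(). It only shows that a
skip decision made before refreshing the cached copy can disagree with
what the user last set.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

typedef uint32_t Xid;      /* hypothetical stand-in for TransactionId */
#define INVALID_XID 0      /* hypothetical stand-in for InvalidTransactionId */

/* Toy "catalog" value that a user may change at any time. */
static Xid catalog_skip_xid = INVALID_XID;

/* Worker-local cached copy, plus a flag cleared when the catalog changes. */
static Xid  cached_skip_xid = INVALID_XID;
static bool cache_valid = true;

/* Simulates a user changing the catalog: store the value, invalidate the cache. */
void toy_alter_skip_xid(Xid xid)
{
    catalog_skip_xid = xid;
    cache_valid = false;
}

/* Refresh the cached copy only if it has been invalidated. */
void toy_maybe_reread(void)
{
    if (!cache_valid)
    {
        cached_skip_xid = catalog_skip_xid;
        cache_valid = true;
    }
}

/*
 * Decide whether to skip the remote transaction. With reread_first == false
 * the decision is made on whatever the cache happens to hold.
 */
bool toy_should_skip(Xid remote_xid, bool reread_first)
{
    assert(remote_xid != INVALID_XID);

    if (reread_first)
        toy_maybe_reread();

    return cached_skip_xid != INVALID_XID && cached_skip_xid == remote_xid;
}
```

In this sketch, a decision taken without the refresh can miss a newly
set XID, and can also keep skipping after the user has reset it.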

>
> 7. In maybe_reread_subscription(), isn't there a need to check whether
> skip_xid is changed where we exit and launch the worker and compare
> other subscription parameters?

IIUC we relaunch the worker here when subscription parameters such as
slot_name are changed. In the current implementation, I think that
relaunching the worker is not required when skip_xid is changed. For
instance, when skipping a prepared transaction, we deliberately don’t
clear subskipxid of pg_subscription and instead do that in the
commit-prepared or rollback-prepared case. There is a chance that the
user changes skip_xid before commit-prepared or rollback-prepared, but
we tolerate this case.

Also, in non-streaming and non-2PC cases, while skipping changes we
don’t call maybe_reread_subscription() until all changes are skipped.
So changing skip_xid cannot cancel skipping that has already started.

>
> 8.
> +static void
> +clear_subscription_skip_xid(TransactionId xid, XLogRecPtr origin_lsn,
> + TimestampTz origin_timestamp)
> +{
> + Relation rel;
> + Form_pg_subscription subform;
> + HeapTuple tup;
> + bool nulls[Natts_pg_subscription];
> + bool replaces[Natts_pg_subscription];
> + Datum values[Natts_pg_subscription];
> +
> + memset(values, 0, sizeof(values));
> + memset(nulls, false, sizeof(nulls));
> + memset(replaces, false, sizeof(replaces));
> +
> + if (!IsTransactionState())
> + StartTransactionCommand();
> +
> + LockSharedObject(SubscriptionRelationId, MySubscription->oid, 0,
> + AccessShareLock);
>
> It is important to add a comment as to why we need a lock here.

Added.

>
> 9.
> + * needs to be set subskipxid again. We can reduce the possibility by
> + * logging a replication origin WAL record to advance the origin LSN
> + * instead but it doesn't seem to be worth since it's a very minor case.
>
> You can also add here that there is no way to advance origin_timestamp
> so that would be inconsistent.

Added.

>
> 10.
> +clear_subscription_skip_xid(TransactionId xid, XLogRecPtr origin_lsn,
> + TimestampTz origin_timestamp)
> {
> ..
> ..
> + if (!IsTransactionState())
> + StartTransactionCommand();
> ..
> ..
> + CommitTransactionCommand();
> ..
> }
>
> The transaction should be committed in this function if it is started
> here otherwise it should be the responsibility of the caller to commit
> it.

Fixed.
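
For reference, the general shape of "commit only what you started" can
be sketched as below. This is a toy model and not the code in the
attached patch: the toy_* functions and the started_tx flag are
hypothetical, and the static in_transaction variable merely stands in
for IsTransactionState().

```c
#include <assert.h>
#include <stdbool.h>

static bool in_transaction = false;  /* stand-in for IsTransactionState() */
static int  commit_count = 0;        /* number of commits performed so far */

bool toy_in_transaction(void) { return in_transaction; }
int  toy_commit_count(void)   { return commit_count; }

void toy_start_transaction(void)
{
    assert(!in_transaction);
    in_transaction = true;
}

void toy_commit_transaction(void)
{
    assert(in_transaction);
    in_transaction = false;
    commit_count++;
}

/*
 * Start a transaction only if none is open, and commit only if this
 * function opened it. Otherwise the commit is left to the caller.
 */
void toy_clear_skip_xid(void)
{
    bool started_tx = false;

    if (!toy_in_transaction())
    {
        toy_start_transaction();
        started_tx = true;
    }

    /* ... the catalog update would happen here ... */

    if (started_tx)
        toy_commit_transaction();
}
```

When called outside a transaction, the function opens and commits its
own; when called inside one, it leaves the transaction open for the
caller to commit.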

I've attached an updated patch that incorporates these comments, except
for 6 and 7, which probably need more discussion. The comments from
Vignesh are also incorporated.

Regards,

--
Masahiko Sawada
EDB: https://www.enterprisedb.com/

Attachment Content-Type Size
v5-0001-Add-ALTER-SUBSCRIPTION-.-SKIP-to-skip-the-transac.patch application/octet-stream 58.5 KB
