Re: logical replication empty transactions

From: Ajin Cherian <itsajin(at)gmail(dot)com>
To: Peter Smith <smithpb2250(at)gmail(dot)com>
Cc: Amit Kapila <amit(dot)kapila16(at)gmail(dot)com>, "osumi(dot)takamichi(at)fujitsu(dot)com" <osumi(dot)takamichi(at)fujitsu(dot)com>, vignesh C <vignesh21(at)gmail(dot)com>, Michael Paquier <michael(at)paquier(dot)xyz>, Rahila Syed <rahila(dot)syed(at)2ndquadrant(dot)com>, Euler Taveira <euler(at)timbira(dot)com(dot)br>, Dilip Kumar <dilipbalaut(at)gmail(dot)com>, Jeff Janes <jeff(dot)janes(at)gmail(dot)com>, pgsql-hackers <pgsql-hackers(at)postgresql(dot)org>, Craig Ringer <craig(at)2ndquadrant(dot)com>
Subject: Re: logical replication empty transactions
Date: 2022-03-01 05:02:17
Message-ID: CAFPTHDbO-XXY1SGGJoH_tPxr5suGrB5hj96Zrw19uwkkGxzUGg@mail.gmail.com
Lists: pgsql-hackers

On Fri, Feb 25, 2022 at 9:17 PM Peter Smith <smithpb2250(at)gmail(dot)com> wrote:
>
> Hi. Here are my review comments for the v19 patch.
>
> ======
>
> 1. Commit message
>
> The current logical replication behavior is to send every transaction to
> subscriber even though the transaction is empty (because it does not
> contain changes from the selected publications).
>
> SUGGESTION
> "to subscriber even though" --> "to the subscriber even if"

Fixed.

>
> ~~~
>
> 2. Commit message
>
> This patch addresses the above problem by postponing the BEGIN message
> until the first change. While processing a COMMIT message,
> if there is no other change for that transaction,
> do not send COMMIT message. It means that pgoutput will
> skip BEGIN/COMMIT messages for transactions that are empty.
>
> SUGGESTION
> "if there is" --> "if there was"
> "do not send COMMIT message" --> "do not send the COMMIT message"
> "It means that pgoutput" --> "This means that pgoutput"
>
> ~~~

Fixed.
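
To make the commit-message wording concrete, here is a rough standalone
sketch of the deferred-BEGIN idea. It is not the patch code: the Txn
struct, the on_* callbacks and the message counters are stand-ins assumed
for illustration, and only the sent_begin_txn flag name comes from the
patch.

```c
#include <assert.h>
#include <stdbool.h>

/* Mock per-transaction state; only the flag name is taken from the patch. */
typedef struct Txn
{
	bool		sent_begin_txn;	/* has BEGIN gone out for this transaction? */
} Txn;

/* Counters standing in for messages actually written to the subscriber. */
static int	n_begin = 0;
static int	n_change = 0;
static int	n_commit = 0;

/* BEGIN callback: only record that nothing has been sent yet. */
static void
on_begin(Txn *txn)
{
	txn->sent_begin_txn = false;
}

/* Change callback: send the postponed BEGIN first, then the change. */
static void
on_change(Txn *txn)
{
	if (!txn->sent_begin_txn)
	{
		n_begin++;
		txn->sent_begin_txn = true;
	}
	n_change++;
}

/*
 * COMMIT callback: if no BEGIN was ever sent, the transaction was empty,
 * so COMMIT is skipped as well. Returns true if a COMMIT was sent.
 */
static bool
on_commit(Txn *txn)
{
	if (!txn->sent_begin_txn)
		return false;
	n_commit++;
	return true;
}
```

In this mock, a transaction with no changes produces no messages at all,
while a transaction with changes produces exactly one BEGIN and one COMMIT.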

>
> 3. Commit message
>
> Shouldn't there be some similar description about using a lazy send
> mechanism for STREAM START?
>
> ~~~

Added.

>
> 4. src/backend/replication/pgoutput/pgoutput.c - typedef struct PGOutputTxnData
>
> +/*
> + * Maintain a per-transaction level variable to track whether the
> + * transaction has sent BEGIN. BEGIN is only sent when the first
> + * change in a transaction is processed. This makes it possible
> + * to skip transactions that are empty.
> + */
> +typedef struct PGOutputTxnData
> +{
> + bool sent_begin_txn; /* flag indicating whether BEGIN has been sent */
> + bool sent_stream_start; /* flag indicating if stream start has been sent */
> + bool sent_any_stream; /* flag indicating if any stream has been sent */
> +} PGOutputTxnData;
> +
>
> The struct comment looks stale because it doesn't mention anything
> about the similar lazy send mechanism for STREAM_START.
>
> ~~~

Added.

>
> 5. src/backend/replication/pgoutput/pgoutput.c - pgoutput_begin_txn
>
> static void
> pgoutput_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
> {
> + PGOutputTxnData *txndata = MemoryContextAllocZero(ctx->context,
> + sizeof(PGOutputTxnData));
> +
> + txndata->sent_begin_txn = false;
> + txn->output_plugin_private = txndata;
> +}
>
> You don’t need to assign the other members 'sent_stream_start',
> 'sent_any_stream' because you are doing MemoryContextAllocZero anyway,
> but for the same reason you did not really need to assign the
> 'sent_begin_txn' flag either.
>
> I guess for consistency maybe it is better to (a) set all of them or
> (b) set none of them. I prefer (b).
>
> ~~~

Did (b)
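
As a small illustration of why option (b) is safe, the sketch below uses
calloc() as a stand-in for MemoryContextAllocZero() (an assumption made so
that the example compiles outside the PostgreSQL tree). The helper name
alloc_txndata is hypothetical; the struct members are the ones quoted from
v19 above.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdlib.h>

/* Same three flags as the v19 struct quoted above. */
typedef struct PGOutputTxnData
{
	bool		sent_begin_txn;
	bool		sent_stream_start;
	bool		sent_any_stream;
} PGOutputTxnData;

/*
 * calloc() stands in for MemoryContextAllocZero() here: both hand back
 * zero-filled memory, so every flag already reads as false and no
 * explicit assignment is needed.
 */
static PGOutputTxnData *
alloc_txndata(void)
{
	return calloc(1, sizeof(PGOutputTxnData));
}
```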

>
> 6. src/backend/replication/pgoutput/pgoutput.c - pgoutput_begin
>
> I feel the 'pgoutput_begin' function is not well named. It makes some
> of the code where it is called look quite confusing.
>
> For streaming there is:
> 1. pgoutput_stream_start (does not send)
> 2. pgoutput_send_stream_start (does send)
> so it is very clear.
>
> OTOH there are
> 3. pgoutput_begin_txn (does not send)
> 4. pgoutput_begin (does send)
>
> For consistency I think the 'pgoutput_begin' name should be changed to
> include the "send" verb:
> 1. pgoutput_begin_txn (does not send)
> 2. pgoutput_send_begin_txn (does send)
>
> ~~~

Changed as mentioned.

>
> 7. src/backend/replication/pgoutput/pgoutput.c - maybe_send_schema
>
> @@ -594,6 +663,20 @@ maybe_send_schema(LogicalDecodingContext *ctx,
> if (schema_sent)
> return;
>
> + /* set up txndata */
> + txndata = toptxn->output_plugin_private;
> +
> + /*
> + * Before we send schema, make sure that STREAM START/BEGIN/BEGIN PREPARE
> + * is sent. If not, send now.
> + */
> + if (in_streaming && !txndata->sent_stream_start)
> + pgoutput_send_stream_start(ctx, toptxn);
> + else if (txndata && !txndata->sent_begin_txn)
> + {
> + pgoutput_begin(ctx, toptxn);
> + }
> +
>
> How come the in_streaming case is not checking for a NULL txndata
> before referencing it? Even if it is OK to do that, some more comments
> or assertions might help for this piece of code.
> (Stop-Press: see later comments #9, #10)
>
> ~~~

Updated.

>
> 8. src/backend/replication/pgoutput/pgoutput.c - maybe_send_schema
>
> @@ -594,6 +663,20 @@ maybe_send_schema(LogicalDecodingContext *ctx,
> if (schema_sent)
> return;
>
> + /* set up txndata */
> + txndata = toptxn->output_plugin_private;
> +
> + /*
> + * Before we send schema, make sure that STREAM START/BEGIN/BEGIN PREPARE
> + * is sent. If not, send now.
> + */
>
> What part of this code is doing anything about "BEGIN PREPARE" ?
>
> ~~~

Removed that reference.

>
> 9. src/backend/replication/pgoutput/pgoutput.c - pgoutput_change
>
> @@ -1183,6 +1267,15 @@ pgoutput_change(LogicalDecodingContext *ctx,
> ReorderBufferTXN *txn,
> Assert(false);
> }
>
> + /* If streaming, send STREAM START if we haven't yet */
> + if (in_streaming && (txndata && !txndata->sent_stream_start))
> + pgoutput_send_stream_start(ctx, txn);
> + /*
> + * Output BEGIN if we haven't yet, unless streaming.
> + */
> + else if (!in_streaming && (txndata && !txndata->sent_begin_txn))
> + pgoutput_begin(ctx, txn);
> +
>
> The above code fragment looks more like what I was expecting should
> be in 'maybe_send_schema'.
>
> If you expand it out (and tweak the comments) it can become much less
> complex looking IMO
>
> e.g.
>
> if (in_streaming)
> {
>     /* If streaming, send STREAM START if we haven't yet */
>     if (txndata && !txndata->sent_stream_start)
>         pgoutput_send_stream_start(ctx, txn);
> }
> else
> {
>     /* If not streaming, send BEGIN if we haven't yet */
>     if (txndata && !txndata->sent_begin_txn)
>         pgoutput_begin(ctx, txn);
> }
>
> Also, IIUC for the 'in_streaming' case you can Assert(txndata); so
> then the code can be made even simpler.
>

Chose your example.

> ~~~
>
> 10. src/backend/replication/pgoutput/pgoutput.c - pgoutput_truncate
>
> @@ -1397,6 +1491,17 @@ pgoutput_truncate(LogicalDecodingContext *ctx,
> ReorderBufferTXN *txn,
>
> if (nrelids > 0)
> {
> + txndata = (PGOutputTxnData *) txn->output_plugin_private;
> +
> + /* If streaming, send STREAM START if we haven't yet */
> + if (in_streaming && (txndata && !txndata->sent_stream_start))
> + pgoutput_send_stream_start(ctx, txn);
> + /*
> + * output BEGIN if we haven't yet, unless streaming.
> + */
> + else if (!in_streaming && (txndata && !txndata->sent_begin_txn))
> + pgoutput_begin(ctx, txn);
>
> So now I have seen almost identical code repeated in 3 places so I am
> beginning to think these should just be encapsulated in some common
> function to call to do the deferred "send". Thoughts?
>
> ~~~

Not sure if we want to add a function call overhead.
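
For reference, the kind of common helper suggested in #10 might look
roughly like the sketch below. It is a standalone mock, not code from the
patch: TxnData mirrors only two flags of PGOutputTxnData, send_begin and
send_stream_start are stand-ins that just count messages, and the name
maybe_send_deferred_start is hypothetical. Declaring the helper
"static inline" is one way to give the compiler the chance to remove the
call overhead, though whether it does so is up to the compiler.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Stand-in for PGOutputTxnData; only the two flags used here. */
typedef struct TxnData
{
	bool		sent_begin_txn;
	bool		sent_stream_start;
} TxnData;

/* Counters standing in for messages written to the output stream. */
static int	begin_msgs = 0;
static int	stream_start_msgs = 0;

/* Mock of the function that really sends BEGIN. */
static void
send_begin(TxnData *txndata)
{
	begin_msgs++;
	txndata->sent_begin_txn = true;
}

/* Mock of the function that really sends STREAM START. */
static void
send_stream_start(TxnData *txndata)
{
	stream_start_msgs++;
	txndata->sent_stream_start = true;
}

/*
 * Hypothetical common helper: send the deferred STREAM START (when
 * streaming) or BEGIN (when not), but only if it has not gone out yet.
 */
static inline void
maybe_send_deferred_start(TxnData *txndata, bool in_streaming)
{
	if (txndata == NULL)
		return;

	if (in_streaming)
	{
		if (!txndata->sent_stream_start)
			send_stream_start(txndata);
	}
	else
	{
		if (!txndata->sent_begin_txn)
			send_begin(txndata);
	}
}
```

Each of the three call sites would then reduce to a single call, at the
cost of hiding the two flag checks behind one more name.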

>
> 11. src/backend/replication/pgoutput/pgoutput.c - pgoutput_message
>
> @@ -1429,6 +1534,24 @@ pgoutput_message(LogicalDecodingContext *ctx,
> ReorderBufferTXN *txn,
> if (in_streaming)
> xid = txn->xid;
>
> + /*
> + * Output BEGIN if we haven't yet.
> + * Avoid for streaming and non-transactional messages.
> + */
> + if (in_streaming || transactional)
> + {
> + PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;
> +
> + /* If streaming, send STREAM START if we haven't yet */
> + if (in_streaming && (txndata && !txndata->sent_stream_start))
> + pgoutput_send_stream_start(ctx, txn);
> + else if (transactional)
> + {
> + if (txndata && !txndata->sent_begin_txn)
> + pgoutput_begin(ctx, txn);
> + }
> + }
>
> Does that comment at the top of that code fragment accurately match
> this code? It seemed a bit muddled/stale to me.
>
> ~~~

Fixed.

>
> 12. src/backend/replication/pgoutput/pgoutput.c - pgoutput_stream_start
>
> /*
> + * Don't actually send stream start here, instead set a flag that indicates
> + * that stream start hasn't been sent and wait for the first actual change
> + * for this stream to be sent and then send stream start. This is done
> + * to avoid sending empty streams without any changes.
> + */
> + if (txndata == NULL)
> + {
> + txndata =
> + MemoryContextAllocZero(ctx->context, sizeof(PGOutputTxnData));
> + txndata->sent_begin_txn = false;
> + txndata->sent_any_stream = false;
> + txn->output_plugin_private = txndata;
> + }
>
> IMO there is no need to set the members – just let the
> MemoryContextAllocZero take care of all that. Then the code is simpler
> and it also saves wondering if anything was accidentally missed.
>

Fixed.

> ~~~
>
> 13. src/backend/replication/pgoutput/pgoutput.c - pgoutput_send_stream_start
>
> +pgoutput_send_stream_start(struct LogicalDecodingContext *ctx,
> + ReorderBufferTXN *txn)
> +{
> + bool send_replication_origin = txn->origin_id != InvalidRepOriginId;
> + PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;
> +
> +
> + /*
> * If we already sent the first stream for this transaction then don't
> * send the origin id in the subsequent streams.
> */
> - if (rbtxn_is_streamed(txn))
> + if (txndata->sent_any_stream)
> send_replication_origin = false;
>
> Given this usage, I wonder if there is a better name for the txndata
> member - e.g. 'sent_first_stream' ?
>
> ~~~

Changed.

>
> 14. src/backend/replication/pgoutput/pgoutput.c - pgoutput_send_stream_start
>
> - /* we're streaming a chunk of transaction now */
> - in_streaming = true;
> + /*
> + * Set the flags that indicate that changes were sent as part of
> + * the transaction and the stream.
> + */
> + txndata->sent_begin_txn = txndata->sent_stream_start = true;
> + txndata->sent_any_stream = true;
>
> Why is this setting member 'sent_begin_txn' true also? It seems odd to
> say so because the BEGIN was not actually sent at all, right?
>
> ~~~

A transaction can be partially streamed and partially not. If a
transaction starts out being streamed but its remaining changes are
replicated as part of the commit, then we shouldn't send a "begin" again
when those changes are decoded.
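
A minimal sketch of that flag behaviour, using mock types rather than the
real pgoutput structures (TxnFlags, the mock_* functions and the counters
are all assumptions made for illustration):

```c
#include <assert.h>
#include <stdbool.h>

/* Mock of the two flags involved; names follow the v19 struct. */
typedef struct TxnFlags
{
	bool		sent_begin_txn;
	bool		sent_stream_start;
} TxnFlags;

static int	begin_count = 0;
static int	stream_start_count = 0;

/*
 * Mock of sending STREAM START. As in the quoted hunk, it also sets
 * sent_begin_txn, so that later non-streamed changes of the same
 * transaction do not trigger a separate BEGIN.
 */
static void
mock_send_stream_start(TxnFlags *flags)
{
	stream_start_count++;
	flags->sent_begin_txn = flags->sent_stream_start = true;
}

/* Mock of a change decoded outside streaming mode. */
static void
mock_nonstreamed_change(TxnFlags *flags)
{
	if (!flags->sent_begin_txn)
	{
		begin_count++;
		flags->sent_begin_txn = true;
	}
}
```

With these mocks, a transaction that was streamed first and then receives
a non-streamed change sends one STREAM START and no BEGIN, whereas a
transaction that was never streamed sends one BEGIN.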

>
> 15. src/backend/replication/pgoutput/pgoutput.c - pgoutput_stream_abort
>
> @@ -1572,6 +1740,20 @@ pgoutput_stream_abort(struct LogicalDecodingContext *ctx,
>
> /* determine the toplevel transaction */
> toptxn = (txn->toptxn) ? txn->toptxn : txn;
> + txndata = toptxn->output_plugin_private;
> + sent_begin_txn = txndata->sent_begin_txn;
> +
> + if (txn->toptxn == NULL)
> + {
> + pfree(txndata);
> + txn->output_plugin_private = NULL;
> + }
> +
> + if (!sent_begin_txn)
> + {
> + elog(DEBUG1, "Skipping replication of an empty transaction in stream abort");
> + return;
> + }
>
> I didn't really understand why this code is checking the
> 'sent_begin_txn' member instead of the 'sent_stream_start' member?
>

Yes, changed this to check "sent_first_stream".
> ~~~
>
> 16. src/backend/replication/pgoutput/pgoutput.c - pgoutput_stream_commit
>
> @@ -1598,7 +1782,17 @@ pgoutput_stream_commit(struct
> LogicalDecodingContext *ctx,
> Assert(!in_streaming);
> Assert(rbtxn_is_streamed(txn));
>
> - OutputPluginUpdateProgress(ctx);
> + pfree(txndata);
> + txn->output_plugin_private = NULL;
> +
> + /* If no changes were part of this transaction then drop the commit */
> + if (!sent_begin_txn)
> + {
> + elog(DEBUG1, "Skipping replication of an empty transaction in stream commit");
> + return;
> + }
>
> (Same as previous comment #15). I didn't really understand why this
> code is checking the 'sent_begin_txn' member instead of the
> 'sent_stream_start' member?
>
> ~~~

Changed.

>
> 17. src/backend/replication/syncrep.c - SyncRepEnabled
>
> @@ -539,6 +538,15 @@ SyncRepReleaseWaiters(void)
> }
>
> /*
> + * Check if synchronous replication is enabled
> + */
> +bool
> +SyncRepEnabled(void)
> +{
> + return SyncRepRequested() && ((volatile WalSndCtlData *)
> WalSndCtl)->sync_standbys_defined;
> +}
>
> That code was once inline in 'SyncRepWaitForLSN' before it was turned
> into a function, and there is a long comment in SyncRepWaitForLSN
> describing the risks of this logic. e.g.
>
> <quote>
> ... If it's true, we need to check it again
> * later while holding the lock, to check the flag and operate the sync
> * rep queue atomically. This is necessary to avoid the race condition
> * described in SyncRepUpdateSyncStandbysDefined().
> </quote>
>
> This same function is now called from walsender.c. I think maybe it is
> OK but please confirm it.
>
> Anyway, the point is maybe this SyncRepEnabled function should be
> better commented to make some reference about the race concerns of the
> original comment. Otherwise some future caller of this function may be
> unaware of it and come to grief.
>

Leaving this for now; I am not sure what wording is appropriate to use here.

On Wed, Feb 23, 2022 at 5:24 PM wangw(dot)fnst(at)fujitsu(dot)com
<wangw(dot)fnst(at)fujitsu(dot)com> wrote:
>
> On Wed, Feb 23, 2022 at 10:58 PM Ajin Cherian <itsajin(at)gmail(dot)com> wrote:
> >
> Few comments to V19-0001:
>
> 1. I think we should adjust the alignment format.
> git am ../v19-0001-Skip-empty-transactions-for-logical-replication.patch
> .git/rebase-apply/patch:197: indent with spaces.
> * Before we send schema, make sure that STREAM START/BEGIN/BEGIN PREPARE
> .git/rebase-apply/patch:198: indent with spaces.
> * is sent. If not, send now.
> .git/rebase-apply/patch:199: indent with spaces.
> */
> .git/rebase-apply/patch:201: indent with spaces.
> pgoutput_send_stream_start(ctx, toptxn);
> .git/rebase-apply/patch:204: indent with spaces.
> pgoutput_begin(ctx, toptxn);
> warning: 5 lines add whitespace errors.

Fixed.

>
> 2. Structure member initialization.
> static void
> pgoutput_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
> {
> + PGOutputTxnData *txndata = MemoryContextAllocZero(ctx->context,
> + sizeof(PGOutputTxnData));
> +
> + txndata->sent_begin_txn = false;
> + txn->output_plugin_private = txndata;
> +}
> Do we need to set sent_stream_start and sent_any_stream to false here?

Fixed.

>
> 3. Maybe we should add Assert(txndata) in other functions, as is done in
> pgoutput_commit_txn.
>
> 4. In addition, I think we should keep a unified style.
> a). log style (maybe first one is better.)
> First style : "Skipping replication of an empty transaction in XXX"
> Second style : "skipping replication of an empty transaction"
> b) flag name (maybe second one is better.)
> First style : variable "sent_begin_txn" in function pgoutput_stream_*.
> Second style : variable "skip" in function pgoutput_commit_txn.
>

Fixed.

Regards,
Ajin Cherian
Fujitsu Australia

Attachment Content-Type Size
v21-0001-Skip-empty-transactions-for-logical-replication.patch application/octet-stream 24.1 KB
