Re: logical replication empty transactions

From: Peter Smith <smithpb2250(at)gmail(dot)com>
To: Ajin Cherian <itsajin(at)gmail(dot)com>
Cc: Amit Kapila <amit(dot)kapila16(at)gmail(dot)com>, "osumi(dot)takamichi(at)fujitsu(dot)com" <osumi(dot)takamichi(at)fujitsu(dot)com>, vignesh C <vignesh21(at)gmail(dot)com>, Michael Paquier <michael(at)paquier(dot)xyz>, Rahila Syed <rahila(dot)syed(at)2ndquadrant(dot)com>, Euler Taveira <euler(at)timbira(dot)com(dot)br>, Dilip Kumar <dilipbalaut(at)gmail(dot)com>, Jeff Janes <jeff(dot)janes(at)gmail(dot)com>, pgsql-hackers <pgsql-hackers(at)postgresql(dot)org>, Craig Ringer <craig(at)2ndquadrant(dot)com>
Subject: Re: logical replication empty transactions
Date: 2022-02-25 10:16:30
Message-ID: CAHut+Ps-Ze=c+2Cnn0E2mssJzdaLrxn03Hs09jUHz1+in+adjw@mail.gmail.com
Lists: pgsql-hackers

Hi. Here are my review comments for the v19 patch.

======

1. Commit message

The current logical replication behavior is to send every transaction to
subscriber even though the transaction is empty (because it does not
contain changes from the selected publications).

SUGGESTION
"to subscriber even though" --> "to the subscriber even if"

~~~

2. Commit message

This patch addresses the above problem by postponing the BEGIN message
until the first change. While processing a COMMIT message,
if there is no other change for that transaction,
do not send COMMIT message. It means that pgoutput will
skip BEGIN/COMMIT messages for transactions that are empty.

SUGGESTION
"if there is" --> "if there was"
"do not send COMMIT message" --> "do not send the COMMIT message"
"It means that pgoutput" --> "This means that pgoutput"

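To illustrate the behaviour the commit message describes, here is a minimal standalone model of the deferred BEGIN. It is not code from the patch: it assumes a string buffer in place of the real output stream and calloc in place of MemoryContextAllocZero, and the names ModelTxnData, model_begin_txn, model_change and model_commit are hypothetical. "B;" and "C;" are only labels for BEGIN and COMMIT.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical stand-in for PGOutputTxnData: only the BEGIN flag. */
typedef struct ModelTxnData
{
    bool sent_begin_txn;    /* has BEGIN been written for this txn? */
} ModelTxnData;

/* Hypothetical output sink standing in for the real output stream. */
static char model_out[256];

static void
model_emit(const char *msg)
{
    strncat(model_out, msg, sizeof(model_out) - strlen(model_out) - 1);
}

/* begin callback: allocate zeroed state, but do not send BEGIN yet */
static ModelTxnData *
model_begin_txn(void)
{
    return calloc(1, sizeof(ModelTxnData));
}

/* change callback: the first change triggers the deferred BEGIN */
static void
model_change(ModelTxnData *txndata, const char *change)
{
    if (!txndata->sent_begin_txn)
    {
        model_emit("B;");
        txndata->sent_begin_txn = true;
    }
    model_emit(change);
}

/* commit callback: skip COMMIT entirely if BEGIN was never sent */
static void
model_commit(ModelTxnData *txndata)
{
    bool sent_begin = txndata->sent_begin_txn;

    free(txndata);
    if (!sent_begin)
        return;             /* empty transaction: nothing goes out */
    model_emit("C;");
}
```

An empty transaction produces no output at all, while a transaction with changes gets its BEGIN just before the first change.
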
~~~

3. Commit message

Shouldn't there be some similar description about using a lazy send
mechanism for STREAM START?
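
Something like the following is what such a description would need to cover. It is a hypothetical standalone model, not the patch: it assumes the stream stop callback skips STREAM STOP for a chunk that never sent STREAM START and then clears the per-chunk flag (that callback is not quoted in this review). "S;" and "E;" are only labels for STREAM START and STREAM STOP, and all model_* names are made up.

```c
#include <assert.h>
#include <stdbool.h>
#include <string.h>

/* Hypothetical cut-down version of the streaming flags in PGOutputTxnData. */
typedef struct ModelStreamData
{
    bool sent_stream_start;     /* STREAM START sent for the current chunk? */
    bool sent_any_stream;       /* any chunk of this transaction sent yet? */
} ModelStreamData;

/* Hypothetical output sink standing in for the real output stream. */
static char model_out[256];

static void
model_emit(const char *msg)
{
    strncat(model_out, msg, sizeof(model_out) - strlen(model_out) - 1);
}

/* stream start callback: only note that a chunk has begun, send nothing */
static void
model_stream_start(ModelStreamData *d)
{
    d->sent_stream_start = false;
}

/* per-change path: the first change of a chunk sends the deferred START */
static void
model_stream_change(ModelStreamData *d, const char *change)
{
    if (!d->sent_stream_start)
    {
        model_emit("S;");
        d->sent_stream_start = true;
        d->sent_any_stream = true;
    }
    model_emit(change);
}

/* stream stop callback (assumed): an empty chunk sends neither START nor STOP */
static void
model_stream_stop(ModelStreamData *d)
{
    if (!d->sent_stream_start)
        return;
    model_emit("E;");
    d->sent_stream_start = false;
}
```
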

~~~

4. src/backend/replication/pgoutput/pgoutput.c - typedef struct PGOutputTxnData

+/*
+ * Maintain a per-transaction level variable to track whether the
+ * transaction has sent BEGIN. BEGIN is only sent when the first
+ * change in a transaction is processed. This makes it possible
+ * to skip transactions that are empty.
+ */
+typedef struct PGOutputTxnData
+{
+ bool sent_begin_txn; /* flag indicating whether BEGIN has been sent */
+ bool sent_stream_start; /* flag indicating if stream start has been sent */
+ bool sent_any_stream; /* flag indicating if any stream has been sent */
+} PGOutputTxnData;
+

The struct comment looks stale because it doesn't mention anything
about the similar lazy send mechanism for STREAM_START.

~~~

5. src/backend/replication/pgoutput/pgoutput.c - pgoutput_begin_txn

static void
pgoutput_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
{
+ PGOutputTxnData *txndata = MemoryContextAllocZero(ctx->context,
+ sizeof(PGOutputTxnData));
+
+ txndata->sent_begin_txn = false;
+ txn->output_plugin_private = txndata;
+}

You don’t need to assign the other members 'sent_stream_start',
'sent_any_stream' because you are doing MemoryContextAllocZero anyway,
but for the same reason you did not really need to assign the
'sent_begin_txn' flag either.

I guess for consistency maybe it is better to (a) set all of them or
(b) set none of them. I prefer (b).
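
Option (b) might look like this. To keep it compilable outside the tree, the PostgreSQL types are cut down to minimal stand-ins and MemoryContextAllocZero is replaced by a hypothetical calloc-based stub (sketch_alloc_zero); SketchTXN and sketch_begin_txn are likewise hypothetical names.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdlib.h>

typedef struct PGOutputTxnData
{
    bool sent_begin_txn;
    bool sent_stream_start;
    bool sent_any_stream;
} PGOutputTxnData;

/* Minimal stand-in for ReorderBufferTXN: only the field used here. */
typedef struct SketchTXN
{
    void *output_plugin_private;
} SketchTXN;

/* Hypothetical stub: like MemoryContextAllocZero, returns zeroed memory. */
static void *
sketch_alloc_zero(size_t size)
{
    return calloc(1, size);
}

/* Option (b): rely on the zeroing; no per-member assignments at all. */
static void
sketch_begin_txn(SketchTXN *txn)
{
    PGOutputTxnData *txndata = sketch_alloc_zero(sizeof(PGOutputTxnData));

    txn->output_plugin_private = txndata;
}
```

The same simplification would apply to the txndata allocation in pgoutput_stream_start (#12 below).
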

~~~

6. src/backend/replication/pgoutput/pgoutput.c - pgoutput_begin

I feel the 'pgoutput_begin' function is not well named. It makes some
of the code where it is called look quite confusing.

For streaming there is:
1. pgoutput_stream_start (does not send)
2. pgoutput_send_stream_start (does send)
so it is very clear.

OTOH there are
3. pgoutput_begin_txn (does not send)
4. pgoutput_begin (does send)

For consistency I think the 'pgoutput_begin' name should be changed to
include the "send" verb:
1. pgoutput_begin_txn (does not send)
2. pgoutput_send_begin_txn (does send)

~~~

7. src/backend/replication/pgoutput/pgoutput.c - maybe_send_schema

@@ -594,6 +663,20 @@ maybe_send_schema(LogicalDecodingContext *ctx,
if (schema_sent)
return;

+ /* set up txndata */
+ txndata = toptxn->output_plugin_private;
+
+ /*
+ * Before we send schema, make sure that STREAM START/BEGIN/BEGIN PREPARE
+ * is sent. If not, send now.
+ */
+ if (in_streaming && !txndata->sent_stream_start)
+ pgoutput_send_stream_start(ctx, toptxn);
+ else if (txndata && !txndata->sent_begin_txn)
+ {
+ pgoutput_begin(ctx, toptxn);
+ }
+

How come the in_streaming case is not checking for a NULL txndata
before referencing it? Even if it is OK to do that, some more comments
or assertions might help for this piece of code.
(Stop-Press: see later comments #9, #10)

~~~

8. src/backend/replication/pgoutput/pgoutput.c - maybe_send_schema

@@ -594,6 +663,20 @@ maybe_send_schema(LogicalDecodingContext *ctx,
if (schema_sent)
return;

+ /* set up txndata */
+ txndata = toptxn->output_plugin_private;
+
+ /*
+ * Before we send schema, make sure that STREAM START/BEGIN/BEGIN PREPARE
+ * is sent. If not, send now.
+ */

What part of this code is doing anything about "BEGIN PREPARE" ?

~~~

9. src/backend/replication/pgoutput/pgoutput.c - pgoutput_change

@@ -1183,6 +1267,15 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
Assert(false);
}

+ /* If streaming, send STREAM START if we haven't yet */
+ if (in_streaming && (txndata && !txndata->sent_stream_start))
+ pgoutput_send_stream_start(ctx, txn);
+ /*
+ * Output BEGIN if we haven't yet, unless streaming.
+ */
+ else if (!in_streaming && (txndata && !txndata->sent_begin_txn))
+ pgoutput_begin(ctx, txn);
+

The above code fragment looks more like what I was expecting should
be in 'maybe_send_schema'.

If you expand it out (and tweak the comments) it can look much less
complex IMO.

e.g.

if (in_streaming)
{
/* If streaming, send STREAM START if we haven't yet */
if (txndata && !txndata->sent_stream_start)
pgoutput_send_stream_start(ctx, txn);
}
else
{
/* If not streaming, send BEGIN if we haven't yet */
if (txndata && !txndata->sent_begin_txn)
pgoutput_begin(ctx, txn);
}

Also, IIUC for the 'in_streaming' case you can Assert(txndata); so
then the code can be made even simpler.
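
With the Assert, the expanded version could look like the sketch below. It assumes (as the IIUC above does) that txndata is always set up before any change is processed while streaming. The real send functions are replaced by hypothetical counters so the code compiles standalone; sketch_send_deferred and the other sketch_* names are not from the patch. Written as one function, it is also the kind of common helper that #10 asks about.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

typedef struct SketchTxnData
{
    bool sent_begin_txn;
    bool sent_stream_start;
} SketchTxnData;

/* Hypothetical counters standing in for the real send functions. */
static int begins_sent;
static int stream_starts_sent;

static void
sketch_send_stream_start(SketchTxnData *txndata)
{
    stream_starts_sent++;
    txndata->sent_stream_start = true;
}

static void
sketch_send_begin(SketchTxnData *txndata)
{
    begins_sent++;
    txndata->sent_begin_txn = true;
}

/* Expanded form of the deferred send, as suggested above. */
static void
sketch_send_deferred(bool in_streaming, SketchTxnData *txndata)
{
    if (in_streaming)
    {
        /* assumed: stream start always sets up txndata before changes */
        assert(txndata != NULL);

        /* If streaming, send STREAM START if we haven't yet */
        if (!txndata->sent_stream_start)
            sketch_send_stream_start(txndata);
    }
    else
    {
        /* If not streaming, send BEGIN if we haven't yet */
        if (txndata && !txndata->sent_begin_txn)
            sketch_send_begin(txndata);
    }
}
```
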

~~~

10. src/backend/replication/pgoutput/pgoutput.c - pgoutput_truncate

@@ -1397,6 +1491,17 @@ pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,

if (nrelids > 0)
{
+ txndata = (PGOutputTxnData *) txn->output_plugin_private;
+
+ /* If streaming, send STREAM START if we haven't yet */
+ if (in_streaming && (txndata && !txndata->sent_stream_start))
+ pgoutput_send_stream_start(ctx, txn);
+ /*
+ * output BEGIN if we haven't yet, unless streaming.
+ */
+ else if (!in_streaming && (txndata && !txndata->sent_begin_txn))
+ pgoutput_begin(ctx, txn);

So now I have seen almost identical code repeated in 3 places, so I am
beginning to think it should just be encapsulated in a common
function that does the deferred "send". Thoughts?

~~~

11. src/backend/replication/pgoutput/pgoutput.c - pgoutput_message

@@ -1429,6 +1534,24 @@ pgoutput_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
if (in_streaming)
xid = txn->xid;

+ /*
+ * Output BEGIN if we haven't yet.
+ * Avoid for streaming and non-transactional messages.
+ */
+ if (in_streaming || transactional)
+ {
+ PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;
+
+ /* If streaming, send STREAM START if we haven't yet */
+ if (in_streaming && (txndata && !txndata->sent_stream_start))
+ pgoutput_send_stream_start(ctx, txn);
+ else if (transactional)
+ {
+ if (txndata && !txndata->sent_begin_txn)
+ pgoutput_begin(ctx, txn);
+ }
+ }

Does that comment at the top of that code fragment accurately match
this code? It seemed a bit muddled/stale to me.
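
One possible restructuring, with a comment that describes what the code does, is sketched below. It assumes BEGIN should never be sent while streaming; in the patch that currently holds only indirectly, because pgoutput_send_stream_start also sets sent_begin_txn (see #14). The sketch_* names and counters are hypothetical stand-ins for the real send functions.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

typedef struct SketchTxnData
{
    bool sent_begin_txn;
    bool sent_stream_start;
} SketchTxnData;

/* Hypothetical counters standing in for the real send functions. */
static int begins_sent;
static int stream_starts_sent;

/*
 * While streaming, send the deferred STREAM START first if needed
 * (whether or not the message is transactional, as in the patch).
 * Outside streaming, only transactional messages trigger the deferred
 * BEGIN; non-transactional messages are sent on their own.
 */
static void
sketch_message_send_deferred(bool in_streaming, bool transactional,
                             SketchTxnData *txndata)
{
    if (in_streaming)
    {
        if (txndata && !txndata->sent_stream_start)
        {
            stream_starts_sent++;
            txndata->sent_stream_start = true;
        }
    }
    else if (transactional)
    {
        if (txndata && !txndata->sent_begin_txn)
        {
            begins_sent++;
            txndata->sent_begin_txn = true;
        }
    }
}
```
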

~~~

12. src/backend/replication/pgoutput/pgoutput.c - pgoutput_stream_start

/*
+ * Don't actually send stream start here, instead set a flag that indicates
+ * that stream start hasn't been sent and wait for the first actual change
+ * for this stream to be sent and then send stream start. This is done
+ * to avoid sending empty streams without any changes.
+ */
+ if (txndata == NULL)
+ {
+ txndata =
+ MemoryContextAllocZero(ctx->context, sizeof(PGOutputTxnData));
+ txndata->sent_begin_txn = false;
+ txndata->sent_any_stream = false;
+ txn->output_plugin_private = txndata;
+ }

IMO there is no need to set the members – just let the
MemoryContextAllocZero take care of all that. Then the code is simpler
and it also saves wondering if anything was accidentally missed.

~~~

13. src/backend/replication/pgoutput/pgoutput.c - pgoutput_send_stream_start

+pgoutput_send_stream_start(struct LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn)
+{
+ bool send_replication_origin = txn->origin_id != InvalidRepOriginId;
+ PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;
+
+
+ /*
* If we already sent the first stream for this transaction then don't
* send the origin id in the subsequent streams.
*/
- if (rbtxn_is_streamed(txn))
+ if (txndata->sent_any_stream)
send_replication_origin = false;

Given this usage, I wonder if there is a better name for the txndata
member - e.g. 'sent_first_stream' ?
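
For illustration, with the suggested name the origin logic reads as below. This is a standalone sketch with hypothetical stand-in types (SketchOriginId, SKETCH_INVALID_ORIGIN), and for brevity it sets the flag in the same function, assuming it is set once the first STREAM START has been sent.

```c
#include <assert.h>
#include <stdbool.h>

typedef unsigned short SketchOriginId;          /* stand-in for RepOriginId */
#define SKETCH_INVALID_ORIGIN ((SketchOriginId) 0)

typedef struct SketchStreamTxn
{
    SketchOriginId origin_id;
    bool sent_first_stream;     /* the name suggested above */
} SketchStreamTxn;

/* Returns whether this STREAM START should carry the origin id. */
static bool
sketch_stream_start_sends_origin(SketchStreamTxn *txn)
{
    bool send_origin = txn->origin_id != SKETCH_INVALID_ORIGIN;

    /* Only the first stream of the transaction carries the origin id. */
    if (txn->sent_first_stream)
        send_origin = false;

    txn->sent_first_stream = true;
    return send_origin;
}
```
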

~~~

14. src/backend/replication/pgoutput/pgoutput.c - pgoutput_send_stream_start

- /* we're streaming a chunk of transaction now */
- in_streaming = true;
+ /*
+ * Set the flags that indicate that changes were sent as part of
+ * the transaction and the stream.
+ */
+ txndata->sent_begin_txn = txndata->sent_stream_start = true;
+ txndata->sent_any_stream = true;

Why is this setting member 'sent_begin_txn' true also? It seems odd to
say so because the BEGIN was not actually sent at all, right?

~~~

15. src/backend/replication/pgoutput/pgoutput.c - pgoutput_stream_abort

@@ -1572,6 +1740,20 @@ pgoutput_stream_abort(struct LogicalDecodingContext *ctx,

/* determine the toplevel transaction */
toptxn = (txn->toptxn) ? txn->toptxn : txn;
+ txndata = toptxn->output_plugin_private;
+ sent_begin_txn = txndata->sent_begin_txn;
+
+ if (txn->toptxn == NULL)
+ {
+ pfree(txndata);
+ txn->output_plugin_private = NULL;
+ }
+
+ if (!sent_begin_txn)
+ {
+ elog(DEBUG1, "Skipping replication of an empty transaction in stream abort");
+ return;
+ }

I didn't really understand why this code is checking the
'sent_begin_txn' member instead of the 'sent_stream_start' member?

~~~

16. src/backend/replication/pgoutput/pgoutput.c - pgoutput_stream_commit

@@ -1598,7 +1782,17 @@ pgoutput_stream_commit(struct LogicalDecodingContext *ctx,
Assert(!in_streaming);
Assert(rbtxn_is_streamed(txn));

- OutputPluginUpdateProgress(ctx);
+ pfree(txndata);
+ txn->output_plugin_private = NULL;
+
+ /* If no changes were part of this transaction then drop the commit */
+ if (!sent_begin_txn)
+ {
+ elog(DEBUG1, "Skipping replication of an empty transaction in stream commit");
+ return;
+ }

(Same as previous comment #15). I didn't really understand why this
code is checking the 'sent_begin_txn' member instead of the
'sent_stream_start' member?
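
One possible reading of #14 to #16, assuming pgoutput_stream_stop clears sent_stream_start at the end of each chunk (that callback is not quoted here): by the time stream commit or abort runs, the per-chunk flag is already false even for a transaction that streamed changes, so a flag that persists across chunks is needed. 'sent_any_stream' already looks like such a flag, which might make the extra 'sent_begin_txn' assignment in #14 unnecessary. A hypothetical standalone model, where sketch_stream_chunk collapses the start, changes and stop of one chunk into a single call:

```c
#include <assert.h>
#include <stdbool.h>

typedef struct SketchTxnData
{
    bool sent_stream_start;  /* per chunk: assumed cleared at STREAM STOP */
    bool sent_any_stream;    /* per transaction: never cleared */
} SketchTxnData;

/* Hypothetical counter standing in for sending STREAM COMMIT. */
static int stream_commits_sent;

static void
sketch_stream_chunk(SketchTxnData *d, int nchanges)
{
    d->sent_stream_start = false;          /* stream start: defer */
    if (nchanges > 0)
    {
        d->sent_stream_start = true;       /* first change sends START */
        d->sent_any_stream = true;
    }
    d->sent_stream_start = false;          /* stream stop: reset */
}

/* Decide using the per-transaction flag, not the per-chunk one. */
static void
sketch_stream_commit(SketchTxnData *d)
{
    if (!d->sent_any_stream)
        return;             /* nothing was streamed: skip STREAM COMMIT */
    stream_commits_sent++;
}
```

The same check would apply in pgoutput_stream_abort.
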

~~~

17. src/backend/replication/syncrep.c - SyncRepEnabled

@@ -539,6 +538,15 @@ SyncRepReleaseWaiters(void)
}

/*
+ * Check if synchronous replication is enabled
+ */
+bool
+SyncRepEnabled(void)
+{
+ return SyncRepRequested() && ((volatile WalSndCtlData *) WalSndCtl)->sync_standbys_defined;
+}

That code was once inline in 'SyncRepWaitForLSN' before it was turned
into a function, and there is a long comment in SyncRepWaitForLSN
describing the risks of this logic. e.g.

<quote>
... If it's true, we need to check it again
* later while holding the lock, to check the flag and operate the sync
* rep queue atomically. This is necessary to avoid the race condition
* described in SyncRepUpdateSyncStandbysDefined().
</quote>

This same function is now called from walsender.c. I think maybe it is
OK but please confirm it.

Anyway, the point is that maybe this SyncRepEnabled function should be
better commented, with some reference to the race concerns of the
original comment. Otherwise some future caller of this function may be
unaware of them and come to grief.
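
For illustration, the pattern the original comment protects is an unlocked fast-path check followed by a re-check under the lock. A standalone sketch follows, with a pthread mutex in place of SyncRepLock and a plain struct in place of WalSndCtl; all sketch_* names are hypothetical, SyncRepRequested() is left out, and the header comment is only one suggestion for how SyncRepEnabled could be documented.

```c
#include <assert.h>
#include <pthread.h>
#include <stdbool.h>

/* Hypothetical shared state standing in for WalSndCtl and SyncRepLock. */
typedef struct SketchSyncState
{
    pthread_mutex_t lock;               /* plays the role of SyncRepLock */
    volatile bool sync_standbys_defined;
    int queue_len;                      /* plays the role of the wait queue */
} SketchSyncState;

/*
 * Check whether synchronous replication appears to be enabled.
 *
 * The flag is read without holding the lock, so the answer is only a
 * hint: a "true" result may already be stale.  Callers that go on to use
 * the sync rep queue must re-check the flag while holding the lock.
 */
static bool
sketch_sync_rep_enabled(SketchSyncState *st)
{
    return st->sync_standbys_defined;
}

/* Enqueue only if the flag still holds once the lock is taken. */
static bool
sketch_wait_for_lsn(SketchSyncState *st)
{
    bool queued = false;

    if (!sketch_sync_rep_enabled(st))
        return false;                   /* cheap unlocked fast path */

    pthread_mutex_lock(&st->lock);
    if (st->sync_standbys_defined)      /* re-check under the lock */
    {
        st->queue_len++;
        queued = true;
    }
    pthread_mutex_unlock(&st->lock);
    return queued;
}
```
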

-------
Kind Regards,
Peter Smith.
Fujitsu Australia
