Re: logical replication empty transactions

From: Peter Smith <smithpb2250(at)gmail(dot)com>
To: "osumi(dot)takamichi(at)fujitsu(dot)com" <osumi(dot)takamichi(at)fujitsu(dot)com>
Cc: Ajin Cherian <itsajin(at)gmail(dot)com>, vignesh C <vignesh21(at)gmail(dot)com>, Michael Paquier <michael(at)paquier(dot)xyz>, Rahila Syed <rahila(dot)syed(at)2ndquadrant(dot)com>, Euler Taveira <euler(at)timbira(dot)com(dot)br>, Amit Kapila <amit(dot)kapila16(at)gmail(dot)com>, Dilip Kumar <dilipbalaut(at)gmail(dot)com>, Jeff Janes <jeff(dot)janes(at)gmail(dot)com>, pgsql-hackers <pgsql-hackers(at)postgresql(dot)org>, Craig Ringer <craig(at)2ndquadrant(dot)com>
Subject: Re: logical replication empty transactions
Date: 2021-07-19 05:24:39
Message-ID: CAHut+Pv8BN=T_uRiJqw-HQUgeC9ihjwmRGiQcMAqtO1tdERSrg@mail.gmail.com
Lists: pgsql-hackers

Hi Ajin,

I have reviewed the v7 patch and given my feedback comments below.

Apply OK
Build OK
make check OK
TAP (subscriptions) make check OK
Build PG Docs (html) OK

Although I made lots of review comments below, the important point is
that none of them are functional - they are only minor re-wordings
and some code refactoring that I thought would make the code simpler
and/or easier to read. YMMV, so please feel free to disagree with any
of them.

//////////

1a. Commit Comment - wording

BEFORE
This patch addresses the above problem by postponing the BEGIN / BEGIN
PREPARE message until the first change.

AFTER
This patch addresses the above problem by postponing the BEGIN / BEGIN
PREPARE messages until the first change is encountered.

------

1b. Commit Comment - wording

BEFORE
While processing a COMMIT message or a PREPARE message, if there is no
other change for that transaction, do not send COMMIT message or
PREPARE message.

AFTER
If (when processing a COMMIT / PREPARE message) we find there had been
no other change for that transaction, then do not send the COMMIT /
PREPARE message.

------

2. doc/src/sgml/logicaldecoding.sgml - wording

@@ -884,11 +884,19 @@ typedef void (*LogicalDecodePrepareCB) (struct
LogicalDecodingContext *ctx,
The required <function>commit_prepared_cb</function> callback is called
whenever a transaction <command>COMMIT PREPARED</command> has
been decoded.
The <parameter>gid</parameter> field, which is part of the
- <parameter>txn</parameter> parameter, can be used in this callback.
+ <parameter>txn</parameter> parameter, can be used in this callback. The
+ parameters <parameter>prepare_end_lsn</parameter> and
+ <parameter>prepare_time</parameter> can be used to check if the plugin
+ has received this <command>PREPARE TRANSACTION</command> in which case
+ it can commit the transaction, otherwise, it can skip the commit. The
+ <parameter>gid</parameter> alone is not sufficient because the downstream
+ node can have a prepared transaction with the same identifier.

=>

(some minor rewording of the last part)

AFTER:

The parameters <parameter>prepare_end_lsn</parameter> and
<parameter>prepare_time</parameter> can be used to check if the plugin
has received this <command>PREPARE TRANSACTION</command> or not. If
yes, it can commit the transaction, otherwise, it can skip the commit.
The <parameter>gid</parameter> alone is not sufficient to determine
this because the downstream node may already have a prepared
transaction with the same identifier.

------

3. src/backend/replication/logical/proto.c - whitespace

@@ -244,12 +248,16 @@ logicalrep_read_commit_prepared(StringInfo in,
LogicalRepCommitPreparedTxnData *
elog(ERROR, "unrecognized flags %u in commit prepared message", flags);

/* read fields */
+ prepare_data->prepare_end_lsn = pq_getmsgint64(in);
+ if (prepare_data->prepare_end_lsn == InvalidXLogRecPtr)
+ elog(ERROR,"prepare_end_lsn is not set in commit prepared message");

=>

There is a missing space before the 2nd elog param.

------

4. src/backend/replication/logical/worker.c - comment typos

/*
- * Update origin state so we can restart streaming from correct position
- * in case of crash.
+ * It is possible that we haven't received the prepare because
+ * the transaction did not have any changes relevant to this
+ * subscription and was essentially an empty prepare. In which case,
+ * the walsender is optimized to drop the empty transaction and the
+ * accompanying prepare. Silently ignore if we don't find the prepared
+ * transaction.
*/

4a. =>

"and was essentially an empty prepare" --> "so was essentially an empty prepare"

4b. =>

"In which case" --> "In this case"

------

5. src/backend/replication/pgoutput/pgoutput.c - pgoutput_begin_txn

@@ -410,10 +417,32 @@ pgoutput_startup(LogicalDecodingContext *ctx,
OutputPluginOptions *opt,
static void
pgoutput_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
{
+ PGOutputTxnData *data = MemoryContextAllocZero(ctx->context,
+ sizeof(PGOutputTxnData));
+
+ /*
+ * Don't send BEGIN message here. Instead, postpone it until the first
+ * change. In logical replication, a common scenario is to replicate a set
+ * of tables (instead of all tables) and transactions whose changes were on
+ * table(s) that are not published will produce empty transactions. These
+ * empty transactions will send BEGIN and COMMIT messages to subscribers,
+ * using bandwidth on something with little/no use for logical replication.
+ */
+ data->sent_begin_txn = false;
+ txn->output_plugin_private = data;
+}

=>

I felt that since this message postponement is now the new behaviour
of this function, this should probably all be a function-level
comment rather than a comment in the body of the function.

------

6. src/backend/replication/pgoutput/pgoutput.c - pgoutput_begin

+
+static void
+pgoutput_begin(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)

=>

Even though it is kind of obvious, it is probably better to provide a
function comment here too.

------

7. src/backend/replication/pgoutput/pgoutput.c - pgoutput_commit_txn

@@ -428,8 +457,22 @@ static void
pgoutput_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
XLogRecPtr commit_lsn)
{
+ PGOutputTxnData *data = (PGOutputTxnData *) txn->output_plugin_private;
+ bool skip;
+
+ Assert(data);
+ skip = !data->sent_begin_txn;
+ pfree(data);
+ txn->output_plugin_private = NULL;
OutputPluginUpdateProgress(ctx);

+ /* skip COMMIT message if nothing was sent */
+ if (skip)
+ {
+ elog(DEBUG1, "Skipping replication of an empty transaction");
+ return;
+ }
+

7a. =>

I felt that the comment "skip COMMIT message if nothing was sent"
should be placed at the point where you *decide* whether to skip. So
you could either move that comment to where the skip variable is
assigned, or (my preference) leave the comment where it is but rename
the variable, i.e. sent_begin = data->sent_begin_txn; and then test
if (!sent_begin).

------

Regardless I think the comment should be elaborated a bit to describe
the reason more.

7b. =>

BEFORE
/* skip COMMIT message if nothing was sent */

AFTER
/* If a BEGIN message was not yet sent, then it means there were no
relevant changes encountered, so we can skip the COMMIT message too.
*/
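
To make the reasoning behind 7a/7b concrete, here is a minimal
standalone sketch of the deferred-BEGIN idea. It is not the patch
itself: MockTxn, MockTxnData, the mock_* functions and the
messages_sent counter are hypothetical stand-ins for ReorderBufferTXN,
PGOutputTxnData and the pgoutput callbacks. Only the sent_begin_txn
flag name is taken from the patch.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdlib.h>

/* Hypothetical stand-in for PGOutputTxnData. */
typedef struct MockTxnData
{
	bool		sent_begin_txn;
} MockTxnData;

/* Hypothetical stand-in for ReorderBufferTXN. */
typedef struct MockTxn
{
	MockTxnData *output_plugin_private;
	int			messages_sent;	/* counts messages "sent" downstream */
} MockTxn;

/* BEGIN callback: send nothing yet, only record that BEGIN is pending. */
static void
mock_begin_txn(MockTxn *txn)
{
	MockTxnData *data = calloc(1, sizeof(MockTxnData));

	assert(data != NULL);
	data->sent_begin_txn = false;
	txn->output_plugin_private = data;
}

/* Change callback: emit the postponed BEGIN before the first published change. */
static void
mock_change(MockTxn *txn, bool publishable)
{
	MockTxnData *data = txn->output_plugin_private;

	assert(data != NULL);
	if (!publishable)
		return;

	if (!data->sent_begin_txn)
	{
		txn->messages_sent++;	/* BEGIN */
		data->sent_begin_txn = true;
	}
	txn->messages_sent++;		/* the change itself */
}

/* COMMIT callback: returns true only if a COMMIT message was sent. */
static bool
mock_commit_txn(MockTxn *txn)
{
	MockTxnData *data = txn->output_plugin_private;
	bool		sent_begin;

	assert(data != NULL);
	sent_begin = data->sent_begin_txn;
	free(data);
	txn->output_plugin_private = NULL;

	/*
	 * If a BEGIN message was not yet sent, there were no relevant changes,
	 * so the COMMIT message can be skipped too.
	 */
	if (!sent_begin)
		return false;

	txn->messages_sent++;		/* COMMIT */
	return true;
}
```

With this shape, a transaction that touches only unpublished tables
sends nothing at all, while a transaction with two published changes
sends BEGIN, both changes, and COMMIT.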

------

8. src/backend/replication/pgoutput/pgoutput.c - pgoutput_begin_prepare_txn

@@ -441,10 +484,28 @@ pgoutput_commit_txn(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
static void
pgoutput_begin_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
{
+ /*
+ * Don't send BEGIN PREPARE message here. Instead, postpone it until the first
+ * change. In logical replication, a common scenario is to replicate a set
+ * of tables (instead of all tables) and transactions whose changes were on
+ * table(s) that are not published will produce empty transactions. These
+ * empty transactions will send BEGIN PREPARE and COMMIT PREPARED messages
+ * to subscribers, using bandwidth on something with little/no use
+ * for logical replication.
+ */
+ pgoutput_begin_txn(ctx, txn);
+}

8a. =>

Like previously, I felt that this big comment should be at the
function level of pgoutput_begin_prepare_txn instead of in the body of
the function.

------

8b. =>

And then the body comment would be something simple like:

/* Delegate to pgoutput_begin_txn to set the begin-sent flag to false,
same as for the BEGIN message. */
pgoutput_begin_txn(ctx, txn);
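
As a rough illustration of the layout suggested in 5, 8a and 8b, the
explanation moves to a function-level comment and the body keeps only
a short note. This is a standalone sketch: MockTxn, MockTxnData,
mock_begin_txn and mock_begin_prepare_txn are hypothetical stand-ins,
not the real pgoutput.c definitions.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdlib.h>

/* Hypothetical stand-ins for PGOutputTxnData and ReorderBufferTXN. */
typedef struct MockTxnData
{
	bool		sent_begin_txn;
} MockTxnData;

typedef struct MockTxn
{
	MockTxnData *output_plugin_private;
} MockTxn;

/*
 * BEGIN callback.
 *
 * Don't send the BEGIN message here. Instead, postpone it until the first
 * change, so that transactions with no published changes send nothing.
 */
static void
mock_begin_txn(MockTxn *txn)
{
	MockTxnData *data = calloc(1, sizeof(MockTxnData));

	assert(data != NULL);
	data->sent_begin_txn = false;
	txn->output_plugin_private = data;
}

/*
 * BEGIN PREPARE callback.
 *
 * Don't send the BEGIN PREPARE message here. Instead, postpone it until the
 * first change, for the same reason as for BEGIN.
 */
static void
mock_begin_prepare_txn(MockTxn *txn)
{
	/* Same setup as for BEGIN: mark the begin message as not yet sent. */
	mock_begin_txn(txn);
}
```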

------

9. src/backend/replication/pgoutput/pgoutput.c - pgoutput_begin_prepare

+
+static void
+pgoutput_begin_prepare(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)

=>

Probably this needs a function comment.

------

10. src/backend/replication/pgoutput/pgoutput.c - pgoutput_prepare_txn

@@ -459,8 +520,18 @@ static void
pgoutput_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
XLogRecPtr prepare_lsn)
{
+ PGOutputTxnData *data = (PGOutputTxnData *) txn->output_plugin_private;
+
+ Assert(data);
OutputPluginUpdateProgress(ctx);

+ /* skip PREPARE message if nothing was sent */
+ if (!data->sent_begin_txn)

=>

Maybe elaborate on that "skip PREPARE message if nothing was sent"
comment in a way similar to my review comment 7b. For example,

AFTER
/* If the BEGIN was not yet sent, then it means there were no relevant
changes encountered, so we can skip the PREPARE message too. */

------

11. src/backend/replication/pgoutput/pgoutput.c - pgoutput_commit_prepared_txn

@@ -471,12 +542,33 @@ pgoutput_prepare_txn(LogicalDecodingContext
*ctx, ReorderBufferTXN *txn,
*/
static void
pgoutput_commit_prepared_txn(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
- XLogRecPtr commit_lsn)
+ XLogRecPtr commit_lsn, XLogRecPtr prepare_end_lsn,
+ TimestampTz prepare_time)
{
+ PGOutputTxnData *data = (PGOutputTxnData *) txn->output_plugin_private;
+
OutputPluginUpdateProgress(ctx);

+ /*
+ * skip sending COMMIT PREPARED message if prepared transaction
+ * has not been sent.
+ */
+ if (data)

=>

Similar to previous review comment 10, I think the reason for the skip
should be elaborated a little bit. For example,

AFTER
/* If the BEGIN PREPARE was not yet sent, then it means there were no
relevant changes encountered, so we can skip the COMMIT PREPARED
message too. */

------

12. src/backend/replication/pgoutput/pgoutput.c - pgoutput_rollback_prepared_txn

=> Similar as for pgoutput_commit_prepared_txn (see review comment 11)

------

13. src/backend/replication/pgoutput/pgoutput.c - pgoutput_change

@@ -639,11 +749,16 @@ pgoutput_change(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
Relation relation, ReorderBufferChange *change)
{
PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
+ PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;
MemoryContext old;
RelationSyncEntry *relentry;
TransactionId xid = InvalidTransactionId;
Relation ancestor = NULL;

+ /* If not streaming, should have setup txndata as part of
BEGIN/BEGIN PREPARE */
+ if (!in_streaming)
+ Assert(txndata);
+
if (!is_publishable_relation(relation))
return;

13a. =>

I felt the streaming logic with the txndata is a bit confusing. I
think it would be easier to have another local variable (sent_begin)
and use it like this...

bool sent_begin;

if (in_streaming)
{
    sent_begin = true;
}
else
{
    PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;

    Assert(txndata);
    sent_begin = txndata->sent_begin_txn;
}

...

------

+ /* output BEGIN if we haven't yet */

13b. =>

I thought the comment is not quite right, because this code can also
output BEGIN PREPARE.

AFTER
/* Output BEGIN / BEGIN PREPARE if we haven't yet */

------

+ if (!in_streaming && !txndata->sent_begin_txn)
+ {
+ if (rbtxn_prepared(txn))
+ pgoutput_begin_prepare(ctx, txn);
+ else
+ pgoutput_begin(ctx, txn);
+ }
+

13c. =>

If you introduce the variable (as suggested in 13a) this code becomes
much simpler:

AFTER

if (!sent_begin)
{
    if (rbtxn_prepared(txn))
        pgoutput_begin_prepare(ctx, txn);
    else
        pgoutput_begin(ctx, txn);
}
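
Putting 13a and 13c together, the control flow might look like the
following standalone sketch. The names MockTxn, MockTxnData, MockMsg
and mock_send_begin_if_needed are hypothetical; in_streaming is passed
as a parameter here only to keep the example self-contained, and the
prepared field stands in for rbtxn_prepared(txn). The sketch sets the
flag itself after choosing the message; where the real patch sets it
is not shown in this review.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Which begin-type message (if any) would be emitted. */
typedef enum MockMsg
{
	MOCK_NONE,
	MOCK_BEGIN,
	MOCK_BEGIN_PREPARE
} MockMsg;

/* Hypothetical stand-ins for PGOutputTxnData and ReorderBufferTXN. */
typedef struct MockTxnData
{
	bool		sent_begin_txn;
} MockTxnData;

typedef struct MockTxn
{
	MockTxnData *output_plugin_private;
	bool		prepared;		/* stand-in for rbtxn_prepared(txn) */
} MockTxn;

static MockMsg
mock_send_begin_if_needed(MockTxn *txn, bool in_streaming)
{
	bool		sent_begin;

	if (in_streaming)
	{
		/* The deferred-BEGIN logic is bypassed while streaming. */
		sent_begin = true;
	}
	else
	{
		MockTxnData *txndata = txn->output_plugin_private;

		/* Should have been set up as part of BEGIN / BEGIN PREPARE. */
		assert(txndata != NULL);
		sent_begin = txndata->sent_begin_txn;
	}

	/* Output BEGIN / BEGIN PREPARE if we haven't yet. */
	if (!sent_begin)
	{
		txn->output_plugin_private->sent_begin_txn = true;
		return txn->prepared ? MOCK_BEGIN_PREPARE : MOCK_BEGIN;
	}

	return MOCK_NONE;
}
```

The single sent_begin local keeps the streaming special case in one
place, so the later test no longer needs to mention in_streaming or
dereference txndata.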

------

14. src/backend/replication/pgoutput/pgoutput.c - pgoutput_truncate

=>

All the similar review comments made for pgoutput_change (13a, 13b, 13c)
apply to pgoutput_truncate here also.

------

15. src/backend/replication/pgoutput/pgoutput.c - pgoutput_message

@@ -842,6 +980,7 @@ pgoutput_message(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
const char *message)
{
PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
+ PGOutputTxnData *txndata;
TransactionId xid = InvalidTransactionId;

=>

This variable should be declared in the block where it is used,
similar to the suggestion 13a.

Also is it just an accidental omission that you did Assert(txndata)
for all the other places but not here?

------

Kind Regards,
Peter Smith.
Fujitsu Australia
