Re: logical replication empty transactions

From: Ajin Cherian <itsajin(at)gmail(dot)com>
To: Peter Smith <smithpb2250(at)gmail(dot)com>
Cc: "osumi(dot)takamichi(at)fujitsu(dot)com" <osumi(dot)takamichi(at)fujitsu(dot)com>, vignesh C <vignesh21(at)gmail(dot)com>, Michael Paquier <michael(at)paquier(dot)xyz>, Rahila Syed <rahila(dot)syed(at)2ndquadrant(dot)com>, Euler Taveira <euler(at)timbira(dot)com(dot)br>, Amit Kapila <amit(dot)kapila16(at)gmail(dot)com>, Dilip Kumar <dilipbalaut(at)gmail(dot)com>, Jeff Janes <jeff(dot)janes(at)gmail(dot)com>, pgsql-hackers <pgsql-hackers(at)postgresql(dot)org>, Craig Ringer <craig(at)2ndquadrant(dot)com>
Subject: Re: logical replication empty transactions
Date: 2021-07-21 10:58:45
Message-ID: CAFPTHDZT96BmZqJaOzXTQW0oK646csJvFwsBTNga1whobhNsUQ@mail.gmail.com
Lists: pgsql-hackers

On Mon, Jul 19, 2021 at 3:24 PM Peter Smith <smithpb2250(at)gmail(dot)com> wrote:

> 1a. Commit Comment - wording
>
updated.
>
> 1b. Commit Comment - wording
>
updated.

> 2. doc/src/sgml/logicaldecoding.sgml - wording
>
> @@ -884,11 +884,19 @@ typedef void (*LogicalDecodePrepareCB) (struct
> LogicalDecodingContext *ctx,
> The required <function>commit_prepared_cb</function> callback is called
> whenever a transaction <command>COMMIT PREPARED</command> has
> been decoded.
> The <parameter>gid</parameter> field, which is part of the
> - <parameter>txn</parameter> parameter, can be used in this callback.
> + <parameter>txn</parameter> parameter, can be used in this callback. The
> + parameters <parameter>prepare_end_lsn</parameter> and
> + <parameter>prepare_time</parameter> can be used to check if the plugin
> + has received this <command>PREPARE TRANSACTION</command> in which case
> + it can commit the transaction, otherwise, it can skip the commit. The
> + <parameter>gid</parameter> alone is not sufficient because the downstream
> + node can have a prepared transaction with the same identifier.
>
> =>
>
> (some minor rewording of the last part)

updated.
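
To illustrate why the gid alone is not enough, here is a rough standalone sketch of the check the doc text describes. It is not the patch code and not how any particular plugin stores its state: the SketchPreparedTxn struct and sketch_received_prepare() are hypothetical names made up for this example, and the LSN and timestamp are modelled as plain integers.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

/* Hypothetical record of a PREPARE the downstream has already received. */
typedef struct SketchPreparedTxn
{
    const char *gid;              /* global transaction identifier */
    uint64_t    prepare_end_lsn;  /* end LSN of the upstream PREPARE */
    int64_t     prepare_time;     /* upstream PREPARE timestamp */
} SketchPreparedTxn;

/*
 * Return true only if a known prepared transaction matches on gid AND
 * prepare_end_lsn AND prepare_time. A matching gid by itself could belong
 * to an unrelated prepared transaction on the downstream node.
 */
static bool
sketch_received_prepare(const SketchPreparedTxn *known, size_t nknown,
                        const char *gid, uint64_t prepare_end_lsn,
                        int64_t prepare_time)
{
    assert(gid != NULL);

    for (size_t i = 0; i < nknown; i++)
    {
        if (strcmp(known[i].gid, gid) == 0 &&
            known[i].prepare_end_lsn == prepare_end_lsn &&
            known[i].prepare_time == prepare_time)
            return true;    /* safe to apply COMMIT PREPARED */
    }
    return false;           /* PREPARE was never received: skip the commit */
}
```

With this shape, a COMMIT PREPARED whose gid matches a local prepared transaction but whose prepare_end_lsn or prepare_time differ is skipped rather than applied.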

>
> 3. src/backend/replication/logical/proto.c - whitespace
>
> @@ -244,12 +248,16 @@ logicalrep_read_commit_prepared(StringInfo in,
> LogicalRepCommitPreparedTxnData *
> elog(ERROR, "unrecognized flags %u in commit prepared message", flags);
>
> /* read fields */
> + prepare_data->prepare_end_lsn = pq_getmsgint64(in);
> + if (prepare_data->prepare_end_lsn == InvalidXLogRecPtr)
> + elog(ERROR,"prepare_end_lsn is not set in commit prepared message");
>
> =>
>
> There is a missing space before the 2nd elog param.
>

fixed.
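
For reference, the read-and-validate step in that hunk amounts to something like the sketch below. This is only a standalone approximation: the real code uses pq_getmsgint64() on a StringInfo and raises elog(ERROR), whereas this example reads from a plain byte buffer and returns false. The names sketch_read_prepare_end_lsn and SKETCH_INVALID_LSN are invented for the illustration.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Stand-in for InvalidXLogRecPtr, which is defined as 0. */
#define SKETCH_INVALID_LSN ((uint64_t) 0)

/*
 * Read a 64-bit big-endian (network order) value starting at *cursor.
 * Returns false if fewer than 8 bytes remain or if the decoded LSN is
 * unset. The cursor is advanced and *lsn is written only on success.
 */
static bool
sketch_read_prepare_end_lsn(const unsigned char *buf, size_t len,
                            size_t *cursor, uint64_t *lsn)
{
    uint64_t value = 0;

    assert(buf != NULL && cursor != NULL && lsn != NULL);

    if (*cursor > len || len - *cursor < 8)
        return false;           /* message too short */

    for (int i = 0; i < 8; i++)
        value = (value << 8) | buf[*cursor + i];

    if (value == SKETCH_INVALID_LSN)
        return false;           /* prepare_end_lsn is not set */

    *cursor += 8;
    *lsn = value;
    return true;
}
```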

>
> 4a. =>
>
> "and was essentially an empty prepare" --> "so was essentially an empty prepare"
>
> 4b. =>
>
> "In which case" --> "In this case"
>
> ------

fixed.

> I felt that since this message postponement is now the new behaviour
> of this function then probably this should all be a function level
> comment instead of the comment being in the body of the function
>
> ------
>
> 6. src/backend/replication/pgoutput/pgoutput.c - pgoutput_begin
>
> +
> +static void
> +pgoutput_begin(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
>
> =>
>
> Even though it is kind of obvious, it is probably better to provide a
> function comment here too
>
> ------

Changed accordingly.

>

> I felt that the comment "skip COMMIT message if nothing was sent"
> should be done at the point where you *decide* to skip or not. So you
> could either move that comment to where the skip variable is assigned.
> Or (my preference) leave the comment where it is but change the
> variable name to be sent_begin = !data->sent_begin_txn;
>

Moved the comment to where the skip variable is assigned.

> ------
>
> Regardless I think the comment should be elaborated a bit to describe
> the reason more.
>
> 7b. =>
>
> BEFORE
> /* skip COMMIT message if nothing was sent */
>
> AFTER
> /* If a BEGIN message was not yet sent, then it means there were no
> relevant changes encountered, so we can skip the COMMIT message too.
> */
>

Updated accordingly.
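
As a rough picture of the behaviour that comment describes, the sketch below models the postponed BEGIN with plain counters. It is a simplified standalone illustration, not the pgoutput code: SketchTxnData stands in for PGOutputTxnData, SketchStream just counts what would have been sent, and all the sketch_* function names are hypothetical.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical stand-in for PGOutputTxnData; only the flag matters here. */
typedef struct SketchTxnData
{
    bool sent_begin_txn;    /* has BEGIN been written to the stream yet? */
} SketchTxnData;

/* Counts the messages that would have been sent downstream. */
typedef struct SketchStream
{
    int begins;
    int changes;
    int commits;
} SketchStream;

/* BEGIN callback: remember that nothing was sent, but send nothing yet. */
static void
sketch_begin_txn(SketchTxnData *txndata)
{
    txndata->sent_begin_txn = false;
}

/* Change callback: send the postponed BEGIN before the first relevant change. */
static void
sketch_change(SketchStream *out, SketchTxnData *txndata, bool publishable)
{
    assert(txndata != NULL);

    if (!publishable)
        return;             /* filtered out, so BEGIN stays postponed */

    if (!txndata->sent_begin_txn)
    {
        out->begins++;
        txndata->sent_begin_txn = true;
    }
    out->changes++;
}

/* Commit callback: returns true if a COMMIT was actually sent. */
static bool
sketch_commit_txn(SketchStream *out, SketchTxnData *txndata)
{
    assert(txndata != NULL);

    /*
     * If BEGIN was not yet sent, there were no relevant changes,
     * so the COMMIT can be skipped too.
     */
    if (!txndata->sent_begin_txn)
        return false;

    out->commits++;
    return true;
}
```

A transaction whose changes are all filtered out therefore produces no BEGIN and no COMMIT, while a transaction with at least one publishable change produces exactly one of each.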

> ------

> Like previously, I felt that this big comment should be at the
> function level of pgoutput_begin_prepare_txn instead of in the body of
> the function.
>
> ------
>
> 8b. =>
>
> And then the body comment would be something simple like:
>
> /* Delegate to assign the begin sent flag as false same as for the
> BEGIN message. */
> pgoutput_begin_txn(ctx, txn);
>

Updated accordingly.

> ------
>
> 9. src/backend/replication/pgoutput/pgoutput.c - pgoutput_begin_prepare
>
> +
> +static void
> +pgoutput_begin_prepare(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
>
> =>
>
> Probably this needs a function comment.
>

Updated.

> ------
>
> 10. src/backend/replication/pgoutput/pgoutput.c - pgoutput_prepare_txn
>
> @@ -459,8 +520,18 @@ static void
> pgoutput_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
> XLogRecPtr prepare_lsn)
> {
> + PGOutputTxnData *data = (PGOutputTxnData *) txn->output_plugin_private;
> +
> + Assert(data);
> OutputPluginUpdateProgress(ctx);
>
> + /* skip PREPARE message if nothing was sent */
> + if (!data->sent_begin_txn)
>
> =>
>
> Maybe elaborate on that "skip PREPARE message if nothing was sent"
> comment in a way similar to my review comment 7b. For example,
>
> AFTER
> /* If the BEGIN was not yet sent, then it means there were no relevant
> changes encountered, so we can skip the PREPARE message too. */
>

Updated.

> ------
>
> 11. src/backend/replication/pgoutput/pgoutput.c - pgoutput_commit_prepared_txn
>
> @@ -471,12 +542,33 @@ pgoutput_prepare_txn(LogicalDecodingContext
> *ctx, ReorderBufferTXN *txn,
> */
> static void
> pgoutput_commit_prepared_txn(LogicalDecodingContext *ctx,
> ReorderBufferTXN *txn,
> - XLogRecPtr commit_lsn)
> + XLogRecPtr commit_lsn, XLogRecPtr prepare_end_lsn,
> + TimestampTz prepare_time)
> {
> + PGOutputTxnData *data = (PGOutputTxnData *) txn->output_plugin_private;
> +
> OutputPluginUpdateProgress(ctx);
>
> + /*
> + * skip sending COMMIT PREPARED message if prepared transaction
> + * has not been sent.
> + */
> + if (data)
>
> =>
>
> Similar to previous review comment 10, I think the reason for the skip
> should be elaborated a little bit. For example,
>
> AFTER
> /* If the BEGIN PREPARE was not yet sent, then it means there were no
> relevant changes encountered, so we can skip the COMMIT PREPARED
> message too. */
>
> ------

Updated accordingly.

>
> 12. src/backend/replication/pgoutput/pgoutput.c - pgoutput_rollback_prepared_txn
>
> => Similar to pgoutput_commit_prepared_txn (see review comment 11)
>
> ------

Updated.

>
> 13. src/backend/replication/pgoutput/pgoutput.c - pgoutput_change
>
> @@ -639,11 +749,16 @@ pgoutput_change(LogicalDecodingContext *ctx,
> ReorderBufferTXN *txn,
> Relation relation, ReorderBufferChange *change)
> {
> PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
> + PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;
> MemoryContext old;
> RelationSyncEntry *relentry;
> TransactionId xid = InvalidTransactionId;
> Relation ancestor = NULL;
>
> + /* If not streaming, should have setup txndata as part of
> BEGIN/BEGIN PREPARE */
> + if (!in_streaming)
> + Assert(txndata);
> +
> if (!is_publishable_relation(relation))
> return;
>
> 13a. =>
>
> I felt the streaming logic with the txndata is a bit confusing. I
> think it would be easier to have another local variable (sent_begin)
> and use it like this...
>
> bool sent_begin;
> if (in_streaming)
> {
> sent_begin = true;
> }
> else
> {
> PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;
> Assert(txndata);
> sent_begin = txndata->sent_begin_txn;
> }
>

I did not make this change because, in the streaming case, "sent_begin"
is not actually true, so coding it that way seemed incorrect.
Instead, I have modified the comment to mention that
streaming transactions do not send BEGIN / BEGIN PREPARE.

> ...
>
> ------
>
> + /* output BEGIN if we haven't yet */
>
> 13b. =>
>
> I thought the comment is not quite right
>
> AFTER
> /* Output BEGIN / BEGIN PREPARE if we haven't yet */
>
> ------

Updated.

>
> + if (!in_streaming && !txndata->sent_begin_txn)
> + {
> + if (rbtxn_prepared(txn))
> + pgoutput_begin_prepare(ctx, txn);
> + else
> + pgoutput_begin(ctx, txn);
> + }
> +
>
> 13.c =>
>
> If you introduce the variable (as suggested in 13a) this code becomes
> much simpler:
>

Skipped this. (reason mentioned above)
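
To spell out the distinction between "BEGIN already sent" and "streaming, so no BEGIN at all", the decision in that hunk can be sketched as a small standalone function. This is only an illustration under simplifying assumptions: SketchTxnState, SketchBeginAction and sketch_begin_action() are hypothetical names, and in_streaming and the prepared state are passed in as plain booleans rather than read from the real structures.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical stand-in for the per-transaction plugin data. */
typedef struct SketchTxnState
{
    bool sent_begin_txn;
} SketchTxnState;

typedef enum SketchBeginAction
{
    SKETCH_SEND_NOTHING,
    SKETCH_SEND_BEGIN,
    SKETCH_SEND_BEGIN_PREPARE
} SketchBeginAction;

/* Decide which message, if any, must precede the next change. */
static SketchBeginAction
sketch_begin_action(bool in_streaming, bool is_prepared,
                    const SketchTxnState *txndata)
{
    /* Streaming transactions do not send BEGIN / BEGIN PREPARE. */
    if (in_streaming)
        return SKETCH_SEND_NOTHING;

    /* If not streaming, txndata was set up by BEGIN / BEGIN PREPARE. */
    assert(txndata != NULL);

    if (txndata->sent_begin_txn)
        return SKETCH_SEND_NOTHING;

    return is_prepared ? SKETCH_SEND_BEGIN_PREPARE : SKETCH_SEND_BEGIN;
}
```

Keeping the streaming check separate avoids setting a flag to true for a message that was never sent.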

> ------
>
> 14. src/backend/replication/pgoutput/pgoutput.c - pgoutput_truncate
>
> =>
>
> All the similar review comments made for pgoutput_change (13a, 13b, 13c)
> apply to pgoutput_truncate here also.
>
> ------

Updated.

>
> 15. src/backend/replication/pgoutput/pgoutput.c - pgoutput_message
>
> @@ -842,6 +980,7 @@ pgoutput_message(LogicalDecodingContext *ctx,
> ReorderBufferTXN *txn,
> const char *message)
> {
> PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
> + PGOutputTxnData *txndata;
> TransactionId xid = InvalidTransactionId;
>
>
> =>
>
> This variable should be declared in the block where it is used,
> similar to the suggestion 13a.
>
> Also is it just an accidental omission that you did Assert(txndata)
> for all the other places but not here?
>

Moved location of the variable and added an assert.

regards,
Ajin Cherian
Fujitsu Australia

Attachment Content-Type Size
v8-0001-Skip-empty-transactions-for-logical-replication.patch application/octet-stream 28.2 KB
