Re: Rework LogicalOutputPluginWriterUpdateProgress

From: Peter Smith <smithpb2250(at)gmail(dot)com>
To: "wangw(dot)fnst(at)fujitsu(dot)com" <wangw(dot)fnst(at)fujitsu(dot)com>
Cc: Andres Freund <andres(at)anarazel(dot)de>, Amit Kapila <amit(dot)kapila16(at)gmail(dot)com>, Thomas Munro <thomas(dot)munro(at)gmail(dot)com>, Ashutosh Bapat <ashutosh(dot)bapat(dot)oss(at)gmail(dot)com>, "shiy(dot)fnst(at)fujitsu(dot)com" <shiy(dot)fnst(at)fujitsu(dot)com>, "houzj(dot)fnst(at)fujitsu(dot)com" <houzj(dot)fnst(at)fujitsu(dot)com>, Fabrice Chapuis <fabrice636861(at)gmail(dot)com>, Euler Taveira <euler(at)eulerto(dot)com>, Masahiko Sawada <sawada(dot)mshk(at)gmail(dot)com>, "Hayato Kuroda (Fujitsu)" <kuroda(dot)hayato(at)fujitsu(dot)com>, Petr Jelinek <petr(dot)jelinek(at)enterprisedb(dot)com>, "tanghy(dot)fnst(at)fujitsu(dot)com" <tanghy(dot)fnst(at)fujitsu(dot)com>, PostgreSQL Hackers <pgsql-hackers(at)lists(dot)postgresql(dot)org>, Ajin Cherian <itsajin(at)gmail(dot)com>
Subject: Re: Rework LogicalOutputPluginWriterUpdateProgress
Date: 2023-02-28 01:12:03
Message-ID: CAHut+Pt3ZEMo-KTF=5KJSU+HdWJD19GPGGCKOmBeM47484Ychw@mail.gmail.com
Lists: pgsql-hackers

Here are some comments for the v2-0001 patch.

(I haven't looked at the v3 that was posted overnight; maybe some of
my comments have already been addressed.)

======
General

1. (Info from the commit message)
Since we can know whether the change is an end of transaction change in the
common code, we removed the LogicalDecodingContext->end_xact introduced in
commit f95d53e.

~

TBH, it was not clear to me that this change was an improvement. IIUC,
it removes the "unnecessary" member, but only by replacing it
everywhere with a boolean parameter passed to
update_progress_and_keepalive(). So the end result does not seem to be
any less code, but it is less readable, because you need to know what
the true/false parameter means. I wonder if it would have been better
just to leave this how it was.

======
src/backend/replication/logical/logical.c

2. General - blank lines

There are multiple places in this file where the patch removed some
statements but left blank lines. The result is 2 blank lines remaining
instead of one.

- See change_cb_wrapper
- See truncate_cb_wrapper
- See stream_start_cb_wrapper
- See stream_stop_cb_wrapper
- See stream_change_cb_wrapper

e.g.

BEFORE
ctx->write_location = last_lsn;

ctx->end_xact = false;

/* in streaming mode, stream_stop_cb is required */

AFTER (now there are 2 blank lines)
ctx->write_location = last_lsn;


/* in streaming mode, stream_stop_cb is required */

~~~

3. General - calls to is_skip_threshold_change()

+ if (is_skip_threshold_change(ctx))
+ update_progress_and_keepalive(ctx, false);

There are multiple calls like this, where update_progress_and_keepalive()
is guarded by is_skip_threshold_change():
- See truncate_cb_wrapper
- See message_cb_wrapper
- See stream_change_cb_wrapper
- See stream_message_cb_wrapper
- See stream_truncate_cb_wrapper
- See UpdateDecodingProgressAndKeepalive

IIUC, all those conditions could perhaps be pushed down *into* the
wrapper, thereby making all the calling code simpler.

e.g. make the wrapper function code look similar to the current
UpdateDecodingProgressAndKeepalive:

BEFORE (update_progress_and_keepalive)
{
	if (!ctx->update_progress_and_keepalive)
		return;

	ctx->update_progress_and_keepalive(ctx, ctx->write_location,
									   ctx->write_xid, ctx->did_write,
									   finished_xact);
}

AFTER
{
	if (!ctx->update_progress_and_keepalive)
		return;

	if (finished_xact || is_skip_threshold_change(ctx))
	{
		ctx->update_progress_and_keepalive(ctx, ctx->write_location,
										   ctx->write_xid, ctx->did_write,
										   finished_xact);
	}
}
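To make the suggestion concrete, here is a small self-contained sketch of the pushed-down version. MockDecodingContext and count_calls() are hypothetical stand-ins (not the real LogicalDecodingContext), reduced to the fields this logic reads; the bodies of is_skip_threshold_change() and update_progress_and_keepalive() follow the patch excerpts quoted in this review, with the threshold check moved inside the wrapper.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical, heavily reduced stand-in for LogicalDecodingContext. */
typedef struct MockDecodingContext MockDecodingContext;
typedef void (*MockUpdateProgressAndKeepalive) (MockDecodingContext *ctx,
												bool did_write,
												bool finished_xact);

struct MockDecodingContext
{
	bool		did_write;		/* did the plugin write anything? */
	MockUpdateProgressAndKeepalive update_progress_and_keepalive;	/* may be NULL */
	int			ncalls;			/* instrumentation for this sketch only */
};

#define CHANGES_THRESHOLD 100

/* Same counting logic as the patch's is_skip_threshold_change(). */
static bool
is_skip_threshold_change(MockDecodingContext *ctx)
{
	static int	changes_count = 0;

	if (ctx->did_write)
	{
		changes_count = 0;
		return false;
	}

	if (++changes_count >= CHANGES_THRESHOLD)
	{
		changes_count = 0;
		return true;
	}

	return false;
}

/* Wrapper with the threshold check pushed down into it. */
static void
update_progress_and_keepalive(MockDecodingContext *ctx, bool finished_xact)
{
	assert(ctx != NULL);

	if (!ctx->update_progress_and_keepalive)
		return;

	/* End of transaction always reports; otherwise only every Nth change. */
	if (finished_xact || is_skip_threshold_change(ctx))
		ctx->update_progress_and_keepalive(ctx, ctx->did_write, finished_xact);
}

/* Example callback that just counts how often it is invoked. */
static void
count_calls(MockDecodingContext *ctx, bool did_write, bool finished_xact)
{
	(void) did_write;
	(void) finished_xact;
	ctx->ncalls++;
}
```

With this shape, each caller listed above would just call update_progress_and_keepalive(ctx, false) unconditionally. Because of the short-circuit, the change counter is not touched when finished_xact is true, which IIUC matches the existing end-of-transaction callers that do not go through is_skip_threshold_change() today.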

~~~

4. StartupDecodingContext

@@ -334,7 +329,7 @@ CreateInitDecodingContext(const char *plugin,
XLogReaderRoutine *xl_routine,
LogicalOutputPluginWriterPrepareWrite prepare_write,
LogicalOutputPluginWriterWrite do_write,
- LogicalOutputPluginWriterUpdateProgress update_progress)
+ LogicalOutputPluginWriterUpdateProgressAndKeepalive update_progress_and_keepalive)

TBH, I find it confusing that the new parameter name
('update_progress_and_keepalive') is identical to the static function
name in the same C source file. It introduces a kind of unnecessary
shadowing and makes it harder to search/read the code.

I suggest just calling this param something unique and local to the
function like 'do_update_keepalive'.
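A minimal illustration of the shadowing concern, using hypothetical names (helper, call_with_shadowing, etc. are not from the patch): inside a function whose parameter has the same name as a file-level static function, that name refers to the parameter, so the static function is not reachable by name there, and a search for the name hits both.

```c
#include <assert.h>

/* A file-level static function, like update_progress_and_keepalive() in logical.c. */
static int
helper(int x)
{
	return x + 1;
}

typedef int (*helper_fn) (int);

/*
 * Parameter named like the static function: inside this body "helper"
 * means the parameter, so the static helper() cannot be called by name.
 */
static int
call_with_shadowing(helper_fn helper, int x)
{
	return helper(x);			/* calls the parameter, NOT the static helper() */
}

/* Same idea with a distinct parameter name: both are reachable. */
static int
call_without_shadowing(helper_fn do_helper, int x)
{
	return do_helper(x) + helper(x);
}

/* Example callback to pass in. */
static int
times_ten(int x)
{
	return x * 10;
}
```

Giving the parameter a distinct name (e.g. the suggested 'do_update_keepalive') keeps both the parameter and the static function visible and unambiguous.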

~~~

5. CreateInitDecodingContext

@@ -334,7 +329,7 @@ CreateInitDecodingContext(const char *plugin,
XLogReaderRoutine *xl_routine,
LogicalOutputPluginWriterPrepareWrite prepare_write,
LogicalOutputPluginWriterWrite do_write,
- LogicalOutputPluginWriterUpdateProgress update_progress)
+ LogicalOutputPluginWriterUpdateProgressAndKeepalive update_progress_and_keepalive)

(Ditto previous comment #4)

~~~

6. CreateDecodingContext

@@ -493,7 +488,7 @@ CreateDecodingContext(XLogRecPtr start_lsn,
XLogReaderRoutine *xl_routine,
LogicalOutputPluginWriterPrepareWrite prepare_write,
LogicalOutputPluginWriterWrite do_write,
- LogicalOutputPluginWriterUpdateProgress update_progress)
+ LogicalOutputPluginWriterUpdateProgressAndKeepalive update_progress_and_keepalive)

(Ditto previous comment #4)

~~~

7. OutputPluginPrepareWrite

@@ -662,7 +657,7 @@ void
OutputPluginPrepareWrite(struct LogicalDecodingContext *ctx, bool last_write)
{
	if (!ctx->accept_writes)
-		elog(ERROR, "writes are only accepted in commit, begin and change callbacks");
+		elog(ERROR, "writes are only accepted in callbacks in the OutputPluginCallbacks structure (except startup, shutdown, filter_by_origin and filter_prepare callbacks)");

This error message seems confusing. Can it be worded better? Also, I
noticed this flag is never used except in this one place where it
throws an error, so would an "Assert" be more appropriate here?
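If the Assert route is taken, the check might look roughly like the hypothetical sketch below. MockDecodingContext and mock_prepare_write() stand in for LogicalDecodingContext and OutputPluginPrepareWrite(), and standard assert() stands in for PostgreSQL's Assert(), which is only active in builds configured with --enable-cassert (just as assert() is compiled out when NDEBUG is defined).

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical stand-in holding only the fields used here. */
typedef struct MockDecodingContext
{
	bool		accept_writes;	/* set while inside a write-capable callback */
	bool		prepared;		/* records that the write was prepared */
} MockDecodingContext;

/* Sketch of OutputPluginPrepareWrite() with an assertion instead of elog(ERROR). */
static void
mock_prepare_write(MockDecodingContext *ctx, bool last_write)
{
	(void) last_write;

	/* Writes are only legal from inside a write-capable callback. */
	assert(ctx->accept_writes);

	ctx->prepared = true;
}
```

One trade-off to weigh: output plugins are external code, and an elog(ERROR) still catches a plugin writing from the wrong callback in production builds, where an Assert() would be compiled out.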

~~~

8. rollback_prepared_cb_wrapper

/*
* If the plugin support two-phase commits then rollback prepared callback
* is mandatory
+ *
+ * FIXME: This should have been caught much earlier.
*/
if (ctx->callbacks.rollback_prepared_cb == NULL)
~
Is this FIXME related to the current patch, or should this be an
entirely different topic?

~~~

9. is_skip_threshold_change

The current usage for this function is like:

if (is_skip_threshold_change(ctx))
+ update_progress_and_keepalive(ctx, false);

~

IMO a better name for this function might be something like
'is_change_threshold_exceeded()' (or
'is_keepalive_threshold_exceeded()', etc.) because it seems more
readable to say

if (is_change_threshold_exceeded())
do_something();

~~~

10. is_skip_threshold_change

static bool
is_skip_threshold_change(struct LogicalDecodingContext *ctx)
{
	static int	changes_count = 0;	/* used to accumulate the number of
									 * changes */

	/* If the change was published, reset the counter and return false */
	if (ctx->did_write)
	{
		changes_count = 0;
		return false;
	}

	/*
	 * It is possible that the data is not sent to downstream for a long time
	 * either because the output plugin filtered it or there is a DDL that
	 * generates a lot of data that is not processed by the plugin. So, in
	 * such cases, the downstream can timeout. To avoid that we try to send a
	 * keepalive message if required. Trying to send a keepalive message
	 * after every change has some overhead, but testing showed there is no
	 * noticeable overhead if we do it after every ~100 changes.
	 */
#define CHANGES_THRESHOLD 100
	if (!ctx->did_write && ++changes_count >= CHANGES_THRESHOLD)
	{
		changes_count = 0;
		return true;
	}

	return false;
}

~

That 2nd condition checking if (!ctx->did_write && ++changes_count >=
CHANGES_THRESHOLD) does not seem right. There is no need to check the
ctx->did_write; it must be false because it was checked earlier in the
function:

BEFORE
if (!ctx->did_write && ++changes_count >= CHANGES_THRESHOLD)

SUGGESTION1
Assert(!ctx->did_write);
if (++changes_count >= CHANGES_THRESHOLD)

SUGGESTION2
if (++changes_count >= CHANGES_THRESHOLD)
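For reference, a self-contained sketch of the function with SUGGESTION1 applied and the name suggested in comment #9. MockDecodingContext is a hypothetical stand-in carrying only did_write; the counting logic is unchanged from the patch.

```c
#include <assert.h>
#include <stdbool.h>

#define CHANGES_THRESHOLD 100

/* Hypothetical stand-in with only the field the function reads. */
typedef struct MockDecodingContext
{
	bool		did_write;
} MockDecodingContext;

/*
 * Returns true once every CHANGES_THRESHOLD consecutive unpublished
 * changes; any published change resets the count.
 */
static bool
is_change_threshold_exceeded(MockDecodingContext *ctx)
{
	static int	changes_count = 0;	/* unpublished changes since last write */

	if (ctx->did_write)
	{
		changes_count = 0;
		return false;
	}

	/* Guaranteed by the early return above, so no need to re-test it. */
	assert(!ctx->did_write);
	if (++changes_count >= CHANGES_THRESHOLD)
	{
		changes_count = 0;
		return true;
	}

	return false;
}
```

Behaviour is identical to the original; the only difference is that the redundant did_write test becomes an assertion (or disappears entirely with SUGGESTION2).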

~~~

11. update_progress_and_keepalive

/*
* Update progress tracking and send keep alive (if required).
*/
static void
update_progress_and_keepalive(struct LogicalDecodingContext *ctx,
							  bool finished_xact)
{
	if (!ctx->update_progress_and_keepalive)
		return;

	ctx->update_progress_and_keepalive(ctx, ctx->write_location,
									   ctx->write_xid, ctx->did_write,
									   finished_xact);
}

~

Maybe it's simpler to code this without the return.

e.g.

if (ctx->update_progress_and_keepalive)
{
	ctx->update_progress_and_keepalive(ctx, ctx->write_location,
									   ctx->write_xid, ctx->did_write,
									   finished_xact);
}

(it is just generic suggested code for example -- I made some other
review comments overlapping this)
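A tiny check that the two shapes behave the same, including when no callback is registered. The types and function names here are hypothetical and reduced to the one callback field.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

typedef struct MockDecodingContext MockDecodingContext;
typedef void (*MockCallback) (MockDecodingContext *ctx, bool finished_xact);

/* Hypothetical stand-in: just the optional callback plus a call counter. */
struct MockDecodingContext
{
	MockCallback update_progress_and_keepalive;	/* may be NULL */
	int			calls;
};

/* Shape used in the patch: early return when no callback is registered. */
static void
wrapper_early_return(MockDecodingContext *ctx, bool finished_xact)
{
	if (!ctx->update_progress_and_keepalive)
		return;

	ctx->update_progress_and_keepalive(ctx, finished_xact);
}

/* Suggested shape: a single guarded call, no return statement. */
static void
wrapper_guarded(MockDecodingContext *ctx, bool finished_xact)
{
	if (ctx->update_progress_and_keepalive != NULL)
		ctx->update_progress_and_keepalive(ctx, finished_xact);
}

/* Example callback that counts invocations. */
static void
count_call(MockDecodingContext *ctx, bool finished_xact)
{
	(void) finished_xact;
	ctx->calls++;
}
```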

======
.../replication/logical/reorderbuffer.c

12. ReorderBufferAbort

+ UpdateDecodingProgressAndKeepalive((LogicalDecodingContext *)rb->private_data,
+ xid, lsn, !TransactionIdIsValid(txn->toplevel_xid));
+

I didn't really recognise how the
"!TransactionIdIsValid(txn->toplevel_xid)" maps to the boolean
'finished_xact' param. Can this call have an explanatory comment about
how it works?
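My guess at the intended mapping, as a sketch: assuming txn->toplevel_xid is valid only when the txn is known to be a subtransaction, aborting a top-level transaction ends the transaction (finished_xact = true), while aborting a subtransaction does not. The struct below is a hypothetical reduction of ReorderBufferTXN; TransactionId, InvalidTransactionId and TransactionIdIsValid() are redefined locally to match their PostgreSQL definitions. If toplevel_xid can also be unset for a subtransaction whose parent is not yet known, that is exactly the kind of case the requested comment should spell out.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Local copies of the PostgreSQL definitions (TransactionId is a uint32). */
typedef uint32_t TransactionId;
#define InvalidTransactionId ((TransactionId) 0)
#define TransactionIdIsValid(xid) ((xid) != InvalidTransactionId)

/* Hypothetical, reduced ReorderBufferTXN. */
typedef struct MockReorderBufferTXN
{
	TransactionId xid;
	TransactionId toplevel_xid;	/* assumed: valid only for subtransactions */
} MockReorderBufferTXN;

/* The expression passed as 'finished_xact' in ReorderBufferAbort(). */
static bool
abort_finishes_xact(const MockReorderBufferTXN *txn)
{
	return !TransactionIdIsValid(txn->toplevel_xid);
}
```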

======
src/backend/replication/walsender.c

~~~

13. WalSndUpdateProgressAndKeepalive

- if (pending_writes || (!end_xact &&
+ if (pending_writes || (!finished_xact && wal_sender_timeout > 0 &&
now >= TimestampTzPlusMilliseconds(last_reply_timestamp,
wal_sender_timeout / 2)))
- ProcessPendingWrites();
+ WalSndSendPending();

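Is WalSndSendPending an appropriate name for this new function? From
this code, we can see it can also be called in other scenarios, even
when there is nothing "pending" at all (i.e. when pending_writes is
false but half of wal_sender_timeout has elapsed since the last reply).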

------
Kind Regards,
Peter Smith.
Fujitsu Australia
