From: | Peter Smith <smithpb2250(at)gmail(dot)com> |
---|---|
To: | "wangw(dot)fnst(at)fujitsu(dot)com" <wangw(dot)fnst(at)fujitsu(dot)com> |
Cc: | Andres Freund <andres(at)anarazel(dot)de>, Amit Kapila <amit(dot)kapila16(at)gmail(dot)com>, Thomas Munro <thomas(dot)munro(at)gmail(dot)com>, Ashutosh Bapat <ashutosh(dot)bapat(dot)oss(at)gmail(dot)com>, "shiy(dot)fnst(at)fujitsu(dot)com" <shiy(dot)fnst(at)fujitsu(dot)com>, "houzj(dot)fnst(at)fujitsu(dot)com" <houzj(dot)fnst(at)fujitsu(dot)com>, Fabrice Chapuis <fabrice636861(at)gmail(dot)com>, Euler Taveira <euler(at)eulerto(dot)com>, Masahiko Sawada <sawada(dot)mshk(at)gmail(dot)com>, "Hayato Kuroda (Fujitsu)" <kuroda(dot)hayato(at)fujitsu(dot)com>, Petr Jelinek <petr(dot)jelinek(at)enterprisedb(dot)com>, "tanghy(dot)fnst(at)fujitsu(dot)com" <tanghy(dot)fnst(at)fujitsu(dot)com>, PostgreSQL Hackers <pgsql-hackers(at)lists(dot)postgresql(dot)org>, Ajin Cherian <itsajin(at)gmail(dot)com> |
Subject: | Re: Rework LogicalOutputPluginWriterUpdateProgress |
Date: | 2023-02-28 01:12:03 |
Message-ID: | CAHut+Pt3ZEMo-KTF=5KJSU+HdWJD19GPGGCKOmBeM47484Ychw@mail.gmail.com |
Lists: | pgsql-hackers |
Here are some comments for the v2-0001 patch.
(I haven't looked at the v3 that was posted overnight; maybe some of
my comments have already been addressed.)
======
General
1. (Info from the commit message)
Since we can know whether the change is an end of transaction change in the
common code, we removed the LogicalDecodingContext->end_xact introduced in
commit f95d53e.
~
TBH, it was not clear to me that this change is an improvement. IIUC,
it removes the "unnecessary" member, but only by replacing it
everywhere with a boolean parameter passed to
update_progress_and_keepalive(). So the end result is no less code,
and it is less readable because you need to know what the true/false
parameter means. I wonder if it would have been better just to leave
this how it was.
======
src/backend/replication/logical/logical.c
2. General - blank lines
There are multiple places in this file where the patch removed some
statements but left blank lines. The result is 2 blank lines remaining
instead of one.
see change_cb_wrapper.
see truncate_cb_wrapper.
see stream_start_cb_wrapper.
see stream_stop_cb_wrapper.
see stream_change_cb_wrapper.
e.g.
BEFORE
ctx->write_location = last_lsn;

ctx->end_xact = false;

/* in streaming mode, stream_stop_cb is required */
AFTER (now there are 2 blank lines)
ctx->write_location = last_lsn;


/* in streaming mode, stream_stop_cb is required */
~~~
3. General - calls to is_skip_threshold_change()
+ if (is_skip_threshold_change(ctx))
+ update_progress_and_keepalive(ctx, false);
There are multiple calls like this, each guarding
update_progress_and_keepalive() with is_skip_threshold_change():
- See truncate_cb_wrapper
- See message_cb_wrapper
- See stream_change_cb_wrapper
- See stream_message_cb_wrapper
- See stream_truncate_cb_wrapper
- See UpdateDecodingProgressAndKeepalive
IIUC, all those conditions could be pushed down *into* the wrapper,
thereby making every caller simpler.
e.g. make the wrapper function code look similar to the current
UpdateDecodingProgressAndKeepalive:
BEFORE (update_progress_and_keepalive)
{
    if (!ctx->update_progress_and_keepalive)
        return;
    ctx->update_progress_and_keepalive(ctx, ctx->write_location,
                                       ctx->write_xid, ctx->did_write,
                                       finished_xact);
}
AFTER
{
    if (!ctx->update_progress_and_keepalive)
        return;
    if (finished_xact || is_skip_threshold_change(ctx))
    {
        ctx->update_progress_and_keepalive(ctx, ctx->write_location,
                                           ctx->write_xid, ctx->did_write,
                                           finished_xact);
    }
}
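To make the pushed-down guard concrete, here is a minimal, self-contained sketch. It is not the patch code: GuardCtx, guard_is_skip_threshold_change() and guard_count_call() are hypothetical stand-ins for LogicalDecodingContext, is_skip_threshold_change() and the plugin-writer callback, and the threshold result is reduced to a plain field so that only the control flow is shown.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical stand-in for LogicalDecodingContext (only what the sketch needs). */
typedef struct GuardCtx GuardCtx;
typedef void (*GuardProgressCb) (GuardCtx *ctx, bool finished_xact);

struct GuardCtx
{
    bool            threshold_exceeded; /* stands in for is_skip_threshold_change() */
    int             callback_calls;     /* how often the callback actually ran */
    GuardProgressCb update_progress_and_keepalive;
};

/* Hypothetical predicate; the one in the patch counts skipped changes. */
static bool
guard_is_skip_threshold_change(GuardCtx *ctx)
{
    return ctx->threshold_exceeded;
}

/* Hypothetical callback that only records that it was invoked. */
static void
guard_count_call(GuardCtx *ctx, bool finished_xact)
{
    (void) finished_xact;
    ctx->callback_calls++;
}

/* The wrapper owns the guard, so callers can invoke it unconditionally. */
static void
guard_update_progress_and_keepalive(GuardCtx *ctx, bool finished_xact)
{
    if (ctx->update_progress_and_keepalive == NULL)
        return;

    if (finished_xact || guard_is_skip_threshold_change(ctx))
        ctx->update_progress_and_keepalive(ctx, finished_xact);
}
```

With this shape each *_cb_wrapper would simply call the wrapper, and the "end of transaction or threshold reached" rule would live in one place.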
~~~
4. StartupDecodingContext
@@ -334,7 +329,7 @@ CreateInitDecodingContext(const char *plugin,
XLogReaderRoutine *xl_routine,
LogicalOutputPluginWriterPrepareWrite prepare_write,
LogicalOutputPluginWriterWrite do_write,
- LogicalOutputPluginWriterUpdateProgress update_progress)
+ LogicalOutputPluginWriterUpdateProgressAndKeepalive
update_progress_and_keepalive)
TBH, I find it confusing that the new parameter name
('update_progress_and_keepalive') is identical to the static function
name in the same C source file. It introduces a kind of unnecessary
shadowing and makes it harder to search/read the code.
I suggest just calling this param something unique and local to the
function like 'do_update_keepalive'.
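To illustrate the shadowing concern, here is a small sketch in plain C. Apart from update_progress_and_keepalive and do_update_keepalive, every name is hypothetical, and the functions return ints only so that the effect is visible.

```c
#include <assert.h>

typedef int (*ShadowCallback) (void);

/* File-level static function, playing the role of the wrapper in logical.c. */
static int
update_progress_and_keepalive(void)
{
    return 1;
}

/* Hypothetical caller-supplied callback. */
static int
shadow_plugin_callback(void)
{
    return 2;
}

/* The parameter hides the static function for the whole function body. */
static int
shadow_startup_same_name(ShadowCallback update_progress_and_keepalive)
{
    return update_progress_and_keepalive();     /* calls the parameter */
}

/* With a distinct parameter name both remain reachable and easy to grep. */
static int
shadow_startup_distinct_name(ShadowCallback do_update_keepalive)
{
    return do_update_keepalive() * 10 + update_progress_and_keepalive();
}
```

In the first function there is no way to reach the static function by name, and a reader searching for the identifier cannot tell the two apart without checking the scope.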
~~~
5. CreateInitDecodingContext
@@ -334,7 +329,7 @@ CreateInitDecodingContext(const char *plugin,
XLogReaderRoutine *xl_routine,
LogicalOutputPluginWriterPrepareWrite prepare_write,
LogicalOutputPluginWriterWrite do_write,
- LogicalOutputPluginWriterUpdateProgress update_progress)
+ LogicalOutputPluginWriterUpdateProgressAndKeepalive
update_progress_and_keepalive)
(Ditto previous comment #4)
~~~
6. CreateDecodingContext
@@ -493,7 +488,7 @@ CreateDecodingContext(XLogRecPtr start_lsn,
XLogReaderRoutine *xl_routine,
LogicalOutputPluginWriterPrepareWrite prepare_write,
LogicalOutputPluginWriterWrite do_write,
- LogicalOutputPluginWriterUpdateProgress update_progress)
+ LogicalOutputPluginWriterUpdateProgressAndKeepalive
update_progress_and_keepalive)
(Ditto previous comment #4)
~~~
7. OutputPluginPrepareWrite
@@ -662,7 +657,7 @@ void
OutputPluginPrepareWrite(struct LogicalDecodingContext *ctx, bool last_write)
{
if (!ctx->accept_writes)
- elog(ERROR, "writes are only accepted in commit, begin and change callbacks");
+ elog(ERROR, "writes are only accepted in callbacks in the
OutputPluginCallbacks structure (except startup, shutdown,
filter_by_origin and filter_prepare callbacks)");
This seems a confusing error message. Can it be worded better? Also, I
noticed this flag is never used except in this one place where it
throws an error, so would an "Assert" be more appropriate here?
~~~
8. rollback_prepared_cb_wrapper
/*
* If the plugin support two-phase commits then rollback prepared callback
* is mandatory
+ *
+ * FIXME: This should have been caught much earlier.
*/
if (ctx->callbacks.rollback_prepared_cb == NULL)
~
Is this FIXME related to the current patch, or should this be an
entirely different topic?
~~~
9. is_skip_threshold_change
The current usage for this function is like:
if (is_skip_threshold_change(ctx))
+ update_progress_and_keepalive(ctx, false);
~
IMO a better name for this function might be something like
'is_change_threshold_exceeded()' (or
'is_keepalive_threshold_exceeded()' etc.) because it seems more
readable to say
if (is_change_threshold_exceeded())
    do_something();
~~~
10. is_skip_threshold_change
static bool
is_skip_threshold_change(struct LogicalDecodingContext *ctx)
{
    static int changes_count = 0; /* used to accumulate the number of
                                   * changes */
    /* If the change was published, reset the counter and return false */
    if (ctx->did_write)
    {
        changes_count = 0;
        return false;
    }
    /*
     * It is possible that the data is not sent to downstream for a long time
     * either because the output plugin filtered it or there is a DDL that
     * generates a lot of data that is not processed by the plugin. So, in
     * such cases, the downstream can timeout. To avoid that we try to send a
     * keepalive message if required. Trying to send a keepalive message
     * after every change has some overhead, but testing showed there is no
     * noticeable overhead if we do it after every ~100 changes.
     */
#define CHANGES_THRESHOLD 100
    if (!ctx->did_write && ++changes_count >= CHANGES_THRESHOLD)
    {
        changes_count = 0;
        return true;
    }
    return false;
}
~
That 2nd condition checking if (!ctx->did_write && ++changes_count >=
CHANGES_THRESHOLD) does not seem right. There is no need to check the
ctx->did_write; it must be false because it was checked earlier in the
function:
BEFORE
if (!ctx->did_write && ++changes_count >= CHANGES_THRESHOLD)
SUGGESTION1
Assert(!ctx->did_write);
if (++changes_count >= CHANGES_THRESHOLD)
SUGGESTION2
if (++changes_count >= CHANGES_THRESHOLD)
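For reference, a self-contained sketch of how the function might look with SUGGESTION1 applied. CounterCtx is a hypothetical stand-in for LogicalDecodingContext, plain assert() stands in for Assert(), and the function name is the one proposed in comment #9; none of this is taken from the patch itself.

```c
#include <assert.h>
#include <stdbool.h>

#define CHANGES_THRESHOLD 100

/* Hypothetical stand-in for LogicalDecodingContext. */
typedef struct CounterCtx
{
    bool did_write;     /* was the last change sent downstream? */
} CounterCtx;

static bool
is_change_threshold_exceeded(CounterCtx *ctx)
{
    static int changes_count = 0;   /* consecutive changes that were not sent */

    /* If the change was published, reset the counter and return false. */
    if (ctx->did_write)
    {
        changes_count = 0;
        return false;
    }

    /* did_write cannot be true here, so it is asserted rather than re-tested. */
    assert(!ctx->did_write);
    if (++changes_count >= CHANGES_THRESHOLD)
    {
        changes_count = 0;
        return true;
    }
    return false;
}
```

The behaviour is unchanged from the quoted code: the function returns true on every CHANGES_THRESHOLD-th consecutive unsent change and resets whenever something was written.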
~~~
11. update_progress_and_keepalive
/*
 * Update progress tracking and send keep alive (if required).
 */
static void
update_progress_and_keepalive(struct LogicalDecodingContext *ctx,
                              bool finished_xact)
{
    if (!ctx->update_progress_and_keepalive)
        return;
    ctx->update_progress_and_keepalive(ctx, ctx->write_location,
                                       ctx->write_xid, ctx->did_write,
                                       finished_xact);
}
~
Maybe it's simpler to code this without the return.
e.g.
if (ctx->update_progress_and_keepalive)
{
    ctx->update_progress_and_keepalive(ctx, ctx->write_location,
                                       ctx->write_xid, ctx->did_write,
                                       finished_xact);
}
(it is just generic suggested code for example -- I made some other
review comments overlapping this)
======
.../replication/logical/reorderbuffer.c
12. ReorderBufferAbort
+ UpdateDecodingProgressAndKeepalive((LogicalDecodingContext *)rb->private_data,
+ xid, lsn, !TransactionIdIsValid(txn->toplevel_xid));
+
I didn't really recognise how the
"!TransactionIdIsValid(txn->toplevel_xid)" maps to the boolean
'finished_xact' param. Can this call have an explanatory comment about
how it works?
======
src/backend/replication/walsender.c
~~~
13. WalSndUpdateProgressAndKeepalive
- if (pending_writes || (!end_xact &&
+ if (pending_writes || (!finished_xact && wal_sender_timeout > 0 &&
now >= TimestampTzPlusMilliseconds(last_reply_timestamp,
wal_sender_timeout / 2)))
- ProcessPendingWrites();
+ WalSndSendPending();
Is this new function name OK to be WalSndSendPending? From this code,
we can see it can also be called in other scenarios even when there is
nothing "pending" at all.
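The condition in the hunk above can be restated as a small predicate, which makes the second branch easier to see. This is only a sketch under simplifying assumptions: sketch_should_send() and SketchTimestampMs are hypothetical names, and all times are plain milliseconds rather than TimestampTz values.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical timestamp type: milliseconds since some fixed point. */
typedef int64_t SketchTimestampMs;

/*
 * Returns true when the walsender would go on to call WalSndSendPending()
 * (ProcessPendingWrites() before the patch).
 */
static bool
sketch_should_send(bool pending_writes, bool finished_xact,
                   int wal_sender_timeout_ms,
                   SketchTimestampMs now, SketchTimestampMs last_reply)
{
    /* Branch 1: there really is something pending. */
    if (pending_writes)
        return true;

    /* Branch 2 is skipped at end of transaction or when the timeout is disabled. */
    if (finished_xact || wal_sender_timeout_ms <= 0)
        return false;

    /* Branch 2: nothing pending, but half the timeout has passed without a reply. */
    return now >= last_reply + wal_sender_timeout_ms / 2;
}
```

The last return is the case where the function is called with nothing pending, which is why the "Pending" in the name looks questionable.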
------
Kind Regards,
Peter Smith.
Fujitsu Australia