From 1d89c84b1465b28ddef8c110500c3744477488df Mon Sep 17 00:00:00 2001 From: Andres Freund Date: Wed, 8 Feb 2023 11:57:37 -0800 Subject: [PATCH v1] WIP: Initial sketch of progress update rework --- src/include/replication/logical.h | 7 +- src/include/replication/output_plugin.h | 1 - src/include/replication/reorderbuffer.h | 12 -- src/backend/replication/logical/logical.c | 188 ++++++++++-------- .../replication/logical/reorderbuffer.c | 20 -- src/backend/replication/pgoutput/pgoutput.c | 10 - src/backend/replication/walsender.c | 14 +- 7 files changed, 116 insertions(+), 136 deletions(-) diff --git a/src/include/replication/logical.h b/src/include/replication/logical.h index 5f49554ea05..472f2a5b84c 100644 --- a/src/include/replication/logical.h +++ b/src/include/replication/logical.h @@ -27,8 +27,8 @@ typedef LogicalOutputPluginWriterWrite LogicalOutputPluginWriterPrepareWrite; typedef void (*LogicalOutputPluginWriterUpdateProgress) (struct LogicalDecodingContext *lr, XLogRecPtr Ptr, TransactionId xid, - bool skipped_xact -); + bool did_write, + bool finished_xact); typedef struct LogicalDecodingContext { @@ -105,10 +105,9 @@ typedef struct LogicalDecodingContext */ bool accept_writes; bool prepared_write; + bool did_write; XLogRecPtr write_location; TransactionId write_xid; - /* Are we processing the end LSN of a transaction? */ - bool end_xact; } LogicalDecodingContext; diff --git a/src/include/replication/output_plugin.h b/src/include/replication/output_plugin.h index 2d89d26586e..b9358e15444 100644 --- a/src/include/replication/output_plugin.h +++ b/src/include/replication/output_plugin.h @@ -245,6 +245,5 @@ typedef struct OutputPluginCallbacks /* Functions in replication/logical/logical.c */ extern void OutputPluginPrepareWrite(struct LogicalDecodingContext *ctx, bool last_write); extern void OutputPluginWrite(struct LogicalDecodingContext *ctx, bool last_write); -extern void OutputPluginUpdateProgress(struct LogicalDecodingContext *ctx, bool skipped_xact); #endif /* OUTPUT_PLUGIN_H */ diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h index 215d1494e90..e5db041df18 100644 --- a/src/include/replication/reorderbuffer.h +++ b/src/include/replication/reorderbuffer.h @@ -526,12 +526,6 @@ typedef void (*ReorderBufferStreamTruncateCB) ( Relation relations[], ReorderBufferChange *change); -/* update progress txn callback signature */ -typedef void (*ReorderBufferUpdateProgressTxnCB) ( - ReorderBuffer *rb, - ReorderBufferTXN *txn, - XLogRecPtr lsn); - struct ReorderBuffer { /* @@ -595,12 +589,6 @@ struct ReorderBuffer ReorderBufferStreamMessageCB stream_message; ReorderBufferStreamTruncateCB stream_truncate; - /* - * Callback to be called when updating progress during sending data of a - * transaction (and its subtransactions) to the output plugin. - */ - ReorderBufferUpdateProgressTxnCB update_progress_txn; - /* * Pointer that will be passed untouched to the callbacks. */ diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c index c3ec97a0a62..92eae378d98 100644 --- a/src/backend/replication/logical/logical.c +++ b/src/backend/replication/logical/logical.c @@ -93,10 +93,9 @@ static void stream_message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *tx static void stream_truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, int nrelations, Relation relations[], ReorderBufferChange *change); -/* callback to update txn's progress */ -static void update_progress_txn_cb_wrapper(ReorderBuffer *cache, - ReorderBufferTXN *txn, - XLogRecPtr lsn); + +static void update_progress(struct LogicalDecodingContext *ctx, + bool finished_xact); static void LoadOutputPlugin(OutputPluginCallbacks *callbacks, const char *plugin); @@ -283,12 +282,6 @@ StartupDecodingContext(List *output_plugin_options, ctx->reorder->commit_prepared = commit_prepared_cb_wrapper; ctx->reorder->rollback_prepared = rollback_prepared_cb_wrapper; - /* - * Callback to support updating progress during sending data of a - * transaction (and its subtransactions) to the output plugin. - */ - ctx->reorder->update_progress_txn = update_progress_txn_cb_wrapper; - ctx->out = makeStringInfo(); ctx->prepare_write = prepare_write; ctx->write = do_write; @@ -661,6 +654,7 @@ FreeDecodingContext(LogicalDecodingContext *ctx) void OutputPluginPrepareWrite(struct LogicalDecodingContext *ctx, bool last_write) { + /* FIXME: streaming and 2PC support made this message inaccurate */ if (!ctx->accept_writes) elog(ERROR, "writes are only accepted in commit, begin and change callbacks"); @@ -679,20 +673,7 @@ OutputPluginWrite(struct LogicalDecodingContext *ctx, bool last_write) ctx->write(ctx, ctx->write_location, ctx->write_xid, last_write); ctx->prepared_write = false; -} - -/* - * Update progress tracking (if supported). - */ -void -OutputPluginUpdateProgress(struct LogicalDecodingContext *ctx, - bool skipped_xact) -{ - if (!ctx->update_progress) - return; - - ctx->update_progress(ctx, ctx->write_location, ctx->write_xid, - skipped_xact); + ctx->did_write = true; } /* @@ -759,7 +740,6 @@ startup_cb_wrapper(LogicalDecodingContext *ctx, OutputPluginOptions *opt, bool i /* set output state */ ctx->accept_writes = false; - ctx->end_xact = false; /* do the actual work: call callback */ ctx->callbacks.startup_cb(ctx, opt, is_init); @@ -787,7 +767,6 @@ shutdown_cb_wrapper(LogicalDecodingContext *ctx) /* set output state */ ctx->accept_writes = false; - ctx->end_xact = false; /* do the actual work: call callback */ ctx->callbacks.shutdown_cb(ctx); @@ -823,7 +802,7 @@ begin_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn) ctx->accept_writes = true; ctx->write_xid = txn->xid; ctx->write_location = txn->first_lsn; - ctx->end_xact = false; + ctx->did_write = false; /* do the actual work: call callback */ ctx->callbacks.begin_cb(ctx, txn); @@ -855,13 +834,15 @@ commit_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, ctx->accept_writes = true; ctx->write_xid = txn->xid; ctx->write_location = txn->end_lsn; /* points to the end of the record */ - ctx->end_xact = true; + ctx->did_write = false; /* do the actual work: call callback */ ctx->callbacks.commit_cb(ctx, txn, commit_lsn); /* Pop the error context stack */ error_context_stack = errcallback.previous; + + update_progress(ctx, true); } /* @@ -896,7 +877,7 @@ begin_prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn) ctx->accept_writes = true; ctx->write_xid = txn->xid; ctx->write_location = txn->first_lsn; - ctx->end_xact = false; + ctx->did_write = false; /* * If the plugin supports two-phase commits then begin prepare callback is @@ -913,6 +894,9 @@ begin_prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn) /* Pop the error context stack */ error_context_stack = errcallback.previous; + + /* XXX: arguably this does end a transaction */ + update_progress(ctx, false); } static void @@ -941,7 +925,7 @@ prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, ctx->accept_writes = true; ctx->write_xid = txn->xid; ctx->write_location = txn->end_lsn; /* points to the end of the record */ - ctx->end_xact = true; + ctx->did_write = false; /* * If the plugin supports two-phase commits then prepare callback is @@ -958,6 +942,8 @@ prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, /* Pop the error context stack */ error_context_stack = errcallback.previous; + + update_progress(ctx, true); } static void @@ -986,7 +972,7 @@ commit_prepared_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, ctx->accept_writes = true; ctx->write_xid = txn->xid; ctx->write_location = txn->end_lsn; /* points to the end of the record */ - ctx->end_xact = true; + ctx->did_write = false; /* * If the plugin support two-phase commits then commit prepared callback @@ -1003,6 +989,8 @@ commit_prepared_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, /* Pop the error context stack */ error_context_stack = errcallback.previous; + + update_progress(ctx, true); } static void @@ -1032,11 +1020,13 @@ rollback_prepared_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, ctx->accept_writes = true; ctx->write_xid = txn->xid; ctx->write_location = txn->end_lsn; /* points to the end of the record */ - ctx->end_xact = true; + ctx->did_write = false; /* * If the plugin support two-phase commits then rollback prepared callback * is mandatory + * + * FIXME: This should have been caught much earlier. */ if (ctx->callbacks.rollback_prepared_cb == NULL) ereport(ERROR, @@ -1050,6 +1040,8 @@ rollback_prepared_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, /* Pop the error context stack */ error_context_stack = errcallback.previous; + + update_progress(ctx, true); } static void @@ -1074,6 +1066,7 @@ change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, /* set output state */ ctx->accept_writes = true; ctx->write_xid = txn->xid; + ctx->did_write = false; /* * Report this change's lsn so replies from clients can give an up-to-date @@ -1083,12 +1076,29 @@ change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, */ ctx->write_location = change->lsn; - ctx->end_xact = false; ctx->callbacks.change_cb(ctx, txn, relation, change); /* Pop the error context stack */ error_context_stack = errcallback.previous; + + /* + * It is possible that the data is not sent to downstream for a long time + * either because the output plugin filtered it or there is a DDL that + * generates a lot of data that is not processed by the plugin. So, in + * such cases, the downstream can timeout. To avoid that we try to send a + * keepalive message if required. Trying to send a keepalive message + * after every change has some overhead, but testing showed there is no + * noticeable overhead if we do it after every ~100 changes. + * + * FIXME: Need to count the number of changes, or come up with some other + * metric. + */ +#define CHANGES_THRESHOLD 100 + if (!ctx->did_write) + { + update_progress(ctx, false); + } } static void @@ -1102,7 +1112,7 @@ truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, Assert(!ctx->fast_forward); if (!ctx->callbacks.truncate_cb) - return; + goto out; /* Push callback + info on the error context stack */ state.ctx = ctx; @@ -1116,6 +1126,7 @@ truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, /* set output state */ ctx->accept_writes = true; ctx->write_xid = txn->xid; + ctx->did_write = false; /* * Report this change's lsn so replies from clients can give an up-to-date @@ -1125,12 +1136,14 @@ truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, */ ctx->write_location = change->lsn; - ctx->end_xact = false; ctx->callbacks.truncate_cb(ctx, txn, nrelations, relations, change); /* Pop the error context stack */ error_context_stack = errcallback.previous; + +out: + update_progress(ctx, false); } bool @@ -1154,7 +1167,6 @@ filter_prepare_cb_wrapper(LogicalDecodingContext *ctx, TransactionId xid, /* set output state */ ctx->accept_writes = false; - ctx->end_xact = false; /* do the actual work: call callback */ ret = ctx->callbacks.filter_prepare_cb(ctx, xid, gid); @@ -1185,7 +1197,6 @@ filter_by_origin_cb_wrapper(LogicalDecodingContext *ctx, RepOriginId origin_id) /* set output state */ ctx->accept_writes = false; - ctx->end_xact = false; /* do the actual work: call callback */ ret = ctx->callbacks.filter_by_origin_cb(ctx, origin_id); @@ -1193,6 +1204,8 @@ filter_by_origin_cb_wrapper(LogicalDecodingContext *ctx, RepOriginId origin_id) /* Pop the error context stack */ error_context_stack = errcallback.previous; + /* FIXME: I think we need to call update_progress occasionally */ + return ret; } @@ -1208,7 +1221,7 @@ message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, Assert(!ctx->fast_forward); if (ctx->callbacks.message_cb == NULL) - return; + goto out; /* Push callback + info on the error context stack */ state.ctx = ctx; @@ -1223,7 +1236,7 @@ message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, ctx->accept_writes = true; ctx->write_xid = txn != NULL ? txn->xid : InvalidTransactionId; ctx->write_location = message_lsn; - ctx->end_xact = false; + ctx->did_write = false; /* do the actual work: call callback */ ctx->callbacks.message_cb(ctx, txn, message_lsn, transactional, prefix, @@ -1231,6 +1244,10 @@ message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, /* Pop the error context stack */ error_context_stack = errcallback.previous; + +out: + /* XXX: Hm, not sure what's the right thing is here */ + update_progress(ctx, false); } static void @@ -1258,6 +1275,7 @@ stream_start_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, /* set output state */ ctx->accept_writes = true; ctx->write_xid = txn->xid; + ctx->did_write = false; /* * Report this message's lsn so replies from clients can give an @@ -1267,7 +1285,6 @@ stream_start_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, */ ctx->write_location = first_lsn; - ctx->end_xact = false; /* in streaming mode, stream_start_cb is required */ if (ctx->callbacks.stream_start_cb == NULL) @@ -1280,6 +1297,8 @@ stream_start_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, /* Pop the error context stack */ error_context_stack = errcallback.previous; + + /* don't call update progress, we didn't really make any */ } static void @@ -1307,6 +1326,7 @@ stream_stop_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, /* set output state */ ctx->accept_writes = true; ctx->write_xid = txn->xid; + ctx->did_write = false; /* * Report this message's lsn so replies from clients can give an @@ -1316,7 +1336,6 @@ stream_stop_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, */ ctx->write_location = last_lsn; - ctx->end_xact = false; /* in streaming mode, stream_stop_cb is required */ if (ctx->callbacks.stream_stop_cb == NULL) @@ -1329,6 +1348,8 @@ stream_stop_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, /* Pop the error context stack */ error_context_stack = errcallback.previous; + + /* don't call update progress, we didn't really make any */ } static void @@ -1357,7 +1378,7 @@ stream_abort_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, ctx->accept_writes = true; ctx->write_xid = txn->xid; ctx->write_location = abort_lsn; - ctx->end_xact = true; + ctx->did_write = false; /* in streaming mode, stream_abort_cb is required */ if (ctx->callbacks.stream_abort_cb == NULL) @@ -1370,6 +1391,9 @@ stream_abort_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, /* Pop the error context stack */ error_context_stack = errcallback.previous; + + /* XXX: Progress wasn't updated in pgoutput */ + update_progress(ctx, true); } static void @@ -1402,7 +1426,7 @@ stream_prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, ctx->accept_writes = true; ctx->write_xid = txn->xid; ctx->write_location = txn->end_lsn; - ctx->end_xact = true; + ctx->did_write = false; /* in streaming mode with two-phase commits, stream_prepare_cb is required */ if (ctx->callbacks.stream_prepare_cb == NULL) @@ -1415,6 +1439,8 @@ stream_prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, /* Pop the error context stack */ error_context_stack = errcallback.previous; + + update_progress(ctx, true); } static void @@ -1443,7 +1469,7 @@ stream_commit_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, ctx->accept_writes = true; ctx->write_xid = txn->xid; ctx->write_location = txn->end_lsn; - ctx->end_xact = true; + ctx->did_write = false; /* in streaming mode, stream_commit_cb is required */ if (ctx->callbacks.stream_commit_cb == NULL) @@ -1456,6 +1482,8 @@ stream_commit_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, /* Pop the error context stack */ error_context_stack = errcallback.previous; + + update_progress(ctx, true); } static void @@ -1483,6 +1511,7 @@ stream_change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, /* set output state */ ctx->accept_writes = true; ctx->write_xid = txn->xid; + ctx->did_write = false; /* * Report this change's lsn so replies from clients can give an up-to-date @@ -1492,7 +1521,6 @@ stream_change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, */ ctx->write_location = change->lsn; - ctx->end_xact = false; /* in streaming mode, stream_change_cb is required */ if (ctx->callbacks.stream_change_cb == NULL) @@ -1505,6 +1533,15 @@ stream_change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, /* Pop the error context stack */ error_context_stack = errcallback.previous; + + /* + * FIXME: See change_cb_wrapper(). Probably should be in a helper + * function. + */ + if (!ctx->did_write) + { + update_progress(ctx, false); + } } static void @@ -1523,7 +1560,7 @@ stream_message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, /* this callback is optional */ if (ctx->callbacks.stream_message_cb == NULL) - return; + goto out; /* Push callback + info on the error context stack */ state.ctx = ctx; @@ -1538,7 +1575,7 @@ stream_message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, ctx->accept_writes = true; ctx->write_xid = txn != NULL ? txn->xid : InvalidTransactionId; ctx->write_location = message_lsn; - ctx->end_xact = false; + ctx->did_write = false; /* do the actual work: call callback */ ctx->callbacks.stream_message_cb(ctx, txn, message_lsn, transactional, prefix, @@ -1546,6 +1583,10 @@ stream_message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, /* Pop the error context stack */ error_context_stack = errcallback.previous; + +out: + /* XXX: Hm, not sure what's the right thing is here */ + update_progress(ctx, false); } static void @@ -1578,6 +1619,7 @@ stream_truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, /* set output state */ ctx->accept_writes = true; ctx->write_xid = txn->xid; + ctx->did_write = false; /* * Report this change's lsn so replies from clients can give an up-to-date @@ -1587,51 +1629,33 @@ stream_truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, */ ctx->write_location = change->lsn; - ctx->end_xact = false; ctx->callbacks.stream_truncate_cb(ctx, txn, nrelations, relations, change); /* Pop the error context stack */ error_context_stack = errcallback.previous; + + /* truncates aren't progress on their own */ } + +/* + * Update progress tracking (if required). + * + * FIXME: This should now get a different name. + + * FIXME: I think instead of skipped_xact we should just track whether + * something was written? + */ static void -update_progress_txn_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, - XLogRecPtr lsn) +update_progress(struct LogicalDecodingContext *ctx, + bool finished_xact) { - LogicalDecodingContext *ctx = cache->private_data; - LogicalErrorCallbackState state; - ErrorContextCallback errcallback; + if (!ctx->update_progress) + return; - Assert(!ctx->fast_forward); - - /* Push callback + info on the error context stack */ - state.ctx = ctx; - state.callback_name = "update_progress_txn"; - state.report_location = lsn; - errcallback.callback = output_plugin_error_callback; - errcallback.arg = (void *) &state; - errcallback.previous = error_context_stack; - error_context_stack = &errcallback; - - /* set output state */ - ctx->accept_writes = false; - ctx->write_xid = txn->xid; - - /* - * Report this change's lsn so replies from clients can give an up-to-date - * answer. This won't ever be enough (and shouldn't be!) to confirm - * receipt of this transaction, but it might allow another transaction's - * commit to be confirmed with one message. - */ - ctx->write_location = lsn; - - ctx->end_xact = false; - - OutputPluginUpdateProgress(ctx, false); - - /* Pop the error context stack */ - error_context_stack = errcallback.previous; + ctx->update_progress(ctx, ctx->write_location, ctx->write_xid, + ctx->did_write, finished_xact); } /* diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c index d5f90a5f5d2..0468d12936f 100644 --- a/src/backend/replication/logical/reorderbuffer.c +++ b/src/backend/replication/logical/reorderbuffer.c @@ -2100,8 +2100,6 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, PG_TRY(); { ReorderBufferChange *change; - int changes_count = 0; /* used to accumulate the number of - * changes */ if (using_subtxn) BeginInternalSubTransaction(streaming ? "stream" : "replay"); @@ -2442,24 +2440,6 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, elog(ERROR, "tuplecid value in changequeue"); break; } - - /* - * It is possible that the data is not sent to downstream for a - * long time either because the output plugin filtered it or there - * is a DDL that generates a lot of data that is not processed by - * the plugin. So, in such cases, the downstream can timeout. To - * avoid that we try to send a keepalive message if required. - * Trying to send a keepalive message after every change has some - * overhead, but testing showed there is no noticeable overhead if - * we do it after every ~100 changes. - */ -#define CHANGES_THRESHOLD 100 - - if (++changes_count >= CHANGES_THRESHOLD) - { - rb->update_progress_txn(rb, txn, change->lsn); - changes_count = 0; - } } /* speculative insertion record must be freed by now */ diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c index 73b080060da..dc2b958437b 100644 --- a/src/backend/replication/pgoutput/pgoutput.c +++ b/src/backend/replication/pgoutput/pgoutput.c @@ -584,7 +584,6 @@ pgoutput_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, * from this transaction has been sent to the downstream. */ sent_begin_txn = txndata->sent_begin_txn; - OutputPluginUpdateProgress(ctx, !sent_begin_txn); pfree(txndata); txn->output_plugin_private = NULL; @@ -623,8 +622,6 @@ static void pgoutput_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr prepare_lsn) { - OutputPluginUpdateProgress(ctx, false); - OutputPluginPrepareWrite(ctx, true); logicalrep_write_prepare(ctx->out, txn, prepare_lsn); OutputPluginWrite(ctx, true); @@ -637,8 +634,6 @@ static void pgoutput_commit_prepared_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr commit_lsn) { - OutputPluginUpdateProgress(ctx, false); - OutputPluginPrepareWrite(ctx, true); logicalrep_write_commit_prepared(ctx->out, txn, commit_lsn); OutputPluginWrite(ctx, true); @@ -653,8 +648,6 @@ pgoutput_rollback_prepared_txn(LogicalDecodingContext *ctx, XLogRecPtr prepare_end_lsn, TimestampTz prepare_time) { - OutputPluginUpdateProgress(ctx, false); - OutputPluginPrepareWrite(ctx, true); logicalrep_write_rollback_prepared(ctx->out, txn, prepare_end_lsn, prepare_time); @@ -1895,8 +1888,6 @@ pgoutput_stream_commit(struct LogicalDecodingContext *ctx, Assert(!in_streaming); Assert(rbtxn_is_streamed(txn)); - OutputPluginUpdateProgress(ctx, false); - OutputPluginPrepareWrite(ctx, true); logicalrep_write_stream_commit(ctx->out, txn, commit_lsn); OutputPluginWrite(ctx, true); @@ -1916,7 +1907,6 @@ pgoutput_stream_prepare_txn(LogicalDecodingContext *ctx, { Assert(rbtxn_is_streamed(txn)); - OutputPluginUpdateProgress(ctx, false); OutputPluginPrepareWrite(ctx, true); logicalrep_write_stream_prepare(ctx->out, txn, prepare_lsn); OutputPluginWrite(ctx, true); diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index 4ed3747e3f9..697f8be941e 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -251,7 +251,7 @@ static void WalSndWait(uint32 socket_events, long timeout, uint32 wait_event); static void WalSndPrepareWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write); static void WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write); static void WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, - bool skipped_xact); + bool did_write, bool finished_xact); static XLogRecPtr WalSndWaitForWal(XLogRecPtr loc); static void LagTrackerWrite(XLogRecPtr lsn, TimestampTz local_flush_time); static TimeOffset LagTrackerRead(int head, XLogRecPtr lsn, TimestampTz now); @@ -1465,12 +1465,11 @@ ProcessPendingWrites(void) */ static void WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, - bool skipped_xact) + bool did_write, bool finished_xact) { static TimestampTz sendTime = 0; TimestampTz now = GetCurrentTimestamp(); bool pending_writes = false; - bool end_xact = ctx->end_xact; /* * Track lag no more than once per WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS to @@ -1481,8 +1480,9 @@ WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId * transaction LSN. */ #define WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS 1000 - if (end_xact && TimestampDifferenceExceeds(sendTime, now, - WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS)) + if (finished_xact && + TimestampDifferenceExceeds(sendTime, now, + WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS)) { LagTrackerWrite(lsn, now); sendTime = now; @@ -1496,7 +1496,7 @@ WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId * the worst case we will just send an extra keepalive message when it is * really not required. */ - if (skipped_xact && + if (finished_xact && !did_write && SyncRepRequested() && ((volatile WalSndCtlData *) WalSndCtl)->sync_standbys_defined) { @@ -1518,7 +1518,7 @@ WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId * for large transactions where we don't send any changes to the * downstream and the receiver can timeout due to that. */ - if (pending_writes || (!end_xact && + if (pending_writes || (!finished_xact && now >= TimestampTzPlusMilliseconds(last_reply_timestamp, wal_sender_timeout / 2))) ProcessPendingWrites(); -- 2.38.0