From c6f6ee18707a3dea988b98c6c5351d816a67be70 Mon Sep 17 00:00:00 2001 From: Tomas Vondra Date: Thu, 26 Sep 2019 17:26:31 +0200 Subject: [PATCH 04/17] Extend the output plugin API with stream methods This adds four methods the output plugin API, adding support for streaming changes for large transactions. * stream_message * stream_change * stream_truncate * stream_abort * stream_commit * stream_start * stream_stop Most of this is a simple extension of the existing methods, with the semantic difference that the transaction (or subtransaction) is incomplete and may be aborted later (which is something the regular API does not really need to deal with). This also extends the 'test_decoding' plugin, implementing these new stream methods. The stream_start/start_stop are used to demarcate the a chunk of changes streamed for a particular toplevel transaction. --- contrib/test_decoding/test_decoding.c | 100 ++++++ doc/src/sgml/logicaldecoding.sgml | 209 +++++++++++++ src/backend/replication/logical/logical.c | 361 ++++++++++++++++++++++ src/include/replication/logical.h | 5 + src/include/replication/output_plugin.h | 69 +++++ src/include/replication/reorderbuffer.h | 57 ++++ 6 files changed, 801 insertions(+) diff --git a/contrib/test_decoding/test_decoding.c b/contrib/test_decoding/test_decoding.c index 6c33c4bded..9c77791dd5 100644 --- a/contrib/test_decoding/test_decoding.c +++ b/contrib/test_decoding/test_decoding.c @@ -62,6 +62,28 @@ static void pg_decode_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr message_lsn, bool transactional, const char *prefix, Size sz, const char *message); +static void pg_decode_stream_message(LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, XLogRecPtr message_lsn, + bool transactional, const char *prefix, + Size sz, const char *message); +static void pg_decode_stream_change(LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, + Relation relation, + ReorderBufferChange *change); +static void pg_decode_stream_truncate(LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, + int nrelations, Relation relations[], + ReorderBufferChange *change); +static void pg_decode_stream_abort(LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, + XLogRecPtr abort_lsn); +static void pg_decode_stream_commit(LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, + XLogRecPtr apply_lsn); +static void pg_decode_stream_start(LogicalDecodingContext *ctx, + ReorderBufferTXN *txn); +static void pg_decode_stream_stop(LogicalDecodingContext *ctx, + ReorderBufferTXN *txn); void _PG_init(void) @@ -83,6 +105,13 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb) cb->filter_by_origin_cb = pg_decode_filter; cb->shutdown_cb = pg_decode_shutdown; cb->message_cb = pg_decode_message; + cb->stream_message_cb = pg_decode_stream_message; + cb->stream_change_cb = pg_decode_stream_change; + cb->stream_truncate_cb = pg_decode_stream_truncate; + cb->stream_abort_cb = pg_decode_stream_abort; + cb->stream_commit_cb = pg_decode_stream_commit; + cb->stream_start_cb = pg_decode_stream_start; + cb->stream_stop_cb = pg_decode_stream_stop; } @@ -542,3 +571,74 @@ pg_decode_message(LogicalDecodingContext *ctx, appendBinaryStringInfo(ctx->out, message, sz); OutputPluginWrite(ctx, true); } + +static void +pg_decode_stream_message(LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, XLogRecPtr lsn, bool transactional, + const char *prefix, Size sz, const char *message) +{ + OutputPluginPrepareWrite(ctx, true); + appendStringInfo(ctx->out, "streaming message: transactional: %d prefix: %s, sz: %zu content:", + transactional, prefix, sz); + appendBinaryStringInfo(ctx->out, message, sz); + OutputPluginWrite(ctx, true); +} + +static void +pg_decode_stream_change(LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, + Relation relation, + ReorderBufferChange *change) +{ + OutputPluginPrepareWrite(ctx, true); + appendStringInfo(ctx->out, "streaming change for TXN %u", txn->xid); + OutputPluginWrite(ctx, true); +} + +static void +pg_decode_stream_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, + int nrelations, Relation relations[], + ReorderBufferChange *change) +{ + OutputPluginPrepareWrite(ctx, true); + appendStringInfo(ctx->out, "streaming truncate for TXN %u", txn->xid); + OutputPluginWrite(ctx, true); +} + +static void +pg_decode_stream_abort(LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, + XLogRecPtr abort_lsn) +{ + OutputPluginPrepareWrite(ctx, true); + appendStringInfo(ctx->out, "aborting streamed (sub)transaction TXN %u", txn->xid); + OutputPluginWrite(ctx, true); +} + +static void +pg_decode_stream_commit(LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, + XLogRecPtr apply_lsn) +{ + OutputPluginPrepareWrite(ctx, true); + appendStringInfo(ctx->out, "committing streamed transaction TXN %u", txn->xid); + OutputPluginWrite(ctx, true); +} + +static void +pg_decode_stream_start(LogicalDecodingContext *ctx, + ReorderBufferTXN *txn) +{ + OutputPluginPrepareWrite(ctx, true); + appendStringInfo(ctx->out, "opening a streamed block for transaction TXN %u", txn->xid); + OutputPluginWrite(ctx, true); +} + +static void +pg_decode_stream_stop(LogicalDecodingContext *ctx, + ReorderBufferTXN *txn) +{ + OutputPluginPrepareWrite(ctx, true); + appendStringInfo(ctx->out, "closing a streamed block for transaction TXN %u", txn->xid); + OutputPluginWrite(ctx, true); +} diff --git a/doc/src/sgml/logicaldecoding.sgml b/doc/src/sgml/logicaldecoding.sgml index 8db968641e..fc4ad65eae 100644 --- a/doc/src/sgml/logicaldecoding.sgml +++ b/doc/src/sgml/logicaldecoding.sgml @@ -388,6 +388,13 @@ typedef struct OutputPluginCallbacks LogicalDecodeMessageCB message_cb; LogicalDecodeFilterByOriginCB filter_by_origin_cb; LogicalDecodeShutdownCB shutdown_cb; + LogicalDecodeStreamChangeCB stream_change_cb; + LogicalDecodeStreamTruncateCB stream_truncate_cb; + LogicalDecodeStreamMessageCB stream_message_cb; + LogicalDecodeStreamCommitCB stream_commit_cb; + LogicalDecodeStreamAbortCB stream_abort_cb; + LogicalDecodeStreamStartCB stream_start_cb; + LogicalDecodeStreamStopCB stream_stop_cb; } OutputPluginCallbacks; typedef void (*LogicalOutputPluginInit) (struct OutputPluginCallbacks *cb); @@ -400,6 +407,15 @@ typedef void (*LogicalOutputPluginInit) (struct OutputPluginCallbacks *cb); If truncate_cb is not set but a TRUNCATE is to be decoded, the action will be ignored. + + + An output plugin may also define functions to support streaming of large, + in-progress transactions. The stream_change_cb, + stream_commit_cb, stream_abort_cb, + stream_start_cb and stream_stop_cb + are required, while stream_message_cb and + stream_message_cb are optional. + @@ -678,6 +694,112 @@ typedef void (*LogicalDecodeMessageCB) (struct LogicalDecodingContext *ctx, + + Stream Start Callback + + The stream_start_cb callback is called when opening + a block of streamed changes from an in-progress transaction. + +typedef void (*LogicalDecodeStreamStartCB) (struct LogicalDecodingContext *ctx, + ReorderBufferTXN *txn); + + + + + + Stream Stop Callback + + The stream_stop_cb callback is called when closing + a block of streamed changes from an in-progress transaction. + +typedef void (*LogicalDecodeStreamStopCB) (struct LogicalDecodingContext *ctx, + ReorderBufferTXN *txn); + + + + + + Stream Change Callback + + The stream_change_cb callback is called when sending + a change in a block of streamed changes (demarcated by + stream_start_cb and stream_stop_cb calls). + +typedef void (*LogicalDecodeStreamChangeCB) (struct LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, + Relation relation, + ReorderBufferChange *change); + + + + + + Stream Truncate Callback + + The stream_truncate_cb callback is called for a + TRUNCATE command in a block of streamed changes + (demarcated by stream_start_cb and + stream_stop_cb calls). + +typedef void (*LogicalDecodeStreamTruncateCB) (struct LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, + int nrelations, + Relation relations[], + ReorderBufferChange *change); + + The parameters are analogous to the stream_change_cb + callback. However, because TRUNCATE actions on + tables connected by foreign keys need to be executed together, this + callback receives an array of relations instead of just a single one. + See the description of the statement for + details. + + + + + Stream Message Callback + + The stream_message_cb callback is called when sending + a generic message in a block of streamed changes (demarcated by + stream_start_cb and stream_stop_cb calls). + +typedef void (*LogicalDecodeStreamMessageCB) (struct LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, + XLogRecPtr message_lsn, + bool transactional, + const char *prefix, + Size message_size, + const char *message); + + + + + + Stream Commit Callback + + The stream_commit_cb callback is called to commit + a previously streamed transaction. + +typedef void (*LogicalDecodeStreamCommitCB) (struct LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, + XLogRecPtr commit_lsn); + + + + + + Stream Abort Callback + + The stream_abort_cb callback is called to abort + a previously streamed transaction. + +typedef void (*LogicalDecodeStreamAbortCB) (struct LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, + XLogRecPtr abort_lsn); + + + + @@ -746,4 +868,91 @@ OutputPluginWrite(ctx, true); + + + Streaming of Large Transactions for Logical Decoding + + + The basic output plugin callbacks (e.g. begin_cb, + change_cb, commit_cb and + message_cb) are only invoked when the transaction + actually commits. The changes are still decoded from the transaction + log, but are only passed to the output plugin at commit (and discarded + if the transaction aborts). + + + + This means that while the decoding happens incrementally, and may spill + to disk to keep memory usage under control, all the decoded changes have + to be transmitted when the transaction finally commits (or more precisely, + when the commit is decoded from the transaction log). Depending on the + size of the transaction size and network bandwidth, the transfer time + may significantly increase the apply lag. + + + + To reduce the apply lag caused by large transactions, an output plugin + may provide additional callback to support incremental streaming of + in-progress transactions. There are multiple required streaming callbacks + (stream_change_cb, stream_commit_cb, + stream_abort_cb, stream_start_cb + and stream_stop_cb) and one optional callback + (stream_message_cb). + + + + When streaming an in-progress transaction, the changes (and messages) are + streamed in blocks demarcated by stream_start_cb + and stream_stop_cb callbacks. Once all the decoded + changes are transmitted, the transaction is committed using the + stream_commit_cb callback (or possibly aborted using + the stream_abort_cb callback). + + + + One example sequence of streaming callback calls for one transaction may + look like this: + +stream_start_cb(...); <-- start of first block of changes + stream_change_cb(...); + stream_change_cb(...); + stream_message_cb(...); + stream_change_cb(...); + ... + stream_change_cb(...); +stream_stop_cb(...); <-- end of first block of changes + +stream_start_cb(...); <-- start of second block of changes + stream_change_cb(...); + stream_change_cb(...); + stream_change_cb(...); + ... + stream_message_cb(...); + stream_change_cb(...); +stream_stop_cb(...); <-- end of second block of changes + +stream_commit_cb(...); <-- commit of the streamed transaction + + + + + The actual sequence of callback calls may be more complicated, of course. + There may be blocks for multiple streamed transactions, some of the + transactions may get aborted, etc. + + + + Similarly to spill-to-disk behavior, streaming is triggered when the total + amount of changes decoded from the WAL (for all in-progress transactions) + exceeds limit defined by logical_work_mem setting. At + that point the largest toplevel transaction (measured by amount of memory + currently used for decoded changes) is selected and streamed. + + + + Even when streaming large transactions, the changes are still applied in + commit order, preserving the same guarantees as the non-streaming mode. + + + diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c index 7e06615864..b88b58505a 100644 --- a/src/backend/replication/logical/logical.c +++ b/src/backend/replication/logical/logical.c @@ -65,6 +65,21 @@ static void message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, XLogRecPtr message_lsn, bool transactional, const char *prefix, Size message_size, const char *message); +/* streaming callbacks */ +static void stream_change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, + Relation relation, ReorderBufferChange *change); +static void stream_truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, + int nrelations, Relation relations[], ReorderBufferChange *change); +static void stream_message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, + XLogRecPtr message_lsn, bool transactional, + const char *prefix, Size message_size, const char *message); +static void stream_abort_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, + XLogRecPtr abort_lsn); +static void stream_commit_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, + XLogRecPtr apply_lsn); +static void stream_start_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn); +static void stream_stop_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn); + static void LoadOutputPlugin(OutputPluginCallbacks *callbacks, char *plugin); /* @@ -189,6 +204,39 @@ StartupDecodingContext(List *output_plugin_options, ctx->reorder->commit = commit_cb_wrapper; ctx->reorder->message = message_cb_wrapper; + /* + * To support streaming, we require change/commit/abort callbacks. The + * message callback is optional, similarly to regular output plugins. We + * however enable streaming when at least one of the methods is enabled, + * so that we can easily identify missing methods. + * + * We decide it here, but only check it later in the wrappers. + */ + ctx->streaming = (ctx->callbacks.stream_change_cb != NULL) || + (ctx->callbacks.stream_abort_cb != NULL) || + (ctx->callbacks.stream_message_cb != NULL) || + (ctx->callbacks.stream_truncate_cb != NULL) || + (ctx->callbacks.stream_commit_cb != NULL) || + (ctx->callbacks.stream_start_cb != NULL) || + (ctx->callbacks.stream_stop_cb != NULL); + + /* + * streaming callbacks + * + * stream_message and stream_truncate callbacks are optional, + * so we do not fail with ERROR when missing, but the wrappers + * simply do nothing. We must set the ReorderBuffer callbacks + * to something, otherwise the calls from there will crash (we + * don't want to move the checks there). + */ + ctx->reorder->stream_change = stream_change_cb_wrapper; + ctx->reorder->stream_abort = stream_abort_cb_wrapper; + ctx->reorder->stream_commit = stream_commit_cb_wrapper; + ctx->reorder->stream_start = stream_start_cb_wrapper; + ctx->reorder->stream_stop = stream_stop_cb_wrapper; + ctx->reorder->stream_message = stream_message_cb_wrapper; + ctx->reorder->stream_truncate = stream_truncate_cb_wrapper; + ctx->out = makeStringInfo(); ctx->prepare_write = prepare_write; ctx->write = do_write; @@ -863,6 +911,319 @@ message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, error_context_stack = errcallback.previous; } +static void +stream_change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, + Relation relation, ReorderBufferChange *change) +{ + LogicalDecodingContext *ctx = cache->private_data; + LogicalErrorCallbackState state; + ErrorContextCallback errcallback; + + Assert(!ctx->fast_forward); + + /* We're only supposed to call this when streaming is supported. */ + Assert(ctx->streaming); + + /* Push callback + info on the error context stack */ + state.ctx = ctx; + state.callback_name = "stream_change"; + state.report_location = change->lsn; + errcallback.callback = output_plugin_error_callback; + errcallback.arg = (void *) &state; + errcallback.previous = error_context_stack; + error_context_stack = &errcallback; + + /* set output state */ + ctx->accept_writes = true; + ctx->write_xid = txn->xid; + + /* + * report this change's lsn so replies from clients can give an up2date + * answer. This won't ever be enough (and shouldn't be!) to confirm + * receipt of this transaction, but it might allow another transaction's + * commit to be confirmed with one message. + */ + ctx->write_location = change->lsn; + + /* in streaming mode, stream_change_cb is required */ + if (ctx->callbacks.stream_change_cb == NULL) + ereport(ERROR, + (errmsg("Output plugin supports streaming, but has not registered " + "stream_change_cb callback."))); + + ctx->callbacks.stream_change_cb(ctx, txn, relation, change); + + /* Pop the error context stack */ + error_context_stack = errcallback.previous; +} + +static void +stream_truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, + int nrelations, Relation relations[], + ReorderBufferChange *change) +{ + LogicalDecodingContext *ctx = cache->private_data; + LogicalErrorCallbackState state; + ErrorContextCallback errcallback; + + Assert(!ctx->fast_forward); + + /* We're only supposed to call this when streaming is supported. */ + Assert(ctx->streaming); + + /* this callback is optional */ + if (!ctx->callbacks.stream_truncate_cb) + return; + + /* Push callback + info on the error context stack */ + state.ctx = ctx; + state.callback_name = "stream_truncate"; + state.report_location = change->lsn; + errcallback.callback = output_plugin_error_callback; + errcallback.arg = (void *) &state; + errcallback.previous = error_context_stack; + error_context_stack = &errcallback; + + /* set output state */ + ctx->accept_writes = true; + ctx->write_xid = txn->xid; + + /* + * report this change's lsn so replies from clients can give an up2date + * answer. This won't ever be enough (and shouldn't be!) to confirm + * receipt of this transaction, but it might allow another transaction's + * commit to be confirmed with one message. + */ + ctx->write_location = change->lsn; + + ctx->callbacks.stream_truncate_cb(ctx, txn, nrelations, relations, change); + + /* Pop the error context stack */ + error_context_stack = errcallback.previous; +} + +static void +stream_message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, + XLogRecPtr message_lsn, bool transactional, + const char *prefix, Size message_size, const char *message) +{ + LogicalDecodingContext *ctx = cache->private_data; + LogicalErrorCallbackState state; + ErrorContextCallback errcallback; + + Assert(!ctx->fast_forward); + + /* We're only supposed to call this when streaming is supported. */ + Assert(ctx->streaming); + + /* this callback is optional */ + if (ctx->callbacks.stream_message_cb == NULL) + return; + + /* Push callback + info on the error context stack */ + state.ctx = ctx; + state.callback_name = "stream_message"; + state.report_location = message_lsn; + errcallback.callback = output_plugin_error_callback; + errcallback.arg = (void *) &state; + errcallback.previous = error_context_stack; + error_context_stack = &errcallback; + + /* set output state */ + ctx->accept_writes = true; + ctx->write_xid = txn != NULL ? txn->xid : InvalidTransactionId; + ctx->write_location = message_lsn; + + /* do the actual work: call callback */ + ctx->callbacks.stream_message_cb(ctx, txn, message_lsn, transactional, prefix, + message_size, message); + + /* Pop the error context stack */ + error_context_stack = errcallback.previous; +} + +static void +stream_abort_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, + XLogRecPtr abort_lsn) +{ + LogicalDecodingContext *ctx = cache->private_data; + LogicalErrorCallbackState state; + ErrorContextCallback errcallback; + + Assert(!ctx->fast_forward); + + /* We're only supposed to call this when streaming is supported. */ + Assert(ctx->streaming); + + /* Push callback + info on the error context stack */ + state.ctx = ctx; + state.callback_name = "stream_abort"; + state.report_location = abort_lsn; + errcallback.callback = output_plugin_error_callback; + errcallback.arg = (void *) &state; + errcallback.previous = error_context_stack; + error_context_stack = &errcallback; + + /* set output state */ + ctx->accept_writes = true; + ctx->write_xid = txn->xid; + + /* + * report this change's lsn so replies from clients can give an up2date + * answer. This won't ever be enough (and shouldn't be!) to confirm + * receipt of this transaction, but it might allow another transaction's + * commit to be confirmed with one message. + */ + ctx->write_location = abort_lsn; + + /* in streaming mode, stream_abort_cb is required */ + if (ctx->callbacks.stream_abort_cb == NULL) + ereport(ERROR, + (errmsg("Output plugin supports streaming, but has not registered " + "stream_abort_cb callback."))); + + ctx->callbacks.stream_abort_cb(ctx, txn, abort_lsn); + + /* Pop the error context stack */ + error_context_stack = errcallback.previous; +} + +static void +stream_commit_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, + XLogRecPtr apply_lsn) +{ + LogicalDecodingContext *ctx = cache->private_data; + LogicalErrorCallbackState state; + ErrorContextCallback errcallback; + + Assert(!ctx->fast_forward); + + /* We're only supposed to call this when streaming is supported. */ + Assert(ctx->streaming); + + /* Push callback + info on the error context stack */ + state.ctx = ctx; + state.callback_name = "stream_commit"; + state.report_location = apply_lsn; + errcallback.callback = output_plugin_error_callback; + errcallback.arg = (void *) &state; + errcallback.previous = error_context_stack; + error_context_stack = &errcallback; + + /* set output state */ + ctx->accept_writes = true; + ctx->write_xid = txn->xid; + + /* + * report this change's lsn so replies from clients can give an up2date + * answer. This won't ever be enough (and shouldn't be!) to confirm + * receipt of this transaction, but it might allow another transaction's + * commit to be confirmed with one message. + */ + ctx->write_location = apply_lsn; + + /* in streaming mode, stream_abort_cb is required */ + if (ctx->callbacks.stream_commit_cb == NULL) + ereport(ERROR, + (errmsg("Output plugin supports streaming, but has not registered " + "stream_commit_cb callback."))); + + ctx->callbacks.stream_commit_cb(ctx, txn, apply_lsn); + + /* Pop the error context stack */ + error_context_stack = errcallback.previous; +} + +static void +stream_start_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn) +{ + LogicalDecodingContext *ctx = cache->private_data; + LogicalErrorCallbackState state; + ErrorContextCallback errcallback; + + Assert(!ctx->fast_forward); + + /* We're only supposed to call this when streaming is supported. */ + Assert(ctx->streaming); + + /* Push callback + info on the error context stack */ + state.ctx = ctx; + state.callback_name = "stream_start"; + /* state.report_location = apply_lsn; */ + errcallback.callback = output_plugin_error_callback; + errcallback.arg = (void *) &state; + errcallback.previous = error_context_stack; + error_context_stack = &errcallback; + + /* set output state */ + ctx->accept_writes = true; + ctx->write_xid = txn->xid; + + /* + * report this change's lsn so replies from clients can give an up2date + * answer. This won't ever be enough (and shouldn't be!) to confirm + * receipt of this transaction, but it might allow another transaction's + * commit to be confirmed with one message. + */ + /* FIXME ctx->write_location = apply_lsn; */ + + /* in streaming mode, stream_start_cb is required */ + if (ctx->callbacks.stream_start_cb == NULL) + ereport(ERROR, + (errmsg("Output plugin supports streaming, but has not registered " + "stream_start_cb callback."))); + + ctx->callbacks.stream_start_cb(ctx, txn); + + /* Pop the error context stack */ + error_context_stack = errcallback.previous; +} + +static void +stream_stop_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn) +{ + LogicalDecodingContext *ctx = cache->private_data; + LogicalErrorCallbackState state; + ErrorContextCallback errcallback; + + Assert(!ctx->fast_forward); + + /* We're only supposed to call this when streaming is supported. */ + Assert(ctx->streaming); + + /* Push callback + info on the error context stack */ + state.ctx = ctx; + state.callback_name = "stream_stop"; + /* state.report_location = apply_lsn; */ + errcallback.callback = output_plugin_error_callback; + errcallback.arg = (void *) &state; + errcallback.previous = error_context_stack; + error_context_stack = &errcallback; + + /* set output state */ + ctx->accept_writes = true; + ctx->write_xid = txn->xid; + + /* + * report this change's lsn so replies from clients can give an up2date + * answer. This won't ever be enough (and shouldn't be!) to confirm + * receipt of this transaction, but it might allow another transaction's + * commit to be confirmed with one message. + */ + /* FIXME ctx->write_location = apply_lsn; */ + + /* in streaming mode, stream_stop_cb is required */ + if (ctx->callbacks.stream_stop_cb == NULL) + ereport(ERROR, + (errmsg("Output plugin supports streaming, but has not registered " + "stream_stop_cb callback."))); + + ctx->callbacks.stream_stop_cb(ctx, txn); + + /* Pop the error context stack */ + error_context_stack = errcallback.previous; +} + /* * Set the required catalog xmin horizon for historic snapshots in the current * replication slot. diff --git a/src/include/replication/logical.h b/src/include/replication/logical.h index 6879a2e6d2..1e934d25e6 100644 --- a/src/include/replication/logical.h +++ b/src/include/replication/logical.h @@ -79,6 +79,11 @@ typedef struct LogicalDecodingContext */ void *output_writer_private; + /* + * Does the output plugin support streaming, and is it enabled? + */ + bool streaming; + /* * State for writing output. */ diff --git a/src/include/replication/output_plugin.h b/src/include/replication/output_plugin.h index d4ce54f26d..a30546250a 100644 --- a/src/include/replication/output_plugin.h +++ b/src/include/replication/output_plugin.h @@ -99,6 +99,67 @@ typedef bool (*LogicalDecodeFilterByOriginCB) (struct LogicalDecodingContext *ct */ typedef void (*LogicalDecodeShutdownCB) (struct LogicalDecodingContext *ctx); +/* + * Callback for streaming individual changes from in-progress transactions. + */ +typedef void (*LogicalDecodeStreamChangeCB) (struct LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, + Relation relation, + ReorderBufferChange *change); + +/* + * Callback for streaming truncates from in-progress transactions. + */ +typedef void (*LogicalDecodeStreamTruncateCB) (struct LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, + int nrelations, + Relation relations[], + ReorderBufferChange *change); + +/* + * Callback for streaming generic logical decoding messages from in-progress + * transactions. + */ +typedef void (*LogicalDecodeStreamMessageCB) (struct LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, + XLogRecPtr message_lsn, + bool transactional, + const char *prefix, + Size message_size, + const char *message); + +/* + * Called to discard changes streamed to remote node from in-progress + * transaction. + */ +typedef void (*LogicalDecodeStreamAbortCB) (struct LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, + XLogRecPtr abort_lsn); + +/* + * Called to apply changes streamed to remote node from in-progress + * transaction. + */ +typedef void (*LogicalDecodeStreamCommitCB) (struct LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, + XLogRecPtr commit_lsn); + +/* + * Called when starting to stream a block of changes from in-progress + * transaction (may be called repeatedly, if it's streamed in multiple + * chunks). + */ +typedef void (*LogicalDecodeStreamStartCB) (struct LogicalDecodingContext *ctx, + ReorderBufferTXN *txn); + +/* + * Called when stopping to stream a block of changes from in-progress + * transaction to a remote node (may be called repeatedly, if it's streamed + * in multiple chunks). + */ +typedef void (*LogicalDecodeStreamStopCB) (struct LogicalDecodingContext *ctx, + ReorderBufferTXN *txn); + /* * Output plugin callbacks */ @@ -112,6 +173,14 @@ typedef struct OutputPluginCallbacks LogicalDecodeMessageCB message_cb; LogicalDecodeFilterByOriginCB filter_by_origin_cb; LogicalDecodeShutdownCB shutdown_cb; + /* streaming of changes */ + LogicalDecodeStreamChangeCB stream_change_cb; + LogicalDecodeStreamTruncateCB stream_truncate_cb; + LogicalDecodeStreamMessageCB stream_message_cb; + LogicalDecodeStreamAbortCB stream_abort_cb; + LogicalDecodeStreamCommitCB stream_commit_cb; + LogicalDecodeStreamStartCB stream_start_cb; + LogicalDecodeStreamStopCB stream_stop_cb; } OutputPluginCallbacks; /* Functions in replication/logical/logical.c */ diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h index 6a7187bbec..5b4be2bf88 100644 --- a/src/include/replication/reorderbuffer.h +++ b/src/include/replication/reorderbuffer.h @@ -345,6 +345,52 @@ typedef void (*ReorderBufferMessageCB) (ReorderBuffer *rb, const char *prefix, Size sz, const char *message); +/* stream change callback signature */ +typedef void (*ReorderBufferStreamChangeCB) ( + ReorderBuffer *rb, + ReorderBufferTXN *txn, + Relation relation, + ReorderBufferChange *change); + +/* stream truncate callback signature */ +typedef void (*ReorderBufferStreamTruncateCB) ( + ReorderBuffer *rb, + ReorderBufferTXN *txn, + int nrelations, + Relation relations[], + ReorderBufferChange *change); + +/* stream message callback signature */ +typedef void (*ReorderBufferStreamMessageCB) ( + ReorderBuffer *rb, + ReorderBufferTXN *txn, + XLogRecPtr message_lsn, + bool transactional, + const char *prefix, Size sz, + const char *message); + +/* discard streamed transaction callback signature */ +typedef void (*ReorderBufferStreamAbortCB) ( + ReorderBuffer *rb, + ReorderBufferTXN *txn, + XLogRecPtr abort_lsn); + +/* commit streamed transaction callback signature */ +typedef void (*ReorderBufferStreamCommitCB) ( + ReorderBuffer *rb, + ReorderBufferTXN *txn, + XLogRecPtr commit_lsn); + +/* start streaming transaction callback signature */ +typedef void (*ReorderBufferStreamStartCB) ( + ReorderBuffer *rb, + ReorderBufferTXN *txn); + +/* stop streaming transaction callback signature */ +typedef void (*ReorderBufferStreamStopCB) ( + ReorderBuffer *rb, + ReorderBufferTXN *txn); + struct ReorderBuffer { /* @@ -383,6 +429,17 @@ struct ReorderBuffer ReorderBufferCommitCB commit; ReorderBufferMessageCB message; + /* + * Callbacks to be called when streaming a transaction. + */ + ReorderBufferStreamStartCB stream_start; + ReorderBufferStreamStopCB stream_stop; + ReorderBufferStreamChangeCB stream_change; + ReorderBufferStreamTruncateCB stream_truncate; + ReorderBufferStreamMessageCB stream_message; + ReorderBufferStreamAbortCB stream_abort; + ReorderBufferStreamCommitCB stream_commit; + /* * Pointer that will be passed untouched to the callbacks. */ -- 2.21.0