From 83135d7f8db59c80c2ab3b1a5c5542d9b0400fdb Mon Sep 17 00:00:00 2001
From: Tomas Vondra
Date: Fri, 27 Dec 2019 23:05:20 +0100
Subject: [PATCH 11/17] Add support for streaming to built-in replication

To add support for streaming of in-progress transactions into the
built-in logical replication, we need to do three things:

* Extend the logical replication protocol to identify in-progress
  transactions, and to allow adding additional bits of information
  (e.g. the XID of subtransactions).

* Modify the output plugin (pgoutput) to implement the new stream
  API callbacks, by leveraging the extended replication protocol.

* Modify the replication apply worker to properly handle streamed
  in-progress transactions, by spilling the data to disk and then
  replaying it on commit.

However, we must explicitly disable streaming during replication
slot creation, even if the plugin supports it. We don't need to
replicate the changes accumulated during this phase, and moreover
we don't have a replication connection open, so we have nowhere to
send the data anyway.
---
 doc/src/sgml/ref/alter_subscription.sgml      |    5 +-
 doc/src/sgml/ref/create_subscription.sgml     |   12 +
 src/backend/catalog/pg_subscription.c         |    1 +
 src/backend/commands/subscriptioncmds.c       |   60 +-
 src/backend/postmaster/pgstat.c               |   12 +
 .../libpqwalreceiver/libpqwalreceiver.c       |    8 +-
 src/backend/replication/logical/launcher.c    |    2 +
 src/backend/replication/logical/logical.c     |    4 +-
 src/backend/replication/logical/proto.c       |  157 ++-
 src/backend/replication/logical/worker.c      | 1031 +++++++++++++++++
 src/backend/replication/pgoutput/pgoutput.c   |  263 ++++-
 src/backend/replication/slotfuncs.c           |    7 +
 src/backend/replication/walsender.c           |    6 +
 src/include/catalog/pg_subscription.h         |    3 +
 src/include/pgstat.h                          |    6 +-
 src/include/replication/logicalproto.h        |   42 +-
 src/include/replication/walreceiver.h         |    1 +
 src/test/subscription/t/009_stream_simple.pl  |   86 ++
 src/test/subscription/t/010_stream_subxact.pl |  102 ++
 src/test/subscription/t/011_stream_ddl.pl     |   95 ++
 .../t/012_stream_subxact_abort.pl             |   82 ++
 .../t/013_stream_subxact_ddl_abort.pl         |   84 ++
 22 files changed, 2027 insertions(+), 42 deletions(-)
 create mode 100644 src/test/subscription/t/009_stream_simple.pl
 create mode 100644 src/test/subscription/t/010_stream_subxact.pl
 create mode 100644 src/test/subscription/t/011_stream_ddl.pl
 create mode 100644 src/test/subscription/t/012_stream_subxact_abort.pl
 create mode 100644 src/test/subscription/t/013_stream_subxact_ddl_abort.pl

diff --git a/doc/src/sgml/ref/alter_subscription.sgml b/doc/src/sgml/ref/alter_subscription.sgml
index 6dfb2e4d3e..e1fb9075e1 100644
--- a/doc/src/sgml/ref/alter_subscription.sgml
+++ b/doc/src/sgml/ref/alter_subscription.sgml
@@ -163,8 +163,9 @@ ALTER SUBSCRIPTION name RENAME TO <
       This clause alters parameters originally set by
       . See there for more
-      information. The allowed options are slot_name and
-      synchronous_commit
+      information. The allowed options are slot_name,
+      synchronous_commit, work_mem
+      and streaming.

diff --git a/doc/src/sgml/ref/create_subscription.sgml b/doc/src/sgml/ref/create_subscription.sgml
index 91790b0c95..d9abf5e64c 100644
--- a/doc/src/sgml/ref/create_subscription.sgml
+++ b/doc/src/sgml/ref/create_subscription.sgml
@@ -218,6 +218,18 @@ CREATE SUBSCRIPTION subscription_name
+
+      streaming (boolean)
+
+       Specifies whether streaming of in-progress transactions should
+       be enabled for this subscription. By default, all transactions
+       are fully decoded on the publisher, and only then sent to the
+       subscriber as a whole.
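+       When streaming is enabled, large in-progress transactions that
+       exceed the decoding memory limit may instead be sent to the
+       subscriber before they commit; the apply worker spills their
+       changes to disk and replays them once the final commit arrives.
+       For example (hypothetical subscription and publication names):
+
+CREATE SUBSCRIPTION mysub
+    CONNECTION 'host=publisher dbname=postgres'
+    PUBLICATION mypub
+    WITH (streaming = on);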
+ + + diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c index 2a276482c1..15a6f5a8b3 100644 --- a/src/backend/catalog/pg_subscription.c +++ b/src/backend/catalog/pg_subscription.c @@ -66,6 +66,7 @@ GetSubscription(Oid subid, bool missing_ok) sub->owner = subform->subowner; sub->enabled = subform->subenabled; sub->workmem = subform->subworkmem; + sub->stream = subform->substream; /* Get conninfo */ datum = SysCacheGetAttr(SUBSCRIPTIONOID, diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index fbb447379f..b2b93d6234 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -59,7 +59,8 @@ parse_subscription_options(List *options, bool *connect, bool *enabled_given, bool *slot_name_given, char **slot_name, bool *copy_data, char **synchronous_commit, bool *refresh, int *logical_wm, - bool *logical_wm_given) + bool *logical_wm_given, bool *streaming, + bool *streaming_given) { ListCell *lc; bool connect_given = false; @@ -92,6 +93,8 @@ parse_subscription_options(List *options, bool *connect, bool *enabled_given, *refresh = true; if (logical_wm) *logical_wm_given = false; + if (streaming) + *streaming_given = false; /* Parse options */ foreach(lc, options) @@ -186,6 +189,26 @@ parse_subscription_options(List *options, bool *connect, bool *enabled_given, *logical_wm_given = true; *logical_wm = defGetInt32(defel); + + /* + * Check that the value is not smaller than 64kB (which is + * the minimum value for logical_work_mem). + */ + if (*logical_wm < 64) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("%d is outside the valid range for parameter \"work_mem\" (64 .. 2147483647)", + *logical_wm))); + } + else if (strcmp(defel->defname, "streaming") == 0 && streaming) + { + if (*streaming_given) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("conflicting or redundant options"))); + + *streaming_given = true; + *streaming = defGetBoolean(defel); } else ereport(ERROR, @@ -332,6 +355,8 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel) bool copy_data; int logical_wm; bool logical_wm_given; + bool streaming; + bool streaming_given; char *synchronous_commit; char *conninfo; char *slotname; @@ -348,7 +373,8 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel) parse_subscription_options(stmt->options, &connect, &enabled_given, &enabled, &create_slot, &slotname_given, &slotname, ©_data, &synchronous_commit, - NULL, &logical_wm, &logical_wm_given); + NULL, &logical_wm, &logical_wm_given, + &streaming, &streaming_given); /* * Since creating a replication slot is not transactional, rolling back @@ -430,7 +456,15 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel) values[Anum_pg_subscription_subworkmem - 1] = Int32GetDatum(logical_wm); else - nulls[Anum_pg_subscription_subworkmem - 1] = true; + values[Anum_pg_subscription_subworkmem - 1] = + Int32GetDatum(-1); + + if (streaming_given) + values[Anum_pg_subscription_substream - 1] = + BoolGetDatum(streaming); + else + values[Anum_pg_subscription_substream - 1] = + BoolGetDatum(false); tup = heap_form_tuple(RelationGetDescr(rel), values, nulls); @@ -692,11 +726,14 @@ AlterSubscription(AlterSubscriptionStmt *stmt) char *synchronous_commit; int logical_wm; bool logical_wm_given; + bool streaming; + bool streaming_given; parse_subscription_options(stmt->options, NULL, NULL, NULL, NULL, &slotname_given, &slotname, NULL, &synchronous_commit, NULL, - 
&logical_wm, &logical_wm_given); + &logical_wm, &logical_wm_given, + &streaming, &streaming_given); if (slotname_given) { @@ -728,6 +765,13 @@ AlterSubscription(AlterSubscriptionStmt *stmt) replaces[Anum_pg_subscription_subworkmem - 1] = true; } + if (streaming_given) + { + values[Anum_pg_subscription_substream - 1] = + BoolGetDatum(streaming); + replaces[Anum_pg_subscription_substream - 1] = true; + } + update_tuple = true; break; } @@ -740,7 +784,7 @@ AlterSubscription(AlterSubscriptionStmt *stmt) parse_subscription_options(stmt->options, NULL, &enabled_given, &enabled, NULL, NULL, NULL, NULL, NULL, NULL, - NULL, NULL); + NULL, NULL, NULL, NULL); Assert(enabled_given); if (!sub->slotname && enabled) @@ -778,7 +822,8 @@ AlterSubscription(AlterSubscriptionStmt *stmt) parse_subscription_options(stmt->options, NULL, NULL, NULL, NULL, NULL, NULL, ©_data, - NULL, &refresh, NULL, NULL); + NULL, &refresh, NULL, NULL, + NULL, NULL); values[Anum_pg_subscription_subpublications - 1] = publicationListToArray(stmt->publication); @@ -815,7 +860,8 @@ AlterSubscription(AlterSubscriptionStmt *stmt) parse_subscription_options(stmt->options, NULL, NULL, NULL, NULL, NULL, NULL, ©_data, - NULL, NULL, NULL, NULL); + NULL, NULL, NULL, NULL, NULL, + NULL); AlterSubscription_refresh(sub, copy_data); diff --git a/src/backend/postmaster/pgstat.c b/src/backend/postmaster/pgstat.c index 7410b2ff5e..a479ce9329 100644 --- a/src/backend/postmaster/pgstat.c +++ b/src/backend/postmaster/pgstat.c @@ -4102,6 +4102,18 @@ pgstat_get_wait_io(WaitEventIO w) case WAIT_EVENT_WAL_WRITE: event_name = "WALWrite"; break; + case WAIT_EVENT_LOGICAL_CHANGES_READ: + event_name = "ReorderLogicalChangesRead"; + break; + case WAIT_EVENT_LOGICAL_CHANGES_WRITE: + event_name = "ReorderLogicalChangesWrite"; + break; + case WAIT_EVENT_LOGICAL_SUBXACT_READ: + event_name = "ReorderLogicalSubxactRead"; + break; + case WAIT_EVENT_LOGICAL_SUBXACT_WRITE: + event_name = "ReorderLogicalSubxactWrite"; + break; /* no default case, so that compiler will warn */ } diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c index 0ab6855ad8..9970170e47 100644 --- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c +++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c @@ -406,8 +406,12 @@ libpqrcv_startstreaming(WalReceiverConn *conn, appendStringInfo(&cmd, "proto_version '%u'", options->proto.logical.proto_version); - appendStringInfo(&cmd, ", work_mem '%d'", - options->proto.logical.work_mem); + if (options->proto.logical.work_mem != -1) + appendStringInfo(&cmd, ", work_mem '%d'", + options->proto.logical.work_mem); + + if (options->proto.logical.streaming) + appendStringInfo(&cmd, ", streaming 'on'"); pubnames = options->proto.logical.publication_names; pubnames_str = stringlist_to_identifierstr(conn->streamConn, pubnames); diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c index c57b578b48..0a013ed220 100644 --- a/src/backend/replication/logical/launcher.c +++ b/src/backend/replication/logical/launcher.c @@ -14,6 +14,8 @@ * *------------------------------------------------------------------------- */ +#include +#include #include "postgres.h" diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c index b88b58505a..ad43ab365e 100644 --- a/src/backend/replication/logical/logical.c +++ b/src/backend/replication/logical/logical.c @@ -1149,7 +1149,7 @@ 
stream_start_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn) /* Push callback + info on the error context stack */ state.ctx = ctx; state.callback_name = "stream_start"; - /* state.report_location = apply_lsn; */ + state.report_location = InvalidXLogRecPtr; errcallback.callback = output_plugin_error_callback; errcallback.arg = (void *) &state; errcallback.previous = error_context_stack; @@ -1194,7 +1194,7 @@ stream_stop_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn) /* Push callback + info on the error context stack */ state.ctx = ctx; state.callback_name = "stream_stop"; - /* state.report_location = apply_lsn; */ + state.report_location = InvalidXLogRecPtr; errcallback.callback = output_plugin_error_callback; errcallback.arg = (void *) &state; errcallback.previous = error_context_stack; diff --git a/src/backend/replication/logical/proto.c b/src/backend/replication/logical/proto.c index e7df47de3e..5a379fb6bc 100644 --- a/src/backend/replication/logical/proto.c +++ b/src/backend/replication/logical/proto.c @@ -139,7 +139,8 @@ logicalrep_read_origin(StringInfo in, XLogRecPtr *origin_lsn) * Write INSERT to the output stream. */ void -logicalrep_write_insert(StringInfo out, Relation rel, HeapTuple newtuple) +logicalrep_write_insert(StringInfo out, TransactionId xid, + Relation rel, HeapTuple newtuple) { pq_sendbyte(out, 'I'); /* action INSERT */ @@ -147,6 +148,10 @@ logicalrep_write_insert(StringInfo out, Relation rel, HeapTuple newtuple) rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL || rel->rd_rel->relreplident == REPLICA_IDENTITY_INDEX); + /* transaction ID (if not valid, we're not streaming) */ + if (TransactionIdIsValid(xid)) + pq_sendint32(out, xid); + /* use Oid as relation identifier */ pq_sendint32(out, RelationGetRelid(rel)); @@ -182,8 +187,8 @@ logicalrep_read_insert(StringInfo in, LogicalRepTupleData *newtup) * Write UPDATE to the output stream. */ void -logicalrep_write_update(StringInfo out, Relation rel, HeapTuple oldtuple, - HeapTuple newtuple) +logicalrep_write_update(StringInfo out, TransactionId xid, Relation rel, + HeapTuple oldtuple, HeapTuple newtuple) { pq_sendbyte(out, 'U'); /* action UPDATE */ @@ -191,6 +196,10 @@ logicalrep_write_update(StringInfo out, Relation rel, HeapTuple oldtuple, rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL || rel->rd_rel->relreplident == REPLICA_IDENTITY_INDEX); + /* transaction ID (if not valid, we're not streaming) */ + if (TransactionIdIsValid(xid)) + pq_sendint32(out, xid); + /* use Oid as relation identifier */ pq_sendint32(out, RelationGetRelid(rel)); @@ -252,13 +261,18 @@ logicalrep_read_update(StringInfo in, bool *has_oldtuple, * Write DELETE to the output stream. 
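 *
 * As with INSERT and UPDATE above, when the change is part of a streamed
 * transaction the message carries the (sub)transaction's XID right after
 * the action byte.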
*/ void -logicalrep_write_delete(StringInfo out, Relation rel, HeapTuple oldtuple) +logicalrep_write_delete(StringInfo out, TransactionId xid, + Relation rel, HeapTuple oldtuple) { + pq_sendbyte(out, 'D'); /* action DELETE */ + Assert(rel->rd_rel->relreplident == REPLICA_IDENTITY_DEFAULT || rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL || rel->rd_rel->relreplident == REPLICA_IDENTITY_INDEX); - pq_sendbyte(out, 'D'); /* action DELETE */ + /* transaction ID (if not valid, we're not streaming) */ + if (TransactionIdIsValid(xid)) + pq_sendint32(out, xid); /* use Oid as relation identifier */ pq_sendint32(out, RelationGetRelid(rel)); @@ -300,6 +314,7 @@ logicalrep_read_delete(StringInfo in, LogicalRepTupleData *oldtup) */ void logicalrep_write_truncate(StringInfo out, + TransactionId xid, int nrelids, Oid relids[], bool cascade, bool restart_seqs) @@ -309,6 +324,10 @@ logicalrep_write_truncate(StringInfo out, pq_sendbyte(out, 'T'); /* action TRUNCATE */ + /* transaction ID (if not valid, we're not streaming) */ + if (TransactionIdIsValid(xid)) + pq_sendint32(out, xid); + pq_sendint32(out, nrelids); /* encode and send truncate flags */ @@ -351,12 +370,16 @@ logicalrep_read_truncate(StringInfo in, * Write relation description to the output stream. */ void -logicalrep_write_rel(StringInfo out, Relation rel) +logicalrep_write_rel(StringInfo out, TransactionId xid, Relation rel) { char *relname; pq_sendbyte(out, 'R'); /* sending RELATION */ + /* transaction ID (if not valid, we're not streaming) */ + if (TransactionIdIsValid(xid)) + pq_sendint32(out, xid); + /* use Oid as relation identifier */ pq_sendint32(out, RelationGetRelid(rel)); @@ -401,7 +424,7 @@ logicalrep_read_rel(StringInfo in) * This function will always write base type info. */ void -logicalrep_write_typ(StringInfo out, Oid typoid) +logicalrep_write_typ(StringInfo out, TransactionId xid, Oid typoid) { Oid basetypoid = getBaseType(typoid); HeapTuple tup; @@ -409,6 +432,10 @@ logicalrep_write_typ(StringInfo out, Oid typoid) pq_sendbyte(out, 'Y'); /* sending TYPE */ + /* transaction ID (if not valid, we're not streaming) */ + if (TransactionIdIsValid(xid)) + pq_sendint32(out, xid); + tup = SearchSysCache1(TYPEOID, ObjectIdGetDatum(basetypoid)); if (!HeapTupleIsValid(tup)) elog(ERROR, "cache lookup failed for type %u", basetypoid); @@ -689,3 +716,119 @@ logicalrep_read_namespace(StringInfo in) return nspname; } + +void +logicalrep_write_stream_start(StringInfo out, + TransactionId xid, bool first_segment) +{ + pq_sendbyte(out, 'S'); /* action STREAM START */ + + Assert(TransactionIdIsValid(xid)); + + /* transaction ID (we're starting to stream, so must be valid) */ + pq_sendint32(out, xid); + + /* 1 if this is the first streaming segment for this xid */ + pq_sendint32(out, first_segment ? 
1 : 0);
+}
+
+TransactionId
+logicalrep_read_stream_start(StringInfo in, bool *first_segment)
+{
+	TransactionId xid;
+
+	Assert(first_segment);
+
+	xid = pq_getmsgint(in, 4);
+	*first_segment = (pq_getmsgint(in, 4) == 1);
+
+	return xid;
+}
+
+void
+logicalrep_write_stream_stop(StringInfo out, TransactionId xid)
+{
+	pq_sendbyte(out, 'E');	/* action STREAM END */
+
+	Assert(TransactionIdIsValid(xid));
+
+	/* transaction ID (we're stopping a stream, so it must be valid) */
+	pq_sendint32(out, xid);
+}
+
+TransactionId
+logicalrep_read_stream_stop(StringInfo in)
+{
+	TransactionId xid;
+
+	xid = pq_getmsgint(in, 4);
+
+	return xid;
+}
+
+void
+logicalrep_write_stream_commit(StringInfo out, ReorderBufferTXN *txn,
+							   XLogRecPtr commit_lsn)
+{
+	uint8		flags = 0;
+
+	pq_sendbyte(out, 'c');	/* action STREAM COMMIT */
+
+	Assert(TransactionIdIsValid(txn->xid));
+
+	/* transaction ID (we're committing a stream, so it must be valid) */
+	pq_sendint32(out, txn->xid);
+
+	/* send the flags field (unused for now) */
+	pq_sendbyte(out, flags);
+
+	/* send fields */
+	pq_sendint64(out, commit_lsn);
+	pq_sendint64(out, txn->end_lsn);
+	pq_sendint64(out, txn->commit_time);
+}
+
+TransactionId
+logicalrep_read_stream_commit(StringInfo in, LogicalRepCommitData *commit_data)
+{
+	TransactionId xid;
+	uint8		flags;
+
+	xid = pq_getmsgint(in, 4);
+
+	/* read flags (unused for now) */
+	flags = pq_getmsgbyte(in);
+
+	if (flags != 0)
+		elog(ERROR, "unrecognized flags %u in commit message", flags);
+
+	/* read fields */
+	commit_data->commit_lsn = pq_getmsgint64(in);
+	commit_data->end_lsn = pq_getmsgint64(in);
+	commit_data->committime = pq_getmsgint64(in);
+
+	return xid;
+}
+
+void
+logicalrep_write_stream_abort(StringInfo out, TransactionId xid,
+							  TransactionId subxid)
+{
+	pq_sendbyte(out, 'A');	/* action STREAM ABORT */
+
+	Assert(TransactionIdIsValid(xid) && TransactionIdIsValid(subxid));
+
+	/* transaction IDs (both must be valid for a streamed abort) */
+	pq_sendint32(out, xid);
+	pq_sendint32(out, subxid);
+}
+
+void
+logicalrep_read_stream_abort(StringInfo in, TransactionId *xid,
+							 TransactionId *subxid)
+{
+	Assert(xid && subxid);
+
+	*xid = pq_getmsgint(in, 4);
+	*subxid = pq_getmsgint(in, 4);
+}
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index c80acd3eb0..cf053e948b 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -18,11 +18,32 @@
 * This module includes server facing code and shares libpqwalreceiver
 * module with walreceiver for providing the libpq specific functionality.
 *
+ *
+ * STREAMED TRANSACTIONS
+ * ---------------------
+ *
+ * Streamed transactions (large transactions exceeding a memory limit on
+ * the upstream) are not applied immediately; instead, their data is
+ * written to files and applied at once when the final commit arrives.
+ *
+ * Unlike the regular (non-streamed) case, handling streamed transactions
+ * also means handling aborts of both the toplevel transaction and of
+ * individual subtransactions. This is achieved by tracking the offset of
+ * each subtransaction's first change, which is then used to truncate the
+ * file with serialized changes.
+ *
+ * The files are placed in the temporary-files directory of the default
+ * tablespace, and the filenames include both the XID of the toplevel
+ * transaction and the OID of the subscription. This is necessary so that
+ * different workers processing a remote transaction with the same XID
+ * don't interfere.
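+ *
+ * For example (hypothetical OID and XID values), a streamed transaction
+ * with XID 1234 arriving for subscription 16395 is spilled to files named
+ * logical-16395-1234.changes and logical-16395-1234.subxacts, per
+ * changes_filename() and subxact_filename() below.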
+ * *------------------------------------------------------------------------- */ #include "postgres.h" +#include +#include + #include "access/table.h" #include "access/tableam.h" #include "access/xact.h" @@ -31,6 +52,7 @@ #include "catalog/namespace.h" #include "catalog/pg_subscription.h" #include "catalog/pg_subscription_rel.h" +#include "catalog/pg_tablespace.h" #include "commands/tablecmds.h" #include "commands/trigger.h" #include "executor/executor.h" @@ -60,6 +82,7 @@ #include "replication/worker_internal.h" #include "rewrite/rewriteHandler.h" #include "storage/bufmgr.h" +#include "storage/fd.h" #include "storage/ipc.h" #include "storage/lmgr.h" #include "storage/proc.h" @@ -67,6 +90,7 @@ #include "tcop/tcopprot.h" #include "utils/builtins.h" #include "utils/catcache.h" +#include "utils/dynahash.h" #include "utils/datum.h" #include "utils/fmgroids.h" #include "utils/guc.h" @@ -106,12 +130,59 @@ bool MySubscriptionValid = false; bool in_remote_transaction = false; static XLogRecPtr remote_final_lsn = InvalidXLogRecPtr; +/* fields valid only when processing streamed transaction */ +bool in_streamed_transaction = false; + +static TransactionId stream_xid = InvalidTransactionId; +static int stream_fd = -1; + +typedef struct SubXactInfo +{ + TransactionId xid; /* XID of the subxact */ + off_t offset; /* offset in the file */ +} SubXactInfo; + +static uint32 nsubxacts = 0; +static uint32 nsubxacts_max = 0; +static SubXactInfo * subxacts = NULL; +static TransactionId subxact_last = InvalidTransactionId; + +static void subxact_filename(char *path, Oid subid, TransactionId xid); +static void changes_filename(char *path, Oid subid, TransactionId xid); + +/* + * Information about subtransactions of a given toplevel transaction. + */ +static void subxact_info_write(Oid subid, TransactionId xid); +static void subxact_info_read(Oid subid, TransactionId xid); +static void subxact_info_add(TransactionId xid); + +/* + * Serialize and deserialize changes for a toplevel transaction. + */ +static void stream_cleanup_files(Oid subid, TransactionId xid); +static void stream_open_file(Oid subid, TransactionId xid, bool first); +static void stream_write_change(char action, StringInfo s); +static void stream_close_file(void); + +/* + * Array of serialized XIDs. + */ +static int nxids = 0; +static int maxnxids = 0; +static TransactionId *xids = NULL; + +static bool handle_streamed_transaction(const char action, StringInfo s); + static void send_feedback(XLogRecPtr recvpos, bool force, bool requestReply); static void store_flush_position(XLogRecPtr remote_lsn); static void maybe_reread_subscription(void); +/* prototype needed because of stream_commit */ +static void apply_dispatch(StringInfo s); + /* * Should this worker apply changes for given relation. * @@ -163,6 +234,42 @@ ensure_transaction(void) return true; } +/* + * Handle streamed transactions. + * + * If in streaming mode (receiving a block of streamed transaction), we + * simply redirect it to a file for the proper toplevel transaction. + * + * Returns true for streamed transactions, false otherwise (regular mode). + */ +static bool +handle_streamed_transaction(const char action, StringInfo s) +{ + TransactionId xid; + + /* not in streaming mode */ + if (!in_streamed_transaction) + return false; + + Assert(stream_fd != -1); + Assert(TransactionIdIsValid(stream_xid)); + + /* + * We should have received XID of the subxact as the first part of the + * message, so extract it. 
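+	 *
+	 * A sketch of the layout handled here (the action byte has already
+	 * been consumed by apply_dispatch):
+	 *
+	 *   int32  XID of the (sub)transaction the change belongs to
+	 *   ...    the original message payload (tuple data etc.)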
+ */ + xid = pq_getmsgint(s, 4); + + Assert(TransactionIdIsValid(xid)); + + /* Add the new subxact to the array (unless already there). */ + subxact_info_add(xid); + + /* write the change to the current file */ + stream_write_change(action, s); + + return true; +} /* * Executor state preparation for evaluation of constraint expressions, @@ -528,6 +635,318 @@ apply_handle_origin(StringInfo s) errmsg("ORIGIN message sent out of order"))); } +/* + * Handle STREAM START message. + */ +static void +apply_handle_stream_start(StringInfo s) +{ + bool first_segment; + + Assert(!in_streamed_transaction); + + /* notify handle methods we're processing a remote transaction */ + in_streamed_transaction = true; + + /* extract XID of the top-level transaction */ + stream_xid = logicalrep_read_stream_start(s, &first_segment); + + /* open the spool file for this transaction */ + stream_open_file(MyLogicalRepWorker->subid, stream_xid, first_segment); + + /* + * if this is not the first segment, open existing file + * + * XXX Note that the cleanup is performed by stream_open_file. + */ + if (!first_segment) + subxact_info_read(MyLogicalRepWorker->subid, stream_xid); + + pgstat_report_activity(STATE_RUNNING, NULL); +} + +/* + * Handle STREAM STOP message. + */ +static void +apply_handle_stream_stop(StringInfo s) +{ + Assert(in_streamed_transaction); + + /* + * Close the file with serialized changes, and serialize information about + * subxacts for the toplevel transaction. + */ + subxact_info_write(MyLogicalRepWorker->subid, stream_xid); + stream_close_file(); + + in_streamed_transaction = false; + + pgstat_report_activity(STATE_IDLE, NULL); +} + +/* + * Handle STREAM abort message. + */ +static void +apply_handle_stream_abort(StringInfo s) +{ + TransactionId xid; + TransactionId subxid; + + Assert(!in_streamed_transaction); + + logicalrep_read_stream_abort(s, &xid, &subxid); + + /* + * If the two XIDs are the same, it's in fact abort of toplevel xact, so + * just delete the files with serialized info. + */ + if (xid == subxid) + { + char path[MAXPGPATH]; + + /* + * XXX Maybe this should be an error instead? Can we receive abort for + * a toplevel transaction we haven't received? + */ + + changes_filename(path, MyLogicalRepWorker->subid, xid); + + if (unlink(path) < 0) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not remove file \"%s\": %m", path))); + + subxact_filename(path, MyLogicalRepWorker->subid, xid); + + if (unlink(path) < 0) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not remove file \"%s\": %m", path))); + + return; + } + else + { + /* + * OK, so it's a subxact. We need to read the subxact file for the + * toplevel transaction, determine the offset tracked for the subxact, + * and truncate the file with changes. We also remove the subxacts + * with higher offsets (or rather higher XIDs). + * + * We intentionally scan the array from the tail, because we're likely + * aborting a change for the most recent subtransactions. + * + * XXX Can we rely on the subxact XIDs arriving in sorted order? That + * would allow us to use binary search here. + * + * XXX Or perhaps we can rely on the aborts to arrive in the reverse + * order, i.e. from the inner-most subxact (when nested)? In which + * case we could simply check the last element. 
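+	 *
+	 * For example (hypothetical values): with subxacts (1001, offset 0),
+	 * (1002, offset 512) and (1003, offset 900), an abort of subxact 1002
+	 * truncates the changes file to offset 512 and leaves just
+	 * (1001, offset 0) in the array.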
+ */ + + int64 i; + int64 subidx; + bool found = false; + char path[MAXPGPATH]; + + subidx = -1; + subxact_info_read(MyLogicalRepWorker->subid, xid); + + /* FIXME optimize the search by bsearch on sorted data */ + for (i = nsubxacts; i > 0; i--) + { + if (subxacts[i - 1].xid == subxid) + { + subidx = (i - 1); + found = true; + break; + } + } + + /* We should not receive aborts for unknown subtransactions. */ + Assert(found); + + /* OK, truncate the file at the right offset. */ + Assert((subidx >= 0) && (subidx < nsubxacts)); + + changes_filename(path, MyLogicalRepWorker->subid, xid); + + if (truncate(path, subxacts[subidx].offset)) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not truncate file \"%s\": %m", path))); + + /* discard the subxacts added later */ + nsubxacts = subidx; + + /* write the updated subxact list */ + subxact_info_write(MyLogicalRepWorker->subid, xid); + } +} + +/* + * Handle STREAM COMMIT message. + */ +static void +apply_handle_stream_commit(StringInfo s) +{ + int fd; + TransactionId xid; + StringInfoData s2; + int nchanges; + + char path[MAXPGPATH]; + char *buffer = NULL; + LogicalRepCommitData commit_data; + + MemoryContext oldcxt; + + Assert(!in_streamed_transaction); + + xid = logicalrep_read_stream_commit(s, &commit_data); + + elog(DEBUG1, "received commit for streamed transaction %u", xid); + + /* open the spool file for the committed transaction */ + changes_filename(path, MyLogicalRepWorker->subid, xid); + + elog(DEBUG1, "replaying changes from file '%s'", path); + + fd = OpenTransientFile(path, O_RDONLY | PG_BINARY); + if (fd < 0) + { + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not open file \"%s\": %m", + path))); + } + + /* XXX Should this be allocated in another memory context? */ + + oldcxt = MemoryContextSwitchTo(TopMemoryContext); + + buffer = palloc(8192); + initStringInfo(&s2); + + MemoryContextSwitchTo(oldcxt); + + ensure_transaction(); + + /* + * Make sure the handle apply_dispatch methods are aware we're in a remote + * transaction. + */ + in_remote_transaction = true; + pgstat_report_activity(STATE_RUNNING, NULL); + + /* + * Read the entries one by one and pass them through the same logic as in + * apply_dispatch. + */ + nchanges = 0; + while (true) + { + int nbytes; + int len; + + /* read length of the on-disk record */ + pgstat_report_wait_start(WAIT_EVENT_LOGICAL_CHANGES_READ); + nbytes = read(fd, &len, sizeof(len)); + pgstat_report_wait_end(); + + /* have we reached end of the file? */ + if (nbytes == 0) + break; + + /* do we have a correct length? */ + if (nbytes != sizeof(len)) + { + int save_errno = errno; + + CloseTransientFile(fd); + errno = save_errno; + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not read file: %m"))); + return; + } + + Assert(len > 0); + + /* make sure we have sufficiently large buffer */ + buffer = repalloc(buffer, len); + + /* and finally read the data into the buffer */ + pgstat_report_wait_start(WAIT_EVENT_LOGICAL_CHANGES_READ); + if (read(fd, buffer, len) != len) + { + int save_errno = errno; + + CloseTransientFile(fd); + errno = save_errno; + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not read file: %m"))); + return; + } + pgstat_report_wait_end(); + + /* copy the buffer to the stringinfo and call apply_dispatch */ + resetStringInfo(&s2); + appendBinaryStringInfo(&s2, buffer, len); + + /* Ensure we are reading the data into our memory context. 
*/ + oldcxt = MemoryContextSwitchTo(ApplyMessageContext); + + apply_dispatch(&s2); + + MemoryContextReset(ApplyMessageContext); + + MemoryContextSwitchTo(oldcxt); + + nchanges++; + + if (nchanges % 1000 == 0) + elog(DEBUG1, "replayed %d changes from file '%s'", + nchanges, path); + + /* + * send feedback to upstream + * + * XXX Probably should send a valid LSN. But which one? + */ + send_feedback(InvalidXLogRecPtr, false, false); + } + + CloseTransientFile(fd); + + /* + * Update origin state so we can restart streaming from correct + * position in case of crash. + */ + replorigin_session_origin_lsn = commit_data.end_lsn; + replorigin_session_origin_timestamp = commit_data.committime; + + CommitTransactionCommand(); + pgstat_report_stat(false); + + store_flush_position(commit_data.end_lsn); + + elog(DEBUG1, "replayed %d (all) changes from file '%s'", + nchanges, path); + + in_remote_transaction = false; + pgstat_report_activity(STATE_IDLE, NULL); + + /* unlink the files with serialized changes and subxact info */ + stream_cleanup_files(MyLogicalRepWorker->subid, xid); + + pfree(buffer); + pfree(s2.data); +} + /* * Handle RELATION message. * @@ -541,6 +960,9 @@ apply_handle_relation(StringInfo s) { LogicalRepRelation *rel; + if (handle_streamed_transaction('R', s)) + return; + rel = logicalrep_read_rel(s); logicalrep_relmap_update(rel); } @@ -556,6 +978,9 @@ apply_handle_type(StringInfo s) { LogicalRepTyp typ; + if (handle_streamed_transaction('Y', s)) + return; + logicalrep_read_typ(s, &typ); logicalrep_typmap_update(&typ); } @@ -591,6 +1016,9 @@ apply_handle_insert(StringInfo s) TupleTableSlot *remoteslot; MemoryContext oldctx; + if (handle_streamed_transaction('I', s)) + return; + ensure_transaction(); relid = logicalrep_read_insert(s, &newtup); @@ -694,6 +1122,9 @@ apply_handle_update(StringInfo s) bool found; MemoryContext oldctx; + if (handle_streamed_transaction('U', s)) + return; + ensure_transaction(); relid = logicalrep_read_update(s, &has_oldtup, &oldtup, @@ -814,6 +1245,9 @@ apply_handle_delete(StringInfo s) bool found; MemoryContext oldctx; + if (handle_streamed_transaction('D', s)) + return; + ensure_transaction(); relid = logicalrep_read_delete(s, &oldtup); @@ -913,6 +1347,9 @@ apply_handle_truncate(StringInfo s) List *relids_logged = NIL; ListCell *lc; + if (handle_streamed_transaction('T', s)) + return; + ensure_transaction(); remote_relids = logicalrep_read_truncate(s, &cascade, &restart_seqs); @@ -1004,6 +1441,22 @@ apply_dispatch(StringInfo s) case 'O': apply_handle_origin(s); break; + /* STREAM START */ + case 'S': + apply_handle_stream_start(s); + break; + /* STREAM END */ + case 'E': + apply_handle_stream_stop(s); + break; + /* STREAM ABORT */ + case 'A': + apply_handle_stream_abort(s); + break; + /* STREAM COMMIT */ + case 'c': + apply_handle_stream_commit(s); + break; default: ereport(ERROR, (errcode(ERRCODE_PROTOCOL_VIOLATION), @@ -1100,6 +1553,22 @@ UpdateWorkerStats(XLogRecPtr last_lsn, TimestampTz send_time, bool reply) } } +/* + * Cleanup function. + * + * Called on logical replication worker exit. + */ +static void +worker_onexit(int code, Datum arg) +{ + int i; + + elog(LOG, "cleanup files for %d transactions", nxids); + + for (i = nxids-1; i >= 0; i--) + stream_cleanup_files(MyLogicalRepWorker->subid, xids[i]); +} + /* * Apply main loop. */ @@ -1116,6 +1585,9 @@ LogicalRepApplyLoop(XLogRecPtr last_received) "ApplyMessageContext", ALLOCSET_DEFAULT_SIZES); + /* do cleanup on worker exit (e.g. 
after DROP SUBSCRIPTION) */ + before_shmem_exit(worker_onexit, (Datum) 0); + /* mark as idle, before starting to loop */ pgstat_report_activity(STATE_IDLE, NULL); @@ -1564,6 +2036,564 @@ subscription_change_cb(Datum arg, int cacheid, uint32 hashvalue) MySubscriptionValid = false; } +/* + * subxact_info_write + * Store information about subxacts for a toplevel transaction. + * + * For each subxact we store offset of it's first change in the main file. + * The file is always over-written as a whole, and we also include CRC32C + * checksum of the information. + * + * XXX We should only store subxacts that were not aborted yet. + * + * XXX Maybe we should only include the checksum when the cluster is + * initialized with checksums? + * + * XXX Add calls to pgstat_report_wait_start/pgstat_report_wait_end. + */ +static void +subxact_info_write(Oid subid, TransactionId xid) +{ + int fd; + char path[MAXPGPATH]; + uint32 checksum; + Size len; + + Assert(TransactionIdIsValid(xid)); + + subxact_filename(path, subid, xid); + + fd = OpenTransientFile(path, O_CREAT | O_TRUNC | O_WRONLY | PG_BINARY); + if (fd < 0) + { + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not create file \"%s\": %m", + path))); + return; + } + + len = sizeof(SubXactInfo) * nsubxacts; + + /* compute the checksum */ + INIT_CRC32C(checksum); + COMP_CRC32C(checksum, (char *) &nsubxacts, sizeof(nsubxacts)); + COMP_CRC32C(checksum, (char *) subxacts, len); + FIN_CRC32C(checksum); + + pgstat_report_wait_start(WAIT_EVENT_LOGICAL_SUBXACT_WRITE); + + if (write(fd, &checksum, sizeof(checksum)) != sizeof(checksum)) + { + int save_errno = errno; + + CloseTransientFile(fd); + errno = save_errno; + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not write to file \"%s\": %m", + path))); + return; + } + + if (write(fd, &nsubxacts, sizeof(nsubxacts)) != sizeof(nsubxacts)) + { + int save_errno = errno; + + CloseTransientFile(fd); + errno = save_errno; + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not write to file \"%s\": %m", + path))); + return; + } + + if ((len > 0) && (write(fd, subxacts, len) != len)) + { + int save_errno = errno; + + CloseTransientFile(fd); + errno = save_errno; + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not write to file \"%s\": %m", + path))); + return; + } + + pgstat_report_wait_end(); + + /* + * We don't need to fsync or anything, as we'll recreate the files after a + * crash from scratch. So just close the file. + */ + CloseTransientFile(fd); + + /* + * But we free the memory allocated for subxact info. There might be one + * exceptional transaction with many subxacts, and we don't want to keep + * the memory allocated forewer. + * + */ + if (subxacts) + pfree(subxacts); + + subxacts = NULL; + subxact_last = InvalidTransactionId; + nsubxacts = 0; + nsubxacts_max = 0; +} + +/* + * subxact_info_read + * Restore information about subxacts of a streamed transaction. + * + * Read information about subxacts into the global variables, and while + * reading the information verify the checksum. + * + * XXX Add calls to pgstat_report_wait_start/pgstat_report_wait_end. + * + * XXX Do we need to allocate it in TopMemoryContext? 
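+ *
+ * A sketch of the on-disk layout, as written by subxact_info_write():
+ *
+ *   uint32        CRC32C checksum of the two following fields
+ *   uint32        number of subxact entries (nsubxacts)
+ *   SubXactInfo   array of (xid, offset) entries, nsubxacts items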
+ */
+static void
+subxact_info_read(Oid subid, TransactionId xid)
+{
+	int			fd;
+	char		path[MAXPGPATH];
+	uint32		checksum;
+	uint32		checksum_new;
+	Size		len;
+	MemoryContext oldctx;
+
+	Assert(TransactionIdIsValid(xid));
+	Assert(!subxacts);
+	Assert(nsubxacts == 0);
+	Assert(nsubxacts_max == 0);
+
+	subxact_filename(path, subid, xid);
+
+	fd = OpenTransientFile(path, O_RDONLY | PG_BINARY);
+	if (fd < 0)
+	{
+		ereport(ERROR,
+				(errcode_for_file_access(),
+				 errmsg("could not open file \"%s\": %m",
+						path)));
+		return;
+	}
+
+	pgstat_report_wait_start(WAIT_EVENT_LOGICAL_SUBXACT_READ);
+
+	/* read the checksum */
+	if (read(fd, &checksum, sizeof(checksum)) != sizeof(checksum))
+	{
+		int			save_errno = errno;
+
+		CloseTransientFile(fd);
+		errno = save_errno;
+		ereport(ERROR,
+				(errcode_for_file_access(),
+				 errmsg("could not read file \"%s\": %m",
+						path)));
+		return;
+	}
+
+	/* read number of subxact items */
+	if (read(fd, &nsubxacts, sizeof(nsubxacts)) != sizeof(nsubxacts))
+	{
+		int			save_errno = errno;
+
+		CloseTransientFile(fd);
+		errno = save_errno;
+		ereport(ERROR,
+				(errcode_for_file_access(),
+				 errmsg("could not read file \"%s\": %m",
+						path)));
+		return;
+	}
+
+	pgstat_report_wait_end();
+
+	len = sizeof(SubXactInfo) * nsubxacts;
+
+	/* we keep the maximum as a power of 2 */
+	nsubxacts_max = 1 << my_log2(nsubxacts);
+
+	/* subxacts are long-lived */
+	oldctx = MemoryContextSwitchTo(TopMemoryContext);
+	subxacts = palloc(nsubxacts_max * sizeof(SubXactInfo));
+	MemoryContextSwitchTo(oldctx);
+
+	pgstat_report_wait_start(WAIT_EVENT_LOGICAL_SUBXACT_READ);
+
+	if ((len > 0) && ((read(fd, subxacts, len)) != len))
+	{
+		int			save_errno = errno;
+
+		CloseTransientFile(fd);
+		errno = save_errno;
+		ereport(ERROR,
+				(errcode_for_file_access(),
+				 errmsg("could not read file \"%s\": %m",
+						path)));
+		return;
+	}
+
+	pgstat_report_wait_end();
+
+	/* recompute the checksum */
+	INIT_CRC32C(checksum_new);
+	COMP_CRC32C(checksum_new, (char *) &nsubxacts, sizeof(nsubxacts));
+	COMP_CRC32C(checksum_new, (char *) subxacts, len);
+	FIN_CRC32C(checksum_new);
+
+	if (checksum_new != checksum)
+		ereport(ERROR,
+				(errmsg("checksum failure when reading subxacts")));
+
+	CloseTransientFile(fd);
+}
+
+/*
+ * subxact_info_add
+ *	  Add information about a subxact (offset in the main file).
+ *
+ * XXX Do we need to allocate it in TopMemoryContext?
+ */
+static void
+subxact_info_add(TransactionId xid)
+{
+	int64		i;
+
+	/*
+	 * If the XID matches the toplevel transaction, we don't want to add it.
+	 */
+	if (stream_xid == xid)
+		return;
+
+	/*
+	 * In most cases we're checking the same subxact as we've already seen
+	 * in the last call, so just ignore it (this cheap check covers the
+	 * common case).
+	 */
+	if (subxact_last == xid)
+		return;
+
+	/* OK, remember we're processing this XID. */
+	subxact_last = xid;
+
+	/*
+	 * Check if the transaction is already present in the array of subxacts.
+	 * We intentionally scan the array from the tail, because we're likely
+	 * adding a change for the most recent subtransactions.
+	 *
+	 * XXX Can we rely on the subxact XIDs arriving in sorted order? That
+	 * would allow us to use binary search here.
+	 */
+	for (i = nsubxacts; i > 0; i--)
+	{
+		/* found, so we're done */
+		if (subxacts[i - 1].xid == xid)
+			return;
+	}
+
+	/* This is a new subxact, so we need to add it to the array.
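+	 * Its offset is the current end of the changes file (the lseek below),
+	 * i.e. the position where the subxact's first change begins, which is
+	 * exactly the point to truncate back to if the subxact aborts.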
*/ + + if (nsubxacts == 0) + { + MemoryContext oldctx; + + nsubxacts_max = 128; + oldctx = MemoryContextSwitchTo(TopMemoryContext); + subxacts = palloc(nsubxacts_max * sizeof(SubXactInfo)); + MemoryContextSwitchTo(oldctx); + } + else if (nsubxacts == nsubxacts_max) + { + nsubxacts_max *= 2; + subxacts = repalloc(subxacts, nsubxacts_max * sizeof(SubXactInfo)); + } + + subxacts[nsubxacts].xid = xid; + subxacts[nsubxacts].offset = lseek(stream_fd, 0, SEEK_END); + + nsubxacts++; +} + +/* format filename for file containing the info about subxacts */ +static void +subxact_filename(char *path, Oid subid, TransactionId xid) +{ + char tempdirpath[MAXPGPATH]; + + TempTablespacePath(tempdirpath, DEFAULTTABLESPACE_OID); + + /* + * We might need to create the tablespace's tempfile directory, if no + * one has yet done so. + * + * Don't check for error from mkdir; it could fail if the directory + * already exists (maybe someone else just did the same thing). If + * it doesn't work then we'll bomb out when opening the file + */ + mkdir(tempdirpath, S_IRWXU); + + snprintf(path, MAXPGPATH, "%s/logical-%u-%u.subxacts", + tempdirpath, subid, xid); +} + +/* format filename for file containing serialized changes */ +static void +changes_filename(char *path, Oid subid, TransactionId xid) +{ + char tempdirpath[MAXPGPATH]; + + TempTablespacePath(tempdirpath, DEFAULTTABLESPACE_OID); + + /* + * We might need to create the tablespace's tempfile directory, if no + * one has yet done so. + * + * Don't check for error from mkdir; it could fail if the directory + * already exists (maybe someone else just did the same thing). If + * it doesn't work then we'll bomb out when opening the file + */ + mkdir(tempdirpath, S_IRWXU); + + snprintf(path, MAXPGPATH, "%s/logical-%u-%u.changes", + tempdirpath, subid, xid); +} + +/* + * stream_cleanup_files + * Cleanup files for a subscription / toplevel transaction. + * + * Remove files with serialized changes and subxact info for a particular + * toplevel transaction. Each subscription has a separate set of files. + * + * Note: The files may not exists, so handle ENOENT as non-error. + * + * TODO: Add missing_ok flag to specify in which cases it's OK not to + * find the files, and when it's an error. + */ +static void +stream_cleanup_files(Oid subid, TransactionId xid) +{ + int i; + char path[MAXPGPATH]; + bool found = false; + + subxact_filename(path, subid, xid); + + if ((unlink(path) < 0) && (errno != ENOENT)) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not remove file \"%s\": %m", path))); + + changes_filename(path, subid, xid); + + if ((unlink(path) < 0) && (errno != ENOENT)) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not remove file \"%s\": %m", path))); + + /* + * Cleanup the XID from the array - find the XID in the array and + * remove it by shifting all the remaining elements. The array is + * bound to be fairly small (maximum number of in-progress xacts, + * so max_connections + max_prepared_transactions) so simply loop + * through the array and find index of the XID. Then move the rest + * of the array by one element to the left. + * + * Notice we also call this from stream_open_file for first segment + * of each transaction, to deal with possible left-overs after a + * crash, so it's entirely possible not to find the XID in the + * array here. In that case we don't remove anything. + * + * XXX Perhaps it'd be better to handle this automatically after a + * restart, instead of doing it over and over for each transaction. 
+ */ + for (i = 0; i < nxids; i++) + { + if (xids[i] == xid) + { + found = true; + break; + } + } + + if (!found) + return; + + /* + * Move the last entry from the array to the place. We don't keep + * the streamed transactions sorted or anything - we only expect + * a few of them in progress (max_connections + max_prepared_xacts) + * so linear search is just fine. + */ + xids[i] = xids[nxids-1]; + nxids--; +} + +/* + * stream_open_file + * Open file we'll use to serialize changes for a toplevel transaction. + * + * Open a file for streamed changes from a toplevel transaction identified + * by stream_xid (global variable). If it's the first chunk of streamed + * changes for this transaction, perform cleanup by removing existing + * files after a possible previous crash. + * + * This can only be called at the beginning of a "streaming" block, i.e. + * between stream_start/stream_stop messages from the upstream. + */ +static void +stream_open_file(Oid subid, TransactionId xid, bool first_segment) +{ + char path[MAXPGPATH]; + int flags; + + Assert(in_streamed_transaction); + Assert(OidIsValid(subid)); + Assert(TransactionIdIsValid(xid)); + Assert(stream_fd == -1); + + /* + * If this is the first segment for this transaction, try removing + * existing files (if there are any, possibly after a crash). + */ + if (first_segment) + { + MemoryContext oldcxt; + + /* XXX make sure there are no previous files for this transaction */ + stream_cleanup_files(subid, xid); + + oldcxt = MemoryContextSwitchTo(TopMemoryContext); + + /* + * We need to remember the XIDs we spilled to files, so that we can + * remove them at worker exit (e.g. after DROP SUBSCRIPTION). + * + * The number of XIDs we may need to track is fairly small, because + * we can only stream toplevel xacts (so limited by max_connections + * and max_prepared_transactions), and we only stream the large ones. + * So we simply keep the XIDs in an unsorted array. If the number of + * xacts gets large for some reason (e.g. very high max_connections), + * a more elaborate approach might be better - e.g. sorted array, to + * speed-up the lookups. + */ + if (nxids == maxnxids) /* array of XIDs is full */ + { + if (!xids) + { + maxnxids = 64; + xids = palloc(maxnxids * sizeof(TransactionId)); + } + else + { + maxnxids = 2 * maxnxids; + xids = repalloc(xids, maxnxids * sizeof(TransactionId)); + } + } + + xids[nxids++] = xid; + + MemoryContextSwitchTo(oldcxt); + } + + changes_filename(path, subid, xid); + + elog(DEBUG1, "opening file '%s' for streamed changes", path); + + /* + * If this is the first streamed segment, the file must not exist, so + * make sure we're the ones creating it. Otherwise just open the file + * for writing, in append mode. + */ + if (first_segment) + flags = (O_WRONLY | O_CREAT | O_EXCL | PG_BINARY); + else + flags = (O_WRONLY | O_APPEND | PG_BINARY); + + stream_fd = OpenTransientFile(path, flags); + + if (stream_fd < 0) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not open file \"%s\": %m", + path))); +} + +/* + * stream_close_file + * Close the currently open file with streamed changes. + * + * This can only be called at the beginning of a "streaming" block, i.e. + * between stream_start/stream_stop messages from the upstream. 
+ */ +static void +stream_close_file(void) +{ + Assert(in_streamed_transaction); + Assert(TransactionIdIsValid(stream_xid)); + Assert(stream_fd != -1); + + CloseTransientFile(stream_fd); + + stream_xid = InvalidTransactionId; + stream_fd = -1; +} + +/* + * stream_write_change + * Serialize a change to a file for the current toplevel transaction. + * + * The change is serialied in a simple format, with length (not including + * the length), action code (identifying the message type) and message + * contents (without the subxact TransactionId value). + * + * XXX The subxact file includes CRC32C of the contents. Maybe we should + * include something like that here too, but doing so will not be as + * straighforward, because we write the file in chunks. + */ +static void +stream_write_change(char action, StringInfo s) +{ + int len; + + Assert(in_streamed_transaction); + Assert(TransactionIdIsValid(stream_xid)); + Assert(stream_fd != -1); + + /* total on-disk size, including the action type character */ + len = (s->len - s->cursor) + sizeof(char); + + pgstat_report_wait_start(WAIT_EVENT_LOGICAL_CHANGES_WRITE); + + /* first write the size */ + if (write(stream_fd, &len, sizeof(len)) != sizeof(len)) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not serialize streamed change to file: %m"))); + + /* then the action */ + if (write(stream_fd, &action, sizeof(action)) != sizeof(action)) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not serialize streamed change to file: %m"))); + + /* and finally the remaining part of the buffer (after the XID) */ + len = (s->len - s->cursor); + + if (write(stream_fd, &s->data[s->cursor], len) != len) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not serialize streamed change to file: %m"))); + + pgstat_report_wait_end(); +} + /* Logical Replication Apply worker entry point */ void ApplyWorkerMain(Datum main_arg) @@ -1730,6 +2760,7 @@ ApplyWorkerMain(Datum main_arg) options.proto.logical.proto_version = LOGICALREP_PROTO_VERSION_NUM; options.proto.logical.publication_names = MySubscription->publications; options.proto.logical.work_mem = MySubscription->workmem; + options.proto.logical.streaming = MySubscription->stream; /* Start normal logical streaming replication. */ walrcv_startstreaming(wrconn, &options); diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c index cf6e03b9a7..8490ea4717 100644 --- a/src/backend/replication/pgoutput/pgoutput.c +++ b/src/backend/replication/pgoutput/pgoutput.c @@ -45,16 +45,42 @@ static void pgoutput_truncate(LogicalDecodingContext *ctx, static bool pgoutput_origin_filter(LogicalDecodingContext *ctx, RepOriginId origin_id); +static void pgoutput_stream_abort(struct LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, + XLogRecPtr abort_lsn); + +static void pgoutput_stream_commit(struct LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, + XLogRecPtr commit_lsn); + +static void pgoutput_stream_start(struct LogicalDecodingContext *ctx, + ReorderBufferTXN *txn); + +static void pgoutput_stream_stop(struct LogicalDecodingContext *ctx, + ReorderBufferTXN *txn); + static bool publications_valid; +static bool in_streaming; static List *LoadPublications(List *pubnames); static void publication_invalidation_cb(Datum arg, int cacheid, uint32 hashvalue); -/* Entry in the map used to remember which relation schemas we sent. */ +/* + * Entry in the map used to remember which relation schemas we sent. 
+ * + * The schema_sent flag determines if the current schema record was already + * sent to the subscriber (in which case we don't need to send it again). + * + * The schema cache on downstream is however updated only at commit time, + * and with streamed transactions the commit order may be different from + * the order the transactions are sent in. So streamed trasactions are + * handled separately by using schema_sent flag in ReorderBufferTXN. + */ typedef struct RelationSyncEntry { Oid relid; /* relation oid */ + TransactionId xid; /* transaction that created the record */ bool schema_sent; /* did we send the schema? */ bool replicate_valid; PublicationActions pubactions; @@ -64,6 +90,7 @@ typedef struct RelationSyncEntry static HTAB *RelationSyncCache = NULL; static void init_rel_sync_cache(MemoryContext decoding_context); +static void cleanup_rel_sync_cache(TransactionId xid, bool is_commit); static RelationSyncEntry *get_rel_sync_entry(PGOutputData *data, Oid relid); static void rel_sync_cache_relation_cb(Datum arg, Oid relid); static void rel_sync_cache_publication_cb(Datum arg, int cacheid, @@ -84,16 +111,26 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb) cb->commit_cb = pgoutput_commit_txn; cb->filter_by_origin_cb = pgoutput_origin_filter; cb->shutdown_cb = pgoutput_shutdown; + + /* transaction streaming */ + cb->stream_change_cb = pgoutput_change; + cb->stream_truncate_cb = pgoutput_truncate; + cb->stream_abort_cb = pgoutput_stream_abort; + cb->stream_commit_cb = pgoutput_stream_commit; + cb->stream_start_cb = pgoutput_stream_start; + cb->stream_stop_cb = pgoutput_stream_stop; } static void parse_output_parameters(List *options, uint32 *protocol_version, - List **publication_names, int *logical_decoding_work_mem) + List **publication_names, int *logical_decoding_work_mem, + bool *enable_streaming) { ListCell *lc; bool protocol_version_given = false; bool publication_names_given = false; bool work_mem_given = false; + bool streaming_given = false; foreach(lc, options) { @@ -162,6 +199,23 @@ parse_output_parameters(List *options, uint32 *protocol_version, *logical_decoding_work_mem = (int)parsed; } + else if (strcmp(defel->defname, "streaming") == 0) + { + if (streaming_given) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("conflicting or redundant options"))); + streaming_given = true; + + /* the value must be on/off */ + if (strcmp(strVal(defel->arg), "on") && strcmp(strVal(defel->arg), "off")) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("invalid streaming value"))); + + /* enable streaming if it's 'on' */ + *enable_streaming = (strcmp(strVal(defel->arg), "on") == 0); + } else elog(ERROR, "unrecognized pgoutput option: %s", defel->defname); } @@ -174,6 +228,7 @@ static void pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, bool is_init) { + bool enable_streaming = false; PGOutputData *data = palloc0(sizeof(PGOutputData)); /* Create our memory context for private allocations. 
*/ @@ -197,7 +252,8 @@ pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, parse_output_parameters(ctx->output_plugin_options, &data->protocol_version, &data->publication_names, - &logical_decoding_work_mem); + &logical_decoding_work_mem, + &enable_streaming); /* Check if we support requested protocol */ if (data->protocol_version > LOGICALREP_PROTO_VERSION_NUM) @@ -217,6 +273,27 @@ pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), errmsg("publication_names parameter missing"))); + /* + * Decide whether to enable streaming. It is disabled by default, in + * which case we just update the flag in decoding context. Otherwise + * we only allow it with sufficient version of the protocol, and when + * the output plugin supports it. + */ + if (!enable_streaming) + ctx->streaming = false; + else if (data->protocol_version < LOGICALREP_PROTO_STREAM_VERSION_NUM) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("requested proto_version=%d does not support streaming, need %d or higher", + data->protocol_version, LOGICALREP_PROTO_STREAM_VERSION_NUM))); + else if (!ctx->streaming) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("streaming requested, but not supported by output plugin"))); + + /* Also remember we're currently not streaming any transaction. */ + in_streaming = false; + /* Init publication state. */ data->publications = NIL; publications_valid = false; @@ -284,9 +361,42 @@ pgoutput_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, */ static void maybe_send_schema(LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, ReorderBufferChange *change, Relation relation, RelationSyncEntry *relentry) { - if (!relentry->schema_sent) + bool schema_sent; + TransactionId xid = InvalidTransactionId; + + /* + * Remember XID of the (sub)transaction for the change. We don't care if + * it's top-level transaction or not (we have already sent that XID in + * start the current streaming block). + * + * If we're not in a streaming block, just use InvalidTransactionId and + * the write methods will not include it. + */ + if (in_streaming) + xid = change->txn->xid; + + /* + * Do we need to send the schema? We do track streamed transactions + * separately, because those may not be applied later (and the regular + * transactions won't see their effects until then) and in an order + * that we don't know at this point. + */ + if (in_streaming) + { + /* + * TOCHECK: We have to send schema after each catalog change and it may + * occur when streaming already started, so we have to track new catalog + * changes somehow. + */ + schema_sent = txn->is_schema_sent; + } + else + schema_sent = relentry->schema_sent; + + if (!schema_sent) { TupleDesc desc; int i; @@ -312,19 +422,26 @@ maybe_send_schema(LogicalDecodingContext *ctx, continue; OutputPluginPrepareWrite(ctx, false); - logicalrep_write_typ(ctx->out, att->atttypid); + logicalrep_write_typ(ctx->out, xid, att->atttypid); OutputPluginWrite(ctx, false); } OutputPluginPrepareWrite(ctx, false); - logicalrep_write_rel(ctx->out, relation); + logicalrep_write_rel(ctx->out, xid, relation); OutputPluginWrite(ctx, false); - relentry->schema_sent = true; + relentry->xid = change->txn->xid; + + if (in_streaming) + txn->is_schema_sent = true; + else + relentry->schema_sent = true; } } /* * Sends the decoded DML over wire. + * + * XXX May be called both in streaming and non-streaming modes. 
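+ *
+ * In streaming mode, each message is prefixed with the XID of the
+ * (sub)transaction the change belongs to, so the apply worker can spool
+ * it to the file of the proper toplevel transaction.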
 */
static void
pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
@@ -333,6 +450,10 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 	PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
 	MemoryContext old;
 	RelationSyncEntry *relentry;
+	TransactionId xid = InvalidTransactionId;
+
+	if (in_streaming)
+		xid = change->txn->xid;

 	if (!is_publishable_relation(relation))
 		return;
@@ -361,14 +482,14 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 	/* Avoid leaking memory by using and resetting our own context */
 	old = MemoryContextSwitchTo(data->context);

-	maybe_send_schema(ctx, relation, relentry);
+	maybe_send_schema(ctx, txn, change, relation, relentry);

 	/* Send the data */
 	switch (change->action)
 	{
 		case REORDER_BUFFER_CHANGE_INSERT:
 			OutputPluginPrepareWrite(ctx, true);
-			logicalrep_write_insert(ctx->out, relation,
+			logicalrep_write_insert(ctx->out, xid, relation,
 									&change->data.tp.newtuple->tuple);
 			OutputPluginWrite(ctx, true);
 			break;
@@ -378,7 +499,7 @@
 				&change->data.tp.oldtuple->tuple : NULL;

 				OutputPluginPrepareWrite(ctx, true);
-				logicalrep_write_update(ctx->out, relation, oldtuple,
+				logicalrep_write_update(ctx->out, xid, relation, oldtuple,
 										&change->data.tp.newtuple->tuple);
 				OutputPluginWrite(ctx, true);
 				break;
@@ -387,7 +508,7 @@
 			if (change->data.tp.oldtuple)
 			{
 				OutputPluginPrepareWrite(ctx, true);
-				logicalrep_write_delete(ctx->out, relation,
+				logicalrep_write_delete(ctx->out, xid, relation,
 										&change->data.tp.oldtuple->tuple);
 				OutputPluginWrite(ctx, true);
 			}
@@ -413,6 +534,10 @@ pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 	int			i;
 	int			nrelids;
 	Oid		   *relids;
+	TransactionId xid = InvalidTransactionId;
+
+	if (in_streaming)
+		xid = change->txn->xid;

 	old = MemoryContextSwitchTo(data->context);

@@ -433,13 +558,14 @@ pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 			continue;

 		relids[nrelids++] = relid;
-		maybe_send_schema(ctx, relation, relentry);
+		maybe_send_schema(ctx, txn, change, relation, relentry);
 	}

 	if (nrelids > 0)
 	{
 		OutputPluginPrepareWrite(ctx, true);
 		logicalrep_write_truncate(ctx->out,
+								  xid,
 								  nrelids,
 								  relids,
 								  change->data.truncate.cascade,
@@ -512,6 +638,91 @@ publication_invalidation_cb(Datum arg, int cacheid, uint32 hashvalue)
 	rel_sync_cache_publication_cb(arg, cacheid, hashvalue);
 }

+/*
+ * Notify downstream to discard the streamed transaction (along with all
+ * its subtransactions, if it's a toplevel transaction).
+ */
+static void
+pgoutput_stream_abort(struct LogicalDecodingContext *ctx,
+					  ReorderBufferTXN *txn,
+					  XLogRecPtr abort_lsn)
+{
+	ReorderBufferTXN *toptxn;
+
+	/*
+	 * The abort should happen outside a streaming block, even for streamed
+	 * transactions. The transaction has to be marked as streamed, though.
+	 */
+	Assert(!in_streaming);
+
+	/* determine the toplevel transaction */
+	toptxn = (txn->toptxn) ? txn->toptxn : txn;
+
+	Assert(rbtxn_is_streamed(toptxn));
+
+	OutputPluginPrepareWrite(ctx, true);
+	logicalrep_write_stream_abort(ctx->out, toptxn->xid, txn->xid);
+	OutputPluginWrite(ctx, true);
+
+	cleanup_rel_sync_cache(toptxn->xid, false);
+}
+
+/*
+ * Notify downstream to apply the streamed transaction (along with all
+ * its subtransactions).
+ */
+static void
+pgoutput_stream_commit(struct LogicalDecodingContext *ctx,
+					   ReorderBufferTXN *txn,
+					   XLogRecPtr commit_lsn)
+{
+	/*
+	 * The commit should happen outside a streaming block, even for streamed
+	 * transactions. The transaction has to be marked as streamed, though.
+	 */
+	Assert(!in_streaming);
+	Assert(rbtxn_is_streamed(txn));
+
+	OutputPluginUpdateProgress(ctx);
+
+	OutputPluginPrepareWrite(ctx, true);
+	logicalrep_write_stream_commit(ctx->out, txn, commit_lsn);
+	OutputPluginWrite(ctx, true);
+
+	cleanup_rel_sync_cache(txn->xid, true);
+}
+
+static void
+pgoutput_stream_start(struct LogicalDecodingContext *ctx,
+					  ReorderBufferTXN *txn)
+{
+	/* we can't nest streaming of transactions */
+	Assert(!in_streaming);
+
+	OutputPluginPrepareWrite(ctx, true);
+	logicalrep_write_stream_start(ctx->out, txn->xid, !rbtxn_is_streamed(txn));
+	OutputPluginWrite(ctx, true);
+
+	/* we're streaming a chunk of transaction now */
+	in_streaming = true;
+}
+
+static void
+pgoutput_stream_stop(struct LogicalDecodingContext *ctx,
+					 ReorderBufferTXN *txn)
+{
+	/* we should be streaming a transaction */
+	Assert(in_streaming);
+
+	OutputPluginPrepareWrite(ctx, true);
+	logicalrep_write_stream_stop(ctx->out, txn->xid);
+	OutputPluginWrite(ctx, true);
+
+	/* we've stopped streaming a transaction */
+	in_streaming = false;
+}
+
 /*
  * Initialize the relation schema sync cache for a decoding session.
  *
@@ -622,6 +833,34 @@ get_rel_sync_entry(PGOutputData *data, Oid relid)
 	return entry;
 }
 
+/*
+ * Update the schema_sent flags in the relation sync cache when a streamed
+ * transaction commits or aborts.
+ *
+ * If the transaction aborted, the subscriber will simply throw away the
+ * schema records we streamed for it, so we don't need to do anything else.
+ *
+ * If the transaction committed, the subscriber will update its relation
+ * cache - so tweak the schema_sent flags accordingly.
+ */
+static void
+cleanup_rel_sync_cache(TransactionId xid, bool is_commit)
+{
+	HASH_SEQ_STATUS hash_seq;
+	RelationSyncEntry *entry;
+
+	Assert(RelationSyncCache != NULL);
+
+	hash_seq_init(&hash_seq, RelationSyncCache);
+	while ((entry = hash_seq_search(&hash_seq)) != NULL)
+	{
+		if (is_commit)
+			entry->schema_sent = true;
+	}
+}
+
 /*
  * Relcache invalidation callback
  */
diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c
index ba08ad405f..8eb3160041 100644
--- a/src/backend/replication/slotfuncs.c
+++ b/src/backend/replication/slotfuncs.c
@@ -147,6 +147,13 @@ create_logical_replication_slot(char *name, char *plugin,
 									logical_read_local_xlog_page, NULL, NULL,
 									NULL);
 
+	/*
+	 * Make sure streaming is disabled here - we may have the methods,
+	 * but we don't have anywhere to send the data yet.
+	 */
+	ctx->streaming = false;
+
 	/* build initial snapshot, might take a while */
 	DecodingContextFindStartpoint(ctx);
 
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 1f23665432..2e0743ac8f 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -968,6 +968,12 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
 										WalSndPrepareWrite, WalSndWriteData,
 										WalSndUpdateProgress);
 
+	/*
+	 * Make sure streaming is disabled here - we may have the methods,
+	 * but we don't have anywhere to send the data yet.
+	 */
+	ctx->streaming = false;
+
 	/*
 	 * Signal that we don't need the timeout mechanism.
We're just * creating the replication slot and don't yet accept feedback diff --git a/src/include/catalog/pg_subscription.h b/src/include/catalog/pg_subscription.h index 10ea113e4d..8793676258 100644 --- a/src/include/catalog/pg_subscription.h +++ b/src/include/catalog/pg_subscription.h @@ -50,6 +50,8 @@ CATALOG(pg_subscription,6100,SubscriptionRelationId) BKI_SHARED_RELATION BKI_ROW int32 subworkmem; /* Memory to use to decode changes. */ + bool substream; /* Stream in-progress transactions. */ + #ifdef CATALOG_VARLEN /* variable-length fields start here */ /* Connection string to the publisher */ text subconninfo BKI_FORCE_NOT_NULL; @@ -76,6 +78,7 @@ typedef struct Subscription Oid owner; /* Oid of the subscription owner */ bool enabled; /* Indicates if the subscription is enabled */ int workmem; /* Memory to decode changes. */ + bool stream; /* Allow streaming in-progress transactions. */ char *conninfo; /* Connection string to the publisher */ char *slotname; /* Name of the replication slot */ char *synccommit; /* Synchronous commit setting for worker */ diff --git a/src/include/pgstat.h b/src/include/pgstat.h index f2e873d048..c522703d8c 100644 --- a/src/include/pgstat.h +++ b/src/include/pgstat.h @@ -945,7 +945,11 @@ typedef enum WAIT_EVENT_WAL_READ, WAIT_EVENT_WAL_SYNC, WAIT_EVENT_WAL_SYNC_METHOD_ASSIGN, - WAIT_EVENT_WAL_WRITE + WAIT_EVENT_WAL_WRITE, + WAIT_EVENT_LOGICAL_CHANGES_READ, + WAIT_EVENT_LOGICAL_CHANGES_WRITE, + WAIT_EVENT_LOGICAL_SUBXACT_READ, + WAIT_EVENT_LOGICAL_SUBXACT_WRITE } WaitEventIO; /* ---------- diff --git a/src/include/replication/logicalproto.h b/src/include/replication/logicalproto.h index 3fc430af01..bf02cbc19d 100644 --- a/src/include/replication/logicalproto.h +++ b/src/include/replication/logicalproto.h @@ -23,9 +23,13 @@ * we can support. LOGICALREP_PROTO_MIN_VERSION_NUM is the oldest version we * have backwards compatibility for. The client requests protocol version at * connect time. + * + * LOGICALREP_PROTO_STREAM_VERSION_NUM is the minimum protocol version with + * support for streaming large transactions. */ #define LOGICALREP_PROTO_MIN_VERSION_NUM 1 -#define LOGICALREP_PROTO_VERSION_NUM 1 +#define LOGICALREP_PROTO_STREAM_VERSION_NUM 2 +#define LOGICALREP_PROTO_VERSION_NUM 2 /* Tuple coming via logical replication. 
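 * (The tuple wire format itself is unchanged by the streaming protocol;
 * streamed DML messages reuse it and are merely prefixed with the XID of
 * the transaction they belong to.)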
 */
 typedef struct LogicalRepTupleData
@@ -85,25 +89,45 @@ extern void logicalrep_read_commit(StringInfo in,
 extern void logicalrep_write_origin(StringInfo out, const char *origin,
 									XLogRecPtr origin_lsn);
 extern char *logicalrep_read_origin(StringInfo in, XLogRecPtr *origin_lsn);
-extern void logicalrep_write_insert(StringInfo out, Relation rel,
-									HeapTuple newtuple);
+extern void logicalrep_write_insert(StringInfo out, TransactionId xid,
+									Relation rel, HeapTuple newtuple);
 extern LogicalRepRelId logicalrep_read_insert(StringInfo in, LogicalRepTupleData *newtup);
-extern void logicalrep_write_update(StringInfo out, Relation rel, HeapTuple oldtuple,
+extern void logicalrep_write_update(StringInfo out, TransactionId xid,
+									Relation rel, HeapTuple oldtuple,
 									HeapTuple newtuple);
 extern LogicalRepRelId logicalrep_read_update(StringInfo in,
 											  bool *has_oldtuple, LogicalRepTupleData *oldtup,
 											  LogicalRepTupleData *newtup);
-extern void logicalrep_write_delete(StringInfo out, Relation rel,
-									HeapTuple oldtuple);
+extern void logicalrep_write_delete(StringInfo out, TransactionId xid,
+									Relation rel, HeapTuple oldtuple);
 extern LogicalRepRelId logicalrep_read_delete(StringInfo in, LogicalRepTupleData *oldtup);
-extern void logicalrep_write_truncate(StringInfo out, int nrelids, Oid relids[],
+extern void logicalrep_write_truncate(StringInfo out, TransactionId xid,
+									  int nrelids, Oid relids[],
 									  bool cascade, bool restart_seqs);
 extern List *logicalrep_read_truncate(StringInfo in,
 									  bool *cascade, bool *restart_seqs);
-extern void logicalrep_write_rel(StringInfo out, Relation rel);
+extern void logicalrep_write_rel(StringInfo out, TransactionId xid, Relation rel);
 extern LogicalRepRelation *logicalrep_read_rel(StringInfo in);
-extern void logicalrep_write_typ(StringInfo out, Oid typoid);
+extern void logicalrep_write_typ(StringInfo out, TransactionId xid, Oid typoid);
 extern void logicalrep_read_typ(StringInfo out, LogicalRepTyp *ltyp);
+extern void logicalrep_write_stream_start(StringInfo out, TransactionId xid,
+										  bool first_segment);
+extern TransactionId logicalrep_read_stream_start(StringInfo in,
+												  bool *first_segment);
+
+extern void logicalrep_write_stream_stop(StringInfo out, TransactionId xid);
+extern TransactionId logicalrep_read_stream_stop(StringInfo in);
+
+extern void logicalrep_write_stream_commit(StringInfo out, ReorderBufferTXN *txn,
+										   XLogRecPtr commit_lsn);
+extern TransactionId logicalrep_read_stream_commit(StringInfo in,
+												   LogicalRepCommitData *commit_data);
+
+extern void logicalrep_write_stream_abort(StringInfo out, TransactionId xid,
+										  TransactionId subxid);
+extern void logicalrep_read_stream_abort(StringInfo in, TransactionId *xid,
+										 TransactionId *subxid);
+
 #endif							/* LOGICAL_PROTO_H */
diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h
index 1db706af54..3d19b5d88e 100644
--- a/src/include/replication/walreceiver.h
+++ b/src/include/replication/walreceiver.h
@@ -163,6 +163,7 @@ typedef struct
 			uint32		proto_version;	/* Logical protocol version */
 			List	   *publication_names;	/* String list of publications */
 			int			work_mem;	/* Memory limit to use for decoding */
+			bool		streaming;	/* Streaming of large transactions */
 		} logical;
 	} proto;
 } WalRcvStreamOptions;
diff --git a/src/test/subscription/t/009_stream_simple.pl b/src/test/subscription/t/009_stream_simple.pl
new file mode 100644
index 0000000000..2f01133f69
--- /dev/null
+++ b/src/test/subscription/t/009_stream_simple.pl
@@ -0,0 +1,86 @@
+# Test streaming of simple large transaction
+use strict;
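+#
+# The publisher sets logical_decoding_work_mem to its minimum (64kB) below,
+# so a transaction touching a few thousand rows overflows the limit and is
+# streamed to the subscriber in chunks before it commits.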
+use warnings; +use PostgresNode; +use TestLib; +use Test::More tests => 3; + +sub wait_for_caught_up +{ + my ($node, $appname) = @_; + + $node->poll_query_until('postgres', +"SELECT pg_current_wal_lsn() <= replay_lsn FROM pg_stat_replication WHERE application_name = '$appname';" + ) or die "Timed out while waiting for subscriber to catch up"; +} + +# Create publisher node +my $node_publisher = get_new_node('publisher'); +$node_publisher->init(allows_streaming => 'logical'); +$node_publisher->append_conf('postgresql.conf', 'logical_decoding_work_mem = 64kB'); +$node_publisher->start; + +# Create subscriber node +my $node_subscriber = get_new_node('subscriber'); +$node_subscriber->init(allows_streaming => 'logical'); +$node_subscriber->start; + +# Create some preexisting content on publisher +$node_publisher->safe_psql('postgres', + "CREATE TABLE test_tab (a int primary key, b varchar)"); +$node_publisher->safe_psql('postgres', + "INSERT INTO test_tab VALUES (1, 'foo'), (2, 'bar')"); + +# Setup structure on subscriber +$node_subscriber->safe_psql('postgres', "CREATE TABLE test_tab (a int primary key, b text, c timestamptz DEFAULT now(), d bigint DEFAULT 999)"); + +# Setup logical replication +my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres'; +$node_publisher->safe_psql('postgres', "CREATE PUBLICATION tap_pub FOR TABLE test_tab"); + +my $appname = 'tap_sub'; +$node_subscriber->safe_psql('postgres', +"CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub" +); + +wait_for_caught_up($node_publisher, $appname); + +# Also wait for initial table sync to finish +my $synced_query = +"SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');"; +$node_subscriber->poll_query_until('postgres', $synced_query) + or die "Timed out while waiting for subscriber to synchronize data"; + +my $result = + $node_subscriber->safe_psql('postgres', "SELECT count(*), count(c), count(d = 999) FROM test_tab"); +is($result, qq(2|2|2), 'check initial data was copied to subscriber'); + +# Insert, update and delete enough rows to exceed the 64kB limit. 
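+#
+# Row-count arithmetic for the checks below: 2 initial rows plus 4998
+# inserted rows (3..5000) makes 5000; deleting every a divisible by 3
+# removes 1666 rows, leaving 3334.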
+$node_publisher->safe_psql('postgres', q{ +BEGIN; +INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3, 5000) s(i); +UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0; +DELETE FROM test_tab WHERE mod(a,3) = 0; +COMMIT; +}); + +wait_for_caught_up($node_publisher, $appname); + +$result = + $node_subscriber->safe_psql('postgres', "SELECT count(*), count(c), count(d = 999) FROM test_tab"); +is($result, qq(3334|3334|3334), 'check extra columns contain local defaults'); + +# Change the local values of the extra columns on the subscriber, +# update publisher, and check that subscriber retains the expected +# values +$node_subscriber->safe_psql('postgres', "UPDATE test_tab SET c = 'epoch'::timestamptz + 987654321 * interval '1s'"); +$node_publisher->safe_psql('postgres', "UPDATE test_tab SET b = md5(a::text)"); + +wait_for_caught_up($node_publisher, $appname); + +$result = + $node_subscriber->safe_psql('postgres', "SELECT count(*), count(extract(epoch from c) = 987654321), count(d = 999) FROM test_tab"); +is($result, qq(3334|3334|3334), 'check extra columns contain locally changed data'); + +$node_subscriber->stop; +$node_publisher->stop; diff --git a/src/test/subscription/t/010_stream_subxact.pl b/src/test/subscription/t/010_stream_subxact.pl new file mode 100644 index 0000000000..d2ae38592b --- /dev/null +++ b/src/test/subscription/t/010_stream_subxact.pl @@ -0,0 +1,102 @@ +# Test streaming of large transaction containing large subtransactions +use strict; +use warnings; +use PostgresNode; +use TestLib; +use Test::More tests => 3; + +sub wait_for_caught_up +{ + my ($node, $appname) = @_; + + $node->poll_query_until('postgres', +"SELECT pg_current_wal_lsn() <= replay_lsn FROM pg_stat_replication WHERE application_name = '$appname';" + ) or die "Timed out while waiting for subscriber to catch up"; +} + +# Create publisher node +my $node_publisher = get_new_node('publisher'); +$node_publisher->init(allows_streaming => 'logical'); +$node_publisher->append_conf('postgresql.conf', 'logical_decoding_work_mem = 64kB'); +$node_publisher->start; + +# Create subscriber node +my $node_subscriber = get_new_node('subscriber'); +$node_subscriber->init(allows_streaming => 'logical'); +$node_subscriber->start; + +# Create some preexisting content on publisher +$node_publisher->safe_psql('postgres', + "CREATE TABLE test_tab (a int primary key, b varchar)"); +$node_publisher->safe_psql('postgres', + "INSERT INTO test_tab VALUES (1, 'foo'), (2, 'bar')"); + +# Setup structure on subscriber +$node_subscriber->safe_psql('postgres', "CREATE TABLE test_tab (a int primary key, b text, c timestamptz DEFAULT now(), d bigint DEFAULT 999)"); + +# Setup logical replication +my $publisher_connstr = $node_publisher->connstr . 
' dbname=postgres';
+$node_publisher->safe_psql('postgres', "CREATE PUBLICATION tap_pub FOR TABLE test_tab");
+
+my $appname = 'tap_sub';
+$node_subscriber->safe_psql('postgres',
+"CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub"
+);
+
+wait_for_caught_up($node_publisher, $appname);
+
+# Also wait for initial table sync to finish
+my $synced_query =
+"SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');";
+$node_subscriber->poll_query_until('postgres', $synced_query)
+  or die "Timed out while waiting for subscriber to synchronize data";
+
+my $result =
+  $node_subscriber->safe_psql('postgres', "SELECT count(*), count(c), count(d = 999) FROM test_tab");
+is($result, qq(2|2|2), 'check initial data was copied to subscriber');
+
+# Insert, update and delete enough rows to exceed the 64kB limit.
+$node_publisher->safe_psql('postgres', q{
+BEGIN;
+INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3, 500) s(i);
+UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0;
+DELETE FROM test_tab WHERE mod(a,3) = 0;
+SAVEPOINT s1;
+INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(501, 1000) s(i);
+UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0;
+DELETE FROM test_tab WHERE mod(a,3) = 0;
+SAVEPOINT s2;
+INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(1001, 1500) s(i);
+UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0;
+DELETE FROM test_tab WHERE mod(a,3) = 0;
+SAVEPOINT s3;
+INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(1501, 2000) s(i);
+UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0;
+DELETE FROM test_tab WHERE mod(a,3) = 0;
+SAVEPOINT s4;
+INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(2001, 2500) s(i);
+UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0;
+DELETE FROM test_tab WHERE mod(a,3) = 0;
+COMMIT;
+});
+
+wait_for_caught_up($node_publisher, $appname);
+
+# Rows 1..2500 have been inserted in total, and every a divisible by 3 has
+# been deleted by one of the DELETEs above, so 2500 - 833 = 1667 remain.
+$result =
+  $node_subscriber->safe_psql('postgres', "SELECT count(*), count(c), count(d = 999) FROM test_tab");
+is($result, qq(1667|1667|1667), 'check extra columns contain local defaults');
+
+# Change the local values of the extra columns on the subscriber,
+# update publisher, and check that subscriber retains the expected
+# values
+$node_subscriber->safe_psql('postgres', "UPDATE test_tab SET c = 'epoch'::timestamptz + 987654321 * interval '1s'");
+$node_publisher->safe_psql('postgres', "UPDATE test_tab SET b = md5(a::text)");
+
+wait_for_caught_up($node_publisher, $appname);
+
+$result =
+  $node_subscriber->safe_psql('postgres', "SELECT count(*), count(extract(epoch from c) = 987654321), count(d = 999) FROM test_tab");
+is($result, qq(1667|1667|1667), 'check extra columns contain locally changed data');
+
+$node_subscriber->stop;
+$node_publisher->stop;
diff --git a/src/test/subscription/t/011_stream_ddl.pl b/src/test/subscription/t/011_stream_ddl.pl
new file mode 100644
index 0000000000..0da39a1a8a
--- /dev/null
+++ b/src/test/subscription/t/011_stream_ddl.pl
@@ -0,0 +1,95 @@
+# Test streaming of large transaction with DDL and subtransactions
+use strict;
+use warnings;
+use PostgresNode;
+use TestLib;
+use Test::More tests => 2;
+
+sub wait_for_caught_up
+{
+	my ($node, $appname) = @_;
+
+	$node->poll_query_until('postgres',
+"SELECT pg_current_wal_lsn() <= replay_lsn FROM pg_stat_replication WHERE application_name = '$appname';"
+	) or die "Timed out while waiting for subscriber to catch up";
+}
+
+# Create publisher node
+my $node_publisher = get_new_node('publisher');
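+# This test exercises the schema-tracking path in pgoutput: each ALTER TABLE
+# in the transactions below forces the relation schema to be sent again
+# before the changes that follow it, in both streamed and non-streamed
+# transactions.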
+$node_publisher->init(allows_streaming => 'logical');
+$node_publisher->append_conf('postgresql.conf', 'logical_decoding_work_mem = 64kB');
+$node_publisher->start;
+
+# Create subscriber node
+my $node_subscriber = get_new_node('subscriber');
+$node_subscriber->init(allows_streaming => 'logical');
+$node_subscriber->start;
+
+# Create some preexisting content on publisher
+$node_publisher->safe_psql('postgres',
+	"CREATE TABLE test_tab (a int primary key, b varchar)");
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO test_tab VALUES (1, 'foo'), (2, 'bar')");
+
+# Setup structure on subscriber
+$node_subscriber->safe_psql('postgres', "CREATE TABLE test_tab (a int primary key, b text, c INT, d INT, e INT)");
+
+# Setup logical replication
+my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
+$node_publisher->safe_psql('postgres', "CREATE PUBLICATION tap_pub FOR TABLE test_tab");
+
+my $appname = 'tap_sub';
+$node_subscriber->safe_psql('postgres',
+"CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub"
+);
+
+wait_for_caught_up($node_publisher, $appname);
+
+# Also wait for initial table sync to finish
+my $synced_query =
+"SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');";
+$node_subscriber->poll_query_until('postgres', $synced_query)
+  or die "Timed out while waiting for subscriber to synchronize data";
+
+my $result =
+  $node_subscriber->safe_psql('postgres', "SELECT count(*), count(c), count(d = 999) FROM test_tab");
+is($result, qq(2|0|0), 'check initial data was copied to subscriber');
+
+# a small (non-streamed) transaction with DDL and DML
+$node_publisher->safe_psql('postgres', q{
+BEGIN;
+INSERT INTO test_tab VALUES (3, md5(3::text));
+ALTER TABLE test_tab ADD COLUMN c INT;
+SAVEPOINT s1;
+INSERT INTO test_tab VALUES (4, md5(4::text), -4);
+COMMIT;
+});
+
+# large (streamed) transaction with DDL and DML
+$node_publisher->safe_psql('postgres', q{
+BEGIN;
+INSERT INTO test_tab SELECT i, md5(i::text), -i FROM generate_series(5, 1000) s(i);
+ALTER TABLE test_tab ADD COLUMN d INT;
+SAVEPOINT s1;
+INSERT INTO test_tab SELECT i, md5(i::text), -i, 2*i FROM generate_series(1001, 2000) s(i);
+COMMIT;
+});
+
+# a small (non-streamed) transaction with DDL and DML
+$node_publisher->safe_psql('postgres', q{
+BEGIN;
+INSERT INTO test_tab VALUES (2001, md5(2001::text), -2001, 2*2001);
+ALTER TABLE test_tab ADD COLUMN e INT;
+SAVEPOINT s1;
+INSERT INTO test_tab VALUES (2002, md5(2002::text), -2002, 2*2002, -3*2002);
+COMMIT;
+});
+
+wait_for_caught_up($node_publisher, $appname);
+
+# Expected counts: 2002 rows in total; c was added before row 4 (1999 rows
+# with c), d before row 1001 (1002 rows with d), and e before row 2002
+# (1 row with e).
+$result =
+  $node_subscriber->safe_psql('postgres', "SELECT count(*), count(c), count(d), count(e) FROM test_tab");
+is($result, qq(2002|1999|1002|1), 'check rows and DDL-added columns were replicated');
+
+$node_subscriber->stop;
+$node_publisher->stop;
diff --git a/src/test/subscription/t/012_stream_subxact_abort.pl b/src/test/subscription/t/012_stream_subxact_abort.pl
new file mode 100644
index 0000000000..402df30f59
--- /dev/null
+++ b/src/test/subscription/t/012_stream_subxact_abort.pl
@@ -0,0 +1,82 @@
+# Test streaming of large transaction containing multiple subtransactions and rollbacks
+use strict;
+use warnings;
+use PostgresNode;
+use TestLib;
+use Test::More tests => 2;
+
+sub wait_for_caught_up
+{
+	my ($node, $appname) = @_;
+
+	$node->poll_query_until('postgres',
+"SELECT pg_current_wal_lsn() <= replay_lsn FROM pg_stat_replication WHERE application_name = '$appname';"
+	) or die "Timed out while waiting for subscriber to catch up";
+}
+
+# Create publisher node
+my $node_publisher = get_new_node('publisher');
+$node_publisher->init(allows_streaming => 'logical');
+$node_publisher->append_conf('postgresql.conf', 'logical_decoding_work_mem = 64kB');
+$node_publisher->start;
+
+# Create subscriber node
+my $node_subscriber = get_new_node('subscriber');
+$node_subscriber->init(allows_streaming => 'logical');
+$node_subscriber->start;
+
+# Create some preexisting content on publisher
+$node_publisher->safe_psql('postgres',
+	"CREATE TABLE test_tab (a int primary key, b varchar)");
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO test_tab VALUES (1, 'foo'), (2, 'bar')");
+
+# Setup structure on subscriber
+$node_subscriber->safe_psql('postgres', "CREATE TABLE test_tab (a int primary key, b text, c INT, d INT, e INT)");
+
+# Setup logical replication
+my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
+$node_publisher->safe_psql('postgres', "CREATE PUBLICATION tap_pub FOR TABLE test_tab");
+
+my $appname = 'tap_sub';
+$node_subscriber->safe_psql('postgres',
+"CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub"
+);
+
+wait_for_caught_up($node_publisher, $appname);
+
+# Also wait for initial table sync to finish
+my $synced_query =
+"SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');";
+$node_subscriber->poll_query_until('postgres', $synced_query)
+  or die "Timed out while waiting for subscriber to synchronize data";
+
+my $result =
+  $node_subscriber->safe_psql('postgres', "SELECT count(*), count(c) FROM test_tab");
+is($result, qq(2|0), 'check initial data was copied to subscriber');
+
+# large (streamed) transaction with DML and ROLLBACKs
+$node_publisher->safe_psql('postgres', q{
+BEGIN;
+INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3,500) s(i);
+SAVEPOINT s1;
+INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(501,1000) s(i);
+SAVEPOINT s2;
+INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(1001,1500) s(i);
+SAVEPOINT s3;
+INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(1501,2000) s(i);
+ROLLBACK TO s2;
+INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(2001,2500) s(i);
+ROLLBACK TO s1;
+INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(2501,3000) s(i);
+COMMIT;
+});
+
+wait_for_caught_up($node_publisher, $appname);
+
+# Expected count: 2 initial rows, plus rows 3..500 (inserted before s1) and
+# rows 2501..3000 (inserted after the final rollback) = 1000; everything in
+# between was rolled back.
+$result =
+  $node_subscriber->safe_psql('postgres', "SELECT count(*), count(c) FROM test_tab");
+is($result, qq(1000|0), 'check rolled-back changes were not replicated');
+
+$node_subscriber->stop;
+$node_publisher->stop;
diff --git a/src/test/subscription/t/013_stream_subxact_ddl_abort.pl b/src/test/subscription/t/013_stream_subxact_ddl_abort.pl
new file mode 100644
index 0000000000..becbdd0578
--- /dev/null
+++ b/src/test/subscription/t/013_stream_subxact_ddl_abort.pl
@@ -0,0 +1,84 @@
+# Test streaming of large transaction with subtransactions, DDL and rollbacks
+use strict;
+use warnings;
+use PostgresNode;
+use TestLib;
+use Test::More tests => 2;
+
+sub wait_for_caught_up
+{
+	my ($node, $appname) = @_;
+
+	$node->poll_query_until('postgres',
+"SELECT pg_current_wal_lsn() <= replay_lsn FROM pg_stat_replication WHERE application_name = '$appname';"
+	) or die "Timed out while waiting for subscriber to catch up";
+}
+
+# Create publisher node
+my $node_publisher = get_new_node('publisher');
+$node_publisher->init(allows_streaming => 'logical');
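+# Each ROLLBACK TO SAVEPOINT in the transaction below should reach the
+# subscriber as a stream-abort message carrying both the toplevel XID and
+# the subtransaction XID (cf. logicalrep_write_stream_abort), so only
+# changes from surviving subtransactions get applied.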
+$node_publisher->append_conf('postgresql.conf', 'logical_decoding_work_mem = 64kB');
+$node_publisher->start;
+
+# Create subscriber node
+my $node_subscriber = get_new_node('subscriber');
+$node_subscriber->init(allows_streaming => 'logical');
+$node_subscriber->start;
+
+# Create some preexisting content on publisher
+$node_publisher->safe_psql('postgres',
+	"CREATE TABLE test_tab (a int primary key, b varchar)");
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO test_tab VALUES (1, 'foo'), (2, 'bar')");
+
+# Setup structure on subscriber
+$node_subscriber->safe_psql('postgres', "CREATE TABLE test_tab (a int primary key, b text, c INT, d INT, e INT)");
+
+# Setup logical replication
+my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
+$node_publisher->safe_psql('postgres', "CREATE PUBLICATION tap_pub FOR TABLE test_tab");
+
+my $appname = 'tap_sub';
+$node_subscriber->safe_psql('postgres',
+"CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub"
+);
+
+wait_for_caught_up($node_publisher, $appname);
+
+# Also wait for initial table sync to finish
+my $synced_query =
+"SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');";
+$node_subscriber->poll_query_until('postgres', $synced_query)
+  or die "Timed out while waiting for subscriber to synchronize data";
+
+my $result =
+  $node_subscriber->safe_psql('postgres', "SELECT count(*), count(c) FROM test_tab");
+is($result, qq(2|0), 'check initial data was copied to subscriber');
+
+# large (streamed) transaction with DDL, DML and ROLLBACKs
+$node_publisher->safe_psql('postgres', q{
+BEGIN;
+INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3,500) s(i);
+ALTER TABLE test_tab ADD COLUMN c INT;
+SAVEPOINT s1;
+INSERT INTO test_tab SELECT i, md5(i::text), -i FROM generate_series(501,1000) s(i);
+ALTER TABLE test_tab ADD COLUMN d INT;
+SAVEPOINT s2;
+INSERT INTO test_tab SELECT i, md5(i::text), -i, 2*i FROM generate_series(1001,1500) s(i);
+ALTER TABLE test_tab ADD COLUMN e INT;
+SAVEPOINT s3;
+INSERT INTO test_tab SELECT i, md5(i::text), -i, 2*i, -3*i FROM generate_series(1501,2000) s(i);
+ALTER TABLE test_tab DROP COLUMN c;
+ROLLBACK TO s1;
+INSERT INTO test_tab SELECT i, md5(i::text), i FROM generate_series(501,1000) s(i);
+COMMIT;
+});
+
+wait_for_caught_up($node_publisher, $appname);
+
+# Expected counts: rows 1..1000 survive (1000 in total). ROLLBACK TO s1
+# discards everything after s1 (including the later DDL) but keeps the
+# ADD COLUMN c issued before it, and the final INSERT fills c for rows
+# 501..1000, hence count(c) = 500.
+$result =
+  $node_subscriber->safe_psql('postgres', "SELECT count(*), count(c) FROM test_tab");
+is($result, qq(1000|500), 'check rolled-back changes were discarded and the rest replicated');
+
+$node_subscriber->stop;
+$node_publisher->stop;
-- 
2.21.0