From 26a1c4a14a0f1145527f35c4011372eedc5c72a2 Mon Sep 17 00:00:00 2001 From: Tomas Vondra Date: Thu, 2 Nov 2023 15:03:13 +0100 Subject: [PATCH v202311272 1/4] Logical decoding of sequences Extend logical decoding to allow decoding sequence changes. Each sequence change is either transactional or non-transactional, depending on whether the sequence relfilenode is visible outside the transaction. When a transaction creates a new relfilenode for the sequence (usually because of CREATE SEQUENCE or ALTER SEQUENCE), all subsequent sequence changes are treated as transactional - queued just like other changes in the transaction, and either replayed at commit time or discarded if the transaction aborts. All other sequence changes, performed on sequences / relfilenodes created by earlier transactions, are non-transactional. These changes are not queued but replayed immediately after decoding. This also means these changes are not subject to abort. This mixed behavior is necessary, and is consistent with how sequences behave in general - calling nextval(s) makes the change visible to concurrent transactions, and can't be rolled back. But if DDL is performed on a sequence, changes become transactional - invisible to concurrent transactions until commit, and subject to abort. To identify changes that need transactional handling, each transaction tracks relfilenodes it created in a hash table. Then when a change is decoded, this hash table is used to check if the relfilenode is new. For each relfilenode we track the XID of the (sub)transaction that created it, which is needed for cleanup at the transaction end. We don't need to check the XID to decide if an increment is transactional - if we find a match in the hash table, it has to be the same transaction. This requires only minor changes to WAL-logging. We need to ensure the sequence record is always associated with the subxact XID - until now the change might have XID 0 if it was the first change in a subxact.
But the sequence might have been created in the same top-level transaction, in which case it still needs transactional treatment. So we ensure the XID is assigned when WAL-logging sequence changes, if wal_level=logical. A patch adding decoding of sequences was originally submitted by Cary Huang. This commit reworks many important aspects (e.g. the WAL logging and transactional/non-transactional handling). However, the original patch and reviews were very useful. Author: Tomas Vondra, Cary Huang Reviewed-by: Ashutosh Bapat, Amit Kapila, Peter Eisentraut, Hannu Krosing, Andres Freund, Zhijie Hou Discussion: https://postgr.es/m/d045f3c2-6cfb-06d3-5540-e63c320df8bc@enterprisedb.com Discussion: https://postgr.es/m/1710ed7e13b.cd7177461430746.3372264562543607781@highgo.ca Discussion: https://postgr.es/m/76e5fcd8-8072-8ea2-d361-2e811941000c@enterprisedb.com --- doc/src/sgml/logicaldecoding.sgml | 62 +- src/backend/commands/sequence.c | 89 +++ src/backend/replication/logical/decode.c | 230 ++++++ src/backend/replication/logical/logical.c | 100 ++- .../replication/logical/reorderbuffer.c | 737 +++++++++++++++++- src/include/access/rmgrlist.h | 4 +- src/include/replication/decode.h | 2 + src/include/replication/logical.h | 5 + src/include/replication/output_plugin.h | 23 + src/include/replication/reorderbuffer.h | 42 + src/tools/pgindent/typedefs.list | 1 + 11 files changed, 1284 insertions(+), 11 deletions(-) diff --git a/doc/src/sgml/logicaldecoding.sgml b/doc/src/sgml/logicaldecoding.sgml index cd152d4ced..e1fe5c108e 100644 --- a/doc/src/sgml/logicaldecoding.sgml +++ b/doc/src/sgml/logicaldecoding.sgml @@ -488,6 +488,7 @@ typedef struct OutputPluginCallbacks LogicalDecodeTruncateCB truncate_cb; LogicalDecodeCommitCB commit_cb; LogicalDecodeMessageCB message_cb; + LogicalDecodeSequenceCB sequence_cb; LogicalDecodeFilterByOriginCB filter_by_origin_cb; LogicalDecodeShutdownCB shutdown_cb; LogicalDecodeFilterPrepareCB filter_prepare_cb; @@ -502,6 +503,7 @@ typedef struct 
OutputPluginCallbacks LogicalDecodeStreamCommitCB stream_commit_cb; LogicalDecodeStreamChangeCB stream_change_cb; LogicalDecodeStreamMessageCB stream_message_cb; + LogicalDecodeStreamSequenceCB stream_sequence_cb; LogicalDecodeStreamTruncateCB stream_truncate_cb; } OutputPluginCallbacks; @@ -510,10 +512,13 @@ typedef void (*LogicalOutputPluginInit) (struct OutputPluginCallbacks *cb); The begin_cb, change_cb and commit_cb callbacks are required, while startup_cb, truncate_cb, - message_cb, filter_by_origin_cb, + message_cb, sequence_cb, + filter_by_origin_cb, and shutdown_cb are optional. If truncate_cb is not set but a TRUNCATE is to be decoded, the action will be ignored. + Similarly, if sequence_cb is not set and a sequence + change is to be decoded, the action will be ignored. @@ -521,7 +526,8 @@ typedef void (*LogicalOutputPluginInit) (struct OutputPluginCallbacks *cb); in-progress transactions. The stream_start_cb, stream_stop_cb, stream_abort_cb, stream_commit_cb, and stream_change_cb - are required, while stream_message_cb and + are required, while stream_message_cb, + stream_sequence_cb and stream_truncate_cb are optional. The stream_prepare_cb is also required if the output plugin also support two-phase commits. @@ -840,6 +846,35 @@ typedef void (*LogicalDecodeMessageCB) (struct LogicalDecodingContext *ctx, + + Sequence Callback + + + The optional sequence_cb callback is called for + actions that update a sequence value. + +typedef void (*LogicalDecodeSequenceCB) (struct LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, + XLogRecPtr sequence_lsn, + Relation rel, + bool transactional, + int64 value); + + The txn parameter contains meta information about + the transaction the sequence change is part of. Note however that for + non-transactional updates, the transaction may be NULL, depending on + whether the transaction already has an XID assigned. + The sequence_lsn has the WAL location of the + sequence update.
transactional says whether the + sequence change has to be replayed as part of the transaction or immediately. + + The value parameter describes the sequence change. + Note that this may not be the value obtained by the process updating the + sequence, but the future sequence value written to WAL (typically about + 32 values ahead). + + + Prepare Filter Callback @@ -1051,6 +1086,24 @@ typedef void (*LogicalDecodeStreamMessageCB) (struct LogicalDecodingContext *ctx + + Stream Sequence Callback + + The optional stream_sequence_cb callback is called + for actions that change a sequence in a block of streamed changes + (demarcated by stream_start_cb and + stream_stop_cb calls). + +typedef void (*LogicalDecodeStreamSequenceCB) (struct LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, + XLogRecPtr sequence_lsn, + Relation rel, + bool transactional, + int64 value); + + + + Stream Truncate Callback @@ -1230,8 +1283,9 @@ OutputPluginWrite(ctx, true); in-progress transactions. There are multiple required streaming callbacks (stream_start_cb, stream_stop_cb, stream_abort_cb, stream_commit_cb - and stream_change_cb) and two optional callbacks - (stream_message_cb and stream_truncate_cb). + and stream_change_cb) and three optional callbacks + (stream_message_cb, stream_sequence_cb, + and stream_truncate_cb). Also, if streaming of two-phase commands is to be supported, then additional callbacks must be provided. (See for details). diff --git a/src/backend/commands/sequence.c b/src/backend/commands/sequence.c index da2ace79cc..616b8685e3 100644 --- a/src/backend/commands/sequence.c +++ b/src/backend/commands/sequence.c @@ -204,6 +204,14 @@ DefineSequence(ParseState *pstate, CreateSeqStmt *seq) stmt->tablespacename = NULL; stmt->if_not_exists = seq->if_not_exists; + /* + * Make sure the relfilenode creation (in DefineRelation) is associated + * with the XID, so that decoding of sequences can handle this as a + * transactional change.
+ */ + if (XLogLogicalInfoActive()) + GetCurrentTransactionId(); + address = DefineRelation(stmt, RELKIND_SEQUENCE, seq->ownerId, NULL, NULL); seqoid = address.objectId; Assert(seqoid != InvalidOid); @@ -302,6 +310,13 @@ ResetSequence(Oid seq_relid) seq->is_called = false; seq->log_cnt = 0; + /* + * Make sure the relfilenode creation is associated with the XID, so that + * decoding of sequences can handle this as a transactional change. + */ + if (XLogLogicalInfoActive()) + GetCurrentTransactionId(); + /* * Create a new storage file for the sequence. */ @@ -392,8 +407,19 @@ fill_seq_fork_with_data(Relation rel, HeapTuple tuple, ForkNumber forkNum) /* check the comment above nextval_internal()'s equivalent call. */ if (RelationNeedsWAL(rel)) + { GetTopTransactionId(); + /* + * Make sure the subtransaction has an XID assigned, so that the + * sequence change WAL record is properly associated with it. This + * matters for sequences created/altered in a transaction - the + * subsequent changes need to be handled as transactional. + */ + if (XLogLogicalInfoActive()) + GetCurrentTransactionId(); + } + START_CRIT_SECTION(); MarkBufferDirty(buf); @@ -417,6 +443,9 @@ fill_seq_fork_with_data(Relation rel, HeapTuple tuple, ForkNumber forkNum) XLogRegisterData((char *) &xlrec, sizeof(xl_seq_rec)); XLogRegisterData((char *) tuple->t_data, tuple->t_len); + /* allow filtering by origin on a sequence update */ + XLogSetRecordFlags(XLOG_INCLUDE_ORIGIN); + recptr = XLogInsert(RM_SEQ_ID, XLOG_SEQ_LOG); PageSetLSN(page, recptr); @@ -499,6 +528,14 @@ AlterSequence(ParseState *pstate, AlterSeqStmt *stmt) if (RelationNeedsWAL(seqrel)) GetTopTransactionId(); + /* + * Make sure the relfilenode creation is associated with the XID, so + * that decoding of sequences can handle this as a transactional + * change. + */ + if (XLogLogicalInfoActive()) + GetCurrentTransactionId(); + /* * Create a new storage file for the sequence, making the state * changes transactional.
@@ -548,8 +585,19 @@ SequenceChangePersistence(Oid relid, char newrelpersistence) /* check the comment above nextval_internal()'s equivalent call. */ if (RelationNeedsWAL(seqrel)) + { GetTopTransactionId(); + /* + * Make sure the subtransaction has an XID assigned, so that the + * sequence change WAL record is properly associated with it. This + * matters for sequences created/altered in a transaction - the + * subsequent changes need to be handled as transactional. + */ + if (XLogLogicalInfoActive()) + GetCurrentTransactionId(); + } + (void) read_seq_tuple(seqrel, &buf, &seqdatatuple); RelationSetNewRelfilenumber(seqrel, newrelpersistence); fill_seq_with_data(seqrel, &seqdatatuple); @@ -792,10 +840,34 @@ nextval_internal(Oid relid, bool check_permissions) * It's sufficient to ensure the toplevel transaction has an xid, no need * to assign xids subxacts, that'll already trigger an appropriate wait. * (Have to do that here, so we're outside the critical section) + * + * We have to ensure we have a proper XID, which will be included in the + * XLOG record by XLogRecordAssemble. Otherwise the first nextval() in a + * subxact (without any preceding changes) would get XID 0, and it would + * then be impossible to decide which top xact it belongs to. It'd also + * trigger assert in DecodeSequence. We only do that with + * wal_level=logical, though. + * + * XXX This might seem unnecessary, because if there's no XID the xact + * couldn't have done anything important yet, e.g. it could not have + * created a sequence. But that's incorrect, because of subxacts. The + * current subtransaction might not have done anything yet (thus no XID), + * but an earlier one might have created the sequence. */ if (logit && RelationNeedsWAL(seqrel)) + { GetTopTransactionId(); + /* + * Make sure the subtransaction has an XID assigned, so that the + * sequence change WAL record is properly associated with it.
This + * matters for sequences created/altered in a transaction - the + * subsequent changes need to be handled as transactional. + */ + if (XLogLogicalInfoActive()) + GetCurrentTransactionId(); + } + /* ready to change the on-disk (or really, in-buffer) tuple */ START_CRIT_SECTION(); @@ -835,6 +907,9 @@ nextval_internal(Oid relid, bool check_permissions) XLogRegisterData((char *) &xlrec, sizeof(xl_seq_rec)); XLogRegisterData((char *) seqdatatuple.t_data, seqdatatuple.t_len); + /* allow filtering by origin on a sequence update */ + XLogSetRecordFlags(XLOG_INCLUDE_ORIGIN); + recptr = XLogInsert(RM_SEQ_ID, XLOG_SEQ_LOG); PageSetLSN(page, recptr); @@ -996,8 +1071,19 @@ do_setval(Oid relid, int64 next, bool iscalled) /* check the comment above nextval_internal()'s equivalent call. */ if (RelationNeedsWAL(seqrel)) + { GetTopTransactionId(); + /* + * Make sure the subtransaction has an XID assigned, so that the + * sequence change WAL record is properly associated with it. This + * matters for sequences created/altered in a transaction - the + * subsequent changes need to be handled as transactional.
+ */ + if (XLogLogicalInfoActive()) + GetCurrentTransactionId(); + } + /* ready to change the on-disk (or really, in-buffer) tuple */ START_CRIT_SECTION(); @@ -1021,6 +1107,9 @@ do_setval(Oid relid, int64 next, bool iscalled) XLogRegisterData((char *) &xlrec, sizeof(xl_seq_rec)); XLogRegisterData((char *) seqdatatuple.t_data, seqdatatuple.t_len); + /* allow filtering by origin on a sequence update */ + XLogSetRecordFlags(XLOG_INCLUDE_ORIGIN); + recptr = XLogInsert(RM_SEQ_ID, XLOG_SEQ_LOG); PageSetLSN(page, recptr); diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c index 1237118e84..9bfed0f5ce 100644 --- a/src/backend/replication/logical/decode.c +++ b/src/backend/replication/logical/decode.c @@ -35,6 +35,7 @@ #include "access/xlogrecord.h" #include "access/xlogutils.h" #include "catalog/pg_control.h" +#include "catalog/storage_xlog.h" #include "replication/decode.h" #include "replication/logical.h" #include "replication/message.h" @@ -42,6 +43,7 @@ #include "replication/reorderbuffer.h" #include "replication/snapbuild.h" #include "storage/standby.h" +#include "commands/sequence.h" /* individual record(group)'s handlers */ static void DecodeInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); @@ -63,6 +65,7 @@ static void DecodePrepare(LogicalDecodingContext *ctx, XLogRecordBuffer *buf, /* common function to decode tuples */ static void DecodeXLogTuple(char *data, Size len, ReorderBufferTupleBuf *tuple); +static void DecodeSeqTuple(char *data, Size len, ReorderBufferTupleBuf *tuple); /* helper functions for decoding transactions */ static inline bool FilterPrepare(LogicalDecodingContext *ctx, @@ -1320,3 +1323,230 @@ DecodeTXNNeedSkip(LogicalDecodingContext *ctx, XLogRecordBuffer *buf, return false; } + +/* + * DecodeSeqTuple + * decode tuple describing the sequence change + * + * Sequences are represented as a table with a single row, which gets updated by + * nextval() etc.
The tuple is stored in WAL right after the xl_seq_rec, so we + * simply copy it into the tuplebuf (similar to seq_redo). + */ +static void +DecodeSeqTuple(char *data, Size len, ReorderBufferTupleBuf *tuple) +{ + int datalen = len - sizeof(xl_seq_rec) - SizeofHeapTupleHeader; + + Assert(datalen >= 0); + + tuple->tuple.t_len = datalen + SizeofHeapTupleHeader; + + ItemPointerSetInvalid(&tuple->tuple.t_self); + + tuple->tuple.t_tableOid = InvalidOid; + + memcpy(((char *) tuple->tuple.t_data), + data + sizeof(xl_seq_rec), + SizeofHeapTupleHeader); + + memcpy(((char *) tuple->tuple.t_data) + SizeofHeapTupleHeader, + data + sizeof(xl_seq_rec) + SizeofHeapTupleHeader, + datalen); +} + +/* + * Handle sequence decode + * + * Decoding sequences is a bit tricky, because while most sequence actions + * are non-transactional (immediately visible and not subject to rollback), + * some need to be handled as transactional (e.g. sequences created in + * a transaction are invisible until that transaction commits). + * + * By default, a sequence change is non-transactional - we must not queue + * it in a transaction as other changes, because the transaction might get + * rolled back and we'd discard the change. But that'd be incorrect, as the + * change is already visible to other processes accessing the sequence. If + * we discard the change, the downstream will not be notified about the + * change, and may fall behind. + * + * On the other hand, the sequence may be created in a transaction. In this + * case we *have to* queue it in the transaction just like other changes, + * because we don't want to process changes for sequences that may be unknown + * outside the transaction - the downstream might get confused about which + * sequence it's related to etc.
+ */ +void +seq_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) +{ + SnapBuild *builder = ctx->snapshot_builder; + ReorderBufferTupleBuf *tuplebuf; + RelFileLocator target_locator; + XLogReaderState *r = buf->record; + char *tupledata = NULL; + Size tuplelen; + Size datalen = 0; + TransactionId xid = XLogRecGetXid(r); + uint8 info = XLogRecGetInfo(buf->record) & ~XLR_INFO_MASK; + Snapshot snapshot = NULL; + RepOriginId origin_id = XLogRecGetOrigin(r); + bool transactional; + + /* ignore sequences when the plugin does not have the callbacks */ + if (!ctx->sequences) + return; + + /* only decode changes flagged with XLOG_SEQ_LOG */ + if (info != XLOG_SEQ_LOG) + elog(ERROR, "unexpected RM_SEQ_ID record type: %u", info); + + ReorderBufferProcessXid(ctx->reorder, XLogRecGetXid(r), buf->origptr); + + /* + * If we don't have a snapshot yet, there is no point in decoding + * sequences (fast_forward mode is handled separately, further below). + */ + if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT) + return; + + /* only interested in our database */ + XLogRecGetBlockTag(r, 0, &target_locator, NULL, NULL); + if (target_locator.dbOid != ctx->slot->data.database) + return; + + /* output plugin doesn't look for this origin, no need to queue */ + if (FilterByOrigin(ctx, XLogRecGetOrigin(r))) + return; + + /* + * Should we handle the sequence change as transactional or not? + * + * If the relfilenode was created in the current transaction, treat the + * change as transactional and queue it.
Otherwise it needs to be treated + * as non-transactional, in which case we just send it to the plugin right + * away. + */ + transactional = ReorderBufferSequenceIsTransactional(ctx->reorder, + target_locator, + NULL); + + /* Skip the change if already processed (per the snapshot). */ + if (transactional && + !SnapBuildProcessChange(builder, xid, buf->origptr)) + return; + else if (!transactional && + (SnapBuildCurrentState(builder) != SNAPBUILD_CONSISTENT || + SnapBuildXactNeedsSkip(builder, buf->origptr))) + return; + + /* + * We also skip decoding in fast_forward mode. This check must follow the + * skip checks because we don't want to set the processing_required flag + * unless we have a decodable sequence change. + */ + if (ctx->fast_forward) + { + /* + * We need to set processing_required flag to notify the sequence + * change existence to the caller. Usually, the flag is set when + * either the COMMIT or ABORT records are decoded, but this must be + * turned on here because non-transactional sequence changes are + * decoded without waiting for these records. + */ + if (!transactional) + ctx->processing_required = true; + + return; + } + + /* decode only now, so that early returns above don't leak the tuple */ + tupledata = XLogRecGetData(r); + datalen = XLogRecGetDataLen(r); + + /* we need at least the xl_seq_rec and a complete tuple header */ + if (!tupledata || datalen < sizeof(xl_seq_rec) + SizeofHeapTupleHeader) + elog(ERROR, "sequence decode missing tuple data"); + + tuplelen = datalen - sizeof(xl_seq_rec) - SizeofHeapTupleHeader; + tuplebuf = ReorderBufferGetTupleBuf(ctx->reorder, tuplelen); + DecodeSeqTuple(tupledata, datalen, tuplebuf); + + /* + * If this is a non-transactional change, get the snapshot we're expected + * to use. We only get here when the snapshot is consistent, and the + * change is not meant to be skipped. + * + * For transactional changes we don't need a snapshot, we'll use the + * regular snapshot maintained by ReorderBuffer. We just leave it NULL. + */ + if (!transactional) + snapshot = SnapBuildGetOrBuildSnapshot(builder); + + /* Queue the change (or send immediately if not transactional). */ + ReorderBufferQueueSequence(ctx->reorder, xid, snapshot, buf->endptr, + origin_id, target_locator, transactional, + tuplebuf); +} + +/* + * Decode relfilenode change + * + * Decode SMGR records, so that we can later decide which sequence changes + * need to be treated as transactional.
Most sequence changes are going to + * be non-transactional (applied as if outside the decoded transaction, not + * subject to rollback, etc.). Changes for sequences created/altered in the + * transaction need to be handled as transactional (i.e. applied as part of + * the decoded transaction, same as all other changes). + * + * To decide which of those cases applies, we decode XLOG_SMGR_CREATE records + * and track relfilenodes created in each (sub)transaction. Changes for + * these relfilenodes are then queued and treated as transactional, while + * remaining changes are treated as non-transactional. + * + * We only care about XLOG_SMGR_CREATE records for "our" database (logical + * decoding is restricted to a single database), and we do the filtering + * and skipping, as appropriate. + */ +void +smgr_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) +{ + SnapBuild *builder = ctx->snapshot_builder; + XLogReaderState *r = buf->record; + TransactionId xid = XLogRecGetXid(r); + uint8 info = XLogRecGetInfo(buf->record) & ~XLR_INFO_MASK; + xl_smgr_create *xlrec; + + /* + * Bail out when not decoding sequences, which is currently the only case + * when we need to know about relfilenodes created in a transaction. + */ + if (!ctx->sequences) + return; + + /* + * We only care about XLOG_SMGR_CREATE, because that's what determines if + * the following sequence changes are transactional. + */ + if (info != XLOG_SMGR_CREATE) + return; + + ReorderBufferProcessXid(ctx->reorder, XLogRecGetXid(r), buf->origptr); + + /* + * If we don't have a snapshot or we are just fast-forwarding, there is no + * point in decoding relfilenode information.
+ */ + if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT || + ctx->fast_forward) + return; + + /* only interested in our database */ + xlrec = (xl_smgr_create *) XLogRecGetData(r); + if (xlrec->rlocator.dbOid != ctx->slot->data.database) + return; + + /* output plugin doesn't look for this origin, no need to queue */ + if (FilterByOrigin(ctx, XLogRecGetOrigin(r))) + return; + + ReorderBufferAddRelFileLocator(ctx->reorder, xid, buf->endptr, xlrec->rlocator); +} diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c index 8288da5277..3a7b3b9be0 100644 --- a/src/backend/replication/logical/logical.c +++ b/src/backend/replication/logical/logical.c @@ -75,6 +75,9 @@ static void truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, static void message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, XLogRecPtr message_lsn, bool transactional, const char *prefix, Size message_size, const char *message); +static void sequence_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, + XLogRecPtr sequence_lsn, Relation rel, + bool transactional, int64 value); /* streaming callbacks */ static void stream_start_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, @@ -92,6 +95,9 @@ static void stream_change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn static void stream_message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, XLogRecPtr message_lsn, bool transactional, const char *prefix, Size message_size, const char *message); +static void stream_sequence_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, + XLogRecPtr sequence_lsn, Relation rel, + bool transactional, int64 value); static void stream_truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, int nrelations, Relation relations[], ReorderBufferChange *change); @@ -223,13 +229,14 @@ StartupDecodingContext(List *output_plugin_options, ctx->reorder->apply_truncate = truncate_cb_wrapper; ctx->reorder->commit =
commit_cb_wrapper; ctx->reorder->message = message_cb_wrapper; + ctx->reorder->sequence = sequence_cb_wrapper; /* * To support streaming, we require start/stop/abort/commit/change - * callbacks. The message and truncate callbacks are optional, similar to - * regular output plugins. We however enable streaming when at least one - * of the methods is enabled so that we can easily identify missing - * methods. + * callbacks. The message, sequence and truncate callbacks are optional, + * similar to regular output plugins. We however enable streaming when at + * least one of the methods is enabled so that we can easily identify + * missing methods. * * We decide it here, but only check it later in the wrappers. */ @@ -239,6 +246,7 @@ StartupDecodingContext(List *output_plugin_options, (ctx->callbacks.stream_commit_cb != NULL) || (ctx->callbacks.stream_change_cb != NULL) || (ctx->callbacks.stream_message_cb != NULL) || + (ctx->callbacks.stream_sequence_cb != NULL) || (ctx->callbacks.stream_truncate_cb != NULL); /* @@ -256,6 +264,7 @@ StartupDecodingContext(List *output_plugin_options, ctx->reorder->stream_commit = stream_commit_cb_wrapper; ctx->reorder->stream_change = stream_change_cb_wrapper; ctx->reorder->stream_message = stream_message_cb_wrapper; + ctx->reorder->stream_sequence = stream_sequence_cb_wrapper; ctx->reorder->stream_truncate = stream_truncate_cb_wrapper; @@ -289,6 +298,13 @@ StartupDecodingContext(List *output_plugin_options, */ ctx->reorder->update_progress_txn = update_progress_txn_cb_wrapper; + /* + * Decoding of sequences requires at least one of the sequence callbacks. + * We decide it here, but only check it later in the wrappers.
+ */ + ctx->sequences = ((ctx->callbacks.sequence_cb != NULL) || + (ctx->callbacks.stream_sequence_cb != NULL)); + + ctx->out = makeStringInfo(); ctx->prepare_write = prepare_write; ctx->write = do_write; @@ -1262,6 +1278,42 @@ message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, error_context_stack = errcallback.previous; } +static void +sequence_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, + XLogRecPtr sequence_lsn, Relation rel, bool transactional, + int64 value) +{ + LogicalDecodingContext *ctx = cache->private_data; + LogicalErrorCallbackState state; + ErrorContextCallback errcallback; + + Assert(!ctx->fast_forward); + + if (ctx->callbacks.sequence_cb == NULL) + return; + + /* Push callback + info on the error context stack */ + state.ctx = ctx; + state.callback_name = "sequence"; + state.report_location = sequence_lsn; + errcallback.callback = output_plugin_error_callback; + errcallback.arg = (void *) &state; + errcallback.previous = error_context_stack; + error_context_stack = &errcallback; + + /* set output state */ + ctx->accept_writes = true; + ctx->write_xid = txn != NULL ?
txn->xid : InvalidTransactionId; + ctx->write_location = sequence_lsn; + + /* do the actual work: call callback */ + ctx->callbacks.sequence_cb(ctx, txn, sequence_lsn, rel, + transactional, value); + + /* Pop the error context stack */ + error_context_stack = errcallback.previous; +} + static void stream_start_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, XLogRecPtr first_lsn) @@ -1577,6 +1629,46 @@ stream_message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, error_context_stack = errcallback.previous; } +static void +stream_sequence_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, + XLogRecPtr sequence_lsn, Relation rel, + bool transactional, int64 value) +{ + LogicalDecodingContext *ctx = cache->private_data; + LogicalErrorCallbackState state; + ErrorContextCallback errcallback; + + Assert(!ctx->fast_forward); + + /* We're only supposed to call this when streaming is supported. */ + Assert(ctx->streaming); + + /* this callback is optional */ + if (ctx->callbacks.stream_sequence_cb == NULL) + return; + + /* Push callback + info on the error context stack */ + state.ctx = ctx; + state.callback_name = "stream_sequence"; + state.report_location = sequence_lsn; + errcallback.callback = output_plugin_error_callback; + errcallback.arg = (void *) &state; + errcallback.previous = error_context_stack; + error_context_stack = &errcallback; + + /* set output state */ + ctx->accept_writes = true; + ctx->write_xid = txn != NULL ?
txn->xid : InvalidTransactionId; + ctx->write_location = sequence_lsn; + + /* do the actual work: call callback */ + ctx->callbacks.stream_sequence_cb(ctx, txn, sequence_lsn, rel, + transactional, value); + + /* Pop the error context stack */ + error_context_stack = errcallback.previous; +} + static void stream_truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, int nrelations, Relation relations[], diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c index 12edc5772a..f21aa340c8 100644 --- a/src/backend/replication/logical/reorderbuffer.c +++ b/src/backend/replication/logical/reorderbuffer.c @@ -77,6 +77,41 @@ * a bit more memory to the oldest subtransactions, because it's likely * they are the source for the next sequence of changes. * + * When decoding sequences, we differentiate between transactional and + * non-transactional changes. A sequence change is "non-transactional" if + * it is immediately visible to other transactions, and would not be + * undone by a rollback. Most sequence changes are non-transactional, as + * the changes performed by nextval() are visible and can't be undone. + * Non-transactional changes are replayed immediately, as if outside any + * transaction (and thus also not subject to rollback etc.). + * + * However, if a transaction modifies the sequence in a way that creates + * a new relfilenode (e.g. by running ALTER SEQUENCE), we consider the + * subsequent changes "transactional" and queue them in the transaction + * that performed them, just like other changes. If that transaction rolls + * back, these sequence changes are discarded too (together with the + * new relfilenode). + * + * This mixed behavior is necessary - sequences are non-transactional + * (e.g. ROLLBACK does not undo the sequence moving forward). But + * changes on new relfilenodes need to be handled as transactional.
+ * + * To decide if a sequence change is transactional, we maintain a hash + * table of relfilenodes created in each (sub)transaction, along with + * the XID of the (sub)transaction that created the relfilenode. The + * entries from subtransactions are copied to the top-level transaction + * to make checks cheaper. The hash table gets cleaned up when the + * transaction completes (commit/abort). + * + * The XID of the subxact that created the relfilenode is necessary, as + * that's where the sequence changes need to be queued - if this subxact + * rolls back, we want to discard the changes tied to this relfilenode + * (even if done in some other subtransaction). + * + * The XID may be valid even for non-transactional sequence changes - we + * simply keep the XID decoded from WAL, it's up to the reorderbuffer to + * decide if the change is transactional. + * * ------------------------------------------------------------------------- */ #include "postgres.h" @@ -91,6 +126,7 @@ #include "access/xact.h" #include "access/xlog_internal.h" #include "catalog/catalog.h" +#include "commands/sequence.h" #include "lib/binaryheap.h" #include "miscadmin.h" #include "pgstat.h" @@ -116,6 +152,13 @@ typedef struct ReorderBufferTXNByIdEnt ReorderBufferTXN *txn; } ReorderBufferTXNByIdEnt; +/* entry for hash table tracking relfilenodes created in running xacts */ +typedef struct ReorderBufferSequenceEnt +{ + RelFileLocator rlocator; + ReorderBufferTXN *txn; +} ReorderBufferSequenceEnt; + /* data structures for (relfilelocator, ctid) => (cmin, cmax) mapping */ typedef struct ReorderBufferTupleCidKey { @@ -225,6 +268,7 @@ static void ReorderBufferTransferSnapToParent(ReorderBufferTXN *txn, ReorderBufferTXN *subtxn); static void AssertTXNLsnOrder(ReorderBuffer *rb); +static void AssertCheckSequences(ReorderBuffer *rb); /* --------------------------------------- * support functions for lsn-order iterating over the ->changes of a @@ -456,12 +500,48 @@ ReorderBufferReturnTXN(ReorderBuffer
*rb, ReorderBufferTXN *txn) txn->invalidations = NULL; } + if (txn->sequences_hash != NULL) + { + hash_destroy(txn->sequences_hash); + txn->sequences_hash = NULL; + } + /* Reset the toast hash */ ReorderBufferToastReset(rb, txn); pfree(txn); } +/* + * Initialize hash table of relfilenodes created by the transaction. + * + * Each entry maps the relfilenode to the (sub)transaction that created the + * relfilenode - which is also the transaction the sequence change needs to + * be part of (in the transactional case). + * + * We don't do this in ReorderBufferGetTXN because that'd allocate the hash + * for all transactions, and we expect new relfilenodes to be fairly rare. + * So only do that when adding the first entry. + */ +static void +ReorderBufferTXNSequencesInit(ReorderBuffer *rb, ReorderBufferTXN *txn) +{ + HASHCTL hash_ctl; + + /* bail out if already initialized */ + if (txn->sequences_hash) + return; + + /* hash table of sequences, mapping relfilelocator to transaction */ + hash_ctl.keysize = sizeof(RelFileLocator); + hash_ctl.entrysize = sizeof(ReorderBufferSequenceEnt); + hash_ctl.hcxt = rb->context; + + /* we expect relfilenodes to be created only rarely, so 32 seems enough */ + txn->sequences_hash = hash_create("ReorderBufferTXNSequenceHash", 32, &hash_ctl, + HASH_ELEM | HASH_BLOBS | HASH_CONTEXT); +} + +/* + * Get a fresh ReorderBufferChange.
*/ @@ -536,6 +616,13 @@ ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change, change->data.truncate.relids = NULL; } break; + case REORDER_BUFFER_CHANGE_SEQUENCE: + if (change->data.sequence.tuple) + { + ReorderBufferReturnTupleBuf(rb, change->data.sequence.tuple); + change->data.sequence.tuple = NULL; + } + break; case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM: case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT: case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID: @@ -804,7 +891,8 @@ ReorderBufferQueueChange(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, change->action == REORDER_BUFFER_CHANGE_DELETE || change->action == REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT || change->action == REORDER_BUFFER_CHANGE_TRUNCATE || - change->action == REORDER_BUFFER_CHANGE_MESSAGE) + change->action == REORDER_BUFFER_CHANGE_MESSAGE || + change->action == REORDER_BUFFER_CHANGE_SEQUENCE) { ReorderBufferTXN *toptxn = rbtxn_get_toptxn(txn); @@ -895,6 +983,351 @@ ReorderBufferQueueMessage(ReorderBuffer *rb, TransactionId xid, } } +/* + * Treat the sequence change as transactional? + * + * To decide if a sequence change should be handled as transactional or applied + * immediately, we need to decide if the relfilenode was created by a running + * transaction. Each transaction has a hash table of such relfilenodes. + * + * A naive approach would be to just loop through all transactions and check + * each of them, but there may easily be thousands of subtransactions, and + * the check happens for each sequence change. So this could be very costly. + * + * To limit the number of transactions we need to check, we transfer the + * relfilenodes to the top-level xact, which allows us to check just the + * top-level xacts (and there should be a very limited number of those). We + * expect relfilenode creation to be quite rare, so this should not + * waste too much memory.
OTOH sequence changes are very common, so making + * the check cheaper seems like a good trade off. + * + * Returns true for transactional changes, false otherwise. + * + * Optionally (when the "xid" parameter is set) returns XID of the (sub)xact + * to use for queueing transactional changes. + * + * XXX As an optimization, maybe we should try searching the current xact (or + * its top-level xact) first. If the change is transactional, that's where we'd + * find the match. + */ +bool +ReorderBufferSequenceIsTransactional(ReorderBuffer *rb, + RelFileLocator rlocator, + TransactionId *xid) +{ + bool found = false; + dlist_iter iter; + + AssertCheckSequences(rb); + + /* + * Walk all top-level transactions (some of which may be subxacts, except + * that we haven't processed the assignments yet), and check if any of + * them created the relfilenode. + */ + dlist_foreach(iter, &rb->toplevel_by_lsn) + { + ReorderBufferSequenceEnt *entry; + ReorderBufferTXN *txn = dlist_container(ReorderBufferTXN, node, + iter.cur); + + /* transaction has no relfilenodes at all */ + if (!txn->sequences_hash) + continue; + + entry = hash_search(txn->sequences_hash, + (void *) &rlocator, + HASH_FIND, + &found); + + /* + * If we found an entry with matching relfilenode, we're done - we + * have to treat the sequence change as transactional, and replay it + * in the same (sub)transaction just like any other change. + * + * Optionally set XID of the (sub)xact that created the relfilenode. + */ + if (found) + { + if (xid) + *xid = entry->txn->xid; + + break; + } + } + + return found; +} + +/* + * Cleanup sequences after a subtransaction got aborted. + * + * The hash table will get destroyed in ReorderBufferReturnTXN, so we don't + * need to worry about that. But the entries were copied to the parent xact, + * and that's still being decoded - so we make sure to remove the aborted + * subxact's entries from the parent's hash table.
*/ +static void +ReorderBufferSequenceCleanup(ReorderBufferTXN *txn) +{ + HASH_SEQ_STATUS scan_status; + ReorderBufferSequenceEnt *ent; + + /* Bail out if not a subxact, or if there are no entries. */ + if (!rbtxn_is_known_subxact(txn)) + return; + + if (!txn->sequences_hash) + return; + + /* + * Scan the subxact hash and remove each of its entries from the top-level + * transaction hash. If the subxact has entries, the top-level hash must + * have been initialized too (the entries are added to both). + */ + hash_seq_init(&scan_status, txn->sequences_hash); + while ((ent = (ReorderBufferSequenceEnt *) hash_seq_search(&scan_status)) != NULL) + { + (void) hash_search(txn->toptxn->sequences_hash, + (void *) &ent->rlocator, + HASH_REMOVE, NULL); + } +} + +/* + * A transactional sequence change is queued to be processed upon commit + * and a non-transactional change gets processed immediately. + * + * A sequence update may be both transactional and non-transactional. When + * created in a running transaction, treat it as transactional and queue + * the change in it. Otherwise treat it as non-transactional, so that we + * don't forget the change in case of a rollback. + */ +void +ReorderBufferQueueSequence(ReorderBuffer *rb, TransactionId xid, + Snapshot snapshot, XLogRecPtr lsn, RepOriginId origin_id, + RelFileLocator rlocator, bool transactional, + ReorderBufferTupleBuf *tuplebuf) +{ + AssertCheckSequences(rb); + + /* + * Change needs to be handled as transactional, because the sequence was + * created in a transaction that is still seen as running. In that case + * all the changes need to be queued in that transaction, we must not send + * the changes to the downstream until the transaction commits. + * + * There's a bit of a trouble with subtransactions - we can't queue it + * into the subxact, because it might be rolled back and we'd lose the + * change (and that'd be wrong, as sequences are not transactional).
But + * we also can't queue it into the top-level transaction, because the + * relfilenode creation is transactional - if that subxact gets aborted, + * we must throw away the changes too. + * + * So we need to queue it into the same (sub)xact that created the new + * sequence relfilenode, which is why we have the XID in the hash table. + */ + if (transactional) + { + MemoryContext oldcontext; + ReorderBufferChange *change; + + /* allocate and queue the transactional sequence change */ + oldcontext = MemoryContextSwitchTo(rb->context); + + change = ReorderBufferGetChange(rb); + + change->action = REORDER_BUFFER_CHANGE_SEQUENCE; + change->origin_id = origin_id; + + memcpy(&change->data.sequence.locator, &rlocator, sizeof(RelFileLocator)); + + change->data.sequence.tuple = tuplebuf; + + /* lookup the XID for transaction that created the relfilenode */ + ReorderBufferSequenceIsTransactional(rb, rlocator, &xid); + + /* the XID should be valid for a transactional change */ + Assert(TransactionIdIsValid(xid)); + + /* add it to the same (sub)xact that created that relfilenode */ + ReorderBufferQueueChange(rb, xid, lsn, change, false); + + MemoryContextSwitchTo(oldcontext); + } + else + { + /* + * This change is for a sequence that was not created in any running + * transaction, so we treat it as non-transactional and just send it + * to the output plugin directly. 
+ */ + ReorderBufferTXN *txn = NULL; + volatile Snapshot snapshot_now = snapshot; + bool using_subtxn; + + /* non-transactional changes require a valid snapshot */ + Assert(snapshot_now); + + /* Make sure the sequence is not in any of the hash tables */ + Assert(!ReorderBufferSequenceIsTransactional(rb, rlocator, NULL)); + + if (xid != InvalidTransactionId) + txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true); + + /* setup snapshot to allow catalog access */ + SetupHistoricSnapshot(snapshot_now, NULL); + + /* + * Decoding needs access to syscaches et al., which in turn use + * heavyweight locks and such. Thus we need to have enough state + * around to keep track of those. The easiest way is to simply use a + * transaction internally. That also allows us to easily enforce that + * nothing writes to the database by checking for xid assignments. + * + * When we're called via the SQL SRF there's already a transaction + * started, so start an explicit subtransaction there. + */ + using_subtxn = IsTransactionOrTransactionBlock(); + + PG_TRY(); + { + Relation relation; + HeapTuple tuple; + Form_pg_sequence_data seq; + Oid reloid; + int64 value; + + if (using_subtxn) + BeginInternalSubTransaction("sequence"); + else + StartTransactionCommand(); + + reloid = RelidByRelfilenumber(rlocator.spcOid, rlocator.relNumber); + + if (reloid == InvalidOid) + elog(ERROR, "could not map filenode \"%s\" to relation OID", + relpathperm(rlocator, + MAIN_FORKNUM)); + + relation = RelationIdGetRelation(reloid); + + if (!RelationIsValid(relation)) + elog(ERROR, "could not open relation with OID %u (for filenode \"%s\")", + reloid, + relpathperm(rlocator, MAIN_FORKNUM)); + + tuple = &tuplebuf->tuple; + seq = (Form_pg_sequence_data) GETSTRUCT(tuple); + + /* + * Calculate the first value of the next batch (at which point we + * generate and decode another WAL record. + */ + value = seq->last_value; + value += (seq->is_called) ? 
seq->log_cnt : 0; + + rb->sequence(rb, txn, lsn, relation, transactional, value); + + RelationClose(relation); + + TeardownHistoricSnapshot(false); + + AbortCurrentTransaction(); + + if (using_subtxn) + RollbackAndReleaseCurrentSubTransaction(); + } + PG_CATCH(); + { + TeardownHistoricSnapshot(true); + + AbortCurrentTransaction(); + + if (using_subtxn) + RollbackAndReleaseCurrentSubTransaction(); + + PG_RE_THROW(); + } + PG_END_TRY(); + } +} + +/* + * ReorderBufferAddRelFileLocator + * Add newly created relfilenode to the hash table for transaction. + * + * If the transaction is already known to be a subtransaction, we add the same + * entry into the parent transaction (but it still points at the subxact, so + * that we know where to queue changes or what to discard in case of an abort). + */ +void +ReorderBufferAddRelFileLocator(ReorderBuffer *rb, TransactionId xid, + XLogRecPtr lsn, RelFileLocator rlocator) +{ + bool found; + ReorderBufferSequenceEnt *entry; + ReorderBufferTXN *txn; + + AssertCheckSequences(rb); + + /* + * We only care about sequence relfilenodes for now, and those always have + * a XID. So if there's no XID, don't bother adding them to the hash. + */ + if (xid == InvalidTransactionId) + return; + + /* + * This might be the first change decoded for this transaction, so make + * sure we create it if needed. + */ + txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true); + + /* + * First add it to the top-level transaction, but make sure it links to + * the correct subtransaction (so that we add later changes to it). + */ + if (txn->toptxn) + { + /* make sure the hash table is initialized */ + ReorderBufferTXNSequencesInit(rb, txn->toptxn); + + /* search the lookup table */ + entry = hash_search(txn->toptxn->sequences_hash, + (void *) &rlocator, + HASH_ENTER, + &found); + + /* + * We've just decoded creation of the relfilenode, so if we found it + * in the hash table, something is wrong. 
+ */ + Assert(!found); + + entry->txn = txn; + } + + /* make sure the hash table is initialized */ + ReorderBufferTXNSequencesInit(rb, txn); + + /* search the lookup table */ + entry = hash_search(txn->sequences_hash, + (void *) &rlocator, + HASH_ENTER, + &found); + + /* + * We've just decoded creation of the relfilenode, so if we found it in + * the hash table, something is wrong. + */ + Assert(!found); + + entry->txn = txn; + + AssertCheckSequences(rb); +} + /* * AssertTXNLsnOrder * Verify LSN ordering of transaction lists in the reorderbuffer @@ -969,6 +1402,161 @@ AssertTXNLsnOrder(ReorderBuffer *rb) #endif } +/* + * AssertCheckSequences + * Verify consistency of hash tables tracking new relfilenodes. + * + * We check that the hash table in the top-level transaction is consistent with + * respect to the hash tables in it's subxacts. That is, each entry has to be + * either from the top-level xact or one of it's subtransactions. Likewise, each + * entry in subxact hash table has to have a match in the top-level hash table. + */ +static void +AssertCheckSequences(ReorderBuffer *rb) +{ +#ifdef USE_ASSERT_CHECKING + LogicalDecodingContext *ctx = rb->private_data; + dlist_iter iter; + + /* + * Skip the verification if we don't reach the LSN at which we start + * decoding the contents of transactions yet because until we reach the + * LSN, we could have transactions that don't have the association between + * the top-level transaction and subtransaction yet and consequently have + * the same LSN. We don't guarantee this association until we try to + * decode the actual contents of transaction. The ordering of the records + * prior to the start_decoding_at LSN should have been checked before the + * restart. + */ + if (SnapBuildXactNeedsSkip(ctx->snapshot_builder, ctx->reader->EndRecPtr)) + return; + + /* + * Make sure the relfilenodes from subxacts are properly recorded in the + * top-level transaction hash table. 
*/ + dlist_foreach(iter, &rb->toplevel_by_lsn) + { + int nentries = 0, + nsubentries = 0; + dlist_iter subiter; + ReorderBufferTXN *txn = dlist_container(ReorderBufferTXN, node, + iter.cur); + + /* + * We don't skip top-level transactions without relfilenodes, because + * there might be a subtransaction with some, and we want to detect + * such cases too. + */ + if (txn->sequences_hash) + nentries = hash_get_num_entries(txn->sequences_hash); + + /* walk all subxacts, and check the hash table in each one */ + dlist_foreach(subiter, &txn->subtxns) + { + HASH_SEQ_STATUS scan_status; + ReorderBufferSequenceEnt *entry; + + ReorderBufferTXN *subtxn = dlist_container(ReorderBufferTXN, node, + subiter.cur); + + /* + * If this subxact has no relfilenodes, skip it. We'll do the + * check in the opposite direction (that all top-level + * relfilenodes are in the correct subxact) later. + */ + if (!subtxn->sequences_hash) + continue; + + /* add number of relfilenodes in this subxact */ + nsubentries += hash_get_num_entries(subtxn->sequences_hash); + + /* + * The subxact has relfilenodes, so the top-level hash must exist + * too (entries are added to both). Assert it explicitly, so that + * an inconsistency fails the assertion instead of crashing in + * hash_search on a NULL table. + */ + Assert(txn->sequences_hash != NULL); + + /* + * Check that all subxact relfilenodes are in the top-level txn + * too, and are pointing to this subtransaction. + */ + hash_seq_init(&scan_status, subtxn->sequences_hash); + while ((entry = (ReorderBufferSequenceEnt *) hash_seq_search(&scan_status)) != NULL) + { + bool found = false; + ReorderBufferSequenceEnt *subentry; + + /* search for the same relfilenode in the top-level txn */ + subentry = hash_search(txn->sequences_hash, + (void *) &entry->rlocator, + HASH_FIND, + &found); + + /* + * The top-level txn hash should have the relfilenode too, and + * it should point to this subxact. + */ + Assert(found); + + /* + * The entry has to point to the subxact - there's no subxact + * "below" this one to which the relfilenode could belong. + */ + Assert(subentry->txn == subtxn); + } + } + + /* + * We shouldn't have more relfilenodes in subtransactions than in the + * top-level one.
There might be relfilenodes in the top-level one + * directly, so this needs to be inequality. + */ + Assert(nentries >= nsubentries); + + /* + * Now do the check in the opposite direction - check that every entry + * in the top-level txn (except those pointing to the top-level txn + * itself) points to one of the subxacts, and there's an entry in the + * subxact hash. + */ + if (txn->sequences_hash) + { + HASH_SEQ_STATUS scan_status; + ReorderBufferSequenceEnt *entry; + + hash_seq_init(&scan_status, txn->sequences_hash); + while ((entry = (ReorderBufferSequenceEnt *) hash_seq_search(&scan_status)) != NULL) + { + bool found = false; + ReorderBufferSequenceEnt *subentry; + + /* Skip entries for the top-level txn itself. */ + if (entry->txn == txn) + continue; + + /* Is it a subxact of this txn? */ + Assert(rbtxn_is_known_subxact(entry->txn)); + Assert(entry->txn->toptxn == txn); + + /* + * Search for the same relfilenode in the subxact. Its hash has + * to be initialized, as we expect it to contain the + * relfilenode - assert that before searching it. + */ + Assert(entry->txn->sequences_hash != NULL); + subentry = hash_search(entry->txn->sequences_hash, + (void *) &entry->rlocator, + HASH_FIND, + &found); + + Assert(found); + + /* + * Check the txn pointer in the top-level hash table entry is + * consistent with the subxact hash table (we already checked + * the subxact entry points to the subxact itself). This must + * be a comparison - an assignment would silently overwrite the + * subxact entry and never detect an inconsistency. + */ + Assert(subentry->txn == entry->txn); + } + } + } +#endif +} + /* * AssertChangeLsnOrder * @@ -1094,6 +1682,9 @@ ReorderBufferAssignChild(ReorderBuffer *rb, TransactionId xid, subtxn->toplevel_xid = xid; Assert(subtxn->nsubtxns == 0); + /* There should be no sequence relfilenodes in the subxact yet. */ + Assert(!subtxn->sequences_hash); + /* set the reference to top-level transaction */ subtxn->toptxn = txn; @@ -1997,6 +2588,35 @@ ReorderBufferApplyMessage(ReorderBuffer *rb, ReorderBufferTXN *txn, change->data.msg.message); } +/* + * Helper function for ReorderBufferProcessTXN for applying sequences.
*/ +static inline void +ReorderBufferApplySequence(ReorderBuffer *rb, ReorderBufferTXN *txn, + Relation relation, ReorderBufferChange *change, + bool streaming) +{ + HeapTuple tuple; + Form_pg_sequence_data seq; + int64 value; + + tuple = &change->data.sequence.tuple->tuple; + seq = (Form_pg_sequence_data) GETSTRUCT(tuple); + + /* + * Calculate the first value of the next batch (last_value, plus log_cnt + * once the sequence has been called), at which point another WAL record + * gets generated and decoded. + */ + value = seq->last_value; + value += (seq->is_called) ? seq->log_cnt : 0; + + /* + * When called from ReorderBufferProcessTXN, we're applying changes + * accumulated in a ReorderBufferTXN, so all those are transactional + * changes of sequences. + */ + if (streaming) + rb->stream_sequence(rb, txn, change->lsn, relation, true, value); + else + rb->sequence(rb, txn, change->lsn, relation, true, value); +} + /* * Function to store the command id and snapshot at the end of the current * stream so that we can reuse the same while sending the next stream. @@ -2442,6 +3062,31 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID: elog(ERROR, "tuplecid value in changequeue"); break; + + case REORDER_BUFFER_CHANGE_SEQUENCE: + Assert(snapshot_now); + + reloid = RelidByRelfilenumber(change->data.sequence.locator.spcOid, + change->data.sequence.locator.relNumber); + + if (reloid == InvalidOid) + elog(ERROR, "could not map filenode \"%s\" to relation OID", + relpathperm(change->data.sequence.locator, + MAIN_FORKNUM)); + + relation = RelationIdGetRelation(reloid); + + if (!RelationIsValid(relation)) + elog(ERROR, "could not open relation with OID %u (for filenode \"%s\")", + reloid, + relpathperm(change->data.sequence.locator, + MAIN_FORKNUM)); + + if (RelationIsLogicallyLogged(relation)) + ReorderBufferApplySequence(rb, txn, relation, change, streaming); + + RelationClose(relation); + break; } /* @@ -2872,6 +3517,10 @@ ReorderBufferFinishPrepared(ReorderBuffer *rb, TransactionId xid, /* cleanup: make sure there's no cache pollution */ ReorderBufferExecuteInvalidations(txn->ninvalidations,
txn->invalidations); + + /* cleanup: remove sequence relfilenodes from the top-level txn */ + ReorderBufferSequenceCleanup(txn); + ReorderBufferCleanupTXN(rb, txn); } @@ -2921,8 +3570,13 @@ ReorderBufferAbort(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, /* cosmetic... */ txn->final_lsn = lsn; + /* remove sequence relfilenodes from the top-level txn */ + ReorderBufferSequenceCleanup(txn); + /* remove potential on-disk data, and deallocate */ ReorderBufferCleanupTXN(rb, txn); + + AssertCheckSequences(rb); } /* @@ -2958,8 +3612,13 @@ ReorderBufferAbortOld(ReorderBuffer *rb, TransactionId oldestRunningXid) if (rbtxn_is_streamed(txn)) rb->stream_abort(rb, txn, InvalidXLogRecPtr); + /* remove sequence relfilenodes from the top-level txn */ + ReorderBufferSequenceCleanup(txn); + /* remove potential on-disk data, and deallocate this tx */ ReorderBufferCleanupTXN(rb, txn); + + AssertCheckSequences(rb); } else return; @@ -3008,6 +3667,9 @@ ReorderBufferForget(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn) else Assert(txn->ninvalidations == 0); + /* remove sequence relfilenodes from a top-level txn */ + ReorderBufferSequenceCleanup(txn); + /* remove potential on-disk data, and deallocate */ ReorderBufferCleanupTXN(rb, txn); } @@ -3909,6 +4571,39 @@ ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn, memcpy(data, change->data.truncate.relids, size); data += size; + break; + } + case REORDER_BUFFER_CHANGE_SEQUENCE: + { + char *data; + ReorderBufferTupleBuf *tup; + Size len = 0; + + tup = change->data.sequence.tuple; + + if (tup) + { + sz += sizeof(HeapTupleData); + len = tup->tuple.t_len; + sz += len; + } + + /* make sure we have enough space */ + ReorderBufferSerializeReserve(rb, sz); + + data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange); + /* might have been reallocated above */ + ondisk = (ReorderBufferDiskChange *) rb->outbuf; + + if (len) + { + memcpy(data, &tup->tuple, sizeof(HeapTupleData)); + data += sizeof(HeapTupleData); 
+ + memcpy(data, tup->tuple.t_data, len); + data += len; + } + break; } case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM: @@ -4173,6 +4868,22 @@ ReorderBufferChangeSize(ReorderBufferChange *change) { sz += sizeof(Oid) * change->data.truncate.nrelids; + break; + } + case REORDER_BUFFER_CHANGE_SEQUENCE: + { + ReorderBufferTupleBuf *tup; + Size len = 0; + + tup = change->data.sequence.tuple; + + if (tup) + { + sz += sizeof(HeapTupleData); + len = tup->tuple.t_len; + sz += len; + } + break; } case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM: @@ -4476,6 +5187,30 @@ ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn, break; } + + case REORDER_BUFFER_CHANGE_SEQUENCE: + if (change->data.sequence.tuple) + { + uint32 tuplelen = ((HeapTuple) data)->t_len; + + change->data.sequence.tuple = + ReorderBufferGetTupleBuf(rb, tuplelen - SizeofHeapTupleHeader); + + /* restore ->tuple */ + memcpy(&change->data.sequence.tuple->tuple, data, + sizeof(HeapTupleData)); + data += sizeof(HeapTupleData); + + /* reset t_data pointer into the new tuplebuf */ + change->data.sequence.tuple->tuple.t_data = + ReorderBufferTupleBufData(change->data.sequence.tuple); + + /* restore tuple data itself */ + memcpy(change->data.sequence.tuple->tuple.t_data, data, tuplelen); + data += tuplelen; + } + break; + case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM: case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT: case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID: diff --git a/src/include/access/rmgrlist.h b/src/include/access/rmgrlist.h index 463bcb67c5..305ca49a55 100644 --- a/src/include/access/rmgrlist.h +++ b/src/include/access/rmgrlist.h @@ -27,7 +27,7 @@ /* symbol name, textual name, redo, desc, identify, startup, cleanup, mask, decode */ PG_RMGR(RM_XLOG_ID, "XLOG", xlog_redo, xlog_desc, xlog_identify, NULL, NULL, NULL, xlog_decode) PG_RMGR(RM_XACT_ID, "Transaction", xact_redo, xact_desc, xact_identify, NULL, NULL, NULL, xact_decode) -PG_RMGR(RM_SMGR_ID, "Storage", smgr_redo, smgr_desc, 
smgr_identify, NULL, NULL, NULL, NULL) +PG_RMGR(RM_SMGR_ID, "Storage", smgr_redo, smgr_desc, smgr_identify, NULL, NULL, NULL, smgr_decode) PG_RMGR(RM_CLOG_ID, "CLOG", clog_redo, clog_desc, clog_identify, NULL, NULL, NULL, NULL) PG_RMGR(RM_DBASE_ID, "Database", dbase_redo, dbase_desc, dbase_identify, NULL, NULL, NULL, NULL) PG_RMGR(RM_TBLSPC_ID, "Tablespace", tblspc_redo, tblspc_desc, tblspc_identify, NULL, NULL, NULL, NULL) @@ -40,7 +40,7 @@ PG_RMGR(RM_BTREE_ID, "Btree", btree_redo, btree_desc, btree_identify, btree_xlog PG_RMGR(RM_HASH_ID, "Hash", hash_redo, hash_desc, hash_identify, NULL, NULL, hash_mask, NULL) PG_RMGR(RM_GIN_ID, "Gin", gin_redo, gin_desc, gin_identify, gin_xlog_startup, gin_xlog_cleanup, gin_mask, NULL) PG_RMGR(RM_GIST_ID, "Gist", gist_redo, gist_desc, gist_identify, gist_xlog_startup, gist_xlog_cleanup, gist_mask, NULL) -PG_RMGR(RM_SEQ_ID, "Sequence", seq_redo, seq_desc, seq_identify, NULL, NULL, seq_mask, NULL) +PG_RMGR(RM_SEQ_ID, "Sequence", seq_redo, seq_desc, seq_identify, NULL, NULL, seq_mask, seq_decode) PG_RMGR(RM_SPGIST_ID, "SPGist", spg_redo, spg_desc, spg_identify, spg_xlog_startup, spg_xlog_cleanup, spg_mask, NULL) PG_RMGR(RM_BRIN_ID, "BRIN", brin_redo, brin_desc, brin_identify, NULL, NULL, brin_mask, NULL) PG_RMGR(RM_COMMIT_TS_ID, "CommitTs", commit_ts_redo, commit_ts_desc, commit_ts_identify, NULL, NULL, NULL, NULL) diff --git a/src/include/replication/decode.h b/src/include/replication/decode.h index 14fa921ab4..8d06247165 100644 --- a/src/include/replication/decode.h +++ b/src/include/replication/decode.h @@ -27,6 +27,8 @@ extern void heap2_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); extern void xact_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); extern void standby_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); extern void logicalmsg_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); +extern void seq_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); +extern void 
smgr_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); extern void LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *record); diff --git a/src/include/replication/logical.h b/src/include/replication/logical.h index dffc0d1564..72a17112ef 100644 --- a/src/include/replication/logical.h +++ b/src/include/replication/logical.h @@ -90,6 +90,11 @@ typedef struct LogicalDecodingContext */ bool twophase; + /* + * Does the output plugin support decoding of sequence changes? + */ + bool sequences; + /* * Is two-phase option given by output plugin? * diff --git a/src/include/replication/output_plugin.h b/src/include/replication/output_plugin.h index 2ffcf17505..7b5085f7d0 100644 --- a/src/include/replication/output_plugin.h +++ b/src/include/replication/output_plugin.h @@ -90,6 +90,16 @@ typedef void (*LogicalDecodeMessageCB) (struct LogicalDecodingContext *ctx, Size message_size, const char *message); +/* + * Called for the generic logical decoding sequences. + */ +typedef void (*LogicalDecodeSequenceCB) (struct LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, + XLogRecPtr sequence_lsn, + Relation rel, + bool transactional, + int64 value); + /* * Filter changes by origin. */ @@ -201,6 +211,17 @@ typedef void (*LogicalDecodeStreamMessageCB) (struct LogicalDecodingContext *ctx Size message_size, const char *message); +/* + * Called for the streaming generic logical decoding sequences from in-progress + * transactions. + */ +typedef void (*LogicalDecodeStreamSequenceCB) (struct LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, + XLogRecPtr sequence_lsn, + Relation rel, + bool transactional, + int64 value); + /* * Callback for streaming truncates from in-progress transactions.
*/ @@ -221,6 +242,7 @@ typedef struct OutputPluginCallbacks LogicalDecodeTruncateCB truncate_cb; LogicalDecodeCommitCB commit_cb; LogicalDecodeMessageCB message_cb; + LogicalDecodeSequenceCB sequence_cb; LogicalDecodeFilterByOriginCB filter_by_origin_cb; LogicalDecodeShutdownCB shutdown_cb; @@ -239,6 +261,7 @@ typedef struct OutputPluginCallbacks LogicalDecodeStreamCommitCB stream_commit_cb; LogicalDecodeStreamChangeCB stream_change_cb; LogicalDecodeStreamMessageCB stream_message_cb; + LogicalDecodeStreamSequenceCB stream_sequence_cb; LogicalDecodeStreamTruncateCB stream_truncate_cb; } OutputPluginCallbacks; diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h index f986101e50..58a99b7406 100644 --- a/src/include/replication/reorderbuffer.h +++ b/src/include/replication/reorderbuffer.h @@ -74,6 +74,7 @@ typedef enum ReorderBufferChangeType REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT, REORDER_BUFFER_CHANGE_TRUNCATE, + REORDER_BUFFER_CHANGE_SEQUENCE, } ReorderBufferChangeType; /* forward declaration */ @@ -167,6 +168,13 @@ typedef struct ReorderBufferChange uint32 ninvalidations; /* Number of messages */ SharedInvalidationMessage *invalidations; /* invalidation message */ } inval; + + /* Context data for Sequence changes */ + struct + { + RelFileLocator locator; + ReorderBufferTupleBuf *tuple; + } sequence; } data; /* @@ -392,6 +400,12 @@ typedef struct ReorderBufferTXN */ HTAB *toast_hash; + /* + * Sequence relfilenodes created in this transaction (also includes + * altered sequences, which assigns new relfilenode). + */ + HTAB *sequences_hash; + /* * non-hierarchical list of subtransactions that are *not* aborted. Only * used in toplevel transactions. 
@@ -470,6 +484,14 @@ typedef void (*ReorderBufferMessageCB) (ReorderBuffer *rb, const char *prefix, Size sz, const char *message); +/* sequence callback signature */ +typedef void (*ReorderBufferSequenceCB) (ReorderBuffer *rb, + ReorderBufferTXN *txn, + XLogRecPtr sequence_lsn, + Relation rel, + bool transactional, + int64 value); + /* begin prepare callback signature */ typedef void (*ReorderBufferBeginPrepareCB) (ReorderBuffer *rb, ReorderBufferTXN *txn); @@ -536,6 +558,14 @@ typedef void (*ReorderBufferStreamMessageCB) ( const char *prefix, Size sz, const char *message); +/* stream sequence callback signature */ +typedef void (*ReorderBufferStreamSequenceCB) (ReorderBuffer *rb, + ReorderBufferTXN *txn, + XLogRecPtr sequence_lsn, + Relation rel, + bool transactional, + int64 value); + /* stream truncate callback signature */ typedef void (*ReorderBufferStreamTruncateCB) ( ReorderBuffer *rb, @@ -592,6 +622,7 @@ struct ReorderBuffer ReorderBufferApplyTruncateCB apply_truncate; ReorderBufferCommitCB commit; ReorderBufferMessageCB message; + ReorderBufferSequenceCB sequence; /* * Callbacks to be called when streaming a transaction at prepare time. 
@@ -611,6 +642,7 @@ struct ReorderBuffer ReorderBufferStreamCommitCB stream_commit; ReorderBufferStreamChangeCB stream_change; ReorderBufferStreamMessageCB stream_message; + ReorderBufferStreamSequenceCB stream_sequence; ReorderBufferStreamTruncateCB stream_truncate; /* @@ -696,6 +728,10 @@ extern void ReorderBufferQueueMessage(ReorderBuffer *rb, TransactionId xid, Snapshot snap, XLogRecPtr lsn, bool transactional, const char *prefix, Size message_size, const char *message); +extern void ReorderBufferQueueSequence(ReorderBuffer *rb, TransactionId xid, + Snapshot snapshot, XLogRecPtr lsn, RepOriginId origin_id, + RelFileLocator locator, bool transactional, + ReorderBufferTupleBuf *tuplebuf); extern void ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid, XLogRecPtr commit_lsn, XLogRecPtr end_lsn, TimestampTz commit_time, RepOriginId origin_id, XLogRecPtr origin_lsn); @@ -750,4 +786,10 @@ extern void ReorderBufferSetRestartPoint(ReorderBuffer *rb, XLogRecPtr ptr); extern void StartupReorderBuffer(void); +extern void ReorderBufferAddRelFileLocator(ReorderBuffer *rb, TransactionId xid, + XLogRecPtr lsn, RelFileLocator rlocator); +extern bool ReorderBufferSequenceIsTransactional(ReorderBuffer *rb, + RelFileLocator locator, + TransactionId *xid); + #endif diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index 86a9886d4f..73b7b37a5e 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -2329,6 +2329,7 @@ ReorderBufferIterTXNState ReorderBufferMessageCB ReorderBufferPrepareCB ReorderBufferRollbackPreparedCB +ReorderBufferSequenceEnt ReorderBufferStreamAbortCB ReorderBufferStreamChangeCB ReorderBufferStreamCommitCB -- 2.27.0