From ae907baaaf7401a4ac906de6127f4318241ac3a5 Mon Sep 17 00:00:00 2001 From: Dilip Kumar Date: Mon, 18 Nov 2019 09:53:08 +0530 Subject: [PATCH 01/17] Immediately WAL-log assignments The logical decoding infrastructure needs to know which top-level transaction the subxact belongs to, in order to decode all the changes. Until now that might be delayed until commit, due to the caching (PGPROC_MAX_CACHED_SUBXIDS), preventing features requiring incremental decoding. So instead we write the assignment info into WAL immediately, as part of the next WAL record (to minimize overhead). --- src/backend/access/transam/xact.c | 45 ++++++++++++++++++++++++ src/backend/access/transam/xloginsert.c | 22 ++++++++++-- src/backend/access/transam/xlogreader.c | 5 +++ src/backend/replication/logical/decode.c | 39 ++++++++++---------- src/include/access/xact.h | 3 ++ src/include/access/xlog.h | 1 + src/include/access/xlogreader.h | 3 ++ src/include/access/xlogrecord.h | 1 + 8 files changed, 98 insertions(+), 21 deletions(-) diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c index 5353b6ab0b..708e5233f4 100644 --- a/src/backend/access/transam/xact.c +++ b/src/backend/access/transam/xact.c @@ -190,6 +190,7 @@ typedef struct TransactionStateData bool didLogXid; /* has xid been included in WAL record? 
*/ int parallelModeLevel; /* Enter/ExitParallelMode counter */ bool chain; /* start a new block after this one */ + bool assigned; /* assigned to toplevel XID */ struct TransactionStateData *parent; /* back link to parent */ } TransactionStateData; @@ -5096,6 +5097,7 @@ PushTransaction(void) GetUserIdAndSecContext(&s->prevUser, &s->prevSecContext); s->prevXactReadOnly = XactReadOnly; s->parallelModeLevel = 0; + s->assigned = false; CurrentTransactionState = s; @@ -6002,3 +6004,46 @@ xact_redo(XLogReaderState *record) else elog(PANIC, "xact_redo: unknown op code %u", info); } + +/* + * IsSubTransactionAssignmentPending + * + * This returns true if we are inside a valid subtransaction, for which + * the assignment was not yet written to any WAL record. + */ +bool +IsSubTransactionAssignmentPending(void) +{ + if (!XLogLogicalInfoActive()) + return false; + + /* we need to be in a transaction state */ + if (!IsTransactionState()) + return false; + + /* it has to be a subtransaction */ + if (!IsSubTransaction()) + return false; + + /* the subtransaction has to have an XID assigned */ + if (!TransactionIdIsValid(GetCurrentTransactionIdIfAny())) + return false; + + /* and it must not have been marked 'assigned' yet */ + return !CurrentTransactionState->assigned; + +} + +/* + * MarkSubTransactionAssigned + * + * Mark the subtransaction assignment as completed. 
+ */ +void +MarkSubTransactionAssigned(void) +{ + Assert(IsSubTransactionAssignmentPending()); + + CurrentTransactionState->assigned = true; + +} diff --git a/src/backend/access/transam/xloginsert.c b/src/backend/access/transam/xloginsert.c index aa9dca0036..a8a8084713 100644 --- a/src/backend/access/transam/xloginsert.c +++ b/src/backend/access/transam/xloginsert.c @@ -88,11 +88,13 @@ static XLogRecData hdr_rdt; static char *hdr_scratch = NULL; #define SizeOfXlogOrigin (sizeof(RepOriginId) + sizeof(char)) +#define SizeOfTransactionId (sizeof(TransactionId) + sizeof(char)) #define HEADER_SCRATCH_SIZE \ (SizeOfXLogRecord + \ MaxSizeOfXLogRecordBlockHeader * (XLR_MAX_BLOCK_ID + 1) + \ - SizeOfXLogRecordDataHeaderLong + SizeOfXlogOrigin) + SizeOfXLogRecordDataHeaderLong + SizeOfXlogOrigin + \ + SizeOfTransactionId) /* * An array of XLogRecData structs, to hold registered data. @@ -194,6 +196,10 @@ XLogResetInsertion(void) { int i; + /* reset the subxact assignment flag (if needed) */ + if (curinsert_flags & XLOG_INCLUDE_XID) + MarkSubTransactionAssigned(); + for (i = 0; i < max_registered_block_id; i++) registered_buffers[i].in_use = false; @@ -397,7 +403,7 @@ void XLogSetRecordFlags(uint8 flags) { Assert(begininsert_called); - curinsert_flags = flags; + curinsert_flags |= flags; } /* @@ -743,6 +749,18 @@ XLogRecordAssemble(RmgrId rmid, uint8 info, scratch += sizeof(replorigin_session_origin); } + /* followed by toplevel XID, if not already included in previous record */ + if (IsSubTransactionAssignmentPending()) + { + TransactionId xid = GetTopTransactionIdIfAny(); + + /* update the flag (later used by XLogInsertRecord) */ + curinsert_flags |= XLOG_INCLUDE_XID; + *(scratch++) = (char) XLR_BLOCK_ID_TOPLEVEL_XID; + memcpy(scratch, &xid, sizeof(TransactionId)); + scratch += sizeof(TransactionId); + } + /* followed by main data, if any */ if (mainrdata_len > 0) { diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c index 
67418b05f1..4435c636bc 100644 --- a/src/backend/access/transam/xlogreader.c +++ b/src/backend/access/transam/xlogreader.c @@ -1165,6 +1165,7 @@ DecodeXLogRecord(XLogReaderState *state, XLogRecord *record, char **errormsg) state->decoded_record = record; state->record_origin = InvalidRepOriginId; + state->toplevel_xid = InvalidTransactionId; ptr = (char *) record; ptr += SizeOfXLogRecord; @@ -1203,6 +1204,10 @@ DecodeXLogRecord(XLogReaderState *state, XLogRecord *record, char **errormsg) { COPY_HEADER_FIELD(&state->record_origin, sizeof(RepOriginId)); } + else if (block_id == XLR_BLOCK_ID_TOPLEVEL_XID) + { + COPY_HEADER_FIELD(&state->toplevel_xid, sizeof(TransactionId)); + } else if (block_id <= XLR_MAX_BLOCK_ID) { /* XLogRecordBlockHeader */ diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c index bc532d027b..897b755eb4 100644 --- a/src/backend/replication/logical/decode.c +++ b/src/backend/replication/logical/decode.c @@ -93,12 +93,28 @@ static void DecodeXLogTuple(char *data, Size len, ReorderBufferTupleBuf *tup); void LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *record) { - XLogRecordBuffer buf; + XLogRecordBuffer buf; + TransactionId txid; buf.origptr = ctx->reader->ReadRecPtr; buf.endptr = ctx->reader->EndRecPtr; buf.record = record; + txid = XLogRecGetTopXid(record); + + /* + * If the toplevel_xid is valid, we need to assign the subxact to the + * toplevel transaction. We need to do this for all records, hence we + * do it before the switch. + */ + if (TransactionIdIsValid(txid)) + { + ReorderBufferAssignChild(ctx->reorder, + record->toplevel_xid, + record->decoded_record->xl_xid, + buf.origptr); + } + /* cast so we get a warning when new rmgrs are added */ switch ((RmgrIds) XLogRecGetRmid(record)) { @@ -217,12 +233,12 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) * If the snapshot isn't yet fully built, we cannot decode anything, so * bail out. 
* - * However, it's critical to process XLOG_XACT_ASSIGNMENT records even + * However, it's critical to process records with subxid assignment even * when the snapshot is being built: it is possible to get later records * that require subxids to be properly assigned. */ if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT && - info != XLOG_XACT_ASSIGNMENT) + !TransactionIdIsValid(r->toplevel_xid)) return; switch (info) @@ -264,22 +280,7 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) break; } case XLOG_XACT_ASSIGNMENT: - { - xl_xact_assignment *xlrec; - int i; - TransactionId *sub_xid; - - xlrec = (xl_xact_assignment *) XLogRecGetData(r); - - sub_xid = &xlrec->xsub[0]; - - for (i = 0; i < xlrec->nsubxacts; i++) - { - ReorderBufferAssignChild(reorder, xlrec->xtop, - *(sub_xid++), buf->origptr); - } - break; - } + break; case XLOG_XACT_PREPARE: /* diff --git a/src/include/access/xact.h b/src/include/access/xact.h index 9d2899dea1..5b9740c5c3 100644 --- a/src/include/access/xact.h +++ b/src/include/access/xact.h @@ -428,6 +428,9 @@ extern void UnregisterXactCallback(XactCallback callback, void *arg); extern void RegisterSubXactCallback(SubXactCallback callback, void *arg); extern void UnregisterSubXactCallback(SubXactCallback callback, void *arg); +extern bool IsSubTransactionAssignmentPending(void); +extern void MarkSubTransactionAssigned(void); + extern int xactGetCommittedChildren(TransactionId **ptr); extern XLogRecPtr XactLogCommitRecord(TimestampTz commit_time, diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h index 3fea1993bc..b1976ac653 100644 --- a/src/include/access/xlog.h +++ b/src/include/access/xlog.h @@ -227,6 +227,7 @@ extern bool XLOG_DEBUG; */ #define XLOG_INCLUDE_ORIGIN 0x01 /* include the replication origin */ #define XLOG_MARK_UNIMPORTANT 0x02 /* record not important for durability */ +#define XLOG_INCLUDE_XID 0x04 /* include XID of toplevel xact */ /* Checkpoint statistics */ diff --git 
a/src/include/access/xlogreader.h b/src/include/access/xlogreader.h index 0193611b7f..a676151561 100644 --- a/src/include/access/xlogreader.h +++ b/src/include/access/xlogreader.h @@ -147,6 +147,8 @@ struct XLogReaderState RepOriginId record_origin; + TransactionId toplevel_xid; /* XID of toplevel transaction */ + /* information about blocks referenced by the record. */ DecodedBkpBlock blocks[XLR_MAX_BLOCK_ID + 1]; @@ -280,6 +282,7 @@ extern bool DecodeXLogRecord(XLogReaderState *state, XLogRecord *record, #define XLogRecGetRmid(decoder) ((decoder)->decoded_record->xl_rmid) #define XLogRecGetXid(decoder) ((decoder)->decoded_record->xl_xid) #define XLogRecGetOrigin(decoder) ((decoder)->record_origin) +#define XLogRecGetTopXid(decoder) ((decoder)->toplevel_xid) #define XLogRecGetData(decoder) ((decoder)->main_data) #define XLogRecGetDataLen(decoder) ((decoder)->main_data_len) #define XLogRecHasAnyBlockRefs(decoder) ((decoder)->max_block_id >= 0) diff --git a/src/include/access/xlogrecord.h b/src/include/access/xlogrecord.h index 9375e54195..bcfba0a101 100644 --- a/src/include/access/xlogrecord.h +++ b/src/include/access/xlogrecord.h @@ -223,5 +223,6 @@ typedef struct XLogRecordDataHeaderLong #define XLR_BLOCK_ID_DATA_SHORT 255 #define XLR_BLOCK_ID_DATA_LONG 254 #define XLR_BLOCK_ID_ORIGIN 253 +#define XLR_BLOCK_ID_TOPLEVEL_XID 252 #endif /* XLOGRECORD_H */ -- 2.21.0