From 7b2a948ef7ca6fa43f94ccf11f11f8edfb3fe028 Mon Sep 17 00:00:00 2001 From: Dilip Kumar Date: Mon, 18 Nov 2019 16:26:33 +0530 Subject: [PATCH 02/17] Issue individual invalidations with wal_level=logical When wal_level=logical, write individual invalidations into WAL so that decoding can use this information. We still add the invalidations to the cache, and write them to WAL at commit time in RecordTransactionCommit(). This uses the existing XLOG_INVALIDATIONS xlog record type, from the RM_STANDBY_ID resource manager (see LogStandbyInvalidations for details). So existing code relying on those invalidations (e.g. redo) does not need to be changed. The individual invalidations are written using a new xlog record type XLOG_XACT_INVALIDATIONS, from the RM_XACT_ID resource manager. See LogLogicalInvalidations for details. These new xlog records are ignored by existing redo procedures, which still rely on the invalidations written to commit records. The invalidations are decoded and added as a new ReorderBufferChange type (REORDER_BUFFER_CHANGE_INVALIDATION), and then executed during replay, unlike the existing invalidations (which are either decoded as part of the commit record, or executed immediately during decoding and not added to reorderbuffer at all). LogStandbyInvalidations was accumulating all the invalidations in memory, and then only wrote them once at commit time, which may reduce the performance impact by amortizing the overhead and deduplicating the invalidations. The new invalidations are written to WAL immediately, without any such caching. Perhaps it would be possible to add similar caching, e.g. at the command level, or something like that? 
--- src/backend/access/rmgrdesc/xactdesc.c | 52 ++++++++++++++ src/backend/access/transam/xact.c | 7 ++ src/backend/replication/logical/decode.c | 23 +++++++ .../replication/logical/reorderbuffer.c | 56 +++++++++++++-- src/backend/utils/cache/inval.c | 69 +++++++++++++++++++ src/include/access/xact.h | 18 ++++- src/include/replication/reorderbuffer.h | 14 ++++ 7 files changed, 231 insertions(+), 8 deletions(-) diff --git a/src/backend/access/rmgrdesc/xactdesc.c b/src/backend/access/rmgrdesc/xactdesc.c index 4c411c5322..6cfd6af24e 100644 --- a/src/backend/access/rmgrdesc/xactdesc.c +++ b/src/backend/access/rmgrdesc/xactdesc.c @@ -20,6 +20,11 @@ #include "storage/standbydefs.h" #include "utils/timestamp.h" +static void xact_desc_invalidations(StringInfo buf, + int nmsgs, SharedInvalidationMessage *msgs, + Oid dbId, Oid tsId, + bool relcacheInitFileInval); + /* * Parse the WAL format of an xact commit and abort records into an easier to * understand format. @@ -397,6 +402,14 @@ xact_desc(StringInfo buf, XLogReaderState *record) appendStringInfo(buf, "xtop %u: ", xlrec->xtop); xact_desc_assignment(buf, xlrec); } + else if (info == XLOG_XACT_INVALIDATIONS) + { + xl_xact_invalidations *xlrec = (xl_xact_invalidations *) rec; + + xact_desc_invalidations(buf, xlrec->nmsgs, xlrec->msgs, + xlrec->dbId, xlrec->tsId, + xlrec->relcacheInitFileInval); + } } const char * @@ -424,7 +437,46 @@ xact_identify(uint8 info) case XLOG_XACT_ASSIGNMENT: id = "ASSIGNMENT"; break; + case XLOG_XACT_INVALIDATIONS: + id = "INVALIDATION"; + break; } return id; } + +static void +xact_desc_invalidations(StringInfo buf, + int nmsgs, SharedInvalidationMessage *msgs, + Oid dbId, Oid tsId, + bool relcacheInitFileInval) +{ + int i; + + if (relcacheInitFileInval) + appendStringInfo(buf, "; relcache init file inval dbid %u tsid %u", + dbId, tsId); + + appendStringInfoString(buf, "; inval msgs:"); + for (i = 0; i < nmsgs; i++) + { + SharedInvalidationMessage *msg = &msgs[i]; + + if (msg->id >= 0) + 
appendStringInfo(buf, " catcache %d", msg->id); + else if (msg->id == SHAREDINVALCATALOG_ID) + appendStringInfo(buf, " catalog %u", msg->cat.catId); + else if (msg->id == SHAREDINVALRELCACHE_ID) + appendStringInfo(buf, " relcache %u", msg->rc.relId); + /* not expected, but print something anyway */ + else if (msg->id == SHAREDINVALSMGR_ID) + appendStringInfoString(buf, " smgr"); + /* not expected, but print something anyway */ + else if (msg->id == SHAREDINVALRELMAP_ID) + appendStringInfo(buf, " relmap db %u", msg->rm.dbId); + else if (msg->id == SHAREDINVALSNAPSHOT_ID) + appendStringInfo(buf, " snapshot %u", msg->sn.relId); + else + appendStringInfo(buf, " unrecognized id %d", msg->id); + } +} diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c index 708e5233f4..da15556357 100644 --- a/src/backend/access/transam/xact.c +++ b/src/backend/access/transam/xact.c @@ -6001,6 +6001,13 @@ xact_redo(XLogReaderState *record) ProcArrayApplyXidAssignment(xlrec->xtop, xlrec->nsubxacts, xlrec->xsub); } + else if (info == XLOG_XACT_INVALIDATIONS) + { + /* + * XXX we do ignore this for now, what matters are invalidations + * written into the commit record. 
+ */ + } else elog(PANIC, "xact_redo: unknown op code %u", info); } diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c index 897b755eb4..9bcefb6e6d 100644 --- a/src/backend/replication/logical/decode.c +++ b/src/backend/replication/logical/decode.c @@ -281,6 +281,29 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) } case XLOG_XACT_ASSIGNMENT: break; + case XLOG_XACT_INVALIDATIONS: + { + TransactionId xid; + xl_xact_invalidations *invals; + + xid = XLogRecGetXid(r); + invals = (xl_xact_invalidations *) XLogRecGetData(r); + + /* XXX for now we're issuing invalidations one by one */ + Assert(invals->nmsgs == 1); + + if (!TransactionIdIsValid(xid)) + break; + + ReorderBufferAddInvalidation(reorder, xid, buf->origptr, + invals->dbId, invals->tsId, + invals->relcacheInitFileInval, + invals->msgs[0]); + + + ReorderBufferXidSetCatalogChanges(ctx->reorder, xid, buf->origptr); + } + break; case XLOG_XACT_PREPARE: /* diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c index 53affeb877..b1feff3e71 100644 --- a/src/backend/replication/logical/reorderbuffer.c +++ b/src/backend/replication/logical/reorderbuffer.c @@ -464,6 +464,7 @@ ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change) case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM: case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID: case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID: + case REORDER_BUFFER_CHANGE_INVALIDATION: break; } @@ -1804,17 +1805,23 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid, TeardownHistoricSnapshot(false); SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash); - - /* - * Every time the CommandId is incremented, we could - * see new catalog contents, so execute all - * invalidations. - */ - ReorderBufferExecuteInvalidations(rb, txn); } break; + case REORDER_BUFFER_CHANGE_INVALIDATION: + + /* + * Execute the invalidation message locally. 
+ * + * XXX Do we need to care about relcacheInitFileInval and + * the other fields added to ReorderBufferChange, or just + * about the message itself? + */ + LocalExecuteInvalidationMessage(&change->data.inval.msg); + txn->is_schema_sent = false; + break; + case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID: elog(ERROR, "tuplecid value in changequeue"); break; @@ -2207,6 +2214,38 @@ ReorderBufferAddNewTupleCids(ReorderBuffer *rb, TransactionId xid, txn->ntuplecids++; } +/* + * Queue a single invalidation message as a change of the given transaction. + */ +void +ReorderBufferAddInvalidation(ReorderBuffer *rb, TransactionId xid, + XLogRecPtr lsn, + Oid dbId, Oid tsId, bool relcacheInitFileInval, + SharedInvalidationMessage msg) +{ + MemoryContext oldcontext; + ReorderBufferChange *change; + + /* XXX Should we even write invalidations without valid XID? */ + if (xid == InvalidTransactionId) + return; + + Assert(xid != InvalidTransactionId); + + oldcontext = MemoryContextSwitchTo(rb->context); + + change = ReorderBufferGetChange(rb); + change->action = REORDER_BUFFER_CHANGE_INVALIDATION; + change->data.inval.dbId = dbId; + change->data.inval.tsId = tsId; + change->data.inval.relcacheInitFileInval = relcacheInitFileInval; + change->data.inval.msg = msg; + + ReorderBufferQueueChange(rb, xid, lsn, change); + + MemoryContextSwitchTo(oldcontext); +} + /* * Setup the invalidation of the toplevel transaction. 
* @@ -2656,6 +2695,7 @@ ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn, case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM: case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID: case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID: + case REORDER_BUFFER_CHANGE_INVALIDATION: /* ReorderBufferChange contains everything important */ break; } @@ -2752,6 +2792,7 @@ ReorderBufferChangeSize(ReorderBufferChange *change) case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM: case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID: case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID: + case REORDER_BUFFER_CHANGE_INVALIDATION: /* ReorderBufferChange contains everything important */ break; } @@ -3027,6 +3068,7 @@ ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn, case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM: case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID: case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID: + case REORDER_BUFFER_CHANGE_INVALIDATION: break; } diff --git a/src/backend/utils/cache/inval.c b/src/backend/utils/cache/inval.c index f09e3a9aff..0682c55b51 100644 --- a/src/backend/utils/cache/inval.c +++ b/src/backend/utils/cache/inval.c @@ -104,6 +104,7 @@ #include "catalog/pg_constraint.h" #include "miscadmin.h" #include "storage/sinval.h" +#include "storage/standby.h" #include "storage/smgr.h" #include "utils/catcache.h" #include "utils/inval.h" @@ -210,6 +211,9 @@ static struct RELCACHECALLBACK static int relcache_callback_count = 0; +static void LogLogicalInvalidations(int nmsgs, SharedInvalidationMessage *msgs, + bool relcacheInitFileInval); + /* ---------------------------------------------------------------- * Invalidation list support functions * @@ -489,6 +493,18 @@ RegisterCatcacheInvalidation(int cacheId, { AddCatcacheInvalidationMessage(&transInvalInfo->CurrentCmdInvalidMsgs, cacheId, hashValue, dbId); + + /* Issue an invalidation WAL record (when wal_level=logical) */ + if (XLogLogicalInfoActive()) + { + SharedInvalidationMessage msg; + + msg.cc.id = 
(int8) cacheId; + msg.cc.dbId = dbId; + msg.cc.hashValue = hashValue; + + LogLogicalInvalidations(1, &msg, false); + } } /* @@ -501,6 +517,18 @@ RegisterCatalogInvalidation(Oid dbId, Oid catId) { AddCatalogInvalidationMessage(&transInvalInfo->CurrentCmdInvalidMsgs, dbId, catId); + + /* Issue an invalidation WAL record (when wal_level=logical) */ + if (XLogLogicalInfoActive()) + { + SharedInvalidationMessage msg; + + msg.cat.id = SHAREDINVALCATALOG_ID; + msg.cat.dbId = dbId; + msg.cat.catId = catId; + + LogLogicalInvalidations(1, &msg, false); + } } /* @@ -511,6 +539,8 @@ RegisterCatalogInvalidation(Oid dbId, Oid catId) static void RegisterRelcacheInvalidation(Oid dbId, Oid relId) { + bool RelcacheInitFileInval = false; + AddRelcacheInvalidationMessage(&transInvalInfo->CurrentCmdInvalidMsgs, dbId, relId); @@ -529,7 +559,22 @@ RegisterRelcacheInvalidation(Oid dbId, Oid relId) * as well. Also zap when we are invalidating whole relcache. */ if (relId == InvalidOid || RelationIdIsInInitFile(relId)) + { transInvalInfo->RelcacheInitFileInval = true; + RelcacheInitFileInval = true; + } + + /* Issue an invalidation WAL record (when wal_level=logical) */ + if (XLogLogicalInfoActive()) + { + SharedInvalidationMessage msg; + + msg.rc.id = SHAREDINVALRELCACHE_ID; + msg.rc.dbId = dbId; + msg.rc.relId = relId; + + LogLogicalInvalidations(1, &msg, RelcacheInitFileInval); + } } /* @@ -1501,3 +1546,27 @@ CallSyscacheCallbacks(int cacheid, uint32 hashvalue) i = ccitem->link - 1; } } + +/* + * Emit WAL for invalidations. 
+ */ +static void +LogLogicalInvalidations(int nmsgs, SharedInvalidationMessage *msgs, + bool relcacheInitFileInval) +{ + xl_xact_invalidations xlrec; + + /* prepare record */ + memset(&xlrec, 0, sizeof(xlrec)); + xlrec.dbId = MyDatabaseId; + xlrec.tsId = MyDatabaseTableSpace; + xlrec.relcacheInitFileInval = relcacheInitFileInval; + xlrec.nmsgs = nmsgs; + + /* perform insertion */ + XLogBeginInsert(); + XLogRegisterData((char *) (&xlrec), MinSizeOfXactInvalidations); + XLogRegisterData((char *) msgs, + nmsgs * sizeof(SharedInvalidationMessage)); + XLogInsert(RM_XACT_ID, XLOG_XACT_INVALIDATIONS); +} diff --git a/src/include/access/xact.h b/src/include/access/xact.h index 5b9740c5c3..82d49428c2 100644 --- a/src/include/access/xact.h +++ b/src/include/access/xact.h @@ -146,7 +146,7 @@ typedef void (*SubXactCallback) (SubXactEvent event, SubTransactionId mySubid, #define XLOG_XACT_COMMIT_PREPARED 0x30 #define XLOG_XACT_ABORT_PREPARED 0x40 #define XLOG_XACT_ASSIGNMENT 0x50 -/* free opcode 0x60 */ +#define XLOG_XACT_INVALIDATIONS 0x60 /* free opcode 0x70 */ /* mask for filtering opcodes out of xl_info */ @@ -197,6 +197,22 @@ typedef struct xl_xact_assignment #define MinSizeOfXactAssignment offsetof(xl_xact_assignment, xsub) +/* + * Invalidations logged with wal_level=logical. + * + * XXX Currently nmsgs=1 but that might change in the future. + */ +typedef struct xl_xact_invalidations +{ + Oid dbId; /* MyDatabaseId */ + Oid tsId; /* MyDatabaseTableSpace */ + bool relcacheInitFileInval; /* invalidate relcache init file */ + int nmsgs; /* number of shared inval msgs */ + SharedInvalidationMessage msgs[FLEXIBLE_ARRAY_MEMBER]; +} xl_xact_invalidations; + +#define MinSizeOfXactInvalidations offsetof(xl_xact_invalidations, msgs) + /* * Commit and abort records can contain a lot of information. But a large * portion of the records won't need all possible pieces of information. 
So we diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h index 0867ee9e63..6a7187bbec 100644 --- a/src/include/replication/reorderbuffer.h +++ b/src/include/replication/reorderbuffer.h @@ -57,6 +57,7 @@ enum ReorderBufferChangeType REORDER_BUFFER_CHANGE_UPDATE, REORDER_BUFFER_CHANGE_DELETE, REORDER_BUFFER_CHANGE_MESSAGE, + REORDER_BUFFER_CHANGE_INVALIDATION, REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT, REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, @@ -149,6 +150,16 @@ typedef struct ReorderBufferChange CommandId cmax; CommandId combocid; } tuplecid; + + /* Invalidation. */ + struct + { + Oid dbId; /* MyDatabaseId */ + Oid tsId; /* MyDatabaseTableSpace */ + bool relcacheInitFileInval; /* invalidate relcache init + * file */ + SharedInvalidationMessage msg; /* invalidation message */ + } inval; } data; /* @@ -448,6 +459,9 @@ void ReorderBufferAddNewCommandId(ReorderBuffer *, TransactionId, XLogRecPtr ls void ReorderBufferAddNewTupleCids(ReorderBuffer *, TransactionId, XLogRecPtr lsn, RelFileNode node, ItemPointerData pt, CommandId cmin, CommandId cmax, CommandId combocid); +void ReorderBufferAddInvalidation(ReorderBuffer *, TransactionId, XLogRecPtr lsn, + Oid dbId, Oid tsId, bool relcacheInitFileInval, + SharedInvalidationMessage msg); void ReorderBufferAddInvalidations(ReorderBuffer *, TransactionId, XLogRecPtr lsn, Size nmsgs, SharedInvalidationMessage *msgs); void ReorderBufferImmediateInvalidation(ReorderBuffer *, uint32 ninvalidations, -- 2.21.0