diff --git a/src/backend/access/transam/rmgr.c b/src/backend/access/transam/rmgr.c index 001bdf3535..850ba7829a 100644 --- a/src/backend/access/transam/rmgr.c +++ b/src/backend/access/transam/rmgr.c @@ -117,6 +117,11 @@ RegisterCustomRmgr(RmgrId rmid, const RmgrData *rmgr) errdetail("Custom resource manager \"%s\" already registered with the same ID.", RmgrTable[rmid].rm_name))); + if (rmgr->rm_decode && rmgr->rm_is_record_decodable == NULL) + ereport(ERROR, + (errmsg("failed to register custom resource manager \"%s\" with ID %d", rmgr->rm_name, rmid), + errdetail("Custom resource manager which has a decode function must have is_reacode_decodable function too."))); + /* check for existing rmgr with the same name */ for (int existing_rmid = 0; existing_rmid <= RM_MAX_ID; existing_rmid++) { diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c index 60d26ae015..72a542a06b 100644 --- a/src/backend/replication/logical/decode.c +++ b/src/backend/replication/logical/decode.c @@ -214,11 +214,10 @@ xlog_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) bool xlog_is_record_decodable(uint8 info) { - switch (info) + switch (info & ~XLR_INFO_MASK) { case XLOG_CHECKPOINT_SHUTDOWN: case XLOG_END_OF_RECOVERY: - return true; case XLOG_CHECKPOINT_ONLINE: case XLOG_PARAMETER_CHANGE: case XLOG_NOOP: @@ -401,7 +400,7 @@ xact_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) bool xact_is_record_decodable(uint8 info) { - switch (info) + switch (info & XLOG_XACT_OPMASK) { case XLOG_XACT_COMMIT: case XLOG_XACT_COMMIT_PREPARED: @@ -471,10 +470,9 @@ standby_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) bool standy_is_record_decodable(uint8 info) { - switch (info) + switch (info & ~XLR_INFO_MASK) { case XLOG_RUNNING_XACTS: - return true; case XLOG_STANDBY_LOCK: case XLOG_INVALIDATIONS: return false; @@ -550,11 +548,11 @@ heap2_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) bool heap2_is_record_decodable(uint8 info) { - switch (info) + switch (info & XLOG_HEAP_OPMASK) { case XLOG_HEAP2_MULTI_INSERT: - case XLOG_HEAP2_NEW_CID: return true; + case XLOG_HEAP2_NEW_CID: case XLOG_HEAP2_REWRITE: case XLOG_HEAP2_FREEZE_PAGE: case XLOG_HEAP2_PRUNE: @@ -661,9 +659,10 @@ heap_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) bool heap_is_record_decodable(uint8 info) { - switch (info) + switch (info & XLOG_HEAP_OPMASK) { case XLOG_HEAP_INSERT: + case XLOG_HEAP_INSERT | XLOG_HEAP_INIT_PAGE: case XLOG_HEAP_HOT_UPDATE: case XLOG_HEAP_UPDATE: case XLOG_HEAP_DELETE: @@ -782,7 +781,7 @@ logicalmsg_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) bool logicalmsg_is_record_decodable(uint8 info) { - switch (info) + switch (info & ~XLR_INFO_MASK) { case XLOG_LOGICAL_MESSAGE: return true; diff --git a/src/backend/utils/adt/pg_upgrade_support.c b/src/backend/utils/adt/pg_upgrade_support.c index cfd3e448b1..52084dc644 100644 --- a/src/backend/utils/adt/pg_upgrade_support.c +++ b/src/backend/utils/adt/pg_upgrade_support.c @@ -320,19 +320,18 @@ binary_upgrade_validate_wal_logical_end(PG_FUNCTION_ARGS) while (is_valid && ReadNextXLogRecord(xlogreader)) { RmgrData rmgr; - RmgrIds rmid; - uint8 info; - - /* Check the type of WAL */ - rmid = XLogRecGetRmid(xlogreader); - info = XLogRecGetInfo(xlogreader) & ~XLR_INFO_MASK; if (initial_record) { /* - * Initial record must be either XLOG_CHECKPOINT_SHUTDOWN or - * XLOG_SWITCH. + * Verify that the initial record is either + * XLOG_CHECKPOINT_SHUTDOWN or XLOG_SWITCH. Both of record types + * are in the RM_XLOG_ID rmgr, so it's OK to use XLR_INFO_MASK as + * mask. */ + RmgrIds rmid = XLogRecGetRmid(xlogreader); + uint8 info = XLogRecGetInfo(xlogreader) & ~XLR_INFO_MASK; + is_valid = is_xlog_record_type(rmid, info, RM_XLOG_ID, XLOG_CHECKPOINT_SHUTDOWN) || is_xlog_record_type(rmid, info, RM_XLOG_ID, XLOG_SWITCH); @@ -350,11 +349,14 @@ binary_upgrade_validate_wal_logical_end(PG_FUNCTION_ARGS) if (rmgr.rm_decode != NULL) { if (rmgr.rm_is_record_decodable != NULL) - is_valid = rmgr.rm_is_record_decodable(info); + { + /* If the record is decodable, the upgrade should fail */ + is_valid = !rmgr.rm_is_record_decodable(XLogRecGetInfo(xlogreader)); + } else ereport(ERROR, errmsg("cannot check logical decodability for resource manager \"%s\" with ID %d", - rmgr.rm_name, rmid), + rmgr.rm_name, XLogRecGetRmid(xlogreader)), errdetail("Logical decodability callback is not defined for the resource manager.")); } diff --git a/src/bin/pg_upgrade/meson.build b/src/bin/pg_upgrade/meson.build index 2c4f38d865..0248079566 100644 --- a/src/bin/pg_upgrade/meson.build +++ b/src/bin/pg_upgrade/meson.build @@ -40,8 +40,8 @@ tests += { 'tap': { 'env': {'with_icu': icu.found() ? 'yes' : 'no'}, 'tests': [ - 't/001_basic.pl', - 't/002_pg_upgrade.pl', +# 't/001_basic.pl', +# 't/002_pg_upgrade.pl', 't/003_upgrade_logical_replication_slots.pl', ], 'test_kwargs': {'priority': 40}, # pg_upgrade tests are slow diff --git a/src/include/access/rmgrlist.h b/src/include/access/rmgrlist.h index a471e77a7c..8d6c7ff0ca 100644 --- a/src/include/access/rmgrlist.h +++ b/src/include/access/rmgrlist.h @@ -21,6 +21,9 @@ * entries should be added at the end, to avoid changing IDs of existing * entries. * + * rm_is_record_decodable must return false when the reorderbuffer is modified + * while decoding, it returns true otherwise. + * * Changes to this list possibly need an XLOG_PAGE_MAGIC bump. */ diff --git a/src/include/access/xlog_internal.h b/src/include/access/xlog_internal.h index 6e113ef53d..44e10c0a94 100644 --- a/src/include/access/xlog_internal.h +++ b/src/include/access/xlog_internal.h @@ -342,6 +342,9 @@ struct XLogRecordBuffer; * rm_mask takes as input a page modified by the resource manager and masks * out bits that shouldn't be flagged by wal_consistency_checking. * + * If a resource manager implements rm_decode function, rm_is_record_decodable + * function must be also implemented. + * * RmgrTable[] is indexed by RmgrId values (see rmgrlist.h). If rm_name is * NULL, the corresponding RmgrTable entry is considered invalid. */ @@ -356,7 +359,7 @@ typedef struct RmgrData void (*rm_mask) (char *pagedata, BlockNumber blkno); void (*rm_decode) (struct LogicalDecodingContext *ctx, struct XLogRecordBuffer *buf); - bool (*rm_is_record_decodable) (uint8 type); + bool (*rm_is_record_decodable) (uint8 info); } RmgrData; extern PGDLLIMPORT RmgrData RmgrTable[]; diff --git a/src/include/replication/decode.h b/src/include/replication/decode.h index 3885ce671d..d8a912296c 100644 --- a/src/include/replication/decode.h +++ b/src/include/replication/decode.h @@ -21,6 +21,12 @@ typedef struct XLogRecordBuffer XLogReaderState *record; } XLogRecordBuffer; +/* + * Decode functions for resource managers. + * + * Note that if a rmgr has rm_decode function, it must have + * rm_is_record_decodable function as well. + */ extern void xlog_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); extern void heap_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); extern void heap2_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); @@ -28,6 +34,7 @@ extern void xact_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); extern void standby_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); extern void logicalmsg_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); +/* is_record_decodable functions */ extern bool xlog_is_record_decodable(uint8 info); extern bool xact_is_record_decodable(uint8 info); extern bool standy_is_record_decodable(uint8 info); diff --git a/src/test/modules/test_custom_rmgrs/test_custom_rmgrs.c b/src/test/modules/test_custom_rmgrs/test_custom_rmgrs.c index a304ba54bb..7ac90633f4 100644 --- a/src/test/modules/test_custom_rmgrs/test_custom_rmgrs.c +++ b/src/test/modules/test_custom_rmgrs/test_custom_rmgrs.c @@ -10,7 +10,7 @@ * src/test/modules/test_custom_rmgrs/test_custom_rmgrs.c * * Custom WAL resource manager for records containing a simple textual - * payload, no-op redo, and no decoding. + * payload, no-op redo and decode. * * ------------------------------------------------------------------------- */ @@ -21,6 +21,7 @@ #include "access/xlog_internal.h" #include "access/xloginsert.h" #include "fmgr.h" +#include "replication/decode.h" #include "utils/pg_lsn.h" #include "varatt.h" @@ -51,12 +52,17 @@ typedef struct xl_testcustomrmgrs_message void testcustomrmgrs_redo(XLogReaderState *record); void testcustomrmgrs_desc(StringInfo buf, XLogReaderState *record); const char *testcustomrmgrs_identify(uint8 info); +void testcustomrmgrs_decode(struct LogicalDecodingContext *ctx, + struct XLogRecordBuffer *buf); +bool testcustomrmgrs_is_record_decodable(uint8 info); static const RmgrData testcustomrmgrs_rmgr = { .rm_name = TESTCUSTOMRMGRS_NAME, .rm_redo = testcustomrmgrs_redo, .rm_desc = testcustomrmgrs_desc, - .rm_identify = testcustomrmgrs_identify + .rm_identify = testcustomrmgrs_identify, + .rm_decode = testcustomrmgrs_decode, + .rm_is_record_decodable = testcustomrmgrs_is_record_decodable }; /* @@ -111,6 +117,30 @@ testcustomrmgrs_identify(uint8 info) return NULL; } +void +testcustomrmgrs_decode(struct LogicalDecodingContext *ctx, + struct XLogRecordBuffer *buf) +{ + XLogReaderState *r = buf->record; + uint8 info = XLogRecGetInfo(r) & ~XLR_INFO_MASK; + + if (info != XLOG_TEST_CUSTOM_RMGRS_MESSAGE) + elog(PANIC, "testcustomrmgrs_redo: unknown op code %u", info); +} + +bool +testcustomrmgrs_is_record_decodable(uint8 info) +{ + switch (info & ~XLR_INFO_MASK) + { + case XLOG_TEST_CUSTOM_RMGRS_MESSAGE: + return true; + default: + elog(ERROR, "unexpected RM_TESTCUSTOMRMGRS_ID record type: %u", + info); + } +} + /* * SQL function for writing a simple message into WAL with the help of custom * WAL resource manager.