diff --git a/src/backend/access/transam/rmgr.c b/src/backend/access/transam/rmgr.c index 001bdf3535..850ba7829a 100644 --- a/src/backend/access/transam/rmgr.c +++ b/src/backend/access/transam/rmgr.c @@ -117,6 +117,11 @@ RegisterCustomRmgr(RmgrId rmid, const RmgrData *rmgr) errdetail("Custom resource manager \"%s\" already registered with the same ID.", RmgrTable[rmid].rm_name))); + if (rmgr->rm_decode && rmgr->rm_is_record_decodable == NULL) + ereport(ERROR, + (errmsg("failed to register custom resource manager \"%s\" with ID %d", rmgr->rm_name, rmid), + errdetail("Custom resource manager which has a decode function must have is_record_decodable function too."))); + /* check for existing rmgr with the same name */ for (int existing_rmid = 0; existing_rmid <= RM_MAX_ID; existing_rmid++) { diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c index 60d26ae015..2e97962e60 100644 --- a/src/backend/replication/logical/decode.c +++ b/src/backend/replication/logical/decode.c @@ -214,7 +214,7 @@ xlog_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) bool xlog_is_record_decodable(uint8 info) { - switch (info) + switch (info & ~XLR_INFO_MASK) { case XLOG_CHECKPOINT_SHUTDOWN: case XLOG_END_OF_RECOVERY: @@ -401,7 +401,7 @@ xact_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) bool xact_is_record_decodable(uint8 info) { - switch (info) + switch (info & ~XLR_INFO_MASK) { case XLOG_XACT_COMMIT: case XLOG_XACT_COMMIT_PREPARED: @@ -471,7 +471,7 @@ standby_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) bool standy_is_record_decodable(uint8 info) { - switch (info) + switch (info & ~XLR_INFO_MASK) { case XLOG_RUNNING_XACTS: return true; @@ -550,7 +550,7 @@ heap2_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) bool heap2_is_record_decodable(uint8 info) { - switch (info) + switch (info & ~XLR_INFO_MASK) { case XLOG_HEAP2_MULTI_INSERT: case XLOG_HEAP2_NEW_CID: @@ -661,7 +661,7 @@ 
heap_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) bool heap_is_record_decodable(uint8 info) { - switch (info) + switch (info & ~XLR_INFO_MASK) { case XLOG_HEAP_INSERT: case XLOG_HEAP_HOT_UPDATE: @@ -782,7 +782,7 @@ logicalmsg_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) bool logicalmsg_is_record_decodable(uint8 info) { - switch (info) + switch (info & ~XLR_INFO_MASK) { case XLOG_LOGICAL_MESSAGE: return true; diff --git a/src/backend/utils/adt/pg_upgrade_support.c b/src/backend/utils/adt/pg_upgrade_support.c index cfd3e448b1..f19cb68d92 100644 --- a/src/backend/utils/adt/pg_upgrade_support.c +++ b/src/backend/utils/adt/pg_upgrade_support.c @@ -320,15 +320,13 @@ binary_upgrade_validate_wal_logical_end(PG_FUNCTION_ARGS) while (is_valid && ReadNextXLogRecord(xlogreader)) { RmgrData rmgr; - RmgrIds rmid; - uint8 info; - - /* Check the type of WAL */ - rmid = XLogRecGetRmid(xlogreader); - info = XLogRecGetInfo(xlogreader) & ~XLR_INFO_MASK; if (initial_record) { + /* Check the type of WAL */ + RmgrIds rmid = XLogRecGetRmid(xlogreader); + uint8 info = XLogRecGetInfo(xlogreader) & ~XLR_INFO_MASK; + /* * Initial record must be either XLOG_CHECKPOINT_SHUTDOWN or * XLOG_SWITCH. 
@@ -350,11 +348,11 @@ binary_upgrade_validate_wal_logical_end(PG_FUNCTION_ARGS) if (rmgr.rm_decode != NULL) { if (rmgr.rm_is_record_decodable != NULL) - is_valid = rmgr.rm_is_record_decodable(info); + is_valid = rmgr.rm_is_record_decodable(XLogRecGetInfo(xlogreader)); else ereport(ERROR, errmsg("cannot check logical decodability for resource manager \"%s\" with ID %d", - rmgr.rm_name, rmid), + rmgr.rm_name, XLogRecGetRmid(xlogreader)), errdetail("Logical decodability callback is not defined for the resource manager.")); } diff --git a/src/include/access/xlog_internal.h b/src/include/access/xlog_internal.h index 6e113ef53d..44e10c0a94 100644 --- a/src/include/access/xlog_internal.h +++ b/src/include/access/xlog_internal.h @@ -342,6 +342,9 @@ struct XLogRecordBuffer; * rm_mask takes as input a page modified by the resource manager and masks * out bits that shouldn't be flagged by wal_consistency_checking. * + * If a resource manager implements the rm_decode function, the + * rm_is_record_decodable function must also be implemented. + * * RmgrTable[] is indexed by RmgrId values (see rmgrlist.h). If rm_name is * NULL, the corresponding RmgrTable entry is considered invalid. */ @@ -356,7 +359,7 @@ typedef struct RmgrData void (*rm_mask) (char *pagedata, BlockNumber blkno); void (*rm_decode) (struct LogicalDecodingContext *ctx, struct XLogRecordBuffer *buf); - bool (*rm_is_record_decodable) (uint8 type); + bool (*rm_is_record_decodable) (uint8 info); } RmgrData; extern PGDLLIMPORT RmgrData RmgrTable[]; diff --git a/src/include/replication/decode.h b/src/include/replication/decode.h index 3885ce671d..d8a912296c 100644 --- a/src/include/replication/decode.h +++ b/src/include/replication/decode.h @@ -21,6 +21,12 @@ typedef struct XLogRecordBuffer XLogReaderState *record; } XLogRecordBuffer; +/* + * Decode functions for resource managers. + * + * Note that if a rmgr has rm_decode function, it must have + * rm_is_record_decodable function as well. 
+ */ extern void xlog_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); extern void heap_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); extern void heap2_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); @@ -28,6 +34,7 @@ extern void xact_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); extern void standby_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); extern void logicalmsg_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); +/* is_record_decodable functions */ extern bool xlog_is_record_decodable(uint8 info); extern bool xact_is_record_decodable(uint8 info); extern bool standy_is_record_decodable(uint8 info); diff --git a/src/test/modules/test_custom_rmgrs/test_custom_rmgrs.c b/src/test/modules/test_custom_rmgrs/test_custom_rmgrs.c index a304ba54bb..7ac90633f4 100644 --- a/src/test/modules/test_custom_rmgrs/test_custom_rmgrs.c +++ b/src/test/modules/test_custom_rmgrs/test_custom_rmgrs.c @@ -10,7 +10,7 @@ * src/test/modules/test_custom_rmgrs/test_custom_rmgrs.c * * Custom WAL resource manager for records containing a simple textual - * payload, no-op redo, and no decoding. + * payload, no-op redo and decode. 
* * ------------------------------------------------------------------------- */ @@ -21,6 +21,7 @@ #include "access/xlog_internal.h" #include "access/xloginsert.h" #include "fmgr.h" +#include "replication/decode.h" #include "utils/pg_lsn.h" #include "varatt.h" @@ -51,12 +52,17 @@ typedef struct xl_testcustomrmgrs_message void testcustomrmgrs_redo(XLogReaderState *record); void testcustomrmgrs_desc(StringInfo buf, XLogReaderState *record); const char *testcustomrmgrs_identify(uint8 info); +void testcustomrmgrs_decode(struct LogicalDecodingContext *ctx, + struct XLogRecordBuffer *buf); +bool testcustomrmgrs_is_record_decodable(uint8 info); static const RmgrData testcustomrmgrs_rmgr = { .rm_name = TESTCUSTOMRMGRS_NAME, .rm_redo = testcustomrmgrs_redo, .rm_desc = testcustomrmgrs_desc, - .rm_identify = testcustomrmgrs_identify + .rm_identify = testcustomrmgrs_identify, + .rm_decode = testcustomrmgrs_decode, + .rm_is_record_decodable = testcustomrmgrs_is_record_decodable }; /* @@ -111,6 +117,30 @@ testcustomrmgrs_identify(uint8 info) return NULL; } +void +testcustomrmgrs_decode(struct LogicalDecodingContext *ctx, + struct XLogRecordBuffer *buf) +{ + XLogReaderState *r = buf->record; + uint8 info = XLogRecGetInfo(r) & ~XLR_INFO_MASK; + + if (info != XLOG_TEST_CUSTOM_RMGRS_MESSAGE) + elog(PANIC, "testcustomrmgrs_redo: unknown op code %u", info); +} + +bool +testcustomrmgrs_is_record_decodable(uint8 info) +{ + switch (info & ~XLR_INFO_MASK) + { + case XLOG_TEST_CUSTOM_RMGRS_MESSAGE: + return true; + default: + elog(ERROR, "unexpected RM_TESTCUSTOMRMGRS_ID record type: %u", + info); + } +} + /* * SQL function for writing a simple message into WAL with the help of custom * WAL resource manager.