diff --git a/src/test/modules/xid_wraparound/xid_wraparound--1.0.sql b/src/test/modules/xid_wraparound/xid_wraparound--1.0.sql
index 51d25fc4c63..c24164c480c 100644
--- a/src/test/modules/xid_wraparound/xid_wraparound--1.0.sql
+++ b/src/test/modules/xid_wraparound/xid_wraparound--1.0.sql
@@ -10,3 +10,8 @@ AS 'MODULE_PATHNAME' LANGUAGE C;
 CREATE FUNCTION consume_xids_until(targetxid xid8)
 RETURNS xid8 IMMUTABLE PARALLEL SAFE STRICT
 AS 'MODULE_PATHNAME' LANGUAGE C;
+
+-- Allocates XIDs and multixact IDs, so it is neither immutable nor parallel safe.
+CREATE FUNCTION consume_mxids(nmxids bigint)
+RETURNS xid8 VOLATILE PARALLEL UNSAFE STRICT
+AS 'MODULE_PATHNAME' LANGUAGE C;
diff --git a/src/test/modules/xid_wraparound/xid_wraparound.c b/src/test/modules/xid_wraparound/xid_wraparound.c
index dce81c0c6d6..935a770a683 100644
--- a/src/test/modules/xid_wraparound/xid_wraparound.c
+++ b/src/test/modules/xid_wraparound/xid_wraparound.c
@@ -14,6 +14,7 @@
  */
 #include "postgres.h"
 
+#include "access/multixact.h"
 #include "access/xact.h"
 #include "miscadmin.h"
 #include "storage/proc.h"
@@ -24,6 +25,8 @@ PG_MODULE_MAGIC;
 static int64 consume_xids_shortcut(void);
 static FullTransactionId consume_xids_common(FullTransactionId untilxid, uint64 nxids);
 
+static MultiXactId consume_multixids_common(uint64 nmxids);
+
 /*
  * Consume the specified number of XIDs.
  */
@@ -151,6 +154,7 @@ consume_xids_common(FullTransactionId untilxid, uint64 nxids)
 	}
 
 	return lastxid;
+#undef REPORT_INTERVAL
 }
 
 /*
@@ -217,3 +221,96 @@ consume_xids_shortcut(void)
 
 	return consumed;
 }
+
+/*
+ * Consume the specified number of multixact IDs.
+ *
+ * Returns the last multixact ID that was created.  If nmxids is zero, nothing
+ * is consumed and the value of ReadNextMultiXactId() is returned instead.
+ */
+PG_FUNCTION_INFO_V1(consume_mxids);
+Datum
+consume_mxids(PG_FUNCTION_ARGS)
+{
+	int64		nmxids = PG_GETARG_INT64(0);
+	MultiXactId lastmxid;
+
+	if (nmxids < 0)
+		elog(ERROR, "invalid nmxids argument: %lld", (long long) nmxids);
+
+	if (nmxids == 0)
+		lastmxid = ReadNextMultiXactId();
+	else
+		lastmxid = consume_multixids_common((uint64) nmxids);
+
+	/*
+	 * The function is declared to return xid8 at the SQL level, so hand back
+	 * a 64-bit datum instead of a 32-bit TransactionId datum.
+	 */
+	PG_RETURN_FULLTRANSACTIONID(FullTransactionIdFromU64((uint64) lastmxid));
+}
+
+/*
+ * Workhorse for consume_mxids(): create 'nmxids' multixacts, one at a time,
+ * and return the last one created.
+ */
+static MultiXactId
+consume_multixids_common(uint64 nmxids)
+{
+	/* Number of XIDs kept around to serve as multixact members */
+#define MXID_MEMBER_POOL_SIZE 256
+	/* Print a NOTICE every REPORT_INTERVAL multixacts */
+#define REPORT_INTERVAL 1000000
+
+	MultiXactId lastmxid;
+	uint64		last_reported_at = 0;
+	uint64		consumed = 0;
+	TransactionId xids[MXID_MEMBER_POOL_SIZE];
+	int			nxids = (int) Min((uint64) MXID_MEMBER_POOL_SIZE, nmxids);
+
+	/* Start from the system's current next multixact ID */
+	lastmxid = ReadNextMultiXactId();
+
+	/*
+	 * Fill the member pool with our top-level XID plus newly allocated XIDs.
+	 * At most nmxids of them will ever be referenced, so don't allocate more.
+	 */
+	xids[0] = GetTopTransactionId();
+	for (int i = 1; i < nxids; i++)
+		xids[i] = XidFromFullTransactionId(GetNewTransactionId(true));
+
+	while (consumed < nmxids)
+	{
+		MultiXactMember member;
+
+		CHECK_FOR_INTERRUPTS();
+
+		/*
+		 * Build a single-member multixact.  The member XID cycles through the
+		 * pool and the lock mode flips each time the pool wraps around, so a
+		 * given (xid, status) pair recurs only once every
+		 * 2 * MXID_MEMBER_POOL_SIZE iterations.  NOTE(review): presumably this
+		 * keeps MultiXactIdCreateFromMembers() from handing back an already
+		 * existing multixact instead of allocating a new one -- confirm.
+		 */
+		member.xid = xids[consumed % MXID_MEMBER_POOL_SIZE];
+		member.status = ((consumed / MXID_MEMBER_POOL_SIZE) % 2) ?
+			MultiXactStatusForUpdate : MultiXactStatusForKeyShare;
+
+		lastmxid = MultiXactIdCreateFromMembers(1, &member);
+		consumed++;
+
+		/* Report progress */
+		if (consumed - last_reported_at >= REPORT_INTERVAL)
+		{
+			elog(NOTICE, "consumed %llu / %llu multixact IDs, latest %u",
+				 (unsigned long long) consumed, (unsigned long long) nmxids,
+				 lastmxid);
+			last_reported_at = consumed;
+		}
+	}
+
+	return lastmxid;
+#undef REPORT_INTERVAL
+#undef MXID_MEMBER_POOL_SIZE
+}