diff --git a/src/backend/access/brin/brin.c b/src/backend/access/brin/brin.c index b22563b..fd7c323 100644 --- a/src/backend/access/brin/brin.c +++ b/src/backend/access/brin/brin.c @@ -846,6 +846,69 @@ brin_summarize_new_values(PG_FUNCTION_ARGS) } /* + * SQL-callable interface to mark a range as no longer summarized + */ +Datum +brin_desummarize_range(PG_FUNCTION_ARGS) +{ + Oid indexoid = PG_GETARG_OID(0); + BlockNumber heapBlk = PG_GETARG_INT64(1); + Oid heapoid; + Relation heapRel; + Relation indexRel; + bool done; + + /* + * We must lock table before index to avoid deadlocks. However, if the + * passed indexoid isn't an index then IndexGetRelation() will fail. + * Rather than emitting a not-very-helpful error message, postpone + * complaining, expecting that the is-it-an-index test below will fail. + */ + heapoid = IndexGetRelation(indexoid, true); + if (OidIsValid(heapoid)) + heapRel = heap_open(heapoid, ShareUpdateExclusiveLock); + else + heapRel = NULL; + + indexRel = index_open(indexoid, ShareUpdateExclusiveLock); + + /* Must be a BRIN index */ + if (indexRel->rd_rel->relkind != RELKIND_INDEX || + indexRel->rd_rel->relam != BRIN_AM_OID) + ereport(ERROR, + (errcode(ERRCODE_WRONG_OBJECT_TYPE), + errmsg("\"%s\" is not a BRIN index", + RelationGetRelationName(indexRel)))); + + /* User must own the index (comparable to privileges needed for VACUUM) */ + if (!pg_class_ownercheck(indexoid, GetUserId())) + aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS, + RelationGetRelationName(indexRel)); + + /* + * Since we did the IndexGetRelation call above without any lock, it's + * barely possible that a race against an index drop/recreation could have + * netted us the wrong table. Recheck. + */ + if (heapRel == NULL || heapoid != IndexGetRelation(indexoid, false)) + ereport(ERROR, + (errcode(ERRCODE_UNDEFINED_TABLE), + errmsg("could not open parent table of index %s", + RelationGetRelationName(indexRel)))); + + /* the revmap does the hard work */ + do { + done = brinRevmapDesummarizeRange(indexRel, heapBlk); + } + while (!done); + + relation_close(indexRel, ShareUpdateExclusiveLock); + relation_close(heapRel, ShareUpdateExclusiveLock); + + PG_RETURN_VOID(); +} + +/* * Build a BrinDesc used to create or scan a BRIN index */ BrinDesc * diff --git a/src/backend/access/brin/brin_revmap.c b/src/backend/access/brin/brin_revmap.c index 0de6999..00744df 100644 --- a/src/backend/access/brin/brin_revmap.c +++ b/src/backend/access/brin/brin_revmap.c @@ -168,9 +168,12 @@ brinSetHeapBlockItemptr(Buffer buf, BlockNumber pagesPerRange, iptr = (ItemPointerData *) contents->rm_tids; iptr += HEAPBLK_TO_REVMAP_INDEX(pagesPerRange, heapBlk); - ItemPointerSet(iptr, - ItemPointerGetBlockNumber(&tid), - ItemPointerGetOffsetNumber(&tid)); + if (ItemPointerIsValid(&tid)) + ItemPointerSet(iptr, + ItemPointerGetBlockNumber(&tid), + ItemPointerGetOffsetNumber(&tid)); + else + ItemPointerSetInvalid(iptr); } /* @@ -301,6 +304,138 @@ brinGetTupleForHeapBlock(BrinRevmap *revmap, BlockNumber heapBlk, } /* + * Delete an index tuple, marking a page range as unsummarized. + * + * Index must be locked in ShareUpdateExclusiveLock mode. + * + * Return FALSE if caller should retry. + */ +bool +brinRevmapDesummarizeRange(Relation idxrel, BlockNumber heapBlk) +{ + BrinRevmap *revmap; + BlockNumber pagesPerRange; + RevmapContents *contents; + ItemPointerData *iptr; + ItemPointerData invalidIptr; + BlockNumber revmapBlk; + Buffer revmapBuf; + Buffer regBuf; + Page revmapPg; + Page regPg; + OffsetNumber revmapOffset; + OffsetNumber regOffset; + ItemId lp; + BrinTuple *tup; + + revmap = brinRevmapInitialize(idxrel, &pagesPerRange, NULL); + + revmapBlk = revmap_get_blkno(revmap, heapBlk); + if (!BlockNumberIsValid(revmapBlk)) + { + /* revmap page doesn't exist: range not summarized, we're done */ + brinRevmapTerminate(revmap); + return true; + } + + revmapBuf = revmap_get_buffer(revmap, heapBlk); + revmapOffset = HEAPBLK_TO_REVMAP_INDEX(revmap->rm_pagesPerRange, heapBlk); + + /* Lock the revmap page, obtain the index tuple pointer from it */ + brinLockRevmapPageForUpdate(revmap, heapBlk); + revmapPg = BufferGetPage(revmapBuf); + + contents = (RevmapContents *) PageGetContents(revmapPg); + iptr = contents->rm_tids; + iptr += revmapOffset; + + if (!ItemPointerIsValid(iptr)) + { + /* no index tuple: range not summarized, we're done */ + brinRevmapTerminate(revmap); + return true; + } + + regBuf = ReadBuffer(idxrel, ItemPointerGetBlockNumber(iptr)); + LockBuffer(regBuf, BUFFER_LOCK_EXCLUSIVE); + regPg = BufferGetPage(regBuf); + + /* if this is no longer a regular page, tell caller to start over */ + if (!BRIN_IS_REGULAR_PAGE(regPg)) + { + LockBuffer(revmapBuf, BUFFER_LOCK_UNLOCK); + LockBuffer(regBuf, BUFFER_LOCK_UNLOCK); + brinRevmapTerminate(revmap); + return false; + } + + regOffset = ItemPointerGetOffsetNumber(iptr); + if (regOffset > PageGetMaxOffsetNumber(regPg)) + ereport(ERROR, + (errcode(ERRCODE_INDEX_CORRUPTED), + errmsg("corrupted BRIN index: inconsistent range map"))); + + lp = PageGetItemId(regPg, regOffset); + if (!ItemIdIsUsed(lp)) + ereport(ERROR, + (errcode(ERRCODE_INDEX_CORRUPTED), + errmsg("corrupted BRIN index: inconsistent range map"))); + tup = (BrinTuple *) PageGetItem(regPg, lp); + /* XXX apply sanity checks? Might as well delete a bogus tuple ... */ + + /* + * We're only removing data, not reading it, so there's no need to + * TestForOldSnapshot here. + */ + + /* + * Because of SUE lock, this function shouldn't run concurrently with + * summarization. Placeholder tuples can only exist as leftovers from + * crashed summarization, so if we detect any, we complain but proceed. + */ + if (BrinTupleIsPlaceholder(tup)) + ereport(WARNING, + (errmsg("leftover placeholder tuple detected in BRIN index \"%s\", deleting", + RelationGetRelationName(idxrel)))); + + START_CRIT_SECTION(); + + ItemPointerSetInvalid(&invalidIptr); + brinSetHeapBlockItemptr(revmapBuf, revmap->rm_pagesPerRange, heapBlk, + invalidIptr); + PageIndexTupleDeleteNoCompact(regPg, regOffset); + /* XXX record free space in FSM? */ + + MarkBufferDirty(regBuf); + MarkBufferDirty(revmapBuf); + + if (RelationNeedsWAL(idxrel)) + { + xl_brin_desummarize xlrec; + XLogRecPtr recptr; + + xlrec.heapBlk = heapBlk; + xlrec.regOffset = regOffset; + + XLogBeginInsert(); + XLogRegisterData((char *) &xlrec, SizeOfBrinDesummarize); + XLogRegisterBuffer(0, revmapBuf, 0); + XLogRegisterBuffer(1, regBuf, REGBUF_STANDARD); + recptr = XLogInsert(RM_BRIN_ID, XLOG_BRIN_DESUMMARIZE); + PageSetLSN(revmapPg, recptr); + PageSetLSN(regPg, recptr); + } + + END_CRIT_SECTION(); + + UnlockReleaseBuffer(regBuf); + LockBuffer(revmapBuf, BUFFER_LOCK_UNLOCK); + brinRevmapTerminate(revmap); + + return true; +} + +/* * Given a heap block number, find the corresponding physical revmap block * number and return it. If the revmap page hasn't been allocated yet, return * InvalidBlockNumber. diff --git a/src/backend/access/brin/brin_xlog.c b/src/backend/access/brin/brin_xlog.c index f416bac..8f5b5ce 100644 --- a/src/backend/access/brin/brin_xlog.c +++ b/src/backend/access/brin/brin_xlog.c @@ -254,6 +254,46 @@ brin_xlog_revmap_extend(XLogReaderState *record) UnlockReleaseBuffer(metabuf); } +static void +brin_xlog_desummarize_page(XLogReaderState *record) +{ + XLogRecPtr lsn = record->EndRecPtr; + xl_brin_desummarize *xlrec; + Buffer buffer; + XLogRedoAction action; + + xlrec = (xl_brin_desummarize *) XLogRecGetData(record); + + /* Update the revmap */ + action = XLogReadBufferForRedo(record, 0, &buffer); + if (action == BLK_NEEDS_REDO) + { + ItemPointerData iptr; + + ItemPointerSetInvalid(&iptr); + brinSetHeapBlockItemptr(buffer, xlrec->pagesPerRange, xlrec->heapBlk, iptr); + + PageSetLSN(BufferGetPage(buffer), lsn); + MarkBufferDirty(buffer); + } + if (BufferIsValid(buffer)) + UnlockReleaseBuffer(buffer); + + /* remove the leftover entry from the regular page */ + action = XLogReadBufferForRedo(record, 1, &buffer); + if (action == BLK_NEEDS_REDO) + { + Page regPg = BufferGetPage(buffer); + + PageIndexTupleDeleteNoCompact(regPg, xlrec->regOffset); + + PageSetLSN(regPg, lsn); + MarkBufferDirty(buffer); + } + if (BufferIsValid(buffer)) + UnlockReleaseBuffer(buffer); +} + void brin_redo(XLogReaderState *record) { @@ -276,6 +316,9 @@ brin_redo(XLogReaderState *record) case XLOG_BRIN_REVMAP_EXTEND: brin_xlog_revmap_extend(record); break; + case XLOG_BRIN_DESUMMARIZE: + brin_xlog_desummarize_page(record); + break; default: elog(PANIC, "brin_redo: unknown op code %u", info); } diff --git a/src/backend/access/rmgrdesc/brindesc.c b/src/backend/access/rmgrdesc/brindesc.c index b58cb5b..8eb5275 100644 --- a/src/backend/access/rmgrdesc/brindesc.c +++ b/src/backend/access/rmgrdesc/brindesc.c @@ -61,6 +61,13 @@ brin_desc(StringInfo buf, XLogReaderState *record) appendStringInfo(buf, "targetBlk %u", xlrec->targetBlk); } + else if (info == XLOG_BRIN_DESUMMARIZE) + { + xl_brin_desummarize *xlrec = (xl_brin_desummarize *) rec; + + appendStringInfo(buf, "pagesPerRange %u, heapBlk %u, page offset %u", + xlrec->pagesPerRange, xlrec->heapBlk, xlrec->regOffset); + } } const char * @@ -91,6 +98,9 @@ brin_identify(uint8 info) case XLOG_BRIN_REVMAP_EXTEND: id = "REVMAP_EXTEND"; break; + case XLOG_BRIN_DESUMMARIZE: + id = "DESUMMARIZE"; + break; } return id; diff --git a/src/include/access/brin_revmap.h b/src/include/access/brin_revmap.h index 2ec4169..7fdcf87 100644 --- a/src/include/access/brin_revmap.h +++ b/src/include/access/brin_revmap.h @@ -36,5 +36,6 @@ extern void brinSetHeapBlockItemptr(Buffer rmbuf, BlockNumber pagesPerRange, extern BrinTuple *brinGetTupleForHeapBlock(BrinRevmap *revmap, BlockNumber heapBlk, Buffer *buf, OffsetNumber *off, Size *size, int mode, Snapshot snapshot); +extern bool brinRevmapDesummarizeRange(Relation idxrel, BlockNumber heapBlk); #endif /* BRIN_REVMAP_H */ diff --git a/src/include/access/brin_xlog.h b/src/include/access/brin_xlog.h index 33ceb34..89ed334 100644 --- a/src/include/access/brin_xlog.h +++ b/src/include/access/brin_xlog.h @@ -33,7 +33,7 @@ #define XLOG_BRIN_UPDATE 0x20 #define XLOG_BRIN_SAMEPAGE_UPDATE 0x30 #define XLOG_BRIN_REVMAP_EXTEND 0x40 -#define XLOG_BRIN_REVMAP_VACUUM 0x50 +#define XLOG_BRIN_DESUMMARIZE 0x50 #define XLOG_BRIN_OPMASK 0x70 /* @@ -124,6 +124,24 @@ typedef struct xl_brin_revmap_extend #define SizeOfBrinRevmapExtend (offsetof(xl_brin_revmap_extend, targetBlk) + \ sizeof(BlockNumber)) +/* + * This is what we need to know about a range de-summarization + * + * Backup block 0: revmap page + * Backup block 1: regular page + */ +typedef struct xl_brin_desummarize +{ + BlockNumber pagesPerRange; + /* page number location to set to invalid */ + OffsetNumber heapBlk; + /* offset of item to delete in regular index page */ + OffsetNumber regOffset; +} xl_brin_desummarize; + +#define SizeOfBrinDesummarize (offsetof(xl_brin_desummarize, regOffset) + \ + sizeof(OffsetNumber)) + extern void brin_redo(XLogReaderState *record); extern void brin_desc(StringInfo buf, XLogReaderState *record); diff --git a/src/include/catalog/pg_proc.h b/src/include/catalog/pg_proc.h index a4cc86d..e35dae5 100644 --- a/src/include/catalog/pg_proc.h +++ b/src/include/catalog/pg_proc.h @@ -564,6 +564,8 @@ DATA(insert OID = 335 ( brinhandler PGNSP PGUID 12 1 0 0 0 f f f f t f v s 1 0 DESCR("brin index access method handler"); DATA(insert OID = 3952 ( brin_summarize_new_values PGNSP PGUID 12 1 0 0 0 f f f f t f v s 1 0 23 "2205" _null_ _null_ _null_ _null_ _null_ brin_summarize_new_values _null_ _null_ _null_ )); DESCR("brin: standalone scan new table pages"); +DATA(insert OID = 4014 ( brin_desummarize_range PGNSP PGUID 12 1 0 0 0 f f f f t f v s 2 0 2278 "2205 20" _null_ _null_ _null_ _null_ _null_ brin_desummarize_range _null_ _null_ _null_ )); +DESCR("brin: desummarize page range"); DATA(insert OID = 338 ( amvalidate PGNSP PGUID 12 1 0 0 0 f f f f t f v s 1 0 16 "26" _null_ _null_ _null_ _null_ _null_ amvalidate _null_ _null_ _null_ )); DESCR("validate an operator class");