From 5f439f6047df8ae9a94b27b5a94c73699ea15cfb Mon Sep 17 00:00:00 2001 From: Haoyu Huang Date: Wed, 20 May 2026 21:37:55 +0000 Subject: [PATCH] Online resize of the shared buffer pool This patch adds pg_resize_shared_buffers(), a SQL-callable coordinator that resizes the shared buffer pool while the server is running. The work is composed of two halves: 1. Buffer-manager infrastructure: a two-water-mark scheme (lowNBuffers/highNBuffers) protected by AccessNBuffersLock, a new PROCSIGNAL_BARRIER_SHBUF_RESIZE barrier, the BEGIN/END_NBUFFERS_ACCESS macros for safe iteration over the buffer array, and three new primitives -- BufferManagerShmemShrink/Expand/InitBuffers -- that either madvise(MADV_REMOVE) memory away or madvise(MADV_POPULATE_WRITE) it back in. These changes live across buf_init.c, bufmgr.c, freelist.c and their headers. 2. The coordinator (buf_resize.c) implements the SQL-callable function pg_resize_shared_buffers(text) returning a (key, value, unit) record set of timing/byte metrics. Shrink lowers lowNBuffers, runs the PROCSIGNAL barrier, evicts the doomed range, then frees memory and advances highNBuffers. Expand allocates and initializes new descriptors before atomically advancing both water marks under AccessNBuffersLock. Two new GUCs (defined in guc_parameters.dat): * max_shared_buffers (PGC_POSTMASTER): upper bound on highNBuffers, sized once at startup. NBuffersGUC backs the existing shared_buffers GUC and captures the starting pool size. * enable_dynamic_shared_buffers (PGC_POSTMASTER): off by default; when off, all of the new code paths are no-ops and the server behaves as before. Patch rebased onto upstream master from the v18-based development branch. --- contrib/pg_buffercache/pg_buffercache_pages.c | 158 +++-- contrib/pg_prewarm/autoprewarm.c | 6 +- src/backend/access/hash/hash.c | 2 +- src/backend/access/heap/heapam.c | 2 +- src/backend/access/table/tableam.c | 2 +- src/backend/access/transam/slru.c | 2 +- src/backend/access/transam/xlog.c | 11 +- src/backend/postmaster/checkpointer.c | 84 ++- src/backend/storage/aio/aio_init.c | 2 +- src/backend/storage/buffer/Makefile | 2 + src/backend/storage/buffer/buf_init.c | 457 +++++++++++++-- src/backend/storage/buffer/buf_resize.c | 544 ++++++++++++++++++ src/backend/storage/buffer/bufmgr.c | 210 ++++++- .../storage/buffer/dynamic_shared_buffers.c | 125 ++++ src/backend/storage/buffer/freelist.c | 71 ++- src/backend/storage/buffer/meson.build | 2 + src/backend/storage/ipc/procsignal.c | 17 + .../utils/activity/wait_event_names.txt | 1 + src/backend/utils/init/globals.c | 4 +- src/backend/utils/misc/guc.c | 8 + src/backend/utils/misc/guc_parameters.dat | 20 +- src/include/catalog/pg_proc.dat | 9 + src/include/miscadmin.h | 18 +- src/include/storage/buf_internals.h | 5 +- src/include/storage/bufmgr.h | 13 +- src/include/storage/dynamic_shared_buffers.h | 103 ++++ src/include/storage/ipc.h | 41 ++ src/include/storage/lwlocklist.h | 1 + src/include/storage/procsignal.h | 2 + src/test/regress/expected/sysviews.out | 2 +- src/test/regress/sql/sysviews.sql | 2 +- src/tools/pgindent/typedefs.list | 1 + 32 files changed, 1763 insertions(+), 164 deletions(-) create mode 100644 src/backend/storage/buffer/buf_resize.c create mode 100644 src/backend/storage/buffer/dynamic_shared_buffers.c create mode 100644 src/include/storage/dynamic_shared_buffers.h diff --git a/contrib/pg_buffercache/pg_buffercache_pages.c b/contrib/pg_buffercache/pg_buffercache_pages.c index bf2e6c97220..905f14e0a04 100644 --- a/contrib/pg_buffercache/pg_buffercache_pages.c +++ b/contrib/pg_buffercache/pg_buffercache_pages.c @@ -15,6 +15,7 @@ #include "port/pg_numa.h" #include "storage/buf_internals.h" #include "storage/bufmgr.h" +#include "storage/ipc.h" #include "utils/rel.h" #include "utils/tuplestore.h" @@ -88,16 +89,84 @@ pg_buffercache_pages(PG_FUNCTION_ARGS) TupleDesc expected_tupledesc; int i; - /* - * To smoothly support upgrades from version 1.0 of this extension - * transparently handle the (non-)existence of the pinning_backends - * column. We unfortunately have to get the result type for that... - we - * can't use the result type determined by the function definition without - * potentially crashing when somebody uses the old (or even wrong) - * function definition though. - */ - if (get_call_result_type(fcinfo, NULL, &expected_tupledesc) != TYPEFUNC_COMPOSITE) - elog(ERROR, "return type must be a row type"); + if (SRF_IS_FIRSTCALL()) + { + int i; + BEGIN_NBUFFERS_ACCESS(localNBuffers); + + funcctx = SRF_FIRSTCALL_INIT(); + + /* Switch context when allocating stuff to be used in later calls */ + oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx); + + /* Create a user function context for cross-call persistence */ + fctx = (BufferCachePagesContext *) palloc(sizeof(BufferCachePagesContext)); + + /* + * To smoothly support upgrades from version 1.0 of this extension + * transparently handle the (non-)existence of the pinning_backends + * column. We unfortunately have to get the result type for that... - + * we can't use the result type determined by the function definition + * without potentially crashing when somebody uses the old (or even + * wrong) function definition though. + */ + if (get_call_result_type(fcinfo, NULL, &expected_tupledesc) != TYPEFUNC_COMPOSITE) + elog(ERROR, "return type must be a row type"); + + if (expected_tupledesc->natts < NUM_BUFFERCACHE_PAGES_MIN_ELEM || + expected_tupledesc->natts > NUM_BUFFERCACHE_PAGES_ELEM) + elog(ERROR, "incorrect number of output arguments"); + + /* Construct a tuple descriptor for the result rows. */ + tupledesc = CreateTemplateTupleDesc(expected_tupledesc->natts); + TupleDescInitEntry(tupledesc, (AttrNumber) 1, "bufferid", + INT4OID, -1, 0); + TupleDescInitEntry(tupledesc, (AttrNumber) 2, "relfilenode", + OIDOID, -1, 0); + TupleDescInitEntry(tupledesc, (AttrNumber) 3, "reltablespace", + OIDOID, -1, 0); + TupleDescInitEntry(tupledesc, (AttrNumber) 4, "reldatabase", + OIDOID, -1, 0); + TupleDescInitEntry(tupledesc, (AttrNumber) 5, "relforknumber", + INT2OID, -1, 0); + TupleDescInitEntry(tupledesc, (AttrNumber) 6, "relblocknumber", + INT8OID, -1, 0); + TupleDescInitEntry(tupledesc, (AttrNumber) 7, "isdirty", + BOOLOID, -1, 0); + TupleDescInitEntry(tupledesc, (AttrNumber) 8, "usage_count", + INT2OID, -1, 0); + + if (expected_tupledesc->natts == NUM_BUFFERCACHE_PAGES_ELEM) + TupleDescInitEntry(tupledesc, (AttrNumber) 9, "pinning_backends", + INT4OID, -1, 0); + + fctx->tupdesc = BlessTupleDesc(tupledesc); + + + /* Allocate NBuffers worth of BufferCachePagesRec records. */ + fctx->record = (BufferCachePagesRec *) + MemoryContextAllocHuge(CurrentMemoryContext, + sizeof(BufferCachePagesRec) * localNBuffers); + + /* Set max calls and remember the user function context. */ + funcctx->max_calls = localNBuffers; + funcctx->user_fctx = fctx; + + /* Return to original context when allocating transient memory */ + MemoryContextSwitchTo(oldcontext); + + /* + * Scan through all the buffers, saving the relevant fields in the + * fctx->record structure. + * + * We don't hold the partition locks, so we don't get a consistent + * snapshot across all buffers, but we do grab the buffer header + * locks, so the information of each buffer is self-consistent. + */ + for (i = 0; i < localNBuffers; i++) + { + BufferDesc *bufHdr; + uint32 buf_state; if (expected_tupledesc->natts < NUM_BUFFERCACHE_PAGES_MIN_ELEM || expected_tupledesc->natts > NUM_BUFFERCACHE_PAGES_ELEM) @@ -132,18 +201,10 @@ pg_buffercache_pages(PG_FUNCTION_ARGS) CHECK_FOR_INTERRUPTS(); - bufHdr = GetBufferDescriptor(i); - /* Lock each buffer header before inspecting. */ - buf_state = LockBufHdr(bufHdr); - - bufferid = BufferDescriptorGetBuffer(bufHdr); - relfilenumber = BufTagGetRelNumber(&bufHdr->tag); - reltablespace = bufHdr->tag.spcOid; - reldatabase = bufHdr->tag.dbOid; - forknum = BufTagGetForkNum(&bufHdr->tag); - blocknum = bufHdr->tag.blockNum; - usagecount = BUF_STATE_GET_USAGECOUNT(buf_state); - pinning_backends = BUF_STATE_GET_REFCOUNT(buf_state); + UnlockBufHdr(bufHdr, buf_state); + } + END_NBUFFERS_ACCESS(localNBuffers); + } if (buf_state & BM_DIRTY) isdirty = true; @@ -248,6 +309,7 @@ pg_buffercache_os_pages_internal(FunctionCallInfo fcinfo, bool include_numa) int max_entries; char *startptr, *endptr; + BEGIN_NBUFFERS_ACCESS(localNBuffers); /* If NUMA information is requested, initialize NUMA support. */ if (include_numa && pg_numa_init() == -1) @@ -278,7 +340,24 @@ pg_buffercache_os_pages_internal(FunctionCallInfo fcinfo, bool include_numa) */ Assert((os_page_size % BLCKSZ == 0) || (BLCKSZ % os_page_size == 0)); - if (include_numa) + /* + * How many addresses we are going to query? Simply get the page for + * the first buffer, and first page after the last buffer, and count + * the pages from that. + */ + startptr = (char *) TYPEALIGN_DOWN(os_page_size, + BufferGetBlock(1)); + endptr = (char *) TYPEALIGN(os_page_size, + (char *) BufferGetBlock(localNBuffers) + BLCKSZ); + os_page_count = (endptr - startptr) / os_page_size; + + /* Used to determine the NUMA node for all OS pages at once */ + os_page_ptrs = palloc0(sizeof(void *) * os_page_count); + os_page_status = palloc(sizeof(int) * os_page_count); + + /* Fill pointers for all the memory pages. */ + idx = 0; + for (char *ptr = startptr; ptr < endptr; ptr += os_page_size) { void **os_page_ptrs = NULL; @@ -315,8 +394,8 @@ pg_buffercache_os_pages_internal(FunctionCallInfo fcinfo, bool include_numa) Assert(idx == os_page_count); - elog(DEBUG1, "NUMA: NBuffers=%d os_page_count=" UINT64_FORMAT " " - "os_page_size=%zu", NBuffers, os_page_count, os_page_size); + elog(DEBUG1, "NUMA: NBuffers=%d os_page_count=" UINT64_FORMAT " " + "os_page_size=%zu", localNBuffers, os_page_count, os_page_size); /* * If we ever get 0xff back from kernel inquiry, then we probably @@ -366,7 +445,7 @@ pg_buffercache_os_pages_internal(FunctionCallInfo fcinfo, bool include_numa) * without reallocating memory. */ pages_per_buffer = Max(1, BLCKSZ / os_page_size) + 1; - max_entries = NBuffers * pages_per_buffer; + max_entries = localNBuffers * pages_per_buffer; /* Allocate entries for BufferCacheOsPagesRec records. */ fctx->record = (BufferCacheOsPagesRec *) @@ -386,10 +465,14 @@ pg_buffercache_os_pages_internal(FunctionCallInfo fcinfo, bool include_numa) * We don't hold the partition locks, so we don't get a consistent * snapshot across all buffers, but we do grab the buffer header * locks, so the information of each buffer is self-consistent. + * + * This loop touches and stores addresses into os_page_ptrs[] as input + * to one big move_pages(2) inquiry system call. Basically we ask for + * all memory pages for localNBuffers. */ startptr = (char *) TYPEALIGN_DOWN(os_page_size, (char *) BufferGetBlock(1)); idx = 0; - for (i = 0; i < NBuffers; i++) + for (i = 0; i < localNBuffers; i++) { char *buffptr = (char *) BufferGetBlock(i + 1); BufferDesc *bufHdr; @@ -440,9 +523,10 @@ pg_buffercache_os_pages_internal(FunctionCallInfo fcinfo, bool include_numa) funcctx->max_calls = idx; funcctx->user_fctx = fctx; - /* Remember this backend touched the pages (only relevant for NUMA) */ - if (include_numa) - firstNumaTouch = false; + /* Remember this backend touched the pages */ + firstNumaTouch = false; + + END_NBUFFERS_ACCESS(localNBuffers); } funcctx = SRF_PERCALL_SETUP(); @@ -531,11 +615,12 @@ pg_buffercache_summary(PG_FUNCTION_ARGS) int32 buffers_dirty = 0; int32 buffers_pinned = 0; int64 usagecount_total = 0; + BEGIN_NBUFFERS_ACCESS(localNBuffers); if (get_call_result_type(fcinfo, NULL, &tupledesc) != TYPEFUNC_COMPOSITE) elog(ERROR, "return type must be a row type"); - for (int i = 0; i < NBuffers; i++) + for (int i = 0; i < localNBuffers; i++) { BufferDesc *bufHdr; uint64 buf_state; @@ -565,6 +650,7 @@ pg_buffercache_summary(PG_FUNCTION_ARGS) if (BUF_STATE_GET_REFCOUNT(buf_state) > 0) buffers_pinned++; } + END_NBUFFERS_ACCESS(localNBuffers); memset(nulls, 0, sizeof(nulls)); values[0] = Int32GetDatum(buffers_used); @@ -593,10 +679,11 @@ pg_buffercache_usage_counts(PG_FUNCTION_ARGS) int pinned[BM_MAX_USAGE_COUNT + 1] = {0}; Datum values[NUM_BUFFERCACHE_USAGE_COUNTS_ELEM]; bool nulls[NUM_BUFFERCACHE_USAGE_COUNTS_ELEM] = {0}; + BEGIN_NBUFFERS_ACCESS(localNBuffers); InitMaterializedSRF(fcinfo, 0); - for (int i = 0; i < NBuffers; i++) + for (int i = 0; i < localNBuffers; i++) { BufferDesc *bufHdr = GetBufferDescriptor(i); uint64 buf_state = pg_atomic_read_u64(&bufHdr->state); @@ -613,6 +700,7 @@ pg_buffercache_usage_counts(PG_FUNCTION_ARGS) if (BUF_STATE_GET_REFCOUNT(buf_state) > 0) pinned[usage_count]++; } + END_NBUFFERS_ACCESS(localNBuffers); for (int i = 0; i < BM_MAX_USAGE_COUNT + 1; i++) { @@ -654,13 +742,15 @@ pg_buffercache_evict(PG_FUNCTION_ARGS) Buffer buf = PG_GETARG_INT32(0); bool buffer_flushed; + BEGIN_NBUFFERS_ACCESS(localNBuffers); + (void) localNBuffers; if (get_call_result_type(fcinfo, NULL, &tupledesc) != TYPEFUNC_COMPOSITE) elog(ERROR, "return type must be a row type"); pg_buffercache_superuser_check("pg_buffercache_evict"); - if (buf < 1 || buf > NBuffers) + if (buf < 1 || buf > GetLowNBuffers()) elog(ERROR, "bad buffer ID: %d", buf); values[0] = BoolGetDatum(EvictUnpinnedBuffer(buf, &buffer_flushed)); @@ -669,6 +759,8 @@ pg_buffercache_evict(PG_FUNCTION_ARGS) tuple = heap_form_tuple(tupledesc, values, nulls); result = HeapTupleGetDatum(tuple); + END_NBUFFERS_ACCESS(localNBuffers); + PG_RETURN_DATUM(result); } diff --git a/contrib/pg_prewarm/autoprewarm.c b/contrib/pg_prewarm/autoprewarm.c index ba0bc8e6d4a..b620c053dc6 100644 --- a/contrib/pg_prewarm/autoprewarm.c +++ b/contrib/pg_prewarm/autoprewarm.c @@ -672,6 +672,7 @@ apw_dump_now(bool is_bgworker, bool dump_unlogged) FILE *file; char transient_dump_file_path[MAXPGPATH]; pid_t pid; + BEGIN_NBUFFERS_ACCESS(localNBuffers); LWLockAcquire(&apw_state->lock, LW_EXCLUSIVE); pid = apw_state->pid_using_dumpfile; @@ -700,9 +701,9 @@ apw_dump_now(bool is_bgworker, bool dump_unlogged) * memory-efficient data structure.) */ block_info_array = (BlockInfoRecord *) - palloc_extended((sizeof(BlockInfoRecord) * NBuffers), MCXT_ALLOC_HUGE); + palloc_extended((sizeof(BlockInfoRecord) * localNBuffers), MCXT_ALLOC_HUGE); - for (num_blocks = 0, i = 0; i < NBuffers; i++) + for (num_blocks = 0, i = 0; i < localNBuffers; i++) { uint64 buf_state; @@ -733,6 +734,7 @@ apw_dump_now(bool is_bgworker, bool dump_unlogged) UnlockBufHdr(bufHdr); } + END_NBUFFERS_ACCESS(localNBuffers); snprintf(transient_dump_file_path, MAXPGPATH, "%s.tmp", AUTOPREWARM_FILE); file = AllocateFile(transient_dump_file_path, "w"); diff --git a/src/backend/access/hash/hash.c b/src/backend/access/hash/hash.c index 8d8cd30dc38..e43eca2bce1 100644 --- a/src/backend/access/hash/hash.c +++ b/src/backend/access/hash/hash.c @@ -176,7 +176,7 @@ hashbuild(Relation heap, Relation index, IndexInfo *indexInfo) */ sort_threshold = (maintenance_work_mem * (Size) 1024) / BLCKSZ; if (index->rd_rel->relpersistence != RELPERSISTENCE_TEMP) - sort_threshold = Min(sort_threshold, NBuffers); + sort_threshold = Min(sort_threshold, GetHighNBuffers()); else sort_threshold = Min(sort_threshold, NLocBuffer); diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c index abfd8e8970a..4d27d2b1817 100644 --- a/src/backend/access/heap/heapam.c +++ b/src/backend/access/heap/heapam.c @@ -394,7 +394,7 @@ initscan(HeapScanDesc scan, ScanKey key, bool keep_startblock) * if you change this, consider changing that one, too. */ if (!RelationUsesLocalBuffers(scan->rs_base.rs_rd) && - scan->rs_nblocks > NBuffers / 4) + scan->rs_nblocks > GetHighNBuffers() / 4) { allow_strat = (scan->rs_base.rs_flags & SO_ALLOW_STRAT) != 0; allow_sync = (scan->rs_base.rs_flags & SO_ALLOW_SYNC) != 0; diff --git a/src/backend/access/table/tableam.c b/src/backend/access/table/tableam.c index 68ff0966f1c..6afa9176174 100644 --- a/src/backend/access/table/tableam.c +++ b/src/backend/access/table/tableam.c @@ -420,7 +420,7 @@ table_block_parallelscan_initialize(Relation rel, ParallelTableScanDesc pscan) /* compare phs_syncscan initialization to similar logic in initscan */ bpscan->base.phs_syncscan = synchronize_seqscans && !RelationUsesLocalBuffers(rel) && - bpscan->phs_nblocks > NBuffers / 4; + bpscan->phs_nblocks > GetHighNBuffers() / 4; SpinLockInit(&bpscan->phs_mutex); bpscan->phs_startblock = InvalidBlockNumber; bpscan->phs_numblock = InvalidBlockNumber; diff --git a/src/backend/access/transam/slru.c b/src/backend/access/transam/slru.c index 47dd52d6749..a1fc6a4f7a0 100644 --- a/src/backend/access/transam/slru.c +++ b/src/backend/access/transam/slru.c @@ -236,7 +236,7 @@ SimpleLruAutotuneBuffers(int divisor, int max) { return Min(max - (max % SLRU_BANK_SIZE), Max(SLRU_BANK_SIZE, - NBuffers / divisor - (NBuffers / divisor) % SLRU_BANK_SIZE)); + GetMaxNBuffers() / divisor - (GetMaxNBuffers() / divisor) % SLRU_BANK_SIZE)); } /* diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index d34e34a56c5..cc67b27a780 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -5026,7 +5026,10 @@ XLOGChooseNumBuffers(void) { int xbuffers; - xbuffers = NBuffers / 32; + /* + * Use the maximum buffer pool size. + */ + xbuffers = GetMaxNBuffers() / 32; if (xbuffers > (wal_segment_size / XLOG_BLCKSZ)) xbuffers = (wal_segment_size / XLOG_BLCKSZ); if (xbuffers < 8) @@ -7242,7 +7245,7 @@ LogCheckpointEnd(bool restartpoint, int flags) "estimate=%d kB; lsn=%X/%08X, redo lsn=%X/%08X", CheckpointFlagsString(flags), CheckpointStats.ckpt_bufs_written, - (double) CheckpointStats.ckpt_bufs_written * 100 / NBuffers, + (double) CheckpointStats.ckpt_bufs_written * 100 / GetHighNBuffers(), CheckpointStats.ckpt_slru_written, CheckpointStats.ckpt_segs_added, CheckpointStats.ckpt_segs_removed, @@ -7267,7 +7270,7 @@ LogCheckpointEnd(bool restartpoint, int flags) "estimate=%d kB; lsn=%X/%08X, redo lsn=%X/%08X", CheckpointFlagsString(flags), CheckpointStats.ckpt_bufs_written, - (double) CheckpointStats.ckpt_bufs_written * 100 / NBuffers, + (double) CheckpointStats.ckpt_bufs_written * 100 / GetHighNBuffers(), CheckpointStats.ckpt_slru_written, CheckpointStats.ckpt_segs_added, CheckpointStats.ckpt_segs_removed, @@ -7885,7 +7888,7 @@ CreateCheckPoint(int flags) update_checkpoint_display(flags, false, true); TRACE_POSTGRESQL_CHECKPOINT_DONE(CheckpointStats.ckpt_bufs_written, - NBuffers, + GetHighNBuffers(), CheckpointStats.ckpt_segs_added, CheckpointStats.ckpt_segs_removed, CheckpointStats.ckpt_segs_recycled); diff --git a/src/backend/postmaster/checkpointer.c b/src/backend/postmaster/checkpointer.c index 087120db090..bbbc6c2f95d 100644 --- a/src/backend/postmaster/checkpointer.c +++ b/src/backend/postmaster/checkpointer.c @@ -161,6 +161,15 @@ const ShmemCallbacks CheckpointerShmemCallbacks = { /* Max number of requests the checkpointer request queue can hold */ #define MAX_CHECKPOINT_REQUESTS 10000000 +/* + * Queue size used under dynamic_shared_buffers. Local-file fsyncs are + * bypassed in ForwardSyncRequest under DSB (Neon's durability is the WAL + * stream), so the queue does not need to scale with the buffer pool. But + * we still need a real queue so SYNC_UNLINK_REQUEST is not silently + * dropped. + */ +#define DSB_CHECKPOINT_REQUESTS 4096 + /* * GUC parameters */ @@ -967,19 +976,29 @@ CheckpointerShmemRequest(void *arg) { Size size; + size = offsetof(CheckpointerShmemStruct, requests); + /* - * The size of the requests[] array is arbitrarily set equal to NBuffers. - * But there is a cap of MAX_CHECKPOINT_REQUESTS to prevent accumulating - * too many checkpoint requests in the ring buffer. + * The size of the requests[] array is arbitrarily set equal to the + * initial size of buffer pool. But there is a cap of + * MAX_CHECKPOINT_REQUESTS to prevent accumulating too many checkpoint + * requests in the ring buffer. + * + * Under dynamic_shared_buffers we use a small fixed cap instead -- + * sizing the queue on MaxNBuffers would waste a lot of shmem under + * auto-scale, but a real (non-zero) queue is still required so that + * SYNC_UNLINK_REQUEST can be forwarded to the checkpointer for delayed + * unlink processing. */ - size = offsetof(CheckpointerShmemStruct, requests); - size = add_size(size, mul_size(Min(NBuffers, - MAX_CHECKPOINT_REQUESTS), - sizeof(CheckpointerRequest))); - ShmemRequestStruct(.name = "Checkpointer Data", - .size = size, - .ptr = (void **) &CheckpointerShmem, - ); + if (enable_dynamic_shared_buffers) + size = add_size(size, mul_size(DSB_CHECKPOINT_REQUESTS, + sizeof(CheckpointerRequest))); + else + size = add_size(size, mul_size(Min(NBuffersGUC, + MAX_CHECKPOINT_REQUESTS), + sizeof(CheckpointerRequest))); + + return size; } /* @@ -1010,27 +1029,21 @@ ExecCheckpoint(ParseState *pstate, CheckPointStmt *stmt) foreach_ptr(DefElem, opt, stmt->options) { - if (strcmp(opt->defname, "mode") == 0) - { - char *mode = defGetString(opt); - - if (strcmp(mode, "spread") == 0) - fast = false; - else if (strcmp(mode, "fast") != 0) - ereport(ERROR, - (errcode(ERRCODE_SYNTAX_ERROR), - errmsg("unrecognized value for %s option \"%s\": \"%s\"", - "CHECKPOINT", "mode", mode), - parser_errposition(pstate, opt->location))); - } - else if (strcmp(opt->defname, "flush_unlogged") == 0) - unlogged = defGetBoolean(opt); + /* + * First time through, so initialize. Note that we zero the whole + * requests array; this is so that CompactCheckpointerRequestQueue can + * assume that any pad bytes in the request structs are zeroes. + */ + MemSet(CheckpointerShmem, 0, size); + SpinLockInit(&CheckpointerShmem->ckpt_lck); + + if (enable_dynamic_shared_buffers) + CheckpointerShmem->max_requests = DSB_CHECKPOINT_REQUESTS; else - ereport(ERROR, - (errcode(ERRCODE_SYNTAX_ERROR), - errmsg("unrecognized %s option \"%s\"", - "CHECKPOINT", opt->defname), - parser_errposition(pstate, opt->location))); + CheckpointerShmem->max_requests = Min(NBuffersGUC, + MAX_CHECKPOINT_REQUESTS); + ConditionVariableInit(&CheckpointerShmem->start_cv); + ConditionVariableInit(&CheckpointerShmem->done_cv); } if (!has_privs_of_role(GetUserId(), ROLE_PG_CHECKPOINT)) @@ -1228,6 +1241,15 @@ ForwardSyncRequest(const FileTag *ftag, SyncRequestType type) if (AmCheckpointerProcess()) elog(ERROR, "ForwardSyncRequest must not be called in checkpointer"); + /* + * Queue unlinks and let the checkpointer drain them. + * + * Neon durability is provided by the WAL stream. + * SYNC_FORGET_REQUEST/SYNC_FILTER_REQUEST/SYNC_REQUEST can be dropped. + */ + if (enable_dynamic_shared_buffers && type != SYNC_UNLINK_REQUEST) + return true; + LWLockAcquire(CheckpointerCommLock, LW_EXCLUSIVE); /* diff --git a/src/backend/storage/aio/aio_init.c b/src/backend/storage/aio/aio_init.c index da30d792a88..c6b4bf975e3 100644 --- a/src/backend/storage/aio/aio_init.c +++ b/src/backend/storage/aio/aio_init.c @@ -109,7 +109,7 @@ AioChooseMaxConcurrency(void) /* Similar logic to LimitAdditionalPins() */ max_backends = MaxBackends + NUM_AUXILIARY_PROCS; - max_proportional_pins = NBuffers / max_backends; + max_proportional_pins = GetMaxNBuffers() / max_backends; max_proportional_pins = Max(max_proportional_pins, 1); diff --git a/src/backend/storage/buffer/Makefile b/src/backend/storage/buffer/Makefile index fd7c40dcb08..2567f32e131 100644 --- a/src/backend/storage/buffer/Makefile +++ b/src/backend/storage/buffer/Makefile @@ -14,8 +14,10 @@ include $(top_builddir)/src/Makefile.global OBJS = \ buf_init.o \ + buf_resize.o \ buf_table.o \ bufmgr.o \ + dynamic_shared_buffers.o \ freelist.o \ localbuf.o diff --git a/src/backend/storage/buffer/buf_init.c b/src/backend/storage/buffer/buf_init.c index 1407c930c56..757c00d03d6 100644 --- a/src/backend/storage/buffer/buf_init.c +++ b/src/backend/storage/buffer/buf_init.c @@ -14,12 +14,18 @@ */ #include "postgres.h" +#include +#ifdef __linux__ +#include +#endif + +#include "miscadmin.h" #include "storage/aio.h" #include "storage/buf_internals.h" #include "storage/bufmgr.h" -#include "storage/proclist.h" +#include "storage/pg_shmem.h" #include "storage/shmem.h" -#include "storage/subsystems.h" +#include "utils/memdebug.h" BufferDescPadded *BufferDescriptors; char *BufferBlocks; @@ -69,6 +75,208 @@ const ShmemCallbacks BufferManagerShmemCallbacks = { * multiple times. Check the PrivateRefCount infrastructure in bufmgr.c. */ +/* + * Initialize a single buffer descriptor. + * + * Buffers are exclusively found via clock sweep (the freelist was removed + * in commit 2c789405275). This function is called both from + * BufferManagerShmemInit at boot and from BufferManagerShmemInitBuffers + * during an online expand. + */ +static void +InitializeBuffer(int buf_id) +{ + BufferDesc *buf = GetBufferDescriptor(buf_id); + + ClearBufferTag(&buf->tag); + pg_atomic_init_u32(&buf->state, 0); + buf->wait_backend_pgprocno = INVALID_PROC_NUMBER; + buf->buf_id = buf_id; + pgaio_wref_clear(&buf->io_wref); + + LWLockInitialize(BufferDescriptorGetContentLock(buf), + LWTRANCHE_BUFFER_CONTENT); + + ConditionVariableInit(BufferDescriptorGetIOCV(buf)); +} + +/* + * Page size used both to lay out the buffer-pool arrays in shared memory and + * to align the per-slice madvise() ranges issued during expand/shrink. + */ +static Size +buffer_pool_madvise_alignment(void) +{ +#ifdef __linux__ + if (huge_pages == HUGE_PAGES_ON) + { + Size hugepagesize = 0; + + GetHugePageSize(&hugepagesize, NULL); + if (hugepagesize > 0) + return hugepagesize; + /* Conservative fallback if /proc/meminfo lookup failed. */ + return (Size) 2 * 1024 * 1024; + } +#endif + return (Size) sysconf(_SC_PAGESIZE); +} + +/* + * Return the exact byte length of the expanded range [lowNBuffers, + * highNBuffers) that this call touches (memset / madvise), or 0 when a no-op + * or on platforms where the Linux path is not used. On + * madvise(MADV_POPULATE_WRITE) failure, *success is set to false and 0 is + * returned; the new range is not guaranteed to be backed by physical memory, + * so callers should stop expanding rather than continue and risk a SIGBUS on + * first touch. + */ +static Size +BufferPoolArrayPhysicalExpand(void *baseptr, Size elem_size, + int lowNBuffers, int highNBuffers, + bool *success) +{ +#ifdef __linux__ + char *base; + Size off; + Size len; + uintptr_t region_start; + uintptr_t region_end; + uintptr_t ms; + uintptr_t me; + Size os_page_size = buffer_pool_madvise_alignment(); +#endif + + if (baseptr == NULL || elem_size == 0 || highNBuffers <= lowNBuffers) + return 0; + +#ifdef __linux__ + base = (char *) baseptr; + Assert(os_page_size != 0); + + off = mul_size((Size) lowNBuffers, elem_size); + len = mul_size((Size) (highNBuffers - lowNBuffers), elem_size); + + region_start = (uintptr_t) (base + off); + region_end = region_start + len; + + ms = TYPEALIGN_DOWN((Size) os_page_size, region_start); + me = TYPEALIGN((Size) os_page_size, region_end); + +#ifdef USE_VALGRIND + VALGRIND_MAKE_MEM_DEFINED((void *) region_start, len); +#endif + +#if defined(MADV_HUGEPAGE) && defined(MADV_POPULATE_WRITE) +#ifdef USE_ASSERT_CHECKING + if (mprotect((void *) ms, me - ms, PROT_READ | PROT_WRITE) < 0 && errno != ENOMEM) + elog(WARNING, "mprotect(PROT_READ|PROT_WRITE) before buffer pool expand: %m"); +#endif + /* + * If huge pages is on, MADV_HUGEPAGE advice will fail. + */ + if (huge_pages_status != HUGE_PAGES_ON && + madvise((void *) ms, me - ms, MADV_HUGEPAGE) < 0) + elog(WARNING, "madvise(MADV_HUGEPAGE) on expanded buffer pool array: %m"); + + if (madvise((void *) ms, me - ms, MADV_POPULATE_WRITE) < 0) + { + elog(WARNING, "madvise(MADV_POPULATE_WRITE) on expanded buffer pool array: %m"); + *success = false; + return 0; + } +#else + /* + * No MADV_POPULATE_WRITE on this platform: memset is the only way to + * force population. memset can't return a failure, so this path always + * "succeeds"; if the underlying mapping is actually unbacked the SIGBUS + * will hit during the memset itself. + */ + memset((void *) region_start, 0, len); +#endif + + return len; +#else + return 0; /* no local physical work off Linux */ +#endif +} + +/* + * Return the exact byte length passed to a successful MADV_REMOVE, or 0 if no + * page-aligned run was freed (no-op case). On madvise() failure, *success is + * set to false and 0 is returned. + * + * The released slice is [lowNBuffers, highNBuffers); we trim physical storage + * for the entire inactive tail [lowNBuffers, MaxNBuffers) so that incremental + * shrinks don't strand page-aligned spans above highNBuffers (see comment + * inside). + */ +static Size +BufferPoolArrayPhysicalShrink(void *baseptr, Size elem_size, + int lowNBuffers, int highNBuffers, + bool *success) +{ +#ifdef __linux__ + char *base; + Size off; + Size tail_len; + Size logical_len; + uintptr_t region_start; + uintptr_t region_end; + uintptr_t ms; + uintptr_t me; + Size os_page_size = buffer_pool_madvise_alignment(); +#endif + + if (baseptr == NULL || elem_size == 0 || lowNBuffers >= highNBuffers) + return 0; + +#ifdef __linux__ + base = (char *) baseptr; + Assert(os_page_size != 0); + + off = mul_size((Size) lowNBuffers, elem_size); + /* See function header: tail spans up to MaxNBuffers, not highNBuffers. */ + tail_len = mul_size((Size) (GetMaxNBuffers() - lowNBuffers), elem_size); + logical_len = mul_size((Size) (highNBuffers - lowNBuffers), elem_size); + + region_start = (uintptr_t) (base + off); + region_end = region_start + tail_len; + + /* + * MADV_REMOVE requires a page-aligned address and a multiple of the page + * size for length. Only full pages wholly inside the released logical + * range can be trimmed. + */ + ms = TYPEALIGN((Size) os_page_size, region_start); + me = TYPEALIGN_DOWN((Size) os_page_size, region_end); + + if (ms >= me) + return 0; + + if (madvise((void *) ms, me - ms, MADV_REMOVE) < 0) + { + elog(WARNING, "madvise(MADV_REMOVE) on buffer pool array tail failed: %m"); + *success = false; + return 0; + } + +#ifdef USE_VALGRIND + VALGRIND_MAKE_MEM_NOACCESS((void *) ms, me - ms); +#endif +#ifdef USE_ASSERT_CHECKING + /* + * Catch stray reads/writes after shrink. + */ + if (mprotect((void *) ms, me - ms, PROT_NONE) < 0 && errno != ENOMEM) + elog(WARNING, "mprotect(PROT_NONE) on buffer pool array tail: %m"); +#endif + + return logical_len; +#else + return 0; /* no local physical work off Linux */ +#endif +} /* * Register shared memory area for the buffer pool. @@ -76,26 +284,52 @@ const ShmemCallbacks BufferManagerShmemCallbacks = { static void BufferManagerShmemRequest(void *arg) { - ShmemRequestStruct(.name = "Buffer Descriptors", - .size = NBuffers * sizeof(BufferDescPadded), - /* Align descriptors to a cacheline boundary. */ - .alignment = PG_CACHE_LINE_SIZE, - .ptr = (void **) &BufferDescriptors, - ); + bool foundBufs, + foundDescs, + foundIOCV, + foundBufCkpt; + int max_nbuffers; + Size os_page_size = buffer_pool_madvise_alignment(); + Assert(os_page_size != 0); + + if (enable_dynamic_shared_buffers) + { + if (MaxNBuffers == 0) + ereport(FATAL, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("max_shared_buffers must be set when enable_dynamic_shared_buffers is on"), + errhint("Set max_shared_buffers to a value at least as large as shared_buffers."))); + if (MaxNBuffers < NBuffersGUC) + ereport(FATAL, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("max_shared_buffers (%d) must be at least shared_buffers (%d) when enable_dynamic_shared_buffers is on", + MaxNBuffers, NBuffersGUC))); + } + + max_nbuffers = GetMaxNBuffers(); + + /* Align descriptors for madvise (same granularity as buffer blocks). */ + BufferDescriptors = (BufferDescPadded *) + TYPEALIGN(os_page_size, + ShmemInitStruct("Buffer Descriptors", + max_nbuffers * sizeof(BufferDescPadded) + 2 * os_page_size, + &foundDescs)); ShmemRequestStruct(.name = "Buffer Blocks", .size = NBuffers * (Size) BLCKSZ, /* Align buffer pool on IO page size boundary. */ - .alignment = PG_IO_ALIGN_SIZE, - .ptr = (void **) &BufferBlocks, - ); + BufferBlocks = (char *) + TYPEALIGN(os_page_size, + ShmemInitStruct("Buffer Blocks", + max_nbuffers * (Size) BLCKSZ + 2 * os_page_size, + &foundBufs)); - ShmemRequestStruct(.name = "Buffer IO Condition Variables", - .size = NBuffers * sizeof(ConditionVariableMinimallyPadded), - /* Align descriptors to a cacheline boundary. */ - .alignment = PG_CACHE_LINE_SIZE, - .ptr = (void **) &BufferIOCVArray, - ); + /* Align I/O condition variables for madvise. */ + BufferIOCVArray = (ConditionVariableMinimallyPadded *) + TYPEALIGN(os_page_size, + ShmemInitStruct("Buffer IO Condition Variables", + max_nbuffers * sizeof(ConditionVariableMinimallyPadded) + 2 * os_page_size, + &foundIOCV)); /* * The array used to sort to-be-checkpointed buffer ids is located in @@ -104,11 +338,11 @@ BufferManagerShmemRequest(void *arg) * the checkpointer is restarted, memory allocation failures would be * painful. */ - ShmemRequestStruct(.name = "Checkpoint BufferIds", - .size = NBuffers * sizeof(CkptSortItem), - .ptr = (void **) &CkptBufferIds, - ); -} + CkptBufferIds = (CkptSortItem *) + TYPEALIGN(os_page_size, + ShmemInitStruct("Checkpoint BufferIds", + max_nbuffers * sizeof(CkptSortItem) + 2 * os_page_size, + &foundBufCkpt)); /* * Initialize shared buffer pool @@ -124,30 +358,179 @@ BufferManagerShmemInit(void *arg) */ for (int i = 0; i < NBuffers; i++) { - BufferDesc *buf = GetBufferDescriptor(i); - - ClearBufferTag(&buf->tag); + int i; - pg_atomic_init_u64(&buf->state, 0); - buf->wait_backend_pgprocno = INVALID_PROC_NUMBER; + if (enable_dynamic_shared_buffers) + { + bool success = true; - buf->buf_id = i; + /* + * Request physical memory for NBuffersGUC. A madvise failure + * here means we cannot eagerly populate the initial buffer + * pool; rather than start with possibly-unbacked memory and + * SIGBUS on first access, we PANIC so the postmaster fails + * to start cleanly. + */ + BufferManagerShmemExpand(0, NBuffersGUC, &success); + if (!success) + elog(PANIC, "could not populate initial shared buffer pool: madvise(MADV_POPULATE_WRITE) failed"); + } - pgaio_wref_clear(&buf->io_wref); - - proclist_init(&buf->lock_waiters); - ConditionVariableInit(BufferDescriptorGetIOCV(buf)); + /* + * Initialize all the buffer headers for the active pool size. + * The clock sweep is the sole replacement mechanism, so there is + * no freelist to link them into. + */ + for (i = 0; i < NBuffersGUC; i++) + InitializeBuffer(i); } /* Initialize per-backend file flush context */ WritebackContextInit(&BackendWritebackContext, &backend_flush_after); + + /* + * Initialize the DSB water marks. DSBCtrl is NULL in special contexts + * such as the WAL redo process, where DSB is not used. + */ + if (DSBCtrl != NULL && !foundDescs) + { + pg_atomic_write_u32(&DSBCtrl->lowNBuffers, NBuffersGUC); + pg_atomic_write_u32(&DSBCtrl->highNBuffers, NBuffersGUC); + } } -static void -BufferManagerShmemAttach(void *arg) +/* + * BufferManagerShmemSize + * + * All buffer arrays are allocated in the single shared-memory heap. We size + * them to GetMaxNBuffers() so the pool fits its upper bound. + */ +Size +BufferManagerShmemSize(void) { - /* Initialize per-backend file flush context */ - WritebackContextInit(&BackendWritebackContext, - &backend_flush_after); + Size size = 0; + Size os_page_size = buffer_pool_madvise_alignment(); + int max_nbuffers = GetMaxNBuffers(); + Assert(os_page_size != 0); + + /* size of buffer descriptors, plus alignment padding for madvise */ + size = add_size(size, mul_size(max_nbuffers, sizeof(BufferDescPadded))); + size = add_size(size, mul_size(2, os_page_size)); + + /* size of data pages, plus alignment padding */ + size = add_size(size, mul_size(2, os_page_size)); + size = add_size(size, mul_size(max_nbuffers, (Size) BLCKSZ)); + + /* size of stuff controlled by freelist.c */ + size = add_size(size, StrategyShmemSize()); + + /* size of I/O condition variables, plus alignment padding for madvise */ + size = add_size(size, mul_size(max_nbuffers, + sizeof(ConditionVariableMinimallyPadded))); + size = add_size(size, mul_size(2, os_page_size)); + + /* size of checkpoint sort array in bufmgr.c, plus alignment padding */ + size = add_size(size, mul_size(max_nbuffers, sizeof(CkptSortItem))); + size = add_size(size, mul_size(2, os_page_size)); + + return size; +} + +/* + * Allocate backing memory pages from the OS for the buffer-pool slice + * [lowNBuffers, highNBuffers). + * + * Returns the sum, over the four buffer-pool arrays, of the exact byte length + * each BufferPoolArrayPhysicalExpand call touched. *success is set to true on + * success, or false if any madvise(MADV_POPULATE_WRITE) call failed. + */ +Size +BufferManagerShmemExpand(int lowNBuffers, int highNBuffers, bool *success) +{ + int max_nbuffers = GetMaxNBuffers(); + Size total = 0; + + if (highNBuffers > max_nbuffers) + elog(PANIC, "buffer pool expand exceeds allocation (low=%d high=%d allocated=%d)", + lowNBuffers, highNBuffers, max_nbuffers); + + Assert(lowNBuffers < highNBuffers); + + *success = true; + + total = add_size(total, BufferPoolArrayPhysicalExpand(BufferDescriptors, sizeof(BufferDescPadded), + lowNBuffers, highNBuffers, success)); + if (!*success) + return total; + + total = add_size(total, BufferPoolArrayPhysicalExpand(BufferBlocks, (Size) BLCKSZ, + lowNBuffers, highNBuffers, success)); + if (!*success) + return total; + + total = add_size(total, BufferPoolArrayPhysicalExpand(BufferIOCVArray, + sizeof(ConditionVariableMinimallyPadded), + lowNBuffers, highNBuffers, success)); + if (!*success) + return total; + + total = add_size(total, BufferPoolArrayPhysicalExpand(CkptBufferIds, sizeof(CkptSortItem), + lowNBuffers, highNBuffers, success)); + return total; +} + +void +BufferManagerShmemInitBuffers(int lowNBuffers, int highNBuffers) +{ + int i; + + /* Clock sweep will pick up the new buffers; nothing else to do. */ + for (i = lowNBuffers; i < highNBuffers; i++) + InitializeBuffer(i); +} + +/* + * Release the buffer-pool slice [lowNBuffers, highNBuffers) back to the OS + * when shrinking the pool. + * + * Returns the sum, over the four buffer-pool arrays, of the exact byte length + * each BufferPoolArrayPhysicalShrink call touched. *success is set to true on + * success, or false if any madvise() call failed; on failure we stop after the + * failing array (so total reflects only the arrays that were fully released). + */ +Size +BufferManagerShmemShrink(int lowNBuffers, int highNBuffers, bool *success) +{ + int max_nbuffers = GetMaxNBuffers(); + Size total = 0; + + if (highNBuffers > max_nbuffers) + elog(PANIC, "buffer pool shrink exceeds allocation (low=%d high=%d allocated=%d)", + lowNBuffers, highNBuffers, max_nbuffers); + + Assert(lowNBuffers < highNBuffers); + + *success = true; + + total = add_size(total, BufferPoolArrayPhysicalShrink(BufferDescriptors, sizeof(BufferDescPadded), + lowNBuffers, highNBuffers, success)); + if (!*success) + return total; + + total = add_size(total, BufferPoolArrayPhysicalShrink(BufferBlocks, (Size) BLCKSZ, + lowNBuffers, highNBuffers, success)); + if (!*success) + return total; + + total = add_size(total, BufferPoolArrayPhysicalShrink(BufferIOCVArray, + sizeof(ConditionVariableMinimallyPadded), + lowNBuffers, highNBuffers, success)); + if (!*success) + return total; + + total = add_size(total, BufferPoolArrayPhysicalShrink(CkptBufferIds, sizeof(CkptSortItem), + lowNBuffers, highNBuffers, success)); + + return total; } diff --git a/src/backend/storage/buffer/buf_resize.c b/src/backend/storage/buffer/buf_resize.c new file mode 100644 index 00000000000..afb5268f636 --- /dev/null +++ b/src/backend/storage/buffer/buf_resize.c @@ -0,0 +1,544 @@ +/*------------------------------------------------------------------------- + * + * buf_resize.c + * Online resize coordinator for the shared buffer pool. + * + * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * IDENTIFICATION + * src/backend/storage/buffer/buf_resize.c + * + *------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include +#include + +#include "fmgr.h" +#include "funcapi.h" +#include "miscadmin.h" +#include "portability/instr_time.h" +#include "postmaster/bgwriter.h" +#include "postmaster/interrupt.h" +#include "storage/buf_internals.h" +#include "storage/bufmgr.h" +#include "storage/dynamic_shared_buffers.h" +#include "storage/ipc.h" +#include "storage/latch.h" +#include "storage/lwlock.h" +#include "storage/pg_shmem.h" +#include "storage/pmsignal.h" +#include "storage/procsignal.h" +#include "storage/shmem.h" +#include "utils/acl.h" +#include "utils/builtins.h" +#include "utils/guc.h" +#include "utils/injection_point.h" + +PG_FUNCTION_INFO_V1(pg_resize_shared_buffers); + +/* + * `coordinator_active` tells the cleanup callback whether *this* backend + * currently holds the resize_in_progress flag. + * + * `cleanup_registered` ensures we only call before_shmem_exit() once per + * backend lifetime. + * + * `inflight_expand_target` is non-zero when DoExpand starts. The cleanup + * callback uses it to surface a WARNING if an expand was interrupted. + */ +static volatile bool coordinator_active = false; +static volatile bool cleanup_registered = false; +static volatile int inflight_expand_target = 0; + +/* + * Emit a (key, value, unit) tuple to the function's result set. If value_null + * is true, value and unit are emitted as NULL. + * + * Used to return tuples from the pg_resize_shared_buffers() function. The + * tupledesc of the returned rows must match the function's OUT arguments. + */ +static void +EmitResizeMetricRow(ReturnSetInfo *rsinfo, const char *key, double value, + const char *unit, bool value_null) +{ + Datum values[3]; + bool nulls[3]; + + values[0] = CStringGetTextDatum(key); + nulls[0] = false; + if (value_null) + { + nulls[1] = true; + values[1] = (Datum) 0; + nulls[2] = true; + values[2] = (Datum) 0; + } + else + { + nulls[1] = false; + nulls[2] = false; + values[1] = Float8GetDatum(value); + values[2] = CStringGetTextDatum(unit); + } + tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc, values, nulls); +} + +/* Rounded timing row; value is in seconds, unit is "seconds". */ +static void +EmitResizeTimeRow(ReturnSetInfo *rsinfo, const char *key, double elapsed_sec) +{ + double t = round(elapsed_sec * 100.0) / 100.0; + + EmitResizeMetricRow(rsinfo, key, t, "seconds", false); +} + +static void +EmitResizeBytesRow(ReturnSetInfo *rsinfo, const char *key, double bytes) +{ + EmitResizeMetricRow(rsinfo, key, bytes, "bytes", false); +} + +static void +SharedBufferResizeBarrier(ProcSignalBarrierType barrier, const char *barrier_name) +{ + WaitForProcSignalBarrier(EmitProcSignalBarrier(barrier)); + elog(LOG, "all backends acknowledged %s barrier", barrier_name); +} + +/* + * Parse a user-supplied size string (e.g. "256MB", "32768") into a number of + * shared buffers. Raises ERROR on invalid input or out-of-range size. + */ +static int +ParseNewSize(const char *new_size_str) +{ + const char *hintmsg = NULL; + int new_size; + + if (!parse_int(new_size_str, &new_size, GUC_UNIT_BLOCKS, &hintmsg)) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("invalid value for shared_buffers: \"%s\"", new_size_str), + hintmsg ? errhint("%s", _(hintmsg)) : 0)); + + if (new_size < MIN_SHARED_BUFFERS) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("shared_buffers must be at least %d, got %d", + MIN_SHARED_BUFFERS, new_size))); + + if (new_size > GetMaxNBuffers()) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("shared_buffers (%d) cannot exceed max_shared_buffers (%d)", + new_size, GetMaxNBuffers()))); + + return new_size; +} + +/* + * Sleep up to `timeout_ms` milliseconds. + */ +static void +ResizeWaitMs(int timeout_ms) +{ + int rc; + + rc = WaitLatch(MyLatch, + WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH, + timeout_ms, + WAIT_EVENT_PG_SLEEP); + if (rc & WL_LATCH_SET) + ResetLatch(MyLatch); + CHECK_FOR_INTERRUPTS(); +} + +/* + * Shrink protocol: lower lowNBuffers first to restrict allocations, evict + * the [low, high) range, then drop highNBuffers to lowNBuffers, and only then + * release the OS-level memory backing that range. + * + * - pre: lowNBuffers == highNBuffers == old_size > new_size + * - post (success): + * lowNBuffers == highNBuffers == new_size + * - post (interrupted before highNBuffers is lowered): + * lowNBuffers == new_size, highNBuffers == old_size + * (recoverable: ResetResizeInProgress() rolls lowNBuffers back to high) + * - post (madvise(MADV_REMOVE) failure during BufferManagerShmemShrink): + * lowNBuffers == highNBuffers == new_size. We cannot roll back the shrink. + * + * Raises ERROR on unrecoverable failure. + */ +static void +DoShrink(ReturnSetInfo *rsinfo, int old_size, int new_size) +{ + instr_time phase_start; + instr_time phase_end; + Size mem_bytes; + bool shrink_success; + + Assert(new_size < old_size); + Assert(pg_atomic_read_u32(&DSBCtrl->lowNBuffers) == + pg_atomic_read_u32(&DSBCtrl->highNBuffers)); + + CHECK_FOR_INTERRUPTS(); + + /* + * Reset the clock-sweep cursor before lowering the low water mark. The + * existing cursor may point above new_size. Once we publish the new + * lowNBuffers, ClockSweepTick() may otherwise immediately wrap past + * the new buffers via modulo arithmetic. Resetting to 0 means the + * next sweep starts from the bottom of the surviving range. + */ + StrategyReset(old_size, new_size); + + elog(LOG, "[Shrink Barrier]: restricting allocations to %d buffers", new_size); + INSTR_TIME_SET_CURRENT(phase_start); + /* + * Wait for all backends to acknowledge the new lowNBuffers. After the + * barrier returns, all new buffer allocations will land in [0, lowNBuffers) + * range. For buffers in [lowNBuffers, highNBuffers), backends can + * hold pins and create new pins on buffers already pinned. + * The EvictExtraBuffers() loop below will wait for all buffers in + * [lowNBuffers, highNBuffers) to be unpinned. + */ + SharedBufferResizeBarrier(PROCSIGNAL_BARRIER_SHBUF_RESIZE, CppAsString(PROCSIGNAL_BARRIER_SHBUF_RESIZE)); + INSTR_TIME_SET_CURRENT(phase_end); + INSTR_TIME_SUBTRACT(phase_end, phase_start); + EmitResizeTimeRow(rsinfo, "Barrier", INSTR_TIME_GET_DOUBLE(phase_end)); + elog(LOG, "[Shrink Barrier]: Restricted allocations to %d buffers in %f seconds", new_size, INSTR_TIME_GET_DOUBLE(phase_end)); + + INJECTION_POINT("buf-resize-shrink-after-barrier", NULL); + + /* + * Evict all pages in [lowNBuffers, highNBuffers). + */ + elog(LOG, "[Shrink]: evicting buffers %u..%u", new_size, old_size); + INSTR_TIME_SET_CURRENT(phase_start); + { + instr_time last_log; + + INSTR_TIME_SET_CURRENT(last_log); + while (!EvictExtraBuffers(new_size, old_size)) + { + instr_time now; + + ResizeWaitMs(100); + + INSTR_TIME_SET_CURRENT(now); + INSTR_TIME_SUBTRACT(now, last_log); + if (INSTR_TIME_GET_DOUBLE(now) >= 5.0) + { + elog(LOG, "still waiting for buffers to be unpinned"); + INSTR_TIME_SET_CURRENT(last_log); + } + } + } + INSTR_TIME_SET_CURRENT(phase_end); + INSTR_TIME_SUBTRACT(phase_end, phase_start); + EmitResizeTimeRow(rsinfo, "Buffer relocation", INSTR_TIME_GET_DOUBLE(phase_end)); + elog(LOG, "[Shrink]: evicted %d buffers in %f seconds", old_size - new_size, INSTR_TIME_GET_DOUBLE(phase_end)); + + CHECK_FOR_INTERRUPTS(); + /* + * All the victim buffers are now empty and won't be allocated by backends. + * Take AccessNBuffersLock in exclusive mode so we wait for any backend + * still iterating with the old highNBuffers and prevent new + * ones from starting until we published the new highNBuffers. + */ + INSTR_TIME_SET_CURRENT(phase_start); + LWLockAcquire(&DSBCtrl->AccessNBuffersLock, LW_EXCLUSIVE); + pg_atomic_write_u32(&DSBCtrl->highNBuffers, new_size); + LWLockRelease(&DSBCtrl->AccessNBuffersLock); + + INJECTION_POINT("buf-resize-shrink-before-madvise", NULL); + + /* + * Release the memory. + */ + mem_bytes = BufferManagerShmemShrink(new_size, old_size, &shrink_success); + INSTR_TIME_SET_CURRENT(phase_end); + INSTR_TIME_SUBTRACT(phase_end, phase_start); + EmitResizeTimeRow(rsinfo, "Shrink shmem", INSTR_TIME_GET_DOUBLE(phase_end)); + EmitResizeBytesRow(rsinfo, "Shrink shmem", (double) mem_bytes); + elog(LOG, "[Shrink]: released %zu bytes of memory in %f seconds", mem_bytes, INSTR_TIME_GET_DOUBLE(phase_end)); + + if (!shrink_success) + ereport(ERROR, + (errcode(ERRCODE_INTERNAL_ERROR), + errmsg("shared_buffers shrink from %d to %d failed", + old_size, new_size), + errdetail("madvise(MADV_REMOVE) failed while releasing buffer-pool memory; the failure is not recoverable."), + errhint("Check the server log for the underlying madvise() error."))); +} + +/* + * Expand protocol: allocate memory for the [old_size, new_size) range, + * initialize the new buffer descriptors, then publish both new lowNBuffers + * and highNBuffers atomically under the exclusive lock. + * + * - pre: lowNBuffers == highNBuffers == old_size < new_size + * - post (success): + * lowNBuffers == highNBuffers == new_size + * - post (madvise(MADV_POPULATE_WRITE) failure during BufferManagerShmemExpand): + * lowNBuffers == highNBuffers == old_size (water marks NOT advanced) + * Some bytes in [old_size, new_size) of the four buffer-pool arrays may + * have been allocated from the OS but never published to backends. + * + * Raises ERROR on madvise failure. + */ +static void +DoExpand(ReturnSetInfo *rsinfo, int old_size, int new_size) +{ + instr_time phase_start; + instr_time phase_end; + Size mem_bytes; + bool expand_success; + + Assert(new_size > old_size); + Assert(pg_atomic_read_u32(&DSBCtrl->lowNBuffers) == + pg_atomic_read_u32(&DSBCtrl->highNBuffers)); + + INSTR_TIME_SET_CURRENT(phase_start); + + inflight_expand_target = new_size; + + /* + * Allocate physical memory and initialize the new buffer descriptors + * BEFORE acquiring AccessNBuffersLock. Backends iterating the buffer + * pool only look at [0, highNBuffers); since highNBuffers is still at + * old_size, the new range is invisible to them, so it is safe to touch + * without the lock. + */ + mem_bytes = BufferManagerShmemExpand(old_size, new_size, &expand_success); + if (!expand_success) + { + INSTR_TIME_SET_CURRENT(phase_end); + INSTR_TIME_SUBTRACT(phase_end, phase_start); + EmitResizeTimeRow(rsinfo, "Expand shmem", INSTR_TIME_GET_DOUBLE(phase_end)); + EmitResizeBytesRow(rsinfo, "Expand shmem", (double) mem_bytes); + ereport(ERROR, + (errcode(ERRCODE_INTERNAL_ERROR), + errmsg("shared_buffers expand from %d to %d failed", + old_size, new_size), + errdetail("madvise(MADV_POPULATE_WRITE) failed while populating buffer-pool memory; the new range was not made visible to backends."), + errhint("Check the server log for the underlying madvise() error and retry."))); + } + + BufferManagerShmemInitBuffers(old_size, new_size); + + INJECTION_POINT("buf-resize-expand-before-publish", NULL); + + /* + * Hold AccessNBuffersLock in exclusive mode while we publish the new + * water marks. Backends taking the lock in shared mode (e.g. via + * BEGIN_NBUFFERS_ACCESS) either run entirely before this critical + * section and see lowNBuffers == highNBuffers == old_size, or entirely + * after and see lowNBuffers == highNBuffers == new_size with valid + * memory; they never observe the partially initialized intermediate + * state. Concurrent atomics readers (clock sweep / freelist) may + * briefly see lowNBuffers < highNBuffers between the two writes below; + * that is fine because both bounds are now backed by initialized + * memory, so a clock sweep wrapping into the [old_size, new_size) range + * is safe. + */ + LWLockAcquire(&DSBCtrl->AccessNBuffersLock, LW_EXCLUSIVE); + /* + * Reset the clock-sweep cursor to the start of the new buffers so the + * next clock pass tries the freshly added empty buffers before + * re-scanning existing ones with usage_count == 0. + */ + StrategyReset(old_size, new_size); + LWLockRelease(&DSBCtrl->AccessNBuffersLock); + + /* + * The expand is complete. + */ + inflight_expand_target = 0; + + INSTR_TIME_SET_CURRENT(phase_end); + INSTR_TIME_SUBTRACT(phase_end, phase_start); + EmitResizeTimeRow(rsinfo, "Expand shmem", INSTR_TIME_GET_DOUBLE(phase_end)); + EmitResizeBytesRow(rsinfo, "Expand shmem", (double) mem_bytes); + elog(LOG, "[Expand]: expanded buffer pool memory with %zu bytes in %f seconds", mem_bytes, INSTR_TIME_GET_DOUBLE(phase_end)); +} + +/* + * Cleanup callback. Runs from the transaction-abort PG_CATCH path *and* from + * before_shmem_exit() if the backend dies while holding the resize slot. + * + * Rollback policy: + * - Partial shrink (lowNBuffers < highNBuffers): restore lowNBuffers to + * highNBuffers so the buffer pool is consistent at the larger size. + * Memory for [lowNBuffers, highNBuffers) is still mapped, so rolling + * back is safe. + * - Partial expand: BufferManagerShmemExpand() may have populated some of + * [old_size, inflight_expand_target) without publishing the new water + * marks. This is wasteful but harmless. We surface a WARNING so operators + * know to retry the resize. + */ +static void +ResetResizeInProgress(int code, Datum arg) +{ + uint32 high; + uint32 low; + int expand_target; + bool shrink_failed = false; + bool expand_failed = false; + + if (!coordinator_active || DSBCtrl == NULL) + return; + + Assert(DSBCtrl->resize_in_progress); + Assert(DSBCtrl->coordinator_pid == MyProcPid); + + coordinator_active = false; + + high = pg_atomic_read_u32(&DSBCtrl->highNBuffers); + low = pg_atomic_read_u32(&DSBCtrl->lowNBuffers); + if (low < high) + { + shrink_failed = true; + pg_atomic_write_u32(&DSBCtrl->lowNBuffers, high); + } + + expand_target = inflight_expand_target; + if (expand_target != 0) + { + expand_failed = true; + inflight_expand_target = 0; + } + + ReleaseResizeCoordinator(); + + /* + * Emit user-visible warnings AFTER all critical cleanup. + */ + if (shrink_failed) + ereport(WARNING, + (errmsg("shared_buffers shrink was interrupted; rolling back lowNBuffers from %u to %u", + (unsigned) low, (unsigned) high))); + + if (expand_failed) + ereport(WARNING, + (errmsg("shared_buffers expand to %d was interrupted", + expand_target), + errdetail("Some memory in [%u, %d) may have been allocated from the OS but not made visible to backends. It will sit unused in shmem until a future successful resize re-initializes that range.", + (unsigned) high, expand_target))); +} + +Datum +pg_resize_shared_buffers(PG_FUNCTION_ARGS) +{ + instr_time total_start; + instr_time total_end; + + ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo; + int old_size; + int new_size; + char *new_size_str; + + INSTR_TIME_SET_CURRENT(total_start); + + if (!enable_dynamic_shared_buffers) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("shared buffer pool resizing requires enable_dynamic_shared_buffers"))); + + if (!superuser()) + ereport(ERROR, + (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), + errmsg("must be superuser to resize shared_buffers"))); + + if (PG_NARGS() != 1) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("pg_resize_shared_buffers requires exactly one argument (the new shared_buffers value)"))); + if (PG_ARGISNULL(0)) + ereport(ERROR, + (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED), + errmsg("new_size argument to pg_resize_shared_buffers must not be NULL"))); + + /* + * Restrict callers to regular client backends. + */ + Assert(MyBackendType == B_BACKEND); + + /* + * Parse the requested size first so we fail fast on bad input before + * claiming the resize_in_progress flag. + */ + new_size_str = text_to_cstring(PG_GETARG_TEXT_PP(0)); + new_size = ParseNewSize(new_size_str); + + InitMaterializedSRF(fcinfo, 0); + + /* + * Register the FATAL-exit cleanup once per backend lifetime. + */ + if (!cleanup_registered) + { + before_shmem_exit(ResetResizeInProgress, (Datum) 0); + cleanup_registered = true; + } + + if (!ClaimResizeCoordinator()) + { + elog(LOG, "shared buffer resizing is already in progress"); + EmitResizeMetricRow(rsinfo, "resize already in progress", 0, NULL, true); + return (Datum) 0; + } + + /* + * Mark this backend as the local coordinator so the cleanup callback + * knows to release the shared slot on error / exit. + */ + coordinator_active = true; + + PG_TRY(); + { + INJECTION_POINT("buf-resize-after-claim", NULL); + + old_size = pg_atomic_read_u32(&DSBCtrl->lowNBuffers); + /* + * The highNBuffers should be equal to lowNBuffers. + */ + Assert(pg_atomic_read_u32(&DSBCtrl->highNBuffers) == old_size); + + if (old_size == new_size) + { + elog(LOG, "shared buffers are already at %d, no need to resize", old_size); + EmitResizeTimeRow(rsinfo, "No resize", 0.0); + } + else + { + elog(LOG, "resizing shared buffers from %d to %d", old_size, new_size); + + if (new_size < old_size) + DoShrink(rsinfo, old_size, new_size); + else + DoExpand(rsinfo, old_size, new_size); + + Assert(pg_atomic_read_u32(&DSBCtrl->lowNBuffers) == (uint32) new_size); + Assert(pg_atomic_read_u32(&DSBCtrl->highNBuffers) == (uint32) new_size); + + INSTR_TIME_SET_CURRENT(total_end); + INSTR_TIME_SUBTRACT(total_end, total_start); + EmitResizeTimeRow(rsinfo, "Total Resize Time", INSTR_TIME_GET_DOUBLE(total_end)); + elog(LOG, "successfully resized shared buffers to %d", new_size); + } + } + PG_CATCH(); + { + ResetResizeInProgress(0, (Datum) 0); + PG_RE_THROW(); + } + PG_END_TRY(); + + ResetResizeInProgress(0, (Datum) 0); + return (Datum) 0; +} diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c index cc398db124d..6a7302917a1 100644 --- a/src/backend/storage/buffer/bufmgr.c +++ b/src/backend/storage/buffer/bufmgr.c @@ -58,6 +58,7 @@ #include "storage/fd.h" #include "storage/ipc.h" #include "storage/lmgr.h" +#include "storage/pg_shmem.h" #include "storage/proc.h" #include "storage/proclist.h" #include "storage/procsignal.h" @@ -92,7 +93,7 @@ * being dropped. For the relations with size below this threshold, we find * the buffers by doing lookups in BufMapping table. */ -#define BUF_DROP_FULL_SCAN_THRESHOLD (uint64) (NBuffers / 32) +#define BUF_DROP_FULL_SCAN_THRESHOLD (uint64) (GetHighNBuffers() / 32) /* * This is separated out from PrivateRefCountEntry to allow for copying all @@ -633,7 +634,9 @@ static bool PinBuffer(BufferDesc *buf, BufferAccessStrategy strategy, static void PinBuffer_Locked(BufferDesc *buf); static void UnpinBuffer(BufferDesc *buf); static void UnpinBufferNoOwner(BufferDesc *buf); -static void BufferSync(int flags); +static bool EvictUnpinnedBufferInternal(BufferDesc *desc, bool *buffer_flushed); +static void BufferSync(int flags, int localNBuffers); +static uint32 WaitBufHdrUnlocked(BufferDesc *buf); static int SyncOneBuffer(int buf_id, bool skip_recently_used, WritebackContext *wb_context); static void WaitIO(BufferDesc *buf); @@ -847,6 +850,12 @@ ReadRecentBuffer(RelFileLocator rlocator, ForkNumber forkNum, BlockNumber blockN } else { + /* + * Reject any recent_buffer that points above the live low watermark. + */ + if (recent_buffer > GetLowNBuffers()) + return false; + bufHdr = GetBufferDescriptor(recent_buffer - 1); /* @@ -2707,6 +2716,7 @@ uint32 GetAdditionalPinLimit(void) { uint32 estimated_pins_held; + uint32 limit; /* * We get the number of "overflowed" pins for free, but don't know the @@ -2715,11 +2725,17 @@ GetAdditionalPinLimit(void) */ estimated_pins_held = PrivateRefCountOverflowed + REFCOUNT_ARRAY_ENTRIES; + /* + * Consult get_pin_limit_hook so the per-backend limit tracks the live + * buffer pool size. + */ + limit = enable_dynamic_shared_buffers && get_pin_limit_hook ? get_pin_limit_hook() : MaxProportionalPins; + /* Is this backend already holding more than its fair share? */ - if (estimated_pins_held > MaxProportionalPins) + if (estimated_pins_held > limit) return 0; - return MaxProportionalPins - estimated_pins_held; + return limit - estimated_pins_held; } /* @@ -3558,7 +3574,7 @@ TrackNewBufferPin(Buffer buf) * currently have no effect here. */ static void -BufferSync(int flags) +BufferSync(int flags, int localNBuffers) { uint64 buf_state; int buf_id; @@ -3599,7 +3615,7 @@ BufferSync(int flags) * certainly need to be written for the next checkpoint attempt, too. */ num_to_scan = 0; - for (buf_id = 0; buf_id < NBuffers; buf_id++) + for (buf_id = 0; buf_id < localNBuffers; buf_id++) { BufferDesc *bufHdr = GetBufferDescriptor(buf_id); uint64 set_bits = 0; @@ -3628,7 +3644,7 @@ BufferSync(int flags) set_bits, 0, 0); - /* Check for barrier events in case NBuffers is large. */ + /* Check for barrier events in case the buffer pool is large. */ if (ProcSignalBarrierPending) ProcessProcSignalBarrier(); } @@ -3638,7 +3654,7 @@ BufferSync(int flags) WritebackContextInit(&wb_context, &checkpoint_flush_after); - TRACE_POSTGRESQL_BUFFER_SYNC_START(NBuffers, num_to_scan); + TRACE_POSTGRESQL_BUFFER_SYNC_START(localNBuffers, num_to_scan); /* * Sort buffers that need to be written to reduce the likelihood of random @@ -3822,7 +3838,7 @@ BufferSync(int flags) */ CheckpointStats.ckpt_bufs_written += num_written; - TRACE_POSTGRESQL_BUFFER_SYNC_DONE(NBuffers, num_written, num_to_scan); + TRACE_POSTGRESQL_BUFFER_SYNC_DONE(localNBuffers, num_written, num_to_scan); } /* @@ -3881,10 +3897,28 @@ BgBufferSync(WritebackContext *wb_context) uint32 new_recent_alloc; /* - * Find out where the clock-sweep currently is, and how many buffer - * allocations have happened since our last call. + * Snapshot of lowNBuffers from the previous invocation. Whenever the + * value changes a buffer-pool resize has happened: the smoothed + * allocation rate / clean-buffer density we accumulated for the old size + * are no longer meaningful, so we invalidate saved_info_valid and start + * fresh. */ - strategy_buf_id = StrategySyncStart(&strategy_passes, &recent_alloc); + static int saved_low_nbuffers = 0; + int current_low_nbuffers; + + BEGIN_NBUFFERS_ACCESS(localNBuffers); + + strategy_buf_id = StrategySyncStart(&strategy_passes, &recent_alloc, + ¤t_low_nbuffers); + if (current_low_nbuffers != saved_low_nbuffers) + { +#ifdef BGW_DEBUG + elog(DEBUG2, "invalidated background writer state after pool resize: %d -> %d buffers", + saved_low_nbuffers, current_low_nbuffers); +#endif + saved_info_valid = false; + saved_low_nbuffers = current_low_nbuffers; + } /* Report buffer alloc counts to pgstat */ PendingBgWriterStats.buf_alloc += recent_alloc; @@ -3897,6 +3931,7 @@ BgBufferSync(WritebackContext *wb_context) if (bgwriter_lru_maxpages <= 0) { saved_info_valid = false; + END_NBUFFERS_ACCESS(localNBuffers); return true; } @@ -3913,7 +3948,7 @@ BgBufferSync(WritebackContext *wb_context) int32 passes_delta = strategy_passes - prev_strategy_passes; strategy_delta = strategy_buf_id - prev_strategy_buf_id; - strategy_delta += (long) passes_delta * NBuffers; + strategy_delta += (long) passes_delta * localNBuffers; Assert(strategy_delta >= 0); @@ -3932,7 +3967,7 @@ BgBufferSync(WritebackContext *wb_context) next_to_clean >= strategy_buf_id) { /* on same pass, but ahead or at least not behind */ - bufs_to_lap = NBuffers - (next_to_clean - strategy_buf_id); + bufs_to_lap = localNBuffers - (next_to_clean - strategy_buf_id); #ifdef BGW_DEBUG elog(DEBUG2, "bgwriter ahead: bgw %u-%u strategy %u-%u delta=%ld lap=%d", next_passes, next_to_clean, @@ -3954,7 +3989,7 @@ BgBufferSync(WritebackContext *wb_context) #endif next_to_clean = strategy_buf_id; next_passes = strategy_passes; - bufs_to_lap = NBuffers; + bufs_to_lap = localNBuffers; } } else @@ -3970,7 +4005,7 @@ BgBufferSync(WritebackContext *wb_context) strategy_delta = 0; next_to_clean = strategy_buf_id; next_passes = strategy_passes; - bufs_to_lap = NBuffers; + bufs_to_lap = localNBuffers; } /* Update saved info for next time */ @@ -3996,7 +4031,7 @@ BgBufferSync(WritebackContext *wb_context) * strategy point and where we've scanned ahead to, based on the smoothed * density estimate. */ - bufs_ahead = NBuffers - bufs_to_lap; + bufs_ahead = localNBuffers - bufs_to_lap; reusable_buffers_est = (float) bufs_ahead / smoothed_density; /* @@ -4034,7 +4069,7 @@ BgBufferSync(WritebackContext *wb_context) * the BGW will be called during the scan_whole_pool time; slice the * buffer pool into that many sections. */ - min_scan_buffers = (int) (NBuffers / (scan_whole_pool_milliseconds / BgWriterDelay)); + min_scan_buffers = (int) (localNBuffers / (scan_whole_pool_milliseconds / BgWriterDelay)); if (upcoming_alloc_est < (min_scan_buffers + reusable_buffers_est)) { @@ -4062,7 +4097,7 @@ BgBufferSync(WritebackContext *wb_context) int sync_state = SyncOneBuffer(next_to_clean, true, wb_context); - if (++next_to_clean >= NBuffers) + if (++next_to_clean >= localNBuffers) { next_to_clean = 0; next_passes++; @@ -4116,6 +4151,8 @@ BgBufferSync(WritebackContext *wb_context) #endif } + END_NBUFFERS_ACCESS(localNBuffers); + /* Return true if OK to hibernate */ return (bufs_to_lap == 0 && recent_alloc == 0); } @@ -4231,7 +4268,7 @@ InitBufferManagerAccess(void) * allow plenty of pins. LimitAdditionalPins() and * GetAdditionalPinLimit() can be used to check the remaining balance. */ - MaxProportionalPins = NBuffers / (MaxBackends + NUM_AUXILIARY_PROCS); + MaxProportionalPins = GetMaxNBuffers() / (MaxBackends + NUM_AUXILIARY_PROCS); memset(&PrivateRefCountArray, 0, sizeof(PrivateRefCountArray)); memset(&PrivateRefCountArrayKeys, 0, sizeof(PrivateRefCountArrayKeys)); @@ -4371,6 +4408,12 @@ AssertNotCatalogBufferLock(Buffer buffer, BufferLockMode mode) if (mode != BUFFER_LOCK_EXCLUSIVE) return; + if (!((BufferDescPadded *) lock > BufferDescriptors && + (BufferDescPadded *) lock < BufferDescriptors + GetMaxNBuffers())) + return; /* not a buffer lock */ + + bufHdr = (BufferDesc *) + ((char *) lock - offsetof(BufferDesc, content_lock)); tag = bufHdr->tag; /* @@ -4440,7 +4483,9 @@ DebugPrintBufferRefcount(Buffer buffer) void CheckPointBuffers(int flags) { - BufferSync(flags); + BEGIN_NBUFFERS_ACCESS(localNBuffers); + BufferSync(flags, localNBuffers); + END_NBUFFERS_ACCESS(localNBuffers); } /* @@ -4779,6 +4824,7 @@ DropRelationBuffers(SMgrRelation smgr_reln, ForkNumber *forkNum, RelFileLocatorBackend rlocator; BlockNumber nForkBlock[MAX_FORKNUM]; uint64 nBlocksToInvalidate = 0; + BEGIN_NBUFFERS_ACCESS(localNBuffers); rlocator = smgr_reln->smgr_rlocator; @@ -4842,7 +4888,7 @@ DropRelationBuffers(SMgrRelation smgr_reln, ForkNumber *forkNum, return; } - for (i = 0; i < NBuffers; i++) + for (i = 0; i < localNBuffers; i++) { BufferDesc *bufHdr = GetBufferDescriptor(i); @@ -4880,6 +4926,7 @@ DropRelationBuffers(SMgrRelation smgr_reln, ForkNumber *forkNum, if (j >= nforks) UnlockBufHdr(bufHdr); } + END_NBUFFERS_ACCESS(localNBuffers); } /* --------------------------------------------------------------------- @@ -4901,6 +4948,7 @@ DropRelationsAllBuffers(SMgrRelation *smgr_reln, int nlocators) RelFileLocator *locators; bool cached = true; bool use_bsearch; + BEGIN_NBUFFERS_ACCESS(localNBuffers); if (nlocators == 0) return; @@ -5003,7 +5051,7 @@ DropRelationsAllBuffers(SMgrRelation *smgr_reln, int nlocators) if (use_bsearch) qsort(locators, n, sizeof(RelFileLocator), rlocator_comparator); - for (i = 0; i < NBuffers; i++) + for (i = 0; i < localNBuffers; i++) { RelFileLocator *rlocator = NULL; BufferDesc *bufHdr = GetBufferDescriptor(i); @@ -5046,6 +5094,7 @@ DropRelationsAllBuffers(SMgrRelation *smgr_reln, int nlocators) else UnlockBufHdr(bufHdr); } + END_NBUFFERS_ACCESS(localNBuffers); pfree(locators); pfree(rels); @@ -5130,7 +5179,8 @@ DropDatabaseBuffers(Oid dbid) * database isn't our own. */ - for (i = 0; i < NBuffers; i++) + BEGIN_NBUFFERS_ACCESS(localNBuffers); + for (i = 0; i < localNBuffers; i++) { BufferDesc *bufHdr = GetBufferDescriptor(i); @@ -5147,6 +5197,7 @@ DropDatabaseBuffers(Oid dbid) else UnlockBufHdr(bufHdr); } + END_NBUFFERS_ACCESS(localNBuffers); } /* --------------------------------------------------------------------- @@ -5174,7 +5225,9 @@ FlushRelationBuffers(Relation rel) BufferDesc *bufHdr; SMgrRelation srel = RelationGetSmgr(rel); - if (RelationUsesLocalBuffers(rel)) + BEGIN_NBUFFERS_ACCESS(localNBuffers); + + if (RelationUsesLocalBuffers(rel) || am_wal_redo_postgres) { for (i = 0; i < NLocBuffer; i++) { @@ -5213,10 +5266,12 @@ FlushRelationBuffers(Relation rel) } } + END_NBUFFERS_ACCESS(localNBuffers); + return; } - for (i = 0; i < NBuffers; i++) + for (i = 0; i < localNBuffers; i++) { uint64 buf_state; @@ -5244,6 +5299,7 @@ FlushRelationBuffers(Relation rel) else UnlockBufHdr(bufHdr); } + END_NBUFFERS_ACCESS(localNBuffers); } /* --------------------------------------------------------------------- @@ -5261,6 +5317,7 @@ FlushRelationsAllBuffers(SMgrRelation *smgrs, int nrels) int i; SMgrSortArray *srels; bool use_bsearch; + BEGIN_NBUFFERS_ACCESS(localNBuffers); if (nrels == 0) return; @@ -5286,7 +5343,7 @@ FlushRelationsAllBuffers(SMgrRelation *smgrs, int nrels) if (use_bsearch) qsort(srels, nrels, sizeof(SMgrSortArray), rlocator_comparator); - for (i = 0; i < NBuffers; i++) + for (i = 0; i < localNBuffers; i++) { SMgrSortArray *srelent = NULL; BufferDesc *bufHdr = GetBufferDescriptor(i); @@ -5339,6 +5396,7 @@ FlushRelationsAllBuffers(SMgrRelation *smgrs, int nrels) else UnlockBufHdr(bufHdr); } + END_NBUFFERS_ACCESS(localNBuffers); pfree(srels); } @@ -5537,7 +5595,8 @@ FlushDatabaseBuffers(Oid dbid) int i; BufferDesc *bufHdr; - for (i = 0; i < NBuffers; i++) + BEGIN_NBUFFERS_ACCESS(localNBuffers); + for (i = 0; i < localNBuffers; i++) { uint64 buf_state; @@ -5565,6 +5624,7 @@ FlushDatabaseBuffers(Oid dbid) else UnlockBufHdr(bufHdr); } + END_NBUFFERS_ACCESS(localNBuffers); } /* @@ -7991,11 +8051,13 @@ void EvictAllUnpinnedBuffers(int32 *buffers_evicted, int32 *buffers_flushed, int32 *buffers_skipped) { + BEGIN_NBUFFERS_ACCESS(localNBuffers); + *buffers_evicted = 0; *buffers_skipped = 0; *buffers_flushed = 0; - for (int buf = 1; buf <= NBuffers; buf++) + for (int buf = 1; buf <= localNBuffers; buf++) { BufferDesc *desc = GetBufferDescriptor(buf - 1); uint64 buf_state; @@ -8020,6 +8082,7 @@ EvictAllUnpinnedBuffers(int32 *buffers_evicted, int32 *buffers_flushed, if (buffer_flushed) (*buffers_flushed)++; } + END_NBUFFERS_ACCESS(localNBuffers); } /* @@ -8041,13 +8104,15 @@ void EvictRelUnpinnedBuffers(Relation rel, int32 *buffers_evicted, int32 *buffers_flushed, int32 *buffers_skipped) { + BEGIN_NBUFFERS_ACCESS(localNBuffers); + Assert(!RelationUsesLocalBuffers(rel)); *buffers_skipped = 0; *buffers_evicted = 0; *buffers_flushed = 0; - for (int buf = 1; buf <= NBuffers; buf++) + for (int buf = 1; buf <= localNBuffers; buf++) { BufferDesc *desc = GetBufferDescriptor(buf - 1); uint64 buf_state = pg_atomic_read_u64(&(desc->state)); @@ -8082,6 +8147,7 @@ EvictRelUnpinnedBuffers(Relation rel, int32 *buffers_evicted, if (buffer_flushed) (*buffers_flushed)++; } + END_NBUFFERS_ACCESS(localNBuffers); } /* @@ -8965,3 +9031,87 @@ const PgAioHandleCallbacks aio_local_buffer_readv_cb = { .complete_local = local_buffer_readv_complete, .report = buffer_readv_report, }; + +/* + * When shrinking the shared buffer pool, evict every buffer in the + * range [lowNBuffers, highNBuffers). + * + * Returns true once every buffer in the range is empty. Returns false if + * any buffer was still pinned. + */ +bool +EvictExtraBuffers(int lowNBuffers, int highNBuffers) +{ + bool result = true; + + Assert(lowNBuffers < highNBuffers); + + /* + * If the buffer being evicted is locked, this function will need to + * wait. This function should not be called from a Postmaster since it can + * not wait on a lock. + */ + Assert(IsUnderPostmaster); + + for (int buf_id = lowNBuffers; buf_id < highNBuffers; buf_id++) + { + BufferDesc *desc = GetBufferDescriptor(buf_id); + uint32 buf_state; + bool buffer_flushed; + + /* Make sure we can pin the buffer (PinBuffer_Locked contract). */ + ResourceOwnerEnlarge(CurrentResourceOwner); + ReservePrivateRefCountEntry(); + + buf_state = LockBufHdr(desc); + + if (BUF_STATE_GET_REFCOUNT(buf_state) > 0) + { + UnlockBufHdr(desc, buf_state); + result = false; + continue; + } + + if (!(buf_state & BM_VALID)) + { + /* + * Buffer is not valid, but it might still have a BufTable + * entry: a previous read IO may have failed, or the backend + * that started allocating this slot was cancelled before the + * read completed (e.g. autovacuum cancellation). In that case + * the descriptor is left with BM_TAG_VALID set, refcount=0, + * and the hash table still mapping tag -> buf_id. + * + * If we leave that BufTable entry behind, a later expand that + * re-initializes this slot (clearing tag to InvalidBlockNumber + * and BM_TAG_VALID) will desynchronize BufTable from the + * descriptor, and the next reader of the original block will + * fail the BufferGetBlockNumber assertion in + * CheckReadBuffersOperation. Drop the stale entry now. + */ + if (buf_state & BM_TAG_VALID) + { + PinBuffer_Locked(desc); + if (!InvalidateVictimBuffer(desc)) + { + /* Lost a race with another pinner; retry later. */ + result = false; + } + UnpinBuffer(desc); + } + else + { + UnlockBufHdr(desc, buf_state); + } + continue; + } + + if (!EvictUnpinnedBufferInternal(desc, &buffer_flushed)) + { + elog(WARNING, "could not evict buffer %d, it is pinned", buf_id); + result = false; + } + } + + return result; +} diff --git a/src/backend/storage/buffer/dynamic_shared_buffers.c b/src/backend/storage/buffer/dynamic_shared_buffers.c new file mode 100644 index 00000000000..efc535d365c --- /dev/null +++ b/src/backend/storage/buffer/dynamic_shared_buffers.c @@ -0,0 +1,125 @@ +/*------------------------------------------------------------------------- + * + * dynamic_shared_buffers.c + * Coordination state and helpers for resizing shared_buffers at runtime. + * + * The dynamic shared buffer (DSB) machinery lets shared_buffers grow and + * shrink while the cluster is running. + * + * See pgxn/neon/README.md ("Dynamic shared buffer") for the full design and + * the resize protocol. + * + * IDENTIFICATION + * src/backend/storage/buffer/dynamic_shared_buffers.c + * + *------------------------------------------------------------------------- + */ +#include "postgres.h" + +#include "miscadmin.h" +#include "storage/dynamic_shared_buffers.h" +#include "storage/ipc.h" +#include "storage/lwlock.h" +#include "storage/procsignal.h" +#include "storage/shmem.h" +#include "storage/spin.h" + +DynamicSharedBuffersControl *DSBCtrl = NULL; + +/* + * AcquireNBuffersLock + * + * Take AccessNBuffersLock in shared mode and return the current high water + * mark. Callers iterate up to that bound. The lock keeps the resize + * coordinator (which acquires the lock exclusively) waiting until we drop it. + */ +int +AcquireNBuffersLock(void) +{ + if (!enable_dynamic_shared_buffers) + return GetHighNBuffers(); + + if (DSBCtrl != NULL) + LWLockAcquire(&DSBCtrl->AccessNBuffersLock, LW_SHARED); + return GetHighNBuffers(); +} + +void +ReleaseNBuffersLock(void) +{ + if (!enable_dynamic_shared_buffers) + return; + if (DSBCtrl != NULL) + LWLockRelease(&DSBCtrl->AccessNBuffersLock); +} + +/* + * Try to claim coordinator status for a buffer-pool resize. + * + * Returns true if we are now the coordinator, or false if another backend + * is performing a resize. + */ +bool +ClaimResizeCoordinator(void) +{ + bool claimed = false; + + Assert(DSBCtrl != NULL); + + SpinLockAcquire(&DSBCtrl->coordinator_lock); + if (!DSBCtrl->resize_in_progress) + { + DSBCtrl->resize_in_progress = true; + DSBCtrl->coordinator_pid = MyProcPid; + claimed = true; + } + SpinLockRelease(&DSBCtrl->coordinator_lock); + + return claimed; +} + +/* + * Release the coordinator slot acquired by ClaimResizeCoordinator(). + */ +void +ReleaseResizeCoordinator(void) +{ + Assert(DSBCtrl != NULL); + + SpinLockAcquire(&DSBCtrl->coordinator_lock); + Assert(DSBCtrl->resize_in_progress); + Assert(DSBCtrl->coordinator_pid == MyProcPid); + DSBCtrl->resize_in_progress = false; + DSBCtrl->coordinator_pid = InvalidPid; + SpinLockRelease(&DSBCtrl->coordinator_lock); +} + +/* + * DSBControlInit + * + * Allocate and initialize the DynamicSharedBuffersControl structure in shared + * memory. Must be called before BufferManagerShmemInit so that DSBCtrl is + * available when the buffer pool is set up. + */ +void +DSBControlInit(void) +{ + bool foundDSBCtrl; + + DSBCtrl = (DynamicSharedBuffersControl *) + ShmemInitStruct("DSB Control", sizeof(DynamicSharedBuffersControl), + &foundDSBCtrl); + + if (!foundDSBCtrl) + { + pg_atomic_init_u32(&DSBCtrl->lowNBuffers, NBuffersGUC); + pg_atomic_init_u32(&DSBCtrl->highNBuffers, NBuffersGUC); + + SpinLockInit(&DSBCtrl->coordinator_lock); + DSBCtrl->resize_in_progress = false; + DSBCtrl->coordinator_pid = InvalidPid; + + LWLockInitialize(&DSBCtrl->AccessNBuffersLock, + LWTRANCHE_ACCESS_NBUFFERS); + } +} diff --git a/src/backend/storage/buffer/freelist.c b/src/backend/storage/buffer/freelist.c index fdb5bad7910..ee2eb520595 100644 --- a/src/backend/storage/buffer/freelist.c +++ b/src/backend/storage/buffer/freelist.c @@ -37,7 +37,7 @@ typedef struct /* * clock-sweep hand: index of next buffer to consider grabbing. Note that * this isn't a concrete buffer - we only ever increase the value. So, to - * get an actual buffer, it needs to be used modulo NBuffers. + * get an actual buffer, it needs to be used modulo lowNBuffers. */ pg_atomic_uint32 nextVictimBuffer; @@ -110,6 +110,7 @@ static inline uint32 ClockSweepTick(void) { uint32 victim; + int lowNBuffers; /* * Atomically move hand ahead one buffer - if there's several processes @@ -118,13 +119,14 @@ ClockSweepTick(void) */ victim = pg_atomic_fetch_add_u32(&StrategyControl->nextVictimBuffer, 1); + lowNBuffers = GetLowNBuffers(); - if (victim >= NBuffers) + if (victim >= lowNBuffers) { uint32 originalVictim = victim; /* always wrap what we look up in BufferDescriptors */ - victim = victim % NBuffers; + victim = victim % lowNBuffers; /* * If we're the one that just caused a wraparound, force @@ -152,7 +154,7 @@ ClockSweepTick(void) */ SpinLockAcquire(&StrategyControl->buffer_strategy_lock); - wrapped = expected % NBuffers; + wrapped = expected % GetLowNBuffers(); success = pg_atomic_compare_exchange_u32(&StrategyControl->nextVictimBuffer, &expected, wrapped); @@ -237,7 +239,7 @@ StrategyGetBuffer(BufferAccessStrategy strategy, uint64 *buf_state, bool *from_r pg_atomic_fetch_add_u32(&StrategyControl->numBufferAllocs, 1); /* Use the "clock sweep" algorithm to find a free buffer */ - trycounter = NBuffers; + trycounter = GetLowNBuffers(); for (;;) { uint64 old_buf_state; @@ -290,7 +292,7 @@ StrategyGetBuffer(BufferAccessStrategy strategy, uint64 *buf_state, bool *from_r if (pg_atomic_compare_exchange_u64(&buf->state, &old_buf_state, local_buf_state)) { - trycounter = NBuffers; + trycounter = GetLowNBuffers(); break; } } @@ -328,14 +330,17 @@ StrategyGetBuffer(BufferAccessStrategy strategy, uint64 *buf_state, bool *from_r * being read. */ int -StrategySyncStart(uint32 *complete_passes, uint32 *num_buf_alloc) +StrategySyncStart(uint32 *complete_passes, uint32 *num_buf_alloc, + int *low_nbuffers) { uint32 nextVictimBuffer; int result; + int lowNBuffers; SpinLockAcquire(&StrategyControl->buffer_strategy_lock); nextVictimBuffer = pg_atomic_read_u32(&StrategyControl->nextVictimBuffer); - result = nextVictimBuffer % NBuffers; + lowNBuffers = GetLowNBuffers(); + result = nextVictimBuffer % lowNBuffers; if (complete_passes) { @@ -345,13 +350,15 @@ StrategySyncStart(uint32 *complete_passes, uint32 *num_buf_alloc) * Additionally add the number of wraparounds that happened before * completePasses could be incremented. C.f. ClockSweepTick(). */ - *complete_passes += nextVictimBuffer / NBuffers; + *complete_passes += nextVictimBuffer / lowNBuffers; } if (num_buf_alloc) { *num_buf_alloc = pg_atomic_exchange_u32(&StrategyControl->numBufferAllocs, 0); } + if (low_nbuffers) + *low_nbuffers = lowNBuffers; SpinLockRelease(&StrategyControl->buffer_strategy_lock); return result; } @@ -522,10 +529,15 @@ GetAccessStrategyWithSize(BufferAccessStrategyType btype, int ring_size_kb) if (ring_buffers == 0) return NULL; - /* Cap to 1/8th of shared_buffers */ - ring_buffers = Min(NBuffers / 8, ring_buffers); + /* + * Cap to 1/8th of shared_buffers. Using GetLowNBuffers() here is fine even + * though it is a non-critical sizing decision: the strategy survives a + * resize because the ring size is fixed once the strategy is created. + */ + ring_buffers = Min(GetLowNBuffers() / 8, ring_buffers); - /* NBuffers should never be less than 16, so this shouldn't happen */ + /* shared_buffers should never be less than MIN_SHARED_BUFFERS, + * so this shouldn't happen */ Assert(ring_buffers > 0); /* Allocate the object and initialize all elements to zeroes */ @@ -574,7 +586,7 @@ int GetAccessStrategyPinLimit(BufferAccessStrategy strategy) { if (strategy == NULL) - return NBuffers; + return GetLowNBuffers(); switch (strategy->btype) { @@ -768,3 +780,36 @@ StrategyRejectBuffer(BufferAccessStrategy strategy, BufferDesc *buf, bool from_r return true; } + +/* + * StrategyReset -- reset the clock-sweep cursor for a buffer pool resize. + * + * Called by pg_resize_shared_buffers() at two distinct points: + * + * - Just before publishing a lower lowNBuffers (shrink). The existing + * cursor may already point above new_size; resetting to 0 makes the + * next clock sweep start from the bottom of the surviving range and + * avoids ClockSweepTick() wrapping past the new buffers via modulo + * arithmetic right as the bound moves. + * + * - At the end of an expand, after the new descriptors are initialized, + * to point the cursor at the start of the freshly added range so the + * next sweep tries the empty buffers before re-scanning existing ones + * with usage_count == 0. + */ +void +StrategyReset(int old_size, int new_size) +{ + SpinLockAcquire(&StrategyControl->buffer_strategy_lock); + if (new_size > old_size) + { + /* expand: point cursor at start of new range */ + pg_atomic_write_u32(&StrategyControl->nextVictimBuffer, old_size); + } + else + { + /* shrink: rewind cursor to the bottom of the surviving range */ + pg_atomic_write_u32(&StrategyControl->nextVictimBuffer, 0); + } + SpinLockRelease(&StrategyControl->buffer_strategy_lock); +} diff --git a/src/backend/storage/buffer/meson.build b/src/backend/storage/buffer/meson.build index ed84bf08971..845a26b8db3 100644 --- a/src/backend/storage/buffer/meson.build +++ b/src/backend/storage/buffer/meson.build @@ -2,8 +2,10 @@ backend_sources += files( 'buf_init.c', + 'buf_resize.c', 'buf_table.c', 'bufmgr.c', + 'dynamic_shared_buffers.c', 'freelist.c', 'localbuf.c', ) diff --git a/src/backend/storage/ipc/procsignal.c b/src/backend/storage/ipc/procsignal.c index 264e4c22ca6..b8045c02997 100644 --- a/src/backend/storage/ipc/procsignal.c +++ b/src/backend/storage/ipc/procsignal.c @@ -223,6 +223,18 @@ ProcSignalInit(const uint8 *cancel_key, int cancel_key_len) on_shmem_exit(CleanupProcSignalState, (Datum) 0); } +/* + * IsProcSignalInitialized + * Return true if this process has registered itself with the + * ProcSignal subsystem (via ProcSignalInit) and not yet released its + * slot in CleanupProcSignalState. + */ +bool +IsProcSignalInitialized(void) +{ + return MyProcSignalSlot != NULL; +} + /* * CleanupProcSignalState * Remove current process from ProcSignal mechanism @@ -590,6 +602,11 @@ ProcessProcSignalBarrier(void) case PROCSIGNAL_BARRIER_CHECKSUM_OFF: processed = AbsorbDataChecksumsBarrier(type); break; + case PROCSIGNAL_BARRIER_SHBUF_RESIZE: + /* Just acknowledge; the resize coordinator only needs + * confirmation that all backends have observed the + * updated lowNBuffers. */ + break; } /* diff --git a/src/backend/utils/activity/wait_event_names.txt b/src/backend/utils/activity/wait_event_names.txt index 560659f9568..ada14fc2a67 100644 --- a/src/backend/utils/activity/wait_event_names.txt +++ b/src/backend/utils/activity/wait_event_names.txt @@ -417,6 +417,7 @@ XactSLRU "Waiting to access the transaction status SLRU cache." ParallelVacuumDSA "Waiting for parallel vacuum dynamic shared memory allocation." AioUringCompletion "Waiting for another process to complete IO via io_uring." ShmemIndex "Waiting to find or allocate space in shared memory." +AccessNBuffers "Waiting to access the current shared buffer count during dynamic shared buffer resize." # No "ABI_compatibility" region here as WaitEventLWLock has its own C code. diff --git a/src/backend/utils/init/globals.c b/src/backend/utils/init/globals.c index bbd28d14d99..97cfd41d864 100644 --- a/src/backend/utils/init/globals.c +++ b/src/backend/utils/init/globals.c @@ -141,7 +141,9 @@ int max_parallel_maintenance_workers = 2; * MaxBackends is computed by PostmasterMain after modules have had a chance to * register background workers. */ -int NBuffers = 16384; +int NBuffersGUC = 16384; +bool enable_dynamic_shared_buffers = false; +int MaxNBuffers = 0; int MaxConnections = 100; int max_worker_processes = 8; int max_parallel_workers = 8; diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c index 774bbc9be5f..e5ed335ce2d 100644 --- a/src/backend/utils/misc/guc.c +++ b/src/backend/utils/misc/guc.c @@ -41,6 +41,7 @@ #include "miscadmin.h" #include "parser/scansup.h" #include "port/pg_bitutils.h" +#include "storage/dynamic_shared_buffers.h" #include "storage/fd.h" #include "storage/lwlock.h" #include "storage/shmem.h" @@ -5391,6 +5392,13 @@ ShowGUCOption(const struct config_generic *record, bool use_units) { const struct config_int *conf = &record->_int; + /* + * Set NBuffersGUC here so that both SHOW shared_buffers (use_units==true) + * and pg_settings (use_units==false) reflect the current shared buffer pool size. + */ + if (conf->variable == &NBuffersGUC) + NBuffersGUC = GetLowNBuffers(); + if (conf->show_hook) val = conf->show_hook(); else diff --git a/src/backend/utils/misc/guc_parameters.dat b/src/backend/utils/misc/guc_parameters.dat index afaa058b046..15d0ed35c5b 100644 --- a/src/backend/utils/misc/guc_parameters.dat +++ b/src/backend/utils/misc/guc_parameters.dat @@ -2704,12 +2704,28 @@ { name => 'shared_buffers', type => 'int', context => 'PGC_POSTMASTER', group => 'RESOURCES_MEM', short_desc => 'Sets the number of shared memory buffers used by the server.', flags => 'GUC_UNIT_BLOCKS', - variable => 'NBuffers', + variable => 'NBuffersGUC', boot_val => '16384', - min => '16', + min => 'MIN_SHARED_BUFFERS', + max => 'INT_MAX / 2', +}, + +{ name => 'max_shared_buffers', type => 'int', context => 'PGC_POSTMASTER', group => 'RESOURCES_MEM', + short_desc => 'Sets the upper limit for the shared_buffers value.', + long_desc => 'If set above zero, it must be at least shared_buffers.', + flags => 'GUC_UNIT_BLOCKS', + variable => 'MaxNBuffers', + boot_val => '0', + min => '0', max => 'INT_MAX / 2', }, +{ name => 'enable_dynamic_shared_buffers', type => 'bool', context => 'PGC_POSTMASTER', group => 'RESOURCES_MEM', + short_desc => 'Enables dynamic resizing of the shared buffer pool.', + variable => 'enable_dynamic_shared_buffers', + boot_val => 'false', +}, + { name => 'shared_memory_size', type => 'int', context => 'PGC_INTERNAL', group => 'PRESET_OPTIONS', short_desc => 'Shows the size of the server\'s main shared memory area (rounded up to the nearest MB).', flags => 'GUC_NOT_IN_SAMPLE | GUC_DISALLOW_IN_FILE | GUC_UNIT_MB | GUC_RUNTIME_COMPUTED', diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat index be157a5fbe9..904470a3f34 100644 --- a/src/include/catalog/pg_proc.dat +++ b/src/include/catalog/pg_proc.dat @@ -12693,4 +12693,13 @@ proname => 'hashoid8extended', prorettype => 'int8', proargtypes => 'oid8 int8', prosrc => 'hashoid8extended' }, +# Online shared buffer pool resizing (see src/backend/storage/buffer/buf_resize.c) +{ oid => '6500', descr => 'resize the shared buffer pool to a new size', + proname => 'pg_resize_shared_buffers', provolatile => 'v', proretset => 't', + prorettype => 'record', proargtypes => 'text', + proallargtypes => '{text,text,float8,text}', + proargmodes => '{i,o,o,o}', + proargnames => '{new_size,key,value,unit}', + prosrc => 'pg_resize_shared_buffers' }, + ] diff --git a/src/include/miscadmin.h b/src/include/miscadmin.h index 8ccdf61246b..e93eaaf6b1c 100644 --- a/src/include/miscadmin.h +++ b/src/include/miscadmin.h @@ -175,12 +175,28 @@ extern PGDLLIMPORT bool ExitOnAnyError; extern PGDLLIMPORT char *DataDir; extern PGDLLIMPORT int data_directory_mode; -extern PGDLLIMPORT int NBuffers; +extern PGDLLIMPORT int NBuffersGUC; +extern PGDLLIMPORT int MaxNBuffers; extern PGDLLIMPORT int MaxBackends; extern PGDLLIMPORT int MaxConnections; extern PGDLLIMPORT int max_worker_processes; extern PGDLLIMPORT int max_parallel_workers; extern PGDLLIMPORT int autovacuum_max_parallel_workers; +extern PGDLLIMPORT bool enable_dynamic_shared_buffers; + +/* + * GetMaxNBuffers + * + * Before DSB is introduced, PG does not recognize max_shared_buffers GUC. + * When max_shared_buffers is not set, it is resolved to NBuffersGUC. + */ +static inline int +GetMaxNBuffers(void) +{ + if (enable_dynamic_shared_buffers) + return MaxNBuffers; + return NBuffersGUC; +} extern PGDLLIMPORT int commit_timestamp_buffers; extern PGDLLIMPORT int multixact_member_buffers; diff --git a/src/include/storage/buf_internals.h b/src/include/storage/buf_internals.h index 89615a254a3..1c5989c2472 100644 --- a/src/include/storage/buf_internals.h +++ b/src/include/storage/buf_internals.h @@ -584,9 +584,12 @@ extern BufferDesc *StrategyGetBuffer(BufferAccessStrategy strategy, extern bool StrategyRejectBuffer(BufferAccessStrategy strategy, BufferDesc *buf, bool from_ring); -extern int StrategySyncStart(uint32 *complete_passes, uint32 *num_buf_alloc); +extern int StrategySyncStart(uint32 *complete_passes, uint32 *num_buf_alloc, + int *low_nbuffers); extern void StrategyNotifyBgWriter(int bgwprocno); +extern void StrategyReset(int old_size, int new_size); + /* buf_table.c */ extern uint32 BufTableHashCode(BufferTag *tagPtr); extern int BufTableLookup(BufferTag *tagPtr, uint32 hashcode); diff --git a/src/include/storage/bufmgr.h b/src/include/storage/bufmgr.h index 6837b35fc6d..3dbaf364133 100644 --- a/src/include/storage/bufmgr.h +++ b/src/include/storage/bufmgr.h @@ -14,11 +14,13 @@ #ifndef BUFMGR_H #define BUFMGR_H +#include "miscadmin.h" #include "port/pg_iovec.h" #include "storage/aio_types.h" #include "storage/block.h" #include "storage/buf.h" #include "storage/bufpage.h" +#include "storage/dynamic_shared_buffers.h" #include "storage/relfilelocator.h" #include "utils/relcache.h" #include "utils/snapmgr.h" @@ -159,7 +161,7 @@ typedef struct ReadBuffersOperation ReadBuffersOperation; typedef struct WritebackContext WritebackContext; /* in globals.c ... this duplicates miscadmin.h */ -extern PGDLLIMPORT int NBuffers; +extern PGDLLIMPORT int NBuffersGUC; /* in bufmgr.c */ extern PGDLLIMPORT bool zero_damaged_pages; @@ -371,6 +373,13 @@ extern void MarkDirtyAllUnpinnedBuffers(int32 *buffers_dirtied, int32 *buffers_already_dirty, int32 *buffers_skipped); +extern Size BufferManagerShmemExpand(int lowNBuffers, int highNBuffers, bool *success); +extern void BufferManagerShmemInitBuffers(int lowNBuffers, int highNBuffers); +extern Size BufferManagerShmemShrink(int lowNBuffers, int highNBuffers, bool *success); + +/* in bufmgr.c */ +extern bool EvictExtraBuffers(int lowNBuffers, int highNBuffers); + /* in localbuf.c */ extern void AtProcExit_LocalBuffers(void); @@ -418,7 +427,7 @@ extern void FreeAccessStrategy(BufferAccessStrategy strategy); static inline bool BufferIsValid(Buffer bufnum) { - Assert(bufnum <= NBuffers); + Assert(bufnum <= (Buffer) GetMaxNBuffers()); Assert(bufnum >= -NLocBuffer); return bufnum != InvalidBuffer; diff --git a/src/include/storage/dynamic_shared_buffers.h b/src/include/storage/dynamic_shared_buffers.h new file mode 100644 index 00000000000..9d57b25e15d --- /dev/null +++ b/src/include/storage/dynamic_shared_buffers.h @@ -0,0 +1,103 @@ +/*------------------------------------------------------------------------- + * + * dynamic_shared_buffers.h + * Dynamic shared buffer (DSB) coordination state and helpers. + * + * This header collects the neon-specific machinery that lets shared_buffers + * grow and shrink at runtime. + * + * See pgxn/neon/README.md ("Dynamic shared buffer") for the full design and + * the resize protocol. + * + * src/include/storage/dynamic_shared_buffers.h + * + *------------------------------------------------------------------------- + */ +#ifndef DYNAMIC_SHARED_BUFFERS_H +#define DYNAMIC_SHARED_BUFFERS_H + +#include "port/atomics.h" +#include "storage/lwlock.h" +#include "storage/spin.h" + +/* + * Minimum allowed value of the shared_buffers GUC, and the smallest size that + * pg_resize_shared_buffers() can shrink to. + */ +#define MIN_SHARED_BUFFERS 16 + +/* + * DynamicSharedBuffersControl is shared between backends and helps to + * coordinate shared buffer pool resize. See pgxn/neon/README.md + * ("Dynamic shared buffer") for the full design and protocol. + */ +typedef struct +{ + /* + * When a resize is in progress, `resize_in_progress` is set and + * `coordinator_pid` is the PID of the backend performing the resize. + */ + bool resize_in_progress; + pid_t coordinator_pid; + slock_t coordinator_lock; /* protects the two fields above */ + + pg_atomic_uint32 lowNBuffers; /* low water mark: backends allocate + * buffers from [0, lowNBuffers). */ + pg_atomic_uint32 highNBuffers; /* high water mark: buffer descriptor + * memory in [0, highNBuffers) is + * allocated and initialized. */ + LWLock AccessNBuffersLock; /* Backends hold this in shared mode while + * iterating the buffer pool up to + * highNBuffers; the resize coordinator + * acquires it exclusively to mutate the + * buffer pool memory and publish a new + * highNBuffers. */ +} DynamicSharedBuffersControl; + +extern PGDLLIMPORT DynamicSharedBuffersControl *DSBCtrl; + +extern PGDLLIMPORT int NBuffersGUC; + +extern bool IsProcSignalInitialized(void); + +/* + * GetHighNBuffers returns the high water mark. + */ +static inline int +GetHighNBuffers(void) +{ + if (DSBCtrl == NULL) + return NBuffersGUC; + return pg_atomic_read_u32(&DSBCtrl->highNBuffers); +} + +/* + * GetLowNBuffers returns the low water mark. + */ +static inline int +GetLowNBuffers(void) +{ + if (DSBCtrl == NULL) + return NBuffersGUC; + /* + * This must only be called from a process that is subscribed to the + * SHBUF_RESIZE ProcSignal barrier (i.e. one that has finished + * ProcSignalInit) -- otherwise a concurrent + * shrink could free the buffer memory in [new_low, old_low) without + * waiting for us. + */ + Assert(IsProcSignalInitialized()); + return pg_atomic_read_u32(&DSBCtrl->lowNBuffers); +} + +extern void DSBControlInit(void); + +/* + * Try to claim coordinator status for a buffer-pool resize. Returns true if + * we became the coordinator (caller must eventually call + * ReleaseResizeCoordinator()), false if a resize was already in progress. + */ +extern bool ClaimResizeCoordinator(void); +extern void ReleaseResizeCoordinator(void); + +#endif /* DYNAMIC_SHARED_BUFFERS_H */ diff --git a/src/include/storage/ipc.h b/src/include/storage/ipc.h index b205b00e7a1..728bedd1cd8 100644 --- a/src/include/storage/ipc.h +++ b/src/include/storage/ipc.h @@ -64,6 +64,47 @@ typedef void (*shmem_startup_hook_type) (void); /* ipc.c */ extern PGDLLIMPORT bool proc_exit_inprogress; extern PGDLLIMPORT bool shmem_exit_inprogress; +extern int AcquireNBuffersLock(void); +extern void ReleaseNBuffersLock(void); + +/*---------- + * BEGIN_NBUFFERS_ACCESS / END_NBUFFERS_ACCESS + * + * The lock is released at scope exit via __attribute__((cleanup)), so: + * - early `return`, `break`, or `goto` between BEGIN and END does NOT leak. + * - END_NBUFFERS_ACCESS(name) is idempotent: it releases the lock and sets + * a sentinel so the cleanup at scope exit skips a second release. Use it + * when you want to drop the lock before the enclosing block ends. + * - ereport(ERROR) bypasses the cleanup attribute, but LWLockReleaseAll() + * during transaction abort still releases AccessNBuffersLock, so no leak. + * + * Usage: + * BEGIN_NBUFFERS_ACCESS(localNBuffers); + * for (int i = 0; i < localNBuffers; i++) + * ... use buffer i ... + * END_NBUFFERS_ACCESS(localNBuffers); + * + *---------- + */ +static inline void +nbuffers_lock_auto_release(const bool *released) +{ + if (!*released) + ReleaseNBuffersLock(); +} + +#define BEGIN_NBUFFERS_ACCESS(name) \ + bool name##_released __attribute__((cleanup(nbuffers_lock_auto_release))) = false; \ + int name = AcquireNBuffersLock() + +#define END_NBUFFERS_ACCESS(name) \ + do { \ + if (!name##_released) \ + { \ + ReleaseNBuffersLock(); \ + name##_released = true; \ + } \ + } while (0) pg_noreturn extern void proc_exit(int code); extern void shmem_exit(int code); diff --git a/src/include/storage/lwlocklist.h b/src/include/storage/lwlocklist.h index d7eb648bd27..6c9b47bc368 100644 --- a/src/include/storage/lwlocklist.h +++ b/src/include/storage/lwlocklist.h @@ -140,3 +140,4 @@ PG_LWLOCKTRANCHE(XACT_SLRU, XactSLRU) PG_LWLOCKTRANCHE(PARALLEL_VACUUM_DSA, ParallelVacuumDSA) PG_LWLOCKTRANCHE(AIO_URING_COMPLETION, AioUringCompletion) PG_LWLOCKTRANCHE(SHMEM_INDEX, ShmemIndex) +PG_LWLOCKTRANCHE(ACCESS_NBUFFERS, AccessNBuffers) diff --git a/src/include/storage/procsignal.h b/src/include/storage/procsignal.h index aaa158bfd66..e496c0bca7c 100644 --- a/src/include/storage/procsignal.h +++ b/src/include/storage/procsignal.h @@ -54,6 +54,7 @@ typedef enum PROCSIGNAL_BARRIER_CHECKSUM_INPROGRESS_ON, PROCSIGNAL_BARRIER_CHECKSUM_INPROGRESS_OFF, PROCSIGNAL_BARRIER_CHECKSUM_ON, + PROCSIGNAL_BARRIER_SHBUF_RESIZE, /* shared buffer resize barrier */ } ProcSignalBarrierType; /* @@ -70,6 +71,7 @@ typedef enum * prototypes for functions in procsignal.c */ extern void ProcSignalInit(const uint8 *cancel_key, int cancel_key_len); +extern bool IsProcSignalInitialized(void); extern int SendProcSignal(pid_t pid, ProcSignalReason reason, ProcNumber procNumber); extern void SendCancelRequest(int backendPID, const uint8 *cancel_key, int cancel_key_len); diff --git a/src/test/regress/expected/sysviews.out b/src/test/regress/expected/sysviews.out index 132b56a5864..a748e7bb626 100644 --- a/src/test/regress/expected/sysviews.out +++ b/src/test/regress/expected/sysviews.out @@ -152,7 +152,7 @@ select count(*) = 0 as ok from pg_stat_recovery; -- This is to record the prevailing planner enable_foo settings during -- a regression test run. -select name, setting from pg_settings where name like 'enable%'; +select name, setting from pg_settings where name like 'enable%' and name <> 'enable_dynamic_shared_buffers'; name | setting --------------------------------+--------- enable_async_append | on diff --git a/src/test/regress/sql/sysviews.sql b/src/test/regress/sql/sysviews.sql index 507e400ad4a..5366f8649a3 100644 --- a/src/test/regress/sql/sysviews.sql +++ b/src/test/regress/sql/sysviews.sql @@ -81,7 +81,7 @@ select count(*) = 0 as ok from pg_stat_recovery; -- This is to record the prevailing planner enable_foo settings during -- a regression test run. -select name, setting from pg_settings where name like 'enable%'; +select name, setting from pg_settings where name like 'enable%' and name <> 'enable_dynamic_shared_buffers'; -- There are always wait event descriptions for various types. InjectionPoint -- may be present or absent, depending on history since last postmaster start. diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index 8cf40c87043..140bed5bcfd 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -715,6 +715,7 @@ DumpableObject DumpableObjectType DumpableObjectWithAcl DynamicFileList +DynamicSharedBuffersControl DynamicZoneAbbrev ECDerivesEntry ECDerivesKey base-commit: 0392fb900eb89f52988cccd33046443c39c70d1c -- 2.54.0