From f6cb591ba520351ab7f0e7cbf9d6df3dacda6b44 Mon Sep 17 00:00:00 2001 From: Thomas Munro Date: Sat, 22 Jul 2023 17:31:54 +1200 Subject: [PATCH v3 1/2] Streaming Read API --- contrib/pg_prewarm/pg_prewarm.c | 40 +- src/backend/access/transam/xlogutils.c | 2 +- src/backend/postmaster/bgwriter.c | 8 +- src/backend/postmaster/checkpointer.c | 15 +- src/backend/storage/Makefile | 2 +- src/backend/storage/aio/Makefile | 14 + src/backend/storage/aio/meson.build | 5 + src/backend/storage/aio/streaming_read.c | 435 ++++++++++++++++++ src/backend/storage/buffer/bufmgr.c | 560 +++++++++++++++-------- src/backend/storage/buffer/localbuf.c | 14 +- src/backend/storage/meson.build | 1 + src/backend/storage/smgr/smgr.c | 49 +- src/include/storage/bufmgr.h | 22 + src/include/storage/smgr.h | 4 +- src/include/storage/streaming_read.h | 45 ++ src/include/utils/rel.h | 6 - src/tools/pgindent/typedefs.list | 2 + 17 files changed, 986 insertions(+), 238 deletions(-) create mode 100644 src/backend/storage/aio/Makefile create mode 100644 src/backend/storage/aio/meson.build create mode 100644 src/backend/storage/aio/streaming_read.c create mode 100644 src/include/storage/streaming_read.h diff --git a/contrib/pg_prewarm/pg_prewarm.c b/contrib/pg_prewarm/pg_prewarm.c index 8541e4d6e4..9617bf130b 100644 --- a/contrib/pg_prewarm/pg_prewarm.c +++ b/contrib/pg_prewarm/pg_prewarm.c @@ -20,6 +20,7 @@ #include "miscadmin.h" #include "storage/bufmgr.h" #include "storage/smgr.h" +#include "storage/streaming_read.h" #include "utils/acl.h" #include "utils/builtins.h" #include "utils/lsyscache.h" @@ -38,6 +39,25 @@ typedef enum static PGIOAlignedBlock blockbuffer; +struct pg_prewarm_streaming_read_private +{ + BlockNumber blocknum; + int64 last_block; +}; + +static BlockNumber +pg_prewarm_streaming_read_next(PgStreamingRead *pgsr, + void *pgsr_private, + void *per_buffer_data) +{ + struct pg_prewarm_streaming_read_private *p = pgsr_private; + + if (p->blocknum <= p->last_block) + return p->blocknum++; + + return InvalidBlockNumber; +} + /* * pg_prewarm(regclass, mode text, fork text, * first_block int8, last_block int8) @@ -183,18 +203,36 @@ pg_prewarm(PG_FUNCTION_ARGS) } else if (ptype == PREWARM_BUFFER) { + struct pg_prewarm_streaming_read_private p; + PgStreamingRead *pgsr; + /* * In buffer mode, we actually pull the data into shared_buffers. */ + + /* Set up the private state for our streaming buffer read callback. */ + p.blocknum = first_block; + p.last_block = last_block; + + pgsr = pg_streaming_read_buffer_alloc(PGSR_FLAG_DEFAULT, + &p, + 0, + NULL, + BMR_REL(rel), + forkNumber, + pg_prewarm_streaming_read_next); + for (block = first_block; block <= last_block; ++block) { Buffer buf; CHECK_FOR_INTERRUPTS(); - buf = ReadBufferExtended(rel, forkNumber, block, RBM_NORMAL, NULL); + buf = pg_streaming_read_buffer_get_next(pgsr, NULL); ReleaseBuffer(buf); ++blocks_done; } + Assert(pg_streaming_read_buffer_get_next(pgsr, NULL) == InvalidBuffer); + pg_streaming_read_free(pgsr); } /* Close relation, release lock. */ diff --git a/src/backend/access/transam/xlogutils.c b/src/backend/access/transam/xlogutils.c index aa8667abd1..8775b5789b 100644 --- a/src/backend/access/transam/xlogutils.c +++ b/src/backend/access/transam/xlogutils.c @@ -657,7 +657,7 @@ XLogDropDatabase(Oid dbid) * This is unnecessarily heavy-handed, as it will close SMgrRelation * objects for other databases as well. DROP DATABASE occurs seldom enough * that it's not worth introducing a variant of smgrclose for just this - * purpose. 
XXX: Or should we rather leave the smgr entries dangling? + * purpose. */ smgrcloseall(); diff --git a/src/backend/postmaster/bgwriter.c b/src/backend/postmaster/bgwriter.c index d7d6cc0cd7..13e5376619 100644 --- a/src/backend/postmaster/bgwriter.c +++ b/src/backend/postmaster/bgwriter.c @@ -246,10 +246,12 @@ BackgroundWriterMain(void) if (FirstCallSinceLastCheckpoint()) { /* - * After any checkpoint, close all smgr files. This is so we - * won't hang onto smgr references to deleted files indefinitely. + * After any checkpoint, free all smgr objects. Otherwise we + * would never do so for dropped relations, as the bgwriter does + * not process shared invalidation messages or call + * AtEOXact_SMgr(). */ - smgrcloseall(); + smgrdestroyall(); } /* diff --git a/src/backend/postmaster/checkpointer.c b/src/backend/postmaster/checkpointer.c index 5e949fc885..5d843b6142 100644 --- a/src/backend/postmaster/checkpointer.c +++ b/src/backend/postmaster/checkpointer.c @@ -469,10 +469,12 @@ CheckpointerMain(void) ckpt_performed = CreateRestartPoint(flags); /* - * After any checkpoint, close all smgr files. This is so we - * won't hang onto smgr references to deleted files indefinitely. + * After any checkpoint, free all smgr objects. Otherwise we + * would never do so for dropped relations, as the checkpointer + * does not process shared invalidation messages or call + * AtEOXact_SMgr(). */ - smgrcloseall(); + smgrdestroyall(); /* * Indicate checkpoint completion to any waiting backends. @@ -958,11 +960,8 @@ RequestCheckpoint(int flags) */ CreateCheckPoint(flags | CHECKPOINT_IMMEDIATE); - /* - * After any checkpoint, close all smgr files. This is so we won't - * hang onto smgr references to deleted files indefinitely. - */ - smgrcloseall(); + /* Free all smgr objects, as CheckpointerMain() normally would. */ + smgrdestroyall(); return; } diff --git a/src/backend/storage/Makefile b/src/backend/storage/Makefile index 8376cdfca2..eec03f6f2b 100644 --- a/src/backend/storage/Makefile +++ b/src/backend/storage/Makefile @@ -8,6 +8,6 @@ subdir = src/backend/storage top_builddir = ../../.. include $(top_builddir)/src/Makefile.global -SUBDIRS = buffer file freespace ipc large_object lmgr page smgr sync +SUBDIRS = aio buffer file freespace ipc large_object lmgr page smgr sync include $(top_srcdir)/src/backend/common.mk diff --git a/src/backend/storage/aio/Makefile b/src/backend/storage/aio/Makefile new file mode 100644 index 0000000000..bcab44c802 --- /dev/null +++ b/src/backend/storage/aio/Makefile @@ -0,0 +1,14 @@ +# +# Makefile for storage/aio +# +# src/backend/storage/aio/Makefile +# + +subdir = src/backend/storage/aio +top_builddir = ../../../.. +include $(top_builddir)/src/Makefile.global + +OBJS = \ + streaming_read.o + +include $(top_srcdir)/src/backend/common.mk diff --git a/src/backend/storage/aio/meson.build b/src/backend/storage/aio/meson.build new file mode 100644 index 0000000000..39aef2a84a --- /dev/null +++ b/src/backend/storage/aio/meson.build @@ -0,0 +1,5 @@ +# Copyright (c) 2024, PostgreSQL Global Development Group + +backend_sources += files( + 'streaming_read.c', +) diff --git a/src/backend/storage/aio/streaming_read.c b/src/backend/storage/aio/streaming_read.c new file mode 100644 index 0000000000..19605090fe --- /dev/null +++ b/src/backend/storage/aio/streaming_read.c @@ -0,0 +1,435 @@ +#include "postgres.h" + +#include "storage/streaming_read.h" +#include "utils/rel.h" + +/* + * Element type for PgStreamingRead's circular array of block ranges. 
+ *
+ * For hits, need_to_complete is false and there is just one block per
+ * range, already pinned and ready for use.
+ *
+ * For misses, need_to_complete is true and buffers[] holds a range of
+ * blocks that are contiguous in storage (though the buffers may not be
+ * contiguous in memory), so we can complete them with a single call to
+ * CompleteReadBuffers().
+ */
+typedef struct PgStreamingReadRange
+{
+	bool		advice_issued;
+	bool		need_complete;
+	BlockNumber	blocknum;
+	int			nblocks;
+	int			per_buffer_data_index[MAX_BUFFERS_PER_TRANSFER];
+	Buffer		buffers[MAX_BUFFERS_PER_TRANSFER];
+} PgStreamingReadRange;
+
+struct PgStreamingRead
+{
+	int			max_ios;
+	int			ios_in_progress;
+	int			ios_in_progress_trigger;
+	int			max_pinned_buffers;
+	int			pinned_buffers;
+	int			pinned_buffers_trigger;
+	int			next_tail_buffer;
+	bool		finished;
+	void	   *pgsr_private;
+	PgStreamingReadBufferCB callback;
+	BufferAccessStrategy strategy;
+	BufferManagerRelation bmr;
+	ForkNumber	forknum;
+
+	bool		advice_enabled;
+
+	/* Next expected block, for detecting sequential access. */
+	BlockNumber	seq_blocknum;
+
+	/* Space for optional per-buffer private data. */
+	size_t		per_buffer_data_size;
+	void	   *per_buffer_data;
+	int			per_buffer_data_next;
+
+	/* Circular buffer of ranges. */
+	int			size;
+	int			head;
+	int			tail;
+	PgStreamingReadRange ranges[FLEXIBLE_ARRAY_MEMBER];
+};
+
+static PgStreamingRead *
+pg_streaming_read_buffer_alloc_internal(int flags,
+										void *pgsr_private,
+										size_t per_buffer_data_size,
+										BufferAccessStrategy strategy)
+{
+	PgStreamingRead *pgsr;
+	int			size;
+	int			max_ios;
+	uint32		max_pinned_buffers;
+
+
+	/*
+	 * Decide how many assumed I/Os we will allow to run concurrently.  That
+	 * is, how many blocks we will advise the kernel that we will soon read.
+	 * This number also affects how far we look ahead for opportunities to
+	 * start more I/Os.
+	 */
+	if (flags & PGSR_FLAG_MAINTENANCE)
+		max_ios = maintenance_io_concurrency;
+	else
+		max_ios = effective_io_concurrency;
+
+	/*
+	 * The desired level of I/O concurrency controls how far ahead we are
+	 * willing to look.  We also clamp it to at least
+	 * MAX_BUFFERS_PER_TRANSFER so that we have a chance to build up a
+	 * full-sized read, even when max_ios is zero.
+	 */
+	max_pinned_buffers = Max(max_ios * 4, MAX_BUFFERS_PER_TRANSFER);
+
+	/*
+	 * The *_io_concurrency GUCs might be set to 0, but we want to allow at
+	 * least one, to keep our gating logic simple.
+	 */
+	max_ios = Max(max_ios, 1);
+
+	/*
+	 * Don't allow this backend to pin too many buffers.  For now we'll apply
+	 * the limit for the shared buffer pool and the local buffer pool,
+	 * without worrying about which it is.
+	 */
+	LimitAdditionalPins(&max_pinned_buffers);
+	LimitAdditionalLocalPins(&max_pinned_buffers);
+	Assert(max_pinned_buffers > 0);
+
+	/*
+	 * pgsr->ranges is a circular buffer.  When it is empty, head == tail.
+	 * When it is full, there is an empty element between head and tail.
+	 * Head can also be empty (nblocks == 0), so we need two extra elements
+	 * for non-occupied ranges, on top of max_pinned_buffers to allow for the
+	 * maximum possible number of occupied ranges of the smallest possible
+	 * size of one.
+	 */
+	size = max_pinned_buffers + 2;
+
+	pgsr = (PgStreamingRead *)
+		palloc0(offsetof(PgStreamingRead, ranges) +
+				sizeof(pgsr->ranges[0]) * size);
+
+	pgsr->max_ios = max_ios;
+	pgsr->per_buffer_data_size = per_buffer_data_size;
+	pgsr->max_pinned_buffers = max_pinned_buffers;
+	pgsr->pgsr_private = pgsr_private;
+	pgsr->strategy = strategy;
+	pgsr->size = size;
+
+#ifdef USE_PREFETCH
+
+	/*
+	 * This system supports prefetching advice.  As long as direct I/O isn't
+	 * enabled, and the caller hasn't promised sequential access, we can use
+	 * it.
+	 */
+	if ((io_direct_flags & IO_DIRECT_DATA) == 0 &&
+		(flags & PGSR_FLAG_SEQUENTIAL) == 0)
+		pgsr->advice_enabled = true;
+#endif
+
+	/*
+	 * We want to avoid creating ranges that are smaller than they could be
+	 * just because we hit max_pinned_buffers.  We only look ahead when the
+	 * number of pinned buffers falls below this trigger number, or put
+	 * another way, we stop looking ahead when we wouldn't be able to build a
+	 * "full sized" range.
+	 */
+	pgsr->pinned_buffers_trigger =
+		Max(1, (int) max_pinned_buffers - MAX_BUFFERS_PER_TRANSFER);
+
+	/* Space for the callback to store extra data along with each block. */
+	if (per_buffer_data_size)
+		pgsr->per_buffer_data = palloc(per_buffer_data_size * max_pinned_buffers);
+
+	return pgsr;
+}
+
+/*
+ * Create a new streaming read object that can be used to perform the
+ * equivalent of a series of ReadBuffer() calls for one fork of one relation.
+ * Internally, it generates larger vectored reads where possible by looking
+ * ahead.
+ */
+PgStreamingRead *
+pg_streaming_read_buffer_alloc(int flags,
+							   void *pgsr_private,
+							   size_t per_buffer_data_size,
+							   BufferAccessStrategy strategy,
+							   BufferManagerRelation bmr,
+							   ForkNumber forknum,
+							   PgStreamingReadBufferCB next_block_cb)
+{
+	PgStreamingRead *result;
+
+	result = pg_streaming_read_buffer_alloc_internal(flags,
+													 pgsr_private,
+													 per_buffer_data_size,
+													 strategy);
+	result->callback = next_block_cb;
+	result->bmr = bmr;
+	result->forknum = forknum;
+
+	return result;
+}
+
+/*
+ * Start building a new range.  This is called after the previous one reached
+ * maximum size, or when the callback's next block can't be merged with it.
+ *
+ * Since the previous head range has now reached its full potential size, this
+ * is also a good time to issue 'prefetch' advice, because we know we'll soon
+ * be reading it.  In future, we could start an actual I/O here.
+ */
+static PgStreamingReadRange *
+pg_streaming_read_new_range(PgStreamingRead *pgsr)
+{
+	PgStreamingReadRange *head_range;
+
+	head_range = &pgsr->ranges[pgsr->head];
+	Assert(head_range->nblocks > 0);
+
+	/*
+	 * If a call to CompleteReadBuffers() will be needed, we can issue advice
+	 * to the kernel to get the read started.  We suppress it if the access
+	 * pattern appears to be completely sequential, though, because on some
+	 * systems that interferes with the kernel's own sequential read-ahead
+	 * heuristics and hurts performance.
+	 */
+	if (pgsr->advice_enabled)
+	{
+		BlockNumber blocknum = head_range->blocknum;
+		int			nblocks = head_range->nblocks;
+
+		if (head_range->need_complete && blocknum != pgsr->seq_blocknum)
+		{
+			SMgrRelation smgr =
+				pgsr->bmr.smgr ? pgsr->bmr.smgr :
+				RelationGetSmgr(pgsr->bmr.rel);
+
+			Assert(!head_range->advice_issued);
+
+			smgrprefetch(smgr, pgsr->forknum, blocknum, nblocks);
+
+			/*
+			 * Count this as an I/O that is concurrently in progress, though
+			 * we don't really know if the kernel generates a physical I/O.
+			 */
+			head_range->advice_issued = true;
+			pgsr->ios_in_progress++;
+		}
+
+		/* Remember the block after this range, for sequence detection. */
+		pgsr->seq_blocknum = blocknum + nblocks;
+	}
+
+	/* Create a new head range.  There must be space. */
+	Assert(pgsr->size > pgsr->max_pinned_buffers);
+	Assert((pgsr->head + 1) % pgsr->size != pgsr->tail);
+	if (++pgsr->head == pgsr->size)
+		pgsr->head = 0;
+	head_range = &pgsr->ranges[pgsr->head];
+	head_range->nblocks = 0;
+
+	return head_range;
+}
+
+static void
+pg_streaming_read_look_ahead(PgStreamingRead *pgsr)
+{
+	/*
+	 * If we're finished or can't start more I/O, then don't look ahead.
+	 */
+	if (pgsr->finished || pgsr->ios_in_progress == pgsr->max_ios)
+		return;
+
+	/*
+	 * We'll also wait until the number of pinned buffers falls below our
+	 * trigger level, so that we have the chance to create a full range.
+	 */
+	if (pgsr->pinned_buffers >= pgsr->pinned_buffers_trigger)
+		return;
+
+	do
+	{
+		BufferManagerRelation bmr;
+		ForkNumber	forknum;
+		BlockNumber	blocknum;
+		Buffer		buffer;
+		bool		found;
+		bool		need_complete;
+		PgStreamingReadRange *head_range;
+		void	   *per_buffer_data;
+
+		/* Do we have a full-sized range? */
+		head_range = &pgsr->ranges[pgsr->head];
+		if (head_range->nblocks == lengthof(head_range->buffers))
+		{
+			Assert(head_range->need_complete);
+			head_range = pg_streaming_read_new_range(pgsr);
+
+			/*
+			 * Give up now if I/O is saturated, or we wouldn't be able to
+			 * form another full range after this due to the pin limit.
+			 */
+			if (pgsr->pinned_buffers >= pgsr->pinned_buffers_trigger ||
+				pgsr->ios_in_progress == pgsr->max_ios)
+				break;
+		}
+
+		per_buffer_data = (char *) pgsr->per_buffer_data +
+			pgsr->per_buffer_data_size * pgsr->per_buffer_data_next;
+
+		/* Find out which block the callback wants to read next. */
+		blocknum = pgsr->callback(pgsr, pgsr->pgsr_private, per_buffer_data);
+		if (blocknum == InvalidBlockNumber)
+		{
+			pgsr->finished = true;
+			break;
+		}
+		bmr = pgsr->bmr;
+		forknum = pgsr->forknum;
+
+		Assert(pgsr->pinned_buffers < pgsr->max_pinned_buffers);
+
+		buffer = PrepareReadBuffer(bmr,
+								   forknum,
+								   blocknum,
+								   pgsr->strategy,
+								   &found);
+		pgsr->pinned_buffers++;
+
+		need_complete = !found;
+
+		/* Is there a head range that we can't extend? */
+		head_range = &pgsr->ranges[pgsr->head];
+		if (head_range->nblocks > 0 &&
+			(!need_complete ||
+			 !head_range->need_complete ||
+			 head_range->blocknum + head_range->nblocks != blocknum))
+		{
+			/* Yes, time to start building a new one. */
+			head_range = pg_streaming_read_new_range(pgsr);
+			Assert(head_range->nblocks == 0);
+		}
+
+		if (head_range->nblocks == 0)
+		{
+			/* Initialize a new range beginning at this block. */
+			head_range->blocknum = blocknum;
+			head_range->need_complete = need_complete;
+			head_range->advice_issued = false;
+		}
+		else
+		{
+			/* We can extend an existing range by one block. */
+			Assert(head_range->blocknum + head_range->nblocks == blocknum);
+			Assert(head_range->need_complete);
+		}
+
+		head_range->per_buffer_data_index[head_range->nblocks] = pgsr->per_buffer_data_next++;
+		head_range->buffers[head_range->nblocks] = buffer;
+		head_range->nblocks++;
+
+		if (pgsr->per_buffer_data_next == pgsr->max_pinned_buffers)
+			pgsr->per_buffer_data_next = 0;
+
+	} while (pgsr->pinned_buffers < pgsr->max_pinned_buffers &&
+			 pgsr->ios_in_progress < pgsr->max_ios);
+
+	if (pgsr->ranges[pgsr->head].nblocks > 0)
+		pg_streaming_read_new_range(pgsr);
+}
+
+Buffer
+pg_streaming_read_buffer_get_next(PgStreamingRead *pgsr, void **per_buffer_data)
+{
+	pg_streaming_read_look_ahead(pgsr);
+
+	/* See if we have one buffer to return. */
+	while (pgsr->tail != pgsr->head)
+	{
+		PgStreamingReadRange *tail_range;
+
+		tail_range = &pgsr->ranges[pgsr->tail];
+
+		/*
+		 * Do we need to perform an I/O before returning the buffers from
+		 * this range?
+		 */
+		if (tail_range->need_complete)
+		{
+			CompleteReadBuffers(pgsr->bmr,
+								tail_range->buffers,
+								pgsr->forknum,
+								tail_range->blocknum,
+								tail_range->nblocks,
+								false,
+								pgsr->strategy);
+			tail_range->need_complete = false;
+
+			/*
+			 * We don't really know if the kernel generated a physical I/O
+			 * when we issued advice, let alone when it finished, but it has
+			 * certainly finished after a read call returns.
+			 */
+			if (tail_range->advice_issued)
+				pgsr->ios_in_progress--;
+		}
+
+		/* Are there more buffers available in this range? */
+		if (pgsr->next_tail_buffer < tail_range->nblocks)
+		{
+			int			buffer_index;
+			Buffer		buffer;
+
+			buffer_index = pgsr->next_tail_buffer++;
+			buffer = tail_range->buffers[buffer_index];
+
+			Assert(BufferIsValid(buffer));
+
+			/* We are giving away ownership of this pinned buffer. */
+			Assert(pgsr->pinned_buffers > 0);
+			pgsr->pinned_buffers--;
+
+			if (per_buffer_data)
+				*per_buffer_data = (char *) pgsr->per_buffer_data +
+					tail_range->per_buffer_data_index[buffer_index] *
+					pgsr->per_buffer_data_size;
+
+			return buffer;
+		}
+
+		/* Advance tail to next range, if there is one. */
+		if (++pgsr->tail == pgsr->size)
+			pgsr->tail = 0;
+		pgsr->next_tail_buffer = 0;
+	}
+
+	Assert(pgsr->pinned_buffers == 0);
+
+	return InvalidBuffer;
+}
+
+void
+pg_streaming_read_free(PgStreamingRead *pgsr)
+{
+	Buffer		buffer;
+
+	/* Stop looking ahead, and unpin anything that wasn't consumed.
*/ + pgsr->finished = true; + while ((buffer = pg_streaming_read_buffer_get_next(pgsr, NULL)) != InvalidBuffer) + ReleaseBuffer(buffer); + + if (pgsr->per_buffer_data) + pfree(pgsr->per_buffer_data); + pfree(pgsr); +} diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c index 7d601bef6d..2157a97b97 100644 --- a/src/backend/storage/buffer/bufmgr.c +++ b/src/backend/storage/buffer/bufmgr.c @@ -472,7 +472,7 @@ ForgetPrivateRefCountEntry(PrivateRefCountEntry *ref) ) -static Buffer ReadBuffer_common(SMgrRelation smgr, char relpersistence, +static Buffer ReadBuffer_common(BufferManagerRelation bmr, ForkNumber forkNum, BlockNumber blockNum, ReadBufferMode mode, BufferAccessStrategy strategy, bool *hit); @@ -501,7 +501,7 @@ static uint32 WaitBufHdrUnlocked(BufferDesc *buf); static int SyncOneBuffer(int buf_id, bool skip_recently_used, WritebackContext *wb_context); static void WaitIO(BufferDesc *buf); -static bool StartBufferIO(BufferDesc *buf, bool forInput); +static bool StartBufferIO(BufferDesc *buf, bool forInput, bool nowait); static void TerminateBufferIO(BufferDesc *buf, bool clear_dirty, uint32 set_flag_bits, bool forget_owner); static void AbortBufferIO(Buffer buffer); @@ -795,15 +795,9 @@ ReadBufferExtended(Relation reln, ForkNumber forkNum, BlockNumber blockNum, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("cannot access temporary tables of other sessions"))); - /* - * Read the buffer, and update pgstat counters to reflect a cache hit or - * miss. - */ - pgstat_count_buffer_read(reln); - buf = ReadBuffer_common(RelationGetSmgr(reln), reln->rd_rel->relpersistence, + buf = ReadBuffer_common(BMR_REL(reln), forkNum, blockNum, mode, strategy, &hit); - if (hit) - pgstat_count_buffer_hit(reln); + return buf; } @@ -827,8 +821,9 @@ ReadBufferWithoutRelcache(RelFileLocator rlocator, ForkNumber forkNum, SMgrRelation smgr = smgropen(rlocator, InvalidBackendId); - return ReadBuffer_common(smgr, permanent ? RELPERSISTENCE_PERMANENT : - RELPERSISTENCE_UNLOGGED, forkNum, blockNum, + return ReadBuffer_common(BMR_SMGR(smgr, permanent ? RELPERSISTENCE_PERMANENT : + RELPERSISTENCE_UNLOGGED), + forkNum, blockNum, mode, strategy, &hit); } @@ -1002,7 +997,7 @@ ExtendBufferedRelTo(BufferManagerRelation bmr, bool hit; Assert(extended_by == 0); - buffer = ReadBuffer_common(bmr.smgr, bmr.relpersistence, + buffer = ReadBuffer_common(bmr, fork, extend_to - 1, mode, strategy, &hit); } @@ -1016,18 +1011,11 @@ ExtendBufferedRelTo(BufferManagerRelation bmr, * *hit is set to true if the request was satisfied from shared buffer cache. 
*/ static Buffer -ReadBuffer_common(SMgrRelation smgr, char relpersistence, ForkNumber forkNum, +ReadBuffer_common(BufferManagerRelation bmr, ForkNumber forkNum, BlockNumber blockNum, ReadBufferMode mode, BufferAccessStrategy strategy, bool *hit) { - BufferDesc *bufHdr; - Block bufBlock; - bool found; - IOContext io_context; - IOObject io_object; - bool isLocalBuf = SmgrIsTemp(smgr); - - *hit = false; + Buffer buffer; /* * Backward compatibility path, most code should use ExtendBufferedRel() @@ -1046,175 +1034,339 @@ ReadBuffer_common(SMgrRelation smgr, char relpersistence, ForkNumber forkNum, if (mode == RBM_ZERO_AND_LOCK || mode == RBM_ZERO_AND_CLEANUP_LOCK) flags |= EB_LOCK_FIRST; - return ExtendBufferedRel(BMR_SMGR(smgr, relpersistence), - forkNum, strategy, flags); + *hit = false; + + return ExtendBufferedRel(bmr, forkNum, strategy, flags); } - TRACE_POSTGRESQL_BUFFER_READ_START(forkNum, blockNum, - smgr->smgr_rlocator.locator.spcOid, - smgr->smgr_rlocator.locator.dbOid, - smgr->smgr_rlocator.locator.relNumber, - smgr->smgr_rlocator.backend); + buffer = PrepareReadBuffer(bmr, + forkNum, + blockNum, + strategy, + hit); + + /* At this point we do NOT hold any locks. */ + if (mode == RBM_ZERO_AND_CLEANUP_LOCK || mode == RBM_ZERO_AND_LOCK) + { + /* if we just want zeroes and a lock, we're done */ + ZeroBuffer(buffer, mode); + } + else if (!*hit) + { + /* we might need to perform I/O */ + CompleteReadBuffers(bmr, + &buffer, + forkNum, + blockNum, + 1, + mode == RBM_ZERO_ON_ERROR, + strategy); + } + + return buffer; +} + +/* + * Prepare to read a block. The buffer is pinned. If this is a 'hit', then + * the returned buffer can be used immediately. Otherwise, a physical read + * should be completed with CompleteReadBuffers(), or the buffer should be + * zeroed with ZeroBuffer(). PrepareReadBuffer() followed by + * CompleteReadBuffers() or ZeroBuffer() is equivalent to ReadBuffer(), but + * the caller has the opportunity to combine reads of multiple neighboring + * blocks into one CompleteReadBuffers() call. + * + * *foundPtr is set to true for a hit, and false for a miss. + */ +Buffer +PrepareReadBuffer(BufferManagerRelation bmr, + ForkNumber forkNum, + BlockNumber blockNum, + BufferAccessStrategy strategy, + bool *foundPtr) +{ + BufferDesc *bufHdr; + bool isLocalBuf; + IOContext io_context; + IOObject io_object; + + Assert(blockNum != P_NEW); + + if (bmr.rel) + { + bmr.smgr = RelationGetSmgr(bmr.rel); + bmr.relpersistence = bmr.rel->rd_rel->relpersistence; + } + + isLocalBuf = SmgrIsTemp(bmr.smgr); if (isLocalBuf) { - /* - * We do not use a BufferAccessStrategy for I/O of temporary tables. - * However, in some cases, the "strategy" may not be NULL, so we can't - * rely on IOContextForStrategy() to set the right IOContext for us. - * This may happen in cases like CREATE TEMPORARY TABLE AS... - */ io_context = IOCONTEXT_NORMAL; io_object = IOOBJECT_TEMP_RELATION; - bufHdr = LocalBufferAlloc(smgr, forkNum, blockNum, &found); - if (found) - pgBufferUsage.local_blks_hit++; - else if (mode == RBM_NORMAL || mode == RBM_NORMAL_NO_LOG || - mode == RBM_ZERO_ON_ERROR) - pgBufferUsage.local_blks_read++; } else { - /* - * lookup the buffer. IO_IN_PROGRESS is set if the requested block is - * not currently in memory. 
- */ io_context = IOContextForStrategy(strategy); io_object = IOOBJECT_RELATION; - bufHdr = BufferAlloc(smgr, relpersistence, forkNum, blockNum, - strategy, &found, io_context); - if (found) - pgBufferUsage.shared_blks_hit++; - else if (mode == RBM_NORMAL || mode == RBM_NORMAL_NO_LOG || - mode == RBM_ZERO_ON_ERROR) - pgBufferUsage.shared_blks_read++; } - /* At this point we do NOT hold any locks. */ + TRACE_POSTGRESQL_BUFFER_READ_START(forkNum, blockNum, + bmr.smgr->smgr_rlocator.locator.spcOid, + bmr.smgr->smgr_rlocator.locator.dbOid, + bmr.smgr->smgr_rlocator.locator.relNumber, + bmr.smgr->smgr_rlocator.backend); - /* if it was already in the buffer pool, we're done */ - if (found) + ResourceOwnerEnlarge(CurrentResourceOwner); + if (isLocalBuf) + { + bufHdr = LocalBufferAlloc(bmr.smgr, forkNum, blockNum, foundPtr); + if (*foundPtr) + pgBufferUsage.local_blks_hit++; + } + else + { + bufHdr = BufferAlloc(bmr.smgr, bmr.relpersistence, forkNum, blockNum, + strategy, foundPtr, io_context); + if (*foundPtr) + pgBufferUsage.shared_blks_hit++; + } + if (bmr.rel) + { + /* + * While pgBufferUsage's "read" counter isn't bumped unless we reach + * CompleteReadBuffers() (so, not for hits, and not for buffers that + * are zeroed instead), the per-relation stats always count them. + */ + pgstat_count_buffer_read(bmr.rel); + if (*foundPtr) + pgstat_count_buffer_hit(bmr.rel); + } + if (*foundPtr) { - /* Just need to update stats before we exit */ - *hit = true; VacuumPageHit++; pgstat_count_io_op(io_object, io_context, IOOP_HIT); - if (VacuumCostActive) VacuumCostBalance += VacuumCostPageHit; TRACE_POSTGRESQL_BUFFER_READ_DONE(forkNum, blockNum, - smgr->smgr_rlocator.locator.spcOid, - smgr->smgr_rlocator.locator.dbOid, - smgr->smgr_rlocator.locator.relNumber, - smgr->smgr_rlocator.backend, - found); + bmr.smgr->smgr_rlocator.locator.spcOid, + bmr.smgr->smgr_rlocator.locator.dbOid, + bmr.smgr->smgr_rlocator.locator.relNumber, + bmr.smgr->smgr_rlocator.backend, + true); + } - /* - * In RBM_ZERO_AND_LOCK mode the caller expects the page to be locked - * on return. - */ - if (!isLocalBuf) - { - if (mode == RBM_ZERO_AND_LOCK) - LWLockAcquire(BufferDescriptorGetContentLock(bufHdr), - LW_EXCLUSIVE); - else if (mode == RBM_ZERO_AND_CLEANUP_LOCK) - LockBufferForCleanup(BufferDescriptorGetBuffer(bufHdr)); - } + return BufferDescriptorGetBuffer(bufHdr); +} - return BufferDescriptorGetBuffer(bufHdr); +static inline bool +CompleteReadBuffersCanStartIO(Buffer buffer, bool nowait) +{ + if (BufferIsLocal(buffer)) + { + BufferDesc *bufHdr = GetLocalBufferDescriptor(-buffer - 1); + + return (pg_atomic_read_u32(&bufHdr->state) & BM_VALID) == 0; } + else + return StartBufferIO(GetBufferDescriptor(buffer - 1), true, nowait); +} - /* - * if we have gotten to this point, we have allocated a buffer for the - * page but its contents are not yet valid. IO_IN_PROGRESS is set for it, - * if it's a shared buffer. - */ - Assert(!(pg_atomic_read_u32(&bufHdr->state) & BM_VALID)); /* spinlock not needed */ +/* + * Complete a set reads prepared with PrepareReadBuffers(). The buffers must + * cover a cluster of neighboring block numbers. + * + * Typically this performs one physical vector read covering the block range, + * but if some of the buffers have already been read in the meantime by any + * backend, zero or multiple reads may be performed. 
+ */ +void +CompleteReadBuffers(BufferManagerRelation bmr, + Buffer *buffers, + ForkNumber forknum, + BlockNumber blocknum, + int nblocks, + bool zero_on_error, + BufferAccessStrategy strategy) +{ + bool isLocalBuf; + IOContext io_context; + IOObject io_object; - bufBlock = isLocalBuf ? LocalBufHdrGetBlock(bufHdr) : BufHdrGetBlock(bufHdr); + if (bmr.rel) + { + bmr.smgr = RelationGetSmgr(bmr.rel); + bmr.relpersistence = bmr.rel->rd_rel->relpersistence; + } + + isLocalBuf = SmgrIsTemp(bmr.smgr); + if (isLocalBuf) + { + io_context = IOCONTEXT_NORMAL; + io_object = IOOBJECT_TEMP_RELATION; + } + else + { + io_context = IOContextForStrategy(strategy); + io_object = IOOBJECT_RELATION; + } /* - * Read in the page, unless the caller intends to overwrite it and just - * wants us to allocate a buffer. + * We count all these blocks as read by this backend. This is traditional + * behavior, but might turn out to be not true if we find that someone + * else has beaten us and completed the read of some of these blocks. In + * that case the system globally double-counts, but we traditionally don't + * count this as a "hit", and we don't have a separate counter for "miss, + * but another backend completed the read". */ - if (mode == RBM_ZERO_AND_LOCK || mode == RBM_ZERO_AND_CLEANUP_LOCK) - MemSet((char *) bufBlock, 0, BLCKSZ); + if (isLocalBuf) + pgBufferUsage.local_blks_read += nblocks; else + pgBufferUsage.shared_blks_read += nblocks; + + for (int i = 0; i < nblocks; ++i) { - instr_time io_start = pgstat_prepare_io_time(track_io_timing); + int io_buffers_len; + Buffer io_buffers[MAX_BUFFERS_PER_TRANSFER]; + void *io_pages[MAX_BUFFERS_PER_TRANSFER]; + instr_time io_start; + BlockNumber io_first_block; - smgrread(smgr, forkNum, blockNum, bufBlock); +#ifdef USE_ASSERT_CHECKING - pgstat_count_io_op_time(io_object, io_context, - IOOP_READ, io_start, 1); + /* + * We could get all the information from buffer headers, but it can be + * expensive to access buffer header cache lines so we make the caller + * provide all the information we need, and assert that it is + * consistent. + */ + { + RelFileLocator xlocator; + ForkNumber xforknum; + BlockNumber xblocknum; + + BufferGetTag(buffers[i], &xlocator, &xforknum, &xblocknum); + Assert(RelFileLocatorEquals(bmr.smgr->smgr_rlocator.locator, xlocator)); + Assert(xforknum == forknum); + Assert(xblocknum == blocknum + i); + } +#endif + + /* + * Skip this block if someone else has already completed it. If an + * I/O is already in progress in another backend, this will wait for + * the outcome: either done, or something went wrong and we will + * retry. + */ + if (!CompleteReadBuffersCanStartIO(buffers[i], false)) + { + /* + * Report this as a 'hit' for this backend, even though it must + * have started out as a miss in PrepareReadBuffer(). + */ + TRACE_POSTGRESQL_BUFFER_READ_DONE(forknum, blocknum + i, + bmr.smgr->smgr_rlocator.locator.spcOid, + bmr.smgr->smgr_rlocator.locator.dbOid, + bmr.smgr->smgr_rlocator.locator.relNumber, + bmr.smgr->smgr_rlocator.backend, + true); + continue; + } + + /* We found a buffer that we need to read in. */ + io_buffers[0] = buffers[i]; + io_pages[0] = BufferGetBlock(buffers[i]); + io_first_block = blocknum + i; + io_buffers_len = 1; - /* check for garbage data */ - if (!PageIsVerifiedExtended((Page) bufBlock, blockNum, - PIV_LOG_WARNING | PIV_REPORT_STAT)) + /* + * How many neighboring-on-disk blocks can we can scatter-read into + * other buffers at the same time? In this case we don't wait if we + * see an I/O already in progress. 
We already hold BM_IO_IN_PROGRESS + * for the head block, so we should get on with that I/O as soon as + * possible. We'll come back to this block again, above. + */ + while ((i + 1) < nblocks && + CompleteReadBuffersCanStartIO(buffers[i + 1], true)) + { + /* Must be consecutive block numbers. */ + Assert(BufferGetBlockNumber(buffers[i + 1]) == + BufferGetBlockNumber(buffers[i]) + 1); + + io_buffers[io_buffers_len] = buffers[++i]; + io_pages[io_buffers_len++] = BufferGetBlock(buffers[i]); + } + + io_start = pgstat_prepare_io_time(track_io_timing); + smgrreadv(bmr.smgr, forknum, io_first_block, io_pages, io_buffers_len); + pgstat_count_io_op_time(io_object, io_context, IOOP_READ, io_start, + io_buffers_len); + + /* Verify each block we read, and terminate the I/O. */ + for (int j = 0; j < io_buffers_len; ++j) { - if (mode == RBM_ZERO_ON_ERROR || zero_damaged_pages) + BufferDesc *bufHdr; + Block bufBlock; + + if (isLocalBuf) { - ereport(WARNING, - (errcode(ERRCODE_DATA_CORRUPTED), - errmsg("invalid page in block %u of relation %s; zeroing out page", - blockNum, - relpath(smgr->smgr_rlocator, forkNum)))); - MemSet((char *) bufBlock, 0, BLCKSZ); + bufHdr = GetLocalBufferDescriptor(-io_buffers[j] - 1); + bufBlock = LocalBufHdrGetBlock(bufHdr); } else - ereport(ERROR, - (errcode(ERRCODE_DATA_CORRUPTED), - errmsg("invalid page in block %u of relation %s", - blockNum, - relpath(smgr->smgr_rlocator, forkNum)))); - } - } - - /* - * In RBM_ZERO_AND_LOCK / RBM_ZERO_AND_CLEANUP_LOCK mode, grab the buffer - * content lock before marking the page as valid, to make sure that no - * other backend sees the zeroed page before the caller has had a chance - * to initialize it. - * - * Since no-one else can be looking at the page contents yet, there is no - * difference between an exclusive lock and a cleanup-strength lock. (Note - * that we cannot use LockBuffer() or LockBufferForCleanup() here, because - * they assert that the buffer is already valid.) - */ - if ((mode == RBM_ZERO_AND_LOCK || mode == RBM_ZERO_AND_CLEANUP_LOCK) && - !isLocalBuf) - { - LWLockAcquire(BufferDescriptorGetContentLock(bufHdr), LW_EXCLUSIVE); - } + { + bufHdr = GetBufferDescriptor(io_buffers[j] - 1); + bufBlock = BufHdrGetBlock(bufHdr); + } - if (isLocalBuf) - { - /* Only need to adjust flags */ - uint32 buf_state = pg_atomic_read_u32(&bufHdr->state); + /* check for garbage data */ + if (!PageIsVerifiedExtended((Page) bufBlock, io_first_block + j, + PIV_LOG_WARNING | PIV_REPORT_STAT)) + { + if (zero_on_error || zero_damaged_pages) + { + ereport(WARNING, + (errcode(ERRCODE_DATA_CORRUPTED), + errmsg("invalid page in block %u of relation %s; zeroing out page", + io_first_block + j, + relpath(bmr.smgr->smgr_rlocator, forknum)))); + memset(bufBlock, 0, BLCKSZ); + } + else + ereport(ERROR, + (errcode(ERRCODE_DATA_CORRUPTED), + errmsg("invalid page in block %u of relation %s", + io_first_block + j, + relpath(bmr.smgr->smgr_rlocator, forknum)))); + } - buf_state |= BM_VALID; - pg_atomic_unlocked_write_u32(&bufHdr->state, buf_state); - } - else - { - /* Set BM_VALID, terminate IO, and wake up any waiters */ - TerminateBufferIO(bufHdr, false, BM_VALID, true); - } + /* Terminate I/O and set BM_VALID. 
*/ + if (isLocalBuf) + { + uint32 buf_state = pg_atomic_read_u32(&bufHdr->state); - VacuumPageMiss++; - if (VacuumCostActive) - VacuumCostBalance += VacuumCostPageMiss; + buf_state |= BM_VALID; + pg_atomic_unlocked_write_u32(&bufHdr->state, buf_state); + } + else + { + /* Set BM_VALID, terminate IO, and wake up any waiters */ + TerminateBufferIO(bufHdr, false, BM_VALID, true); + } - TRACE_POSTGRESQL_BUFFER_READ_DONE(forkNum, blockNum, - smgr->smgr_rlocator.locator.spcOid, - smgr->smgr_rlocator.locator.dbOid, - smgr->smgr_rlocator.locator.relNumber, - smgr->smgr_rlocator.backend, - found); + /* Report I/Os as completing individually. */ + TRACE_POSTGRESQL_BUFFER_READ_DONE(forknum, io_first_block + j, + bmr.smgr->smgr_rlocator.locator.spcOid, + bmr.smgr->smgr_rlocator.locator.dbOid, + bmr.smgr->smgr_rlocator.locator.relNumber, + bmr.smgr->smgr_rlocator.backend, + false); + } - return BufferDescriptorGetBuffer(bufHdr); + VacuumPageMiss += io_buffers_len; + if (VacuumCostActive) + VacuumCostBalance += VacuumCostPageMiss * io_buffers_len; + } } /* @@ -1228,11 +1380,8 @@ ReadBuffer_common(SMgrRelation smgr, char relpersistence, ForkNumber forkNum, * * The returned buffer is pinned and is already marked as holding the * desired page. If it already did have the desired page, *foundPtr is - * set true. Otherwise, *foundPtr is set false and the buffer is marked - * as IO_IN_PROGRESS; ReadBuffer will now need to do I/O to fill it. - * - * *foundPtr is actually redundant with the buffer's BM_VALID flag, but - * we keep it for simplicity in ReadBuffer. + * set true. Otherwise, *foundPtr is set false. A read should be + * performed with CompleteReadBuffers(). * * io_context is passed as an output parameter to avoid calling * IOContextForStrategy() when there is a shared buffers hit and no IO @@ -1291,19 +1440,10 @@ BufferAlloc(SMgrRelation smgr, char relpersistence, ForkNumber forkNum, { /* * We can only get here if (a) someone else is still reading in - * the page, or (b) a previous read attempt failed. We have to - * wait for any active read attempt to finish, and then set up our - * own read attempt if the page is still not BM_VALID. - * StartBufferIO does it all. + * the page, (b) a previous read attempt failed, or (c) someone + * called PrepareReadBuffer() but not yet CompleteReadBuffers(). */ - if (StartBufferIO(buf, true)) - { - /* - * If we get here, previous attempts to read the buffer must - * have failed ... but we shall bravely try again. - */ - *foundPtr = false; - } + *foundPtr = false; } return buf; @@ -1368,19 +1508,10 @@ BufferAlloc(SMgrRelation smgr, char relpersistence, ForkNumber forkNum, { /* * We can only get here if (a) someone else is still reading in - * the page, or (b) a previous read attempt failed. We have to - * wait for any active read attempt to finish, and then set up our - * own read attempt if the page is still not BM_VALID. - * StartBufferIO does it all. + * the page, (b) a previous read attempt failed, or (c) someone + * called PrepareReadBuffer() but not yet CompleteReadBuffers(). */ - if (StartBufferIO(existing_buf_hdr, true)) - { - /* - * If we get here, previous attempts to read the buffer must - * have failed ... but we shall bravely try again. - */ - *foundPtr = false; - } + *foundPtr = false; } return existing_buf_hdr; @@ -1412,15 +1543,9 @@ BufferAlloc(SMgrRelation smgr, char relpersistence, ForkNumber forkNum, LWLockRelease(newPartitionLock); /* - * Buffer contents are currently invalid. Try to obtain the right to - * start I/O. 
If StartBufferIO returns false, then someone else managed - * to read it before we did, so there's nothing left for BufferAlloc() to - * do. + * Buffer contents are currently invalid. */ - if (StartBufferIO(victim_buf_hdr, true)) - *foundPtr = false; - else - *foundPtr = true; + *foundPtr = false; return victim_buf_hdr; } @@ -1774,7 +1899,7 @@ again: * pessimistic, but outside of toy-sized shared_buffers it should allow * sufficient pins. */ -static void +void LimitAdditionalPins(uint32 *additional_pins) { uint32 max_backends; @@ -2043,7 +2168,7 @@ ExtendBufferedRelShared(BufferManagerRelation bmr, buf_state &= ~BM_VALID; UnlockBufHdr(existing_hdr, buf_state); - } while (!StartBufferIO(existing_hdr, true)); + } while (!StartBufferIO(existing_hdr, true, false)); } else { @@ -2066,7 +2191,7 @@ ExtendBufferedRelShared(BufferManagerRelation bmr, LWLockRelease(partition_lock); /* XXX: could combine the locked operations in it with the above */ - StartBufferIO(victim_buf_hdr, true); + StartBufferIO(victim_buf_hdr, true, false); } } @@ -2381,7 +2506,12 @@ PinBuffer(BufferDesc *buf, BufferAccessStrategy strategy) else { /* - * If we previously pinned the buffer, it must surely be valid. + * If we previously pinned the buffer, it is likely to be valid, but + * it may not be if PrepareReadBuffer() was called and + * CompleteReadBuffers() hasn't been called yet. We'll check by + * loading the flags without locking. This is racy, but it's OK to + * return false spuriously: when CompleteReadBuffers() calls + * StartBufferIO(), it'll see that it's now valid. * * Note: We deliberately avoid a Valgrind client request here. * Individual access methods can optionally superimpose buffer page @@ -2390,7 +2520,7 @@ PinBuffer(BufferDesc *buf, BufferAccessStrategy strategy) * that the buffer page is legitimately non-accessible here. We * cannot meddle with that. */ - result = true; + result = (pg_atomic_read_u32(&buf->state) & BM_VALID) != 0; } ref->refcount++; @@ -3458,7 +3588,7 @@ FlushBuffer(BufferDesc *buf, SMgrRelation reln, IOObject io_object, * someone else flushed the buffer before we could, so we need not do * anything. */ - if (!StartBufferIO(buf, false)) + if (!StartBufferIO(buf, false, false)) return; /* Setup error traceback support for ereport() */ @@ -4845,6 +4975,46 @@ ConditionalLockBuffer(Buffer buffer) LW_EXCLUSIVE); } +/* + * Zero a buffer, and lock it as RBM_ZERO_AND_LOCK or + * RBM_ZERO_AND_CLEANUP_LOCK would. The buffer must be already pinned. It + * does not have to be valid, but it is valid and locked on return. + */ +void +ZeroBuffer(Buffer buffer, ReadBufferMode mode) +{ + BufferDesc *bufHdr; + uint32 buf_state; + + Assert(mode == RBM_ZERO_AND_LOCK || mode == RBM_ZERO_AND_CLEANUP_LOCK); + + if (BufferIsLocal(buffer)) + bufHdr = GetLocalBufferDescriptor(-buffer - 1); + else + { + bufHdr = GetBufferDescriptor(buffer - 1); + if (mode == RBM_ZERO_AND_LOCK) + LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE); + else + LockBufferForCleanup(buffer); + } + + memset(BufferGetPage(buffer), 0, BLCKSZ); + + if (BufferIsLocal(buffer)) + { + buf_state = pg_atomic_read_u32(&bufHdr->state); + buf_state |= BM_VALID; + pg_atomic_unlocked_write_u32(&bufHdr->state, buf_state); + } + else + { + buf_state = LockBufHdr(bufHdr); + buf_state |= BM_VALID; + UnlockBufHdr(bufHdr, buf_state); + } +} + /* * Verify that this backend is pinning the buffer exactly once. 
* @@ -5197,9 +5367,15 @@ WaitIO(BufferDesc *buf) * * Returns true if we successfully marked the buffer as I/O busy, * false if someone else already did the work. + * + * If nowait is true, then we don't wait for an I/O to be finished by another + * backend. In that case, false indicates either that the I/O was already + * finished, or is still in progress. This is useful for callers that want to + * find out if they can perform the I/O as part of a larger operation, without + * waiting for the answer or distinguishing the reasons why not. */ static bool -StartBufferIO(BufferDesc *buf, bool forInput) +StartBufferIO(BufferDesc *buf, bool forInput, bool nowait) { uint32 buf_state; @@ -5212,6 +5388,8 @@ StartBufferIO(BufferDesc *buf, bool forInput) if (!(buf_state & BM_IO_IN_PROGRESS)) break; UnlockBufHdr(buf, buf_state); + if (nowait) + return false; WaitIO(buf); } diff --git a/src/backend/storage/buffer/localbuf.c b/src/backend/storage/buffer/localbuf.c index 1be4f4f8da..717b8f58da 100644 --- a/src/backend/storage/buffer/localbuf.c +++ b/src/backend/storage/buffer/localbuf.c @@ -109,10 +109,9 @@ PrefetchLocalBuffer(SMgrRelation smgr, ForkNumber forkNum, * LocalBufferAlloc - * Find or create a local buffer for the given page of the given relation. * - * API is similar to bufmgr.c's BufferAlloc, except that we do not need - * to do any locking since this is all local. Also, IO_IN_PROGRESS - * does not get set. Lastly, we support only default access strategy - * (hence, usage_count is always advanced). + * API is similar to bufmgr.c's BufferAlloc, except that we do not need to do + * any locking since this is all local. We support only default access + * strategy (hence, usage_count is always advanced). */ BufferDesc * LocalBufferAlloc(SMgrRelation smgr, ForkNumber forkNum, BlockNumber blockNum, @@ -288,7 +287,7 @@ GetLocalVictimBuffer(void) } /* see LimitAdditionalPins() */ -static void +void LimitAdditionalLocalPins(uint32 *additional_pins) { uint32 max_pins; @@ -298,9 +297,10 @@ LimitAdditionalLocalPins(uint32 *additional_pins) /* * In contrast to LimitAdditionalPins() other backends don't play a role - * here. We can allow up to NLocBuffer pins in total. + * here. We can allow up to NLocBuffer pins in total, but it might not be + * initialized yet so read num_temp_buffers. */ - max_pins = (NLocBuffer - NLocalPinnedBuffers); + max_pins = (num_temp_buffers - NLocalPinnedBuffers); if (*additional_pins >= max_pins) *additional_pins = max_pins; diff --git a/src/backend/storage/meson.build b/src/backend/storage/meson.build index 40345bdca2..739d13293f 100644 --- a/src/backend/storage/meson.build +++ b/src/backend/storage/meson.build @@ -1,5 +1,6 @@ # Copyright (c) 2022-2024, PostgreSQL Global Development Group +subdir('aio') subdir('buffer') subdir('file') subdir('freespace') diff --git a/src/backend/storage/smgr/smgr.c b/src/backend/storage/smgr/smgr.c index 563a0be5c7..0d7272e796 100644 --- a/src/backend/storage/smgr/smgr.c +++ b/src/backend/storage/smgr/smgr.c @@ -147,7 +147,9 @@ smgrshutdown(int code, Datum arg) /* * smgropen() -- Return an SMgrRelation object, creating it if need be. * - * This does not attempt to actually open the underlying file. + * This does not attempt to actually open the underlying files. The returned + * object remains valid at least until AtEOXact_SMgr() is called, or until + * smgrdestroy() is called in non-transaction backends. 
*/ SMgrRelation smgropen(RelFileLocator rlocator, BackendId backend) @@ -259,10 +261,10 @@ smgrexists(SMgrRelation reln, ForkNumber forknum) } /* - * smgrclose() -- Close and delete an SMgrRelation object. + * smgrdestroy() -- Delete an SMgrRelation object. */ void -smgrclose(SMgrRelation reln) +smgrdestroy(SMgrRelation reln) { SMgrRelation *owner; ForkNumber forknum; @@ -289,12 +291,14 @@ smgrclose(SMgrRelation reln) } /* - * smgrrelease() -- Release all resources used by this object. + * smgrclose() -- Release all resources used by this object. * - * The object remains valid. + * The object remains valid, but is moved to the unknown list where it will + * be destroyed by AtEOXact_SMgr(). It may be re-owned if it is accessed by a + * relation before then. */ void -smgrrelease(SMgrRelation reln) +smgrclose(SMgrRelation reln) { for (ForkNumber forknum = 0; forknum <= MAX_FORKNUM; forknum++) { @@ -302,15 +306,20 @@ smgrrelease(SMgrRelation reln) reln->smgr_cached_nblocks[forknum] = InvalidBlockNumber; } reln->smgr_targblock = InvalidBlockNumber; + + if (reln->smgr_owner) + { + *reln->smgr_owner = NULL; + reln->smgr_owner = NULL; + dlist_push_tail(&unowned_relns, &reln->node); + } } /* - * smgrreleaseall() -- Release resources used by all objects. - * - * This is called for PROCSIGNAL_BARRIER_SMGRRELEASE. + * smgrcloseall() -- Close all objects. */ void -smgrreleaseall(void) +smgrcloseall(void) { HASH_SEQ_STATUS status; SMgrRelation reln; @@ -322,14 +331,17 @@ smgrreleaseall(void) hash_seq_init(&status, SMgrRelationHash); while ((reln = (SMgrRelation) hash_seq_search(&status)) != NULL) - smgrrelease(reln); + smgrclose(reln); } /* - * smgrcloseall() -- Close all existing SMgrRelation objects. + * smgrdestroyall() -- Destroy all SMgrRelation objects. + * + * It must be known that there are no pointers to SMgrRelations, other than + * those registered with smgrsetowner(). */ void -smgrcloseall(void) +smgrdestroyall(void) { HASH_SEQ_STATUS status; SMgrRelation reln; @@ -341,7 +353,7 @@ smgrcloseall(void) hash_seq_init(&status, SMgrRelationHash); while ((reln = (SMgrRelation) hash_seq_search(&status)) != NULL) - smgrclose(reln); + smgrdestroy(reln); } /* @@ -733,7 +745,8 @@ smgrimmedsync(SMgrRelation reln, ForkNumber forknum) * AtEOXact_SMgr * * This routine is called during transaction commit or abort (it doesn't - * particularly care which). All transient SMgrRelation objects are closed. + * particularly care which). All transient SMgrRelation objects are + * destroyed. * * We do this as a compromise between wanting transient SMgrRelations to * live awhile (to amortize the costs of blind writes of multiple blocks) @@ -747,7 +760,7 @@ AtEOXact_SMgr(void) dlist_mutable_iter iter; /* - * Zap all unowned SMgrRelations. We rely on smgrclose() to remove each + * Zap all unowned SMgrRelations. We rely on smgrdestroy() to remove each * one from the list. 
*/ dlist_foreach_modify(iter, &unowned_relns) @@ -757,7 +770,7 @@ AtEOXact_SMgr(void) Assert(rel->smgr_owner == NULL); - smgrclose(rel); + smgrdestroy(rel); } } @@ -768,6 +781,6 @@ AtEOXact_SMgr(void) bool ProcessBarrierSmgrRelease(void) { - smgrreleaseall(); + smgrcloseall(); return true; } diff --git a/src/include/storage/bufmgr.h b/src/include/storage/bufmgr.h index d51d46d335..a38f1acb37 100644 --- a/src/include/storage/bufmgr.h +++ b/src/include/storage/bufmgr.h @@ -14,6 +14,7 @@ #ifndef BUFMGR_H #define BUFMGR_H +#include "port/pg_iovec.h" #include "storage/block.h" #include "storage/buf.h" #include "storage/bufpage.h" @@ -158,6 +159,11 @@ extern PGDLLIMPORT int32 *LocalRefCount; #define BUFFER_LOCK_SHARE 1 #define BUFFER_LOCK_EXCLUSIVE 2 +/* + * Maximum number of buffers for multi-buffer I/O functions. This is set to + * allow 128kB transfers, unless BLCKSZ and IOV_MAX imply a a smaller maximum. + */ +#define MAX_BUFFERS_PER_TRANSFER Min(PG_IOV_MAX, (128 * 1024) / BLCKSZ) /* * prototypes for functions in bufmgr.c @@ -177,6 +183,18 @@ extern Buffer ReadBufferWithoutRelcache(RelFileLocator rlocator, ForkNumber forkNum, BlockNumber blockNum, ReadBufferMode mode, BufferAccessStrategy strategy, bool permanent); +extern Buffer PrepareReadBuffer(BufferManagerRelation bmr, + ForkNumber forkNum, + BlockNumber blockNum, + BufferAccessStrategy strategy, + bool *foundPtr); +extern void CompleteReadBuffers(BufferManagerRelation bmr, + Buffer *buffers, + ForkNumber forknum, + BlockNumber blocknum, + int nblocks, + bool zero_on_error, + BufferAccessStrategy strategy); extern void ReleaseBuffer(Buffer buffer); extern void UnlockReleaseBuffer(Buffer buffer); extern bool BufferIsExclusiveLocked(Buffer buffer); @@ -247,9 +265,13 @@ extern void LockBufferForCleanup(Buffer buffer); extern bool ConditionalLockBufferForCleanup(Buffer buffer); extern bool IsBufferCleanupOK(Buffer buffer); extern bool HoldingBufferPinThatDelaysRecovery(void); +extern void ZeroBuffer(Buffer buffer, ReadBufferMode mode); extern bool BgBufferSync(struct WritebackContext *wb_context); +extern void LimitAdditionalPins(uint32 *additional_pins); +extern void LimitAdditionalLocalPins(uint32 *additional_pins); + /* in buf_init.c */ extern void InitBufferPool(void); extern Size BufferShmemSize(void); diff --git a/src/include/storage/smgr.h b/src/include/storage/smgr.h index 527cd2a056..d8ffe397fa 100644 --- a/src/include/storage/smgr.h +++ b/src/include/storage/smgr.h @@ -85,8 +85,8 @@ extern void smgrclearowner(SMgrRelation *owner, SMgrRelation reln); extern void smgrclose(SMgrRelation reln); extern void smgrcloseall(void); extern void smgrcloserellocator(RelFileLocatorBackend rlocator); -extern void smgrrelease(SMgrRelation reln); -extern void smgrreleaseall(void); +extern void smgrdestroy(SMgrRelation reln); +extern void smgrdestroyall(void); extern void smgrcreate(SMgrRelation reln, ForkNumber forknum, bool isRedo); extern void smgrdosyncall(SMgrRelation *rels, int nrels); extern void smgrdounlinkall(SMgrRelation *rels, int nrels, bool isRedo); diff --git a/src/include/storage/streaming_read.h b/src/include/storage/streaming_read.h new file mode 100644 index 0000000000..40c3408c54 --- /dev/null +++ b/src/include/storage/streaming_read.h @@ -0,0 +1,45 @@ +#ifndef STREAMING_READ_H +#define STREAMING_READ_H + +#include "storage/bufmgr.h" +#include "storage/fd.h" +#include "storage/smgr.h" + +/* Default tuning, reasonable for many users. 
*/ +#define PGSR_FLAG_DEFAULT 0x00 + +/* + * I/O streams that are performing maintenance work on behalf of potentially + * many users. + */ +#define PGSR_FLAG_MAINTENANCE 0x01 + +/* + * We usually avoid issuing prefetch advice automatically when sequential + * access is detected, but this flag explicitly disables it, for cases that + * might not be correctly detected. Explicit advice is known to perform worse + * than letting the kernel (at least Linux) detect sequential access. + */ +#define PGSR_FLAG_SEQUENTIAL 0x02 + +struct PgStreamingRead; +typedef struct PgStreamingRead PgStreamingRead; + +/* Callback that returns the next block number to read. */ +typedef BlockNumber (*PgStreamingReadBufferCB) (PgStreamingRead *pgsr, + void *pgsr_private, + void *per_buffer_private); + +extern PgStreamingRead *pg_streaming_read_buffer_alloc(int flags, + void *pgsr_private, + size_t per_buffer_private_size, + BufferAccessStrategy strategy, + BufferManagerRelation bmr, + ForkNumber forknum, + PgStreamingReadBufferCB next_block_cb); + +extern void pg_streaming_read_prefetch(PgStreamingRead *pgsr); +extern Buffer pg_streaming_read_buffer_get_next(PgStreamingRead *pgsr, void **per_buffer_private); +extern void pg_streaming_read_free(PgStreamingRead *pgsr); + +#endif diff --git a/src/include/utils/rel.h b/src/include/utils/rel.h index a584b1ddff..6636cc82c0 100644 --- a/src/include/utils/rel.h +++ b/src/include/utils/rel.h @@ -561,12 +561,6 @@ typedef struct ViewOptions * * Very little code is authorized to touch rel->rd_smgr directly. Instead * use this function to fetch its value. - * - * Note: since a relcache flush can cause the file handle to be closed again, - * it's unwise to hold onto the pointer returned by this function for any - * long period. Recommended practice is to just re-execute RelationGetSmgr - * each time you need to access the SMgrRelation. It's quite cheap in - * comparison to whatever an smgr function is going to do. */ static inline SMgrRelation RelationGetSmgr(Relation rel) diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index 29fd1cae64..018ebbcbaa 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -2089,6 +2089,8 @@ PgStat_TableCounts PgStat_TableStatus PgStat_TableXactStatus PgStat_WalStats +PgStreamingRead +PgStreamingReadRange PgXmlErrorContext PgXmlStrictness Pg_finfo_record -- 2.37.2
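
For readers new to the interface added above, here is a minimal usage sketch, not part of the patch itself: it streams every block of a relation's main fork through the new API, much as the pg_prewarm change earlier in the patch does. The names my_read_state, my_next_block and read_all_blocks are invented for illustration; only the pg_streaming_read_* functions, PGSR_FLAG_DEFAULT and BMR_REL() come from the patch.

struct my_read_state
{
	BlockNumber next;
	BlockNumber nblocks;
};

static BlockNumber
my_next_block(PgStreamingRead *pgsr, void *pgsr_private, void *per_buffer_data)
{
	struct my_read_state *state = pgsr_private;

	/* Return block numbers 0 .. nblocks - 1, then end the stream. */
	if (state->next < state->nblocks)
		return state->next++;

	return InvalidBlockNumber;
}

static void
read_all_blocks(Relation rel)
{
	struct my_read_state state;
	PgStreamingRead *pgsr;
	Buffer		buf;

	state.next = 0;
	state.nblocks = RelationGetNumberOfBlocks(rel);

	/* Default flags, no per-buffer data, no special strategy. */
	pgsr = pg_streaming_read_buffer_alloc(PGSR_FLAG_DEFAULT,
										  &state,
										  0,
										  NULL,
										  BMR_REL(rel),
										  MAIN_FORKNUM,
										  my_next_block);

	/* Look-ahead, prefetch advice and read combining happen internally. */
	while ((buf = pg_streaming_read_buffer_get_next(pgsr, NULL)) != InvalidBuffer)
	{
		CHECK_FOR_INTERRUPTS();
		/* ... use the page ... */
		ReleaseBuffer(buf);
	}

	pg_streaming_read_free(pgsr);
}

The callback-driven design keeps block-number generation in the caller, while the streaming read machinery decides when to batch neighboring blocks into a single vectored read and when to issue prefetch advice.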