Re: Streaming I/O, vectored I/O (WIP)

From: Melanie Plageman <melanieplageman(at)gmail(dot)com>
To: Thomas Munro <thomas(dot)munro(at)gmail(dot)com>
Cc: Heikki Linnakangas <hlinnaka(at)iki(dot)fi>, pgsql-hackers <pgsql-hackers(at)postgresql(dot)org>, Andres Freund <andres(at)anarazel(dot)de>
Subject: Re: Streaming I/O, vectored I/O (WIP)
Date: 2023-11-29 01:21:28
Message-ID: 20231129012128.ord4p6nv4brsc4sp@liskov
Lists: pgsql-hackers

On Wed, Nov 29, 2023 at 01:17:19AM +1300, Thomas Munro wrote:

Thanks for posting a new version. I've included a review of 0004.

> I've included just the pg_prewarm example user for now while we
> discuss the basic infrastructure. The rest are rebased and in my
> public Github branch streaming-read (repo macdice/postgres) if anyone
> is interested (don't mind the red CI failures, they're just saying I
> ran out of monthly CI credits on the 29th, so close...)

I agree it makes sense to commit the interface with just prewarm as a
user. Then we can start new threads for the various streaming read users
(e.g. vacuum, sequential scan, bitmapheapscan).

> From db5de8ab5a1a804f41006239302fdce954cab331 Mon Sep 17 00:00:00 2001
> From: Thomas Munro <thomas(dot)munro(at)gmail(dot)com>
> Date: Sat, 22 Jul 2023 17:31:54 +1200
> Subject: [PATCH v2 4/8] Provide vectored variant of ReadBuffer().
>
> diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c
> index f7c67d504c..8ae3a72053 100644
> --- a/src/backend/storage/buffer/bufmgr.c
> +++ b/src/backend/storage/buffer/bufmgr.c
> @@ -1046,175 +1048,326 @@ ReadBuffer_common(SMgrRelation smgr, char relpersistence, ForkNumber forkNum,
> if (mode == RBM_ZERO_AND_LOCK || mode == RBM_ZERO_AND_CLEANUP_LOCK)
> flags |= EB_LOCK_FIRST;
>
> - return ExtendBufferedRel(BMR_SMGR(smgr, relpersistence),
> - forkNum, strategy, flags);
> + *hit = false;
> +
> + return ExtendBufferedRel(bmr, forkNum, strategy, flags);
> }
>
> - TRACE_POSTGRESQL_BUFFER_READ_START(forkNum, blockNum,
> - smgr->smgr_rlocator.locator.spcOid,
> - smgr->smgr_rlocator.locator.dbOid,
> - smgr->smgr_rlocator.locator.relNumber,
> - smgr->smgr_rlocator.backend);
> + buffer = PrepareReadBuffer(bmr,
> + forkNum,
> + blockNum,
> + strategy,
> + hit,
> + &allocated);
> +
> + /* At this point we do NOT hold any locks. */
> +
> + if (mode == RBM_ZERO_AND_CLEANUP_LOCK || mode == RBM_ZERO_AND_LOCK)
> + {
> + /* if we just want zeroes and a lock, we're done */
> + ZeroBuffer(buffer, mode);
> + }
> + else if (!*hit)
> + {
> + /* we might need to perform I/O */
> + CompleteReadBuffers(bmr,
> + &buffer,
> + forkNum,
> + blockNum,
> + 1,
> + mode == RBM_ZERO_ON_ERROR,
> + strategy);
> + }
> +
> + return buffer;
> +}
> +
> +/*
> + * Prepare to read a block. The buffer is pinned. If this is a 'hit', then
> + * the returned buffer can be used immediately. Otherwise, a physical read
> + * should be completed with CompleteReadBuffers(). PrepareReadBuffer()
> + * followed by CompleteReadBuffers() is equivalent ot ReadBuffer(), but the

ot -> to

> + * caller has the opportunity to coalesce reads of neighboring blocks into one
> + * CompleteReadBuffers() call.
> + *
> + * *found is set to true for a hit, and false for a miss.
> + *
> + * *allocated is set to true for a miss that allocates a buffer for the first
> + * time. If there are multiple calls to PrepareReadBuffer() for the same
> + * block before CompleteReadBuffers() or ReadBuffer_common() finishes the
> + * read, then only the first such call will receive *allocated == true, which
> + * the caller might use to issue just one prefetch hint.
> + */
> +Buffer
> +PrepareReadBuffer(BufferManagerRelation bmr,
> + ForkNumber forkNum,
> + BlockNumber blockNum,
> + BufferAccessStrategy strategy,
> + bool *found,
> + bool *allocated)
> +{
> + BufferDesc *bufHdr;
> + bool isLocalBuf;
> + IOContext io_context;
> + IOObject io_object;
>
> + Assert(blockNum != P_NEW);
> +
> + if (bmr.rel)
> + {
> + bmr.smgr = RelationGetSmgr(bmr.rel);
> + bmr.relpersistence = bmr.rel->rd_rel->relpersistence;
> + }
> +
> + isLocalBuf = SmgrIsTemp(bmr.smgr);
> if (isLocalBuf)
> {
> - /*
> - * We do not use a BufferAccessStrategy for I/O of temporary tables.
> - * However, in some cases, the "strategy" may not be NULL, so we can't
> - * rely on IOContextForStrategy() to set the right IOContext for us.
> - * This may happen in cases like CREATE TEMPORARY TABLE AS...
> - */
> io_context = IOCONTEXT_NORMAL;
> io_object = IOOBJECT_TEMP_RELATION;
> - bufHdr = LocalBufferAlloc(smgr, forkNum, blockNum, &found);
> - if (found)
> - pgBufferUsage.local_blks_hit++;
> - else if (mode == RBM_NORMAL || mode == RBM_NORMAL_NO_LOG ||
> - mode == RBM_ZERO_ON_ERROR)
> - pgBufferUsage.local_blks_read++;
> }
> else
> {
> - /*
> - * lookup the buffer. IO_IN_PROGRESS is set if the requested block is
> - * not currently in memory.
> - */
> io_context = IOContextForStrategy(strategy);
> io_object = IOOBJECT_RELATION;
> - bufHdr = BufferAlloc(smgr, relpersistence, forkNum, blockNum,
> - strategy, &found, io_context);
> - if (found)
> - pgBufferUsage.shared_blks_hit++;
> - else if (mode == RBM_NORMAL || mode == RBM_NORMAL_NO_LOG ||
> - mode == RBM_ZERO_ON_ERROR)
> - pgBufferUsage.shared_blks_read++;

You've lost this test in your new version. You can preserve the same
behavior (avoid counting zeroed buffers as blocks read) by moving the
pgBufferUsage.shared/local_blks_read++ back into ReadBuffer_common(),
where you know whether ZeroBuffer() or CompleteReadBuffers() was called.
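
Roughly what I have in mind, as an untested sketch against your v2 (I'm
assuming bmr.smgr has been filled in by this point, or that the
local/shared distinction is otherwise still available in
ReadBuffer_common()):

	else if (!*hit)
	{
		/*
		 * We might need to perform I/O, so count the miss here rather
		 * than in PrepareReadBuffer(), ensuring zeroed buffers are not
		 * counted as blocks read.
		 */
		if (SmgrIsTemp(bmr.smgr))
			pgBufferUsage.local_blks_read++;
		else
			pgBufferUsage.shared_blks_read++;

		CompleteReadBuffers(bmr,
							&buffer,
							forkNum,
							blockNum,
							1,
							mode == RBM_ZERO_ON_ERROR,
							strategy);
	}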

> }
>
> - /* At this point we do NOT hold any locks. */
> + TRACE_POSTGRESQL_BUFFER_READ_START(forkNum, blockNum,
> + bmr.smgr->smgr_rlocator.locator.spcOid,
> + bmr.smgr->smgr_rlocator.locator.dbOid,
> + bmr.smgr->smgr_rlocator.locator.relNumber,
> + bmr.smgr->smgr_rlocator.backend);
>
> - /* if it was already in the buffer pool, we're done */
> - if (found)
> + ResourceOwnerEnlarge(CurrentResourceOwner);
> + if (isLocalBuf)
> + {
> + bufHdr = LocalBufferAlloc(bmr.smgr, forkNum, blockNum, found, allocated);
> + if (*found)
> + pgBufferUsage.local_blks_hit++;
> + else
> + pgBufferUsage.local_blks_read++;

See comment above.

> + }
> + else
> + {
> + bufHdr = BufferAlloc(bmr.smgr, bmr.relpersistence, forkNum, blockNum,
> + strategy, found, allocated, io_context);
> + if (*found)
> + pgBufferUsage.shared_blks_hit++;
> + else
> + pgBufferUsage.shared_blks_read++;
> + }
> + if (bmr.rel)
> + {
> + pgstat_count_buffer_read(bmr.rel);

This is double-counting reads. You've left the call in
ReadBufferExtended() as well as adding it here. It should be fine to
remove it from ReadBufferExtended(). Because you test bmr.rel, leaving
the call here in PrepareReadBuffer() has no effect on
ReadBuffer_common() callers that don't pass a relation (like recovery).
The other current callers of ReadBuffer_common() (by way of
ExtendBufferedRelTo()) that do pass a relation are visibility map and
freespace map extension, and I don't think we track relation stats for
the VM and FSM.
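
Concretely, ReadBufferExtended() would end up looking something like
this (sketch only; I'm eliding the parts of the updated signature I
haven't checked):

	/*
	 * Update pgstat counters to reflect a cache hit or miss.  The read
	 * itself is now counted in PrepareReadBuffer() (guarded by bmr.rel),
	 * so don't count it again here.
	 */
	buf = ReadBuffer_common(..., forkNum, blockNum, mode, strategy, &hit);
	if (hit)
		pgstat_count_buffer_hit(reln);
	return buf;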

This does continue the practice of counting zeroed buffers as reads in
table-level stats, but that matches master.

> - * if we have gotten to this point, we have allocated a buffer for the
> - * page but its contents are not yet valid. IO_IN_PROGRESS is set for it,
> - * if it's a shared buffer.
> - */
> - Assert(!(pg_atomic_read_u32(&bufHdr->state) & BM_VALID)); /* spinlock not needed */
> +/*
> + * Complete a set reads prepared with PrepareReadBuffers(). The buffers must
> + * cover a cluster of neighboring block numbers.
> + *
> + * Typically this performs one physical vector read covering the block range,
> + * but if some of the buffers have already been read in the meantime by any
> + * backend, zero or multiple reads may be performed.
> + */
> +void
> +CompleteReadBuffers(BufferManagerRelation bmr,
> + Buffer *buffers,
> + ForkNumber forknum,
> + BlockNumber blocknum,
> + int nblocks,
> + bool zero_on_error,
> + BufferAccessStrategy strategy)
> +{
...
> - pgstat_count_io_op_time(io_object, io_context,
> - IOOP_READ, io_start, 1);
> + /* We found a buffer that we have to read in. */
> + io_buffers[0] = buffers[i];
> + io_pages[0] = BufferGetBlock(buffers[i]);
> + io_first_block = blocknum + i;
> + io_buffers_len = 1;
>
> - /* check for garbage data */
> - if (!PageIsVerifiedExtended((Page) bufBlock, blockNum,
> - PIV_LOG_WARNING | PIV_REPORT_STAT))
> + /*
> + * How many neighboring-on-disk blocks can we can scatter-read into
> + * other buffers at the same time?
> + */
> + while ((i + 1) < nblocks &&
> + CompleteReadBuffersCanStartIO(buffers[i + 1]))
> + {
> + /* Must be consecutive block numbers. */
> + Assert(BufferGetBlockNumber(buffers[i + 1]) ==
> + BufferGetBlockNumber(buffers[i]) + 1);
> +
> + io_buffers[io_buffers_len] = buffers[++i];
> + io_pages[io_buffers_len++] = BufferGetBlock(buffers[i]);
> + }
> +
> + io_start = pgstat_prepare_io_time();
> + smgrreadv(bmr.smgr, forknum, io_first_block, io_pages, io_buffers_len);
> + pgstat_count_io_op_time(io_object, io_context, IOOP_READ, io_start, 1);

I'd pass io_buffers_len as cnt to pgstat_count_io_op_time(). op_bytes
will be BLCKSZ, and multiplying that by the number of blocks read
produces the number of bytes read.
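
That is, something like:

	pgstat_count_io_op_time(io_object, io_context, IOOP_READ,
							io_start, io_buffers_len);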

> diff --git a/src/backend/storage/buffer/localbuf.c b/src/backend/storage/buffer/localbuf.c
> index 4efb34b75a..ee9307b612 100644
> --- a/src/backend/storage/buffer/localbuf.c
> +++ b/src/backend/storage/buffer/localbuf.c
> @@ -116,7 +116,7 @@ PrefetchLocalBuffer(SMgrRelation smgr, ForkNumber forkNum,
> */
> BufferDesc *
> LocalBufferAlloc(SMgrRelation smgr, ForkNumber forkNum, BlockNumber blockNum,
> - bool *foundPtr)
> + bool *foundPtr, bool *allocPtr)
> {
> BufferTag newTag; /* identity of requested block */
> LocalBufferLookupEnt *hresult;
> @@ -144,6 +144,7 @@ LocalBufferAlloc(SMgrRelation smgr, ForkNumber forkNum, BlockNumber blockNum,
> Assert(BufferTagsEqual(&bufHdr->tag, &newTag));
>
> *foundPtr = PinLocalBuffer(bufHdr, true);
> + *allocPtr = false;
> }
> else
> {
> @@ -170,6 +171,7 @@ LocalBufferAlloc(SMgrRelation smgr, ForkNumber forkNum, BlockNumber blockNum,
> pg_atomic_unlocked_write_u32(&bufHdr->state, buf_state);
>
> *foundPtr = false;
> + *allocPtr = true;
> }

I would prefer you use consistent naming for
allocPtr/allocatedPtr/allocated. I also think that all the functions
taking it as an output argument should explain what it is
(BufferAlloc(), LocalBufferAlloc(), etc.). I found myself doing a bit of
digging around to figure it out. You have a nice comment about it above
PrepareReadBuffer(). I think you may need to resign yourself to
restating that bit (or some version of it) for all of the functions
taking it as an argument.
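
For example, something along these lines above LocalBufferAlloc() and
BufferAlloc() (the wording is just a suggestion, restating your
PrepareReadBuffer() comment):

 * *foundPtr is set to true if the block was already present in a buffer,
 * false if it still needs to be read in.
 *
 * *allocatedPtr is set to true only for the first caller that allocates
 * the buffer for this block; later callers that arrive before the read
 * completes see *allocatedPtr == false, so a caller can use it to issue
 * at most one prefetch hint per block.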

>
> diff --git a/src/include/storage/bufmgr.h b/src/include/storage/bufmgr.h
> index 41e26d3e20..e29ca85077 100644
> --- a/src/include/storage/bufmgr.h
> +++ b/src/include/storage/bufmgr.h
> @@ -14,6 +14,8 @@
> #ifndef BUFMGR_H
> #define BUFMGR_H
>
> +#include "pgstat.h"

I'm not sure what the convention is here, but I would have included this
in bufmgr.c (where it is actually needed) instead of including it in the
header.

> +#include "port/pg_iovec.h"
> #include "storage/block.h"
> #include "storage/buf.h"
> #include "storage/bufpage.h"
> @@ -47,6 +49,8 @@ typedef enum
> RBM_ZERO_AND_CLEANUP_LOCK, /* Like RBM_ZERO_AND_LOCK, but locks the page
> * in "cleanup" mode */
> RBM_ZERO_ON_ERROR, /* Read, but return an all-zeros page on error */

> + RBM_WILL_ZERO, /* Don't read from disk, caller will call
> + * ZeroBuffer() */

It's confusing that RBM_WILL_ZERO is part of this commit, since nothing
in this commit actually uses it.

> RBM_NORMAL_NO_LOG, /* Don't log page as invalid during WAL
> * replay; otherwise same as RBM_NORMAL */
> } ReadBufferMode;

- Melanie
