From 433e82c94fd1c1b502a2b22e9c3874c1e766c05c Mon Sep 17 00:00:00 2001 From: Andres Freund Date: Fri, 15 Aug 2025 11:01:52 -0400 Subject: [PATCH v1] bufmgr: aio: Prototype for not waiting for already-in-progress IO Author: Reviewed-by: Discussion: https://postgr.es/m/ Backpatch: --- src/include/storage/bufmgr.h | 1 + src/backend/storage/buffer/bufmgr.c | 133 ++++++++++++++++++++++++++-- 2 files changed, 125 insertions(+), 9 deletions(-) diff --git a/src/include/storage/bufmgr.h b/src/include/storage/bufmgr.h index 41fdc1e7693..7ddb867bc99 100644 --- a/src/include/storage/bufmgr.h +++ b/src/include/storage/bufmgr.h @@ -137,6 +137,7 @@ struct ReadBuffersOperation int flags; int16 nblocks; int16 nblocks_done; + bool foreign_io; PgAioWaitRef io_wref; PgAioReturn io_return; }; diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c index fd7e21d96d3..de755fd53ad 100644 --- a/src/backend/storage/buffer/bufmgr.c +++ b/src/backend/storage/buffer/bufmgr.c @@ -1557,6 +1557,41 @@ ReadBuffersCanStartIOOnce(Buffer buffer, bool nowait) return StartBufferIO(GetBufferDescriptor(buffer - 1), true, nowait); } +static inline bool +ReadBuffersIOAlreadyInProgress(ReadBuffersOperation *operation, Buffer buffer) +{ + BufferDesc *desc; + uint32 buf_state; + PgAioWaitRef iow; + + pgaio_wref_clear(&iow); + + if (BufferIsLocal(buffer)) + { + desc = GetLocalBufferDescriptor(-buffer - 1); + buf_state = pg_atomic_read_u32(&desc->state); + if (buf_state & BM_IO_IN_PROGRESS) + iow = desc->io_wref; + } + else + { + desc = GetBufferDescriptor(buffer - 1); + buf_state = LockBufHdr(desc); + + if (buf_state & BM_IO_IN_PROGRESS) + iow = desc->io_wref; + UnlockBufHdr(desc, buf_state); + } + + if (pgaio_wref_valid(&iow)) + { + operation->io_wref = iow; + return true; + } + + return false; +} + /* * Helper for AsyncReadBuffers that tries to get the buffer ready for IO. 
*/ @@ -1689,7 +1724,7 @@ WaitReadBuffers(ReadBuffersOperation *operation) * * we first check if we already know the IO is complete. */ - if (aio_ret->result.status == PGAIO_RS_UNKNOWN && + if ((operation->foreign_io || aio_ret->result.status == PGAIO_RS_UNKNOWN) && !pgaio_wref_check_done(&operation->io_wref)) { instr_time io_start = pgstat_prepare_io_time(track_io_timing); @@ -1708,11 +1743,38 @@ WaitReadBuffers(ReadBuffersOperation *operation) Assert(pgaio_wref_check_done(&operation->io_wref)); } - /* - * We now are sure the IO completed. Check the results. This - * includes reporting on errors if there were any. - */ - ProcessReadBuffersResult(operation); + if (operation->foreign_io) + { + Buffer buffer = operation->buffers[operation->nblocks_done]; + BufferDesc *desc; + uint32 buf_state; + + if (BufferIsLocal(buffer)) + { + desc = GetLocalBufferDescriptor(-buffer - 1); + buf_state = pg_atomic_read_u32(&desc->state); + } + else + { + desc = GetBufferDescriptor(buffer - 1); + buf_state = LockBufHdr(desc); + UnlockBufHdr(desc, buf_state); + } + + if (buf_state & BM_VALID) + { + operation->nblocks_done += 1; + Assert(operation->nblocks_done <= operation->nblocks); + } + } + else + { + /* + * We now are sure the IO completed. Check the results. This + * includes reporting on errors if there were any. + */ + ProcessReadBuffersResult(operation); + } } /* @@ -1798,6 +1860,56 @@ AsyncReadBuffers(ReadBuffersOperation *operation, int *nblocks_progress) io_object = IOOBJECT_RELATION; } + /* + * If AIO is in progress, be it in this backend or another backend, we + * just associate the wait reference with the operation and wait in + * WaitReadBuffers(). This turns out to be important for performance in + * two workloads: + * + * 1) A read stream that has to read the same block multiple times within + * the readahead distance. This can happen e.g. for the table accesses + * of an index scan. + * + * 2) Concurrent scans by multiple backends on the same relation. 
+ * + * If we were to synchronously wait for the in-progress IO, we'd not be + * able to keep enough I/O in flight. + * + * + * If we do find there is ongoing I/O for the buffer, we set up a 1-block + * ReadBuffersOperation that WaitReadBuffers then can wait on. + */ + if (1 && ReadBuffersIOAlreadyInProgress(operation, buffers[nblocks_done])) + { + /* FIXME: probably need to wait if io_method == sync? */ + + *nblocks_progress = 1; + did_start_io = true; + operation->foreign_io = true; + + if (0) + elog(LOG, "using foreign IO path"); + + /* FIXME: trace point */ + + /* + * FIXME: how should this be accounted for in stats? Account as a hit + * for now, quite likely *we* started this IO. + */ + if (persistence == RELPERSISTENCE_TEMP) + pgBufferUsage.local_blks_hit += 1; + else + pgBufferUsage.shared_blks_hit += 1; + + if (operation->rel) + pgstat_count_buffer_hit(operation->rel); + pgstat_count_io_op(io_object, io_context, IOOP_HIT, 1, 0); + if (VacuumCostActive) + VacuumCostBalance += VacuumCostPageHit; + + return true; + } + /* * If zero_damaged_pages is enabled, add the READ_BUFFERS_ZERO_ON_ERROR * flag. The reason for that is that, hopefully, zero_damaged_pages isn't @@ -1855,9 +1967,9 @@ AsyncReadBuffers(ReadBuffersOperation *operation, int *nblocks_progress) /* * Check if we can start IO on the first to-be-read buffer. * - * If an I/O is already in progress in another backend, we want to wait - * for the outcome: either done, or something went wrong and we will - * retry. + * If a synchronous I/O is in progress in another backend (it can't be this + * backend), we want to wait for the outcome: either done, or something + * went wrong and we will retry. 
*/ if (!ReadBuffersCanStartIO(buffers[nblocks_done], false)) { @@ -1970,6 +2082,7 @@ AsyncReadBuffers(ReadBuffersOperation *operation, int *nblocks_progress) if (VacuumCostActive) VacuumCostBalance += VacuumCostPageMiss * io_buffers_len; + operation->foreign_io = false; *nblocks_progress = io_buffers_len; did_start_io = true; } @@ -5986,6 +6099,8 @@ WaitIO(BufferDesc *buf) */ if (pgaio_wref_valid(&iow)) { + if (0) + elog(LOG, "foreign wait"); pgaio_wref_wait(&iow); /* -- 2.48.1.76.g4e746b1a31.dirty