From 1b50526e266f2413e04572f8ea5007805e2f20c2 Mon Sep 17 00:00:00 2001
From: Thomas Munro <thomas.munro@gmail.com>
Date: Mon, 26 Feb 2024 23:48:31 +1300
Subject: [PATCH v5 13/14] Streaming Read API

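The streaming read API performs the equivalent of a series of ReadBuffer()
calls for one fork of one relation, building larger vectored reads and
issuing prefetch advice where possible by looking ahead.  A caller supplies a
callback that returns block numbers one at a time, and then consumes pinned
buffers from the stream.  A rough usage sketch, modeled on the pg_prewarm
changes below (the names my_private and my_next_block are illustrative, not
part of the API):

    static BlockNumber
    my_next_block(PgStreamingRead *pgsr, void *pgsr_private,
                  void *per_buffer_data)
    {
        struct my_private *p = pgsr_private;

        if (p->blocknum <= p->last_block)
            return p->blocknum++;

        return InvalidBlockNumber;      /* end of stream */
    }

    ...

    pgsr = pg_streaming_read_buffer_alloc(PGSR_FLAG_DEFAULT, &p, 0, NULL,
                                          BMR_REL(rel), forkNumber,
                                          my_next_block);
    while ((buf = pg_streaming_read_buffer_get_next(pgsr, NULL)) != InvalidBuffer)
    {
        /* ... use the pinned buffer, then release it ... */
        ReleaseBuffer(buf);
    }
    pg_streaming_read_free(pgsr);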
---
 contrib/pg_prewarm/pg_prewarm.c          |  40 +-
 src/backend/storage/Makefile             |   2 +-
 src/backend/storage/aio/Makefile         |  14 +
 src/backend/storage/aio/meson.build      |   5 +
 src/backend/storage/aio/streaming_read.c | 612 ++++++++++++++++++++++
 src/backend/storage/buffer/bufmgr.c      | 641 ++++++++++++++++-------
 src/backend/storage/buffer/localbuf.c    |  14 +-
 src/backend/storage/meson.build          |   1 +
 src/include/storage/bufmgr.h             |  45 ++
 src/include/storage/streaming_read.h     |  52 ++
 src/tools/pgindent/typedefs.list         |   3 +
 11 files changed, 1218 insertions(+), 211 deletions(-)
 create mode 100644 src/backend/storage/aio/Makefile
 create mode 100644 src/backend/storage/aio/meson.build
 create mode 100644 src/backend/storage/aio/streaming_read.c
 create mode 100644 src/include/storage/streaming_read.h

diff --git a/contrib/pg_prewarm/pg_prewarm.c b/contrib/pg_prewarm/pg_prewarm.c
index 8541e4d6e46..1cc84bcb0c2 100644
--- a/contrib/pg_prewarm/pg_prewarm.c
+++ b/contrib/pg_prewarm/pg_prewarm.c
@@ -20,6 +20,7 @@
 #include "miscadmin.h"
 #include "storage/bufmgr.h"
 #include "storage/smgr.h"
+#include "storage/streaming_read.h"
 #include "utils/acl.h"
 #include "utils/builtins.h"
 #include "utils/lsyscache.h"
@@ -38,6 +39,25 @@ typedef enum
 
 static PGIOAlignedBlock blockbuffer;
 
+struct pg_prewarm_streaming_read_private
+{
+	BlockNumber blocknum;
+	int64		last_block;
+};
+
+static BlockNumber
+pg_prewarm_streaming_read_next(PgStreamingRead *pgsr,
+							   void *pgsr_private,
+							   void *per_buffer_data)
+{
+	struct pg_prewarm_streaming_read_private *p = pgsr_private;
+
+	if (p->blocknum <= p->last_block)
+		return p->blocknum++;
+
+	return InvalidBlockNumber;
+}
+
 /*
  * pg_prewarm(regclass, mode text, fork text,
  *			  first_block int8, last_block int8)
@@ -183,18 +203,36 @@ pg_prewarm(PG_FUNCTION_ARGS)
 	}
 	else if (ptype == PREWARM_BUFFER)
 	{
+		struct pg_prewarm_streaming_read_private p;
+		PgStreamingRead *pgsr;
+
 		/*
 		 * In buffer mode, we actually pull the data into shared_buffers.
 		 */
+
+		/* Set up the private state for our streaming buffer read callback. */
+		p.blocknum = first_block;
+		p.last_block = last_block;
+
+		pgsr = pg_streaming_read_buffer_alloc(PGSR_FLAG_FULL,
+											  &p,
+											  0,
+											  NULL,
+											  BMR_REL(rel),
+											  forkNumber,
+											  pg_prewarm_streaming_read_next);
+
 		for (block = first_block; block <= last_block; ++block)
 		{
 			Buffer		buf;
 
 			CHECK_FOR_INTERRUPTS();
-			buf = ReadBufferExtended(rel, forkNumber, block, RBM_NORMAL, NULL);
+			buf = pg_streaming_read_buffer_get_next(pgsr, NULL);
 			ReleaseBuffer(buf);
 			++blocks_done;
 		}
+		Assert(pg_streaming_read_buffer_get_next(pgsr, NULL) == InvalidBuffer);
+		pg_streaming_read_free(pgsr);
 	}
 
 	/* Close relation, release lock. */
diff --git a/src/backend/storage/Makefile b/src/backend/storage/Makefile
index 8376cdfca20..eec03f6f2b4 100644
--- a/src/backend/storage/Makefile
+++ b/src/backend/storage/Makefile
@@ -8,6 +8,6 @@ subdir = src/backend/storage
 top_builddir = ../../..
 include $(top_builddir)/src/Makefile.global
 
-SUBDIRS     = buffer file freespace ipc large_object lmgr page smgr sync
+SUBDIRS     = aio buffer file freespace ipc large_object lmgr page smgr sync
 
 include $(top_srcdir)/src/backend/common.mk
diff --git a/src/backend/storage/aio/Makefile b/src/backend/storage/aio/Makefile
new file mode 100644
index 00000000000..bcab44c802f
--- /dev/null
+++ b/src/backend/storage/aio/Makefile
@@ -0,0 +1,14 @@
+#
+# Makefile for storage/aio
+#
+# src/backend/storage/aio/Makefile
+#
+
+subdir = src/backend/storage/aio
+top_builddir = ../../../..
+include $(top_builddir)/src/Makefile.global
+
+OBJS = \
+	streaming_read.o
+
+include $(top_srcdir)/src/backend/common.mk
diff --git a/src/backend/storage/aio/meson.build b/src/backend/storage/aio/meson.build
new file mode 100644
index 00000000000..39aef2a84a2
--- /dev/null
+++ b/src/backend/storage/aio/meson.build
@@ -0,0 +1,5 @@
+# Copyright (c) 2024, PostgreSQL Global Development Group
+
+backend_sources += files(
+  'streaming_read.c',
+)
diff --git a/src/backend/storage/aio/streaming_read.c b/src/backend/storage/aio/streaming_read.c
new file mode 100644
index 00000000000..71f2c4a70b6
--- /dev/null
+++ b/src/backend/storage/aio/streaming_read.c
@@ -0,0 +1,612 @@
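+/*-------------------------------------------------------------------------
+ *
+ * streaming_read.c
+ *	  Mechanism for reading buffered relation data with look-ahead
+ *
+ * A PgStreamingRead object performs the equivalent of a series of
+ * ReadBuffer() calls for one fork of one relation.  A callback supplies
+ * block numbers one at a time, and the look-ahead machinery combines
+ * consecutive blocks into larger vectored reads and optionally issues
+ * prefetch advice to the kernel.
+ *
+ *-------------------------------------------------------------------------
+ */
+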
+#include "postgres.h"
+
+#include "storage/streaming_read.h"
+#include "utils/rel.h"
+
+/*
+ * Element type for PgStreamingRead's circular array of block ranges.
+ */
+typedef struct PgStreamingReadRange
+{
+	bool		need_wait;
+	bool		advice_issued;
+	BlockNumber blocknum;
+	int			nblocks;
+	int			per_buffer_data_index;
+	Buffer		buffers[MAX_BUFFERS_PER_TRANSFER];
+	ReadBuffersOperation operation;
+} PgStreamingReadRange;
+
+/*
+ * Streaming read object.
+ */
+struct PgStreamingRead
+{
+	int			max_ios;
+	int			ios_in_progress;
+	int			max_pinned_buffers;
+	int			pinned_buffers;
+	int			pinned_buffers_trigger;
+	int			next_tail_buffer;
+	int			ramp_up_pin_limit;
+	int			ramp_up_pin_stall;
+	bool		finished;
+	bool		advice_enabled;
+	void	   *pgsr_private;
+	PgStreamingReadBufferCB callback;
+
+	BufferAccessStrategy strategy;
+	BufferManagerRelation bmr;
+	ForkNumber	forknum;
+
+	/* Sometimes we need to buffer one block for flow control. */
+	BlockNumber unget_blocknum;
+	void	   *unget_per_buffer_data;
+
+	/* Next expected block, for detecting sequential access. */
+	BlockNumber seq_blocknum;
+
+	/* Space for optional per-buffer private data. */
+	size_t		per_buffer_data_size;
+	void	   *per_buffer_data;
+
+	/* Circular buffer of ranges. */
+	int			size;
+	int			head;
+	int			tail;
+	PgStreamingReadRange ranges[FLEXIBLE_ARRAY_MEMBER];
+};
+
+static PgStreamingRead *
+pg_streaming_read_buffer_alloc_internal(int flags,
+										void *pgsr_private,
+										size_t per_buffer_data_size,
+										BufferAccessStrategy strategy)
+{
+	PgStreamingRead *pgsr;
+	int			size;
+	int			max_ios;
+	uint32		max_pinned_buffers;
+
+
+	/*
+	 * Decide how many assumed I/Os we will allow to run concurrently.  That
+	 * is, how many hints (advice) we may give the kernel about blocks we will
+	 * soon read.  This number also affects how far we look ahead for
+	 * opportunities to start more I/Os.
+	 */
+	if (flags & PGSR_FLAG_MAINTENANCE)
+		max_ios = maintenance_io_concurrency;
+	else
+		max_ios = effective_io_concurrency;
+
+	/*
+	 * The desired level of I/O concurrency controls how far ahead we are
+	 * willing to look.  We also clamp it to at least
+	 * MAX_BUFFERS_PER_TRANSFER so that we have a chance to build up a
+	 * full-sized read, even when max_ios is zero.
+	 */
+	max_pinned_buffers = Max(max_ios * 4, MAX_BUFFERS_PER_TRANSFER);
+
+	/*
+	 * The *_io_concurrency GUCs might be set to 0, but we want to allow at
+	 * least one, to keep our gating logic simple.
+	 */
+	max_ios = Max(max_ios, 1);
+
+	/*
+	 * Don't allow this backend to pin too many buffers.  For now we'll apply
+	 * the limit for the shared buffer pool and the local buffer pool, without
+	 * worrying which it is.
+	 */
+	LimitAdditionalPins(&max_pinned_buffers);
+	LimitAdditionalLocalPins(&max_pinned_buffers);
+	Assert(max_pinned_buffers > 0);
+
+	/*
+	 * pgsr->ranges is a circular buffer.  When it is empty, head == tail.
+	 * When it is full, there is an empty element between head and tail.  The
+	 * head range can also be empty (nblocks == 0), so we need two extra
+	 * elements for unoccupied ranges on top of max_pinned_buffers, to allow
+	 * for the maximum possible number of occupied ranges of the smallest
+	 * possible size of one block each.
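+	 *
+	 * For example, with max_pinned_buffers = 4 we allocate 6 elements: up to
+	 * 4 occupied single-block ranges, plus the (possibly empty) head range
+	 * being built, plus the gap element that distinguishes full from empty.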
+	 */
+	size = max_pinned_buffers + 2;
+
+	pgsr = (PgStreamingRead *)
+		palloc0(offsetof(PgStreamingRead, ranges) +
+				sizeof(pgsr->ranges[0]) * size);
+
+	pgsr->max_ios = max_ios;
+	pgsr->per_buffer_data_size = per_buffer_data_size;
+	pgsr->max_pinned_buffers = max_pinned_buffers;
+	pgsr->pgsr_private = pgsr_private;
+	pgsr->strategy = strategy;
+	pgsr->size = size;
+
+	pgsr->unget_blocknum = InvalidBlockNumber;
+
+#ifdef USE_PREFETCH
+
+	/*
+	 * This system supports prefetching advice.  As long as direct I/O isn't
+	 * enabled, and the caller hasn't promised sequential access, we can use
+	 * it.
+	 */
+	if ((io_direct_flags & IO_DIRECT_DATA) == 0 &&
+		(flags & PGSR_FLAG_SEQUENTIAL) == 0)
+		pgsr->advice_enabled = true;
+#endif
+
+	/*
+	 * We start off building small ranges, but quickly double that limit, for
+	 * the benefit of callers that don't know how far ahead they'll read.
+	 * Callers that already know they'll read all the way through can disable
+	 * this ramp-up with PGSR_FLAG_FULL.
+	 */
+	if (flags & PGSR_FLAG_FULL)
+		pgsr->ramp_up_pin_limit = INT_MAX;
+	else
+		pgsr->ramp_up_pin_limit = 1;
+
+	/*
+	 * We want to avoid creating ranges that are smaller than they could be
+	 * just because we hit max_pinned_buffers.  We only look ahead when the
+	 * number of pinned buffers falls below this trigger number, or put
+	 * another way, we stop looking ahead when we wouldn't be able to build a
+	 * "full sized" range.
+	 */
+	pgsr->pinned_buffers_trigger =
+		Max(1, (int) max_pinned_buffers - MAX_BUFFERS_PER_TRANSFER);
+
+	/* Space for the callback to store extra data along with each block. */
+	if (per_buffer_data_size)
+		pgsr->per_buffer_data = palloc(per_buffer_data_size * max_pinned_buffers);
+
+	return pgsr;
+}
+
+/*
+ * Create a new streaming read object that can be used to perform the
+ * equivalent of a series of ReadBuffer() calls for one fork of one relation.
+ * Internally, it generates larger vectored reads where possible by looking
+ * ahead.
+ */
+PgStreamingRead *
+pg_streaming_read_buffer_alloc(int flags,
+							   void *pgsr_private,
+							   size_t per_buffer_data_size,
+							   BufferAccessStrategy strategy,
+							   BufferManagerRelation bmr,
+							   ForkNumber forknum,
+							   PgStreamingReadBufferCB next_block_cb)
+{
+	PgStreamingRead *result;
+
+	result = pg_streaming_read_buffer_alloc_internal(flags,
+													 pgsr_private,
+													 per_buffer_data_size,
+													 strategy);
+	result->callback = next_block_cb;
+	result->bmr = bmr;
+	result->forknum = forknum;
+
+	return result;
+}
+
+/*
+ * Find the per-buffer data index for the Nth block of a range.
+ */
+static int
+get_per_buffer_data_index(PgStreamingRead *pgsr, PgStreamingReadRange *range, int n)
+{
+	int			result;
+
+	/*
+	 * Find slot in the circular buffer of per-buffer data, without using the
+	 * expensive % operator.
+	 */
+	result = range->per_buffer_data_index + n;
+	if (result >= pgsr->max_pinned_buffers)
+		result -= pgsr->max_pinned_buffers;
+	Assert(result == (range->per_buffer_data_index + n) % pgsr->max_pinned_buffers);
+
+	return result;
+}
+
+/*
+ * Return a pointer to the per-buffer data by index.
+ */
+static void *
+get_per_buffer_data_by_index(PgStreamingRead *pgsr, int per_buffer_data_index)
+{
+	return (char *) pgsr->per_buffer_data +
+		pgsr->per_buffer_data_size * per_buffer_data_index;
+}
+
+/*
+ * Return a pointer to the per-buffer data for the Nth block of a range.
+ */
+static void *
+get_per_buffer_data(PgStreamingRead *pgsr, PgStreamingReadRange *range, int n)
+{
+	return get_per_buffer_data_by_index(pgsr,
+										get_per_buffer_data_index(pgsr,
+																  range,
+																  n));
+}
+
+/*
+ * Start reading the head range, and create a new head range.  The new head
+ * range is returned.  It may not be empty, if StartReadBuffers() couldn't
+ * start the entire range; in that case the returned range contains the
+ * remaining portion of the range.
+ */
+static PgStreamingReadRange *
+pg_streaming_read_start_head_range(PgStreamingRead *pgsr)
+{
+	PgStreamingReadRange *head_range;
+	PgStreamingReadRange *new_head_range;
+	int			nblocks_pinned;
+	int			flags;
+
+	/* Caller should make sure we never exceed max_ios. */
+	Assert(pgsr->ios_in_progress < pgsr->max_ios);
+
+	/* Should only call if the head range has some blocks to read. */
+	head_range = &pgsr->ranges[pgsr->head];
+	Assert(head_range->nblocks > 0);
+
+	/*
+	 * If advice hasn't been suppressed, this system supports it, and this
+	 * isn't a strictly sequential pattern, then we'll issue advice.
+	 */
+	if (pgsr->advice_enabled && head_range->blocknum != pgsr->seq_blocknum)
+		flags = READ_BUFFERS_ISSUE_ADVICE;
+	else
+		flags = 0;
+
+
+	/* Start reading as many blocks as we can from the head range. */
+	nblocks_pinned = head_range->nblocks;
+	head_range->need_wait =
+		StartReadBuffers(pgsr->bmr,
+						 head_range->buffers,
+						 pgsr->forknum,
+						 head_range->blocknum,
+						 &nblocks_pinned,
+						 pgsr->strategy,
+						 flags,
+						 &head_range->operation);
+
+	/* Did that start an I/O? */
+	if (head_range->need_wait && (flags & READ_BUFFERS_ISSUE_ADVICE))
+	{
+		head_range->advice_issued = true;
+		pgsr->ios_in_progress++;
+		Assert(pgsr->ios_in_progress <= pgsr->max_ios);
+	}
+
+	/*
+	 * StartReadBuffers() might have pinned fewer blocks than we asked it to,
+	 * but always at least one.
+	 */
+	Assert(nblocks_pinned <= head_range->nblocks);
+	Assert(nblocks_pinned >= 1);
+	pgsr->pinned_buffers += nblocks_pinned;
+
+	/*
+	 * Remember where the next block would be after that, so we can detect
+	 * sequential access next time.
+	 */
+	pgsr->seq_blocknum = head_range->blocknum + nblocks_pinned;
+
+	/*
+	 * Create a new head range.  There must be space, because we have enough
+	 * elements for every range to hold just one block, up to the pin limit.
+	 */
+	Assert(pgsr->size > pgsr->max_pinned_buffers);
+	Assert((pgsr->head + 1) % pgsr->size != pgsr->tail);
+	if (++pgsr->head == pgsr->size)
+		pgsr->head = 0;
+	new_head_range = &pgsr->ranges[pgsr->head];
+	new_head_range->nblocks = 0;
+	new_head_range->advice_issued = false;
+
+	/*
+	 * If we didn't manage to start the whole read above, we split the range,
+	 * moving the remainder into the new head range.
+	 */
+	if (nblocks_pinned < head_range->nblocks)
+	{
+		int			nblocks_remaining = head_range->nblocks - nblocks_pinned;
+
+		head_range->nblocks = nblocks_pinned;
+
+		new_head_range->blocknum = head_range->blocknum + nblocks_pinned;
+		new_head_range->nblocks = nblocks_remaining;
+	}
+
+	/* The new range has per-buffer data starting after the previous range. */
+	new_head_range->per_buffer_data_index =
+		get_per_buffer_data_index(pgsr, head_range, nblocks_pinned);
+
+	return new_head_range;
+}
+
+/*
+ * Ask the callback which block it would like us to read next, with a
+ * one-block pushback buffer in front so that pg_streaming_unget_block() works.
+ */
+static BlockNumber
+pg_streaming_get_block(PgStreamingRead *pgsr, void *per_buffer_data)
+{
+	BlockNumber result;
+
+	if (unlikely(pgsr->unget_blocknum != InvalidBlockNumber))
+	{
+		/*
+		 * If we had to unget a block, now it is time to return that one
+		 * again.
+		 */
+		result = pgsr->unget_blocknum;
+		pgsr->unget_blocknum = InvalidBlockNumber;
+
+		/*
+		 * The same per_buffer_data element must have been used, and still
+		 * contains whatever data the callback wrote into it.  So we just
+		 * sanity-check that we were called with the value that
+		 * pg_streaming_unget_block() pushed back.
+		 */
+		Assert(per_buffer_data == pgsr->unget_per_buffer_data);
+	}
+	else
+	{
+		/* Use the installed callback directly. */
+		result = pgsr->callback(pgsr, pgsr->pgsr_private, per_buffer_data);
+	}
+
+	return result;
+}
+
+/*
+ * In order to deal with short reads in StartReadBuffers(), we sometimes need
+ * to defer handling of a block until later.  This *must* be called with the
+ * last value returned by pg_streaming_get_block().
+ */
+static void
+pg_streaming_unget_block(PgStreamingRead *pgsr, BlockNumber blocknum, void *per_buffer_data)
+{
+	Assert(pgsr->unget_blocknum == InvalidBlockNumber);
+	pgsr->unget_blocknum = blocknum;
+	pgsr->unget_per_buffer_data = per_buffer_data;
+}
+
+static void
+pg_streaming_read_look_ahead(PgStreamingRead *pgsr)
+{
+	PgStreamingReadRange *range;
+
+	/*
+	 * If we're still ramping up, we may have to stall to wait for buffers to
+	 * be consumed first before we do any more prefetching.
+	 */
+	if (pgsr->ramp_up_pin_stall > 0)
+	{
+		Assert(pgsr->pinned_buffers > 0);
+		return;
+	}
+
+	/*
+	 * If we're finished or can't start more I/O, then don't look ahead.
+	 */
+	if (pgsr->finished || pgsr->ios_in_progress == pgsr->max_ios)
+		return;
+
+	/*
+	 * We'll also wait until the number of pinned buffers falls below our
+	 * trigger level, so that we have the chance to create a full range.
+	 */
+	if (pgsr->pinned_buffers >= pgsr->pinned_buffers_trigger)
+		return;
+
+	do
+	{
+		BlockNumber blocknum;
+		void	   *per_buffer_data;
+
+		/* Do we have a full-sized range? */
+		range = &pgsr->ranges[pgsr->head];
+		if (range->nblocks == lengthof(range->buffers))
+		{
+			/* Start as much of it as we can. */
+			range = pg_streaming_read_start_head_range(pgsr);
+
+			/* If we're now at the I/O limit, stop here. */
+			if (pgsr->ios_in_progress == pgsr->max_ios)
+				return;
+
+			/*
+			 * If we don't have enough pin budget left to form a full range,
+			 * then stop here to avoid creating small I/Os.
+			 */
+			if (pgsr->pinned_buffers >= pgsr->pinned_buffers_trigger)
+				return;
+
+			/*
+			 * That might have started only part of the range, but at least
+			 * one block is always started, so the new head range is no
+			 * longer full-sized and that'll do for now.
+			 */
+			Assert(range->nblocks < lengthof(range->buffers));
+		}
+
+		/* Find per-buffer data slot for the next block. */
+		per_buffer_data = get_per_buffer_data(pgsr, range, range->nblocks);
+
+		/* Find out which block the callback wants to read next. */
+		blocknum = pg_streaming_get_block(pgsr, per_buffer_data);
+		if (blocknum == InvalidBlockNumber)
+		{
+			/* End of stream. */
+			pgsr->finished = true;
+			break;
+		}
+
+		/*
+		 * Is there a head range that we cannot extend, because the requested
+		 * block is not consecutive?
+		 */
+		if (range->nblocks > 0 &&
+			range->blocknum + range->nblocks != blocknum)
+		{
+			/* Yes.  Start it, so we can begin building a new one. */
+			range = pg_streaming_read_start_head_range(pgsr);
+
+			/*
+			 * It's possible that it was only partially started, and we have a
+			 * new range with the remainder.  Keep starting I/Os until we get
+			 * it all out of the way, or we hit the I/O limit.
+			 */
+			while (range->nblocks > 0 && pgsr->ios_in_progress < pgsr->max_ios)
+				range = pg_streaming_read_start_head_range(pgsr);
+
+			/*
+			 * We have to 'unget' the block returned by the callback if we
+			 * don't have enough I/O capacity left to start something.
+			 */
+			if (pgsr->ios_in_progress == pgsr->max_ios)
+			{
+				pg_streaming_unget_block(pgsr, blocknum, per_buffer_data);
+				return;
+			}
+		}
+
+		/* If we have a new, empty range, initialize the start block. */
+		if (range->nblocks == 0)
+		{
+			range->blocknum = blocknum;
+		}
+
+		/* This block extends the range by one. */
+		Assert(range->blocknum + range->nblocks == blocknum);
+		range->nblocks++;
+
+	} while (pgsr->pinned_buffers + range->nblocks < pgsr->max_pinned_buffers &&
+			 pgsr->pinned_buffers + range->nblocks < pgsr->ramp_up_pin_limit);
+
+	/* If we've hit the ramp-up limit, insert a stall. */
+	if (pgsr->pinned_buffers + range->nblocks >= pgsr->ramp_up_pin_limit)
+	{
+		/* Can't get here if an earlier stall hasn't finished. */
+		Assert(pgsr->ramp_up_pin_stall == 0);
+		/* Don't do any more prefetching until these buffers are consumed. */
+		pgsr->ramp_up_pin_stall = pgsr->ramp_up_pin_limit;
+		/* Double it.  It will soon be out of the way. */
+		pgsr->ramp_up_pin_limit *= 2;
+	}
+
+	/* Start as much as we can. */
+	while (range->nblocks > 0)
+	{
+		range = pg_streaming_read_start_head_range(pgsr);
+		if (pgsr->ios_in_progress == pgsr->max_ios)
+			break;
+	}
+}
+
+Buffer
+pg_streaming_read_buffer_get_next(PgStreamingRead *pgsr, void **per_buffer_data)
+{
+	pg_streaming_read_look_ahead(pgsr);
+
+	/* See if we have one buffer to return. */
+	while (pgsr->tail != pgsr->head)
+	{
+		PgStreamingReadRange *tail_range;
+
+		tail_range = &pgsr->ranges[pgsr->tail];
+
+		/*
+		 * Do we need to perform an I/O before returning the buffers from this
+		 * range?
+		 */
+		if (tail_range->need_wait)
+		{
+			WaitReadBuffers(&tail_range->operation);
+			tail_range->need_wait = false;
+
+			/*
+			 * We don't really know if the kernel generated a physical I/O
+			 * when we issued advice, let alone when it finished, but it has
+			 * certainly finished now because we've performed the read.
+			 */
+			if (tail_range->advice_issued)
+			{
+				Assert(pgsr->ios_in_progress > 0);
+				pgsr->ios_in_progress--;
+			}
+		}
+
+		/* Are there more buffers available in this range? */
+		if (pgsr->next_tail_buffer < tail_range->nblocks)
+		{
+			int			buffer_index;
+			Buffer		buffer;
+
+			buffer_index = pgsr->next_tail_buffer++;
+			buffer = tail_range->buffers[buffer_index];
+
+			Assert(BufferIsValid(buffer));
+
+			/* We are giving away ownership of this pinned buffer. */
+			Assert(pgsr->pinned_buffers > 0);
+			pgsr->pinned_buffers--;
+
+			if (pgsr->ramp_up_pin_stall > 0)
+				pgsr->ramp_up_pin_stall--;
+
+			if (per_buffer_data)
+				*per_buffer_data = get_per_buffer_data(pgsr, tail_range, buffer_index);
+
+			return buffer;
+		}
+
+		/* Advance tail to next range, if there is one. */
+		if (++pgsr->tail == pgsr->size)
+			pgsr->tail = 0;
+		pgsr->next_tail_buffer = 0;
+
+		/*
+		 * If the tail has caught up with the head, and the head range is not
+		 * empty, then it is time to start reading that range.
+		 */
+		if (pgsr->tail == pgsr->head &&
+			pgsr->ranges[pgsr->head].nblocks > 0)
+			pg_streaming_read_start_head_range(pgsr);
+	}
+
+	Assert(pgsr->pinned_buffers == 0);
+
+	return InvalidBuffer;
+}
+
+void
+pg_streaming_read_free(PgStreamingRead *pgsr)
+{
+	Buffer		buffer;
+
+	/* Stop looking ahead. */
+	pgsr->finished = true;
+
+	/* Unpin anything that wasn't consumed. */
+	while ((buffer = pg_streaming_read_buffer_get_next(pgsr, NULL)) != InvalidBuffer)
+		ReleaseBuffer(buffer);
+
+	Assert(pgsr->pinned_buffers == 0);
+	Assert(pgsr->ios_in_progress == 0);
+
+	/* Release memory. */
+	if (pgsr->per_buffer_data)
+		pfree(pgsr->per_buffer_data);
+
+	pfree(pgsr);
+}
diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c
index bdf89bbc4dc..3b1b0ad99df 100644
--- a/src/backend/storage/buffer/bufmgr.c
+++ b/src/backend/storage/buffer/bufmgr.c
@@ -19,6 +19,11 @@
  *		and pin it so that no one can destroy it while this process
  *		is using it.
  *
+ * StartReadBuffers() -- as above, but for multiple contiguous blocks in
+ *		two steps.
+ *
+ * WaitReadBuffers() -- second step of StartReadBuffers().
+ *
  * ReleaseBuffer() -- unpin a buffer
  *
  * MarkBufferDirty() -- mark a pinned buffer's contents as "dirty".
@@ -472,10 +477,9 @@ ForgetPrivateRefCountEntry(PrivateRefCountEntry *ref)
 )
 
 
-static Buffer ReadBuffer_common(SMgrRelation smgr, char relpersistence,
+static Buffer ReadBuffer_common(BufferManagerRelation bmr,
 								ForkNumber forkNum, BlockNumber blockNum,
-								ReadBufferMode mode, BufferAccessStrategy strategy,
-								bool *hit);
+								ReadBufferMode mode, BufferAccessStrategy strategy);
 static BlockNumber ExtendBufferedRelCommon(BufferManagerRelation bmr,
 										   ForkNumber fork,
 										   BufferAccessStrategy strategy,
@@ -501,7 +505,7 @@ static uint32 WaitBufHdrUnlocked(BufferDesc *buf);
 static int	SyncOneBuffer(int buf_id, bool skip_recently_used,
 						  WritebackContext *wb_context);
 static void WaitIO(BufferDesc *buf);
-static bool StartBufferIO(BufferDesc *buf, bool forInput);
+static bool StartBufferIO(BufferDesc *buf, bool forInput, bool nowait);
 static void TerminateBufferIO(BufferDesc *buf, bool clear_dirty,
 							  uint32 set_flag_bits, bool forget_owner);
 static void AbortBufferIO(Buffer buffer);
@@ -782,7 +786,6 @@ Buffer
 ReadBufferExtended(Relation reln, ForkNumber forkNum, BlockNumber blockNum,
 				   ReadBufferMode mode, BufferAccessStrategy strategy)
 {
-	bool		hit;
 	Buffer		buf;
 
 	/*
@@ -795,15 +798,9 @@ ReadBufferExtended(Relation reln, ForkNumber forkNum, BlockNumber blockNum,
 				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
 				 errmsg("cannot access temporary tables of other sessions")));
 
-	/*
-	 * Read the buffer, and update pgstat counters to reflect a cache hit or
-	 * miss.
-	 */
-	pgstat_count_buffer_read(reln);
-	buf = ReadBuffer_common(RelationGetSmgr(reln), reln->rd_rel->relpersistence,
-							forkNum, blockNum, mode, strategy, &hit);
-	if (hit)
-		pgstat_count_buffer_hit(reln);
+	buf = ReadBuffer_common(BMR_REL(reln),
+							forkNum, blockNum, mode, strategy);
+
 	return buf;
 }
 
@@ -823,13 +820,12 @@ ReadBufferWithoutRelcache(RelFileLocator rlocator, ForkNumber forkNum,
 						  BlockNumber blockNum, ReadBufferMode mode,
 						  BufferAccessStrategy strategy, bool permanent)
 {
-	bool		hit;
-
 	SMgrRelation smgr = smgropen(rlocator, InvalidBackendId);
 
-	return ReadBuffer_common(smgr, permanent ? RELPERSISTENCE_PERMANENT :
-							 RELPERSISTENCE_UNLOGGED, forkNum, blockNum,
-							 mode, strategy, &hit);
+	return ReadBuffer_common(BMR_SMGR(smgr, permanent ? RELPERSISTENCE_PERMANENT :
+									  RELPERSISTENCE_UNLOGGED),
+							 forkNum, blockNum,
+							 mode, strategy);
 }
 
 /*
@@ -995,35 +991,68 @@ ExtendBufferedRelTo(BufferManagerRelation bmr,
 	 */
 	if (buffer == InvalidBuffer)
 	{
-		bool		hit;
-
 		Assert(extended_by == 0);
-		buffer = ReadBuffer_common(bmr.smgr, bmr.relpersistence,
-								   fork, extend_to - 1, mode, strategy,
-								   &hit);
+		buffer = ReadBuffer_common(bmr, fork, extend_to - 1, mode, strategy);
 	}
 
 	return buffer;
 }
 
+/*
+ * Zero a buffer and lock it, as part of the implementation of
+ * RBM_ZERO_AND_LOCK or RBM_ZERO_AND_CLEANUP_LOCK.  The buffer must already be
+ * pinned.  It does not have to be valid, but it is valid and locked on
+ * return.
+ */
+static void
+ZeroBuffer(Buffer buffer, ReadBufferMode mode)
+{
+	BufferDesc *bufHdr;
+	uint32		buf_state;
+
+	Assert(mode == RBM_ZERO_AND_LOCK || mode == RBM_ZERO_AND_CLEANUP_LOCK);
+
+	if (BufferIsLocal(buffer))
+		bufHdr = GetLocalBufferDescriptor(-buffer - 1);
+	else
+	{
+		bufHdr = GetBufferDescriptor(buffer - 1);
+		if (mode == RBM_ZERO_AND_LOCK)
+			LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
+		else
+			LockBufferForCleanup(buffer);
+	}
+
+	memset(BufferGetPage(buffer), 0, BLCKSZ);
+
+	if (BufferIsLocal(buffer))
+	{
+		buf_state = pg_atomic_read_u32(&bufHdr->state);
+		buf_state |= BM_VALID;
+		pg_atomic_unlocked_write_u32(&bufHdr->state, buf_state);
+	}
+	else
+	{
+		buf_state = LockBufHdr(bufHdr);
+		buf_state |= BM_VALID;
+		UnlockBufHdr(bufHdr, buf_state);
+	}
+}
+
 /*
  * ReadBuffer_common -- common logic for all ReadBuffer variants
- *
- * *hit is set to true if the request was satisfied from shared buffer cache.
  */
 static Buffer
-ReadBuffer_common(SMgrRelation smgr, char relpersistence, ForkNumber forkNum,
+ReadBuffer_common(BufferManagerRelation bmr, ForkNumber forkNum,
 				  BlockNumber blockNum, ReadBufferMode mode,
-				  BufferAccessStrategy strategy, bool *hit)
+				  BufferAccessStrategy strategy)
 {
-	BufferDesc *bufHdr;
-	Block		bufBlock;
-	bool		found;
-	IOContext	io_context;
-	IOObject	io_object;
-	bool		isLocalBuf = SmgrIsTemp(smgr);
-
-	*hit = false;
+	ReadBuffersOperation operation;
+	Buffer		buffer;
+	int			nblocks;
+	int			flags;
 
 	/*
 	 * Backward compatibility path, most code should use ExtendBufferedRel()
@@ -1042,181 +1071,404 @@ ReadBuffer_common(SMgrRelation smgr, char relpersistence, ForkNumber forkNum,
 		if (mode == RBM_ZERO_AND_LOCK || mode == RBM_ZERO_AND_CLEANUP_LOCK)
 			flags |= EB_LOCK_FIRST;
 
-		return ExtendBufferedRel(BMR_SMGR(smgr, relpersistence),
-								 forkNum, strategy, flags);
+		return ExtendBufferedRel(bmr, forkNum, strategy, flags);
 	}
 
-	TRACE_POSTGRESQL_BUFFER_READ_START(forkNum, blockNum,
-									   smgr->smgr_rlocator.locator.spcOid,
-									   smgr->smgr_rlocator.locator.dbOid,
-									   smgr->smgr_rlocator.locator.relNumber,
-									   smgr->smgr_rlocator.backend);
+	nblocks = 1;
+	if (mode == RBM_ZERO_ON_ERROR)
+		flags = READ_BUFFERS_ZERO_ON_ERROR;
+	else
+		flags = 0;
+	if (StartReadBuffers(bmr,
+						 &buffer,
+						 forkNum,
+						 blockNum,
+						 &nblocks,
+						 strategy,
+						 flags,
+						 &operation))
+		WaitReadBuffers(&operation);
+	Assert(nblocks == 1);		/* single block can't be short */
+
+	if (mode == RBM_ZERO_AND_CLEANUP_LOCK || mode == RBM_ZERO_AND_LOCK)
+		ZeroBuffer(buffer, mode);
+
+	return buffer;
+}
+
+static Buffer
+PrepareReadBuffer(BufferManagerRelation bmr,
+				  ForkNumber forkNum,
+				  BlockNumber blockNum,
+				  BufferAccessStrategy strategy,
+				  bool *foundPtr)
+{
+	BufferDesc *bufHdr;
+	bool		isLocalBuf;
+	IOContext	io_context;
+	IOObject	io_object;
+
+	Assert(blockNum != P_NEW);
 
+	Assert(bmr.smgr);
+
+	isLocalBuf = SmgrIsTemp(bmr.smgr);
 	if (isLocalBuf)
 	{
-		/*
-		 * We do not use a BufferAccessStrategy for I/O of temporary tables.
-		 * However, in some cases, the "strategy" may not be NULL, so we can't
-		 * rely on IOContextForStrategy() to set the right IOContext for us.
-		 * This may happen in cases like CREATE TEMPORARY TABLE AS...
-		 */
 		io_context = IOCONTEXT_NORMAL;
 		io_object = IOOBJECT_TEMP_RELATION;
-		bufHdr = LocalBufferAlloc(smgr, forkNum, blockNum, &found);
-		if (found)
-			pgBufferUsage.local_blks_hit++;
-		else if (mode == RBM_NORMAL || mode == RBM_NORMAL_NO_LOG ||
-				 mode == RBM_ZERO_ON_ERROR)
-			pgBufferUsage.local_blks_read++;
 	}
 	else
 	{
-		/*
-		 * lookup the buffer.  IO_IN_PROGRESS is set if the requested block is
-		 * not currently in memory.
-		 */
 		io_context = IOContextForStrategy(strategy);
 		io_object = IOOBJECT_RELATION;
-		bufHdr = BufferAlloc(smgr, relpersistence, forkNum, blockNum,
-							 strategy, &found, io_context);
-		if (found)
-			pgBufferUsage.shared_blks_hit++;
-		else if (mode == RBM_NORMAL || mode == RBM_NORMAL_NO_LOG ||
-				 mode == RBM_ZERO_ON_ERROR)
-			pgBufferUsage.shared_blks_read++;
 	}
 
-	/* At this point we do NOT hold any locks. */
+	TRACE_POSTGRESQL_BUFFER_READ_START(forkNum, blockNum,
+									   bmr.smgr->smgr_rlocator.locator.spcOid,
+									   bmr.smgr->smgr_rlocator.locator.dbOid,
+									   bmr.smgr->smgr_rlocator.locator.relNumber,
+									   bmr.smgr->smgr_rlocator.backend);
 
-	/* if it was already in the buffer pool, we're done */
-	if (found)
+	ResourceOwnerEnlarge(CurrentResourceOwner);
+	if (isLocalBuf)
+	{
+		bufHdr = LocalBufferAlloc(bmr.smgr, forkNum, blockNum, foundPtr);
+		if (*foundPtr)
+			pgBufferUsage.local_blks_hit++;
+	}
+	else
+	{
+		bufHdr = BufferAlloc(bmr.smgr, bmr.relpersistence, forkNum, blockNum,
+							 strategy, foundPtr, io_context);
+		if (*foundPtr)
+			pgBufferUsage.shared_blks_hit++;
+	}
+	if (bmr.rel)
+	{
+		/*
+		 * While pgBufferUsage's "read" counter isn't bumped unless we reach
+		 * WaitReadBuffers() (so, not for hits, and not for buffers that are
+		 * zeroed instead), the per-relation stats always count them.
+		 */
+		pgstat_count_buffer_read(bmr.rel);
+		if (*foundPtr)
+			pgstat_count_buffer_hit(bmr.rel);
+	}
+	if (*foundPtr)
 	{
-		/* Just need to update stats before we exit */
-		*hit = true;
 		VacuumPageHit++;
 		pgstat_count_io_op(io_object, io_context, IOOP_HIT);
-
 		if (VacuumCostActive)
 			VacuumCostBalance += VacuumCostPageHit;
 
 		TRACE_POSTGRESQL_BUFFER_READ_DONE(forkNum, blockNum,
-										  smgr->smgr_rlocator.locator.spcOid,
-										  smgr->smgr_rlocator.locator.dbOid,
-										  smgr->smgr_rlocator.locator.relNumber,
-										  smgr->smgr_rlocator.backend,
-										  found);
+										  bmr.smgr->smgr_rlocator.locator.spcOid,
+										  bmr.smgr->smgr_rlocator.locator.dbOid,
+										  bmr.smgr->smgr_rlocator.locator.relNumber,
+										  bmr.smgr->smgr_rlocator.backend,
+										  true);
+	}
 
-		/*
-		 * In RBM_ZERO_AND_LOCK mode the caller expects the page to be locked
-		 * on return.
-		 */
-		if (!isLocalBuf)
-		{
-			if (mode == RBM_ZERO_AND_LOCK)
-				LWLockAcquire(BufferDescriptorGetContentLock(bufHdr),
-							  LW_EXCLUSIVE);
-			else if (mode == RBM_ZERO_AND_CLEANUP_LOCK)
-				LockBufferForCleanup(BufferDescriptorGetBuffer(bufHdr));
-		}
+	return BufferDescriptorGetBuffer(bufHdr);
+}
 
-		return BufferDescriptorGetBuffer(bufHdr);
+/*
+ * Begin reading a range of blocks starting at blockNum and extending for
+ * *nblocks.  On return, up to *nblocks pinned buffers holding those blocks
+ * are written into the buffers array, and *nblocks is updated to contain the
+ * actual number, which may be fewer than requested.
+ *
+ * If false is returned, no I/O is necessary and WaitReadBuffers() need not be
+ * called.  If true is returned, one I/O has been started, and
+ * WaitReadBuffers() must be called with the same operation object before the
+ * buffers are accessed.  Along with the operation object, the caller-supplied
+ * array of buffers must remain valid until WaitReadBuffers() is called.
+ *
+ * Currently the I/O is only started with optional operating system advice,
+ * and the real I/O happens in WaitReadBuffers().  In future work, true I/O
+ * could be initiated here.
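+ *
+ * A minimal synchronous call sequence, as used by ReadBuffer_common() above,
+ * looks like this:
+ *
+ *		nblocks = 1;
+ *		if (StartReadBuffers(bmr, &buffer, forkNum, blockNum, &nblocks,
+ *							 strategy, flags, &operation))
+ *			WaitReadBuffers(&operation);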
+ */
+bool
+StartReadBuffers(BufferManagerRelation bmr,
+				 Buffer *buffers,
+				 ForkNumber forkNum,
+				 BlockNumber blockNum,
+				 int *nblocks,
+				 BufferAccessStrategy strategy,
+				 int flags,
+				 ReadBuffersOperation *operation)
+{
+	int			actual_nblocks = *nblocks;
+
+	if (bmr.rel)
+	{
+		bmr.smgr = RelationGetSmgr(bmr.rel);
+		bmr.relpersistence = bmr.rel->rd_rel->relpersistence;
 	}
 
-	/*
-	 * if we have gotten to this point, we have allocated a buffer for the
-	 * page but its contents are not yet valid.  IO_IN_PROGRESS is set for it,
-	 * if it's a shared buffer.
-	 */
-	Assert(!(pg_atomic_read_u32(&bufHdr->state) & BM_VALID));	/* spinlock not needed */
+	operation->bmr = bmr;
+	operation->forknum = forkNum;
+	operation->blocknum = blockNum;
+	operation->buffers = buffers;
+	operation->nblocks = actual_nblocks;
+	operation->strategy = strategy;
+	operation->flags = flags;
 
-	bufBlock = isLocalBuf ? LocalBufHdrGetBlock(bufHdr) : BufHdrGetBlock(bufHdr);
+	operation->io_buffers_len = 0;
 
-	/*
-	 * Read in the page, unless the caller intends to overwrite it and just
-	 * wants us to allocate a buffer.
-	 */
-	if (mode == RBM_ZERO_AND_LOCK || mode == RBM_ZERO_AND_CLEANUP_LOCK)
-		MemSet((char *) bufBlock, 0, BLCKSZ);
-	else
+	for (int i = 0; i < actual_nblocks; ++i)
 	{
-		instr_time	io_start = pgstat_prepare_io_time(track_io_timing);
+		bool		found;
 
-		smgrread(smgr, forkNum, blockNum, bufBlock);
+		buffers[i] = PrepareReadBuffer(bmr,
+									   forkNum,
+									   blockNum + i,
+									   strategy,
+									   &found);
 
-		pgstat_count_io_op_time(io_object, io_context,
-								IOOP_READ, io_start, 1);
+		if (found)
+		{
+			/*
+			 * Terminate the read as soon as we get a hit.  It could be a
+			 * single buffer hit, or it could be a hit that follows a readable
+			 * range.  We don't want to create more than one readable range,
+			 * so we stop here.
+			 */
+			actual_nblocks = operation->nblocks = *nblocks = i + 1;
+		}
+		else
+		{
+			/* Extend the readable range to cover this block. */
+			operation->io_buffers_len++;
+		}
+	}
 
-		/* check for garbage data */
-		if (!PageIsVerifiedExtended((Page) bufBlock, blockNum,
-									PIV_LOG_WARNING | PIV_REPORT_STAT))
+	if (operation->io_buffers_len > 0)
+	{
+		if (flags & READ_BUFFERS_ISSUE_ADVICE)
 		{
-			if (mode == RBM_ZERO_ON_ERROR || zero_damaged_pages)
-			{
-				ereport(WARNING,
-						(errcode(ERRCODE_DATA_CORRUPTED),
-						 errmsg("invalid page in block %u of relation %s; zeroing out page",
-								blockNum,
-								relpath(smgr->smgr_rlocator, forkNum))));
-				MemSet((char *) bufBlock, 0, BLCKSZ);
-			}
-			else
-				ereport(ERROR,
-						(errcode(ERRCODE_DATA_CORRUPTED),
-						 errmsg("invalid page in block %u of relation %s",
-								blockNum,
-								relpath(smgr->smgr_rlocator, forkNum))));
+			/*
+			 * In theory we should only do this if PrepareReadBuffer() had to
+			 * allocate new buffers above.  That way, if two calls to
+			 * StartReadBuffers() were made for the same blocks before
+			 * WaitReadBuffers(), only the first would issue the advice.
+			 * That'd be a better simulation of true asynchronous I/O, which
+			 * would only start the I/O once, but isn't done here for
+			 * simplicity.  Note also that the following call might actually
+			 * issue two advice calls if we cross a segment boundary; in a
+			 * true asynchronous version we might choose to process only one
+			 * real I/O at a time in that case.
+			 */
+			smgrprefetch(bmr.smgr, forkNum, blockNum, operation->io_buffers_len);
 		}
+
+		/* Indicate that WaitReadBuffers() should be called. */
+		return true;
 	}
+	else
+	{
+		return false;
+	}
+}
 
-	/*
-	 * In RBM_ZERO_AND_LOCK / RBM_ZERO_AND_CLEANUP_LOCK mode, grab the buffer
-	 * content lock before marking the page as valid, to make sure that no
-	 * other backend sees the zeroed page before the caller has had a chance
-	 * to initialize it.
-	 *
-	 * Since no-one else can be looking at the page contents yet, there is no
-	 * difference between an exclusive lock and a cleanup-strength lock. (Note
-	 * that we cannot use LockBuffer() or LockBufferForCleanup() here, because
-	 * they assert that the buffer is already valid.)
-	 */
-	if ((mode == RBM_ZERO_AND_LOCK || mode == RBM_ZERO_AND_CLEANUP_LOCK) &&
-		!isLocalBuf)
+static inline bool
+WaitReadBuffersCanStartIO(Buffer buffer, bool nowait)
+{
+	if (BufferIsLocal(buffer))
 	{
-		LWLockAcquire(BufferDescriptorGetContentLock(bufHdr), LW_EXCLUSIVE);
+		BufferDesc *bufHdr = GetLocalBufferDescriptor(-buffer - 1);
+
+		return (pg_atomic_read_u32(&bufHdr->state) & BM_VALID) == 0;
 	}
+	else
+		return StartBufferIO(GetBufferDescriptor(buffer - 1), true, nowait);
+}
+
+void
+WaitReadBuffers(ReadBuffersOperation *operation)
+{
+	BufferManagerRelation bmr;
+	Buffer	   *buffers;
+	int			nblocks;
+	BlockNumber blocknum;
+	ForkNumber	forknum;
+	bool		isLocalBuf;
+	IOContext	io_context;
+	IOObject	io_object;
+
+	/*
+	 * Currently operations are only allowed to include a read of some range,
+	 * with an optional extra buffer at the end that was already valid.  So
+	 * nblocks can be at most one more than io_buffers_len.
+	 */
+	Assert((operation->nblocks == operation->io_buffers_len) ||
+		   (operation->nblocks == operation->io_buffers_len + 1));
 
+	/* Find the range of the physical read we need to perform. */
+	nblocks = operation->io_buffers_len;
+	if (nblocks == 0)
+		return;					/* nothing to do */
+
+	buffers = &operation->buffers[0];
+	blocknum = operation->blocknum;
+	forknum = operation->forknum;
+	bmr = operation->bmr;
+
+	isLocalBuf = SmgrIsTemp(bmr.smgr);
 	if (isLocalBuf)
 	{
-		/* Only need to adjust flags */
-		uint32		buf_state = pg_atomic_read_u32(&bufHdr->state);
-
-		buf_state |= BM_VALID;
-		pg_atomic_unlocked_write_u32(&bufHdr->state, buf_state);
+		io_context = IOCONTEXT_NORMAL;
+		io_object = IOOBJECT_TEMP_RELATION;
 	}
 	else
 	{
-		/* Set BM_VALID, terminate IO, and wake up any waiters */
-		TerminateBufferIO(bufHdr, false, BM_VALID, true);
+		io_context = IOContextForStrategy(operation->strategy);
+		io_object = IOOBJECT_RELATION;
 	}
 
-	VacuumPageMiss++;
-	if (VacuumCostActive)
-		VacuumCostBalance += VacuumCostPageMiss;
+	/*
+	 * We count all these blocks as read by this backend.  This is traditional
+	 * behavior, but might turn out not to be true if we find that someone
+	 * else has beaten us and completed the read of some of these blocks.  In
+	 * that case the system globally double-counts, but we traditionally don't
+	 * count this as a "hit", and we don't have a separate counter for "miss,
+	 * but another backend completed the read".
+	 */
+	if (isLocalBuf)
+		pgBufferUsage.local_blks_read += nblocks;
+	else
+		pgBufferUsage.shared_blks_read += nblocks;
 
-	TRACE_POSTGRESQL_BUFFER_READ_DONE(forkNum, blockNum,
-									  smgr->smgr_rlocator.locator.spcOid,
-									  smgr->smgr_rlocator.locator.dbOid,
-									  smgr->smgr_rlocator.locator.relNumber,
-									  smgr->smgr_rlocator.backend,
-									  found);
+	for (int i = 0; i < nblocks; ++i)
+	{
+		int			io_buffers_len;
+		Buffer		io_buffers[MAX_BUFFERS_PER_TRANSFER];
+		void	   *io_pages[MAX_BUFFERS_PER_TRANSFER];
+		instr_time	io_start;
+		BlockNumber io_first_block;
 
-	return BufferDescriptorGetBuffer(bufHdr);
+		/*
+		 * Skip this block if someone else has already completed it.  If an
+		 * I/O is already in progress in another backend, this will wait for
+		 * the outcome: either done, or something went wrong and we will
+		 * retry.
+		 */
+		if (!WaitReadBuffersCanStartIO(buffers[i], false))
+		{
+			/*
+			 * Report this as a 'hit' for this backend, even though it must
+			 * have started out as a miss in PrepareReadBuffer().
+			 */
+			TRACE_POSTGRESQL_BUFFER_READ_DONE(forknum, blocknum + i,
+											  bmr.smgr->smgr_rlocator.locator.spcOid,
+											  bmr.smgr->smgr_rlocator.locator.dbOid,
+											  bmr.smgr->smgr_rlocator.locator.relNumber,
+											  bmr.smgr->smgr_rlocator.backend,
+											  true);
+			continue;
+		}
+
+		/* We found a buffer that we need to read in. */
+		io_buffers[0] = buffers[i];
+		io_pages[0] = BufferGetBlock(buffers[i]);
+		io_first_block = blocknum + i;
+		io_buffers_len = 1;
+
+		/*
+		 * How many neighboring-on-disk blocks can we scatter-read into
+		 * other buffers at the same time?  In this case we don't wait if we
+		 * see an I/O already in progress.  We already hold BM_IO_IN_PROGRESS
+		 * for the head block, so we should get on with that I/O as soon as
+		 * possible.  We'll come back to any block we skip here in a later
+		 * iteration of the outer loop.
+		 */
+		while ((i + 1) < nblocks &&
+			   WaitReadBuffersCanStartIO(buffers[i + 1], true))
+		{
+			/* Must be consecutive block numbers. */
+			Assert(BufferGetBlockNumber(buffers[i + 1]) ==
+				   BufferGetBlockNumber(buffers[i]) + 1);
+
+			io_buffers[io_buffers_len] = buffers[++i];
+			io_pages[io_buffers_len++] = BufferGetBlock(buffers[i]);
+		}
+
+		io_start = pgstat_prepare_io_time(track_io_timing);
+		smgrreadv(bmr.smgr, forknum, io_first_block, io_pages, io_buffers_len);
+		pgstat_count_io_op_time(io_object, io_context, IOOP_READ, io_start,
+								io_buffers_len);
+
+		/* Verify each block we read, and terminate the I/O. */
+		for (int j = 0; j < io_buffers_len; ++j)
+		{
+			BufferDesc *bufHdr;
+			Block		bufBlock;
+
+			if (isLocalBuf)
+			{
+				bufHdr = GetLocalBufferDescriptor(-io_buffers[j] - 1);
+				bufBlock = LocalBufHdrGetBlock(bufHdr);
+			}
+			else
+			{
+				bufHdr = GetBufferDescriptor(io_buffers[j] - 1);
+				bufBlock = BufHdrGetBlock(bufHdr);
+			}
+
+			/* check for garbage data */
+			if (!PageIsVerifiedExtended((Page) bufBlock, io_first_block + j,
+										PIV_LOG_WARNING | PIV_REPORT_STAT))
+			{
+				if ((operation->flags & READ_BUFFERS_ZERO_ON_ERROR) || zero_damaged_pages)
+				{
+					ereport(WARNING,
+							(errcode(ERRCODE_DATA_CORRUPTED),
+							 errmsg("invalid page in block %u of relation %s; zeroing out page",
+									io_first_block + j,
+									relpath(bmr.smgr->smgr_rlocator, forknum))));
+					memset(bufBlock, 0, BLCKSZ);
+				}
+				else
+					ereport(ERROR,
+							(errcode(ERRCODE_DATA_CORRUPTED),
+							 errmsg("invalid page in block %u of relation %s",
+									io_first_block + j,
+									relpath(bmr.smgr->smgr_rlocator, forknum))));
+			}
+
+			/* Terminate I/O and set BM_VALID. */
+			if (isLocalBuf)
+			{
+				uint32		buf_state = pg_atomic_read_u32(&bufHdr->state);
+
+				buf_state |= BM_VALID;
+				pg_atomic_unlocked_write_u32(&bufHdr->state, buf_state);
+			}
+			else
+			{
+				/* Set BM_VALID, terminate IO, and wake up any waiters */
+				TerminateBufferIO(bufHdr, false, BM_VALID, true);
+			}
+
+			/* Report I/Os as completing individually. */
+			TRACE_POSTGRESQL_BUFFER_READ_DONE(forknum, io_first_block + j,
+											  bmr.smgr->smgr_rlocator.locator.spcOid,
+											  bmr.smgr->smgr_rlocator.locator.dbOid,
+											  bmr.smgr->smgr_rlocator.locator.relNumber,
+											  bmr.smgr->smgr_rlocator.backend,
+											  false);
+		}
+
+		VacuumPageMiss += io_buffers_len;
+		if (VacuumCostActive)
+			VacuumCostBalance += VacuumCostPageMiss * io_buffers_len;
+	}
 }
 
 /*
- * BufferAlloc -- subroutine for ReadBuffer.  Handles lookup of a shared
- *		buffer.  If no buffer exists already, selects a replacement
- *		victim and evicts the old page, but does NOT read in new page.
+ * BufferAlloc -- subroutine for StartReadBuffers.  Handles lookup of a shared
+ *		buffer.  If no buffer exists already, selects a replacement victim and
+ *		evicts the old page, but does NOT read in new page.
  *
  * "strategy" can be a buffer replacement strategy object, or NULL for
  * the default strategy.  The selected buffer's usage_count is advanced when
@@ -1224,11 +1476,7 @@ ReadBuffer_common(SMgrRelation smgr, char relpersistence, ForkNumber forkNum,
  *
  * The returned buffer is pinned and is already marked as holding the
  * desired page.  If it already did have the desired page, *foundPtr is
- * set true.  Otherwise, *foundPtr is set false and the buffer is marked
- * as IO_IN_PROGRESS; ReadBuffer will now need to do I/O to fill it.
- *
- * *foundPtr is actually redundant with the buffer's BM_VALID flag, but
- * we keep it for simplicity in ReadBuffer.
+ * set true.  Otherwise, *foundPtr is set false.
  *
  * io_context is passed as an output parameter to avoid calling
  * IOContextForStrategy() when there is a shared buffers hit and no IO
@@ -1287,19 +1535,10 @@ BufferAlloc(SMgrRelation smgr, char relpersistence, ForkNumber forkNum,
 		{
 			/*
 			 * We can only get here if (a) someone else is still reading in
-			 * the page, or (b) a previous read attempt failed.  We have to
-			 * wait for any active read attempt to finish, and then set up our
-			 * own read attempt if the page is still not BM_VALID.
-			 * StartBufferIO does it all.
+			 * the page, (b) a previous read attempt failed, or (c) someone
+			 * called StartReadBuffers() but not yet WaitReadBuffers().
 			 */
-			if (StartBufferIO(buf, true))
-			{
-				/*
-				 * If we get here, previous attempts to read the buffer must
-				 * have failed ... but we shall bravely try again.
-				 */
-				*foundPtr = false;
-			}
+			*foundPtr = false;
 		}
 
 		return buf;
@@ -1364,19 +1603,10 @@ BufferAlloc(SMgrRelation smgr, char relpersistence, ForkNumber forkNum,
 		{
 			/*
 			 * We can only get here if (a) someone else is still reading in
-			 * the page, or (b) a previous read attempt failed.  We have to
-			 * wait for any active read attempt to finish, and then set up our
-			 * own read attempt if the page is still not BM_VALID.
-			 * StartBufferIO does it all.
+			 * the page, (b) a previous read attempt failed, or (c) someone
+			 * called StartReadBuffers() but not yet WaitReadBuffers().
 			 */
-			if (StartBufferIO(existing_buf_hdr, true))
-			{
-				/*
-				 * If we get here, previous attempts to read the buffer must
-				 * have failed ... but we shall bravely try again.
-				 */
-				*foundPtr = false;
-			}
+			*foundPtr = false;
 		}
 
 		return existing_buf_hdr;
@@ -1408,15 +1638,9 @@ BufferAlloc(SMgrRelation smgr, char relpersistence, ForkNumber forkNum,
 	LWLockRelease(newPartitionLock);
 
 	/*
-	 * Buffer contents are currently invalid.  Try to obtain the right to
-	 * start I/O.  If StartBufferIO returns false, then someone else managed
-	 * to read it before we did, so there's nothing left for BufferAlloc() to
-	 * do.
+	 * Buffer contents are currently invalid.
 	 */
-	if (StartBufferIO(victim_buf_hdr, true))
-		*foundPtr = false;
-	else
-		*foundPtr = true;
+	*foundPtr = false;
 
 	return victim_buf_hdr;
 }
@@ -1770,7 +1994,7 @@ again:
  * pessimistic, but outside of toy-sized shared_buffers it should allow
  * sufficient pins.
  */
-static void
+void
 LimitAdditionalPins(uint32 *additional_pins)
 {
 	uint32		max_backends;
@@ -2035,7 +2259,7 @@ ExtendBufferedRelShared(BufferManagerRelation bmr,
 
 				buf_state &= ~BM_VALID;
 				UnlockBufHdr(existing_hdr, buf_state);
-			} while (!StartBufferIO(existing_hdr, true));
+			} while (!StartBufferIO(existing_hdr, true, false));
 		}
 		else
 		{
@@ -2058,7 +2282,7 @@ ExtendBufferedRelShared(BufferManagerRelation bmr,
 			LWLockRelease(partition_lock);
 
 			/* XXX: could combine the locked operations in it with the above */
-			StartBufferIO(victim_buf_hdr, true);
+			StartBufferIO(victim_buf_hdr, true, false);
 		}
 	}
 
@@ -2373,7 +2597,12 @@ PinBuffer(BufferDesc *buf, BufferAccessStrategy strategy)
 	else
 	{
 		/*
-		 * If we previously pinned the buffer, it must surely be valid.
+		 * If we previously pinned the buffer, it is likely to be valid, but
+		 * it may not be if StartReadBuffers() was called and
+		 * WaitReadBuffers() hasn't been called yet.  We'll check by loading
+		 * the flags without locking.  This is racy, but it's OK to return
+		 * false spuriously: when WaitReadBuffers() calls StartBufferIO(),
+		 * it'll see that it's now valid.
 		 *
 		 * Note: We deliberately avoid a Valgrind client request here.
 		 * Individual access methods can optionally superimpose buffer page
@@ -2382,7 +2611,7 @@ PinBuffer(BufferDesc *buf, BufferAccessStrategy strategy)
 		 * that the buffer page is legitimately non-accessible here.  We
 		 * cannot meddle with that.
 		 */
-		result = true;
+		result = (pg_atomic_read_u32(&buf->state) & BM_VALID) != 0;
 	}
 
 	ref->refcount++;
@@ -3450,7 +3679,7 @@ FlushBuffer(BufferDesc *buf, SMgrRelation reln, IOObject io_object,
 	 * someone else flushed the buffer before we could, so we need not do
 	 * anything.
 	 */
-	if (!StartBufferIO(buf, false))
+	if (!StartBufferIO(buf, false, false))
 		return;
 
 	/* Setup error traceback support for ereport() */
@@ -5185,9 +5414,15 @@ WaitIO(BufferDesc *buf)
  *
  * Returns true if we successfully marked the buffer as I/O busy,
  * false if someone else already did the work.
+ *
+ * If nowait is true, then we don't wait for an I/O to be finished by another
+ * backend.  In that case, false indicates either that the I/O was already
+ * finished, or is still in progress.  This is useful for callers that want to
+ * find out if they can perform the I/O as part of a larger operation, without
+ * waiting for the answer or distinguishing the reasons why not.
  */
 static bool
-StartBufferIO(BufferDesc *buf, bool forInput)
+StartBufferIO(BufferDesc *buf, bool forInput, bool nowait)
 {
 	uint32		buf_state;
 
@@ -5200,6 +5435,8 @@ StartBufferIO(BufferDesc *buf, bool forInput)
 		if (!(buf_state & BM_IO_IN_PROGRESS))
 			break;
 		UnlockBufHdr(buf, buf_state);
+		if (nowait)
+			return false;
 		WaitIO(buf);
 	}
 
diff --git a/src/backend/storage/buffer/localbuf.c b/src/backend/storage/buffer/localbuf.c
index 1f02fed250e..6956d4e5b49 100644
--- a/src/backend/storage/buffer/localbuf.c
+++ b/src/backend/storage/buffer/localbuf.c
@@ -109,10 +109,9 @@ PrefetchLocalBuffer(SMgrRelation smgr, ForkNumber forkNum,
  * LocalBufferAlloc -
  *	  Find or create a local buffer for the given page of the given relation.
  *
- * API is similar to bufmgr.c's BufferAlloc, except that we do not need
- * to do any locking since this is all local.   Also, IO_IN_PROGRESS
- * does not get set.  Lastly, we support only default access strategy
- * (hence, usage_count is always advanced).
+ * API is similar to bufmgr.c's BufferAlloc, except that we do not need to do
+ * any locking since this is all local.  We support only default access
+ * strategy (hence, usage_count is always advanced).
  */
 BufferDesc *
 LocalBufferAlloc(SMgrRelation smgr, ForkNumber forkNum, BlockNumber blockNum,
@@ -288,7 +287,7 @@ GetLocalVictimBuffer(void)
 }
 
 /* see LimitAdditionalPins() */
-static void
+void
 LimitAdditionalLocalPins(uint32 *additional_pins)
 {
 	uint32		max_pins;
@@ -298,9 +297,10 @@ LimitAdditionalLocalPins(uint32 *additional_pins)
 
 	/*
 	 * In contrast to LimitAdditionalPins() other backends don't play a role
-	 * here. We can allow up to NLocBuffer pins in total.
+	 * here. We can allow up to NLocBuffer pins in total, but NLocBuffer might
+	 * not be initialized yet, so read num_temp_buffers instead.
 	 */
-	max_pins = (NLocBuffer - NLocalPinnedBuffers);
+	max_pins = (num_temp_buffers - NLocalPinnedBuffers);
 
 	if (*additional_pins >= max_pins)
 		*additional_pins = max_pins;
diff --git a/src/backend/storage/meson.build b/src/backend/storage/meson.build
index 40345bdca27..739d13293fb 100644
--- a/src/backend/storage/meson.build
+++ b/src/backend/storage/meson.build
@@ -1,5 +1,6 @@
 # Copyright (c) 2022-2024, PostgreSQL Global Development Group
 
+subdir('aio')
 subdir('buffer')
 subdir('file')
 subdir('freespace')
diff --git a/src/include/storage/bufmgr.h b/src/include/storage/bufmgr.h
index d51d46d3353..b57f71f97e3 100644
--- a/src/include/storage/bufmgr.h
+++ b/src/include/storage/bufmgr.h
@@ -14,6 +14,7 @@
 #ifndef BUFMGR_H
 #define BUFMGR_H
 
+#include "port/pg_iovec.h"
 #include "storage/block.h"
 #include "storage/buf.h"
 #include "storage/bufpage.h"
@@ -158,6 +159,11 @@ extern PGDLLIMPORT int32 *LocalRefCount;
 #define BUFFER_LOCK_SHARE		1
 #define BUFFER_LOCK_EXCLUSIVE	2
 
+/*
+ * Maximum number of buffers for multi-buffer I/O functions.  This is set to
+ * allow 128kB transfers, unless BLCKSZ and PG_IOV_MAX imply a smaller maximum.
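+ * For example, with the default 8kB BLCKSZ this allows 16 buffers per
+ * transfer, assuming PG_IOV_MAX is at least 16.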
+ */
+#define MAX_BUFFERS_PER_TRANSFER Min(PG_IOV_MAX, (128 * 1024) / BLCKSZ)
 
 /*
  * prototypes for functions in bufmgr.c
@@ -177,6 +183,42 @@ extern Buffer ReadBufferWithoutRelcache(RelFileLocator rlocator,
 										ForkNumber forkNum, BlockNumber blockNum,
 										ReadBufferMode mode, BufferAccessStrategy strategy,
 										bool permanent);
+
+#define READ_BUFFERS_ZERO_ON_ERROR 0x01
+#define READ_BUFFERS_ISSUE_ADVICE 0x02
+
+/*
+ * Private state used by StartReadBuffers() and WaitReadBuffers().  Declared
+ * in public header only to allow inclusion in other structs, but contents
+ * should not be accessed.
+ */
+struct ReadBuffersOperation
+{
+	/* Parameters passed in to StartReadBuffers(). */
+	BufferManagerRelation bmr;
+	Buffer	   *buffers;
+	ForkNumber	forknum;
+	BlockNumber blocknum;
+	int			nblocks;
+	BufferAccessStrategy strategy;
+	int			flags;
+
+	/* Range of buffers, if we need to perform a read. */
+	int			io_buffers_len;
+};
+
+typedef struct ReadBuffersOperation ReadBuffersOperation;
+
+extern bool StartReadBuffers(BufferManagerRelation bmr,
+							 Buffer *buffers,
+							 ForkNumber forknum,
+							 BlockNumber blocknum,
+							 int *nblocks,
+							 BufferAccessStrategy strategy,
+							 int flags,
+							 ReadBuffersOperation *operation);
+extern void WaitReadBuffers(ReadBuffersOperation *operation);
+
 extern void ReleaseBuffer(Buffer buffer);
 extern void UnlockReleaseBuffer(Buffer buffer);
 extern bool BufferIsExclusiveLocked(Buffer buffer);
@@ -250,6 +292,9 @@ extern bool HoldingBufferPinThatDelaysRecovery(void);
 
 extern bool BgBufferSync(struct WritebackContext *wb_context);
 
+extern void LimitAdditionalPins(uint32 *additional_pins);
+extern void LimitAdditionalLocalPins(uint32 *additional_pins);
+
 /* in buf_init.c */
 extern void InitBufferPool(void);
 extern Size BufferShmemSize(void);
diff --git a/src/include/storage/streaming_read.h b/src/include/storage/streaming_read.h
new file mode 100644
index 00000000000..c4d3892bb26
--- /dev/null
+++ b/src/include/storage/streaming_read.h
@@ -0,0 +1,52 @@
+#ifndef STREAMING_READ_H
+#define STREAMING_READ_H
+
+#include "storage/bufmgr.h"
+#include "storage/fd.h"
+#include "storage/smgr.h"
+
+/* Default tuning, reasonable for many users. */
+#define PGSR_FLAG_DEFAULT 0x00
+
+/*
+ * I/O streams that are performing maintenance work on behalf of potentially
+ * many users.
+ */
+#define PGSR_FLAG_MAINTENANCE 0x01
+
+/*
+ * We usually avoid issuing prefetch advice automatically when sequential
+ * access is detected, but this flag suppresses advice for the whole stream,
+ * for sequential access patterns that the automatic detection might miss.
+ * Explicit advice is known to perform worse than letting the kernel (at
+ * least Linux) detect sequential access.
+ */
+#define PGSR_FLAG_SEQUENTIAL 0x02
+
+/*
+ * We usually ramp up from smaller reads to larger ones, to support users who
+ * don't know if it's worth reading lots of buffers yet.  This flag disables
+ * that, declaring ahead of time that we'll be reading all available buffers.
+ */
+#define PGSR_FLAG_FULL 0x04
+
+struct PgStreamingRead;
+typedef struct PgStreamingRead PgStreamingRead;
+
+/* Callback that returns the next block number to read. */
+typedef BlockNumber (*PgStreamingReadBufferCB) (PgStreamingRead *pgsr,
+												void *pgsr_private,
+												void *per_buffer_private);
+
+extern PgStreamingRead *pg_streaming_read_buffer_alloc(int flags,
+													   void *pgsr_private,
+													   size_t per_buffer_private_size,
+													   BufferAccessStrategy strategy,
+													   BufferManagerRelation bmr,
+													   ForkNumber forknum,
+													   PgStreamingReadBufferCB next_block_cb);
+
+extern Buffer pg_streaming_read_buffer_get_next(PgStreamingRead *pgsr, void **per_buffer_private);
+extern void pg_streaming_read_free(PgStreamingRead *pgsr);
+
+#endif
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index fc8b15d0cf2..cfb58cf4836 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -2097,6 +2097,8 @@ PgStat_TableCounts
 PgStat_TableStatus
 PgStat_TableXactStatus
 PgStat_WalStats
+PgStreamingRead
+PgStreamingReadRange
 PgXmlErrorContext
 PgXmlStrictness
 Pg_finfo_record
@@ -2267,6 +2269,7 @@ ReInitializeDSMForeignScan_function
 ReScanForeignScan_function
 ReadBufPtrType
 ReadBufferMode
+ReadBuffersOperation
 ReadBytePtrType
 ReadExtraTocPtrType
 ReadFunc
-- 
2.37.2

