From c13c056bb33268bffb85922d25e8b01b14083e5c Mon Sep 17 00:00:00 2001
From: Thomas Munro <thomas.munro@gmail.com>
Date: Thu, 9 Apr 2026 20:02:02 +1200
Subject: [PATCH v2 1/6] Provide astreamer_plain_reader for reading files.

A new public API astreamer_pull_content() can be used to tell it to read
more data and send it to the next astreamer.  astreamer_plain_reader is
the opposite of astreamer_plain_writer.
---
 src/fe_utils/astreamer_file.c    | 87 ++++++++++++++++++++++++++++++++
 src/include/fe_utils/astreamer.h | 19 ++++++-
 src/tools/pgindent/typedefs.list |  1 +
 3 files changed, 105 insertions(+), 2 deletions(-)

diff --git a/src/fe_utils/astreamer_file.c b/src/fe_utils/astreamer_file.c
index 0fca70a4f86..3fb9ed72eea 100644
--- a/src/fe_utils/astreamer_file.c
+++ b/src/fe_utils/astreamer_file.c
@@ -21,6 +21,18 @@
 #include "common/logging.h"
 #include "fe_utils/astreamer.h"
 
+/* Size of internal buffer used by astreamer_plain_reader. */
+#define ASTREAMER_PLAIN_READER_BUFFER (128 * 1024)
+
+typedef struct astreamer_plain_reader
+{
+	astreamer	base;
+	FILE	   *file;
+	size_t		size;
+	char		pathname[MAXPGPATH];
+	char		data[ASTREAMER_PLAIN_READER_BUFFER];
+} astreamer_plain_reader;
+
 typedef struct astreamer_plain_writer
 {
 	astreamer	base;
@@ -39,6 +51,16 @@ typedef struct astreamer_extractor
 	FILE	   *file;
 } astreamer_extractor;
 
+static bool astreamer_plain_reader_pull_content(astreamer *streamer);
+static void astreamer_plain_reader_finalize(astreamer *streamer);
+static void astreamer_plain_reader_free(astreamer *streamer);
+
+static const astreamer_ops astreamer_plain_reader_ops = {
+	.pull_content = astreamer_plain_reader_pull_content,
+	.finalize = astreamer_plain_reader_finalize,
+	.free = astreamer_plain_reader_free
+};
+
 static void astreamer_plain_writer_content(astreamer *streamer,
 										   astreamer_member *member,
 										   const char *data, int len,
@@ -68,6 +90,71 @@ static const astreamer_ops astreamer_extractor_ops = {
 	.free = astreamer_extractor_free
 };
 
+/*
+ * Create a 'source' astreamer that just reads data from a file.
+ *
+ * It must be first in a chain of astreamers, and it should be asked to read
+ * more of the file by calling astreamer_pull().  Each time you do that, it
+ * pushes some raw bytes with context ASTREAMER_UNKNOWN into the astreamer
+ * provided as 'next'.
+ */
+astreamer *
+astreamer_plain_reader_new(astreamer *next, const char *pathname)
+{
+	astreamer_plain_reader *streamer;
+
+	streamer = palloc_object(astreamer_plain_reader);
+	*((const astreamer_ops **) &streamer->base.bbs_ops) =
+		&astreamer_plain_reader_ops;
+	streamer->base.bbs_next = next;
+	strlcpy(streamer->pathname, pathname, sizeof(streamer->pathname));
+	streamer->file = fopen(pathname, "r");
+	if (streamer->file == NULL)
+		pg_fatal("astreamer_plain_reader: could not open file \"%s\"",
+				 pathname);
+
+	return &streamer->base;
+}
+
+static bool
+astreamer_plain_reader_pull_content(astreamer *streamer)
+{
+	astreamer_plain_reader *mystreamer = (astreamer_plain_reader *) streamer;
+
+	mystreamer->size = fread(mystreamer->data,
+							 1,
+							 sizeof(mystreamer->data),
+							 mystreamer->file);
+
+	if (mystreamer->size == 0)
+	{
+		if (ferror(mystreamer->file))
+			pg_fatal("could not read file \"%s\"", mystreamer->pathname);
+		return false;
+	}
+	astreamer_content(mystreamer->base.bbs_next,
+					  NULL,
+					  mystreamer->data,
+					  mystreamer->size,
+					  ASTREAMER_UNKNOWN);
+	return true;
+}
+
+static void
+astreamer_plain_reader_finalize(astreamer *streamer)
+{
+	astreamer_finalize(streamer->bbs_next);
+}
+
+static void
+astreamer_plain_reader_free(astreamer *streamer)
+{
+	astreamer_plain_reader *mystreamer = (astreamer_plain_reader *) streamer;
+
+	pclose(mystreamer->file);
+	pfree(mystreamer);
+}
+
 /*
  * Create a astreamer that just writes data to a file.
  *
diff --git a/src/include/fe_utils/astreamer.h b/src/include/fe_utils/astreamer.h
index 8329e4efbc5..2509d157bc5 100644
--- a/src/include/fe_utils/astreamer.h
+++ b/src/include/fe_utils/astreamer.h
@@ -114,8 +114,10 @@ struct astreamer
 };
 
 /*
- * There are three callbacks for a astreamer. The 'content' callback is
- * called repeatedly, as described in the astreamer_archive_context comments.
+ * There are four callbacks for an astreamer. The 'content' callback is called
+ * repeatedly, as described in the astreamer_archive_context comments, to push
+ * data through an astreamer chain. The 'pull_content' variant is an
+ * alternative, for certain astreamers that act as a source of data themselves.
  * Then, the 'finalize' callback is called once at the end, to give the
  * astreamer a chance to perform cleanup such as closing files. Finally,
  * because this code is running in a frontend environment where, as of this
@@ -125,6 +127,7 @@ struct astreamer
  */
 struct astreamer_ops
 {
+	bool		(*pull_content) (astreamer *streamer);
 	void		(*content) (astreamer *streamer, astreamer_member *member,
 							const char *data, int len,
 							astreamer_archive_context context);
@@ -132,6 +135,16 @@ struct astreamer_ops
 	void		(*free) (astreamer *streamer);
 };
 
+/*
+ * Tell a 'source' astreamer to consume content from the source it represents,
+ * and report whether there is any more data.
+ */
+static inline bool
+astreamer_pull_content(astreamer *streamer)
+{
+	return streamer->bbs_ops->pull_content(streamer);
+}
+
 /* Send some content to a astreamer. */
 static inline void
 astreamer_content(astreamer *streamer, astreamer_member *member,
@@ -210,6 +223,8 @@ astreamer_buffer_until(astreamer *streamer, const char **data, int *len,
  * Functions for creating astreamer objects of various types. See the header
  * comments for each of these functions for details.
  */
+extern astreamer *astreamer_plain_reader_new(astreamer *next,
+											 const char *pathname);
 extern astreamer *astreamer_plain_writer_new(char *pathname, FILE *file);
 extern astreamer *astreamer_gzip_writer_new(char *pathname, FILE *file,
 											pg_compress_specification *compress);
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index ea95e7984bc..a936c2c9c49 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -3592,6 +3592,7 @@ astreamer_gzip_writer
 astreamer_lz4_frame
 astreamer_member
 astreamer_ops
+astreamer_plain_reader
 astreamer_plain_writer
 astreamer_recovery_injector
 astreamer_tar_archiver
-- 
2.53.0

