From 1a30aa3f3f5cd0ec54082961ba9e52fbc7dfe28c Mon Sep 17 00:00:00 2001
From: Thomas Munro <thomas.munro@gmail.com>
Date: Fri, 10 Apr 2026 16:22:30 +1200
Subject: [PATCH v2 2/6] pg_waldump: Read data with astreamer_plain_reader.

Previously, archive_waldump.c would read a file and "push" data into
astreamer_content().  Now, it attaches astreamer_plain_reader to the
source end of the astreamer chain, and tells it to "pull" data in with
astreamer_pull_content().  The end result is the same.
---
 src/bin/pg_waldump/archive_waldump.c | 69 ++++++----------------------
 src/bin/pg_waldump/pg_waldump.h      |  5 +-
 2 files changed, 14 insertions(+), 60 deletions(-)

diff --git a/src/bin/pg_waldump/archive_waldump.c b/src/bin/pg_waldump/archive_waldump.c
index e4a4bf44a7e..b27888a5056 100644
--- a/src/bin/pg_waldump/archive_waldump.c
+++ b/src/bin/pg_waldump/archive_waldump.c
@@ -23,11 +23,6 @@
 #include "fe_utils/simple_list.h"
 #include "pg_waldump.h"
 
-/*
- * How many bytes should we try to read from a file at once?
- */
-#define READ_CHUNK_SIZE				(128 * 1024)
-
 /* Temporary directory for spilled WAL segment files */
 char	   *TmpWalSegDir = NULL;
 
@@ -129,21 +124,18 @@ void
 init_archive_reader(XLogDumpPrivate *privateInfo,
 					pg_compress_algorithm compression)
 {
-	int			fd;
 	astreamer  *streamer;
 	ArchivedWALFile *entry = NULL;
 	XLogLongPageHeader longhdr;
 	ArchivedWAL_iterator iter;
+	char		pathname[MAXPGPATH];
 
-	/* Open tar archive and store its file descriptor */
-	fd = open_file_in_directory(privateInfo->archive_dir,
-								privateInfo->archive_name);
-
-	if (fd < 0)
-		pg_fatal("could not open file \"%s\"", privateInfo->archive_name);
-
-	privateInfo->archive_fd = fd;
-	privateInfo->archive_fd_eof = false;
+	/* Construct tar archive pathname. */
+	snprintf(pathname,
+			 sizeof(pathname),
+			 "%s/%s",
+			 privateInfo->archive_dir,
+			 privateInfo->archive_name);
 
 	streamer = astreamer_waldump_new(privateInfo);
 
@@ -158,14 +150,10 @@ init_archive_reader(XLogDumpPrivate *privateInfo,
 	else if (compression == PG_COMPRESSION_ZSTD)
 		streamer = astreamer_zstd_decompressor_new(streamer);
 
-	privateInfo->archive_streamer = streamer;
+	/* And before that, we have to read the file. */
+	streamer = astreamer_plain_reader_new(streamer, pathname);
 
-	/*
-	 * Allocate a buffer for reading the archive file to begin content
-	 * decoding.
-	 */
-	privateInfo->archive_read_buf = pg_malloc(READ_CHUNK_SIZE);
-	privateInfo->archive_read_buf_size = READ_CHUNK_SIZE;
+	privateInfo->archive_streamer = streamer;
 
 	/*
 	 * Hash table storing WAL entries read from the archive with an arbitrary
@@ -278,18 +266,6 @@ free_archive_reader(XLogDumpPrivate *privateInfo)
 		ArchivedWAL_destroy(privateInfo->archive_wal_htab);
 		privateInfo->archive_wal_htab = NULL;
 	}
-
-	/* Free the reusable read buffer. */
-	if (privateInfo->archive_read_buf != NULL)
-	{
-		pg_free(privateInfo->archive_read_buf);
-		privateInfo->archive_read_buf = NULL;
-	}
-
-	/* Close the file. */
-	if (close(privateInfo->archive_fd) != 0)
-		pg_log_error("could not close file \"%s\": %m",
-					 privateInfo->archive_name);
 }
 
 /*
@@ -537,28 +513,11 @@ get_archive_wal_entry(const char *fname, XLogDumpPrivate *privateInfo)
 static bool
 read_archive_file(XLogDumpPrivate *privateInfo)
 {
-	int			rc;
-
-	/* Fail if we already reached EOF in a prior call. */
-	if (privateInfo->archive_fd_eof)
+	if (privateInfo->archive_streamer_finalized)
 		return false;
 
 	/* Try to read some more data. */
-	rc = read(privateInfo->archive_fd, privateInfo->archive_read_buf,
-			  privateInfo->archive_read_buf_size);
-	if (rc < 0)
-		pg_fatal("could not read file \"%s\": %m",
-				 privateInfo->archive_name);
-
-	/*
-	 * Decompress (if required), and then parse the previously read contents
-	 * of the tar file.
-	 */
-	if (rc > 0)
-		astreamer_content(privateInfo->archive_streamer, NULL,
-						  privateInfo->archive_read_buf, rc,
-						  ASTREAMER_UNKNOWN);
-	else
+	if (!astreamer_pull_content(privateInfo->archive_streamer))
 	{
 		/*
 		 * We reached EOF, but there is probably still data queued in the
@@ -566,10 +525,8 @@ read_archive_file(XLogDumpPrivate *privateInfo)
 		 * process everything.
 		 */
 		astreamer_finalize(privateInfo->archive_streamer);
-		/* Set flag to ensure we don't finalize more than once. */
-		privateInfo->archive_fd_eof = true;
+		privateInfo->archive_streamer_finalized = true;
 	}
-
 	return true;
 }
 
diff --git a/src/bin/pg_waldump/pg_waldump.h b/src/bin/pg_waldump/pg_waldump.h
index bd46d14f3a8..d9f1b5d4726 100644
--- a/src/bin/pg_waldump/pg_waldump.h
+++ b/src/bin/pg_waldump/pg_waldump.h
@@ -34,12 +34,9 @@ typedef struct XLogDumpPrivate
 	/* Fields required to read WAL from archive */
 	char	   *archive_dir;
 	char	   *archive_name;	/* Tar archive filename */
-	int			archive_fd;		/* File descriptor for the open tar file */
-	bool		archive_fd_eof; /* Have we reached EOF on archive_fd? */
 
 	astreamer  *archive_streamer;
-	char	   *archive_read_buf;	/* Reusable read buffer for archive I/O */
-	Size		archive_read_buf_size;
+	bool		archive_streamer_finalized;
 
 	/*
 	 * The buffer for the WAL file the archive streamer is currently reading,
-- 
2.53.0

