From 303ea3f1d93735e968856d3c923b686c05046bd4 Mon Sep 17 00:00:00 2001
From: Thomas Munro <thomas.munro@gmail.com>
Date: Sat, 4 Apr 2026 16:48:14 +1300
Subject: [PATCH v3 4/6] libarchive: Provide astreamer_libarchive_reader.

This allows modern tar files (and potential other unrelated formats) to
be consumed from a file, with support for various compression
algorithms.
---
 src/fe_utils/Makefile               |   4 +
 src/fe_utils/astreamer_libarchive.c | 232 ++++++++++++++++++++++++++++
 src/fe_utils/meson.build            |   4 +
 src/include/fe_utils/astreamer.h    |  12 ++
 src/tools/pgindent/typedefs.list    |   1 +
 5 files changed, 253 insertions(+)
 create mode 100644 src/fe_utils/astreamer_libarchive.c

diff --git a/src/fe_utils/Makefile b/src/fe_utils/Makefile
index cbfbf93ac69..f6c88a73ee7 100644
--- a/src/fe_utils/Makefile
+++ b/src/fe_utils/Makefile
@@ -40,6 +40,10 @@ OBJS = \
 	string_utils.o \
 	version.o
 
+ifeq ($(with_libarchive), yes)
+OBJS += astreamer_libarchive.o
+endif
+
 ifeq ($(PORTNAME), win32)
 override CPPFLAGS += -DFD_SETSIZE=1024
 endif
diff --git a/src/fe_utils/astreamer_libarchive.c b/src/fe_utils/astreamer_libarchive.c
new file mode 100644
index 00000000000..967eca84abe
--- /dev/null
+++ b/src/fe_utils/astreamer_libarchive.c
@@ -0,0 +1,232 @@
+/*-------------------------------------------------------------------------
+ *
+ * astreamer_libarchive.c
+ *
+ * This module reads from archives using https://www.libarchive.org/.
+ *
+ * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ *		  src/fe_utils/astreamer_libarchive.c
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres_fe.h"
+
+#include <archive.h>
+#include <archive_entry.h>
+
+#include "common/logging.h"
+#include "fe_utils/astreamer.h"
+
+/* This is the data size we'll try to stream at once. */
+#define ASTREAMER_LIBARCHIVE_READER_BUFFER_SIZE (128 * 1024)
+
+typedef struct astreamer_libarchive_reader
+{
+	astreamer	base;
+	astreamer_member member;
+	struct archive *archive;
+	bool		end_of_file;
+	bool		end_of_archive;
+	char		data[ASTREAMER_LIBARCHIVE_READER_BUFFER_SIZE];
+} astreamer_libarchive_reader;
+
+static bool astreamer_libarchive_reader_pull_content(astreamer *streamer);
+static void astreamer_libarchive_reader_finalize(astreamer *streamer);
+static void astreamer_libarchive_reader_free(astreamer *streamer);
+
+static const astreamer_ops astreamer_libarchive_reader_ops = {
+	.pull_content = astreamer_libarchive_reader_pull_content,
+	.finalize = astreamer_libarchive_reader_finalize,
+	.free = astreamer_libarchive_reader_free
+};
+
+/*
+ * Create an astreamer that decodes 'pathname' with libarchive and feeds its
+ * contents to 'next'.  This streamer is a source that must be the first in
+ * the chain, and content should be produced by calling
+ * astreamer_pull_content(streamer).
+ */
+astreamer *
+astreamer_libarchive_reader_new(astreamer *next, const char *pathname)
+{
+	astreamer_libarchive_reader *streamer;
+	int			r;
+
+	streamer = palloc0_object(astreamer_libarchive_reader);
+	*((const astreamer_ops **) &streamer->base.bbs_ops) =
+		&astreamer_libarchive_reader_ops;
+	streamer->base.bbs_next = next;
+
+	/* Prepare to read tar archives with any known compression filter. */
+	streamer->archive = archive_read_new();
+	if (streamer->archive == NULL)
+		pg_fatal("out of memory");
+	if (archive_read_support_format_tar(streamer->archive) != ARCHIVE_OK)
+		pg_fatal("libarchive: could not initialize tar format: %s",
+				 archive_error_string(streamer->archive));
+	if (archive_read_support_filter_all(streamer->archive) != ARCHIVE_OK)
+		pg_fatal("libarchive: could not initialize tar filter: %s",
+				 archive_error_string(streamer->archive));
+
+	/* Open file. */
+	r = archive_read_open_filename(streamer->archive,
+								   pathname,
+								   ASTREAMER_LIBARCHIVE_READER_BUFFER_SIZE);
+	if (r != ARCHIVE_OK)
+		pg_fatal("libarchive: could not open \"%s\": %s",
+				 pathname,
+				 archive_error_string(streamer->archive));
+
+	/* Start by wanting a new file. */
+	streamer->end_of_file = true;
+	streamer->end_of_archive = false;
+
+	return &streamer->base;
+}
+
+/* Fill in an astreamer member given a libarchive entry. */
+static void
+astreamer_libarchive_reader_fill_member(astreamer_member *member,
+										struct archive_entry *entry)
+{
+	strlcpy(member->pathname,
+			archive_entry_pathname(entry),
+			sizeof(member->pathname));
+	member->size = archive_entry_size(entry);
+	member->mode = archive_entry_mode(entry);
+	member->uid = archive_entry_uid(entry);
+	member->gid = archive_entry_gid(entry);
+	switch (archive_entry_filetype(entry))
+	{
+		case AE_IFREG:
+			member->is_regular = true;
+			break;
+		case AE_IFDIR:
+			member->is_directory = true;
+			break;
+		case AE_IFLNK:
+			member->is_symlink = true;
+			strlcpy(member->linktarget,
+					archive_entry_symlink(entry),
+					sizeof(member->linktarget));
+			break;
+		default:
+			break;
+	}
+}
+
+static bool
+astreamer_libarchive_reader_pull_content(astreamer *streamer)
+{
+	astreamer_libarchive_reader *mystreamer;
+	ssize_t		size;
+
+	mystreamer = (astreamer_libarchive_reader *) streamer;
+
+	while (!mystreamer->end_of_archive)
+	{
+		/* Do we need a new file? */
+		if (mystreamer->end_of_file)
+		{
+			struct archive_entry *entry;
+
+			/* Start next file, or discover end of archive. */
+			switch (archive_read_next_header(mystreamer->archive, &entry))
+			{
+				case ARCHIVE_RETRY:
+					continue;
+				case ARCHIVE_FATAL:
+					pg_fatal("libarchive: %s",
+							 archive_error_string(mystreamer->archive));
+					break;
+				case ARCHIVE_WARN:
+					pg_log_warning("libarchive: %s",
+								   archive_error_string(mystreamer->archive));
+					pg_fallthrough;
+				case ARCHIVE_OK:
+					/* Send file header, then fall through to send one chunk. */
+					mystreamer->end_of_file = false;
+					astreamer_libarchive_reader_fill_member(&mystreamer->member,
+															entry);
+					astreamer_content(mystreamer->base.bbs_next,
+									  &mystreamer->member,
+									  NULL,
+									  0,
+									  ASTREAMER_MEMBER_HEADER);
+					break;
+				case ARCHIVE_EOF:
+					/* End of archive. */
+					mystreamer->end_of_archive = true;
+					astreamer_content(mystreamer->base.bbs_next,
+									  NULL,
+									  NULL,
+									  0,
+									  ASTREAMER_ARCHIVE_TRAILER);
+					break;
+				default:
+					pg_fatal("unexpected result from archive_read_next_header()");
+					break;
+			}
+		}
+
+		/* Stream a chunk of data, or discover end of file. */
+		Assert(!mystreamer->end_of_file);
+		size = archive_read_data(mystreamer->archive,
+								 mystreamer->data,
+								 sizeof(mystreamer->data));
+		switch (size)
+		{
+			case ARCHIVE_RETRY:
+				continue;
+			case ARCHIVE_FATAL:
+				pg_fatal("libarchive: %s",
+						 archive_error_string(mystreamer->archive));
+				pg_unreachable();
+			case ARCHIVE_WARN:
+				pg_log_warning("libarchive: %s",
+							   archive_error_string(mystreamer->archive));
+				continue;
+			default:
+				break;
+		}
+
+		if (size == 0)
+		{
+			/* Send trailer, and go around to start another file. */
+			mystreamer->end_of_file = true;
+			astreamer_content(mystreamer->base.bbs_next,
+							  &mystreamer->member,
+							  NULL,
+							  0,
+							  ASTREAMER_MEMBER_TRAILER);
+			continue;
+		}
+
+		/* Stream large chunk and return. */
+		astreamer_content(mystreamer->base.bbs_next,
+						  &mystreamer->member,
+						  mystreamer->data,
+						  size,
+						  ASTREAMER_MEMBER_CONTENTS);
+		return true;
+	}
+	return false;
+}
+
+static void
+astreamer_libarchive_reader_finalize(astreamer *streamer)
+{
+	astreamer_finalize(streamer->bbs_next);
+}
+
+static void
+astreamer_libarchive_reader_free(astreamer *streamer)
+{
+	astreamer_libarchive_reader *mystreamer;
+
+	mystreamer = (astreamer_libarchive_reader *) streamer;
+	archive_free(mystreamer->archive);
+	pfree(streamer);
+}
diff --git a/src/fe_utils/meson.build b/src/fe_utils/meson.build
index 86befca192e..6b95c36e9a5 100644
--- a/src/fe_utils/meson.build
+++ b/src/fe_utils/meson.build
@@ -21,6 +21,10 @@ fe_utils_sources = files(
   'version.c',
 )
 
+if libarchive.found()
+  fe_utils_sources += 'astreamer_libarchive.c'
+endif
+
 psqlscan = custom_target('psqlscan',
   input: 'psqlscan.l',
   output: 'psqlscan.c',
diff --git a/src/include/fe_utils/astreamer.h b/src/include/fe_utils/astreamer.h
index 2509d157bc5..95eb146c734 100644
--- a/src/include/fe_utils/astreamer.h
+++ b/src/include/fe_utils/astreamer.h
@@ -155,6 +155,13 @@ astreamer_content(astreamer *streamer, astreamer_member *member,
 	streamer->bbs_ops->content(streamer, member, data, len, context);
 }
 
+/* Variant for astreamers that produce data themselves. */
+static inline void
+astreamer_pull(astreamer *streamer)
+{
+	astreamer_content(streamer, NULL, NULL, 0, ASTREAMER_UNKNOWN);
+}
+
 /* Finalize a astreamer. */
 static inline void
 astreamer_finalize(astreamer *streamer)
@@ -243,4 +250,9 @@ extern astreamer *astreamer_tar_parser_new(astreamer *next);
 extern astreamer *astreamer_tar_terminator_new(astreamer *next);
 extern astreamer *astreamer_tar_archiver_new(astreamer *next);
 
+#ifdef USE_LIBARCHIVE
+extern astreamer *astreamer_libarchive_reader_new(astreamer *next,
+												  const char *pathname);
+#endif
+
 #endif
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index a936c2c9c49..0f5caaff4b4 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -3589,6 +3589,7 @@ astreamer_archive_context
 astreamer_extractor
 astreamer_gzip_decompressor
 astreamer_gzip_writer
+astreamer_libarchive_reader
 astreamer_lz4_frame
 astreamer_member
 astreamer_ops
-- 
2.53.0

