[PATCH 6/6] pg_basebackup: add a single-tar output format.

From: Joshua Elsasser <josh(at)idealist(dot)org>
To: pgsql-hackers(at)postgresql(dot)org
Cc: Joshua Elsasser <josh(at)idealist(dot)org>
Subject: [PATCH 6/6] pg_basebackup: add a single-tar output format.
Date: 2015-09-29 22:16:28
Message-ID: 1443564988-17928-7-git-send-email-josh@idealist.org
Views: Raw Message | Whole Thread | Download mbox | Resend email
Thread:
Lists: pgsql-hackers

Add a single-tar output format (-F s) that writes all tablespaces into one
tar file, sent either to the file named with -f or, by default, to stdout.
---
src/bin/pg_basebackup/pg_basebackup.c | 282 ++++++++++++++++++++++++++++++++--
1 file changed, 269 insertions(+), 13 deletions(-)

diff --git a/src/bin/pg_basebackup/pg_basebackup.c b/src/bin/pg_basebackup/pg_basebackup.c
index e29e466..b3534cb 100644
--- a/src/bin/pg_basebackup/pg_basebackup.c
+++ b/src/bin/pg_basebackup/pg_basebackup.c
@@ -29,6 +29,7 @@
#include "getopt_long.h"
#include "libpq-fe.h"
#include "pqexpbuffer.h"
+#include "common/fe_memutils.h"
#include "pgtar.h"
#include "pgtime.h"
#include "receivelog.h"
@@ -71,7 +72,9 @@ typedef struct TarParser {

/* Global options */
static char *basedir = NULL;
+static char *outpath = NULL;
static TablespaceList tablespace_dirs = {NULL, NULL};
+static char *default_basetablespace = NULL;
static char *xlog_dir = "";
static char format = 'p'; /* p(lain)/t(ar) */
static char *label = "pg_basebackup base backup";
@@ -117,15 +120,19 @@ static void usage(void);
static void disconnect_and_exit(int code);
static void verify_dir_is_empty_or_create(char *dirname);
static void progress_report(int tablespacenum, const char *filename, bool force);
+static char *get_base_tablespace_path(void);

static void OpenTarFile(TarStream *tarfile, const char *path);
static void CloseTarFile(TarStream *tarfile);
-static void TarInsertRecoveryConf(TarStream *stream);
+static void TarInsertRecoveryConf(TarStream *stream, const char *prefix);
+static void TarInsertDirectory(TarStream *stream, const char *path, mode_t mode);
static void IterateAndWriteTar(TarParser *tp, char *inbuf, int buflen,
bool (*callback)(char *, void *), void *cbarg);
static bool TarIterSkipRecoveryConf(char *h, void *arg);
+static bool TarIterRenameForTablespace(char *h, void *arg);

static void ReceiveTarFile(PGconn *conn, PGresult *res, int rownum);
+static void ReceiveAndAppendTarFile(TarStream *tarfile, PGconn *conn, PGresult *res, int rownum);
static void ReceiveAndUnpackTarFile(PGconn *conn, PGresult *res, int rownum);
static void GenerateRecoveryConf(PGconn *conn);
static void WriteRecoveryConf(void);
@@ -259,7 +266,7 @@ usage(void)
printf(_(" %s [OPTION]...\n"), progname);
printf(_("\nOptions controlling the output:\n"));
printf(_(" -D, --pgdata=DIRECTORY receive base backup into directory\n"));
- printf(_(" -F, --format=p|t output format (plain (default), tar)\n"));
+ printf(_(" -F, --format=p|t|s output format (plain (default), tar, single-tar)\n"));
printf(_(" -r, --max-rate=RATE maximum transfer rate to transfer data directory\n"
" (in kB/s, or use suffix \"k\" or \"M\")\n"));
printf(_(" -R, --write-recovery-conf\n"
@@ -746,6 +753,39 @@ parse_max_rate(char *src)
return (int32) result;
}

+/*
+ * Returns the server's data directory as reported by pg_settings.  The
+ * returned string is malloc'd; exits unless exactly one non-null row is found.
+ */
+static char *
+get_base_tablespace_path(void)
+{
+	PGconn	   *sqlconn = GetConnection(false);
+	PGresult   *res;
+	char	   *dir;
+
+	if (!sqlconn)				/* GetConnection() already reported the error */
+		disconnect_and_exit(1);
+
+	res = PQexec(sqlconn, "SELECT setting FROM pg_catalog.pg_settings "
+				 "WHERE name = 'data_directory';");
+	if (PQresultStatus(res) != PGRES_TUPLES_OK)
+	{
+		fprintf(stderr, _("%s: could not get server data_directory: %s"),
+				progname, PQerrorMessage(sqlconn));
+		disconnect_and_exit(1);
+	}
+	if (PQntuples(res) != 1 || PQgetisnull(res, 0, 0))	/* nothing to copy below */
+	{
+		fprintf(stderr, _("%s: server did not report a data_directory setting\n"), progname);
+		disconnect_and_exit(1);
+	}
+	dir = pg_strdup(PQgetvalue(res, 0, 0));
+	PQclear(res);
+	PQfinish(sqlconn);
+	return dir;
+}
+
/*
* Write a piece of tar data
*/
@@ -891,7 +931,7 @@ CloseTarFile(TarStream *tarfile)
* Write a recovery.conf file into the tar stream.
*/
static void
-TarInsertRecoveryConf(TarStream *stream)
+TarInsertRecoveryConf(TarStream *stream, const char *prefix)
{
static char zerobuf[512];
char header[512];
@@ -901,6 +941,8 @@ TarInsertRecoveryConf(TarStream *stream)
recoveryconfcontents->len,
0600, 04000, 02000,
time(NULL));
+ if (prefix != NULL)
+ TarIterRenameForTablespace(header, (void *)prefix);

padding = ((recoveryconfcontents->len + 511) & ~511) - recoveryconfcontents->len;

@@ -912,6 +954,29 @@ TarInsertRecoveryConf(TarStream *stream)


/*
+ * Write a directory into the tar stream.
+ */
+static void
+TarInsertDirectory(TarStream *stream, const char *path, mode_t mode)
+{
+	char		hdr[512];
+
+	/* keep only the permission bits and set the directory file-type bit */
+	mode = (mode & 07777) | 040000;
+
+	if (tarCreateHeader(hdr, path, NULL, 0,
+						mode, 04000, 02000, time(NULL)) != TAR_OK)
+	{
+		fprintf(stderr, _("%s: filename too long for tar format: %s\n"),
+				progname, path);
+		disconnect_and_exit(1);
+	}
+	/* size 0: the entry is just this one 512-byte header, no data blocks */
+	writeTarData(stream, hdr, 512);
+}
+
+
+/*
* Process the individual files inside the TAR stream and pass their headers
* to a callback which can modify or chose to skip them. The stream consists
* of a header and zero or more chunks, all 512 bytes long. The stream from
@@ -1026,6 +1091,52 @@ TarIterSkipRecoveryConf(char *h, void *arg)


/*
+ * Adjusts the filename in the tar header pointed to by the first argument to
+ * place it under the directory in the second argument.
+ *
+ * Intended for use as a callback for IterateAndWriteTar() when iterating over
+ * a non-default tablespace's files. The callback argument should be the base
+ * tablespace directory.
+ */
+static bool
+TarIterRenameForTablespace(char *h, void *arg)
+{
+	const char *prefix = arg;
+	char		full[MAXPGPATH];
+	size_t		len;
+
+	/*
+	 * Build "<prefix>/<name currently stored in the header>".
+	 */
+	strlcpy(full, prefix, sizeof(full));
+	strlcat(full, "/", sizeof(full));
+	len = strlen(full);
+	if (tarHeaderGetName(h, &full[len], sizeof(full) - len) >= sizeof(full) - len)
+	{
+		/* re-read the bare name so the message shows prefix and name once each */
+		tarHeaderGetName(h, full, sizeof(full));
+		fprintf(stderr, _("%s: filename too long: %s/%s\n"),
+				progname, prefix, full);
+		disconnect_and_exit(1);
+	}
+
+	/* clean up the path to remove /./ and such */
+	canonicalize_path(full);
+
+	/* Store the new name in the tar header */
+	if (tarHeaderRename(h, full) != TAR_OK)
+	{
+		fprintf(stderr, _("%s: filename too long for tar format: %s\n"),
+				progname, full);
+		disconnect_and_exit(1);
+	}
+
+	/* Never skip any files */
+	return false;
+}
+
+
+/*
* Open a (possibly zlib-compressed) tar file for writing. The filebase
* argument should be the desired filename relative to basedir, without a .tar
* or .tar.gz file extension. If the user specified a basedir of - then stdout
@@ -1110,7 +1221,7 @@ ReceiveTarFile(PGconn *conn, PGresult *res, int rownum)
* file, as required by some tar programs.
*/
if (basetablespace && writerecoveryconf)
- TarInsertRecoveryConf(&stream);
+ TarInsertRecoveryConf(&stream, NULL);

CloseTarFile(&stream);
break;
@@ -1151,6 +1262,108 @@ ReceiveTarFile(PGconn *conn, PGresult *res, int rownum)
}


+static bool
+tablespaceRenameAndSkipRecoveryConf(char *h, void *arg)
+{
+	/* Entries rejected by TarIterSkipRecoveryConf() are dropped unrenamed. */
+	if (!TarIterSkipRecoveryConf(h, NULL))
+		return TarIterRenameForTablespace(h, arg);	/* always false: keep */
+	return true;
+}
+
+
+/*
+ * Receive one tablespace's tar stream from the server and append it to a tar
+ * stream that the caller has already opened (and will close).  If compression
+ * is enabled, the data will be compressed while written to the file.
+ *
+ * Member headers are rewritten so each file lands under the tablespace's
+ * directory, and with -R a recovery.conf found in the base tablespace is
+ * replaced by ours.  File contents are passed through without validation.
+ */
+static void
+ReceiveAndAppendTarFile(TarStream *stream, PGconn *conn, PGresult *res, int rownum)
+{
+	const char *prefix;
+	char	   *copybuf = NULL;
+	int			buflen;
+	bool		basetablespace = PQgetisnull(res, rownum, 0);
+	bool		replace_recovery_conf = basetablespace && writerecoveryconf;
+	PGresult   *copyres;
+	TarParser	parser;
+
+	MemSet(&parser, 0, sizeof(parser));
+	parser.stream = stream;
+	parser.in_tarhdr = true;
+
+	/* Get the directory prefix for the tablespace */
+	prefix = get_tablespace_mapping(basetablespace ? default_basetablespace :
+									PQgetvalue(res, rownum, 1));
+
+	/* A non-base tablespace gets a directory entry ahead of its files */
+	if (!basetablespace)
+		TarInsertDirectory(stream, prefix, 0700);
+
+	/*
+	 * Get the COPY data stream.  This result only reports that COPY has
+	 * started, so it is released as soon as it has been checked instead of
+	 * leaking one per tablespace.  It is kept apart from the caller's res,
+	 * which we must not free and which may still back prefix.
+	 */
+	copyres = PQgetResult(conn);
+	if (PQresultStatus(copyres) != PGRES_COPY_OUT)
+	{
+		fprintf(stderr, _("%s: could not get COPY data stream: %s"),
+				progname, PQerrorMessage(conn));
+		disconnect_and_exit(1);
+	}
+	PQclear(copyres);
+
+	while ((buflen = PQgetCopyData(conn, &copybuf, 0)) >= 0)
+	{
+		/*
+		 * Every member is renamed into the tablespace's directory.  When
+		 * we are going to append our own recovery.conf, any copy already
+		 * in the stream is skipped as well.
+		 */
+		if (replace_recovery_conf)
+			IterateAndWriteTar(&parser, copybuf, buflen,
+							   tablespaceRenameAndSkipRecoveryConf, (void *) prefix);
+		else
+			IterateAndWriteTar(&parser, copybuf, buflen,
+							   TarIterRenameForTablespace, (void *) prefix);
+
+		if (copybuf != NULL)
+		{
+			PQfreemem(copybuf);
+			copybuf = NULL;
+		}
+
+		totaldone += buflen;
+		progress_report(rownum, stream->path, false);
+	}
+
+	/* PQgetCopyData() reports end of COPY as -1 and an error as -2 */
+	if (buflen == -1)
+	{
+		/*
+		 * End of chunk. If requested, and this is the base tablespace,
+		 * write recovery.conf into the tarfile.
+		 */
+		if (replace_recovery_conf)
+			TarInsertRecoveryConf(stream, prefix);
+	}
+	else if (buflen == -2)
+	{
+		fprintf(stderr, _("%s: could not read COPY data: %s"),
+				progname, PQerrorMessage(conn));
+		disconnect_and_exit(1);
+	}
+
+	progress_report(rownum, stream->path, true);
+}
+
+
/*
* Retrieve tablespace path, either relocated or original depending on whether
* -T was passed or not.
@@ -1648,6 +1861,7 @@ BaseBackup(void)
int minServerMajor,
maxServerMajor;
int serverMajor;
+ TarStream tarfile;

/*
* Connect in replication mode to the server
@@ -1683,6 +1897,9 @@ BaseBackup(void)
disconnect_and_exit(1);
}

+ if (format == 's')
+ default_basetablespace = get_base_tablespace_path();
+
/*
* Build contents of recovery.conf if requested
*/
@@ -1711,7 +1928,7 @@ BaseBackup(void)
fastcheckpoint ? "FAST" : "",
includewal ? "NOWAIT" : "",
maxrate_clause ? maxrate_clause : "",
- format == 't' ? "TABLESPACE_MAP" : "");
+ (format == 't' || format == 's') ? "TABLESPACE_MAP" : "");

if (PQsendQuery(conn, basebkp) == 0)
{
@@ -1802,6 +2019,9 @@ BaseBackup(void)
fprintf(stderr,
_("%s: can only write single tablespace to stdout, database has %d\n"),
progname, PQntuples(res));
+ fprintf(stderr,
+ _("HINT: the -F single-tar option always writes a single tar file which may\n"
+ "be written to stdout.\n"));
disconnect_and_exit(1);
}

@@ -1817,6 +2037,9 @@ BaseBackup(void)
StartLogStreamer(xlogstart, starttli, sysidentifier);
}

+ if (format == 's')
+ OpenTarFile(&tarfile, outpath);
+
/*
* Start receiving chunks
*/
@@ -1824,10 +2047,15 @@ BaseBackup(void)
{
if (format == 't')
ReceiveTarFile(conn, res, i);
+ else if (format == 's')
+ ReceiveAndAppendTarFile(&tarfile, conn, res, i);
else
ReceiveAndUnpackTarFile(conn, res, i);
} /* Loop over all tablespaces */

+ if (format == 's')
+ CloseTarFile(&tarfile);
+
if (showprogress)
{
progress_report(PQntuples(res), NULL, true);
@@ -1982,6 +2210,7 @@ main(int argc, char **argv)
{"help", no_argument, NULL, '?'},
{"version", no_argument, NULL, 'V'},
{"pgdata", required_argument, NULL, 'D'},
+ {"file", required_argument, NULL, 'f'},
{"format", required_argument, NULL, 'F'},
{"checkpoint", required_argument, NULL, 'c'},
{"max-rate", required_argument, NULL, 'r'},
@@ -2027,7 +2256,7 @@ main(int argc, char **argv)
}
}

- while ((c = getopt_long(argc, argv, "D:F:r:RT:xX:l:zZ:d:c:h:p:U:s:S:wWvP",
+ while ((c = getopt_long(argc, argv, "D:f:F:r:RT:xX:l:zZ:d:c:h:p:U:s:S:wWvP",
long_options, &option_index)) != -1)
{
switch (c)
@@ -2035,15 +2264,20 @@ main(int argc, char **argv)
case 'D':
basedir = pg_strdup(optarg);
break;
+ case 'f':
+ outpath = pg_strdup(optarg);
+ break;
case 'F':
if (strcmp(optarg, "p") == 0 || strcmp(optarg, "plain") == 0)
format = 'p';
else if (strcmp(optarg, "t") == 0 || strcmp(optarg, "tar") == 0)
format = 't';
+ else if (strcmp(optarg, "s") == 0 || strcmp(optarg, "single-tar") == 0)
+ format = 's';
else
{
fprintf(stderr,
- _("%s: invalid output format \"%s\", must be \"plain\" or \"tar\"\n"),
+ _("%s: invalid output format \"%s\", must be \"plain\", \"tar\" or \"single-tar\"\n"),
progname, optarg);
exit(1);
}
@@ -2190,12 +2424,34 @@ main(int argc, char **argv)
/*
* Required arguments
*/
- if (basedir == NULL)
+ if (format == 's')
{
- fprintf(stderr, _("%s: no target directory specified\n"), progname);
- fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
- progname);
- exit(1);
+ if (basedir != NULL)
+ {
+ fprintf(stderr, _("%s: target directory cannot be specified in single-tar mode\n"), progname);
+ fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
+ progname);
+ exit(1);
+ }
+ if (outpath == NULL)
+ outpath = pg_strdup("-");
+ }
+ else
+ {
+ if (basedir == NULL)
+ {
+ fprintf(stderr, _("%s: no target directory specified\n"), progname);
+ fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
+ progname);
+ exit(1);
+ }
+ if (outpath != NULL)
+ {
+ fprintf(stderr, _("%s: target file can only be specified in single-tar mode\n"), progname);
+ fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
+ progname);
+ exit(1);
+ }
}

/*
@@ -2270,7 +2526,7 @@ main(int argc, char **argv)
* backups, always require the directory. For tar backups, require it
* unless we are writing to stdout.
*/
- if (format == 'p' || strcmp(basedir, "-") != 0)
+ if (format == 'p' || (basedir != NULL && strcmp(basedir, "-")) != 0)
verify_dir_is_empty_or_create(basedir);

/* Create transaction log symlink, if required */
--
2.3.0

In response to

Browse pgsql-hackers by date

  From Date Subject
Next Message David Rowley 2015-09-29 22:18:20 Re: PATCH: use foreign keys to improve join estimates v1
Previous Message Joshua Elsasser 2015-09-29 22:16:27 [PATCH 5/6] pg_basebackup: allow GetConnection() to make non-replication connections.