diff --git a/doc/src/sgml/ref/pg_receivewal.sgml b/doc/src/sgml/ref/pg_receivewal.sgml
index 0e6b636703..78921fca59 100644
--- a/doc/src/sgml/ref/pg_receivewal.sgml
+++ b/doc/src/sgml/ref/pg_receivewal.sgml
@@ -93,6 +93,26 @@ PostgreSQL documentation
+
+
+
+ Command to execute once a WAL segment is completed. Any
+ %f> in the string is replaced by the same of the WAL segment
+ that has just been completed. This is useful to perform extra actions
+ that need to be run once a segment has been completed without relying
+ on any external utilities, like copying the just-finished segment into
+ a secundary location, perform extra sanity checks on it or for example
+ perform cleanup actions on all the WAL segments already archived like
+ a cleanup of the oldest segments saved. Here is a simple example of
+ command:
+
+--end-segment-command="cp /mnt/server/archive/%f /mnt/server2/archive/"
+
+
+
+
+
+
diff --git a/src/bin/pg_basebackup/pg_basebackup.c b/src/bin/pg_basebackup/pg_basebackup.c
index 4b75e765bb..07e708b011 100644
--- a/src/bin/pg_basebackup/pg_basebackup.c
+++ b/src/bin/pg_basebackup/pg_basebackup.c
@@ -489,6 +489,7 @@ LogStreamerMain(logstreamer_param *param)
stream.partial_suffix = NULL;
stream.replication_slot = replication_slot;
stream.temp_slot = param->temp_slot;
+ stream.end_segment_cmd = NULL;
if (stream.temp_slot && !stream.replication_slot)
stream.replication_slot = psprintf("pg_basebackup_%d", (int) getpid());
diff --git a/src/bin/pg_basebackup/pg_receivewal.c b/src/bin/pg_basebackup/pg_receivewal.c
index 15348ada58..5640785823 100644
--- a/src/bin/pg_basebackup/pg_receivewal.c
+++ b/src/bin/pg_basebackup/pg_receivewal.c
@@ -42,6 +42,7 @@ static bool slot_exists_ok = false;
static bool do_drop_slot = false;
static bool synchronous = false;
static char *replication_slot = NULL;
+static char *end_segment_cmd = NULL;
static void usage(void);
@@ -83,6 +84,10 @@ usage(void)
" time between status packets sent to server (default: %d)\n"), (standby_message_timeout / 1000));
printf(_(" -S, --slot=SLOTNAME replication slot to use\n"));
printf(_(" --synchronous flush transaction log immediately after writing\n"));
+ printf(_(" --end-segment-command\n"
+ " custom command executed node a segment completes.\n"
+ " %%f can be used as placeholder to define the\n"
+ " name of the segment name.\n"));
printf(_(" -v, --verbose output verbose messages\n"));
printf(_(" -V, --version output version information, then exit\n"));
printf(_(" -Z, --compress=0-9 compress logs with given compression level\n"));
@@ -418,6 +423,7 @@ StreamLog(void)
stream.partial_suffix = ".partial";
stream.replication_slot = replication_slot;
stream.temp_slot = false;
+ stream.end_segment_cmd = end_segment_cmd;
ReceiveXlogStream(conn, &stream);
@@ -473,6 +479,7 @@ main(int argc, char **argv)
{"drop-slot", no_argument, NULL, 2},
{"if-not-exists", no_argument, NULL, 3},
{"synchronous", no_argument, NULL, 4},
+ {"end-segment-command", required_argument, NULL, 5},
{NULL, 0, NULL, 0}
};
@@ -570,6 +577,9 @@ main(int argc, char **argv)
case 4:
synchronous = true;
break;
+ case 5:
+ end_segment_cmd = pg_strdup(optarg);
+ break;
default:
/*
diff --git a/src/bin/pg_basebackup/receivelog.c b/src/bin/pg_basebackup/receivelog.c
index f415135172..51776e5958 100644
--- a/src/bin/pg_basebackup/receivelog.c
+++ b/src/bin/pg_basebackup/receivelog.c
@@ -54,6 +54,7 @@ static long CalculateCopyStreamSleeptime(TimestampTz now, int standby_message_ti
static bool ReadEndOfStreamingResult(PGresult *res, XLogRecPtr *startpos,
uint32 *timeline);
+static bool runEndSegmentCommand(StreamCtl *stream, XLogRecPtr blockpos);
static bool
mark_file_as_archived(StreamCtl *stream, const char *fname)
@@ -749,6 +750,79 @@ ReadEndOfStreamingResult(PGresult *res, XLogRecPtr *startpos, uint32 *timeline)
}
/*
+ * Run command provided by user once a segment is completed. Returns true
+ * if the command succeeds, and false otherwise.
+ */
+static bool
+runEndSegmentCommand(StreamCtl *stream, XLogRecPtr blockpos)
+{
+ char endSegmentCmd[MAXPGPATH];
+ char xlogfname[MAXPGPATH];
+ char *dp, *endp, *sp;
+ XLogSegNo segno;
+ int rc;
+
+ Assert(stream->end_segment_cmd != NULL);
+
+ /*
+ * Build the name of the segment just completed. This takes into
+ * account compressed segments.
+ */
+ XLByteToPrevSeg(blockpos, segno);
+ XLogFileName(xlogfname, stream->timeline, segno);
+ if (stream->walmethod->get_compression() > 0)
+ {
+ snprintf(xlogfname, MAXPGPATH, "%s.gz", xlogfname);
+ }
+
+ /* Construct the command to be executed */
+ dp = endSegmentCmd;
+ endp = endSegmentCmd + MAXPGPATH - 1;
+ *endp = '\0';
+
+ /*
+ * Check presence of placeholders in the command provided and replace
+ * them accordingly
+ */
+ for (sp = stream->end_segment_cmd; *sp; sp++)
+ {
+ if (*sp == '%')
+ {
+ switch (sp[1])
+ {
+ case 'f':
+ /* %f: filename of just-completed segment file */
+ sp++;
+ StrNCpy(dp, xlogfname, endp - dp);
+ dp += strlen(dp);
+ break;
+ default:
+ /* otherwise treat the % as not special */
+ if (dp < endp)
+ *dp++ = *sp;
+ break;
+ }
+ }
+ else
+ {
+ if (dp < endp)
+ *dp++ = *sp;
+ }
+ }
+ *dp = '\0';
+
+ /* And now run the command */
+ rc = system(endSegmentCmd);
+ if (rc != 0)
+ {
+ fprintf(stderr, _("%s: failed to run end-of-segment command \"%s\"\n"),
+ progname, endSegmentCmd);
+ return false;
+ }
+ return true;
+}
+
+/*
* The main loop of ReceiveXlogStream. Handles the COPY stream after
* initiating streaming with the START_STREAMING command.
*
@@ -1174,6 +1248,10 @@ ProcessXLogDataMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len,
still_sending = false;
return true; /* ignore the rest of this XLogData packet */
}
+
+ /* Run custom end-of-segment command */
+ if (stream->end_segment_cmd != NULL)
+ runEndSegmentCommand(stream, *blockpos);
}
}
/* No more data left to write, receive next copy packet */
diff --git a/src/bin/pg_basebackup/receivelog.h b/src/bin/pg_basebackup/receivelog.h
index 42e93ac745..f8610da079 100644
--- a/src/bin/pg_basebackup/receivelog.h
+++ b/src/bin/pg_basebackup/receivelog.h
@@ -46,6 +46,8 @@ typedef struct StreamCtl
char *partial_suffix; /* Suffix appended to partially received files */
char *replication_slot; /* Replication slot to use, or NULL */
bool temp_slot; /* Create temporary replication slot */
+ char *end_segment_cmd; /* Custom command run each time a segment
+ * is completed */
} StreamCtl;
diff --git a/src/bin/pg_basebackup/walmethods.c b/src/bin/pg_basebackup/walmethods.c
index d9ad596bf0..7c00a38dd5 100644
--- a/src/bin/pg_basebackup/walmethods.c
+++ b/src/bin/pg_basebackup/walmethods.c
@@ -310,6 +310,12 @@ dir_get_file_size(const char *pathname)
return statbuf.st_size;
}
+static int
+dir_get_compression(void)
+{
+ return dir_data->compression;
+}
+
static bool
dir_existsfile(const char *pathname)
{
@@ -351,6 +357,7 @@ CreateWalDirectoryMethod(const char *basedir, int compression, bool sync)
method->open_for_write = dir_open_for_write;
method->write = dir_write;
method->get_current_pos = dir_get_current_pos;
+ method->get_compression = dir_get_compression;
method->get_file_size = dir_get_file_size;
method->close = dir_close;
method->sync = dir_sync;
@@ -675,6 +682,12 @@ tar_get_file_size(const char *pathname)
return -1;
}
+static int
+tar_get_compression(void)
+{
+ return tar_data->compression;
+}
+
static off_t
tar_get_current_pos(Walfile f)
{
@@ -953,6 +966,7 @@ CreateWalTarMethod(const char *tarbase, int compression, bool sync)
method->open_for_write = tar_open_for_write;
method->write = tar_write;
method->get_current_pos = tar_get_current_pos;
+ method->get_compression = tar_get_compression;
method->get_file_size = tar_get_file_size;
method->close = tar_close;
method->sync = tar_sync;
diff --git a/src/bin/pg_basebackup/walmethods.h b/src/bin/pg_basebackup/walmethods.h
index 8d679dab61..2948ebdffa 100644
--- a/src/bin/pg_basebackup/walmethods.h
+++ b/src/bin/pg_basebackup/walmethods.h
@@ -26,6 +26,7 @@ struct WalWriteMethod
int (*close) (Walfile f, WalCloseMethod method);
bool (*existsfile) (const char *pathname);
ssize_t (*get_file_size) (const char *pathname);
+ int (*get_compression) (void);
ssize_t (*write) (Walfile f, const void *buf, size_t count);
off_t (*get_current_pos) (Walfile f);