From aae4640acbb2a1ae4ff5d2e80abce0798799fe73 Mon Sep 17 00:00:00 2001 From: Peter Eisentraut Date: Fri, 30 Aug 2019 20:42:51 +0200 Subject: [PATCH v2] Base backup client as auxiliary backend process Discussion: https://www.postgresql.org/message-id/flat/61b8d18d-c922-ac99-b990-a31ba63cdcbb@2ndquadrant.com --- doc/src/sgml/protocol.sgml | 12 +- doc/src/sgml/ref/initdb.sgml | 17 + src/backend/access/transam/xlog.c | 84 ++-- src/backend/bootstrap/bootstrap.c | 9 + src/backend/postmaster/pgstat.c | 6 + src/backend/postmaster/postmaster.c | 114 ++++- src/backend/replication/basebackup.c | 68 +++ .../libpqwalreceiver/libpqwalreceiver.c | 419 ++++++++++++++++++ src/backend/replication/repl_gram.y | 9 +- src/backend/replication/repl_scanner.l | 1 + src/bin/initdb/initdb.c | 39 +- src/include/access/xlog.h | 6 + src/include/miscadmin.h | 2 + src/include/pgstat.h | 1 + src/include/replication/basebackup.h | 2 + src/include/replication/walreceiver.h | 4 + src/include/utils/guc.h | 2 +- src/test/recovery/t/018_basebackup.pl | 29 ++ 18 files changed, 768 insertions(+), 56 deletions(-) create mode 100644 src/test/recovery/t/018_basebackup.pl diff --git a/doc/src/sgml/protocol.sgml b/doc/src/sgml/protocol.sgml index b20f1690a7..81f43b5c00 100644 --- a/doc/src/sgml/protocol.sgml +++ b/doc/src/sgml/protocol.sgml @@ -2466,7 +2466,7 @@ Streaming Replication Protocol - BASE_BACKUP [ LABEL 'label' ] [ PROGRESS ] [ FAST ] [ WAL ] [ NOWAIT ] [ MAX_RATE rate ] [ TABLESPACE_MAP ] [ NOVERIFY_CHECKSUMS ] + BASE_BACKUP [ LABEL 'label' ] [ PROGRESS ] [ FAST ] [ WAL ] [ NOWAIT ] [ MAX_RATE rate ] [ TABLESPACE_MAP ] [ NOVERIFY_CHECKSUMS ] [ EXCLUDE_CONF ] BASE_BACKUP @@ -2576,6 +2576,16 @@ Streaming Replication Protocol + + + EXCLUDE_CONF + + + Do not copy configuration files, that is, files that end in + .conf. 
+ + + diff --git a/doc/src/sgml/ref/initdb.sgml b/doc/src/sgml/ref/initdb.sgml index da5c8f5307..1261e02d59 100644 --- a/doc/src/sgml/ref/initdb.sgml +++ b/doc/src/sgml/ref/initdb.sgml @@ -286,6 +286,23 @@ Options + + + + + + Initialize a data directory for a physical replication replica. The + data directory will not be initialized with a full database system, + but will instead only contain a minimal set of files. A server that + is started on this data directory will first fetch a base backup and + then switch to standby mode. The connection information for the base + backup has to be configured by setting , and other parameters as desired, + before the server is started. + + + + diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index e651a841bb..7ab8ab45f5 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -905,8 +905,6 @@ static void CheckRecoveryConsistency(void); static XLogRecord *ReadCheckpointRecord(XLogReaderState *xlogreader, XLogRecPtr RecPtr, int whichChkpt, bool report); static bool rescanLatestTimeLine(void); -static void WriteControlFile(void); -static void ReadControlFile(void); static char *str_time(pg_time_t tnow); static bool CheckForStandbyTrigger(void); @@ -4481,7 +4479,7 @@ rescanLatestTimeLine(void) * ReadControlFile() verifies they are correct. We could split out the * I/O and compatibility-check functions, but there seems no need currently. */ -static void +void WriteControlFile(void) { int fd; @@ -4573,7 +4571,7 @@ WriteControlFile(void) XLOG_CONTROL_FILE))); } -static void +void ReadControlFile(void) { pg_crc32c crc; @@ -5079,6 +5077,41 @@ XLOGShmemInit(void) InitSharedLatch(&XLogCtl->recoveryWakeupLatch); } +void +InitControlFile(uint64 sysidentifier) +{ + char mock_auth_nonce[MOCK_AUTH_NONCE_LEN]; + + /* + * Generate a random nonce. This is used for authentication requests that + * will fail because the user does not exist. 
The nonce is used to create + * a genuine-looking password challenge for the non-existent user, in lieu + * of an actual stored password. + */ + if (!pg_strong_random(mock_auth_nonce, MOCK_AUTH_NONCE_LEN)) + ereport(PANIC, + (errcode(ERRCODE_INTERNAL_ERROR), + errmsg("could not generate secret authorization token"))); + + memset(ControlFile, 0, sizeof(ControlFileData)); + /* Initialize pg_control status fields */ + ControlFile->system_identifier = sysidentifier; + memcpy(ControlFile->mock_authentication_nonce, mock_auth_nonce, MOCK_AUTH_NONCE_LEN); + ControlFile->state = DB_SHUTDOWNED; + ControlFile->unloggedLSN = FirstNormalUnloggedLSN; + + /* Set important parameter values for use when replaying WAL */ + ControlFile->MaxConnections = MaxConnections; + ControlFile->max_worker_processes = max_worker_processes; + ControlFile->max_wal_senders = max_wal_senders; + ControlFile->max_prepared_xacts = max_prepared_xacts; + ControlFile->max_locks_per_xact = max_locks_per_xact; + ControlFile->wal_level = wal_level; + ControlFile->wal_log_hints = wal_log_hints; + ControlFile->track_commit_timestamp = track_commit_timestamp; + ControlFile->data_checksum_version = bootstrap_data_checksum_version; +} + /* * This func must be called ONCE on system install. It creates pg_control * and the initial XLOG segment. @@ -5094,7 +5127,6 @@ BootStrapXLOG(void) char *recptr; bool use_existent; uint64 sysidentifier; - char mock_auth_nonce[MOCK_AUTH_NONCE_LEN]; struct timeval tv; pg_crc32c crc; @@ -5115,17 +5147,6 @@ BootStrapXLOG(void) sysidentifier |= ((uint64) tv.tv_usec) << 12; sysidentifier |= getpid() & 0xFFF; - /* - * Generate a random nonce. This is used for authentication requests that - * will fail because the user does not exist. The nonce is used to create - * a genuine-looking password challenge for the non-existent user, in lieu - * of an actual stored password. 
- */ - if (!pg_strong_random(mock_auth_nonce, MOCK_AUTH_NONCE_LEN)) - ereport(PANIC, - (errcode(ERRCODE_INTERNAL_ERROR), - errmsg("could not generate secret authorization token"))); - /* First timeline ID is always 1 */ ThisTimeLineID = 1; @@ -5233,30 +5254,12 @@ BootStrapXLOG(void) openLogFile = -1; /* Now create pg_control */ - - memset(ControlFile, 0, sizeof(ControlFileData)); - /* Initialize pg_control status fields */ - ControlFile->system_identifier = sysidentifier; - memcpy(ControlFile->mock_authentication_nonce, mock_auth_nonce, MOCK_AUTH_NONCE_LEN); - ControlFile->state = DB_SHUTDOWNED; + InitControlFile(sysidentifier); ControlFile->time = checkPoint.time; ControlFile->checkPoint = checkPoint.redo; ControlFile->checkPointCopy = checkPoint; - ControlFile->unloggedLSN = FirstNormalUnloggedLSN; - - /* Set important parameter values for use when replaying WAL */ - ControlFile->MaxConnections = MaxConnections; - ControlFile->max_worker_processes = max_worker_processes; - ControlFile->max_wal_senders = max_wal_senders; - ControlFile->max_prepared_xacts = max_prepared_xacts; - ControlFile->max_locks_per_xact = max_locks_per_xact; - ControlFile->wal_level = wal_level; - ControlFile->wal_log_hints = wal_log_hints; - ControlFile->track_commit_timestamp = track_commit_timestamp; - ControlFile->data_checksum_version = bootstrap_data_checksum_version; /* some additional ControlFile fields are set in WriteControlFile() */ - WriteControlFile(); /* Bootstrap the commit log, too */ @@ -6225,13 +6228,11 @@ StartupXLOG(void) CurrentResourceOwner = AuxProcessResourceOwner; /* - * Verify XLOG status looks valid. + * Check that contents look valid. 
*/ - if (ControlFile->state < DB_SHUTDOWNED || - ControlFile->state > DB_IN_PRODUCTION || - !XRecOffIsValid(ControlFile->checkPoint)) + if (!XRecOffIsValid(ControlFile->checkPoint)) ereport(FATAL, - (errmsg("control file contains invalid data"))); + (errmsg("control file contains invalid checkpoint location"))); if (ControlFile->state == DB_SHUTDOWNED) { @@ -6264,6 +6265,9 @@ StartupXLOG(void) ereport(LOG, (errmsg("database system was interrupted; last known up at %s", str_time(ControlFile->time)))); + else + ereport(FATAL, + (errmsg("control file contains invalid database cluster state"))); /* This is just to allow attaching to startup process with a debugger */ #ifdef XLOG_REPLAY_DELAY diff --git a/src/backend/bootstrap/bootstrap.c b/src/backend/bootstrap/bootstrap.c index 9238fbe98d..a8b1ffd08a 100644 --- a/src/backend/bootstrap/bootstrap.c +++ b/src/backend/bootstrap/bootstrap.c @@ -36,6 +36,7 @@ #include "postmaster/bgwriter.h" #include "postmaster/startup.h" #include "postmaster/walwriter.h" +#include "replication/basebackup.h" #include "replication/walreceiver.h" #include "storage/bufmgr.h" #include "storage/bufpage.h" @@ -326,6 +327,9 @@ AuxiliaryProcessMain(int argc, char *argv[]) case StartupProcess: statmsg = pgstat_get_backend_desc(B_STARTUP); break; + case BaseBackupProcess: + statmsg = pgstat_get_backend_desc(B_BASE_BACKUP); + break; case BgWriterProcess: statmsg = pgstat_get_backend_desc(B_BG_WRITER); break; @@ -451,6 +455,11 @@ AuxiliaryProcessMain(int argc, char *argv[]) StartupProcessMain(); proc_exit(1); /* should never return */ + case BaseBackupProcess: + /* don't set signals, basebackup has its own agenda */ + BaseBackupMain(); + proc_exit(1); /* should never return */ + case BgWriterProcess: /* don't set signals, bgwriter has its own agenda */ BackgroundWriterMain(); diff --git a/src/backend/postmaster/pgstat.c b/src/backend/postmaster/pgstat.c index d362e7f7d7..79465333bc 100644 --- a/src/backend/postmaster/pgstat.c +++ 
b/src/backend/postmaster/pgstat.c @@ -2934,6 +2934,9 @@ pgstat_bestart(void) case StartupProcess: lbeentry.st_backendType = B_STARTUP; break; + case BaseBackupProcess: + lbeentry.st_backendType = B_BASE_BACKUP; + break; case BgWriterProcess: lbeentry.st_backendType = B_BG_WRITER; break; @@ -4289,6 +4292,9 @@ pgstat_get_backend_desc(BackendType backendType) case B_BG_WORKER: backendDesc = "background worker"; break; + case B_BASE_BACKUP: + backendDesc = "base backup"; + break; case B_BG_WRITER: backendDesc = "background writer"; break; diff --git a/src/backend/postmaster/postmaster.c b/src/backend/postmaster/postmaster.c index 62dc93d56b..3096e6ef33 100644 --- a/src/backend/postmaster/postmaster.c +++ b/src/backend/postmaster/postmaster.c @@ -116,6 +116,7 @@ #include "postmaster/postmaster.h" #include "postmaster/syslogger.h" #include "replication/logicallauncher.h" +#include "replication/walreceiver.h" #include "replication/walsender.h" #include "storage/fd.h" #include "storage/ipc.h" @@ -248,6 +249,7 @@ bool restart_after_crash = true; /* PIDs of special child processes; 0 when not running */ static pid_t StartupPID = 0, + BaseBackupPID = 0, BgWriterPID = 0, CheckpointerPID = 0, WalWriterPID = 0, @@ -539,6 +541,7 @@ static void ShmemBackendArrayRemove(Backend *bn); #endif /* EXEC_BACKEND */ #define StartupDataBase() StartChildProcess(StartupProcess) +#define StartBaseBackup() StartChildProcess(BaseBackupProcess) #define StartBackgroundWriter() StartChildProcess(BgWriterProcess) #define StartCheckpointer() StartChildProcess(CheckpointerProcess) #define StartWalWriter() StartChildProcess(WalWriterProcess) @@ -572,6 +575,8 @@ PostmasterMain(int argc, char *argv[]) bool listen_addr_saved = false; int i; char *output_config_variable = NULL; + struct stat stat_buf; + bool basebackup_signal_file_found = false; InitProcessGlobals(); @@ -877,12 +882,27 @@ PostmasterMain(int argc, char *argv[]) /* Verify that DataDir looks reasonable */ checkDataDir(); - /* Check that 
pg_control exists */ - checkControlFile(); - /* And switch working directory into it */ ChangeToDataDir(); + if (stat(BASEBACKUP_SIGNAL_FILE, &stat_buf) == 0) + { + int fd; + + fd = BasicOpenFilePerm(BASEBACKUP_SIGNAL_FILE, O_RDWR | PG_BINARY, + S_IRUSR | S_IWUSR); + if (fd >= 0) + { + (void) pg_fsync(fd); + close(fd); + } + basebackup_signal_file_found = true; + } + + /* Check that pg_control exists */ + if (!basebackup_signal_file_found) + checkControlFile(); + /* * Check for invalid combinations of GUC settings. */ @@ -961,7 +981,8 @@ PostmasterMain(int argc, char *argv[]) * processes will inherit the correct function pointer and not need to * repeat the test. */ - LocalProcessControlFile(false); + if (!basebackup_signal_file_found) + LocalProcessControlFile(false); /* * Initialize SSL library, if specified. @@ -1363,6 +1384,39 @@ PostmasterMain(int argc, char *argv[]) */ AddToDataDirLockFile(LOCK_FILE_LINE_PM_STATUS, PM_STATUS_STARTING); + if (basebackup_signal_file_found) + { + BaseBackupPID = StartBaseBackup(); + + /* + * Wait until done. Start WAL receiver in the meantime, once base + * backup has received the starting position. + */ + while (BaseBackupPID != 0) + { + PG_SETMASK(&UnBlockSig); + pg_usleep(1000000L); + PG_SETMASK(&BlockSig); + MaybeStartWalReceiver(); + } + + /* + * XXX Shut down WAL receiver. It will be restarted later in xlog.c, + * and that will complain if it's already running. + */ + ShutdownWalRcv(); + + /* + * Base backup done, now signal standby mode. + */ + durable_rename(BASEBACKUP_SIGNAL_FILE, STANDBY_SIGNAL_FILE, FATAL); + + /* + * Reread the control file that came in with the base backup. + */ + ReadControlFile(); + } + /* * We're ready to rock and roll... 
*/ @@ -2631,6 +2685,8 @@ SIGHUP_handler(SIGNAL_ARGS) SignalChildren(SIGHUP); if (StartupPID != 0) signal_child(StartupPID, SIGHUP); + if (BaseBackupPID != 0) + signal_child(BaseBackupPID, SIGHUP); if (BgWriterPID != 0) signal_child(BgWriterPID, SIGHUP); if (CheckpointerPID != 0) @@ -2782,6 +2838,8 @@ pmdie(SIGNAL_ARGS) if (StartupPID != 0) signal_child(StartupPID, SIGTERM); + if (BaseBackupPID != 0) + signal_child(BaseBackupPID, SIGTERM); if (BgWriterPID != 0) signal_child(BgWriterPID, SIGTERM); if (WalReceiverPID != 0) @@ -3012,6 +3070,23 @@ reaper(SIGNAL_ARGS) continue; } + /* + * Was it the base backup process? + */ + if (pid == BaseBackupPID) + { + BaseBackupPID = 0; + if (EXIT_STATUS_0(exitstatus)) + ; + else if (EXIT_STATUS_1(exitstatus)) + ereport(FATAL, + (errmsg("base backup failed"))); + else + HandleChildCrash(pid, exitstatus, + _("base backup process")); + continue; + } + /* * Was it the bgwriter? Normal exit can be ignored; we'll start a new * one at the next iteration of the postmaster's main loop, if @@ -3531,6 +3606,18 @@ HandleChildCrash(int pid, int exitstatus, const char *procname) StartupStatus = STARTUP_SIGNALED; } + /* Take care of the base backup process too */ + if (pid == BaseBackupPID) + BaseBackupPID = 0; + else if (BaseBackupPID != 0 && take_action) + { + ereport(DEBUG2, + (errmsg_internal("sending %s to process %d", + (SendStop ? "SIGSTOP" : "SIGQUIT"), + (int) BaseBackupPID))); + signal_child(BaseBackupPID, (SendStop ? 
SIGSTOP : SIGQUIT)); + } + /* Take care of the bgwriter too */ if (pid == BgWriterPID) BgWriterPID = 0; @@ -3765,6 +3852,7 @@ PostmasterStateMachine(void) if (CountChildren(BACKEND_TYPE_NORMAL | BACKEND_TYPE_WORKER) == 0 && StartupPID == 0 && WalReceiverPID == 0 && + BaseBackupPID == 0 && BgWriterPID == 0 && (CheckpointerPID == 0 || (!FatalError && Shutdown < ImmediateShutdown)) && @@ -3859,6 +3947,7 @@ PostmasterStateMachine(void) /* These other guys should be dead already */ Assert(StartupPID == 0); Assert(WalReceiverPID == 0); + Assert(BaseBackupPID == 0); Assert(BgWriterPID == 0); Assert(CheckpointerPID == 0); Assert(WalWriterPID == 0); @@ -4042,6 +4131,8 @@ TerminateChildren(int signal) if (signal == SIGQUIT || signal == SIGKILL) StartupStatus = STARTUP_SIGNALED; } + if (BaseBackupPID != 0) + signal_child(BaseBackupPID, signal); if (BgWriterPID != 0) signal_child(BgWriterPID, signal); if (CheckpointerPID != 0) @@ -4867,6 +4958,7 @@ SubPostmasterMain(int argc, char *argv[]) strcmp(argv[1], "--forkavlauncher") == 0 || strcmp(argv[1], "--forkavworker") == 0 || strcmp(argv[1], "--forkboot") == 0 || + strcmp(argv[1], "--forkbasebackup") == 0 || strncmp(argv[1], "--forkbgworker=", 15) == 0) PGSharedMemoryReAttach(); else @@ -4906,7 +4998,8 @@ SubPostmasterMain(int argc, char *argv[]) * (re-)read control file, as it contains config. The postmaster will * already have read this, but this process doesn't know about that. */ - LocalProcessControlFile(false); + if (strcmp(argv[1], "--forkbasebackup") != 0) + LocalProcessControlFile(false); /* * Reload any libraries that were preloaded by the postmaster. 
Since we @@ -4967,7 +5060,8 @@ SubPostmasterMain(int argc, char *argv[]) /* And run the backend */ BackendRun(&port); /* does not return */ } - if (strcmp(argv[1], "--forkboot") == 0) + if (strcmp(argv[1], "--forkboot") == 0 || + strcmp(argv[1], "--forkbasebackup") == 0) { /* Restore basic shared memory pointers */ InitShmemAccess(UsedShmemSegAddr); @@ -5371,7 +5465,7 @@ StartChildProcess(AuxProcType type) av[ac++] = "postgres"; #ifdef EXEC_BACKEND - av[ac++] = "--forkboot"; + av[ac++] = (type == BaseBackupProcess) ? "--forkbasebackup" : "--forkboot"; av[ac++] = NULL; /* filled in by postmaster_forkexec */ #endif @@ -5415,6 +5509,10 @@ StartChildProcess(AuxProcType type) ereport(LOG, (errmsg("could not fork startup process: %m"))); break; + case BaseBackupProcess: + ereport(LOG, + (errmsg("could not fork base backup process: %m"))); + break; case BgWriterProcess: ereport(LOG, (errmsg("could not fork background writer process: %m"))); @@ -5556,7 +5654,7 @@ static void MaybeStartWalReceiver(void) { if (WalReceiverPID == 0 && - (pmState == PM_STARTUP || pmState == PM_RECOVERY || + (pmState == PM_INIT || pmState == PM_STARTUP || pmState == PM_RECOVERY || pmState == PM_HOT_STANDBY || pmState == PM_WAIT_READONLY) && Shutdown == NoShutdown) { diff --git a/src/backend/replication/basebackup.c b/src/backend/replication/basebackup.c index c91f66dcbe..fcac192d0b 100644 --- a/src/backend/replication/basebackup.c +++ b/src/backend/replication/basebackup.c @@ -29,6 +29,7 @@ #include "port.h" #include "postmaster/syslogger.h" #include "replication/basebackup.h" +#include "replication/walreceiver.h" #include "replication/walsender.h" #include "replication/walsender_private.h" #include "storage/bufpage.h" @@ -38,6 +39,7 @@ #include "storage/ipc.h" #include "storage/reinit.h" #include "utils/builtins.h" +#include "utils/guc.h" #include "utils/ps_status.h" #include "utils/relcache.h" #include "utils/timestamp.h" @@ -111,6 +113,9 @@ static long long int total_checksum_failures; /* Do 
not verify checksums. */ static bool noverify_checksums = false; +/* Do not copy config files. */ +static bool exclude_conf = false; + /* * The contents of these directories are removed or recreated during server * start so they are not included in backups. The directories themselves are @@ -638,6 +643,7 @@ parse_basebackup_options(List *options, basebackup_options *opt) bool o_maxrate = false; bool o_tablespace_map = false; bool o_noverify_checksums = false; + bool o_exclude_conf = false; MemSet(opt, 0, sizeof(*opt)); foreach(lopt, options) @@ -726,6 +732,15 @@ parse_basebackup_options(List *options, basebackup_options *opt) noverify_checksums = true; o_noverify_checksums = true; } + else if (strcmp(defel->defname, "exclude_conf") == 0) + { + if (o_exclude_conf) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("duplicate option \"%s\"", defel->defname))); + exclude_conf = true; + o_exclude_conf = true; + } else elog(ERROR, "option \"%s\" not recognized", defel->defname); @@ -1135,6 +1150,18 @@ sendDir(const char *path, int basepathlen, bool sizeonly, List *tablespaces, continue; } + if (exclude_conf) + { + char *dot = strrchr(de->d_name, '.'); + if (dot && strcmp(dot, ".conf") == 0) + { + elog(DEBUG2, + "configuration file \"%s\" excluded from backup", + de->d_name); + continue; + } + } + snprintf(pathbuf, sizeof(pathbuf), "%s/%s", path, de->d_name); /* Skip pg_control here to back up it last */ @@ -1711,3 +1738,44 @@ throttle(size_t increment) */ throttled_last = GetCurrentTimestamp(); } + + +/* + * base backup worker process (client) main function + */ +void +BaseBackupMain(void) +{ + WalReceiverConn *wrconn = NULL; + char *err; + TimeLineID primaryTLI; + uint64 primary_sysid; + + /* Load the libpq-specific functions */ + load_file("libpqwalreceiver", false); + if (WalReceiverFunctions == NULL) + elog(ERROR, "libpqwalreceiver didn't initialize correctly"); + + /* Establish the connection to the primary */ + wrconn = walrcv_connect(PrimaryConnInfo, 
false, cluster_name[0] ? cluster_name : "basebackup", &err); + if (!wrconn) + ereport(ERROR, + (errmsg("could not connect to the primary server: %s", err))); + + /* + * Get the remote sysid and stick it into the local control file, so that + * the walreceiver is happy. The control file will later be overwritten + * by the base backup. + */ + primary_sysid = strtoull(walrcv_identify_system(wrconn, &primaryTLI), NULL, 10); + InitControlFile(primary_sysid); + WriteControlFile(); + + walrcv_base_backup(wrconn); + + walrcv_disconnect(wrconn); + + ereport(LOG, + (errmsg("base backup completed"))); + proc_exit(0); +} diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c index 6eba08a920..6d448acacf 100644 --- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c +++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c @@ -17,8 +17,14 @@ #include "postgres.h" #include +#include #include +#ifdef USE_SYSTEMD +#include +#endif + +#include "common/string.h" #include "libpq-fe.h" #include "pqexpbuffer.h" #include "access/xlog.h" @@ -27,10 +33,13 @@ #include "mb/pg_wchar.h" #include "miscadmin.h" #include "pgstat.h" +#include "pgtar.h" #include "replication/walreceiver.h" #include "utils/builtins.h" +#include "utils/guc.h" #include "utils/memutils.h" #include "utils/pg_lsn.h" +#include "utils/ps_status.h" #include "utils/tuplestore.h" PG_MODULE_MAGIC; @@ -61,6 +70,7 @@ static int libpqrcv_server_version(WalReceiverConn *conn); static void libpqrcv_readtimelinehistoryfile(WalReceiverConn *conn, TimeLineID tli, char **filename, char **content, int *len); +static void libpqrcv_base_backup(WalReceiverConn *conn); static bool libpqrcv_startstreaming(WalReceiverConn *conn, const WalRcvStreamOptions *options); static void libpqrcv_endstreaming(WalReceiverConn *conn, @@ -88,6 +98,7 @@ static WalReceiverFunctionsType PQWalReceiverFunctions = { libpqrcv_identify_system, 
libpqrcv_server_version, libpqrcv_readtimelinehistoryfile, + libpqrcv_base_backup, libpqrcv_startstreaming, libpqrcv_endstreaming, libpqrcv_receive, @@ -356,6 +367,414 @@ libpqrcv_server_version(WalReceiverConn *conn) return PQserverVersion(conn->streamConn); } +/* + * XXX copied from pg_basebackup.c + */ + +unsigned long long totaldone; +unsigned long long totalsize_kb; +int tablespacenum; +int tablespacecount; + +static void +base_backup_report_progress(void) +{ + int percent; + char *progress; + + percent = totalsize_kb ? (int) ((totaldone / 1024) * 100 / totalsize_kb) : 0; + + /* + * Avoid overflowing past 100% or the full size. This may make the total + * size number change as we approach the end of the backup (the estimate + * will always be wrong if WAL is included), but that's better than having + * the done column be bigger than the total. + */ + if (percent > 100) + percent = 100; + if (totaldone / 1024 > totalsize_kb) + totalsize_kb = totaldone / 1024; + + /* Note: no translation of ps status */ + progress = psprintf((tablespacecount == 1 ? 
+ "%llu/%llu kB (%d%%), %d/%d tablespace" : + "%llu/%llu kB (%d%%), %d/%d tablespaces"), + totaldone / 1024, + totalsize_kb, + percent, + tablespacenum, + tablespacecount); + + set_ps_display(progress, false); +#ifdef USE_SYSTEMD + sd_pid_notifyf(PostmasterPid, 0, "STATUS=base backup %s", progress); +#endif + + pfree(progress); +} + +static void +ReceiveAndUnpackTarFile(PGconn *conn, PGresult *res) +{ + char current_path[MAXPGPATH]; + char filename[MAXPGPATH]; + pgoff_t current_len_left = 0; + int current_padding = 0; + char *copybuf = NULL; + FILE *file = NULL; + off_t flush_offset; + + strlcpy(current_path, DataDir, sizeof(current_path)); + + /* + * Get the COPY data + */ + res = PQgetResult(conn); + if (PQresultStatus(res) != PGRES_COPY_OUT) + ereport(ERROR, + (errmsg("could not get COPY data stream: %s", + PQerrorMessage(conn)))); + + while (1) + { + int r; + + if (copybuf != NULL) + { + PQfreemem(copybuf); + copybuf = NULL; + } + + r = PQgetCopyData(conn, &copybuf, 0); + + if (r == -1) + { + /* + * End of chunk + */ + if (file) + fclose(file); + + break; + } + else if (r == -2) + { + ereport(ERROR, + (errmsg("could not read COPY data: %s", + PQerrorMessage(conn)))); + } + + if (file == NULL) + { + int filemode; + + /* + * No current file, so this must be the header for a new file + */ + if (r != 512) + ereport(ERROR, + (errmsg("invalid tar block header size: %d", r))); + + current_len_left = read_tar_number(&copybuf[124], 12); + + /* Set permissions on the file */ + filemode = read_tar_number(&copybuf[100], 8); + + /* + * All files are padded up to 512 bytes + */ + current_padding = + ((current_len_left + 511) & ~511) - current_len_left; + + /* + * First part of header is zero terminated filename + */ + snprintf(filename, sizeof(filename), "%s/%s", current_path, + copybuf); + if (filename[strlen(filename) - 1] == '/') + { + /* + * Ends in a slash means directory or symlink to directory + */ + if (copybuf[156] == '5') + { + /* + * Directory + */ + filename[strlen(filename) 
- 1] = '\0'; /* Remove trailing slash */ + if (MakePGDirectory(filename) != 0) + { + if (errno != EEXIST) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not create directory \"%s\": %m", + filename))); + } +#ifndef WIN32 + if (chmod(filename, (mode_t) filemode)) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not set permissions on directory \"%s\": %m", + filename))); +#endif + fsync_fname(filename, true); + } + else if (copybuf[156] == '2') + { + /* + * Symbolic link + * + * It's most likely a link in pg_tblspc directory, to the + * location of a tablespace. Apply any tablespace mapping + * given on the command line (--tablespace-mapping). (We + * blindly apply the mapping without checking that the + * link really is inside pg_tblspc. We don't expect there + * to be other symlinks in a data directory, but if there + * are, you can call it an undocumented feature that you + * can map them too.) + */ +#ifdef TODO + filename[strlen(filename) - 1] = '\0'; /* Remove trailing slash */ + + mapped_tblspc_path = get_tablespace_mapping(&copybuf[157]); + if (symlink(mapped_tblspc_path, filename) != 0) + { + pg_log_error("could not create symbolic link from \"%s\" to \"%s\": %m", + filename, mapped_tblspc_path); + exit(1); + } + fsync_fname(filename, false); +#endif + } + else + { + ereport(ERROR, + (errmsg("unrecognized link indicator \"%c\"", + copybuf[156]))); + } + continue; /* directory or link handled */ + } + + /* + * regular file + */ + file = fopen(filename, "wb"); + if (!file) + ereport(ERROR, + (errcode_for_file_access(), + (errmsg("could not create file \"%s\": %m", filename)))); + + flush_offset = 0; + +#ifndef WIN32 + if (chmod(filename, (mode_t) filemode)) + ereport(ERROR, + (errcode_for_file_access(), + (errmsg("could not set permissions on file \"%s\": %m", + filename)))); +#endif + + if (current_len_left == 0) + { + /* + * Done with this file, next one will be a new tar header + */ + pg_fsync(fileno(file)); + fclose(file); + file 
= NULL; + continue; + } + } /* new file */ + else + { + /* + * Continuing blocks in existing file + */ + if (current_len_left == 0 && r == current_padding) + { + /* + * Received the padding block for this file, ignore it and + * close the file, then move on to the next tar header. + */ + pg_fsync(fileno(file)); + fclose(file); + file = NULL; + continue; + } + + if (fwrite(copybuf, r, 1, file) != 1) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not write to file \"%s\": %m", filename))); + + pg_flush_data(fileno(file), flush_offset, r); + flush_offset += r; + totaldone += r; + base_backup_report_progress(); + + current_len_left -= r; + if (current_len_left == 0 && current_padding == 0) + { + /* + * Received the last block, and there is no padding to be + * expected. Close the file and move on to the next tar + * header. + */ + pg_fsync(fileno(file)); + fclose(file); + file = NULL; + continue; + } + } /* continuing data in existing file */ + } /* loop over all data blocks */ + base_backup_report_progress(); + + if (file != NULL) + ereport(ERROR, + (errmsg("COPY stream ended before last file was finished"))); + + if (copybuf != NULL) + PQfreemem(copybuf); +} + +/* + * Make base backup from remote and write to local disk. 
+ */ +static void +libpqrcv_base_backup(WalReceiverConn *conn) +{ + StringInfoData stmt; + PGresult *res; + char xlogstart[64]; + TimeLineID starttli; + XLogRecPtr recptr; + bool error; + + ereport(LOG, + (errmsg("initiating base backup, waiting for remote checkpoint to complete"))); + set_ps_display("waiting for checkpoint", false); + + initStringInfo(&stmt); + appendStringInfo(&stmt, "BASE_BACKUP PROGRESS NOWAIT EXCLUDE_CONF"); + if (cluster_name && cluster_name[0]) + appendStringInfo(&stmt, " LABEL %s", quote_literal_cstr(cluster_name)); + + if (PQsendQuery(conn->streamConn, stmt.data) == 0) + ereport(ERROR, + (errmsg("could not start base backup on remote server: %s", + pchomp(PQerrorMessage(conn->streamConn))))); + + /* + * First result set: WAL start position and timeline ID + */ + res = PQgetResult(conn->streamConn); + if (PQresultStatus(res) != PGRES_TUPLES_OK) + { + PQclear(res); + ereport(ERROR, + (errmsg("could not start base backup on remote server: %s", + pchomp(PQerrorMessage(conn->streamConn))))); + } + if (PQntuples(res) != 1) + { + PQclear(res); + ereport(ERROR, + (errmsg("server returned unexpected response to BASE_BACKUP command; got %d rows and %d fields, expected %d rows and %d fields", + PQntuples(res), PQnfields(res), 1, 2))); + } + + ereport(LOG, + (errmsg("remote checkpoint completed"))); + + strlcpy(xlogstart, PQgetvalue(res, 0, 0), sizeof(xlogstart)); + starttli = atoi(PQgetvalue(res, 0, 1)); + PQclear(res); + elog(DEBUG1, "write-ahead log start point: %s on timeline %u", + xlogstart, starttli); + recptr = pg_lsn_in_internal(xlogstart, &error); + if (error) + elog(ERROR, "invalid LSN received: %s", xlogstart); + + /* + * Second result set: tablespace information + */ + res = PQgetResult(conn->streamConn); + if (PQresultStatus(res) != PGRES_TUPLES_OK) + { + PQclear(res); + ereport(ERROR, + (errmsg("could not get backup header: %s", + pchomp(PQerrorMessage(conn->streamConn))))); + } + if (PQntuples(res) < 1) + { + PQclear(res); + 
ereport(ERROR, + (errmsg("no data returned from server"))); + } + + totalsize_kb = totaldone = 0; + tablespacecount = PQntuples(res); + for (int i = 0; i < PQntuples(res); i++) + { + totalsize_kb += atol(PQgetvalue(res, i, 2)); + } + + RequestXLogStreaming(starttli, recptr, PrimaryConnInfo, PrimarySlotName); + + /* + * Start receiving chunks + */ + for (int i = 0; i < PQntuples(res); i++) + { + tablespacenum = i; + ReceiveAndUnpackTarFile(conn->streamConn, res); + } + tablespacenum++; + base_backup_report_progress(); + + PQclear(res); + + /* + * Final result set: WAL end position and timeline ID + */ + res = PQgetResult(conn->streamConn); + if (PQresultStatus(res) != PGRES_TUPLES_OK) + { + PQclear(res); + ereport(ERROR, + (errmsg("could not get write-ahead log end position from server: %s", + pchomp(PQerrorMessage(conn->streamConn))))); + } + if (PQntuples(res) != 1) + { + PQclear(res); + ereport(ERROR, + (errmsg("no write-ahead log end position returned from server"))); + } + PQclear(res); + + res = PQgetResult(conn->streamConn); + if (PQresultStatus(res) != PGRES_COMMAND_OK) + { +#ifdef TODO + const char *sqlstate = PQresultErrorField(res, PG_DIAG_SQLSTATE); + + if (sqlstate && + strcmp(sqlstate, ERRCODE_DATA_CORRUPTED) == 0) + { + elog(ERROR, "checksum error occurred"); + } + else +#endif + { + elog(ERROR, "final receive failed: %s", + pchomp(PQerrorMessage(conn->streamConn))); + } + } + PQclear(res); +} + /* * Start streaming WAL data from given streaming options. * diff --git a/src/backend/replication/repl_gram.y b/src/backend/replication/repl_gram.y index c4e11cc4e8..8c962bc711 100644 --- a/src/backend/replication/repl_gram.y +++ b/src/backend/replication/repl_gram.y @@ -78,6 +78,7 @@ static SQLCmd *make_sqlcmd(void); %token K_WAL %token K_TABLESPACE_MAP %token K_NOVERIFY_CHECKSUMS +%token K_EXCLUDE_CONF %token K_TIMELINE %token K_PHYSICAL %token K_LOGICAL @@ -154,8 +155,7 @@ var_name: IDENT { $$ = $1; } ; /* - * BASE_BACKUP [LABEL '