From 5866926dd581881af6b75c41e858125f9427b4e6 Mon Sep 17 00:00:00 2001 From: Hayato Kuroda Date: Wed, 6 Mar 2024 06:58:48 +0000 Subject: [PATCH] Shorten main function --- src/bin/pg_basebackup/pg_createsubscriber.c | 516 +++++++++++--------- 1 file changed, 281 insertions(+), 235 deletions(-) diff --git a/src/bin/pg_basebackup/pg_createsubscriber.c b/src/bin/pg_basebackup/pg_createsubscriber.c index e70fc5dca0..80d76a78ce 100644 --- a/src/bin/pg_basebackup/pg_createsubscriber.c +++ b/src/bin/pg_basebackup/pg_createsubscriber.c @@ -70,8 +70,7 @@ static PGconn *connect_database(const char *conninfo, bool exit_on_error); static void disconnect_database(PGconn *conn, bool exit_on_error); static uint64 get_primary_sysid(const char *conninfo); static uint64 get_standby_sysid(const char *datadir); -static void modify_subscriber_sysid(const char *pg_resetwal_path, - struct CreateSubscriberOptions *opt); +static void modify_subscriber_sysid(struct CreateSubscriberOptions *opt); static bool server_is_in_recovery(PGconn *conn); static void check_publisher(struct LogicalRepInfo *dbinfo); static void setup_publisher(struct LogicalRepInfo *dbinfo); @@ -86,10 +85,12 @@ static void drop_replication_slot(PGconn *conn, struct LogicalRepInfo *dbinfo, static char *setup_server_logfile(const char *datadir); static void pg_ctl_status(const char *pg_ctl_cmd, int rc); static void start_standby_server(struct CreateSubscriberOptions *opt, - const char *pg_ctl_path, const char *logfile, + const char *logfile, bool with_options); -static void stop_standby_server(const char *pg_ctl_path, const char *datadir); -static void wait_for_end_recovery(const char *conninfo, const char *pg_ctl_path, +static void stop_standby_server(const char *datadir); +static void restart_server(struct CreateSubscriberOptions *options, + const char *logfile); +static void wait_for_end_recovery(const char *conninfo, struct CreateSubscriberOptions *opt); static void create_publication(PGconn *conn, struct LogicalRepInfo 
*dbinfo); static void drop_publication(PGconn *conn, struct LogicalRepInfo *dbinfo); @@ -97,11 +98,20 @@ static void create_subscription(PGconn *conn, struct LogicalRepInfo *dbinfo); static void set_replication_progress(PGconn *conn, struct LogicalRepInfo *dbinfo, const char *lsn); static void enable_subscription(PGconn *conn, struct LogicalRepInfo *dbinfo); +static void parse_command_option(int argc, char **argv, + struct CreateSubscriberOptions *options); +static void verification_phase(struct CreateSubscriberOptions *options); +static char *catchup_phase(struct CreateSubscriberOptions *options, + char *server_start_log); +static void cleanup_phase(struct CreateSubscriberOptions *options, + char *server_start_log); #define USEC_PER_SEC 1000000 #define WAIT_INTERVAL 1 /* 1 second */ static const char *progname; +static const char *pg_ctl_path; +static const char *pg_resetwal_path; static char *primary_slot_name = NULL; static bool dry_run = false; @@ -521,7 +531,7 @@ get_standby_sysid(const char *datadir) * files from one of the systems might be used in the other one. 
*/ static void -modify_subscriber_sysid(const char *pg_resetwal_path, struct CreateSubscriberOptions *opt) +modify_subscriber_sysid(struct CreateSubscriberOptions *opt) { ControlFileData *cf; bool crc_ok; @@ -1163,8 +1173,8 @@ pg_ctl_status(const char *pg_ctl_cmd, int rc) } static void -start_standby_server(struct CreateSubscriberOptions *opt, const char *pg_ctl_path, - const char *logfile, bool with_options) +start_standby_server(struct CreateSubscriberOptions *opt, const char *logfile, + bool with_options) { PQExpBuffer pg_ctl_cmd = createPQExpBuffer(); char socket_string[MAXPGPATH + 200]; @@ -1210,7 +1220,7 @@ start_standby_server(struct CreateSubscriberOptions *opt, const char *pg_ctl_pat } static void -stop_standby_server(const char *pg_ctl_path, const char *datadir) +stop_standby_server(const char *datadir) { char *pg_ctl_cmd; int rc; @@ -1223,6 +1233,25 @@ stop_standby_server(const char *pg_ctl_path, const char *datadir) pg_log_info("server was stopped"); } +/* + * Restart the standby: call stop_standby_server() only if postmaster.pid exists in options->subscriber_dir, then always call start_standby_server() with with_options = true and the given logfile. + */ +static void +restart_server(struct CreateSubscriberOptions *options, const char *logfile) +{ + struct stat statbuf; + char pidfile[MAXPGPATH]; + + /* Build the path of postmaster.pid inside the subscriber data directory */ + snprintf(pidfile, MAXPGPATH, "%s/postmaster.pid", options->subscriber_dir); + + /* An existing postmaster.pid is taken to mean the standby is running, so stop it first */ + if (stat(pidfile, &statbuf) == 0) + stop_standby_server(options->subscriber_dir); + + start_standby_server(options, logfile, true); +} + /* * Returns after the server finishes the recovery process. * @@ -1230,7 +1259,7 @@ stop_standby_server(const char *pg_ctl_path, const char *datadir) * the recovery process. By default, it waits forever. 
*/ static void -wait_for_end_recovery(const char *conninfo, const char *pg_ctl_path, +wait_for_end_recovery(const char *conninfo, struct CreateSubscriberOptions *opt) { PGconn *conn; @@ -1272,7 +1301,7 @@ wait_for_end_recovery(const char *conninfo, const char *pg_ctl_path, { if (++count > NUM_CONN_ATTEMPTS) { - stop_standby_server(pg_ctl_path, opt->subscriber_dir); + stop_standby_server(opt->subscriber_dir); pg_log_error("standby server disconnected from the primary"); break; } @@ -1285,7 +1314,7 @@ wait_for_end_recovery(const char *conninfo, const char *pg_ctl_path, /* Bail out after recovery_timeout seconds if this option is set */ if (opt->recovery_timeout > 0 && timer >= opt->recovery_timeout) { - stop_standby_server(pg_ctl_path, opt->subscriber_dir); + stop_standby_server(opt->subscriber_dir); pg_log_error("recovery timed out"); disconnect_database(conn, true); } @@ -1581,165 +1610,20 @@ enable_subscription(PGconn *conn, struct LogicalRepInfo *dbinfo) destroyPQExpBuffer(str); } -int -main(int argc, char **argv) +/* + * Verify the input arguments are appropriate. 
+ */ +static void +verify_input_arguments(struct CreateSubscriberOptions *options) { - static struct option long_options[] = - { - {"database", required_argument, NULL, 'd'}, - {"pgdata", required_argument, NULL, 'D'}, - {"dry-run", no_argument, NULL, 'n'}, - {"subscriber-port", required_argument, NULL, 'p'}, - {"publisher-server", required_argument, NULL, 'P'}, - {"retain", no_argument, NULL, 'r'}, - {"socket-directory", required_argument, NULL, 's'}, - {"recovery-timeout", required_argument, NULL, 't'}, - {"subscriber-username", required_argument, NULL, 'U'}, - {"verbose", no_argument, NULL, 'v'}, - {"version", no_argument, NULL, 'V'}, - {"help", no_argument, NULL, '?'}, - {NULL, 0, NULL, 0} - }; - - struct CreateSubscriberOptions opt = {0}; - - int c; - int option_index; - - char *pg_ctl_path = NULL; - char *pg_resetwal_path = NULL; - - char *server_start_log; - - char *pub_base_conninfo = NULL; - char *sub_base_conninfo = NULL; char *dbname_conninfo = NULL; - - uint64 pub_sysid; - uint64 sub_sysid; - struct stat statbuf; - - PGconn *conn; - char *consistent_lsn; - - PQExpBuffer sub_conninfo_str = createPQExpBuffer(); - PQExpBuffer recoveryconfcontents = NULL; - - char pidfile[MAXPGPATH]; - - pg_logging_init(argv[0]); - pg_logging_set_level(PG_LOG_WARNING); - progname = get_progname(argv[0]); - set_pglocale_pgservice(argv[0], PG_TEXTDOMAIN("pg_createsubscriber")); - - if (argc > 1) - { - if (strcmp(argv[1], "--help") == 0 || strcmp(argv[1], "-?") == 0) - { - usage(); - exit(0); - } - else if (strcmp(argv[1], "-V") == 0 - || strcmp(argv[1], "--version") == 0) - { - puts("pg_createsubscriber (PostgreSQL) " PG_VERSION); - exit(0); - } - } - - /* Default settings */ - opt.subscriber_dir = NULL; - opt.pub_conninfo_str = NULL; - opt.socket_dir = NULL; - opt.sub_port = DEFAULT_SUB_PORT; - opt.sub_username = NULL; - opt.database_names = (SimpleStringList) - { - NULL, NULL - }; - opt.retain = false; - opt.recovery_timeout = 0; - - /* - * Don't allow it to be run as root. 
It uses pg_ctl which does not allow - * it either. - */ -#ifndef WIN32 - if (geteuid() == 0) - { - pg_log_error("cannot be executed by \"root\""); - pg_log_error_hint("You must run %s as the PostgreSQL superuser.", - progname); - exit(1); - } -#endif - - get_restricted_token(); - - while ((c = getopt_long(argc, argv, "d:D:nP:rS:t:v", - long_options, &option_index)) != -1) - { - switch (c) - { - case 'd': - /* Ignore duplicated database names */ - if (!simple_string_list_member(&opt.database_names, optarg)) - { - simple_string_list_append(&opt.database_names, optarg); - num_dbs++; - } - break; - case 'D': - opt.subscriber_dir = pg_strdup(optarg); - canonicalize_path(opt.subscriber_dir); - break; - case 'n': - dry_run = true; - break; - case 'p': - if ((opt.sub_port = atoi(optarg)) <= 0) - pg_fatal("invalid subscriber port number"); - break; - case 'P': - opt.pub_conninfo_str = pg_strdup(optarg); - break; - case 'r': - opt.retain = true; - break; - case 's': - opt.socket_dir = pg_strdup(optarg); - break; - case 't': - opt.recovery_timeout = atoi(optarg); - break; - case 'U': - opt.sub_username = pg_strdup(optarg); - break; - case 'v': - pg_logging_increase_verbosity(); - break; - default: - /* getopt_long already emitted a complaint */ - pg_log_error_hint("Try \"%s --help\" for more information.", progname); - exit(1); - } - } - - /* - * Any non-option arguments? 
- */ - if (optind < argc) - { - pg_log_error("too many command-line arguments (first is \"%s\")", - argv[optind]); - pg_log_error_hint("Try \"%s --help\" for more information.", progname); - exit(1); - } + char *pub_base_conninfo; + PQExpBuffer sub_conninfo_str = createPQExpBuffer(); /* * Required arguments */ - if (opt.subscriber_dir == NULL) + if (options->subscriber_dir == NULL) { pg_log_error("no subscriber data directory specified"); pg_log_error_hint("Try \"%s --help\" for more information.", progname); @@ -1749,14 +1633,14 @@ main(int argc, char **argv) /* * If socket directory is not provided, use the current directory. */ - if (opt.socket_dir == NULL) + if (options->socket_dir == NULL) { char cwd[MAXPGPATH]; if (!getcwd(cwd, MAXPGPATH)) pg_fatal("could not determine current directory"); - opt.socket_dir = pg_strdup(cwd); - canonicalize_path(opt.socket_dir); + options->socket_dir = pg_strdup(cwd); + canonicalize_path(options->socket_dir); } /* @@ -1765,17 +1649,17 @@ main(int argc, char **argv) * variable sets it. If not, obtain the operating system name of the user * running it. */ - if (opt.sub_username == NULL) + if (options->sub_username == NULL) { char *errstr = NULL; if (getenv("PGUSER")) { - opt.sub_username = getenv("PGUSER"); + options->sub_username = getenv("PGUSER"); } else { - opt.sub_username = get_user_name(&errstr); + options->sub_username = get_user_name(&errstr); if (errstr) pg_fatal("%s", errstr); } @@ -1785,7 +1669,7 @@ main(int argc, char **argv) * Parse connection string. Build a base connection string that might be * reused by multiple databases. 
*/ - if (opt.pub_conninfo_str == NULL) + if (options->pub_conninfo_str == NULL) { /* * TODO use primary_conninfo (if available) from subscriber and @@ -1798,19 +1682,16 @@ main(int argc, char **argv) exit(1); } pg_log_info("validating connection string on publisher"); - pub_base_conninfo = get_base_conninfo(opt.pub_conninfo_str, + pub_base_conninfo = get_base_conninfo(options->pub_conninfo_str, &dbname_conninfo); if (pub_base_conninfo == NULL) exit(1); pg_log_info("validating connection string on subscriber"); appendPQExpBuffer(sub_conninfo_str, "host=%s port=%u user=%s fallback_application_name=%s", - opt.socket_dir, opt.sub_port, opt.sub_username, progname); - sub_base_conninfo = get_base_conninfo(sub_conninfo_str->data, NULL); - if (sub_base_conninfo == NULL) - exit(1); + options->socket_dir, options->sub_port, options->sub_username, progname); - if (opt.database_names.head == NULL) + if (options->database_names.head == NULL) { pg_log_info("no database was specified"); @@ -1821,7 +1702,7 @@ main(int argc, char **argv) */ if (dbname_conninfo) { - simple_string_list_append(&opt.database_names, dbname_conninfo); + simple_string_list_append(&options->database_names, dbname_conninfo); num_dbs++; pg_log_info("database \"%s\" was extracted from the publisher connection string", @@ -1836,58 +1717,134 @@ main(int argc, char **argv) } } - /* Get the absolute path of pg_ctl and pg_resetwal on the subscriber */ - pg_ctl_path = get_exec_path(argv[0], "pg_ctl"); - pg_resetwal_path = get_exec_path(argv[0], "pg_resetwal"); - /* Rudimentary check for a data directory */ - check_data_directory(opt.subscriber_dir); + check_data_directory(options->subscriber_dir); /* * Store database information for publisher and subscriber. It should be * called before atexit() because its return is used in the * cleanup_objects_atexit(). 
*/ - dbinfo = store_pub_sub_info(opt.database_names, pub_base_conninfo, - sub_base_conninfo); + dbinfo = store_pub_sub_info(options->database_names, pub_base_conninfo, + sub_conninfo_str->data); - /* Register a function to clean up objects in case of failure */ - atexit(cleanup_objects_atexit); + pfree(dbname_conninfo); + pfree(pub_base_conninfo); + destroyPQExpBuffer(sub_conninfo_str); +} - /* - * Check if the subscriber data directory has the same system identifier - * than the publisher data directory. - */ - pub_sysid = get_primary_sysid(dbinfo[0].pubconninfo); - sub_sysid = get_standby_sysid(opt.subscriber_dir); - if (pub_sysid != sub_sysid) - pg_fatal("subscriber data directory is not a copy of the source database cluster"); +/* + * Parse command-line options and store into CreateSubscriberOptions; every short option handled by the switch must also be listed in the getopt_long() option string, or it is rejected as unknown. + */ +static void +parse_command_option(int argc, char **argv, struct CreateSubscriberOptions *options) +{ + static struct option long_options[] = + { + {"database", required_argument, NULL, 'd'}, + {"pgdata", required_argument, NULL, 'D'}, + {"dry-run", no_argument, NULL, 'n'}, + {"subscriber-port", required_argument, NULL, 'p'}, + {"publisher-server", required_argument, NULL, 'P'}, + {"retain", no_argument, NULL, 'r'}, + {"socket-directory", required_argument, NULL, 's'}, + {"recovery-timeout", required_argument, NULL, 't'}, + {"subscriber-username", required_argument, NULL, 'U'}, + {"verbose", no_argument, NULL, 'v'}, + {"version", no_argument, NULL, 'V'}, + {"help", no_argument, NULL, '?'}, + {NULL, 0, NULL, 0} + }; - /* Create the output directory to store any data generated by this tool */ - server_start_log = setup_server_logfile(opt.subscriber_dir); + int c; + int option_index; - /* Subscriber PID file */ - snprintf(pidfile, MAXPGPATH, "%s/postmaster.pid", opt.subscriber_dir); + get_restricted_token(); + + while ((c = getopt_long(argc, argv, "d:D:np:P:rs:t:U:v", + long_options, &option_index)) != -1) + { + switch (c) + { + case 'd': + /* Ignore duplicated 
database names */ + if (!simple_string_list_member(&options->database_names, optarg)) + { + simple_string_list_append(&options->database_names, optarg); + num_dbs++; + } + break; + case 'D': + options->subscriber_dir = pg_strdup(optarg); + canonicalize_path(options->subscriber_dir); + break; + case 'n': + dry_run = true; + break; + case 'p': + if ((options->sub_port = atoi(optarg)) <= 0) + pg_fatal("invalid subscriber port number"); + break; + case 'P': + options->pub_conninfo_str = pg_strdup(optarg); + break; + case 'r': + options->retain = true; + break; + case 's': + options->socket_dir = pg_strdup(optarg); + break; + case 't': + options->recovery_timeout = atoi(optarg); + break; + case 'U': + options->sub_username = pg_strdup(optarg); + break; + case 'v': + pg_logging_increase_verbosity(); + break; + default: + /* getopt_long already emitted a complaint */ + pg_log_error_hint("Try \"%s --help\" for more information.", progname); + exit(1); + } + } /* - * If the standby server is running, stop it. Some parameters (that can - * only be set at server start) are informed by command-line options. + * Any non-option arguments? 
*/ - if (stat(pidfile, &statbuf) == 0) + if (optind < argc) { - - pg_log_info("standby is up and running"); - pg_log_info("stopping the server to start the transformation steps"); - stop_standby_server(pg_ctl_path, opt.subscriber_dir); + pg_log_error("too many command-line arguments (first is \"%s\")", + argv[optind]); + pg_log_error_hint("Try \"%s --help\" for more information.", progname); + exit(1); } + verify_input_arguments(options); + + /* Get the absolute path of pg_ctl and pg_resetwal on the subscriber */ + pg_ctl_path = get_exec_path(argv[0], "pg_ctl"); + pg_resetwal_path = get_exec_path(argv[0], "pg_resetwal"); +} + +/* + * Check whether nodes can be a logical replication cluster + */ +static void +verification_phase(struct CreateSubscriberOptions *options) +{ + uint64 pub_sysid; + uint64 sub_sysid; + /* - * Start a short-lived standby server with temporary parameters (provided - * by command-line options). The goal is to avoid connections during the - * transformation steps. + * Check if the subscriber data directory has the same system identifier + * than the publisher data directory. */ - pg_log_info("starting the standby with command-line options"); - start_standby_server(&opt, pg_ctl_path, server_start_log, true); + pub_sysid = get_primary_sysid(dbinfo[0].pubconninfo); + sub_sysid = get_standby_sysid(options->subscriber_dir); + if (pub_sysid != sub_sysid) + pg_fatal("subscriber data directory is not a copy of the source database cluster"); /* Check if the standby server is ready for logical replication */ check_subscriber(dbinfo); @@ -1899,14 +1856,17 @@ main(int argc, char **argv) * called after it. */ check_publisher(dbinfo); +} - /* - * Create the required objects for each database on publisher. This step - * is here mainly because if we stop the standby we cannot verify if the - * primary slot is in use. We could use an extra connection for it but it - * doesn't seem worth. 
- */ - setup_publisher(dbinfo); +/* + * Ensure the target server is caught up to the primary + */ +static char * +catchup_phase(struct CreateSubscriberOptions *options, char *server_start_log) +{ + PGconn *conn; + char *consistent_lsn; + PQExpBuffer recoveryconfcontents = NULL; /* * Create a temporary logical replication slot to get a consistent LSN. @@ -1959,7 +1919,7 @@ main(int argc, char **argv) { appendPQExpBuffer(recoveryconfcontents, "recovery_target_lsn = '%s'\n", consistent_lsn); - WriteRecoveryConfig(conn, opt.subscriber_dir, recoveryconfcontents); + WriteRecoveryConfig(conn, options->subscriber_dir, recoveryconfcontents); } disconnect_database(conn, false); @@ -1970,20 +1930,18 @@ main(int argc, char **argv) * until accepting connections. */ pg_log_info("stopping and starting the subscriber"); - stop_standby_server(pg_ctl_path, opt.subscriber_dir); - start_standby_server(&opt, pg_ctl_path, server_start_log, true); + restart_server(options, server_start_log); /* Waiting the subscriber to be promoted */ - wait_for_end_recovery(dbinfo[0].subconninfo, pg_ctl_path, &opt); + wait_for_end_recovery(dbinfo[0].subconninfo, options); - /* - * Create the subscription for each database on subscriber. It does not - * enable it immediately because it needs to adjust the logical - * replication start point to the LSN reported by consistent_lsn (see - * set_replication_progress). It also cleans up publications created by - * this tool and replication to the standby. - */ - setup_subscriber(dbinfo, consistent_lsn); + return consistent_lsn; +} + +static void +cleanup_phase(struct CreateSubscriberOptions *options, char *server_start_log) +{ + PGconn *conn; /* * If the primary_slot_name exists on primary, drop it. 
@@ -2009,10 +1967,10 @@ main(int argc, char **argv) /* Stop the subscriber */ pg_log_info("stopping the subscriber"); - stop_standby_server(pg_ctl_path, opt.subscriber_dir); + stop_standby_server(options->subscriber_dir); /* Change system identifier from subscriber */ - modify_subscriber_sysid(pg_resetwal_path, &opt); + modify_subscriber_sysid(options); /* * In dry run mode, the server is restarted with the provided command-line @@ -2021,14 +1979,102 @@ main(int argc, char **argv) * the command-line options. */ if (dry_run) - start_standby_server(&opt, pg_ctl_path, NULL, false); + start_standby_server(options, NULL, false); /* * The log file is kept if retain option is specified or this tool does * not run successfully. Otherwise, log file is removed. */ - if (!dry_run && !opt.retain) + if (!dry_run && !options->retain) unlink(server_start_log); +} + +int +main(int argc, char **argv) +{ + struct CreateSubscriberOptions opt = {0}; + char *server_start_log; + char *consistent_lsn; + + pg_logging_init(argv[0]); + pg_logging_set_level(PG_LOG_WARNING); + progname = get_progname(argv[0]); + set_pglocale_pgservice(argv[0], PG_TEXTDOMAIN("pg_createsubscriber")); + + if (argc > 1) + { + if (strcmp(argv[1], "--help") == 0 || strcmp(argv[1], "-?") == 0) + { + usage(); + exit(0); + } + else if (strcmp(argv[1], "-V") == 0 + || strcmp(argv[1], "--version") == 0) + { + puts("pg_createsubscriber (PostgreSQL) " PG_VERSION); + exit(0); + } + } + + /* Default settings */ + opt.subscriber_dir = NULL; + opt.pub_conninfo_str = NULL; + opt.socket_dir = NULL; + opt.sub_port = DEFAULT_SUB_PORT; + opt.sub_username = NULL; + opt.database_names = (SimpleStringList) + { + NULL, NULL + }; + opt.retain = false; + opt.recovery_timeout = 0; + + /* + * Don't allow it to be run as root. It uses pg_ctl which does not allow + * it either. 
+ */ +#ifndef WIN32 + if (geteuid() == 0) + { + pg_log_error("cannot be executed by \"root\""); + pg_log_error_hint("You must run %s as the PostgreSQL superuser.", + progname); + exit(1); + } +#endif + + parse_command_option(argc, argv, &opt); + + /* Create the output directory to store any data generated by this tool */ + server_start_log = setup_server_logfile(opt.subscriber_dir); + + restart_server(&opt, server_start_log); + + verification_phase(&opt); + + /* Register a function to clean up objects in case of failure */ + atexit(cleanup_objects_atexit); + + /* + * Create the required objects for each database on publisher. This step + * is here mainly because if we stop the standby we cannot verify if the + * primary slot is in use. We could use an extra connection for it but it + * doesn't seem worth. + */ + setup_publisher(dbinfo); + + consistent_lsn = catchup_phase(&opt, server_start_log); + + /* + * Create the subscription for each database on subscriber. It does not + * enable it immediately because it needs to adjust the logical + * replication start point to the LSN reported by consistent_lsn (see + * set_replication_progress). It also cleans up publications created by + * this tool and replication to the standby. + */ + setup_subscriber(dbinfo, consistent_lsn); + + cleanup_phase(&opt, server_start_log); success = true; -- 2.43.0