From 3a17a292805451c7b1733bd1e331bee91b2ce1c5 Mon Sep 17 00:00:00 2001 From: Julien Rouhaud Date: Wed, 22 Feb 2023 09:19:32 +0800 Subject: [PATCH v3] Optionally preserve the full subscription's state during pg_upgrade Previously, only the subscription metadata information was preserved. Without the list of relations and their state it's impossible to re-enable the subscriptions without missing some records as the list of relations can only be refreshed after enabling the subscription (and therefore starting the apply worker). Even if we added a way to refresh the subscription while enabling a publication, we still wouldn't know which relations are new on the publication side, and therefore should be fully synced, and which shouldn't. Similarly, the subscription's replication origin is needed to ensure that we don't replicate anything twice. To fix this problem, this patch teaches pg_dump in binary upgrade mode to emit additional commands to be able to restore the content of pg_subscription_rel, and an additional LSN parameter in the subscription creation to restore the underlying replication origin remote LSN. The LSN parameter is only accepted in CREATE SUBSCRIPTION in binary upgrade mode. The new ALTER SUBSCRIPTION subcommand, usable only during binary upgrade, has the following syntax: ALTER SUBSCRIPTION name ADD TABLE (relid = XYZ, state = 'x' [, lsn = 'X/Y']) The relation is identified by its oid, as it's preserved during pg_upgrade. The lsn is optional, and defaults to NULL / InvalidXLogRecPtr. This mode is optional and not enabled by default. A new --preserve-subscription-state option is added to pg_upgrade to use it. For now, pg_upgrade will check that all the subscription relations are in 'r' (ready) state, and will error out if any subscription relation in any database has a different state, logging the list of problematic databases with the number of problematic relations in each. 
Author: Julien Rouhaud Reviewed-by: FIXME Discussion: https://postgr.es/m/20230217075433.u5mjly4d5cr4hcfe@jrouhaud --- doc/src/sgml/ref/pgupgrade.sgml | 18 ++++ src/backend/commands/subscriptioncmds.c | 67 +++++++++++++- src/backend/parser/gram.y | 11 +++ src/bin/pg_dump/pg_backup.h | 2 + src/bin/pg_dump/pg_dump.c | 114 +++++++++++++++++++++++- src/bin/pg_dump/pg_dump.h | 13 +++ src/bin/pg_upgrade/check.c | 54 +++++++++++ src/bin/pg_upgrade/dump.c | 3 +- src/bin/pg_upgrade/option.c | 7 ++ src/bin/pg_upgrade/pg_upgrade.h | 1 + src/include/nodes/parsenodes.h | 3 +- 11 files changed, 288 insertions(+), 5 deletions(-) diff --git a/doc/src/sgml/ref/pgupgrade.sgml b/doc/src/sgml/ref/pgupgrade.sgml index 7816b4c685..0b3a8fd57b 100644 --- a/doc/src/sgml/ref/pgupgrade.sgml +++ b/doc/src/sgml/ref/pgupgrade.sgml @@ -240,6 +240,24 @@ PostgreSQL documentation + + + + + Fully preserve the logical subscription state if any. That includes + the underlying replication origins with their remote LSN and the list of + relations in each subscription so that replication can be simply + resumed if the subscriptions are reactivated. + If that option isn't used, it is up to the user to reactivate the + subscriptions in a suitable way; see the subscription part in for more information. + If this option is used and any of the subscriptions on the old cluster + has any relation in a state different from r + (ready), the pg_upgrade run will error. 
+ + + + diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index 8a26ddab1c..9e9d011c06 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -66,6 +66,8 @@ #define SUBOPT_DISABLE_ON_ERR 0x00000400 #define SUBOPT_LSN 0x00000800 #define SUBOPT_ORIGIN 0x00001000 +#define SUBOPT_RELID 0x00002000 +#define SUBOPT_STATE 0x00004000 /* check if the 'val' has 'bits' set */ #define IsSet(val, bits) (((val) & (bits)) == (bits)) @@ -90,6 +92,8 @@ typedef struct SubOpts bool disableonerr; char *origin; XLogRecPtr lsn; + Oid relid; + char state; } SubOpts; static List *fetch_table_list(WalReceiverConn *wrconn, List *publications); @@ -324,6 +328,38 @@ parse_subscription_options(ParseState *pstate, List *stmt_options, opts->specified_opts |= SUBOPT_LSN; opts->lsn = lsn; } + else if (IsSet(supported_opts, SUBOPT_RELID) && + strcmp(defel->defname, "relid") == 0) + { + Oid relid = defGetObjectId(defel); + + if (IsSet(opts->specified_opts, SUBOPT_RELID)) + errorConflictingDefElem(defel, pstate); + + if (!OidIsValid(relid)) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("invalid relation identifier used"))); + + opts->specified_opts |= SUBOPT_RELID; + opts->relid = relid; + } + else if (IsSet(supported_opts, SUBOPT_STATE) && + strcmp(defel->defname, "state") == 0) + { + char *state_str = defGetString(defel); + + if (IsSet(opts->specified_opts, SUBOPT_STATE)) + errorConflictingDefElem(defel, pstate); + + if (strlen(state_str) != 1) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("invalid relation state used"))); + + opts->specified_opts |= SUBOPT_STATE; + opts->state = defGetString(defel)[0]; + } else ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), @@ -550,6 +586,7 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, List *publications; bits32 supported_opts; SubOpts opts = {0}; + RepOriginId originid; /* * Parse and check options. 
@@ -561,6 +598,8 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
 					  SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY |
 					  SUBOPT_STREAMING | SUBOPT_TWOPHASE_COMMIT |
 					  SUBOPT_DISABLE_ON_ERR | SUBOPT_ORIGIN);
+	if (IsBinaryUpgrade)
+		supported_opts |= SUBOPT_LSN;
 	parse_subscription_options(pstate, stmt->options, supported_opts, &opts);
 
 	/*
@@ -659,7 +698,16 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
 	recordDependencyOnOwner(SubscriptionRelationId, subid, owner);
 
 	ReplicationOriginNameForLogicalRep(subid, InvalidOid, originname, sizeof(originname));
-	replorigin_create(originname);
+	originid = replorigin_create(originname);
+
+	/*
+	 * During binary upgrade, restore the replication origin's remote LSN if
+	 * one was explicitly specified with the lsn option.
+	 */
+	if (IsBinaryUpgrade && IsSet(opts.specified_opts, SUBOPT_LSN))
+		replorigin_advance(originid, opts.lsn, InvalidXLogRecPtr,
+						   false /* backward */ ,
+						   false /* WAL log */ );
 
 	/*
 	 * Connect to remote side to execute requested commands and fetch table
@@ -1341,6 +1389,27 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 				break;
 			}
 
+		case ALTER_SUBSCRIPTION_ADD_TABLE:
+			{
+				if (!IsBinaryUpgrade)
+					ereport(ERROR,
+							(errcode(ERRCODE_SYNTAX_ERROR),
+							 errmsg("ALTER SUBSCRIPTION ... ADD TABLE is not supported")));
+
+				supported_opts = SUBOPT_RELID | SUBOPT_STATE | SUBOPT_LSN;
+				parse_subscription_options(pstate, stmt->options,
+										   supported_opts, &opts);
+
+				/* relid and state should always be provided. 
*/ + Assert(IsSet(opts.specified_opts, SUBOPT_RELID)); + Assert(IsSet(opts.specified_opts, SUBOPT_STATE)); + + AddSubscriptionRelState(subid, opts.relid, opts.state, + opts.lsn); + + break; + } + default: elog(ERROR, "unrecognized ALTER SUBSCRIPTION kind %d", stmt->kind); diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y index efe88ccf9d..43e8039a68 100644 --- a/src/backend/parser/gram.y +++ b/src/backend/parser/gram.y @@ -10670,6 +10670,17 @@ AlterSubscriptionStmt: n->options = $5; $$ = (Node *) n; } + /* for binary upgrade only */ + | ALTER SUBSCRIPTION name ADD_P TABLE definition + { + AlterSubscriptionStmt *n = + makeNode(AlterSubscriptionStmt); + + n->kind = ALTER_SUBSCRIPTION_ADD_TABLE; + n->subname = $3; + n->options = $6; + $$ = (Node *) n; + } ; /***************************************************************************** diff --git a/src/bin/pg_dump/pg_backup.h b/src/bin/pg_dump/pg_backup.h index aba780ef4b..8a72a39d60 100644 --- a/src/bin/pg_dump/pg_backup.h +++ b/src/bin/pg_dump/pg_backup.h @@ -200,6 +200,8 @@ typedef struct _dumpOptions int sequence_data; /* dump sequence data even in schema-only mode */ int do_nothing; + + bool preserve_subscriptions; } DumpOptions; /* diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c index d62780a088..d949f4b72d 100644 --- a/src/bin/pg_dump/pg_dump.c +++ b/src/bin/pg_dump/pg_dump.c @@ -430,6 +430,7 @@ main(int argc, char **argv) {"table-and-children", required_argument, NULL, 12}, {"exclude-table-and-children", required_argument, NULL, 13}, {"exclude-table-data-and-children", required_argument, NULL, 14}, + {"preserve-subscription-state", no_argument, NULL, 15}, {NULL, 0, NULL, 0} }; @@ -656,6 +657,10 @@ main(int argc, char **argv) optarg); break; + case 15: /* include full subscription state */ + dopt.preserve_subscriptions = true; + break; + default: /* getopt_long already emitted a complaint */ pg_log_error_hint("Try \"%s --help\" for more information.", progname); @@ -713,6 
+718,10 @@ main(int argc, char **argv) if (dopt.do_nothing && dopt.dump_inserts == 0) pg_fatal("option --on-conflict-do-nothing requires option --inserts, --rows-per-insert, or --column-inserts"); + /* --preserve-subscription-state requires --binary-upgrade */ + if (dopt.preserve_subscriptions && !dopt.binary_upgrade) + pg_fatal("option --preserve-subscription-state requires option --binary-upgrade"); + /* Identify archive format to emit */ archiveFormat = parseArchiveFormat(format, &archiveMode); @@ -4584,6 +4593,69 @@ is_superuser(Archive *fout) return false; } +/* + * getSubscriptionRels + * get information about the given subscription's relations + */ +static SubRelInfo * +getSubscriptionRels(Archive *fout, Oid subid, int *nrels) +{ + SubRelInfo *rels; + PQExpBuffer query; + PGresult *res; + int i_srrelid; + int i_srsubstate; + int i_srsublsn; + int i, + ntups; + + if (!fout->dopt->binary_upgrade || !fout->dopt->preserve_subscriptions) + { + *nrels = 0; + + return NULL; + } + + query = createPQExpBuffer(); + + appendPQExpBuffer(query, "SELECT srrelid, srsubstate, srsublsn " + " FROM pg_subscription_rel" + " WHERE srsubid = %u", subid); + + res = ExecuteSqlQuery(fout, query->data, PGRES_TUPLES_OK); + + ntups = PQntuples(res); + *nrels = ntups; + + if (ntups == 0) + { + rels = NULL; + goto cleanup; + } + + /* + * Get subscription relation fields. 
+ */ + i_srrelid = PQfnumber(res, "srrelid"); + i_srsubstate = PQfnumber(res, "srsubstate"); + i_srsublsn = PQfnumber(res, "srsublsn"); + + rels = pg_malloc(ntups * sizeof(SubRelInfo)); + + for (i = 0; i < ntups; i++) + { + rels[i].srrelid = atooid(PQgetvalue(res, i, i_srrelid)); + rels[i].srsubstate = PQgetvalue(res, i, i_srsubstate)[0]; + rels[i].srsublsn = pg_strdup(PQgetvalue(res, i, i_srsublsn)); + } + +cleanup: + PQclear(res); + destroyPQExpBuffer(query); + + return rels; +} + /* * getSubscriptions * get information about subscriptions @@ -4608,6 +4680,7 @@ getSubscriptions(Archive *fout) int i_subsynccommit; int i_subpublications; int i_subbinary; + int i_suboriginremotelsn; int i, ntups; @@ -4660,12 +4733,16 @@ getSubscriptions(Archive *fout) LOGICALREP_TWOPHASE_STATE_DISABLED); if (fout->remoteVersion >= 160000) - appendPQExpBufferStr(query, " s.suborigin\n"); + appendPQExpBufferStr(query, " s.suborigin,\n"); else - appendPQExpBuffer(query, " '%s' AS suborigin\n", LOGICALREP_ORIGIN_ANY); + appendPQExpBuffer(query, " '%s' AS suborigin,\n", LOGICALREP_ORIGIN_ANY); + + appendPQExpBufferStr(query, "o.remote_lsn\n"); appendPQExpBufferStr(query, "FROM pg_subscription s\n" + "LEFT JOIN pg_replication_origin_status o \n" + " ON o.external_id = 'pg_' || s.oid::text \n" "WHERE s.subdbid = (SELECT oid FROM pg_database\n" " WHERE datname = current_database())"); @@ -4690,6 +4767,7 @@ getSubscriptions(Archive *fout) i_subtwophasestate = PQfnumber(res, "subtwophasestate"); i_subdisableonerr = PQfnumber(res, "subdisableonerr"); i_suborigin = PQfnumber(res, "suborigin"); + i_suboriginremotelsn = PQfnumber(res, "remote_lsn"); subinfo = pg_malloc(ntups * sizeof(SubscriptionInfo)); @@ -4720,6 +4798,15 @@ getSubscriptions(Archive *fout) subinfo[i].subdisableonerr = pg_strdup(PQgetvalue(res, i, i_subdisableonerr)); subinfo[i].suborigin = pg_strdup(PQgetvalue(res, i, i_suborigin)); + if (PQgetisnull(res, i, i_suboriginremotelsn)) + subinfo[i].suboriginremotelsn = NULL; + else + 
subinfo[i].suboriginremotelsn = + pg_strdup(PQgetvalue(res, i, i_suboriginremotelsn)); + + subinfo[i].subrels = getSubscriptionRels(fout, + subinfo[i].dobj.catId.oid, + &subinfo[i].nrels); /* Decide whether we want to dump it */ selectDumpableObject(&(subinfo[i].dobj), fout); @@ -4801,9 +4888,31 @@ dumpSubscription(Archive *fout, const SubscriptionInfo *subinfo) if (strcmp(subinfo->subsynccommit, "off") != 0) appendPQExpBuffer(query, ", synchronous_commit = %s", fmtId(subinfo->subsynccommit)); + if (dopt->binary_upgrade && dopt->preserve_subscriptions && + subinfo->suboriginremotelsn) + { + appendPQExpBuffer(query, ", lsn = '%s'", subinfo->suboriginremotelsn); + } + appendPQExpBufferStr(query, ");\n"); if (subinfo->dobj.dump & DUMP_COMPONENT_DEFINITION) + { + for (i = 0; i < subinfo->nrels; i++) + { + appendPQExpBuffer(query, "\nALTER SUBSCRIPTION %s ADD TABLE " + "(relid = %u, state = '%c'", + qsubname, + subinfo->subrels[i].srrelid, + subinfo->subrels[i].srsubstate); + + if (subinfo->subrels[i].srsublsn[0] != '\0') + appendPQExpBuffer(query, ", LSN = '%s'", + subinfo->subrels[i].srsublsn); + + appendPQExpBufferStr(query, ");"); + } + ArchiveEntry(fout, subinfo->dobj.catId, subinfo->dobj.dumpId, ARCHIVE_OPTS(.tag = subinfo->dobj.name, .owner = subinfo->rolname, @@ -4811,6 +4920,7 @@ dumpSubscription(Archive *fout, const SubscriptionInfo *subinfo) .section = SECTION_POST_DATA, .createStmt = query->data, .dropStmt = delq->data)); + } if (subinfo->dobj.dump & DUMP_COMPONENT_COMMENT) dumpComment(fout, "SUBSCRIPTION", qsubname, diff --git a/src/bin/pg_dump/pg_dump.h b/src/bin/pg_dump/pg_dump.h index 283cd1a602..fa5dd41541 100644 --- a/src/bin/pg_dump/pg_dump.h +++ b/src/bin/pg_dump/pg_dump.h @@ -647,6 +647,16 @@ typedef struct _PublicationSchemaInfo NamespaceInfo *pubschema; } PublicationSchemaInfo; +/* + * The SubRelInfo struct is used to represent subscription relation. 
+ */
+typedef struct _SubRelInfo
+{
+	Oid			srrelid;
+	char		srsubstate;
+	char	   *srsublsn;
+} SubRelInfo;
+
 /*
  * The SubscriptionInfo struct is used to represent subscription.
  */
@@ -663,6 +673,9 @@ typedef struct _SubscriptionInfo
 	char	   *suborigin;
 	char	   *subsynccommit;
 	char	   *subpublications;
+	char	   *suboriginremotelsn;
+	int			nrels;
+	SubRelInfo *subrels;
 } SubscriptionInfo;
 
 /*
diff --git a/src/bin/pg_upgrade/check.c b/src/bin/pg_upgrade/check.c
index fea159689e..7961cd8110 100644
--- a/src/bin/pg_upgrade/check.c
+++ b/src/bin/pg_upgrade/check.c
@@ -20,6 +20,7 @@ static void check_is_install_user(ClusterInfo *cluster);
 static void check_proper_datallowconn(ClusterInfo *cluster);
 static void check_for_prepared_transactions(ClusterInfo *cluster);
 static void check_for_isn_and_int8_passing_mismatch(ClusterInfo *cluster);
+static void check_for_subscription_rels(ClusterInfo *cluster);
 static void check_for_user_defined_postfix_ops(ClusterInfo *cluster);
 static void check_for_incompatible_polymorphics(ClusterInfo *cluster);
 static void check_for_tables_with_oids(ClusterInfo *cluster);
@@ -103,6 +104,8 @@ check_and_dump_old_cluster(bool live_check)
 	check_for_composite_data_type_usage(&old_cluster);
 	check_for_reg_data_type_usage(&old_cluster);
 	check_for_isn_and_int8_passing_mismatch(&old_cluster);
+	if (user_opts.preserve_subscriptions)
+		check_for_subscription_rels(&old_cluster);
 
 	/*
 	 * PG 16 increased the size of the 'aclitem' type, which breaks the on-disk
@@ -785,6 +788,62 @@ check_for_isn_and_int8_passing_mismatch(ClusterInfo *cluster)
 		check_ok();
 }
 
+/*
+ * check_for_subscription_rels()
+ *
+ * Verify that no table in a subscription is in a state different than ready.
+ * Every database with offending relations is reported before failing, so
+ * that all of them are listed in a single run.
+ */
+static void
+check_for_subscription_rels(ClusterInfo *cluster)
+{
+	int			dbnum;
+	bool		is_error = false;
+
+	Assert(user_opts.preserve_subscriptions);
+
+	/* No subscription before pg10. */
+	if (GET_MAJOR_VERSION(cluster->major_version) < 1000)
+		return;
+
+	prep_status("Checking for non-ready subscription relations");
+
+	for (dbnum = 0; dbnum < cluster->dbarr.ndbs; dbnum++)
+	{
+		PGresult   *res;
+		int			nb;
+		DbInfo	   *active_db = &cluster->dbarr.dbs[dbnum];
+		PGconn	   *conn = connectToServer(cluster, active_db->db_name);
+
+		res = executeQueryOrDie(conn,
+								"SELECT count(0) "
+								"FROM pg_catalog.pg_subscription_rel "
+								"WHERE srsubstate != 'r'");
+
+		if (PQntuples(res) != 1)
+			pg_fatal("could not determine the number of non-ready subscription relations");
+
+		nb = atoi(PQgetvalue(res, 0, 0));
+		if (nb != 0)
+		{
+			is_error = true;
+			pg_log(PG_WARNING,
+				   "\nWARNING: database \"%s\" has %d subscription "
+				   "relation(s) in non-ready state", active_db->db_name, nb);
+		}
+
+		PQclear(res);
+		PQfinish(conn);
+	}
+
+	if (is_error)
+		pg_fatal("--preserve-subscription-state is incompatible with "
+				 "subscription relations in non-ready state");
+
+	check_ok();
+}
+
 /*
  * Verify that no user defined postfix operators exist.
  */
diff --git a/src/bin/pg_upgrade/dump.c b/src/bin/pg_upgrade/dump.c
index 6c8c82dca8..9284576af7 100644
--- a/src/bin/pg_upgrade/dump.c
+++ b/src/bin/pg_upgrade/dump.c
@@ -53,9 +53,10 @@ generate_old_dump(void)
 
 		parallel_exec_prog(log_file_name, NULL,
 						   "\"%s/pg_dump\" %s --schema-only --quote-all-identifiers "
-						   "--binary-upgrade --format=custom %s --file=\"%s/%s\" %s",
+						   "--binary-upgrade --format=custom %s %s --file=\"%s/%s\" %s",
 						   new_cluster.bindir, cluster_conn_opts(&old_cluster),
 						   log_opts.verbose ? "--verbose" : "",
+						   user_opts.preserve_subscriptions ? 
"--preserve-subscription-state" : "", log_opts.dumpdir, sql_file_name, escaped_connstr.data); diff --git a/src/bin/pg_upgrade/option.c b/src/bin/pg_upgrade/option.c index 8869b6b60d..b033aa26ba 100644 --- a/src/bin/pg_upgrade/option.c +++ b/src/bin/pg_upgrade/option.c @@ -57,6 +57,7 @@ parseCommandLine(int argc, char *argv[]) {"verbose", no_argument, NULL, 'v'}, {"clone", no_argument, NULL, 1}, {"copy", no_argument, NULL, 2}, + {"preserve-subscription-state", no_argument, NULL, 3}, {NULL, 0, NULL, 0} }; @@ -66,6 +67,7 @@ parseCommandLine(int argc, char *argv[]) user_opts.do_sync = true; user_opts.transfer_mode = TRANSFER_MODE_COPY; + user_opts.preserve_subscriptions = false; os_info.progname = get_progname(argv[0]); @@ -199,6 +201,10 @@ parseCommandLine(int argc, char *argv[]) user_opts.transfer_mode = TRANSFER_MODE_COPY; break; + case 3: + user_opts.preserve_subscriptions = true; + break; + default: fprintf(stderr, _("Try \"%s --help\" for more information.\n"), os_info.progname); @@ -289,6 +295,7 @@ usage(void) printf(_(" -V, --version display version information, then exit\n")); printf(_(" --clone clone instead of copying files to new cluster\n")); printf(_(" --copy copy files to new cluster (default)\n")); + printf(_(" --preserve-subscription-state preserve the subscription state fully\n")); printf(_(" -?, --help show this help, then exit\n")); printf(_("\n" "Before running pg_upgrade you must:\n" diff --git a/src/bin/pg_upgrade/pg_upgrade.h b/src/bin/pg_upgrade/pg_upgrade.h index 3eea0139c7..131fd9a56e 100644 --- a/src/bin/pg_upgrade/pg_upgrade.h +++ b/src/bin/pg_upgrade/pg_upgrade.h @@ -304,6 +304,7 @@ typedef struct transferMode transfer_mode; /* copy files or link them? 
*/ int jobs; /* number of processes/threads to use */ char *socketdir; /* directory to use for Unix sockets */ + bool preserve_subscriptions; /* fully transfer subscription state */ } UserOpts; typedef struct diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h index 028588fb33..6b47efb884 100644 --- a/src/include/nodes/parsenodes.h +++ b/src/include/nodes/parsenodes.h @@ -3921,7 +3921,8 @@ typedef enum AlterSubscriptionType ALTER_SUBSCRIPTION_DROP_PUBLICATION, ALTER_SUBSCRIPTION_REFRESH, ALTER_SUBSCRIPTION_ENABLED, - ALTER_SUBSCRIPTION_SKIP + ALTER_SUBSCRIPTION_SKIP, + ALTER_SUBSCRIPTION_ADD_TABLE } AlterSubscriptionType; typedef struct AlterSubscriptionStmt -- 2.37.0