From 18ccb63d223e020fd3027e2ddcbc997eb968c1ba Mon Sep 17 00:00:00 2001 From: Julien Rouhaud Date: Wed, 22 Feb 2023 09:19:32 +0800 Subject: [PATCH v1] POC: Preserve the subscription relations during pg_upgrade Previously, only the subscription information was preserved. Without the list of relations and their state it's impossible to re-enable the subscriptions without missing some records as the list of relations can only be refreshed after enabling the subscription (and therefore starting the apply worker). Even if we added a way to refresh the subscription while enabling a publication, we still wouldn't know which relation are new on the publication side, and therefore should be fully synced, and which shouldn't. To fix this problem, this patch teaches pg_dump in binary upgrade mode to emit additional commands to be able to restore the content of pg_subscription_rel. This new ALTER SUBSCRIPTION subcommand, usable only during binary upgrade, has the following syntax: ALTER SUBSCRIPTION name ADD TABLE (relid = XYZ, state = 'x' [, lsn = 'X/Y']) The relation is identified by its oid, as it's preserved during pg_upgrade. The lsn is optional, and defaults to NULL / InvalidXLogRecPtr. Author: Julien Rouhaud Reviewed-by: FIXME Discussion: https://postgr.es/m/20230217075433.u5mjly4d5cr4hcfe@jrouhaud --- src/backend/commands/subscriptioncmds.c | 57 +++++++++++++++++ src/backend/parser/gram.y | 11 ++++ src/bin/pg_dump/pg_dump.c | 84 +++++++++++++++++++++++++ src/bin/pg_dump/pg_dump.h | 12 ++++ src/include/nodes/parsenodes.h | 3 +- 5 files changed, 166 insertions(+), 1 deletion(-) diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index 464db6d247..7f2560faf8 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -66,6 +66,8 @@ #define SUBOPT_DISABLE_ON_ERR 0x00000400 #define SUBOPT_LSN 0x00000800 #define SUBOPT_ORIGIN 0x00001000 +#define SUBOPT_RELID 0x00002000 +#define SUBOPT_STATE 0x00004000 /* check if the 'val' has 'bits' set */ #define IsSet(val, bits) (((val) & (bits)) == (bits)) @@ -90,6 +92,8 @@ typedef struct SubOpts bool disableonerr; char *origin; XLogRecPtr lsn; + Oid relid; + char state; } SubOpts; static List *fetch_table_list(WalReceiverConn *wrconn, List *publications); @@ -324,6 +328,38 @@ parse_subscription_options(ParseState *pstate, List *stmt_options, opts->specified_opts |= SUBOPT_LSN; opts->lsn = lsn; } + else if (IsSet(supported_opts, SUBOPT_RELID) && + strcmp(defel->defname, "relid") == 0) + { + Oid relid = defGetObjectId(defel); + + if (IsSet(opts->specified_opts, SUBOPT_RELID)) + errorConflictingDefElem(defel, pstate); + + if (!OidIsValid(relid)) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("invalid relation identifier used"))); + + opts->specified_opts |= SUBOPT_RELID; + opts->relid = relid; + } + else if (IsSet(supported_opts, SUBOPT_STATE) && + strcmp(defel->defname, "state") == 0) + { + char *state_str = defGetString(defel); + + if (IsSet(opts->specified_opts, SUBOPT_STATE)) + errorConflictingDefElem(defel, pstate); + + if (strlen(state_str) != 1) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("invalid relation state used"))); + + opts->specified_opts |= SUBOPT_STATE; + opts->state = defGetString(defel)[0]; + } else ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), @@ -1341,6 +1377,27 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, break; } + case ALTER_SUBSCRIPTION_ADD_TABLE: + { + if (!IsBinaryUpgrade) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR)), + errmsg("ALTER SUBSCRIPTION ... ADD TABLE is not supported")); + + supported_opts = SUBOPT_RELID | SUBOPT_STATE | SUBOPT_LSN; + parse_subscription_options(pstate, stmt->options, + supported_opts, &opts); + + /* relid and state should always be provided. */ + Assert(IsSet(opts.specified_opts, SUBOPT_RELID)); + Assert(IsSet(opts.specified_opts, SUBOPT_STATE)); + + AddSubscriptionRelState(subid, opts.relid, opts.state, + opts.lsn); + + break; + } + default: elog(ERROR, "unrecognized ALTER SUBSCRIPTION kind %d", stmt->kind); diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y index a0138382a1..0a3448c487 100644 --- a/src/backend/parser/gram.y +++ b/src/backend/parser/gram.y @@ -10670,6 +10670,17 @@ AlterSubscriptionStmt: n->options = $5; $$ = (Node *) n; } + /* for binary upgrade only */ + | ALTER SUBSCRIPTION name ADD_P TABLE definition + { + AlterSubscriptionStmt *n = + makeNode(AlterSubscriptionStmt); + + n->kind = ALTER_SUBSCRIPTION_ADD_TABLE; + n->subname = $3; + n->options = $6; + $$ = (Node *) n; + } ; /***************************************************************************** diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c index 527c7651ab..61f54ee549 100644 --- a/src/bin/pg_dump/pg_dump.c +++ b/src/bin/pg_dump/pg_dump.c @@ -4470,6 +4470,69 @@ is_superuser(Archive *fout) return false; } +/* + * getSubscriptionRels + * get information about the given subscription's relations + */ +static SubRelInfo * +getSubscriptionRels(Archive *fout, Oid subid, int *nrels) +{ + SubRelInfo *rels; + PQExpBuffer query; + PGresult *res; + int i_srrelid; + int i_srsubstate; + int i_srsublsn; + int i, + ntups; + + if (!fout->dopt->binary_upgrade) + { + *nrels = 0; + + return NULL; + } + + query = createPQExpBuffer(); + + appendPQExpBuffer(query, "SELECT srrelid, srsubstate, srsublsn " + " FROM pg_subscription_rel" + " WHERE srsubid = %u", subid); + + res = ExecuteSqlQuery(fout, query->data, PGRES_TUPLES_OK); + + ntups = PQntuples(res); + *nrels = ntups; + + if (ntups == 0) + { + rels = NULL; + goto cleanup; + } + + /* + * Get subscription relation fields. + */ + i_srrelid = PQfnumber(res, "srrelid"); + i_srsubstate = PQfnumber(res, "srsubstate"); + i_srsublsn = PQfnumber(res, "srsublsn"); + + rels = pg_malloc(ntups * sizeof(SubRelInfo)); + + for (i = 0; i < ntups; i++) + { + rels[i].srrelid = atooid(PQgetvalue(res, i, i_srrelid)); + rels[i].srsubstate = PQgetvalue(res, i, i_srsubstate)[0]; + rels[i].srsublsn = pg_strdup(PQgetvalue(res, i, i_srsublsn)); + } + +cleanup: + PQclear(res); + destroyPQExpBuffer(query); + + return rels; +} + /* * getSubscriptions * get information about subscriptions @@ -4607,6 +4670,10 @@ getSubscriptions(Archive *fout) pg_strdup(PQgetvalue(res, i, i_subdisableonerr)); subinfo[i].suborigin = pg_strdup(PQgetvalue(res, i, i_suborigin)); + subinfo[i].subrels = getSubscriptionRels(fout, + subinfo[i].dobj.catId.oid, + &subinfo[i].nrels); + /* Decide whether we want to dump it */ selectDumpableObject(&(subinfo[i].dobj), fout); } @@ -4690,6 +4757,22 @@ dumpSubscription(Archive *fout, const SubscriptionInfo *subinfo) appendPQExpBufferStr(query, ");\n"); if (subinfo->dobj.dump & DUMP_COMPONENT_DEFINITION) + { + for (i = 0; i < subinfo->nrels; i++) + { + appendPQExpBuffer(query, "\nALTER SUBSCRIPTION %s ADD TABLE " + "(RELID = %u, STATE = '%c'", + qsubname, + subinfo->subrels[i].srrelid, + subinfo->subrels[i].srsubstate); + + if (subinfo->subrels[i].srsublsn[0] != '\0') + appendPQExpBuffer(query, ", LSN = '%s'", + subinfo->subrels[i].srsublsn); + + appendPQExpBufferStr(query, ");"); + } + ArchiveEntry(fout, subinfo->dobj.catId, subinfo->dobj.dumpId, ARCHIVE_OPTS(.tag = subinfo->dobj.name, .owner = subinfo->rolname, @@ -4697,6 +4780,7 @@ dumpSubscription(Archive *fout, const SubscriptionInfo *subinfo) .section = SECTION_POST_DATA, .createStmt = query->data, .dropStmt = delq->data)); + } if (subinfo->dobj.dump & DUMP_COMPONENT_COMMENT) dumpComment(fout, "SUBSCRIPTION", qsubname, diff --git a/src/bin/pg_dump/pg_dump.h b/src/bin/pg_dump/pg_dump.h index e7cbd8d7ed..03fb0dafe0 100644 --- a/src/bin/pg_dump/pg_dump.h +++ b/src/bin/pg_dump/pg_dump.h @@ -646,6 +646,16 @@ typedef struct _PublicationSchemaInfo NamespaceInfo *pubschema; } PublicationSchemaInfo; +/* + * The SubRelInfo struct is used to represent subscription relation. + */ +typedef struct _SubRelInfo +{ + Oid srrelid; + char srsubstate; + char *srsublsn; +} SubRelInfo; + /* * The SubscriptionInfo struct is used to represent subscription. */ @@ -662,6 +672,8 @@ typedef struct _SubscriptionInfo char *suborigin; char *subsynccommit; char *subpublications; + int nrels; + SubRelInfo *subrels; } SubscriptionInfo; /* diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h index f7d7f10f7d..8f66307287 100644 --- a/src/include/nodes/parsenodes.h +++ b/src/include/nodes/parsenodes.h @@ -3917,7 +3917,8 @@ typedef enum AlterSubscriptionType ALTER_SUBSCRIPTION_DROP_PUBLICATION, ALTER_SUBSCRIPTION_REFRESH, ALTER_SUBSCRIPTION_ENABLED, - ALTER_SUBSCRIPTION_SKIP + ALTER_SUBSCRIPTION_SKIP, + ALTER_SUBSCRIPTION_ADD_TABLE } AlterSubscriptionType; typedef struct AlterSubscriptionStmt -- 2.37.0