From 8d1953a78051c08cd610d2ca543bf15095e8fc9d Mon Sep 17 00:00:00 2001 From: Ajin Cherian Date: Tue, 23 Jun 2026 20:07:53 +1000 Subject: [PATCH v9 2/2] Preserve replication origin OIDs during pg_upgrade When pg_upgrade migrates a subscriber, replication origin OIDs (roident) can change across the upgrade. This is a problem because commit-timestamp records embed roident and are copied directly from the old cluster's pg_commit_ts directory, causing spurious "update_origin_differs" conflicts after the upgrade. Fix this by dumping replication origins as global objects via pg_dumpall during binary upgrade, using a new function binary_upgrade_create_replication_origin(oid, name, lsn) to recreate each origin with its preserved roident and remote_lsn. To avoid conflicts with this, CreateSubscription() skips replorigin_create() in binary-upgrade mode since the origin is already created by the time the subscription is restored. Author: Ajin Cherian Reviewer: Hayato Kuroda (Fujitsu) Reviewer: Zsolt Parragi Reviewer: Shlok Kyal Reviewer: Shveta malik Reviewer: Vignesh C Reviewer: Nisha Moond Reviewer: Rui Zhao --- doc/src/sgml/logical-replication.sgml | 4 +- src/backend/commands/subscriptioncmds.c | 11 ++- src/backend/replication/logical/origin.c | 98 ++++++++++++------- src/backend/utils/adt/pg_upgrade_support.c | 107 ++++++++++++--------- src/bin/pg_dump/pg_dump.c | 51 ++-------- src/bin/pg_dump/pg_dump.h | 1 - src/bin/pg_dump/pg_dumpall.c | 67 +++++++++++++ src/bin/pg_upgrade/check.c | 57 +++++++---- src/bin/pg_upgrade/info.c | 23 +++++ src/bin/pg_upgrade/pg_upgrade.h | 2 + src/bin/pg_upgrade/t/004_subscription.pl | 58 ++++++++++- src/include/catalog/pg_proc.dat | 8 +- src/include/replication/origin.h | 3 + 13 files changed, 338 insertions(+), 152 deletions(-) diff --git a/doc/src/sgml/logical-replication.sgml b/doc/src/sgml/logical-replication.sgml index 9e7868487de..b4ccd44a327 100644 --- a/doc/src/sgml/logical-replication.sgml +++ b/doc/src/sgml/logical-replication.sgml @@ -2819,7 +2819,7 @@ CONTEXT: processing remote data for replication origin "pg_16395" during "INSER - Commit timestamps and origin data are not preserved during the upgrade. + Commit timestamps are not preserved during the upgrade. As a result, even if retain_dead_tuples is enabled, the upgraded subscriber may be unable to detect conflicts or @@ -2861,7 +2861,7 @@ CONTEXT: processing remote data for replication origin "pg_16395" during "INSER The new cluster must have max_active_replication_origins configured to a value greater than or equal to the number of - subscriptions present in the old cluster. + replication origins present in the old cluster. The new cluster must contain no replication origins. diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index 5fe7440418c..4191d78aebd 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -19,6 +19,7 @@ #include "access/table.h" #include "access/twophase.h" #include "access/xact.h" +#include "catalog/binary_upgrade.h" #include "catalog/catalog.h" #include "catalog/dependency.h" #include "catalog/indexing.h" @@ -898,9 +899,15 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, * apply workers initialization, and to handle origin creation dynamically * when tables are added to the subscription. It is not clear whether * preventing creation of origins is worth additional complexity. + * + * In binary-upgrade mode, skip origin creation here. This is required to + * preserve the roident from the old cluster for this subscription. */ - ReplicationOriginNameForLogicalRep(subid, InvalidOid, originname, sizeof(originname)); - replorigin_create(originname); + if (!IsBinaryUpgrade) + { + ReplicationOriginNameForLogicalRep(subid, InvalidOid, originname, sizeof(originname)); + replorigin_create(originname); + } /* * Connect to remote side to execute requested commands and fetch table diff --git a/src/backend/replication/logical/origin.c b/src/backend/replication/logical/origin.c index c9dfb094c2b..e3a3e816f24 100644 --- a/src/backend/replication/logical/origin.c +++ b/src/backend/replication/logical/origin.c @@ -265,6 +265,54 @@ replorigin_by_name(const char *roname, bool missing_ok) return roident; } +/* + * replorigin_create_with_id + * + * Create a replication origin with a specific ID and name, optionally + * restoring its remote_lsn. Used by pg_upgrade to preserve replication + * origin IDs across the upgrade. + * + * Caller must hold an exclusive lock on ReplicationOriginRelationId. + * + * Needs to be called in a transaction. + */ +void +replorigin_create_with_id(ReplOriginId roident, const char *roname, + XLogRecPtr remote_lsn, Relation rel) +{ + Datum roname_d; + bool nulls[Natts_pg_replication_origin]; + Datum values[Natts_pg_replication_origin]; + HeapTuple tuple; + + Assert(IsTransactionState()); + Assert(CheckRelationLockedByMe(rel, ExclusiveLock, false)); + + roname_d = CStringGetTextDatum(roname); + + if (SearchSysCacheExists1(REPLORIGNAME, roname_d)) + ereport(ERROR, + errcode(ERRCODE_DUPLICATE_OBJECT), + errmsg("replication origin \"%s\" already exists", roname)); + + memset(&nulls, 0, sizeof(nulls)); + memset(&values, 0, sizeof(values)); + + values[Anum_pg_replication_origin_roident - 1] = ObjectIdGetDatum(roident); + values[Anum_pg_replication_origin_roname - 1] = roname_d; + + tuple = heap_form_tuple(RelationGetDescr(rel), values, nulls); + CatalogTupleInsert(rel, tuple); + heap_freetuple(tuple); + CommandCounterIncrement(); + + if (remote_lsn != InvalidXLogRecPtr) + replorigin_advance(roident, remote_lsn, InvalidXLogRecPtr, + false /* backward */, + false /* WAL log */); + +} + /* * Create a replication origin. * @@ -273,13 +321,12 @@ replorigin_by_name(const char *roname, bool missing_ok) ReplOriginId replorigin_create(const char *roname) { - Oid roident; - HeapTuple tuple = NULL; - Relation rel; - Datum roname_d; - SnapshotData SnapshotDirty; - SysScanDesc scan; - ScanKeyData key; + Oid roident; + Relation rel; + SnapshotData SnapshotDirty; + SysScanDesc scan; + ScanKeyData key; + bool found = false; /* * To avoid needing a TOAST table for pg_replication_origin, we limit @@ -293,8 +340,6 @@ replorigin_create(const char *roname) errdetail("Replication origin names must be no longer than %d bytes.", MAX_RONAME_LEN))); - roname_d = CStringGetTextDatum(roname); - Assert(IsTransactionState()); /* @@ -321,17 +366,15 @@ replorigin_create(const char *roname) * snapshot. To make that safe, it needs to not have a TOAST table, since * TOASTed data cannot be fetched without a snapshot. As of this writing, * its only varlena column is roname, which we limit to 512 bytes to avoid - * needing out-of-line storage. If you add a TOAST table to this catalog, - * be sure to set up a snapshot everywhere it might be needed. For more + * needing out-of-line storage. If you add a TOAST table to this catalog, + * be sure to set up a snapshot everywhere it might be needed. For more * information, see https://postgr.es/m/ZvMSUPOqUU-VNADN%40nathan. */ Assert(!OidIsValid(rel->rd_rel->reltoastrelid)); for (roident = InvalidOid + 1; roident < PG_UINT16_MAX; roident++) { - bool nulls[Natts_pg_replication_origin]; - Datum values[Natts_pg_replication_origin]; - bool collides; + bool collides; CHECK_FOR_INTERRUPTS(); @@ -341,41 +384,28 @@ replorigin_create(const char *roname) ObjectIdGetDatum(roident)); scan = systable_beginscan(rel, ReplicationOriginIdentIndex, - true /* indexOK */ , + true /* indexOK */, &SnapshotDirty, 1, &key); - collides = HeapTupleIsValid(systable_getnext(scan)); - systable_endscan(scan); if (!collides) { - /* - * Ok, found an unused roident, insert the new row and do a CCI, - * so our callers can look it up if they want to. - */ - memset(&nulls, 0, sizeof(nulls)); - - values[Anum_pg_replication_origin_roident - 1] = ObjectIdGetDatum(roident); - values[Anum_pg_replication_origin_roname - 1] = roname_d; - - tuple = heap_form_tuple(RelationGetDescr(rel), values, nulls); - CatalogTupleInsert(rel, tuple); - CommandCounterIncrement(); + found = true; break; } } - /* now release lock again, */ - table_close(rel, ExclusiveLock); - - if (tuple == NULL) + if (!found) ereport(ERROR, (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED), errmsg("could not find free replication origin ID"))); - heap_freetuple(tuple); + replorigin_create_with_id(roident, roname, InvalidXLogRecPtr, rel); + + table_close(rel, ExclusiveLock); + return roident; } diff --git a/src/backend/utils/adt/pg_upgrade_support.c b/src/backend/utils/adt/pg_upgrade_support.c index 59c3e7f0146..eb3bb081503 100644 --- a/src/backend/utils/adt/pg_upgrade_support.c +++ b/src/backend/utils/adt/pg_upgrade_support.c @@ -28,6 +28,7 @@ #include "utils/array.h" #include "utils/builtins.h" #include "utils/lsyscache.h" +#include "utils/syscache.h" #include "utils/pg_lsn.h" @@ -377,71 +378,85 @@ binary_upgrade_add_sub_rel_state(PG_FUNCTION_ARGS) } /* - * binary_upgrade_replorigin_advance + * binary_upgrade_create_conflict_detection_slot * - * Update the remote_lsn for the subscriber's replication origin. + * Create a replication slot to retain information necessary for conflict + * detection such as dead tuples, commit timestamps, and origins. */ Datum -binary_upgrade_replorigin_advance(PG_FUNCTION_ARGS) +binary_upgrade_create_conflict_detection_slot(PG_FUNCTION_ARGS) { - Relation rel; - Oid subid; - char *subname; - char originname[NAMEDATALEN]; - ReplOriginId node; - XLogRecPtr remote_commit; - CHECK_IS_BINARY_UPGRADE; - /* - * We must ensure a non-NULL subscription name before dereferencing the - * arguments. - */ - if (PG_ARGISNULL(0)) - elog(ERROR, "null argument to binary_upgrade_replorigin_advance is not allowed"); - - subname = text_to_cstring(PG_GETARG_TEXT_PP(0)); - remote_commit = PG_ARGISNULL(1) ? InvalidXLogRecPtr : PG_GETARG_LSN(1); - - rel = table_open(SubscriptionRelationId, RowExclusiveLock); - subid = get_subscription_oid(subname, false); - - ReplicationOriginNameForLogicalRep(subid, InvalidOid, originname, sizeof(originname)); - - /* Lock to prevent the replication origin from vanishing */ - LockRelationOid(ReplicationOriginRelationId, RowExclusiveLock); - node = replorigin_by_name(originname, false); - - /* - * The server will be stopped after setting up the objects in the new - * cluster and the origins will be flushed during the shutdown checkpoint. - * This will ensure that the latest LSN values for origin will be - * available after the upgrade. - */ - replorigin_advance(node, remote_commit, InvalidXLogRecPtr, - false /* backward */ , - false /* WAL log */ ); + CreateConflictDetectionSlot(); - UnlockRelationOid(ReplicationOriginRelationId, RowExclusiveLock); - table_close(rel, RowExclusiveLock); + ReplicationSlotRelease(); PG_RETURN_VOID(); } /* - * binary_upgrade_create_conflict_detection_slot + * binary_upgrade_create_replication_origin * - * Create a replication slot to retain information necessary for conflict - * detection such as dead tuples, commit timestamps, and origins. + * Create a replication origin with a specific OID and name, optionally + * restoring its remote_lsn. Used by pg_upgrade to preserve replication + * origin OIDs across the upgrade. */ Datum -binary_upgrade_create_conflict_detection_slot(PG_FUNCTION_ARGS) +binary_upgrade_create_replication_origin(PG_FUNCTION_ARGS) { + Oid node_oid; + ReplOriginId node; + Relation rel; + char *originname; + XLogRecPtr remote_lsn = InvalidXLogRecPtr; + CHECK_IS_BINARY_UPGRADE; - CreateConflictDetectionSlot(); + if (PG_ARGISNULL(0) || PG_ARGISNULL(1)) + elog(ERROR, + "null argument to binary_upgrade_create_replication_origin is not allowed"); - ReplicationSlotRelease(); + node_oid = PG_GETARG_OID(0); + + if (node_oid == InvalidOid || node_oid >= DoNotReplicateId) + ereport(ERROR, + errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("replication origin ID %u is out of range", node_oid)); + + node = (ReplOriginId) node_oid; + + if (SearchSysCacheExists1(REPLORIGIDENT, ObjectIdGetDatum(node))) + ereport(ERROR, + errcode(ERRCODE_DUPLICATE_OBJECT), + errmsg("replication origin with ID %u already exists", + (Oid) node)); + + originname = text_to_cstring(PG_GETARG_TEXT_PP(1)); + + /* + * To avoid needing a TOAST table for pg_replication_origin, we limit + * replication origin names to 512 bytes. This should be more than enough + * for all practical use. + */ + if (strlen(originname) > MAX_RONAME_LEN) + ereport(ERROR, + errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED), + errmsg("replication origin name is too long"), + errdetail("Replication origin names must be no longer than %d bytes.", + MAX_RONAME_LEN)); + + if (!PG_ARGISNULL(2)) + remote_lsn = PG_GETARG_LSN(2); + + Assert(IsTransactionState()); + + /* Acquire an exclusive lock before inserting the new origin. */ + rel = table_open(ReplicationOriginRelationId, ExclusiveLock); + + replorigin_create_with_id(node, originname, remote_lsn, rel); + + table_close(rel, ExclusiveLock); PG_RETURN_VOID(); } diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c index ea22b0604c1..e157214b926 100644 --- a/src/bin/pg_dump/pg_dump.c +++ b/src/bin/pg_dump/pg_dump.c @@ -5179,7 +5179,6 @@ getSubscriptions(Archive *fout) int i_subwalrcvtimeout; int i_subpublications; int i_suborigin; - int i_suboriginremotelsn; int i_subenabled; int i_subfailover; int i_subretaindeadtuples; @@ -5248,11 +5247,9 @@ getSubscriptions(Archive *fout) LOGICALREP_ORIGIN_ANY); if (dopt->binary_upgrade && fout->remoteVersion >= 170000) - appendPQExpBufferStr(query, " o.remote_lsn AS suboriginremotelsn,\n" - " s.subenabled,\n"); + appendPQExpBufferStr(query, " s.subenabled,\n"); else - appendPQExpBufferStr(query, " NULL AS suboriginremotelsn,\n" - " false AS subenabled,\n"); + appendPQExpBufferStr(query, " false AS subenabled,\n"); if (fout->remoteVersion >= 170000) appendPQExpBufferStr(query, @@ -5332,7 +5329,6 @@ getSubscriptions(Archive *fout) i_subwalrcvtimeout = PQfnumber(res, "subwalrcvtimeout"); i_subpublications = PQfnumber(res, "subpublications"); i_suborigin = PQfnumber(res, "suborigin"); - i_suboriginremotelsn = PQfnumber(res, "suboriginremotelsn"); subinfo = pg_malloc_array(SubscriptionInfo, ntups); @@ -5385,11 +5381,6 @@ getSubscriptions(Archive *fout) subinfo[i].subpublications = pg_strdup(PQgetvalue(res, i, i_subpublications)); subinfo[i].suborigin = pg_strdup(PQgetvalue(res, i, i_suborigin)); - if (PQgetisnull(res, i, i_suboriginremotelsn)) - subinfo[i].suboriginremotelsn = NULL; - else - subinfo[i].suboriginremotelsn = - pg_strdup(PQgetvalue(res, i, i_suboriginremotelsn)); /* Decide whether we want to dump it */ selectDumpableObject(&(subinfo[i].dobj), fout); @@ -5668,37 +5659,15 @@ dumpSubscription(Archive *fout, const SubscriptionInfo *subinfo) * In binary-upgrade mode, we allow the replication to continue after the * upgrade. */ - if (dopt->binary_upgrade && fout->remoteVersion >= 170000) + if (dopt->binary_upgrade && subinfo->subenabled && fout->remoteVersion >= 170000) { - if (subinfo->suboriginremotelsn) - { - /* - * Preserve the remote_lsn for the subscriber's replication - * origin. This value is required to start the replication from - * the position before the upgrade. This value will be stale if - * the publisher gets upgraded before the subscriber node. - * However, this shouldn't be a problem as the upgrade of the - * publisher ensures that all the transactions were replicated - * before upgrading it. - */ - appendPQExpBufferStr(query, - "\n-- For binary upgrade, must preserve the remote_lsn for the subscriber's replication origin.\n"); - appendPQExpBufferStr(query, - "SELECT pg_catalog.binary_upgrade_replorigin_advance("); - appendStringLiteralAH(query, subinfo->dobj.name, fout); - appendPQExpBuffer(query, ", '%s');\n", subinfo->suboriginremotelsn); - } - - if (subinfo->subenabled) - { - /* - * Enable the subscription to allow the replication to continue - * after the upgrade. - */ - appendPQExpBufferStr(query, - "\n-- For binary upgrade, must preserve the subscriber's running state.\n"); - appendPQExpBuffer(query, "ALTER SUBSCRIPTION %s ENABLE;\n", qsubname); - } + /* + * Enable the subscription to allow the replication to continue + * after the upgrade. + */ + appendPQExpBufferStr(query, + "\n-- For binary upgrade, must preserve the subscriber's running state.\n"); + appendPQExpBuffer(query, "ALTER SUBSCRIPTION %s ENABLE;\n", qsubname); } if (subinfo->dobj.dump & DUMP_COMPONENT_DEFINITION) diff --git a/src/bin/pg_dump/pg_dump.h b/src/bin/pg_dump/pg_dump.h index 5a6726d8b12..8f1252b854e 100644 --- a/src/bin/pg_dump/pg_dump.h +++ b/src/bin/pg_dump/pg_dump.h @@ -729,7 +729,6 @@ typedef struct _SubscriptionInfo char *subwalrcvtimeout; char *subpublications; char *suborigin; - char *suboriginremotelsn; } SubscriptionInfo; /* diff --git a/src/bin/pg_dump/pg_dumpall.c b/src/bin/pg_dump/pg_dumpall.c index b9653f0aefe..711fd66b3f8 100644 --- a/src/bin/pg_dump/pg_dumpall.c +++ b/src/bin/pg_dump/pg_dumpall.c @@ -18,6 +18,7 @@ #include #include +#include "access/xlogdefs.h" #include "catalog/pg_authid_d.h" #include "common/connect.h" #include "common/file_perm.h" @@ -67,6 +68,7 @@ static void dropDBs(PGconn *conn); static void dumpUserConfig(PGconn *conn, const char *username); static void dumpDatabases(PGconn *conn); static void dumpTimestamp(const char *msg); +static void dumpReplicationOrigins(PGconn *conn); static int runPgDump(const char *dbname, const char *create_opts); static void buildShSecLabels(PGconn *conn, const char *catalog_name, Oid objectId, @@ -663,6 +665,13 @@ main(int argc, char *argv[]) /* Dump role GUC privileges */ if (server_version >= 150000 && !skip_acls) dumpRoleGUCPrivs(conn); + + } + + if (!tablespaces_only && !roles_only && binary_upgrade) + { + /* Dump replication origins */ + dumpReplicationOrigins(conn); } /* Dump tablespaces */ @@ -1832,6 +1841,64 @@ dumpTimestamp(const char *msg) fprintf(OPF, "-- %s %s\n\n", msg, buf); } +static void +dumpReplicationOrigins(PGconn *conn) +{ + PQExpBuffer buf = createPQExpBuffer(); + PGresult *res; + int i_roident; + int i_roname; + int i_remotelsn; + + /* Get replication origins from catalogs */ + appendPQExpBufferStr(buf, + "SELECT o.*, os.remote_lsn " + "FROM pg_catalog.pg_replication_origin o " + "LEFT OUTER JOIN pg_catalog.pg_replication_origin_status os ON o.roident = os.local_id "); + + res = executeQuery(conn, buf->data); + + i_roident = PQfnumber(res, "roident"); + i_roname = PQfnumber(res, "roname"); + i_remotelsn = PQfnumber(res, "remote_lsn"); + + if (PQntuples(res) > 0) + fprintf(OPF, "--\n-- Replication Origins \n--\n\n"); + + for (int i = 0; i < PQntuples(res); i++) + { + ReplOriginId roident; + const char *roname; + + roident = atooid(PQgetvalue(res, i, i_roident)); + roname = PQgetvalue(res, i, i_roname); + + resetPQExpBuffer(buf); + + appendPQExpBufferStr(buf, "\n-- For binary upgrade, must preserve replication origin roident and remote_lsn\n"); + appendPQExpBuffer(buf, + "SELECT pg_catalog.binary_upgrade_create_replication_origin(" + "'%u'::pg_catalog.oid, ", roident); + appendStringLiteralConn(buf, roname, conn); + appendPQExpBufferStr(buf, "::pg_catalog.text"); + + if (!PQgetisnull(res, i, i_remotelsn)) + { + appendPQExpBufferStr(buf, ", "); + appendStringLiteralConn(buf, PQgetvalue(res, i, i_remotelsn), conn); + appendPQExpBufferStr(buf, "::pg_catalog.pg_lsn"); + } + else + appendPQExpBufferStr(buf, ", NULL"); + + appendPQExpBufferStr(buf, ");\n"); + fprintf(OPF, "%s", buf->data); + } + + PQclear(res); + destroyPQExpBuffer(buf); +} + /* * read_dumpall_filters - retrieve database identifier patterns from file * diff --git a/src/bin/pg_upgrade/check.c b/src/bin/pg_upgrade/check.c index f5c93e611d2..613f64e1e15 100644 --- a/src/bin/pg_upgrade/check.c +++ b/src/bin/pg_upgrade/check.c @@ -33,7 +33,7 @@ static void check_for_new_tablespace_dir(void); static void check_for_user_defined_encoding_conversions(ClusterInfo *cluster); static void check_for_unicode_update(ClusterInfo *cluster); static void check_new_cluster_replication_slots(void); -static void check_new_cluster_subscription_configuration(void); +static void check_new_cluster_replication_origins(void); static void check_old_cluster_for_valid_slots(void); static void check_old_cluster_subscription_state(void); static void check_old_cluster_global_names(ClusterInfo *cluster); @@ -651,6 +651,9 @@ check_and_dump_old_cluster(void) check_old_cluster_subscription_state(); } + /* Get replication origin information */ + get_replication_origin_info(&old_cluster); + check_for_data_types_usage(&old_cluster); /* @@ -797,7 +800,8 @@ check_new_cluster(void) check_new_cluster_replication_slots(); - check_new_cluster_subscription_configuration(); + check_new_cluster_replication_origins(); + } @@ -2303,31 +2307,45 @@ check_new_cluster_replication_slots(void) } /* - * check_new_cluster_subscription_configuration() + * check_new_cluster_replication_origins() + * + * Verify that the new cluster has no replication origins. During upgrade, + * pg_upgrade restores replication origins from the old cluster with their + * original OIDs. If the new cluster already contains origins, those OIDs + * may collide, causing the upgrade to fail mid-way. * - * Verify that the max_active_replication_origins configuration specified is - * enough for creating the subscriptions. This is required to create the - * replication origin for each subscription. + * Also verify that the max_active_replication_origins configuration is + * enough for creating all the replication origins. */ static void -check_new_cluster_subscription_configuration(void) +check_new_cluster_replication_origins(void) { - PGresult *res; PGconn *conn; + PGresult *res; + int norigins; int max_active_replication_origins; - /* Subscriptions and their dependencies can be migrated since PG17. */ - if (GET_MAJOR_VERSION(old_cluster.major_version) < 1700) + /* Quick return if there are no replication origins to migrate. */ + if (old_cluster.nrepl_origins == 0) return; - /* Quick return if there are no subscriptions to be migrated. */ - if (old_cluster.nsubs == 0) - return; - - prep_status("Checking new cluster configuration for subscriptions"); + prep_status("Checking replication origins in new cluster"); conn = connectToServer(&new_cluster, "template1"); + res = executeQueryOrDie(conn, + "SELECT count(*) " + "FROM pg_catalog.pg_replication_origin"); + + if (PQntuples(res) != 1) + pg_fatal("could not count the number of replication origins"); + + norigins = atoi(PQgetvalue(res, 0, 0)); + PQclear(res); + + if (norigins > 0) + pg_fatal("expected 0 replication origins but found %d", norigins); + res = executeQueryOrDie(conn, "SELECT setting FROM pg_settings " "WHERE name = 'max_active_replication_origins';"); @@ -2335,12 +2353,13 @@ check_new_cluster_subscription_configuration(void) pg_fatal("could not determine parameter settings on new cluster"); max_active_replication_origins = atoi(PQgetvalue(res, 0, 0)); - if (old_cluster.nsubs > max_active_replication_origins) + PQclear(res); + + if (old_cluster.nrepl_origins > max_active_replication_origins) pg_fatal("\"max_active_replication_origins\" (%d) must be greater than or equal to the number of " - "subscriptions (%d) on the old cluster", - max_active_replication_origins, old_cluster.nsubs); + "replication origins (%d) on the old cluster", + max_active_replication_origins, old_cluster.nrepl_origins); - PQclear(res); PQfinish(conn); check_ok(); diff --git a/src/bin/pg_upgrade/info.c b/src/bin/pg_upgrade/info.c index 37fff93892f..36d8570a9ef 100644 --- a/src/bin/pg_upgrade/info.c +++ b/src/bin/pg_upgrade/info.c @@ -832,6 +832,29 @@ count_old_cluster_logical_slots(void) return slot_count; } +/* + * get_replication_origin_info() + * + * Gets the information of replication origins in the cluster. + */ +void +get_replication_origin_info(ClusterInfo *cluster) +{ + PGconn *conn; + PGresult *res; + int i_norigins; + + conn = connectToServer(cluster, "template1"); + res = executeQueryOrDie(conn, "SELECT count(*) AS norigins " + "FROM pg_catalog.pg_replication_origin"); + i_norigins = PQfnumber(res, "norigins"); + + cluster->nrepl_origins = atoi(PQgetvalue(res, 0, i_norigins)); + PQclear(res); + + PQfinish(conn); +} + /* * get_subscription_info() * diff --git a/src/bin/pg_upgrade/pg_upgrade.h b/src/bin/pg_upgrade/pg_upgrade.h index ccd1ac0d013..1dec4cffbfb 100644 --- a/src/bin/pg_upgrade/pg_upgrade.h +++ b/src/bin/pg_upgrade/pg_upgrade.h @@ -311,6 +311,7 @@ typedef struct int num_tablespaces; const char *tablespace_suffix; /* directory specification */ int nsubs; /* number of subscriptions */ + int nrepl_origins; /* number of replication origins */ bool sub_retain_dead_tuples; /* whether a subscription enables * retain_dead_tuples. */ } ClusterInfo; @@ -451,6 +452,7 @@ FileNameMap *gen_db_file_maps(DbInfo *old_db, void get_db_rel_and_slot_infos(ClusterInfo *cluster); int count_old_cluster_logical_slots(void); void get_subscription_info(ClusterInfo *cluster); +void get_replication_origin_info(ClusterInfo *cluster); /* option.c */ diff --git a/src/bin/pg_upgrade/t/004_subscription.pl b/src/bin/pg_upgrade/t/004_subscription.pl index 646767f2a65..2e07277fea9 100644 --- a/src/bin/pg_upgrade/t/004_subscription.pl +++ b/src/bin/pg_upgrade/t/004_subscription.pl @@ -42,7 +42,7 @@ my $connstr = $publisher->connstr . ' dbname=postgres'; # ------------------------------------------------------ # Check that pg_upgrade fails when max_active_replication_origins configured -# in the new cluster is less than the number of subscriptions in the old +# in the new cluster is less than the number of replication origins in the old # cluster. # ------------------------------------------------------ # It is sufficient to use disabled subscription to test upgrade failure. @@ -74,7 +74,7 @@ command_checks_all( ], 1, [ - qr/"max_active_replication_origins" \(0\) must be greater than or equal to the number of subscriptions \(1\) on the old cluster/ + qr/"max_active_replication_origins" \(0\) must be greater than or equal to the number of replication origins \(1\) on the old cluster/ ], [qr//], 'run of pg_upgrade where the new cluster has insufficient max_active_replication_origins' @@ -301,8 +301,38 @@ is($result, qq(t), "Check that the table is in init state"); # Get the replication origin's remote_lsn of the old subscriber my $remote_lsn = $old_sub->safe_psql('postgres', - "SELECT remote_lsn FROM pg_replication_origin_status os, pg_subscription s WHERE os.external_id = 'pg_' || s.oid AND s.subname = 'regress_sub4'" + "SELECT os.remote_lsn + FROM pg_replication_origin_status os + JOIN pg_replication_origin o ON o.roident = os.local_id + JOIN pg_subscription s ON o.roname = 'pg_' || s.oid::text + WHERE s.subname = 'regress_sub4'" ); + +# Get the replication origin ids (roident) for all subscriptions, keyed by +# subscription name (which is stable across upgrade, unlike suboid). These +# must be preserved after upgrade. A mismatch would cause spurious +# update_origin_differs conflicts. +my %pre_upgrade_roident; +my $roident_rows = $old_sub->safe_psql('postgres', + "SELECT s.subname, o.roident + FROM pg_subscription s + JOIN pg_replication_origin o ON o.roname = 'pg_' || s.oid::text + ORDER BY s.subname" +); +for my $row (split /\n/, $roident_rows) +{ + my ($subname, $roident) = split /\|/, $row; + $pre_upgrade_roident{$subname} = $roident; +} + +# Create a user created replication origin, which should also be preserved after upgrade. +my $user_origin_name = 'regress_user_origin'; +$old_sub->safe_psql('postgres', + "SELECT pg_replication_origin_create('$user_origin_name')"); +$pre_upgrade_roident{$user_origin_name} = $old_sub->safe_psql('postgres', + "SELECT roident FROM pg_replication_origin WHERE roname = '$user_origin_name'" +); + # Have the subscription in disabled state before upgrade $old_sub->safe_psql('postgres', "ALTER SUBSCRIPTION regress_sub5 DISABLE"); @@ -378,6 +408,28 @@ regress_sub5|f|f|f), "check that the subscription's running status, failover, and retain_dead_tuples are preserved" ); +# Verify that the subscription related replication origins are preserved after upgrade. +my $post_roident_rows = $new_sub->safe_psql('postgres', + "SELECT s.subname, o.roident + FROM pg_subscription s + JOIN pg_replication_origin o ON o.roname = 'pg_' || s.oid::text + ORDER BY s.subname" +); +for my $row (split /\n/, $post_roident_rows) +{ + my ($subname, $roident) = split /\|/, $row; + is($roident, $pre_upgrade_roident{$subname}, + "roident preserved for subscription '$subname' after upgrade"); +} + +# Verify that user created replication origins are preserved after upgrade. +my $post_user_roident = $new_sub->safe_psql('postgres', + "SELECT roident FROM pg_replication_origin WHERE roname = '$user_origin_name'" +); +is($post_user_roident, $pre_upgrade_roident{$user_origin_name}, + "roident preserved for user-created origin '$user_origin_name' after upgrade" +); + # Subscription relations should be preserved $result = $new_sub->safe_psql('postgres', "SELECT srrelid, srsubstate FROM pg_subscription_rel ORDER BY srrelid"); diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat index 3a28406981d..21466d926f1 100644 --- a/src/include/catalog/pg_proc.dat +++ b/src/include/catalog/pg_proc.dat @@ -11960,10 +11960,6 @@ provolatile => 'v', proparallel => 'u', prorettype => 'void', proargtypes => 'text oid char pg_lsn', prosrc => 'binary_upgrade_add_sub_rel_state' }, -{ oid => '6320', descr => 'for use by pg_upgrade (remote_lsn for origin)', - proname => 'binary_upgrade_replorigin_advance', proisstrict => 'f', - provolatile => 'v', proparallel => 'u', prorettype => 'void', - proargtypes => 'text pg_lsn', prosrc => 'binary_upgrade_replorigin_advance' }, { oid => '6505', descr => 'for use by pg_upgrade (conflict detection slot)', proname => 'binary_upgrade_create_conflict_detection_slot', proisstrict => 'f', provolatile => 'v', proparallel => 'u', @@ -11973,6 +11969,10 @@ proname => 'binary_upgrade_set_next_pg_subscription_oid', provolatile => 'v', proparallel => 'r', prorettype => 'void', proargtypes => 'oid', prosrc => 'binary_upgrade_set_next_pg_subscription_oid' }, +{ oid => '9161', descr => 'for use by pg_upgrade (replication origin)', + proname => 'binary_upgrade_create_replication_origin', proisstrict => 'f', + provolatile => 'v', proparallel => 'u', prorettype => 'void', + proargtypes => 'oid text pg_lsn', prosrc => 'binary_upgrade_create_replication_origin' }, # conversion functions { oid => '4310', descr => 'internal conversion function for KOI8R to WIN1251', diff --git a/src/include/replication/origin.h b/src/include/replication/origin.h index a69faf6eaaf..11ee630fb28 100644 --- a/src/include/replication/origin.h +++ b/src/include/replication/origin.h @@ -14,6 +14,7 @@ #include "access/xlogdefs.h" #include "access/xlogreader.h" #include "catalog/pg_replication_origin.h" +#include "utils/relcache.h" typedef struct xl_replorigin_set { @@ -55,6 +56,8 @@ extern PGDLLIMPORT int max_active_replication_origins; /* API for querying & manipulating replication origins */ extern ReplOriginId replorigin_by_name(const char *roname, bool missing_ok); extern ReplOriginId replorigin_create(const char *roname); +extern void replorigin_create_with_id(ReplOriginId roident, const char *roname, + XLogRecPtr remote_lsn, Relation rel); extern void replorigin_drop_by_name(const char *name, bool missing_ok, bool nowait); extern bool replorigin_by_oid(ReplOriginId roident, bool missing_ok, char **roname); -- 2.47.3