diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index 73b068dd31c..267b898dca1 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -113,18 +113,18 @@ typedef struct PublicationRelKind } PublicationRelKind; static List *fetch_relation_list(WalReceiverConn *wrconn, List *publications); -static void check_publications_origin_sequences(WalReceiverConn *wrconn, - List *publications, - char *origin, - Oid *subrel_local_oids, - int subrel_count, - char *subname); static void check_publications_origin_tables(WalReceiverConn *wrconn, List *publications, bool copydata, bool retain_dead_tuples, char *origin, Oid *subrel_local_oids, int subrel_count, char *subname); +static void check_publications_origin_sequences(WalReceiverConn* wrconn, + List *publications, + char *origin, + Oid *subrel_local_oids, + int subrel_count, + char *subname); static void check_pub_dead_tuple_retention(WalReceiverConn *wrconn); static void check_duplicates_in_publist(List *publist, Datum *datums); static List *merge_publications(List *oldpublist, List *newpublist, bool addpub, const char *subname); @@ -979,9 +979,8 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data, else subrel_local_oids[tbl_count++] = relstate->relid; } - qsort(subrel_local_oids, tbl_count, - sizeof(Oid), oid_cmp); + qsort(subrel_local_oids, tbl_count, sizeof(Oid), oid_cmp); check_publications_origin_tables(wrconn, sub->publications, copy_data, sub->retaindeadtuples, sub->origin, subrel_local_oids, tbl_count, @@ -2461,106 +2460,6 @@ AlterSubscriptionOwner_oid(Oid subid, Oid newOwnerId) table_close(rel, RowExclusiveLock); } -static void -check_publications_origin_sequences(WalReceiverConn *wrconn, List *publications, - char *origin, Oid *subrel_local_oids, - int subrel_count, char *subname) -{ - WalRcvExecResult *res; - StringInfoData cmd; - TupleTableSlot *slot; - int i; - Oid tableRow[1] = {TEXTOID}; - List *publist = NIL; - - /* Enable sequence synchronization checks only when origin is 'none' */ - if (!origin || pg_strcasecmp(origin, LOGICALREP_ORIGIN_NONE) != 0) - return; - - initStringInfo(&cmd); - appendStringInfoString(&cmd, - "SELECT DISTINCT P.pubname AS pubname\n" - "FROM pg_publication P,\n" - " LATERAL pg_get_publication_sequences(P.pubname) GPR\n" - " JOIN pg_subscription_rel PS ON (GPR.relid = PS.srrelid),\n" - " pg_class C JOIN pg_namespace N ON (N.oid = C.relnamespace)\n" - "WHERE C.oid = GPR.relid " - " AND P.pubname IN ("); - - GetPublicationsStr(publications, &cmd, true); - appendStringInfoString(&cmd, ")\n"); - - /* - * In case of ALTER SUBSCRIPTION ... REFRESH PUBLICATION, - * subrel_local_oids contains the list of relations that are already - * present on the subscriber. This check should be skipped as these will - * not be re-synced. - */ - for (i = 0; i < subrel_count; i++) - { - Oid relid = subrel_local_oids[i]; - char *schemaname = get_namespace_name(get_rel_namespace(relid)); - char *tablename = get_rel_name(relid); - - appendStringInfo(&cmd, "AND NOT (N.nspname = '%s' AND C.relname = '%s')\n", - schemaname, tablename); - } - - res = walrcv_exec(wrconn, cmd.data, 1, tableRow); - pfree(cmd.data); - - if (res->status != WALRCV_OK_TUPLES) - ereport(ERROR, - (errcode(ERRCODE_CONNECTION_FAILURE), - errmsg("could not receive list of replicated relations from the publisher: %s", - res->err))); - - /* Process relations. */ - slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple); - while (tuplestore_gettupleslot(res->tuplestore, true, false, slot)) - { - char *pubname; - bool isnull; - - pubname = TextDatumGetCString(slot_getattr(slot, 1, &isnull)); - Assert(!isnull); - - ExecClearTuple(slot); - publist = list_append_unique(publist, makeString(pubname)); - } - - /* - * Log a warning if the publisher has subscribed to the same sequence from - * some other publisher. We cannot know the origin of sequences data during - * the initial sync. - */ - if (publist) - { - StringInfo pubnames = makeStringInfo(); - StringInfo err_msg = makeStringInfo(); - StringInfo err_hint = makeStringInfo(); - - /* Prepare the list of publication(s) for warning message. */ - GetPublicationsStr(publist, pubnames, false); - - appendStringInfo(err_msg, _("subscription \"%s\" requested copy_data with origin = NONE but might copy data that had a different origin"), - subname); - appendStringInfoString(err_hint, _("Verify that initial data copied from the publisher sequences did not come from other origins.")); - - ereport(WARNING, - errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), - errmsg_internal("%s", err_msg->data), - errdetail_plural("The subscription subscribes to a publication (%s) that contains sequences that are written to by other subscriptions.", - "The subscription subscribes to publications (%s) that contain sequences that are written to by other subscriptions.", - list_length(publist), pubnames->data), - errhint_internal("%s", err_hint->data)); - } - - ExecDropSingleTupleTableSlot(slot); - - walrcv_clear_result(res); -} - /* * Check and log a warning if the publisher has subscribed to the same table, * its partition ancestors (if it's a partition), or its partition children (if @@ -2726,6 +2625,116 @@ check_publications_origin_tables(WalReceiverConn *wrconn, List *publications, walrcv_clear_result(res); } +/* + * This function is similar to check_publications_origin_tables and serves + * same purpose for sequences. + * + * In addition to the checks where check_publications_origin_tables is used, + * this function is also used for ALTER SUBSCRIPTION ... REFRESH SEQUENCES. + */ +static void +check_publications_origin_sequences(WalReceiverConn* wrconn, List* publications, + char *origin, Oid *subrel_local_oids, + int subrel_count, char *subname) +{ + WalRcvExecResult *res; + StringInfoData cmd; + TupleTableSlot *slot; + int i; + Oid tableRow[1] = { TEXTOID }; + List *publist = NIL; + + /* + * Enable sequence synchronization checks only when origin is 'none', to + * ensure that sequence data from other origins is not inadvertently + * copied. + */ + if (!origin || pg_strcasecmp(origin, LOGICALREP_ORIGIN_NONE) != 0) + return; + + initStringInfo(&cmd); + appendStringInfoString(&cmd, + "SELECT DISTINCT P.pubname AS pubname\n" + "FROM pg_publication P,\n" + " LATERAL pg_get_publication_sequences(P.pubname) GPR\n" + " JOIN pg_subscription_rel PS ON (GPR.relid = PS.srrelid),\n" + " pg_class C JOIN pg_namespace N ON (N.oid = C.relnamespace)\n" + "WHERE C.oid = GPR.relid AND P.pubname IN ("); + + GetPublicationsStr(publications, &cmd, true); + appendStringInfoString(&cmd, ")\n"); + + /* + * In case of ALTER SUBSCRIPTION ... REFRESH PUBLICATION, + * subrel_local_oids contains the list of relations that are already + * present on the subscriber. This check should be skipped as these will + * not be re-synced. + */ + for (i = 0; i < subrel_count; i++) + { + Oid relid = subrel_local_oids[i]; + char *schemaname = get_namespace_name(get_rel_namespace(relid)); + char *seqname = get_rel_name(relid); + + appendStringInfo(&cmd, "AND NOT (N.nspname = '%s' AND C.relname = '%s')\n", + schemaname, seqname); + } + + res = walrcv_exec(wrconn, cmd.data, 1, tableRow); + pfree(cmd.data); + + if (res->status != WALRCV_OK_TUPLES) + ereport(ERROR, + (errcode(ERRCODE_CONNECTION_FAILURE), + errmsg("could not receive list of replicated relations from the publisher: %s", + res->err))); + + /* Process relations. */ + slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple); + while (tuplestore_gettupleslot(res->tuplestore, true, false, slot)) + { + char* pubname; + bool isnull; + + pubname = TextDatumGetCString(slot_getattr(slot, 1, &isnull)); + Assert(!isnull); + + ExecClearTuple(slot); + publist = list_append_unique(publist, makeString(pubname)); + } + + /* + * Log a warning if the publisher has subscribed to the same sequence from + * some other publisher. We cannot know the origin of sequences data during + * the initial sync. + */ + if (publist) + { + StringInfo pubnames = makeStringInfo(); + StringInfo err_msg = makeStringInfo(); + StringInfo err_hint = makeStringInfo(); + + /* Prepare the list of publication(s) for warning message. */ + GetPublicationsStr(publist, pubnames, false); + + appendStringInfo(err_msg, _("subscription \"%s\" requested copy_data with origin = NONE but might copy data that had a different origin"), + subname); + appendStringInfoString(err_hint, _("Verify that initial data copied from the publisher sequences did not come from other origins.")); + + ereport(WARNING, + errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg_internal("%s", err_msg->data), + errdetail_plural("The subscription subscribes to a publication (%s) that contains sequences that are written to by other subscriptions.", + "The subscription subscribes to publications (%s) that contain sequences that are written to by other subscriptions.", + list_length(publist), pubnames->data), + errhint_internal("%s", err_hint->data)); + } + + ExecDropSingleTupleTableSlot(slot); + + walrcv_clear_result(res); +} + /* * Determine whether the retain_dead_tuples can be enabled based on the * publisher's status.