From 3ea2e0027bdeab5d6877ac7c18c28e12c7406b76 Mon Sep 17 00:00:00 2001 From: Petr Jelinek Date: Thu, 19 Jan 2017 00:59:01 +0100 Subject: [PATCH 6/6] Add RENAME support for PUBLICATIONs and SUBSCRIPTIONs --- src/backend/commands/alter.c | 6 ++++ src/backend/commands/publicationcmds.c | 50 ++++++++++++++++++++++++++++++ src/backend/commands/subscriptioncmds.c | 50 ++++++++++++++++++++++++++++++ src/backend/parser/gram.y | 18 +++++++++++ src/backend/replication/logical/worker.c | 16 +++++++++- src/include/commands/publicationcmds.h | 2 ++ src/include/commands/subscriptioncmds.h | 2 ++ src/test/regress/expected/publication.out | 10 +++++- src/test/regress/expected/subscription.out | 10 +++++- src/test/regress/sql/publication.sql | 6 +++- src/test/regress/sql/subscription.sql | 6 +++- src/test/subscription/t/001_rep_changes.pl | 11 ++++++- 12 files changed, 181 insertions(+), 6 deletions(-) diff --git a/src/backend/commands/alter.c b/src/backend/commands/alter.c index 768fcc8..1a4154c 100644 --- a/src/backend/commands/alter.c +++ b/src/backend/commands/alter.c @@ -351,6 +351,12 @@ ExecRenameStmt(RenameStmt *stmt) case OBJECT_TYPE: return RenameType(stmt); + case OBJECT_PUBLICATION: + return RenamePublication(stmt); + + case OBJECT_SUBSCRIPTION: + return RenameSubscription(stmt); + case OBJECT_AGGREGATE: case OBJECT_COLLATION: case OBJECT_CONVERSION: diff --git a/src/backend/commands/publicationcmds.c b/src/backend/commands/publicationcmds.c index 21e523d..8bc4e02 100644 --- a/src/backend/commands/publicationcmds.c +++ b/src/backend/commands/publicationcmds.c @@ -752,3 +752,53 @@ AlterPublicationOwner_oid(Oid subid, Oid newOwnerId) heap_close(rel, RowExclusiveLock); } + +/* + * Rename the publication. + */ +ObjectAddress +RenamePublication(RenameStmt *stmt) +{ + Oid pubid; + HeapTuple tup; + Relation rel; + ObjectAddress address; + + rel = heap_open(PublicationRelationId, RowExclusiveLock); + + tup = SearchSysCacheCopy1(PUBLICATIONNAME, + CStringGetDatum(stmt->subname)); + + if (!HeapTupleIsValid(tup)) + ereport(ERROR, + (errcode(ERRCODE_UNDEFINED_OBJECT), + errmsg("publication \"%s\" does not exist", stmt->subname))); + + /* make sure the new name doesn't exist */ + if (OidIsValid(get_publication_oid(stmt->newname, true))) + ereport(ERROR, + (errcode(ERRCODE_DUPLICATE_SCHEMA), + errmsg("publication \"%s\" already exists", stmt->newname))); + + pubid = HeapTupleGetOid(tup); + + /* Must be owner. */ + if (!pg_publication_ownercheck(pubid, GetUserId())) + aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_PUBLICATION, + stmt->subname); + + /* rename */ + namestrcpy(&(((Form_pg_publication) GETSTRUCT(tup))->pubname), + stmt->newname); + simple_heap_update(rel, &tup->t_self, tup); + CatalogUpdateIndexes(rel, tup); + + InvokeObjectPostAlterHook(PublicationRelationId, pubid, 0); + + ObjectAddressSet(address, PublicationRelationId, pubid); + + heap_close(rel, NoLock); + heap_freetuple(tup); + + return address; +} diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index 1448ee3..56b254e 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -641,3 +641,53 @@ AlterSubscriptionOwner_oid(Oid subid, Oid newOwnerId) heap_close(rel, RowExclusiveLock); } + +/* + * Rename the subscription. + */ +ObjectAddress +RenameSubscription(RenameStmt *stmt) +{ + Oid subid; + HeapTuple tup; + Relation rel; + ObjectAddress address; + + rel = heap_open(SubscriptionRelationId, RowExclusiveLock); + + tup = SearchSysCacheCopy2(SUBSCRIPTIONNAME, MyDatabaseId, + CStringGetDatum(stmt->subname)); + + if (!HeapTupleIsValid(tup)) + ereport(ERROR, + (errcode(ERRCODE_UNDEFINED_OBJECT), + errmsg("subscription \"%s\" does not exist", stmt->subname))); + + /* make sure the new name doesn't exist */ + if (OidIsValid(get_subscription_oid(stmt->newname, true))) + ereport(ERROR, + (errcode(ERRCODE_DUPLICATE_SCHEMA), + errmsg("subscription \"%s\" already exists", stmt->newname))); + + subid = HeapTupleGetOid(tup); + + /* Must be owner. */ + if (!pg_subscription_ownercheck(subid, GetUserId())) + aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_SUBSCRIPTION, + stmt->subname); + + /* rename */ + namestrcpy(&(((Form_pg_subscription) GETSTRUCT(tup))->subname), + stmt->newname); + simple_heap_update(rel, &tup->t_self, tup); + CatalogUpdateIndexes(rel, tup); + + InvokeObjectPostAlterHook(SubscriptionRelationId, subid, 0); + + ObjectAddressSet(address, SubscriptionRelationId, subid); + + heap_close(rel, NoLock); + heap_freetuple(tup); + + return address; +} diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y index a8e35fe..712dfdd 100644 --- a/src/backend/parser/gram.y +++ b/src/backend/parser/gram.y @@ -8475,6 +8475,24 @@ RenameStmt: ALTER AGGREGATE aggregate_with_argtypes RENAME TO name n->missing_ok = false; $$ = (Node *)n; } + | ALTER PUBLICATION name RENAME TO name + { + RenameStmt *n = makeNode(RenameStmt); + n->renameType = OBJECT_PUBLICATION; + n->subname = $3; + n->newname = $6; + n->missing_ok = false; + $$ = (Node *)n; + } + | ALTER SUBSCRIPTION name RENAME TO name + { + RenameStmt *n = makeNode(RenameStmt); + n->renameType = OBJECT_SUBSCRIPTION; + n->subname = $3; + n->newname = $6; + n->missing_ok = false; + $$ = (Node *)n; + } ; opt_column: COLUMN { $$ = COLUMN; } diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index a9a1277..f0b9724 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -1252,6 +1252,21 @@ reread_subscription(void) } /* + * Exit if subscription name was changed (it's used for + * fallback_application_name). The launcher will start new worker. + */ + if (strcmp(newsub->name, MySubscription->name) != 0) + { + ereport(LOG, + (errmsg("logical replication worker for subscription \"%s\" will " + "restart because subscription was renamed", + MySubscription->name))); + + walrcv_disconnect(wrconn); + proc_exit(0); + } + + /* * Exit if the subscription was removed. * This normally should not happen as the worker gets killed * during DROP SUBSCRIPTION. @@ -1285,7 +1300,6 @@ reread_subscription(void) /* Check for other changes that should never happen too. */ if (newsub->dbid != MySubscription->dbid || - strcmp(newsub->name, MySubscription->name) != 0 || strcmp(newsub->slotname, MySubscription->slotname) != 0) { elog(ERROR, "subscription %u changed unexpectedly", diff --git a/src/include/commands/publicationcmds.h b/src/include/commands/publicationcmds.h index 2307cea..4999735 100644 --- a/src/include/commands/publicationcmds.h +++ b/src/include/commands/publicationcmds.h @@ -25,4 +25,6 @@ extern void RemovePublicationRelById(Oid proid); extern ObjectAddress AlterPublicationOwner(const char *name, Oid newOwnerId); extern void AlterPublicationOwner_oid(Oid pubid, Oid newOwnerId); +extern ObjectAddress RenamePublication(RenameStmt *stmt); + #endif /* PUBLICATIONCMDS_H */ diff --git a/src/include/commands/subscriptioncmds.h b/src/include/commands/subscriptioncmds.h index 1d8e2aa..0333d89 100644 --- a/src/include/commands/subscriptioncmds.h +++ b/src/include/commands/subscriptioncmds.h @@ -24,4 +24,6 @@ extern void DropSubscription(DropSubscriptionStmt *stmt); extern ObjectAddress AlterSubscriptionOwner(const char *name, Oid newOwnerId); extern void AlterSubscriptionOwner_oid(Oid subid, Oid newOwnerId); +extern ObjectAddress RenameSubscription(RenameStmt *stmt); + #endif /* SUBSCRIPTIONCMDS_H */ diff --git a/src/test/regress/expected/publication.out b/src/test/regress/expected/publication.out index 47b04f1..e7ccc0c 100644 --- a/src/test/regress/expected/publication.out +++ b/src/test/regress/expected/publication.out @@ -148,7 +148,15 @@ DROP TABLE testpub_tbl1; t | t | t (1 row) -DROP PUBLICATION testpub_default; +ALTER PUBLICATION testpub_default RENAME TO testpub_foo; +\dRp testpub_foo + List of publications + Name | Owner | Inserts | Updates | Deletes +-------------+--------------------------+---------+---------+--------- + testpub_foo | regress_publication_user | t | t | t +(1 row) + +DROP PUBLICATION testpub_foo; DROP PUBLICATION testpib_ins_trunct; DROP SCHEMA pub_test CASCADE; NOTICE: drop cascades to table pub_test.testpub_nopk diff --git a/src/test/regress/expected/subscription.out b/src/test/regress/expected/subscription.out index 2ccec98..cb1ab4e 100644 --- a/src/test/regress/expected/subscription.out +++ b/src/test/regress/expected/subscription.out @@ -61,6 +61,14 @@ ALTER SUBSCRIPTION testsub DISABLE; (1 row) COMMIT; -DROP SUBSCRIPTION testsub NODROP SLOT; +ALTER SUBSCRIPTION testsub RENAME TO testsub_foo; +\dRs + List of subscriptions + Name | Owner | Enabled | Publication +-------------+---------------------------+---------+-------------------- + testsub_foo | regress_subscription_user | f | {testpub,testpub1} +(1 row) + +DROP SUBSCRIPTION testsub_foo NODROP SLOT; RESET SESSION AUTHORIZATION; DROP ROLE regress_subscription_user; diff --git a/src/test/regress/sql/publication.sql b/src/test/regress/sql/publication.sql index 89a3167..7e15f05 100644 --- a/src/test/regress/sql/publication.sql +++ b/src/test/regress/sql/publication.sql @@ -73,7 +73,11 @@ DROP TABLE testpub_tbl1; \dRp+ testpub_default -DROP PUBLICATION testpub_default; +ALTER PUBLICATION testpub_default RENAME TO testpub_foo; + +\dRp testpub_foo + +DROP PUBLICATION testpub_foo; DROP PUBLICATION testpib_ins_trunct; DROP SCHEMA pub_test CASCADE; diff --git a/src/test/regress/sql/subscription.sql b/src/test/regress/sql/subscription.sql index 68c17d5..fce6069 100644 --- a/src/test/regress/sql/subscription.sql +++ b/src/test/regress/sql/subscription.sql @@ -38,7 +38,11 @@ ALTER SUBSCRIPTION testsub DISABLE; COMMIT; -DROP SUBSCRIPTION testsub NODROP SLOT; +ALTER SUBSCRIPTION testsub RENAME TO testsub_foo; + +\dRs + +DROP SUBSCRIPTION testsub_foo NODROP SLOT; RESET SESSION AUTHORIZATION; DROP ROLE regress_subscription_user; diff --git a/src/test/subscription/t/001_rep_changes.pl b/src/test/subscription/t/001_rep_changes.pl index b51740b..a9c4b01 100644 --- a/src/test/subscription/t/001_rep_changes.pl +++ b/src/test/subscription/t/001_rep_changes.pl @@ -169,8 +169,17 @@ $result = $node_subscriber->safe_psql('postgres', "SELECT count(*), min(a), max(a) FROM tab_full"); is($result, qq(11|0|100), 'check replicated insert after alter publication'); +# check restart on rename +$oldpid = $node_publisher->safe_psql('postgres', + "SELECT pid FROM pg_stat_replication WHERE application_name = '$appname';"); +$node_subscriber->safe_psql('postgres', + "ALTER SUBSCRIPTION tap_sub RENAME TO tap_sub_renamed"); +$node_publisher->poll_query_until('postgres', + "SELECT pid != $oldpid FROM pg_stat_replication WHERE application_name = '$appname';") + or die "Timed out while waiting for apply to restart"; + # check all the cleanup -$node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub"); +$node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub_renamed"); $result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM pg_subscription"); -- 2.7.4