From 9716e097747ca9797bada35de5ace3c078857115 Mon Sep 17 00:00:00 2001 From: "houzj.fnst" Date: Wed, 22 Sep 2021 19:08:07 +0800 Subject: [PATCH] schema refactor --- src/backend/commands/publicationcmds.c | 276 +++++++++++++--------------------- 1 file changed, 110 insertions(+), 166 deletions(-) diff --git a/src/backend/commands/publicationcmds.c b/src/backend/commands/publicationcmds.c index 68be61a..e6af432 100644 --- a/src/backend/commands/publicationcmds.c +++ b/src/backend/commands/publicationcmds.c @@ -513,20 +513,79 @@ InvalidatePublicationRels(List *relids) * Add or remove table to/from publication. */ static void -AlterPublicationTables(AlterPublicationStmt *stmt, HeapTuple tup, List *rels, - List *delrels) +AlterPublicationTables(AlterPublicationStmt *stmt, HeapTuple tup, + List *tables, List *schemaidlist) { + List *rels = NIL; Form_pg_publication pubform = (Form_pg_publication) GETSTRUCT(tup); Oid pubid = pubform->oid; - Assert(list_length(rels) > 0); + if (!tables && stmt->action != DEFELEM_SET) + return; + + /* Check that user is allowed to manipulate the publication tables. */ + if (tables && pubform->puballtables) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("publication \"%s\" is defined as FOR ALL TABLES", + NameStr(pubform->pubname)), + errdetail("Tables cannot be added to or dropped from FOR ALL TABLES publications."))); + + rels = OpenTableList(tables); if (stmt->action == DEFELEM_ADD) + { + List *schemas = NIL; + + /* + * Check whether any relation is a member of a schema that already exists + * in the publication or of a schema in the specified schema list. 
+ */ + schemas = list_concat_copy(schemaidlist, GetPublicationSchemas(pubid)); + CheckObjSchemaNotAlreadyInPublication(rels, schemas, + PUBLICATIONOBJ_TABLE); PublicationAddTables(pubid, rels, false, stmt); + } else if (stmt->action == DEFELEM_DROP) PublicationDropTables(pubid, rels, false); else /* DEFELEM_SET */ { + List *oldrelids = GetPublicationRelations(pubid, + PUBLICATION_PART_ROOT); + List *delrels = NIL; + ListCell *oldlc; + + CheckObjSchemaNotAlreadyInPublication(rels, schemaidlist, + PUBLICATIONOBJ_TABLE); + + /* Calculate which relations to drop. */ + foreach(oldlc, oldrelids) + { + Oid oldrelid = lfirst_oid(oldlc); + ListCell *newlc; + bool found = false; + + foreach(newlc, rels) + { + Relation newrel = (Relation) lfirst(newlc); + + if (RelationGetRelid(newrel) == oldrelid) + { + found = true; + break; + } + } + /* Not in the new list: open it and queue it in delrels for removal */ + if (!found) + { + Relation oldrel = table_open(oldrelid, + ShareUpdateExclusiveLock); + + delrels = lappend(delrels, oldrel); + } + } + + /* And drop them. */ PublicationDropTables(pubid, delrels, true); /* @@ -534,64 +593,72 @@ AlterPublicationTables(AlterPublicationStmt *stmt, HeapTuple tup, List *rels, * skip existing ones when doing catalog update. */ PublicationAddTables(pubid, rels, true, stmt); + + CloseTableList(delrels); } + + CloseTableList(rels); } /* - * Get the relations that should be deleted for the publication. + * Alter the publication schemas. + * + * Add/Remove/Set all tables from schemas to/from publication. 
+ */ static void -GetAlterPublicationDelRelations(DefElemAction action, Oid pubid, List *rels, - List **delrels) +AlterPublicationSchemas(AlterPublicationStmt *stmt, + HeapTuple tup, List *schemaidlist) { - List *oldrelids; - ListCell *oldlc; + Form_pg_publication pubform = (Form_pg_publication) GETSTRUCT(tup); - if (action != DEFELEM_SET) + if (!schemaidlist && stmt->action != DEFELEM_SET) return; - oldrelids = GetPublicationRelations(pubid, PUBLICATION_PART_ROOT); + /* Check that user is allowed to manipulate the publication tables */ + if (schemaidlist && pubform->puballtables) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("publication \"%s\" is defined as FOR ALL TABLES", + NameStr(pubform->pubname)), + errdetail("Tables from schema cannot be added to, dropped from, or set on FOR ALL TABLES publications."))); + + if (schemaidlist && (stmt->action == DEFELEM_ADD || stmt->action == DEFELEM_SET) && + !superuser()) + ereport(ERROR, + (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), + errmsg("must be superuser to add or set schemas"))); - /* Calculate which relations to drop. */ - foreach(oldlc, oldrelids) + /* + * Lock the schemas and hold the locks until the publication is altered, to + * prevent concurrent schema deletion. There is no need to unlock them: the + * locks are released automatically at the end of the ALTER PUBLICATION command. + */ + LockSchemaList(schemaidlist); + if (stmt->action == DEFELEM_ADD) { - Oid oldrelid = lfirst_oid(oldlc); - ListCell *newlc; - bool found = false; + List *rels; + List *reloids; - foreach(newlc, rels) - { - Relation newrel = (Relation) lfirst(newlc); + reloids = GetPublicationRelations(pubform->oid, PUBLICATION_PART_ROOT); + rels = OpenReliIdList(reloids); - if (RelationGetRelid(newrel) == oldrelid) - { - found = true; - break; - } - } - /* Not yet in the list, open it and add to the list */ - if (!found) - *delrels = lappend_oid(*delrels, oldrelid); - } -} - -/* - * Alter the publication schemas. 
- * - * Add/Remove/Set all tables from schemas to/from publication. - */ -static void -AlterPublicationSchemas(AlterPublicationStmt *stmt, HeapTuple tup, - List *schemaidlist, List *delschemas) -{ - Form_pg_publication pubform = (Form_pg_publication) GETSTRUCT(tup); + CheckObjSchemaNotAlreadyInPublication(rels, schemaidlist, + PUBLICATIONOBJ_REL_IN_SCHEMA); - if (stmt->action == DEFELEM_ADD) + CloseTableList(rels); PublicationAddSchemas(pubform->oid, schemaidlist, false, stmt); + } else if (stmt->action == DEFELEM_DROP) PublicationDropSchemas(pubform->oid, schemaidlist, false); else /* DEFELEM_SET */ { + List *oldschemaids = GetPublicationSchemas(pubform->oid); + List *delschemas = NIL; + + /* Schemas currently in the publication but not in the new list get dropped */ + delschemas = list_difference_oid(oldschemaids, schemaidlist); + LockSchemaList(delschemas); + /* And drop them */ PublicationDropSchemas(pubform->oid, delschemas, true); @@ -606,74 +673,6 @@ AlterPublicationSchemas(AlterPublicationStmt *stmt, HeapTuple tup, } /* - * Check if relations can be in given publication and throws appropriate - * error if not. - */ -static void -CheckPublicationAlterTables(DefElemAction action, HeapTuple tup, - List *rels, List *schemaidlist) -{ - Form_pg_publication pubform = (Form_pg_publication) GETSTRUCT(tup); - - /* Check that user is allowed to manipulate the publication tables. */ - if (pubform->puballtables) - ereport(ERROR, - (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), - errmsg("publication \"%s\" is defined as FOR ALL TABLES", - NameStr(pubform->pubname)), - errdetail("Tables cannot be added to or dropped from FOR ALL TABLES publications."))); - - if (action == DEFELEM_ADD) - { - /* - * Check if the relation is member of the existing schema in the - * publication or member of the schema list specified. 
- */ - List *schemas = list_concat(schemaidlist, - GetPublicationSchemas(pubform->oid)); - - CheckObjSchemaNotAlreadyInPublication(rels, schemas, - PUBLICATIONOBJ_TABLE); - } - else if (action == DEFELEM_SET && list_length(schemaidlist)) - { - /* check if relation is member of the schema list specified */ - CheckObjSchemaNotAlreadyInPublication(rels, schemaidlist, - PUBLICATIONOBJ_TABLE); - } -} - -/* - * Check if schemas can be in given publication and throws appropriate - * error if not. - */ -static void -CheckPublicationAlterSchemas(DefElemAction action, HeapTuple tup, - List *schemaidlist, List *relations) -{ - Form_pg_publication pubform; - - pubform = (Form_pg_publication) GETSTRUCT(tup); - - /* Check that user is allowed to manipulate the publication tables */ - if (pubform->puballtables) - ereport(ERROR, - errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), - errmsg("publication \"%s\" is defined as FOR ALL TABLES", - NameStr(pubform->pubname)), - errdetail("Tables from schema cannot be added to, dropped from, or set on FOR ALL TABLES publications.")); - - if ((action == DEFELEM_ADD || action == DEFELEM_SET) && !superuser()) - ereport(ERROR, - (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), - errmsg("must be superuser to add or set schemas"))); - - if (action == DEFELEM_ADD) - CheckObjSchemaNotAlreadyInPublication(relations, schemaidlist, - PUBLICATIONOBJ_REL_IN_SCHEMA); -} - -/* * Alter the existing publication. 
* * This is dispatcher function for AlterPublicationOptions, @@ -718,63 +717,8 @@ AlterPublication(ParseState *pstate, AlterPublicationStmt *stmt) LockDatabaseObject(PublicationRelationId, pubform->oid, 0, AccessExclusiveLock); - if (list_length(relations)) - { - List *rels; - List *delschemas = NIL; - List *delrelids = NIL; - List *delrels = NIL; - - if (stmt->action == DEFELEM_SET && !list_length(schemaidlist)) - { - delschemas = GetPublicationSchemas(pubform->oid); - LockSchemaList(delschemas); - } - - rels = OpenTableList(relations); - GetAlterPublicationDelRelations(stmt->action, pubform->oid, rels, - &delrelids); - - CheckPublicationAlterTables(stmt->action, tup, rels, schemaidlist); - delrels = OpenReliIdList(delrelids); - - /* remove the existing schemas from the publication */ - PublicationDropSchemas(pubform->oid, delschemas, false); - - AlterPublicationTables(stmt, tup, rels, delrels); - CloseTableList(delrels); - CloseTableList(rels); - } - - if (list_length(schemaidlist)) - { - List *pubrelids = NIL; - List *pubrels = NIL; - List *delschemas = NIL; - - if (stmt->action == DEFELEM_ADD || stmt->action == DEFELEM_SET) - pubrelids = GetPublicationRelations(pubform->oid, - PUBLICATION_PART_ROOT); - - if (stmt->action == DEFELEM_SET) - { - List *oldschemaids = GetPublicationSchemas(pubform->oid); - - delschemas = list_difference_oid(oldschemaids, schemaidlist); - LockSchemaList(delschemas); - } - - LockSchemaList(schemaidlist); - pubrels = OpenReliIdList(pubrelids); - CheckPublicationAlterSchemas(stmt->action, tup, schemaidlist, - pubrels); - - if (stmt->action == DEFELEM_SET && !list_length(relations)) - PublicationDropTables(pubform->oid, pubrels, false); - - CloseTableList(pubrels); - AlterPublicationSchemas(stmt, tup, schemaidlist, delschemas); - } + AlterPublicationTables(stmt, tup, relations, schemaidlist); + AlterPublicationSchemas(stmt, tup, schemaidlist); } /* Cleanup. */ -- 2.7.2.windows.1