Re: Support logical replication of DDLs

From: vignesh C <vignesh21(at)gmail(dot)com>
To: Zheng Li <zhengli10(at)gmail(dot)com>
Cc: Ajin Cherian <itsajin(at)gmail(dot)com>, Dilip Kumar <dilipbalaut(at)gmail(dot)com>, Alvaro Herrera <alvherre(at)alvh(dot)no-ip(dot)org>, "houzj(dot)fnst(at)fujitsu(dot)com" <houzj(dot)fnst(at)fujitsu(dot)com>, Peter Smith <smithpb2250(at)gmail(dot)com>, Amit Kapila <amit(dot)kapila16(at)gmail(dot)com>, Masahiko Sawada <sawada(dot)mshk(at)gmail(dot)com>, Japin Li <japinli(at)hotmail(dot)com>, rajesh singarapu <rajesh(dot)rs0541(at)gmail(dot)com>, PostgreSQL Hackers <pgsql-hackers(at)lists(dot)postgresql(dot)org>
Subject: Re: Support logical replication of DDLs
Date: 2022-10-31 10:47:46
Message-ID: CALDaNm30PFSLoBjb_dewmRwbXSmytn1xt8yapjjmstHXGxgbRg@mail.gmail.com
Views: Raw Message | Whole Thread | Download mbox | Resend email
Thread:
Lists: pgsql-general pgsql-hackers

On Thu, 27 Oct 2022 at 16:02, vignesh C <vignesh21(at)gmail(dot)com> wrote:
>
> On Thu, 27 Oct 2022 at 02:09, Zheng Li <zhengli10(at)gmail(dot)com> wrote:
> >
> > > Adding support for deparsing of CREATE/ALTER/DROP LANGUAGE for ddl replication.
> >
> > Adding support for deparsing of:
> > COMMENT
> > ALTER DEFAULT PRIVILEGES
> > CREATE/DROP ACCESS METHOD
>
> Adding support for deparsing of:
> ALTER/DROP ROUTINE
>
> The patch also includes fixes for the following issues:

Few comments:
1) If the function specifies table without schema, should we include
the schema name too, else it will fail with "relation does not exist"
in subscription.
+ /* Add the function definition */
+ (void) SysCacheGetAttr(PROCOID, procTup,
Anum_pg_proc_prosqlbody, &isnull);
+ if (procForm->prolang == SQLlanguageId && !isnull)
+ {
+ StringInfoData buf;
+
+ initStringInfo(&buf);
+ print_function_sqlbody(&buf, procTup);
+
+ append_string_object(createFunc, "%{definition}s", buf.data);
+ }
ex:
CREATE PROCEDURE insert_data(a integer)
LANGUAGE SQL
AS $$
INSERT INTO tbl VALUES (a);
$$;

2) This function should handle "alter procedure" too:
+/*
+ * Deparse an AlterFunctionStmt (ALTER FUNCTION/ROUTINE)
+ *
+ * Given a function OID and the parse tree that created it, return the JSON
+ * blob representing the alter command.
+ */
+static ObjTree *
+deparse_AlterFunction(Oid objectId, Node *parsetree)
+{
+ AlterFunctionStmt *node = (AlterFunctionStmt *) parsetree;
+ ObjTree *alterFunc;
+ ObjTree *sign;
+ HeapTuple procTup;

Currently "alter procedure" statement are replicated as "alter
function" statements in the subscriber.

3) In few of the extensions we execute "alter operator family" like in
hstore extension, we should exclude replicating "alter operator
family" when create extension is in progress:
/* Don't deparse SQL commands generated while creating extension */
if (cmd->in_extension)
return NULL;

The above check should be included in the below code, else the create
extension statment will fail as internal statements will be executed:

+static ObjTree *
+deparse_AlterOpFamily(CollectedCommand *cmd)
+{
+ ObjTree *alterOpFam;
+ AlterOpFamilyStmt *stmt = (AlterOpFamilyStmt *) cmd->parsetree;
+ HeapTuple ftp;
+ Form_pg_opfamily opfForm;
+ List *list;
+ ListCell *cell;
+
+ ftp = SearchSysCache1(OPFAMILYOID,
+
ObjectIdGetDatum(cmd->d.opfam.address.objectId));
+ if (!HeapTupleIsValid(ftp))
+ elog(ERROR, "cache lookup failed for operator family %u",
+ cmd->d.opfam.address.objectId);
+ opfForm = (Form_pg_opfamily) GETSTRUCT(ftp);
+

4) This if...else can be removed, the nspid and typname can be handled
for others in default. *nspid can be set to InvalidOid at the
beginning.
+ if (type_oid == INTERVALOID ||
+ type_oid == TIMESTAMPOID ||
+ type_oid == TIMESTAMPTZOID ||
+ type_oid == TIMEOID ||
+ type_oid == TIMETZOID)
+ {
+ switch (type_oid)
+ {
+ case INTERVALOID:
+ *typname = pstrdup("INTERVAL");
+ break;
+ case TIMESTAMPTZOID:
+ if (typemod < 0)
+ *typname = pstrdup("TIMESTAMP
WITH TIME ZONE");
+ else
+ /* otherwise, WITH TZ is added
by typmod. */
+ *typname = pstrdup("TIMESTAMP");
+ break;
+ case TIMESTAMPOID:
+ *typname = pstrdup("TIMESTAMP");
+ break;
+ case TIMETZOID:
+ if (typemod < 0)
+ *typname = pstrdup("TIME WITH
TIME ZONE");
+ else
+ /* otherwise, WITH TZ is added
by typmod. */
+ *typname = pstrdup("TIME");
+ break;
+ case TIMEOID:
+ *typname = pstrdup("TIME");
+ break;
+ }
+ *nspid = InvalidOid;
+ }
+ else
+ {
+ /*
+ * No additional processing is required for other
types, so get the
+ * type name and schema directly from the catalog.
+ */
+ *nspid = typeform->typnamespace;
+ *typname = pstrdup(NameStr(typeform->typname));
+ }

5) The following includes are not required in ddl_deparse.c:
#include "catalog/pg_attribute.h"
#include "catalog/pg_class.h"
#include "lib/ilist.h"
#include "nodes/makefuncs.h"
#include "nodes/parsenodes.h"
#include "utils/memutils.h"

6) Inconsistent error reporting:
In few places elog is used and in few places ereport is used:
+ HeapTuple polTup = get_catalog_object_by_oid(polRel,
Anum_pg_policy_oid, policyOid);
+ Form_pg_policy polForm;
+
+ if (!HeapTupleIsValid(polTup))
+ elog(ERROR, "cache lookup failed for policy %u", policyOid);

+ char *rolename;
+
+ roltup = SearchSysCache1(AUTHOID, ObjectIdGetDatum(roleoid));
+ if (!HeapTupleIsValid(roltup))
+ ereport(ERROR,
+ (errcode(ERRCODE_UNDEFINED_OBJECT),
+ errmsg("role with OID %u does
not exist", roleoid)));

We can try to use the same style of error reporting.

7) There is few small indentation issue, we could run pg_ident:
--- a/src/backend/commands/collationcmds.c
+++ b/src/backend/commands/collationcmds.c
@@ -50,7 +50,8 @@ typedef struct
* CREATE COLLATION
*/
ObjectAddress
-DefineCollation(ParseState *pstate, List *names, List *parameters,
bool if_not_exists)
+DefineCollation(ParseState *pstate, List *names, List *parameters,
+ bool if_not_exists, ObjectAddress *from_collid)
{

8) Inclusion ordering in ddl_deparse.c:
8.a) The following should be slightly reordered
+#include "access/amapi.h"
+#include "access/table.h"
+#include "access/relation.h"

8.b) The following should be slightly reordered
+#include "postgres.h"
+#include "tcop/ddl_deparse.h"
+#include "access/amapi.h"

9) In few places multi line comment can be changed to single line comment:
9.a)
+ /*
+ * Fetch the pg_class tuple of the index relation
+ */

9.b)
+ /*
+ * Fetch the pg_am tuple of the index' access method
+ */

9.c)
+ /*
+ * Reject unsupported case right away.
+ */

10) This should also specify ROUTINE in the comment
/*
* Verbose syntax
*
* ALTER FUNCTION %{signature}s %{definition: }s
*/
alterFunc = new_objtree_VA(node->objtype == OBJECT_ROUTINE ?
"ALTER ROUTINE" : "ALTER FUNCTION", 0);

11) This can be changed in alphabetical order(collation first and then column):
11.a)
+ case OBJECT_COLUMN:
+ return "COLUMN";
+ case OBJECT_COLLATION:
+ return "COLLATION";
+ case OBJECT_CONVERSION:
+ return "CONVERSION";

11.b) similarly here:
case OBJECT_FDW:
return "FOREIGN DATA WRAPPER";
case OBJECT_FOREIGN_SERVER:
return "SERVER";
case OBJECT_FOREIGN_TABLE:
return "FOREIGN TABLE";

11.c) similarly here:
case OBJECT_FUNCTION:
return "FUNCTION";
case OBJECT_ROUTINE:
return "ROUTINE";
case OBJECT_INDEX:
return "INDEX";

11.d) similarly here:
case OBJECT_OPCLASS:
return "OPERATOR CLASS";
case OBJECT_OPERATOR:
return "OPERATOR";
case OBJECT_OPFAMILY:
return "OPERATOR FAMILY";

11.e) similarly here:
case OBJECT_TRIGGER:
return "TRIGGER";
case OBJECT_TSCONFIGURATION:
return "TEXT SEARCH CONFIGURATION";

/*
* case OBJECT_TSCONFIG_MAPPING:
* return "TEXT SEARCH CONFIGURATION MAPPING";
*/
case OBJECT_TSDICTIONARY:
return "TEXT SEARCH DICTIONARY";
case OBJECT_TSPARSER:
return "TEXT SEARCH PARSER";
case OBJECT_TSTEMPLATE:
return "TEXT SEARCH TEMPLATE";
case OBJECT_TYPE:
return "TYPE";

12) new_objtree can be used instead of new_objtree_VA when there is no
arguments, one additional check can be avoided

12.a) alterFunc = new_objtree_VA(node->objtype == OBJECT_ROUTINE ?
"ALTER ROUTINE" : "ALTER FUNCTION", 0);

12.b) ObjTree *tmpobj = new_objtree_VA("", 0);

12.c) tmpobj = new_objtree_VA(strVal(defel->arg), 0);

12.d) tmpobj = new_objtree_VA("ROWS", 0);

12.e) grantStmt = new_objtree_VA(fmt, 0);

12.f) tmp = new_objtree_VA("ALL PRIVILEGES", 0);

12.g) tmpobj2 = new_objtree_VA("FOR ORDER BY", 0);

12.h) composite = new_objtree_VA("CREATE TYPE", 0);

12.i) tmp = new_objtree_VA("OPTIONS", 0);

12.j) tmp = new_objtree_VA("NO HANDLER", 0);

12.k) .... similarly in few more places .....

13) In a few places we use RowExclusiveLock and in a few places we use
AccessShareLock, is this intentional?
+ ObjTree *tmp;
+
+ rel = table_open(ForeignDataWrapperRelationId, RowExclusiveLock);
+
+ fdwTup = SearchSysCache1(FOREIGNDATAWRAPPEROID,
+
ObjectIdGetDatum(objectId));

+ List *list = NIL;
+ ListCell *cell;
+
+ pg_extension = table_open(ExtensionRelationId, AccessShareLock);

There are similar instances elsewhere too.

14) Can the else statment be removed, since we are not appending anything?
+ if (typForm->typnotnull)
+ append_string_object(createDomain, "%{not_null}s", "NOT NULL");
+ else
+ append_string_object(createDomain, "%{not_null}s", "");

15) This might not be supported currently, this might be a dead code
as we will be throwing an error "CREATE EXTENSION ... FROM is no
longer supported"
+ else if (strcmp(opt->defname, "old_version") == 0)
+ {
+ tmp = new_objtree_VA("FROM %{version}L", 2,
+
"type", ObjTypeString, "from",
+
"version", ObjTypeString, defGetString(opt));
+ list = lappend(list, new_object_object(tmp));
+ }

Regards,
Vignesh

In response to

Responses

Browse pgsql-general by date

  From Date Subject
Next Message Gus Spier 2022-10-31 10:50:42 Off-topic? How to extract database statements from JPA?
Previous Message Peter Smith 2022-10-31 08:07:37 Re: Support logical replication of DDLs

Browse pgsql-hackers by date

  From Date Subject
Next Message Bharath Rupireddy 2022-10-31 10:51:06 Re: heavily contended lwlocks with long wait queues scale badly
Previous Message Amit Kapila 2022-10-31 10:44:06 Re: Perform streaming logical transactions by background workers and parallel apply