Re: Support logical replication of DDLs

From: Peter Smith <smithpb2250(at)gmail(dot)com>
To: Zheng Li <zhengli10(at)gmail(dot)com>
Cc: Ajin Cherian <itsajin(at)gmail(dot)com>, vignesh C <vignesh21(at)gmail(dot)com>, Dilip Kumar <dilipbalaut(at)gmail(dot)com>, Alvaro Herrera <alvherre(at)alvh(dot)no-ip(dot)org>, "houzj(dot)fnst(at)fujitsu(dot)com" <houzj(dot)fnst(at)fujitsu(dot)com>, Amit Kapila <amit(dot)kapila16(at)gmail(dot)com>, Masahiko Sawada <sawada(dot)mshk(at)gmail(dot)com>, Japin Li <japinli(at)hotmail(dot)com>, rajesh singarapu <rajesh(dot)rs0541(at)gmail(dot)com>, PostgreSQL Hackers <pgsql-hackers(at)lists(dot)postgresql(dot)org>
Subject: Re: Support logical replication of DDLs
Date: 2022-11-14 23:57:08
Message-ID: CAHut+PsERMFwO8oK3LFH_3CRG+512T+ay_viWzrgNetbH2MwxA@mail.gmail.com
Lists: pgsql-general pgsql-hackers

Here are some review comments for v32-0002

======

1. Commit message

Comment says:
While creating a publication, we register a command end
trigger that deparses the DDL as a JSON blob, and WAL logs it. The event
trigger is automatically removed at the time of drop publication.

SUGGESTION (uppercase the SQL)
During CREATE PUBLICATION we register a command end trigger that
deparses the DDL as a JSON blob, and WAL logs it. The event
trigger is automatically removed at the time of DROP PUBLICATION.

~~~

2.

Comment says:
This is a POC patch to show how using event triggers and DDL deparsing
facilities we can implement DDL replication. So, the implementation is
restricted to CREATE TABLE/ALTER TABLE/DROP TABLE commands.

~

Is this still correct, or has the comment gone stale?

~~~

3.

Comment says:
Note that the replication for ALTER INDEX command is still under
progress.

~

Is this still correct, or has the comment gone stale?

======

4. GENERAL - Patch order.

Somehow, I feel this v32-0002 patch and the v32-0001 patch should be
swapped. IIUC this one is the "core" framework for the DDL message
replication, whereas 0001 is more like just the implementation of all
the different supported *kinds* of DDL JSON blobs. So this patch seems
like the mandatory one, and the other one can evolve as more kinds of
JSON get supported.

~~~

5. GENERAL - naming

The DDL suffix 'msg' or 'message' seemed sometimes unnecessary because
there is no ambiguity that this is a message for DDL replication, so
the shorter name conveys the same amount of information, doesn't it?

e.g. Maybe reconsider some of these ones (probably there are others)...

src/include/replication/decode.h
logicalddlmsg_decode -> Why not call this function logicalddl_decode?

src/include/replication/logicalproto.h:
LOGICAL_REP_MSG_DDLMESSAGE -> Why not call it 'LOGICAL_REP_MSG_DDL'?
logicalrep_write_ddlmessage -> Why not call this function logicalrep_write_ddl?
logicalrep_read_ddlmessage -> Why not call this function logicalrep_read_ddl?

src/include/replication/output_plugin.h:
'ddlmessage_cb' -> Why not call it 'ddl_cb'?
'stream_ddlmessage_cb' -> Why not call it 'stream_ddl_cb'?

src/include/replication/reorderbuffer.h:
'REORDER_BUFFER_CHANGE_DDLMESSAGE' -> Why not call it 'REORDER_BUFFER_CHANGE_DDL'?
'ddlmsg' -> Why not call it 'ddl'?
'ddlmessage' -> Why not call it 'ddl'?
'stream_ddlmessage' -> Why not call it 'stream_ddl'?

======

src/backend/access/rmgrdesc/Makefile

6.

@@ -19,6 +19,7 @@ OBJS = \
hashdesc.o \
heapdesc.o \
logicalmsgdesc.o \
+ logicalddlmsgdesc.o \

The new entry should be in alphabetical order (i.e. before logicalmsgdesc.o).

======

src/backend/access/rmgrdesc/logicalddlmsgdesc.c

7. logicalddlmsg_identify

+const char *
+logicalddlmsg_identify(uint8 info)
+{
+ if ((info & ~XLR_INFO_MASK) == XLOG_LOGICAL_DDL_MESSAGE)
+ return "DDL MESSAGE";
+
+ return NULL;
+}

The logicalrep_message_type (see below) says "DDL", so maybe this
should also just say "DDL" instead of "DDL MESSAGE".

@@ -1218,6 +1264,8 @@ logicalrep_message_type(LogicalRepMsgType action)
return "TYPE";
case LOGICAL_REP_MSG_MESSAGE:
return "MESSAGE";
+ case LOGICAL_REP_MSG_DDLMESSAGE:
+ return "DDL";

======

src/backend/commands/event_trigger.c

8. start/end

+/*
+ * publication_deparse_ddl_command_start
+ *
+ * Deparse the ddl command and log it.
+ */
+Datum
+publication_deparse_ddl_command_start(PG_FUNCTION_ARGS)
...
+/*
+ * publication_deparse_ddl_command_end
+ *
+ * Deparse the ddl command and log it.
+ */
+Datum
+publication_deparse_ddl_command_end(PG_FUNCTION_ARGS)

The start/end function comments are the same -- there should be some
more explanation to say what they are for.

~~~

9. publication_deparse_ddl_command_start

+ char *command = psprintf("Drop table command start");

Huh? So this function is only for this specific case of DROP TABLE? If
correct, then I think that should be commented on or asserted
somewhere.

~

10.

+ /* extract the relid from the parse tree */
+ foreach(cell1, stmt->objects)

Uppercase comment

~

11.

+ if (relpersist == RELPERSISTENCE_TEMP)
+ {
+ table_close(relation, NoLock);
+ continue;
+ }
+
+ LogLogicalDDLMessage("deparse", address.objectId, DCT_TableDropStart,
+ command, strlen(command) + 1);
+
+ if (relation)
+ table_close(relation, NoLock);

This code looks overly complex. Can't it just be like below?

SUGGESTION

if (relpersist != RELPERSISTENCE_TEMP)
LogLogicalDDLMessage("deparse", address.objectId, DCT_TableDropStart,
command, strlen(command) + 1);

if (relation)
table_close(relation, NoLock);

~~~

12. publication_deparse_table_rewrite

+ if (relpersist == RELPERSISTENCE_TEMP)
+ return PointerGetDatum(NULL);
+
+ /* Deparse the DDL command and WAL log it to allow decoding of the same. */
+ json_string = deparse_utility_command(cmd, false);
+
+ if (json_string != NULL)
+ LogLogicalDDLMessage("deparse", cmd->d.alterTable.objectId, DCT_TableAlter,
+ json_string, strlen(json_string) + 1);
+
+ return PointerGetDatum(NULL);

Similar to the previous comment, I think this can be simplified so
there is only one return.

SUGGESTION

if (relpersist != RELPERSISTENCE_TEMP)
{
/* Deparse the DDL command and WAL log it to allow decoding of the same. */
json_string = deparse_utility_command(cmd, false);

if (json_string != NULL)
LogLogicalDDLMessage("deparse", cmd->d.alterTable.objectId, DCT_TableAlter,
json_string, strlen(json_string) + 1);
}

return PointerGetDatum(NULL);

~~~

13. publication_deparse_ddl_command_end

+ if (relpersist == RELPERSISTENCE_TEMP)
+ continue;
+
+ /*
+ * Deparse the DDL command and WAL log it to allow decoding of the
+ * same.
+ */
+ json_string = deparse_utility_command(cmd, false);
+
+ if (json_string == NULL)
+ continue;
+
+ LogLogicalDDLMessage("deparse", relid, type, json_string,
+ strlen(json_string) + 1);

Maybe this logic is simpler without all the continue?

SUGGESTION

if (relpersist != RELPERSISTENCE_TEMP)
{
/*
* Deparse the DDL command and WAL log it to allow decoding of the
* same.
*/
json_string = deparse_utility_command(cmd, false);

if (json_string != NULL)
LogLogicalDDLMessage("deparse", relid, type, json_string,
strlen(json_string) + 1);
}

~

14.

+ if (strcmp(obj->objecttype, "table") == 0)
+ cmdtype = DCT_TableDropEnd;
+ else if (strcmp(obj->objecttype, "sequence") == 0 ||
+ strcmp(obj->objecttype, "schema") == 0 ||
+ strcmp(obj->objecttype, "index") == 0 ||
+ strcmp(obj->objecttype, "function") == 0 ||
+ strcmp(obj->objecttype, "procedure") == 0 ||
+ strcmp(obj->objecttype, "operator") == 0 ||
+ strcmp(obj->objecttype, "operator class") == 0 ||
+ strcmp(obj->objecttype, "operator family") == 0 ||
+ strcmp(obj->objecttype, "cast") == 0 ||
+ strcmp(obj->objecttype, "type") == 0 ||
+ strcmp(obj->objecttype, "domain") == 0 ||
+ strcmp(obj->objecttype, "trigger") == 0 ||
+ strcmp(obj->objecttype, "conversion") == 0 ||
+ strcmp(obj->objecttype, "policy") == 0 ||
+ strcmp(obj->objecttype, "rule") == 0 ||
+ strcmp(obj->objecttype, "extension") == 0 ||
+ strcmp(obj->objecttype, "foreign-data wrapper") == 0 ||
+ strcmp(obj->objecttype, "text search configuration") == 0 ||
+ strcmp(obj->objecttype, "text search dictionary") == 0 ||
+ strcmp(obj->objecttype, "text search parser") == 0 ||
+ strcmp(obj->objecttype, "text search template") == 0 ||
+ strcmp(obj->objecttype, "transform") == 0 ||
+ strcmp(obj->objecttype, "server") == 0 ||
+ strcmp(obj->objecttype, "collation") == 0 ||
+ strcmp(obj->objecttype, "user mapping") == 0 ||
+ strcmp(obj->objecttype, "language") == 0 ||
+ strcmp(obj->objecttype, "view") == 0 ||
+ strcmp(obj->objecttype, "materialized view") == 0 ||
+ strcmp(obj->objecttype, "statistics object") == 0 ||
+ strcmp(obj->objecttype, "access method") == 0)
+ cmdtype = DCT_ObjectDrop;
+ else
+ continue;
+
+ /* Change foreign-data wrapper to foreign data wrapper */
+ if (strncmp(obj->objecttype, "foreign-data wrapper", 20) == 0)
+ {
+ tmptype = pstrdup("foreign data wrapper");
+ command = deparse_drop_command(obj->objidentity, tmptype,
+ stmt->behavior);
+ }
+
+ /* Change statistics object to statistics */
+ else if (strncmp(obj->objecttype, "statistics object",
+ strlen("statistics object")) == 0)
+ {
+ tmptype = pstrdup("statistics");
+ command = deparse_drop_command(obj->objidentity, tmptype,
+ stmt->behavior);
+ }
+
+ /*
+ * object identity needs to be modified to make the drop work
+ *
+ * FROM: <role> on server <servername> TO : for >role> server
+ * <servername>
+ *
+ */
+ else if (strncmp(obj->objecttype, "user mapping", 12) == 0)
+ {
+ char *on_server;
+
+ tmptype = palloc(strlen(obj->objidentity) + 2);
+ on_server = strstr(obj->objidentity, "on server");
+
+ sprintf((char *) tmptype, "for ");
+ strncat((char *) tmptype, obj->objidentity, on_server - obj->objidentity);
+ strcat((char *) tmptype, on_server + 3);
+ command = deparse_drop_command(tmptype, obj->objecttype,
+ stmt->behavior);
+ }
+ else
+ command = deparse_drop_command(obj->objidentity, obj->objecttype,
+ stmt->behavior);

14a.
Why are some of these implemented as strcmp and others are implemented
as strncmp?

~

14b.
The mass strcmp seems inefficient. The same could be done in other ways like:
- use a single strstr call (where all the possibilities are in one large string)
- pass string representation of some enum and just switch on it
- etc.
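
For example, one more option along these lines is a sorted lookup table searched with bsearch(). Below is only a rough standalone sketch (plain C, not PostgreSQL code). The table is abbreviated, and DropKind, drop_kinds and lookup_drop_kind are names I made up for illustration; the real code would presumably yield DCT_TableDropEnd / DCT_ObjectDrop instead.

```c
#include <stdlib.h>
#include <string.h>

/* Hypothetical stand-ins for the patch's DeparsedCommandType values. */
typedef enum DropKind
{
    DROP_KIND_NONE,             /* object type is not replicated */
    DROP_KIND_TABLE,            /* would map to DCT_TableDropEnd */
    DROP_KIND_OBJECT            /* would map to DCT_ObjectDrop */
} DropKind;

typedef struct DropKindEntry
{
    const char *objecttype;
    DropKind    kind;
} DropKindEntry;

/* Must stay sorted by objecttype (strcmp order) for bsearch. Abbreviated. */
static const DropKindEntry drop_kinds[] = {
    {"access method", DROP_KIND_OBJECT},
    {"cast", DROP_KIND_OBJECT},
    {"foreign-data wrapper", DROP_KIND_OBJECT},
    {"index", DROP_KIND_OBJECT},
    {"sequence", DROP_KIND_OBJECT},
    {"statistics object", DROP_KIND_OBJECT},
    {"table", DROP_KIND_TABLE},
    {"user mapping", DROP_KIND_OBJECT},
    {"view", DROP_KIND_OBJECT}
};

/* bsearch comparator: key is a C string, entry is a DropKindEntry. */
static int
drop_kind_cmp(const void *key, const void *entry)
{
    return strcmp((const char *) key,
                  ((const DropKindEntry *) entry)->objecttype);
}

/* Returns DROP_KIND_NONE for object types that are not in the table. */
DropKind
lookup_drop_kind(const char *objecttype)
{
    const DropKindEntry *found;

    found = bsearch(objecttype, drop_kinds,
                    sizeof(drop_kinds) / sizeof(drop_kinds[0]),
                    sizeof(drop_kinds[0]), drop_kind_cmp);

    return found ? found->kind : DROP_KIND_NONE;
}
```

A table like this also gives one obvious place to hang the "foreign-data wrapper" / "statistics object" renames, instead of a second chain of strncmp calls.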

~

15.

+ /*
+ * object identity needs to be modified to make the drop work
+ *
+ * FROM: <role> on server <servername> TO : for >role> server
+ * <servername>
+ *
+ */

The comment needs fixing.
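
As I read it, the intended rewrite is from "<role> on server <servername>" to "for <role> server <servername>". A standalone sketch of that transformation is below, just to pin down what the comment should say. It is not the patch's code: it uses malloc rather than palloc, the function name is made up, and like the patch it simply splits at the first " on server " it finds.

```c
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

/*
 * Rewrite "<role> on server <servername>" as "for <role> server <servername>".
 * Returns a malloc'd string, or NULL if the input lacks " on server "
 * (or if allocation fails). Hypothetical helper, for illustration only.
 */
char *
user_mapping_identity_for_drop(const char *identity)
{
    static const char sep[] = " on server ";
    const char *p = strstr(identity, sep);
    size_t      role_len;
    size_t      len;
    char       *result;

    if (p == NULL)
        return NULL;

    role_len = (size_t) (p - identity);

    /* "for " + role + " server " + servername + terminating '\0' */
    len = strlen("for ") + role_len + strlen(" server ") +
        strlen(p + strlen(sep)) + 1;

    result = malloc(len);
    if (result == NULL)
        return NULL;

    snprintf(result, len, "for %.*s server %s",
             (int) role_len, identity, p + strlen(sep));
    return result;
}
```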

~

16.

+ if (command == NULL)
+ continue;
+
+ LogLogicalDDLMessage("deparse", obj->address.objectId, cmdtype,
+ command, strlen(command) + 1);

SUGGESTION

if (command)
LogLogicalDDLMessage("deparse", obj->address.objectId, cmdtype,
command, strlen(command) + 1);

======

src/backend/commands/publicationcmds.c

17. CreateDDLReplicaEventTrigger

+ static const char *trigger_name_prefix = "pg_deparse_trig_%s_%u";
+ static const char *trigger_func_prefix = "publication_deparse_%s";

17a.
I felt the ddl deparse trigger name should have the name "ddl" in it somewhere

~

17b.
Why are these called "prefixes"? They look more like name format
strings to me.

~~~

18. CreatePublication

+ /*
+ * Create an event trigger to allow logging of DDL statements.
+ *
+ * TODO: We need to find a better syntax to allow replication of DDL
+ * statements.
+ *
+ * XXX: This code is just to show the replication of CREATE/ALTER/DROP
+ * TABLE works. We need to enhance this once the approach for DDL
+ * replication is finalized.
+ */
+ if (pubactions.pubddl)

This comment needs updating.

~

19.

+ CommandTag end_commands[] = {
+ CMDTAG_CREATE_ACCESS_METHOD,
+ CMDTAG_DROP_ACCESS_METHOD,
+ CMDTAG_ALTER_DEFAULT_PRIVILEGES,
+ CMDTAG_COMMENT,
+ CMDTAG_CREATE_LANGUAGE,
+ CMDTAG_ALTER_LANGUAGE,
+ CMDTAG_DROP_LANGUAGE,
+ CMDTAG_CREATE_VIEW,
+ CMDTAG_ALTER_VIEW,
+ CMDTAG_DROP_VIEW,
+ CMDTAG_CREATE_MATERIALIZED_VIEW,

19a.
Some better ordering (e.g. A-Z) can be done here, and maybe use blank
lines to make the groupings more obvious.

~

19b.
Wouldn't it be better to declare these static?

======

src/backend/replication/logical/Makefile

20.

OBJS = \
decode.o \
+ ddlmessage.o\
launcher.o \

The new entry should be in alphabetical order (i.e. before decode.o).

======

src/backend/replication/logical/ddlmessage.c

21. File Comment

+ * Unlike generic logical messages, these DDL messages have only transactional
+ * mode.Note by default DDLs in PostgreSQL are transactional.

Missing space before "Note"

~~~

22. LogLogicalDDLMessage

+ /*
+ * Ensure we have a valid transaction id.
+ */
+ Assert(IsTransactionState());
+ GetCurrentTransactionId();

Single line comment should be OK here

~

23.

+ /* trailing zero is critical; see logicalddlmsg_desc */

Uppercase comment

~

24.

+ /* allow origin filtering */

Uppercase comment

======

src/backend/replication/logical/proto.c

25. logicalrep_read_ddlmessage

+ uint8 flags;
+ char *msg;
+
+ //TODO double check when do we need to get TransactionId.
+
+ flags = pq_getmsgint(in, 1);
+ if (flags != 0)
+ elog(ERROR, "unrecognized flags %u in ddl message", flags);
+ *lsn = pq_getmsgint64(in);
+ *prefix = pq_getmsgstring(in);
+ *sz = pq_getmsgint(in, 4);
+ msg = (char *) pq_getmsgbytes(in, *sz);
+
+ return msg;

25a.
This code will fail if the associated *write* function has sent an xid.
Maybe an additional parameter is needed to tell it when to read the xid?

~

25b.
It would be tidier to have a blank line after the elog.

~~~

26. logicalrep_write_ddlmessage

+ /* transaction ID (if not valid, we're not streaming) */
+ if (TransactionIdIsValid(xid))
+ pq_sendint32(out, xid);

Perhaps this "write" function should *always* write the xid even if it
is invalid because then the "read" function will know to always read
it.
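
To make the idea concrete, here is a rough standalone sketch of a symmetric write/read pair where the xid field is unconditional. This is not PostgreSQL code: SketchBuf and the sketch_* functions are made-up stand-ins for StringInfo and the pq_* routines, only the xid and flags fields are shown, and there is no bounds checking.

```c
#include <stddef.h>
#include <stdint.h>

typedef uint32_t TransactionId;     /* PostgreSQL xids are 32 bits wide */
#define InvalidTransactionId ((TransactionId) 0)

/* Hypothetical stand-in for a StringInfo; no bounds checks, sketch only. */
typedef struct SketchBuf
{
    unsigned char data[64];
    size_t      len;            /* bytes written so far */
    size_t      cursor;         /* next byte to read */
} SketchBuf;

static void
sketch_send_u32(SketchBuf *buf, uint32_t v)
{
    /* Network byte order, like pq_sendint32 */
    buf->data[buf->len++] = (unsigned char) (v >> 24);
    buf->data[buf->len++] = (unsigned char) (v >> 16);
    buf->data[buf->len++] = (unsigned char) (v >> 8);
    buf->data[buf->len++] = (unsigned char) v;
}

static uint32_t
sketch_get_u32(SketchBuf *buf)
{
    uint32_t    v;

    v = (uint32_t) buf->data[buf->cursor] << 24;
    v |= (uint32_t) buf->data[buf->cursor + 1] << 16;
    v |= (uint32_t) buf->data[buf->cursor + 2] << 8;
    v |= (uint32_t) buf->data[buf->cursor + 3];
    buf->cursor += 4;
    return v;
}

/* Writer: the xid field is always sent, even when it is invalid. */
void
sketch_write_ddl_header(SketchBuf *out, TransactionId xid, uint8_t flags)
{
    sketch_send_u32(out, xid);
    out->data[out->len++] = flags;
}

/* Reader: always consumes the xid first, mirroring the writer. */
TransactionId
sketch_read_ddl_header(SketchBuf *in, uint8_t *flags)
{
    TransactionId xid = sketch_get_u32(in);

    *flags = in->data[in->cursor++];
    return xid;
}
```

The cost is 4 extra bytes per DDL message when not streaming, which seems negligible for DDL.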

======

src/backend/replication/logical/reorderbuffer.c

27. ReorderBufferQueueDDLMessage

+ Assert(xid != InvalidTransactionId);

SUGGESTION
Assert(TransactionIdIsValid(xid));

~~~

28. ReorderBufferSerializeChange

+ data += sizeof(int);
+ memcpy(data, change->data.ddlmsg.prefix,
+ prefix_size);
+ data += prefix_size;

Unnecessary wrapping of memcpy.

~

29.

+ memcpy(data, &change->data.ddlmsg.cmdtype, sizeof(int));
+ data += sizeof(int);

Would that be better to write as:

sizeof(DeparsedCommandType) instead of sizeof(int)
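
A small standalone sketch of what I mean is below. The enum here is only a stand-in (I listed the values quoted in this review; their order and any other members are assumptions), and the two helper functions are made up. The point is just that the serialize and restore sides both use the size of the field's own type, so they cannot drift apart if the type changes.

```c
#include <stddef.h>
#include <string.h>

/* Stand-in enum; the real definition lives in ddlmessage.h. */
typedef enum DeparsedCommandType
{
    DCT_TableDropStart,
    DCT_TableDropEnd,
    DCT_TableAlter,
    DCT_ObjectDrop
} DeparsedCommandType;

/* Copy cmdtype into the buffer; returns the advanced write pointer. */
char *
sketch_serialize_cmdtype(char *data, DeparsedCommandType cmdtype)
{
    memcpy(data, &cmdtype, sizeof(DeparsedCommandType));
    return data + sizeof(DeparsedCommandType);
}

/* Copy cmdtype back out; returns the advanced read pointer. */
const char *
sketch_restore_cmdtype(const char *data, DeparsedCommandType *cmdtype)
{
    memcpy(cmdtype, data, sizeof(DeparsedCommandType));
    return data + sizeof(DeparsedCommandType);
}
```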

~~~

30. ReorderBufferChangeSize

+ case REORDER_BUFFER_CHANGE_DDLMESSAGE:
+ {
+ Size prefix_size = strlen(change->data.ddlmsg.prefix) + 1;
+
+ sz += prefix_size + change->data.ddlmsg.message_size +
+ sizeof(Size) + sizeof(Size) + sizeof(Oid) + sizeof(int);

sizeof(DeparsedCommandType) instead of sizeof(int)

~~~

31. ReorderBufferRestoreChange

+ case REORDER_BUFFER_CHANGE_DDLMESSAGE:
+ {
+ Size prefix_size;
+
+ /* read prefix */
+ memcpy(&prefix_size, data, sizeof(Size));
+ data += sizeof(Size);
+ memcpy(&change->data.ddlmsg.relid, data, sizeof(Oid));
+ data += sizeof(Oid);
+ memcpy(&change->data.ddlmsg.cmdtype, data, sizeof(int));
+ data += sizeof(int);
+ change->data.ddlmsg.prefix = MemoryContextAlloc(rb->context, prefix_size);
+ memcpy(change->data.ddlmsg.prefix, data, prefix_size);
+ Assert(change->data.ddlmsg.prefix[prefix_size - 1] == '\0');
+ data += prefix_size;
+
+ /* read the message */
+ memcpy(&change->data.msg.message_size, data, sizeof(Size));
+ data += sizeof(Size);
+ change->data.msg.message = MemoryContextAlloc(rb->context,
+ change->data.msg.message_size);
+ memcpy(change->data.msg.message, data,
+ change->data.msg.message_size);
+ data += change->data.msg.message_size;

31a.
sizeof(DeparsedCommandType) better instead of sizeof(int)?

~

31b.
Uppercase the comments

======

src/backend/replication/logical/worker.c

32. preprocess_create_table

+/* Remove the data population from the command */
+static void
+preprocess_create_table(RawStmt *command)

The comment is too short. Needs more explanation than this.

~~~

33. handle_create_table

+/*
+ * Handle CREATE TABLE command
+ *
+ * Call AddSubscriptionRelState for CREATE TABEL command to set the relstate to
+ * SUBREL_STATE_READY so DML changes on this new table can be replicated without
+ * having to manually run "alter subscription ... refresh publication"
+ */

Typo "TABEL"

~~~

34. handle_create_table

+ switch (commandTag)
+ {
+ case CMDTAG_CREATE_TABLE:
+ {
+ CreateStmt *cstmt = (CreateStmt *) command->stmt;
+
+ rv = cstmt->relation;
+ }
+ break;
+ default:
+ break;
+ }
+
+ if (!rv)
+ return;

This switch seems overcomplicated since the function only cares about
CMDTAG_CREATE_TABLE.

SUGGESTION

if (commandTag == CMDTAG_CREATE_TABLE)
{
CreateStmt *cstmt = (CreateStmt *) command->stmt;
rv = cstmt->relation;
}
else
{
return;
}

~

35.

+ if (relnamespace != InvalidOid)
+ relid = get_relname_relid(relname, relnamespace);
+ else
+ relid = RelnameGetRelid(relname);
+
+ if (relid != InvalidOid)
+ {

35a.
Maybe better to use the OidIsValid() macro for these places

~

35b.
I'm not 100% sure of this logic. Is it even *possible* for these to be
InvalidOid -- e.g. I thought the CREATE TABLE would have failed
already if this was the case. Maybe these checks can be changed to
Asserts?

~~~

36. apply_handle_ddl

+
+static void
+apply_handle_ddl(StringInfo s)

Missing function comment

======

src/backend/replication/pgoutput/pgoutput.c

37. pgoutput_change

@@ -1377,9 +1386,22 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
ReorderBufferChangeType action = change->action;
TupleTableSlot *old_slot = NULL;
TupleTableSlot *new_slot = NULL;
+ bool table_rewrite = false;

update_replication_progress(ctx, false);

+ /*
+ * For heap rewrites, we might need to replicate them if the rewritten
+ * table publishes rewrite ddl message. So get the actual relation here
+ * and check the pubaction later.
+ */
+ if (relation->rd_rel->relrewrite)
+ {
+ table_rewrite = true;
+ relation = RelationIdGetRelation(relation->rd_rel->relrewrite);
+ targetrel = relation;
+ }
+
if (!is_publishable_relation(relation))
return;

@@ -1413,6 +1435,13 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
Assert(false);
}

+ /*
+ * We don't publish table rewrite change unless we publish the rewrite ddl
+ * message.
+ */
+ if (table_rewrite && !relentry->pubactions.pubddl)
+ return;
+

Something does not seem right. Other code later in this function takes
care to call RelationClose(relation), but in the above change, the
logic is just returning without closing anything.
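
One common shape for this is a single cleanup path, so that every exit releases whatever was opened at the top. The sketch below is standalone C with a made-up refcounted SketchRel type standing in for the relcache entry. It is not the pgoutput code, only an illustration of the early-exit-with-cleanup pattern.

```c
#include <stddef.h>

/* Hypothetical stand-in for a relcache entry with a reference count. */
typedef struct SketchRel
{
    int         refcount;
} SketchRel;

static SketchRel *
sketch_rel_open(SketchRel *rel)
{
    rel->refcount++;
    return rel;
}

static void
sketch_rel_close(SketchRel *rel)
{
    rel->refcount--;
}

/*
 * Returns 1 if the change would be published, 0 if it is skipped.
 * rewrite_target is NULL unless the change belongs to a table rewrite.
 */
int
sketch_publish_change(SketchRel *rewrite_target, int pubddl)
{
    SketchRel  *opened = NULL;
    int         published = 0;

    if (rewrite_target != NULL)
        opened = sketch_rel_open(rewrite_target);

    /* Skip rewrite changes unless DDL is published, but still clean up. */
    if (opened != NULL && !pubddl)
        goto cleanup;

    published = 1;

cleanup:
    if (opened != NULL)
        sketch_rel_close(opened);

    return published;
}
```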

~~~

38. pgoutput_message

@@ -1671,8 +1714,8 @@ pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,

static void
pgoutput_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
- XLogRecPtr message_lsn, bool transactional, const char *prefix, Size sz,
- const char *message)
+ XLogRecPtr message_lsn, bool transactional,
+ const char *prefix, Size sz, const char *message)
{

This change of wrapping seems unrelated, so it should not be done in this patch.

~~~

39. pgoutput_ddlmessage

+static void
+pgoutput_ddlmessage(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
+ XLogRecPtr message_lsn,
+ const char *prefix, Oid relid, DeparsedCommandType cmdtype,
+ Size sz, const char *message)

Missing function comment.

~

40.

+ switch (cmdtype)

40a.
Might be tidier to have a consistent space *before* each case of this switch.

~

40b.
I felt it was too confusing having some of the switch cases break and
some of them return from the function -- e.g. it seems difficult to
know what conditions will execute the code that follows the switch.
Maybe all this needs to be refactored somehow, or just commented on
more.

======

src/bin/pg_dump/pg_dump.c

41. getPublications

- if (fout->remoteVersion >= 130000)
+ if (fout->remoteVersion >= 150000)

Should be >= 160000, right?

~

42.

else if (fout->remoteVersion >= 110000)
appendPQExpBufferStr(query,
"SELECT p.tableoid, p.oid, p.pubname, "
"p.pubowner, "
- "p.puballtables, p.pubinsert, p.pubupdate, p.pubdelete, p.pubtruncate, false AS pubviaroot "
+ "p.puballtables, p.pubinsert, p.pubupdate, p.pubdelete, p.pubtruncate, false as p.pubddl, false AS pubviaroot "
"FROM pg_publication p");
else
appendPQExpBufferStr(query,
"SELECT p.tableoid, p.oid, p.pubname, "
"p.pubowner, "
- "p.puballtables, p.pubinsert, p.pubupdate, p.pubdelete, false AS pubtruncate, false AS pubviaroot "
+ "p.puballtables, p.pubinsert, p.pubupdate, p.pubdelete, false AS pubtruncate, false as p.pubddl, false AS pubviaroot "
"FROM pg_publication p");

Use uppercase 'AS' for consistency with other code.

======

src/bin/pg_dump/pg_dump.h

43. PublicationInfo

@@ -620,6 +620,7 @@ typedef struct _PublicationInfo
bool pubdelete;
bool pubtruncate;
bool pubviaroot;
+ bool pubddl;
} PublicationInfo;

IMO the new member should be adjacent to the other 'publish' parameter
values like pubdelete/pubtruncate.

======

src/bin/psql/describe.c

44. listPublications

+ if (pset.sversion >= 140000)
+ appendPQExpBuffer(&buf,
+ ",\n pubddl AS \"%s\"",
+ gettext_noop("DDLs"));

44a.
Should that be 160000?

~

44b.
IMO it would be better if the "DDLs" column appeared adjacent to the
other 'publish' parameter option values. (e.g. these are not even the
same column ordering as pg_dump).

~~~

45. describePublications

has_pubtruncate = (pset.sversion >= 110000);
has_pubviaroot = (pset.sversion >= 130000);
+ has_pubddl = (pset.sversion >= 150000);

Shouldn't that be 160000?

~

46.

@@ -6313,6 +6319,9 @@ describePublications(const char *pattern)
if (has_pubviaroot)
appendPQExpBufferStr(&buf,
", pubviaroot");
+ if (has_pubddl)
+ appendPQExpBufferStr(&buf,
+ ", pubddl");

IMO it would be better if the "DDLs" column appeared adjacent to the
other 'publish' parameter option values. (e.g. these are not even the
same column ordering as pg_dump).

======

src/include/catalog/pg_proc.dat

47.

+{ oid => '4644', descr => 'trigger for ddl command deparse',
+ proname => 'publication_deparse_ddl_command_end', prorettype => 'event_trigger',
+ proargtypes => '', prosrc => 'publication_deparse_ddl_command_end' },

Why doesn't the description say 'end'?

======

src/include/catalog/pg_publication.h

48. FormData_pg_publication

+
+ /* true if table creations are published */
+ bool pubddl;
} FormData_pg_publication;

Why just table creations? I thought it was for EVERYTHING.

~~~

49. PublicationActions

+ bool pubddl;
} PublicationActions;

This might be OK for POC, but for the real feature, I think this
should be more fine-grained than this all-or-nothing DDL.

======

src/include/replication/ddlmessage.h

50.

+{
+ Oid dbId; /* database Oid emitted from */
+ Size prefix_size; /* length of prefix */
+ Oid relid; /* id of the table */
+ DeparsedCommandType cmdtype; /* type of sql command */
+ Size message_size; /* size of the message */
+
+ /*
+ * payload, including null-terminated prefix of length prefix_size
+ */
+ char message[FLEXIBLE_ARRAY_MEMBER];
+} xl_logical_ddl_message;

50a.
The prefix_size comment needs to say /* length of the prefix
(including '\0' terminator) */

~

50b.
'relid' seems specific to TABLE DDL. Will future versions have many
more Oid members here? Or should this be a union member or a generic
name like 'objid'?

~~~

51. XLOG_LOGICAL_DDL_MESSAGE

+/* RMGR API*/
+#define XLOG_LOGICAL_DDL_MESSAGE 0x00

0x00 is same value as XLOG_LOGICAL_MESSAGE in message.h. That doesn't
seem correct because then how will those different messages be
identified?

======

src/include/replication/logicalproto.h

52. LogicalRepMsgType

@@ -61,6 +61,7 @@ typedef enum LogicalRepMsgType
LOGICAL_REP_MSG_RELATION = 'R',
LOGICAL_REP_MSG_TYPE = 'Y',
LOGICAL_REP_MSG_MESSAGE = 'M',
+ LOGICAL_REP_MSG_DDLMESSAGE = 'L',
LOGICAL_REP_MSG_BEGIN_PREPARE = 'b',

The name already includes _MSG_ so why say MESSAGE again? IMO this
should be called just LOGICAL_REP_MSG_DDL. See general comment.

~~~

53.

extern void logicalrep_write_message(StringInfo out, TransactionId xid, XLogRecPtr lsn,
- bool transactional, const char *prefix, Size sz, const char *message);
+ bool transactional, const char *prefix,
+ Size sz, const char *message);

Modifying the wrapping of this unrelated function should not be done
in this patch.

======

src/include/replication/reorderbuffer.h

54. REORDER_BUFFER_CHANGE_DDLMESSAGE

@@ -56,6 +58,7 @@ typedef enum ReorderBufferChangeType
REORDER_BUFFER_CHANGE_INSERT,
REORDER_BUFFER_CHANGE_UPDATE,
REORDER_BUFFER_CHANGE_DELETE,
+ REORDER_BUFFER_CHANGE_DDLMESSAGE,

Why not call it REORDER_BUFFER_CHANGE_DDL? -- see general review comment

~~~

55. ReorderBufferChange

+ /* DDL Message. */
+ struct
+ {
+ char *prefix;
+ Size message_size;
+ char *message;
+ Oid relid;
+ DeparsedCommandType cmdtype;
+ } ddlmsg;
+

Why not call it ddl? -- see general review comment

======

src/test/regress/expected/psql.out

56.

\dRp "no.such.publication"
- List of publications
- Name | Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root
-------+-------+------------+---------+---------+---------+-----------+----------
+ List of publications
+ Name | Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root | DDLs
+------+-------+------------+---------+---------+---------+-----------+----------+------
(0 rows)

I wondered if "DDLs" belongs adjacent to the
Inserts/Updates/Deletes/Truncates because those are the other "publish"
parameters just like this.

======

src/test/regress/expected/publication.out

57.

(Ditto comment for psql.out)

I wondered if "DDLs" belongs adjacent to the
Inserts/Updates/Deletes/Truncates because those are the other "publish"
parameters just like this.

~~~

58.

It looks like there is a missing regress test case where you actually
set publish='ddl' and then verify that the DDLs column is correctly
set to 't'.

======

59. MISC - typedefs.list

Some typedefs.list changes are missing from this patch. At least the
following:
- DeparsedCommandType (from ddlmessage.h)
- xl_logical_ddl_message (from ddlmessage.h)
- LogicalDecodeDDLMessageCB (from output_plugin.h)
- LogicalDecodeStreamDDLMessageCB (from output_plugin.h)
- ReorderBufferDDLMessageCB (from reorderbuffer.h)
- ReorderBufferStreamDDLMessageCB (from reorderbuffer.h)

------

Kind Regards,
Peter Smith.
Fujitsu Australia
