Re: Support logical replication of DDLs

From: Peter Smith <smithpb2250(at)gmail(dot)com>
To: "houzj(dot)fnst(at)fujitsu(dot)com" <houzj(dot)fnst(at)fujitsu(dot)com>
Cc: shveta malik <shveta(dot)malik(at)gmail(dot)com>, vignesh C <vignesh21(at)gmail(dot)com>, Amit Kapila <amit(dot)kapila16(at)gmail(dot)com>, Ajin Cherian <itsajin(at)gmail(dot)com>, "wangw(dot)fnst(at)fujitsu(dot)com" <wangw(dot)fnst(at)fujitsu(dot)com>, Runqi Tian <runqidev(at)gmail(dot)com>, Tom Lane <tgl(at)sss(dot)pgh(dot)pa(dot)us>, li jie <ggysxcq(at)gmail(dot)com>, Dilip Kumar <dilipbalaut(at)gmail(dot)com>, Alvaro Herrera <alvherre(at)alvh(dot)no-ip(dot)org>, Masahiko Sawada <sawada(dot)mshk(at)gmail(dot)com>, Japin Li <japinli(at)hotmail(dot)com>, rajesh singarapu <rajesh(dot)rs0541(at)gmail(dot)com>, PostgreSQL Hackers <pgsql-hackers(at)lists(dot)postgresql(dot)org>, Zheng Li <zhengli10(at)gmail(dot)com>
Subject: Re: Support logical replication of DDLs
Date: 2023-04-21 00:26:27
Message-ID: CAHut+PtOODRybaptKRKUWZnGw-PZuLF2BxaitnMSNeAiU8-yPg@mail.gmail.com
Lists: pgsql-general pgsql-hackers

Here are some more review comments for the patch 0002-2023_04_07-2

This was a WIP review in parts because the patch was quite large:

WIP part 1 [1] was posted 17/4.
WIP part 2 [2] was posted 17/4.
WIP part 3 [3] was posted 19/4.
WIP part 4 is this post. (This is my final WIP part for this 0002 patch)

======
contrib/test_decoding/sql/ddl.sql

1.
+SELECT 'ddl msg2' FROM pg_logical_emit_ddl_message('ddl msg2', 16394,
1, '{"fmt": "CREATE SCHEMA %{if_not_exists}s %{name}I
%{authorization}s", "name": "foo", "authorization": {"fmt":
"AUTHORIZATION %{authorization_role}I", "present": false,
"authorization_role": null}, "if_not_exists": ""}');

I wasn't entirely sure what these tests are showing. They seem to do
nothing but hardwire a bunch of random stuff and then print it out
again. Am I missing something?

And are those just bogus content payloads? Maybe they are valid JSON
but AFAICT nobody is using them. What is the advantage of using this
bogus payload data instead of just a simple string like "DDL message
content goes here"?

======
contrib/test_decoding/test_decoding.c

2. _PG_output_plugin_init

cb->message_cb = pg_decode_message;
+ cb->ddl_cb = pg_decode_ddl_message;
cb->filter_prepare_cb = pg_decode_filter_prepare;

Where is the 'stream_ddl_cb' to match this?

~~~

3. pg_decode_ddl_message

+static void
+pg_decode_ddl_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
+ XLogRecPtr message_lsn, const char *prefix, Oid relid,
+ DeparsedCommandType cmdtype, Size sz, const char *message)
+{
+ OutputPluginPrepareWrite(ctx, true);
+ appendStringInfo(ctx->out, "message: prefix: %s, relid: %u, ",
+ prefix, relid);

Should the appendStringInfo say "DDL message:" instead of "message"? I
can't tell if this was deliberate or a cut/paste error from the
existing code.

~~~

4. pg_decode_ddl_message

+ appendStringInfo(ctx->out, "sz: %zu content:", sz);
+ appendBinaryStringInfo(ctx->out, message, sz);
+ OutputPluginWrite(ctx, true);

4a.
Should there be a whitespace after that last 'content:' before the
binary content?

~

4b.
Is it necessary to say this is 'Binary'? I thought this payload was
only JSON text data.

======
src/backend/replication/logical/ddltrigger.c

5.
+/*-------------------------------------------------------------------------
+ *
+ * ddltrigger.c
+ * Logical DDL messages.
+ *
+ * Copyright (c) 2022, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ * src/backend/replication/logical/ddltrigger.c
+ *
+ * NOTES
+ *
+ * Deparse the ddl command and log it.
+ *
+ * ---------------------------------------------------------------------------
+ */

~

5a.
Just saying "Logical DDL messages" is the same header comment as in
the other new file ddlmessages.c, so it looks like a cut/paste issue.

~

5b.
Should say 2023.

~~~

6. publication_deparse_ddl_command_start

+/*
+ * Deparse the ddl command and log it prior to
+ * execution. Currently only used for DROP TABLE command
+ * so that catalog can be accessed before being deleted.
+ * This is to check if the table is part of the publication
+ * or not.
+ */
+Datum
+publication_deparse_ddl_command_start(PG_FUNCTION_ARGS)
+{
+ EventTriggerData *trigdata;
+ char *command = psprintf("Drop table command start");

Since information about this only being for DROP is hardcoded and in
the function comment, shouldn't this whole function be renamed to
something DROP-specific? e.g
publication_deparse_ddl_drop_command_start()

~~~

7. publication_deparse_ddl_command_start

+ if (relation)
+ table_close(relation, NoLock);

I thought this check was not needed because the relation was already
checked earlier in this function so it cannot be NULL here.
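
If that is correct, the close can be unconditional, e.g.

SUGGESTION (just a sketch, assuming 'relation' is guaranteed non-NULL here)
table_close(relation, NoLock);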

~~~

8. publication_deparse_table_rewrite

+ char relpersist;
+ CollectedCommand *cmd;
+ char *json_string;

The json_string can be declared later within the scope that uses it,
instead of here at the top.

~~~

9. publication_deparse_ddl_command_end

+ ListCell *lc;
+ slist_iter iter;
+ DeparsedCommandType type;
+ Oid relid;
+ char relkind;

9a.
Some of these variable declarations seem misplaced. I think the
'json_string' and the 'type' can be at a lower scope, can't they?

~

9b.
Also IMO it is better to call 'type' as 'cmdtype', so it has the same
name as the variable in the other slist_foreach loop.

~~~

10. publication_deparse_ddl_command_end

+ foreach(lc, currentEventTriggerState->commandList)
+ {
+ char relpersist = RELPERSISTENCE_PERMANENT;
+ CollectedCommand *cmd = lfirst(lc);
+ char *json_string;

This json_string can be declared later only in the scope that uses it.

~~~

11. publication_deparse_ddl_command_end

+ if (cmd->type == SCT_Simple &&
+ !OidIsValid(cmd->d.simple.address.objectId))
+ continue;
+
+ if (cmd->type == SCT_AlterTable)
+ {
+ relid = cmd->d.alterTable.objectId;
+ type = DCT_TableAlter;
+ }
+ else
+ {
+ /* Only SCT_Simple for now */
+ relid = cmd->d.simple.address.objectId;
+ type = DCT_SimpleCmd;
+ }

This code seemed structured a bit strangely to me; the comment /* Only
SCT_Simple for now */ appears to be expecting something that may not
be guaranteed. Perhaps the below-suggested code is closer to what was
intended?

SUGGESTION (should it be like this?)

if (cmd->type == SCT_AlterTable)
{
relid = cmd->d.alterTable.objectId;
type = DCT_TableAlter;
}
else
{
/* Only SCT_Simple for now */
if (cmd->type != SCT_Simple)
continue;

if (!OidIsValid(cmd->d.simple.address.objectId))
continue;
relid = cmd->d.simple.address.objectId;
type = DCT_SimpleCmd;
}

~~~

12. publication_deparse_ddl_command_end

+ slist_foreach(iter, &(currentEventTriggerState->SQLDropList))
+ {

I thought there should be some comment describing the purpose of this 2nd loop.

~~~

13. publication_deparse_ddl_command_end

+ return PointerGetDatum(NULL);
+}
+
+

Double blank lines.

~~~

14. publication_deparse_table_init_write

+ /*
+ * Do not generate wal log for commands whose target table is a temporary
+ * table.
+ *
+ * We will generate wal logs for unlogged tables so that unlogged tables
+ * can also be created and altered on the subscriber side. This makes it
+ * possible to directly replay the SET LOGGED command and the incoming
+ * rewrite message without creating a new table.
+ */
+ if (relpersist != RELPERSISTENCE_PERMANENT)
+ return PointerGetDatum(NULL);
+
+ /* Deparse the DDL command and WAL log it to allow decoding of the same. */
+ json_string = deparse_utility_command(cmd, false);
+
+ if (json_string != NULL)
+ LogLogicalDDLMessage("deparse", cmd->d.simple.address.objectId, DCT_SimpleCmd,
+ json_string, strlen(json_string) + 1);
+
+ return PointerGetDatum(NULL);

Some other code with the same logic to skip temporary tables is
written differently to this. e.g. see
publication_deparse_ddl_command_end, which looks like below:

+ if (relpersist == RELPERSISTENCE_PERMANENT)
+ {
+ /*
+ * Deparse the DDL command and WAL log it to allow decoding of the
+ * same.
+ */
+ json_string = deparse_utility_command(cmd, false);
+
+ if (json_string != NULL)
+ LogLogicalDDLMessage("deparse", relid, type, json_string,
+ strlen(json_string) + 1);
+ }

14a.
I thought this publication_deparse_table_init_write should be coded in
a similar way, instead of having 2x return PointerGetDatum(NULL);

~

14b.
Also, move the 'json_string' into this new scope (similar to the
previous review comments above)
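
Putting 14a and 14b together, the tail of the function could look
something like this (just a sketch, reusing the names from the patch):

SUGGESTION
if (relpersist == RELPERSISTENCE_PERMANENT)
{
char *json_string;

/*
* Deparse the DDL command and WAL log it to allow decoding of the
* same.
*/
json_string = deparse_utility_command(cmd, false);

if (json_string != NULL)
LogLogicalDDLMessage("deparse", cmd->d.simple.address.objectId,
DCT_SimpleCmd, json_string,
strlen(json_string) + 1);
}

return PointerGetDatum(NULL);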

======
src/backend/replication/logical/worker.c

15. General

IMO it might end up being tidier to rename all the DDL-related
functions with 'ddl' in the name:

e.g. preprocess_create_table --> preprocess_ddl_create_table
e.g. handle_create_table --> handle_ddl_create_table

~~~

16. preprocess_create_table

+/*
+ * Special handling for CREATE TABLE AS and SELECT INTO
+ * to not populate data from the source table on the subscriber.
+ * Allow the data to be replicated through INSERTs on the publisher.
+ */
+static void
+preprocess_create_table(RawStmt *command)
+{
+ CommandTag commandTag;
+
+ commandTag = CreateCommandTag((Node *) command);
+
+ switch (commandTag)
+ {
+ case CMDTAG_CREATE_TABLE_AS:
+ case CMDTAG_SELECT_INTO:
+ {
+ CreateTableAsStmt *castmt = (CreateTableAsStmt *) command->stmt;
+
+ if (castmt->objtype == OBJECT_TABLE)
+ {
+ /*
+ * Force skipping data population to avoid data
+ * inconsistency. Data should be replicated from the
+ * publisher instead.
+ */
+ castmt->into->skipData = true;
+ }
+ }
+ break;
+ case CMDTAG_SELECT:
+ {
+ SelectStmt *sstmt = (SelectStmt *) command->stmt;
+
+ if (sstmt->intoClause != NULL)
+ {
+ /*
+ * Force skipping data population to avoid data
+ * inconsistency. Data should be replicated from the
+ * publisher instead.
+ */
+ sstmt->intoClause->skipData = true;
+ }
+ }
+ break;
+ default:
+ break;
+ }
+}

16a.
Maybe just slightly reword the function-header comment.

SUGGESTION
CREATE TABLE AS and SELECT INTO require special handling to force them
to skip populating data from the source table on the subscriber. Data
should be replicated from the publisher instead.

~

16b
I felt it was not really necessary to have those "Force skipping
data..." comments for each of the cases because those comments are
pretty much saying the same thing as the function-header comment. Then
several '{}' can also be removed, so the whole function becomes much
shorter.
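
For example, the whole function might reduce to something like this
(just a sketch; the case-level '{}' still need to stay where a
variable is declared):

SUGGESTION
static void
preprocess_create_table(RawStmt *command)
{
switch (CreateCommandTag((Node *) command))
{
case CMDTAG_CREATE_TABLE_AS:
case CMDTAG_SELECT_INTO:
{
CreateTableAsStmt *castmt = (CreateTableAsStmt *) command->stmt;

if (castmt->objtype == OBJECT_TABLE)
castmt->into->skipData = true;
}
break;
case CMDTAG_SELECT:
{
SelectStmt *sstmt = (SelectStmt *) command->stmt;

if (sstmt->intoClause != NULL)
sstmt->intoClause->skipData = true;
}
break;
default:
break;
}
}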

~~~

17. handle_create_table

+/*
+ * Handle CREATE TABLE command
+ *
+ * Call AddSubscriptionRelState for CREATE LABEL command to set the relstate to
+ * SUBREL_STATE_READY so DML changes on this new table can be
replicated without
+ * having to manually run "alter subscription ... refresh publication"
+ */
+static void
+handle_create_table(RawStmt *command)

17a.
/CREATE LABEL/CREATE TABLE/

~

17b
"alter subscription ... refresh publication" --> "ALTER SUBSCRIPTION
... REFRESH PUBLICATION"

~

17c.
I think the function name is misleading because this is not really
handling the task of table creation. IIUC the actual DDL for the
CREATE TABLE was already performed in the apply_handle_ddl() function.
If that is correct then IMO a better name for this function is more
like postprocess_create_table(). Also, that kind of naming would pair
nicely with the existing preprocess_create_table().

See also review comment #15 above, so in the end these functions could
be called like:
- preprocess_ddl_create_table
- postprocess_ddl_create_table

~~~

18. handle_create_table

+ commandTag = CreateCommandTag((Node *) command);
+ cstmt = (CreateStmt *) command->stmt;
+ rv = cstmt->relation;
+
+ if (commandTag == CMDTAG_CREATE_TABLE)
+ {
+ cstmt = (CreateStmt *) command->stmt;
+ rv = cstmt->relation;
+ }
+ else
+ {
+ return;
+ }
+
+ if (!rv)
+ return;

This seemed quite strangely coded. Also, the assignments to 'cstmt'
and 'rv' are duplicated (??)

SUGGESTION
commandTag = CreateCommandTag((Node *) command);

if (commandTag != CMDTAG_CREATE_TABLE)
return;
cstmt = (CreateStmt *) command->stmt;
rv = cstmt->relation;
if (!rv)
return;

~~~

19. handle_create_table

+ if (relnamespace != InvalidOid)
+ relid = get_relname_relid(relname, relnamespace);
+ else
+ relid = RelnameGetRelid(relname);
+
+ if (OidIsValid(relid))

19a
IMO 'relnamespace' could have an improved name like 'relnamespace_oid'

~

19b.
+ if (relnamespace != InvalidOid)
should match the other check

SUGGESTION
if (OidIsValid(relnamespace))

~~~

20. apply_handle_ddl

+/*
+ * Handle DDL replication messages.
+ */
+static void
+apply_handle_ddl(StringInfo s)

This is an important function for the DDL replication logic; I felt it
should have some descriptive comment to say what it is doing.

~~~

21. apply_handle_ddl

+ commandTag = CreateCommandTag((Node *) command);
+
+ /* If we got a cancel signal in parsing or prior command, quit */
+ CHECK_FOR_INTERRUPTS();
+
+ /* Remove data population from the command */
+ preprocess_create_table(command);

There seems to be an assumption here that the only kind of command
processed here would be TABLE related. Maybe that is currently true,
but shouldn't there be some error checking just to make sure it cannot
execute unexpected commands?

======
src/backend/replication/pgoutput/pgoutput.c

22. PGOutputTxnData

typedef struct PGOutputTxnData
{
bool sent_begin_txn; /* flag indicating whether BEGIN has been sent */
+ List *deleted_relids;
} PGOutputTxnData;

I thought the struct comment should also have something to say about
the new field 'deleted_relids', and why it is necessary.

~~~

23. _PG_output_plugin_init

@@ -254,6 +261,7 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb)
cb->change_cb = pgoutput_change;
cb->truncate_cb = pgoutput_truncate;
cb->message_cb = pgoutput_message;
+ cb->ddl_cb = pgoutput_ddl;
cb->commit_cb = pgoutput_commit_txn;

cb->begin_prepare_cb = pgoutput_begin_prepare_txn;
@@ -270,6 +278,7 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb)
cb->stream_commit_cb = pgoutput_stream_commit;
cb->stream_change_cb = pgoutput_change;
cb->stream_message_cb = pgoutput_message;
+ cb->stream_ddl_cb = pgoutput_ddl;

This is not a new issue -- but here are some more examples of what was
already mentioned in one of my previous WIP reviews. This patch needs
to decide if it is going to call these 'ddl_cb' or 'ddl_message_cb'
(similarly for function names, comments, strings, variables, etc.) and
then be consistent everywhere with whatever that decision is.

~~~

24. init_txn_data

+/* Initialize the per-transaction level variable for the given transaction. */
+static void
+init_txn_data(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)

Maybe instead of 'level variable' call this something like:

SUGGESTION
Initialize the per-transaction private data for the given transaction.

~~~

25. clean_txn_data

(Same as previous comment #24).

SUGGESTION
Clean up the per-transaction private data for the given transaction.

~~~

26. init_txn_data/clean_txn_data

Hmm, this refactoring to isolate the alloc/free of this private data
and to delegate to these new functions from a number of places looked
to me more like a bug-fix which is not really related to the DDL
replication. I guess what has happened is that when more information
(field 'deleted_relids') was added to the PGOutputTxnData it exposed
this problem more visibly (??)

To summarize, I thought all this stuff about
init_txn_data/clean_txn_data refactoring should probably be removed
from this patch and instead pushed as a separate bug fix to HEAD.

What do you think?

~~~

27. pgoutput_change

+ /*
+ * We don't publish table rewrite change unless we publish the rewrite ddl
+ * message.
+ */
+ if (table_rewrite && !relentry->pubactions.pubddl_table)
+ return;
+

/change/changes/

~~~

28. pgoutput_change

+ if (table_rewrite)
+ RelationClose(relation);
+

Something doesn't seem right. AFAICT this cleanup code has been added
to match the new code at the top of the function, where the "actual
relation" was fetched.

Meanwhile, there are also some other return points where
'table_rewrite' is true:
e.g.
if (table_rewrite && !relentry->pubactions.pubddl_table)
return;

So why is there no RelationClose(relation) for those other returns?

~~~

29. is_object_published

+/* Check if the given object is published. */
+static bool
+is_object_published(LogicalDecodingContext *ctx, Oid objid)
+{
+ Relation relation = NULL;
+ RelationSyncEntry *relentry;
+ PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
+
+ /* First check the DDL command filter. */
+ switch (get_rel_relkind(objid))
+ {
+ case RELKIND_RELATION:
+ relation = RelationIdGetRelation(objid);
+ relentry = get_rel_sync_entry(data, relation);
+ RelationClose(relation);
+
+ /*
+ * Skip sending this ddl if we don't publish ddl message or the ddl
+ * need to be published via its root relation.
+ */
+ if (!relentry->pubactions.pubddl_table ||
+ relentry->publish_as_relid != objid)
+ return false;
+
+ break;
+ default:
+ /* unsupported objects */
+ return false;
+ }
+
+ return true;
+}

The function seems back-to-front. IMO it would be better/safer if the
default (the last return) was false. So, the "Skip sending this ddl
if..." should be reversed to say "Only send this ddl if..." and return
true only in that case.
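
For example (just a sketch, keeping the patch's variable names):

SUGGESTION
/* First check the DDL command filter. */
switch (get_rel_relkind(objid))
{
case RELKIND_RELATION:
relation = RelationIdGetRelation(objid);
relentry = get_rel_sync_entry(data, relation);
RelationClose(relation);

/*
* Only send this ddl if we publish ddl messages and the ddl does
* not need to be published via its root relation.
*/
if (relentry->pubactions.pubddl_table &&
relentry->publish_as_relid == objid)
return true;
break;

default:
/* unsupported objects */
break;
}

return false;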

~~~

30. pgoutput_ddl

+/*
+ * Send the decoded DDL over wire.
+ */
+static void
+pgoutput_ddl(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,

Should that comment be written more like "Send the decoded DDL message"?

~~~

31. pgoutput_ddl

+ switch (cmdtype)
+ {
+ case DCT_TableDropStart:
+ {
+ MemoryContext old;

Probably 'oldctx' would be a more meaningful/typical name for this
instead of just 'old'

~~~

32. pgoutput_ddl

+ case DCT_TableAlter:
+
+ /*
+ * For table rewrite ddl, we first send the original ddl message
+ * to subscriber, then convert the upcoming rewrite INSERT to
+ * UPDATE and send them to subscriber so that the data between
+ * publisher and subscriber can always be consistent.
+ *
+ * We do this way because of two reason:
+ *
+ * (1) The data before the rewrite ddl could already be different
+ * among publisher and subscriber. To make sure the extra data in
+ * subscriber which doesn't exist in publisher also get rewritten,
+ * we need to let the subscriber execute the original rewrite ddl
+ * to rewrite all the data at first.
+ *
+ * (2) the data after executing rewrite ddl could be different
+ * among publisher and subscriber(due to different
+ * functions/operators used during rewrite), so we need to
+ * replicate the rewrite UPDATEs to keep the data consistent.
+ *
+ * TO IMPROVE: We could improve this by letting the subscriber
+ * only rewrite the extra data instead of doing fully rewrite and
+ * use the upcoming rewrite UPDATEs to rewrite the rest data.
+ * Besides, we may not need to send rewrite changes for all type
+ * of rewrite ddl, for example, it seems fine to skip sending
+ * rewrite changes for ALTER TABLE SET LOGGED as the data in the
+ * table doesn't actually be changed.
+ */
+ break;

32a.
I think this giant comment is the same as the Commit Message. A
previous WIP review ([2]?) already gave some suggestions for this
text. Please make sure the text in both places matches.

~

32b.
IIUC this comment is referring to the pgoutput_change code for
REORDER_BUFFER_CHANGE_INSERT which converts to UPDATE for
table_rewrite. If that is correct, probably this comment should
cross-reference to that other function to give the reader more
information.

~

32c.
Instead of "TO IMPROVE", I think it is more conventional to write
"XXX:" in a code comment.

~~~

33. reload_publications

+/* Reload publications if needed. */
+static void
+reload_publications(PGOutputData *data)
+{
+ MemoryContext oldctx;
+
+ if (!publications_valid)
+ {
+ oldctx = MemoryContextSwitchTo(CacheMemoryContext);
+ if (data->publications)
+ {
+ list_free_deep(data->publications);
+ data->publications = NIL;
+ }
+ data->publications = LoadPublications(data->publication_names);
+ MemoryContextSwitchTo(oldctx);
+ publications_valid = true;
+ }
+}
+
+

33a.
AFAICT this appears to be a general cleanup refactoring that is not
really related to the DDL replication patch. So I felt this can be
removed from this patch and applied as a separate patch to HEAD.

~

33b.
Double blank lines after this function

~~~

34. get_rel_sync_entry

entry->pubactions.pubinsert = entry->pubactions.pubupdate =
- entry->pubactions.pubdelete = entry->pubactions.pubtruncate = false;
+ entry->pubactions.pubdelete = entry->pubactions.pubtruncate =
+ entry->pubactions.pubddl_table = false;
entry->new_slot = NULL;
entry->old_slot = NULL;
memset(entry->exprstate, 0, sizeof(entry->exprstate));

Continually adding to these assignments has got a bit out of control...
IMO the code now would be better written as:

memset(&entry->pubactions, 0, sizeof(entry->pubactions));

And doing this would also be consistent with the similar code for
entry->exprstate (just a couple of lines below here).

~~~

35. get_rel_sync_entry

entry->pubactions.pubupdate = false;
entry->pubactions.pubdelete = false;
entry->pubactions.pubtruncate = false;
+ entry->pubactions.pubddl_table = false;

(same as above review comment #34)

IMO all this should be written more simply as:

memset(&entry->pubactions, 0, sizeof(entry->pubactions));

------
[1] https://www.postgresql.org/message-id/CAHut%2BPtzpuuRFrLnjkQePq296ip_0WfmQ4CtydM9JDR6gEf%3DQw%40mail.gmail.com
[2] https://www.postgresql.org/message-id/CAHut%2BPtMkVoweJrd%3DSLw7BfpW883skasdnemoj4N19NnyjrT3Q%40mail.gmail.com
[3] https://www.postgresql.org/message-id/CAHut+PuG8J8uA5V-F-o4TczhvFSWGG1B8qL+EZO0HjWWEEYG+g@mail.gmail.com

Kind Regards,
Peter Smith.
Fujitsu Australia
