Re: Support logical replication of DDLs

From: Ajin Cherian <itsajin(at)gmail(dot)com>
To: Peter Smith <smithpb2250(at)gmail(dot)com>
Cc: Zheng Li <zhengli10(at)gmail(dot)com>, vignesh C <vignesh21(at)gmail(dot)com>, Dilip Kumar <dilipbalaut(at)gmail(dot)com>, Alvaro Herrera <alvherre(at)alvh(dot)no-ip(dot)org>, "houzj(dot)fnst(at)fujitsu(dot)com" <houzj(dot)fnst(at)fujitsu(dot)com>, Amit Kapila <amit(dot)kapila16(at)gmail(dot)com>, Masahiko Sawada <sawada(dot)mshk(at)gmail(dot)com>, Japin Li <japinli(at)hotmail(dot)com>, rajesh singarapu <rajesh(dot)rs0541(at)gmail(dot)com>, PostgreSQL Hackers <pgsql-hackers(at)lists(dot)postgresql(dot)org>
Subject: Re: Support logical replication of DDLs
Date: 2022-12-12 13:13:55
Message-ID: CAFPTHDbaFByXyzEts+wZR5JLcCit50_S_j7qMF4tNeSvSuxZDA@mail.gmail.com
Lists: pgsql-general pgsql-hackers

On Tue, Nov 15, 2022 at 10:57 AM Peter Smith <smithpb2250(at)gmail(dot)com> wrote:
> ======
>
> src/backend/replication/logical/worker.c
>
> 32. preprocess_create_table
>
> +/* Remove the data population from the command */
> +static void
> +preprocess_create_table(RawStmt *command)
>
> The comment is too short. Needs more explanation than this.
>

fixed.

> ~~~
>
> 33. handle_create_table
>
> +/*
> + * Handle CREATE TABLE command
> + *
> + * Call AddSubscriptionRelState for CREATE TABEL command to set the relstate to
> + * SUBREL_STATE_READY so DML changes on this new table can be replicated without
> + * having to manually run "alter subscription ... refresh publication"
> + */
>
> Typo "TABEL"
>

fixed.

> ~~~
>
> 34. handle_create_table
>
> + switch (commandTag)
> + {
> + case CMDTAG_CREATE_TABLE:
> + {
> + CreateStmt *cstmt = (CreateStmt *) command->stmt;
> +
> + rv = cstmt->relation;
> + }
> + break;
> + default:
> + break;
> + }
> +
> + if (!rv)
> + return;
>
> This switch seems overcomplicated since the function only cares about
> CMDTAG_CREATE_TABLE.
>
> SUGGESTION
>
> if (commandTag == CMDTAG_CREATE_TABLE)
> {
> CreateStmt *cstmt = (CreateStmt *) command->stmt;
> rv = cstmt->relation;
> }
> else
> {
> return;
> }
>

fixed as suggested.

> ~
>
> 35.
>
> + if (relnamespace != InvalidOid)
> + relid = get_relname_relid(relname, relnamespace);
> + else
> + relid = RelnameGetRelid(relname);
> +
> + if (relid != InvalidOid)
> + {
>
> 35a.
> Maybe better to use the OidIsValid() macro for these places
>

fixed.

> ~
>
> 35b.
> I'm not 100% sure of this logic. Is it even *possible* for these to be
> InvalidOid -- e.g. I thought the CREATE TABLE would have failed
> already if this was the case. Maybe these checks can be changed to
> Asserts?
>

Theoretically, somebody could have dropped the table in the meantime,
so these can legitimately be InvalidOid and an Assert would not be safe.
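
To illustrate the point about a concurrent drop, here is a minimal standalone sketch (not the patch code). The Oid, InvalidOid and OidIsValid definitions mirror PostgreSQL's own; the mock catalog, mock_relname_get_relid() and handle_create_table_sketch() are hypothetical names made up for this example.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <string.h>

/* These three definitions mirror PostgreSQL's own (postgres_ext.h, c.h). */
typedef unsigned int Oid;
#define InvalidOid ((Oid) 0)
#define OidIsValid(objectId) ((bool) ((objectId) != InvalidOid))

/* Hypothetical in-memory "catalog"; stands in for the real lookup. */
typedef struct MockRel
{
	const char *relname;
	Oid			relid;
	bool		dropped;	/* true if a concurrent DROP TABLE removed it */
} MockRel;

static const MockRel mock_catalog[] = {
	{"t1", 16384, false},
	{"t2", 16385, true},
};

/* Hypothetical stand-in for RelnameGetRelid(): InvalidOid when not found. */
static Oid
mock_relname_get_relid(const char *relname)
{
	size_t		n = sizeof(mock_catalog) / sizeof(mock_catalog[0]);

	for (size_t i = 0; i < n; i++)
	{
		if (!mock_catalog[i].dropped &&
			strcmp(mock_catalog[i].relname, relname) == 0)
			return mock_catalog[i].relid;
	}
	return InvalidOid;
}

/*
 * Returns true and sets *relid only if the table still exists; otherwise
 * skips quietly instead of asserting.
 */
static bool
handle_create_table_sketch(const char *relname, Oid *relid)
{
	Oid			found = mock_relname_get_relid(relname);

	if (!OidIsValid(found))
		return false;		/* table vanished: nothing to mark as ready */

	*relid = found;
	return true;
}
```

With a runtime check the "t2" case is simply skipped, whereas an Assert would abort an assert-enabled build in that window.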

> ~~~
>
> 36. apply_handle_ddl
>
> +
> +static void
> +apply_handle_ddl(StringInfo s)
>
> Missing function comment
>

added comment.

> ======
>
> src/backend/replication/pgoutput/pgoutput.c
>
> 37. pgoutput_change
>
> @@ -1377,9 +1386,22 @@ pgoutput_change(LogicalDecodingContext *ctx,
> ReorderBufferTXN *txn,
> ReorderBufferChangeType action = change->action;
> TupleTableSlot *old_slot = NULL;
> TupleTableSlot *new_slot = NULL;
> + bool table_rewrite = false;
>
> update_replication_progress(ctx, false);
>
> + /*
> + * For heap rewrites, we might need to replicate them if the rewritten
> + * table publishes rewrite ddl message. So get the actual relation here
> + * and check the pubaction later.
> + */
> + if (relation->rd_rel->relrewrite)
> + {
> + table_rewrite = true;
> + relation = RelationIdGetRelation(relation->rd_rel->relrewrite);
> + targetrel = relation;
> + }
> +
> if (!is_publishable_relation(relation))
> return;
>
> @@ -1413,6 +1435,13 @@ pgoutput_change(LogicalDecodingContext *ctx,
> ReorderBufferTXN *txn,
> Assert(false);
> }
>
> + /*
> + * We don't publish table rewrite change unless we publish the rewrite ddl
> + * message.
> + */
> + if (table_rewrite && !relentry->pubactions.pubddl)
> + return;
> +
>
> Something does not seem right. Other code later in this function takes
> care to call RelationClose(relation), but in the above change, the
> logic is just returning without closing anything.
>

The existing code just above this already returns in the same way when
the change's action is not one of the publication's actions, so this
early return follows that pattern.

> ~~~
>
> 38. pgoutput_message
>
> @@ -1671,8 +1714,8 @@ pgoutput_truncate(LogicalDecodingContext *ctx,
> ReorderBufferTXN *txn,
>
> static void
> pgoutput_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
> - XLogRecPtr message_lsn, bool transactional, const char *prefix, Size sz,
> - const char *message)
> + XLogRecPtr message_lsn, bool transactional,
> + const char *prefix, Size sz, const char *message)
> {
>
> This change of wrapping seems unrelated , so should not be done in this patch.
>

removed.

> ~~~
>
> 39. pgoutput_ddlmessage
>
> +static void
> +pgoutput_ddlmessage(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
> + XLogRecPtr message_lsn,
> + const char *prefix, Oid relid, DeparsedCommandType cmdtype,
> + Size sz, const char *message)
>
> Missing function comment.
>

Added comment.

> ~
>
> 40.
>
> + switch (cmdtype)
>
> 40a.
> Might be tidier to have a consistent space *before* each case of this switch.
>

fixed.

> ~
>
> 40b.
> I felt it was too confusing having some of the switch case break and
> some of the switch cases return from the function -- e.g It seems
> difficult to know what conditions will execute the code that follows
> the switch. Maybe all this needs to be refactored somehow, or just
> commented on more.
>

added more comments.

> ======
>
> src/bin/pg_dump/pg_dump.c
>
> 41. getPublications
>
> - if (fout->remoteVersion >= 130000)
> + if (fout->remoteVersion >= 150000)
>
> Should be >= 160000, right?
>

fixed.
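
For reference, a small sketch of why 150000 would wrongly match v15 servers: since v10 the server version number is major * 10000 + minor, so 15.4 reports 150004. The helper name and the macro are hypothetical, and the assumption that pubddl would first appear in v16 comes only from the review comment above.

```c
#include <assert.h>
#include <stdbool.h>

/* Assumption from the review: pubddl would first exist in PostgreSQL 16. */
#define PUBDDL_MIN_VERSION 160000

/* Hypothetical helper: does a server with this version number have pubddl? */
static bool
server_has_pubddl(int remoteVersion)
{
	return remoteVersion >= PUBDDL_MIN_VERSION;
}
```

The same cutoff would apply to the psql checks in 44a and 45.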

> ~
>
> 42.
>
> else if (fout->remoteVersion >= 110000)
> appendPQExpBufferStr(query,
> "SELECT p.tableoid, p.oid, p.pubname, "
> "p.pubowner, "
> - "p.puballtables, p.pubinsert, p.pubupdate, p.pubdelete, p.pubtruncate, false AS pubviaroot "
> + "p.puballtables, p.pubinsert, p.pubupdate, p.pubdelete, p.pubtruncate, false as p.pubddl, false AS pubviaroot "
> "FROM pg_publication p");
> else
> appendPQExpBufferStr(query,
> "SELECT p.tableoid, p.oid, p.pubname, "
> "p.pubowner, "
> - "p.puballtables, p.pubinsert, p.pubupdate, p.pubdelete, false AS pubtruncate, false AS pubviaroot "
> + "p.puballtables, p.pubinsert, p.pubupdate, p.pubdelete, false AS pubtruncate, false as p.pubddl, false AS pubviaroot "
> "FROM pg_publication p");
>
> Use uppercase 'AS' for consistency with other code.
>

fixed.

> ======
>
> src/bin/pg_dump/pg_dump.h
>
> 43. PublicationInfo
>
> @@ -620,6 +620,7 @@ typedef struct _PublicationInfo
> bool pubdelete;
> bool pubtruncate;
> bool pubviaroot;
> + bool pubddl;
> } PublicationInfo;
>
> IMO the new member should be adjacent to the other 'publish' parameter
> values like pubdelete/pubtruncate.
>

I have moved this member up.

> ======
>
> src/bin/psql/describe.c
>
> 44. listPublications
>
> + if (pset.sversion >= 140000)
> + appendPQExpBuffer(&buf,
> + ",\n pubddl AS \"%s\"",
> + gettext_noop("DDLs"));
>
> 44a.
> Should that be 160000?
>

updated.

> ~
>
> 44b.
> IMO it would be better if the "DDLs" column appeared adjacent to the
> other 'publish' parameter option values. (e.g. these are not even the
> same column ordering as pg_dump).
>
> ~~~
>
> 45. describePublications
>
> has_pubtruncate = (pset.sversion >= 110000);
> has_pubviaroot = (pset.sversion >= 130000);
> + has_pubddl = (pset.sversion >= 150000);
>
> Shouldn't that be 160000?
>
> ~
>
> 46.
>
> @@ -6313,6 +6319,9 @@ describePublications(const char *pattern)
> if (has_pubviaroot)
> appendPQExpBufferStr(&buf,
> ", pubviaroot");
> + if (has_pubddl)
> + appendPQExpBufferStr(&buf,
> + ", pubddl");
>
> IMO it would be better if the "DDLs" column appeared adjacent to the
> other 'publish' parameter option values. (e.g. these are not even the
> same column ordering as pg_dump).
>

Will fix this in a future patch.

>
> ======
>
> src/include/catalog/pg_proc.dat
>
> 47.
>
> +{ oid => '4644', descr => 'trigger for ddl command deparse',
> + proname => 'publication_deparse_ddl_command_end', prorettype => 'event_trigger',
> + proargtypes => '', prosrc => 'publication_deparse_ddl_command_end' },
>
> Why doesn't the description say 'end'?
>

fixed this.

> ======
>
> src/include/catalog/pg_publication.h
>
> 48. FormData_pg_publication
>
> +
> + /* true if table creations are published */
> + bool pubddl;
> } FormData_pg_publication;
>
> Why just table publications? I thought it was for EVERYTHING.
>

fixed.

> ~~~
>
> 49. PublicationActions
>
> + bool pubddl;
> } PublicationActions;
>
> This might be OK for POC, but for the real feature, I think this
> should be more fine-grained than this all-or-nothing DDL.
>

Yes, we will need to rethink this.

> ======
>
> src/include/replication/ddlmessage.h
>
> 50.
>
> +{
> + Oid dbId; /* database Oid emitted from */
> + Size prefix_size; /* length of prefix */
> + Oid relid; /* id of the table */
> + DeparsedCommandType cmdtype; /* type of sql command */
> + Size message_size; /* size of the message */
> +
> + /*
> + * payload, including null-terminated prefix of length prefix_size
> + */
> + char message[FLEXIBLE_ARRAY_MEMBER];
> +} xl_logical_ddl_message;
>
>
> 50a.
> The prefix_size comment needs to say /* length of the prefix
> (including '\0' terminator) */
>

fixed.

> ~
>
> 50b.
> 'relid' seems specific to TABLE DDL. Will future versions have many
> more Oid members here? Or should this be a union member or a generic
> name like 'objid'?
>

It is specific to tables; it is only used to check whether the table
is part of the publication.
All other objects are handled as a catch-all.
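
As a rough standalone sketch of the layout discussed in 50a and 50b (assuming the payload is the null-terminated prefix followed directly by the message bytes, as the struct comment describes): the field names follow the quoted struct, while the _sketch type, build_ddl_message() and the two accessors are hypothetical names for this example, and DeparsedCommandType is reduced to a plain int.

```c
#include <assert.h>
#include <stddef.h>
#include <stdlib.h>
#include <string.h>

typedef unsigned int Oid;
typedef size_t Size;
typedef int DeparsedCommandType;	/* placeholder; the patch defines an enum */

typedef struct xl_logical_ddl_message_sketch
{
	Oid			dbId;			/* database Oid emitted from */
	Size		prefix_size;	/* length of prefix, including '\0' */
	Oid			relid;			/* id of the table, if any */
	DeparsedCommandType cmdtype;
	Size		message_size;	/* size of the message body */
	char		message[];		/* prefix + '\0', then the message bytes */
} xl_logical_ddl_message_sketch;

/* Allocate one record holding the header, the prefix and the body. */
static xl_logical_ddl_message_sketch *
build_ddl_message(Oid dbId, const char *prefix, Oid relid,
				  DeparsedCommandType cmdtype, const char *msg, Size msg_size)
{
	Size		prefix_size = strlen(prefix) + 1;	/* count the terminator */
	xl_logical_ddl_message_sketch *rec;

	rec = malloc(offsetof(xl_logical_ddl_message_sketch, message) +
				 prefix_size + msg_size);
	if (rec == NULL)
		return NULL;

	rec->dbId = dbId;
	rec->prefix_size = prefix_size;
	rec->relid = relid;
	rec->cmdtype = cmdtype;
	rec->message_size = msg_size;
	memcpy(rec->message, prefix, prefix_size);
	memcpy(rec->message + prefix_size, msg, msg_size);
	return rec;
}

/* The prefix starts at offset 0 of the payload. */
static const char *
ddl_message_prefix(const xl_logical_ddl_message_sketch *rec)
{
	return rec->message;
}

/* The body starts right after the prefix's terminator. */
static const char *
ddl_message_body(const xl_logical_ddl_message_sketch *rec)
{
	return rec->message + rec->prefix_size;
}
```

Because prefix_size counts the terminator, the body offset is simply prefix_size, which is why the comment needs to say so explicitly.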

> ~~~
>
> 51. XLOG_LOGICAL_DDL_MESSAGE
>
> +/* RMGR API*/
> +#define XLOG_LOGICAL_DDL_MESSAGE 0x00
>
> 0x00 is same value as XLOG_LOGICAL_MESSAGE in message.h. That doesn't
> seem correct because then how will those different messages be
> identified?
>

Currently logical messages are not handled by subscriptions, so the
same value is overloaded.

> ======
>
> src/include/replication/logicalproto.h
>
> 52. LogicalRepMsgType
>
> @@ -61,6 +61,7 @@ typedef enum LogicalRepMsgType
> LOGICAL_REP_MSG_RELATION = 'R',
> LOGICAL_REP_MSG_TYPE = 'Y',
> LOGICAL_REP_MSG_MESSAGE = 'M',
> + LOGICAL_REP_MSG_DDLMESSAGE = 'L',
> LOGICAL_REP_MSG_BEGIN_PREPARE = 'b',
>
> The name already includes _MSG_ so why say MESSAGE again? IMO this
> should be called just LOGICAL_REP_MSG_DDL. See general comment.
>

fixed.
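
For context, each protocol message is tagged with a single byte, so the new entry only needs a letter not already in use. A minimal sketch of dispatching on that byte, using only the enum values quoted above plus the renamed LOGICAL_REP_MSG_DDL; the _sketch enum name and logicalrep_msg_name() are hypothetical and not part of the patch.

```c
#include <assert.h>
#include <string.h>

/* Subset of the message types quoted in the diff above. */
typedef enum LogicalRepMsgTypeSketch
{
	LOGICAL_REP_MSG_RELATION = 'R',
	LOGICAL_REP_MSG_TYPE = 'Y',
	LOGICAL_REP_MSG_MESSAGE = 'M',
	LOGICAL_REP_MSG_DDL = 'L',
	LOGICAL_REP_MSG_BEGIN_PREPARE = 'b'
} LogicalRepMsgTypeSketch;

/* Hypothetical helper: map the one-byte tag to a readable name. */
static const char *
logicalrep_msg_name(char action)
{
	switch (action)
	{
		case LOGICAL_REP_MSG_RELATION:
			return "RELATION";
		case LOGICAL_REP_MSG_TYPE:
			return "TYPE";
		case LOGICAL_REP_MSG_MESSAGE:
			return "MESSAGE";
		case LOGICAL_REP_MSG_DDL:
			return "DDL";
		case LOGICAL_REP_MSG_BEGIN_PREPARE:
			return "BEGIN PREPARE";
		default:
			return "UNKNOWN";
	}
}
```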

> ~~~
>
> 53.
>
> extern void logicalrep_write_message(StringInfo out, TransactionId
> xid, XLogRecPtr lsn,
> - bool transactional, const char *prefix, Size sz, const char *message);
> + bool transactional, const char *prefix,
> + Size sz, const char *message);
>
> Modifying the wrapping of this unrelated function should not be done
> in this patch.
>

fixed.

> ======
>
> src/include/replication/reorderbuffer.h
>
> 54. REORDER_BUFFER_CHANGE_DDLMESSAGE
>
> @@ -56,6 +58,7 @@ typedef enum ReorderBufferChangeType
> REORDER_BUFFER_CHANGE_INSERT,
> REORDER_BUFFER_CHANGE_UPDATE,
> REORDER_BUFFER_CHANGE_DELETE,
> + REORDER_BUFFER_CHANGE_DDLMESSAGE,
>
> Why not call it REORDER_BUFFER_CHANGE_DDL? -- see general review comment
>

fixed.

> ~~~
>
> 55. ReorderBufferChange
>
> + /* DDL Message. */
> + struct
> + {
> + char *prefix;
> + Size message_size;
> + char *message;
> + Oid relid;
> + DeparsedCommandType cmdtype;
> + } ddlmsg;
> +
>
> Why not call it ddl? -- see general review comment
>

fixed.

> ======
>
> src/test/regress/expected/psql.out
>
> 56.
>
> \dRp "no.such.publication"
> - List of publications
> - Name | Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root
> -------+-------+------------+---------+---------+---------+-----------+----------
> + List of publications
> + Name | Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root | DDLs
> +------+-------+------------+---------+---------+---------+-----------+----------+------
> (0 rows)
>
> I wondered if "DDLs" belongs adjacent to the
> Inserts/Updates/Deletes/Truncates because those are the other "publish"
> parameters just like this.
>

Will fix this in a future patch.

> ======
>
> src/test/regress/expected/publication.out
>
> 57.
>
> (Ditto comment for psql.out)
>
> I wondered if "DDLs" belongs adjacent to the
> Inserts/Updates/Deletes/Truncates because those are the other "publish"
> parameters just like this.
>
> ~~~
>
> 58.
>
> Looks like there is a missing regress test case where you actually set
> the publish='ddl' and then verify that the DDLs column is correctly
> set 't'?
>

Lots of tests are still missing; I will add them in a future patch.

> ======
>
> 59. MISC = typedefs.list
>
> There are missing some typedefs.list changes for this patch. At least
> the following:
>
> e.g.
> - DeparsedCommandType (from ddlmessage.h)
> - xl_logical_ddl_message (from ddlmessage.h)
> - LogicalDecodeDDLMessageCB (from output_plugin.h)
> - LogicalDecodeStreamDDLMessageCB (from output_plugin.h)
> - ReorderBufferDDLMessageCB (from reorderbuffer.h)
> - ReorderBufferStreamDDLMessageCB (from reorderbuffer.h)
>

added.

regards,
Ajin Cherian
Fujitsu Australia

Attachment Content-Type Size
v46-0004-Test-cases-for-DDL-replication.patch application/octet-stream 24.6 KB
v46-0001-Functions-to-deparse-DDL-commands.patch application/octet-stream 317.5 KB
v46-0003-Support-CREATE-TABLE-AS-SELECT-INTO.patch application/octet-stream 15.6 KB
v46-0002-Support-DDL-replication.patch application/octet-stream 132.2 KB
v46-0005-Skip-ALTER-TABLE-subcommands-generated-for.patch application/octet-stream 2.2 KB
v46-0006-Support-DDL-replication-of-alter-type-having-USI.patch application/octet-stream 8.9 KB
v46-0007-Introduce-the-test_ddl_deparse_regress-test-module.-.patch application/octet-stream 41.0 KB
