From bdded82050841d3b71308ce82110efd21d99ea53 Mon Sep 17 00:00:00 2001 From: Peter Eisentraut Date: Sun, 13 Mar 2022 07:38:46 +0100 Subject: [PATCH] fixup! Add support for decoding sequences to built-in replication --- doc/src/sgml/protocol.sgml | 119 ++++++++++++++++++++++ doc/src/sgml/ref/alter_publication.sgml | 2 +- doc/src/sgml/ref/create_publication.sgml | 42 +++++--- src/backend/catalog/pg_publication.c | 8 +- src/backend/commands/subscriptioncmds.c | 2 +- src/backend/parser/gram.y | 14 --- src/backend/replication/logical/worker.c | 2 +- src/bin/pg_dump/pg_dump.c | 6 +- src/test/regress/expected/publication.out | 2 +- src/test/regress/sql/object_address.sql | 1 + src/test/regress/sql/publication.sql | 2 +- src/test/subscription/t/029_sequences.pl | 14 +-- 12 files changed, 165 insertions(+), 49 deletions(-) diff --git a/doc/src/sgml/protocol.sgml b/doc/src/sgml/protocol.sgml index 9178c779ba..49c05e1866 100644 --- a/doc/src/sgml/protocol.sgml +++ b/doc/src/sgml/protocol.sgml @@ -7055,6 +7055,125 @@ Logical Replication Message Formats + + +Sequence + + + + + + + + Byte1('X') + + + + Identifies the message as a sequence message. + + + + + + Int32 (TransactionId) + + + + Xid of the transaction (only present for streamed transactions). + This field is available since protocol version 2. + + + + + + Int8(0) + + + + Flags; currently unused. + + + + + + Int64 (XLogRecPtr) + + + + The LSN of FIXME. + + + + + + String + + + + Namespace (empty string for pg_catalog). + + + + + + String + + + + Relation name. + + + + + + + Int8 + + + + 1 if the sequence update is transactions, 0 otherwise. + + + + + + + Int64 + + + + last_value value of the sequence. + + + + + + + Int64 + + + + log_cnt value of the sequence. + + + + + + + Int8 + + + + is_called value of the sequence. + + + + + + + + + Type diff --git a/doc/src/sgml/ref/alter_publication.sgml b/doc/src/sgml/ref/alter_publication.sgml index 36c9a5f438..5dacb732b6 100644 --- a/doc/src/sgml/ref/alter_publication.sgml +++ b/doc/src/sgml/ref/alter_publication.sgml @@ -31,7 +31,7 @@ where publication_object is one of: TABLE [ ONLY ] table_name [ * ] [ WHERE ( expression ) ] [, ... ] - SEQUENCE sequence_name [ * ] [, ... ] + SEQUENCE sequence_name [, ... ] ALL TABLES IN SCHEMA { schema_name | CURRENT_SCHEMA } [, ... ] ALL SEQUENCES IN SCHEMA { schema_name | CURRENT_SCHEMA } [, ... ] diff --git a/doc/src/sgml/ref/create_publication.sgml b/doc/src/sgml/ref/create_publication.sgml index f72318e97d..286529e749 100644 --- a/doc/src/sgml/ref/create_publication.sgml +++ b/doc/src/sgml/ref/create_publication.sgml @@ -66,6 +66,20 @@ Parameters + + FOR SEQUENCE + + + Specifies a list of sequences to add to the publication. + + + + Specifying a sequence that is part of a schema specified by FOR + ALL SEQUENCES IN SCHEMA is not supported. + + + + FOR TABLE @@ -111,26 +125,28 @@ Parameters + FOR ALL SEQUENCES FOR ALL TABLES - Marks the publication as one that replicates changes for all tables in - the database, including tables created in the future. + Marks the publication as one that replicates changes for all sequences/tables in + the database, including sequences/tables created in the future. + FOR ALL SEQUENCES IN SCHEMA FOR ALL TABLES IN SCHEMA - Marks the publication as one that replicates changes for all tables in - the specified list of schemas, including tables created in the future. + Marks the publication as one that replicates changes for all sequences/tables in + the specified list of schemas, including sequences/tables created in the future. - Specifying a schema along with a table which belongs to the specified - schema using FOR TABLE is not supported. + Specifying a schema along with a sequence/table which belongs to the specified + schema using FOR SEQUENCE/FOR TABLE is not supported. @@ -205,10 +221,9 @@ Parameters Notes - If FOR TABLE, FOR ALL TABLES or - FOR ALL TABLES IN SCHEMA are not specified, then the - publication starts out with an empty set of tables. That is useful if - tables or schemas are to be added later. + If FOR TABLE, FOR SEQUENCE, etc. is + not specified, then the publication starts out with an empty set of tables + and sequences. That is useful if objects are to be added later. @@ -223,10 +238,9 @@ Notes - To add a table to a publication, the invoking user must have ownership - rights on the table. The FOR ALL TABLES and - FOR ALL TABLES IN SCHEMA clauses require the invoking - user to be a superuser. + To add a table or sequence to a publication, the invoking user must have + ownership rights on the table or sequence. The FOR ALL + ... clauses require the invoking user to be a superuser. diff --git a/src/backend/catalog/pg_publication.c b/src/backend/catalog/pg_publication.c index d866e8a9b2..8e26e0cee2 100644 --- a/src/backend/catalog/pg_publication.c +++ b/src/backend/catalog/pg_publication.c @@ -636,7 +636,7 @@ GetAllSequencesPublications(void) SysScanDesc scan; HeapTuple tup; - /* Find all publications that are marked as for all tables. */ + /* Find all publications that are marked as for all sequences. */ rel = table_open(PublicationRelationId, AccessShareLock); ScanKeyInit(&scankey, @@ -892,11 +892,7 @@ GetAllSchemaPublicationRelations(Oid pubid, bool sequences, } /* - * Gets list of all relation published by FOR ALL TABLES publication(s). - * - * If the publication publishes partition changes via their respective root - * partitioned tables, we must exclude partitions in favor of including the - * root partitioned tables. + * Gets list of all relation published by FOR ALL SEQUENCES publication(s). */ List * GetAllSequencesPublicationRelations(void) diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index 5beb67e765..1c70c4369a 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -1859,7 +1859,7 @@ fetch_sequence_list(WalReceiverConn *wrconn, List *publications) if (res->status != WALRCV_OK_TUPLES) ereport(ERROR, - (errmsg("could not receive list of replicated tables from the publisher: %s", + (errmsg("could not receive list of replicated sequences from the publisher: %s", res->err))); /* Process tables. */ diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y index 9097ac3fab..6ff0ddd62b 100644 --- a/src/backend/parser/gram.y +++ b/src/backend/parser/gram.y @@ -9705,13 +9705,6 @@ AlterOwnerStmt: ALTER AGGREGATE aggregate_with_argtypes OWNER TO RoleSpec * * CREATE PUBLICATION FOR pub_obj [, ...] [WITH options] * - * pub_obj is one of: - * - * TABLE table [, ...] - * SEQUENCE table [, ...] - * ALL TABLES IN SCHEMA schema [, ...] - * ALL SEQUENCES IN SCHEMA schema [, ...] - * *****************************************************************************/ CreatePublicationStmt: @@ -9868,13 +9861,6 @@ pub_obj_list: PublicationObjSpec * * ALTER PUBLICATION name SET pub_obj [, ...] * - * pub_obj is one of: - * - * TABLE table_name [, ...] - * SEQUENCE table_name [, ...] - * ALL TABLES IN SCHEMA schema_name [, ...] - * ALL SEQUENCES IN SCHEMA schema_name [, ...] - * *****************************************************************************/ AlterPublicationStmt: diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index 860c31fa05..1282c15f92 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -142,9 +142,9 @@ #include "catalog/pg_subscription.h" #include "catalog/pg_subscription_rel.h" #include "catalog/pg_tablespace.h" +#include "commands/sequence.h" #include "commands/tablecmds.h" #include "commands/tablespace.h" -#include "commands/sequence.h" #include "commands/trigger.h" #include "executor/executor.h" #include "executor/execPartition.h" diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c index ef8c6e43c6..35a8fc7631 100644 --- a/src/bin/pg_dump/pg_dump.c +++ b/src/bin/pg_dump/pg_dump.c @@ -3819,19 +3819,19 @@ getPublications(Archive *fout, int *numPublications) appendPQExpBuffer(query, "SELECT p.tableoid, p.oid, p.pubname, " "p.pubowner, " - "p.puballtables, false AS p.puballsequences, p.pubinsert, p.pubupdate, p.pubdelete, p.pubtruncate, false AS p.pubsequence, p.pubviaroot " + "p.puballtables, false AS puballsequences, p.pubinsert, p.pubupdate, p.pubdelete, p.pubtruncate, false AS pubsequence, p.pubviaroot " "FROM pg_publication p"); else if (fout->remoteVersion >= 110000) appendPQExpBuffer(query, "SELECT p.tableoid, p.oid, p.pubname, " "p.pubowner, " - "p.puballtables, false AS p.puballsequences, p.pubinsert, p.pubupdate, p.pubdelete, p.pubtruncate, false AS p.pubsequence, false AS pubviaroot " + "p.puballtables, false AS puballsequences, p.pubinsert, p.pubupdate, p.pubdelete, p.pubtruncate, false AS pubsequence, false AS pubviaroot " "FROM pg_publication p"); else appendPQExpBuffer(query, "SELECT p.tableoid, p.oid, p.pubname, " "p.pubowner, " - "p.puballtables, false AS p.puballsequences, p.pubinsert, p.pubupdate, p.pubdelete, false AS pubtruncate, false AS p.pubsequence, false AS pubviaroot " + "p.puballtables, false AS puballsequences, p.pubinsert, p.pubupdate, p.pubdelete, false AS pubtruncate, false AS pubsequence, false AS pubviaroot " "FROM pg_publication p"); res = ExecuteSqlQuery(fout, query->data, PGRES_TUPLES_OK); diff --git a/src/test/regress/expected/publication.out b/src/test/regress/expected/publication.out index 620fab87e6..92c50b13ec 100644 --- a/src/test/regress/expected/publication.out +++ b/src/test/regress/expected/publication.out @@ -262,7 +262,7 @@ Sequences from schemas: SET client_min_messages = 'ERROR'; CREATE PUBLICATION testpub_forschema FOR ALL SEQUENCES IN SCHEMA pub_test; RESET client_min_messages; --- fail - can't create publication with schema and table of the same schema +-- fail - can't create publication with schema and sequence of the same schema CREATE PUBLICATION testpub_for_seq_schema FOR ALL SEQUENCES IN SCHEMA pub_test, SEQUENCE pub_test.testpub_seq1; ERROR: cannot add relation "pub_test.testpub_seq1" to publication DETAIL: Sequence's schema "pub_test" is already part of the publication or part of the specified schema list. diff --git a/src/test/regress/sql/object_address.sql b/src/test/regress/sql/object_address.sql index f90afad804..2f40156eb4 100644 --- a/src/test/regress/sql/object_address.sql +++ b/src/test/regress/sql/object_address.sql @@ -143,6 +143,7 @@ CREATE STATISTICS addr_nsp.gentable_stat ON a, b FROM addr_nsp.gentable; SELECT pg_get_object_address('publication', '{one,two}', '{}'); SELECT pg_get_object_address('subscription', '{one}', '{}'); SELECT pg_get_object_address('subscription', '{one,two}', '{}'); + -- test successful cases WITH objects (type, name, args) AS (VALUES ('table', '{addr_nsp, gentable}'::text[], '{}'::text[]), diff --git a/src/test/regress/sql/publication.sql b/src/test/regress/sql/publication.sql index af665395e1..5043c4bbba 100644 --- a/src/test/regress/sql/publication.sql +++ b/src/test/regress/sql/publication.sql @@ -144,7 +144,7 @@ CREATE PUBLICATION testpub_forsequence FOR SEQUENCE testpub_seq0; SET client_min_messages = 'ERROR'; CREATE PUBLICATION testpub_forschema FOR ALL SEQUENCES IN SCHEMA pub_test; RESET client_min_messages; --- fail - can't create publication with schema and table of the same schema +-- fail - can't create publication with schema and sequence of the same schema CREATE PUBLICATION testpub_for_seq_schema FOR ALL SEQUENCES IN SCHEMA pub_test, SEQUENCE pub_test.testpub_seq1; -- fail - can't add a sequence of the same schema to the schema publication ALTER PUBLICATION testpub_forschema ADD SEQUENCE pub_test.testpub_seq1; diff --git a/src/test/subscription/t/029_sequences.pl b/src/test/subscription/t/029_sequences.pl index cdd7f7f344..9ae3c03d7d 100644 --- a/src/test/subscription/t/029_sequences.pl +++ b/src/test/subscription/t/029_sequences.pl @@ -46,7 +46,7 @@ "ALTER PUBLICATION seq_pub ADD SEQUENCE s"); $node_subscriber->safe_psql('postgres', - "CREATE SUBSCRIPTION seq_sub CONNECTION '$publisher_connstr' PUBLICATION seq_pub WITH (slot_name = seq_sub_slot)" + "CREATE SUBSCRIPTION seq_sub CONNECTION '$publisher_connstr' PUBLICATION seq_pub" ); $node_publisher->wait_for_catchup('seq_sub'); @@ -73,7 +73,7 @@ )); is( $result, '132|0|t', - 'check replicated sequence values on subscriber'); + 'initial test data replicated'); # advance the sequence in a rolled-back transaction - the rollback @@ -96,7 +96,7 @@ )); is( $result, '231|0|t', - 'check replicated sequence values on subscriber'); + 'advance sequence in rolled-back transaction'); # create a new sequence and roll it back - should not be replicated, due to @@ -119,7 +119,7 @@ )); is( $result, '1|0|f', - 'check replicated sequence values on subscriber'); + 'create new sequence and roll it back'); # create a new sequence, advance it in a rolled-back transaction, but commit @@ -150,7 +150,7 @@ )); is( $result, '132|0|t', - 'check replicated sequence values on subscriber'); + 'create sequence, advance it in rolled-back transaction, but commit the create'); # advance the new sequence in a transaction, and roll it back - the rollback @@ -173,7 +173,7 @@ )); is( $result, '231|0|t', - 'check replicated sequence values on subscriber'); + 'advance the new sequence in a transaction and roll it back'); # advance the sequence in a subtransaction - the subtransaction gets rolled @@ -196,7 +196,7 @@ )); is( $result, '330|0|t', - 'check replicated sequence values on subscriber'); + 'advance sequence in a subtransaction'); done_testing(); -- 2.35.1