From e2773f54d42fb7ee41562d720861e5ede9fecc6f Mon Sep 17 00:00:00 2001 From: Peter Eisentraut Date: Tue, 3 Apr 2018 22:13:07 -0400 Subject: [PATCH v18 2/2] Logical replication support for TRUNCATE --- doc/src/sgml/catalogs.sgml | 8 ++ doc/src/sgml/logical-replication.sgml | 13 +-- doc/src/sgml/protocol.sgml | 46 +++++++++ doc/src/sgml/ref/create_publication.sgml | 10 +- src/backend/catalog/pg_publication.c | 1 + src/backend/commands/publicationcmds.c | 20 +++- src/backend/replication/logical/proto.c | 45 +++++++++ src/backend/replication/logical/worker.c | 57 +++++++++++ src/backend/replication/pgoutput/pgoutput.c | 16 ++- src/backend/utils/cache/relcache.c | 3 +- src/bin/pg_dump/pg_dump.c | 33 ++++-- src/bin/pg_dump/pg_dump.h | 1 + src/bin/pg_dump/t/002_pg_dump.pl | 2 +- src/bin/psql/describe.c | 26 ++++- src/include/catalog/pg_publication.h | 7 +- src/include/replication/logicalproto.h | 4 + src/test/regress/expected/publication.out | 84 ++++++++-------- src/test/subscription/t/010_truncate.pl | 106 ++++++++++++++++++++ 18 files changed, 406 insertions(+), 76 deletions(-) create mode 100644 src/test/subscription/t/010_truncate.pl diff --git a/doc/src/sgml/catalogs.sgml b/doc/src/sgml/catalogs.sgml index d6a9d8c580..e8efa13e8d 100644 --- a/doc/src/sgml/catalogs.sgml +++ b/doc/src/sgml/catalogs.sgml @@ -5518,6 +5518,14 @@ <structname>pg_publication</structname> Columns If true, DELETE operations are replicated for tables in the publication. + + + pubtruncate + bool + + If true, TRUNCATE operations are replicated for + tables in the publication. + diff --git a/doc/src/sgml/logical-replication.sgml b/doc/src/sgml/logical-replication.sgml index 75551d8ee1..151e773fc2 100644 --- a/doc/src/sgml/logical-replication.sgml +++ b/doc/src/sgml/logical-replication.sgml @@ -108,8 +108,8 @@ Publication Publications can choose to limit the changes they produce to - any combination of INSERT, UPDATE, and - DELETE, similar to how triggers are fired by + any combination of INSERT, UPDATE, + DELETE, and TRUNCATE, similar to how triggers are fired by particular event types. By default, all operation types are replicated. @@ -364,15 +364,6 @@ Restrictions - - - TRUNCATE commands are not replicated. This can, of - course, be worked around by using DELETE instead. To - avoid accidental TRUNCATE invocations, you can revoke - the TRUNCATE privilege from tables. - - - Large objects (see ) are not replicated. diff --git a/doc/src/sgml/protocol.sgml b/doc/src/sgml/protocol.sgml index 8c488506fa..83fe7804d0 100644 --- a/doc/src/sgml/protocol.sgml +++ b/doc/src/sgml/protocol.sgml @@ -6763,6 +6763,52 @@ Logical Replication Message Formats + + +Truncate + + + + + + + + Byte1('T') + + + + Identifies the message as a truncate message. + + + + + + Int32 + + + + ID of the relation corresponding to the ID in the relation + message. + + + + + + Int8 + + + + Option bits for TRUNCATE: + 1 for CASCADE, 2 for RESTART IDENTITY + + + + + + + + + diff --git a/doc/src/sgml/ref/create_publication.sgml b/doc/src/sgml/ref/create_publication.sgml index bfe12d5f41..99f87ca393 100644 --- a/doc/src/sgml/ref/create_publication.sgml +++ b/doc/src/sgml/ref/create_publication.sgml @@ -106,10 +106,11 @@ Parameters This parameter determines which DML operations will be published by the new publication to the subscribers. The value is comma-separated list of operations. The allowed operations are - insert, update, and - delete. The default is to publish all actions, + insert, update, + delete, and truncate. + The default is to publish all actions, and so the default value for this option is - 'insert, update, delete'. + 'insert, update, delete, truncate'. @@ -168,8 +169,7 @@ Notes - TRUNCATE and DDL operations - are not published. + DDL operations are not published. diff --git a/src/backend/catalog/pg_publication.c b/src/backend/catalog/pg_publication.c index ba18258ebb..ec3bd1d22d 100644 --- a/src/backend/catalog/pg_publication.c +++ b/src/backend/catalog/pg_publication.c @@ -376,6 +376,7 @@ GetPublication(Oid pubid) pub->pubactions.pubinsert = pubform->pubinsert; pub->pubactions.pubupdate = pubform->pubupdate; pub->pubactions.pubdelete = pubform->pubdelete; + pub->pubactions.pubtruncate = pubform->pubtruncate; ReleaseSysCache(tup); diff --git a/src/backend/commands/publicationcmds.c b/src/backend/commands/publicationcmds.c index 9c5aa9ebc2..29992d4a0e 100644 --- a/src/backend/commands/publicationcmds.c +++ b/src/backend/commands/publicationcmds.c @@ -62,7 +62,8 @@ parse_publication_options(List *options, bool *publish_given, bool *publish_insert, bool *publish_update, - bool *publish_delete) + bool *publish_delete, + bool *publish_truncate) { ListCell *lc; @@ -72,6 +73,7 @@ parse_publication_options(List *options, *publish_insert = true; *publish_update = true; *publish_delete = true; + *publish_truncate = true; /* Parse options */ foreach(lc, options) @@ -96,6 +98,7 @@ parse_publication_options(List *options, *publish_insert = false; *publish_update = false; *publish_delete = false; + *publish_truncate = false; *publish_given = true; publish = defGetString(defel); @@ -116,6 +119,8 @@ parse_publication_options(List *options, *publish_update = true; else if (strcmp(publish_opt, "delete") == 0) *publish_delete = true; + else if (strcmp(publish_opt, "truncate") == 0) + *publish_truncate = true; else ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), @@ -145,6 +150,7 @@ CreatePublication(CreatePublicationStmt *stmt) bool publish_insert; bool publish_update; bool publish_delete; + bool publish_truncate; AclResult aclresult; /* must have CREATE privilege on database */ @@ -181,7 +187,8 @@ CreatePublication(CreatePublicationStmt *stmt) parse_publication_options(stmt->options, &publish_given, &publish_insert, - &publish_update, &publish_delete); + &publish_update, &publish_delete, + &publish_truncate); values[Anum_pg_publication_puballtables - 1] = BoolGetDatum(stmt->for_all_tables); @@ -191,6 +198,8 @@ CreatePublication(CreatePublicationStmt *stmt) BoolGetDatum(publish_update); values[Anum_pg_publication_pubdelete - 1] = BoolGetDatum(publish_delete); + values[Anum_pg_publication_pubtruncate - 1] = + BoolGetDatum(publish_truncate); tup = heap_form_tuple(RelationGetDescr(rel), values, nulls); @@ -237,11 +246,13 @@ AlterPublicationOptions(AlterPublicationStmt *stmt, Relation rel, bool publish_insert; bool publish_update; bool publish_delete; + bool publish_truncate; ObjectAddress obj; parse_publication_options(stmt->options, &publish_given, &publish_insert, - &publish_update, &publish_delete); + &publish_update, &publish_delete, + &publish_truncate); /* Everything ok, form a new tuple. */ memset(values, 0, sizeof(values)); @@ -258,6 +269,9 @@ AlterPublicationOptions(AlterPublicationStmt *stmt, Relation rel, values[Anum_pg_publication_pubdelete - 1] = BoolGetDatum(publish_delete); replaces[Anum_pg_publication_pubdelete - 1] = true; + + values[Anum_pg_publication_pubtruncate - 1] = BoolGetDatum(publish_truncate); + replaces[Anum_pg_publication_pubtruncate - 1] = true; } tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls, diff --git a/src/backend/replication/logical/proto.c b/src/backend/replication/logical/proto.c index 948343e4ae..2fa6f8393d 100644 --- a/src/backend/replication/logical/proto.c +++ b/src/backend/replication/logical/proto.c @@ -26,6 +26,9 @@ */ #define LOGICALREP_IS_REPLICA_IDENTITY 1 +#define TRUNCATE_CASCADE (1<<0) +#define TRUNCATE_RESTART_SEQS (1<<1) + static void logicalrep_write_attrs(StringInfo out, Relation rel); static void logicalrep_write_tuple(StringInfo out, Relation rel, HeapTuple tuple); @@ -292,6 +295,48 @@ logicalrep_read_delete(StringInfo in, LogicalRepTupleData *oldtup) return relid; } +/* + * Write TRUNCATE to the output stream. + */ +void +logicalrep_write_truncate(StringInfo out, Relation rel, + bool cascade, bool restart_seqs) +{ + uint8 flags = 0; + + pq_sendbyte(out, 'T'); /* action TRUNCATE */ + + /* use Oid as relation identifier */ + pq_sendint32(out, RelationGetRelid(rel)); + + /* encode and send truncate flags */ + if (cascade) + flags |= TRUNCATE_CASCADE; + if (restart_seqs) + flags |= TRUNCATE_RESTART_SEQS; + pq_sendint8(out, flags); +} + +/* + * Read TRUNCATE from stream. + */ +LogicalRepRelId +logicalrep_read_truncate(StringInfo in, bool *cascade, bool *restart_seqs) +{ + LogicalRepRelId relid; + uint8 flags; + + /* read the relation id */ + relid = pq_getmsgint(in, 4); + + /* read and decode truncate flags */ + flags = pq_getmsgint(in, 1); + *cascade = (flags & TRUNCATE_CASCADE) > 0; + *restart_seqs = (flags & TRUNCATE_RESTART_SEQS) > 0; + + return relid; +} + /* * Write relation description to the output stream. */ diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index fdace7eea2..7c9c24ae9f 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -30,10 +30,12 @@ #include "access/xact.h" #include "access/xlog_internal.h" +#include "catalog/catalog.h" #include "catalog/namespace.h" #include "catalog/pg_subscription.h" #include "catalog/pg_subscription_rel.h" +#include "commands/tablecmds.h" #include "commands/trigger.h" #include "executor/executor.h" @@ -83,6 +85,7 @@ #include "utils/inval.h" #include "utils/lsyscache.h" #include "utils/memutils.h" +#include "utils/rel.h" #include "utils/timeout.h" #include "utils/tqual.h" #include "utils/syscache.h" @@ -888,6 +891,56 @@ apply_handle_delete(StringInfo s) CommandCounterIncrement(); } +/* + * Handle TRUNCATE message. + * + * TODO: FDW support + */ +static void +apply_handle_truncate(StringInfo s) +{ + LogicalRepRelMapEntry *rel; + LogicalRepRelId relid; + bool cascade = false; + bool restart_seqs = false; + List *rels = NIL; + List *relids = NIL; + List *relids_logged = NIL; + + ensure_transaction(); + + relid = logicalrep_read_truncate(s, &cascade, &restart_seqs); + rel = logicalrep_rel_open(relid, RowExclusiveLock); + if (!should_apply_changes_for_rel(rel)) + { + /* + * The relation can't become interesting in the middle of the + * transaction so it's safe to unlock it. + */ + logicalrep_rel_close(rel, RowExclusiveLock); + return; + } + + /* Check if we can do the truncate. */ + check_relation_updatable(rel); + + rels = lappend(rels, rel->localrel); + relids = lappend_oid(relids, rel->localreloid); + if (RelationIsLogicallyLogged(rel->localrel)) + relids_logged = lappend_oid(relids, rel->localreloid); + + /* + * Even if we used CASCADE on the upstream master we explicitly + * default to replaying changes without further cascading. + * This might be later changeable with a user specified option. + */ + ExecuteTruncateGuts(rels, relids, relids_logged, DROP_RESTRICT, restart_seqs); + + logicalrep_rel_close(rel, NoLock); + + CommandCounterIncrement(); +} + /* * Logical replication protocol message dispatcher. @@ -919,6 +972,10 @@ apply_dispatch(StringInfo s) case 'D': apply_handle_delete(s); break; + /* TRUNCATE */ + case 'T': + apply_handle_truncate(s); + break; /* RELATION */ case 'R': apply_handle_relation(s); diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c index aa9cf5b54e..27d28259f2 100644 --- a/src/backend/replication/pgoutput/pgoutput.c +++ b/src/backend/replication/pgoutput/pgoutput.c @@ -281,6 +281,10 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, if (!relentry->pubactions.pubdelete) return; break; + case REORDER_BUFFER_CHANGE_TRUNCATE: + if (!relentry->pubactions.pubtruncate) + return; + break; default: Assert(false); } @@ -354,6 +358,13 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, else elog(DEBUG1, "didn't send DELETE change because of missing oldtuple"); break; + case REORDER_BUFFER_CHANGE_TRUNCATE: + OutputPluginPrepareWrite(ctx, true); + logicalrep_write_truncate(ctx->out, relation, + change->data.truncate_msg.cascade, + change->data.truncate_msg.restart_seqs); + OutputPluginWrite(ctx, true); + break; default: Assert(false); } @@ -504,7 +515,7 @@ get_rel_sync_entry(PGOutputData *data, Oid relid) * we only need to consider ones that the subscriber requested. */ entry->pubactions.pubinsert = entry->pubactions.pubupdate = - entry->pubactions.pubdelete = false; + entry->pubactions.pubdelete = entry->pubactions.pubtruncate = false; foreach(lc, data->publications) { @@ -515,10 +526,11 @@ get_rel_sync_entry(PGOutputData *data, Oid relid) entry->pubactions.pubinsert |= pub->pubactions.pubinsert; entry->pubactions.pubupdate |= pub->pubactions.pubupdate; entry->pubactions.pubdelete |= pub->pubactions.pubdelete; + entry->pubactions.pubtruncate |= pub->pubactions.pubtruncate; } if (entry->pubactions.pubinsert && entry->pubactions.pubupdate && - entry->pubactions.pubdelete) + entry->pubactions.pubdelete && entry->pubactions.pubtruncate) break; } diff --git a/src/backend/utils/cache/relcache.c b/src/backend/utils/cache/relcache.c index 69a2114a10..6a67c185b0 100644 --- a/src/backend/utils/cache/relcache.c +++ b/src/backend/utils/cache/relcache.c @@ -5339,6 +5339,7 @@ GetRelationPublicationActions(Relation relation) pubactions->pubinsert |= pubform->pubinsert; pubactions->pubupdate |= pubform->pubupdate; pubactions->pubdelete |= pubform->pubdelete; + pubactions->pubtruncate |= pubform->pubtruncate; ReleaseSysCache(tup); @@ -5347,7 +5348,7 @@ GetRelationPublicationActions(Relation relation) * other publications. */ if (pubactions->pubinsert && pubactions->pubupdate && - pubactions->pubdelete) + pubactions->pubdelete && pubactions->pubtruncate) break; } diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c index b8d65a9ee3..f5286f890e 100644 --- a/src/bin/pg_dump/pg_dump.c +++ b/src/bin/pg_dump/pg_dump.c @@ -3712,6 +3712,7 @@ getPublications(Archive *fout) int i_pubinsert; int i_pubupdate; int i_pubdelete; + int i_pubtruncate; int i, ntups; @@ -3723,12 +3724,20 @@ getPublications(Archive *fout) resetPQExpBuffer(query); /* Get the publications. */ - appendPQExpBuffer(query, - "SELECT p.tableoid, p.oid, p.pubname, " - "(%s p.pubowner) AS rolname, " - "p.puballtables, p.pubinsert, p.pubupdate, p.pubdelete " - "FROM pg_publication p", - username_subquery); + if (fout->remoteVersion >= 110000) + appendPQExpBuffer(query, + "SELECT p.tableoid, p.oid, p.pubname, " + "(%s p.pubowner) AS rolname, " + "p.puballtables, p.pubinsert, p.pubupdate, p.pubdelete, p.pubtruncate " + "FROM pg_publication p", + username_subquery); + else + appendPQExpBuffer(query, + "SELECT p.tableoid, p.oid, p.pubname, " + "(%s p.pubowner) AS rolname, " + "p.puballtables, p.pubinsert, p.pubupdate, p.pubdelete, false AS pubtruncate " + "FROM pg_publication p", + username_subquery); res = ExecuteSqlQuery(fout, query->data, PGRES_TUPLES_OK); @@ -3742,6 +3751,7 @@ getPublications(Archive *fout) i_pubinsert = PQfnumber(res, "pubinsert"); i_pubupdate = PQfnumber(res, "pubupdate"); i_pubdelete = PQfnumber(res, "pubdelete"); + i_pubtruncate = PQfnumber(res, "pubtruncate"); pubinfo = pg_malloc(ntups * sizeof(PublicationInfo)); @@ -3762,6 +3772,8 @@ getPublications(Archive *fout) (strcmp(PQgetvalue(res, i, i_pubupdate), "t") == 0); pubinfo[i].pubdelete = (strcmp(PQgetvalue(res, i, i_pubdelete), "t") == 0); + pubinfo[i].pubtruncate = + (strcmp(PQgetvalue(res, i, i_pubtruncate), "t") == 0); if (strlen(pubinfo[i].rolname) == 0) write_msg(NULL, "WARNING: owner of publication \"%s\" appears to be invalid\n", @@ -3829,6 +3841,15 @@ dumpPublication(Archive *fout, PublicationInfo *pubinfo) first = false; } + if (pubinfo->pubtruncate) + { + if (!first) + appendPQExpBufferStr(query, ", "); + + appendPQExpBufferStr(query, "truncate"); + first = false; + } + appendPQExpBufferStr(query, "');\n"); ArchiveEntry(fout, pubinfo->dobj.catId, pubinfo->dobj.dumpId, diff --git a/src/bin/pg_dump/pg_dump.h b/src/bin/pg_dump/pg_dump.h index a4d6d926a8..c2314758de 100644 --- a/src/bin/pg_dump/pg_dump.h +++ b/src/bin/pg_dump/pg_dump.h @@ -595,6 +595,7 @@ typedef struct _PublicationInfo bool pubinsert; bool pubupdate; bool pubdelete; + bool pubtruncate; } PublicationInfo; /* diff --git a/src/bin/pg_dump/t/002_pg_dump.pl b/src/bin/pg_dump/t/002_pg_dump.pl index 1bea6ae81d..c14664ddaf 100644 --- a/src/bin/pg_dump/t/002_pg_dump.pl +++ b/src/bin/pg_dump/t/002_pg_dump.pl @@ -4401,7 +4401,7 @@ create_order => 50, create_sql => 'CREATE PUBLICATION pub1;', regexp => qr/^ - \QCREATE PUBLICATION pub1 WITH (publish = 'insert, update, delete');\E + \QCREATE PUBLICATION pub1 WITH (publish = 'insert, update, delete, truncate');\E /xm, like => { binary_upgrade => 1, diff --git a/src/bin/psql/describe.c b/src/bin/psql/describe.c index 0c3be1f504..75a1e42cee 100644 --- a/src/bin/psql/describe.c +++ b/src/bin/psql/describe.c @@ -5187,7 +5187,7 @@ listPublications(const char *pattern) PQExpBufferData buf; PGresult *res; printQueryOpt myopt = pset.popt; - static const bool translate_columns[] = {false, false, false, false, false, false}; + static const bool translate_columns[] = {false, false, false, false, false, false, false}; if (pset.sversion < 100000) { @@ -5207,13 +5207,17 @@ listPublications(const char *pattern) " puballtables AS \"%s\",\n" " pubinsert AS \"%s\",\n" " pubupdate AS \"%s\",\n" - " pubdelete AS \"%s\"\n", + " pubdelete AS \"%s\"", gettext_noop("Name"), gettext_noop("Owner"), gettext_noop("All tables"), gettext_noop("Inserts"), gettext_noop("Updates"), gettext_noop("Deletes")); + if (pset.sversion >= 110000) + appendPQExpBuffer(&buf, + ",\n pubtruncate AS \"%s\"", + gettext_noop("Truncates")); appendPQExpBufferStr(&buf, "\nFROM pg_catalog.pg_publication\n"); @@ -5254,6 +5258,7 @@ describePublications(const char *pattern) PQExpBufferData buf; int i; PGresult *res; + bool has_pubtruncate; if (pset.sversion < 100000) { @@ -5265,13 +5270,19 @@ describePublications(const char *pattern) return true; } + has_pubtruncate = (pset.sversion >= 110000); + initPQExpBuffer(&buf); printfPQExpBuffer(&buf, "SELECT oid, pubname,\n" " pg_catalog.pg_get_userbyid(pubowner) AS owner,\n" - " puballtables, pubinsert, pubupdate, pubdelete\n" - "FROM pg_catalog.pg_publication\n"); + " puballtables, pubinsert, pubupdate, pubdelete"); + if (has_pubtruncate) + appendPQExpBuffer(&buf, + ", pubtruncate"); + appendPQExpBuffer(&buf, + "\nFROM pg_catalog.pg_publication\n"); processSQLNamePattern(pset.db, &buf, pattern, false, false, NULL, "pubname", NULL, @@ -5317,6 +5328,9 @@ describePublications(const char *pattern) printTableOpt myopt = pset.popt.topt; printTableContent cont; + if (has_pubtruncate) + ncols++; + initPQExpBuffer(&title); printfPQExpBuffer(&title, _("Publication %s"), pubname); printTableInit(&cont, &myopt, title.data, ncols, nrows); @@ -5326,12 +5340,16 @@ describePublications(const char *pattern) printTableAddHeader(&cont, gettext_noop("Inserts"), true, align); printTableAddHeader(&cont, gettext_noop("Updates"), true, align); printTableAddHeader(&cont, gettext_noop("Deletes"), true, align); + if (has_pubtruncate) + printTableAddHeader(&cont, gettext_noop("Truncates"), true, align); printTableAddCell(&cont, PQgetvalue(res, i, 2), false, false); printTableAddCell(&cont, PQgetvalue(res, i, 3), false, false); printTableAddCell(&cont, PQgetvalue(res, i, 4), false, false); printTableAddCell(&cont, PQgetvalue(res, i, 5), false, false); printTableAddCell(&cont, PQgetvalue(res, i, 6), false, false); + if (has_pubtruncate) + printTableAddCell(&cont, PQgetvalue(res, i, 7), false, false); if (!puballtables) { diff --git a/src/include/catalog/pg_publication.h b/src/include/catalog/pg_publication.h index 37e77b8be7..b643c489cd 100644 --- a/src/include/catalog/pg_publication.h +++ b/src/include/catalog/pg_publication.h @@ -49,6 +49,9 @@ CATALOG(pg_publication,6104) /* true if deletes are published */ bool pubdelete; + /* true if truncates are published */ + bool pubtruncate; + } FormData_pg_publication; /* ---------------- @@ -63,19 +66,21 @@ typedef FormData_pg_publication *Form_pg_publication; * ---------------- */ -#define Natts_pg_publication 6 +#define Natts_pg_publication 7 #define Anum_pg_publication_pubname 1 #define Anum_pg_publication_pubowner 2 #define Anum_pg_publication_puballtables 3 #define Anum_pg_publication_pubinsert 4 #define Anum_pg_publication_pubupdate 5 #define Anum_pg_publication_pubdelete 6 +#define Anum_pg_publication_pubtruncate 7 typedef struct PublicationActions { bool pubinsert; bool pubupdate; bool pubdelete; + bool pubtruncate; } PublicationActions; typedef struct Publication diff --git a/src/include/replication/logicalproto.h b/src/include/replication/logicalproto.h index 116f16f42d..f635ed82c1 100644 --- a/src/include/replication/logicalproto.h +++ b/src/include/replication/logicalproto.h @@ -97,6 +97,10 @@ extern void logicalrep_write_delete(StringInfo out, Relation rel, HeapTuple oldtuple); extern LogicalRepRelId logicalrep_read_delete(StringInfo in, LogicalRepTupleData *oldtup); +extern void logicalrep_write_truncate(StringInfo out, Relation rel, + bool cascade, bool restart_seqs); +extern LogicalRepRelId logicalrep_read_truncate(StringInfo in, + bool *cascade, bool *restart_seqs); extern void logicalrep_write_rel(StringInfo out, Relation rel); extern LogicalRepRelation *logicalrep_read_rel(StringInfo in); extern void logicalrep_write_typ(StringInfo out, Oid typoid); diff --git a/src/test/regress/expected/publication.out b/src/test/regress/expected/publication.out index 0c86c647bc..afbbdd543d 100644 --- a/src/test/regress/expected/publication.out +++ b/src/test/regress/expected/publication.out @@ -21,20 +21,20 @@ ERROR: unrecognized publication parameter: foo CREATE PUBLICATION testpub_xxx WITH (publish = 'cluster, vacuum'); ERROR: unrecognized "publish" value: "cluster" \dRp - List of publications - Name | Owner | All tables | Inserts | Updates | Deletes ---------------------+--------------------------+------------+---------+---------+--------- - testpib_ins_trunct | regress_publication_user | f | t | f | f - testpub_default | regress_publication_user | f | f | t | f + List of publications + Name | Owner | All tables | Inserts | Updates | Deletes | Truncates +--------------------+--------------------------+------------+---------+---------+---------+----------- + testpib_ins_trunct | regress_publication_user | f | t | f | f | f + testpub_default | regress_publication_user | f | f | t | f | f (2 rows) ALTER PUBLICATION testpub_default SET (publish = 'insert, update, delete'); \dRp - List of publications - Name | Owner | All tables | Inserts | Updates | Deletes ---------------------+--------------------------+------------+---------+---------+--------- - testpib_ins_trunct | regress_publication_user | f | t | f | f - testpub_default | regress_publication_user | f | t | t | t + List of publications + Name | Owner | All tables | Inserts | Updates | Deletes | Truncates +--------------------+--------------------------+------------+---------+---------+---------+----------- + testpib_ins_trunct | regress_publication_user | f | t | f | f | f + testpub_default | regress_publication_user | f | t | t | t | f (2 rows) --- adding tables @@ -76,10 +76,10 @@ Publications: "testpub_foralltables" \dRp+ testpub_foralltables - Publication testpub_foralltables - Owner | All tables | Inserts | Updates | Deletes ---------------------------+------------+---------+---------+--------- - regress_publication_user | t | t | t | f + Publication testpub_foralltables + Owner | All tables | Inserts | Updates | Deletes | Truncates +--------------------------+------------+---------+---------+---------+----------- + regress_publication_user | t | t | t | f | f (1 row) DROP TABLE testpub_tbl2; @@ -89,19 +89,19 @@ CREATE TABLE testpub_tbl3a (b text) INHERITS (testpub_tbl3); CREATE PUBLICATION testpub3 FOR TABLE testpub_tbl3; CREATE PUBLICATION testpub4 FOR TABLE ONLY testpub_tbl3; \dRp+ testpub3 - Publication testpub3 - Owner | All tables | Inserts | Updates | Deletes ---------------------------+------------+---------+---------+--------- - regress_publication_user | f | t | t | t + Publication testpub3 + Owner | All tables | Inserts | Updates | Deletes | Truncates +--------------------------+------------+---------+---------+---------+----------- + regress_publication_user | f | t | t | t | t Tables: "public.testpub_tbl3" "public.testpub_tbl3a" \dRp+ testpub4 - Publication testpub4 - Owner | All tables | Inserts | Updates | Deletes ---------------------------+------------+---------+---------+--------- - regress_publication_user | f | t | t | t + Publication testpub4 + Owner | All tables | Inserts | Updates | Deletes | Truncates +--------------------------+------------+---------+---------+---------+----------- + regress_publication_user | f | t | t | t | t Tables: "public.testpub_tbl3" @@ -119,10 +119,10 @@ ERROR: relation "testpub_tbl1" is already member of publication "testpub_fortbl CREATE PUBLICATION testpub_fortbl FOR TABLE testpub_tbl1; ERROR: publication "testpub_fortbl" already exists \dRp+ testpub_fortbl - Publication testpub_fortbl - Owner | All tables | Inserts | Updates | Deletes ---------------------------+------------+---------+---------+--------- - regress_publication_user | f | t | t | t + Publication testpub_fortbl + Owner | All tables | Inserts | Updates | Deletes | Truncates +--------------------------+------------+---------+---------+---------+----------- + regress_publication_user | f | t | t | t | t Tables: "pub_test.testpub_nopk" "public.testpub_tbl1" @@ -165,10 +165,10 @@ Publications: "testpub_fortbl" \dRp+ testpub_default - Publication testpub_default - Owner | All tables | Inserts | Updates | Deletes ---------------------------+------------+---------+---------+--------- - regress_publication_user | f | t | t | t + Publication testpub_default + Owner | All tables | Inserts | Updates | Deletes | Truncates +--------------------------+------------+---------+---------+---------+----------- + regress_publication_user | f | t | t | t | f Tables: "pub_test.testpub_nopk" "public.testpub_tbl1" @@ -210,10 +210,10 @@ DROP TABLE testpub_parted; DROP VIEW testpub_view; DROP TABLE testpub_tbl1; \dRp+ testpub_default - Publication testpub_default - Owner | All tables | Inserts | Updates | Deletes ---------------------------+------------+---------+---------+--------- - regress_publication_user | f | t | t | t + Publication testpub_default + Owner | All tables | Inserts | Updates | Deletes | Truncates +--------------------------+------------+---------+---------+---------+----------- + regress_publication_user | f | t | t | t | f (1 row) -- fail - must be owner of publication @@ -223,20 +223,20 @@ ERROR: must be owner of publication testpub_default RESET ROLE; ALTER PUBLICATION testpub_default RENAME TO testpub_foo; \dRp testpub_foo - List of publications - Name | Owner | All tables | Inserts | Updates | Deletes --------------+--------------------------+------------+---------+---------+--------- - testpub_foo | regress_publication_user | f | t | t | t + List of publications + Name | Owner | All tables | Inserts | Updates | Deletes | Truncates +-------------+--------------------------+------------+---------+---------+---------+----------- + testpub_foo | regress_publication_user | f | t | t | t | f (1 row) -- rename back to keep the rest simple ALTER PUBLICATION testpub_foo RENAME TO testpub_default; ALTER PUBLICATION testpub_default OWNER TO regress_publication_user2; \dRp testpub_default - List of publications - Name | Owner | All tables | Inserts | Updates | Deletes ------------------+---------------------------+------------+---------+---------+--------- - testpub_default | regress_publication_user2 | f | t | t | t + List of publications + Name | Owner | All tables | Inserts | Updates | Deletes | Truncates +-----------------+---------------------------+------------+---------+---------+---------+----------- + testpub_default | regress_publication_user2 | f | t | t | t | f (1 row) DROP PUBLICATION testpub_default; diff --git a/src/test/subscription/t/010_truncate.pl b/src/test/subscription/t/010_truncate.pl new file mode 100644 index 0000000000..864135f007 --- /dev/null +++ b/src/test/subscription/t/010_truncate.pl @@ -0,0 +1,106 @@ +# Test TRUNCATE +use strict; +use warnings; +use PostgresNode; +use TestLib; +use Test::More tests => 5; + +# setup + +my $node_publisher = get_new_node('publisher'); +$node_publisher->init(allows_streaming => 'logical'); +$node_publisher->start; + +my $node_subscriber = get_new_node('subscriber'); +$node_subscriber->init(allows_streaming => 'logical'); +$node_subscriber->start; + +my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres'; + +$node_publisher->safe_psql('postgres', + "CREATE TABLE tab1 (a int PRIMARY KEY)"); + +$node_subscriber->safe_psql('postgres', + "CREATE TABLE tab1 (a int PRIMARY KEY)"); + +$node_publisher->safe_psql('postgres', + "CREATE TABLE tab2 (a int PRIMARY KEY)"); + +$node_subscriber->safe_psql('postgres', + "CREATE TABLE tab2 (a int PRIMARY KEY)"); + +$node_subscriber->safe_psql('postgres', + "CREATE SEQUENCE seq1 OWNED BY tab1.a" +); +$node_subscriber->safe_psql('postgres', + "ALTER SEQUENCE seq1 START 101" +); + +$node_publisher->safe_psql('postgres', + "CREATE PUBLICATION pub1 FOR TABLE tab1"); +$node_publisher->safe_psql('postgres', + "CREATE PUBLICATION pub2 FOR TABLE tab2 WITH (publish = insert)"); +$node_subscriber->safe_psql('postgres', + "CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr application_name=sub1' PUBLICATION pub1"); +$node_subscriber->safe_psql('postgres', + "CREATE SUBSCRIPTION sub2 CONNECTION '$publisher_connstr application_name=sub2' PUBLICATION pub2"); + +$node_publisher->wait_for_catchup('sub1'); + +# insert data to truncate + +$node_subscriber->safe_psql('postgres', "INSERT INTO tab1 VALUES (1), (2), (3)"); + +$node_publisher->wait_for_catchup('sub1'); + +# truncate and check + +$node_publisher->safe_psql('postgres', "TRUNCATE tab1"); + +$node_publisher->wait_for_catchup('sub1'); + +my $result = $node_subscriber->safe_psql('postgres', + "SELECT count(*), min(a), max(a) FROM tab1"); +is($result, qq(0||), + 'truncate replicated'); + +$result = $node_subscriber->safe_psql('postgres', + "SELECT nextval('seq1')"); +is($result, qq(1), + 'sequence not restarted'); + +# truncate with restart identity + +$node_publisher->safe_psql('postgres', "TRUNCATE tab1 RESTART IDENTITY"); + +$node_publisher->wait_for_catchup('sub1'); + +$result = $node_subscriber->safe_psql('postgres', + "SELECT nextval('seq1')"); +is($result, qq(101), + 'truncate restarted identities'); + +# test publication that does not replicate truncate + +$node_subscriber->safe_psql('postgres', "INSERT INTO tab2 VALUES (1), (2), (3)"); + +$node_publisher->safe_psql('postgres', "TRUNCATE tab2"); + +$node_publisher->wait_for_catchup('sub2'); + +$result = $node_subscriber->safe_psql('postgres', + "SELECT count(*), min(a), max(a) FROM tab2"); +is($result, qq(3|1|3), + 'truncate not replicated'); + +$node_publisher->safe_psql('postgres', + "ALTER PUBLICATION pub2 SET (publish = 'insert, truncate')"); + +$node_publisher->safe_psql('postgres', "TRUNCATE tab2"); + +$node_publisher->wait_for_catchup('sub2'); + +$result = $node_subscriber->safe_psql('postgres', + "SELECT count(*), min(a), max(a) FROM tab2"); +is($result, qq(0||), + 'truncate replicated after publication change'); -- 2.17.0