From 7ee846501f8532cc6b6ebc06b07b8555ae8a57f4 Mon Sep 17 00:00:00 2001 From: Peter Eisentraut Date: Thu, 5 Apr 2018 11:47:06 -0400 Subject: [PATCH v19 2/2] Logical replication support for TRUNCATE --- doc/src/sgml/catalogs.sgml | 8 + doc/src/sgml/logical-replication.sgml | 13 +- doc/src/sgml/protocol.sgml | 56 +++++++ doc/src/sgml/ref/create_publication.sgml | 10 +- src/backend/catalog/pg_publication.c | 1 + src/backend/commands/publicationcmds.c | 20 ++- src/backend/replication/logical/proto.c | 55 +++++++ src/backend/replication/logical/worker.c | 68 +++++++++ src/backend/replication/pgoutput/pgoutput.c | 129 +++++++++++----- src/backend/utils/cache/relcache.c | 3 +- src/bin/pg_dump/pg_dump.c | 33 +++- src/bin/pg_dump/pg_dump.h | 1 + src/bin/pg_dump/t/002_pg_dump.pl | 2 +- src/bin/psql/describe.c | 26 +++- src/include/catalog/pg_publication.h | 7 +- src/include/replication/logicalproto.h | 4 + src/test/regress/expected/publication.out | 84 +++++----- src/test/subscription/t/010_truncate.pl | 161 ++++++++++++++++++++ 18 files changed, 571 insertions(+), 110 deletions(-) create mode 100644 src/test/subscription/t/010_truncate.pl diff --git a/doc/src/sgml/catalogs.sgml b/doc/src/sgml/catalogs.sgml index d6a9d8c580..e8efa13e8d 100644 --- a/doc/src/sgml/catalogs.sgml +++ b/doc/src/sgml/catalogs.sgml @@ -5518,6 +5518,14 @@ <structname>pg_publication</structname> Columns If true, DELETE operations are replicated for tables in the publication. + + + pubtruncate + bool + + If true, TRUNCATE operations are replicated for + tables in the publication. + diff --git a/doc/src/sgml/logical-replication.sgml b/doc/src/sgml/logical-replication.sgml index 75551d8ee1..151e773fc2 100644 --- a/doc/src/sgml/logical-replication.sgml +++ b/doc/src/sgml/logical-replication.sgml @@ -108,8 +108,8 @@ Publication Publications can choose to limit the changes they produce to - any combination of INSERT, UPDATE, and - DELETE, similar to how triggers are fired by + any combination of INSERT, UPDATE, + DELETE, and TRUNCATE, similar to how triggers are fired by particular event types. By default, all operation types are replicated. @@ -364,15 +364,6 @@ Restrictions - - - TRUNCATE commands are not replicated. This can, of - course, be worked around by using DELETE instead. To - avoid accidental TRUNCATE invocations, you can revoke - the TRUNCATE privilege from tables. - - - Large objects (see ) are not replicated. diff --git a/doc/src/sgml/protocol.sgml b/doc/src/sgml/protocol.sgml index b94dd4ac65..004b36084f 100644 --- a/doc/src/sgml/protocol.sgml +++ b/doc/src/sgml/protocol.sgml @@ -6774,6 +6774,62 @@ Logical Replication Message Formats + + +Truncate + + + + + + + + Byte1('T') + + + + Identifies the message as a truncate message. + + + + + + Int32 + + + + Number of relations + + + + + + Int8 + + + + Option bits for TRUNCATE: + 1 for CASCADE, 2 for RESTART IDENTITY + + + + + + Int32 + + + + ID of the relation corresponding to the ID in the relation + message. This field is repeated for each relation. + + + + + + + + + diff --git a/doc/src/sgml/ref/create_publication.sgml b/doc/src/sgml/ref/create_publication.sgml index bfe12d5f41..99f87ca393 100644 --- a/doc/src/sgml/ref/create_publication.sgml +++ b/doc/src/sgml/ref/create_publication.sgml @@ -106,10 +106,11 @@ Parameters This parameter determines which DML operations will be published by the new publication to the subscribers. The value is comma-separated list of operations. The allowed operations are - insert, update, and - delete. The default is to publish all actions, + insert, update, + delete, and truncate. + The default is to publish all actions, and so the default value for this option is - 'insert, update, delete'. + 'insert, update, delete, truncate'. @@ -168,8 +169,7 @@ Notes - TRUNCATE and DDL operations - are not published. + DDL operations are not published. diff --git a/src/backend/catalog/pg_publication.c b/src/backend/catalog/pg_publication.c index ba18258ebb..ec3bd1d22d 100644 --- a/src/backend/catalog/pg_publication.c +++ b/src/backend/catalog/pg_publication.c @@ -376,6 +376,7 @@ GetPublication(Oid pubid) pub->pubactions.pubinsert = pubform->pubinsert; pub->pubactions.pubupdate = pubform->pubupdate; pub->pubactions.pubdelete = pubform->pubdelete; + pub->pubactions.pubtruncate = pubform->pubtruncate; ReleaseSysCache(tup); diff --git a/src/backend/commands/publicationcmds.c b/src/backend/commands/publicationcmds.c index 9c5aa9ebc2..29992d4a0e 100644 --- a/src/backend/commands/publicationcmds.c +++ b/src/backend/commands/publicationcmds.c @@ -62,7 +62,8 @@ parse_publication_options(List *options, bool *publish_given, bool *publish_insert, bool *publish_update, - bool *publish_delete) + bool *publish_delete, + bool *publish_truncate) { ListCell *lc; @@ -72,6 +73,7 @@ parse_publication_options(List *options, *publish_insert = true; *publish_update = true; *publish_delete = true; + *publish_truncate = true; /* Parse options */ foreach(lc, options) @@ -96,6 +98,7 @@ parse_publication_options(List *options, *publish_insert = false; *publish_update = false; *publish_delete = false; + *publish_truncate = false; *publish_given = true; publish = defGetString(defel); @@ -116,6 +119,8 @@ parse_publication_options(List *options, *publish_update = true; else if (strcmp(publish_opt, "delete") == 0) *publish_delete = true; + else if (strcmp(publish_opt, "truncate") == 0) + *publish_truncate = true; else ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), @@ -145,6 +150,7 @@ CreatePublication(CreatePublicationStmt *stmt) bool publish_insert; bool publish_update; bool publish_delete; + bool publish_truncate; AclResult aclresult; /* must have CREATE privilege on database */ @@ -181,7 +187,8 @@ CreatePublication(CreatePublicationStmt *stmt) parse_publication_options(stmt->options, &publish_given, &publish_insert, - &publish_update, &publish_delete); + &publish_update, &publish_delete, + &publish_truncate); values[Anum_pg_publication_puballtables - 1] = BoolGetDatum(stmt->for_all_tables); @@ -191,6 +198,8 @@ CreatePublication(CreatePublicationStmt *stmt) BoolGetDatum(publish_update); values[Anum_pg_publication_pubdelete - 1] = BoolGetDatum(publish_delete); + values[Anum_pg_publication_pubtruncate - 1] = + BoolGetDatum(publish_truncate); tup = heap_form_tuple(RelationGetDescr(rel), values, nulls); @@ -237,11 +246,13 @@ AlterPublicationOptions(AlterPublicationStmt *stmt, Relation rel, bool publish_insert; bool publish_update; bool publish_delete; + bool publish_truncate; ObjectAddress obj; parse_publication_options(stmt->options, &publish_given, &publish_insert, - &publish_update, &publish_delete); + &publish_update, &publish_delete, + &publish_truncate); /* Everything ok, form a new tuple. */ memset(values, 0, sizeof(values)); @@ -258,6 +269,9 @@ AlterPublicationOptions(AlterPublicationStmt *stmt, Relation rel, values[Anum_pg_publication_pubdelete - 1] = BoolGetDatum(publish_delete); replaces[Anum_pg_publication_pubdelete - 1] = true; + + values[Anum_pg_publication_pubtruncate - 1] = BoolGetDatum(publish_truncate); + replaces[Anum_pg_publication_pubtruncate - 1] = true; } tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls, diff --git a/src/backend/replication/logical/proto.c b/src/backend/replication/logical/proto.c index 948343e4ae..edc97a7662 100644 --- a/src/backend/replication/logical/proto.c +++ b/src/backend/replication/logical/proto.c @@ -26,6 +26,9 @@ */ #define LOGICALREP_IS_REPLICA_IDENTITY 1 +#define TRUNCATE_CASCADE (1<<0) +#define TRUNCATE_RESTART_SEQS (1<<1) + static void logicalrep_write_attrs(StringInfo out, Relation rel); static void logicalrep_write_tuple(StringInfo out, Relation rel, HeapTuple tuple); @@ -292,6 +295,58 @@ logicalrep_read_delete(StringInfo in, LogicalRepTupleData *oldtup) return relid; } +/* + * Write TRUNCATE to the output stream. + */ +void +logicalrep_write_truncate(StringInfo out, + int nrelids, + Oid relids[], + bool cascade, bool restart_seqs) +{ + int i; + uint8 flags = 0; + + pq_sendbyte(out, 'T'); /* action TRUNCATE */ + + pq_sendint32(out, nrelids); + + /* encode and send truncate flags */ + if (cascade) + flags |= TRUNCATE_CASCADE; + if (restart_seqs) + flags |= TRUNCATE_RESTART_SEQS; + pq_sendint8(out, flags); + + for (i = 0; i < nrelids; i++) + pq_sendint32(out, relids[i]); +} + +/* + * Read TRUNCATE from stream. + */ +List * +logicalrep_read_truncate(StringInfo in, + bool *cascade, bool *restart_seqs) +{ + int i; + int nrelids; + List *relids = NIL; + uint8 flags; + + nrelids = pq_getmsgint(in, 4); + + /* read and decode truncate flags */ + flags = pq_getmsgint(in, 1); + *cascade = (flags & TRUNCATE_CASCADE) > 0; + *restart_seqs = (flags & TRUNCATE_RESTART_SEQS) > 0; + + for (i = 0; i < nrelids; i++) + relids = lappend_oid(relids, pq_getmsgint(in, 4)); + + return relids; +} + /* * Write relation description to the output stream. */ diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index fdace7eea2..18766364f7 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -30,10 +30,12 @@ #include "access/xact.h" #include "access/xlog_internal.h" +#include "catalog/catalog.h" #include "catalog/namespace.h" #include "catalog/pg_subscription.h" #include "catalog/pg_subscription_rel.h" +#include "commands/tablecmds.h" #include "commands/trigger.h" #include "executor/executor.h" @@ -83,6 +85,7 @@ #include "utils/inval.h" #include "utils/lsyscache.h" #include "utils/memutils.h" +#include "utils/rel.h" #include "utils/timeout.h" #include "utils/tqual.h" #include "utils/syscache.h" @@ -888,6 +891,67 @@ apply_handle_delete(StringInfo s) CommandCounterIncrement(); } +/* + * Handle TRUNCATE message. + * + * TODO: FDW support + */ +static void +apply_handle_truncate(StringInfo s) +{ + bool cascade = false; + bool restart_seqs = false; + List *remote_relids = NIL; + List *remote_rels = NIL; + List *rels = NIL; + List *relids = NIL; + List *relids_logged = NIL; + ListCell *lc; + + ensure_transaction(); + + remote_relids = logicalrep_read_truncate(s, &cascade, &restart_seqs); + + foreach(lc, remote_relids) + { + LogicalRepRelId relid = lfirst_oid(lc); + LogicalRepRelMapEntry *rel; + + rel = logicalrep_rel_open(relid, RowExclusiveLock); + if (!should_apply_changes_for_rel(rel)) + { + /* + * The relation can't become interesting in the middle of the + * transaction so it's safe to unlock it. + */ + logicalrep_rel_close(rel, RowExclusiveLock); + continue; + } + + remote_rels = lappend(remote_rels, rel); + rels = lappend(rels, rel->localrel); + relids = lappend_oid(relids, rel->localreloid); + if (RelationIsLogicallyLogged(rel->localrel)) + relids_logged = lappend_oid(relids, rel->localreloid); + } + + /* + * Even if we used CASCADE on the upstream master we explicitly + * default to replaying changes without further cascading. + * This might be later changeable with a user specified option. + */ + ExecuteTruncateGuts(rels, relids, relids_logged, DROP_RESTRICT, restart_seqs); + + foreach(lc, remote_rels) + { + LogicalRepRelMapEntry *rel = lfirst(lc); + + logicalrep_rel_close(rel, NoLock); + } + + CommandCounterIncrement(); +} + /* * Logical replication protocol message dispatcher. @@ -919,6 +983,10 @@ apply_dispatch(StringInfo s) case 'D': apply_handle_delete(s); break; + /* TRUNCATE */ + case 'T': + apply_handle_truncate(s); + break; /* RELATION */ case 'R': apply_handle_relation(s); diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c index aa9cf5b54e..e31ad7e027 100644 --- a/src/backend/replication/pgoutput/pgoutput.c +++ b/src/backend/replication/pgoutput/pgoutput.c @@ -39,6 +39,9 @@ static void pgoutput_commit_txn(LogicalDecodingContext *ctx, static void pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, Relation rel, ReorderBufferChange *change); +static void pgoutput_truncate(LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, int nrelations, Relation relations[], + ReorderBufferChange *change); static bool pgoutput_origin_filter(LogicalDecodingContext *ctx, RepOriginId origin_id); @@ -77,6 +80,7 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb) cb->startup_cb = pgoutput_startup; cb->begin_cb = pgoutput_begin_txn; cb->change_cb = pgoutput_change; + cb->truncate_cb = pgoutput_truncate; cb->commit_cb = pgoutput_commit_txn; cb->filter_by_origin_cb = pgoutput_origin_filter; cb->shutdown_cb = pgoutput_shutdown; @@ -250,6 +254,46 @@ pgoutput_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, OutputPluginWrite(ctx, true); } +/* + * Write the relation schema if the current schema haven't been sent yet. + */ +static void +maybe_send_schema(LogicalDecodingContext *ctx, + Relation relation, RelationSyncEntry *relentry) +{ + if (!relentry->schema_sent) + { + TupleDesc desc; + int i; + + desc = RelationGetDescr(relation); + + /* + * Write out type info if needed. We do that only for user created + * types. + */ + for (i = 0; i < desc->natts; i++) + { + Form_pg_attribute att = TupleDescAttr(desc, i); + + if (att->attisdropped) + continue; + + if (att->atttypid < FirstNormalObjectId) + continue; + + OutputPluginPrepareWrite(ctx, false); + logicalrep_write_typ(ctx->out, att->atttypid); + OutputPluginWrite(ctx, false); + } + + OutputPluginPrepareWrite(ctx, false); + logicalrep_write_rel(ctx->out, relation); + OutputPluginWrite(ctx, false); + relentry->schema_sent = true; + } +} + /* * Sends the decoded DML over wire. */ @@ -288,40 +332,7 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, /* Avoid leaking memory by using and resetting our own context */ old = MemoryContextSwitchTo(data->context); - /* - * Write the relation schema if the current schema haven't been sent yet. - */ - if (!relentry->schema_sent) - { - TupleDesc desc; - int i; - - desc = RelationGetDescr(relation); - - /* - * Write out type info if needed. We do that only for user created - * types. - */ - for (i = 0; i < desc->natts; i++) - { - Form_pg_attribute att = TupleDescAttr(desc, i); - - if (att->attisdropped) - continue; - - if (att->atttypid < FirstNormalObjectId) - continue; - - OutputPluginPrepareWrite(ctx, false); - logicalrep_write_typ(ctx->out, att->atttypid); - OutputPluginWrite(ctx, false); - } - - OutputPluginPrepareWrite(ctx, false); - logicalrep_write_rel(ctx->out, relation); - OutputPluginWrite(ctx, false); - relentry->schema_sent = true; - } + maybe_send_schema(ctx, relation, relentry); /* Send the data */ switch (change->action) @@ -363,6 +374,51 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, MemoryContextReset(data->context); } +static void +pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, + int nrelations, Relation relations[], ReorderBufferChange *change) +{ + PGOutputData *data = (PGOutputData *) ctx->output_plugin_private; + MemoryContext old; + RelationSyncEntry *relentry; + int i; + int nrelids; + Oid *relids; + + old = MemoryContextSwitchTo(data->context); + + relids = palloc0(nrelations * sizeof(Oid)); + nrelids = 0; + + for (i = 0; i < nrelations; i++) + { + Relation relation = relations[i]; + Oid relid = RelationGetRelid(relation); + + relentry = get_rel_sync_entry(data, relid); + + if (!is_publishable_relation(relation)) + continue; + + if (!relentry->pubactions.pubtruncate) + continue; + + relids[nrelids++] = relid; + maybe_send_schema(ctx, relation, relentry); + } + + OutputPluginPrepareWrite(ctx, true); + logicalrep_write_truncate(ctx->out, + nrelids, + relids, + change->data.truncate_msg.cascade, + change->data.truncate_msg.restart_seqs); + OutputPluginWrite(ctx, true); + + MemoryContextSwitchTo(old); + MemoryContextReset(data->context); +} + /* * Currently we always forward. */ @@ -504,7 +560,7 @@ get_rel_sync_entry(PGOutputData *data, Oid relid) * we only need to consider ones that the subscriber requested. */ entry->pubactions.pubinsert = entry->pubactions.pubupdate = - entry->pubactions.pubdelete = false; + entry->pubactions.pubdelete = entry->pubactions.pubtruncate = false; foreach(lc, data->publications) { @@ -515,10 +571,11 @@ get_rel_sync_entry(PGOutputData *data, Oid relid) entry->pubactions.pubinsert |= pub->pubactions.pubinsert; entry->pubactions.pubupdate |= pub->pubactions.pubupdate; entry->pubactions.pubdelete |= pub->pubactions.pubdelete; + entry->pubactions.pubtruncate |= pub->pubactions.pubtruncate; } if (entry->pubactions.pubinsert && entry->pubactions.pubupdate && - entry->pubactions.pubdelete) + entry->pubactions.pubdelete && entry->pubactions.pubtruncate) break; } diff --git a/src/backend/utils/cache/relcache.c b/src/backend/utils/cache/relcache.c index 69a2114a10..6a67c185b0 100644 --- a/src/backend/utils/cache/relcache.c +++ b/src/backend/utils/cache/relcache.c @@ -5339,6 +5339,7 @@ GetRelationPublicationActions(Relation relation) pubactions->pubinsert |= pubform->pubinsert; pubactions->pubupdate |= pubform->pubupdate; pubactions->pubdelete |= pubform->pubdelete; + pubactions->pubtruncate |= pubform->pubtruncate; ReleaseSysCache(tup); @@ -5347,7 +5348,7 @@ GetRelationPublicationActions(Relation relation) * other publications. */ if (pubactions->pubinsert && pubactions->pubupdate && - pubactions->pubdelete) + pubactions->pubdelete && pubactions->pubtruncate) break; } diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c index d066f4f00b..69016a6c4d 100644 --- a/src/bin/pg_dump/pg_dump.c +++ b/src/bin/pg_dump/pg_dump.c @@ -3712,6 +3712,7 @@ getPublications(Archive *fout) int i_pubinsert; int i_pubupdate; int i_pubdelete; + int i_pubtruncate; int i, ntups; @@ -3723,12 +3724,20 @@ getPublications(Archive *fout) resetPQExpBuffer(query); /* Get the publications. */ - appendPQExpBuffer(query, - "SELECT p.tableoid, p.oid, p.pubname, " - "(%s p.pubowner) AS rolname, " - "p.puballtables, p.pubinsert, p.pubupdate, p.pubdelete " - "FROM pg_publication p", - username_subquery); + if (fout->remoteVersion >= 110000) + appendPQExpBuffer(query, + "SELECT p.tableoid, p.oid, p.pubname, " + "(%s p.pubowner) AS rolname, " + "p.puballtables, p.pubinsert, p.pubupdate, p.pubdelete, p.pubtruncate " + "FROM pg_publication p", + username_subquery); + else + appendPQExpBuffer(query, + "SELECT p.tableoid, p.oid, p.pubname, " + "(%s p.pubowner) AS rolname, " + "p.puballtables, p.pubinsert, p.pubupdate, p.pubdelete, false AS pubtruncate " + "FROM pg_publication p", + username_subquery); res = ExecuteSqlQuery(fout, query->data, PGRES_TUPLES_OK); @@ -3742,6 +3751,7 @@ getPublications(Archive *fout) i_pubinsert = PQfnumber(res, "pubinsert"); i_pubupdate = PQfnumber(res, "pubupdate"); i_pubdelete = PQfnumber(res, "pubdelete"); + i_pubtruncate = PQfnumber(res, "pubtruncate"); pubinfo = pg_malloc(ntups * sizeof(PublicationInfo)); @@ -3762,6 +3772,8 @@ getPublications(Archive *fout) (strcmp(PQgetvalue(res, i, i_pubupdate), "t") == 0); pubinfo[i].pubdelete = (strcmp(PQgetvalue(res, i, i_pubdelete), "t") == 0); + pubinfo[i].pubtruncate = + (strcmp(PQgetvalue(res, i, i_pubtruncate), "t") == 0); if (strlen(pubinfo[i].rolname) == 0) write_msg(NULL, "WARNING: owner of publication \"%s\" appears to be invalid\n", @@ -3829,6 +3841,15 @@ dumpPublication(Archive *fout, PublicationInfo *pubinfo) first = false; } + if (pubinfo->pubtruncate) + { + if (!first) + appendPQExpBufferStr(query, ", "); + + appendPQExpBufferStr(query, "truncate"); + first = false; + } + appendPQExpBufferStr(query, "');\n"); ArchiveEntry(fout, pubinfo->dobj.catId, pubinfo->dobj.dumpId, diff --git a/src/bin/pg_dump/pg_dump.h b/src/bin/pg_dump/pg_dump.h index a4d6d926a8..c2314758de 100644 --- a/src/bin/pg_dump/pg_dump.h +++ b/src/bin/pg_dump/pg_dump.h @@ -595,6 +595,7 @@ typedef struct _PublicationInfo bool pubinsert; bool pubupdate; bool pubdelete; + bool pubtruncate; } PublicationInfo; /* diff --git a/src/bin/pg_dump/t/002_pg_dump.pl b/src/bin/pg_dump/t/002_pg_dump.pl index acdfde2a1e..25852b903c 100644 --- a/src/bin/pg_dump/t/002_pg_dump.pl +++ b/src/bin/pg_dump/t/002_pg_dump.pl @@ -2038,7 +2038,7 @@ create_order => 50, create_sql => 'CREATE PUBLICATION pub1;', regexp => qr/^ - \QCREATE PUBLICATION pub1 WITH (publish = 'insert, update, delete');\E + \QCREATE PUBLICATION pub1 WITH (publish = 'insert, update, delete, truncate');\E /xm, like => { %full_runs, diff --git a/src/bin/psql/describe.c b/src/bin/psql/describe.c index 0c3be1f504..75a1e42cee 100644 --- a/src/bin/psql/describe.c +++ b/src/bin/psql/describe.c @@ -5187,7 +5187,7 @@ listPublications(const char *pattern) PQExpBufferData buf; PGresult *res; printQueryOpt myopt = pset.popt; - static const bool translate_columns[] = {false, false, false, false, false, false}; + static const bool translate_columns[] = {false, false, false, false, false, false, false}; if (pset.sversion < 100000) { @@ -5207,13 +5207,17 @@ listPublications(const char *pattern) " puballtables AS \"%s\",\n" " pubinsert AS \"%s\",\n" " pubupdate AS \"%s\",\n" - " pubdelete AS \"%s\"\n", + " pubdelete AS \"%s\"", gettext_noop("Name"), gettext_noop("Owner"), gettext_noop("All tables"), gettext_noop("Inserts"), gettext_noop("Updates"), gettext_noop("Deletes")); + if (pset.sversion >= 110000) + appendPQExpBuffer(&buf, + ",\n pubtruncate AS \"%s\"", + gettext_noop("Truncates")); appendPQExpBufferStr(&buf, "\nFROM pg_catalog.pg_publication\n"); @@ -5254,6 +5258,7 @@ describePublications(const char *pattern) PQExpBufferData buf; int i; PGresult *res; + bool has_pubtruncate; if (pset.sversion < 100000) { @@ -5265,13 +5270,19 @@ describePublications(const char *pattern) return true; } + has_pubtruncate = (pset.sversion >= 110000); + initPQExpBuffer(&buf); printfPQExpBuffer(&buf, "SELECT oid, pubname,\n" " pg_catalog.pg_get_userbyid(pubowner) AS owner,\n" - " puballtables, pubinsert, pubupdate, pubdelete\n" - "FROM pg_catalog.pg_publication\n"); + " puballtables, pubinsert, pubupdate, pubdelete"); + if (has_pubtruncate) + appendPQExpBuffer(&buf, + ", pubtruncate"); + appendPQExpBuffer(&buf, + "\nFROM pg_catalog.pg_publication\n"); processSQLNamePattern(pset.db, &buf, pattern, false, false, NULL, "pubname", NULL, @@ -5317,6 +5328,9 @@ describePublications(const char *pattern) printTableOpt myopt = pset.popt.topt; printTableContent cont; + if (has_pubtruncate) + ncols++; + initPQExpBuffer(&title); printfPQExpBuffer(&title, _("Publication %s"), pubname); printTableInit(&cont, &myopt, title.data, ncols, nrows); @@ -5326,12 +5340,16 @@ describePublications(const char *pattern) printTableAddHeader(&cont, gettext_noop("Inserts"), true, align); printTableAddHeader(&cont, gettext_noop("Updates"), true, align); printTableAddHeader(&cont, gettext_noop("Deletes"), true, align); + if (has_pubtruncate) + printTableAddHeader(&cont, gettext_noop("Truncates"), true, align); printTableAddCell(&cont, PQgetvalue(res, i, 2), false, false); printTableAddCell(&cont, PQgetvalue(res, i, 3), false, false); printTableAddCell(&cont, PQgetvalue(res, i, 4), false, false); printTableAddCell(&cont, PQgetvalue(res, i, 5), false, false); printTableAddCell(&cont, PQgetvalue(res, i, 6), false, false); + if (has_pubtruncate) + printTableAddCell(&cont, PQgetvalue(res, i, 7), false, false); if (!puballtables) { diff --git a/src/include/catalog/pg_publication.h b/src/include/catalog/pg_publication.h index 37e77b8be7..b643c489cd 100644 --- a/src/include/catalog/pg_publication.h +++ b/src/include/catalog/pg_publication.h @@ -49,6 +49,9 @@ CATALOG(pg_publication,6104) /* true if deletes are published */ bool pubdelete; + /* true if truncates are published */ + bool pubtruncate; + } FormData_pg_publication; /* ---------------- @@ -63,19 +66,21 @@ typedef FormData_pg_publication *Form_pg_publication; * ---------------- */ -#define Natts_pg_publication 6 +#define Natts_pg_publication 7 #define Anum_pg_publication_pubname 1 #define Anum_pg_publication_pubowner 2 #define Anum_pg_publication_puballtables 3 #define Anum_pg_publication_pubinsert 4 #define Anum_pg_publication_pubupdate 5 #define Anum_pg_publication_pubdelete 6 +#define Anum_pg_publication_pubtruncate 7 typedef struct PublicationActions { bool pubinsert; bool pubupdate; bool pubdelete; + bool pubtruncate; } PublicationActions; typedef struct Publication diff --git a/src/include/replication/logicalproto.h b/src/include/replication/logicalproto.h index 116f16f42d..92e88d3127 100644 --- a/src/include/replication/logicalproto.h +++ b/src/include/replication/logicalproto.h @@ -97,6 +97,10 @@ extern void logicalrep_write_delete(StringInfo out, Relation rel, HeapTuple oldtuple); extern LogicalRepRelId logicalrep_read_delete(StringInfo in, LogicalRepTupleData *oldtup); +extern void logicalrep_write_truncate(StringInfo out, int nrelids, Oid relids[], + bool cascade, bool restart_seqs); +extern List *logicalrep_read_truncate(StringInfo in, + bool *cascade, bool *restart_seqs); extern void logicalrep_write_rel(StringInfo out, Relation rel); extern LogicalRepRelation *logicalrep_read_rel(StringInfo in); extern void logicalrep_write_typ(StringInfo out, Oid typoid); diff --git a/src/test/regress/expected/publication.out b/src/test/regress/expected/publication.out index 0c86c647bc..afbbdd543d 100644 --- a/src/test/regress/expected/publication.out +++ b/src/test/regress/expected/publication.out @@ -21,20 +21,20 @@ ERROR: unrecognized publication parameter: foo CREATE PUBLICATION testpub_xxx WITH (publish = 'cluster, vacuum'); ERROR: unrecognized "publish" value: "cluster" \dRp - List of publications - Name | Owner | All tables | Inserts | Updates | Deletes ---------------------+--------------------------+------------+---------+---------+--------- - testpib_ins_trunct | regress_publication_user | f | t | f | f - testpub_default | regress_publication_user | f | f | t | f + List of publications + Name | Owner | All tables | Inserts | Updates | Deletes | Truncates +--------------------+--------------------------+------------+---------+---------+---------+----------- + testpib_ins_trunct | regress_publication_user | f | t | f | f | f + testpub_default | regress_publication_user | f | f | t | f | f (2 rows) ALTER PUBLICATION testpub_default SET (publish = 'insert, update, delete'); \dRp - List of publications - Name | Owner | All tables | Inserts | Updates | Deletes ---------------------+--------------------------+------------+---------+---------+--------- - testpib_ins_trunct | regress_publication_user | f | t | f | f - testpub_default | regress_publication_user | f | t | t | t + List of publications + Name | Owner | All tables | Inserts | Updates | Deletes | Truncates +--------------------+--------------------------+------------+---------+---------+---------+----------- + testpib_ins_trunct | regress_publication_user | f | t | f | f | f + testpub_default | regress_publication_user | f | t | t | t | f (2 rows) --- adding tables @@ -76,10 +76,10 @@ Publications: "testpub_foralltables" \dRp+ testpub_foralltables - Publication testpub_foralltables - Owner | All tables | Inserts | Updates | Deletes ---------------------------+------------+---------+---------+--------- - regress_publication_user | t | t | t | f + Publication testpub_foralltables + Owner | All tables | Inserts | Updates | Deletes | Truncates +--------------------------+------------+---------+---------+---------+----------- + regress_publication_user | t | t | t | f | f (1 row) DROP TABLE testpub_tbl2; @@ -89,19 +89,19 @@ CREATE TABLE testpub_tbl3a (b text) INHERITS (testpub_tbl3); CREATE PUBLICATION testpub3 FOR TABLE testpub_tbl3; CREATE PUBLICATION testpub4 FOR TABLE ONLY testpub_tbl3; \dRp+ testpub3 - Publication testpub3 - Owner | All tables | Inserts | Updates | Deletes ---------------------------+------------+---------+---------+--------- - regress_publication_user | f | t | t | t + Publication testpub3 + Owner | All tables | Inserts | Updates | Deletes | Truncates +--------------------------+------------+---------+---------+---------+----------- + regress_publication_user | f | t | t | t | t Tables: "public.testpub_tbl3" "public.testpub_tbl3a" \dRp+ testpub4 - Publication testpub4 - Owner | All tables | Inserts | Updates | Deletes ---------------------------+------------+---------+---------+--------- - regress_publication_user | f | t | t | t + Publication testpub4 + Owner | All tables | Inserts | Updates | Deletes | Truncates +--------------------------+------------+---------+---------+---------+----------- + regress_publication_user | f | t | t | t | t Tables: "public.testpub_tbl3" @@ -119,10 +119,10 @@ ERROR: relation "testpub_tbl1" is already member of publication "testpub_fortbl CREATE PUBLICATION testpub_fortbl FOR TABLE testpub_tbl1; ERROR: publication "testpub_fortbl" already exists \dRp+ testpub_fortbl - Publication testpub_fortbl - Owner | All tables | Inserts | Updates | Deletes ---------------------------+------------+---------+---------+--------- - regress_publication_user | f | t | t | t + Publication testpub_fortbl + Owner | All tables | Inserts | Updates | Deletes | Truncates +--------------------------+------------+---------+---------+---------+----------- + regress_publication_user | f | t | t | t | t Tables: "pub_test.testpub_nopk" "public.testpub_tbl1" @@ -165,10 +165,10 @@ Publications: "testpub_fortbl" \dRp+ testpub_default - Publication testpub_default - Owner | All tables | Inserts | Updates | Deletes ---------------------------+------------+---------+---------+--------- - regress_publication_user | f | t | t | t + Publication testpub_default + Owner | All tables | Inserts | Updates | Deletes | Truncates +--------------------------+------------+---------+---------+---------+----------- + regress_publication_user | f | t | t | t | f Tables: "pub_test.testpub_nopk" "public.testpub_tbl1" @@ -210,10 +210,10 @@ DROP TABLE testpub_parted; DROP VIEW testpub_view; DROP TABLE testpub_tbl1; \dRp+ testpub_default - Publication testpub_default - Owner | All tables | Inserts | Updates | Deletes ---------------------------+------------+---------+---------+--------- - regress_publication_user | f | t | t | t + Publication testpub_default + Owner | All tables | Inserts | Updates | Deletes | Truncates +--------------------------+------------+---------+---------+---------+----------- + regress_publication_user | f | t | t | t | f (1 row) -- fail - must be owner of publication @@ -223,20 +223,20 @@ ERROR: must be owner of publication testpub_default RESET ROLE; ALTER PUBLICATION testpub_default RENAME TO testpub_foo; \dRp testpub_foo - List of publications - Name | Owner | All tables | Inserts | Updates | Deletes --------------+--------------------------+------------+---------+---------+--------- - testpub_foo | regress_publication_user | f | t | t | t + List of publications + Name | Owner | All tables | Inserts | Updates | Deletes | Truncates +-------------+--------------------------+------------+---------+---------+---------+----------- + testpub_foo | regress_publication_user | f | t | t | t | f (1 row) -- rename back to keep the rest simple ALTER PUBLICATION testpub_foo RENAME TO testpub_default; ALTER PUBLICATION testpub_default OWNER TO regress_publication_user2; \dRp testpub_default - List of publications - Name | Owner | All tables | Inserts | Updates | Deletes ------------------+---------------------------+------------+---------+---------+--------- - testpub_default | regress_publication_user2 | f | t | t | t + List of publications + Name | Owner | All tables | Inserts | Updates | Deletes | Truncates +-----------------+---------------------------+------------+---------+---------+---------+----------- + testpub_default | regress_publication_user2 | f | t | t | t | f (1 row) DROP PUBLICATION testpub_default; diff --git a/src/test/subscription/t/010_truncate.pl b/src/test/subscription/t/010_truncate.pl new file mode 100644 index 0000000000..8ea4ab624f --- /dev/null +++ b/src/test/subscription/t/010_truncate.pl @@ -0,0 +1,161 @@ +# Test TRUNCATE +use strict; +use warnings; +use PostgresNode; +use TestLib; +use Test::More tests => 9; + +# setup + +my $node_publisher = get_new_node('publisher'); +$node_publisher->init(allows_streaming => 'logical'); +$node_publisher->start; + +my $node_subscriber = get_new_node('subscriber'); +$node_subscriber->init(allows_streaming => 'logical'); +$node_subscriber->start; + +my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres'; + +$node_publisher->safe_psql('postgres', + "CREATE TABLE tab1 (a int PRIMARY KEY)"); + +$node_subscriber->safe_psql('postgres', + "CREATE TABLE tab1 (a int PRIMARY KEY)"); + +$node_publisher->safe_psql('postgres', + "CREATE TABLE tab2 (a int PRIMARY KEY)"); + +$node_subscriber->safe_psql('postgres', + "CREATE TABLE tab2 (a int PRIMARY KEY)"); + +$node_publisher->safe_psql('postgres', + "CREATE TABLE tab3 (a int PRIMARY KEY)"); + +$node_subscriber->safe_psql('postgres', + "CREATE TABLE tab3 (a int PRIMARY KEY)"); + +$node_publisher->safe_psql('postgres', + "CREATE TABLE tab4 (x int PRIMARY KEY, y int REFERENCES tab3)"); + +$node_subscriber->safe_psql('postgres', + "CREATE TABLE tab4 (x int PRIMARY KEY, y int REFERENCES tab3)"); + +$node_subscriber->safe_psql('postgres', + "CREATE SEQUENCE seq1 OWNED BY tab1.a" +); +$node_subscriber->safe_psql('postgres', + "ALTER SEQUENCE seq1 START 101" +); + +$node_publisher->safe_psql('postgres', + "CREATE PUBLICATION pub1 FOR TABLE tab1"); +$node_publisher->safe_psql('postgres', + "CREATE PUBLICATION pub2 FOR TABLE tab2 WITH (publish = insert)"); +$node_publisher->safe_psql('postgres', + "CREATE PUBLICATION pub3 FOR TABLE tab3, tab4"); +$node_subscriber->safe_psql('postgres', + "CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr application_name=sub1' PUBLICATION pub1"); +$node_subscriber->safe_psql('postgres', + "CREATE SUBSCRIPTION sub2 CONNECTION '$publisher_connstr application_name=sub2' PUBLICATION pub2"); +$node_subscriber->safe_psql('postgres', + "CREATE SUBSCRIPTION sub3 CONNECTION '$publisher_connstr application_name=sub3' PUBLICATION pub3"); + +$node_publisher->wait_for_catchup('sub1'); + +# insert data to truncate + +$node_subscriber->safe_psql('postgres', "INSERT INTO tab1 VALUES (1), (2), (3)"); + +$node_publisher->wait_for_catchup('sub1'); + +# truncate and check + +$node_publisher->safe_psql('postgres', "TRUNCATE tab1"); + +$node_publisher->wait_for_catchup('sub1'); + +my $result = $node_subscriber->safe_psql('postgres', + "SELECT count(*), min(a), max(a) FROM tab1"); +is($result, qq(0||), + 'truncate replicated'); + +$result = $node_subscriber->safe_psql('postgres', + "SELECT nextval('seq1')"); +is($result, qq(1), + 'sequence not restarted'); + +# truncate with restart identity + +$node_publisher->safe_psql('postgres', "TRUNCATE tab1 RESTART IDENTITY"); + +$node_publisher->wait_for_catchup('sub1'); + +$result = $node_subscriber->safe_psql('postgres', + "SELECT nextval('seq1')"); +is($result, qq(101), + 'truncate restarted identities'); + +# test publication that does not replicate truncate + +$node_subscriber->safe_psql('postgres', "INSERT INTO tab2 VALUES (1), (2), (3)"); + +$node_publisher->safe_psql('postgres', "TRUNCATE tab2"); + +$node_publisher->wait_for_catchup('sub2'); + +$result = $node_subscriber->safe_psql('postgres', + "SELECT count(*), min(a), max(a) FROM tab2"); +is($result, qq(3|1|3), + 'truncate not replicated'); + +$node_publisher->safe_psql('postgres', + "ALTER PUBLICATION pub2 SET (publish = 'insert, truncate')"); + +$node_publisher->safe_psql('postgres', "TRUNCATE tab2"); + +$node_publisher->wait_for_catchup('sub2'); + +$result = $node_subscriber->safe_psql('postgres', + "SELECT count(*), min(a), max(a) FROM tab2"); +is($result, qq(0||), + 'truncate replicated after publication change'); + +# test multiple tables connected by foreign keys + +$node_subscriber->safe_psql('postgres', "INSERT INTO tab3 VALUES (1), (2), (3)"); +$node_subscriber->safe_psql('postgres', "INSERT INTO tab4 VALUES (11, 1), (111, 1), (22, 2)"); + +$node_publisher->safe_psql('postgres', "TRUNCATE tab3, tab4"); + +$node_publisher->wait_for_catchup('sub3'); + +$result = $node_subscriber->safe_psql('postgres', + "SELECT count(*), min(a), max(a) FROM tab3"); +is($result, qq(0||), + 'truncate of multiple tables replicated'); +$result = $node_subscriber->safe_psql('postgres', + "SELECT count(*), min(x), max(x) FROM tab4"); +is($result, qq(0||), + 'truncate of multiple tables replicated'); + +# test truncate of multiple tables, some of which are not published + +$node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION sub2"); +$node_publisher->safe_psql('postgres', "DROP PUBLICATION pub2"); + +$node_subscriber->safe_psql('postgres', "INSERT INTO tab1 VALUES (1), (2), (3)"); +$node_subscriber->safe_psql('postgres', "INSERT INTO tab2 VALUES (1), (2), (3)"); + +$node_publisher->safe_psql('postgres', "TRUNCATE tab1, tab2"); + +$node_publisher->wait_for_catchup('sub1'); + +$result = $node_subscriber->safe_psql('postgres', + "SELECT count(*), min(a), max(a) FROM tab1"); +is($result, qq(0||), + 'truncate of multiple tables some not published'); +$result = $node_subscriber->safe_psql('postgres', + "SELECT count(*), min(a), max(a) FROM tab2"); +is($result, qq(3|1|3), + 'truncate of multiple tables some not published'); -- 2.17.0