From df101e32c358ac9243285d4e8f125803988e5508 Mon Sep 17 00:00:00 2001 From: Peter Eisentraut Date: Thu, 5 Apr 2018 11:46:41 -0400 Subject: [PATCH v19 1/2] Logical decoding of TRUNCATE --- contrib/test_decoding/Makefile | 2 +- contrib/test_decoding/expected/truncate.out | 25 ++++++ contrib/test_decoding/sql/truncate.sql | 10 +++ contrib/test_decoding/test_decoding.c | 61 +++++++++++++ doc/src/sgml/logicaldecoding.sgml | 27 +++++- src/backend/access/heap/heapam.c | 7 ++ src/backend/access/rmgrdesc/heapdesc.c | 14 +++ src/backend/commands/tablecmds.c | 87 +++++++++++++++++-- src/backend/replication/logical/decode.c | 41 +++++++++ src/backend/replication/logical/logical.c | 43 +++++++++ .../replication/logical/reorderbuffer.c | 35 ++++++++ src/include/access/heapam_xlog.h | 23 ++++- src/include/commands/tablecmds.h | 2 + src/include/replication/output_plugin.h | 10 +++ src/include/replication/reorderbuffer.h | 24 ++++- 15 files changed, 398 insertions(+), 13 deletions(-) create mode 100644 contrib/test_decoding/expected/truncate.out create mode 100644 contrib/test_decoding/sql/truncate.sql diff --git a/contrib/test_decoding/Makefile b/contrib/test_decoding/Makefile index 6c18189d9d..1d601d8144 100644 --- a/contrib/test_decoding/Makefile +++ b/contrib/test_decoding/Makefile @@ -39,7 +39,7 @@ submake-test_decoding: REGRESSCHECKS=ddl xact rewrite toast permissions decoding_in_xact \ decoding_into_rel binary prepared replorigin time messages \ - spill slot + spill slot truncate regresscheck: | submake-regress submake-test_decoding temp-install $(pg_regress_check) \ diff --git a/contrib/test_decoding/expected/truncate.out b/contrib/test_decoding/expected/truncate.out new file mode 100644 index 0000000000..be85178206 --- /dev/null +++ b/contrib/test_decoding/expected/truncate.out @@ -0,0 +1,25 @@ +SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding'); + ?column? +---------- + init +(1 row) + +CREATE TABLE tab1 (id serial unique, data int); +CREATE TABLE tab2 (a int primary key, b int); +TRUNCATE tab1; +TRUNCATE tab1, tab1 RESTART IDENTITY CASCADE; +TRUNCATE tab1, tab2; +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); + data +------------------------------------------------------ + BEGIN + table public.tab1: TRUNCATE: (no-flags) + COMMIT + BEGIN + table public.tab1: TRUNCATE: restart_seqs cascade + COMMIT + BEGIN + table public.tab1, public.tab2: TRUNCATE: (no-flags) + COMMIT +(9 rows) + diff --git a/contrib/test_decoding/sql/truncate.sql b/contrib/test_decoding/sql/truncate.sql new file mode 100644 index 0000000000..88f113fd5b --- /dev/null +++ b/contrib/test_decoding/sql/truncate.sql @@ -0,0 +1,10 @@ +SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding'); + +CREATE TABLE tab1 (id serial unique, data int); +CREATE TABLE tab2 (a int primary key, b int); + +TRUNCATE tab1; +TRUNCATE tab1, tab1 RESTART IDENTITY CASCADE; +TRUNCATE tab1, tab2; + +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); diff --git a/contrib/test_decoding/test_decoding.c b/contrib/test_decoding/test_decoding.c index a94aeeae29..c238f12e66 100644 --- a/contrib/test_decoding/test_decoding.c +++ b/contrib/test_decoding/test_decoding.c @@ -52,6 +52,10 @@ static void pg_decode_commit_txn(LogicalDecodingContext *ctx, static void pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, Relation rel, ReorderBufferChange *change); +static void pg_decode_truncate(LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, + int nrelations, Relation relations[], + ReorderBufferChange *change); static bool pg_decode_filter(LogicalDecodingContext *ctx, RepOriginId origin_id); static void pg_decode_message(LogicalDecodingContext *ctx, @@ -74,6 +78,7 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb) cb->startup_cb = pg_decode_startup; cb->begin_cb = pg_decode_begin_txn; cb->change_cb = pg_decode_change; + cb->truncate_cb = pg_decode_truncate; cb->commit_cb = pg_decode_commit_txn; cb->filter_by_origin_cb = pg_decode_filter; cb->shutdown_cb = pg_decode_shutdown; @@ -480,6 +485,62 @@ pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, OutputPluginWrite(ctx, true); } +static void +pg_decode_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, + int nrelations, Relation relations[], ReorderBufferChange *change) +{ + TestDecodingData *data; + MemoryContext old; + int i; + + data = ctx->output_plugin_private; + + /* output BEGIN if we haven't yet */ + if (data->skip_empty_xacts && !data->xact_wrote_changes) + { + pg_output_begin(ctx, data, txn, false); + } + data->xact_wrote_changes = true; + + /* Avoid leaking memory by using and resetting our own context */ + old = MemoryContextSwitchTo(data->context); + + OutputPluginPrepareWrite(ctx, true); + + appendStringInfoString(ctx->out, "table "); + + for (i = 0; i < nrelations; i++) + { + Oid relid = RelationGetRelid(relations[i]); + + if (i > 0) + appendStringInfoString(ctx->out, ", "); + + appendStringInfoString(ctx->out, + quote_qualified_identifier( + get_namespace_name(get_rel_namespace(relid)), + get_rel_name(relid))); + } + + appendStringInfoString(ctx->out, ": TRUNCATE:"); + + if (change->data.truncate_msg.restart_seqs + || change->data.truncate_msg.cascade) + { + if (change->data.truncate_msg.restart_seqs) + appendStringInfo(ctx->out, " restart_seqs"); + if (change->data.truncate_msg.cascade) + appendStringInfo(ctx->out, " cascade"); + } + else + appendStringInfoString(ctx->out, " (no-flags)"); + + MemoryContextSwitchTo(old); + MemoryContextReset(data->context); + + OutputPluginWrite(ctx, true); +} + static void pg_decode_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr lsn, bool transactional, diff --git a/doc/src/sgml/logicaldecoding.sgml b/doc/src/sgml/logicaldecoding.sgml index f6b14dccb0..b29cfe6fb4 100644 --- a/doc/src/sgml/logicaldecoding.sgml +++ b/doc/src/sgml/logicaldecoding.sgml @@ -383,6 +383,7 @@ Initialization Function LogicalDecodeStartupCB startup_cb; LogicalDecodeBeginCB begin_cb; LogicalDecodeChangeCB change_cb; + LogicalDecodeTruncateCB truncate_cb; LogicalDecodeCommitCB commit_cb; LogicalDecodeMessageCB message_cb; LogicalDecodeFilterByOriginCB filter_by_origin_cb; @@ -394,8 +395,10 @@ Initialization Function The begin_cb, change_cb and commit_cb callbacks are required, while startup_cb, - filter_by_origin_cb + filter_by_origin_cb, truncate_cb, and shutdown_cb are optional. + If truncate_cb is not set but a + TRUNCATE is to be decoded, the action will be ignored. @@ -590,6 +593,28 @@ Change Callback + + Truncate Callback + + + The truncate_cb callback is called for a + TRUNCATE command. + +typedef void (*LogicalDecodeTruncateCB) (struct LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, + int nrelations, + Relation relations[], + ReorderBufferChange *change); + + The parameters are analogous to the change_cb + callback. However, because TRUNCATE actions on + tables connected by foreign keys need to be executed together, this + callback receives an array of relations instead of just a single one. + See the description of the statement for + details. + + + Origin Filter Callback diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c index f96567f5d5..0bafb4fefc 100644 --- a/src/backend/access/heap/heapam.c +++ b/src/backend/access/heap/heapam.c @@ -9260,6 +9260,13 @@ heap_redo(XLogReaderState *record) case XLOG_HEAP_UPDATE: heap_xlog_update(record, false); break; + case XLOG_HEAP_TRUNCATE: + /* + * TRUNCATE is a no-op because the actions are already logged as + * SMGR WAL records. TRUNCATE WAL record only exists for logical + * decoding. + */ + break; case XLOG_HEAP_HOT_UPDATE: heap_xlog_update(record, true); break; diff --git a/src/backend/access/rmgrdesc/heapdesc.c b/src/backend/access/rmgrdesc/heapdesc.c index b00c071cb6..349feb6510 100644 --- a/src/backend/access/rmgrdesc/heapdesc.c +++ b/src/backend/access/rmgrdesc/heapdesc.c @@ -75,6 +75,17 @@ heap_desc(StringInfo buf, XLogReaderState *record) xlrec->new_offnum, xlrec->new_xmax); } + else if (info == XLOG_HEAP_TRUNCATE) + { + xl_heap_truncate *xlrec = (xl_heap_truncate *) rec; + + if (xlrec->flags & XLH_TRUNCATE_CASCADE) + appendStringInfo(buf, "cascade "); + if (xlrec->flags & XLH_TRUNCATE_RESTART_SEQS) + appendStringInfo(buf, "restart_seqs "); + appendStringInfo(buf, "nrelids %u", xlrec->nrelids); + /* skip the list of relids */ + } else if (info == XLOG_HEAP_CONFIRM) { xl_heap_confirm *xlrec = (xl_heap_confirm *) rec; @@ -186,6 +197,9 @@ heap_identify(uint8 info) case XLOG_HEAP_HOT_UPDATE | XLOG_HEAP_INIT_PAGE: id = "HOT_UPDATE+INIT"; break; + case XLOG_HEAP_TRUNCATE: + id = "TRUNCATE"; + break; case XLOG_HEAP_CONFIRM: id = "HEAP_CONFIRM"; break; diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c index ec2f9616ed..077b954f36 100644 --- a/src/backend/commands/tablecmds.c +++ b/src/backend/commands/tablecmds.c @@ -16,6 +16,7 @@ #include "access/genam.h" #include "access/heapam.h" +#include "access/heapam_xlog.h" #include "access/multixact.h" #include "access/reloptions.h" #include "access/relscan.h" @@ -1322,11 +1323,7 @@ ExecuteTruncate(TruncateStmt *stmt) { List *rels = NIL; List *relids = NIL; - List *seq_relids = NIL; - EState *estate; - ResultRelInfo *resultRelInfos; - ResultRelInfo *resultRelInfo; - SubTransactionId mySubid; + List *relids_logged = NIL; ListCell *cell; /* @@ -1350,6 +1347,9 @@ ExecuteTruncate(TruncateStmt *stmt) truncate_check_rel(rel); rels = lappend(rels, rel); relids = lappend_oid(relids, myrelid); + /* Log this relation only if needed for logical decoding */ + if (RelationIsLogicallyLogged(rel)) + relids_logged = lappend_oid(relids_logged, myrelid); if (recurse) { @@ -1370,6 +1370,9 @@ ExecuteTruncate(TruncateStmt *stmt) truncate_check_rel(rel); rels = lappend(rels, rel); relids = lappend_oid(relids, childrelid); + /* Log this relation only if needed for logical decoding */ + if (RelationIsLogicallyLogged(rel)) + relids_logged = lappend_oid(relids_logged, childrelid); } } else if (rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE) @@ -1379,7 +1382,34 @@ ExecuteTruncate(TruncateStmt *stmt) errhint("Do not specify the ONLY keyword, or use TRUNCATE ONLY on the partitions directly."))); } + ExecuteTruncateGuts(rels, relids, relids_logged, + stmt->behavior, stmt->restart_seqs); + + /* And close the rels */ + foreach(cell, rels) + { + Relation rel = (Relation) lfirst(cell); + + heap_close(rel, NoLock); + } +} + +void +ExecuteTruncateGuts(List *explicit_rels, List *relids, List *relids_logged, + DropBehavior behavior, bool restart_seqs) +{ + List *rels; + List *seq_relids = NIL; + EState *estate; + ResultRelInfo *resultRelInfos; + ResultRelInfo *resultRelInfo; + SubTransactionId mySubid; + ListCell *cell; + Oid *logrelids; + /* + * Open, exclusive-lock, and check all the explicitly-specified relations + * * In CASCADE mode, suck in all referencing relations as well. This * requires multiple iterations to find indirectly-dependent relations. At * each phase, we need to exclusive-lock new rels before looking for their @@ -1387,7 +1417,8 @@ ExecuteTruncate(TruncateStmt *stmt) * soon as we open it, to avoid a faux pas such as holding lock for a long * time on a rel we have no permissions for. */ - if (stmt->behavior == DROP_CASCADE) + rels = list_copy(explicit_rels); + if (behavior == DROP_CASCADE) { for (;;) { @@ -1409,6 +1440,9 @@ ExecuteTruncate(TruncateStmt *stmt) truncate_check_rel(rel); rels = lappend(rels, rel); relids = lappend_oid(relids, relid); + /* Log this relation only if needed for logical decoding */ + if (RelationIsLogicallyLogged(rel)) + relids_logged = lappend_oid(relids_logged, relid); } } } @@ -1421,7 +1455,7 @@ ExecuteTruncate(TruncateStmt *stmt) #ifdef USE_ASSERT_CHECKING heap_truncate_check_FKs(rels, false); #else - if (stmt->behavior == DROP_RESTRICT) + if (behavior == DROP_RESTRICT) heap_truncate_check_FKs(rels, false); #endif @@ -1431,7 +1465,7 @@ ExecuteTruncate(TruncateStmt *stmt) * We want to do this early since it's pointless to do all the truncation * work only to fail on sequence permissions. */ - if (stmt->restart_seqs) + if (restart_seqs) { foreach(cell, rels) { @@ -1586,6 +1620,38 @@ ExecuteTruncate(TruncateStmt *stmt) ResetSequence(seq_relid); } + /* + * Write a WAL record to allow this set of actions to be logically decoded. + * + * Assemble an array of relids so we can write a single WAL record for the + * whole action. + */ + if (list_length(relids_logged) > 0) + { + xl_heap_truncate xlrec; + int i = 0; + + logrelids = palloc(list_length(relids_logged) * sizeof(Oid)); + foreach (cell, relids_logged) + logrelids[i++] = lfirst_oid(cell); + + xlrec.dbId = MyDatabaseId; + xlrec.nrelids = list_length(relids_logged); + xlrec.flags = 0; + if (behavior == DROP_CASCADE) + xlrec.flags |= XLH_TRUNCATE_CASCADE; + if (restart_seqs) + xlrec.flags |= XLH_TRUNCATE_RESTART_SEQS; + + XLogBeginInsert(); + XLogRegisterData((char *) &xlrec, SizeOfHeapTruncate); + XLogRegisterData((char *) logrelids, list_length(relids_logged) * sizeof(Oid)); + + XLogSetRecordFlags(XLOG_INCLUDE_ORIGIN); + + (void) XLogInsert(RM_HEAP_ID, XLOG_HEAP_TRUNCATE); + } + /* * Process all AFTER STATEMENT TRUNCATE triggers. */ @@ -1603,7 +1669,10 @@ ExecuteTruncate(TruncateStmt *stmt) /* We can clean up the EState now */ FreeExecutorState(estate); - /* And close the rels (can't do this while EState still holds refs) */ + /* And close the eventual rels opened by CASCADE + * (can't do this while EState still holds refs) + */ + rels = list_difference_ptr(rels, explicit_rels); foreach(cell, rels) { Relation rel = (Relation) lfirst(cell); diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c index 6eb0d5527e..c94ac29d83 100644 --- a/src/backend/replication/logical/decode.c +++ b/src/backend/replication/logical/decode.c @@ -65,6 +65,7 @@ static void DecodeLogicalMsgOp(LogicalDecodingContext *ctx, XLogRecordBuffer *bu static void DecodeInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); static void DecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); static void DecodeDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); +static void DecodeTruncate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); static void DecodeMultiInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); static void DecodeSpecConfirm(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); @@ -449,6 +450,11 @@ DecodeHeapOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) DecodeDelete(ctx, buf); break; + case XLOG_HEAP_TRUNCATE: + if (SnapBuildProcessChange(builder, xid, buf->origptr)) + DecodeTruncate(ctx, buf); + break; + case XLOG_HEAP_INPLACE: /* @@ -825,6 +831,41 @@ DecodeDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr, change); } +/* + * Parse XLOG_HEAP_TRUNCATE from wal + */ +static void +DecodeTruncate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) +{ + XLogReaderState *r = buf->record; + xl_heap_truncate *xlrec; + ReorderBufferChange *change; + + xlrec = (xl_heap_truncate *) XLogRecGetData(r); + + /* only interested in our database */ + if (xlrec->dbId != ctx->slot->data.database) + return; + + /* output plugin doesn't look for this origin, no need to queue */ + if (FilterByOrigin(ctx, XLogRecGetOrigin(r))) + return; + + change = ReorderBufferGetChange(ctx->reorder); + change->action = REORDER_BUFFER_CHANGE_TRUNCATE; + change->origin_id = XLogRecGetOrigin(r); + if (xlrec->flags & XLH_TRUNCATE_CASCADE) + change->data.truncate_msg.cascade = true; + if (xlrec->flags & XLH_TRUNCATE_RESTART_SEQS) + change->data.truncate_msg.restart_seqs = true; + change->data.truncate_msg.nrelids = xlrec->nrelids; + change->data.truncate_msg.relids = palloc(xlrec->nrelids * sizeof(Oid)); + memcpy(change->data.truncate_msg.relids, xlrec->relids, + xlrec->nrelids * sizeof(Oid)); + ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), + buf->origptr, change); +} + /* * Decode XLOG_HEAP2_MULTI_INSERT_insert record into multiple tuplebufs. * diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c index 3d8ad7ddf8..0737c7b1e7 100644 --- a/src/backend/replication/logical/logical.c +++ b/src/backend/replication/logical/logical.c @@ -62,6 +62,8 @@ static void commit_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, XLogRecPtr commit_lsn); static void change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change); +static void truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, + int nrelations, Relation relations[], ReorderBufferChange *change); static void message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, XLogRecPtr message_lsn, bool transactional, const char *prefix, Size message_size, const char *message); @@ -183,6 +185,7 @@ StartupDecodingContext(List *output_plugin_options, /* wrap output plugin callbacks, so we can add error context information */ ctx->reorder->begin = begin_cb_wrapper; ctx->reorder->apply_change = change_cb_wrapper; + ctx->reorder->apply_truncate = truncate_cb_wrapper; ctx->reorder->commit = commit_cb_wrapper; ctx->reorder->message = message_cb_wrapper; @@ -734,6 +737,46 @@ change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, error_context_stack = errcallback.previous; } +static void +truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, + int nrelations, Relation relations[], ReorderBufferChange *change) +{ + LogicalDecodingContext *ctx = cache->private_data; + LogicalErrorCallbackState state; + ErrorContextCallback errcallback; + + Assert(!ctx->fast_forward); + + if (!ctx->callbacks.truncate_cb) + return; + + /* Push callback + info on the error context stack */ + state.ctx = ctx; + state.callback_name = "truncate"; + state.report_location = change->lsn; + errcallback.callback = output_plugin_error_callback; + errcallback.arg = (void *) &state; + errcallback.previous = error_context_stack; + error_context_stack = &errcallback; + + /* set output state */ + ctx->accept_writes = true; + ctx->write_xid = txn->xid; + + /* + * report this change's lsn so replies from clients can give an up2date + * answer. This won't ever be enough (and shouldn't be!) to confirm + * receipt of this transaction, but it might allow another transaction's + * commit to be confirmed with one message. + */ + ctx->write_location = change->lsn; + + ctx->callbacks.truncate_cb(ctx, txn, nrelations, relations, change); + + /* Pop the error context stack */ + error_context_stack = errcallback.previous; +} + bool filter_by_origin_cb_wrapper(LogicalDecodingContext *ctx, RepOriginId origin_id) { diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c index b4016ed52b..7691c1a8b4 100644 --- a/src/backend/replication/logical/reorderbuffer.c +++ b/src/backend/replication/logical/reorderbuffer.c @@ -415,6 +415,7 @@ ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change) case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM: case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID: case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID: + case REORDER_BUFFER_CHANGE_TRUNCATE: break; } @@ -1491,6 +1492,38 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid, specinsert = change; break; + case REORDER_BUFFER_CHANGE_TRUNCATE: + { + int i; + int nrelids = change->data.truncate_msg.nrelids; + int nrelations = 0; + Relation *relations; + + relations = palloc0(nrelids * sizeof(Relation)); + for (i = 0; i < nrelids; i++) + { + Oid relid = change->data.truncate_msg.relids[i]; + Relation relation; + + relation = RelationIdGetRelation(relid); + + if (relation == NULL) + elog(ERROR, "could not open relation with OID %u", relid); + + if (!RelationIsLogicallyLogged(relation)) + continue; + + relations[nrelations++] = relation; + } + + rb->apply_truncate(rb, txn, nrelations, relations, change); + + for (i = 0; i < nrelations; i++) + RelationClose(relations[i]); + + break; + } + case REORDER_BUFFER_CHANGE_MESSAGE: rb->message(rb, txn, change->lsn, true, change->data.msg.prefix, @@ -2255,6 +2288,7 @@ ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn, } break; } + case REORDER_BUFFER_CHANGE_TRUNCATE: case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM: case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID: case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID: @@ -2534,6 +2568,7 @@ ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn, break; } /* the base struct contains all the data, easy peasy */ + case REORDER_BUFFER_CHANGE_TRUNCATE: case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM: case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID: case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID: diff --git a/src/include/access/heapam_xlog.h b/src/include/access/heapam_xlog.h index 700e25c36a..0052e4c569 100644 --- a/src/include/access/heapam_xlog.h +++ b/src/include/access/heapam_xlog.h @@ -32,7 +32,7 @@ #define XLOG_HEAP_INSERT 0x00 #define XLOG_HEAP_DELETE 0x10 #define XLOG_HEAP_UPDATE 0x20 -/* 0x030 is free, was XLOG_HEAP_MOVE */ +#define XLOG_HEAP_TRUNCATE 0x30 #define XLOG_HEAP_HOT_UPDATE 0x40 #define XLOG_HEAP_CONFIRM 0x50 #define XLOG_HEAP_LOCK 0x60 @@ -109,6 +109,27 @@ typedef struct xl_heap_delete #define SizeOfHeapDelete (offsetof(xl_heap_delete, flags) + sizeof(uint8)) +/* + * xl_heap_delete flag values, 8 bits are available. + */ +#define XLH_TRUNCATE_CASCADE (1<<0) +#define XLH_TRUNCATE_RESTART_SEQS (1<<1) + +/* + * For truncate we list all truncated relids in an array, followed by all + * sequence relids that need to be restarted, if any. + * All rels are always within the same database, so we just list dbid once. + */ +typedef struct xl_heap_truncate +{ + Oid dbId; + uint32 nrelids; + uint8 flags; + Oid relids[FLEXIBLE_ARRAY_MEMBER]; +} xl_heap_truncate; + +#define SizeOfHeapTruncate (offsetof(xl_heap_truncate, relids)) + /* * We don't store the whole fixed part (HeapTupleHeaderData) of an inserted * or updated tuple in WAL; we can save a few bytes by reconstructing the diff --git a/src/include/commands/tablecmds.h b/src/include/commands/tablecmds.h index 04a961d383..70ee3da76b 100644 --- a/src/include/commands/tablecmds.h +++ b/src/include/commands/tablecmds.h @@ -54,6 +54,8 @@ extern void AlterRelationNamespaceInternal(Relation classRel, Oid relOid, extern void CheckTableNotInUse(Relation rel, const char *stmt); extern void ExecuteTruncate(TruncateStmt *stmt); +extern void ExecuteTruncateGuts(List *explicit_rels, List *relids, List *relids_logged, + DropBehavior behavior, bool restart_seqs); extern void SetRelationHasSubclass(Oid relationId, bool relhassubclass); diff --git a/src/include/replication/output_plugin.h b/src/include/replication/output_plugin.h index 82875d6b3d..1ee0a56f03 100644 --- a/src/include/replication/output_plugin.h +++ b/src/include/replication/output_plugin.h @@ -61,6 +61,15 @@ typedef void (*LogicalDecodeChangeCB) (struct LogicalDecodingContext *ctx, Relation relation, ReorderBufferChange *change); +/* + * Callback for every TRUNCATE in a successful transaction. + */ +typedef void (*LogicalDecodeTruncateCB) (struct LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, + int nrelations, + Relation relations[], + ReorderBufferChange *change); + /* * Called for every (explicit or implicit) COMMIT of a successful transaction. */ @@ -98,6 +107,7 @@ typedef struct OutputPluginCallbacks LogicalDecodeStartupCB startup_cb; LogicalDecodeBeginCB begin_cb; LogicalDecodeChangeCB change_cb; + LogicalDecodeTruncateCB truncate_cb; LogicalDecodeCommitCB commit_cb; LogicalDecodeMessageCB message_cb; LogicalDecodeFilterByOriginCB filter_by_origin_cb; diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h index aa430c843c..13a8798115 100644 --- a/src/include/replication/reorderbuffer.h +++ b/src/include/replication/reorderbuffer.h @@ -59,7 +59,8 @@ enum ReorderBufferChangeType REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT, - REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM + REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM, + REORDER_BUFFER_CHANGE_TRUNCATE }; /* @@ -128,6 +129,18 @@ typedef struct ReorderBufferChange CommandId cmax; CommandId combocid; } tuplecid; + + /* + * Truncate data for REORDER_BUFFER_CHANGE_TRUNCATE representing + * one set of relations to be truncated. + */ + struct + { + Size nrelids; + bool cascade; + bool restart_seqs; + Oid *relids; + } truncate_msg; } data; /* @@ -283,6 +296,14 @@ typedef void (*ReorderBufferApplyChangeCB) ( Relation relation, ReorderBufferChange *change); +/* truncate callback signature */ +typedef void (*ReorderBufferApplyTruncateCB) ( + ReorderBuffer *rb, + ReorderBufferTXN *txn, + int nrelations, + Relation relations[], + ReorderBufferChange *change); + /* begin callback signature */ typedef void (*ReorderBufferBeginCB) ( ReorderBuffer *rb, @@ -328,6 +349,7 @@ struct ReorderBuffer */ ReorderBufferBeginCB begin; ReorderBufferApplyChangeCB apply_change; + ReorderBufferApplyTruncateCB apply_truncate; ReorderBufferCommitCB commit; ReorderBufferMessageCB message; base-commit: 0a64b45152b593c5eb95f2e88fbce7fbfe84ae7b -- 2.17.0