From 81fc28cedc19fe0f91f882d42989c14113a40f88 Mon Sep 17 00:00:00 2001 From: Petr Jelinek Date: Wed, 24 Feb 2016 17:02:36 +0100 Subject: [PATCH] Logical Decoding Messages --- contrib/test_decoding/Makefile | 2 +- contrib/test_decoding/expected/ddl.out | 21 ++-- contrib/test_decoding/expected/messages.out | 71 ++++++++++++++ contrib/test_decoding/sql/ddl.sql | 3 +- contrib/test_decoding/sql/messages.sql | 22 +++++ contrib/test_decoding/test_decoding.c | 18 ++++ doc/src/sgml/func.sgml | 45 +++++++++ doc/src/sgml/logicaldecoding.sgml | 38 ++++++++ src/backend/access/rmgrdesc/Makefile | 4 +- src/backend/access/rmgrdesc/logicalmsgdesc.c | 41 ++++++++ src/backend/access/transam/rmgr.c | 1 + src/backend/replication/logical/Makefile | 2 +- src/backend/replication/logical/decode.c | 46 +++++++++ src/backend/replication/logical/logical.c | 38 ++++++++ src/backend/replication/logical/logicalfuncs.c | 27 ++++++ src/backend/replication/logical/message.c | 85 +++++++++++++++++ src/backend/replication/logical/reorderbuffer.c | 121 ++++++++++++++++++++++++ src/backend/replication/logical/snapbuild.c | 19 ++++ src/bin/pg_xlogdump/.gitignore | 20 +--- src/bin/pg_xlogdump/rmgrdesc.c | 1 + src/include/access/rmgrlist.h | 1 + src/include/catalog/pg_proc.h | 4 + src/include/replication/logicalfuncs.h | 2 + src/include/replication/message.h | 41 ++++++++ src/include/replication/output_plugin.h | 13 +++ src/include/replication/reorderbuffer.h | 22 +++++ src/include/replication/snapbuild.h | 2 + 27 files changed, 679 insertions(+), 31 deletions(-) create mode 100644 contrib/test_decoding/expected/messages.out create mode 100644 contrib/test_decoding/sql/messages.sql create mode 100644 src/backend/access/rmgrdesc/logicalmsgdesc.c create mode 100644 src/backend/replication/logical/message.c create mode 100644 src/include/replication/message.h diff --git a/contrib/test_decoding/Makefile b/contrib/test_decoding/Makefile index 06c9546..309cb0b 100644 --- a/contrib/test_decoding/Makefile +++ b/contrib/test_decoding/Makefile @@ -38,7 +38,7 @@ submake-test_decoding: $(MAKE) -C $(top_builddir)/contrib/test_decoding REGRESSCHECKS=ddl xact rewrite toast permissions decoding_in_xact \ - decoding_into_rel binary prepared replorigin time + decoding_into_rel binary prepared replorigin time messages regresscheck: | submake-regress submake-test_decoding temp-install $(MKDIR_P) regression_output diff --git a/contrib/test_decoding/expected/ddl.out b/contrib/test_decoding/expected/ddl.out index 77719e8..32cd24d 100644 --- a/contrib/test_decoding/expected/ddl.out +++ b/contrib/test_decoding/expected/ddl.out @@ -220,11 +220,17 @@ SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'inc (7 rows) /* - * check that disk spooling works + * check that disk spooling works (also for logical messages) */ BEGIN; CREATE TABLE tr_etoomuch (id serial primary key, data int); INSERT INTO tr_etoomuch(data) SELECT g.i FROM generate_series(1, 10234) g(i); +SELECT 'tx logical msg' FROM pg_logical_emit_message(true, 'test', 'tx logical msg'); + ?column? +---------------- + tx logical msg +(1 row) + DELETE FROM tr_etoomuch WHERE id < 5000; UPDATE tr_etoomuch SET data = - data WHERE id > 5000; COMMIT; @@ -233,12 +239,13 @@ SELECT count(*), min(data), max(data) FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1') GROUP BY substring(data, 1, 24) ORDER BY 1,2; - count | min | max --------+-------------------------------------------------+------------------------------------------------------------------------ - 1 | BEGIN | BEGIN - 1 | COMMIT | COMMIT - 20467 | table public.tr_etoomuch: DELETE: id[integer]:1 | table public.tr_etoomuch: UPDATE: id[integer]:9999 data[integer]:-9999 -(3 rows) + count | min | max +-------+-----------------------------------------------------------------------+------------------------------------------------------------------------ + 1 | BEGIN | BEGIN + 1 | COMMIT | COMMIT + 1 | message: transactional: 1 prefix: test, sz: 14 content:tx logical msg | message: transactional: 1 prefix: test, sz: 14 content:tx logical msg + 20467 | table public.tr_etoomuch: DELETE: id[integer]:1 | table public.tr_etoomuch: UPDATE: id[integer]:9999 data[integer]:-9999 +(4 rows) -- check updates of primary keys work correctly BEGIN; diff --git a/contrib/test_decoding/expected/messages.out b/contrib/test_decoding/expected/messages.out new file mode 100644 index 0000000..2edbf2f --- /dev/null +++ b/contrib/test_decoding/expected/messages.out @@ -0,0 +1,71 @@ +-- predictability +SET synchronous_commit = on; +SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding'); + ?column? +---------- + init +(1 row) + +SELECT 'msg1' FROM pg_logical_emit_message(true, 'test', 'msg1'); + ?column? +---------- + msg1 +(1 row) + +SELECT 'msg2' FROM pg_logical_emit_message(false, 'test', 'msg2'); + ?column? +---------- + msg2 +(1 row) + +BEGIN; +SELECT 'msg3' FROM pg_logical_emit_message(true, 'test', 'msg3'); + ?column? +---------- + msg3 +(1 row) + +SELECT 'msg4' FROM pg_logical_emit_message(false, 'test', 'msg4'); + ?column? +---------- + msg4 +(1 row) + +ROLLBACK; +BEGIN; +SELECT 'msg5' FROM pg_logical_emit_message(true, 'test', 'msg5'); + ?column? +---------- + msg5 +(1 row) + +SELECT 'msg6' FROM pg_logical_emit_message(false, 'test', 'msg6'); + ?column? +---------- + msg6 +(1 row) + +SELECT 'msg7' FROM pg_logical_emit_message(true, 'test', 'msg7'); + ?column? +---------- + msg7 +(1 row) + +COMMIT; +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'force-binary', '0', 'skip-empty-xacts', '1'); + data +------------------------------------------------------------ + message: transactional: 1 prefix: test, sz: 4 content:msg1 + message: transactional: 0 prefix: test, sz: 4 content:msg2 + message: transactional: 0 prefix: test, sz: 4 content:msg4 + message: transactional: 0 prefix: test, sz: 4 content:msg6 + message: transactional: 1 prefix: test, sz: 4 content:msg5 + message: transactional: 1 prefix: test, sz: 4 content:msg7 +(6 rows) + +SELECT 'init' FROM pg_drop_replication_slot('regression_slot'); + ?column? +---------- + init +(1 row) + diff --git a/contrib/test_decoding/sql/ddl.sql b/contrib/test_decoding/sql/ddl.sql index ad928ad..b1f7bf6 100644 --- a/contrib/test_decoding/sql/ddl.sql +++ b/contrib/test_decoding/sql/ddl.sql @@ -108,11 +108,12 @@ DELETE FROM tr_pkey; SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); /* - * check that disk spooling works + * check that disk spooling works (also for logical messages) */ BEGIN; CREATE TABLE tr_etoomuch (id serial primary key, data int); INSERT INTO tr_etoomuch(data) SELECT g.i FROM generate_series(1, 10234) g(i); +SELECT 'tx logical msg' FROM pg_logical_emit_message(true, 'test', 'tx logical msg'); DELETE FROM tr_etoomuch WHERE id < 5000; UPDATE tr_etoomuch SET data = - data WHERE id > 5000; COMMIT; diff --git a/contrib/test_decoding/sql/messages.sql b/contrib/test_decoding/sql/messages.sql new file mode 100644 index 0000000..37bd100 --- /dev/null +++ b/contrib/test_decoding/sql/messages.sql @@ -0,0 +1,22 @@ +-- predictability +SET synchronous_commit = on; + +SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding'); + +SELECT 'msg1' FROM pg_logical_emit_message(true, 'test', 'msg1'); +SELECT 'msg2' FROM pg_logical_emit_message(false, 'test', 'msg2'); + +BEGIN; +SELECT 'msg3' FROM pg_logical_emit_message(true, 'test', 'msg3'); +SELECT 'msg4' FROM pg_logical_emit_message(false, 'test', 'msg4'); +ROLLBACK; + +BEGIN; +SELECT 'msg5' FROM pg_logical_emit_message(true, 'test', 'msg5'); +SELECT 'msg6' FROM pg_logical_emit_message(false, 'test', 'msg6'); +SELECT 'msg7' FROM pg_logical_emit_message(true, 'test', 'msg7'); +COMMIT; + +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'force-binary', '0', 'skip-empty-xacts', '1'); + +SELECT 'init' FROM pg_drop_replication_slot('regression_slot'); diff --git a/contrib/test_decoding/test_decoding.c b/contrib/test_decoding/test_decoding.c index 4cf808f..3336e1e 100644 --- a/contrib/test_decoding/test_decoding.c +++ b/contrib/test_decoding/test_decoding.c @@ -21,6 +21,7 @@ #include "replication/output_plugin.h" #include "replication/logical.h" +#include "replication/message.h" #include "replication/origin.h" #include "utils/builtins.h" @@ -63,6 +64,10 @@ static void pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferChange *change); static bool pg_decode_filter(LogicalDecodingContext *ctx, RepOriginId origin_id); +static void pg_decode_message(LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, XLogRecPtr message_lsn, + bool transactional, const char *prefix, + Size sz, const char *message); void _PG_init(void) @@ -82,6 +87,7 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb) cb->commit_cb = pg_decode_commit_txn; cb->filter_by_origin_cb = pg_decode_filter; cb->shutdown_cb = pg_decode_shutdown; + cb->message_cb = pg_decode_message; } @@ -471,3 +477,15 @@ pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, OutputPluginWrite(ctx, true); } + +static void +pg_decode_message(LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, XLogRecPtr lsn, bool transactional, + const char *prefix, Size sz, const char *message) +{ + OutputPluginPrepareWrite(ctx, true); + appendStringInfo(ctx->out, "message: transactional: %d prefix: %s, sz: %zu content:", + transactional, prefix, sz); + appendBinaryStringInfo(ctx->out, message, sz); + OutputPluginWrite(ctx, true); +} diff --git a/doc/src/sgml/func.sgml b/doc/src/sgml/func.sgml index ae93e69..7ac71d1 100644 --- a/doc/src/sgml/func.sgml +++ b/doc/src/sgml/func.sgml @@ -18238,6 +18238,51 @@ postgres=# SELECT * FROM pg_xlogfile_name_offset(pg_stop_backup()); + + + + pg_logical_emit_message + + pg_logical_emit_message(transactional bool, prefix text, content text) + + + void + + + Emit text logical decoding message. This can be used to pass generic + messages to logical decoding plugins through WAL. The parameter + transactional specifies if the message should + be part of current transaction or if it should be written immediately + and decoded as soon as the logical decoding reads the record. The + prefix is textual prefix used by the logical + decoding plugins to easily recognize interesting messages for them. + The content is the text of the message. + + + + + + + >pg_logical_emit_message + + >pg_logical_emit_message(transactional bool, prefix text, content bytea) + + + void + + + Emit binary logical decoding message. This can be used to pass generic + messages to logical decoding plugins through WAL. The parameter + transactional specifies if the message should + be part of current transaction or if it should be written immediately + and decoded as soon as the logical decoding reads the record. The + prefix is textual prefix used by the logical + decoding plugins to easily recognize interesting messages for them. + The content is the binary content of the + message. + + + diff --git a/doc/src/sgml/logicaldecoding.sgml b/doc/src/sgml/logicaldecoding.sgml index 45fdfeb..dd67c8f 100644 --- a/doc/src/sgml/logicaldecoding.sgml +++ b/doc/src/sgml/logicaldecoding.sgml @@ -363,6 +363,7 @@ typedef struct OutputPluginCallbacks LogicalDecodeBeginCB begin_cb; LogicalDecodeChangeCB change_cb; LogicalDecodeCommitCB commit_cb; + LogicalDecodeMessageCB message_cb; LogicalDecodeFilterByOriginCB filter_by_origin_cb; LogicalDecodeShutdownCB shutdown_cb; } OutputPluginCallbacks; @@ -602,6 +603,43 @@ typedef bool (*LogicalDecodeFilterByOriginCB) ( more efficient. + + + Generic Message Callback + + + The optional message_cb callback is called whenever + a logical decoding message has been decoded. + +typedef void (*LogicalDecodeMessageCB) ( + struct LogicalDecodingContext *, + ReorderBufferTXN *txn, + XLogRecPtr message_lsn, + bool transactional, + const char *prefix, + Size message_size, + const char *message +); + + The txn parameter contains meta information about + the transaction, like the time stamp at which it has been committed and + its XID. Note however that it can be NULL when the message is + non-transactional and the XID was not assigned yet in the transaction + which logged the message. The lsn has WAL + position of the message. The transactional says + if he message was sent as transactional or not. + The prefix is arbitrary null-terminated prefix + which can be used for identifying interesting messages for the current + plugin. And finally the message parameter holds + the actual message of message_size size. + + + Extra care should be taken to ensure that the prefix the output plugin + considers interesting is unique. Using name of the extension or the + output plugin itself is often a good choice. + + + diff --git a/src/backend/access/rmgrdesc/Makefile b/src/backend/access/rmgrdesc/Makefile index c72a1f2..723b4d8 100644 --- a/src/backend/access/rmgrdesc/Makefile +++ b/src/backend/access/rmgrdesc/Makefile @@ -9,8 +9,8 @@ top_builddir = ../../../.. include $(top_builddir)/src/Makefile.global OBJS = brindesc.o clogdesc.o committsdesc.o dbasedesc.o gindesc.o gistdesc.o \ - hashdesc.o heapdesc.o mxactdesc.o nbtdesc.o relmapdesc.o \ - replorigindesc.o seqdesc.o smgrdesc.o spgdesc.o \ + hashdesc.o heapdesc.o logicalmsgdesc.o mxactdesc.o nbtdesc.o \ + relmapdesc.o replorigindesc.o seqdesc.o smgrdesc.o spgdesc.o \ standbydesc.o tblspcdesc.o xactdesc.o xlogdesc.o include $(top_srcdir)/src/backend/common.mk diff --git a/src/backend/access/rmgrdesc/logicalmsgdesc.c b/src/backend/access/rmgrdesc/logicalmsgdesc.c new file mode 100644 index 0000000..b194e14 --- /dev/null +++ b/src/backend/access/rmgrdesc/logicalmsgdesc.c @@ -0,0 +1,41 @@ +/*------------------------------------------------------------------------- + * + * logicalmsgdesc.c + * rmgr descriptor routines for replication/logical/message.c + * + * Portions Copyright (c) 2015-2016, PostgreSQL Global Development Group + * + * + * IDENTIFICATION + * src/backend/access/rmgrdesc/logicalmsgdesc.c + * + *------------------------------------------------------------------------- + */ +#include "postgres.h" + +#include "replication/message.h" + +void +logicalmsg_desc(StringInfo buf, XLogReaderState *record) +{ + char *rec = XLogRecGetData(record); + uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK; + + if (info == XLOG_LOGICAL_MESSAGE) + { + xl_logical_message *xlrec = (xl_logical_message *) rec; + + appendStringInfo(buf, "%s message size %zu bytes", + xlrec->transactional ? "transactional" : "nontransactional", + xlrec->message_size); + } +} + +const char * +logicalmsg_identify(uint8 info) +{ + if ((info & ~XLR_INFO_MASK) == XLOG_LOGICAL_MESSAGE) + return "MESSAGE"; + + return NULL; +} diff --git a/src/backend/access/transam/rmgr.c b/src/backend/access/transam/rmgr.c index 7c4d773..1a42121 100644 --- a/src/backend/access/transam/rmgr.c +++ b/src/backend/access/transam/rmgr.c @@ -23,6 +23,7 @@ #include "commands/dbcommands_xlog.h" #include "commands/sequence.h" #include "commands/tablespace.h" +#include "replication/message.h" #include "replication/origin.h" #include "storage/standby.h" #include "utils/relmapper.h" diff --git a/src/backend/replication/logical/Makefile b/src/backend/replication/logical/Makefile index 8adea13..1d7ca06 100644 --- a/src/backend/replication/logical/Makefile +++ b/src/backend/replication/logical/Makefile @@ -14,7 +14,7 @@ include $(top_builddir)/src/Makefile.global override CPPFLAGS := -I$(srcdir) $(CPPFLAGS) -OBJS = decode.o logical.o logicalfuncs.o reorderbuffer.o origin.o \ +OBJS = decode.o logical.o logicalfuncs.o message.o origin.o reorderbuffer.o \ snapbuild.o include $(top_srcdir)/src/backend/common.mk diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c index 13af485..01c96f7 100644 --- a/src/backend/replication/logical/decode.c +++ b/src/backend/replication/logical/decode.c @@ -39,6 +39,7 @@ #include "replication/decode.h" #include "replication/logical.h" +#include "replication/message.h" #include "replication/reorderbuffer.h" #include "replication/origin.h" #include "replication/snapbuild.h" @@ -58,6 +59,7 @@ static void DecodeHeapOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); static void DecodeHeap2Op(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); static void DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); static void DecodeStandbyOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); +static void DecodeLogicalMsgOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); /* individual record(group)'s handlers */ static void DecodeInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); @@ -123,6 +125,10 @@ LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *recor DecodeHeapOp(ctx, &buf); break; + case RM_LOGICALMSG_ID: + DecodeLogicalMsgOp(ctx, &buf); + break; + /* * Rmgrs irrelevant for logical decoding; they describe stuff not * represented in logical decoding. Add new rmgrs in rmgrlist.h's @@ -457,6 +463,46 @@ DecodeHeapOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) } } +/* + * Handle rmgr LOGICALMSG_ID records for DecodeRecordIntoReorderBuffer(). + */ +static void +DecodeLogicalMsgOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) +{ + SnapBuild *builder = ctx->snapshot_builder; + XLogReaderState *r = buf->record; + TransactionId xid = XLogRecGetXid(r); + uint8 info = XLogRecGetInfo(r) & ~XLR_INFO_MASK; + Snapshot snapshot; + xl_logical_message *message; + + if (info != XLOG_LOGICAL_MESSAGE) + elog(ERROR, "unexpected RM_LOGICALMSG_ID record type: %u", info); + + ReorderBufferProcessXid(ctx->reorder, XLogRecGetXid(r), buf->origptr); + + /* No point in doing anything yet. */ + if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT) + return; + + message = (xl_logical_message *) XLogRecGetData(r); + + if (message->transactional && + !SnapBuildProcessChange(builder, xid, buf->origptr)) + return; + else if (!message->transactional && + (SnapBuildCurrentState(builder) != SNAPBUILD_CONSISTENT || + SnapBuildXactNeedsSkip(builder, buf->origptr))) + return; + + snapshot = SnapBuildGetOrBuildSnapshot(builder, xid); + ReorderBufferQueueMessage(ctx->reorder, xid, snapshot, buf->endptr, + message->transactional, + message->message, /* first part of message is prefix */ + message->message_size, + message->message + message->prefix_size); +} + static inline bool FilterByOrigin(LogicalDecodingContext *ctx, RepOriginId origin_id) { diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c index 2e6d3f9..c06b2fa 100644 --- a/src/backend/replication/logical/logical.c +++ b/src/backend/replication/logical/logical.c @@ -62,6 +62,9 @@ static void commit_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, XLogRecPtr commit_lsn); static void change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change); +static void message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, + XLogRecPtr message_lsn, bool transactional, + const char *prefix, Size message_size, const char *message); static void LoadOutputPlugin(OutputPluginCallbacks *callbacks, char *plugin); @@ -178,6 +181,7 @@ StartupDecodingContext(List *output_plugin_options, ctx->reorder->begin = begin_cb_wrapper; ctx->reorder->apply_change = change_cb_wrapper; ctx->reorder->commit = commit_cb_wrapper; + ctx->reorder->message = message_cb_wrapper; ctx->out = makeStringInfo(); ctx->prepare_write = prepare_write; @@ -702,6 +706,40 @@ filter_by_origin_cb_wrapper(LogicalDecodingContext *ctx, RepOriginId origin_id) return ret; } +static void +message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, + XLogRecPtr message_lsn, bool transactional, + const char *prefix, Size message_size, const char *message) +{ + LogicalDecodingContext *ctx = cache->private_data; + LogicalErrorCallbackState state; + ErrorContextCallback errcallback; + + if (ctx->callbacks.message_cb == NULL) + return; + + /* Push callback + info on the error context stack */ + state.ctx = ctx; + state.callback_name = "message"; + state.report_location = message_lsn; + errcallback.callback = output_plugin_error_callback; + errcallback.arg = (void *) &state; + errcallback.previous = error_context_stack; + error_context_stack = &errcallback; + + /* set output state */ + ctx->accept_writes = true; + ctx->write_xid = txn != NULL ? txn->xid : InvalidTransactionId; + ctx->write_location = message_lsn; + + /* do the actual work: call callback */ + ctx->callbacks.message_cb(ctx, txn, message_lsn, transactional, prefix, + message_size, message); + + /* Pop the error context stack */ + error_context_stack = errcallback.previous; +} + /* * Set the required catalog xmin horizon for historic snapshots in the current * replication slot. diff --git a/src/backend/replication/logical/logicalfuncs.c b/src/backend/replication/logical/logicalfuncs.c index f789fc1..552dac3 100644 --- a/src/backend/replication/logical/logicalfuncs.c +++ b/src/backend/replication/logical/logicalfuncs.c @@ -24,6 +24,8 @@ #include "access/xlog_internal.h" #include "access/xlogutils.h" +#include "access/xact.h" + #include "catalog/pg_type.h" #include "nodes/makefuncs.h" @@ -41,6 +43,7 @@ #include "replication/decode.h" #include "replication/logical.h" #include "replication/logicalfuncs.h" +#include "replication/message.h" #include "storage/fd.h" @@ -363,3 +366,27 @@ pg_logical_slot_peek_binary_changes(PG_FUNCTION_ARGS) { return pg_logical_slot_get_changes_guts(fcinfo, false, true); } + + +/* + * SQL function for writing logical decding message into WAL. + */ +Datum +pg_logical_emit_message_bytea(PG_FUNCTION_ARGS) +{ + bool transactional = PG_GETARG_BOOL(0); + char *prefix = text_to_cstring(PG_GETARG_TEXT_PP(1)); + bytea *data = PG_GETARG_BYTEA_PP(2); + XLogRecPtr lsn; + + lsn = LogLogicalMessage(prefix, VARDATA_ANY(data), VARSIZE_ANY_EXHDR(data), + transactional); + PG_RETURN_LSN(lsn); +} + +Datum +pg_logical_emit_message_text(PG_FUNCTION_ARGS) +{ + /* bytea and text are compatible */ + return pg_logical_emit_message_bytea(fcinfo); +} diff --git a/src/backend/replication/logical/message.c b/src/backend/replication/logical/message.c new file mode 100644 index 0000000..85c597b --- /dev/null +++ b/src/backend/replication/logical/message.c @@ -0,0 +1,85 @@ +/*------------------------------------------------------------------------- + * + * message.c + * Generic logical messages. + * + * Copyright (c) 2013-2016, PostgreSQL Global Development Group + * + * IDENTIFICATION + * src/backend/replication/logical/message.c + * + * NOTES + * + * Generic logical messages allow XLOG logging of arbitrary binary blobs that + * get passed to the logical decoding plugin. In normal XLOG processing they + * are same as NOOP. + * + * These messages can be either transactional or non-transactional. + * Transactional messages are part of current transaction and will be sent to + * decoding plugin using in a same way as DML operations. + * Non-transactional messages are sent to the plugin at the time when the + * logical decoding reads them from XLOG. + * + * Every message carries prefix to avoid conflicts between different decoding + * plugins. The plugin authors must take extra care to use unique prefix, + * good options seems to be for example to use the name of the extension. + * + * --------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include "access/xact.h" + +#include "catalog/indexing.h" + +#include "nodes/execnodes.h" + +#include "replication/message.h" +#include "replication/logical.h" + +#include "utils/memutils.h" + +/* + * Write logical decoding message into XLog. + */ +XLogRecPtr +LogLogicalMessage(const char *prefix, const char *message, size_t size, + bool transactional) +{ + xl_logical_message xlrec; + + /* + * Force xid to be allocated if we're emitting a transactional message. + */ + if (transactional) + { + Assert(IsTransactionState()); + GetCurrentTransactionId(); + } + + xlrec.transactional = transactional; + xlrec.prefix_size = strlen(prefix) + 1; + xlrec.message_size = size; + + XLogBeginInsert(); + XLogRegisterData((char *) &xlrec, SizeOfLogicalMessage); + XLogRegisterData((char *) prefix, xlrec.prefix_size); + XLogRegisterData((char *) message, size); + + return XLogInsert(RM_LOGICALMSG_ID, XLOG_LOGICAL_MESSAGE); +} + +/* + * Redo is basically just noop for logical decoding messages. + */ +void +logicalmsg_redo(XLogReaderState *record) +{ + uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK; + + if (info != XLOG_LOGICAL_MESSAGE) + elog(PANIC, "logicalmsg_redo: unknown op code %u", info); + + /* This is only interesting for logical decoding, see decode.c. */ +} diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c index f2b8f4b..1420ddd 100644 --- a/src/backend/replication/logical/reorderbuffer.c +++ b/src/backend/replication/logical/reorderbuffer.c @@ -414,6 +414,14 @@ ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change) change->data.tp.oldtuple = NULL; } break; + case REORDER_BUFFER_CHANGE_MESSAGE: + if (change->data.msg.prefix != NULL) + pfree(change->data.msg.prefix); + change->data.msg.prefix = NULL; + if (change->data.msg.message != NULL) + pfree(change->data.msg.message); + change->data.msg.message = NULL; + break; case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT: if (change->data.snapshot) { @@ -627,6 +635,61 @@ ReorderBufferQueueChange(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, ReorderBufferCheckSerializeTXN(rb, txn); } +/* + * Queue message into a transaction so it can be processed upon commit. + */ +void +ReorderBufferQueueMessage(ReorderBuffer *rb, TransactionId xid, + Snapshot snapshot, XLogRecPtr lsn, + bool transactional, const char *prefix, + Size message_size, const char *message) +{ + if (transactional) + { + MemoryContext oldcontext; + ReorderBufferChange *change; + + Assert(xid != InvalidTransactionId); + + oldcontext = MemoryContextSwitchTo(rb->context); + + change = ReorderBufferGetChange(rb); + change->action = REORDER_BUFFER_CHANGE_MESSAGE; + change->data.msg.prefix = pstrdup(prefix); + change->data.msg.message_size = message_size; + change->data.msg.message = palloc(message_size); + memcpy(change->data.msg.message, message, message_size); + + ReorderBufferQueueChange(rb, xid, lsn, change); + + MemoryContextSwitchTo(oldcontext); + } + else + { + ReorderBufferTXN *txn = NULL; + volatile Snapshot snapshot_now = snapshot; + + if (xid != InvalidTransactionId) + txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true); + + /* setup snapshot to allow catalog access */ + SetupHistoricSnapshot(snapshot_now, NULL); + PG_TRY(); + { + rb->message(rb, txn, lsn, false, prefix, message_size, message); + + TeardownHistoricSnapshot(false); + } + PG_CATCH(); + { + TeardownHistoricSnapshot(true); + PG_RE_THROW(); + } + PG_END_TRY(); + } +} + + static void AssertTXNLsnOrder(ReorderBuffer *rb) { @@ -1493,6 +1556,13 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid, specinsert = change; break; + case REORDER_BUFFER_CHANGE_MESSAGE: + rb->message(rb, txn, change->lsn, true, + change->data.msg.prefix, + change->data.msg.message_size, + change->data.msg.message); + break; + case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT: /* get rid of the old */ TeardownHistoricSnapshot(false); @@ -2159,6 +2229,33 @@ ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn, } break; } + case REORDER_BUFFER_CHANGE_MESSAGE: + { + char *data; + Size prefix_size = strlen(change->data.msg.prefix) + 1; + + sz += prefix_size + change->data.msg.message_size + + sizeof(Size) + sizeof(Size); + ReorderBufferSerializeReserve(rb, sz); + + data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange); + + /* write the prefix including the size */ + memcpy(data, &prefix_size, sizeof(Size)); + data += sizeof(Size); + memcpy(data, change->data.msg.prefix, + prefix_size); + data += prefix_size; + + /* write the message including the size */ + memcpy(data, &change->data.msg.message_size, sizeof(Size)); + data += sizeof(Size); + memcpy(data, change->data.msg.message, + change->data.msg.message_size); + data += change->data.msg.message_size; + + break; + } case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT: { Snapshot snap; @@ -2415,6 +2512,30 @@ ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn, } break; + case REORDER_BUFFER_CHANGE_MESSAGE: + { + Size prefix_size; + + /* read prefix */ + memcpy(&prefix_size, data, sizeof(Size)); + data += sizeof(Size); + change->data.msg.prefix = MemoryContextAlloc(rb->context, + prefix_size); + memcpy(change->data.msg.prefix, data, prefix_size); + Assert(change->data.msg.prefix[prefix_size-1] == '\0'); + data += prefix_size; + + /* read the messsage */ + memcpy(&change->data.msg.message_size, data, sizeof(Size)); + data += sizeof(Size); + change->data.msg.message = MemoryContextAlloc(rb->context, + change->data.msg.message_size); + memcpy(change->data.msg.message, data, + change->data.msg.message_size); + data += change->data.msg.message_size; + + break; + } case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT: { Snapshot oldsnap; diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c index 179b85a..b4dc617 100644 --- a/src/backend/replication/logical/snapbuild.c +++ b/src/backend/replication/logical/snapbuild.c @@ -605,6 +605,25 @@ SnapBuildExportSnapshot(SnapBuild *builder) } /* + * Ensure there is a snapshot and if not build one for current transaction. + */ +Snapshot +SnapBuildGetOrBuildSnapshot(SnapBuild *builder, TransactionId xid) +{ + Assert(builder->state == SNAPBUILD_CONSISTENT); + + /* only build a new snapshot if we don't have a prebuilt one */ + if (builder->snapshot == NULL) + { + builder->snapshot = SnapBuildBuildSnapshot(builder, xid); + /* inrease refcount for the snapshot builder */ + SnapBuildSnapIncRefcount(builder->snapshot); + } + + return builder->snapshot; +} + +/* * Reset a previously SnapBuildExportSnapshot()'ed snapshot if there is * any. Aborts the previously started transaction and resets the resource * owner back to its original value. diff --git a/src/bin/pg_xlogdump/.gitignore b/src/bin/pg_xlogdump/.gitignore index eebaf30..c4783f1 100644 --- a/src/bin/pg_xlogdump/.gitignore +++ b/src/bin/pg_xlogdump/.gitignore @@ -1,22 +1,4 @@ /pg_xlogdump # Source files copied from src/backend/access/rmgrdesc/ -/brindesc.c -/clogdesc.c -/committsdesc.c -/dbasedesc.c -/gindesc.c -/gistdesc.c -/hashdesc.c -/heapdesc.c -/mxactdesc.c -/nbtdesc.c -/relmapdesc.c -/replorigindesc.c -/seqdesc.c -/smgrdesc.c -/spgdesc.c -/standbydesc.c -/tblspcdesc.c -/xactdesc.c -/xlogdesc.c +/*desc.c /xlogreader.c diff --git a/src/bin/pg_xlogdump/rmgrdesc.c b/src/bin/pg_xlogdump/rmgrdesc.c index f9cd395..6ba7f22 100644 --- a/src/bin/pg_xlogdump/rmgrdesc.c +++ b/src/bin/pg_xlogdump/rmgrdesc.c @@ -25,6 +25,7 @@ #include "commands/dbcommands_xlog.h" #include "commands/sequence.h" #include "commands/tablespace.h" +#include "replication/message.h" #include "replication/origin.h" #include "rmgrdesc.h" #include "storage/standbydefs.h" diff --git a/src/include/access/rmgrlist.h b/src/include/access/rmgrlist.h index fab912d..35c242d 100644 --- a/src/include/access/rmgrlist.h +++ b/src/include/access/rmgrlist.h @@ -45,3 +45,4 @@ PG_RMGR(RM_SPGIST_ID, "SPGist", spg_redo, spg_desc, spg_identify, spg_xlog_start PG_RMGR(RM_BRIN_ID, "BRIN", brin_redo, brin_desc, brin_identify, NULL, NULL) PG_RMGR(RM_COMMIT_TS_ID, "CommitTs", commit_ts_redo, commit_ts_desc, commit_ts_identify, NULL, NULL) PG_RMGR(RM_REPLORIGIN_ID, "ReplicationOrigin", replorigin_redo, replorigin_desc, replorigin_identify, NULL, NULL) +PG_RMGR(RM_LOGICALMSG_ID, "LogicalMessage", logicalmsg_redo, logicalmsg_desc, logicalmsg_identify, NULL, NULL) diff --git a/src/include/catalog/pg_proc.h b/src/include/catalog/pg_proc.h index a595327..3713739 100644 --- a/src/include/catalog/pg_proc.h +++ b/src/include/catalog/pg_proc.h @@ -5114,6 +5114,10 @@ DATA(insert OID = 3784 ( pg_logical_slot_peek_changes PGNSP PGUID 12 1000 1000 DESCR("peek at changes from replication slot"); DATA(insert OID = 3785 ( pg_logical_slot_peek_binary_changes PGNSP PGUID 12 1000 1000 25 0 f f f f f t v u 4 0 2249 "19 3220 23 1009" "{19,3220,23,1009,3220,28,17}" "{i,i,i,v,o,o,o}" "{slot_name,upto_lsn,upto_nchanges,options,location,xid,data}" _null_ _null_ pg_logical_slot_peek_binary_changes _null_ _null_ _null_ )); DESCR("peek at binary changes from replication slot"); +DATA(insert OID = 3577 ( pg_logical_emit_message PGNSP PGUID 12 1 0 0 0 f f f f t f v u 3 0 3220 "16 25 25" _null_ _null_ _null_ _null_ _null_ pg_logical_emit_message_text _null_ _null_ _null_ )); +DESCR("emit a textual logical decoding message"); +DATA(insert OID = 3578 ( pg_logical_emit_message PGNSP PGUID 12 1 0 0 0 f f f f t f v u 3 0 3220 "16 25 17" _null_ _null_ _null_ _null_ _null_ pg_logical_emit_message_bytea _null_ _null_ _null_ )); +DESCR("emit a binary logical decoding message"); /* event triggers */ DATA(insert OID = 3566 ( pg_event_trigger_dropped_objects PGNSP PGUID 12 10 100 0 0 f f f f t t s s 0 0 2249 "" "{26,26,23,16,16,16,25,25,25,25,1009,1009}" "{o,o,o,o,o,o,o,o,o,o,o,o}" "{classid, objid, objsubid, original, normal, is_temporary, object_type, schema_name, object_name, object_identity, address_names, address_args}" _null_ _null_ pg_event_trigger_dropped_objects _null_ _null_ _null_ )); diff --git a/src/include/replication/logicalfuncs.h b/src/include/replication/logicalfuncs.h index c87a1df..5540414 100644 --- a/src/include/replication/logicalfuncs.h +++ b/src/include/replication/logicalfuncs.h @@ -21,4 +21,6 @@ extern Datum pg_logical_slot_get_binary_changes(PG_FUNCTION_ARGS); extern Datum pg_logical_slot_peek_changes(PG_FUNCTION_ARGS); extern Datum pg_logical_slot_peek_binary_changes(PG_FUNCTION_ARGS); +extern Datum pg_logical_emit_message_bytea(PG_FUNCTION_ARGS); +extern Datum pg_logical_emit_message_text(PG_FUNCTION_ARGS); #endif diff --git a/src/include/replication/message.h b/src/include/replication/message.h new file mode 100644 index 0000000..8b968d5 --- /dev/null +++ b/src/include/replication/message.h @@ -0,0 +1,41 @@ +/*------------------------------------------------------------------------- + * message.h + * Exports from replication/logical/message.c + * + * Copyright (c) 2013-2016, PostgreSQL Global Development Group + * + * src/include/replication/message.h + *------------------------------------------------------------------------- + */ +#ifndef PG_LOGICAL_MESSAGE_H +#define PG_LOGICAL_MESSAGE_H + +#include "access/xlog.h" +#include "access/xlogdefs.h" +#include "access/xlogreader.h" + +/* + * Generic logical decoding message wal record. + */ +typedef struct xl_logical_message +{ + bool transactional; /* is message transactional? */ + Size prefix_size; /* length of prefix */ + Size message_size; /* size of the message */ + char message[FLEXIBLE_ARRAY_MEMBER]; /* message including the null + * terminated prefix of length + * prefix_size */ +} xl_logical_message; + +#define SizeOfLogicalMessage (offsetof(xl_logical_message, message)) + +extern XLogRecPtr LogLogicalMessage(const char *prefix, const char *message, + size_t size, bool transactional); + +/* RMGR API*/ +#define XLOG_LOGICAL_MESSAGE 0x00 +void logicalmsg_redo(XLogReaderState *record); +void logicalmsg_desc(StringInfo buf, XLogReaderState *record); +const char *logicalmsg_identify(uint8 info); + +#endif /* PG_LOGICAL_MESSAGE_H */ diff --git a/src/include/replication/output_plugin.h b/src/include/replication/output_plugin.h index 577b12e..3a2ca98 100644 --- a/src/include/replication/output_plugin.h +++ b/src/include/replication/output_plugin.h @@ -74,6 +74,18 @@ typedef void (*LogicalDecodeCommitCB) ( XLogRecPtr commit_lsn); /* + * Called for the generic logical decoding messages. + */ +typedef void (*LogicalDecodeMessageCB) ( + struct LogicalDecodingContext *, + ReorderBufferTXN *txn, + XLogRecPtr message_lsn, + bool transactional, + const char *prefix, + Size message_size, + const char *message); + +/* * Filter changes by origin. */ typedef bool (*LogicalDecodeFilterByOriginCB) ( @@ -96,6 +108,7 @@ typedef struct OutputPluginCallbacks LogicalDecodeBeginCB begin_cb; LogicalDecodeChangeCB change_cb; LogicalDecodeCommitCB commit_cb; + LogicalDecodeMessageCB message_cb; LogicalDecodeFilterByOriginCB filter_by_origin_cb; LogicalDecodeShutdownCB shutdown_cb; } OutputPluginCallbacks; diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h index b52d06a..4c54953 100644 --- a/src/include/replication/reorderbuffer.h +++ b/src/include/replication/reorderbuffer.h @@ -54,6 +54,7 @@ enum ReorderBufferChangeType REORDER_BUFFER_CHANGE_INSERT, REORDER_BUFFER_CHANGE_UPDATE, REORDER_BUFFER_CHANGE_DELETE, + REORDER_BUFFER_CHANGE_MESSAGE, REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT, REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, @@ -98,6 +99,14 @@ typedef struct ReorderBufferChange ReorderBufferTupleBuf *newtuple; } tp; + /* Message with arbitrary data. */ + struct + { + char *prefix; + Size message_size; + char *message; + } msg; + /* New snapshot, set when action == *_INTERNAL_SNAPSHOT */ Snapshot snapshot; @@ -274,6 +283,15 @@ typedef void (*ReorderBufferCommitCB) ( ReorderBufferTXN *txn, XLogRecPtr commit_lsn); +/* message callback signature */ +typedef void (*ReorderBufferMessageCB) ( + ReorderBuffer *rb, + ReorderBufferTXN *txn, + XLogRecPtr message_lsn, + bool transactional, + const char *prefix, Size sz, + const char *message); + struct ReorderBuffer { /* @@ -300,6 +318,7 @@ struct ReorderBuffer ReorderBufferBeginCB begin; ReorderBufferApplyChangeCB apply_change; ReorderBufferCommitCB commit; + ReorderBufferMessageCB message; /* * Pointer that will be passed untouched to the callbacks. @@ -350,6 +369,9 @@ ReorderBufferChange *ReorderBufferGetChange(ReorderBuffer *); void ReorderBufferReturnChange(ReorderBuffer *, ReorderBufferChange *); void ReorderBufferQueueChange(ReorderBuffer *, TransactionId, XLogRecPtr lsn, ReorderBufferChange *); +void ReorderBufferQueueMessage(ReorderBuffer *, TransactionId, Snapshot snapshot, XLogRecPtr lsn, + bool transactional, const char *prefix, + Size message_size, const char *message); void ReorderBufferCommit(ReorderBuffer *, TransactionId, XLogRecPtr commit_lsn, XLogRecPtr end_lsn, TimestampTz commit_time, RepOriginId origin_id, XLogRecPtr origin_lsn); diff --git a/src/include/replication/snapbuild.h b/src/include/replication/snapbuild.h index 75955af..c4127a1 100644 --- a/src/include/replication/snapbuild.h +++ b/src/include/replication/snapbuild.h @@ -63,6 +63,8 @@ extern const char *SnapBuildExportSnapshot(SnapBuild *snapstate); extern void SnapBuildClearExportedSnapshot(void); extern SnapBuildState SnapBuildCurrentState(SnapBuild *snapstate); +extern Snapshot SnapBuildGetOrBuildSnapshot(SnapBuild *builder, + TransactionId xid); extern bool SnapBuildXactNeedsSkip(SnapBuild *snapstate, XLogRecPtr ptr); -- 1.9.1