From: | Andres Freund <andres(at)anarazel(dot)de> |
---|---|
To: | Petr Jelinek <petr(at)2ndquadrant(dot)com> |
Cc: | Artur Zakirov <a(dot)zakirov(at)postgrespro(dot)ru>, Simon Riggs <simon(at)2ndQuadrant(dot)com>, PostgreSQL-development <pgsql-hackers(at)postgresql(dot)org>, Craig Ringer <craig(at)2ndquadrant(dot)com> |
Subject: | Re: Proposal: Generic WAL logical messages |
Date: | 2016-02-27 00:05:09 |
Message-ID: | 20160227000509.2zms6difabot7lys@alap3.anarazel.de |
Lists: | pgsql-hackers |
Hi,
I'm not really convinced by RegisterStandbyMsgPrefix() et al. There's
not much documentation about what it actually is supposed to
accomplish. Afaics you're basically forced to use
shared_preload_libraries with it right now? Also, iterating through a
linked list every time something is logged doesn't seem very satisfying?
On 2016-02-24 18:35:16 +0100, Petr Jelinek wrote:
> +SET synchronous_commit = on;
> +SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');
> + ?column?
> +----------
> + init
> +(1 row)
> +SELECT 'msg1' FROM pg_logical_send_message(true, 'test', 'msg1');
> + ?column?
> +----------
> + msg1
> +(1 row)
Hm. Somehow 'sending' a message seems wrong here. Maybe 'emit'?
> + <row>
> + <entry id="pg-logical-send-message-text">
> + <indexterm>
> + <primary>pg_logical_send_message</primary>
> + </indexterm>
> + <literal><function>pg_logical_send_message(<parameter>transactional</parameter> <type>bool</type>, <parameter>prefix</parameter> <type>text</type>, <parameter>content</parameter> <type>text</type>)</function></literal>
> + </entry>
> + <entry>
> + void
> + </entry>
> + <entry>
> + Write text logical decoding message. This can be used to pass generic
> + messages to logical decoding plugins through WAL. The parameter
> + <parameter>transactional</parameter> specifies if the message should
> + be part of current transaction or if it should be written and decoded
> + immediately. The <parameter>prefix</parameter> has to be prefix which
> + was registered by a plugin. The <parameter>content</parameter> is
> + content of the message.
> + </entry>
> + </row>
It's not decoded immediately, even if emitted non-transactionally.
> + <sect3 id="logicaldecoding-output-plugin-message">
> + <title>Generic Message Callback</title>
> +
> + <para>
> + The optional <function>message_cb</function> callback is called whenever
> + a logical decoding message has been decoded.
> +<programlisting>
> +typedef void (*LogicalDecodeMessageCB) (
> + struct LogicalDecodingContext *,
> + ReorderBufferTXN *txn,
> + XLogRecPtr message_lsn,
> + bool transactional,
> + const char *prefix,
> + Size message_size,
> + const char *message
> +);
We should at least document what txn is set to if not transactional.
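To make the point concrete, here is a minimal standalone sketch of a callback that tolerates a missing transaction. It assumes, based on ReorderBufferQueueMessage() in the patch, that txn is NULL for non-transactional messages; demo_txn and demo_message_cb are hypothetical stand-ins, not PostgreSQL APIs.

```c
#include <stdbool.h>
#include <stddef.h>
#include <stdio.h>

/* Hypothetical stand-in for ReorderBufferTXN; only the xid is modeled. */
typedef struct
{
    unsigned int xid;
} demo_txn;

/*
 * Formats a one-line description of a decoded message into "out".
 * Assumption: txn is NULL when the message was emitted
 * non-transactionally, so it must not be dereferenced in that case.
 */
int
demo_message_cb(char *out, size_t outlen, const demo_txn *txn,
                bool transactional, const char *prefix,
                size_t message_size, const char *message)
{
    if (transactional && txn != NULL)
        return snprintf(out, outlen, "xid %u prefix %s: %.*s",
                        txn->xid, prefix, (int) message_size, message);

    /* Non-transactional: there is no transaction to report on. */
    return snprintf(out, outlen, "no txn, prefix %s: %.*s",
                    prefix, (int) message_size, message);
}
```

Whatever value is chosen for txn in the non-transactional case, output plugin authors need to know it in order to write the guard correctly.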
> +void
> +logicalmsg_desc(StringInfo buf, XLogReaderState *record)
> +{
> + char *rec = XLogRecGetData(record);
> + xl_logical_message *xlrec = (xl_logical_message *) rec;
> +
> + appendStringInfo(buf, "%s message size %zu bytes",
> + xlrec->transactional ? "transactional" : "nontransactional",
> + xlrec->message_size);
> +}
Shouldn't we check
uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
if (info == XLOG_LOGICAL_MESSAGE)
here?
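A minimal standalone sketch of that check follows. The two constants are stand-in values assumed for illustration (the real definitions live in the PostgreSQL headers and in the patch), and demo_logical_message and demo_logicalmsg_desc are hypothetical, trimmed-down versions of the patch's struct and function.

```c
#include <stdint.h>
#include <stdio.h>

/* Assumed stand-in values, for illustration only. */
#define XLR_INFO_MASK        0x0F   /* low bits reserved for xlog itself */
#define XLOG_LOGICAL_MESSAGE 0x00   /* hypothetical opcode of this rmgr */

/* Hypothetical, trimmed-down record body. */
typedef struct
{
    int    transactional;
    size_t message_size;
} demo_logical_message;

/*
 * Describe the record only if its rmgr-specific info bits identify it
 * as a logical message; otherwise leave the buffer empty.
 */
void
demo_logicalmsg_desc(char *buf, size_t buflen, uint8_t raw_info,
                     const demo_logical_message *xlrec)
{
    /* Strip the bits owned by xlog, keep the rmgr-specific ones. */
    uint8_t info = raw_info & ~XLR_INFO_MASK;

    buf[0] = '\0';
    if (info == XLOG_LOGICAL_MESSAGE)
        snprintf(buf, buflen, "%s message size %zu bytes",
                 xlrec->transactional ? "transactional" : "nontransactional",
                 xlrec->message_size);
}
```

The comparison has to use the masked value; comparing the raw info byte would fail whenever any of the reserved low bits happen to be set.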
> +const char *
> +logicalmsg_identify(uint8 info)
> +{
> + return NULL;
> +}
Huh?
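Presumably the objection is that an identify function which always returns NULL leaves the record type unnamed in tools that print WAL records. A sketch of what one might expect instead, using assumed stand-in constants and a hypothetical name string "MESSAGE":

```c
#include <stddef.h>
#include <stdint.h>

/* Assumed stand-in values, for illustration only. */
#define XLR_INFO_MASK        0x0F
#define XLOG_LOGICAL_MESSAGE 0x00

/*
 * Return a short name for known record types and NULL for anything
 * else. The name "MESSAGE" is a hypothetical choice.
 */
const char *
demo_logicalmsg_identify(uint8_t info)
{
    if ((info & ~XLR_INFO_MASK) == XLOG_LOGICAL_MESSAGE)
        return "MESSAGE";

    return NULL;
}
```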
> +void
> +ReorderBufferQueueMessage(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn,
> + bool transactional, const char *prefix, Size msg_sz,
> + const char *msg)
> +{
> + ReorderBufferTXN *txn = NULL;
> +
> + if (transactional)
> + {
> + ReorderBufferChange *change;
> +
> + txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
> +
> + Assert(xid != InvalidTransactionId);
> + Assert(txn != NULL);
> +
> + change = ReorderBufferGetChange(rb);
> + change->action = REORDER_BUFFER_CHANGE_MESSAGE;
> + change->data.msg.transactional = true;
> + change->data.msg.prefix = pstrdup(prefix);
> + change->data.msg.message_size = msg_sz;
> + change->data.msg.message = palloc(msg_sz);
> + memcpy(change->data.msg.message, msg, msg_sz);
> +
> + ReorderBufferQueueChange(rb, xid, lsn, change);
> + }
> + else
> + {
> + rb->message(rb, txn, lsn, transactional, prefix, msg_sz, msg);
> + }
> +}
This approach prohibits catalog access when processing a nontransactional
message, as there's no snapshot set up.
> + case REORDER_BUFFER_CHANGE_MESSAGE:
> + {
> + char *data;
> + size_t prefix_size = strlen(change->data.msg.prefix) + 1;
> +
> + sz += prefix_size + change->data.msg.message_size;
> + ReorderBufferSerializeReserve(rb, sz);
> +
> + data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
> + memcpy(data, change->data.msg.prefix,
> + prefix_size);
> + memcpy(data + prefix_size, change->data.msg.message,
> + change->data.msg.message_size);
> + break;
> + }
> case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT:
> {
> Snapshot snap;
> @@ -2354,6 +2415,18 @@ ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
> data += len;
> }
> break;
> + case REORDER_BUFFER_CHANGE_MESSAGE:
> + {
> + Size message_size = change->data.msg.message_size;
> + Size prefix_size = strlen(data) + 1;
> +
> + change->data.msg.prefix = pstrdup(data);
> + change->data.msg.message = palloc(message_size);
> + memcpy(change->data.msg.message, data + prefix_size,
> + message_size);
> +
> + data += prefix_size + message_size;
> + }
Please add a test exercising these paths.
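The property such a test needs to establish is that serialize followed by restore reproduces the prefix and the message bytes. Below is a standalone sketch of that round trip, mirroring the layout in the two hunks above (prefix including its terminating NUL, then the raw message bytes). The demo_* names are hypothetical, and malloc stands in for palloc. In the real tree the test would presumably have to be a regression test with a transaction large enough to be spilled to disk.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical stand-in for the msg member of ReorderBufferChange. */
typedef struct
{
    const char *prefix;        /* NUL-terminated */
    size_t      message_size;
    const char *message;       /* raw bytes, not NUL-terminated */
} demo_msg;

/* Serialize as: prefix (with NUL) followed by message bytes. */
char *
demo_serialize(const demo_msg *m, size_t *len)
{
    size_t prefix_size = strlen(m->prefix) + 1;
    char  *buf = malloc(prefix_size + m->message_size);

    assert(buf != NULL);
    memcpy(buf, m->prefix, prefix_size);
    memcpy(buf + prefix_size, m->message, m->message_size);
    *len = prefix_size + m->message_size;
    return buf;
}

/*
 * Restore from the layout above. As in the patch, message_size is not
 * part of the variable-length data; it is assumed to come from the
 * fixed-size part of the on-disk change.
 */
demo_msg
demo_restore(const char *data, size_t message_size)
{
    demo_msg out;
    size_t   prefix_size = strlen(data) + 1;
    char    *prefix = malloc(prefix_size);
    /* Avoid malloc(0), whose result may legitimately be NULL. */
    char    *message = malloc(message_size > 0 ? message_size : 1);

    assert(prefix != NULL && message != NULL);
    memcpy(prefix, data, prefix_size);
    memcpy(message, data + prefix_size, message_size);

    out.prefix = prefix;
    out.message_size = message_size;
    out.message = message;
    return out;
}
```

Serializing the prefix "test" with the 4-byte message "msg1" yields 9 bytes, and restoring them should give back an identical prefix and message.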
Greetings,
Andres Freund