Re: Proposal: Generic WAL logical messages

From: Andres Freund <andres(at)anarazel(dot)de>
To: Petr Jelinek <petr(at)2ndquadrant(dot)com>
Cc: Tomas Vondra <tomas(dot)vondra(at)2ndquadrant(dot)com>, pgsql-hackers(at)postgresql(dot)org, Artur Zakirov <a(dot)zakirov(at)postgrespro(dot)ru>, Simon Riggs <simon(at)2ndquadrant(dot)com>, Craig Ringer <craig(at)2ndquadrant(dot)com>
Subject: Re: Proposal: Generic WAL logical messages
Date: 2016-03-22 11:47:01
Message-ID: 20160322114701.GE3790@awork2.anarazel.de
Lists: pgsql-hackers

On 2016-03-21 18:10:55 +0100, Petr Jelinek wrote:
> Just noticed there is missing symlink in the pg_xlogdump.

> create mode 100644 src/backend/access/rmgrdesc/logicalmsgdesc.c
> create mode 120000 src/bin/pg_xlogdump/logicalmsgdesc.c

Uh, src/bin/pg_xlogdump/logicalmsgdesc.c shouldn't be there. The symlink
is supposed to be automatically created by the Makefile.

Were you perhaps confused because it showed up in git status? If so,
that's probably because it isn't in
src/bin/pg_xlogdump/.gitignore. Perhaps we should change that file to
ignore *desc.c?

> + <row>
> + <entry id="pg-logical-emit-message-text">
> + <indexterm>
> + <primary>pg_logical_emit_message</primary>
> + </indexterm>
> + <literal><function>pg_logical_emit_message(<parameter>transactional</parameter> <type>bool</type>, <parameter>prefix</parameter> <type>text</type>, <parameter>content</parameter> <type>text</type>)</function></literal>
> + </entry>
> + <entry>
> + void
> + </entry>
> + <entry>
> + Write text logical decoding message. This can be used to pass generic
> + messages to logical decoding plugins through WAL. The parameter
> + <parameter>transactional</parameter> specifies if the message should
> + be part of current transaction or if it should be written immediately
> + and decoded as soon as the logical decoding reads the record. The
> + <parameter>prefix</parameter> is textual prefix used by the logical
> + decoding plugins to easily recognize interesting messages for them.
> + The <parameter>content</parameter> is the text of the message.
> + </entry>
> + </row>

s/write/emit/?

> +
> + <sect3 id="logicaldecoding-output-plugin-message">
> + <title>Generic Message Callback</title>
> +
> + <para>
> + The optional <function>message_cb</function> callback is called whenever
> + a logical decoding message has been decoded.
> +<programlisting>
> +typedef void (*LogicalDecodeMessageCB) (
> + struct LogicalDecodingContext *,
> + ReorderBufferTXN *txn,
> + XLogRecPtr message_lsn,
> + const char *prefix,
> + Size message_size,
> + const char *message
> +);

I see you removed the transactional parameter. I'm doubtful that's a
good idea: it seems like it'd be rather helpful to pass the transaction
for a nontransactional message that's emitted while an xid was
assigned?
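
To make the point concrete, here is a minimal standalone sketch (not the patch's code). StubTxn stands in for ReorderBufferTXN, and message_cb and count_message are hypothetical names used only for illustration. It assumes the callback gets both a txn pointer and a transactional flag, so a plugin can tell a queued message apart from an immediate one that was emitted while an xid was assigned:

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Stand-in for ReorderBufferTXN; only here so the sketch compiles alone. */
typedef struct StubTxn
{
	uint32_t	xid;
} StubTxn;

/* Hypothetical callback shape: the txn pointer plus an explicit flag. */
typedef void (*message_cb) (const StubTxn *txn, bool transactional,
							const char *prefix, size_t message_size,
							const char *message);

static int	n_transactional = 0;
static int	n_immediate_with_txn = 0;
static int	n_immediate_no_txn = 0;

/* Example plugin callback: classify the message by flag and txn presence. */
static void
count_message(const StubTxn *txn, bool transactional,
			  const char *prefix, size_t message_size, const char *message)
{
	(void) prefix;
	(void) message_size;
	(void) message;

	if (transactional)
		n_transactional++;		/* decoded as part of the transaction */
	else if (txn != NULL)
		n_immediate_with_txn++; /* immediate, but an xid was assigned */
	else
		n_immediate_no_txn++;	/* immediate, no transaction at all */
}
```

Without the flag, the first two cases would look identical to the plugin.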

> +/*
> + * Handle rmgr LOGICALMSG_ID records for DecodeRecordIntoReorderBuffer().
> + */
> +static void
> +DecodeLogicalMsgOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
> +{
> + SnapBuild *builder = ctx->snapshot_builder;
> + XLogReaderState *r = buf->record;
> + uint8 info = XLogRecGetInfo(r) & ~XLR_INFO_MASK;
> + xl_logical_message *message;
> +
> + if (info != XLOG_LOGICAL_MESSAGE)
> + elog(ERROR, "unexpected RM_LOGICALMSG_ID record type: %u", info);
> +
> + message = (xl_logical_message *) XLogRecGetData(r);
> +
> + if (message->transactional)
> + {
> + if (!SnapBuildProcessChange(builder, XLogRecGetXid(r), buf->origptr))
> + return;
> +
> + ReorderBufferQueueMessage(ctx->reorder, XLogRecGetXid(r),
> + buf->endptr,
> + message->message, /* first part of message is prefix */
> + message->message_size,
> + message->message + message->prefix_size);
> + }
> + else if (SnapBuildCurrentState(builder) == SNAPBUILD_CONSISTENT &&
> + !SnapBuildXactNeedsSkip(builder, buf->origptr))
> + {
> + volatile Snapshot snapshot_now;
> + ReorderBuffer *rb = ctx->reorder;
> +
> + /* setup snapshot to allow catalog access */
> + snapshot_now = SnapBuildGetOrBuildSnapshot(builder, XLogRecGetXid(r));
> + SetupHistoricSnapshot(snapshot_now, NULL);
> + rb->message(rb, NULL, buf->origptr, message->message,
> + message->message_size,
> + message->message + message->prefix_size);
> + TeardownHistoricSnapshot(false);
> + }
> +}

A number of things:
1) The SnapBuildProcessChange() call needs to be at the top level, not
just for transactional messages - we can't necessarily build a snapshot
yet.
2) I'm inclined to move even the non-transactional stuff to reorderbuffer.
3) This lacks error handling; we surely don't want to error out while
still having the historic snapshot set up.
4) Without 3) the volatile is bogus.
5) It misses a ReorderBufferProcessXid() call.
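
For 3) and 4), a rough standalone analog of the pattern, not PostgreSQL code. PG_TRY()/PG_CATCH() are built on sigsetjmp(), so plain setjmp()/longjmp() is used here to show the shape. All names (error_jmp, setup_snapshot, teardown_snapshot, message_callback, deliver_message) are made up for the sketch, and the real code would re-throw the error after cleanup rather than return a status:

```c
#include <assert.h>
#include <setjmp.h>
#include <stdbool.h>

static jmp_buf error_jmp;		/* stand-in for the error-handling stack */
static bool snapshot_active = false;	/* stand-in for historic snapshot state */

static void
setup_snapshot(void)
{
	snapshot_active = true;
}

static void
teardown_snapshot(void)
{
	snapshot_active = false;
}

/* Hypothetical output plugin callback that may raise an error. */
static void
message_callback(bool fail)
{
	if (fail)
		longjmp(error_jmp, 1);
}

/* Returns 0 if the callback completed, -1 if it raised an error. */
static int
deliver_message(bool fail)
{
	/*
	 * Modified after setjmp() and read after a possible longjmp(), so it
	 * has to be volatile. Without the error handling there is no such
	 * jump, and the qualifier would serve no purpose.
	 */
	volatile bool snapshot_set = false;
	int			result;

	if (setjmp(error_jmp) == 0)
	{
		setup_snapshot();
		snapshot_set = true;
		message_callback(fail);
		result = 0;
	}
	else
		result = -1;			/* arrived here via longjmp() */

	/* Cleanup runs on both the normal and the error path. */
	if (snapshot_set)
		teardown_snapshot();

	return result;
}
```

Calling deliver_message(true) leaves snapshot_active false, which is the property the quoted code can't guarantee if the callback errors out.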

> + * Every message carries prefix to avoid conflicts between different decoding
> + * plugins. The prefix has to be registered before the message using that
> + * prefix can be written to XLOG. The prefix can be registered exactly once to
> + * avoid situation where multiple third party extensions try to use same
> + * prefix.

Outdated afaics?

> @@ -414,6 +414,14 @@ ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change)
> change->data.tp.oldtuple = NULL;
> }
> break;
> + case REORDER_BUFFER_CHANGE_MESSAGE:
> + if (change->data.msg.prefix != NULL)
> + pfree(change->data.msg.prefix);
> + change->data.msg.prefix = NULL;
> + if (change->data.msg.message != NULL)
> + pfree(change->data.msg.message);
> + change->data.msg.message = NULL;
> + break;

Hm, this will have some overhead, but I guess the messages won't be
super frequent, and usually not very large.

> +/*
> + * Queue message into a transaction so it can be processed upon commit.
> + */
> +void
> +ReorderBufferQueueMessage(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn,
> + const char *prefix, Size msg_sz, const char *msg)
> +{
> + ReorderBufferChange *change;
> +
> + Assert(xid != InvalidTransactionId);
> +
> + change = ReorderBufferGetChange(rb);
> + change->action = REORDER_BUFFER_CHANGE_MESSAGE;
> + change->data.msg.prefix = pstrdup(prefix);
> + change->data.msg.message_size = msg_sz;
> + change->data.msg.message = palloc(msg_sz);
> + memcpy(change->data.msg.message, msg, msg_sz);
> +
> + ReorderBufferQueueChange(rb, xid, lsn, change);
> +}

I'm not sure right now if there's any guarantee that the current memory
context is meaningful here? IIRC other long-lived allocations explicitly
use a context?

> + case REORDER_BUFFER_CHANGE_MESSAGE:
> + {
> + char *data;
> + size_t prefix_size = strlen(change->data.msg.prefix) + 1;
> +
> + sz += prefix_size + change->data.msg.message_size;
> + ReorderBufferSerializeReserve(rb, sz);
> +
> + data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
> + memcpy(data, change->data.msg.prefix,
> + prefix_size);
> + memcpy(data + prefix_size, change->data.msg.message,
> + change->data.msg.message_size);
> + break;
> + }

Can you please include the sizes of the blocks explicitly, rather than
relying on 0 termination?
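
Roughly what I have in mind, as a standalone sketch rather than a drop-in for the reorderbuffer code. The helper names (msg_serialized_size, msg_serialize, msg_deserialize) are hypothetical, and the layout [prefix_size][prefix][message_size][message] is just one possible choice. The sizes are written in host size_t representation, which assumes the data is read back on the same machine:

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/* Bytes needed for one message in the layout described above. */
static size_t
msg_serialized_size(size_t prefix_size, size_t message_size)
{
	return sizeof(size_t) + prefix_size + sizeof(size_t) + message_size;
}

/*
 * Write both blocks, each preceded by its length. prefix_size includes the
 * terminating NUL, but nothing here depends on that byte being present.
 * Returns the number of bytes written; "out" must be large enough.
 */
static size_t
msg_serialize(char *out, const char *prefix, size_t prefix_size,
			  const char *message, size_t message_size)
{
	char	   *p = out;

	memcpy(p, &prefix_size, sizeof(size_t));
	p += sizeof(size_t);
	memcpy(p, prefix, prefix_size);
	p += prefix_size;
	memcpy(p, &message_size, sizeof(size_t));
	p += sizeof(size_t);
	memcpy(p, message, message_size);
	p += message_size;

	return (size_t) (p - out);
}

/*
 * Read the blocks back using only the stored lengths. The returned
 * pointers reference memory inside "in". Returns the bytes consumed.
 */
static size_t
msg_deserialize(const char *in, const char **prefix, size_t *prefix_size,
				const char **message, size_t *message_size)
{
	const char *p = in;

	memcpy(prefix_size, p, sizeof(size_t));
	p += sizeof(size_t);
	*prefix = p;
	p += *prefix_size;
	memcpy(message_size, p, sizeof(size_t));
	p += sizeof(size_t);
	*message = p;
	p += *message_size;

	return (size_t) (p - in);
}
```

The reader never calls strlen(), so a message with embedded NUL bytes or a damaged prefix can't make it walk past the end of a block.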

> @@ -45,3 +45,4 @@ PG_RMGR(RM_SPGIST_ID, "SPGist", spg_redo, spg_desc, spg_identify, spg_xlog_start
> PG_RMGR(RM_BRIN_ID, "BRIN", brin_redo, brin_desc, brin_identify, NULL, NULL)
> PG_RMGR(RM_COMMIT_TS_ID, "CommitTs", commit_ts_redo, commit_ts_desc, commit_ts_identify, NULL, NULL)
> PG_RMGR(RM_REPLORIGIN_ID, "ReplicationOrigin", replorigin_redo, replorigin_desc, replorigin_identify, NULL, NULL)
> +PG_RMGR(RM_LOGICALMSG_ID, "LogicalMessage", logicalmsg_redo,
> logicalmsg_desc, logicalmsg_identify, NULL, NULL)

Did you consider doing this via the standby rmgr instead?

> +typedef struct xl_logical_message
> +{
> + bool transactional; /* is message transactional? */
> + size_t prefix_size; /* length of prefix */
> + size_t message_size; /* size of the message */
> + char message[FLEXIBLE_ARRAY_MEMBER]; /* message including the null
> + * terminated prefx of length
> + * prefix_size */
> +} xl_logical_message;
>

"prefx".

Greetings,

Andres Freund
