Re: Proposal: Generic WAL logical messages

From: Petr Jelinek <petr(at)2ndquadrant(dot)com>
To: Andres Freund <andres(at)anarazel(dot)de>
Cc: Tomas Vondra <tomas(dot)vondra(at)2ndquadrant(dot)com>, pgsql-hackers(at)postgresql(dot)org, Artur Zakirov <a(dot)zakirov(at)postgrespro(dot)ru>, Simon Riggs <simon(at)2ndquadrant(dot)com>, Craig Ringer <craig(at)2ndquadrant(dot)com>
Subject: Re: Proposal: Generic WAL logical messages
Date: 2016-03-22 13:03:06
Message-ID: 56F1428A.7080506@2ndquadrant.com
Lists: pgsql-hackers

On 22/03/16 12:47, Andres Freund wrote:
> On 2016-03-21 18:10:55 +0100, Petr Jelinek wrote:
>
>> +
>> + <sect3 id="logicaldecoding-output-plugin-message">
>> + <title>Generic Message Callback</title>
>> +
>> + <para>
>> + The optional <function>message_cb</function> callback is called whenever
>> + a logical decoding message has been decoded.
>> +<programlisting>
>> +typedef void (*LogicalDecodeMessageCB) (
>> + struct LogicalDecodingContext *,
>> + ReorderBufferTXN *txn,
>> + XLogRecPtr message_lsn,
>> + const char *prefix,
>> + Size message_size,
>> + const char *message
>> +);
>
> I see you removed the transactional parameter. I'm doubtful that that's
> a good idea: It seems like it'd be rather helpful to pass the
> transaction for a nontransaction message that's emitted while an xid was
> assigned?
>

Hmm, but wouldn't that also hand the output plugin transactions that
were later aborted? That seems quite different from how the txn
parameter behaves everywhere else.
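
For illustration only (this is not part of the patch): with the signature
quoted above, a plugin would presumably treat a NULL txn as the marker for
a non-transactional message, as the rb->message(rb, NULL, ...) call quoted
further down in this mail suggests. A minimal sketch with stand-in types;
DemoTxn and demo_message_cb are hypothetical names, and the real callback
takes a LogicalDecodingContext and a ReorderBufferTXN instead:

```c
#include <assert.h>
#include <stddef.h>
#include <stdio.h>

/* Stand-in for ReorderBufferTXN; only the xid is modelled (assumption). */
typedef struct DemoTxn
{
    unsigned int xid;
} DemoTxn;

/*
 * Hypothetical message callback body. Writes a one-line description of
 * the message into out and returns snprintf()'s result. A NULL txn is
 * taken to mean "non-transactional message".
 */
static int
demo_message_cb(const DemoTxn *txn, const char *prefix,
                size_t message_size, const char *message,
                char *out, size_t outlen)
{
    assert(prefix != NULL && out != NULL);
    (void) message;             /* payload may be binary, so not printed */

    if (txn == NULL)
        return snprintf(out, outlen,
                        "non-transactional prefix=%s size=%zu",
                        prefix, message_size);

    return snprintf(out, outlen,
                    "transactional xid=%u prefix=%s size=%zu",
                    txn->xid, prefix, message_size);
}
```

Under this reading the plugin never sees a txn for a non-transactional
message, so it cannot accidentally act on a transaction that aborts later.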

>
>> +/*
>> + * Handle rmgr LOGICALMSG_ID records for DecodeRecordIntoReorderBuffer().
>> + */
>> +static void
>> +DecodeLogicalMsgOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
>> +{
>> + SnapBuild *builder = ctx->snapshot_builder;
>> + XLogReaderState *r = buf->record;
>> + uint8 info = XLogRecGetInfo(r) & ~XLR_INFO_MASK;
>> + xl_logical_message *message;
>> +
>> + if (info != XLOG_LOGICAL_MESSAGE)
>> + elog(ERROR, "unexpected RM_LOGICALMSG_ID record type: %u", info);
>> +
>> + message = (xl_logical_message *) XLogRecGetData(r);
>> +
>> + if (message->transactional)
>> + {
>> + if (!SnapBuildProcessChange(builder, XLogRecGetXid(r), buf->origptr))
>> + return;
>> +
>> + ReorderBufferQueueMessage(ctx->reorder, XLogRecGetXid(r),
>> + buf->endptr,
>> + message->message, /* first part of message is prefix */
>> + message->message_size,
>> + message->message + message->prefix_size);
>> + }
>> + else if (SnapBuildCurrentState(builder) == SNAPBUILD_CONSISTENT &&
>> + !SnapBuildXactNeedsSkip(builder, buf->origptr))
>> + {
>> + volatile Snapshot snapshot_now;
>> + ReorderBuffer *rb = ctx->reorder;
>> +
>> + /* setup snapshot to allow catalog access */
>> + snapshot_now = SnapBuildGetOrBuildSnapshot(builder, XLogRecGetXid(r));
>> + SetupHistoricSnapshot(snapshot_now, NULL);
>> + rb->message(rb, NULL, buf->origptr, message->message,
>> + message->message_size,
>> + message->message + message->prefix_size);
>> + TeardownHistoricSnapshot(false);
>> + }
>> +}
>
> A number of things:
> 1) The SnapBuildProcessChange needs to be toplevel, not just for
> transactional messages - we can't yet necessarily build a snapshot.

Nope, the snapshot state is checked in the else if.

> 2) I'm inclined to move even the non-transactional stuff to reorderbuffer.

Well, it doesn't do anything with the reorderbuffer, but sure, it can be
done (I didn't do it in the attached patch, though).

> 3) This lacks error handling, we surely don't want to error out while
> still having the historic snapshot setup
> 4) Without 3) the volatile is bogus.
> 5) Misses a ReorderBufferProcessXid() call.

Fixed (all 3 above).
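
To make points 3) and 4) concrete, here is a self-contained sketch of the
general pattern, not the code from the attached patch. In PostgreSQL this
would be written with PG_TRY()/PG_CATCH(), which are built on sigsetjmp;
the sketch uses plain setjmp/longjmp so that it compiles on its own. All
demo_* names are hypothetical stand-ins. Unlike a real PG_CATCH block,
which would re-throw, the sketch just reports failure to the caller:

```c
#include <assert.h>
#include <setjmp.h>
#include <stdbool.h>

static jmp_buf demo_error_jmp;      /* stand-in for the error stack */
static bool demo_snapshot_active = false;

static void demo_setup_snapshot(void)    { demo_snapshot_active = true; }
static void demo_teardown_snapshot(void) { demo_snapshot_active = false; }

/* Stand-in for a plugin callback that may raise an error. */
static void
demo_callback(bool fail)
{
    if (fail)
        longjmp(demo_error_jmp, 1);
}

/* Returns true if the callback completed, false if it raised an error. */
static bool
demo_deliver_message(bool fail)
{
    /*
     * volatile because the variable is modified after setjmp() and read
     * after a possible longjmp(); without the error handling there would
     * be no reason for it.
     */
    volatile bool in_callback = false;

    demo_setup_snapshot();

    if (setjmp(demo_error_jmp) == 0)
    {
        in_callback = true;
        demo_callback(fail);
        in_callback = false;
    }

    /* Reached on both the normal and the error path. */
    demo_teardown_snapshot();

    return !in_callback;
}
```

The point is only that the snapshot is torn down no matter how the
callback exits.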

>
>
>> @@ -414,6 +414,14 @@ ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change)
>> change->data.tp.oldtuple = NULL;
>> }
>> break;
>> + case REORDER_BUFFER_CHANGE_MESSAGE:
>> + if (change->data.msg.prefix != NULL)
>> + pfree(change->data.msg.prefix);
>> + change->data.msg.prefix = NULL;
>> + if (change->data.msg.message != NULL)
>> + pfree(change->data.msg.message);
>> + change->data.msg.message = NULL;
>> + break;
>
> Hm, this will have some overhead, but I guess the messages won't be
> super frequent, and usually not very large.

Yeah, but since we don't know the size of future messages, it's hard to
preallocate a buffer for this, so I don't know how else to do it.

>
>> +/*
>> + * Queue message into a transaction so it can be processed upon commit.
>> + */
>> +void
>> +ReorderBufferQueueMessage(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn,
>> + const char *prefix, Size msg_sz, const char *msg)
>> +{
>> + ReorderBufferChange *change;
>> +
>> + Assert(xid != InvalidTransactionId);
>> +
>> + change = ReorderBufferGetChange(rb);
>> + change->action = REORDER_BUFFER_CHANGE_MESSAGE;
>> + change->data.msg.prefix = pstrdup(prefix);
>> + change->data.msg.message_size = msg_sz;
>> + change->data.msg.message = palloc(msg_sz);
>> + memcpy(change->data.msg.message, msg, msg_sz);
>> +
>> + ReorderBufferQueueChange(rb, xid, lsn, change);
>> +}
>
> I'm not sure right now if there's any guarantee that the current memory
> context is meaningful here? IIRC other long-lived allocations explicitly
> use a context?
>

I didn't find any explicit guarantee so I added one.
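
One common way to make the allocation context explicit is to switch to a
known long-lived context around the allocations and switch back afterwards;
in PostgreSQL that is done with MemoryContextSwitchTo(). The following is a
toy model of that pattern, not the patch's code: DemoContext and the demo_*
functions are hypothetical, and the "context" here only counts allocations
instead of owning memory.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

/* Toy memory context: only counts allocations made while it is current. */
typedef struct DemoContext
{
    const char *name;
    size_t      nallocs;
} DemoContext;

static DemoContext demo_transient = {"transient", 0};
static DemoContext *demo_current = &demo_transient;

/* Make ctx current and return the previously current context. */
static DemoContext *
demo_switch_to(DemoContext *ctx)
{
    DemoContext *old = demo_current;

    demo_current = ctx;
    return old;
}

/* Duplicate a string, charging the allocation to the current context. */
static char *
demo_strdup(const char *s)
{
    size_t  len = strlen(s) + 1;
    char   *copy = malloc(len);

    if (copy == NULL)
        abort();
    memcpy(copy, s, len);
    demo_current->nallocs++;
    return copy;
}

/* Copy the prefix in an explicitly chosen long-lived context. */
static char *
demo_queue_prefix(DemoContext *long_lived, const char *prefix)
{
    DemoContext *old = demo_switch_to(long_lived);
    char        *copy = demo_strdup(prefix);

    demo_switch_to(old);        /* restore the caller's context */
    return copy;
}
```

With this shape the caller's current context no longer matters for the
lifetime of the queued data.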

>> + case REORDER_BUFFER_CHANGE_MESSAGE:
>> + {
>> + char *data;
>> + size_t prefix_size = strlen(change->data.msg.prefix) + 1;
>> +
>> + sz += prefix_size + change->data.msg.message_size;
>> + ReorderBufferSerializeReserve(rb, sz);
>> +
>> + data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
>> + memcpy(data, change->data.msg.prefix,
>> + prefix_size);
>> + memcpy(data + prefix_size, change->data.msg.message,
>> + change->data.msg.message_size);
>> + break;
>> + }
>
> Can you please include the sizes of the blocks explicitly, rather than
> relying on 0 termination?
>

Okay. I see I did that in the WAL record; no idea why I didn't do the
same here.
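
As a sketch of what "explicit sizes" could look like (an assumed layout,
not necessarily the one the updated patch uses): each block is preceded by
its length, so the reader never has to scan for a terminating zero and the
message may contain arbitrary bytes. The demo_* names are hypothetical.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/* Bytes needed for: prefix_size, prefix, message_size, message. */
static size_t
demo_serialized_size(size_t prefix_size, size_t message_size)
{
    return sizeof(size_t) + prefix_size + sizeof(size_t) + message_size;
}

/* Write both blocks, each preceded by its explicit length. */
static void
demo_serialize(char *out,
               const char *prefix, size_t prefix_size,
               const char *message, size_t message_size)
{
    assert(out != NULL && prefix != NULL && message != NULL);

    memcpy(out, &prefix_size, sizeof(size_t));
    out += sizeof(size_t);
    memcpy(out, prefix, prefix_size);
    out += prefix_size;

    memcpy(out, &message_size, sizeof(size_t));
    out += sizeof(size_t);
    memcpy(out, message, message_size);
}

/* Read the lengths back; prefix and message point into the input buffer. */
static void
demo_deserialize(const char *in,
                 const char **prefix, size_t *prefix_size,
                 const char **message, size_t *message_size)
{
    memcpy(prefix_size, in, sizeof(size_t));
    in += sizeof(size_t);
    *prefix = in;
    in += *prefix_size;

    memcpy(message_size, in, sizeof(size_t));
    in += sizeof(size_t);
    *message = in;
}
```

The lengths are copied with memcpy rather than read through a cast
pointer, since the second length is generally not aligned in the buffer.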

>
>> @@ -45,3 +45,4 @@ PG_RMGR(RM_SPGIST_ID, "SPGist", spg_redo, spg_desc, spg_identify, spg_xlog_start
>> PG_RMGR(RM_BRIN_ID, "BRIN", brin_redo, brin_desc, brin_identify, NULL, NULL)
>> PG_RMGR(RM_COMMIT_TS_ID, "CommitTs", commit_ts_redo, commit_ts_desc, commit_ts_identify, NULL, NULL)
>> PG_RMGR(RM_REPLORIGIN_ID, "ReplicationOrigin", replorigin_redo, replorigin_desc, replorigin_identify, NULL, NULL)
>> +PG_RMGR(RM_LOGICALMSG_ID, "LogicalMessage", logicalmsg_redo, logicalmsg_desc, logicalmsg_identify, NULL, NULL)
>
> Did you consider doing this via the standby rmgr instead?
>

Yes, I did that in one of the first versions, but Simon didn't like it
in his review, as this has nothing to do with standby.

--
Petr Jelinek http://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Training & Services

Attachment Content-Type Size
logical-messages-2016-03-22.patch text/x-diff 40.9 KB
