Re: Any objections to implementing LogicalDecodeMessageCB for pgoutput?

From: Euler Taveira <euler(dot)taveira(at)2ndquadrant(dot)com>
To: David Pirotte <dpirotte(at)gmail(dot)com>
Cc: Ashutosh Bapat <ashutosh(dot)bapat(dot)oss(at)gmail(dot)com>, Dave Cramer <davecramer(at)gmail(dot)com>, Michael Paquier <michael(at)paquier(dot)xyz>, Andres Freund <andres(at)anarazel(dot)de>, PostgreSQL-development <pgsql-hackers(at)postgresql(dot)org>, ashutosh(dot)bapat(at)enterprisedb(dot)com
Subject: Re: Any objections to implementing LogicalDecodeMessageCB for pgoutput?
Date: 2020-11-25 03:28:30
Message-ID: CAH503wB6nEEvtkQKJQEw3RH2GYq+tW156bf06in5Lwzj_4ZU7w@mail.gmail.com
Lists: pgsql-hackers

On Wed, 18 Nov 2020 at 03:04, David Pirotte <dpirotte(at)gmail(dot)com> wrote:

> On Fri, Nov 6, 2020 at 7:05 AM Ashutosh Bapat <
> ashutosh(dot)bapat(dot)oss(at)gmail(dot)com> wrote:
>
>> +/*
>> + * Write MESSAGE to stream
>> + */
>> +void
>> +logicalrep_write_message(StringInfo out, ReorderBufferTXN *txn, XLogRecPtr lsn,
>> + bool transactional, const char *prefix, Size sz,
>> + const char *message)
>> +{
>> + uint8 flags = 0;
>> +
>> + pq_sendbyte(out, LOGICAL_REP_MSG_MESSAGE);
>> +
>>
>> Similar to the UPDATE/DELETE/INSERT records decoded when streaming is being
>> used, we need to add transaction id for transactional messages. Maybe we add
>> that even in the non-streaming case and use it to decide whether it's a
>> transactional message or not. That might save us a byte when we are adding a
>> transaction id.
>>
>

I also reviewed your patch. This feature would be really useful for replication
scenarios. Supporting this feature means that you don't need to use a table to
pass messages from one node to another one. Here are a few comments/ideas.

@@ -1936,6 +1936,9 @@ apply_dispatch(StringInfo s)
apply_handle_origin(s);
return;

+ case LOGICAL_REP_MSG_MESSAGE:
+ return;
+

I added a comment explaining that this message is not used by logical
replication but it could possibly be useful for other applications using
pgoutput. See patch 0003.
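
A minimal sketch of why the empty case matters, not the actual apply worker code. It assumes that an unrecognized type byte is treated as an error, so a known-but-unused type needs its own branch to be skipped quietly. The names dispatch_sketch and DispatchResult are hypothetical; only the type bytes ('B', 'C', 'O', 'M') are taken from the protocol.

```c
#include <assert.h>

/* Message type bytes as they appear on the wire. */
enum
{
	MSG_BEGIN = 'B',
	MSG_COMMIT = 'C',
	MSG_ORIGIN = 'O',
	MSG_MESSAGE = 'M'
};

/* Hypothetical outcome of dispatching one message (illustration only). */
typedef enum
{
	DISPATCH_APPLIED,			/* handled by a real apply routine */
	DISPATCH_IGNORED,			/* recognized, deliberately skipped */
	DISPATCH_UNKNOWN			/* unrecognized: the caller should error out */
} DispatchResult;

static DispatchResult
dispatch_sketch(char action)
{
	switch (action)
	{
		case MSG_BEGIN:
		case MSG_COMMIT:
		case MSG_ORIGIN:
			return DISPATCH_APPLIED;

		case MSG_MESSAGE:
			/*
			 * Logical replication itself has no use for this message, but
			 * other pgoutput consumers may, so accept it and do nothing.
			 */
			return DISPATCH_IGNORED;

		default:
			return DISPATCH_UNKNOWN;
	}
}
```

With this shape, dispatch_sketch('M') is skipped silently while a byte such as '?' still surfaces as unknown.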

Andres mentioned in this thread [1] that we could simplify
parse_output_parameters. I refactored this function to take only PGOutputData
and also moved enable_streaming into this struct. I use a similar approach in
wal2json; it is easier to get the options since they are available in the
logical decoding context. See patch 0004.

> My preference is to add in the xid when streaming is enabled. (1) It is a
> more consistent implementation with the other message types, and (2) it
> saves 3 bytes when streaming is disabled. I've attached an updated patch.
> It is not a strong preference, though, if you suggest a different approach.
>
>
I agree with this approach. xid is available in the BEGIN message if the
MESSAGE is transactional. For non-transactional messages, xid is not available.
Your implementation is not consistent with the other pgoutput_XXX functions,
which check in_streaming themselves and pass the resulting parameters to the
functions that require them. See patch 0005.
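
A rough sketch of the "xid only when streaming" layout, under stated assumptions. OutBuf and the put_* helpers are hypothetical stand-ins for StringInfo and the pq_send* routines, and the field order (type, optional xid, flags, LSN, prefix, length, payload) is assumed for illustration rather than copied from the patch. The caller decides in_streaming and passes it down, which is the pattern the other pgoutput_XXX functions follow.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

#define MSG_TYPE_MESSAGE		'M' /* ASCII 77 */
#define MSG_FLAG_TRANSACTIONAL	0x01

/* Hypothetical fixed-size output buffer standing in for StringInfo. */
typedef struct OutBuf
{
	uint8_t		data[256];
	size_t		len;
} OutBuf;

static void
put_bytes(OutBuf *out, const void *p, size_t n)
{
	assert(out->len + n <= sizeof(out->data));	/* sketch: no resizing */
	memcpy(out->data + out->len, p, n);
	out->len += n;
}

static void
put_u8(OutBuf *out, uint8_t v)
{
	put_bytes(out, &v, 1);
}

/* Integers go out in network (big-endian) byte order. */
static void
put_u32(OutBuf *out, uint32_t v)
{
	uint8_t		b[4] = {(uint8_t) (v >> 24), (uint8_t) (v >> 16),
						(uint8_t) (v >> 8), (uint8_t) v};

	put_bytes(out, b, 4);
}

static void
put_u64(OutBuf *out, uint64_t v)
{
	put_u32(out, (uint32_t) (v >> 32));
	put_u32(out, (uint32_t) v);
}

/* Serialize one message into 'out' and return the number of bytes written. */
static size_t
write_message(OutBuf *out, bool in_streaming, uint32_t xid, uint64_t lsn,
			  bool transactional, const char *prefix,
			  const char *payload, uint32_t sz)
{
	out->len = 0;
	put_u8(out, MSG_TYPE_MESSAGE);
	if (in_streaming)
		put_u32(out, xid);		/* only present while streaming */
	put_u8(out, transactional ? MSG_FLAG_TRANSACTIONAL : 0);
	put_u64(out, lsn);
	put_bytes(out, prefix, strlen(prefix) + 1); /* NUL-terminated */
	put_u32(out, sz);
	put_bytes(out, payload, sz);
	return out->len;
}
```

In this sketch the same message is exactly 4 bytes longer when in_streaming is true, and the flags byte alone tells a consumer whether the message is transactional.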

The last patch 0006 overhauls your tests. I added/changed some comments,
replaced identifiers with uppercase letters, used 'pgoutput' as prefix, checked
the prefix, and avoided a checkpoint during the test. There are possibly other
improvements that I didn't mention here. Maybe you can use
encode(substr(data, 1, 1), 'escape') instead of comparing the ASCII code (77).

>> Should we add the logical message to the WAL downstream so that it flows
>> further down to a cascaded logical replica. Should that be controlled
>> by an option?
>>
>
> Hmm, I can't think of a use case for this, but perhaps someone could. Do
> you, or does anyone, have something in mind? I think we provide a lot of
> value with logical messages in pgoutput without supporting consumption from
> a downstream replica, so perhaps this is better considered separately.
>
> If we want this, I think we would add a "messages" option on the
> subscription. If present, the subscriber will receive messages and pass
> them to any downstream subscribers. I started working on this and it does
> expand the change's footprint. As is, a developer would consume messages by
> connecting to a pgoutput slot on the message's origin. (e.g. via Debezium
> or a custom client) The subscription and logical worker infrastructure
> don't know about messages, but they would need to in order to support
> consuming an origin's messages on a downstream logical replica. In
> any case, I'll keep working on it so we can see what it looks like.
>

The decision to send received messages to downstream nodes should be made by
the subscriber. If the subscriber wants to replicate messages to downstream
nodes, the worker should call LogLogicalMessage.

This does not belong to this patch, but when/if this patch is committed, I will
submit a patch to filter messages by prefix. wal2json has a similar
(filter-msg-prefixes / add-msg-prefixes) feature and it is useful for cases
where you are handling multiple output plugins like wal2json and pgoutput. The
idea is to avoid sending useless messages to a node that (i) doesn't know how
to process them and (ii) has no interest in them.
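
One way such a prefix filter could look, as a sketch only. The function name message_prefix_wanted, the allow-list semantics, and the exact-match comparison are all assumptions for illustration; they are not taken from wal2json or from any posted patch.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <string.h>

/*
 * Hypothetical filter: return true if a message with this prefix should be
 * sent. An empty allow list (n == 0) means "no filtering, send everything".
 * Prefixes are compared for exact equality (an assumption of this sketch).
 */
static bool
message_prefix_wanted(const char *prefix, const char *const *allowed, size_t n)
{
	assert(prefix != NULL);

	if (n == 0)
		return true;

	for (size_t i = 0; i < n; i++)
	{
		if (strcmp(prefix, allowed[i]) == 0)
			return true;
	}
	return false;
}
```

An output plugin would call something like this in its message callback and return early when the result is false, so the uninterested node never receives the message.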

PS> I'm attaching David's patches (0001 and 0002) again to keep cfbot happy.

[1] https://www.postgresql.org/message-id/20200908191823.pmsoobzearkrmtg4%40alap3.anarazel.de

--
Euler Taveira http://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services

Attachment                                             Content-Type  Size
0001-Add-logical-decoding-messages-to-pgoutput.patch   text/x-patch  13.7 KB
0003-Explain-why-this-message-is-ignored.patch         text/x-patch  1017 bytes
0005-Adjust-in_streaming-for-messages.patch            text/x-patch  3.4 KB
0002-Add-xid-to-messages-when-streaming.patch          text/x-patch  2.9 KB
0004-Simplify-parse_output_parameters-function.patch   text/x-patch  4.6 KB
0006-Overhaul-tests.patch                              text/x-patch  9.3 KB
