Re: Add a hook for handling logical decoding messages on subscribers.

From: Amit Kapila <amit(dot)kapila16(at)gmail(dot)com>
To: Bharath Rupireddy <bharath(dot)rupireddyforpostgres(at)gmail(dot)com>
Cc: Masahiko Sawada <sawada(dot)mshk(at)gmail(dot)com>, PostgreSQL-development <pgsql-hackers(at)postgresql(dot)org>
Subject: Re: Add a hook for handling logical decoding messages on subscribers.
Date: 2026-06-23 08:52:29
Message-ID: CAA4eK1JC9+R4S7L7y3DazOOMpnANVyCpsx84RSYjX6vfxLdK6w@mail.gmail.com
Lists: pgsql-hackers

On Tue, Jun 23, 2026 at 4:09 AM Bharath Rupireddy
<bharath(dot)rupireddyforpostgres(at)gmail(dot)com> wrote:
>
> On Fri, Jun 19, 2026 at 3:34 PM Masahiko Sawada <sawada(dot)mshk(at)gmail(dot)com> wrote:
> >
> > Hi all,
> >
> > Commit ac4645c015 allows pgoutput to send logical decoding messages,
> > but it's limited to applications that use the pgoutput plugin -- the
> > built-in logical replication doesn't use it. I'd like to propose
> > introducing a hook to the logical replication message handling so that
> > extensions can plug in their own handling routine. Extensions could
> > use this feature to implement DDL replication or function replication,
> > or to trigger user-specific routines on the subscriber side.
>
> Thanks for working on this!
>
> > I've attached the PoC patch; it adds a hook function, and adds a new
> > 'message' subscription option that allows the user to request the
> > publisher to send logical decoding messages. Therefore, users need to
> > enable the 'message' option and set up the hook function at server
> > startup in order to receive the messages and trigger the hook
> > function.
>
> I understand the intent of the proposal, but I'd like to get the
> bigger picture first.
>
> Do we have any external modules that actually implement DDL
> replication (or any of the listed use-cases) with a similar hook? Or
> any existing discussion? I could be missing something because I
> haven't looked at all the DDL replication related threads.
>
> Another thing I'm curious about - why a hook? Is the plan to implement
> DDL replication as an external module rather than in core? If DDL
> replication eventually gets into core, I'd expect it to be apply-side
> logic executing the decoded DDL messages directly, not something going
> through a hook.
>

I think it is important to have an example extension implementation
to see how the hook could be used. Another use of such a hook could
be auditing DDLs replayed on subscribers. BTW, can we also consider it
as a way to implement basic DDL replication for tables? The key
question is what happens if we someday have in-core DDL replication.
I think extensions could still use the hook to implement filtering or
transformation of DDL. We can capture DDL in a JSON format [1] so that
it stays forward compatible with in-core DDL replication. With that in
mind, the extension handlers could look like:

void
_PG_init(void)
{
    /* Publisher side */
    prev_ProcessUtility = ProcessUtility_hook;
    ProcessUtility_hook = ddlrep_ProcessUtility;

    /* ... */

    /* Subscriber side */
    prev_message_handler = logical_message_hook;
    logical_message_hook = ddlrep_message_handler;
}

static void
ddlrep_ProcessUtility(PlannedStmt *pstmt,
                      const char *queryString,
                      bool readOnlyTree,
                      ProcessUtilityContext context,
                      ParamListInfo params,
                      QueryEnvironment *queryEnv,
                      DestReceiver *dest,
                      QueryCompletion *qc)
{
    bool        replicate = should_replicate(pstmt);

    /*
     * Execute the DDL first.
     */
    if (prev_ProcessUtility)
        prev_ProcessUtility(pstmt, queryString, readOnlyTree, context,
                            params, queryEnv, dest, qc);
    else
        standard_ProcessUtility(pstmt, queryString, readOnlyTree, context,
                                params, queryEnv, dest, qc);

    if (replicate)
    {
        const char *tag =
            GetCommandTagName(CreateCommandTag(pstmt->utilityStmt));
        char       *msg = build_ddl_message(tag, queryString);

        /*
         * transactional = true: the message is held in the ReorderBuffer
         * and emitted to subscribers at COMMIT, in WAL order relative to
         * any DML in the same transaction.
         *
         * flush = false: no need to force a WAL flush here; the record
         * becomes durable with the transaction's commit record. The
         * message's LSN is passed to the subscriber-side handler anyway,
         * so it can log exactly which WAL position a given DDL came from.
         */
        LogLogicalMessage("pg_ddl", msg, strlen(msg),
                          true /* transactional */,
                          false /* flush */);

        /* ... */
    }
}

static void
ddlrep_message_handler(const char *prefix,
                       Size sz,
                       const char *message,
                       bool transactional,
                       XLogRecPtr lsn)
{
    char       *payload;
    char       *ddl;
    char       *search_path;
    StringInfoData cmd;
    int         spi_rc;

    /*
     * Always pass through to the previous handler first. This ensures
     * correct behaviour when chained with other extensions.
     */
    if (prev_message_handler)
        prev_message_handler(prefix, sz, message, transactional, lsn);

    if (strcmp(prefix, "pg_ddl") != 0)
        return;

    /* Write code to execute/perform DDL. */
}
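To make the pass-through-first chaining in ddlrep_message_handler() concrete, here is a standalone simulation of two extensions sharing the hook. The hook type, the logical_message_hook slot, audit_message_handler() and dispatch_message() are hypothetical stand-ins for whatever the PoC patch actually defines; XLogRecPtr and Size are typedef'd locally so it compiles outside the backend.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

/* Local stand-ins for the PostgreSQL typedefs. */
typedef uint64_t XLogRecPtr;
typedef size_t Size;

/* Hypothetical hook type; the PoC patch may define it differently. */
typedef void (*logical_message_hook_type) (const char *prefix, Size sz,
                                           const char *message,
                                           bool transactional,
                                           XLogRecPtr lsn);

/* Hypothetical global hook slot consulted by the apply worker. */
static logical_message_hook_type logical_message_hook = NULL;

/* Each extension saves whatever hook was installed before it. */
static logical_message_hook_type prev_audit_handler = NULL;
static logical_message_hook_type prev_ddlrep_handler = NULL;

static int  audit_calls = 0;
static int  ddl_calls = 0;

/* Audit extension: records every message, whatever its prefix. */
static void
audit_message_handler(const char *prefix, Size sz, const char *message,
                      bool transactional, XLogRecPtr lsn)
{
    if (prev_audit_handler)
        prev_audit_handler(prefix, sz, message, transactional, lsn);
    audit_calls++;
}

/* DDL extension: passes the message on first, then acts on "pg_ddl" only. */
static void
ddlrep_message_handler(const char *prefix, Size sz, const char *message,
                       bool transactional, XLogRecPtr lsn)
{
    if (prev_ddlrep_handler)
        prev_ddlrep_handler(prefix, sz, message, transactional, lsn);
    if (strcmp(prefix, "pg_ddl") != 0)
        return;
    ddl_calls++;                /* real code would execute the DDL here */
}

/* What each extension's _PG_init() does, in shared_preload_libraries order. */
static void
load_extensions(void)
{
    prev_audit_handler = logical_message_hook;
    logical_message_hook = audit_message_handler;

    prev_ddlrep_handler = logical_message_hook;
    logical_message_hook = ddlrep_message_handler;
}

/*
 * Apply-worker side: hand one decoded message to the hook chain, if any.
 * sz comes from strlen() here for simplicity; the real protocol message
 * carries its own length and need not be NUL-terminated.
 */
static void
dispatch_message(const char *prefix, const char *message,
                 bool transactional, XLogRecPtr lsn)
{
    assert(prefix != NULL && message != NULL);
    if (logical_message_hook)
        logical_message_hook(prefix, strlen(message), message,
                             transactional, lsn);
}
```

Because each handler calls its predecessor before doing its own work, every extension in the chain sees every message, and the most recently loaded extension's own logic runs last: here the audit handler counts both messages, while only the "pg_ddl" one reaches the DDL logic.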

Once we have a built-in handler, the apply worker should register it
carefully and raise an ERROR if an extension handler is already
registered.

void
ApplyWorkerMain(Datum main_arg)
{
    /* ... existing initialisation ... */

    /*
     * If the subscription requests built-in DDL replication and an
     * extension has also registered a logical message hook, both would
     * process the same "pg_ddl" messages and execute each DDL twice.
     * Refuse to start rather than risk silently corrupting the subscriber.
     */
    if (MySubscription->ddloption != DDL_OPTION_NONE &&
        logical_message_hook != NULL)
        ereport(ERROR,
                (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
                 ...));

    /*
     * No conflict: register the built-in DDL handler when the
     * subscription requests DDL replication and no extension owns
     * the hook.
     */
    if (MySubscription->ddloption != DDL_OPTION_NONE)
        logical_message_hook = builtin_ddl_message_handler;

    /* ... */
}

[1]
To keep it extensible, the JSON payload written to WAL could take a
form like:

{
"version": 1,
"command_tag": "CREATE TABLE",
"object_type": "table",
"schema": "public",
"identity": "public.foo",
"ddl_text": "CREATE TABLE public.foo (id int PRIMARY KEY)",
"search_path": "public"
}
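A minimal, standalone sketch of how the hypothetical build_ddl_message() helper could serialize part of that payload, with proper JSON string escaping. build_ddl_json() is a made-up name, and unlike the two-argument helper above it also takes search_path; object_type, schema and identity are left out because they would have to be derived from the parse tree or catalogs. It uses plain libc so it compiles on its own; inside the backend one would more likely append to a StringInfo with escape_json() from utils/json.h.

```c
#include <assert.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

/* Minimal growable buffer (stand-in for PostgreSQL's StringInfo). */
typedef struct
{
    char       *data;
    size_t      len;
    size_t      cap;
} Buf;

static void
buf_append(Buf *b, const char *s, size_t n)
{
    if (b->len + n + 1 > b->cap)
    {
        size_t      newcap = b->cap ? b->cap : 64;

        while (b->len + n + 1 > newcap)
            newcap *= 2;
        b->data = realloc(b->data, newcap);
        if (b->data == NULL)
            abort();            /* out of memory: give up */
        b->cap = newcap;
    }
    memcpy(b->data + b->len, s, n);
    b->len += n;
    b->data[b->len] = '\0';
}

static void
buf_puts(Buf *b, const char *s)
{
    buf_append(b, s, strlen(s));
}

/* Append s as a quoted JSON string, escaping quotes, backslashes and controls. */
static void
buf_json_string(Buf *b, const char *s)
{
    buf_puts(b, "\"");
    for (const unsigned char *p = (const unsigned char *) s; *p; p++)
    {
        char        esc[7];     /* room for "\u00XX" plus NUL */

        switch (*p)
        {
            case '"':  buf_puts(b, "\\\""); break;
            case '\\': buf_puts(b, "\\\\"); break;
            case '\n': buf_puts(b, "\\n"); break;
            case '\r': buf_puts(b, "\\r"); break;
            case '\t': buf_puts(b, "\\t"); break;
            default:
                if (*p < 0x20)
                {
                    snprintf(esc, sizeof(esc), "\\u%04x", (unsigned) *p);
                    buf_puts(b, esc);
                }
                else
                    buf_append(b, (const char *) p, 1);
        }
    }
    buf_puts(b, "\"");
}

/* Hypothetical serializer: returns a malloc'd JSON object; caller frees. */
static char *
build_ddl_json(const char *command_tag, const char *ddl_text,
               const char *search_path)
{
    Buf         b = {NULL, 0, 0};

    assert(command_tag && ddl_text && search_path);
    buf_puts(&b, "{\"version\": 1, \"command_tag\": ");
    buf_json_string(&b, command_tag);
    buf_puts(&b, ", \"ddl_text\": ");
    buf_json_string(&b, ddl_text);
    buf_puts(&b, ", \"search_path\": ");
    buf_json_string(&b, search_path);
    buf_puts(&b, "}");
    return b.data;
}
```

Escaping matters here because both the DDL text and search_path routinely contain double quotes (for example, the default search_path "$user", public), which would otherwise produce an unparseable payload on the subscriber.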

>
> Why not a hook at apply_dispatch to give external modules more freedom
> with the pgoutput plugin?
>

What advantage do you see in that?

--
With Regards,
Amit Kapila.
