Re: Handle infinite recursion in logical replication setup

From: Peter Smith <smithpb2250(at)gmail(dot)com>
To: vignesh C <vignesh21(at)gmail(dot)com>
Cc: "kuroda(dot)hayato(at)fujitsu(dot)com" <kuroda(dot)hayato(at)fujitsu(dot)com>, PostgreSQL Hackers <pgsql-hackers(at)lists(dot)postgresql(dot)org>
Subject: Re: Handle infinite recursion in logical replication setup
Date: 2022-03-07 08:57:47
Message-ID: CAHut+PvWVnv2Zqt8C--wYkDWe2F=1m-aiGOqAMM4MfjAB+we2w@mail.gmail.com
Lists: pgsql-hackers

Hi Vignesh,

Here are some review comments for patch v2.

======

1. Question about syntax

I already posted some questions about why the syntax is on the CREATE
SUBSCRIPTION side.
IMO "local_only" is a publisher option, so it seemed more natural to
me for it to be specified as a "publish" option.

Ref [1] my original question + suggestion for Option 2
Ref [2] some other examples of subscribing to multiple-publishers

Anyway, +1 to see what other people think.

~~~

2. ALTER

(related also to the question about syntax)

If the subscription subscribes to multiple publications then ALTER
SUBSCRIPTION is going to change 'local_only' for all of them, which
might not be what you want (??)

~~~

3. subscription_parameter

(related also to the question about syntax)

CREATE SUBSCRIPTION subscription_name
CONNECTION 'conninfo'
PUBLICATION publication_name [, ...]
[ WITH ( subscription_parameter [= value] [, ... ] ) ]

~

That WITH is for *subscription* options, not the publication options.

So IMO 'local_only' intuitively reads as if "local" means local to
where the subscriber is.

So, if the Option 1 syntax is chosen (see comment #1) then I think the
option name should maybe change to something more like
'publish_local_only' or similar, to make it clearer what "local"
actually means.

~~~

4. contrib/test_decoding/test_decoding.c

@@ -484,6 +487,16 @@ pg_decode_filter(LogicalDecodingContext *ctx,
     return false;
 }

+static bool
+pg_decode_filter_remotedata(LogicalDecodingContext *ctx,
+                            RepOriginId origin_id)
+{
+    TestDecodingData *data = ctx->output_plugin_private;
+
+    if (data->only_local && origin_id != InvalidRepOriginId)
+        return true;
+    return false;
+}

4a. Maybe this needs a function comment.

4b. Missing blank line following this function
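
For 4a, something along these lines is what I had in mind. This is only
a sketch: the typedefs and structs are cut-down stand-ins so that it
compiles outside the PostgreSQL tree (they are not the real
definitions), the assert is only there for the standalone version, and
the comment wording is just a suggestion.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Stand-ins (assumptions) so the sketch builds without PostgreSQL headers. */
typedef uint16_t RepOriginId;
#define InvalidRepOriginId 0

typedef struct TestDecodingData
{
    bool        only_local;
} TestDecodingData;

typedef struct LogicalDecodingContext
{
    void       *output_plugin_private;
} LogicalDecodingContext;

/*
 * Filter out changes that did not originate on this node.
 *
 * Returns true (meaning "skip this change") when the only_local option
 * is set and the change carries a valid replication origin, i.e. it was
 * applied here on behalf of some other node.
 */
static bool
pg_decode_filter_remotedata(LogicalDecodingContext *ctx,
                            RepOriginId origin_id)
{
    TestDecodingData *data = ctx->output_plugin_private;

    assert(data != NULL);
    return data->only_local && origin_id != InvalidRepOriginId;
}
```

Returning the boolean expression directly is equivalent to the
if/return true/return false form in the patch.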

~~~

5. General - please check all of the patch.

There seems to be inconsistency in the member names, local variable
names, parameter names, etc. There are all variations of:

- only_local
- onlylocaldata
- onlylocal_data
- etc

Please try using the same name everywhere for everything if possible.

~~~

6. src/backend/replication/logical/decode.c - FilterRemoteOriginData

@@ -585,7 +594,8 @@ logicalmsg_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
     message = (xl_logical_message *) XLogRecGetData(r);

     if (message->dbId != ctx->slot->data.database ||
-        FilterByOrigin(ctx, origin_id))
+        FilterByOrigin(ctx, origin_id) ||
+        FilterRemoteOriginData(ctx, origin_id))
         return;

I noticed that every call to FilterRemoteOriginData has an associated
preceding call to FilterByOrigin. It might be worth just combining the
logic into FilterByOrigin. Then none of that calling code (9 x places)
would need to change at all.
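
A minimal standalone sketch of what I mean by combining the logic. The
struct layouts, the FilterCB typedef, and the two sample_* callbacks
are stand-ins invented for this sketch. The real code in decode.c goes
through the *_cb_wrapper functions rather than calling the callbacks
directly, so treat this as an illustration of the shape only.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Stand-ins (assumptions), not the real PostgreSQL definitions. */
typedef uint16_t RepOriginId;
#define InvalidRepOriginId 0

typedef struct LogicalDecodingContext LogicalDecodingContext;
typedef bool (*FilterCB) (LogicalDecodingContext *ctx, RepOriginId origin_id);

typedef struct OutputPluginCallbacks
{
    FilterCB    filter_by_origin_cb;
    FilterCB    filter_remotedata_cb;   /* callback added by the patch */
} OutputPluginCallbacks;

struct LogicalDecodingContext
{
    OutputPluginCallbacks callbacks;
};

/* Hypothetical plugin callback: filter only changes from origin 42. */
static bool
sample_origin_filter(LogicalDecodingContext *ctx, RepOriginId origin_id)
{
    (void) ctx;
    return origin_id == 42;
}

/* Hypothetical plugin callback: filter anything with a valid origin. */
static bool
sample_remotedata_filter(LogicalDecodingContext *ctx, RepOriginId origin_id)
{
    (void) ctx;
    return origin_id != InvalidRepOriginId;
}

/*
 * Single entry point for origin-based filtering. Returns true if either
 * registered callback says the change should be skipped, so the existing
 * callers do not need a second function call.
 */
static bool
FilterByOrigin(LogicalDecodingContext *ctx, RepOriginId origin_id)
{
    assert(ctx != NULL);

    if (ctx->callbacks.filter_by_origin_cb != NULL &&
        ctx->callbacks.filter_by_origin_cb(ctx, origin_id))
        return true;

    if (ctx->callbacks.filter_remotedata_cb != NULL &&
        ctx->callbacks.filter_remotedata_cb(ctx, origin_id))
        return true;

    return false;
}
```

With this shape the call sites keep their existing
"FilterByOrigin(ctx, origin_id)" test unchanged.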

~~~

7. src/backend/replication/logical/logical.c - CreateInitDecodingContext

@@ -451,6 +453,8 @@ CreateInitDecodingContext(const char *plugin,
      */
     ctx->twophase &= slot->data.two_phase;

+    ctx->onlylocal_data &= slot->data.onlylocal_data;

The equivalent 'twophase' option had a big comment. Probably this new
option should also have a similar comment?

~~~

8. src/backend/replication/logical/logical.c - filter_remotedata_cb_wrapper

+bool
+filter_remotedata_cb_wrapper(LogicalDecodingContext *ctx,
+                             RepOriginId origin_id)
+{
+    LogicalErrorCallbackState state;
+    ErrorContextCallback errcallback;
+    bool        ret;
+
+    Assert(!ctx->fast_forward);
+
+    /* Push callback + info on the error context stack */
+    state.ctx = ctx;
+    state.callback_name = "filter_remoteorigin";

The callback name string is not consistent with the function name:

"filter_remoteorigin" versus filter_remotedata_cb.

There is a similar inconsistency elsewhere. See review comment #9.

~~~

9. src/backend/replication/pgoutput/pgoutput.c

@@ -215,6 +217,7 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb)
     cb->commit_prepared_cb = pgoutput_commit_prepared_txn;
     cb->rollback_prepared_cb = pgoutput_rollback_prepared_txn;
     cb->filter_by_origin_cb = pgoutput_origin_filter;
+    cb->filter_remotedata_cb = pgoutput_remoteorigin_filter;

Inconsistent names for the member and function.

filter_remotedata_cb VS pgoutput_remoteorigin_filter.

~~~

10. src/backend/replication/pgoutput/pgoutput.c

@@ -1450,6 +1465,16 @@ pgoutput_origin_filter(LogicalDecodingContext *ctx,
     return false;
 }

+static bool
+pgoutput_remoteorigin_filter(LogicalDecodingContext *ctx,
+                             RepOriginId origin_id)
+{
+    PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
+
+    if (data->onlylocal_data && origin_id != InvalidRepOriginId)
+        return true;
+    return false;
+}
 /*
  * Shutdown the output plugin.
  *

10a. Add a function comment.

10b. Missing blank line after the function

~~~

11. src/backend/replication/slotfuncs.c - pg_create_logical_replication_slot

@@ -171,6 +174,7 @@ pg_create_logical_replication_slot(PG_FUNCTION_ARGS)
     Name        plugin = PG_GETARG_NAME(1);
     bool        temporary = PG_GETARG_BOOL(2);
     bool        two_phase = PG_GETARG_BOOL(3);
+    bool        onlylocal_data = PG_GETARG_BOOL(4);
     Datum       result;
     TupleDesc   tupdesc;
     HeapTuple   tuple;

Won't some PG docs need to be updated now that there is another
parameter?

~~~

12. src/include/catalog/pg_proc.dat - pg_get_replication_slots

I did not see any update for pg_get_replication_slots, but you added
the 4th parameter elsewhere. Is something missing here?

~~~

13. src/include/replication/logical.h

@@ -99,6 +99,8 @@ typedef struct LogicalDecodingContext
      */
     bool        twophase_opt_given;

+    bool        onlylocal_data;
+

I think the new member needs some comment.

~~~

14. src/include/replication/walreceiver.h

@@ -183,6 +183,7 @@ typedef struct
             bool        streaming;  /* Streaming of large transactions */
             bool        twophase;   /* Streaming of two-phase transactions at
                                      * prepare time */
+            bool        onlylocal_data;
         }           logical;
     }           proto;
 } WalRcvStreamOptions;

I think the new member needs some comment.

~~~

15. src/test/regress/sql/subscription.sql

ALTER SUBSCRIPTION test missing?

------
[1] https://www.postgresql.org/message-id/CAHut%2BPsAWaETh9VMymbBfMrqiE1KuqMq%2BwpBg0s7eMzwLATr%2Bw%40mail.gmail.com
[2] https://www.postgresql.org/message-id/CAHut%2BPvQonJd5epJBM0Yfh1499mL9kTL9a%3DGrMhvnL6Ok05zqw%40mail.gmail.com

Kind Regards,
Peter Smith.
Fujitsu Australia
