From: | Peter Smith <smithpb2250(at)gmail(dot)com> |
---|---|
To: | vignesh C <vignesh21(at)gmail(dot)com> |
Cc: | "kuroda(dot)hayato(at)fujitsu(dot)com" <kuroda(dot)hayato(at)fujitsu(dot)com>, PostgreSQL Hackers <pgsql-hackers(at)lists(dot)postgresql(dot)org> |
Subject: | Re: Handle infinite recursion in logical replication setup |
Date: | 2022-03-07 08:57:47 |
Message-ID: | CAHut+PvWVnv2Zqt8C--wYkDWe2F=1m-aiGOqAMM4MfjAB+we2w@mail.gmail.com |
Lists: | pgsql-hackers |
Hi Vignesh,
Here are some review comments for patch v2.
======
1. Question about syntax
I already posted some questions about why the syntax is on the CREATE
SUBSCRIPTION side.
IMO "local_only" is a publisher option, so it seemed more natural to
me for it to be specified as a "publish" option.
Ref [1] my original question + suggestion for Option 2
Ref [2] some other examples of subscribing to multiple-publishers
Anyway, +1 to see what other people think.
~~~
2. ALTER
(related also to the question about syntax)
If subscribing to multiple publications then ALTER is going to change
the 'local_only' for all of them, which might not be what you want
(??)
~~~
3. subscription_parameter
(related also to the question about syntax)
CREATE SUBSCRIPTION subscription_name
CONNECTION 'conninfo'
PUBLICATION publication_name [, ...]
[ WITH ( subscription_parameter [= value] [, ... ] ) ]
~
That WITH is for *subscription* options, not the publication options.
So IMO the "local" in 'local_only' intuitively reads as meaning local to
the subscriber.
So, if the Option 1 syntax is chosen (see comment #1) then I think the
option name should maybe change to something like 'publish_local_only',
to make it clearer what "local" actually means.
~~~
4. contrib/test_decoding/test_decoding.c
@@ -484,6 +487,16 @@ pg_decode_filter(LogicalDecodingContext *ctx,
 	return false;
 }
+static bool
+pg_decode_filter_remotedata(LogicalDecodingContext *ctx,
+							RepOriginId origin_id)
+{
+	TestDecodingData *data = ctx->output_plugin_private;
+
+	if (data->only_local && origin_id != InvalidRepOriginId)
+		return true;
+	return false;
+}
4a. Maybe needs function comment.
4b. Missing blank line following this function
~~~
5. General - please check all of the patch.
There seems to be inconsistency in the member names, local variable names,
parameter names etc. There are all these variations:
- only_local
- onlylocaldata
- onlylocal_data
- etc
Please try using the same name everywhere for everything if possible.
~~~
6. src/backend/replication/logical/decode.c - FilterRemoteOriginData
@@ -585,7 +594,8 @@ logicalmsg_decode(LogicalDecodingContext *ctx,
XLogRecordBuffer *buf)
 	message = (xl_logical_message *) XLogRecGetData(r);
 	if (message->dbId != ctx->slot->data.database ||
-		FilterByOrigin(ctx, origin_id))
+		FilterByOrigin(ctx, origin_id) ||
+		FilterRemoteOriginData(ctx, origin_id))
 		return;
I noticed that every call to FilterRemoteOriginData has an associated
preceding call to FilterByOrigin. It might be worth just combining the
logic into FilterByOrigin. Then none of that calling code (9 x places)
would need to change at all.
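To illustrate what I mean by combining the logic, here is a minimal sketch. It is not a drop-in change: the struct layout, the callback fields hanging directly off the context, and example_remotedata_filter are simplified stand-ins so that the snippet compiles on its own. The real code goes through ctx->callbacks and the *_cb_wrapper functions.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Stand-ins for the PostgreSQL definitions (assumptions, much simplified). */
typedef uint16_t RepOriginId;
#define InvalidRepOriginId 0

struct LogicalDecodingContext;
typedef bool (*LogicalDecodeFilterCB) (struct LogicalDecodingContext *ctx,
									   RepOriginId origin_id);

typedef struct LogicalDecodingContext
{
	LogicalDecodeFilterCB filter_by_origin_cb;	/* existing callback, may be NULL */
	LogicalDecodeFilterCB filter_remotedata_cb; /* added by the patch, may be NULL */
} LogicalDecodingContext;

/* Hypothetical plugin callback: filter anything that has a valid origin. */
static bool
example_remotedata_filter(LogicalDecodingContext *ctx, RepOriginId origin_id)
{
	(void) ctx;
	return origin_id != InvalidRepOriginId;
}

/*
 * Combined check: skip the change if either the existing origin filter or
 * the new remote-data filter asks for it. Callers keep calling only this.
 */
static bool
FilterByOrigin(LogicalDecodingContext *ctx, RepOriginId origin_id)
{
	assert(ctx != NULL);

	if (ctx->filter_by_origin_cb != NULL &&
		ctx->filter_by_origin_cb(ctx, origin_id))
		return true;

	if (ctx->filter_remotedata_cb != NULL &&
		ctx->filter_remotedata_cb(ctx, origin_id))
		return true;

	return false;
}
```

With something like this, the existing "FilterByOrigin(ctx, origin_id)" conditions in decode.c could stay exactly as they are today.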
~~~
7. src/backend/replication/logical/logical.c - CreateInitDecodingContext
@@ -451,6 +453,8 @@ CreateInitDecodingContext(const char *plugin,
 	 */
 	ctx->twophase &= slot->data.two_phase;
+	ctx->onlylocal_data &= slot->data.onlylocal_data;
The equivalent 'twophase' option had a big comment. Probably this new
option should also have a similar comment?
~~~
8. src/backend/replication/logical/logical.c - filter_remotedata_cb_wrapper
+bool
+filter_remotedata_cb_wrapper(LogicalDecodingContext *ctx,
+							 RepOriginId origin_id)
+{
+	LogicalErrorCallbackState state;
+	ErrorContextCallback errcallback;
+	bool		ret;
+
+	Assert(!ctx->fast_forward);
+
+	/* Push callback + info on the error context stack */
+	state.ctx = ctx;
+	state.callback_name = "filter_remoteorigin";
The callback name string does not match the function name:
"filter_remoteorigin" versus filter_remotedata_cb.
The same kind of inconsistency appears elsewhere. See review comment #9.
~~~
9. src/backend/replication/pgoutput/pgoutput.c
@@ -215,6 +217,7 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb)
 	cb->commit_prepared_cb = pgoutput_commit_prepared_txn;
 	cb->rollback_prepared_cb = pgoutput_rollback_prepared_txn;
 	cb->filter_by_origin_cb = pgoutput_origin_filter;
+	cb->filter_remotedata_cb = pgoutput_remoteorigin_filter;
Inconsistent names for the member and function.
filter_remotedata_cb VS pgoutput_remoteorigin_filter.
~~~
10. src/backend/replication/pgoutput/pgoutput.c
@@ -1450,6 +1465,16 @@ pgoutput_origin_filter(LogicalDecodingContext *ctx,
 	return false;
 }
+static bool
+pgoutput_remoteorigin_filter(LogicalDecodingContext *ctx,
+							 RepOriginId origin_id)
+{
+	PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
+
+	if (data->onlylocal_data && origin_id != InvalidRepOriginId)
+		return true;
+	return false;
+}
 /*
  * Shutdown the output plugin.
  *
10a. Add a function comment.
10b. Missing blank line after the function
~~~
11. src/backend/replication/slotfuncs.c - pg_create_logical_replication_slot
@@ -171,6 +174,7 @@ pg_create_logical_replication_slot(PG_FUNCTION_ARGS)
 	Name		plugin = PG_GETARG_NAME(1);
 	bool		temporary = PG_GETARG_BOOL(2);
 	bool		two_phase = PG_GETARG_BOOL(3);
+	bool		onlylocal_data = PG_GETARG_BOOL(4);
 	Datum		result;
 	TupleDesc	tupdesc;
 	HeapTuple	tuple;
Won't there be some PG Docs needing to be updated now there is another
parameter?
~~~
12. src/include/catalog/pg_proc.dat - pg_get_replication_slots
I did not see any update for pg_get_replication_slots, but you added
the 4th parameter elsewhere. Is something missing here?
~~~
13. src/include/replication/logical.h
@@ -99,6 +99,8 @@ typedef struct LogicalDecodingContext
 	 */
 	bool		twophase_opt_given;
+	bool		onlylocal_data;
+
I think the new member needs some comment.
~~~
14. src/include/replication/walreceiver.h
@@ -183,6 +183,7 @@ typedef struct
 			bool		streaming;	/* Streaming of large transactions */
 			bool		twophase;	/* Streaming of two-phase transactions at
 									 * prepare time */
+			bool		onlylocal_data;
 		}			logical;
 	}			proto;
 } WalRcvStreamOptions;
I think the new member needs some comment.
~~~
15. src/test/regress/sql/subscription.sql
ALTER SUBSCRIPTION test missing?
------
[1] https://www.postgresql.org/message-id/CAHut%2BPsAWaETh9VMymbBfMrqiE1KuqMq%2BwpBg0s7eMzwLATr%2Bw%40mail.gmail.com
[2] https://www.postgresql.org/message-id/CAHut%2BPvQonJd5epJBM0Yfh1499mL9kTL9a%3DGrMhvnL6Ok05zqw%40mail.gmail.com
Kind Regards,
Peter Smith.
Fujitsu Australia