Re: Handle infinite recursion in logical replication setup

From: Peter Smith <smithpb2250(at)gmail(dot)com>
To: vignesh C <vignesh21(at)gmail(dot)com>
Cc: Amit Kapila <amit(dot)kapila16(at)gmail(dot)com>, Ashutosh Bapat <ashutosh(dot)bapat(dot)oss(at)gmail(dot)com>, "kuroda(dot)hayato(at)fujitsu(dot)com" <kuroda(dot)hayato(at)fujitsu(dot)com>, PostgreSQL Hackers <pgsql-hackers(at)lists(dot)postgresql(dot)org>
Subject: Re: Handle infinite recursion in logical replication setup
Date: 2022-06-03 05:30:49
Message-ID: CAHut+PuTYrsRpDnyGSJ-Giwt1S9dUXRr4mAwSCx4viA-ndrqdw@mail.gmail.com
Lists: pgsql-hackers

Please find below my review comments for patch v17-0002:

======

1. Commit message

This patch adds a new SUBSCRIPTION parameter "origin". It Specifies whether
the subscription will request the publisher to only send changes that
originated locally, or to send any changes regardless of origin.

"It Specifies" -> "It specifies"

~~~

2. Commit message

Usage:
CREATE SUBSCRIPTION sub1 CONNECTION 'dbname=postgres port=9999'
PUBLICATION pub1 with (origin = local);

"with" -> "WITH"

======

3. doc/src/sgml/catalogs.sgml

+ <para>
+ Possible origin values are <literal>local</literal>,
+ <literal>any</literal>, or <literal>NULL</literal> if none is specified.
+ If <literal>local</literal>, the subscription will request the
+ publisher to only send changes that originated locally. If
+ <literal>any</literal>, the publisher sends any changes regardless of
+ their origin.
+ </para></entry>

Should this also mention that NULL (default) is equivalent to 'any'?

SUGGESTION
If <literal>any</literal> (or <literal>NULL</literal>), the publisher
sends any changes regardless of their origin.

======

4. src/backend/catalog/pg_subscription.c

@@ -72,6 +72,16 @@ GetSubscription(Oid subid, bool missing_ok)
sub->twophasestate = subform->subtwophasestate;
sub->disableonerr = subform->subdisableonerr;

+ datum = SysCacheGetAttr(SUBSCRIPTIONOID,
+ tup,
+ Anum_pg_subscription_suborigin,
+ &isnull);
+
+ if (!isnull)
+ sub->origin = TextDatumGetCString(datum);
+ else
+ sub->origin = NULL;
+
/* Get conninfo */
datum = SysCacheGetAttr(SUBSCRIPTIONOID,
tup,

Missing comment like the nearby code has:
/* Get origin */

======

5. src/backend/replication/logical/worker.c

+/* Macro for comparing string fields that might be NULL */
+#define equalstr(a, b) \
+ (((a) != NULL && (b) != NULL) ? (strcmp(a, b) == 0) : (a) == (b))
+

Should that have some extra parens for the macro args?
e.g. "strcmp((a), (b))"

~~~

6. src/backend/replication/logical/worker.c - maybe_reread_subscription

@@ -3059,6 +3063,7 @@ maybe_reread_subscription(void)
strcmp(newsub->slotname, MySubscription->slotname) != 0 ||
newsub->binary != MySubscription->binary ||
newsub->stream != MySubscription->stream ||
+ equalstr(newsub->origin, MySubscription->origin) ||
newsub->owner != MySubscription->owner ||
!equal(newsub->publications, MySubscription->publications))
{

Is that right? Shouldn't it say !equalstr(...)?
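
To illustrate comments 5 and 6 together, here is a minimal standalone sketch (not the patch code). It parenthesizes every macro argument, including those passed to strcmp, and wraps the negated comparison in a helper. The helper name origin_changed is hypothetical and exists only for this example.

```c
#include <stdbool.h>
#include <stddef.h>
#include <string.h>

/*
 * Compare two string fields that might be NULL.
 * Every use of a macro argument is parenthesized, including the ones
 * passed to strcmp(). Two NULLs compare equal; NULL and non-NULL do not.
 */
#define equalstr(a, b) \
	(((a) != NULL && (b) != NULL) ? (strcmp((a), (b)) == 0) : ((a) == (b)))

/*
 * Hypothetical helper (not in the patch): true when the origin setting
 * differs between the old and new subscription, i.e. the case in which
 * a "something changed" check should fire.
 */
static bool
origin_changed(const char *old_origin, const char *new_origin)
{
	return !equalstr(old_origin, new_origin);
}
```

With this shape, a change-detection condition would use !equalstr(...) (or the helper) so that it is true only when the two values differ.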

======

7. src/backend/replication/pgoutput/pgoutput.c - parse_output_parameters

@@ -380,6 +382,16 @@ parse_output_parameters(List *options, PGOutputData *data)

data->two_phase = defGetBoolean(defel);
}
+ else if (strcmp(defel->defname, "origin") == 0)
+ {
+ if (origin_option_given)
+ ereport(ERROR,
+ (errcode(ERRCODE_SYNTAX_ERROR),
+ errmsg("conflicting or redundant options")));
+ origin_option_given = true;
+
+ data->origin = defGetString(defel);
+ }

Should this function also be validating that the origin parameter
value is only permitted to be one of "local" or "any"?
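
A rough sketch of what such a check could look like, written as a standalone predicate so it compiles on its own. The function name origin_value_is_valid is hypothetical, and the case-sensitive comparison is an assumption. In the patch itself a failed check would presumably be reported with ereport(ERROR, ...), which is left out here.

```c
#include <stdbool.h>
#include <stddef.h>
#include <string.h>

/*
 * Hypothetical helper (not in the patch): accept only the two origin
 * values named in this review, "local" and "any". Comparison is
 * case-sensitive, which is an assumption of this sketch.
 */
static bool
origin_value_is_valid(const char *value)
{
	if (value == NULL)
		return false;

	return strcmp(value, "local") == 0 || strcmp(value, "any") == 0;
}
```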

~~~

8. src/backend/replication/pgoutput/pgoutput.c - pgoutput_origin_filter

@@ -1698,12 +1710,20 @@ pgoutput_message(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
}

/*
- * Currently we always forward.
+ * Return true if the data source (origin) is remote and user has requested
+ * only local data, false otherwise.
*/
static bool
pgoutput_origin_filter(LogicalDecodingContext *ctx,
RepOriginId origin_id)
{
+ PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
+
+ if (data->origin &&
+ (strcmp(data->origin, "local") == 0) &&
+ origin_id != InvalidRepOriginId)
+ return true;
+
return false;
}

8a.
Could rewrite the condition by putting the strcmp last, so the cheaper
origin_id check short-circuits first and the strcmp is skipped when it
is not needed.

e.g.
+ if (data->origin &&
+ origin_id != InvalidRepOriginId &&
+ strcmp(data->origin, "local") == 0)

8b.
I also wondered if it might be worth considering caching the origin
parameter value when it was parsed so that you can avoid doing any
strcmp at all during this function. Because otherwise this might be
called millions of times, right?
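
One possible shape for the caching idea in 8b, as a standalone sketch. The struct SketchOutputData, the only_local field, and both function names are hypothetical. RepOriginId and InvalidRepOriginId are redefined locally as stand-ins so the example compiles without the PostgreSQL headers.

```c
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

/* Local stand-ins for the PostgreSQL definitions (sketch only). */
typedef uint16_t RepOriginId;
#define InvalidRepOriginId 0

/* Hypothetical reduced version of the plugin's private data. */
typedef struct SketchOutputData
{
	const char *origin;		/* parameter value as given, may be NULL */
	bool		only_local;	/* cached result of comparing with "local" */
} SketchOutputData;

/* Called once at parse time: do the strcmp here and remember the answer. */
static void
sketch_set_origin(SketchOutputData *data, const char *origin)
{
	data->origin = origin;
	data->only_local = (origin != NULL && strcmp(origin, "local") == 0);
}

/*
 * Called per change: return true (filter out) if the change came from a
 * remote origin and only local data was requested. No strcmp needed.
 */
static bool
sketch_origin_filter(const SketchOutputData *data, RepOriginId origin_id)
{
	return data->only_local && origin_id != InvalidRepOriginId;
}
```

The string comparison then happens once per parameter parse rather than once per call of the filter callback.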

======

9. src/include/catalog/pg_subscription.h

@@ -87,6 +87,9 @@ CATALOG(pg_subscription,6100,SubscriptionRelationId)
BKI_SHARED_RELATION BKI_ROW

/* List of publications subscribed to */
text subpublications[1] BKI_FORCE_NOT_NULL;
+
+ /* Publish the data originated from the specified origin */
+ text suborigin;
#endif
} FormData_pg_subscription;

SUGGESTION (for the comment text)
/* Only publish data originating from the specified origin */

~~~

10. src/include/catalog/pg_subscription.h - Subscription

@@ -118,6 +121,9 @@ typedef struct Subscription
char *slotname; /* Name of the replication slot */
char *synccommit; /* Synchronous commit setting for worker */
List *publications; /* List of publication names to subscribe to */
+ char *origin; /* Publish the data originated from the
+ * specified origin */
+
} Subscription;

10a.
Reword comment same as suggested in review comment #9

10b.
Remove spurious blank line

======

11. src/include/replication/walreceiver.h

@@ -183,6 +183,8 @@ typedef struct
bool streaming; /* Streaming of large transactions */
bool twophase; /* Streaming of two-phase transactions at
* prepare time */
+ char *origin; /* Publish the data originated from the
+ * specified origin */
} logical;

Reword comment same as suggested in review comment #9

======

12. src/test/subscription/t/032_origin.pl

+# Test logical replication using origin option.

SUGGESTION
# Test the CREATE SUBSCRIPTION 'origin' parameter

~~~

13. src/test/subscription/t/032_origin.pl

+# check that the data published from node_C to node_B is not sent to node_A
+$result = $node_A->safe_psql('postgres', "SELECT * FROM tab_full ORDER BY 1;");
+is($result, qq(11
+12), 'Remote data originating from another node (not the publisher)
is not replicated when origin option is local'
+);

"option" -> "parameter"

------
Kind Regards,
Peter Smith.
Fujitsu Australia
