RE: Synchronizing slots from primary to standby

From: "Zhijie Hou (Fujitsu)" <houzj(dot)fnst(at)fujitsu(dot)com>
To: Peter Smith <smithpb2250(at)gmail(dot)com>
Cc: Amit Kapila <amit(dot)kapila16(at)gmail(dot)com>, shveta malik <shveta(dot)malik(at)gmail(dot)com>, "Drouvot, Bertrand" <bertranddrouvot(dot)pg(at)gmail(dot)com>, Nisha Moond <nisha(dot)moond412(at)gmail(dot)com>, "Hayato Kuroda (Fujitsu)" <kuroda(dot)hayato(at)fujitsu(dot)com>, Bharath Rupireddy <bharath(dot)rupireddyforpostgres(at)gmail(dot)com>, Peter Eisentraut <peter(dot)eisentraut(at)enterprisedb(dot)com>, Bruce Momjian <bruce(at)momjian(dot)us>, Ashutosh Sharma <ashu(dot)coek88(at)gmail(dot)com>, Andres Freund <andres(at)anarazel(dot)de>, pgsql-hackers <pgsql-hackers(at)postgresql(dot)org>, Ajin Cherian <itsajin(at)gmail(dot)com>, Alvaro Herrera <alvherre(at)alvh(dot)no-ip(dot)org>
Subject: RE: Synchronizing slots from primary to standby
Date: 2023-12-12 09:47:49
Message-ID: OS0PR01MB57167FE05F08D20C0158E4B8948EA@OS0PR01MB5716.jpnprd01.prod.outlook.com
Views: Raw Message | Whole Thread | Download mbox | Resend email
Thread:
Lists: pgsql-hackers

On Monday, December 11, 2023 3:32 PM Peter Smith <smithpb2250(at)gmail(dot)com>
>
> Here are some review comments for v44-0001
>
> ======
> src/backend/replication/slot.c
>
>
> 1. ReplicationSlotCreate
>
> * during getting changes, if the two_phase option is enabled it can skip
> * prepare because by that time start decoding point has been moved. So
> the
> * user will only get commit prepared.
> + * failover: Allows the slot to be synced to physical standbys so that logical
> + * replication can be resumed after failover.
> */
> void
> ReplicationSlotCreate(const char *name, bool db_specific,
>
> ~
>
> /Allows the slot.../If enabled, allows the slot.../

Changed.

>
> ======
>
> 2. validate_standby_slots
>
> +validate_standby_slots(char **newval)
> +{
> + char *rawname;
> + List *elemlist;
> + ListCell *lc;
> + bool ok = true;
> +
> + /* Need a modifiable copy of string */ rawname = pstrdup(*newval);
> +
> + /* Verify syntax and parse string into list of identifiers */ if (!(ok
> + = SplitIdentifierString(rawname, ',', &elemlist)))
> + GUC_check_errdetail("List syntax is invalid.");
> +
> + /*
> + * If there is a syntax error in the name or if the replication slots'
> + * data is not initialized yet (i.e., we are in the startup process),
> + skip
> + * the slot verification.
> + */
> + if (!ok || !ReplicationSlotCtl)
> + {
> + pfree(rawname);
> + list_free(elemlist);
> + return ok;
> + }
>
>
> 2a.
> You don't need to initialize 'ok' during declaration because it is assigned
> immediately anyway.
>
> ~
>
> 2b.
> AFAIK assignment within a conditional like this is not a normal PG coding style
> unless there is no other way to do it.
>

Changed.

> ~
>
> 2c.
> /into list/into a list/
>
> SUGGESTION
> /* Verify syntax and parse string into a list of identifiers */ ok =
> SplitIdentifierString(rawname, ',', &elemlist); if (!ok)
> GUC_check_errdetail("List syntax is invalid.");
>
>

Changed.

> ~~~
>
> 3. assign_standby_slot_names
>
> + if (!SplitIdentifierString(standby_slot_names_cpy, ',',
> + &standby_slots)) {
> + /* This should not happen if GUC checked check_standby_slot_names. */
> + elog(ERROR, "list syntax is invalid"); }
>
> This error here and in validate_standby_slots() are different -- "list" versus
> "List".
>

The message has been changed to "invalid list syntax" to be consistent with other elog.

> ======
> src/backend/replication/walsender.c
>
>
> 4. WalSndFilterStandbySlots
>
>
> + foreach(lc, standby_slots_cpy)
> + {
> + char *name = lfirst(lc);
> + XLogRecPtr restart_lsn = InvalidXLogRecPtr; bool invalidated = false;
> + char *warningfmt = NULL;
> + ReplicationSlot *slot;
> +
> + slot = SearchNamedReplicationSlot(name, true);
> +
> + if (slot && SlotIsPhysical(slot))
> + {
> + SpinLockAcquire(&slot->mutex);
> + restart_lsn = slot->data.restart_lsn;
> + invalidated = slot->data.invalidated != RS_INVAL_NONE;
> + SpinLockRelease(&slot->mutex); }
> +
> + /* Continue if the current slot hasn't caught up. */ if (!invalidated
> + && !XLogRecPtrIsInvalid(restart_lsn) && restart_lsn < wait_for_lsn) {
> + /* Log warning if no active_pid for this physical slot */ if
> + (slot->active_pid == 0) ereport(WARNING, errmsg("replication slot
> + \"%s\" specified in parameter \"%s\" does
> not have active_pid",
> + name, "standby_slot_names"),
> + errdetail("Logical replication is waiting on the "
> + "standby associated with \"%s\"", name), errhint("Consider starting
> + standby associated with "
> + "\"%s\" or amend standby_slot_names", name));
> +
> + continue;
> + }
> + else if (!slot)
> + {
> + /*
> + * It may happen that the slot specified in standby_slot_names GUC
> + * value is dropped, so let's skip over it.
> + */
> + warningfmt = _("replication slot \"%s\" specified in parameter
> \"%s\" does not exist, ignoring");
> + }
> + else if (SlotIsLogical(slot))
> + {
> + /*
> + * If a logical slot name is provided in standby_slot_names, issue
> + * a WARNING and skip it. Although logical slots are disallowed in
> + * the GUC check_hook(validate_standby_slots), it is still
> + * possible for a user to drop an existing physical slot and
> + * recreate a logical slot with the same name. Since it is
> + * harmless, a WARNING should be enough, no need to error-out.
> + */
> + warningfmt = _("cannot have logical replication slot \"%s\" in
> parameter \"%s\", ignoring");
> + }
> + else if (XLogRecPtrIsInvalid(restart_lsn) || invalidated) {
> + /*
> + * Specified physical slot may have been invalidated, so there is no
> + point
> + * in waiting for it.
> + */
> + warningfmt = _("physical slot \"%s\" specified in parameter \"%s\"
> has been invalidated, ignoring");
> + }
> + else
> + {
> + Assert(restart_lsn >= wait_for_lsn);
> + }
>
> This if/else chain seems structured awkwardly. IMO it would be tidier to
> eliminate the NULL slot and IsLogicalSlot up-front, which would also simplify
> some of the subsequent conditions
>
> SUGGESTION
>
> slot = SearchNamedReplicationSlot(name, true);
>
> if (!slot)
> {
> ...
> }
> else if (SlotIsLogical(slot))
> {
> ...
> }
> else
> {
> Assert(SlotIsPhysical(slot))
>
> SpinLockAcquire(&slot->mutex);
> restart_lsn = slot->data.restart_lsn;
> invalidated = slot->data.invalidated != RS_INVAL_NONE;
> SpinLockRelease(&slot->mutex);
>
> if (XLogRecPtrIsInvalid(restart_lsn) || invalidated)
> {
> ...
> }
> else if (!invalidated && !XLogRecPtrIsInvalid(restart_lsn) && restart_lsn <
> wait_for_lsn)
> {
> ...
> }
> else
> {
> Assert(restart_lsn >= wait_for_lsn);
> }
> }
>

Changed.

> ~~~~
>
> 5. WalSndWaitForWal
>
> + else
> + {
> + /* already caught up and doesn't need to wait for standby_slots */
> break;
> + }
>
> /Already/already/
>

Changed.

> ======
> src/test/recovery/t/050_standby_failover_slots_sync.pl
>
>
> 6.
> +$subscriber1->safe_psql('postgres',
> + "CREATE TABLE tab_int (a int PRIMARY KEY);");
> +
> +# Create a subscription with failover = true
> +$subscriber1->safe_psql('postgres',
> + "CREATE SUBSCRIPTION regress_mysub1 CONNECTION
> '$publisher_connstr' "
> + . "PUBLICATION regress_mypub WITH (slot_name = lsub1_slot,
> failover = true);"
> +);
>
>
> Consider combining these DDL statements.
>

Changed.

> ~~~
>
> 7.
> +$subscriber2->safe_psql('postgres',
> + "CREATE TABLE tab_int (a int PRIMARY KEY);");
> +$subscriber2->safe_psql('postgres',
> + "CREATE SUBSCRIPTION regress_mysub2 CONNECTION
> '$publisher_connstr' "
> + . "PUBLICATION regress_mypub WITH (slot_name = lsub2_slot);");
>
> Consider combining these DDL statements
>

Changed.

> ~~~
>
> 8.
> +# Stop the standby associated with specified physical replication slot
> +so that # the logical replication slot won't receive changes until the
> +standby slot's # restart_lsn is advanced or the slots is removed from
> +the standby_slot_names # list $publisher->safe_psql('postgres',
> +"TRUNCATE tab_int;"); $publisher->wait_for_catchup('regress_mysub1');
> +$standby1->stop;
>
> /with specified/with the specified/
>
> /or the slots is/or the slot is/
>

Changed.

> ~~~
>
> 9.
> +# Create some data on primary
>
> /on primary/on the primary/
>

Changed.

> ~~~
>
> 10.
> +$result =
> + $subscriber1->safe_psql('postgres', "SELECT count(*) = 10 FROM
> +tab_int;"); is($result, 't',
> + "subscriber1 doesn't get data as the sb1_slot doesn't catch up");
>
>
> I felt instead of checking for 10 maybe it's more consistent with the previous
> code to assign again that $primary_row_count variable to 20;
>
> Then check that those primary rows are not all yet received like:
>
> SELECT count(*) < $primary_row_count FROM tab_int;
>

I think we'd better check the accurate number here to make sure the number is what we expect.

> ~~~
>
> 11.
> +# Now that the standby lsn has advanced, primary must send the decoded
> +# changes to the subscription.
> +$publisher->wait_for_catchup('regress_mysub1');
> +$result =
> + $subscriber1->safe_psql('postgres', "SELECT count(*) = 20 FROM
> +tab_int;"); is($result, 't',
> + "subscriber1 gets data from primary after standby1 is removed from
> the standby_slot_names list"
> +);
>
> /primary must/the primary must/
>
> (continuing the suggestion from the previous review comment)
>
> Now this SQL can use the variable too:
>
> subscriber1->safe_psql('postgres', "SELECT count(*) =
> $primary_row_count FROM tab_int;");
>

Changed.

> ~~~
>
> 12.
> +
> +# Create another subscription enabling failover
> +$subscriber1->safe_psql('postgres',
> + "CREATE SUBSCRIPTION regress_mysub3 CONNECTION
> '$publisher_connstr' "
> + . "PUBLICATION regress_mypub WITH (slot_name = lsub3_slot,
> copy_data=false, failover = true, create_slot = false);"
> +);
>
>
> Maybe give some more information in that comment:
>
> SUGGESTION
> Create another subscription (using the same slot created above) that enables
> failover.
>

Added.

Best Regards,
Hou zj

In response to

Responses

Browse pgsql-hackers by date

  From Date Subject
Next Message Tomas Vondra 2023-12-12 10:01:38 Re: logical decoding and replication of sequences, take 2
Previous Message Zhijie Hou (Fujitsu) 2023-12-12 09:46:05 RE: Synchronizing slots from primary to standby