Re: Issue with logical replication slot during switchover

From: Fabrice Chapuis <fabrice636861(at)gmail(dot)com>
To: shveta malik <shveta(dot)malik(at)gmail(dot)com>
Cc: PostgreSQL Hackers <pgsql-hackers(at)lists(dot)postgresql(dot)org>
Subject: Re: Issue with logical replication slot during switchover
Date: 2025-08-27 14:24:53
Message-ID: CAA5-nLAvyZiAZt65qB-Vr3tuC-syo6VaHMbRYmoiX49-+BBLcw@mail.gmail.com
Views: Whole Thread | Raw Message | Download mbox | Resend email
Thread:
Lists: pgsql-hackers

For the first step (a), I have extended pg_create_logical_replication_slot()
with a new allow_overwrite parameter (default false).
If the existing slot on the newly attached standby has allow_overwrite set to
true, slot synchronization drops it so that it can be recreated as a synced
copy of the primary's failover slot.
I have tested the modified source; could you please give me feedback on the
code changes?

Regards,

Fabrice

diff --git a/src/backend/catalog/system_functions.sql
b/src/backend/catalog/system_functions.sql
index 566f308..6cd3175 100644
--- a/src/backend/catalog/system_functions.sql
+++ b/src/backend/catalog/system_functions.sql
@@ -480,6 +480,7 @@ CREATE OR REPLACE FUNCTION
pg_create_logical_replication_slot(
IN temporary boolean DEFAULT false,
IN twophase boolean DEFAULT false,
IN failover boolean DEFAULT false,
+ IN allow_overwrite boolean DEFAULT false,
OUT slot_name name, OUT lsn pg_lsn)
RETURNS RECORD
LANGUAGE INTERNAL
diff --git a/src/backend/replication/logical/slotsync.c
b/src/backend/replication/logical/slotsync.c
index 656e66e..d6332cd 100644
--- a/src/backend/replication/logical/slotsync.c
+++ b/src/backend/replication/logical/slotsync.c
@@ -627,6 +627,7 @@ synchronize_one_slot(RemoteSlot *remote_slot, Oid
remote_dbid)
ReplicationSlot *slot;
XLogRecPtr latestFlushPtr;
bool slot_updated = false;
+ bool allow_overwrite = false;

/*
* Make sure that concerned WAL is received and flushed before
syncing
@@ -649,24 +650,46 @@ synchronize_one_slot(RemoteSlot *remote_slot, Oid
remote_dbid)

return false;
}
-
- /* Search for the named slot */
+ // Both local and remote slot have the same name
if ((slot = SearchNamedReplicationSlot(remote_slot->name, true)))
{
- bool synced;
+ bool synced;

SpinLockAcquire(&slot->mutex);
synced = slot->data.synced;
+ allow_overwrite = slot->data.allow_overwrite;
SpinLockRelease(&slot->mutex);
-
- /* User-created slot with the same name exists, raise
ERROR. */
- if (!synced)
- ereport(ERROR,
+
+ if (!synced){
+ /*
+ * Check if we need to overwrite an existing
+ * logical slot
+ */
+ if (allow_overwrite){
+ /*
+ * Get rid of a replication slot that is no
+ *longer wanted
+ */
+ ReplicationSlotDrop(remote_slot->name,true);
+
+ /* Get rid of a replication slot that is no
longer wanted */
+ ereport(WARNING,
+
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("slot \"%s\" already exists"
+ " on the standby but it will be dropped
because flag allow_overwrite is set to true",
+ remote_slot->name));
+
+ /* Going back to the main loop after
droping the failover slot */
+ return false;
+ }
+ else
+ /* User-created slot with the same name
exists, raise ERROR. */
+ ereport(ERROR,

errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("exiting from slot
synchronization because same"
- " name slot \"%s\"
already exists on the standby",
- remote_slot->name));
-
+ " name slot \"%s\"
already exists on the standby",
+ remote_slot->name));
+ }
/*
* The slot has been synchronized before.
*
@@ -761,6 +784,7 @@ synchronize_one_slot(RemoteSlot *remote_slot, Oid
remote_dbid)
ReplicationSlotCreate(remote_slot->name, true, RS_TEMPORARY,

remote_slot->two_phase,

remote_slot->failover,
+ allow_overwrite,
true);

/* For shorter lines. */
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index 600b87f..d6bc5c6 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -323,7 +323,7 @@ ReplicationSlotValidateName(const char *name, int
elevel)
void
ReplicationSlotCreate(const char *name, bool db_specific,
ReplicationSlotPersistency
persistency,
- bool two_phase, bool failover,
bool synced)
+ bool two_phase, bool failover,
bool allow_overwrite, bool synced)
{
ReplicationSlot *slot = NULL;
int i;
@@ -413,6 +413,11 @@ ReplicationSlotCreate(const char *name, bool
db_specific,
slot->data.two_phase_at = InvalidXLogRecPtr;
slot->data.failover = failover;
slot->data.synced = synced;
+ slot->data.allow_overwrite = allow_overwrite;
+
+ elog(LOG, "Logical replication slot %s created with option
allow_overwrite to %s",
+ NameStr(slot->data.name),
+ slot->data.allow_overwrite ? "true" : "false");

/* and then data only present in shared memory */
slot->just_dirtied = false;
diff --git a/src/backend/replication/slotfuncs.c
b/src/backend/replication/slotfuncs.c
index 36cc2ed..6bd430f 100644
--- a/src/backend/replication/slotfuncs.c
+++ b/src/backend/replication/slotfuncs.c
@@ -40,7 +40,7 @@ create_physical_replication_slot(char *name, bool
immediately_reserve,

/* acquire replication slot, this will check for conflicting names
*/
ReplicationSlotCreate(name, false,
- temporary ? RS_TEMPORARY
: RS_PERSISTENT, false,
+ temporary ? RS_TEMPORARY
: RS_PERSISTENT, false, false,
false, false);

if (immediately_reserve)
@@ -116,7 +116,7 @@ pg_create_physical_replication_slot(PG_FUNCTION_ARGS)
static void
create_logical_replication_slot(char *name, char *plugin,
bool
temporary, bool two_phase,
- bool
failover,
+ bool
failover, bool allow_overwrite,
XLogRecPtr
restart_lsn,
bool
find_startpoint)
{
@@ -134,7 +134,7 @@ create_logical_replication_slot(char *name, char
*plugin,
*/
ReplicationSlotCreate(name, true,
temporary ? RS_TEMPORARY
: RS_EPHEMERAL, two_phase,
- failover, false);
+ failover,
allow_overwrite, false);

/*
* Create logical decoding context to find start point or, if we
don't
@@ -173,6 +173,7 @@ pg_create_logical_replication_slot(PG_FUNCTION_ARGS)
bool temporary = PG_GETARG_BOOL(2);
bool two_phase = PG_GETARG_BOOL(3);
bool failover = PG_GETARG_BOOL(4);
+ bool allow_overwrite = PG_GETARG_BOOL(5);
Datum result;
TupleDesc tupdesc;
HeapTuple tuple;
@@ -191,6 +192,7 @@ pg_create_logical_replication_slot(PG_FUNCTION_ARGS)

temporary,

two_phase,

failover,
+
allow_overwrite,

InvalidXLogRecPtr,

true);

@@ -210,6 +212,47 @@ pg_create_logical_replication_slot(PG_FUNCTION_ARGS)
PG_RETURN_DATUM(result);
}

+/*
+ * Change properties of an existing logical replication slot.
+ *
+ * Only allow_overwrite is applied at present.  two_phase and failover are
+ * accepted so the SQL-level signature can later grow into a generic
+ * "alter slot" API, but they are intentionally not acted on here yet.
+ *
+ * The new value is written under the slot's spinlock, because other
+ * processes (for instance the slot sync worker) read slot->data while
+ * holding that lock.  The slot is then marked dirty and saved so the change
+ * is flushed to the slot's on-disk state rather than waiting for the next
+ * checkpoint.
+ */
+static void
+alter_logical_replication_slot(char *name, bool two_phase,
+							   bool failover,
+							   bool allow_overwrite)
+{
+	Assert(!MyReplicationSlot);
+
+	/*
+	 * NOTE(review): assumes the trailing arguments are (nowait,
+	 * error_if_invalid) as in the current ReplicationSlotAcquire(); confirm
+	 * against slot.h on the target branch.
+	 */
+	ReplicationSlotAcquire(name, true, true);
+
+	/* allow_overwrite is only meaningful for logical slots */
+	if (SlotIsPhysical(MyReplicationSlot))
+		ereport(ERROR,
+				errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+				errmsg("cannot use %s with a physical replication slot",
+					   "pg_alter_logical_replication_slot"));
+
+	SpinLockAcquire(&MyReplicationSlot->mutex);
+	MyReplicationSlot->data.allow_overwrite = allow_overwrite;
+	SpinLockRelease(&MyReplicationSlot->mutex);
+
+	/* persist the change immediately */
+	ReplicationSlotMarkDirty();
+	ReplicationSlotSave();
+
+	ReplicationSlotRelease();
+}
+
+/*
+ * SQL function for altering logical replication slot properties.
+ *
+ * Arguments, in order: slot name, two_phase, failover, allow_overwrite.
+ * Calls CheckSlotPermissions() and CheckLogicalDecodingRequirements()
+ * before delegating to alter_logical_replication_slot(), then returns the
+ * slot name.
+ *
+ * NOTE(review): two_phase and failover are forwarded to
+ * alter_logical_replication_slot(), which in this patch only applies
+ * allow_overwrite, so these two arguments currently have no effect.
+ * NOTE(review): this patch adds no pg_proc.dat entry for this function, so
+ * it is not yet callable from SQL.
+ */
+Datum
+pg_alter_logical_replication_slot(PG_FUNCTION_ARGS)
+{
+ Name name = PG_GETARG_NAME(0);
+ bool two_phase = PG_GETARG_BOOL(1);
+ bool failover = PG_GETARG_BOOL(2);
+ bool allow_overwrite = PG_GETARG_BOOL(3);
+
+ /* privilege and wal_level/configuration checks */
+ CheckSlotPermissions();
+
+ CheckLogicalDecodingRequirements();
+
+ alter_logical_replication_slot(NameStr(*name),
+                                two_phase,
+                                failover,
+                                allow_overwrite);
+
+ PG_RETURN_NAME(name);
+}
+

/*
* SQL function for dropping a replication slot.
@@ -726,6 +769,7 @@ copy_replication_slot(FunctionCallInfo fcinfo, bool
logical_slot)

temporary,

false,

false,
+
false,

src_restart_lsn,

false);
}
diff --git a/src/backend/replication/walsender.c
b/src/backend/replication/walsender.c
index 9fa8beb..ef22695 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -1198,7 +1198,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
{
ReplicationSlotCreate(cmd->slotname, false,
cmd->temporary ?
RS_TEMPORARY : RS_PERSISTENT,
- false, false,
false);
+ false, false,
false, false);

if (reserve_wal)
{
@@ -1229,7 +1229,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
*/
ReplicationSlotCreate(cmd->slotname, true,
cmd->temporary ?
RS_TEMPORARY : RS_EPHEMERAL,
- two_phase,
failover, false);
+ two_phase,
failover, false, false);

/*
* Do options check early so that we can bail before
calling the
diff --git a/src/include/catalog/pg_proc.dat
b/src/include/catalog/pg_proc.dat
index 62beb71..074805d 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -11480,10 +11480,10 @@
{ oid => '3786', descr => 'set up a logical replication slot',
proname => 'pg_create_logical_replication_slot', provolatile => 'v',
proparallel => 'u', prorettype => 'record',
- proargtypes => 'name name bool bool bool',
- proallargtypes => '{name,name,bool,bool,bool,name,pg_lsn}',
- proargmodes => '{i,i,i,i,i,o,o}',
- proargnames =>
'{slot_name,plugin,temporary,twophase,failover,slot_name,lsn}',
+ proargtypes => 'name name bool bool bool bool',
+ proallargtypes => '{name,name,bool,bool,bool,bool,name,pg_lsn}',
+ proargmodes => '{i,i,i,i,i,i,o,o}',
+ proargnames =>
'{slot_name,plugin,temporary,twophase,failover,allow_overwrite,slot_name,lsn}',
prosrc => 'pg_create_logical_replication_slot' },
{ oid => '4222',
descr => 'copy a logical replication slot, changing temporality and
plugin',
diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h
index eb0b93b..1fd6445 100644
--- a/src/include/replication/slot.h
+++ b/src/include/replication/slot.h
@@ -134,6 +134,11 @@ typedef struct ReplicationSlotPersistentData
* for logical slots on the primary server.
*/
bool failover;
+ /*
+ * Allow Postgres to drop logical replication slot on standby
server to ensure
+ * creation of new failover slot when sync_replication_slots is
true.
+ */
+ bool allow_overwrite;
} ReplicationSlotPersistentData;

/*
@@ -267,7 +272,7 @@ extern void ReplicationSlotsShmemInit(void);
/* management of individual slots */
extern void ReplicationSlotCreate(const char *name, bool db_specific,

ReplicationSlotPersistency persistency,
- bool
two_phase, bool failover,
+ bool
two_phase, bool failover, bool allow_overwrite,
bool
synced);
extern void ReplicationSlotPersist(void);
extern void ReplicationSlotDrop(const char *name, bool nowait);

On Wed, Aug 13, 2025 at 8:04 AM shveta malik <shveta(dot)malik(at)gmail(dot)com> wrote:

> On Fri, Aug 8, 2025 at 7:01 PM Fabrice Chapuis <fabrice636861(at)gmail(dot)com>
> wrote:
> >
> > Thanks Shveta for coming on this point again and fixing the link.
> > The idea is to check if the slot has same name to try to resynchronize
> it with the primary.
> > ok the check on the failover status for the remote slot is perhaps
> redundant.
> > I'm not sure what impact setting the synced flag to true might have. But
> if you run an additional switchover, it works fine because the synced flag
> on the new primary is set to true now.
> > If we come back to the idea of the GUC or the API, adding an
> allow_overwrite parameter to the pg_create_logical_replication_slot
> function and removing the logical slot when set to true could be a suitable
> approach.
> >
> > What is your opinion?
> >
>
> If implemented as a GUC, it would address only a specific corner case,
> making it less suitable to be added as a GUC.
>
> OTOH, adding it as a slot's property makes more sense. You can start
> with introducing a new slot property, allow_overwrite. By default,
> this property will be set to false.
>
> a) The function pg_create_logical_replication_slot() can be extended
> to accept this parameter.
> b) A new API pg_alter_logical_replication_slot() can be introduced, to
> modify this property after slot creation if needed.
> c) The commands CREATE SUBSCRIPTION and ALTER SUBSCRIPTION are not
> needed to include an allow_overwrite parameter. When CREATE
> SUBSCRIPTION creates a slot, it will always set allow_overwrite to
> false by default. If users need to change this later, they can use the
> new API pg_alter_logical_replication_slot() to update the property.
> d) Additionally, pg_alter_logical_replication_slot() can serve as a
> generic API to modify other slot properties as well.
>
> This appears to be a reasonable idea with potential use cases beyond
> just allowing synchronization post switchover. Thoughts?
>
> ~~~
>
> Another problem as you pointed out is inconsistent behaviour across
> switchovers. On the first switchover, we get the error on new standby:
> "Exiting from slot synchronization because a slot with the same name
> already exists on the standby."
>
> But in the case of a double switchover, this error does not occur.
> This is due to the 'synced' flag not set on new standby on first
> switchover while set in double switchover. I think the behaviour
> should be the same. In both cases, it should emit the same error. We
> are thinking of a potential solution here and will start a new thread
> if needed.
>
> thanks
> Shveta
>

In response to

Browse pgsql-hackers by date

  From Date Subject
Next Message Tomas Vondra 2025-08-27 14:36:53 Re: index prefetching
Previous Message Peter Eisentraut 2025-08-27 14:17:42 Re: Potential problem in commit f777d773878 and 4f7f7b03758