Re: Issue with logical replication slot during switchover - Mailing list pgsql-hackers
From | Fabrice Chapuis |
---|---|
Subject | Re: Issue with logical replication slot during switchover |
Date | |
Msg-id | CAA5-nLAvyZiAZt65qB-Vr3tuC-syo6VaHMbRYmoiX49-+BBLcw@mail.gmail.com Whole thread Raw |
In response to | Re: Issue with logical replication slot during switchover (shveta malik <shveta.malik@gmail.com>) |
Responses |
Re: Issue with logical replication slot during switchover
|
List | pgsql-hackers |
For the first step (a), the pg_create_logical_replication_slot interface is extended.
The slot on the newly attached standby will be dropped and recreated if the allow_overwrite flag is set to true.
I have tested the modified source; could you please give me feedback on the code changes?
Regards,
Fabrice
diff --git a/src/backend/catalog/system_functions.sql b/src/backend/catalog/system_functions.sql
index 566f308..6cd3175 100644
--- a/src/backend/catalog/system_functions.sql
+++ b/src/backend/catalog/system_functions.sql
@@ -480,6 +480,7 @@ CREATE OR REPLACE FUNCTION pg_create_logical_replication_slot(
IN temporary boolean DEFAULT false,
IN twophase boolean DEFAULT false,
IN failover boolean DEFAULT false,
+ IN allow_overwrite boolean DEFAULT false,
OUT slot_name name, OUT lsn pg_lsn)
RETURNS RECORD
LANGUAGE INTERNAL
diff --git a/src/backend/replication/logical/slotsync.c b/src/backend/replication/logical/slotsync.c
index 656e66e..d6332cd 100644
--- a/src/backend/replication/logical/slotsync.c
+++ b/src/backend/replication/logical/slotsync.c
@@ -627,6 +627,7 @@ synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid)
ReplicationSlot *slot;
XLogRecPtr latestFlushPtr;
bool slot_updated = false;
+ bool allow_overwrite = false;
/*
* Make sure that concerned WAL is received and flushed before syncing
@@ -649,24 +650,46 @@ synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid)
return false;
}
-
- /* Search for the named slot */
+ // Both local and remote slot have the same name
if ((slot = SearchNamedReplicationSlot(remote_slot->name, true)))
{
- bool synced;
+ bool synced;
SpinLockAcquire(&slot->mutex);
synced = slot->data.synced;
+ allow_overwrite = slot->data.allow_overwrite;
SpinLockRelease(&slot->mutex);
-
- /* User-created slot with the same name exists, raise ERROR. */
- if (!synced)
- ereport(ERROR,
+
+ if (!synced){
+ /*
+ * Check if we need to overwrite an existing
+ * logical slot
+ */
+ if (allow_overwrite){
+ /*
+ * Get rid of a replication slot that is no
+ *longer wanted
+ */
+ ReplicationSlotDrop(remote_slot->name,true);
+
+ /* Get rid of a replication slot that is no longer wanted */
+ ereport(WARNING,
+ errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("slot \"%s\" already exists"
+ " on the standby but it will be dropped because flag allow_overwrite is set to true",
+ remote_slot->name));
+
+ /* Going back to the main loop after droping the failover slot */
+ return false;
+ }
+ else
+ /* User-created slot with the same name exists, raise ERROR. */
+ ereport(ERROR,
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("exiting from slot synchronization because same"
- " name slot \"%s\" already exists on the standby",
- remote_slot->name));
-
+ " name slot \"%s\" already exists on the standby",
+ remote_slot->name));
+ }
/*
* The slot has been synchronized before.
*
@@ -761,6 +784,7 @@ synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid)
ReplicationSlotCreate(remote_slot->name, true, RS_TEMPORARY,
remote_slot->two_phase,
remote_slot->failover,
+ allow_overwrite,
true);
/* For shorter lines. */
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index 600b87f..d6bc5c6 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -323,7 +323,7 @@ ReplicationSlotValidateName(const char *name, int elevel)
void
ReplicationSlotCreate(const char *name, bool db_specific,
ReplicationSlotPersistency persistency,
- bool two_phase, bool failover, bool synced)
+ bool two_phase, bool failover, bool allow_overwrite, bool synced)
{
ReplicationSlot *slot = NULL;
int i;
@@ -413,6 +413,11 @@ ReplicationSlotCreate(const char *name, bool db_specific,
slot->data.two_phase_at = InvalidXLogRecPtr;
slot->data.failover = failover;
slot->data.synced = synced;
+ slot->data.allow_overwrite = allow_overwrite;
+
+ elog(LOG, "Logical replication slot %s created with option allow_overwrite to %s",
+ NameStr(slot->data.name),
+ slot->data.allow_overwrite ? "true" : "false");
/* and then data only present in shared memory */
slot->just_dirtied = false;
diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c
index 36cc2ed..6bd430f 100644
--- a/src/backend/replication/slotfuncs.c
+++ b/src/backend/replication/slotfuncs.c
@@ -40,7 +40,7 @@ create_physical_replication_slot(char *name, bool immediately_reserve,
/* acquire replication slot, this will check for conflicting names */
ReplicationSlotCreate(name, false,
- temporary ? RS_TEMPORARY : RS_PERSISTENT, false,
+ temporary ? RS_TEMPORARY : RS_PERSISTENT, false, false,
false, false);
if (immediately_reserve)
@@ -116,7 +116,7 @@ pg_create_physical_replication_slot(PG_FUNCTION_ARGS)
static void
create_logical_replication_slot(char *name, char *plugin,
bool temporary, bool two_phase,
- bool failover,
+ bool failover, bool allow_overwrite,
XLogRecPtr restart_lsn,
bool find_startpoint)
{
@@ -134,7 +134,7 @@ create_logical_replication_slot(char *name, char *plugin,
*/
ReplicationSlotCreate(name, true,
temporary ? RS_TEMPORARY : RS_EPHEMERAL, two_phase,
- failover, false);
+ failover, allow_overwrite, false);
/*
* Create logical decoding context to find start point or, if we don't
@@ -173,6 +173,7 @@ pg_create_logical_replication_slot(PG_FUNCTION_ARGS)
bool temporary = PG_GETARG_BOOL(2);
bool two_phase = PG_GETARG_BOOL(3);
bool failover = PG_GETARG_BOOL(4);
+ bool allow_overwrite = PG_GETARG_BOOL(5);
Datum result;
TupleDesc tupdesc;
HeapTuple tuple;
@@ -191,6 +192,7 @@ pg_create_logical_replication_slot(PG_FUNCTION_ARGS)
temporary,
two_phase,
failover,
+ allow_overwrite,
InvalidXLogRecPtr,
true);
@@ -210,6 +212,47 @@ pg_create_logical_replication_slot(PG_FUNCTION_ARGS)
PG_RETURN_DATUM(result);
}
+/*
+ * alter_logical_replication_slot
+ *		Update the properties of an existing logical replication slot.
+ *
+ * Only allow_overwrite is applied for now.  two_phase and failover are
+ * accepted but NOT applied to the slot.
+ * NOTE(review): silently ignoring caller-supplied values is surprising;
+ * either wire them up (e.g. via ReplicationSlotAlter()) or drop them from
+ * the SQL signature before commit.
+ *
+ * The new value is written under the slot's spinlock, because the slot sync
+ * code (synchronize_one_slot() in slotsync.c) reads data.allow_overwrite
+ * while holding that lock.  The slot is then marked dirty and saved right
+ * away, mirroring ReplicationSlotAlter(), instead of relying on a later
+ * flush of dirty slots.
+ */
+static void
+alter_logical_replication_slot(char *name, bool two_phase,
+							   bool failover,
+							   bool allow_overwrite)
+{
+	ReplicationSlot *slot;
+
+	Assert(!MyReplicationSlot);
+
+	/*
+	 * NOTE(review): the two "true" flags assume the (nowait,
+	 * error_if_invalid) signature of ReplicationSlotAcquire(); confirm
+	 * against the target branch.
+	 */
+	ReplicationSlotAcquire(name, true, true);
+	slot = MyReplicationSlot;
+
+	/* This API only makes sense for logical slots. */
+	if (SlotIsPhysical(slot))
+		ereport(ERROR,
+				errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+				errmsg("cannot use %s with a physical replication slot",
+					   "pg_alter_logical_replication_slot()"));
+
+	SpinLockAcquire(&slot->mutex);
+	slot->data.allow_overwrite = allow_overwrite;
+	SpinLockRelease(&slot->mutex);
+
+	/* Persist the change now rather than waiting for a later flush. */
+	ReplicationSlotMarkDirty();
+	ReplicationSlotSave();
+
+	ReplicationSlotRelease();
+}
+
+/*
+ * pg_alter_logical_replication_slot
+ *		SQL-callable entry point for changing properties of a logical slot.
+ *
+ * Positional arguments: slot name, two_phase, failover, allow_overwrite.
+ * Slot permissions and logical-decoding prerequisites are checked before
+ * the slot is touched.  Returns the slot name that was passed in.
+ *
+ * NOTE(review): the posted patch does not appear to register this function
+ * in pg_proc.dat / system_functions.sql, so it is not yet callable from SQL.
+ */
+Datum
+pg_alter_logical_replication_slot(PG_FUNCTION_ARGS)
+{
+	Name		slotname = PG_GETARG_NAME(0);
+
+	CheckSlotPermissions();
+	CheckLogicalDecodingRequirements();
+
+	alter_logical_replication_slot(NameStr(*slotname),
+								   PG_GETARG_BOOL(1),	/* two_phase */
+								   PG_GETARG_BOOL(2),	/* failover */
+								   PG_GETARG_BOOL(3));	/* allow_overwrite */
+
+	PG_RETURN_NAME(slotname);
+}
+
/*
* SQL function for dropping a replication slot.
@@ -726,6 +769,7 @@ copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot)
temporary,
false,
false,
+ false,
src_restart_lsn,
false);
}
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 9fa8beb..ef22695 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -1198,7 +1198,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
{
ReplicationSlotCreate(cmd->slotname, false,
cmd->temporary ? RS_TEMPORARY : RS_PERSISTENT,
- false, false, false);
+ false, false, false, false);
if (reserve_wal)
{
@@ -1229,7 +1229,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
*/
ReplicationSlotCreate(cmd->slotname, true,
cmd->temporary ? RS_TEMPORARY : RS_EPHEMERAL,
- two_phase, failover, false);
+ two_phase, failover, false, false);
/*
* Do options check early so that we can bail before calling the
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index 62beb71..074805d 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -11480,10 +11480,10 @@
{ oid => '3786', descr => 'set up a logical replication slot',
proname => 'pg_create_logical_replication_slot', provolatile => 'v',
proparallel => 'u', prorettype => 'record',
- proargtypes => 'name name bool bool bool',
- proallargtypes => '{name,name,bool,bool,bool,name,pg_lsn}',
- proargmodes => '{i,i,i,i,i,o,o}',
- proargnames => '{slot_name,plugin,temporary,twophase,failover,slot_name,lsn}',
+ proargtypes => 'name name bool bool bool bool',
+ proallargtypes => '{name,name,bool,bool,bool,bool,name,pg_lsn}',
+ proargmodes => '{i,i,i,i,i,i,o,o}',
+ proargnames => '{slot_name,plugin,temporary,twophase,failover,allow_overwrite,slot_name,lsn}',
prosrc => 'pg_create_logical_replication_slot' },
{ oid => '4222',
descr => 'copy a logical replication slot, changing temporality and plugin',
diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h
index eb0b93b..1fd6445 100644
--- a/src/include/replication/slot.h
+++ b/src/include/replication/slot.h
@@ -134,6 +134,11 @@ typedef struct ReplicationSlotPersistentData
* for logical slots on the primary server.
*/
bool failover;
+ /*
+ * Allow Postgres to drop logical replication slot on standby server to ensure
+ * creation of new failover slot when sync_replication_slots is true.
+ */
+ bool allow_overwrite;
} ReplicationSlotPersistentData;
/*
@@ -267,7 +272,7 @@ extern void ReplicationSlotsShmemInit(void);
/* management of individual slots */
extern void ReplicationSlotCreate(const char *name, bool db_specific,
ReplicationSlotPersistency persistency,
- bool two_phase, bool failover,
+ bool two_phase, bool failover, bool allow_overwrite,
bool synced);
extern void ReplicationSlotPersist(void);
extern void ReplicationSlotDrop(const char *name, bool nowait);
On Wed, Aug 13, 2025 at 8:04 AM shveta malik <shveta.malik@gmail.com> wrote:
On Fri, Aug 8, 2025 at 7:01 PM Fabrice Chapuis <fabrice636861@gmail.com> wrote:
>
> Thanks, Shveta, for coming back to this point and fixing the link.
> The idea is to check whether the slot has the same name, in order to try to resynchronize it with the primary.
> OK, the check on the failover status of the remote slot is perhaps redundant.
> I'm not sure what impact setting the synced flag to true might have. But if you run an additional switchover, it works fine because the synced flag on the new primary is set to true now.
> If we come back to the idea of the GUC or the API, adding an allow_overwrite parameter to the pg_create_logical_replication_slot function and removing the logical slot when set to true could be a suitable approach.
>
> What is your opinion?
>
If implemented as a GUC, it would address only a specific corner case,
making it less suitable to be added as a GUC.
OTOH, adding it as a slot's property makes more sense. You can start
with introducing a new slot property, allow_overwrite. By default,
this property will be set to false.
a) The function pg_create_logical_replication_slot() can be extended
to accept this parameter.
b) A new API pg_alter_logical_replication_slot() can be introduced, to
modify this property after slot creation if needed.
c) The CREATE SUBSCRIPTION and ALTER SUBSCRIPTION commands do not
need to include an allow_overwrite parameter. When CREATE
SUBSCRIPTION creates a slot, it will always set allow_overwrite to
false by default. If users need to change this later, they can use the
new API pg_alter_logical_replication_slot() to update the property.
d) Additionally, pg_alter_logical_replication_slot() can serve as a
generic API to modify other slot properties as well.
This appears to be a reasonable idea with potential use cases beyond
just allowing synchronization post switchover. Thoughts?
~~~
Another problem, as you pointed out, is the inconsistent behaviour across
switchovers. On the first switchover, we get this error on the new standby:
"Exiting from slot synchronization because a slot with the same name
already exists on the standby."
But in the case of a double switchover, this error does not occur.
This is because the 'synced' flag is not set on the new standby after the
first switchover, whereas it is set after a double switchover. I think the behaviour
should be the same. In both cases, it should emit the same error. We
are thinking of a potential solution here and will start a new thread
if needed.
thanks
Shveta
pgsql-hackers by date: