From b72ea52c865b2d7f0d7d29d0834d71e1ec33d54a Mon Sep 17 00:00:00 2001 From: Petr Jelinek Date: Thu, 6 Jul 2017 18:16:44 +0200 Subject: [PATCH] Wait for slot to become free in when dropping it --- src/backend/replication/logical/logicalfuncs.c | 2 +- src/backend/replication/slot.c | 43 +++++++++++++++++++++----- src/backend/replication/slotfuncs.c | 2 +- src/backend/replication/walsender.c | 6 ++-- src/include/replication/slot.h | 8 +++-- 5 files changed, 46 insertions(+), 15 deletions(-) diff --git a/src/backend/replication/logical/logicalfuncs.c b/src/backend/replication/logical/logicalfuncs.c index 363ca82..a3ba2b1 100644 --- a/src/backend/replication/logical/logicalfuncs.c +++ b/src/backend/replication/logical/logicalfuncs.c @@ -244,7 +244,7 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin else end_of_wal = GetXLogReplayRecPtr(&ThisTimeLineID); - ReplicationSlotAcquire(NameStr(*name)); + ReplicationSlotAcquire(NameStr(*name), true); PG_TRY(); { diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c index dc7de20..2993bb9 100644 --- a/src/backend/replication/slot.c +++ b/src/backend/replication/slot.c @@ -46,6 +46,7 @@ #include "pgstat.h" #include "replication/slot.h" #include "storage/fd.h" +#include "storage/ipc.h" #include "storage/proc.h" #include "storage/procarray.h" #include "utils/builtins.h" @@ -157,6 +158,7 @@ ReplicationSlotsShmemInit(void) /* everything else is zeroed by the memset above */ SpinLockInit(&slot->mutex); LWLockInitialize(&slot->io_in_progress_lock, LWTRANCHE_REPLICATION_SLOT_IO_IN_PROGRESS); + ConditionVariableInit(&slot->active_cv); } } } @@ -323,7 +325,7 @@ ReplicationSlotCreate(const char *name, bool db_specific, * Find a previously created slot and mark it as used by this backend. */ void -ReplicationSlotAcquire(const char *name) +ReplicationSlotAcquire(const char *name, bool nowait) { ReplicationSlot *slot = NULL; int i; @@ -331,6 +333,8 @@ ReplicationSlotAcquire(const char *name) Assert(MyReplicationSlot == NULL); +retry: + /* Search for the named slot and mark it active if we find it. */ LWLockAcquire(ReplicationSlotControlLock, LW_SHARED); for (i = 0; i < max_replication_slots; i++) @@ -342,7 +346,10 @@ ReplicationSlotAcquire(const char *name) SpinLockAcquire(&s->mutex); active_pid = s->active_pid; if (active_pid == 0) + { active_pid = s->active_pid = MyProcPid; + ConditionVariableBroadcast(&s->active_cv); + } SpinLockRelease(&s->mutex); slot = s; break; @@ -350,16 +357,33 @@ ReplicationSlotAcquire(const char *name) } LWLockRelease(ReplicationSlotControlLock); - /* If we did not find the slot or it was already active, error out. */ + /* If we did not find the slot, error out. */ if (slot == NULL) ereport(ERROR, (errcode(ERRCODE_UNDEFINED_OBJECT), errmsg("replication slot \"%s\" does not exist", name))); + + /* + * If we did find the slot but it's already acquired by another backend, + * we either error out or retry after short wait, depending on what was + * the behavior requested by caller. + */ if (active_pid != MyProcPid) - ereport(ERROR, - (errcode(ERRCODE_OBJECT_IN_USE), - errmsg("replication slot \"%s\" is active for PID %d", - name, active_pid))); + { + if (nowait) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_IN_USE), + errmsg("replication slot \"%s\" is active for PID %d", + name, active_pid))); + + /* Wait for condition variable signal from ReplicationSlotRelease. */ + ConditionVariableSleep(&slot->active_cv, PG_WAIT_LOCK); + ConditionVariableCancelSleep(); + + goto retry; + } + + /* We made this slot active, so it's ours now. */ MyReplicationSlot = slot; @@ -393,6 +417,7 @@ ReplicationSlotRelease(void) */ SpinLockAcquire(&slot->mutex); slot->active_pid = 0; + ConditionVariableBroadcast(&slot->active_cv); SpinLockRelease(&slot->mutex); } @@ -451,11 +476,11 @@ ReplicationSlotCleanup(void) * Permanently drop replication slot identified by the passed in name. */ void -ReplicationSlotDrop(const char *name) +ReplicationSlotDrop(const char *name, bool nowait) { Assert(MyReplicationSlot == NULL); - ReplicationSlotAcquire(name); + ReplicationSlotAcquire(name, nowait); ReplicationSlotDropAcquired(); } @@ -525,6 +550,7 @@ ReplicationSlotDropPtr(ReplicationSlot *slot) SpinLockAcquire(&slot->mutex); slot->active_pid = 0; + ConditionVariableBroadcast(&slot->active_cv); SpinLockRelease(&slot->mutex); ereport(fail_softly ? WARNING : ERROR, @@ -543,6 +569,7 @@ ReplicationSlotDropPtr(ReplicationSlot *slot) LWLockAcquire(ReplicationSlotControlLock, LW_EXCLUSIVE); slot->active_pid = 0; slot->in_use = false; + ConditionVariableBroadcast(&slot->active_cv); LWLockRelease(ReplicationSlotControlLock); /* diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c index 6dc8088..a5ecc85 100644 --- a/src/backend/replication/slotfuncs.c +++ b/src/backend/replication/slotfuncs.c @@ -171,7 +171,7 @@ pg_drop_replication_slot(PG_FUNCTION_ARGS) CheckSlotRequirements(); - ReplicationSlotDrop(NameStr(*name)); + ReplicationSlotDrop(NameStr(*name), false); PG_RETURN_VOID(); } diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index 002143b..9a2babe 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -541,7 +541,7 @@ StartReplication(StartReplicationCmd *cmd) if (cmd->slotname) { - ReplicationSlotAcquire(cmd->slotname); + ReplicationSlotAcquire(cmd->slotname, true); if (SlotIsLogical(MyReplicationSlot)) ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), @@ -1028,7 +1028,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd) static void DropReplicationSlot(DropReplicationSlotCmd *cmd) { - ReplicationSlotDrop(cmd->slotname); + ReplicationSlotDrop(cmd->slotname, false); EndCommand("DROP_REPLICATION_SLOT", DestRemote); } @@ -1046,7 +1046,7 @@ StartLogicalReplication(StartReplicationCmd *cmd) Assert(!MyReplicationSlot); - ReplicationSlotAcquire(cmd->slotname); + ReplicationSlotAcquire(cmd->slotname, true); /* * Force a disconnect, so that the decoding code doesn't need to care diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h index a283f4e..f97679e 100644 --- a/src/include/replication/slot.h +++ b/src/include/replication/slot.h @@ -12,6 +12,7 @@ #include "fmgr.h" #include "access/xlog.h" #include "access/xlogreader.h" +#include "storage/condition_variable.h" #include "storage/lwlock.h" #include "storage/shmem.h" #include "storage/spin.h" @@ -93,6 +94,9 @@ typedef struct ReplicationSlot /* Who is streaming out changes for this slot? 0 in unused slots. */ pid_t active_pid; + /* Conditional variable which is signalled when the above changes. */ + ConditionVariable active_cv; + /* any outstanding modifications? */ bool just_dirtied; bool dirty; @@ -162,9 +166,9 @@ extern void ReplicationSlotsShmemInit(void); extern void ReplicationSlotCreate(const char *name, bool db_specific, ReplicationSlotPersistency p); extern void ReplicationSlotPersist(void); -extern void ReplicationSlotDrop(const char *name); +extern void ReplicationSlotDrop(const char *name, bool nowait); -extern void ReplicationSlotAcquire(const char *name); +extern void ReplicationSlotAcquire(const char *name, bool nowait); extern void ReplicationSlotRelease(void); extern void ReplicationSlotCleanup(void); extern void ReplicationSlotSave(void); -- 2.7.4