diff --git a/doc/src/sgml/func.sgml b/doc/src/sgml/func.sgml
index ff00d43..c1a54e7 100644
--- a/doc/src/sgml/func.sgml
+++ b/doc/src/sgml/func.sgml
@@ -17134,7 +17134,7 @@ postgres=# SELECT * FROM pg_xlogfile_name_offset(pg_stop_backup());
pg_create_physical_replication_slot
- pg_create_physical_replication_slot(slot_name name)
+ pg_create_physical_replication_slot(slot_name name, activate> boolean> )
(slot_name name, xlog_position pg_lsn)
@@ -17144,7 +17144,10 @@ postgres=# SELECT * FROM pg_xlogfile_name_offset(pg_stop_backup());
slot_name. Streaming changes from a physical slot
is only possible with the streaming-replication protocol - see . Corresponds to the replication protocol
- command CREATE_REPLICATION_SLOT ... PHYSICAL.
+ command CREATE_REPLICATION_SLOT ... PHYSICAL. The optional
+ second parameter, when true>, specifies that the replication
+ slot be activated right away; slots are otherwise activated on first
+ connection from a streaming client.
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index e82a53a..40be6ae 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -914,6 +914,13 @@ LANGUAGE INTERNAL
VOLATILE ROWS 1000 COST 1000
AS 'pg_logical_slot_peek_binary_changes';
+CREATE OR REPLACE FUNCTION pg_create_physical_replication_slot(
+ IN slot_name name, IN activate boolean DEFAULT false,
+ OUT slot_name name, OUT xlog_position pg_lsn)
+RETURNS RECORD
+LANGUAGE INTERNAL
+AS 'pg_create_physical_replication_slot';
+
CREATE OR REPLACE FUNCTION
make_interval(years int4 DEFAULT 0, months int4 DEFAULT 0, weeks int4 DEFAULT 0,
days int4 DEFAULT 0, hours int4 DEFAULT 0, mins int4 DEFAULT 0,
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 824bc91..124bb63 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -28,9 +28,6 @@
#include "postgres.h"
-#include
-#include
-
#include "miscadmin.h"
#include "access/xact.h"
@@ -253,52 +250,7 @@ CreateInitDecodingContext(char *plugin,
StrNCpy(NameStr(slot->data.plugin), plugin, NAMEDATALEN);
SpinLockRelease(&slot->mutex);
- /*
- * The replication slot mechanism is used to prevent removal of required
- * WAL. As there is no interlock between this and checkpoints required WAL
- * could be removed before ReplicationSlotsComputeRequiredLSN() has been
- * called to prevent that. In the very unlikely case that this happens
- * we'll just retry.
- */
- while (true)
- {
- XLogSegNo segno;
-
- /*
- * Let's start with enough information if we can, so log a standby
- * snapshot and start decoding at exactly that position.
- */
- if (!RecoveryInProgress())
- {
- XLogRecPtr flushptr;
-
- /* start at current insert position */
- slot->data.restart_lsn = GetXLogInsertRecPtr();
-
- /* make sure we have enough information to start */
- flushptr = LogStandbySnapshot();
-
- /* and make sure it's fsynced to disk */
- XLogFlush(flushptr);
- }
- else
- slot->data.restart_lsn = GetRedoRecPtr();
-
- /* prevent WAL removal as fast as possible */
- ReplicationSlotsComputeRequiredLSN();
-
- /*
- * If all required WAL is still there, great, otherwise retry. The
- * slot should prevent further removal of WAL, unless there's a
- * concurrent ReplicationSlotsComputeRequiredLSN() after we've written
- * the new restart_lsn above, so normally we should never need to loop
- * more than twice.
- */
- XLByteToSeg(slot->data.restart_lsn, segno);
- if (XLogGetLastRemovedSegno() < segno)
- break;
- }
-
+ ReplicationSlotRegisterRestartLSN();
/* ----
* This is a bit tricky: We need to determine a safe xmin horizon to start
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index 060343f..73522bc 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -40,10 +40,10 @@
#include
#include "access/transam.h"
+#include "access/xlog_internal.h"
#include "common/string.h"
#include "miscadmin.h"
#include "replication/slot.h"
-#include "storage/fd.h"
#include "storage/proc.h"
#include "storage/procarray.h"
@@ -661,7 +661,7 @@ ReplicationSlotsComputeRequiredLSN(void)
/*
* Compute the oldest WAL LSN required by *logical* decoding slots..
*
- * Returns InvalidXLogRecPtr if logical decoding is disabled or no logicals
+ * Returns InvalidXLogRecPtr if logical decoding is disabled or no logical
* slots exist.
*
* NB: this returns a value >= ReplicationSlotsComputeRequiredLSN(), since it
@@ -782,6 +782,71 @@ CheckSlotRequirements(void)
}
/*
+ * Grab and save an LSN value to prevent WAL recycling past that point.
+ */
+void
+ReplicationSlotRegisterRestartLSN()
+{
+ ReplicationSlot *slot = MyReplicationSlot;
+
+ Assert(slot != NULL);
+ Assert(slot->data.restart_lsn == InvalidXLogRecPtr);
+
+ /*
+ * The replication slot mechanism is used to prevent removal of required
+ * WAL. As there is no interlock between this and checkpoints required, WAL
+ * segment could be removed before ReplicationSlotsComputeRequiredLSN() has
+ * been called to prevent that. In the very unlikely case that this happens
+ * we'll just retry.
+ */
+ while (true)
+ {
+ XLogSegNo segno;
+
+ /*
+ * Let's start with enough information if we can, so log a standby
+ * snapshot and start decoding at exactly that position.
+ */
+ if (!RecoveryInProgress())
+ {
+ XLogRecPtr flushptr;
+
+ /* start at current insert position */
+ slot->data.restart_lsn = GetXLogInsertRecPtr();
+
+ /*
+ * Log an xid snapshot for logical replication. It's not needed for
+ * physical slots as it is done in BGWriter on a regular basis.
+ */
+ if (!slot->data.persistency == RS_PERSISTENT)
+ {
+ /* make sure we have enough information to start */
+ flushptr = LogStandbySnapshot();
+
+ /* and make sure it's fsynced to disk */
+ XLogFlush(flushptr);
+ }
+ }
+ else
+ slot->data.restart_lsn = GetRedoRecPtr();
+
+ /* prevent WAL removal as fast as possible */
+ ReplicationSlotsComputeRequiredLSN();
+
+ /*
+ * If all required WAL is still there, great, otherwise retry. The
+ * slot should prevent further removal of WAL, unless there's a
+ * concurrent ReplicationSlotsComputeRequiredLSN() after we've written
+ * the new restart_lsn above, so normally we should never need to loop
+ * more than twice.
+ */
+ XLByteToSeg(slot->data.restart_lsn, segno);
+ if (XLogGetLastRemovedSegno() < segno)
+ break;
+ }
+}
+
+/*
* Flush all replication slots to disk.
*
* This needn't actually be part of a checkpoint, but it's a convenient
@@ -876,7 +941,7 @@ StartupReplicationSlots(void)
}
/* ----
- * Manipulation of ondisk state of replication slots
+ * Manipulation of on-disk state of replication slots
*
* NB: none of the routines below should take any notice whether a slot is the
* current one or not, that's all handled a layer above.
diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c
index 9a2793f..bd526fa 100644
--- a/src/backend/replication/slotfuncs.c
+++ b/src/backend/replication/slotfuncs.c
@@ -40,6 +40,7 @@ Datum
pg_create_physical_replication_slot(PG_FUNCTION_ARGS)
{
Name name = PG_GETARG_NAME(0);
+ bool activate = PG_GETARG_BOOL(1);
Datum values[2];
bool nulls[2];
TupleDesc tupdesc;
@@ -58,10 +59,28 @@ pg_create_physical_replication_slot(PG_FUNCTION_ARGS)
/* acquire replication slot, this will check for conflicting names */
ReplicationSlotCreate(NameStr(*name), false, RS_PERSISTENT);
- values[0] = NameGetDatum(&MyReplicationSlot->data.name);
+ if (activate)
+ {
+ /* Allocate restart-LSN, if the user asked for it */
+ ReplicationSlotRegisterRestartLSN();
+
+ /* Write this slot to disk */
+ ReplicationSlotMarkDirty();
+ ReplicationSlotSave();
- nulls[0] = false;
- nulls[1] = true;
+ values[0] = NameGetDatum(&MyReplicationSlot->data.name);
+ values[1] = LSNGetDatum(MyReplicationSlot->data.restart_lsn);
+
+ nulls[0] = false;
+ nulls[1] = false;
+ }
+ else
+ {
+ values[0] = NameGetDatum(&MyReplicationSlot->data.name);
+
+ nulls[0] = false;
+ nulls[1] = true;
+ }
tuple = heap_form_tuple(tupdesc, values, nulls);
result = HeapTupleGetDatum(tuple);
diff --git a/src/include/catalog/pg_proc.h b/src/include/catalog/pg_proc.h
index fe06ec2..312edfc 100644
--- a/src/include/catalog/pg_proc.h
+++ b/src/include/catalog/pg_proc.h
@@ -5169,7 +5169,7 @@ DATA(insert OID = 3473 ( spg_range_quad_leaf_consistent PGNSP PGUID 12 1 0 0 0
DESCR("SP-GiST support for quad tree over range");
/* replication slots */
-DATA(insert OID = 3779 ( pg_create_physical_replication_slot PGNSP PGUID 12 1 0 0 0 f f f f f f v 1 0 2249 "19" "{19,19,3220}" "{i,o,o}" "{slot_name,slot_name,xlog_position}" _null_ _null_ pg_create_physical_replication_slot _null_ _null_ _null_ ));
+DATA(insert OID = 3779 ( pg_create_physical_replication_slot PGNSP PGUID 12 1 0 0 0 f f f f f f v 2 0 2249 "19 16" "{19,16,19,3220}" "{i,i,o,o}" "{slot_name,activate,slot_name,xlog_position}" _null_ _null_ pg_create_physical_replication_slot _null_ _null_ _null_ ));
DESCR("create a physical replication slot");
DATA(insert OID = 3780 ( pg_drop_replication_slot PGNSP PGUID 12 1 0 0 0 f f f f f f v 1 0 2278 "19" _null_ _null_ _null_ _null_ _null_ pg_drop_replication_slot _null_ _null_ _null_ ));
DESCR("drop a replication slot");
diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h
index 78cff07..2bafe62 100644
--- a/src/include/replication/slot.h
+++ b/src/include/replication/slot.h
@@ -171,6 +171,8 @@ extern bool ReplicationSlotsCountDBSlots(Oid dboid, int *nslots, int *nactive);
extern void StartupReplicationSlots(void);
extern void CheckPointReplicationSlots(void);
+extern void ReplicationSlotRegisterRestartLSN(void);
+
extern void CheckSlotRequirements(void);
/* SQL callable functions */