diff --git a/doc/src/sgml/func.sgml b/doc/src/sgml/func.sgml index ff00d43..c1a54e7 100644 --- a/doc/src/sgml/func.sgml +++ b/doc/src/sgml/func.sgml @@ -17134,7 +17134,7 @@ postgres=# SELECT * FROM pg_xlogfile_name_offset(pg_stop_backup()); pg_create_physical_replication_slot - pg_create_physical_replication_slot(slot_name name) + pg_create_physical_replication_slot(slot_name name, activate boolean ) (slot_name name, xlog_position pg_lsn) @@ -17144,7 +17144,10 @@ postgres=# SELECT * FROM pg_xlogfile_name_offset(pg_stop_backup()); slot_name. Streaming changes from a physical slot is only possible with the streaming-replication protocol - see . Corresponds to the replication protocol - command CREATE_REPLICATION_SLOT ... PHYSICAL. + command CREATE_REPLICATION_SLOT ... PHYSICAL. The optional + second parameter, when true, specifies that the replication + slot be activated right away; slots are otherwise activated on first + connection from a streaming client. diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql index e82a53a..40be6ae 100644 --- a/src/backend/catalog/system_views.sql +++ b/src/backend/catalog/system_views.sql @@ -914,6 +914,13 @@ LANGUAGE INTERNAL VOLATILE ROWS 1000 COST 1000 AS 'pg_logical_slot_peek_binary_changes'; +CREATE OR REPLACE FUNCTION pg_create_physical_replication_slot( + IN slot_name name, IN activate boolean DEFAULT false, + OUT slot_name name, OUT xlog_position pg_lsn) +RETURNS RECORD +LANGUAGE INTERNAL +AS 'pg_create_physical_replication_slot'; + CREATE OR REPLACE FUNCTION make_interval(years int4 DEFAULT 0, months int4 DEFAULT 0, weeks int4 DEFAULT 0, days int4 DEFAULT 0, hours int4 DEFAULT 0, mins int4 DEFAULT 0, diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c index 824bc91..124bb63 100644 --- a/src/backend/replication/logical/logical.c +++ b/src/backend/replication/logical/logical.c @@ -28,9 +28,6 @@ #include "postgres.h" -#include -#include - #include "miscadmin.h" #include "access/xact.h" @@ -253,52 +250,7 @@ CreateInitDecodingContext(char *plugin, StrNCpy(NameStr(slot->data.plugin), plugin, NAMEDATALEN); SpinLockRelease(&slot->mutex); - /* - * The replication slot mechanism is used to prevent removal of required - * WAL. As there is no interlock between this and checkpoints required WAL - * could be removed before ReplicationSlotsComputeRequiredLSN() has been - * called to prevent that. In the very unlikely case that this happens - * we'll just retry. - */ - while (true) - { - XLogSegNo segno; - - /* - * Let's start with enough information if we can, so log a standby - * snapshot and start decoding at exactly that position. - */ - if (!RecoveryInProgress()) - { - XLogRecPtr flushptr; - - /* start at current insert position */ - slot->data.restart_lsn = GetXLogInsertRecPtr(); - - /* make sure we have enough information to start */ - flushptr = LogStandbySnapshot(); - - /* and make sure it's fsynced to disk */ - XLogFlush(flushptr); - } - else - slot->data.restart_lsn = GetRedoRecPtr(); - - /* prevent WAL removal as fast as possible */ - ReplicationSlotsComputeRequiredLSN(); - - /* - * If all required WAL is still there, great, otherwise retry. The - * slot should prevent further removal of WAL, unless there's a - * concurrent ReplicationSlotsComputeRequiredLSN() after we've written - * the new restart_lsn above, so normally we should never need to loop - * more than twice. - */ - XLByteToSeg(slot->data.restart_lsn, segno); - if (XLogGetLastRemovedSegno() < segno) - break; - } - + ReplicationSlotRegisterRestartLSN(); /* ---- * This is a bit tricky: We need to determine a safe xmin horizon to start diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c index 060343f..73522bc 100644 --- a/src/backend/replication/slot.c +++ b/src/backend/replication/slot.c @@ -40,10 +40,10 @@ #include #include "access/transam.h" +#include "access/xlog_internal.h" #include "common/string.h" #include "miscadmin.h" #include "replication/slot.h" -#include "storage/fd.h" #include "storage/proc.h" #include "storage/procarray.h" @@ -661,7 +661,7 @@ ReplicationSlotsComputeRequiredLSN(void) /* * Compute the oldest WAL LSN required by *logical* decoding slots.. * - * Returns InvalidXLogRecPtr if logical decoding is disabled or no logicals + * Returns InvalidXLogRecPtr if logical decoding is disabled or no logical * slots exist. * * NB: this returns a value >= ReplicationSlotsComputeRequiredLSN(), since it @@ -782,6 +782,71 @@ CheckSlotRequirements(void) } /* + * Grab and save an LSN value to prevent WAL recycling past that point. + */ +void +ReplicationSlotRegisterRestartLSN() +{ + ReplicationSlot *slot = MyReplicationSlot; + + Assert(slot != NULL); + Assert(slot->data.restart_lsn == InvalidXLogRecPtr); + + /* + * The replication slot mechanism is used to prevent removal of required + * WAL. As there is no interlock between this and checkpoints required, WAL + * segment could be removed before ReplicationSlotsComputeRequiredLSN() has + * been called to prevent that. In the very unlikely case that this happens + * we'll just retry. + */ + while (true) + { + XLogSegNo segno; + + /* + * Let's start with enough information if we can, so log a standby + * snapshot and start decoding at exactly that position. + */ + if (!RecoveryInProgress()) + { + XLogRecPtr flushptr; + + /* start at current insert position */ + slot->data.restart_lsn = GetXLogInsertRecPtr(); + + /* + * Log an xid snapshot for logical replication. It's not needed for + * physical slots as it is done in BGWriter on a regular basis. + */ + if (!slot->data.persistency == RS_PERSISTENT) + { + /* make sure we have enough information to start */ + flushptr = LogStandbySnapshot(); + + /* and make sure it's fsynced to disk */ + XLogFlush(flushptr); + } + } + else + slot->data.restart_lsn = GetRedoRecPtr(); + + /* prevent WAL removal as fast as possible */ + ReplicationSlotsComputeRequiredLSN(); + + /* + * If all required WAL is still there, great, otherwise retry. The + * slot should prevent further removal of WAL, unless there's a + * concurrent ReplicationSlotsComputeRequiredLSN() after we've written + * the new restart_lsn above, so normally we should never need to loop + * more than twice. + */ + XLByteToSeg(slot->data.restart_lsn, segno); + if (XLogGetLastRemovedSegno() < segno) + break; + } +} + +/* * Flush all replication slots to disk. * * This needn't actually be part of a checkpoint, but it's a convenient @@ -876,7 +941,7 @@ StartupReplicationSlots(void) } /* ---- - * Manipulation of ondisk state of replication slots + * Manipulation of on-disk state of replication slots * * NB: none of the routines below should take any notice whether a slot is the * current one or not, that's all handled a layer above. diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c index 9a2793f..bd526fa 100644 --- a/src/backend/replication/slotfuncs.c +++ b/src/backend/replication/slotfuncs.c @@ -40,6 +40,7 @@ Datum pg_create_physical_replication_slot(PG_FUNCTION_ARGS) { Name name = PG_GETARG_NAME(0); + bool activate = PG_GETARG_BOOL(1); Datum values[2]; bool nulls[2]; TupleDesc tupdesc; @@ -58,10 +59,28 @@ pg_create_physical_replication_slot(PG_FUNCTION_ARGS) /* acquire replication slot, this will check for conflicting names */ ReplicationSlotCreate(NameStr(*name), false, RS_PERSISTENT); - values[0] = NameGetDatum(&MyReplicationSlot->data.name); + if (activate) + { + /* Allocate restart-LSN, if the user asked for it */ + ReplicationSlotRegisterRestartLSN(); + + /* Write this slot to disk */ + ReplicationSlotMarkDirty(); + ReplicationSlotSave(); - nulls[0] = false; - nulls[1] = true; + values[0] = NameGetDatum(&MyReplicationSlot->data.name); + values[1] = LSNGetDatum(MyReplicationSlot->data.restart_lsn); + + nulls[0] = false; + nulls[1] = false; + } + else + { + values[0] = NameGetDatum(&MyReplicationSlot->data.name); + + nulls[0] = false; + nulls[1] = true; + } tuple = heap_form_tuple(tupdesc, values, nulls); result = HeapTupleGetDatum(tuple); diff --git a/src/include/catalog/pg_proc.h b/src/include/catalog/pg_proc.h index fe06ec2..312edfc 100644 --- a/src/include/catalog/pg_proc.h +++ b/src/include/catalog/pg_proc.h @@ -5169,7 +5169,7 @@ DATA(insert OID = 3473 ( spg_range_quad_leaf_consistent PGNSP PGUID 12 1 0 0 0 DESCR("SP-GiST support for quad tree over range"); /* replication slots */ -DATA(insert OID = 3779 ( pg_create_physical_replication_slot PGNSP PGUID 12 1 0 0 0 f f f f f f v 1 0 2249 "19" "{19,19,3220}" "{i,o,o}" "{slot_name,slot_name,xlog_position}" _null_ _null_ pg_create_physical_replication_slot _null_ _null_ _null_ )); +DATA(insert OID = 3779 ( pg_create_physical_replication_slot PGNSP PGUID 12 1 0 0 0 f f f f f f v 2 0 2249 "19 16" "{19,16,19,3220}" "{i,i,o,o}" "{slot_name,activate,slot_name,xlog_position}" _null_ _null_ pg_create_physical_replication_slot _null_ _null_ _null_ )); DESCR("create a physical replication slot"); DATA(insert OID = 3780 ( pg_drop_replication_slot PGNSP PGUID 12 1 0 0 0 f f f f f f v 1 0 2278 "19" _null_ _null_ _null_ _null_ _null_ pg_drop_replication_slot _null_ _null_ _null_ )); DESCR("drop a replication slot"); diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h index 78cff07..2bafe62 100644 --- a/src/include/replication/slot.h +++ b/src/include/replication/slot.h @@ -171,6 +171,8 @@ extern bool ReplicationSlotsCountDBSlots(Oid dboid, int *nslots, int *nactive); extern void StartupReplicationSlots(void); extern void CheckPointReplicationSlots(void); +extern void ReplicationSlotRegisterRestartLSN(void); + extern void CheckSlotRequirements(void); /* SQL callable functions */