From 17d503ec1970e23806004f6d39dc0318113f467a Mon Sep 17 00:00:00 2001 From: Masahiko Sawada Date: Mon, 9 Jul 2018 16:37:17 +0900 Subject: [PATCH v4] Copy function for logical and physical replication slot. --- contrib/test_decoding/expected/slot.out | 228 ++++++++++++++++++ contrib/test_decoding/logical.conf | 2 +- contrib/test_decoding/sql/slot.sql | 80 +++++++ doc/src/sgml/func.sgml | 41 ++++ src/backend/replication/logical/logical.c | 5 +- src/backend/replication/slot.c | 89 ++++--- src/backend/replication/slotfuncs.c | 380 ++++++++++++++++++++++++++---- src/backend/replication/walsender.c | 3 +- src/include/catalog/pg_proc.dat | 35 +++ src/include/replication/logical.h | 1 + src/include/replication/slot.h | 2 +- 11 files changed, 780 insertions(+), 86 deletions(-) diff --git a/contrib/test_decoding/expected/slot.out b/contrib/test_decoding/expected/slot.out index 2737a8a..b87ef37 100644 --- a/contrib/test_decoding/expected/slot.out +++ b/contrib/test_decoding/expected/slot.out @@ -148,3 +148,231 @@ SELECT pg_drop_replication_slot('regression_slot3'); (1 row) +-- +-- Test copy function for logical replication slots +-- +-- Create original logical slot +SELECT 'init' FROM pg_create_logical_replication_slot('orig_slot1', 'test_decoding', false); + ?column? +---------- + init +(1 row) + +SELECT 'init' FROM pg_create_logical_replication_slot('orig_slot2', 'test_decoding', true); + ?column? +---------- + init +(1 row) + +-- Preserve all values of the original slot +SELECT 'copy' FROM pg_copy_logical_replication_slot('orig_slot1', 'copied_slot1_no_change'); + ?column? +---------- + copy +(1 row) + +SELECT 'copy' FROM pg_copy_logical_replication_slot('orig_slot2', 'copied_slot2_no_change'); + ?column? +---------- + copy +(1 row) + +-- Change output plugin, preserve the persisitence +SELECT 'copy' FROM pg_copy_logical_replication_slot('orig_slot1', 'copied_slot1_change_plugin', 'pgoutput'); + ?column? +---------- + copy +(1 row) + +SELECT 'copy' FROM pg_copy_logical_replication_slot('orig_slot2', 'copied_slot2_change_plugin', 'pgoutput'); + ?column? +---------- + copy +(1 row) + +-- Change both output plugin and persistence +SELECT 'copy' FROM pg_copy_logical_replication_slot('orig_slot1', 'copied_slot1_change_both', 'pgoutput', true); + ?column? +---------- + copy +(1 row) + +SELECT 'copy' FROM pg_copy_logical_replication_slot('orig_slot2', 'copied_slot2_change_both', 'pgoutput', false); + ?column? +---------- + copy +(1 row) + +-- Check all copied slots status +SELECT + o.slot_name, o.plugin, o.temporary, c.slot_name, c.plugin, c.temporary +FROM + (SELECT * FROM pg_replication_slots WHERE slot_name LIKE 'orig%') as o + LEFT JOIN pg_replication_slots as c ON o.restart_lsn = c.restart_lsn AND o.confirmed_flush_lsn = c.confirmed_flush_lsn +WHERE + o.slot_name != c.slot_name +ORDER BY o.slot_name, c.slot_name; + slot_name | plugin | temporary | slot_name | plugin | temporary +------------+---------------+-----------+----------------------------+---------------+----------- + orig_slot1 | test_decoding | f | copied_slot1_change_both | pgoutput | t + orig_slot1 | test_decoding | f | copied_slot1_change_plugin | pgoutput | f + orig_slot1 | test_decoding | f | copied_slot1_no_change | test_decoding | f + orig_slot2 | test_decoding | t | copied_slot2_change_both | pgoutput | f + orig_slot2 | test_decoding | t | copied_slot2_change_plugin | pgoutput | t + orig_slot2 | test_decoding | t | copied_slot2_no_change | test_decoding | t +(6 rows) + +-- Now we have maximum 8 replication slots. Check slots are properly +-- released even when raise error during creating the target slot. +SELECT 'copy' FROM pg_copy_logical_replication_slot('orig_slot1', 'failed'); -- error +ERROR: all replication slots are in use +HINT: Free one or increase max_replication_slots. +-- temporary slots were dropped automatically +SELECT pg_drop_replication_slot('orig_slot1'); + pg_drop_replication_slot +-------------------------- + +(1 row) + +SELECT pg_drop_replication_slot('copied_slot1_no_change'); + pg_drop_replication_slot +-------------------------- + +(1 row) + +SELECT pg_drop_replication_slot('copied_slot1_change_plugin'); + pg_drop_replication_slot +-------------------------- + +(1 row) + +SELECT pg_drop_replication_slot('copied_slot2_change_both'); + pg_drop_replication_slot +-------------------------- + +(1 row) + +-- +-- Test copy function for physical replication slots +-- +-- Create original physical slot +SELECT 'init' FROM pg_create_physical_replication_slot('orig_slot1', false, false); + ?column? +---------- + init +(1 row) + +SELECT 'init' FROM pg_create_physical_replication_slot('orig_slot2', true, false); + ?column? +---------- + init +(1 row) + +SELECT 'init' FROM pg_create_physical_replication_slot('orig_slot3', true, true); + ?column? +---------- + init +(1 row) + +-- Preserve all values of the original slot +SELECT 'copy' FROM pg_copy_physical_replication_slot('orig_slot1', 'copied_slot1_no_change'); + ?column? +---------- + copy +(1 row) + +SELECT 'copy' FROM pg_copy_physical_replication_slot('orig_slot2', 'copied_slot2_no_change'); + ?column? +---------- + copy +(1 row) + +SELECT 'copy' FROM pg_copy_physical_replication_slot('orig_slot3', 'copied_slot3_no_change'); + ?column? +---------- + copy +(1 row) + +-- Change persistence +SELECT 'copy' FROM pg_copy_physical_replication_slot('orig_slot1', 'copied_slot1_temp', true); + ?column? +---------- + copy +(1 row) + +SELECT 'copy' FROM pg_copy_physical_replication_slot('orig_slot3', 'copied_slot3_temp', false); + ?column? +---------- + copy +(1 row) + +-- Check all copied slots status +SELECT slot_name, slot_type, temporary FROM pg_replication_slots; + slot_name | slot_type | temporary +------------------------+-----------+----------- + orig_slot1 | physical | f + orig_slot2 | physical | f + orig_slot3 | physical | t + copied_slot1_no_change | physical | f + copied_slot2_no_change | physical | f + copied_slot3_no_change | physical | t + copied_slot1_temp | physical | t + copied_slot3_temp | physical | f +(8 rows) + +SELECT + o.slot_name, c.slot_name +FROM + (SELECT * FROM pg_replication_slots WHERE slot_name LIKE 'orig%') as o + LEFT JOIN pg_replication_slots as c ON o.restart_lsn = c.restart_lsn +WHERE + o.slot_name != c.slot_name +ORDER BY o.slot_name, c.slot_name; + slot_name | slot_name +------------+------------------------ + orig_slot2 | copied_slot2_no_change + orig_slot2 | copied_slot3_no_change + orig_slot2 | copied_slot3_temp + orig_slot2 | orig_slot3 + orig_slot3 | copied_slot2_no_change + orig_slot3 | copied_slot3_no_change + orig_slot3 | copied_slot3_temp + orig_slot3 | orig_slot2 +(8 rows) + +-- Now we have maximum 8 replication slots. Check slots are properly +-- released even when raise error during creating the target slot. +SELECT 'copy' FROM pg_copy_physical_replication_slot('orig_slot1', 'failed'); -- error +ERROR: all replication slots are in use +HINT: Free one or increase max_replication_slots. +-- temporary slots were dropped automatically +SELECT pg_drop_replication_slot('orig_slot1'); + pg_drop_replication_slot +-------------------------- + +(1 row) + +SELECT pg_drop_replication_slot('orig_slot2'); + pg_drop_replication_slot +-------------------------- + +(1 row) + +SELECT pg_drop_replication_slot('copied_slot1_no_change'); + pg_drop_replication_slot +-------------------------- + +(1 row) + +SELECT pg_drop_replication_slot('copied_slot2_no_change'); + pg_drop_replication_slot +-------------------------- + +(1 row) + +SELECT pg_drop_replication_slot('copied_slot3_temp'); + pg_drop_replication_slot +-------------------------- + +(1 row) + diff --git a/contrib/test_decoding/logical.conf b/contrib/test_decoding/logical.conf index 367f706..bc09e43 100644 --- a/contrib/test_decoding/logical.conf +++ b/contrib/test_decoding/logical.conf @@ -1,2 +1,2 @@ wal_level = logical -max_replication_slots = 4 +max_replication_slots = 8 diff --git a/contrib/test_decoding/sql/slot.sql b/contrib/test_decoding/sql/slot.sql index 24cdf71..7a846f3 100644 --- a/contrib/test_decoding/sql/slot.sql +++ b/contrib/test_decoding/sql/slot.sql @@ -74,3 +74,83 @@ SELECT slot_name FROM pg_create_physical_replication_slot('regression_slot3'); SELECT pg_replication_slot_advance('regression_slot3', '0/0'); -- invalid LSN SELECT pg_replication_slot_advance('regression_slot3', '0/1'); -- error SELECT pg_drop_replication_slot('regression_slot3'); + +-- +-- Test copy function for logical replication slots +-- + +-- Create original logical slot +SELECT 'init' FROM pg_create_logical_replication_slot('orig_slot1', 'test_decoding', false); +SELECT 'init' FROM pg_create_logical_replication_slot('orig_slot2', 'test_decoding', true); + +-- Preserve all values of the original slot +SELECT 'copy' FROM pg_copy_logical_replication_slot('orig_slot1', 'copied_slot1_no_change'); +SELECT 'copy' FROM pg_copy_logical_replication_slot('orig_slot2', 'copied_slot2_no_change'); + +-- Change output plugin, preserve the persisitence +SELECT 'copy' FROM pg_copy_logical_replication_slot('orig_slot1', 'copied_slot1_change_plugin', 'pgoutput'); +SELECT 'copy' FROM pg_copy_logical_replication_slot('orig_slot2', 'copied_slot2_change_plugin', 'pgoutput'); + +-- Change both output plugin and persistence +SELECT 'copy' FROM pg_copy_logical_replication_slot('orig_slot1', 'copied_slot1_change_both', 'pgoutput', true); +SELECT 'copy' FROM pg_copy_logical_replication_slot('orig_slot2', 'copied_slot2_change_both', 'pgoutput', false); + +-- Check all copied slots status +SELECT + o.slot_name, o.plugin, o.temporary, c.slot_name, c.plugin, c.temporary +FROM + (SELECT * FROM pg_replication_slots WHERE slot_name LIKE 'orig%') as o + LEFT JOIN pg_replication_slots as c ON o.restart_lsn = c.restart_lsn AND o.confirmed_flush_lsn = c.confirmed_flush_lsn +WHERE + o.slot_name != c.slot_name +ORDER BY o.slot_name, c.slot_name; + +-- Now we have maximum 8 replication slots. Check slots are properly +-- released even when raise error during creating the target slot. +SELECT 'copy' FROM pg_copy_logical_replication_slot('orig_slot1', 'failed'); -- error + +-- temporary slots were dropped automatically +SELECT pg_drop_replication_slot('orig_slot1'); +SELECT pg_drop_replication_slot('copied_slot1_no_change'); +SELECT pg_drop_replication_slot('copied_slot1_change_plugin'); +SELECT pg_drop_replication_slot('copied_slot2_change_both'); + +-- +-- Test copy function for physical replication slots +-- + +-- Create original physical slot +SELECT 'init' FROM pg_create_physical_replication_slot('orig_slot1', false, false); +SELECT 'init' FROM pg_create_physical_replication_slot('orig_slot2', true, false); +SELECT 'init' FROM pg_create_physical_replication_slot('orig_slot3', true, true); + +-- Preserve all values of the original slot +SELECT 'copy' FROM pg_copy_physical_replication_slot('orig_slot1', 'copied_slot1_no_change'); +SELECT 'copy' FROM pg_copy_physical_replication_slot('orig_slot2', 'copied_slot2_no_change'); +SELECT 'copy' FROM pg_copy_physical_replication_slot('orig_slot3', 'copied_slot3_no_change'); + +-- Change persistence +SELECT 'copy' FROM pg_copy_physical_replication_slot('orig_slot1', 'copied_slot1_temp', true); +SELECT 'copy' FROM pg_copy_physical_replication_slot('orig_slot3', 'copied_slot3_temp', false); + +-- Check all copied slots status +SELECT slot_name, slot_type, temporary FROM pg_replication_slots; +SELECT + o.slot_name, c.slot_name +FROM + (SELECT * FROM pg_replication_slots WHERE slot_name LIKE 'orig%') as o + LEFT JOIN pg_replication_slots as c ON o.restart_lsn = c.restart_lsn +WHERE + o.slot_name != c.slot_name +ORDER BY o.slot_name, c.slot_name; + +-- Now we have maximum 8 replication slots. Check slots are properly +-- released even when raise error during creating the target slot. +SELECT 'copy' FROM pg_copy_physical_replication_slot('orig_slot1', 'failed'); -- error + +-- temporary slots were dropped automatically +SELECT pg_drop_replication_slot('orig_slot1'); +SELECT pg_drop_replication_slot('orig_slot2'); +SELECT pg_drop_replication_slot('copied_slot1_no_change'); +SELECT pg_drop_replication_slot('copied_slot2_no_change'); +SELECT pg_drop_replication_slot('copied_slot3_temp'); diff --git a/doc/src/sgml/func.sgml b/doc/src/sgml/func.sgml index edc9be9..93aff1b 100644 --- a/doc/src/sgml/func.sgml +++ b/doc/src/sgml/func.sgml @@ -19252,6 +19252,47 @@ postgres=# SELECT * FROM pg_walfile_name_offset(pg_stop_backup()); + pg_copy_physical_replication_slot + + pg_copy_physical_replication_slot(src_slot_name name, dst_slot_name , temporary bool) + + + (slot_name name, lsn pg_lsn) + + + Copies an existing physical replication slot name src_slot_name + to a physical replicaiton slot named dst_slot_name. + The copied physical slot starts to reserve WAL from the same LSN as the + source slot if the source slot already reserves WAL. + temporary is optional. If temporary + is omitted, the same value as the source slot is used. + + + + + + + pg_copy_logical_replication_slot + + pg_copy_logical_replication_slot(src_slot_name name, dst_slot_name , plugin name , temporary boolean) + + + (slot_name name, lsn pg_lsn) + + + Copies an existing logical replication slot name src_slot_name + to a logical replication slot named dst_slot_name + while changing the output plugin and persistence. The copied logical slot starts + from the same LSN as the source logical slot. Both plugin and + temporary are optional. If plugin + or temporary are omitted, the same values as + the source logical slot are used. + + + + + + pg_logical_slot_get_changes pg_logical_slot_get_changes(slot_name name, upto_lsn pg_lsn, upto_nchanges int, VARIADIC options text[]) diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c index c2d0e0c..5b0381e 100644 --- a/src/backend/replication/logical/logical.c +++ b/src/backend/replication/logical/logical.c @@ -223,6 +223,7 @@ LogicalDecodingContext * CreateInitDecodingContext(char *plugin, List *output_plugin_options, bool need_full_snapshot, + XLogRecPtr restart_lsn, XLogPageReadCB read_page, LogicalOutputPluginWriterPrepareWrite prepare_write, LogicalOutputPluginWriterWrite do_write, @@ -266,7 +267,7 @@ CreateInitDecodingContext(char *plugin, StrNCpy(NameStr(slot->data.plugin), plugin, NAMEDATALEN); SpinLockRelease(&slot->mutex); - ReplicationSlotReserveWal(); + ReplicationSlotReserveWal(restart_lsn); /* ---- * This is a bit tricky: We need to determine a safe xmin horizon to start @@ -311,7 +312,7 @@ CreateInitDecodingContext(char *plugin, ReplicationSlotMarkDirty(); ReplicationSlotSave(); - ctx = StartupDecodingContext(NIL, InvalidXLogRecPtr, xmin_horizon, + ctx = StartupDecodingContext(NIL, restart_lsn, xmin_horizon, need_full_snapshot, true, read_page, prepare_write, do_write, update_progress); diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c index fb95b44..514a1f9 100644 --- a/src/backend/replication/slot.c +++ b/src/backend/replication/slot.c @@ -986,11 +986,13 @@ CheckSlotRequirements(void) /* * Reserve WAL for the currently active slot. * - * Compute and set restart_lsn in a manner that's appropriate for the type of - * the slot and concurrency safe. + * If an lsn to reserve is not requested, compute and set restart_lsn + * in a manner that's appropriate for the type of the slot and concurrency safe. + * If requested, set restart_lsn and check if the corresponding wal segment + * is available. */ void -ReplicationSlotReserveWal(void) +ReplicationSlotReserveWal(XLogRecPtr requested_lsn) { ReplicationSlot *slot = MyReplicationSlot; @@ -1001,47 +1003,57 @@ ReplicationSlotReserveWal(void) * The replication slot mechanism is used to prevent removal of required * WAL. As there is no interlock between this routine and checkpoints, WAL * segments could concurrently be removed when a now stale return value of - * ReplicationSlotsComputeRequiredLSN() is used. In the unlikely case that - * this happens we'll just retry. + * ReplicationSlotsComputeRequiredLSN() is used. If the lsn to reserve is + * not requested, in the unlikely case that this happens we'll just retry. */ while (true) { XLogSegNo segno; XLogRecPtr restart_lsn; - /* - * For logical slots log a standby snapshot and start logical decoding - * at exactly that position. That allows the slot to start up more - * quickly. - * - * That's not needed (or indeed helpful) for physical slots as they'll - * start replay at the last logged checkpoint anyway. Instead return - * the location of the last redo LSN. While that slightly increases - * the chance that we have to retry, it's where a base backup has to - * start replay at. - */ - if (!RecoveryInProgress() && SlotIsLogical(slot)) + if (!XLogRecPtrIsInvalid(requested_lsn)) { - XLogRecPtr flushptr; - - /* start at current insert position */ - restart_lsn = GetXLogInsertRecPtr(); + /* Set the requested lsn */ SpinLockAcquire(&slot->mutex); - slot->data.restart_lsn = restart_lsn; + slot->data.restart_lsn = requested_lsn; SpinLockRelease(&slot->mutex); - - /* make sure we have enough information to start */ - flushptr = LogStandbySnapshot(); - - /* and make sure it's fsynced to disk */ - XLogFlush(flushptr); } else { - restart_lsn = GetRedoRecPtr(); - SpinLockAcquire(&slot->mutex); - slot->data.restart_lsn = restart_lsn; - SpinLockRelease(&slot->mutex); + /* + * For logical slots log a standby snapshot and start logical decoding + * at exactly that position. That allows the slot to start up more + * quickly. + * + * That's not needed (or indeed helpful) for physical slots as they'll + * start replay at the last logged checkpoint anyway. Instead return + * the location of the last redo LSN. While that slightly increases + * the chance that we have to retry, it's where a base backup has to + * start replay at. + */ + if (!RecoveryInProgress() && SlotIsLogical(slot)) + { + XLogRecPtr flushptr; + + /* start at current insert position */ + restart_lsn = GetXLogInsertRecPtr(); + SpinLockAcquire(&slot->mutex); + slot->data.restart_lsn = restart_lsn; + SpinLockRelease(&slot->mutex); + + /* make sure we have enough information to start */ + flushptr = LogStandbySnapshot(); + + /* and make sure it's fsynced to disk */ + XLogFlush(flushptr); + } + else + { + restart_lsn = GetRedoRecPtr(); + SpinLockAcquire(&slot->mutex); + slot->data.restart_lsn = restart_lsn; + SpinLockRelease(&slot->mutex); + } } /* prevent WAL removal as fast as possible */ @@ -1057,6 +1069,19 @@ ReplicationSlotReserveWal(void) XLByteToSeg(slot->data.restart_lsn, segno, wal_segment_size); if (XLogGetLastRemovedSegno() < segno) break; + + /* + * The requested wal lsn is no longer available. We don't want to retry + * it, so raise an error. + */ + if (!XLogRecPtrIsInvalid(restart_lsn)) + { + char filename[MAXFNAMELEN]; + + XLogFileName(filename, ThisTimeLineID, segno, wal_segment_size); + ereport(ERROR, + (errmsg("could not reserve WAL segment %s", filename))); + } } } diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c index 23af323..d3d26e8 100644 --- a/src/backend/replication/slotfuncs.c +++ b/src/backend/replication/slotfuncs.c @@ -17,10 +17,12 @@ #include "miscadmin.h" #include "access/htup_details.h" +#include "access/xlog_internal.h" #include "replication/decode.h" #include "replication/slot.h" #include "replication/logical.h" #include "replication/logicalfuncs.h" +#include "storage/ipc.h" #include "utils/builtins.h" #include "utils/inval.h" #include "utils/pg_lsn.h" @@ -36,86 +38,228 @@ check_permissions(void) } /* - * SQL function for creating a new physical (streaming replication) - * replication slot. + * Error cleanup callback for copy replication slot functions. Release + * both MyReplicationSlot and the saved replication slot. */ -Datum -pg_create_physical_replication_slot(PG_FUNCTION_ARGS) +static void +copy_replication_slot_callback(int code, Datum arg) { - Name name = PG_GETARG_NAME(0); - bool immediately_reserve = PG_GETARG_BOOL(1); - bool temporary = PG_GETARG_BOOL(2); - Datum values[2]; - bool nulls[2]; - TupleDesc tupdesc; - HeapTuple tuple; - Datum result; + ReplicationSlot *savedslot = (ReplicationSlot *) DatumGetPointer(arg); - Assert(!MyReplicationSlot); + if (MyReplicationSlot) + ReplicationSlotRelease(); - if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE) - elog(ERROR, "return type must be a row type"); + /* Release the saved slot if exist while preventing double releasing */ + if (savedslot && savedslot != MyReplicationSlot) + { + MyReplicationSlot = savedslot; + ReplicationSlotRelease(); + } +} + +/* + * Helper function for creating a new physical replication slot with + * given arguments. Return a restart_lsn of new replication slot or + * InvalidXLogRecPtr if WAL reservation is not required. + */ +static XLogRecPtr +create_physical_replication_slot(char *name, bool immediately_reserve, + bool temporary, XLogRecPtr restart_lsn) +{ + XLogRecPtr result = InvalidXLogRecPtr; + + Assert(!MyReplicationSlot); check_permissions(); CheckSlotRequirements(); /* acquire replication slot, this will check for conflicting names */ - ReplicationSlotCreate(NameStr(*name), false, + ReplicationSlotCreate(name, false, temporary ? RS_TEMPORARY : RS_PERSISTENT); - values[0] = NameGetDatum(&MyReplicationSlot->data.name); - nulls[0] = false; - if (immediately_reserve) { /* Reserve WAL as the user asked for it */ - ReplicationSlotReserveWal(); + ReplicationSlotReserveWal(restart_lsn); /* Write this slot to disk */ ReplicationSlotMarkDirty(); ReplicationSlotSave(); - values[1] = LSNGetDatum(MyReplicationSlot->data.restart_lsn); - nulls[1] = false; + result = MyReplicationSlot->data.restart_lsn; } + + ReplicationSlotRelease(); + + return result; +} + +/* + * SQL function for creating a new physical (streaming replication) + * replication slot. + */ +Datum +pg_create_physical_replication_slot(PG_FUNCTION_ARGS) +{ + Name name = PG_GETARG_NAME(0); + bool immediately_reserve = PG_GETARG_BOOL(1); + bool temporary = PG_GETARG_BOOL(2); + Datum values[2]; + bool nulls[2]; + XLogRecPtr result_lsn; + TupleDesc tupdesc; + HeapTuple tuple; + Datum result; + + if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE) + elog(ERROR, "return type must be a row type"); + + result_lsn = create_physical_replication_slot(NameStr(*name), + immediately_reserve, + temporary, + InvalidXLogRecPtr); + + values[0] = NameGetDatum(name); + nulls[0] = false; + + if (XLogRecPtrIsInvalid(result_lsn)) + nulls[1] = true; else { - nulls[1] = true; + values[1] = LSNGetDatum(result_lsn); + nulls[1] = false; } tuple = heap_form_tuple(tupdesc, values, nulls); result = HeapTupleGetDatum(tuple); - ReplicationSlotRelease(); - PG_RETURN_DATUM(result); } - /* - * SQL function for creating a new logical replication slot. + * Copy physical replication slot (3 arguments) + * + * note: this wrapper is necessary to pass the sanity check in opr_sanity, + * which checks that all built-in functions that share the implementing C + * function take the same number of arguments */ Datum -pg_create_logical_replication_slot(PG_FUNCTION_ARGS) +pg_copy_physical_replication_slot_no_temp(PG_FUNCTION_ARGS) { - Name name = PG_GETARG_NAME(0); - Name plugin = PG_GETARG_NAME(1); - bool temporary = PG_GETARG_BOOL(2); - - LogicalDecodingContext *ctx = NULL; + return pg_copy_physical_replication_slot(fcinfo); +} - TupleDesc tupdesc; - HeapTuple tuple; - Datum result; +/* + * SQL function for copying a physical replication slot. + */ +Datum +pg_copy_physical_replication_slot(PG_FUNCTION_ARGS) +{ + Name src_name = PG_GETARG_NAME(0); + Name dst_name = PG_GETARG_NAME(1); + bool temporary; /* optional argument */ + bool immediately_reserve; + ReplicationSlot *saveslot = NULL; + XLogRecPtr restart_lsn; + XLogRecPtr result_lsn; Datum values[2]; bool nulls[2]; - - Assert(!MyReplicationSlot); + TupleDesc tupdesc; + HeapTuple tuple; if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE) elog(ERROR, "return type must be a row type"); + /* Acquire the source slot so we own it */ + ReplicationSlotAcquire(NameStr(*src_name), true); + + /* Check type of replication slot */ + if (SlotIsLogical(MyReplicationSlot)) + { + ReplicationSlotRelease(); + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + (errmsg("cannot copy a logical replication slot to a physical replication slot")))); + } + + /* Save values of the source slot */ + restart_lsn = MyReplicationSlot->data.restart_lsn; + temporary = (MyReplicationSlot->data.persistency == RS_TEMPORARY); + + /* Reserve WAL at creation if the source slot already reserves */ + immediately_reserve = !XLogRecPtrIsInvalid(restart_lsn); + + /* check the optional argument */ + if (PG_NARGS() >= 3) + temporary = PG_GETARG_BOOL(2); + + /* + * To prevent the restart_lsn WAL of the source slot from removal + * during copying a new slot, we copy it while holding the source slot. + * Since we are not allowed to create a new one while holding another + * one, we temporarily save the acquired slot and restore it after + * creation. Set callback function to ensure we release replication + * slots if fail below. + */ + if (immediately_reserve) + saveslot = MyReplicationSlot; + else + ReplicationSlotRelease(); + + PG_ENSURE_ERROR_CLEANUP(copy_replication_slot_callback, (Datum) PointerGetDatum(saveslot)); + { + if (immediately_reserve) + MyReplicationSlot = NULL; + + result_lsn = create_physical_replication_slot(NameStr(*dst_name), + immediately_reserve, + temporary, + restart_lsn); + Assert(MyReplicationSlot == NULL); + + /* + * Restore source slot, if saved. We must not change the saveslot + * to cancel the callback function. + */ + if (saveslot) + MyReplicationSlot = saveslot; + } + PG_END_ENSURE_ERROR_CLEANUP(copy_replication_slot_callback, (Datum) PointerGetDatum(saveslot)); + + /* Release the source slot, if not yet */ + if (immediately_reserve) + ReplicationSlotRelease(); + + values[0] = NameGetDatum(dst_name); + nulls[0] = false; + + if (XLogRecPtrIsInvalid(result_lsn)) + nulls[1] = true; + else + { + values[1] = LSNGetDatum(result_lsn); + nulls[1] = false; + } + + tuple = heap_form_tuple(tupdesc, values, nulls); + + PG_RETURN_DATUM(HeapTupleGetDatum(tuple)); +} + +/* + * Helper function for creating a new logical replication slot with + * given arguments. Return a confirmed_lsn of new replication slot. + */ +static XLogRecPtr +create_logical_replication_slot(char *name, char *plugin, + bool temporary, XLogRecPtr start_lsn) +{ + LogicalDecodingContext *ctx = NULL; + XLogRecPtr result; + + Assert(!MyReplicationSlot); + check_permissions(); CheckLogicalDecodingRequirements(); @@ -128,39 +272,177 @@ pg_create_logical_replication_slot(PG_FUNCTION_ARGS) * slots can be created as temporary from beginning as they get dropped on * error as well. */ - ReplicationSlotCreate(NameStr(*name), true, + ReplicationSlotCreate(name, true, temporary ? RS_TEMPORARY : RS_EPHEMERAL); /* * Create logical decoding context, to build the initial snapshot. */ - ctx = CreateInitDecodingContext(NameStr(*plugin), NIL, + ctx = CreateInitDecodingContext(plugin, NIL, false, /* do not build snapshot */ + start_lsn, logical_read_local_xlog_page, NULL, NULL, NULL); /* build initial snapshot, might take a while */ DecodingContextFindStartpoint(ctx); - values[0] = CStringGetTextDatum(NameStr(MyReplicationSlot->data.name)); - values[1] = LSNGetDatum(MyReplicationSlot->data.confirmed_flush); - /* don't need the decoding context anymore */ FreeDecodingContext(ctx); - memset(nulls, 0, sizeof(nulls)); - - tuple = heap_form_tuple(tupdesc, values, nulls); - result = HeapTupleGetDatum(tuple); - /* ok, slot is now fully created, mark it as persistent if needed */ if (!temporary) ReplicationSlotPersist(); + + result = MyReplicationSlot->data.confirmed_flush; + ReplicationSlotRelease(); - PG_RETURN_DATUM(result); + return result; } +/* + * SQL function for creating a new logical replication slot. + */ +Datum +pg_create_logical_replication_slot(PG_FUNCTION_ARGS) +{ + Name name = PG_GETARG_NAME(0); + Name plugin = PG_GETARG_NAME(1); + bool temporary = PG_GETARG_BOOL(2); + XLogRecPtr confirmed_flush; + TupleDesc tupdesc; + HeapTuple tuple; + Datum values[2]; + bool nulls[2]; + + if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE) + elog(ERROR, "return type must be a row type"); + + confirmed_flush = create_logical_replication_slot(NameStr(*name), + NameStr(*plugin), + temporary, + InvalidXLogRecPtr); + + memset(nulls, 0, sizeof(nulls)); + + values[0] = CStringGetTextDatum(NameStr(*name)); + values[1] = LSNGetDatum(confirmed_flush); + + tuple = heap_form_tuple(tupdesc, values, nulls); + + PG_RETURN_POINTER(HeapTupleGetDatum(tuple)); +} + +/* + * Copy logical replication slot (2 arguments) + * + * note: this wrapper is necessary to pass the sanity check in opr_sanity, + * which checks that all built-in functions that share the implementing C + * function take the same number of arguments + */ +Datum +pg_copy_logical_replication_slot_no_plugin_temp(PG_FUNCTION_ARGS) +{ + return pg_copy_logical_replication_slot(fcinfo); +} + +/* + * Copy logical replication slot (3 arguments) + * + * note: this wrapper is necessary to pass the sanity check in opr_sanity, + * which checks that all built-in functions that share the implementing C + * function take the same number of arguments + */ +Datum +pg_copy_logical_replication_slot_no_plugin(PG_FUNCTION_ARGS) +{ + return pg_copy_logical_replication_slot(fcinfo); +} + +/* + * SQL function for copying a logical replication slot. + */ +Datum +pg_copy_logical_replication_slot(PG_FUNCTION_ARGS) +{ + Name src_name = PG_GETARG_NAME(0); + Name dst_name = PG_GETARG_NAME(1); + char *plugin; /* optional argument */ + bool temporary; /* optional argument */ + ReplicationSlot *saveslot = NULL; + XLogRecPtr confirmed_flush; + XLogRecPtr restart_lsn; + TupleDesc tupdesc; + HeapTuple tuple; + Datum values[2]; + bool nulls[2]; + + if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE) + elog(ERROR, "return type must be a row type"); + + /* Acquire the source slot so we own it */ + ReplicationSlotAcquire(NameStr(*src_name), true); + + /* Check type of replication slot */ + if (SlotIsPhysical(MyReplicationSlot)) + { + ReplicationSlotRelease(); + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + (errmsg("cannot copy a physical replication slot to a logical replication slot")))); + } + + /* Save values of the source slot */ + restart_lsn = MyReplicationSlot->data.restart_lsn; + plugin = pstrdup(NameStr(MyReplicationSlot->data.plugin)); + temporary = (MyReplicationSlot->data.persistency == RS_TEMPORARY); + + /* Check the optional arguments */ + if (PG_NARGS() >= 3) + plugin = NameStr(*(PG_GETARG_NAME(2))); + if (PG_NARGS() >= 4) + temporary = PG_GETARG_BOOL(3); + + /* + * To prevent the restart_lsn WAL of the source slot from removal + * during copying a new slot, we copy it while holding the source slot. + * Since we are not allowed to create a new one while holding another + * one, we temporarily save the acquired slot and restore it after + * creation. Set callback function to ensure we release replication + * slots if fail below. + */ + saveslot = MyReplicationSlot; + PG_ENSURE_ERROR_CLEANUP(copy_replication_slot_callback, (Datum) PointerGetDatum(saveslot)); + { + MyReplicationSlot = NULL; + + confirmed_flush = create_logical_replication_slot(NameStr(*dst_name), + plugin, + temporary, + restart_lsn); + Assert(MyReplicationSlot == NULL); + + /* + * Restore source slot. We must not change the saveslot to cancel the + * callback function. + */ + MyReplicationSlot = saveslot; + } + PG_END_ENSURE_ERROR_CLEANUP(copy_replication_slot_callback, (Datum) PointerGetDatum(saveslot)); + + /* Release the source slot */ + ReplicationSlotRelease(); + + memset(nulls, 0, sizeof(nulls)); + + values[0] = NameGetDatum(dst_name); + values[1] = LSNGetDatum(confirmed_flush); + + tuple = heap_form_tuple(tupdesc, values, nulls); + + PG_RETURN_POINTER(HeapTupleGetDatum(tuple)); +} /* * SQL function for dropping a replication slot. diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index e47ddca..85c47b5 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -918,6 +918,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd) } ctx = CreateInitDecodingContext(cmd->plugin, NIL, need_full_snapshot, + InvalidXLogRecPtr, logical_read_xlog_page, WalSndPrepareWrite, WalSndWriteData, WalSndUpdateProgress); @@ -960,7 +961,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd) } else if (cmd->kind == REPLICATION_KIND_PHYSICAL && reserve_wal) { - ReplicationSlotReserveWal(); + ReplicationSlotReserveWal(InvalidXLogRecPtr); ReplicationSlotMarkDirty(); diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat index 00b59fd..be33d7f 100644 --- a/src/include/catalog/pg_proc.dat +++ b/src/include/catalog/pg_proc.dat @@ -9787,6 +9787,20 @@ proargmodes => '{i,i,i,o,o}', proargnames => '{slot_name,immediately_reserve,temporary,slot_name,lsn}', prosrc => 'pg_create_physical_replication_slot' }, +{ oid => '4008', descr => 'copy a physical replication slot', + proname => 'pg_copy_physical_replication_slot', provolatile => 'v', + proparallel => 'u', prorettype => 'record', proargtypes => 'name name bool', + proallargtypes => '{name,name,bool,name,pg_lsn}', + proargmodes => '{i,i,i,o,o}', + proargnames => '{src_slot_name,dst_slot_name,temporary,slot_name,lsn}', + prosrc => 'pg_copy_physical_replication_slot' }, +{ oid => '4009', descr => 'copy a physical replication slot', + proname => 'pg_copy_physical_replication_slot', provolatile => 'v', + proparallel => 'u', prorettype => 'record', proargtypes => 'name name', + proallargtypes => '{name,name,name,pg_lsn}', + proargmodes => '{i,i,o,o}', + proargnames => '{slot_name,dst_name,slot_name,lsn}', + prosrc => 'pg_copy_physical_replication_slot_no_temp' }, { oid => '3780', descr => 'drop a replication slot', proname => 'pg_drop_replication_slot', provolatile => 'v', proparallel => 'u', prorettype => 'void', proargtypes => 'name', @@ -9807,6 +9821,27 @@ proargmodes => '{i,i,i,o,o}', proargnames => '{slot_name,plugin,temporary,slot_name,lsn}', prosrc => 'pg_create_logical_replication_slot' }, +{ oid => '4005', descr => 'copy a logical replication slot', + proname => 'pg_copy_logical_replication_slot', provolatile => 'v', + proparallel => 'u', prorettype => 'record', proargtypes => 'name name name bool', + proallargtypes => '{name,name,name,bool,name,pg_lsn}', + proargmodes => '{i,i,i,i,o,o}', + proargnames => '{src_slot_name,dst_slot_name,plugin,temporary,slot_name,lsn}', + prosrc => 'pg_copy_logical_replication_slot' }, +{ oid => '4006', descr => 'copy a logical replication slot', + proname => 'pg_copy_logical_replication_slot', provolatile => 'v', + proparallel => 'u', prorettype => 'record', proargtypes => 'name name name', + proallargtypes => '{name,name,name,name,pg_lsn}', + proargmodes => '{i,i,i,o,o}', + proargnames => '{src_slot_name,plugin,dst_slot_name,slot_name,lsn}', + prosrc => 'pg_copy_logical_replication_slot_no_plugin' }, +{ oid => '4007', descr => 'copy a logical replication slot', + proname => 'pg_copy_logical_replication_slot', provolatile => 'v', + proparallel => 'u', prorettype => 'record', proargtypes => 'name name', + proallargtypes => '{name,name,name,pg_lsn}', + proargmodes => '{i,i,o,o}', + proargnames => '{src_slot_name,dst_slot_name,slot_name,lsn}', + prosrc => 'pg_copy_logical_replication_slot_no_plugin_temp' }, { oid => '3782', descr => 'get changes from replication slot', proname => 'pg_logical_slot_get_changes', procost => '1000', prorows => '1000', provariadic => 'text', proisstrict => 'f', diff --git a/src/include/replication/logical.h b/src/include/replication/logical.h index c25ac1f..afc32ff 100644 --- a/src/include/replication/logical.h +++ b/src/include/replication/logical.h @@ -97,6 +97,7 @@ extern void CheckLogicalDecodingRequirements(void); extern LogicalDecodingContext *CreateInitDecodingContext(char *plugin, List *output_plugin_options, bool need_full_snapshot, + XLogRecPtr restart_lsn, XLogPageReadCB read_page, LogicalOutputPluginWriterPrepareWrite prepare_write, LogicalOutputPluginWriterWrite do_write, diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h index 7964ae2..d5c8953 100644 --- a/src/include/replication/slot.h +++ b/src/include/replication/slot.h @@ -193,7 +193,7 @@ extern void ReplicationSlotMarkDirty(void); /* misc stuff */ extern bool ReplicationSlotValidateName(const char *name, int elevel); -extern void ReplicationSlotReserveWal(void); +extern void ReplicationSlotReserveWal(XLogRecPtr requested_lsn); extern void ReplicationSlotsComputeRequiredXmin(bool already_locked); extern void ReplicationSlotsComputeRequiredLSN(void); extern XLogRecPtr ReplicationSlotsComputeLogicalRestartLSN(void); -- 2.10.5